Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Newer
Older
100644 1312 lines (1092 sloc) 28.372 kB
9b8dd70 @delamonpansie Initial public import
delamonpansie authored
1 /*
2 * Copyright (C) 2010 Mail.RU
3 * Copyright (C) 2010 Yuriy Vostrikov
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 * 1. Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 *
14 * THIS SOFTWARE IS PROVIDED BY AUTHOR AND CONTRIBUTORS ``AS IS'' AND
15 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17 * ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
18 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24 * SUCH DAMAGE.
25 */
26
65df38a @kostja Implement blueprint 'consistent-header-guards'.
kostja authored
27 #include "fiber.h"
b5b3a2b @kostja Blueprint 'unmmap-after-fork': review comments.
kostja authored
28 #include "config.h"
9b8dd70 @delamonpansie Initial public import
delamonpansie authored
29 #include <arpa/inet.h>
30 #include <errno.h>
31 #include <fcntl.h>
32 #include <limits.h>
33 #include <sys/types.h>
34 #include <netinet/in.h>
35 #include <netinet/tcp.h>
36 #include <signal.h>
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <string.h>
40 #include <sys/mman.h>
41 #include <sys/socket.h>
42 #include <unistd.h>
43 #include <sysexits.h>
44 #include <third_party/queue.h>
45 #include <third_party/khash.h>
46
47 #include <palloc.h>
48 #include <salloc.h>
49 #include <say.h>
50 #include <tarantool.h>
82e966b @kostja Blueprint 'cmake-based-build' initial commit.
kostja authored
51 #include TARANTOOL_CONFIG
9b8dd70 @delamonpansie Initial public import
delamonpansie authored
52 #include <tarantool_ev.h>
53 #include <tbuf.h>
54 #include <util.h>
55 #include <stat.h>
56 #include <pickle.h>
57
c28da21 @kostja Merge with the master.
kostja authored
/**
 * Exception raised by fiber_testcancel() inside a fiber that has been
 * asked to terminate. The class carries no behaviour of its own.
 */
@implementation FiberCancelException
@end
60
9b8dd70 @delamonpansie Initial public import
delamonpansie authored
/* The scheduler pseudo-fiber; ev_schedule() asserts that it is current. */
static struct fiber sched;
/* The fiber that is executing right now. */
struct fiber *fiber = &sched;
/* Suspended callers: fiber_call() pushes through sp, yield() pops. */
static struct fiber *call_stack[64];
static struct fiber **sp;
/* Most recently issued fiber id, see fiber_create(). */
static uint32_t last_used_fid;
/* Spare pool which fiber_gc() swaps with the running fiber's gc_pool. */
static struct palloc_pool *ex_pool;
66
/**
 * One deferred cleanup action. Records of this type are appended to
 * fiber->cleanup by fiber_register_cleanup() and run by fiber_cleanup().
 */
struct fiber_cleanup {
	/* Callback to invoke. */
	void (*handler) (void *data);
	/* Argument handed to the callback. */
	void *data;
};
71
/**
 * Description of a server fiber.
 * NOTE(review): not referenced in the visible part of this file; the
 * field meanings below are read off the member names only -- confirm
 * against the code that uses this structure.
 */
struct fiber_server {
	/* presumably the port to listen on */
	int port;
	/* opaque argument passed to both callbacks */
	void *data;
	void (*handler) (void *data);
	void (*on_bind) (void *data);
};
78
/**
 * Message header followed by data_len bytes of payload (flexible array).
 * NOTE(review): fid is presumably the id of the fiber the message relates
 * to -- confirm against the users of this structure.
 */
struct fiber_msg {
	u32 fid;
	u32 data_len;
	u8 data[];
};
84
/** Interpret the data area of @a buf as a struct fiber_msg. */
static inline struct fiber_msg *
fiber_msg(const struct tbuf *buf)
{
	struct fiber_msg *hdr = buf->data;

	return hdr;
}
90
/*
 * Hash map keyed by fiber id; values are struct fiber pointers stored as
 * void *. Maintained by register_fid()/unregister_fid(), queried by
 * fid2fiber().
 */
KHASH_MAP_INIT_INT(fid2fiber, void *, realloc);
static khash_t(fid2fiber) *fibers_registry;
9b8dd70 @delamonpansie Initial public import
delamonpansie authored
93
acaadca @delamonpansie [core] Cleanup backtrace priniting.
delamonpansie authored
/**
 * Record the current frame address in @a fiber when backtraces are
 * enabled at build time; otherwise do nothing.
 */
static void
update_last_stack_frame(struct fiber *fiber)
{
#ifndef ENABLE_BACKTRACE
	(void)fiber;
#else
	fiber->last_stack_frame = __builtin_frame_address(0);
#endif /* ENABLE_BACKTRACE */
}
9b8dd70 @delamonpansie Initial public import
delamonpansie authored
103
/**
 * Transfer control to @a callee.
 *
 * The running fiber is pushed onto call_stack so that a later yield()
 * made by the callee resumes it. Nesting is limited to 8 levels by the
 * assertion below, although the stack itself has more slots.
 */
void
fiber_call(struct fiber *callee)
{
	struct fiber *caller = fiber;

	assert(sp - call_stack < 8);
	assert(caller);

	/* Make the callee current and remember where to come back to. */
	fiber = callee;
	*sp = caller;
	sp++;

	update_last_stack_frame(caller);

	/* Count the context switch on the fiber being entered. */
	callee->csw++;
	coro_transfer(&caller->coro.ctx, &callee->coro.ctx);
}
120
c28da21 @kostja Merge with the master.
kostja authored
121
/**
 * Interrupt a synchronous wait of fiber @a f inside the event loop.
 *
 * Every fiber owns an "async" watcher that exists solely for this
 * purpose; the watcher is started and then signalled here.
 */
void
fiber_wakeup(struct fiber *f)
{
	ev_async *wake = &f->async;

	ev_async_start(wake);
	ev_async_send(wake);
}
133
/**
 * Cancel fiber @a f and wait until it has terminated.
 *
 * Cancellation is cooperative and is not guaranteed to succeed. A fiber
 * may clear FIBER_CANCELLABLE and never test whether it was cancelled;
 * such a fiber can never be cancelled, and this call will then wait
 * forever. fiber_testcancel() is, however, built into the rest of the
 * fiber_* API (@sa yield()), so most fibers that opt in are cancellable.
 *
 * Cancellation is currently synchronous only: the call returns when the
 * subject fiber has terminated.
 *
 * The cancelled fiber gets FiberCancelException raised in it. For
 * cancellation to work, any code that catches this exception type must
 * re-raise it.
 */
void
fiber_cancel(struct fiber *f)
{
	assert(fiber->fid != 0);
	assert(!(f->flags & FIBER_CANCEL));

	f->flags |= FIBER_CANCEL;

	/* Only a fiber that accepts cancellation is kicked out of its wait. */
	if ((f->flags & FIBER_CANCELLABLE) != 0) {
		fiber_wakeup(f);
	}

	/* Register ourselves so that fiber_loop() calls us back when f ends. */
	assert(f->waiter == NULL);
	f->waiter = fiber;

	@try {
		yield();
	}
	@finally {
		f->waiter = NULL;
	}
}
174
c28da21 @kostja Merge with the master.
kostja authored
175
/**
 * Raise FiberCancelException in the running fiber if it is both
 * cancellable (FIBER_CANCELLABLE) and has been cancelled (FIBER_CANCEL);
 * otherwise return normally.
 */
void
fiber_testcancel(void)
{
	bool cancellable = (fiber->flags & FIBER_CANCELLABLE) != 0;
	bool cancelled = (fiber->flags & FIBER_CANCEL) != 0;

	if (cancellable && cancelled) {
		tnt_raise(FiberCancelException);
	}
}
192
/**
 * Switch the cancellation state of the running fiber.
 * This function is not itself a cancellation point.
 *
 * @param enable  true to set FIBER_CANCELLABLE, false to clear it.
 */
void
fiber_setcancelstate(bool enable)
{
	if (!enable) {
		fiber->flags &= ~FIBER_CANCELLABLE;
		return;
	}
	fiber->flags |= FIBER_CANCELLABLE;
}
204
/**
 * Suspend the running fiber and resume the fiber on top of call_stack
 * (the one that entered us through fiber_call()).
 *
 * @note: this is a cancellation point (@sa fiber_testcancel()); the
 * test runs once this fiber is resumed.
 */
void
yield(void)
{
	struct fiber *caller = fiber;
	struct fiber *callee;

	/* Pop the fiber to return to. */
	sp--;
	callee = *sp;

	fiber = callee;
	update_last_stack_frame(caller);

	callee->csw++;
	coro_transfer(&caller->coro.ctx, &callee->coro.ctx);

	fiber_testcancel();
}
223
c28da21 @kostja Merge with the master.
kostja authored
/**
 * Put the running fiber to sleep by arming its one-shot timer with
 * @a delay and yielding. The timer is stopped again on the way out,
 * whether yield() returns normally or raises.
 *
 * @note: this is a cancellation point (@sa fiber_testcancel())
 */
void
fiber_sleep(ev_tstamp delay)
{
	ev_timer *timer = &fiber->timer;

	ev_timer_set(timer, delay, 0.);
	ev_timer_start(timer);
	@try {
		yield();
	}
	@finally {
		ev_timer_stop(timer);
	}
}
240
c28da21 @kostja Merge with the master.
kostja authored
/**
 * Wait for the forked child @a pid: arm the fiber's child watcher and
 * yield. The watcher is stopped on the way out, whether yield() returns
 * normally or raises.
 *
 * @note: this is a cancellation point (@sa fiber_testcancel()).
 */
void
wait_for_child(pid_t pid)
{
	ev_child *watcher = &fiber->cw;

	ev_child_set(watcher, pid, 0);
	ev_child_start(watcher);
	@try {
		yield();
	}
	@finally {
		ev_child_stop(watcher);
	}
}
257
c28da21 @kostja Merge with the master.
kostja authored
258
c3538de @kostya-shulgin Implement blueprint: 'feature-feeder-in-core'.
kostya-shulgin authored
/**
 * Arm the running fiber's IO watcher for @a events on @a fd.
 * The watcher must not already be active.
 */
void
fiber_io_start(int fd, int events)
{
	ev_io *io = &fiber->io;

	assert (!ev_is_active(io));

	ev_io_set(io, fd, events);
	ev_io_start(io);
}
269
/**
 * Yield while the running fiber's IO watcher is active (it must have
 * been armed with fiber_io_start()).
 *
 * If yield() raises, the IO watcher is stopped here before the
 * exception is re-thrown.
 *
 * @note: this is a cancellation point.
 */
void
fiber_io_yield()
{
	assert(ev_is_active(&fiber->io));

	@try {
		yield();
	}
	@catch (id o)
	{
		/* Leave no active watcher behind, then propagate. */
		ev_io_stop(&fiber->io);
		@throw;
	}
}
287
c3538de @kostya-shulgin Implement blueprint: 'feature-feeder-in-core'.
kostya-shulgin authored
/**
 * Stop the running fiber's IO watcher.
 *
 * @a fd and @a events are used only by the assertion, which checks that
 * they match what the watcher was armed with; hence the "unused"
 * attributes for builds where assert() compiles to nothing.
 */
void
fiber_io_stop(int fd __attribute__((unused)), int events __attribute__((unused)))
{
	ev_io *io = &fiber->io;

	assert(ev_is_active(io) && io->fd == fd && (io->events & events));

	ev_io_stop(io);
}
297
/**
 * Common libev callback for all per-fiber watchers: resume the fiber
 * stored in the watcher's data pointer. Must be entered from the
 * scheduler fiber.
 */
static void
ev_schedule(ev_watcher *watcher, int event __attribute__((unused)))
{
	assert(fiber == &sched);

	struct fiber *target = watcher->data;
	fiber_call(target);
}
304
/**
 * Look up a fiber by id.
 *
 * @return the registered fiber, or NULL if @a fid is not in the registry.
 */
static struct fiber *
fid2fiber(int fid)
{
	khiter_t pos = kh_get(fid2fiber, fibers_registry, fid);

	if (pos != kh_end(fibers_registry) && kh_exist(fibers_registry, pos))
		return kh_value(fibers_registry, pos);

	return NULL;
}
316
/** Add @a fiber to the registry under its current fid. */
static void
register_fid(struct fiber *fiber)
{
	int status;	/* result code of kh_put(), not examined */
	khiter_t slot = kh_put(fid2fiber, fibers_registry, fiber->fid, &status);

	kh_key(fibers_registry, slot) = fiber->fid;
	kh_value(fibers_registry, slot) = fiber;
}
325
/** Remove the registry entry stored under @a fiber's current fid. */
static void
unregister_fid(struct fiber *fiber)
{
	khiter_t slot;

	slot = kh_get(fid2fiber, fibers_registry, fiber->fid);
	kh_del(fid2fiber, fibers_registry, slot);
}
332
/** Empty the inbox of @a fiber: null every slot and reset both cursors. */
static void
clear_inbox(struct fiber *fiber)
{
	struct ring *inbox = fiber->inbox;
	size_t slot = 0;

	while (slot < inbox->size) {
		inbox->ring[slot] = NULL;
		slot++;
	}

	inbox->tail = 0;
	inbox->head = 0;
}
340
/**
 * Reset the per-fiber transient state: release everything in the
 * fiber's gc_pool, allocate fresh rbuf/iov/cleanup buffers from it,
 * zero the iovec counter and empty the inbox.
 */
static void
fiber_alloc(struct fiber *fiber)
{
	struct palloc_pool *pool = fiber->gc_pool;

	prelease(pool);

	fiber->rbuf = tbuf_alloc(pool);
	fiber->iov = tbuf_alloc(pool);
	fiber->cleanup = tbuf_alloc(pool);

	fiber->iov_cnt = 0;
	clear_inbox(fiber);
}
352
/**
 * Queue @a handler to be called with @a data by the next fiber_cleanup()
 * of the running fiber.
 */
void
fiber_register_cleanup(fiber_cleanup_handler handler, void *data)
{
	struct fiber_cleanup entry = {
		.handler = handler,
		.data = data,
	};

	tbuf_append(fiber->cleanup, &entry, sizeof(entry));
}
361
/**
 * Run every cleanup handler queued for the running fiber, in
 * registration order, then empty the queue.
 */
void
fiber_cleanup(void)
{
	struct fiber_cleanup *entries = fiber->cleanup->data;
	int count = fiber->cleanup->len / sizeof(struct fiber_cleanup);

	for (int n = 0; n < count; n++)
		entries[n].handler(entries[n].data);

	tbuf_reset(fiber->cleanup);
}
374
/**
 * Collect garbage of the running fiber.
 *
 * Cleanup handlers always run. The memory itself is only recycled once
 * the fiber's gc_pool holds at least 128 KiB: the pool is swapped with
 * the spare ex_pool, the live objects (rbuf, cleanup, the iovec array
 * and pending inbox messages) are copied into the new pool, and the old
 * pool is released.
 *
 * NOTE(review): only the iovec array is copied, not the memory the
 * iovecs point to -- confirm no caller keeps iov_dup()'ed data across
 * this call.
 */
void
fiber_gc(void)
{
	fiber_cleanup();

	/* Not enough garbage yet to be worth copying. */
	if (palloc_allocated(fiber->gc_pool) < 128 * 1024)
		return;

	/* Swap the fiber's pool with the spare one. */
	struct palloc_pool *fresh = ex_pool;
	ex_pool = fiber->gc_pool;
	fiber->gc_pool = fresh;
	palloc_set_name(fiber->gc_pool, fiber->name);
	palloc_set_name(ex_pool, "ex_pool");

	fiber->rbuf = tbuf_clone(fiber->gc_pool, fiber->rbuf);
	fiber->cleanup = tbuf_clone(fiber->gc_pool, fiber->cleanup);

	/* Rebuild the iovec array element by element in the new pool. */
	struct tbuf *new_iov = tbuf_alloc(fiber->gc_pool);
	for (int i = 0; i < fiber->iov_cnt; i++) {
		size_t offset = tbuf_reserve(new_iov, sizeof(struct iovec));
		struct iovec *dst = new_iov->data + offset;
		memcpy(dst, iovec(fiber->iov) + i, sizeof(*dst));
	}
	fiber->iov = new_iov;

	/* Move pending inbox messages over as well. */
	for (int i = 0; i < fiber->inbox->size; i++) {
		struct msg *old = fiber->inbox->ring[i];
		if (old == NULL)
			continue;

		struct msg *copy = palloc(fiber->gc_pool, sizeof(*old));
		fiber->inbox->ring[i] = copy;
		copy->sender_fid = old->sender_fid;
		copy->msg = tbuf_clone(fiber->gc_pool, old->msg);
	}

	/* Everything live has been copied; drop the old pool's contents. */
	prelease(ex_pool);
}
414
3c2e3a5 @kostja Fixes and tests for Bug##712447, Bug##695689, Bug#686411
kostja authored
415
/**
 * Retire the running fiber and make it available for reuse: drop it
 * from the fid registry, clear its identity and flags, reset its
 * buffers and put it on the zombie list from which fiber_create()
 * recycles fibers.
 */
static void
fiber_zombificate()
{
	/* Must happen while fiber->fid still holds the registered id. */
	unregister_fid(fiber);
	fiber->fid = 0;

	fiber_set_name(fiber, "zombie");
	fiber->f = NULL;
	fiber->flags = 0;
	fiber_alloc(fiber);

	SLIST_INSERT_HEAD(&zombie_fibers, fiber, zombie_link);
}
431
/**
 * Body of every fiber coroutine.
 *
 * Runs the fiber function, then closes the fiber's descriptor, turns
 * the fiber into a zombie and yields to the scheduler. When the fiber
 * is recycled by fiber_create() and called again, the loop starts over
 * with the new function.
 *
 * A FiberCancelException ends the function quietly and hands control to
 * the fiber waiting in fiber_cancel(), if any. Any other exception is
 * fatal.
 */
static void
fiber_loop(void *data __attribute__((unused)))
{
	while (true) {
		assert(fiber != NULL && fiber->f != NULL && fiber->fid != 0);
		@try {
			fiber->f(fiber->f_data);
		}
		@catch (FiberCancelException *e) {
			say_info("fiber `%s' has been cancelled", fiber->name);

			/* Let the canceller proceed before we finish up. */
			if (fiber->waiter) {
				fiber_call(fiber->waiter);
			}

			say_info("fiber `%s': exiting", fiber->name);
		}
		@catch (id e) {
			say_error("fiber `%s': exception `%s'", fiber->name, [e name]);
			panic("fiber `%s': exiting", fiber->name);
		}

		fiber_close();
		fiber_zombificate();
		yield();	/* give control back to scheduler */
	}
}
458
c28da21 @kostja Merge with the master.
kostja authored
/**
 * Set the name of @a fiber.
 *
 * @param[in] name  the new name; must not be NULL. It is copied and
 *                  truncated, if necessary, to fit fiber->name
 *                  (always NUL-terminated).
 */
void
fiber_set_name(struct fiber *fiber, const char *name)
{
	assert(name != NULL);

	snprintf(fiber->name, sizeof(fiber->name), "%s", name);
}
471
9b8dd70 @delamonpansie Initial public import
delamonpansie authored
/**
 * Create a fiber that will run f(f_data) when first called.
 *
 * Fibers never die, they only become zombies: a zombie is recycled if
 * one is available, otherwise a new fiber is allocated from eter_pool
 * together with its coroutine, gc_pool, inbox and libev watchers.
 *
 * @param name        fiber name, see fiber_set_name()
 * @param fd          descriptor owned by the fiber
 * @param inbox_size  number of inbox slots; values <= 0 select 64. Only
 *                    used when a new fiber is allocated -- a recycled
 *                    zombie keeps the inbox it already has.
 * @param f           fiber function
 * @param f_data      argument for @a f
 *
 * @return the fiber, or NULL if the fiber structure or its coroutine
 *         could not be allocated.
 */
struct fiber *
fiber_create(const char *name, int fd, int inbox_size, void (*f) (void *), void *f_data)
{
	struct fiber *fiber = NULL;

	if (inbox_size <= 0)
		inbox_size = 64;

	if (SLIST_EMPTY(&zombie_fibers)) {
		fiber = palloc(eter_pool, sizeof(*fiber));
		if (fiber == NULL)
			return NULL;

		memset(fiber, 0, sizeof(*fiber));
		if (tarantool_coro_create(&fiber->coro, fiber_loop, NULL) == NULL)
			return NULL;

		fiber->gc_pool = palloc_create_pool("");
		fiber->inbox = palloc(eter_pool, (sizeof(*fiber->inbox) +
						  inbox_size * sizeof(struct tbuf *)));
		fiber->inbox->size = inbox_size;

		fiber_alloc(fiber);

		/* All watchers resume this fiber through ev_schedule(). */
		ev_init(&fiber->io, (void *)ev_schedule);
		ev_async_init(&fiber->async, (void *)ev_schedule);
		ev_init(&fiber->timer, (void *)ev_schedule);
		ev_init(&fiber->cw, (void *)ev_schedule);
		fiber->cw.data = fiber;
		fiber->timer.data = fiber;
		fiber->async.data = fiber;
		fiber->io.data = fiber;

		SLIST_INSERT_HEAD(&fibers, fiber, link);
	} else {
		fiber = SLIST_FIRST(&zombie_fibers);
		SLIST_REMOVE_HEAD(&zombie_fibers, zombie_link);
	}

	fiber->fd = fd;
	fiber->f = f;
	fiber->f_data = f_data;

	/* fids from 0 to 100 are reserved */
	do {
		last_used_fid++;
	} while (last_used_fid <= 100);
	fiber->fid = last_used_fid;

	fiber->flags = 0;
	fiber->waiter = NULL;
	fiber_set_name(fiber, name);
	palloc_set_name(fiber->gc_pool, fiber->name);
	register_fid(fiber);

	return fiber;
}
520
669f85f @delamonpansie [core] Unmap unused memory right after fork in order to avoid COW.
delamonpansie authored
/**
 * Free the gc_pool and the coroutine of fiber @a f.
 *
 * The running fiber and the fiber named "sched" are left alone.
 *
 * Note: memory obtained with palloc(eter_pool, ...) cannot be released,
 * so struct fiber and some of its members are leaked forever.
 */
void
fiber_destroy(struct fiber *f)
{
	/* do not destroy running fiber, nor the scheduler */
	if (f == fiber || strcmp(f->name, "sched") == 0)
		return;

	palloc_destroy_pool(f->gc_pool);
	tarantool_coro_destroy(&f->coro);
}
537
/** Apply fiber_destroy() to every fiber on the global fibers list. */
void
fiber_destroy_all()
{
	struct fiber *each;

	SLIST_FOREACH(each, &fibers, link) {
		fiber_destroy(each);
	}
}
545
546
0e118a4 @kostja Fix a memory leak when remote address was allocated in eter_pool.
kostja authored
/**
 * Return the "address:port" string of the peer connected to @a fiber's
 * descriptor.
 *
 * The string is built on first use and cached in fiber->peer_name; on
 * that first use the leading bytes of the peer's sockaddr are also
 * stored in fiber->cookie.
 *
 * @return the cached name, or NULL if the fiber has no peer, its fd is
 *         below 3, getpeername() fails, or the peer address is all
 *         zeroes.
 */
const char *
fiber_peer_name(struct fiber *fiber)
{
	if (!fiber->has_peer || fiber->fd < 3)
		return NULL;

	/* Already resolved earlier. */
	if (fiber->peer_name[0] != 0)
		return fiber->peer_name;

	struct sockaddr_in peer;
	socklen_t peer_len = sizeof(peer);

	memset(&peer, 0, peer_len);
	if (getpeername(fiber->fd, (struct sockaddr *)&peer, &peer_len) < 0)
		return NULL;

	if (peer.sin_addr.s_addr == 0)
		return NULL;

	snprintf(fiber->peer_name, sizeof(fiber->peer_name),
		 "%s:%d", inet_ntoa(peer.sin_addr), ntohs(peer.sin_port));

	fiber->cookie = 0;
	memcpy(&fiber->cookie, &peer, MIN(sizeof(peer), sizeof(fiber->cookie)));
	return fiber->peer_name;
}
574
/**
 * Close the running fiber's descriptor and forget its peer.
 *
 * @return 0 if the fiber has no open descriptor (fd < 0), otherwise the
 *         result of close().
 */
int
fiber_close(void)
{
	if (fiber->fd < 0)
		return 0;

	/* We don't know if IO is active if there was an error. */
	if (ev_is_active(&fiber->io)) {
		fiber_io_stop(fiber->fd, -1);
	}

	int rc = close(fiber->fd);

	fiber->peer_name[0] = 0;
	fiber->has_peer = false;
	fiber->fd = -1;

	return rc;
}
593
/** Number of messages currently queued in @a inbox (head minus tail, modulo its size). */
static int
ring_size(struct ring *inbox)
{
	int used = (inbox->size + inbox->head - inbox->tail) % inbox->size;

	return used;
}
599
/** Number of messages waiting in the inbox of @a recipient. */
int
inbox_size(struct fiber *recipient)
{
	struct ring *inbox = recipient->inbox;

	return ring_size(inbox);
}
605
c28da21 @kostja Merge with the master.
kostja authored
/**
 * Yield until the inbox of @a recipient is non-empty.
 *
 * FIBER_READING_INBOX is set on @a recipient for the duration of each
 * yield and cleared afterwards, also when yield() raises.
 *
 * @note: this is a cancellation point (@sa fiber_testcancel())
 */
void
wait_inbox(struct fiber *recipient)
{
	for (;;) {
		if (ring_size(recipient->inbox) != 0)
			break;

		recipient->flags |= FIBER_READING_INBOX;
		@try {
			yield();
		}
		@finally {
			recipient->flags &= ~FIBER_READING_INBOX;
		}
	}
}
623
/**
 * Post a copy of @a msg to the inbox of @a recipient.
 *
 * The message record and the copied buffer are allocated from the
 * recipient's gc_pool; the sender is recorded as the running fiber. If
 * the recipient is blocked reading its inbox it is called immediately.
 *
 * @return false if the inbox is full (one slot is always kept free),
 *         true once the message has been queued.
 */
bool
write_inbox(struct fiber *recipient, struct tbuf *msg)
{
	struct ring *inbox = recipient->inbox;

	if (ring_size(inbox) == inbox->size - 1)
		return false;

	struct msg *slot = palloc(recipient->gc_pool, sizeof(struct msg));
	inbox->ring[inbox->head] = slot;
	slot->sender_fid = fiber->fid;
	slot->msg = tbuf_clone(recipient->gc_pool, msg);
	inbox->head = (inbox->head + 1) % inbox->size;

	if ((recipient->flags & FIBER_READING_INBOX) != 0) {
		fiber_call(recipient);
	}
	return true;
}
640
c28da21 @kostja Merge with the master.
kostja authored
641
/**
 * Take the oldest message out of the running fiber's inbox, yielding
 * until one is available.
 *
 * FIBER_READING_INBOX is set while the fiber is blocked so that
 * write_inbox() knows to wake it; the flag is cleared again even when
 * yield() raises.
 *
 * @note: this is a cancellation point (@sa fiber_testcancel())
 */
struct msg *
read_inbox(void)
{
	struct ring *restrict inbox = fiber->inbox;

	for (;;) {
		if (ring_size(inbox) != 0)
			break;

		fiber->flags |= FIBER_READING_INBOX;
		@try {
			yield();
		}
		@finally {
			fiber->flags &= ~FIBER_READING_INBOX;
		}
	}

	/* Detach the message at the tail and advance the cursor. */
	struct msg *oldest = inbox->ring[inbox->tail];
	inbox->ring[inbox->tail] = NULL;
	inbox->tail = (inbox->tail + 1) % inbox->size;

	return oldest;
}
666
c28da21 @kostja Merge with the master.
kostja authored
/**
 * Read from the running fiber's descriptor into @a buf until the buffer
 * holds at least @a at_least bytes, the peer closes the connection, or
 * an error other than EAGAIN/EWOULDBLOCK occurs.
 *
 * Room for MAX(cfg.readahead, at_least) is requested from tbuf_ensure()
 * up front.
 *
 * @return the result of the last read(): positive on success, 0 on end
 *         of file, negative on error.
 *
 * @note: this is a cancellation point.
 */
int
fiber_bread(struct tbuf *buf, size_t at_least)
{
	ssize_t r;

	tbuf_ensure(buf, MAX(cfg.readahead, at_least));

	fiber_io_start(fiber->fd, EV_READ);
	for (;;) {
		fiber_io_yield();
		r = read(fiber->fd, buf->data + buf->len, buf->size - buf->len);

		/* Spurious wakeup: wait for readiness again. */
		if (r < 0 && (errno == EAGAIN || errno == EWOULDBLOCK))
			continue;
		/* End of file or a hard error. */
		if (r <= 0)
			break;

		buf->len += r;
		if (buf->len >= at_least)
			break;
	}
	fiber_io_stop(fiber->fd, EV_READ);

	return r;
}
695
/**
 * Append the buffer (@a buf, @a len) to the running fiber's output
 * vector without copying it: iov_ensure(1) followed by
 * iov_add_unsafe().
 */
void
iov_add(const void *buf, size_t len)
{
	/* presumably reserves room for one more entry -- helper not shown here */
	iov_ensure(1);
	iov_add_unsafe(buf, len);
}
702
/**
 * Copy @a len bytes from @a buf into the running fiber's gc_pool and
 * append the copy to the output vector.
 */
void
iov_dup(const void *buf, size_t len)
{
	void *dup = palloc(fiber->gc_pool, len);

	memcpy(dup, buf, len);
	iov_add(dup, len);
}
710
/** Empty the running fiber's output vector, discarding anything unwritten. */
void
iov_reset()
{
	tbuf_reset(fiber->iov);
	fiber->iov_cnt = 0;
}
717
c28da21 @kostja Merge with the master.
kostja authored
/**
 * Write the running fiber's output vector to its descriptor.
 *
 * The fiber yields before each writev() and keeps going until every
 * iovec has been written, writev() reports an error other than
 * EAGAIN/EWOULDBLOCK, or writev() makes no progress (returns 0). The
 * output vector is reset before returning in all of these cases.
 *
 * Only bytes actually accepted by writev() are counted: a failed or
 * would-block call contributes nothing to the total, and errno is
 * consulted only when writev() returned a negative value.
 *
 * @return the number of bytes written, or the negative writev() result
 *         if the write failed (the failure is logged together with the
 *         number of bytes left unwritten).
 *
 * @note: this is a cancellation point.
 */
ssize_t
iov_flush(void)
{
	ssize_t result, r = 0, bytes = 0;
	struct iovec *iov = iovec(fiber->iov);
	size_t iov_cnt = fiber->iov_cnt;

	fiber_io_start(fiber->fd, EV_WRITE);
	while (iov_cnt > 0) {
		fiber_io_yield();
		r = writev(fiber->fd, iov, MIN(iov_cnt, IOV_MAX));

		/* Not writable after all: wait for the next readiness event. */
		if (r < 0 && (errno == EAGAIN || errno == EWOULDBLOCK))
			continue;
		/* Hard error (r < 0) or no progress (r == 0). */
		if (r <= 0)
			break;

		bytes += r;

		/*
		 * Skip the iovecs that were written completely and trim the
		 * one that was written partially. r is positive here and
		 * ends up as the unconsumed remainder, i.e. never negative.
		 */
		while (iov_cnt > 0) {
			if (iov->iov_len > (size_t)r) {
				iov->iov_base = (char *)iov->iov_base + r;
				iov->iov_len -= r;
				break;
			}
			r -= iov->iov_len;
			iov++;
			iov_cnt--;
		}
	}
	fiber_io_stop(fiber->fd, EV_WRITE);

	if (r < 0) {
		size_t rem = 0;
		for (size_t i = 0; i < iov_cnt; i++)
			rem += iov[i].iov_len;

		say_syserror("client unexpectedly gone, %" PRI_SZ " bytes unwritten", rem);
		result = r;
	} else
		result = bytes;

	iov_reset();
	return result;
}
767
c28da21 @kostja Merge with the master.
kostja authored
768 /**
769 * @note: this is a cancellation point.
770 */
771
9b8dd70 @delamonpansie Initial public import
delamonpansie authored
772 ssize_t
773 fiber_read(void *buf, size_t count)
774 {
775 ssize_t r, done = 0;
776
099dff0 @kostja feature-feeder-in-core: review fixes
kostja authored
777 fiber_io_start(fiber->fd, EV_READ);
9b8dd70 @delamonpansie Initial public import
delamonpansie authored
778 while (count != done) {
c28da21 @kostja Merge with the master.
kostja authored
779
780 fiber_io_yield();
9b8dd70 @delamonpansie Initial public import
delamonpansie authored
781
782 if ((r = read(fiber->fd, buf + done, count - done)) <= 0) {
783 if (errno == EAGAIN || errno == EWOULDBLOCK)
784 continue;
785 else
786 break;
787 }
788 done += r;
789 }
099dff0 @kostja feature-feeder-in-core: review fixes
kostja authored
790 fiber_io_stop(fiber->fd, EV_READ);
9b8dd70 @delamonpansie Initial public import
delamonpansie authored
791
792 return done;
793 }
794
c28da21 @kostja Merge with the master.
kostja authored
795 /**
796 * @note: this is a cancellation point.
797 */
798
9b8dd70 @delamonpansie Initial public import
delamonpansie authored
799 ssize_t
800 fiber_write(const void *buf, size_t count)
801 {
802 int r;
803 unsigned int done = 0;
804
099dff0 @kostja feature-feeder-in-core: review fixes
kostja authored
805 fiber_io_start(fiber->fd, EV_WRITE);
9b8dd70 @delamonpansie Initial public import
delamonpansie authored
806
807 while (count != done) {
c28da21 @kostja Merge with the master.
kostja authored
808 fiber_io_yield();
9b8dd70 @delamonpansie Initial public import
delamonpansie authored
809 if ((r = write(fiber->fd, buf + done, count - done)) == -1) {
810 if (errno == EAGAIN || errno == EWOULDBLOCK)
811 continue;
812 else
813 break;
814 }
815 done += r;
816 }
099dff0 @kostja feature-feeder-in-core: review fixes
kostja authored
817 fiber_io_stop(fiber->fd, EV_WRITE);
9b8dd70 @delamonpansie Initial public import
delamonpansie authored
818
819 return done;
820 }
821
c28da21 @kostja Merge with the master.
kostja authored
822 /**
823 * @note: this is a cancellation point.
824 */
825
9b8dd70 @delamonpansie Initial public import
delamonpansie authored
826 int
827 fiber_connect(struct sockaddr_in *addr)
828 {
829 fiber->fd = socket(AF_INET, SOCK_STREAM, 0);
830 if (fiber->fd < 0)
831 goto error;
832
833 if (set_nonblock(fiber->fd) < 0)
834 goto error;
835
f7de4af @delamonpansie [core] indent
delamonpansie authored
836 if (connect(fiber->fd, (struct sockaddr *)addr, sizeof(*addr)) < 0) {
c28da21 @kostja Merge with the master.
kostja authored
837
9b8dd70 @delamonpansie Initial public import
delamonpansie authored
838 if (errno != EINPROGRESS)
839 goto error;
840
099dff0 @kostja feature-feeder-in-core: review fixes
kostja authored
841 fiber_io_start(fiber->fd, EV_WRITE);
c28da21 @kostja Merge with the master.
kostja authored
842 fiber_io_yield();
099dff0 @kostja feature-feeder-in-core: review fixes
kostja authored
843 fiber_io_stop(fiber->fd, EV_WRITE);
9b8dd70 @delamonpansie Initial public import
delamonpansie authored
844
c28da21 @kostja Merge with the master.
kostja authored
845 int error;
846 socklen_t error_size = sizeof(error);
9b8dd70 @delamonpansie Initial public import
delamonpansie authored
847
c28da21 @kostja Merge with the master.
kostja authored
848 if (getsockopt(fiber->fd, SOL_SOCKET, SO_ERROR,
849 &error, &error_size) < 0)
850 goto error;
851
852 assert(error_size == sizeof(error));
853
854 if (error != 0) {
855 errno = error;
856 goto error;
857 }
9b8dd70 @delamonpansie Initial public import
delamonpansie authored
858 }
859
860 return fiber->fd;
c28da21 @kostja Merge with the master.
kostja authored
861
f7de4af @delamonpansie [core] indent
delamonpansie authored
862 error:
9b8dd70 @delamonpansie Initial public import
delamonpansie authored
863 fiber_close();
864 return fiber->fd;
865 }
866
/**
 * Switch a descriptor to non-blocking mode.
 *
 * @return @a sock on success, -1 if either fcntl() call fails.
 */
int
set_nonblock(int sock)
{
	int flags = fcntl(sock, F_GETFL, 0);

	if (flags < 0)
		return -1;
	if (fcntl(sock, F_SETFL, flags | O_NONBLOCK) < 0)
		return -1;
	return sock;
}
875
876 static int
877 read_atleast(int fd, struct tbuf *b, size_t to_read)
878 {
879 tbuf_ensure(b, to_read);
880 while (to_read > 0) {
881 int r = read(fd, b->data + b->len, to_read);
882 if (r <= 0) {
883 if (errno == EINTR)
884 continue;
885 return -1;
886 }
887 to_read -= r;
888 b->len += r;
889 }
890 return 0;
891 }
892
105a63d @kostja C-coding-style: document the coding style in use.
kostja authored
/** Write all data to a descriptor.
 *
 * This function is equivalent to 'write', except that it keeps
 * writing until all data is written or a non-ignorable error
 * occurs. EINTR is ignored and the write is retried.
 *
 * @retval 0  Success
 *
 * @retval -1 An error occurred (not EINTR).
 */
static int
write_all(int fd, void *data, size_t len)
{
	/* Byte pointer: arithmetic on 'void *' is not standard C. */
	const char *cursor = data;

	while (len > 0) {
		ssize_t r = write(fd, cursor, len);
		if (r < 0) {
			if (errno == EINTR)
				continue;
			return -1;
		}
		cursor += r;
		len -= (size_t)r;
	}
	return 0;
}
918
919 void __attribute__ ((noreturn))
920 blocking_loop(int fd, struct tbuf *(*handler) (void *state, struct tbuf *), void *state)
921 {
922 struct tbuf *request, *request_body, *reply, *reply_body;
923 u32 *request_size, reply_size;
924 int result = EXIT_FAILURE;
925
926 for (;;) {
d6965dc @kostja A pre-requisite patch for Lua stored procedures: fiber->pool.
kostja authored
927 request = tbuf_alloc(fiber->gc_pool);
9b8dd70 @delamonpansie Initial public import
delamonpansie authored
928 if (read_atleast(fd, request, sizeof(u32)) < 0) {
929 result = EXIT_SUCCESS;
930 break;
931 }
932
933 if ((request_size = tbuf_peek(request, sizeof(u32))) == NULL) {
934 result = EXIT_SUCCESS;
935 break;
936 }
937 *request_size = ntohl(*request_size);
938
939 if (read_atleast(fd, request, *request_size) < 0) {
940 result = EXIT_SUCCESS;
941 break;
942 }
943
d6965dc @kostja A pre-requisite patch for Lua stored procedures: fiber->pool.
kostja authored
944 request_body = tbuf_alloc(fiber->gc_pool);
9b8dd70 @delamonpansie Initial public import
delamonpansie authored
945 tbuf_append(request_body, fiber_msg(request)->data, fiber_msg(request)->data_len);
946
947 reply_body = handler(state, request_body);
948
949 reply_size = sizeof(struct fiber_msg) + reply_body->len;
d6965dc @kostja A pre-requisite patch for Lua stored procedures: fiber->pool.
kostja authored
950 reply = tbuf_alloc(fiber->gc_pool);
9b8dd70 @delamonpansie Initial public import
delamonpansie authored
951 tbuf_reserve(reply, reply_size);
952
953 fiber_msg(reply)->fid = fiber_msg(request)->fid;
954 fiber_msg(reply)->data_len = reply_body->len;
955 memcpy(fiber_msg(reply)->data, reply_body->data, reply_body->len);
956
957 reply_size = htonl(reply_size);
958 if (write_all(fd, &reply_size, sizeof(reply_size)) < 0) {
959 result = EXIT_FAILURE;
960 break;
961 }
962 if (write_all(fd, reply->data, reply->len) < 0) {
963 result = EXIT_FAILURE;
964 break;
965 }
966
d6965dc @kostja A pre-requisite patch for Lua stored procedures: fiber->pool.
kostja authored
967 prelease(fiber->gc_pool);
9b8dd70 @delamonpansie Initial public import
delamonpansie authored
968 }
969
970 handler(state, NULL);
971 exit(result);
972 }
973
974 static void
65df38a @kostja Implement blueprint 'consistent-header-guards'.
kostja authored
975 inbox2sock(void *_data __attribute__((unused)))
9b8dd70 @delamonpansie Initial public import
delamonpansie authored
976 {
977 struct tbuf *msg, *out;
978 struct msg *m;
979 u32 len;
980
981 for (;;) {
d6965dc @kostja A pre-requisite patch for Lua stored procedures: fiber->pool.
kostja authored
982 out = tbuf_alloc(fiber->gc_pool);
9b8dd70 @delamonpansie Initial public import
delamonpansie authored
983
984 do {
985 m = read_inbox();
d6965dc @kostja A pre-requisite patch for Lua stored procedures: fiber->pool.
kostja authored
986 msg = tbuf_alloc(fiber->gc_pool);
9b8dd70 @delamonpansie Initial public import
delamonpansie authored
987
988 /* TODO: do not copy message twice */
989 tbuf_reserve(msg, sizeof(struct fiber_msg) + m->msg->len);
990 fiber_msg(msg)->fid = m->sender_fid;
991 fiber_msg(msg)->data_len = m->msg->len;
992 memcpy(fiber_msg(msg)->data, m->msg->data, m->msg->len);
993 len = htonl(msg->len);
994
995 tbuf_append(out, &len, sizeof(len));
996 tbuf_append(out, msg->data, msg->len);
997 } while (ring_size(fiber->inbox) > 0);
998
999 if (fiber_write(out->data, out->len) != out->len)
1000 panic("child is dead");
1001 fiber_gc();
1002 }
1003 }
1004
1005 static void
65df38a @kostja Implement blueprint 'consistent-header-guards'.
kostja authored
1006 sock2inbox(void *_data __attribute__((unused)))
9b8dd70 @delamonpansie Initial public import
delamonpansie authored
1007 {
1008 struct tbuf *msg, *msg_body;
1009 struct fiber *recipient;
1010 u32 len;
1011
1012 for (;;) {
1013 if (fiber->rbuf->len < sizeof(len)) {
1014 if (fiber_bread(fiber->rbuf, sizeof(len)) <= 0)
1015 panic("child is dead");
1016 }
1017
1018 len = read_u32(fiber->rbuf);
1019
1020 len = ntohl(len);
1021 if (fiber->rbuf->len < len) {
1022 if (fiber_bread(fiber->rbuf, len) <= 0)
1023 panic("child is dead");
1024 }
1025
1026 msg = tbuf_split(fiber->rbuf, len);
1027 recipient = fid2fiber(fiber_msg(msg)->fid);
1028 if (recipient == NULL) {
1029 say_error("recipient is lost");
1030 continue;
1031 }
1032
d6965dc @kostja A pre-requisite patch for Lua stored procedures: fiber->pool.
kostja authored
1033 msg_body = tbuf_alloc(recipient->gc_pool);
9b8dd70 @delamonpansie Initial public import
delamonpansie authored
1034 tbuf_append(msg_body, fiber_msg(msg)->data, fiber_msg(msg)->data_len);
1035 write_inbox(recipient, msg_body);
1036 fiber_gc();
1037 }
1038 }
1039
1040 struct child *
f7de4af @delamonpansie [core] indent
delamonpansie authored
1041 spawn_child(const char *name, int inbox_size, struct tbuf *(*handler) (void *, struct tbuf *),
1042 void *state)
9b8dd70 @delamonpansie Initial public import
delamonpansie authored
1043 {
abc75c3 @kostja feature-feeder-in-core: review fixes
kostja authored
1044 char proxy_name[FIBER_NAME_MAXLEN];
9b8dd70 @delamonpansie Initial public import
delamonpansie authored
1045 int socks[2];
1046 int pid;
1047
1048 if (socketpair(AF_UNIX, SOCK_STREAM, 0, socks) == -1) {
1049 say_syserror("socketpair");
1050 return NULL;
1051 }
1052
1053 if ((pid = fork()) == -1) {
1054 say_syserror("fork");
1055 return NULL;
1056 }
1057
1058 if (pid) {
1059 close(socks[0]);
1060 if (set_nonblock(socks[1]) == -1)
1061 return NULL;
1062
1063 struct child *c = palloc(eter_pool, sizeof(*c));
1064 c->pid = pid;
1065
abc75c3 @kostja feature-feeder-in-core: review fixes
kostja authored
1066 snprintf(proxy_name, sizeof(proxy_name), "%s/sock2inbox", name);
9b8dd70 @delamonpansie Initial public import
delamonpansie authored
1067 c->in = fiber_create(proxy_name, socks[1], inbox_size, sock2inbox, NULL);
1068 fiber_call(c->in);
abc75c3 @kostja feature-feeder-in-core: review fixes
kostja authored
1069 snprintf(proxy_name, sizeof(proxy_name), "%s/inbox2sock", name);
9b8dd70 @delamonpansie Initial public import
delamonpansie authored
1070 c->out = fiber_create(proxy_name, socks[1], inbox_size, inbox2sock, NULL);
944638a @rtokarev Add fiber->flags: FIBER_READING_INBOX and FIBER_RAISE.
rtokarev authored
1071 c->out->flags |= FIBER_READING_INBOX;
9b8dd70 @delamonpansie Initial public import
delamonpansie authored
1072 return c;
1073 } else {
0e118a4 @kostja Fix a memory leak when remote address was allocated in eter_pool.
kostja authored
1074 char child_name[FIBER_NAME_MAXLEN];
c28da21 @kostja Merge with the master.
kostja authored
1075 /*
1076 * Move to an own process group, to not receive
1077 * signals from the controlling tty.
1078 */
1079 setpgid(0, 0);
9b8dd70 @delamonpansie Initial public import
delamonpansie authored
1080 salloc_destroy();
1081 close_all_xcpt(2, socks[0], sayfd);
f5c4069 @rtokarev Change location of fiber->name and palloc_pool->name fields from
rtokarev authored
1082 snprintf(child_name, sizeof(child_name), "%s/child", name);
1083 fiber_set_name(&sched, child_name);
cc14fc8 @kostja feature-feeder-in-core: review fixes
kostja authored
1084 set_proc_title("%s%s", name, custom_proc_title);
9b8dd70 @delamonpansie Initial public import
delamonpansie authored
1085 say_crit("%s initialized", name);
1086 blocking_loop(socks[0], handler, state);
1087 }
1088 }
1089
1090 static void
1091 tcp_server_handler(void *data)
1092 {
0e118a4 @kostja Fix a memory leak when remote address was allocated in eter_pool.
kostja authored
1093 struct fiber_server *server = (void*) data;
9b8dd70 @delamonpansie Initial public import
delamonpansie authored
1094 struct fiber *h;
abc75c3 @kostja feature-feeder-in-core: review fixes
kostja authored
1095 char name[FIBER_NAME_MAXLEN];
9b8dd70 @delamonpansie Initial public import
delamonpansie authored
1096 int fd;
1097 int one = 1;
1098
6a8f51b @kostya-shulgin Merge branch 'master' into feature-feeder-in-core-merge
kostya-shulgin authored
1099 if (fiber_serv_socket(fiber, server->port, true, 0.1) != 0) {
93d1296 @kostya-shulgin Implement blueprint: 'feature-feeder-in-core'.
kostya-shulgin authored
1100 say_error("init server socket on port %i fail", server->port);
9b8dd70 @delamonpansie Initial public import
delamonpansie authored
1101 exit(EX_OSERR);
1102 }
1103
c3538de @kostya-shulgin Implement blueprint: 'feature-feeder-in-core'.
kostya-shulgin authored
1104 if (server->on_bind != NULL) {
9b8dd70 @delamonpansie Initial public import
delamonpansie authored
1105 server->on_bind(server->data);
c3538de @kostya-shulgin Implement blueprint: 'feature-feeder-in-core'.
kostya-shulgin authored
1106 }
9b8dd70 @delamonpansie Initial public import
delamonpansie authored
1107
099dff0 @kostja feature-feeder-in-core: review fixes
kostja authored
1108 fiber_io_start(fiber->fd, EV_READ);
c28da21 @kostja Merge with the master.
kostja authored
1109 for (;;) {
1110 fiber_io_yield();
9b8dd70 @delamonpansie Initial public import
delamonpansie authored
1111
1112 while ((fd = accept(fiber->fd, NULL, NULL)) > 0) {
1113 if (set_nonblock(fd) == -1) {
1114 say_error("can't set nonblock");
1115 close(fd);
1116 continue;
1117 }
7ff8c17 @kostja Test-runner: support for SQL.
kostja authored
1118 if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY,
1119 &one, sizeof(one)) == -1) {
1120 say_syserror("setsockopt failed");
1121 /* Do nothing, not a fatal error. */
1122 }
9b8dd70 @delamonpansie Initial public import
delamonpansie authored
1123
1124 snprintf(name, sizeof(name), "%i/handler", server->port);
0e118a4 @kostja Fix a memory leak when remote address was allocated in eter_pool.
kostja authored
1125 h = fiber_create(name, fd, -1, server->handler, server->data);
9b8dd70 @delamonpansie Initial public import
delamonpansie authored
1126 if (h == NULL) {
1127 say_error("can't create handler fiber, dropping client connection");
1128 close(fd);
1129 continue;
1130 }
1131
1132 h->has_peer = true;
1133 fiber_call(h);
1134 }
1135 if (fd < 0 && errno != EAGAIN && errno != EWOULDBLOCK) {
1136 say_syserror("accept");
1137 continue;
1138 }
1139 }
099dff0 @kostja feature-feeder-in-core: review fixes
kostja authored
1140 fiber_io_stop(fiber->fd, EV_READ);
9b8dd70 @delamonpansie Initial public import
delamonpansie authored
1141 }
1142
1143 struct fiber *
cc14fc8 @kostja feature-feeder-in-core: review fixes
kostja authored
1144 fiber_server(const char *name, int port, void (*handler) (void *data), void *data,
f7de4af @delamonpansie [core] indent
delamonpansie authored
1145 void (*on_bind) (void *data))
9b8dd70 @delamonpansie Initial public import
delamonpansie authored
1146 {
abc75c3 @kostja feature-feeder-in-core: review fixes
kostja authored
1147 char server_name[FIBER_NAME_MAXLEN];
9b8dd70 @delamonpansie Initial public import
delamonpansie authored
1148 struct fiber_server *server;
1149 struct fiber *s;
1150
cc14fc8 @kostja feature-feeder-in-core: review fixes
kostja authored
1151 snprintf(server_name, sizeof(server_name), "%i/%s", port, name);
0e118a4 @kostja Fix a memory leak when remote address was allocated in eter_pool.
kostja authored
1152 server = palloc(eter_pool, sizeof(struct fiber_server));
9b8dd70 @delamonpansie Initial public import
delamonpansie authored
1153 assert(server != NULL);
0e118a4 @kostja Fix a memory leak when remote address was allocated in eter_pool.
kostja authored
1154 server->data = data;
9b8dd70 @delamonpansie Initial public import
delamonpansie authored
1155 server->port = port;
1156 server->handler = handler;
1157 server->on_bind = on_bind;
0e118a4 @kostja Fix a memory leak when remote address was allocated in eter_pool.
kostja authored
1158 s = fiber_create(server_name, -1, -1, tcp_server_handler, server);
9b8dd70 @delamonpansie Initial public import
delamonpansie authored
1159
1160 fiber_call(s); /* give a handler a chance */
1161 return s;
1162 }
1163
93d1296 @kostya-shulgin Implement blueprint: 'feature-feeder-in-core'.
kostya-shulgin authored
1164 /** create new fiber's socket and set standat options. */
1165 static int
6a8f51b @kostya-shulgin Merge branch 'master' into feature-feeder-in-core-merge
kostya-shulgin authored
1166 create_socket(struct fiber *fiber)
93d1296 @kostya-shulgin Implement blueprint: 'feature-feeder-in-core'.
kostya-shulgin authored
1167 {
1168 if (fiber->fd != -1) {
1169 say_error("fiber is already has socket");
1170 goto create_socket_fail;
1171 }
1172
6a8f51b @kostya-shulgin Merge branch 'master' into feature-feeder-in-core-merge
kostya-shulgin authored
1173 fiber->fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
93d1296 @kostya-shulgin Implement blueprint: 'feature-feeder-in-core'.
kostya-shulgin authored
1174 if (fiber->fd == -1) {
1175 say_syserror("socket");
1176 goto create_socket_fail;
1177 }
1178
6a8f51b @kostya-shulgin Merge branch 'master' into feature-feeder-in-core-merge
kostya-shulgin authored
1179 int one = 1;
93d1296 @kostya-shulgin Implement blueprint: 'feature-feeder-in-core'.
kostya-shulgin authored
1180 if (setsockopt(fiber->fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) != 0) {
1181 say_syserror("setsockopt");
1182 goto create_socket_fail;
1183 }
1184
6a8f51b @kostya-shulgin Merge branch 'master' into feature-feeder-in-core-merge
kostya-shulgin authored
1185 struct linger ling = { 0, 0 };
c3538de @kostya-shulgin Implement blueprint: 'feature-feeder-in-core'.
kostya-shulgin authored
1186 if (setsockopt(fiber->fd, SOL_SOCKET, SO_KEEPALIVE, &one, sizeof(one)) != 0 ||
1187 setsockopt(fiber->fd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one)) != 0 ||
1188 setsockopt(fiber->fd, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling)) != 0) {
1189 say_syserror("setsockopt");
1190 goto create_socket_fail;
93d1296 @kostya-shulgin Implement blueprint: 'feature-feeder-in-core'.
kostya-shulgin authored
1191 }
1192
1193 if (set_nonblock(fiber->fd) == -1) {
1194 goto create_socket_fail;
1195 }
1196
1197 return 0;
1198
1199 create_socket_fail:
1200
1201 if (fiber->fd != -1) {
1202 close(fiber->fd);
1203 }
1204 return -1;
1205 }
1206
1207 /** Create server socket and bind his on port. */
1208 int
6a8f51b @kostya-shulgin Merge branch 'master' into feature-feeder-in-core-merge
kostya-shulgin authored
1209 fiber_serv_socket(struct fiber *fiber, unsigned short port, bool retry, ev_tstamp delay)
93d1296 @kostya-shulgin Implement blueprint: 'feature-feeder-in-core'.
kostya-shulgin authored
1210 {
1211 const ev_tstamp min_delay = 0.001; /* minimal delay is 1 msec */
1212 struct sockaddr_in sin;
1213 bool warning_said = false;
1214
1215 if (delay < min_delay) {
1216 delay = min_delay;
1217 }
1218
6a8f51b @kostya-shulgin Merge branch 'master' into feature-feeder-in-core-merge
kostya-shulgin authored
1219 if (create_socket(fiber) != 0) {
93d1296 @kostya-shulgin Implement blueprint: 'feature-feeder-in-core'.
kostya-shulgin authored
1220 return -1;
1221 }
1222
1223 /* clean sockaddr_in struct */
1224 memset(&sin, 0, sizeof(struct sockaddr_in));
1225
1226 /* fill sockaddr_in struct */
1227 sin.sin_family = AF_INET;
1228 sin.sin_port = htons(port);
1229 if (strcmp(cfg.bind_ipaddr, "INADDR_ANY") == 0) {
1230 sin.sin_addr.s_addr = INADDR_ANY;
1231 } else {
1232 if (!inet_aton(cfg.bind_ipaddr, &sin.sin_addr)) {
1233 say_syserror("inet_aton");
1234 return -1;
1235 }
1236 }
1237
1238 while (true) {
1239 if (bind(fiber->fd, (struct sockaddr *)&sin, sizeof(sin)) != 0) {
1240 if (retry && (errno == EADDRINUSE)) {
1241 /* retry mode, try, to bind after delay */
1242 goto sleep_and_retry;
1243 }
1244 say_syserror("bind");
1245 return -1;
1246 }
6a8f51b @kostya-shulgin Merge branch 'master' into feature-feeder-in-core-merge
kostya-shulgin authored
1247 if (listen(fiber->fd, cfg.backlog) != 0) {
93d1296 @kostya-shulgin Implement blueprint: 'feature-feeder-in-core'.
kostya-shulgin authored
1248 if (retry && (errno == EADDRINUSE)) {
1249 /* retry mode, try, to bind after delay */
1250 goto sleep_and_retry;
1251 }
1252 say_syserror("listen");
1253 return -1;
1254 }
1255
1256 say_info("bound to port %i", port);
1257 break;
1258
1259 sleep_and_retry:
1260 if (!warning_said) {
1261 say_warn("port %i is already in use, "
1262 "will retry binding after %lf seconds.", port, delay);
1263 warning_said = true;
1264 }
1265 fiber_sleep(delay);
1266 }
1267
1268 return 0;
1269 }
ee90357 @delamonpansie [core] Print symbols in backtraces.
delamonpansie authored
1270
9b8dd70 @delamonpansie Initial public import
delamonpansie authored
1271 void
1272 fiber_info(struct tbuf *out)
1273 {
1274 struct fiber *fiber;
ee90357 @delamonpansie [core] Print symbols in backtraces.
delamonpansie authored
1275
fcba112 @kostja cmake-based-build: merging with master
kostja authored
1276 tbuf_printf(out, "fibers:" CRLF);
9b8dd70 @delamonpansie Initial public import
delamonpansie authored
1277 SLIST_FOREACH(fiber, &fibers, link) {
1278 void *stack_top = fiber->coro.stack + fiber->coro.stack_size;
1279
fcba112 @kostja cmake-based-build: merging with master
kostja authored
1280 tbuf_printf(out, " - fid: %4i" CRLF, fiber->fid);
1281 tbuf_printf(out, " csw: %i" CRLF, fiber->csw);
1282 tbuf_printf(out, " name: %s" CRLF, fiber->name);
1283 tbuf_printf(out, " inbox: %i" CRLF, ring_size(fiber->inbox));
1284 tbuf_printf(out, " fd: %4i" CRLF, fiber->fd);
1285 tbuf_printf(out, " peer: %s" CRLF, fiber_peer_name(fiber));
1286 tbuf_printf(out, " stack: %p" CRLF, stack_top);
82e966b @kostja Blueprint 'cmake-based-build' initial commit.
kostja authored
1287 #ifdef ENABLE_BACKTRACE
fcba112 @kostja cmake-based-build: merging with master
kostja authored
1288 tbuf_printf(out, " backtrace:" CRLF "%s",
85f92cd @delamonpansie [core] Fix wording.
delamonpansie authored
1289 backtrace(fiber->last_stack_frame,
1290 fiber->coro.stack, fiber->coro.stack_size));
82e966b @kostja Blueprint 'cmake-based-build' initial commit.
kostja authored
1291 #endif /* ENABLE_BACKTRACE */
9b8dd70 @delamonpansie Initial public import
delamonpansie authored
1292 }
1293 }
1294
1295 void
1296 fiber_init(void)
1297 {
1298 SLIST_INIT(&fibers);
384dbcd @kostja Fix for Bug#803864 Constant resize of fiber_registry
kostja authored
1299 fibers_registry = kh_init(fid2fiber, NULL);
9b8dd70 @delamonpansie Initial public import
delamonpansie authored
1300
1301 ex_pool = palloc_create_pool("ex_pool");
1302
1303 memset(&sched, 0, sizeof(sched));
1304 sched.fid = 1;
f5c4069 @rtokarev Change location of fiber->name and palloc_pool->name fields from
rtokarev authored
1305 fiber_set_name(&sched, "sched");
d6965dc @kostja A pre-requisite patch for Lua stored procedures: fiber->pool.
kostja authored
1306 sched.gc_pool = palloc_create_pool(sched.name);
9b8dd70 @delamonpansie Initial public import
delamonpansie authored
1307
1308 sp = call_stack;
1309 fiber = &sched;
1310 last_used_fid = 100;
1311 }
Something went wrong with that request. Please try again.