-
Notifications
You must be signed in to change notification settings - Fork 376
/
fiber.c
1953 lines (1734 loc) · 46.7 KB
/
fiber.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
* Copyright 2010-2016, Tarantool AUTHORS, please see AUTHORS file.
*
* Redistribution and use in source and binary forms, with or
* without modification, are permitted provided that the following
* conditions are met:
*
* 1. Redistributions of source code must retain the above
* copyright notice, this list of conditions and the
* following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following
* disclaimer in the documentation and/or other materials
* provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
* TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
* <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
* INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
* BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
* THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*/
#include "fiber.h"
#include <trivia/config.h>
#include <trivia/util.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pmatomic.h>
#include "assoc.h"
#include "memory.h"
#include "trigger.h"
#include "errinj.h"
#include "clock.h"
extern void cord_on_yield(void);
#if ENABLE_FIBER_TOP
/** Account `clock_delta` clock units to `stat` for the current period. */
static inline void
clock_stat_add_delta(struct clock_stat *stat, uint64_t clock_delta)
{
	stat->delta += clock_delta;
}
/**
 * Exponential moving average of the clock deltas per loop
 * iteration, computed in integer arithmetic with a smoothing
 * coefficient of 1/16: new_acc = delta / 16 + acc * 15 / 16.
 */
static inline uint64_t
clock_diff_accumulate(uint64_t acc, uint64_t delta)
{
	uint64_t fresh_part = delta / 16;
	uint64_t decayed_part = 15 * acc / 16;
	return fresh_part + decayed_part;
}
/**
 * Fold the clock delta collected during the last period into the
 * accumulated statistics and open a new accounting period.
 */
static inline void
clock_stat_update(struct clock_stat *stat, double nsec_per_clock)
{
	uint64_t last_delta = stat->delta;
	stat->acc = clock_diff_accumulate(stat->acc, last_delta);
	stat->cputime += last_delta * nsec_per_clock;
	stat->prev_delta = last_delta;
	stat->delta = 0;
}
/** Drop all accumulated clock statistics back to zero. */
static inline void
clock_stat_reset(struct clock_stat *stat)
{
	stat->delta = 0;
	stat->prev_delta = 0;
	stat->acc = 0;
	stat->cputime = 0;
}
/**
 * Begin a measurement period: remember the current monotonic and
 * thread-cpu clock readings and zero the cpu miss counter.
 */
static void
cpu_stat_start(struct cpu_stat *stat)
{
	stat->prev_clock = clock_monotonic64();
	stat->cpu_miss_count = 0;
	/*
	 * We want to measure thread cpu time here to calculate
	 * each fiber's cpu time, so don't use libev's ev_now() or
	 * ev_time() since they use either monotonic or realtime
	 * system clocks.
	 */
	stat->prev_cputime = clock_thread64();
}
/** Forget the reported miss count and restart the measurement period. */
static inline void
cpu_stat_reset(struct cpu_stat *stat)
{
	stat->prev_cpu_miss_count = 0;
	cpu_stat_start(stat);
}
/**
 * Account a context switch: return how many monotonic clock units
 * passed since the previous switch and remember the current reading.
 */
static uint64_t
cpu_stat_on_csw(struct cpu_stat *stat)
{
	uint64_t now = clock_monotonic64();
	/*
	 * Just in case. On Linux CLOCK_MONOTONIC guarantees that
	 * consecutive clock_gettime() readings never go backwards,
	 * but other systems may not give the same guarantee.
	 */
	uint64_t delta = now >= stat->prev_clock ? now - stat->prev_clock : 0;
	stat->prev_clock = now;
	return delta;
}
/**
 * Finish a measurement period: remember the miss counter for
 * reporting and compute the average number of nanoseconds one
 * clock unit took during the period.
 *
 * Returns 0 when the thread cpu time went backwards or the cord
 * collected no clock delta, so the caller never divides by zero.
 */
static double
cpu_stat_end(struct cpu_stat *stat, struct clock_stat *cord_clock_stat)
{
	stat->prev_cpu_miss_count = stat->cpu_miss_count;
	stat->cpu_miss_count = 0;
	double nsec_per_clock = 0;
	uint64_t delta_time = clock_thread64();
	if (delta_time > stat->prev_cputime && cord_clock_stat->delta > 0) {
		delta_time -= stat->prev_cputime;
		/*
		 * Divide by the same delta that was validated in the
		 * condition above. The old code divided by
		 * cord()->clock_stat.delta, which is only correct as
		 * long as every caller passes the cord's own stat.
		 */
		nsec_per_clock = (double)delta_time / cord_clock_stat->delta;
	}
	return nsec_per_clock;
}
#endif /* ENABLE_FIBER_TOP */
#include <valgrind/memcheck.h>
static int (*fiber_invoke)(fiber_func f, va_list ap);
#if ENABLE_ASAN
#include <sanitizer/asan_interface.h>
/*
 * Annotate a stack switch for AddressSanitizer. Declares a local
 * `fake_stack_save` in the enclosing scope; it is later consumed
 * by ASAN_FINISH_SWITCH_FIBER on the way back.
 */
#define ASAN_START_SWITCH_FIBER(fake_stack_save, will_switch_back, bottom, \
				size) \
	/* \
	 * When leaving a fiber definitely, NULL must be passed as the first \
	 * argument so that the fake stack is destroyed. \
	 */ \
	void *fake_stack_save = NULL; \
	__sanitizer_start_switch_fiber((will_switch_back) ? &fake_stack_save \
						          : NULL, \
				       (bottom), (size))
#if ASAN_INTERFACE_OLD
/* Old ASAN interface: no out-parameters for the previous stack. */
#define ASAN_FINISH_SWITCH_FIBER(fake_stack_save) \
	__sanitizer_finish_switch_fiber(fake_stack_save)
#else
#define ASAN_FINISH_SWITCH_FIBER(fake_stack_save) \
	__sanitizer_finish_switch_fiber(fake_stack_save, NULL, NULL)
#endif
#else
/* ASAN disabled: both annotations compile away to nothing. */
#define ASAN_START_SWITCH_FIBER(fake_stack_save, will_switch_back, bottom, size)
#define ASAN_FINISH_SWITCH_FIBER(fake_stack_save)
#endif
/**
 * madvise() wrapper with error injection. On failure (real or
 * injected via ERRINJ_FIBER_MADVISE) sets diag and returns -1.
 */
static inline int
fiber_madvise(void *addr, size_t len, int advice)
{
	int rc = 0;
	ERROR_INJECT(ERRINJ_FIBER_MADVISE, {
		errno = ENOMEM;
		rc = -1;
	});
	/* Short-circuit: the real madvise() is skipped when injection fired. */
	if (rc != 0 || madvise(addr, len, advice) != 0) {
		diag_set(SystemError, "fiber madvise failed");
		return -1;
	}
	return 0;
}
/**
 * mprotect() wrapper with error injection: when the integer
 * parameter of ERRINJ_FIBER_MPROTECT equals `prot`, fail as if
 * out of memory. On failure sets diag and returns -1.
 */
static inline int
fiber_mprotect(void *addr, size_t len, int prot)
{
	int rc = 0;
	struct errinj *inj = errinj(ERRINJ_FIBER_MPROTECT, ERRINJ_INT);
	if (inj != NULL && inj->iparam == prot) {
		errno = ENOMEM;
		rc = -1;
	}
	/* Short-circuit: the real mprotect() is skipped when injection fired. */
	if (rc != 0 || mprotect(addr, len, prot) != 0) {
		diag_set(SystemError, "fiber mprotect failed");
		return -1;
	}
	return 0;
}
#if ENABLE_FIBER_TOP
static __thread bool fiber_top_enabled = false;
#endif /* ENABLE_FIBER_TOP */
#ifdef ENABLE_BACKTRACE
static __thread bool fiber_parent_backtrace_enabled;
#endif /* ENABLE_BACKTRACE */
/**
 * An action performed each time a context switch happens.
 * Used to count each fiber's processing time.
 */
static inline void
clock_set_on_csw(struct fiber *caller)
{
	/* The context switch counter grows even with fiber top disabled. */
	caller->csw++;
#if ENABLE_FIBER_TOP
	if (!fiber_top_enabled)
		return;
	/* Charge the elapsed clock both to the cord and to the fiber. */
	uint64_t delta = cpu_stat_on_csw(&cord()->cpu_stat);
	clock_stat_add_delta(&cord()->clock_stat, delta);
	clock_stat_add_delta(&caller->clock_stat, delta);
#endif /* ENABLE_FIBER_TOP */
}
/*
* Defines a handler to be executed on exit from cord's thread func,
* accessible via cord()->on_exit (normally NULL). It is used to
* implement cord_cojoin.
*/
struct cord_on_exit {
void (*callback)(void*);
void *argument;
};
/*
* A special value distinct from any valid pointer to cord_on_exit
* structure AND NULL. This value is stored in cord()->on_exit by the
* thread function prior to thread termination.
*/
static const struct cord_on_exit cord_on_exit_sentinel = { NULL, NULL };
#define CORD_ON_EXIT_WONT_RUN (&cord_on_exit_sentinel)
static struct cord main_cord;
__thread struct cord *cord_ptr = NULL;
pthread_t main_thread_id;
static size_t page_size;
static int stack_direction;
#ifndef FIBER_STACK_SIZE_DEFAULT
#error "Default fiber stack size is not set"
#endif
enum {
/* The minimum allowable fiber stack size in bytes */
FIBER_STACK_SIZE_MINIMAL = 16384,
/* Stack size watermark in bytes. */
FIBER_STACK_SIZE_WATERMARK = 65536,
};
/** Default fiber attributes */
static const struct fiber_attr fiber_attr_default = {
.stack_size = FIBER_STACK_SIZE_DEFAULT,
.flags = FIBER_DEFAULT_FLAGS
};
#ifdef HAVE_MADV_DONTNEED
/*
* Random values generated with uuid.
* Used for stack poisoning.
*/
static const uint64_t poison_pool[] = {
0x74f31d37285c4c37, 0xb10269a05bf10c29,
0x0994d845bd284e0f, 0x9ffd4f7129c184df,
0x357151e6711c4415, 0x8c5e5f41aafe6f28,
0x6917dd79e78049d5, 0xba61957c65ca2465,
};
/*
* We poison by 8 bytes as it's natural for stack
* step on x86-64. Also 128 byte gap between poison
* values should cover common cases.
*/
#define POISON_SIZE (sizeof(poison_pool) / sizeof(poison_pool[0]))
#define POISON_OFF (128 / sizeof(poison_pool[0]))
#endif /* HAVE_MADV_DONTNEED */
void
fiber_attr_create(struct fiber_attr *fiber_attr)
{
*fiber_attr = fiber_attr_default;
}
/**
 * Allocate and default-initialize a fiber attribute object.
 * Returns NULL (with OutOfMemory diag set) on allocation failure.
 * The caller owns the result and frees it with fiber_attr_delete().
 */
struct fiber_attr *
fiber_attr_new(void)
{
	struct fiber_attr *attr = malloc(sizeof(*attr));
	if (attr != NULL) {
		fiber_attr_create(attr);
		return attr;
	}
	diag_set(OutOfMemory, sizeof(*attr),
		 "runtime", "fiber attr");
	return NULL;
}
/** Release an attribute object created by fiber_attr_new(). */
void
fiber_attr_delete(struct fiber_attr *fiber_attr)
{
	/* free(NULL) is a safe no-op, so NULL needs no guard. */
	free(fiber_attr);
}
/**
 * Set a custom stack size in the attributes. Sizes below
 * FIBER_STACK_SIZE_MINIMAL are rejected with EINVAL and a diag;
 * returns 0 on success, -1 on error. Any non-default size marks
 * the attributes with FIBER_CUSTOM_STACK.
 */
int
fiber_attr_setstacksize(struct fiber_attr *fiber_attr, size_t stack_size)
{
	if (stack_size < FIBER_STACK_SIZE_MINIMAL) {
		errno = EINVAL;
		diag_set(SystemError, "stack size is too small");
		return -1;
	}
	fiber_attr->stack_size = stack_size;
	if (stack_size == FIBER_STACK_SIZE_DEFAULT)
		fiber_attr->flags &= ~FIBER_CUSTOM_STACK;
	else
		fiber_attr->flags |= FIBER_CUSTOM_STACK;
	return 0;
}
/** Return the configured stack size, or the default when attr is NULL. */
size_t
fiber_attr_getstacksize(struct fiber_attr *fiber_attr)
{
	if (fiber_attr == NULL)
		return fiber_attr_default.stack_size;
	return fiber_attr->stack_size;
}
/**
 * Run the on_stop triggers of fiber `f`. Triggers must not fail
 * and must remove themselves from the list when they run.
 */
void
fiber_on_stop(struct fiber *f)
{
	/*
	 * The most common case is when the list is empty. Do an
	 * inlined check before calling trigger_run().
	 */
	if (rlist_empty(&f->on_stop))
		return;
	if (trigger_run(&f->on_stop, f) != 0)
		panic("On_stop triggers can't fail");
	/*
	 * All on_stop triggers are supposed to remove themselves.
	 * So as not to waste time on that here, and to make them
	 * all work uniformly.
	 */
	assert(rlist_empty(&f->on_stop));
}
static void
fiber_recycle(struct fiber *fiber);
static void
fiber_stack_recycle(struct fiber *fiber);
static void
fiber_delete(struct cord *cord, struct fiber *f);
/**
* Try to delete a fiber right now or later if can't do now. The latter happens
* for self fiber - can't delete own stack.
*/
static void
cord_add_garbage(struct cord *cord, struct fiber *f);
/**
 * True if a fiber with `fiber_flags` can be reused.
 * A fiber can not be reused if it is somehow non-standard.
 */
static bool
fiber_is_reusable(uint32_t fiber_flags)
{
	/* For now we can not reuse fibers with custom stack size. */
	if ((fiber_flags & FIBER_CUSTOM_STACK) != 0)
		return false;
	return true;
}
/**
 * Transfer control to callee fiber.
 */
static void
fiber_call_impl(struct fiber *callee)
{
	struct fiber *caller = fiber();
	struct cord *cord = cord();
	/* Ensure we aren't switching to a fiber parked in fiber_loop */
	assert(callee->f != NULL && callee->fid != 0);
	assert(callee->flags & FIBER_IS_READY || callee == &cord->sched);
	assert(! (callee->flags & FIBER_IS_DEAD));
	/*
	 * Ensure the callee was removed from cord->ready list.
	 * If it wasn't, the callee will observe a 'spurious' wakeup
	 * later, due to a fiber_wakeup() performed in the past.
	 */
	assert(rlist_empty(&callee->state));
	assert(caller);
	assert(caller != callee);
	assert((caller->flags & FIBER_IS_RUNNING) != 0);
	assert((callee->flags & FIBER_IS_RUNNING) == 0);
	/* Hand over the RUNNING flag: exactly one fiber runs per cord. */
	caller->flags &= ~FIBER_IS_RUNNING;
	cord->fiber = callee;
	callee->flags = (callee->flags & ~FIBER_IS_READY) | FIBER_IS_RUNNING;
	/* `1': tell ASAN the callee is expected to switch back to us. */
	ASAN_START_SWITCH_FIBER(asan_state, 1,
				callee->stack,
				callee->stack_size);
	coro_transfer(&caller->ctx, &callee->ctx);
	ASAN_FINISH_SWITCH_FIBER(asan_state);
}
/**
 * Switch to `callee` immediately, recording the current fiber as
 * its caller. Runs the caller's on_yield triggers first.
 */
void
fiber_call(struct fiber *callee)
{
	struct fiber *caller = fiber();
	assert(! (caller->flags & FIBER_IS_READY));
	assert(rlist_empty(&callee->state));
	assert(! (callee->flags & FIBER_IS_READY));
	/** By convention, these triggers must not throw. */
	if (! rlist_empty(&caller->on_yield))
		trigger_run(&caller->on_yield, NULL);
	if (cord_is_main())
		cord_on_yield();
	clock_set_on_csw(caller);
	callee->caller = caller;
	/*
	 * NOTE(review): both fibers get FIBER_IS_READY here and
	 * fiber_call_impl() clears it on the callee when it starts
	 * running — confirm the caller's flag is cleared on yield.
	 */
	callee->flags |= FIBER_IS_READY;
	caller->flags |= FIBER_IS_READY;
	fiber_call_impl(callee);
}
/**
 * Start (switch to) `callee`, passing variadic arguments.
 * The arguments are stashed in callee->f_data, which fiber_loop()
 * later hands to fiber_invoke(); fiber_call() returns only after
 * the callee yields or finishes, so va_end() here is safe.
 */
void
fiber_start(struct fiber *callee, ...)
{
	va_start(callee->f_data, callee);
	fiber_call(callee);
	va_end(callee->f_data);
}
/*
 * Stack overflow checking is not implemented here: always report
 * that the stack is fine.
 */
bool
fiber_checkstack(void)
{
	return false;
}
/**
 * Append fiber `f` to the cord's ready list and make sure the
 * event loop will run it during the current iteration.
 */
static void
fiber_make_ready(struct fiber *f)
{
	/**
	 * Do nothing if the fiber is already in cord->ready
	 * list *or* is in the call chain created by
	 * fiber_schedule_list(). While it's harmless to re-add
	 * a fiber to cord->ready, even if it's already there,
	 * but the same game is deadly when the fiber is in
	 * the callee list created by fiber_schedule_list().
	 *
	 * To put it another way, fiber_make_ready() is a 'request' to
	 * schedule the fiber for execution, and once it is executing
	 * the 'make ready' request is considered complete and it must be
	 * removed.
	 */
	assert((f->flags & (FIBER_IS_DEAD | FIBER_IS_READY)) == 0);
	struct cord *cord = cord();
	if (rlist_empty(&cord->ready)) {
		/*
		 * ev_feed_event(EV_CUSTOM) gets scheduled in the
		 * same event loop iteration, and we rely on this
		 * for quick scheduling. For a wakeup which
		 * actually can invoke a poll() in libev,
		 * use fiber_sleep(0)
		 */
		ev_feed_event(cord->loop, &cord->wakeup_event, EV_CUSTOM);
	}
	/**
	 * Removes the fiber from whatever wait list it is on.
	 *
	 * It's critical that the newly scheduled fiber is
	 * added to the tail of the list, to preserve correct
	 * transaction commit order after a successful WAL write.
	 * (see tx_schedule_commit()/tx_schedule_rollback() in
	 * box/wal.cc)
	 */
	rlist_move_tail_entry(&cord->ready, f, state);
	f->flags |= FIBER_IS_READY;
}
/**
 * Schedule fiber `f` for execution unless it is already ready,
 * dead or currently running.
 */
void
fiber_wakeup(struct fiber *f)
{
	/*
	 * DEAD fiber can be lingering in the cord fiber list
	 * if it is joinable. And once its execution is complete
	 * it should be reaped with fiber_join() call.
	 *
	 * Still our API allows to call fiber_wakeup() on dead
	 * joinable fibers so simply ignore it.
	 */
	assert((f->flags & FIBER_IS_DEAD) == 0 ||
	       (f->flags & FIBER_IS_JOINABLE) != 0);
	/* A single mask check covers all three "don't schedule" states. */
	const int no_flags = FIBER_IS_READY | FIBER_IS_DEAD | FIBER_IS_RUNNING;
	if ((f->flags & no_flags) == 0)
		fiber_make_ready(f);
}
/** Cancel the subject fiber.
*
* Note: cancelation is asynchronous. Use fiber_join() to wait for the
* cancelation to complete.
*
* A fiber may opt to set FIBER_IS_CANCELLABLE to false, and never test
* that it was cancelled. Such fiber can not ever be cancelled.
* However, as long as most of the cooperative code calls
* fiber_testcancel(), most of the fibers are cancellable.
*
* The fiber which is cancelled, has FiberIsCancelled raised
* in it. For cancellation to work, this exception type should be
* re-raised whenever (if) it is caught.
*/
void
fiber_cancel(struct fiber *f)
{
	/**
	 * Do nothing if the fiber is dead, since cancelling
	 * the fiber would clear the diagnostics area and
	 * the cause of death would be lost.
	 */
	if (fiber_is_dead(f)) {
		if ((f->flags & FIBER_IS_JOINABLE) == 0) {
			panic("Cancel of a finished and already "
			      "recycled fiber");
		}
		assert(f->fid != 0);
		return;
	}
	/* Mark first: the flag is visible even if the wakeup is skipped. */
	f->flags |= FIBER_IS_CANCELLED;
	/**
	 * Don't wake self and zombies.
	 */
	if ((f->flags & FIBER_IS_CANCELLABLE) != 0)
		fiber_wakeup(f);
}
/**
* Change the current cancellation state of a fiber. This is not
* a cancellation point.
*/
bool
fiber_set_cancellable(bool yesno)
{
bool prev = fiber()->flags & FIBER_IS_CANCELLABLE;
if (yesno == true)
fiber()->flags |= FIBER_IS_CANCELLABLE;
else
fiber()->flags &= ~FIBER_IS_CANCELLABLE;
return prev;
}
/** True if the current fiber has been cancelled. */
bool
fiber_is_cancelled(void)
{
	return (fiber()->flags & FIBER_IS_CANCELLED) != 0;
}
/** Mark (or unmark) `fiber` as joinable with fiber_join(). */
void
fiber_set_joinable(struct fiber *fiber, bool yesno)
{
	if (yesno)
		fiber->flags |= FIBER_IS_JOINABLE;
	else
		fiber->flags &= ~FIBER_IS_JOINABLE;
}
/** Report libev time (cheap). */
double
fiber_time(void)
{
	/* ev_now() is the loop's cached wall-clock timestamp, no syscall. */
	return ev_now(loop());
}
/** Report libev wall-clock time in microseconds, rounded to nearest. */
int64_t
fiber_time64(void)
{
	double seconds = ev_now(loop());
	return (int64_t)(seconds * 1000000 + 0.5);
}
/** Report libev monotonic time in seconds (cheap, cached). */
double
fiber_clock(void)
{
	return ev_monotonic_now(loop());
}
/** Report libev monotonic time in microseconds, rounded to nearest. */
int64_t
fiber_clock64(void)
{
	double seconds = ev_monotonic_now(loop());
	return (int64_t)(seconds * 1000000 + 0.5);
}
/**
 * Queue the current fiber at the tail of the ready list and hand
 * control over to the next ready fiber.
 */
void
fiber_reschedule(void)
{
	struct fiber *self = fiber();
	/*
	 * The running fiber can be neither dead (that flag is set only
	 * when the fiber function returns) nor ready (that status
	 * belongs exclusively to fibers queued in the ready-list).
	 */
	assert((self->flags & (FIBER_IS_READY | FIBER_IS_DEAD)) == 0);
	fiber_make_ready(self);
	fiber_yield();
}
/** Wait indefinitely for `fiber` to finish; see fiber_join_timeout(). */
int
fiber_join(struct fiber *fiber)
{
	return fiber_join_timeout(fiber, TIMEOUT_INFINITY);
}
/**
 * Sleep until `fiber` finishes or `deadline` (monotonic clock)
 * passes. The current fiber is queued on `fiber`'s wake list and
 * is woken by fiber_loop() when `fiber` dies.
 * Returns true if the deadline expired.
 */
bool
fiber_wait_on_deadline(struct fiber *fiber, double deadline)
{
	rlist_add_tail_entry(&fiber->wake, fiber(), state);
	return fiber_yield_deadline(deadline);
}
/**
 * Wait until `fiber` finishes or `timeout` seconds pass. On
 * success returns the fiber function's result, moves the fiber's
 * diagnostics to the joiner and recycles the fiber. On timeout
 * sets TimedOut and returns -1; the fiber stays joinable.
 * Panics if the fiber was never made joinable.
 */
int
fiber_join_timeout(struct fiber *fiber, double timeout)
{
	if ((fiber->flags & FIBER_IS_JOINABLE) == 0)
		panic("the fiber is not joinable");
	if (!fiber_is_dead(fiber)) {
		double deadline = fiber_clock() + timeout;
		/* Re-arm on spurious wakeups until death or deadline. */
		while (!fiber_wait_on_deadline(fiber, deadline) &&
		       !fiber_is_dead(fiber)) {
		}
		if (!fiber_is_dead(fiber)) {
			/*
			 * Not exactly the right error message for this place.
			 * Error message is generated based on the ETIMEDOUT
			 * code, that is used for network timeouts in linux. But
			 * in other places, this type of error is always used
			 * when the timeout expires, regardless of whether it is
			 * related to the network (see cbus_call for example).
			 */
			diag_set(TimedOut);
			return -1;
		}
	}
	assert((fiber->flags & FIBER_IS_RUNNING) == 0);
	assert((fiber->flags & FIBER_IS_JOINABLE) != 0);
	fiber->flags &= ~FIBER_IS_JOINABLE;
	/* Move exception to the caller */
	int ret = fiber->f_ret;
	if (ret != 0) {
		assert(!diag_is_empty(&fiber->diag));
		diag_move(&fiber->diag, &fiber()->diag);
	}
	/* The fiber is already dead. */
	fiber_recycle(fiber);
	return ret;
}
/**
 * Implementation of `fiber_yield()` and `fiber_yield_final()`.
 * `will_switch_back` argument is used only by ASAN.
 *
 * Switches to the fiber recorded in caller->caller and resets
 * that link to the scheduler, so a second yield without an
 * intervening fiber_call() lands in sched.
 */
static void
fiber_yield_impl(MAYBE_UNUSED bool will_switch_back)
{
	struct cord *cord = cord();
	struct fiber *caller = cord->fiber;
	struct fiber *callee = caller->caller;
	caller->caller = &cord->sched;
	/** By convention, these triggers must not throw. */
	if (! rlist_empty(&caller->on_yield))
		trigger_run(&caller->on_yield, NULL);
	if (cord_is_main())
		cord_on_yield();
	clock_set_on_csw(caller);
	assert(callee->flags & FIBER_IS_READY || callee == &cord->sched);
	assert(! (callee->flags & FIBER_IS_DEAD));
	assert((caller->flags & FIBER_IS_RUNNING) != 0);
	assert((callee->flags & FIBER_IS_RUNNING) == 0);
	/* Hand over the RUNNING flag: exactly one fiber runs per cord. */
	caller->flags &= ~FIBER_IS_RUNNING;
	cord->fiber = callee;
	callee->flags = (callee->flags & ~FIBER_IS_READY) | FIBER_IS_RUNNING;
	ASAN_START_SWITCH_FIBER(asan_state, will_switch_back, callee->stack,
				callee->stack_size);
	coro_transfer(&caller->ctx, &callee->ctx);
	ASAN_FINISH_SWITCH_FIBER(asan_state);
}
/** Yield control to the fiber's caller (or the cord scheduler). */
void
fiber_yield(void)
{
	/* `true': ASAN keeps this fiber's fake stack, we will return. */
	fiber_yield_impl(true);
}
/**
 * Like `fiber_yield()`, but should be used when this is the last switch from
 * a dead fiber to the scheduler.
 */
static void
fiber_yield_final(void)
{
	/* `false' lets ASAN destroy this fiber's fake stack. */
	fiber_yield_impl(false);
}
/** Context shared between fiber_yield_timeout() and its timer callback. */
struct fiber_watcher_data {
	/* The fiber to wake up when the timer fires. */
	struct fiber *f;
	/* Set by fiber_schedule_timeout() when the delay expired. */
	bool timed_out;
};
/** Timer callback: flag the timeout and wake the sleeping fiber. */
static void
fiber_schedule_timeout(ev_loop *loop,
		       ev_timer *watcher, int revents)
{
	(void) loop;
	(void) revents;
	assert(fiber() == &cord()->sched);
	struct fiber_watcher_data *data =
		(struct fiber_watcher_data *) watcher->data;
	data->timed_out = true;
	fiber_wakeup(data->f);
}
/**
 * @brief yield & check timeout
 * @return true if timeout exceeded
 */
bool
fiber_yield_timeout(ev_tstamp delay)
{
	struct fiber_watcher_data ctx = { fiber(), false };
	struct ev_timer timer;
	ev_timer_init(&timer, fiber_schedule_timeout, delay, 0);
	timer.data = &ctx;
	ev_timer_start(loop(), &timer);
	fiber_yield();
	/* Stop the timer whether we woke up by timeout or explicitly. */
	ev_timer_stop(loop(), &timer);
	return ctx.timed_out;
}
/** Yield until the monotonic `deadline`; true if it expired. */
bool
fiber_yield_deadline(ev_tstamp deadline)
{
	ev_tstamp remaining = deadline - ev_monotonic_now(loop());
	return fiber_yield_timeout(remaining);
}
/**
 * Yield the current fiber to events in the event loop.
 */
void
fiber_sleep(double delay)
{
	/*
	 * libev sleeps at least backend_mintime (1 ms with
	 * poll()/Linux) unless there are idle watchers. To make
	 * fiber_sleep(0) really poll with a zero timeout, keep an
	 * idle watcher armed for the duration of the yield.
	 */
	int poll_immediately = delay == 0;
	if (poll_immediately)
		ev_idle_start(loop(), &cord()->idle_event);
	fiber_yield_timeout(delay);
	if (poll_immediately)
		ev_idle_stop(loop(), &cord()->idle_event);
}
/** Generic ev watcher callback: wake the fiber stored in watcher->data. */
void
fiber_schedule_cb(ev_loop *loop, ev_watcher *watcher, int revents)
{
	(void) loop;
	(void) revents;
	struct fiber *f = watcher->data;
	assert(fiber() == &cord()->sched);
	fiber_wakeup(f);
}
/**
 * Chain all fibers of `list` through their `caller` links so each
 * one transfers control directly to the next, with the scheduler
 * fiber closing the chain, then switch to the first one.
 */
static inline void
fiber_schedule_list(struct rlist *list)
{
	struct fiber *first;
	struct fiber *last;
	/*
	 * Happens when a fiber exits and is removed from cord->ready
	 * resulting in the empty list.
	 */
	if (rlist_empty(list))
		return;
	first = last = rlist_shift_entry(list, struct fiber, state);
	assert(last->flags & FIBER_IS_READY);
	while (! rlist_empty(list)) {
		last->caller = rlist_shift_entry(list, struct fiber, state);
		last = last->caller;
		assert(last->flags & FIBER_IS_READY);
	}
	/* The scheduler fiber terminates the call chain. */
	last->caller = fiber();
	assert(fiber() == &cord()->sched);
	clock_set_on_csw(fiber());
	fiber_call_impl(first);
}
/** Async watcher callback: run every fiber queued in cord->ready. */
static void
fiber_schedule_wakeup(ev_loop *loop, ev_async *watcher, int revents)
{
	(void) loop;
	(void) watcher;
	(void) revents;
	fiber_schedule_list(&cord()->ready);
}
/*
 * Intentionally empty: the idle watcher exists only to force libev
 * to poll with a zero timeout (see fiber_sleep()).
 */
static void
fiber_schedule_idle(ev_loop *loop, ev_idle *watcher,
		    int revents)
{
	(void) loop;
	(void) watcher;
	(void) revents;
}
/** Look up a fiber of this cord by id; NULL if not registered. */
struct fiber *
fiber_find(uint64_t fid)
{
	struct mh_i64ptr_t *registry = cord()->fiber_registry;
	mh_int_t slot = mh_i64ptr_find(registry, fid, NULL);
	if (slot == mh_end(registry))
		return NULL;
	return mh_i64ptr_node(registry, slot)->val;
}
static void
register_fid(struct fiber *fiber)
{
struct mh_i64ptr_node_t node = { fiber->fid, fiber };
mh_i64ptr_put(cord()->fiber_registry, &node, NULL, NULL);
}
static void
unregister_fid(struct fiber *fiber)
{
struct mh_i64ptr_node_t node = { fiber->fid, NULL };
mh_i64ptr_remove(cord()->fiber_registry, &node, NULL);
}
/** Return the currently running fiber of this cord. */
struct fiber *
fiber_self(void)
{
	return fiber();
}
/** Trim the current fiber's region allocator. */
void
fiber_gc(void)
{
	/*
	 * Keep the memory cached while the region stays small,
	 * otherwise give it back to the allocator.
	 */
	if (region_used(&fiber()->gc) >= 128 * 1024)
		region_free(&fiber()->gc);
	else
		region_reset(&fiber()->gc);
}
/** Common part of fiber_new() and fiber_recycle(). */
static void
fiber_reset(struct fiber *fiber)
{
	/* Detach any leftover triggers; the lists restart empty. */
	rlist_create(&fiber->on_yield);
	rlist_create(&fiber->on_stop);
#if ENABLE_FIBER_TOP
	clock_stat_reset(&fiber->clock_stat);
#endif /* ENABLE_FIBER_TOP */
}
/** Destroy an active fiber and prepare it for reuse or delete it. */
static void
fiber_recycle(struct fiber *fiber)
{
	assert((fiber->flags & FIBER_IS_DEAD) != 0);
	/* no exceptions are leaking */
	assert(diag_is_empty(&fiber->diag));
	/* no pending wakeup */
	assert(rlist_empty(&fiber->state));
	fiber_stack_recycle(fiber);
	fiber_reset(fiber);
	fiber->name[0] = '\0';
	fiber->f = NULL;
	fiber->wait_pad = NULL;
	/* Wipe per-fiber storage; Lua refs get their "no ref" sentinel. */
	memset(&fiber->storage, 0, sizeof(fiber->storage));
	fiber->storage.lua.storage_ref = FIBER_LUA_NOREF;
	fiber->storage.lua.fid_ref = FIBER_LUA_NOREF;
	/* Invalidate the id so fiber_find() can no longer return it. */
	unregister_fid(fiber);
	fiber->fid = 0;
	region_free(&fiber->gc);
	if (fiber_is_reusable(fiber->flags)) {
		/* Park the fiber in the dead list for later reuse. */
		rlist_move_entry(&cord()->dead, fiber, link);
	} else {
		/* Non-reusable fibers are deleted now or later (if self). */
		cord_add_garbage(cord(), fiber);
	}
}
/**
 * The body of every fiber: invoke the fiber function, report or
 * hand off its error, wake joiners, mark the fiber dead and give
 * control back to the scheduler — looping when the fiber is
 * recycled and started again with a new function.
 */
static void
fiber_loop(MAYBE_UNUSED void *data)
{
	ASAN_FINISH_SWITCH_FIBER(NULL);
	for (;;) {
		struct fiber *fiber = fiber();
		assert(fiber != NULL && fiber->f != NULL && fiber->fid != 0);
		fiber->f_ret = fiber_invoke(fiber->f, fiber->f_data);
		if (fiber->f_ret != 0) {
			struct error *e = diag_last_error(&fiber->diag);
			/* diag must not be empty on error */
			assert(e != NULL || fiber->flags & FIBER_IS_CANCELLED);
			/*
			 * For joinable fibers, it's the business
			 * of the caller to deal with the error.
			 */
			if (!(fiber->flags & FIBER_IS_JOINABLE)) {
				if (!(fiber->flags & FIBER_IS_CANCELLED))
					error_log(e);
				diag_clear(&fiber()->diag);
			}
		} else {
			/*
			 * Make sure a leftover exception does not
			 * propagate up to the joiner.
			 */
			diag_clear(&fiber()->diag);
		}
		fiber->flags |= FIBER_IS_DEAD;
		/* Wake every fiber sleeping in fiber_join(). */
		while (! rlist_empty(&fiber->wake)) {
			struct fiber *f;
			f = rlist_shift_entry(&fiber->wake, struct fiber,
					      state);
			assert(f != fiber);
			fiber_wakeup(f);
		}
		fiber_on_stop(fiber);
		/* reset pending wakeups */
		rlist_del(&fiber->state);
		if (! (fiber->flags & FIBER_IS_JOINABLE))
			fiber_recycle(fiber);
		/*
		 * Crash if spurious wakeup happens, don't call the old
		 * function again, ap is garbage by now.
		 */
		fiber->f = NULL;
		/*
		 * Give control back to the scheduler.
		 * If the fiber is not reusable, this is its final yield.
		 */
		if (fiber_is_reusable(fiber->flags))
			fiber_yield();
		else
			fiber_yield_final();
	}
}