Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Introduce preemptable coroutines

- Includes an alternate PL_runops that will cede automatically

- $coro->preempt and $coro->no_preempt can be used to control preemption
  at any time.

- 3 different strategies for timesharing (background timer posix thread,
  opcode counting, and opcode counting + a call to nvtime())
  • Loading branch information...
commit 54902809d8d3ad054d75a0d8b7ed58df5c20127c 1 parent 9cf27d1
@nothingmuch authored
Showing with 473 additions and 17 deletions.
  1. +22 −0 Coro.pm
  2. +347 −17 Coro/State.xs
  3. +104 −0 t/19_preempt.t
View
22 Coro.pm
@@ -628,6 +628,28 @@ coro). This is a bug that will be fixed in some future version.
Similar to C<prio>, but subtract the given value from the priority (i.e.
higher values mean lower priority, just as in unix).
+=item $coro->preempt
+
+Enable automatic ceding for a coroutine.
+
+Preempting allows CPU bound code which does not call cede to still be used
+without starving the other coroutines:
+
+ my $coro = async { factorial(5000) };
+ $coro->preempt;
+ cede;
+
+This cannot be enabled on the main coroutine because runops is already recursed
+into.
+
+=item $coro->no_preempt
+
+Disable preemption. Can be used to protect critical sections:
+
+ current->no_preempt;
+ ... will not cede automatically ...
+ current->preempt;
+
=item $olddesc = $coro->desc ($newdesc)
Sets (or gets in case the argument is missing) the description for this
View
364 Coro/State.xs
@@ -218,12 +218,13 @@ static struct coro_cctx *cctx_first;
static int cctx_count, cctx_idle;
enum {
- CC_MAPPED = 0x01,
- CC_NOREUSE = 0x02, /* throw this away after tracing */
- CC_TRACE = 0x04,
- CC_TRACE_SUB = 0x08, /* trace sub calls */
- CC_TRACE_LINE = 0x10, /* trace each statement */
- CC_TRACE_ALL = CC_TRACE_SUB | CC_TRACE_LINE,
+ CC_MAPPED = 0x01,
+ CC_NOREUSE = 0x02, /* throw this away after tracing */
+ CC_TRACE = 0x04,
+ CC_TRACE_SUB = 0x08, /* trace sub calls */
+ CC_TRACE_LINE = 0x10, /* trace each statement */
+ CC_TRACE_ALL = CC_TRACE_SUB | CC_TRACE_LINE,
+ CC_PREEMPTABLE = 0x20, /* alternate runops */
};
/* this is a structure representing a c-level coroutine */
@@ -245,7 +246,7 @@ typedef struct coro_cctx
#if CORO_USE_VALGRIND
int valgrind_id;
#endif
- unsigned char flags;
+ unsigned int flags;
} coro_cctx;
coro_cctx *cctx_current; /* the currently running cctx */
@@ -253,11 +254,12 @@ coro_cctx *cctx_current; /* the currently running cctx */
/*****************************************************************************/
enum {
- CF_RUNNING = 0x0001, /* coroutine is running */
- CF_READY = 0x0002, /* coroutine is ready */
- CF_NEW = 0x0004, /* has never been switched to */
- CF_DESTROYED = 0x0008, /* coroutine data has been freed */
- CF_SUSPENDED = 0x0010, /* coroutine can't be scheduled */
+ CF_RUNNING = 0x0001, /* coroutine is running */
+ CF_READY = 0x0002, /* coroutine is ready */
+ CF_NEW = 0x0004, /* has never been switched to */
+ CF_DESTROYED = 0x0008, /* coroutine data has been freed */
+ CF_SUSPENDED = 0x0010, /* coroutine can't be scheduled */
+ CF_PREEMPTABLE = 0x0020, /* coroutine cedes automatically */
};
/* the structure where most of the perl state is stored, overlaid on the cxstack */
@@ -485,6 +487,249 @@ SvSTATE_ (pTHX_ SV *coro)
#define SvSTATE_hv(hv) ((struct coro *)CORO_MAGIC_NN ((SV *)hv, CORO_MAGIC_type_state)->mg_ptr)
#define SvSTATE_current SvSTATE_hv (SvRV (coro_current))
+
+
+/*****************************************************************************/
+/* preemptable runops loop */
+
+/* by default a background pthread that sleeps for 10ms and then sets a
+ * volatile flag is used to signal preemption
+ *
+ * define CORO_PREEMPT_OPCOUNT to cede every preempt_opcount ops instead of
+ * using the thread
+ *
+ * define CORO_PREEMPT_OPCOUNT_NVTIME to use opcounting + nvtime() to only
+ * switch after enough time has elapsed (like Coro::Storable)
+ *
+ * the default preemption is based on a background pthread setting a volatile
+ * int, somewhat similar to the way the ruby thread preemption is implemented
+ *
+ *
+ * TODO
+ *
+ * - make the various parameters customizable (quantum size, etc)
+ *
+ * - the different strategies can at least in principle coexist, the ifdef is
+ * for simplicity
+ *
+ * - I'm not sure what the correct (as in portable) way to gettimeofday and to
+ * manage pthreads is (windows is probably a nightmare?)
+ *
+ * - only start the background timer thread when a coro is made preemptable
+ * (thread creation is guarded by a mutex so this should be trivial)
+ *
+ */
+
+
+
+/* FIXME not sure where to put this stuff. It feels wrong to predeclare part of
+ * the public api though */
+static int api_cede (pTHX);
+
+
+
+/* when true this means a preemptable coro is executing
+ *
+ * using pthreads this is a shared var guarded by mutex. This is synchronized
+ * to CF_PREEMPTABLE as appropriate in coro_preempt_reset, but not necessarily
+ * to CC_PREEMPTABLE (which might be true even if this is false, inside a
+ * critical section) */
+static volatile bool preemptable = FALSE;
+
+
+INLINE void
+coro_preempt_reset_preemptable (pTHX)
+{
+  /* mirror the current coro's CF_PREEMPTABLE bit into the global flag */
+  if (SvSTATE_current->flags & CF_PREEMPTABLE)
+    preemptable = TRUE;
+  else
+    preemptable = FALSE;
+}
+
+
+
+#if defined(CORO_PREEMPT_OPCOUNT) || defined(CORO_PREEMPT_OPCOUNT_NVTIME)
+
+static unsigned int preempt_opcount;
+/* restart the op countdown that CORO_PREEMPT_CHECK decrements.
+ * declared with (void) so this is a real prototype instead of an old-style
+ * definition with an unspecified argument list */
+INLINE void
+coro_preempt_reset_opcount (void)
+{
+  preempt_opcount = 500; /* TODO customizable per coro */
+}
+
+#ifdef CORO_PREEMPT_OPCOUNT_NVTIME
+
+/* only cedes if enough time has passed */
+static NV preempt_next;
+
+static void
+coro_preempt_reset (pTHX)
+{
+  const NV quantum = 0.01; /* TODO customizable per coro */
+
+  /* resync the global flag with the current coro and start a new op budget */
+  coro_preempt_reset_preemptable (aTHX);
+  coro_preempt_reset_opcount ();
+
+  /* earliest point in time at which CORO_PREEMPT_CHECK may cede again */
+  preempt_next = nvtime () + quantum;
+}
+
+
+/* FIXME I guess I'm cargo culting perl's style here, but maybe it should just
+ * be an inline static function? */
+
+#define CORO_PREEMPT_CHECK() ({ \
+ if ( preemptable && coro_nready && --preempt_opcount == 0 ) \
+ { \
+ coro_preempt_reset_opcount(); \
+ if ( nvtime() >= preempt_next ) \
+ { \
+ coro_preempt_reset(aTHX); \
+ api_cede(aTHX); \
+ } \
+ } \
+ })
+
+
+#else /* CORO_PREEMPT_OPCOUNT_NVTIME */
+
+/* this variation simply cedes unconditionally every opcount ops */
+
+static void
+coro_preempt_reset (pTHX)
+{
+  /* start a fresh op budget, then pick up the current coro's preempt flag */
+  coro_preempt_reset_opcount ();
+  coro_preempt_reset_preemptable (aTHX);
+}
+
+#define CORO_PREEMPT_CHECK() ({ \
+ if ( preemptable && coro_nready && --preempt_opcount == 0 ) \
+ { \
+ coro_preempt_reset(aTHX); \
+ api_cede(aTHX); \
+ } \
+ })
+
+#endif /* CORO_PREEMPT_OPCOUNT_NVTIME */
+
+INLINE void
+coro_preempt_boot (pTHX)
+{
+  /* the op counting strategies need no boot-time setup */
+}
+
+#else /* CORO_PREEMPT_OPCOUNT */
+
+/* this variation uses a background thread to set a volatile preempt_now flag,
+ * similar to how ruby swaps the active thread */
+
+#include <pthread.h>
+
+static volatile bool preempt_now = FALSE; /* signal to preempt, not guarded (race condition is OK) */
+
+static pthread_cond_t preempt_cond_var;
+static pthread_mutex_t preempt_mutex;
+static pthread_t timer_thread;
+
+/* tell the background thread to start or stop its timeslice loop, but only
+ * when the current coro's CF_PREEMPTABLE state differs from the shared flag */
+static void
+coro_preempt_reset (pTHX)
+{
+  int wanted = SvSTATE_current->flags & CF_PREEMPTABLE ? TRUE : FALSE;
+
+  /* nothing changed, leave the timer thread alone */
+  if (wanted == preemptable)
+    return;
+
+  /* the signal wakes up the background thread regardless of state, it
+   * checks the preemptable flag and switches mode accordingly */
+  pthread_mutex_lock (&preempt_mutex);
+
+  coro_preempt_reset_preemptable (aTHX);
+  preempt_now = FALSE;
+  pthread_cond_signal (&preempt_cond_var);
+
+  pthread_mutex_unlock (&preempt_mutex);
+}
+
+#define CORO_PREEMPT_CHECK() \
+ if ( preempt_now ) \
+ {\
+ preempt_now = FALSE; \
+ api_cede(aTHX); \
+ }
+
+/* fill *ts with the time of day one timeslice (10ms) from now, the point at
+ * which a coro should next be preempted, and return ts */
+static struct timespec *
+timeslice_next (struct timespec *ts)
+{
+  struct timeval now;
+  long nsec;
+
+  gettimeofday (&now, NULL);
+
+  /* 10ms time slice TODO make this customizable per coro */
+  nsec = 1000 * now.tv_usec + 10 * 1000000;
+
+  /* carry whole seconds out of the nanosecond part (tv_nsec overflow) */
+  ts->tv_sec  = now.tv_sec + nsec / 1000000000;
+  ts->tv_nsec = nsec % 1000000000;
+
+  return ts;
+}
+
+
+/* body of the background timer thread created by coro_preempt_boot.
+ *
+ * it holds preempt_mutex except while blocked in a condition wait. while
+ * preemptable is true it waits one timeslice at a time and raises preempt_now
+ * when a slice runs out; otherwise it sleeps until it is signalled. the loop
+ * never terminates, ptr is unused. */
+static void *preempt_timer_loop (void *ptr)
+{
+  struct timespec ts;
+
+  pthread_mutex_lock (&preempt_mutex);
+  /* let coro_preempt_boot, which waits on the condition, continue */
+  pthread_cond_signal (&preempt_cond_var);
+
+  for (;;)
+    {
+      /* errors are not handled specially since there is nothing we can do
+       * about them (all EINVAL). pthread_cond_timedwait returning ETIMEDOUT
+       * is the expected "timeslice is over" result */
+      if ( preemptable )
+        {
+          /* preemptable coro is running, wait for a cancellation signal for a
+           * single timeslice */
+          int rc = pthread_cond_timedwait (&preempt_cond_var, &preempt_mutex, timeslice_next (&ts));
+
+          /* a zero result means we were signalled (or woke spuriously) before
+           * the slice ran out: do not preempt, re-evaluate and start a fresh
+           * slice. only when the wait did not succeed (normally ETIMEDOUT)
+           * and preemption is still wanted do we set the preemption flag */
+          if ( rc != 0 && preemptable )
+            preempt_now = TRUE;
+        }
+      else
+        /* wait for a coro to request preemption */
+        pthread_cond_wait (&preempt_cond_var, &preempt_mutex);
+    }
+
+  return NULL;
+}
+
+
+/* initialise the mutex and condition variable and start the background timer
+ * thread, returning once the thread has signalled that it is running.
+ *
+ * croaks if the thread cannot be created; without that check the wait below
+ * would block forever because nobody would ever signal the condition.
+ *
+ * TODO only start background thread when necessary */
+static void
+coro_preempt_boot (pTHX)
+{
+  int rc;
+
+  pthread_mutex_init (&preempt_mutex, NULL);
+  pthread_cond_init (&preempt_cond_var, NULL);
+
+  /* hold the mutex across thread creation so the new thread cannot signal
+   * before we are waiting */
+  pthread_mutex_lock (&preempt_mutex);
+  rc = pthread_create (&timer_thread, NULL, preempt_timer_loop, NULL);
+
+  if (rc)
+    {
+      pthread_mutex_unlock (&preempt_mutex);
+      croak ("Coro: unable to start the preemption timer thread (error %d)", rc);
+    }
+
+  pthread_cond_wait (&preempt_cond_var, &preempt_mutex);
+  pthread_mutex_unlock (&preempt_mutex);
+}
+
+
+#endif /* CORO_PREEMPT_OPCOUNT */
+
+/* runops loop derived from Perl_runops_standard: executes ops until one
+ * returns a null op, running CORO_PREEMPT_CHECK after every op */
+static int
+runops_preempt (pTHX)
+{
+  for (;;)
+    {
+      PL_op = CALL_FPTR(PL_op->op_ppaddr)(aTHX);
+
+      if (!PL_op)
+        break;
+
+      PERL_ASYNC_CHECK();
+      CORO_PREEMPT_CHECK();
+    }
+
+  TAINT_NOT;
+  return 0;
+}
+
+
+
/*****************************************************************************/
/* padlist management and caching */
@@ -983,6 +1228,9 @@ slf_check_repeat (pTHX_ struct CoroSLF *frame)
static UNOP coro_setup_op;
+static int runops_preempt (pTHX);
+static coro_cctx *cctx_new_run ();
+
static void NOINLINE /* noinline to keep it out of the transfer fast path */
coro_setup (pTHX_ struct coro *coro)
{
@@ -991,7 +1239,7 @@ coro_setup (pTHX_ struct coro *coro)
*/
coro_init_stacks (aTHX);
- PL_runops = RUNOPS_DEFAULT;
+ PL_runops = coro->flags & CF_PREEMPTABLE ? runops_preempt : RUNOPS_DEFAULT;
PL_curcop = &PL_compiling;
PL_in_eval = EVAL_NULL;
PL_comppad = 0;
@@ -1280,6 +1528,8 @@ cctx_prepare (pTHX)
if (cctx_current->flags & CC_TRACE)
PL_runops = runops_trace;
+ else if (cctx_current->flags & CC_PREEMPTABLE)
+ PL_runops = runops_preempt;
/* we already must be executing an SLF op, there is no other valid way
* that can lead to creation of a new cctx */
@@ -1298,6 +1548,7 @@ INLINE void
transfer_tail (pTHX)
{
free_coro_mortal (aTHX);
+ coro_preempt_reset(aTHX);
}
/*
@@ -1554,7 +1805,7 @@ transfer (pTHX_ struct coro *prev, struct coro *next, int force_cctx)
/* possibly untie and reuse the cctx */
if (expect_true (
cctx_current->idle_sp == STACKLEVEL
- && !(cctx_current->flags & CC_TRACE)
+ && !(cctx_current->flags & (CC_TRACE|CC_PREEMPTABLE))
&& !force_cctx
))
{
@@ -1570,12 +1821,32 @@ transfer (pTHX_ struct coro *prev, struct coro *next, int force_cctx)
cctx_put (cctx_current);
}
else
- prev->cctx = cctx_current;
+ {
+ if ( prev->flags & CF_PREEMPTABLE && !(cctx_current->flags & (CC_PREEMPTABLE)) )
+ {
+ prev->flags &= ~CF_PREEMPTABLE;
+ /* this should probably croak() but that is untrappable */
+ warn("cannot make coroutine preemptable (has a custom stack or is the main one)");
+ }
+ prev->cctx = cctx_current;
+ }
++next->usecount;
- cctx_prev = cctx_current;
- cctx_current = expect_false (next->cctx) ? next->cctx : cctx_get (aTHX);
+ cctx_prev = cctx_current;
+
+ if ( next->cctx )
+ cctx_current = next->cctx;
+ else if ( next->flags & CF_PREEMPTABLE )
+ {
+ /* I expect preemptable coros to be long lived so the teardown/setup
+ * cost shouldn't be a big deal. reusing them would require
+ * maintaining separate pools in cctx_get/cctx_put */
+ cctx_current = cctx_new_run ();
+ cctx_current->flags |= CC_PREEMPTABLE|CC_NOREUSE;
+ }
+ else
+ cctx_current = cctx_get (aTHX);
next->cctx = 0;
@@ -1957,6 +2228,45 @@ api_trace (pTHX_ SV *coro_sv, int flags)
}
}
+/* enable (flags contains CF_PREEMPTABLE) or disable (it does not) preemption
+ * for the coro behind coro_sv.
+ *
+ * returns 1 when the caller still has to cede for the change to take effect,
+ * 0 otherwise. croaks when asked to enable preemption on a coro that is the
+ * current one but is not marked CF_RUNNING. */
+static int
+api_preempt (pTHX_ SV *coro_sv, int flags)
+{
+  struct coro *coro = SvSTATE (coro_sv);
+
+  if ( !(flags & CF_PREEMPTABLE) )
+    {
+      /* disabling: only does something if preemption is currently enabled */
+      if ( coro->flags & CF_PREEMPTABLE )
+        {
+          coro->flags &= ~CF_PREEMPTABLE;
+
+          if ( coro->flags & CF_RUNNING )
+            coro_preempt_reset (aTHX);
+        }
+
+      return 0;
+    }
+
+  coro->flags |= CF_PREEMPTABLE;
+
+  if ( coro->flags & CF_RUNNING )
+    {
+      /* need to cede in order for the PL_runops change to take effect in
+       * transfer (a new context will be created)
+       *
+       * this causes the preempt function to call
+       * CORO_EXECUTE_SLF_XS(slf_init_cede) (can't do that in here cleanly)
+       */
+      if ( !(cctx_current->flags & CC_PREEMPTABLE) )
+        return 1;
+
+      /* otherwise kick the preemption code, either waking up the timer
+       * thread or just tripping the flag under opcount */
+      coro_preempt_reset (aTHX);
+    }
+  else if ( coro == SvSTATE_current )
+    {
+      coro->flags &= ~CF_PREEMPTABLE;
+      croak("the main coroutine cannot be made preemptable");
+    }
+
+  return 0;
+}
+
static void
coro_call_on_destroy (pTHX_ struct coro *coro)
{
@@ -3065,6 +3375,8 @@ BOOT:
}
assert (("PRIO_NORMAL must be 0", !CORO_PRIO_NORMAL));
+
+ coro_preempt_boot(aTHX);
}
SV *
@@ -3496,6 +3808,24 @@ prio (Coro::State coro, int newprio = 0)
OUTPUT:
RETVAL
+void
+preempt (...)
+        CODE:
+        /* enable preemption for the invocant, so that $coro->preempt affects
+         * $coro as documented; falls back to the current coro when called
+         * without arguments.
+         *
+         * FIXME the pp_slf magic should probably happen inside api_preempt but
+         * passing around the extra args seems even worse than this.
+         *
+         * I'm not sure what the correct way to bypass the various checks would be,
+         * but this is simple and it works */
+
+        if ( api_preempt(aTHX_ items > 0 ? ST (0) : coro_current, CF_PREEMPTABLE) )
+          CORO_EXECUTE_SLF_XS (slf_init_cede);
+
+
+void
+no_preempt (...)
+        CODE:
+        /* disable preemption for the invocant, so that $coro->no_preempt
+         * affects $coro; falls back to the current coro when called without
+         * arguments */
+        (void)api_preempt(aTHX_ items > 0 ? ST (0) : coro_current, 0);
+
SV *
ready (SV *self)
PROTOTYPE: $
View
104 t/19_preempt.t
@@ -0,0 +1,104 @@
+$| = 1;
+print "1..25\n";
+
+use Coro qw(:prio cede async current);
+use Time::HiRes qw(time);
+
+print "ok\n";
+
+my @log;
+
+# Append a [ event type, numeric id of the current coro ] pair to @log.
+sub trace {
+    my ($type) = @_;
+    my $entry = [ $type, 0 + current ];
+    push @log, $entry;
+}
+
+my @bg_coros;
+
+my $t = time;
+
+# Burn CPU without ever calling cede until 1.5 seconds after $t, toggling
+# preemption for the current coro on every pass of the outer loop and logging
+# each toggle and each inner loop through trace().
+sub waste_time {
+    my $i;              # the unused lexical $loop has been dropped
+    my $preemptable;
+
+    print "ok - started\n";
+
+    while ( $t + 1.5 > time ) {
+        if ($preemptable) {
+            current->no_preempt;
+            trace("no_preempt");
+        }
+        else {
+            trace("preempt");
+            current->preempt;
+        }
+
+        $preemptable = !$preemptable;
+
+        for ( 1 .. 100 ) {
+            for ( 1 .. 10 ) {
+
+                # waste some time
+                # i think just $i++ is not sufficient for a random
+                # distribution of opcode durations
+                my $x = $i++ ** 4;
+                my $foo = ( ( "x" x 5000 ) . "zxx" );
+                $foo =~ /x+x/;
+                sub { index( shift, "z" ) }->($foo);
+            }
+
+            trace("loop");
+        }
+    }
+
+    trace("done");
+
+    print "ok - wasted CPU time\n";
+}
+
+@bg_coros = map {
+ async { waste_time }
+} 1 .. 10;
+
+cede;
+
+$_->join for @bg_coros;
+
+my $prev_loop;
+my $switches;
+my $preempted_critical;
+
+# analyzes the trace log
+# $switches counts the number of times loop entries from different coros
+# interleave (meaning that the coros preempted each other at least once)
+# $preempted_critical counts such interleavings that happened between calling
+# no_preempt and preempt (meaning that a critical section was ceded even
+# though it shouldn't have been)
+while (@log) {
+ my ( $type, $id ) = @{ shift @log };
+
+ if ( $type eq 'loop' ) {
+ $switches++ if defined $prev_loop and $prev_loop != $id;
+ $prev_loop = $id;
+ }
+ elsif ( $type eq 'no_preempt' ) {
+ while (@log) {
+ my ( $type, $inner_id ) = @{ shift @log };
+ last if $type eq 'preempt' or $type eq 'done';
+ $preempted_critical++ if $id != $inner_id;
+ }
+ }
+}
+
+# check that there were at least 2 more cedes than would have happened using
+# just a join
+print "not " if $switches < 7;
+print "ok - ceded implicitly\n";
+
+print "not " if $switches < 30;
+print "ok - many times times ($switches)\n";
+
+print "not " if $preempted_critical;
+print "ok - did not cede while inside critical sections\n";
+
+print "ok - end\n";
+
Please sign in to comment.
Something went wrong with that request. Please try again.