diff --git a/src/core/ThreadPoolScheduler.pm6 b/src/core/ThreadPoolScheduler.pm6 index 367901430ab..209a2d315c3 100644 --- a/src/core/ThreadPoolScheduler.pm6 +++ b/src/core/ThreadPoolScheduler.pm6 @@ -320,66 +320,85 @@ my class ThreadPoolScheduler does Scheduler { has Thread $!supervisor; method !general-queue() { - unless $!general-queue.DEFINITE { - $!state-lock.protect: { - unless $!general-queue.DEFINITE { + nqp::if( + nqp::isconcrete($!general-queue), + $!general-queue, + nqp::stmts( + $!state-lock.protect( { + nqp::unless( + nqp::isconcrete($!general-queue), + nqp::stmts( # We don't have any workers yet, so start one. - $!general-queue := nqp::create(Queue); - $!general-workers := first-worker( + ($!general-queue := nqp::create(Queue)), + ($!general-workers := first-worker( GeneralWorker.new( queue => $!general-queue, scheduler => self ) - ); - scheduler-debug "Created initial general worker thread"; - self!maybe-start-supervisor(); - } - } - } - $!general-queue + )), + scheduler-debug("Created initial general worker thread"), + self!maybe-start-supervisor + ) + ) + } ), + $!general-queue + ) + ) } method !timer-queue() { - unless $!timer-queue.DEFINITE { - $!state-lock.protect: { - unless $!timer-queue.DEFINITE { + nqp::if( + nqp::isconcrete($!timer-queue), + $!timer-queue, + nqp::stmts( + $!state-lock.protect( { + nqp::unless( + nqp::isconcrete($!timer-queue), + nqp::stmts( # We don't have any workers yet, so start one. - $!timer-queue := nqp::create(Queue); - $!timer-workers := first-worker( + ($!timer-queue := nqp::create(Queue)), + ($!timer-workers := first-worker( TimerWorker.new( queue => $!timer-queue, scheduler => self ) - ); - scheduler-debug "Created initial timer worker thread"; - self!maybe-start-supervisor(); - } - } - } - $!timer-queue + )), + scheduler-debug("Created initial general worker thread"), + self!maybe-start-supervisor + ) + ) + } ), + $!timer-queue + ) + ) } constant @affinity-add-thresholds = 1, 5, 10, 20, 50, 100; method !affinity-queue() { # If there's no affinity workers, start one. - my $cur-affinity-workers := $!affinity-workers; - if $cur-affinity-workers.elems == 0 { - $!state-lock.protect: { - if $!affinity-workers.elems == 0 { + nqp::unless( + nqp::elems(my $cur-affinity-workers := $!affinity-workers), + nqp::stmts( + $!state-lock.protect( { + nqp::unless( + nqp::elems($!affinity-workers), + nqp::stmts( # We don't have any affinity workers yet, so start one # and return its queue. - $!affinity-workers := first-worker( + ($!affinity-workers := first-worker( AffinityWorker.new( scheduler => self ) - ); - scheduler-debug "Created initial affinity worker thread"; - self!maybe-start-supervisor(); - return $!affinity-workers[0].queue; - } - } - $cur-affinity-workers := $!affinity-workers; # lost race for first - } + )), + scheduler-debug("Created initial affinity worker thread"), + self!maybe-start-supervisor, + (return $!affinity-workers[0].queue) + ) + ) + } ), + ($cur-affinity-workers := $!affinity-workers) # lost race for first + ) + ); # Otherwise, see which has the least load (this is inherently racey # and approximate, but enough to help us avoid a busy worker). If we @@ -387,17 +406,23 @@ my class ThreadPoolScheduler does Scheduler { my $most-free-worker; my int $i = -1; nqp::while( - ++$i < nqp::elems($cur-affinity-workers), + nqp::islt_i( + ($i = nqp::add_i($i,1)), + nqp::elems($cur-affinity-workers) + ), nqp::if( - $most-free-worker.DEFINITE, + nqp::isconcrete($most-free-worker), nqp::stmts( (my $cand := nqp::atpos($cur-affinity-workers,$i)), nqp::unless( - (my $queue := $cand.queue).elems, + nqp::elems(my $queue := $cand.queue), (return $queue) ), nqp::if( - nqp::islt_i($queue.elems,$most-free-worker.queue.elems), + nqp::islt_i( + nqp::elems($queue), + nqp::elems($most-free-worker.queue) + ), $most-free-worker := $cand ) ),