Skip to content

Commit

Permalink
Streamline ThreadPoolScheduler!xxxx-queue a bit
Browse files Browse the repository at this point in the history
- nqp::isconcrete instead of .DEFINITE
- nqp::elems instead of .elems
  • Loading branch information
lizmat committed Mar 27, 2018
1 parent 9ffb06b commit 637147a
Showing 1 changed file with 66 additions and 41 deletions.
107 changes: 66 additions & 41 deletions src/core/ThreadPoolScheduler.pm6
Expand Up @@ -320,84 +320,109 @@ my class ThreadPoolScheduler does Scheduler {
has Thread $!supervisor;

method !general-queue() {
unless $!general-queue.DEFINITE {
$!state-lock.protect: {
unless $!general-queue.DEFINITE {
nqp::if(
nqp::isconcrete($!general-queue),
$!general-queue,
nqp::stmts(
$!state-lock.protect( {
nqp::unless(
nqp::isconcrete($!general-queue),
nqp::stmts(
# We don't have any workers yet, so start one.
$!general-queue := nqp::create(Queue);
$!general-workers := first-worker(
($!general-queue := nqp::create(Queue)),
($!general-workers := first-worker(
GeneralWorker.new(
queue => $!general-queue,
scheduler => self
)
);
scheduler-debug "Created initial general worker thread";
self!maybe-start-supervisor();
}
}
}
$!general-queue
)),
scheduler-debug("Created initial general worker thread"),
self!maybe-start-supervisor
)
)
} ),
$!general-queue
)
)
}

method !timer-queue() {
unless $!timer-queue.DEFINITE {
$!state-lock.protect: {
unless $!timer-queue.DEFINITE {
nqp::if(
nqp::isconcrete($!timer-queue),
$!timer-queue,
nqp::stmts(
$!state-lock.protect( {
nqp::unless(
nqp::isconcrete($!timer-queue),
nqp::stmts(
# We don't have any workers yet, so start one.
$!timer-queue := nqp::create(Queue);
$!timer-workers := first-worker(
($!timer-queue := nqp::create(Queue)),
($!timer-workers := first-worker(
TimerWorker.new(
queue => $!timer-queue,
scheduler => self
)
);
scheduler-debug "Created initial timer worker thread";
self!maybe-start-supervisor();
}
}
}
$!timer-queue
)),
scheduler-debug("Created initial general worker thread"),
self!maybe-start-supervisor
)
)
} ),
$!timer-queue
)
)
}

constant @affinity-add-thresholds = 1, 5, 10, 20, 50, 100;
method !affinity-queue() {
# If there's no affinity workers, start one.
my $cur-affinity-workers := $!affinity-workers;
if $cur-affinity-workers.elems == 0 {
$!state-lock.protect: {
if $!affinity-workers.elems == 0 {
nqp::unless(
nqp::elems(my $cur-affinity-workers := $!affinity-workers),
nqp::stmts(
$!state-lock.protect( {
nqp::unless(
nqp::elems($!affinity-workers),
nqp::stmts(
# We don't have any affinity workers yet, so start one
# and return its queue.
$!affinity-workers := first-worker(
($!affinity-workers := first-worker(
AffinityWorker.new(
scheduler => self
)
);
scheduler-debug "Created initial affinity worker thread";
self!maybe-start-supervisor();
return $!affinity-workers[0].queue;
}
}
$cur-affinity-workers := $!affinity-workers; # lost race for first
}
)),
scheduler-debug("Created initial affinity worker thread"),
self!maybe-start-supervisor,
(return $!affinity-workers[0].queue)
)
)
} ),
($cur-affinity-workers := $!affinity-workers) # lost race for first
)
);

# Otherwise, see which has the least load (this is inherently racey
# and approximate, but enough to help us avoid a busy worker). If we
# find an empty queue, return it immediately.
my $most-free-worker;
my int $i = -1;
nqp::while(
++$i < nqp::elems($cur-affinity-workers),
nqp::islt_i(
($i = nqp::add_i($i,1)),
nqp::elems($cur-affinity-workers)
),
nqp::if(
$most-free-worker.DEFINITE,
nqp::isconcrete($most-free-worker),
nqp::stmts(
(my $cand := nqp::atpos($cur-affinity-workers,$i)),
nqp::unless(
(my $queue := $cand.queue).elems,
nqp::elems(my $queue := $cand.queue),
(return $queue)
),
nqp::if(
nqp::islt_i($queue.elems,$most-free-worker.queue.elems),
nqp::islt_i(
nqp::elems($queue),
nqp::elems($most-free-worker.queue)
),
$most-free-worker := $cand
)
),
Expand Down

0 comments on commit 637147a

Please sign in to comment.