Skip to content

Commit

Permalink
Merge pull request #4941 from rakudo/vrurg-unlimited-threads
Browse files Browse the repository at this point in the history
Allow setting ThreadPoolScheduler's max_threads to semi-infinite value
  • Loading branch information
vrurg committed Jun 3, 2022
2 parents ab8ef0b + febff8f commit 77bc030
Showing 1 changed file with 41 additions and 20 deletions.
61 changes: 41 additions & 20 deletions src/core.c/ThreadPoolScheduler.pm6
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ my class ThreadPoolScheduler does Scheduler {
# specifically wrt GH #1202.
PROCESS::<$PID> := nqp::p6box_i(my int $pid = nqp::getpid);

my constant UNLIMITED_THREADS = 9223372036854775807; # 2⁶³-1

# Scheduler defaults controlled by environment variables
my $ENV := nqp::getattr(%*ENV,Map,'$!storage');
my int $scheduler-debug;
Expand Down Expand Up @@ -311,16 +313,14 @@ my class ThreadPoolScheduler does Scheduler {

# Initial and maximum threads allowed.
#?if moar
has uint $.initial_threads;
has uint $.max_threads;
has uint $!max_threads;
#?endif
#?if !moar
has Int $.initial_threads;
has Int $.max_threads;
has Int $!max_threads;
#?endif

# All of the worker and queue state below is guarded by this lock.
has Lock $!state-lock = Lock.new;
has Lock $!state-lock;

# The general queue and timer queue, if created.
has Queue $!general-queue;
Expand Down Expand Up @@ -778,35 +778,56 @@ my class ThreadPoolScheduler does Scheduler {
}
}

submethod BUILD(
Int() :$!initial_threads = 0,
Int() :$!max_threads =
%*ENV<RAKUDO_MAX_THREADS> // (Kernel.cpu-cores * 8 max 64)
--> Nil) {
die "Initial thread pool threads ($!initial_threads) must be less than or equal to maximum threads ($!max_threads)"
if $!initial_threads > $!max_threads;
method !SET-SELF($initial_threads, $max_threads) {
my $default_max = (Kernel.cpu-cores * 8) max 64;
with $max_threads // %*ENV<RAKUDO_MAX_THREADS> {
$!max_threads = nqp::istype($_,Whatever)
?? UNLIMITED_THREADS
!! nqp::istype($_, Numeric) && $_ == Inf
?? UNLIMITED_THREADS
!! nqp::istype($_, Int)
?? ($_ < 0 ?? UNLIMITED_THREADS !! (.Int || $default_max))
!! nqp::istype($_, Str)
?? (.lc eq any <unlimited inf>)
?? UNLIMITED_THREADS
!! die "Cannot use '$_' as a value for maximum threads"
!! die "Cannot use a '" ~ $_.^name ~ "' for maximum threads value"
}
else {
$!max_threads = $default_max;
}

die "Initial thread pool threads ($initial_threads) must be less than or equal to maximum threads ($!max_threads)"
if $initial_threads > $!max_threads;

$!general-workers := nqp::create(IterationBuffer);
$!timer-workers := nqp::create(IterationBuffer);
$!affinity-workers := nqp::create(IterationBuffer);
$!state-lock := Lock.new;

if $!initial_threads > 0 {
if $initial_threads > 0 {
# We've been asked to make some initial threads; we interpret this
# as general workers.
$!general-queue := nqp::create(Queue);
$!general-queue := nqp::create(Queue);
nqp::push(
$!general-workers,
GeneralWorker.new(
queue => $!general-queue,
scheduler => self
)
) for ^$!initial_threads;
scheduler-debug "Created scheduler with $!initial_threads initial general workers";
GeneralWorker.new(queue => $!general-queue, scheduler => self)
) for ^$initial_threads;
scheduler-debug "Created scheduler with $initial_threads initial general workers";
self!maybe-start-supervisor();
}
else {
scheduler-debug "Created scheduler without initial general workers";
}
self
}

method new(Int:D() :$initial_threads = 0, :$max_threads) {
nqp::create(self)!SET-SELF($initial_threads, $max_threads)
}

method max_threads(ThreadPoolScheduler:D:) {
$!max_threads == UNLIMITED_THREADS ?? Inf !! $!max_threads
}

method queue(Bool :$hint-time-sensitive, :$hint-affinity) {
Expand Down

0 comments on commit 77bc030

Please sign in to comment.