Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow setting ThreadPoolScheduler's max_threads to semi-infinite value #4941

Merged
merged 3 commits into from
Jun 3, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
61 changes: 41 additions & 20 deletions src/core.c/ThreadPoolScheduler.pm6
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ my class ThreadPoolScheduler does Scheduler {
# specifically wrt GH #1202.
PROCESS::<$PID> := nqp::p6box_i(my int $pid = nqp::getpid);

my constant UNLIMITED_THREADS = 9223372036854775807; # 2⁶³-1

# Scheduler defaults controlled by environment variables
my $ENV := nqp::getattr(%*ENV,Map,'$!storage');
my int $scheduler-debug;
Expand Down Expand Up @@ -311,16 +313,14 @@ my class ThreadPoolScheduler does Scheduler {

# Initial and maximum threads allowed.
#?if moar
has uint $.initial_threads;
has uint $.max_threads;
has uint $!max_threads;
#?endif
#?if !moar
has Int $.initial_threads;
has Int $.max_threads;
has Int $!max_threads;
#?endif

# All of the worker and queue state below is guarded by this lock.
has Lock $!state-lock = Lock.new;
has Lock $!state-lock;

# The general queue and timer queue, if created.
has Queue $!general-queue;
Expand Down Expand Up @@ -778,35 +778,56 @@ my class ThreadPoolScheduler does Scheduler {
}
}

submethod BUILD(
Int() :$!initial_threads = 0,
Int() :$!max_threads =
%*ENV<RAKUDO_MAX_THREADS> // (Kernel.cpu-cores * 8 max 64)
--> Nil) {
die "Initial thread pool threads ($!initial_threads) must be less than or equal to maximum threads ($!max_threads)"
if $!initial_threads > $!max_threads;
method !SET-SELF($initial_threads, $max_threads) {
my $default_max = (Kernel.cpu-cores * 8) max 64;
with $max_threads // %*ENV<RAKUDO_MAX_THREADS> {
$!max_threads = nqp::istype($_,Whatever)
?? UNLIMITED_THREADS
!! nqp::istype($_, Numeric) && $_ == Inf
?? UNLIMITED_THREADS
!! nqp::istype($_, Int)
?? ($_ < 0 ?? UNLIMITED_THREADS !! (.Int || $default_max))
!! nqp::istype($_, Str)
?? (.lc eq any <unlimited inf>)
?? UNLIMITED_THREADS
!! die "Cannot use '$_' as a value for maximum threads"
!! die "Cannot use a '" ~ $_.^name ~ "' for maximum threads value"
}
else {
$!max_threads = $default_max;
}

die "Initial thread pool threads ($initial_threads) must be less than or equal to maximum threads ($!max_threads)"
if $initial_threads > $!max_threads;

$!general-workers := nqp::create(IterationBuffer);
$!timer-workers := nqp::create(IterationBuffer);
$!affinity-workers := nqp::create(IterationBuffer);
$!state-lock := Lock.new;

if $!initial_threads > 0 {
if $initial_threads > 0 {
# We've been asked to make some initial threads; we interpret this
# as general workers.
$!general-queue := nqp::create(Queue);
$!general-queue := nqp::create(Queue);
nqp::push(
$!general-workers,
GeneralWorker.new(
queue => $!general-queue,
scheduler => self
)
) for ^$!initial_threads;
scheduler-debug "Created scheduler with $!initial_threads initial general workers";
GeneralWorker.new(queue => $!general-queue, scheduler => self)
) for ^$initial_threads;
scheduler-debug "Created scheduler with $initial_threads initial general workers";
self!maybe-start-supervisor();
}
else {
scheduler-debug "Created scheduler without initial general workers";
}
self
}

method new(Int:D() :$initial_threads = 0, :$max_threads) {
nqp::create(self)!SET-SELF($initial_threads, $max_threads)
}

method max_threads(ThreadPoolScheduler:D:) {
$!max_threads == UNLIMITED_THREADS ?? Inf !! $!max_threads
}

method queue(Bool :$hint-time-sensitive, :$hint-affinity) {
Expand Down