Skip to content

Commit

Permalink
Support for setting unlimited max_threads from environment
Browse files Browse the repository at this point in the history
Extending PR #4941.

Aside of just setting `$!max_threads` to a specific number via
`RAKUDO_MAX_THREADS` environment variable the following special cases
are now supported too:

- `RAKUDO_MAX_THREADS=` or `RAKUDO_MAX_THREADS=0` to use the default
  value
- `RAKUDO_MAX_THREADS=-1`, `RAKUDO_MAX_THREADS=inf`, or
  `RAKUDO_MAX_THREADS=unlimited` to use unlimited number of treads
- `RAKUDO_MAX_THREADS=<integer>` works as it always was
- Any other value would result in an error

Since a new scheduler is normally created at startup and exception
handling is barely needed its constructor currently throws just
`X::AdHoc`.

Note that the above special cases are now also supported for
constructor's `:max_threads` named argument:

    ThreadPoolScheduler.new(:max_threads(-1)); # Inf
    ThreadPoolScheduler.new(:max_threads<unlimited>); # Inf
    ThreadPoolScheduler.new(:max_threads(0)); # Same as omitting the
                                              # argument

Super-micro-optimization: use of BUILDPLAN is considered too much for
the `ThreadPoolScheduler` class. So, replaced it with `new`+`!SET-SELF`
approach.
  • Loading branch information
vrurg committed Jun 3, 2022
1 parent 4e56f24 commit 49172cb
Showing 1 changed file with 30 additions and 12 deletions.
42 changes: 30 additions & 12 deletions src/core.c/ThreadPoolScheduler.pm6
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ my class ThreadPoolScheduler does Scheduler {
# specifically wrt GH #1202.
PROCESS::<$PID> := nqp::p6box_i(my int $pid = nqp::getpid);

my constant UNLIMITED_THREADS = 9223372036854775807; # 2⁶³-1

# Scheduler defaults controlled by environment variables
my $ENV := nqp::getattr(%*ENV,Map,'$!storage');
my int $scheduler-debug;
Expand Down Expand Up @@ -312,15 +314,15 @@ my class ThreadPoolScheduler does Scheduler {
# Initial and maximum threads allowed.
#?if moar
has uint $.initial_threads;
has uint $.max_threads;
has uint $!max_threads;
#?endif
#?if !moar
has Int $.initial_threads;
has Int $.max_threads;
has Int $!max_threads;
#?endif

# All of the worker and queue state below is guarded by this lock.
has Lock $!state-lock = Lock.new;
has Lock $!state-lock;

# The general queue and timer queue, if created.
has Queue $!general-queue;
Expand Down Expand Up @@ -778,22 +780,33 @@ my class ThreadPoolScheduler does Scheduler {
}
}

submethod TWEAK(:$initial_threads, :$max_threads --> Nil) {
method !SET-SELF($initial_threads, $max_threads) {
$!initial_threads = .Int with $initial_threads;
$!max_threads = nqp::istype($max_threads,Whatever)
?? 9223372036854775807 # XXX should be -1
!! $max_threads.defined
?? $max_threads == Inf
?? 9223372036854775807 # XXX should be -1
!! $max_threads.Int
!! (%*ENV<RAKUDO_MAX_THREADS> // (Kernel.cpu-cores * 8 max 64)).Int;
my $default_max = (Kernel.cpu-cores * 8) max 64;
with $max_threads // %*ENV<RAKUDO_MAX_THREADS> {
$!max_threads = nqp::istype($_,Whatever)
?? UNLIMITED_THREADS
!! nqp::istype($_, Numeric) && $_ == Inf
?? UNLIMITED_THREADS
!! nqp::istype($_, Int)
?? ($_ < 0 ?? UNLIMITED_THREADS !! (.Int || $default_max))
!! nqp::istype($_, Str)
?? (.lc eq any <unlimited inf>)
?? UNLIMITED_THREADS
!! die "Cannot use '$_' as a value for maximum threads"
!! die "Cannot use a '" ~ $_.^name ~ "' for maximum threads value"
}
else {
$!max_threads = $default_max;
}

die "Initial thread pool threads ($!initial_threads) must be less than or equal to maximum threads ($!max_threads)"
if $!initial_threads > $!max_threads;

$!general-workers := nqp::create(IterationBuffer);
$!timer-workers := nqp::create(IterationBuffer);
$!affinity-workers := nqp::create(IterationBuffer);
$!state-lock := Lock.new;

if $!initial_threads > 0 {
# We've been asked to make some initial threads; we interpret this
Expand All @@ -809,10 +822,15 @@ my class ThreadPoolScheduler does Scheduler {
else {
scheduler-debug "Created scheduler without initial general workers";
}
self
}

method new(:$initial_threads, :$max_threads) {
nqp::create(self)!SET-SELF($initial_threads, $max_threads)
}

method max_threads(ThreadPoolScheduler:D:) {
$!max_threads == 9223372036854775807 ?? Inf !! $!max_threads
$!max_threads == UNLIMITED_THREADS ?? Inf !! $!max_threads
}

method queue(Bool :$hint-time-sensitive, :$hint-affinity) {
Expand Down

0 comments on commit 49172cb

Please sign in to comment.