Skip to content

Commit

Permalink
Make ThreadPoolScheduler.cue a multi
Browse files Browse the repository at this point in the history
The "cue" method takes *many* named parameters, which causes a lot of
overhead.  Whereas the most common use, namely in from "start { ... }"
only uses *one* named parameter.  Making it a multi with one required
named parameter in each candidate, makes the most common call to .cue
about 5x as fast with 25% fewer allocations.  This makes something like

  await do for ^100000 { start { } }

go from 2.6 seconds wallclock to 2.1 seconds wallclock.
  • Loading branch information
lizmat committed Mar 30, 2018
1 parent 20495f0 commit ac5cf2b
Showing 1 changed file with 142 additions and 77 deletions.
219 changes: 142 additions & 77 deletions src/core/ThreadPoolScheduler.pm6
Expand Up @@ -828,89 +828,154 @@ my class ThreadPoolScheduler does Scheduler {
}
}

my class TimerCancellation is repr('AsyncTask') { }
method cue(&code, :$at, :$in, :$every, :$times = 1, :&stop is copy, :&catch ) {
die "Cannot specify :at and :in at the same time"
if $at.defined and $in.defined;
die "Cannot specify :every, :times and :stop at the same time"
if $every.defined and $times > 1 and &stop;

# For $in/$at times, if the resultant delay is less than 0.001
# (including negatives) equate those to zero. For $every intervals,
# we convert such values to minimum resolution of 0.001 and warn
# about that
sub to-millis(Numeric() $value) {
nqp::if(
nqp::isgt_i((my int $proposed = (1000 * $value).Int),0),
$proposed,
nqp::stmts(
warn("Minimum timer resolution is 1ms; using that instead of {1000 * $value}ms"),
1
)
)
}
sub to-millis-allow-zero(Numeric() $value) {
nqp::if(
nqp::isgt_i((my int $proposed = (1000 * $value).Int),0),
$proposed
# not true == 0 == what we need
)
}
my $delay = to-millis-allow-zero($at ?? $at - now !! $in // 0);

# Wrap any catch handler around the code to run.
my &run := &catch ?? wrap-catch(&code, &catch) !! &code;

# need repeating
if $every {
# generate a stopper if needed
if $times > 1 {
my $todo = $times;
&stop = sub { $todo ?? !$todo-- !! True }
}
sub to-millis(Numeric() $value) {
nqp::if(
nqp::isgt_i((my int $proposed = (1000 * $value).Int),0),
$proposed,
nqp::stmts(
warn("Minimum timer resolution is 1ms; using that instead of {1000 * $value}ms"),
1
)
)
}
sub to-millis-allow-zero(Numeric() $value) {
nqp::if(
nqp::isgt_i((my int $proposed = (1000 * $value).Int),0),
$proposed
# not true == 0 == what we need
)
}
sub wrap-catch(&code, &catch) {
-> { code(); CATCH { default { catch($_) } } }
}

# we have a stopper
if &stop {
my $handle;
my $cancellation;
sub cancellation() {
$cancellation //=
Cancellation.new(async_handles => [$handle]);
}
$handle := nqp::timer(self!timer-queue(),
{ stop() ?? cancellation().cancel !! run() },
$delay, to-millis($every),
TimerCancellation);
cancellation()
}
my class TimerCancellation is repr('AsyncTask') { }

# no stopper
else {
my $handle := nqp::timer(self!timer-queue(), &run,
$delay, to-millis($every),
TimerCancellation);
Cancellation.new(async_handles => [$handle])
}
}
method CUE_DELAY_TIMES(&code, int $delay, int $times, %args) {
nqp::stmts(
(my &run := nqp::if( # set up what we need to run
nqp::isnull(my $catch :=
nqp::atkey(nqp::getattr(%args,Map,'$!storage'),"catch")),
&code,
wrap-catch(&code, $catch) # wrap any catch handler around code
)),
Cancellation.new( async_handles => nqp::if(
$times,
nqp::stmts( # need to run more than once
(my @async_handles),
(my int $i = -1),
nqp::while(
nqp::islt_i(($i = nqp::add_i($i,1)),$times),
@async_handles.push(
nqp::timer(self!timer-queue,&run,$delay,0,TimerCancellation)
)
),
@async_handles
),
[ # only needs to run once
nqp::timer(self!timer-queue,&run,$delay,0,TimerCancellation)
]
))
)
}

# only after waiting a bit or more than once
elsif $delay or $times > 1 {
my @async_handles;
@async_handles.push(
nqp::timer(self!timer-queue(), &run, $delay, 0, TimerCancellation)
) for 1 .. $times;
Cancellation.new(:@async_handles)
proto method cue(|) {*}
multi method cue(&code, :$every!, :$times = 1, *%_) {
# these need to exist in this scope
my $handle;
my $cancellation;
sub cancellation() {
$cancellation //= Cancellation.new(async_handles => [$handle])
}

# just cue the code
else {
nqp::push(self!general-queue(), &run);
Nil
}
nqp::if(
nqp::isconcrete(
nqp::atkey((my $args := nqp::getattr(%_,Map,'$!storage')),"stop")
) && nqp::isconcrete($times) && $times > 1,
die("Cannot specify :every, :times and :stop at the same time"),
nqp::if(
nqp::isconcrete(nqp::atkey($args,"at"))
&& nqp::isconcrete(nqp::atkey($args,"in")),
die("Cannot specify :at and :in at the same time"),
nqp::stmts(
(my int $delay = to-millis-allow-zero( # set up any delay
nqp::if(
nqp::isnull(my $at := nqp::atkey($args,"at")),
nqp::ifnull(nqp::atkey($args,"in"),0),
$at - now
)
)),
(my &run := nqp::if( # set up what should run
nqp::isnull(my $catch := nqp::atkey($args,"catch")),
&code,
wrap-catch(&code, $catch) # wrap any catch handler around code
)),
nqp::if(
nqp::isconcrete((my $stopper := nqp::if(
nqp::isgt_i($times,1),
nqp::stmts( # create our own stopper
(my int $todo = nqp::add_i($times,1)),
sub { nqp::not_i($todo = nqp::sub_i($todo,1)) }
),
nqp::atkey($args,"stop")
))),
nqp::stmts( # we have a stopper
($handle := nqp::timer(
self!timer-queue,
-> { nqp::if($stopper(),cancellation().cancel,run()) },
$delay, to-millis($every), TimerCancellation
)),
cancellation()
),
nqp::stmts( # no stopper
($handle := nqp::timer(
self!timer-queue,
&run,
$delay, to-millis($every), TimerCancellation
)),
Cancellation.new(async_handles => [$handle])
)
)
)
)
)
}

sub wrap-catch(&code, &catch) {
-> { code(); CATCH { default { catch($_) } } }
multi method cue(&code, :$times!, *%_) {
nqp::stmts(
(my $args := nqp::getattr(%_,Map,'$!storage')),
nqp::if(
nqp::isconcrete(my $at := nqp::atkey($args,"at"))
&& nqp::isconcrete(my $in := nqp::atkey($args,"in")),
die("Cannot specify :at and :in at the same time"),
self.CUE_DELAY_TIMES(
&code,
to-millis(nqp::ifnull(
$in,
nqp::if(nqp::isnull($at), .001, $at - now)
)),
$times // 1,
%_
)
)
)
}
multi method cue(&code, :$at!, *%_) {
nqp::if(
nqp::isconcrete($at)
&& nqp::isconcrete(
nqp::atkey(nqp::getattr(%_,Map,'$!storage'),"in")),
die("Cannot specify :at and :in at the same time"),
self.CUE_DELAY_TIMES(&code, to-millis-allow-zero($at - now), 0, %_)
)
}
multi method cue(&code, :$in!, *%_) {
self.CUE_DELAY_TIMES(&code, to-millis-allow-zero($in), 0, %_)
}
multi method cue(&code, :&catch! --> Nil) {
nqp::push(self!general-queue, wrap-catch(&code, &catch))
}
multi method cue(&code --> Nil) {
nqp::push(self!general-queue,&code)
}

method loads() is raw {
Expand Down

0 comments on commit ac5cf2b

Please sign in to comment.