Skip to content
Permalink
Browse files

Improve ThreadPoolScheduler.cue behaviour when Inf/-Inf/NaN is passed

- If Inf is passed to :at/:in/:every, the method will never invoke the
block passed, but will return a Cancellation anyway
- If -Inf is passed, the method will invoke the block as soon as
possible and return a Cancellation
- If NaN is passed, the method will throw an exception

This also ensures a Cancellation is always returned from the dispatchers
for .cue that are supposed to be returning them, since calling it with
:every as an argument would sometimes return a List instead.
  • Loading branch information...
Kaiepi committed Apr 6, 2019
1 parent b0dd44b commit de76a056b74f1848f2b215e1d6e288bf53d06fd9
Showing with 80 additions and 38 deletions.
  1. +80 −38 src/core/ThreadPoolScheduler.pm6
@@ -839,32 +839,43 @@ my class ThreadPoolScheduler does Scheduler {
}
}

sub to-millis(Numeric() $value) {
# Checks if the value given is Inf, -Inf, or NaN. If NaN, this throws.
# if Inf, this returns Nil for ThreadPoolScheduler.cue to return an empty
# Cancellation for. If -Inf, returns 0. Otherwise, returns the value.
sub validate-seconds(Numeric() $value) {
nqp::unless(
nqp::isnanorinf($value.Num),
nqp::istype($value, Num),
$value,
nqp::if(
nqp::isgt_i((my int $proposed = (1000 * $value).Int),0),
$proposed,
nqp::stmts(
warn("Minimum timer resolution is 1ms; using that instead of {1000 * $value}ms"),
1
nqp::iseq_n($value,nqp::inf()),
Nil,
nqp::if(
nqp::iseq_n($value,nqp::neginf()),
0,
nqp::if(
nqp::isnanorinf($value),
die("Cannot set NaN as a number of seconds"),
$value
)
)
),
)
)
}
sub to-millis(Numeric() $value) {
nqp::if(
nqp::isgt_i((my int $proposed = (1000 * $value).Int),0),
$proposed,
nqp::stmts(
warn("Minimum timer resolution is 1ms; using that instead of {$value}ms"),
warn("Minimum timer resolution is 1ms; using that instead of {1000 * $value}ms"),
1
)
)
}
sub to-millis-allow-zero(Numeric() $value) {
nqp::unless(
nqp::isnanorinf($value.Num),
nqp::if(
nqp::isgt_i((my int $proposed = (1000 * $value).Int), 0),
$proposed,
# not true == 0 == what we need
),
0
nqp::if(
nqp::isgt_i((my int $proposed = (1000 * $value).Int),0),
$proposed
# not true == 0 == what we need
)
}
sub wrap-catch(&code, &catch) {
@@ -920,42 +931,52 @@ my class ThreadPoolScheduler does Scheduler {
&& nqp::isconcrete(nqp::atkey($args,"in")),
die("Cannot specify :at and :in at the same time"),
nqp::stmts(
(my int $delay = to-millis-allow-zero( # set up any delay
(my $interval := validate-seconds($every)), # ensure the interval given is sane
nqp::unless(
nqp::isconcrete($interval),
(return Cancellation.new(async_handles => []))
),
(my $delay-in-seconds := validate-seconds( # ensure the delay or time given is sane
nqp::if(
nqp::isnull(my $at := nqp::atkey($args,"at")),
nqp::ifnull(nqp::atkey($args,"in"),0),
$at - now
)
)),
(my &run := nqp::if( # set up what should run
));
nqp::unless(
nqp::isconcrete($delay-in-seconds),
(return Cancellation.new(async_handles => []))
),
(my int $delay = to-millis-allow-zero($delay-in-seconds)),
(my &run := nqp::if( # set up what should run
nqp::isnull(my $catch := nqp::atkey($args,"catch")),
&code,
wrap-catch(&code, $catch) # wrap any catch handler around code
wrap-catch(&code, $catch) # wrap any catch handler around code
)),
nqp::if(
nqp::isconcrete((my $stopper := nqp::if(
nqp::isgt_i($times,1),
nqp::stmts( # create our own stopper
nqp::stmts( # create our own stopper
(my int $todo = nqp::add_i($times,1)),
sub { nqp::not_i($todo = nqp::sub_i($todo,1)) }
),
nqp::atkey($args,"stop")
))),
nqp::stmts( # we have a stopper
nqp::stmts( # we have a stopper
($handle := nqp::timer(
self!timer-queue,
-> { nqp::if($stopper(),cancellation().cancel,run()) },
$delay, to-millis($every), TimerCancellation
$delay, to-millis($interval), TimerCancellation
)),
cancellation()
(return cancellation())
),
nqp::stmts( # no stopper
nqp::stmts( # no stopper
($handle := nqp::timer(
self!timer-queue,
&run,
$delay, to-millis($every), TimerCancellation
$delay, to-millis($interval), TimerCancellation
)),
Cancellation.new(async_handles => [$handle])
(return cancellation())
)
)
)
@@ -969,14 +990,27 @@ my class ThreadPoolScheduler does Scheduler {
nqp::isconcrete(my $at := nqp::atkey($args,"at"))
&& nqp::isconcrete(my $in := nqp::atkey($args,"in")),
die("Cannot specify :at and :in at the same time"),
self!CUE_DELAY_TIMES(
&code,
to-millis(nqp::ifnull(
$in,
nqp::if(nqp::isnull($at), .001, $at - now)
)),
$times // 1,
%_
nqp::stmts(
nqp::if(
nqp::isconcrete($at),
nqp::unless(
nqp::isconcrete($at := validate-seconds($at)),
(return Cancellation.new(async_handles => []))
),
nqp::unless(
nqp::isconcrete($in := validate-seconds($in)),
(return Cancellation.new(async_handles => []))
)
),
self!CUE_DELAY_TIMES(
&code,
to-millis(nqp::ifnull(
$in,
nqp::if(nqp::isnull($at), .001, $at - now)
)),
$times // 1,
%_
)
)
)
)
@@ -987,11 +1021,19 @@ my class ThreadPoolScheduler does Scheduler {
&& nqp::isconcrete(
nqp::atkey(nqp::getattr(%_,Map,'$!storage'),"in")),
die("Cannot specify :at and :in at the same time"),
self!CUE_DELAY_TIMES(&code, to-millis-allow-zero($at - now), 0, %_)
nqp::if(
nqp::isconcrete(my $time := validate-seconds($at)),
(self!CUE_DELAY_TIMES(&code, to-millis-allow-zero($time - now), 0, %_)),
(Cancellation.new(async_handles => []))
)
)
}
multi method cue(&code, :$in!, *%_) {
self!CUE_DELAY_TIMES(&code, to-millis-allow-zero($in), 0, %_)
nqp::if(
nqp::isconcrete(my $delay := validate-seconds($in)),
(self!CUE_DELAY_TIMES(&code, to-millis-allow-zero($delay), 0, %_)),
(Cancellation.new(async_handles => []))
)
}
multi method cue(&code, :&catch! --> Nil) {
nqp::push(self!general-queue, wrap-catch(&code, &catch))

0 comments on commit de76a05

Please sign in to comment.
You can’t perform that action at this time.