Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/rakudo/rakudo into rakudo…
Browse files Browse the repository at this point in the history
…_3151
  • Loading branch information
vrurg committed Sep 7, 2019
2 parents 7a643a2 + 6b2739a commit 480271e
Show file tree
Hide file tree
Showing 9 changed files with 48 additions and 224 deletions.
36 changes: 19 additions & 17 deletions src/core.c/Channel.pm6
Expand Up @@ -56,15 +56,13 @@ my class Channel does Awaitable {
nqp::istype(msg,CHANNEL_CLOSE),
nqp::stmts(
nqp::push($!queue, msg), # make sure other readers see it
$!closed_promise_vow.keep(Nil),
X::Channel::ReceiveOnClosed.new(channel => self).throw
),
nqp::if(
nqp::istype(msg,CHANNEL_FAIL),
nqp::stmts(
nqp::push($!queue,msg), # make sure other readers see it
$!closed_promise_vow.break(my $error := msg.error),
$error.rethrow
msg.error.rethrow
),
nqp::stmts(
self!peek(), # trigger promise if closed
Expand All @@ -81,14 +79,12 @@ my class Channel does Awaitable {
nqp::if(
nqp::istype(msg, CHANNEL_CLOSE),
nqp::stmts(
$!closed_promise_vow.keep(Nil),
nqp::push($!queue, msg),
Nil
),
nqp::if(
nqp::istype(msg, CHANNEL_FAIL),
nqp::stmts(
$!closed_promise_vow.break(msg.error),
nqp::push($!queue, msg),
Nil
),
Expand All @@ -107,11 +103,11 @@ my class Channel does Awaitable {
Nil
} else {
if nqp::istype(msg, CHANNEL_CLOSE) {
$!closed_promise_vow.keep(Nil);
try $!closed_promise_vow.keep(Nil);
Nil
}
elsif nqp::istype(msg, CHANNEL_FAIL) {
$!closed_promise_vow.break(msg.error);
try $!closed_promise_vow.break(msg.error);
Nil
}
else {
Expand Down Expand Up @@ -166,36 +162,42 @@ my class Channel does Awaitable {
}
}

my class Iterate { ... }
trusts Iterate;
my class Iterate does Iterator {
has $!queue;
has $!vow;
method !SET-SELF(\queue,\vow) {
has $!channel;
method !SET-SELF(\queue,\channel) {
$!queue := queue;
$!vow := vow;
$!channel := channel;
self
}
method new(\queue,\vow) { nqp::create(self)!SET-SELF(queue,vow) }
method new(\queue,\channel) {
nqp::create(self)!SET-SELF(queue,channel);
}
method pull-one() {
my \msg := nqp::shift($!queue);
nqp::if(
nqp::istype((my \msg := nqp::shift($!queue)),CHANNEL_CLOSE),
nqp::istype(msg,CHANNEL_CLOSE),
nqp::stmts(
nqp::push($!queue,msg), # make sure other readers see it
$!vow.keep(Nil),
IterationEnd
),
nqp::if(
nqp::istype(msg,CHANNEL_FAIL),
nqp::stmts(
nqp::push($!queue,msg), # make sure other readers see it
$!vow.break(my $error := msg.error),
$error.rethrow
msg.error.rethrow
),
msg
nqp::stmts(
$!channel!Channel::peek(), # trigger promise if closed
msg
)
)
)
}
}
method iterator(Channel:D:) { Iterate.new($!queue,$!closed_promise_vow) }
method iterator(Channel:D:) { Iterate.new($!queue,self) }

method list(Channel:D:) { self.Seq.list }

Expand Down
2 changes: 1 addition & 1 deletion src/core.c/Exception.pm6
Expand Up @@ -3038,7 +3038,7 @@ my class X::Language::ModRequired is Exception {
my class X::Proc::Unsuccessful is Exception {
has $.proc;
method message() {
"The spawned command '{$.proc.command[0]}' exited unsuccessfully (exit code: $.proc.exitcode())"
"The spawned command '{$.proc.command[0]}' exited unsuccessfully (exit code: $.proc.exitcode(), signal: $.proc.signal())"
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/core.c/IO/Socket/Async.pm6
Expand Up @@ -166,7 +166,7 @@ my class IO::Socket::Async {

method close(IO::Socket::Async:D: --> True) {
nqp::closefh($!VMIO);
$!close-vow.keep(True);
try $!close-vow.keep(True);
}

method connect(IO::Socket::Async:U: Str() $host, Int() $port where Port-Number,
Expand Down
8 changes: 6 additions & 2 deletions src/core.c/Proc.pm6
Expand Up @@ -213,16 +213,20 @@ my class Proc {
}
multi method Bool(Proc:D:) {
self!wait-for-finish;
$!exitcode == 0
$!exitcode == 0 && $!signal == 0
}
method exitcode {
self!wait-for-finish;
$!exitcode
}
method signal {
self!wait-for-finish;
$!signal
}

method sink(--> Nil) {
self!wait-for-finish;
X::Proc::Unsuccessful.new(:proc(self)).throw if $!exitcode > 0;
X::Proc::Unsuccessful.new(:proc(self)).throw if $!exitcode > 0 || $!signal > 0;
}
}

Expand Down
12 changes: 12 additions & 0 deletions src/core.c/Promise.pm6
Expand Up @@ -15,6 +15,12 @@ my class X::Promise::Vowed is Exception {
has $.promise;
method message() { "Access denied to keep/break this Promise; already vowed" }
}
my class X::Promise::Resolved is Exception {
has $.promise;
method message() {
"Cannot keep/break a Promise more than once (status: $!promise.status())";
}
}
my role X::Promise::Broken {
has $.result-backtrace;
multi method gist(::?CLASS:D:) {
Expand Down Expand Up @@ -96,6 +102,9 @@ my class Promise does Awaitable {

method !keep(Mu \result --> Nil) {
$!lock.protect({
X::Promise::Resolved.new(promise => self).throw
if $!status != Planned;

$!result := result;
$!status := Kept;
self!schedule_thens();
Expand Down Expand Up @@ -125,6 +134,9 @@ my class Promise does Awaitable {

method !break(\result --> Nil) {
$!lock.protect({
X::Promise::Resolved.new(promise => self).throw
if $!status != Planned;

$!result := nqp::istype(result, Exception)
?? result
!! X::AdHoc.new(payload => result);
Expand Down
18 changes: 6 additions & 12 deletions src/core.c/Rakudo/Internals/HyperToIterator.pm6
Expand Up @@ -60,10 +60,8 @@ my class Rakudo::Internals::HyperToIterator does Rakudo::Internals::HyperJoiner
when X::Channel::ReceiveOnClosed {
return IterationEnd;
}
default {
($_ but X::HyperRace::Died(Backtrace.new(5))).rethrow
unless nqp::istype($_, X::HyperRace::Died);
}
($_ but X::HyperRace::Died(Backtrace.new(5))).rethrow
unless nqp::istype($_, X::HyperRace::Died);
}
}
nqp::shift($!current-items)
Expand All @@ -84,10 +82,8 @@ my class Rakudo::Internals::HyperToIterator does Rakudo::Internals::HyperJoiner
when X::Channel::ReceiveOnClosed {
return 0;
}
default {
($_ but X::HyperRace::Died(Backtrace.new(5))).rethrow
unless nqp::istype($_, X::HyperRace::Died);
}
($_ but X::HyperRace::Died(Backtrace.new(5))).rethrow
unless nqp::istype($_, X::HyperRace::Died);
}
}
0
Expand All @@ -103,10 +99,8 @@ my class Rakudo::Internals::HyperToIterator does Rakudo::Internals::HyperJoiner
when X::Channel::ReceiveOnClosed {
return IterationEnd;
}
default {
($_ but X::HyperRace::Died(Backtrace.new(5))).rethrow
unless nqp::istype($_, X::HyperRace::Died);
}
($_ but X::HyperRace::Died(Backtrace.new(5))).rethrow
unless nqp::istype($_, X::HyperRace::Died);
}
}
}
Expand Down
6 changes: 2 additions & 4 deletions src/core.c/Rakudo/Internals/RaceToIterator.pm6
Expand Up @@ -29,10 +29,8 @@ my class Rakudo::Internals::RaceToIterator does Rakudo::Internals::HyperJoiner d
when X::Channel::ReceiveOnClosed {
return IterationEnd;
}
default {
unless nqp::istype($_, X::HyperRace::Died) {
($_ but X::HyperRace::Died(Backtrace.new(5))).rethrow
}
unless nqp::istype($_, X::HyperRace::Died) {
($_ but X::HyperRace::Died(Backtrace.new(5))).rethrow
}
}
}
Expand Down

0 comments on commit 480271e

Please sign in to comment.