Skip to content

Commit

Permalink
Assign to closed promise only once
Browse files Browse the repository at this point in the history
These changes were missing from /pull/2987
  • Loading branch information
dumarchie committed Sep 4, 2019
1 parent c7ec96a commit 0f31d35
Showing 1 changed file with 17 additions and 15 deletions.
32 changes: 17 additions & 15 deletions src/core.c/Channel.pm6
Expand Up @@ -56,15 +56,13 @@ my class Channel does Awaitable {
nqp::istype(msg,CHANNEL_CLOSE),
nqp::stmts(
nqp::push($!queue, msg), # make sure other readers see it
$!closed_promise_vow.keep(Nil),
X::Channel::ReceiveOnClosed.new(channel => self).throw
),
nqp::if(
nqp::istype(msg,CHANNEL_FAIL),
nqp::stmts(
nqp::push($!queue,msg), # make sure other readers see it
$!closed_promise_vow.break(my $error := msg.error),
$error.rethrow
msg.error.rethrow
),
nqp::stmts(
self!peek(), # trigger promise if closed
Expand All @@ -81,14 +79,12 @@ my class Channel does Awaitable {
nqp::if(
nqp::istype(msg, CHANNEL_CLOSE),
nqp::stmts(
$!closed_promise_vow.keep(Nil),
nqp::push($!queue, msg),
Nil
),
nqp::if(
nqp::istype(msg, CHANNEL_FAIL),
nqp::stmts(
$!closed_promise_vow.break(msg.error),
nqp::push($!queue, msg),
Nil
),
Expand Down Expand Up @@ -166,36 +162,42 @@ my class Channel does Awaitable {
}
}

my class Iterate { ... }
trusts Iterate;
my class Iterate does Iterator {
has $!queue;
has $!vow;
method !SET-SELF(\queue,\vow) {
has $!channel;
method !SET-SELF(\queue,\channel) {
$!queue := queue;
$!vow := vow;
$!channel := channel;
self
}
method new(\queue,\vow) { nqp::create(self)!SET-SELF(queue,vow) }
method new(\queue,\channel) {
nqp::create(self)!SET-SELF(queue,channel);
}
method pull-one() {
my \msg := nqp::shift($!queue);
nqp::if(
nqp::istype((my \msg := nqp::shift($!queue)),CHANNEL_CLOSE),
nqp::istype(msg,CHANNEL_CLOSE),
nqp::stmts(
nqp::push($!queue,msg), # make sure other readers see it
$!vow.keep(Nil),
IterationEnd
),
nqp::if(
nqp::istype(msg,CHANNEL_FAIL),
nqp::stmts(
nqp::push($!queue,msg), # make sure other readers see it
$!vow.break(my $error := msg.error),
$error.rethrow
msg.error.rethrow
),
msg
nqp::stmts(
$!channel!Channel::peek(), # trigger promise if closed
msg
)
)
)
}
}
method iterator(Channel:D:) { Iterate.new($!queue,$!closed_promise_vow) }
method iterator(Channel:D:) { Iterate.new($!queue,self) }

method list(Channel:D:) { self.Seq.list }

Expand Down

0 comments on commit 0f31d35

Please sign in to comment.