Skip to content

Commit

Permalink
Fix Supplier::Preserving.
Browse files Browse the repository at this point in the history
Under certain conditions it could disorder messages.
  • Loading branch information
jnthn committed Feb 15, 2017
1 parent b51a550 commit cabf6fb
Showing 1 changed file with 51 additions and 37 deletions.
88 changes: 51 additions & 37 deletions src/core/Supply.pm
Expand Up @@ -1534,8 +1534,10 @@ my class Supplier::Preserving is Supplier {
# need lock on modification).
has Mu $!tappers;

# Events to reply, and a lock to protect it.
# Events to reply, whether the replay was done, and a lock to protect
# updates to these.
has @!replay;
has int $!replay-done;
has $!replay-lock = Lock.new;

method tap(&emit, &done, &quit) {
Expand All @@ -1546,73 +1548,85 @@ my class Supplier::Preserving is Supplier {
?? nqp::clone($!tappers)
!! nqp::list();
nqp::push($update, $tle);
$!tappers := $update;
$replay = 1 if nqp::elems($update) == 1;
self!replay($tle) if $replay;
$!tappers := $update;
});
self!replay($tle) if $replay;
Tap.new({
$!lock.protect({
my Mu $update := nqp::list();
for nqp::hllize($!tappers) -> \entry {
nqp::push($update, entry) unless entry =:= $tle;
}
$!replay-done = 0 if nqp::elems($update) == 0;
$!tappers := $update;
});
})
}

method emit(\value --> Nil) {
my int $sent = 0;
my $snapshot := $!tappers;
if nqp::isconcrete($snapshot) {
my int $n = nqp::elems($snapshot);
loop (my int $i = 0; $i < $n; $i = $i + 1) {
nqp::atpos($snapshot, $i).emit()(value);
$sent = 1;
loop {
my int $sent = 0;
my $snapshot := $!tappers;
if nqp::isconcrete($snapshot) {
$sent = nqp::elems($snapshot);
loop (my int $i = 0; $i < $sent; $i = $i + 1) {
nqp::atpos($snapshot, $i).emit()(value);
}
}
}
unless $sent {
self!add-replay({ $_.emit()(value) })
return if $sent;
return if self!add-replay({ $_.emit()(value) });
}
}

method done(--> Nil) {
my int $sent = 0;
my $snapshot := $!tappers;
if nqp::isconcrete($snapshot) {
my int $n = nqp::elems($snapshot);
loop (my int $i = 0; $i < $n; $i = $i + 1) {
nqp::atpos($snapshot, $i).done()();
$sent = 1;
loop {
my int $sent = 0;
my $snapshot := $!tappers;
if nqp::isconcrete($snapshot) {
$sent = nqp::elems($snapshot);
loop (my int $i = 0; $i < $sent; $i = $i + 1) {
nqp::atpos($snapshot, $i).done()();
}
}
}
unless $sent {
self!add-replay({ $_.done()() })
return if $sent;
return if self!add-replay({ $_.done()() });
}
}

method quit($ex --> Nil) {
my int $sent = 0;
my $snapshot := $!tappers;
if nqp::isconcrete($snapshot) {
my int $n = nqp::elems($snapshot);
loop (my int $i = 0; $i < $n; $i = $i + 1) {
nqp::atpos($snapshot, $i).quit()($ex);
$sent = 1;
loop {
my int $sent = 0;
my $snapshot := $!tappers;
if nqp::isconcrete($snapshot) {
$sent = nqp::elems($snapshot);
loop (my int $i = 0; $i < $sent; $i = $i + 1) {
nqp::atpos($snapshot, $i).quit()($ex);
}
}
}
unless $sent {
self!add-replay({ $_.quit()($ex) })
return if $sent;
return if self!add-replay({ $_.quit()($ex) });
}
}

method !add-replay(&replay) {
$!replay-lock.protect: { @!replay.push(&replay) }
method !add-replay(&replay --> Bool) {
$!replay-lock.protect: {
if $!replay-done {
False
}
else {
@!replay.push(&replay);
True
}
}
}

method !replay($tle) {
while $!replay-lock.protect({ @!replay.shift }) -> $rep {
$rep($tle)
$!replay-lock.protect: {
while @!replay.shift -> $rep {
$rep($tle);
}
$!replay-done = 1;
}
}

Expand Down

0 comments on commit cabf6fb

Please sign in to comment.