Skip to content

Commit

Permalink
Queue Supply messages on recursion
Browse files Browse the repository at this point in the history
This handles the case where a Supply does an emit/done/quit and there
is a code path that ends up looping back to the Supply itself. Due to
messages on a Supply being serialized, this would result in a deadlock
after the recent changes to the Supply concurrency model. Previously,
it would work out for `supply`/`react` blocks due to their queueing of
messages (which we eliminated due to it being a broken backpressure
model and not allowing us to fix the tap-of-a-synchronous-emitter bug)
and "work" (but violate the one-message-at-a-time constraint) in other
cases due to the use of a reentrant mutex to manage concurrency control
in non-`supply`/`react`-block cases.

This fixes it with a unified approach, by adding a mechanism whereby a
recursive attempt to acquire a Lock::Async will queue the work instead
of blocking on the lock (which is actually non-blocking in the thread
pool). Thus, we retain the backpressure for competing senders, but
allow the current holder to queue messages to resolve what would be a
deadlock otherwise. A holder's recursion competes fairly with outside
messages, thanks to the queueing being through Lock::Async.

The recursion detection uses dynamic variables, not thread IDs. This is
important because stacks including a Lock::Async acquisition may move
between threads over their lifetime, so we cannot simply go on thread
ID.

Finally, there is also a mechanism to temporarily hide a lock from the
recursion detector, since the handling of doing a `whenever` that leads
to synchronous emits already exists. It's interesting to consider if
the mechanisms may be unified, but not immediately clear if they can
be.
  • Loading branch information
jnthn committed Sep 22, 2017
1 parent 0d600a0 commit 5478392
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 41 deletions.
50 changes: 50 additions & 0 deletions src/core/Lock/Async.pm
Original file line number Diff line number Diff line change
Expand Up @@ -125,4 +125,54 @@ my class Lock::Async {
LEAVE self.unlock() if $acquired;
code()
}

# This either runs the code now if we can obtain the lock, releasing the
# lock afterwards, or queues the code to run if a recursive use of the
# lock is observed. It relies on all users of the lock to use it through
# this method only. This is useful for providing back-pressure while also
# avoiding code deadlocking on itself by providing a way for it to get run
# later on. Returns Nil if the code was run now (maybe after blocking), or
# a Promise if it was queued for running later.
method protect-or-queue-on-recursion(Lock::Async:D: &code) {
my $try-acquire = self.lock();
if $try-acquire {
# We could acquire the lock. Run the code right now.
LEAVE self.unlock();
self!run-with-updated-recursion-list(&code);
Nil
}
elsif (@*LOCK-ASYNC-RECURSION-LIST // Empty).first(* === self) {
# Lock is already held on the stack, so we're recursing. Queue.
$try-acquire.then({
LEAVE self.unlock();
self!run-with-updated-recursion-list(&code);
});
}
else {
# Lock is held but by something else. Await it's availability.
my int $acquired = 0;
$*AWAITER.await($try-acquire);
$acquired = 1;
LEAVE self.unlock() if $acquired;
self!run-with-updated-recursion-list(&code);
Nil
}
}

method !run-with-updated-recursion-list(&code) {
my @new-held = @*LOCK-ASYNC-RECURSION-LIST // ();
@new-held.push(self);
{
my @*LOCK-ASYNC-RECURSION-LIST := @new-held;
code();
}
}

method with-lock-hidden-from-recursion-check(&code) {
my @new-held = (@*LOCK-ASYNC-RECURSION-LIST // ()).grep(* !=== self);
{
my @*LOCK-ASYNC-RECURSION-LIST := @new-held;
code();
}
}
}
94 changes: 53 additions & 41 deletions src/core/Supply.pm
Original file line number Diff line number Diff line change
Expand Up @@ -198,16 +198,16 @@ my class Supply does Awaitable {
tap($t);
},
-> \value{
$lock.protect: { emit(value); }
$lock.protect-or-queue-on-recursion: { emit(value); }
},
done => -> {
$lock.protect: {
$lock.protect-or-queue-on-recursion: {
done();
self!cleanup($cleaned-up, $source-tap);
}
},
quit => -> $ex {
$lock.protect: {
$lock.protect-or-queue-on-recursion: {
quit($ex);
self!cleanup($cleaned-up, $source-tap);
}
Expand Down Expand Up @@ -1834,46 +1834,48 @@ augment class Rakudo::Internals {
# Placed here so it can close over $state, but we only need to
# closure-clone it once per Supply block, not once per whenever.
sub add-whenever($supply, &whenever-block) {
my $*AWAITER := $state.awaiter;
$state.increment-active();
my $tap;
nqp::continuationreset(ADD_WHENEVER_PROMPT, {
$supply.tap(
tap => {
$tap = $_;
$state.add-active-tap($tap);
},
-> \value {
self!run-supply-code({ whenever-block(value) }, $state, &add-whenever)
},
done => {
$state.delete-active-tap($tap);
my @phasers := &whenever-block.phasers('LAST');
if @phasers {
self!run-supply-code({ .() for @phasers }, $state, &add-whenever)
}
$tap.close;
self!deactivate-one($state);
},
quit => -> \ex {
$state.delete-active-tap($tap);
my $handled;
self!run-supply-code({
my $phaser := &whenever-block.phasers('QUIT')[0];
if $phaser.DEFINITE {
$handled = $phaser(ex) === Nil;
}
if !$handled && $state.get-and-zero-active() {
$state.quit().(ex) if $state.quit;
self!teardown($state);
$state.run-async-lock.with-lock-hidden-from-recursion-check: {
my $*AWAITER := $state.awaiter;
$state.increment-active();
nqp::continuationreset(ADD_WHENEVER_PROMPT, {
$supply.tap(
tap => {
$tap = $_;
$state.add-active-tap($tap);
},
-> \value {
self!run-supply-code({ whenever-block(value) }, $state, &add-whenever)
},
done => {
$state.delete-active-tap($tap);
my @phasers := &whenever-block.phasers('LAST');
if @phasers {
self!run-supply-code({ .() for @phasers }, $state, &add-whenever)
}
}, $state, &add-whenever);
if $handled {
$tap.close;
self!deactivate-one($state);
}
});
});
},
quit => -> \ex {
$state.delete-active-tap($tap);
my $handled;
self!run-supply-code({
my $phaser := &whenever-block.phasers('QUIT')[0];
if $phaser.DEFINITE {
$handled = $phaser(ex) === Nil;
}
if !$handled && $state.get-and-zero-active() {
$state.quit().(ex) if $state.quit;
self!teardown($state);
}
}, $state, &add-whenever);
if $handled {
$tap.close;
self!deactivate-one($state);
}
});
});
}
$tap
}

Expand All @@ -1899,7 +1901,7 @@ augment class Rakudo::Internals {

method !run-supply-code(&code, $state, &add-whenever) {
my @run-after;
$state.run-async-lock.protect: {
my $queued := $state.run-async-lock.protect-or-queue-on-recursion: {
return unless $state.active > 0;
my &*ADD-WHENEVER = &add-whenever;
my $emitter = {
Expand All @@ -1925,6 +1927,15 @@ augment class Rakudo::Internals {
'NEXT', 0);
@run-after = $state.awaiter.take-all;
}
if $queued.defined {
$queued.then({ self!run-add-whenever-awaits(@run-after) });
}
else {
self!run-add-whenever-awaits(@run-after);
}
}

method !run-add-whenever-awaits(@run-after --> Nil) {
if @run-after {
my $nested-awaiter := SupplyBlockAddWheneverAwaiter.CREATE;
my $delegate-awaiter := $*AWAITER;
Expand All @@ -1939,7 +1950,8 @@ augment class Rakudo::Internals {
}

method !deactivate-one($state) {
$state.run-async-lock.protect: { self!deactivate-one-internal($state) };
$state.run-async-lock.protect-or-queue-on-recursion:
{ self!deactivate-one-internal($state) };
}

method !deactivate-one-internal($state) {
Expand Down

0 comments on commit 5478392

Please sign in to comment.