Skip to content

Commit

Permalink
Refactor supply/react code running
Browse files Browse the repository at this point in the history
Rather than blocks that poke into the SupplyState object, just put
the logic inside of methods inside of SupplyState. This saves making
3 closures for every single message a Supply processes. Further, a
bunch of accessor calls give way to attribute accesses, and further
improvements are possible as a result of the code being in a righter
place.
  • Loading branch information
jnthn committed Jan 15, 2018
1 parent 9c0db1c commit fbf432f
Showing 1 changed file with 45 additions and 40 deletions.
85 changes: 45 additions & 40 deletions src/core/Supply.pm
Original file line number Diff line number Diff line change
Expand Up @@ -1802,7 +1802,10 @@ augment class Rakudo::Internals {
self.CREATE!SET-SELF(&emit, &done, &quit)
}

method !SET-SELF(&!emit, &!done, &!quit) {
method !SET-SELF(&emit, &done, &quit) {
&!emit := &emit;
&!done := &done;
&!quit := &quit;
$!active = 1;
$!lock := Lock.new;
$!run-async-lock := Lock::Async.new;
Expand Down Expand Up @@ -1835,14 +1838,45 @@ augment class Rakudo::Internals {
}
}

method consume-active-taps() {
my @active;
method teardown(--> Nil) {
my $to-close := nqp::create(IterationBuffer);
$!lock.protect: {
@active = %!active-taps.values;
%!active-taps.values.iterator.push-all($to-close);
%!active-taps = ();
$!active = 0;
}
@active
my int $n = nqp::elems($to-close);
loop (my int $i = 0; $i < $n; $i++) {
nqp::atpos($to-close, $i).close();
}
my @close-phasers := @!close-phasers;
while @close-phasers {
@close-phasers.pop()();
}
}

method run-emit(--> Nil) {
if $!active {
my \ex := nqp::exception();
my $emit-handler := &!emit;
$emit-handler(nqp::getpayload(ex)) if $emit-handler.DEFINITE;
nqp::resume(ex)
}
}

method run-done(--> Nil) {
self.get-and-zero-active();
self.teardown();
my $done-handler := &!done;
$done-handler() if $done-handler.DEFINITE;
}

method run-catch(--> Nil) {
my \ex = EXCEPTION(nqp::exception());
self.get-and-zero-active();
self.teardown();
my $quit-handler = &!quit;
$quit-handler(ex) if $quit-handler;
}
}

Expand Down Expand Up @@ -1891,7 +1925,7 @@ augment class Rakudo::Internals {
}
if !$handled && $state.get-and-zero-active() {
$state.quit().(ex) if $state.quit;
self!teardown($state);
$state.teardown();
}
}, Nil, $state, &add-whenever);
if $handled {
Expand All @@ -1911,7 +1945,7 @@ augment class Rakudo::Internals {

# Create and pass on tap; when closed, tear down the state and all
# of our subscriptions.
my $t := Tap.new(-> { self!teardown($state) });
my $t := Tap.new(-> { $state.teardown() });
tap($t);

# Run the Supply block, then decrease active count afterwards (it
Expand All @@ -1928,31 +1962,10 @@ augment class Rakudo::Internals {
my @run-after;
my $queued := $state.run-async-lock.protect-or-queue-on-recursion: {
my &*ADD-WHENEVER := &add-whenever;
my $emitter = -> {
if $state.active {
my \ex := nqp::exception();
my $emit-handler := $state.emit;
$emit-handler(nqp::getpayload(ex)) if $emit-handler.DEFINITE;
nqp::resume(ex)
}
}
my $done = -> {
$state.get-and-zero-active();
self!teardown($state);
my $done-handler := $state.done;
$done-handler() if $done-handler.DEFINITE;
}
my $catch = -> {
my \ex = EXCEPTION(nqp::exception());
$state.get-and-zero-active();
self!teardown($state);
my $quit-handler = $state.quit;
$quit-handler(ex) if $quit-handler;
}
$state.active > 0 and nqp::handle(code(value),
'EMIT', $emitter(),
'DONE', $done(),
'CATCH', $catch(),
'EMIT', $state.run-emit(),
'DONE', $state.run-done(),
'CATCH', $state.run-catch(),
'NEXT', 0);
@run-after = $state.awaiter.take-all;
}
Expand Down Expand Up @@ -1987,15 +2000,7 @@ augment class Rakudo::Internals {
if $state.decrement-active() == 0 {
my $done-handler := $state.done;
$done-handler() if $done-handler;
self!teardown($state);
}
}

method !teardown($state) {
.close for $state.consume-active-taps;
my @close-phasers := $state.close-phasers;
while @close-phasers {
@close-phasers.pop()();
$state.teardown();
}
}

Expand Down

0 comments on commit fbf432f

Please sign in to comment.