Skip to content

Commit

Permalink
Streamline async socket message Supply
Browse files Browse the repository at this point in the history
Implement the Tappable interface directly, rather than going through
the `Supply.on-demand` machinery that adds extra safety checks and
indirections. This boosts the number of requests per second in a
simple Cro HTTP example by around 13%; for more direct users of
IO::Socket::Async it will be more significant than that.
  • Loading branch information
jnthn committed Sep 22, 2017
1 parent 3deda84 commit f58ac99
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 19 deletions.
21 changes: 2 additions & 19 deletions src/core/IO/Socket/Async.pm
Expand Up @@ -42,27 +42,10 @@ my class IO::Socket::Async {
$p
}

my sub capture(\supply) {
my $ss = Rakudo::Internals::SupplySequencer.new(
on-data-ready => -> \data { supply.emit(data) },
on-completed => -> { supply.done() },
on-error => -> \err { supply.quit(err) });
-> Mu \seq, Mu \data, Mu \err { $ss.process(seq, data, err) }
}

multi method Supply(IO::Socket::Async:D: :$bin, :$buf = buf8.new, :$enc, :$scheduler = $*SCHEDULER) {
if $bin {
my $cancellation;
Supply.on-demand:
-> $supply {
$cancellation := nqp::asyncreadbytes($!VMIO,
$scheduler.queue(:hint-affinity),
capture($supply), nqp::decont($buf), SocketCancellation);
$!close-promise.then({ $supply.done });
},
closing => {
$cancellation && nqp::cancel($cancellation)
}
Supply.new: Rakudo::Internals::IOReaderTappable.new:
:$!VMIO, :$scheduler, :$buf, :$!close-promise
}
else {
my $bin-supply = self.Supply(:bin);
Expand Down
76 changes: 76 additions & 0 deletions src/core/Supply.pm
Expand Up @@ -1728,6 +1728,82 @@ my class Supplier::Preserving is Supplier {
}

augment class Rakudo::Internals {
class IOReaderTappable does Tappable {
my class ReaderCancellation is repr('AsyncTask') { }

has $!VMIO;
has $!scheduler;
has $!buf;
has $!close-promise;

method new(Mu :$VMIO!, :$scheduler!, :$buf!, :$close-promise!) {
self.CREATE!SET-SELF($VMIO, $scheduler, $buf, $close-promise)
}

method !SET-SELF(Mu $!VMIO, $!scheduler, $!buf, $!close-promise) { self }

method tap(&emit, &done, &quit, &tap) {
my $buffer := nqp::list();
my int $buffer-start-seq = 0;
my int $done-target = -1;
my int $finished = 0;

sub emit-events() {
until nqp::elems($buffer) == 0 || nqp::isnull(nqp::atpos($buffer, 0)) {
emit(nqp::shift($buffer));
$buffer-start-seq = $buffer-start-seq + 1;
}
if $buffer-start-seq == $done-target {
done();
$finished = 1;
}
}

my $lock = Lock::Async.new;
my $tap;
$lock.protect: {
my $cancellation := nqp::asyncreadbytes(nqp::decont($!VMIO),
$!scheduler.queue(:hint-affinity),
-> Mu \seq, Mu \data, Mu \err {
$lock.protect: {
unless $finished {
if err {
quit(err);
$finished = 1;
}
elsif nqp::isconcrete(data) {
my int $insert-pos = seq - $buffer-start-seq;
nqp::bindpos($buffer, $insert-pos, data);
emit-events();
}
else {
$done-target = seq;
emit-events();
}
}
}
},
nqp::decont($!buf), ReaderCancellation);
$tap := Tap.new({ nqp::cancel($cancellation) });
tap($tap);
}
$!close-promise.then: {
$lock.protect: {
unless $finished {
done();
$finished = 1;
}
}
}

$tap
}

method live(--> False) { }
method sane(--> True) { }
method serial(--> True) { }
}

my constant ADD_WHENEVER_PROMPT = Mu.new;

class CachedAwaitHandle does Awaitable {
Expand Down

0 comments on commit f58ac99

Please sign in to comment.