Skip to content

Commit

Permalink
Optimize IO::Socket::Async.listen Supply
Browse files Browse the repository at this point in the history
Again, by avoiding Supply.on-demand and its various protections. Gives
a performance boost to async socket servers; in Cro it amounts to ~4%
more requests per second.
  • Loading branch information
jnthn committed Sep 22, 2017
1 parent 40c2d0c commit c46de00
Showing 1 changed file with 77 additions and 34 deletions.
111 changes: 77 additions & 34 deletions src/core/IO/Socket/Async.pm
Expand Up @@ -171,43 +171,86 @@ my class IO::Socket::Async {
$p
}

method listen(IO::Socket::Async:U: Str() $host, Int() $port, Int() $backlog = 128,
:$enc = 'utf-8', :$scheduler = $*SCHEDULER) {
my $cancellation;
my $encoding = Encoding::Registry.find($enc);
Supply.on-demand(-> $s {
$cancellation := nqp::asynclisten(
$scheduler.queue(:hint-affinity),
-> Mu \socket, Mu \err, Mu \peer-host, Mu \peer-port, Mu \socket-host, Mu \socket-port {
if err {
$s.quit(err);
}
else {
my $client_socket := nqp::create(self);
nqp::bindattr($client_socket, IO::Socket::Async, '$!VMIO', socket);
nqp::bindattr($client_socket, IO::Socket::Async, '$!enc',
$encoding.name);
nqp::bindattr($client_socket, IO::Socket::Async, '$!encoder',
$encoding.encoder());
nqp::bindattr($client_socket, IO::Socket::Async, '$!peer-host', peer-host);
nqp::bindattr($client_socket, IO::Socket::Async, '$!peer-port', peer-port);
nqp::bindattr($client_socket, IO::Socket::Async, '$!socket-host', socket-host);
nqp::bindattr($client_socket, IO::Socket::Async, '$!socket-port', socket-port);
my class SocketListenerTappable does Tappable {
has $!host;
has $!port;
has $!backlog;
has $!encoding;
has $!scheduler;

setup-close($client_socket);
$s.emit($client_socket);
method new(:$host!, :$port!, :$backlog!, :$encoding!, :$scheduler!) {
self.CREATE!SET-SELF($host, $port, $backlog, $encoding, $scheduler)
}

method !SET-SELF($!host, $!port, $!backlog, $!encoding, $!scheduler) { self }

method tap(&emit, &done, &quit, &tap) {
my $lock := Lock::Async.new;
my $tap;
my int $finished = 0;
$lock.protect: {
my $cancellation := nqp::asynclisten(
$!scheduler.queue(:hint-affinity),
-> Mu \socket, Mu \err, Mu \peer-host, Mu \peer-port,
Mu \socket-host, Mu \socket-port {
$lock.protect: {
if $finished {
# do nothing
}
elsif err {
say "here with error";
quit(X::AdHoc.new(message => err));
$finished = 1;
}
else {
my $client_socket := nqp::create(IO::Socket::Async);
nqp::bindattr($client_socket, IO::Socket::Async,
'$!VMIO', socket);
nqp::bindattr($client_socket, IO::Socket::Async,
'$!enc', $!encoding.name);
nqp::bindattr($client_socket, IO::Socket::Async,
'$!encoder', $!encoding.encoder());
nqp::bindattr($client_socket, IO::Socket::Async,
'$!peer-host', peer-host);
nqp::bindattr($client_socket, IO::Socket::Async,
'$!peer-port', peer-port);
nqp::bindattr($client_socket, IO::Socket::Async,
'$!socket-host', socket-host);
nqp::bindattr($client_socket, IO::Socket::Async,
'$!socket-port', socket-port);
setup-close($client_socket);
emit($client_socket);
}
}
},
$!host, $!port, $!backlog, SocketCancellation);
$tap = Tap.new: {
my $p = Promise.new;
my $v = $p.vow;
nqp::cancelnotify($cancellation, $!scheduler.queue, { $v.keep(True); });
$p
}
tap($tap);
CATCH {
default {
tap($tap = Tap.new({ Nil })) unless $tap;
quit($_);
}
},
$host, $port, $backlog, SocketCancellation);
},
closing => {
if $cancellation {
my $p = Promise.new;
my $v = $p.vow;
nqp::cancelnotify($cancellation, $scheduler.queue, { $v.keep(True); });
$p
}
}
});
$tap
}

method live(--> False) { }
method sane(--> True) { }
method serial(--> True) { }
}

method listen(IO::Socket::Async:U: Str() $host, Int() $port, Int() $backlog = 128,
:$enc = 'utf-8', :$scheduler = $*SCHEDULER) {
my $encoding = Encoding::Registry.find($enc);
Supply.new: SocketListenerTappable.new:
:$host, :$port, :$backlog, :$encoding, :$scheduler
}

sub setup-close(\socket --> Nil) {
Expand Down

0 comments on commit c46de00

Please sign in to comment.