Skip to content

Commit

Permalink
Use Encoding::Encoder in async I/O handles.
Browse files Browse the repository at this point in the history
Means that they now also support user-defined encodings, and should
have an output performance boost too.
  • Loading branch information
jnthn committed Jun 20, 2017
1 parent f267928 commit 4f9aafa
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 7 deletions.
26 changes: 20 additions & 6 deletions src/core/IO/Socket/Async.pm
Expand Up @@ -4,6 +4,7 @@ my class IO::Socket::Async {
has $!VMIO;
has int $!udp;
has $.enc;
has $!encoder;

method new() {
die "Cannot create an asynchronous socket directly; please use\n" ~
Expand All @@ -12,7 +13,7 @@ my class IO::Socket::Async {
}

method print(IO::Socket::Async:D: Str() $str, :$scheduler = $*SCHEDULER) {
self.write($str.encode($!enc))
self.write($!encoder.encode-chars($str))
}

method write(IO::Socket::Async:D: Blob $b, :$scheduler = $*SCHEDULER) {
Expand Down Expand Up @@ -76,6 +77,7 @@ my class IO::Socket::Async {
:$enc = 'utf-8', :$scheduler = $*SCHEDULER) {
my $p = Promise.new;
my $v = $p.vow;
my $encoding = Encoding::Registry.find($enc);
nqp::asyncconnect(
$scheduler.queue,
-> Mu \socket, Mu \err {
Expand All @@ -85,7 +87,9 @@ my class IO::Socket::Async {
else {
my $client_socket := nqp::create(self);
nqp::bindattr($client_socket, IO::Socket::Async, '$!VMIO', socket);
nqp::bindattr($client_socket, IO::Socket::Async, '$!enc', $enc);
nqp::bindattr($client_socket, IO::Socket::Async, '$!enc', $encoding.name);
nqp::bindattr($client_socket, IO::Socket::Async, '$!encoder',
$encoding.encoder());
$v.keep($client_socket);
}
},
Expand All @@ -96,6 +100,7 @@ my class IO::Socket::Async {
method listen(IO::Socket::Async:U: Str() $host, Int() $port, Int() $backlog = 128,
:$enc = 'utf-8', :$scheduler = $*SCHEDULER) {
my $cancellation;
my $encoding = Encoding::Registry.find($enc);
Supply.on-demand(-> $s {
$cancellation := nqp::asynclisten(
$scheduler.queue,
Expand All @@ -106,7 +111,10 @@ my class IO::Socket::Async {
else {
my $client_socket := nqp::create(self);
nqp::bindattr($client_socket, IO::Socket::Async, '$!VMIO', socket);
nqp::bindattr($client_socket, IO::Socket::Async, '$!enc', $enc);
nqp::bindattr($client_socket, IO::Socket::Async, '$!enc',
$encoding.name);
nqp::bindattr($client_socket, IO::Socket::Async, '$!encoder',
$encoding.encoder());
$s.emit($client_socket);
}
},
Expand All @@ -125,6 +133,7 @@ my class IO::Socket::Async {
#?if moar
method udp(IO::Socket::Async:U: :$broadcast, :$enc = 'utf-8', :$scheduler = $*SCHEDULER) {
my $p = Promise.new;
my $encoding = Encoding::Registry.find($enc);
nqp::asyncudp(
$scheduler.queue,
-> Mu \socket, Mu \err {
Expand All @@ -135,7 +144,9 @@ my class IO::Socket::Async {
my $client_socket := nqp::create(self);
nqp::bindattr($client_socket, IO::Socket::Async, '$!VMIO', socket);
nqp::bindattr_i($client_socket, IO::Socket::Async, '$!udp', 1);
nqp::bindattr($client_socket, IO::Socket::Async, '$!enc', $enc);
nqp::bindattr($client_socket, IO::Socket::Async, '$!enc', $encoding.name);
nqp::bindattr($client_socket, IO::Socket::Async, '$!encoder',
$encoding.encoder());
$p.keep($client_socket);
}
},
Expand All @@ -147,6 +158,7 @@ my class IO::Socket::Async {
method bind-udp(IO::Socket::Async:U: Str() $host, Int() $port, :$broadcast,
:$enc = 'utf-8', :$scheduler = $*SCHEDULER) {
my $p = Promise.new;
my $encoding = Encoding::Registry.find($enc);
nqp::asyncudp(
$scheduler.queue,
-> Mu \socket, Mu \err {
Expand All @@ -157,7 +169,9 @@ my class IO::Socket::Async {
my $client_socket := nqp::create(self);
nqp::bindattr($client_socket, IO::Socket::Async, '$!VMIO', socket);
nqp::bindattr_i($client_socket, IO::Socket::Async, '$!udp', 1);
nqp::bindattr($client_socket, IO::Socket::Async, '$!enc', $enc);
nqp::bindattr($client_socket, IO::Socket::Async, '$!enc', $encoding.name);
nqp::bindattr($client_socket, IO::Socket::Async, '$!encoder',
$encoding.encoder());
$p.keep($client_socket);
}
},
Expand All @@ -167,7 +181,7 @@ my class IO::Socket::Async {
}

method print-to(IO::Socket::Async:D: Str() $host, Int() $port, Str() $str, :$scheduler = $*SCHEDULER) {
self.write-to($host, $port, $str.encode($!enc))
self.write-to($host, $port, $!encoder.encode-chars($str))
}

method write-to(IO::Socket::Async:D: Str() $host, Int() $port, Blob $b, :$scheduler = $*SCHEDULER) {
Expand Down
7 changes: 6 additions & 1 deletion src/core/Proc/Async.pm
Expand Up @@ -77,12 +77,17 @@ my class Proc::Async {
has $!process_handle;
has $!exit_promise;
has @!promises;
has $!encoder;

proto method new(|) { * }
multi method new(*@ ($path, *@args), *%_) {
self.bless(:$path, :@args, |%_)
}

submethod TWEAK(--> Nil) {
$!encoder := Encoding::Registry.find($!enc).encoder(:$!translate-nl);
}

method !supply(\what,\the-supply,\type,\value) {
X::Proc::Async::TapBeforeSpawn.new(handle => what, proc => self).throw
if $!started;
Expand Down Expand Up @@ -266,7 +271,7 @@ my class Proc::Async {
X::Proc::Async::OpenForWriting.new(:method<print>, proc => self).throw if !$!w;
X::Proc::Async::MustBeStarted.new(:method<print>, proc => self).throw if !$!started;

self.write($str.encode($!enc, :$!translate-nl))
self.write($!encoder.encode-chars($str))
}

method put(Proc::Async:D: \x, |c) {
Expand Down

0 comments on commit 4f9aafa

Please sign in to comment.