Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Use Supply.on_demand in sockets implementation.
  • Loading branch information
jnthn committed Jun 5, 2014
1 parent 7fd39c3 commit a83d819
Showing 1 changed file with 62 additions and 50 deletions.
112 changes: 62 additions & 50 deletions src/core/IO/Socket/Async.pm
Expand Up @@ -45,44 +45,52 @@ my class IO::Socket::Async {
}

method chars_supply(IO::Socket::Async:D: :$scheduler = $*SCHEDULER) {
my $s = Supply.new;
nqp::asyncreadchars(
$!VMIO,
$scheduler.queue,
-> Mu \seq, Mu \data, Mu \err {
if err {
$s.quit(err);
}
elsif seq < 0 {
$s.done();
}
else {
$s.more(data);
}
},
SocketCancellation);
$s
my $cancellation;
Supply.on_demand(-> $s {
$cancellation := nqp::asyncreadchars(
$!VMIO,
$scheduler.queue,
-> Mu \seq, Mu \data, Mu \err {
if err {
$s.quit(err);
}
elsif seq < 0 {
$s.done();
}
else {
$s.more(data);
}
},
SocketCancellation);
},
closing => {
$cancellation && nqp::cancel($cancellation)
});
}

method bytes_supply(IO::Socket::Async:D: :$scheduler = $*SCHEDULER, :$buf = buf8.new) {
my $s = Supply.new;
nqp::asyncreadbytes(
$!VMIO,
$scheduler.queue,
-> Mu \seq, Mu \data, Mu \err {
if err {
$s.quit(err);
}
elsif seq < 0 {
$s.done();
}
else {
$s.more(data);
}
},
nqp::decont($buf),
SocketCancellation);
$s
my $cancellation;
Supply.on_demand(-> $s {
$cancellation := nqp::asyncreadbytes(
$!VMIO,
$scheduler.queue,
-> Mu \seq, Mu \data, Mu \err {
if err {
$s.quit(err);
}
elsif seq < 0 {
$s.done();
}
else {
$s.more(data);
}
},
nqp::decont($buf),
SocketCancellation);
},
closing => {
$cancellation && nqp::cancel($cancellation)
});
}

method close(IO::Socket::Async:D:) {
Expand Down Expand Up @@ -112,20 +120,24 @@ my class IO::Socket::Async {

method listen(IO::Socket::Async:U: $host as Str, $port as Int,
:$scheduler = $*SCHEDULER) {
my $s = Supply.new;
nqp::asynclisten(
$scheduler.queue,
-> Mu \socket, Mu \err {
if err {
$s.quit(err);
}
else {
my $client_socket := nqp::create(self);
nqp::bindattr($client_socket, IO::Socket::Async, '$!VMIO', socket);
$s.more($client_socket);
}
},
$host, $port, SocketCancellation);
$s
my $cancellation;
Supply.on_demand(-> $s {
$cancellation := nqp::asynclisten(
$scheduler.queue,
-> Mu \socket, Mu \err {
if err {
$s.quit(err);
}
else {
my $client_socket := nqp::create(self);
nqp::bindattr($client_socket, IO::Socket::Async, '$!VMIO', socket);
$s.more($client_socket);
}
},
$host, $port, SocketCancellation);
},
closing => {
$cancellation && nqp::cancel($cancellation)
});
}
}

0 comments on commit a83d819

Please sign in to comment.