Skip to content

Commit

Permalink
Allow getting native descriptor in Proc::Async.
Browse files Browse the repository at this point in the history
This takes advantage of recent MoarVM changes in order to allow the
created stdout and stderr pipes to be used either for their data (by
tapping the returned object) or for the file descriptor. This is done
by making `stdout` and `stderr` return a `Proc::Async::Pipe`, which is
a subclass of `Supply` (for full back-compat), but with the addition
of a `native-descriptor` method that returns a `Promise` that will,
when the process is started, be kept with the descriptor.

Reading data from stdout/stderr will only commence when the `Supply`
has been tapped, rather than immediately after process start. This
means that the native descriptor can be obtained without any data
being read by the Perl 6 process.

The `Promise` returned by `start` now depends on *either* stdout or
stderr being read to completion *or* the native descriptor having been
obtained instead.

So far, this isn't useful for much, but follow-up commits will make it
possible to chain the output of one process into the input of another
and have the whole lot plumbed together at file descriptor level. This
will also be used to fix the synchronous `Proc`, by implementing the
`native-descriptor` method on `IO::Pipe`. That way the special case
multi for `IO::Pipe` in `Proc::Async` can go away.
  • Loading branch information
jnthn committed Jun 26, 2017
1 parent 62d54c7 commit 8230112
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 23 deletions.
122 changes: 100 additions & 22 deletions src/core/Proc/Async.pm
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,48 @@ my class X::Proc::Async::OpenForWriting does X::Proc::Async {
}

my class Proc::Async {
# An asynchornous process output pipe is a Supply that also can provide
# the native descriptor of the underlying pipe.
class Pipe is Supply {
my class PermitOnTap does Tappable {
has Tappable $.delegate;
has &.on-tap;
method tap(|c) {
&!on-tap();
$!delegate.tap(|c)
}
method live() { self.delegate.live }
method serial() { self.delegate.serial }
method sane() { self.delegate.sane }
}

has Promise $.native-descriptor;
has &!on-nd-used;

submethod BUILD(:$!native-descriptor!, :&!on-nd-used) {}

method native-descriptor() {
&!on-nd-used();
$!native-descriptor
}

method new($delegate, $native-descriptor, &on-tap, &on-nd-used) {
self.bless(
tappable => PermitOnTap.bless(:$delegate, :&on-tap),
:$native-descriptor, :&on-nd-used
)
}
}

my class ProcessCancellation is repr('AsyncTask') { }
my enum CharsOrBytes ( :Bytes(0), :Chars(1) );

has $!ready_promise = Promise.new;
has $!ready_vow = $!ready_promise.vow;
has $!stdout_descriptor_vow;
has $!stderr_descriptor_vow;
has $!stdout_descriptor_used = Promise.new;
has $!stderr_descriptor_used = Promise.new;
has $.path;
has @.args;
has $.w;
Expand All @@ -71,6 +108,8 @@ my class Proc::Async {
has CharsOrBytes $!stderr_type;
has $!merge_supply;
has CharsOrBytes $!merge_type;
has $!stdout_fd_promise;
has $!stderr_fd_promise;
has Int $!stdin-fd;
has Int $!stdout-fd;
has Int $!stderr-fd;
Expand All @@ -88,14 +127,41 @@ my class Proc::Async {
$!encoder := Encoding::Registry.find($!enc).encoder(:$!translate-nl);
}

method !supply(\what,\the-supply,\type,\value) {
method !pipe-cbs(\channel) {
-> { $!ready_promise.then({ nqp::permit($!process_handle, channel, -1) }) },
-> { (channel == 1 ?? $!stdout_descriptor_used !! $!stderr_descriptor_used).keep(True) }
}

method !pipe(\what, \the-supply, \type, \value, \fd-vow, \permit-channel) {
X::Proc::Async::TapBeforeSpawn.new(handle => what, proc => self).throw
if $!started;
X::Proc::Async::CharsOrBytes.new(handle => what, proc => self).throw
if the-supply and type != value;

type = value;
the-supply //= Supplier::Preserving.new;

if nqp::iscont(fd-vow) {
my $native-descriptor = Promise.new;
fd-vow = $native-descriptor.vow;
Pipe.new(the-supply.Supply.Tappable, $native-descriptor, |self!pipe-cbs(permit-channel))
}
else {
the-supply.Supply
}
}

method !wrap-decoder(Supply:D $bin-supply, $enc, \fd-vow, \permit-channel, :$translate-nl) {
my \sup = Rakudo::Internals.BYTE_SUPPLY_DECODER($bin-supply, $enc // $!enc,
:translate-nl($translate-nl // $!translate-nl));
if nqp::iscont(fd-vow) {
my $native-descriptor = Promise.new;
fd-vow = $native-descriptor.vow;
Pipe.new(sup.Supply.Tappable, $native-descriptor, |self!pipe-cbs(permit-channel))
}
else {
sup
}
}

proto method stdout(|) { * }
Expand All @@ -104,16 +170,16 @@ my class Proc::Async {
die X::Proc::Async::BindOrUse.new(:handle<stdout>, :use('get the stdout Supply'))
if $!stdout-fd;
$bin
?? self!supply('stdout', $!stdout_supply, $!stdout_type, Bytes).Supply
?? self!pipe('stdout', $!stdout_supply, $!stdout_type, Bytes, $!stdout_descriptor_vow, 1)
!! self.stdout(|%_)
}
multi method stdout(Proc::Async:D: :$enc, :$translate-nl) {
die X::Proc::Async::SupplyOrStd.new if $!merge_supply;
die X::Proc::Async::BindOrUse.new(:handle<stdout>, :use('get the stdout Supply'))
if $!stdout-fd;
self!wrap-decoder:
self!supply('stdout', $!stdout_supply, $!stdout_type, Chars).Supply,
$enc, :$translate-nl
self!pipe('stdout', $!stdout_supply, $!stdout_type, Chars, Nil, 1),
$enc, $!stdout_descriptor_vow, 1, :$translate-nl
}

proto method stderr(|) { * }
Expand All @@ -122,16 +188,16 @@ my class Proc::Async {
die X::Proc::Async::BindOrUse.new(:handle<stderr>, :use('get the stderr Supply'))
if $!stderr-fd;
$bin
?? self!supply('stderr', $!stderr_supply, $!stderr_type, Bytes).Supply
?? self!pipe('stderr', $!stderr_supply, $!stderr_type, Bytes, $!stderr_descriptor_vow, 2)
!! self.stderr(|%_)
}
multi method stderr(Proc::Async:D: :$enc, :$translate-nl) {
die X::Proc::Async::SupplyOrStd.new if $!merge_supply;
die X::Proc::Async::BindOrUse.new(:handle<stderr>, :use('get the stderr Supply'))
if $!stderr-fd;
self!wrap-decoder:
self!supply('stderr', $!stderr_supply, $!stderr_type, Chars).Supply,
$enc, :$translate-nl
self!pipe('stderr', $!stderr_supply, $!stderr_type, Chars, Nil, 2),
$enc, $!stderr_descriptor_vow, 2, :$translate-nl
}

proto method Supply(|) { * }
Expand All @@ -142,7 +208,7 @@ my class Proc::Async {
die X::Proc::Async::BindOrUse.new(:handle<stderr>, :use('get the output Supply'))
if $!stderr-fd;
$bin
?? self!supply('merge', $!merge_supply, $!merge_type, Bytes).Supply
?? self!pipe('merge', $!merge_supply, $!merge_type, Bytes, Nil, 0)
!! self.Supply(|%_)
}
multi method Supply(Proc::Async:D: :$enc, :$translate-nl) {
Expand All @@ -152,8 +218,8 @@ my class Proc::Async {
die X::Proc::Async::BindOrUse.new(:handle<stderr>, :use('get the output Supply'))
if $!stderr-fd;
self!wrap-decoder:
self!supply('merge', $!merge_supply, $!merge_type, Chars).Supply,
$enc, :$translate-nl
self!pipe('merge', $!merge_supply, $!merge_type, Chars, Nil, 0),
$enc, Nil, 0, :$translate-nl
}

proto method bind-stdin($) {*}
Expand Down Expand Up @@ -194,11 +260,6 @@ my class Proc::Async {
$!ready_promise;
}

method !wrap-decoder(Supply:D $bin-supply, $enc, :$translate-nl) {
Rakudo::Internals.BYTE_SUPPLY_DECODER($bin-supply, $enc // $!enc,
:translate-nl($translate-nl // $!translate-nl))
}

method !capture(\callbacks,\std,\the-supply) {
my $promise = Promise.new;
my $vow = $promise.vow;
Expand Down Expand Up @@ -228,7 +289,21 @@ my class Proc::Async {
))
});

nqp::bindkey($callbacks, 'ready', {
nqp::bindkey($callbacks, 'ready', -> Mu \handles = Nil {
if nqp::isconcrete(handles) {
with $!stdout_descriptor_vow {
my $fd = nqp::atpos_i(handles, 0);
$fd < 0
?? .break("Desciptor not available")
!! .keep($fd)
}
with $!stderr_descriptor_vow {
my $fd = nqp::atpos_i(handles, 1);
$fd < 0
?? .break("Desciptor not available")
!! .keep($fd)
}
}
$!ready_vow.keep(Nil);
});

Expand All @@ -238,12 +313,14 @@ my class Proc::Async {
$!ready_vow.break($error);
});

@!promises.push(
self!capture($callbacks,'stdout',$!stdout_supply)
) if $!stdout_supply;
@!promises.push(
self!capture($callbacks,'stderr',$!stderr_supply)
) if $!stderr_supply;
@!promises.push(Promise.anyof(
self!capture($callbacks,'stdout',$!stdout_supply),
$!stdout_descriptor_used
)) if $!stdout_supply;
@!promises.push(Promise.anyof(
self!capture($callbacks,'stderr',$!stderr_supply),
$!stderr_descriptor_used
)) if $!stderr_supply;
@!promises.push(
self!capture($callbacks,'merge',$!merge_supply)
) if $!merge_supply;
Expand All @@ -260,6 +337,7 @@ my class Proc::Async {
CLONE-HASH-DECONTAINERIZED(%ENV),
$callbacks,
);
nqp::permit($!process_handle, 0, -1) if $!merge_supply;
Promise.allof( $!exit_promise, @!promises ).then({
$!exit_promise.status == Broken
?? $!exit_promise.cause.throw
Expand Down
1 change: 1 addition & 0 deletions src/core/Supply.pm
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ my class Supply does Awaitable {

method live(Supply:D:) { $!tappable.live }
method serial(Supply:D:) { $!tappable.serial }
method Tappable(--> Tappable) { $!tappable }

my \DISCARD = -> $ {};
my \NOP = -> {};
Expand Down
2 changes: 1 addition & 1 deletion tools/build/NQP_REVISION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
2017.06-3-g0b45398
2017.06-13-gc5fc79f

0 comments on commit 8230112

Please sign in to comment.