Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Initial stab at serializing stdout/stderr on P::A
I'm still not sure whether I covered the case that data may come in *after*
the pipe has been closed.  Probably not.  But at least this is an initial
stab at it.  FWIW, the associated test file fails about 1 in 5 times still
with segfaults and all sorts of other strange messages.  So there's still
work to be done!
  • Loading branch information
lizmat committed Sep 17, 2014
1 parent 5f30b5d commit df69aa6
Showing 1 changed file with 34 additions and 3 deletions.
37 changes: 34 additions & 3 deletions src/core/Proc/Async.pm
Expand Up @@ -105,21 +105,52 @@ my class Proc::Async {

my $promise = Promise.new;
my $lock = Lock.new;
my int $last_seq = -1;
my int $moreing;
my int $next_seq;
my @buffer; # should be Mu, as data can be Mu

nqp::bindkey(callbacks,
std ~ ( type ?? '_chars' !! '_bytes' ),
-> Mu \seq, Mu \data, Mu \err {

# oh noes!
if err {
$promise.keep( (supply,err) );
}

# we're done
elsif seq < 0 {
$promise.keep( supply );
}

# got new data to process
else {
#say "{std}: seq = {seq} with {data} in {$*THREAD}" if std eq 'stdout';
supply.more(data);
# cannot simply return out of here, so we need a flag
my int $in_charge;

$lock.protect( {
#say "seq = {seq} with {data} in {$*THREAD}" if std eq 'stdout';
@buffer[ seq - $next_seq ] := data;
$in_charge = $moreing = 1 unless $moreing;
} );

if $in_charge {
my int $done;
while @buffer.exists_pos($done) {
#say "moreing { $next_seq + $done }: {@buffer[$done]}" if std eq 'stdout';
supply.more( @buffer[$done] );
$done = $done + 1;
}

$lock.protect( {
if $done {
#say "discarding from $next_seq for $done" if std eq 'stdout';
@buffer.splice(0,$done);
$next_seq = $next_seq + $done;
}
$moreing = 0;
} );
}
}
}
);
Expand Down

0 comments on commit df69aa6

Please sign in to comment.