Skip to content

Commit

Permalink
Parameterize events the roles can emit. Create eg-15-ipc-run.pl and a…
Browse files Browse the repository at this point in the history
… supporting role and class.
  • Loading branch information
rcaputo committed Oct 3, 2010
1 parent 5b2d561 commit 2ea8fb8
Show file tree
Hide file tree
Showing 23 changed files with 408 additions and 25 deletions.
16 changes: 16 additions & 0 deletions eg/Runner.pm
@@ -0,0 +1,16 @@
package Runner;
use Moose;
extends 'Reflex::Base';

has [qw(stdin stdout stderr)] => ( isa => 'FileHandle', is => 'ro' );
has pid => ( isa => 'Int', is => 'ro' );

with 'RunnerRole' => {
stdin => 'stdin',
stdout => 'stdout',
stderr => 'stderr',
pid => 'pid',
ev_exit => 'exit',
};

1;
226 changes: 226 additions & 0 deletions eg/RunnerRole.pm
@@ -0,0 +1,226 @@
package RunnerRole;
use Reflex::Role;

attribute_parameter stdin => "stdin";
attribute_parameter stdout => "stdin";
attribute_parameter stderr => "stdin";
attribute_parameter pid => "pid";

callback_parameter cb_stdout_data => qw( on stdout data );
callback_parameter cb_stdout_error => qw( on stdout error );

callback_parameter cb_stdout_closed => qw( on stdout closed );
event_parameter ev_stdout_closed => qw( _ stdout closed );

callback_parameter cb_stderr_data => qw( on stderr data );
callback_parameter cb_stderr_error => qw( on stderr error );

callback_parameter cb_stderr_closed => qw( on stderr closed );
event_parameter ev_stderr_closed => qw( _ stderr closed );

callback_parameter cb_exit => qw( on pid exit );
event_parameter ev_exit => qw( _ pid exit );

method_parameter method_put => qw( put stdin _ );

role {
my $p = shift;

with 'Reflex::Role::OutStreaming' => {
handle => $p->stdin(),
method_put => $p->method_put(),
};

my $m_stdout_stop = "stop_" . $p->stdout();
my $cb_stdout_closed = $p->cb_stdout_closed();
my $ev_stdout_closed = $p->ev_stdout_closed();

after $cb_stdout_closed => sub {
my ($self, $args) = @_;
$self->$m_stdout_stop();
};

with 'Reflex::Role::InStreaming' => {
handle => $p->stdout(),
cb_data => $p->cb_stdout_data(),
cb_error => $p->cb_stdout_error(),
cb_closed => $cb_stdout_closed,
};

my $m_stderr_stop = "stop_" . $p->stderr();
my $cb_stderr_closed = $p->cb_stderr_closed();
my $ev_stderr_closed = $p->ev_stderr_closed();

after $cb_stderr_closed => sub {
my ($self, $args) = @_;
$self->$m_stderr_stop();
$self->emit(event => $ev_stderr_closed, args => $args);
};

with 'Reflex::Role::InStreaming' => {
handle => $p->stderr(),
cb_data => $p->cb_stderr_data(),
cb_error => $p->cb_stderr_error(),
cb_closed => $cb_stderr_closed,
};

with 'Reflex::Role::PidCatcher' => {
pid => 'pid',
cb_exit => $p->cb_exit(),
ev_exit => $p->ev_exit(),
};
};

1;

__END__
extends 'Reflex::Base';
use Reflex::Trait::Observed;
use Reflex::PID;
use Carp qw(croak);
use IPC::Run qw(start);
use Symbol qw(gensym);
__END__
observes process => ( isa => 'Maybe[Reflex::PID]', is => 'rw' );
has [qw(stdin stdout stderr)] => (
isa => 'Maybe[FileHandle]',
is => 'rw',
);
has ipc_run => ( isa => 'IPC::Run', is => 'rw' );
has cmd => (
isa => 'ArrayRef',
is => 'ro',
required => 1,
);
### Reap the child process.
sub on_process_exit {
my ($self, $args) = @_;
$self->emit(event => 'exit', args => $args);
}
sub kill {
my ($self, $signal) = @_;
croak "no process to kill" unless $self->process();
$signal ||= 'TERM';
kill $signal, $self->process()->pid();
}
### Write to standard input.
sub on_stdin_error {
my ($self, $args) = @_;
$self->emit(event => 'stdin_error', args => $args);
}
with 'Reflex::Role::Writing' => { handle => 'stdin' };
sub on_stdin_writable {
my ($self, $arg) = @_;
my $octets_left = $self->flush_stdin();
return if $octets_left;
$self->flush_stdin();
}
with 'Reflex::Role::Writable' => { handle => 'stdin' };
### Read from standard output.
sub on_stdout_readable {
my ($self, $arg) = @_;
my $octets_read = $self->read_stdout($arg);
warn $octets_read;
return if $octets_read;
if (defined $octets_read) {
warn 111;
$self->pause_stdout_readable();
return;
}
$self->stop_stdout_readable();
}
sub on_stdout_error {
my ($self, $args) = @_;
$self->emit(event => 'stdout_error', args => $args);
$self->stop_stdout_readable();
}
with 'Reflex::Role::Reading' => {
handle => 'stdout',
cb_data => 'on_stdout',
ev_data => 'stdout',
};
with 'Reflex::Role::Readable' => {
handle => 'stdout',
cb_ready => 'on_stdout_readable',
};
### Read from standard error.
sub on_stderr_error {
my ($self, $args) = @_;
$self->emit(event => 'stderr_error', args => $args);
$self->stop_stderr_readable();
}
sub on_stderr_readable {
my ($self, $arg) = @_;
my $octets_read = $self->read_stderr($arg);
warn $octets_read;
return if $octets_read;
if (defined $octets_read) {
warn 111;
$self->pause_stderr_readable();
return;
}
$self->stop_stderr_readable();
}
with 'Reflex::Role::Reading' => {
handle => 'stderr',
cb_data => 'on_stderr',
ev_data => 'stderr',
};
with 'Reflex::Role::Readable' => {
handle => 'stderr',
cb_ready => 'on_stderr_readable',
};
sub BUILD {
my $self = shift;
my ($fh_in, $fh_out, $fh_err) = (gensym(), gensym(), gensym());
$self->ipc_run(
start(
$self->cmd(),
'<pipe', $fh_in,
'>pipe', $fh_out,
'2>pipe', $fh_err,
)
) or die "IPC::Run start() failed: $? ($!)";
$self->process(
Reflex::PID->new(
pid => $self->ipc_run->{KIDS}[0]{PID}
)
);
$self->stdin($fh_in);
$self->stdout($fh_out);
$self->stderr($fh_err);
}
1;
83 changes: 83 additions & 0 deletions eg/eg-15-ipc-run.pl
@@ -0,0 +1,83 @@
#!/usr/bin/perl

# Demonstrate subprocesses without POE::Wheel::Run.
# Test case for upcoming Reflex::Run, which drives IPC::Run.

use warnings;
use strict;
use lib qw(../lib);

use Runner;

my $cmd = [
$^X, '-MTime::HiRes=sleep', '-wle',
q($|=1;) .
q(for (1..3) { $_ = qq[pid($$) moo($_)]; print; warn "$_\n"; sleep rand; })
];

sub my_start {
my $cmd = shift;

use IPC::Run qw(start);
use Symbol qw(gensym);

my ($fh_in, $fh_out, $fh_err) = (gensym(), gensym(), gensym());

my $ipc_run = start(
$cmd,
'<pipe', $fh_in,
'>pipe', $fh_out,
'2>pipe', $fh_err,
) or die "IPC::Run start() failed: $? ($!)";

return($ipc_run, $fh_in, $fh_out, $fh_err);
}

my ($ipc_run_1, $runner_1);
{
($ipc_run_1, my($fh_in, $fh_out, $fh_err)) = my_start($cmd);

$runner_1 = Runner->new(
stdin => $fh_in,
stdout => $fh_out,
stderr => $fh_err,
pid => $ipc_run_1->{KIDS}[0]{PID},

on_stdout_closed => sub { print "runner_1 stdout closed\n" },
on_stderr_closed => sub { print "runner_1 stderr closed\n" },
on_stdout_data => sub { print "runner_1 stdout: $_[1]{data}" },
on_stderr_data => sub { print "runner_1 stderr: $_[1]{data}" },

on_exit => sub {
my ($self, $args) = @_;
warn "runner_1 child $args->{pid} exited: $args->{exit}\n";
$runner_1 = undef;
},
);
}

my ($ipc_run_2, $runner_2);
{
($ipc_run_2, my($fh_in, $fh_out, $fh_err)) = my_start($cmd);

$runner_2 = Runner->new(
stdin => $fh_in,
stdout => $fh_out,
stderr => $fh_err,
pid => $ipc_run_2->{KIDS}[0]{PID},

on_stdout_closed => sub { print "runner_2 stdout closed\n" },
on_stderr_closed => sub { print "runner_2 stderr closed\n" },
on_stdout_data => sub { print "runner_2 stdout: $_[1]{data}" },
on_stderr_data => sub { print "runner_2 stderr: $_[1]{data}" },

on_exit => sub {
my ($self, $args) = @_;
warn "runner_2 child $args->{pid} exited: $args->{exit}\n";
$runner_2 = undef;
},
);
}

Reflex->run_all();
exit;
1 change: 1 addition & 0 deletions lib/Reflex/Interval.pm
Expand Up @@ -12,6 +12,7 @@ with 'Reflex::Role::Interval' => {
auto_start => "auto_start",
auto_repeat => "auto_repeat",
cb_tick => "on_tick",
ev_tick => "tick",
method_start => "start",
method_stop => "stop",
method_repeat => "repeat",
Expand Down
1 change: 1 addition & 0 deletions lib/Reflex/PID.pm
Expand Up @@ -19,6 +19,7 @@ with 'Reflex::Role::PidCatcher' => {
pid => 'pid',
active => 'active',
cb_exit => 'on_exit',
ev_exit => 'exit',
method_start => 'start',
method_stop => 'stop',
method_pause => 'pause',
Expand Down
2 changes: 1 addition & 1 deletion lib/Reflex/POE/Wheel/Run.pm
Expand Up @@ -98,7 +98,7 @@ sub BUILD {
}

# Rethrow our signal event.
sub on_sigchld_pid {
sub on_sigchld_exit {
my ($self, $args) = @_;
$self->emit(
event => 'signal',
Expand Down
8 changes: 6 additions & 2 deletions lib/Reflex/Role.pm
Expand Up @@ -10,7 +10,7 @@ use Moose::Exporter;

Moose::Exporter->setup_import_methods(
with_caller => [ qw(
attribute_parameter method_parameter callback_parameter
attribute_parameter method_parameter callback_parameter event_parameter
method_emit_and_stop method_emit
) ],
also => 'MooseX::Role::Parameterized',
Expand Down Expand Up @@ -115,6 +115,8 @@ sub method_emit {
# parameter flags to automatically generate those methods.
BEGIN { *callback_parameter = *method_parameter; }

BEGIN { *event_parameter = *method_parameter; }

1;

__END__
Expand All @@ -125,6 +127,8 @@ Reflex::Role - define a Reflex paramaterized role
=head1 SYNOPSIS
TODO - Changed again;
package Reflex::Role::Streaming;
use Reflex::Role;
Expand All @@ -145,7 +149,7 @@ Reflex::Role - define a Reflex paramaterized role
with 'Reflex::Role::Collectible';
method_emit_and_stop $cb_error => "error";
method_emit_and_stop $cb_error => $p->ev_error();
with 'Reflex::Role::Reading' => {
handle => $h,
Expand Down

0 comments on commit 2ea8fb8

Please sign in to comment.