Browse files

touch-up distribution files for Wheel::Run and pipes

  • Loading branch information...
1 parent e5feab8 commit c3771043ff2d1dcafa753c89d10ff54a55862db6 @rcaputo committed Dec 29, 2000
Showing with 1,043 additions and 1 deletion.
  1. +13 −0 Changes
  2. +3 −0 MANIFEST
  3. +7 −0 README
  4. +161 −0 lib/POE/Pipe/Bidirectional.pm
  5. +152 −0 lib/POE/Pipe/Unidirectional.pm
  6. +520 −0 lib/POE/Wheel/Run.pm
  7. +1 −1 tests/14_wheels_ft.t
  8. +186 −0 tests/22_wheel_run.t
View
13 Changes
@@ -30,6 +30,19 @@ Version numbers have four fields: X.YYZZAA
Very minor optimization to Gtk read/write resuming. Replace Gtk's
noisy warning about resuming unpaused handles with quiet acceptance.
+Added POE::Pipe::Unidirectional, a portable way to create one-way
+pipes. It tries pipe first, then socketpair, then inet sockets before
+failing.
+
+Added POE::Pipe::Bidirectional, which does pretty much the same as
+Unidirectional, but it's designet to make two-way pipes. It's more
+efficient with filehandles, since socketpair() and inet sockets
+already are bidirectional.
+
+Added Wheel::Run, an open2/open3 style wheel that lets sessions spawn
+off spawn off background processes. It takes care of the whole pipe,
+fork, exec deal.
+
0.1202 2000.12.26
-----------------
View
3 MANIFEST
@@ -21,6 +21,8 @@ POE/Filter/Stream.pm
POE/Kernel.pm
POE/NFA.pm
POE/Object.pm
+POE/Pipe/Bidirectional.pm
+POE/Pipe/Unidirectional.pm
POE/Preprocessor.pm
POE/Repository/Array.pm
POE/Runtime.pm
@@ -89,3 +91,4 @@ t/18_filter_line.t
t/19_filterchange.t
t/20_accept.t
t/21_gtk.t
+t/22_wheel_run.t
View
7 README
@@ -9,6 +9,13 @@ threads. It includes a high-level I/O library that hides most of the
usual client/server tediosity. It has been developed and used in
mission-critical systems since August 1998.
+---------------------
+Documentation Roadmap
+---------------------
+
+The POE manpage's SEE ALSO section lists the topics covered in each
+manpage. It's a good starting place.
+
------------------
Basic Installation
------------------
View
161 lib/POE/Pipe/Bidirectional.pm
@@ -0,0 +1,161 @@
+# $Id$
+
+# Portable bidirectional pipe creation, trying as many different
+# methods as we can.
+
+package POE::Pipe::Bidirectional;
+
+use strict;
+use Symbol qw(gensym);
+use IO::Socket;
+
+sub DEBUG () { 0 }
+sub RUNNING_IN_HELL () { $^O eq 'MSWin32' }
+
+# This flag is set true/false after the first attempt at using plain
+# INET sockets as pipes.
+my $can_run_socket = undef;
+
+sub new {
+ my $type = shift;
+ my $conduit_type = shift;
+
+ # Generate symbols to be used as filehandles for the pipe's ends.
+ my $a_read = gensym();
+ my $a_write = gensym();
+ my $b_read = gensym();
+ my $b_write = gensym();
+
+ # Try the pipe if no preferred conduit type is specified, or if the
+ # specified conduit type is 'pipe'.
+ if ( (not defined $conduit_type) or
+ ($conduit_type eq 'pipe')
+ ) {
+
+ # Try using pipe, but don't bother on systems that don't support
+ # nonblocking pipes. Even if they support pipes themselves.
+ unless (RUNNING_IN_HELL) {
+
+ # Try pipes.
+ eval {
+ pipe($a_read, $b_write) or die "pipe 1 failed: $!";
+ pipe($b_read, $a_write) or die "pipe 2 failed: $!";
+ };
+
+ # Pipe succeeded.
+ unless (length $@) {
+ DEBUG and do {
+ warn "using a pipe\n";
+ warn "ar($a_read) aw($a_write) br($b_read) bw($b_write)\n";
+ };
+
+ # Turn off buffering. POE::Kernel does this for us, but
+ # someone might want to use the pipe class elsewhere.
+ select((select($a_write), $| = 1)[0]);
+ select((select($b_write), $| = 1)[0]);
+ return($a_read, $a_write, $b_read, $b_write);
+ }
+ }
+ }
+
+ # Try UNIX-domain socketpair if no preferred conduit type is
+ # specified, or if the specified conduit type is 'socketpair'.
+ if ( (not defined $conduit_type) or
+ ($conduit_type eq 'socketpair')
+ ) {
+ eval {
+ socketpair($a_read, $b_read, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
+ or die "socketpair 1 failed: $!";
+ };
+
+ # Socketpair succeeded.
+ unless (length $@) {
+ DEBUG and do {
+ warn"using UNIX domain socketpairs\n";
+ warn "ar($a_read) aw($a_write) br($b_read) bw($b_write)\n";
+ };
+
+ # It's bidirectional, so each reader is also a writer.
+ $a_write = $a_read;
+ $b_write = $b_read;
+
+ # Turn off buffering. POE::Kernel does this for us, but someone
+ # might want to use the pipe class elsewhere.
+ select((select($a_write), $| = 1)[0]);
+ select((select($b_write), $| = 1)[0]);
+ return($a_read, $b_write);
+ }
+ }
+
+ # Try a pair of plain INET sockets if no preffered conduit type is
+ # specified, or if the specified conduit type is 'inet'.
+ if ( (not defined $conduit_type) or
+ ($conduit_type eq 'inet')
+ ) {
+
+ # Don't bother if we already know it won't work.
+ if ($can_run_socket or (not defined $can_run_socket)) {
+
+ # Try using a pair of plain INET domain sockets. Usurp SIGALRM
+ # in case it blocks. Normally POE programs don't use SIGALRM
+ # anyway. [fingers crossed here]
+ my $old_sig_alarm = $SIG{ALRM};
+ eval {
+ local $SIG{ALRM} = sub { die "deadlock" };
+ eval 'alarm(1)' unless RUNNING_IN_HELL;
+
+ my $acceptor = IO::Socket::INET->new
+ ( LocalAddr => '127.0.0.1',
+ LocalPort => 31415,
+ Listen => 5,
+ Reuse => 'yes',
+ );
+
+ $a_read = IO::Socket::INET->new
+ ( PeerAddr => '127.0.0.1',
+ PeerPort => 31415,
+ Reuse => 'yes',
+ );
+
+ $b_read = $acceptor->accept() or die "accept";
+
+ $a_write = $a_read;
+ $b_write = $b_read;
+ };
+ eval 'alarm(0)' unless RUNNING_IN_HELL;
+ $SIG{ALRM} = $old_sig_alarm;
+
+ # Sockets worked.
+ unless (length $@) {
+ DEBUG and do {
+ warn "using a plain INET socket\n";
+ warn "ar($a_read) aw($a_write) br($b_read) bw($b_write)\n";
+ };
+
+ # Try sockets more often.
+ $can_run_socket = 1;
+
+ # Turn off buffering. POE::Kernel does this for us, but someone
+ # might want to use the pipe class elsewhere.
+ select((select($a_write), $| = 1)[0]);
+ select((select($b_write), $| = 1)[0]);
+ return($a_read, $a_write, $b_read, $b_write);
+ }
+
+ # Sockets failed. Don't dry them again.
+ else {
+ $can_run_socket = 0;
+ }
+ }
+ }
+
+ # There's nothing left to try.
+ DEBUG and warn "nothing worked\n";
+ return(undef, undef, undef, undef);
+}
+
+###############################################################################
+1;
+
+__END__
+
View
152 lib/POE/Pipe/Unidirectional.pm
@@ -0,0 +1,152 @@
+# $Id$
+
+# Portable unidirectional pipe creation, trying as many different
+# methods as we can.
+
+package POE::Pipe::Unidirectional;
+
+use strict;
+use Symbol qw(gensym);
+use IO::Socket;
+
+sub DEBUG () { 0 }
+sub RUNNING_IN_HELL () { $^O eq 'MSWin32' }
+
+# This flag is set true/false after the first attempt at using plain
+# INET sockets as pipes.
+my $can_run_socket = undef;
+
+sub new {
+ my $type = shift;
+ my $conduit_type = shift;
+
+ # Generate symbols to be used as filehandles for the pipe's ends.
+ my $a_read = gensym();
+ my $b_write = gensym();
+
+ # Try the pipe if no preferred conduit type is specified, or if the
+ # specified conduit type is 'pipe'.
+ if ( (not defined $conduit_type) or
+ ($conduit_type eq 'pipe')
+ ) {
+
+ # Try using pipe, but don't bother on systems that don't support
+ # nonblocking pipes. Even if they support pipes themselves.
+ unless (RUNNING_IN_HELL) {
+
+ # Try pipe.
+ eval {
+ pipe($a_read, $b_write) or die "pipe failed: $!";
+ };
+
+ # Pipe succeeded.
+ unless (length $@) {
+ DEBUG and do {
+ warn "using a pipe\n";
+ warn "ar($a_read) bw($b_write)\n";
+ };
+
+ # Turn off buffering. POE::Kernel does this for us, but
+ # someone might want to use the pipe class elsewhere.
+ select((select($b_write), $| = 1)[0]);
+ return($a_read, $b_write);
+ }
+ }
+ }
+
+ # Try UNIX-domain socketpair if no preferred conduit type is
+ # specified, or if the specified conduit type is 'socketpair'.
+ if ( (not defined $conduit_type) or
+ ($conduit_type eq 'socketpair')
+ ) {
+ eval {
+ socketpair($a_read, $b_write, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
+ or die "socketpair failed: $!";
+ };
+
+ # Socketpair succeeded.
+ unless (length $@) {
+ DEBUG and do {
+ warn"using a UNIX domain socketpair: ar($a_read) bw($b_write)\n";
+ };
+
+ # It's unidirectional, so shut down the unused directions.
+ shutdown($a_read, 1);
+ shutdown($b_write, 0);
+
+ # Turn off buffering. POE::Kernel does this for us, but someone
+ # might want to use the pipe class elsewhere.
+ select((select($b_write), $| = 1)[0]);
+ return($a_read, $b_write);
+ }
+ }
+
+ # Try a pair of plain INET sockets if no preffered conduit type is
+ # specified, or if the specified conduit type is 'inet'.
+ if ( (not defined $conduit_type) or
+ ($conduit_type eq 'inet')
+ ) {
+
+ # Don't bother if we already know it won't work.
+ if ($can_run_socket or (not defined $can_run_socket)) {
+
+ # Try using a pair of plain INET domain sockets. Usurp SIGALRM
+ # in case it blocks. Normally POE programs don't use SIGALRM
+ # anyway. [fingers crossed here]
+ my $old_sig_alarm = $SIG{ALRM};
+ eval {
+ local $SIG{ALRM} = sub { die "deadlock" };
+ eval 'alarm(1)' unless RUNNING_IN_HELL;
+
+ my $acceptor = IO::Socket::INET->new
+ ( LocalAddr => '127.0.0.1',
+ LocalPort => 31415,
+ Listen => 5,
+ Reuse => 'yes',
+ );
+
+ $a_read = IO::Socket::INET->new
+ ( PeerAddr => '127.0.0.1',
+ PeerPort => 31415,
+ Reuse => 'yes',
+ );
+
+ $b_write = $acceptor->accept() or die "accept";
+ };
+ eval 'alarm(0)' unless RUNNING_IN_HELL;
+ $SIG{ALRM} = $old_sig_alarm;
+
+ # Sockets worked.
+ unless (length $@) {
+ DEBUG and do {
+ warn "using a plain INET socket\n";
+ warn "ar($a_read) bw($b_write)\n";
+ };
+
+ # Try sockets more often.
+ $can_run_socket = 1;
+
+ # It's unidirectional, so shut down the unused directions.
+ shutdown($a_read, 1);
+ shutdown($b_write, 0);
+
+ # Turn off buffering. POE::Kernel does this for us, but someone
+ # might want to use the pipe class elsewhere.
+ select((select($b_write), $| = 1)[0]);
+ return($a_read, $b_write);
+ }
+
+ # Sockets failed. Don't dry them again.
+ else {
+ $can_run_socket = 0;
+ }
+ }
+ }
+
+ # There's nothing left to try.
+ DEBUG and warn "nothing worked\n";
+ return(undef, undef);
+}
+
+###############################################################################
+1;
View
520 lib/POE/Wheel/Run.pm
@@ -0,0 +1,520 @@
+# $Id$
+
+# -><- error operations need to be better
+
+package POE::Wheel::Run;
+
+use strict;
+use Carp;
+use POE qw(Wheel Pipe::Unidirectional Driver::SysRW);
+
+# Offsets into $self.
+sub UNIQUE_ID () { 0 }
+sub DRIVER () { 1 }
+sub ERROR_EVENT () { 2 }
+sub PROGRAM () { 3 }
+sub CHILD_PID () { 4 }
+
+sub HANDLE_STDIN () { 5 }
+sub FILTER_STDIN () { 6 }
+sub EVENT_STDIN () { 7 }
+sub STATE_STDIN () { 8 }
+sub OCTETS_STDIN () { 9 }
+
+sub HANDLE_STDOUT () { 10 }
+sub FILTER_STDOUT () { 11 }
+sub EVENT_STDOUT () { 12 }
+sub STATE_STDOUT () { 13 }
+
+sub HANDLE_STDERR () { 14 }
+sub FILTER_STDERR () { 15 }
+sub EVENT_STDERR () { 16 }
+sub STATE_STDERR () { 17 }
+
+# Used to work around a bug in older perl versions.
+sub CRIMSON_SCOPE_HACK ($) { 0 }
+
+#------------------------------------------------------------------------------
+
+sub new {
+ my $type = shift;
+ my %params = @_;
+
+ croak "wheels no longer require a kernel reference as their first parameter"
+ if @_ and ref($_[0]) eq 'POE::Kernel';
+
+ croak "$type requires a working Kernel" unless defined $poe_kernel;
+
+ my $program = delete $params{Program};
+ croak "$type needs a Program parameter" unless defined $program;
+
+ my $priority_delta = delete $params{Priority};
+ $priority_delta = 0 unless defined $priority_delta;
+
+ my $user_id = delete $params{User};
+ my $group_id = delete $params{Group};
+
+ my $stdin_event = delete $params{StdinEvent};
+ my $stdout_event = delete $params{StdoutEvent};
+ my $stderr_event = delete $params{StderrEvent};
+
+ croak "$type needs at least one of StdinEvent, StdoutEvent or StderrEvent"
+ unless( defined($stdin_event) or defined($stdout_event) or
+ defined($stderr_event)
+ );
+
+ my $all_filter = delete $params{Filter};
+ my $stdin_filter = delete $params{StdinFilter};
+ my $stdout_filter = delete $params{StdoutFilter};
+ my $stderr_filter = delete $params{StderrFilter};
+
+ $stdin_filter = $all_filter unless defined $stdin_filter;
+ $stdout_filter = $all_filter unless defined $stdout_filter;
+ $stderr_filter = $all_filter unless defined $stderr_filter;
+
+ croak "$type needs either Filter or StdinFilter"
+ if defined($stdin_event) and not defined($stdin_filter);
+ croak "$type needs either Filter or StdoutFilter"
+ if defined($stdout_event) and not defined($stdout_filter);
+ croak "$type needs either Filter or StderrFilter"
+ if defined($stderr_event) and not defined($stderr_filter);
+
+ my $error_event = delete $params{ErrorEvent};
+
+ # Make sure the user didn't pass in parameters we're not aware of.
+ if (scalar keys %params) {
+ carp( "unknown parameters in $type constructor call: ",
+ join(', ', sort keys %params)
+ );
+ }
+
+ # Make the pipes. We make more pipes than strictly necessary in
+ # case someone wants to turn some onn later.
+ my ($stdin_read, $stdin_write) = POE::Pipe::Unidirectional->new();
+ croak "could not make stdin pipes: $!"
+ unless defined $stdin_read and defined $stdin_write;
+
+ my ($stdout_read, $stdout_write) = POE::Pipe::Unidirectional->new();
+ croak "could not make stdout pipes: $!"
+ unless defined $stdout_read and defined $stdout_write;
+
+ my ($stderr_read, $stderr_write) = POE::Pipe::Unidirectional->new();
+ croak "could not make stderr pipes: $!"
+ unless defined $stderr_read and defined $stderr_write;
+
+ # Fork! Woo-hoo!
+ my $pid = fork;
+
+ # Child. Parent side continues after this block.
+ unless ($pid) {
+ croak "couldn't fork: $!" unless defined $pid;
+
+ # Redirect STDIN from the read end of the stdin pipe.
+ open( STDIN, "<&=" . fileno($stdin_read) )
+ or die "can't redirect STDIN in child pid $$: $!";
+
+ # Redirect STDOUT to the write end of the stdout pipe.
+ open( STDOUT, ">&=" . fileno($stdout_write) )
+ or die "can't redirect stdout in child pid $$: $!";
+
+ # Redirect STDERR to the write end of the stderr pipe.
+ open( STDERR, ">&=" . fileno($stderr_write) )
+ or die "can't redirect stderr in child: $!";
+
+ # Fix the priority delta. -><- Hardcoded constants mean this
+ # process, at least here. [crosses fingers] -><- Also must add
+ # failure events for this. -><- Also must wrap it in eval for
+ # systems where it's not supported. -><- Warn if new priority is
+ # <0 and not superuser.
+ my $priority = getpriority(0, $$);
+ if (defined $priority) {
+ setpriority(0, $$, $priority + $priority_delta);
+ }
+
+ # Fix the user ID. -><- Add getpwnam so user IDs can be specified
+ # by name. -><- Warn if not superuser to begin with.
+ if (defined $user_id) {
+ $< = $> = $user_id;
+ }
+
+ # Fix the group ID. -><- Add getgrnam so group IDs can be
+ # specified by name. -><- Warn if not superuser to begin with.
+ if (defined $group_id) {
+ $( = $) = $group_id;
+ }
+
+ # Exec the program depending on its form.
+ if (ref($program) eq 'ARRAY') {
+ exec(@$program) or die "can't exec (@$program) in child pid $$: $!";
+ }
+ else {
+ exec($program) or die "can't exec ($program) in child pid $$: $!";
+ }
+ }
+
+ # Parent here.
+
+ my $self = bless
+ [ &POE::Wheel::allocate_wheel_id(), # UNIQUE_ID
+ POE::Driver::SysRW->new(), # DRIVER
+ $error_event, # ERROR_EVENT
+ $program, # PROGRAM
+ $pid, # CHILD_PID
+ # STDIN
+ $stdin_write, # HANDLE_STDIN
+ $stdin_filter, # FILTER_STDIN
+ $stdin_event, # EVENT_STDIN
+ undef, # STATE_STDIN
+ 0, # OCTETS_STDIN
+ # STDOUT
+ $stdout_read, # HANDLE_STDOUT
+ $stdout_filter, # FILTER_STDOUT
+ $stdout_event, # EVENT_STDOUT
+ undef, # STATE_STDOUT
+ # STDERR
+ $stderr_read, # HANDLE_STDERR
+ $stderr_filter, # FILTER_STDERR
+ $stderr_event, # EVENT_STDERR
+ undef, # STATE_STDERR
+ ], $type;
+
+ $self->_define_stdin_flusher() if defined $stdin_event;
+ $self->_define_stdout_reader() if defined $stdout_event;
+ $self->_define_stderr_reader() if defined $stderr_event;
+
+ return $self;
+}
+
+#------------------------------------------------------------------------------
+# Define the internal state that will flush output to the child
+# process' STDIN pipe.
+
+sub _define_stdin_flusher {
+ my $self = shift;
+
+ # Read-only members. If any of these change, then the write state
+ # is invalidated and needs to be redefined.
+ my $unique_id = $self->[UNIQUE_ID];
+ my $driver = $self->[DRIVER];
+ my $error_event = \$self->[ERROR_EVENT];
+ my $stdin_filter = $self->[FILTER_STDIN];
+ my $stdin_event = \$self->[EVENT_STDIN];
+
+ # Read/write members. These are done by reference, to avoid pushing
+ # $self into the anonymous sub. Extra copies of $self are bad and
+ # can prevent wheels from destructing properly.
+ my $stdin_octets = \$self->[OCTETS_STDIN];
+
+ # Register the select-write handler.
+ $poe_kernel->state
+ ( $self->[STATE_STDIN] = $self . ' select stdin',
+ sub { # prevents SEGV
+ 0 && CRIMSON_SCOPE_HACK('<');
+ # subroutine starts here
+ my ($k, $me, $handle) = @_[KERNEL, SESSION, ARG0];
+
+ $$stdin_octets = $driver->flush($handle);
+
+ # When you can't write, nothing else matters.
+ if ($!) {
+ $$error_event && $k->call( $me, $$error_event,
+ 'write', ($!+0), $!, $unique_id
+ );
+ $k->select_write($handle);
+ }
+
+ # Could write, or perhaps couldn't but only because the
+ # filehandle's buffer is choked.
+ else {
+
+ # All chunks written; fire off a "flushed" event.
+ unless ($$stdin_octets) {
+ $k->select_pause_write($handle);
+ $$stdin_event && $k->call($me, $$stdin_event, $unique_id);
+ }
+ }
+ }
+ );
+
+ $poe_kernel->select_write($self->[HANDLE_STDIN], $self->[STATE_STDIN]);
+
+ # Pause the write select immediately, unless output is pending.
+ $poe_kernel->select_pause_write($self->[HANDLE_STDIN])
+ unless ($self->[OCTETS_STDIN]);
+}
+
+#------------------------------------------------------------------------------
+# Define the internal state that will read input from the child
+# process' STDOUT pipe. This is virtually identical to
+# _define_stderr_reader, but they aren't implemented as a common
+# function for speed reasons.
+
+sub _define_stdout_reader {
+ my $self = shift;
+
+ # Register the select-read handler for STDOUT.
+ if (defined $self->[EVENT_STDOUT]) {
+
+ # If any of these change, then the read state is invalidated and
+ # needs to be redefined.
+ my $unique_id = $self->[UNIQUE_ID];
+ my $driver = $self->[DRIVER];
+ my $error_event = \$self->[ERROR_EVENT];
+ my $stdout_filter = $self->[FILTER_STDOUT];
+ my $stdout_event = \$self->[EVENT_STDOUT];
+
+ $poe_kernel->state
+ ( $self->[STATE_STDOUT] = $self . ' select stdout',
+ sub {
+ # prevents SEGV
+ 0 && CRIMSON_SCOPE_HACK('<');
+
+ # subroutine starts here
+ my ($k, $me, $handle) = @_[KERNEL, SESSION, ARG0];
+ if (defined(my $raw_input = $driver->get($handle))) {
+ foreach my $cooked_input (@{$stdout_filter->get($raw_input)}) {
+ $k->call($me, $$stdout_event, $cooked_input, $unique_id);
+ }
+ }
+ else {
+ $$error_event and
+ $k->call( $me, $$error_event, 'read', ($!+0), $!, $unique_id );
+ $k->select_read($handle);
+ }
+ }
+ );
+
+ # register the state's select
+ $poe_kernel->select_read($self->[HANDLE_STDOUT], $self->[STATE_STDOUT]);
+ }
+
+ # Register the select-read handler for STDOUT.
+ else {
+ $poe_kernel->select_read($self->[HANDLE_STDOUT])
+ }
+}
+
+#------------------------------------------------------------------------------
+# Define the internal state that will read input from the child
+# process' STDERR pipe.
+
+sub _define_stderr_reader {
+ my $self = shift;
+
+ # Register the select-read handler for STDERR.
+ if (defined $self->[EVENT_STDERR]) {
+
+ # If any of these change, then the read state is invalidated and
+ # needs to be redefined.
+ my $unique_id = $self->[UNIQUE_ID];
+ my $driver = $self->[DRIVER];
+ my $error_event = \$self->[ERROR_EVENT];
+ my $stderr_filter = $self->[FILTER_STDERR];
+ my $stderr_event = \$self->[EVENT_STDERR];
+
+ $poe_kernel->state
+ ( $self->[STATE_STDERR] = $self . ' select stderr',
+ sub {
+ # prevents SEGV
+ 0 && CRIMSON_SCOPE_HACK('<');
+
+ # subroutine starts here
+ my ($k, $me, $handle) = @_[KERNEL, SESSION, ARG0];
+ if (defined(my $raw_input = $driver->get($handle))) {
+ foreach my $cooked_input (@{$stderr_filter->get($raw_input)}) {
+ $k->call($me, $$stderr_event, $cooked_input, $unique_id);
+ }
+ }
+ else {
+ $$error_event and
+ $k->call( $me, $$error_event, 'read', ($!+0), $!, $unique_id );
+ $k->select_read($handle);
+ }
+ }
+ );
+
+ # register the state's select
+ $poe_kernel->select_read($self->[HANDLE_STDERR], $self->[STATE_STDERR]);
+ }
+
+ # Register the select-read handler for STDERR.
+ else {
+ $poe_kernel->select_read($self->[HANDLE_STDERR])
+ }
+}
+
+#------------------------------------------------------------------------------
+# Redefine events.
+
+sub event {
+ my $self = shift;
+ push(@_, undef) if (scalar(@_) & 1);
+
+ my ($redefine_stdin, $redefine_stdout, $redefine_stderr) = (0, 0, 0);
+
+ while (@_) {
+ my ($name, $event) = splice(@_, 0, 2);
+
+ if ($name eq 'StdinEvent') {
+ $self->[EVENT_STDIN] = $event;
+ $redefine_stdin = 1;
+ }
+ elsif ($name eq 'StdoutEvent') {
+ $self->[EVENT_STDOUT] = $event;
+ $redefine_stdout = 1;
+ }
+ elsif ($name eq 'StderrEvent') {
+ $self->[EVENT_STDERR] = $event;
+ $redefine_stderr = 1;
+ }
+ elsif ($name eq 'ErrorEvent') {
+ $self->[ERROR_EVENT] = $event;
+ $redefine_stdin = $redefine_stdout = $redefine_stderr = 1;
+ }
+ else {
+ carp "ignoring unknown ReadWrite parameter '$name'";
+ }
+ }
+
+ $self->_define_stdin_flusher() if defined $redefine_stdin;
+ $self->_define_stdout_reader() if defined $redefine_stdout;
+ $self->_define_stderr_reader() if defined $redefine_stderr;
+}
+
+#------------------------------------------------------------------------------
+# Destroy the wheel.
+
+sub DESTROY {
+ my $self = shift;
+
+ # Turn off the STDIN thing.
+ $poe_kernel->select($self->[HANDLE_STDIN]);
+ if ($self->[STATE_STDIN]) {
+ $poe_kernel->state($self->[STATE_STDIN]);
+ $self->[STATE_STDIN] = undef;
+ }
+
+ $poe_kernel->select($self->[HANDLE_STDOUT]);
+ if ($self->[STATE_STDOUT]) {
+ $poe_kernel->state($self->[STATE_STDOUT]);
+ $self->[STATE_STDOUT] = undef;
+ }
+
+ $poe_kernel->select($self->[HANDLE_STDERR]);
+ if ($self->[STATE_STDERR]) {
+ $poe_kernel->state($self->[STATE_STDERR]);
+ $self->[STATE_STDERR] = undef;
+ }
+
+ &POE::Wheel::free_wheel_id($self->[UNIQUE_ID]);
+}
+
+#------------------------------------------------------------------------------
+# Queue input for the child process.
+
+sub put {
+ my ($self, @chunks) = @_;
+ if ( $self->[OCTETS_STDIN] =
+ $self->[DRIVER]->put($self->[FILTER_STDIN]->put(\@chunks))
+ ) {
+ $poe_kernel->select_resume_write($self->[HANDLE_STDIN]);
+ }
+
+ # No watermark.
+ return 0;
+}
+
+#------------------------------------------------------------------------------
+# Redefine filters, one at a time or at once. This is based on PG's
+# code in Wheel::ReadWrite.
+
+sub set_filter {
+}
+
+sub set_stdin_filter {
+}
+
+sub set_stdout_filter {
+}
+
+sub set_stderr_filter {
+}
+
+#------------------------------------------------------------------------------
+# Data accessors.
+
+sub get_driver_out_octets {
+ $_[0]->[OCTETS_STDIN];
+}
+
+sub get_driver_out_messages {
+ $_[0]->[DRIVER]->get_out_messages_buffered();
+}
+
+sub ID {
+ $_[0]->[UNIQUE_ID];
+}
+
+sub PID {
+ $_[0]->[CHILD_PID];
+}
+
+###############################################################################
+1;
+
+__END__
+
+=head1 NAME
+
+POE::Wheel::Run - event driven fork/exec with added value
+
+=head1 SYNOPSIS
+
+ $wheel = POE::Wheel::Run->new(
+
+ # -><- code
+ );
+
+ # -><- code
+
+=head1 DESCRIPTION
+
+Wheel::Run spawns child processes and establishes non-blocking, event
+based communication with them.
+
+=head1 PUBLIC METHODS
+
+=over 2
+
+=item new LOTS_OF_STUFF
+
+-><- code etc
+
+=back
+
+=head1 EVENTS AND PARAMETERS
+
+=over 2
+
+=item StdinEvent
+
+-><- code etc
+
+=back
+
+=head1 SEE ALSO
+
+POE::Wheel.
+
+The SEE ALSO section in L<POE> contains a table of contents covering
+the entire POE distribution.
+
+=head1 BUGS
+
+None currently known.
+
+=head1 AUTHORS & COPYRIGHTS
+
+Please see L<POE> for more information about authors and contributors.
+
+=cut
View
2 tests/14_wheels_ft.t
@@ -1,4 +1,4 @@
-#!/usr/bin/perl /w
+#!/usr/bin/perl -w
# $Id$
# Exercises Wheel::FollowTail, Wheel::ReadWrite, and Filter::Block.
View
186 tests/22_wheel_run.t
@@ -0,0 +1,186 @@
+#!/usr/bin/perl -w
+# $Id$
+
+# Test the portable pipe classes and Wheel::Run, which uses them.
+
+use strict;
+use lib qw(./lib ../lib);
+use Socket;
+
+use TestSetup;
+&test_setup(18);
+
+# Turn on all asserts, and use POE and other modules.
+sub POE::Kernel::ASSERT_DEFAULT () { 1 }
+use POE qw( Wheel::Run Filter::Line Pipe::Bidirectional Pipe::Unidirectional );
+
+### Test unidirectional pipe() pipe.
+{ my ($uni_read, $uni_write) = POE::Pipe::Unidirectional->new('pipe');
+
+ if (defined $uni_read and defined $uni_write) {
+ &ok(1);
+
+ print $uni_write "whee pipe\n";
+ my $uni_input = <$uni_read>; chomp $uni_input;
+ &ok_if( 2, $uni_input eq 'whee pipe' );
+ }
+ else {
+ &many_not_ok(1, 2, "Skip: pipe not supported");
+ }
+}
+
+### Test unidirectional socketpair() pipe.
+{ my ($uni_read, $uni_write) = POE::Pipe::Unidirectional->new('socketpair');
+
+ if (defined $uni_read and defined $uni_write) {
+ &ok(3);
+
+ print $uni_write "whee socketpair\n";
+ my $uni_input = <$uni_read>; chomp $uni_input;
+ &ok_if( 4, $uni_input eq 'whee socketpair' );
+ }
+ else {
+ &many_not_ok(3, 4, "Skip: socketpair not supported");
+ }
+}
+
+### Test unidirectional pair of inet sockets.
+{ my ($uni_read, $uni_write) = POE::Pipe::Unidirectional->new('inet');
+
+ if (defined $uni_read and defined $uni_write) {
+ &ok(5);
+
+ print $uni_write "whee inet\n";
+ my $uni_input = <$uni_read>; chomp $uni_input;
+ &ok_if( 6, $uni_input eq 'whee inet' );
+ }
+ else {
+ &many_not_ok(5, 6, "Skip: inet sockets not supported");
+ }
+}
+
+### Test bidirectional pipe.
+{ my ($a_rd, $a_wr, $b_rd, $b_wr) =
+ POE::Pipe::Bidirectional->new('pipe');
+
+ if (defined $a_rd and defined $a_wr and defined $b_rd and defined $b_wr) {
+ &ok(7);
+
+ print $a_wr "a wr inet\n";
+ my $b_input = <$b_rd>; chomp $b_input;
+ &ok_if(8, $b_input eq 'a wr inet');
+
+ print $b_wr "b wr inet\n";
+ my $a_input = <$a_rd>; chomp $a_input;
+ &ok_if(9, $a_input eq 'b wr inet');
+ }
+ else {
+ &many_not_ok(7, 9, "Skip: pipe not supported");
+ }
+}
+
+### Test bidirectional socketpair.
+{ my ($a_rd, $a_wr, $b_rd, $b_wr) =
+ POE::Pipe::Bidirectional->new('socketpair');
+
+ if (defined $a_rd and defined $a_wr and defined $b_rd and defined $b_wr) {
+ &ok(10);
+
+ print $a_wr "a wr inet\n";
+ my $b_input = <$b_rd>; chomp $b_input;
+ &ok_if(11, $b_input eq 'a wr inet');
+
+ print $b_wr "b wr inet\n";
+ my $a_input = <$a_rd>; chomp $a_input;
+ &ok_if(12, $a_input eq 'b wr inet');
+ }
+ else {
+ &many_not_ok(10, 12, "Skip: socketpair not supported");
+ }
+}
+
+### Test bidirectional inet sockets.
+{ my ($a_rd, $a_wr, $b_rd, $b_wr) =
+ POE::Pipe::Bidirectional->new('inet');
+
+ if (defined $a_rd and defined $a_wr and defined $b_rd and defined $b_wr) {
+ &ok(13);
+
+ print $a_wr "a wr inet\n";
+ my $b_input = <$b_rd>; chomp $b_input;
+ &ok_if(14, $b_input eq 'a wr inet');
+
+ print $b_wr "b wr inet\n";
+ my $a_input = <$a_rd>; chomp $a_input;
+ &ok_if(15, $a_input eq 'b wr inet');
+ }
+ else {
+ &many_not_ok(13, 15, "Skip: inet sockets not supported");
+ }
+}
+
+### Test Wheel::Run. Uses "!" as a newline to avoid having to deal
+### with whatever the system uses.
+
+my $program =
+ ( '/usr/bin/perl -we \'' .
+ '$/ = q(!); select STDERR; $| = 1; select STDOUT; $| = 1; ' .
+ 'while (<STDIN>) { ' .
+ 'last if /^bye/; ' .
+ ' print(STDOUT qq(out: $_)) if s/^out //; ' .
+ ' print(STDERR qq(err: $_)) if s/^err //; ' .
+ '} ' .
+ 'exit 0;\''
+ );
+
+my $flush_count = 0;
+
+POE::Session->create
+ ( inline_states =>
+ { _start => sub {
+ my ($kernel, $heap) = @_[KERNEL, HEAP];
+
+ # Run a child process.
+ $heap->{wheel} = POE::Wheel::Run->new
+ ( Program => $program,
+ Filter => POE::Filter::Line->new( Literal => "!" ),
+ StdoutEvent => 'stdout',
+ StderrEvent => 'stderr',
+ ErrorEvent => 'error',
+ StdinEvent => 'stdin',
+ );
+
+ # Ask the child for something on stdout.
+ $heap->{wheel}->put( 'out test-out' );
+ },
+
+ # Catch SIGCHLD. Stop the wheel if the exited child is ours.
+ _signal => sub {
+ my $signame = $_[ARG0];
+ if ($signame eq 'CHLD') {
+ my ($heap, $child_pid) = @_[HEAP, ARG1];
+ delete $heap->{wheel} if $child_pid == $heap->{wheel}->PID();
+ }
+ return 0;
+ },
+
+ # Count every line that's flushed to the child.
+ stdin => sub { $flush_count++; },
+
+ # Got a stdout response. Ask for something on stderr.
+ stdout => sub { &ok_if(17, $_[ARG0] eq 'out: test-out');
+ $_[HEAP]->{wheel}->put( 'err test-err' );
+ },
+
+ # Got a sterr response. Tell the child to exit.
+ stderr => sub { &ok_if(18, $_[ARG0] eq 'err: test-err');
+ $_[HEAP]->{wheel}->put( 'bye' );
+ },
+ },
+ );
+
+$poe_kernel->run();
+
+&ok_if( 16, $flush_count == 3 );
+
+&results();

0 comments on commit c377104

Please sign in to comment.