Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Deliver PID exit noficiations only to interested objects.

Agorman in irc.perl.org #reflex discovered that multiple PID watchers
receive notification of all exiting child processes.  This commit
limits notification to just those PIDs that are watched.
  • Loading branch information...
commit 344d969e7647059de8a988d18b1eb01d4bc7b885 1 parent fc6cbf2
@rcaputo authored
View
20 eg/eg-41-signal-twice.pl
@@ -0,0 +1,20 @@
+# Watch signals a few different ways.
+
+use warnings;
+use strict;
+use lib qw(../lib);
+
+use Reflex::Signal;
+use ExampleHelpers qw(eg_say);
+
+eg_say("Process $$ is waiting for SIGUSR1 twice.");
+
+my $usr1_a = Reflex::Signal->new(
+ signal => "USR1",
+ on_signal => sub { eg_say("Got SIGUSR1 callback.") },
+);
+
+my $usr1_b = Reflex::Signal->new( signal => "USR1" );
+while ($usr1_b->next()) {
+ eg_say("Got SIGUSR1 from promise.");
+}
View
48 lib/Reflex/PID.pm
@@ -1,33 +1,29 @@
package Reflex::PID;
use Moose;
-extends qw(Reflex::Signal);
+extends 'Reflex::Base';
-has '+signal' => (
- required => 0,
- default => 'CHLD',
-);
-
-has 'pid' => (
- isa => 'Int',
+has pid => (
is => 'ro',
+ isa => 'Int',
required => 1,
- default => sub { die "required" },
);
-__PACKAGE__->_register_signal_params(qw(pid exit));
-
-sub resume {
- my $self = shift;
- return unless $self->call_gate("resume");
- $POE::Kernel::poe_kernel->sig_child($self->pid(), "signal_happened");
-}
+has active => (
+ is => 'ro',
+ isa => 'Bool',
+ default => 1,
+);
-sub pause {
- my $self = shift;
- return unless $self->call_gate("pause");
- $POE::Kernel::poe_kernel->sig_child($self->pid(), undef);
-}
+with 'Reflex::Role::PidCatcher' => {
+ pid => 'pid',
+ active => 'active',
+ cb_exit => 'on_exit',
+ method_start => 'start',
+ method_stop => 'stop',
+ method_pause => 'pause',
+ method_resume => 'resume',
+};
1;
@@ -44,11 +40,11 @@ Reflex::PID - Observe the exit of a subprocess by its SIGCHLD signal.
use Reflex::PID;
- has sigchild_watcher => (
+ has pid_watcher => (
isa => 'Reflex::PID|Undef',
is => 'rw',
traits => ['Reflex::Trait::Observed'],
- role => 'sigchld',
+ role => 'process',
);
sub some_method {
@@ -64,7 +60,7 @@ Reflex::PID - Observe the exit of a subprocess by its SIGCHLD signal.
);
}
- sub on_sigchld_signal {
+ sub on_process_exit {
# Handle the event.
}
@@ -87,9 +83,9 @@ Reflex::Collection won't know when to destroy them.
=head2 Public Events
-=head3 signal
+=head3 exit
-Reflex::PID's "signal" event includes two named parameters. "pid"
+Reflex::PID's "exit" event includes two named parameters. "pid"
contains the process ID that exited. "exit" contains the process'
exit value---a copy of C<$?> at the time the process exited. Please
see L<perlvar/"$?"> for more information about that special Perl
View
2  lib/Reflex/POE/Wheel/Run.pm
@@ -99,7 +99,7 @@ sub BUILD {
}
# Rethrow our signal event.
-sub on_sigchld_signal {
+sub on_sigchld_pid {
my ($self, $args) = @_;
$self->emit(
event => 'signal',
View
284 lib/Reflex/Role/PidCatcher.pm
@@ -0,0 +1,284 @@
+package Reflex::Role::PidCatcher;
+use Reflex::Role;
+
+use Scalar::Util qw(weaken);
+
+attribute_parameter pid => "pid";
+
+parameter active => (
+ isa => 'Str',
+ default => 'active',
+);
+
+callback_parameter cb_exit => qw( on pid exit );
+method_parameter method_start => qw( start pid _ );
+method_parameter method_stop => qw( stop pid _ );
+method_parameter method_pause => qw( pause pid _ );
+method_parameter method_resume => qw( resume pid _ );
+
+# A session may only watch a distinct pid once.
+# So we must map each distinct pid to all the interested objects.
+# This is class scoped data.
+#
+# TODO - We could put this closer to the POE::Session and obviate the
+# need for the deliver() redirector.
+
+my %callbacks;
+
+sub deliver {
+ my ($class, $signal_name, $pid, $exit, @etc) = @_;
+
+ # If nobody's watching us, then why did we do it in the road?
+ # TODO - Diagnostic warning/error?
+ return unless exists $callbacks{$pid};
+
+ # Calculate the event arguments based on the signal name.
+ my %event_args = (
+ signal => $signal_name,
+ pid => $pid,
+ exit => $exit,
+ );
+
+ # Deliver the signal.
+ # TODO - map() magic to speed this up?
+
+ foreach my $callback_recs (values %{$callbacks{$pid}}) {
+ foreach my $callback_rec (values %$callback_recs) {
+ my ($object, $method) = @$callback_rec;
+ $object->$method(\%event_args);
+ }
+ }
+}
+
+# The role itself.
+
+role {
+ my $p = shift;
+
+ my $pid = $p->pid();
+ my $active = $p->active();
+ my $cb_exit = $p->cb_exit();
+
+ my $method_start = $p->method_start();
+ my $method_stop = $p->method_stop();
+ my $method_pause = $p->method_pause();
+ my $method_resume = $p->method_resume();
+
+ # Work around a Moose edge case.
+ sub BUILD {}
+
+ after BUILD => sub {
+ return unless $active;
+ shift()->$method_start();
+ return;
+ };
+
+ # Work around a Moose edge case.
+ sub DEMOLISH {}
+
+ after DEMOLISH => sub {
+ shift()->$method_stop();
+ };
+
+ method $method_start => sub {
+ my $self = shift;
+
+ my $pid_value = $self->$pid();
+
+ # Register this object with that PID.
+ $callbacks{$pid_value}->{$self->session_id()}->{$self} = [
+ $self, $cb_exit
+ ];
+ weaken $callbacks{$pid_value}->{$self->session_id()}->{$self}->[0];
+
+ # First time this object is watching that PID? Start the
+ # watcher. Otherwise, a watcher should already be going.
+
+ return if (
+ (scalar keys %{$callbacks{$pid_value}->{$self->session_id()}}) > 1
+ );
+
+ $self->$method_resume();
+ };
+
+ method $method_pause => sub {
+ my $self = shift;
+
+ # Be in the session associated with this object.
+ return unless $self->call_gate($method_pause);
+
+ $POE::Kernel::poe_kernel->sig_child($self->$pid(), undef);
+ };
+
+ method $method_resume => sub {
+ my $self = shift;
+
+ # Be in the session associated with this object.
+ return unless $self->call_gate($method_resume);
+
+ $POE::Kernel::poe_kernel->sig_child(
+ $self->$pid(), "signal_happened", ref($self)
+ );
+ };
+
+ method $method_stop => sub {
+ my $self = shift;
+
+ my $pid_value = $self->$pid();
+
+ # Nothing to do?
+ return unless exists $callbacks{$pid_value}->{$self->session_id()};
+
+ # Unregister this object with that signal.
+ my $sw = $callbacks{$pid_value}->{$self->session_id()};
+ return unless delete $sw->{$self};
+
+ # Deactivate the signal watcher if this was the last object.
+ unless (scalar keys %$sw) {
+ delete $callbacks{$pid_value}->{$self->session_id()};
+ delete $callbacks{$pid_value} unless (
+ scalar keys %{$callbacks{$pid_value}}
+ );
+ $self->$method_pause();
+ }
+ };
+
+ method_emit $cb_exit => "pid";
+};
+
+__END__
+
+=head1 NAME
+
+Reflex::Role::PidCatcher - add async process reaping behavior to a class
+
+=head1 SYNOPSIS
+
+ package Reflex::PID;
+
+ use Moose;
+ extends 'Reflex::Base';
+
+ has pid => (
+ is => 'ro',
+ isa => 'Int',
+ required => 1,
+ );
+
+ has active => (
+ is => 'ro',
+ isa => 'Bool',
+ default => 1,
+ );
+
+ with 'Reflex::Role::PidCatcher' => {
+ pid => 'pid',
+ active => 'active',
+ cb_exit => 'on_exit',
+ method_start => 'start',
+ method_stop => 'stop',
+ method_pause => 'pause',
+ method_resume => 'resume',
+ };
+
+ 1;
+
+=head1 DESCRIPTION
+
+Reflex::Role::PidCatcher is a Moose parameterized role that adds
+asynchronous child process reaping behavior to Reflex based classes.
+The SYNOPSIS is the entire implementation of Reflex::PID, a simple
+class that allows Reflex::Role::PidCatcher to be used as an object.
+
+=head2 Required Role Parameters
+
+None. All role parameters as of this writing have what we hope are
+sensible defaults. Please let us know if they don't seem all that
+sensible.
+
+=head2 Optional Role Parameters
+
+=head3 pid
+
+C<pid> sets the name of an attribute that will contain the process ID
+to wait for. Process IDs must be integers.
+
+=head3 active
+
+C<active> specifies whether Reflex::Role::PidCatcher should be created
+in the active, process-watching state. All Reflex watchers are
+enabled by default. Set it to a false value, preferably 0, to
+initialize the catcher in an inactive or paused mode.
+
+Process watchers may currently be paused and resumed, but this
+functionality may be dropped later. It's not good to leave child
+processes hanging. See C<method_pause> and C<method_resume> for ways
+to override the default method names.
+
+=head3 cb_exit
+
+C<cb_exit> names the $self method that will be called when the child
+process identified in C<<$self->$pid()>> exits. It defaults to
+"on_%s_exit", where %s is the name of the PID attribute. For example,
+it will be "on_transcoder_exit" if the process ID is stored in a
+"transcoder" attribute.
+
+=head3 method_start
+
+C<method_start> sets the name of the method that may be used to start
+watching for a process exit. It's "start_%s" by default, where %s is
+the name of the process ID's attribute.
+
+Reflex::Role::PidCatcher will automatically start watching for its
+process ID if the value of C<active>'s attribute is true.
+
+=head3 method_stop
+
+C<method_stop> may be used to permanently stop a process ID watcher.
+Stopped watchers cannot be restarted, so use C<method_pause> if you
+need to temporarily disable them instead. C<method_resume> may be
+used to resume them again.
+
+Process ID catchers will automatically stop watching for process exit
+upon DEMOLISH.
+
+=head3 method_pause
+
+C<method_pause> sets the name of the method that may be used to pause
+process catching. It is "pause_%s" by default, where %s is the name
+of the PID attribute.
+
+=head3 method_resume
+
+C<method_resume> sets the name of the method that may be used to
+resume process reaping. It is "resume_%s" by default, where %s is the
+name of the attribute holding the process ID.
+
+=head1 EXAMPLES
+
+Reflex::Role::PidCatcher was initially written to support Reflex::PID,
+so there aren't many examples of the role's use by itself.
+
+L<Reflex::POE::Wheel::Run> actualy uses Reflex::PID.
+
+eg/eg-07-wheel-run.pl uses Reflex::POE::Wheel::Run.
+
+=head1 SEE ALSO
+
+L<Reflex>
+L<Reflex::Role::SigCatcher>
+L<Reflex::Signal>
+L<Reflex::Role::PidCatcher>
+L<Reflex::PID>
+
+L<Reflex/ACKNOWLEDGEMENTS>
+L<Reflex/ASSISTANCE>
+L<Reflex/AUTHORS>
+L<Reflex/BUGS>
+L<Reflex/BUGS>
+L<Reflex/CONTRIBUTORS>
+L<Reflex/COPYRIGHT>
+L<Reflex/LICENSE>
+L<Reflex/TODO>
+
+=cut
View
3  lib/Reflex/Role/Reactive.pm
@@ -67,7 +67,8 @@ my $singleton_session_id = POE::Session->create(
### Signals.
signal_happened => sub {
- Reflex::Role::SigCatcher->deliver(@_[ARG0..$#_]);
+ my $signal_class = pop @_;
+ $signal_class->deliver(@_[ARG0..$#_]);
},
### Cross-session emit() is converted into these events.
View
14 lib/Reflex/Role/SigCatcher.pm
@@ -28,7 +28,9 @@ my %signal_param_names;
sub _register_signal_params {
my ($class, @names) = @_;
- $signal_param_names{$class->meta->get_attribute("signal")->default()} = \@names;
+ $signal_param_names{$class->meta->get_attribute("signal")->default()} = (
+ \@names
+ );
}
sub deliver {
@@ -125,7 +127,9 @@ role {
# Be in the session associated with this object.
return unless $self->call_gate($method_resume);
- $POE::Kernel::poe_kernel->sig($self->$signal(), "signal_happened");
+ $POE::Kernel::poe_kernel->sig(
+ $self->$signal(), "signal_happened", ref($self)
+ );
};
method $method_stop => sub {
@@ -196,7 +200,7 @@ entire implementation of Reflex::SigCatcher, a simple class that
allows Reflex::Role::SigCatcher to be used as an object.
Reflex::Role::SigCatcher is not suitable for SIGCHLD use. The
-specialized Reflex::Role::PidReaper class is used for that, and it
+specialized Reflex::Role::PidCatcher class is used for that, and it
will automatically wait() for processes and return their exit
statuses.
@@ -280,8 +284,8 @@ Reflex::Role::SigCatcher.
L<Reflex>
L<Reflex::Signal>
-L<Reflex::Role::PidReaper>
-L<Reflex::PidReaper>
+L<Reflex::Role::PidCatcher>
+L<Reflex::PID>
L<Reflex/ACKNOWLEDGEMENTS>
L<Reflex/ASSISTANCE>
View
8 lib/Reflex/Signal.pm
@@ -68,8 +68,8 @@ That role's documentation contains important details that won't be
covered here.
Reflex::Signal is not suitable for SIGCHLD use. The specialized
-Reflex::PidReaper class is used for that, and it will automatically
-wait() for processes and return their exit statuses.
+Reflex::PID class is used for that, and it will automatically wait()
+for processes and return their exit statuses.
=head2 Public Attributes
@@ -188,8 +188,8 @@ Reflex::Stream and a few other classes.
L<Reflex>
L<Reflex::Role::SigCatcher>
-L<Reflex::Role::PID>
-L<Reflex::PidReaper>
+L<Reflex::Role::PidCatcher>
+L<Reflex::PID>
L<Reflex/ACKNOWLEDGEMENTS>
L<Reflex/ASSISTANCE>
Please sign in to comment.
Something went wrong with that request. Please try again.