Skip to content
Browse files

Add a signal watcher class, and port POE::Wheel wrappers forward.

  • Loading branch information...
1 parent bdf5d7f commit 05d2e40d383a68baaa46d4cbbeb539a24c4866b4 @rcaputo committed Aug 16, 2009
Showing with 565 additions and 13 deletions.
  1. +114 −0 docs/Signal.pm
  2. +47 −0 docs/SignalChild.pm
  3. +74 −10 docs/StageRole.pm
  4. +138 −0 docs/Wheel.pm
  5. +116 −0 docs/WheelRun.pm
  6. +67 −0 docs/eg-07-wheel-run.pl
  7. +9 −3 docs/requirements.otl
View
114 docs/Signal.pm
@@ -0,0 +1,114 @@
+package Signal;
+
+use Moose;
+extends qw(Stage);
+use Scalar::Util qw(weaken);
+
+# A session may only watch a distinct signal once.
+# So we must map each distinct signal to all the interested objects.
+
+my %session_watchers;
+
+has name => (
+ isa => 'Str|Undef',
+ is => 'rw',
+ default => 'TERM',
+);
+
+sub event_param_names {
+ return [ ];
+}
+
+sub BUILD {
+ my $self = shift;
+
+ # Register this object with that signal.
+ $session_watchers{$self->name()}->{$self->session_id()}->{$self} = $self;
+ weaken $session_watchers{$self->name()}->{$self->session_id()}->{$self};
+
+ if (
+ (
+ scalar keys
+ %{$session_watchers{$self->name()}->{$self->session_id()}}
+ ) == 1
+ ) {
+ $self->start_watching();
+ }
+}
+
+sub DESTROY {
+ my $self = shift;
+
+ my $sw = $session_watchers{$self->name()}->{$self->session_id()};
+ delete $sw->{$self};
+
+ unless (scalar keys %$sw) {
+ delete $session_watchers{$self->name()};
+
+ delete $session_watchers{$self->name()} unless (
+ scalar keys %{$session_watchers{$self->name()}}
+ );
+
+ $self->stop_watching();
+ }
+}
+
+sub start_watching {
+ my $self = shift;
+
+ return $POE::Kernel::poe_kernel->call(
+ $self->session_id(), "call_gate", $self, "start_watching", @_
+ ) if (
+ $self->session_id() ne $POE::Kernel::poe_kernel->get_active_session()->ID()
+ );
+
+ $POE::Kernel::poe_kernel->sig($self->name(), "signal_happened");
+}
+
+sub stop_watching {
+ my $self = shift;
+
+ return $POE::Kernel::poe_kernel->call(
+ $self->session_id(), "call_gate", $self, "stop_watching", @_
+ ) if (
+ $self->session_id() ne $POE::Kernel::poe_kernel->get_active_session()->ID()
+ );
+
+ $POE::Kernel::poe_kernel->sig($self->name(), undef);
+ $self->name(undef);
+}
+
+sub _deliver {
+ my ($class, $signal_name, @signal_args) = @_;
+
+ # If nobody's watching us, then why did we do it in the road?
+ return unless exists $session_watchers{$signal_name};
+
+ # Deliver the signal.
+
+ while (
+ my ($session_id, $stage_rec) = each %{$session_watchers{$signal_name}}
+ ) {
+ foreach my $stage (values %$stage_rec) {
+
+ # TODO - All stages here theoretically have the same class, and
+ # therefore the same parameters. Don't recalculate %args.
+ my $i = 0;
+ my $param_names = $stage->event_param_names();
+ my %event_args = map { $_ => $signal_args[$i++] } @$param_names;
+ $event_args{name} = $signal_name;
+
+ $stage->emit(
+ event => 'signal',
+ args => \%event_args,
+ );
+ }
+ }
+}
+
+sub DEMOLISH {
+ my $self = shift;
+ $self->stop_watching() if defined $self->name();
+}
+
+1;
View
47 docs/SignalChild.pm
@@ -0,0 +1,47 @@
+package SignalChild;
+
+use Moose;
+extends qw(Signal);
+
+has '+name' => (
+ default => 'CHLD',
+);
+
+has 'pid' => (
+ isa => 'Int',
+ is => 'ro',
+ required => 1,
+ default => sub { die "required" },
+);
+
+sub event_param_names {
+ return [qw(pid exit)];
+}
+
+sub start_watching {
+ my $self = shift;
+
+ return $POE::Kernel::poe_kernel->call(
+ $self->session_id(), "call_gate", $self, "start_watching", @_
+ ) if (
+ $self->session_id() ne $POE::Kernel::poe_kernel->get_active_session()->ID()
+ );
+
+ $POE::Kernel::poe_kernel->sig_child($self->pid(), "signal_happened");
+}
+
+sub stop_watching {
+ my $self = shift;
+
+ return $POE::Kernel::poe_kernel->call(
+ $self->session_id(), "call_gate", $self, "stop_watching", @_
+ ) if (
+ $self->session_id() ne $POE::Kernel::poe_kernel->get_active_session()->ID()
+ );
+
+ $POE::Kernel::poe_kernel->sig_child($self->pid(), undef);
+ $self->name(undef);
+}
+
+1;
+
View
84 docs/StageRole.pm
@@ -14,7 +14,7 @@ our @CARP_NOT = (__PACKAGE__);
# Singleton POE::Session.
-sub POE::Kernel::ASSERT_DEFAULT () { 1 }
+#sub POE::Kernel::ASSERT_DEFAULT () { 1 }
use POE;
# Disable a warning.
@@ -28,11 +28,15 @@ my $singleton_session_id = POE::Session->create(
# Although we're using the $singleton_session_id, so why bother?
_start => sub {
- # No-op.
+ # No-op to satisfy assertions.
+ undef;
+ },
+ _stop => sub {
+ # No-op to satisfy assertions.
+ undef;
},
- # Handle a timer. Deliver it to its resource.
- # $resource is an envelope around a weak POE::Watcher reference.
+ ### Timer manipulators and callbacks.
timer_set => sub {
my ($kernel, $interval, $object) = @_[KERNEL, ARG0, ARG1];
@@ -59,6 +63,8 @@ my $singleton_session_id = POE::Session->create(
die if $@;
},
+ ### I/O manipulators and callbacks.
+
select_on => sub {
my ($kernel, $object, @selects) = @_[KERNEL, ARG0..$#_];
@@ -100,19 +106,71 @@ my $singleton_session_id = POE::Session->create(
die if $@;
},
+ ### Signals.
+
+ signal_happened => sub {
+ Signal->_deliver(@_[ARG0..$#_]);
+ },
+
+ signal_child_watch => sub {
+ my ($kernel, $stage, $pid) = @_;
+ $kernel->sig_child($pid => $stage->poe_event_name());
+ },
+
+ ### Cross-session emit() is converted into these events.
+
emit_to_coderef => sub {
my ($callback, $args) = @_[ARG0, ARG1];
$callback->($args);
},
emit_to_method => sub {
- my ($observer, $method, $args) = @_[ARG0..ARG2];
+ my ($observer, $method, $args) = @_[ARG0..$#_];
$observer->$method($args);
},
- _stop => sub {
- #warn "stage session stopped";
- undef;
+ # Used to call a stage's method in the appropriate session.
+ call_gate => sub {
+ my ($stage, $method, @args) = @_[ARG0..$#_];
+ $stage->$method(@args);
+ },
+
+ ### Support POE::Wheel classes.
+
+ # Deliver to wheels based on the wheel ID. Different wheels pass
+ # their IDs in different ARGn offsets, so we need a few of these.
+ wheel_setup => sub {
+ my ($wheel_class, $args, $stage) = @_[ARG0, ARG1, ARG2];
+ $stage->create_wheel($wheel_class, $args);
+ },
+ wheel_shutodnw => sub {
+ my $stage = $_[ARG0];
+ $stage->demolish_wheel();
+ },
+ wheel_event_0 => sub {
+ $_[CALLER_FILE] =~ m{/([^/.]+)\.pm};
+ eval { "Wheel$1"->deliver(0, @_[ARG0..$#_]); };
+ die if $@;
+ },
+ wheel_event_1 => sub {
+ $_[CALLER_FILE] =~ m{/([^/.]+)\.pm};
+ eval { "Wheel$1"->deliver(1, @_[ARG0..$#_]); };
+ die if $@;
+ },
+ wheel_event_2 => sub {
+ $_[CALLER_FILE] =~ m{/([^/.]+)\.pm};
+ eval { "Wheel$1"->deliver(2, @_[ARG0..$#_]); };
+ die if $@;
+ },
+ wheel_event_3 => sub {
+ $_[CALLER_FILE] =~ m{/([^/.]+)\.pm};
+ eval { "Wheel$1"->deliver(3, @_[ARG0..$#_]); };
+ die if $@;
+ },
+ wheel_event_4 => sub {
+ $_[CALLER_FILE] =~ m{/([^/.]+)\.pm};
+ eval { "Wheel$1"->deliver(4, @_[ARG0..$#_]); };
+ die if $@;
},
},
)->ID();
@@ -370,7 +428,10 @@ sub emit {
if (ref($callback) eq 'CODE') {
# Same session. Just call it.
- if ($callback_rec->{observer}->session_id() == $self->session_id()) {
+ if (
+ $callback_rec->{observer}->session_id() eq
+ $POE::Kernel::poe_kernel->get_active_session()->ID
+ ) {
$callback->($callback_args);
next CALLBACK;
}
@@ -387,7 +448,10 @@ sub emit {
# Method callback.
- if ($callback_rec->{observer}->session_id() == $self->session_id()) {
+ if (
+ $callback_rec->{observer}->session_id() eq
+ $POE::Kernel::poe_kernel->get_active_session()->ID
+ ) {
# Same session. Just call it.
$callback_rec->{observer}->$callback($callback_args);
}
View
138 docs/Wheel.pm
@@ -0,0 +1,138 @@
+package Wheel;
+use Moose;
+extends 'Stage';
+use Scalar::Util qw(weaken);
+use POE::Wheel;
+
+#has event_to_index => (
+# isa => 'HashRef',
+# is => 'ro',
+# default => sub { die "override" },
+#);
+#
+#has event_emit_names => (
+# isa => 'ArrayRef[Str]',
+# is => 'ro',
+# default => sub { die "override" },
+#);
+#
+#has event_param_names => (
+# isa => 'ArrayRef[ArrayRef]',
+# is => 'ro',
+# default => sub { die "override" },
+#);
+#
+#has wheel_class => (
+# isa => 'Str',
+# is => 'ro',
+# default => sub { die "override" },
+#);
+#
+#has 'valid_params' => (
+# isa => 'HashRef',
+# is => 'ro',
+# default => sub { die "override" },
+#);
+
+has wheel => (
+ isa => 'Object|Undef',
+ is => 'rw',
+);
+
+my %wheel_id_to_object;
+
+sub BUILD {
+ my ($self, $args) = @_;
+
+ my $wheel_class = $self->wheel_class();
+
+ # Get rid of stuff we don't need.
+ foreach my $param (keys %$args) {
+ delete $args->{$param} unless exists $self->valid_params()->{$param};
+ }
+
+ # Map methods to events in the wheel parameters.
+ my $event_to_index = $self->event_to_index();
+ while (my ($wheel_param, $event_idx) = each %$event_to_index) {
+ $args->{$wheel_param} = "wheel_event_$event_idx";
+ }
+
+ if (
+ $self->session_id() eq $POE::Kernel::poe_kernel->get_active_session()->ID()
+ ) {
+ $self->create_wheel($wheel_class, $args);
+ }
+ else {
+ $POE::Kernel::poe_kernel->call(
+ $self->session_id(), "wheel_setup",
+ $wheel_class, $args, $self
+ );
+ }
+}
+
+sub create_wheel {
+ my ($self, $wheel_class, $args) = @_;
+ my $wheel = $wheel_class->new( %$args );
+ $self->wheel($wheel);
+
+ $wheel_id_to_object{$wheel->ID()} = $self;
+ weaken $wheel_id_to_object{$wheel->ID()};
+}
+
+sub wheel_id {
+ my $self = shift;
+ return $self->wheel()->ID();
+}
+
+sub DEMOLISH {
+ my $self = shift;
+ if (
+ $self->session_id() eq $POE::Kernel::poe_kernel->get_active_session()->ID()
+ ) {
+ $self->demolish_wheel($self);
+ }
+ else {
+ $POE::Kernel::poe_kernel->call(
+ $self->session_id(), "wheel_shutdown",
+ $$self
+ );
+ }
+}
+
+sub demolish_wheel {
+ my $self = shift;
+ if (defined $self->wheel()) {
+ delete $wheel_id_to_object{ $self->wheel_id() };
+ $self->wheel(undef);
+ }
+}
+
+sub deliver {
+ my ($class, $event_idx, @event_args) = @_;
+
+ # Map parameter offsets to named parameters.
+ my $param_names = $class->event_param_names()->[$event_idx];
+
+ my $i = 0;
+ my %event_args = map { $_ => $event_args[$i++] } @$param_names;
+
+ # Get the wheel that sent us an event.
+
+ my $wheel_id = $event_args{wheel_id};
+
+ # Get the Stage object that owns this wheel.
+
+ my $self = $wheel_id_to_object{$wheel_id};
+ die unless $self;
+
+ # Get the emitted event name associated with this event.
+ my $event_name = $self->event_emit_names()->[$event_idx];
+
+ # Emit the event.
+ $self->emit(
+ event => $event_name,
+ args => \%event_args,
+ );
+}
+
+1;
View
116 docs/WheelRun.pm
@@ -0,0 +1,116 @@
+package WheelRun;
+use Moose;
+extends 'Wheel';
+use POE::Wheel::Run;
+
+# These are class methods, returning static class data.
+# TODO - How does Moose do this?
+
+sub event_to_index {
+ return(
+ {
+ StdinEvent => 0,
+ StdoutEvent => 1,
+ StderrEvent => 2,
+ ErrorEvent => 3,
+ CloseEvent => 4,
+ },
+ );
+}
+
+sub event_emit_names {
+ return(
+ [
+ 'stdin', # StdinEvent
+ 'stdout', # StdoutEvent
+ 'stderr', # StderrEvent
+ 'error', # ErrorEvent
+ 'closed', # ClosedEvent
+ ],
+ );
+}
+
+sub event_param_names {
+ return(
+ [
+ # 0 = StdinEvent
+ [ "wheel_id" ],
+
+ # 1 = StdoutEvent
+ [ "output", "wheel_id" ],
+
+ # 2 = StderrEvent
+ [ "output", "wheel_id" ],
+
+ # 3 = ErrorEvent
+ [ "operation", "errnum", "errstr", "wheel_id", "handle_name" ],
+
+ # 4 = CloseEvent
+ [ "wheel_id" ],
+ ]
+ );
+}
+
+sub wheel_class {
+ return 'POE::Wheel::Run';
+}
+
+sub valid_params {
+ return(
+ {
+ CloseOnCall => 1,
+ Conduit => 1,
+ Filter => 1,
+ Group => 1,
+ NoSetPgrp => 1,
+ NoSetSid => 1,
+ Priority => 1,
+ Program => 1,
+ ProgramArgs => 1,
+ StderrDriver => 1,
+ StderrFilter => 1,
+ StdinDriver => 1,
+ StdinFilter => 1,
+ StdioDriver => 1,
+ StdioFilter => 1,
+ StdoutDriver => 1,
+ StdoutFilter => 1,
+ User => 1,
+ }
+ );
+}
+
+# Also handle signals.
+
+use SignalChild;
+has sigchild_watcher => (
+ isa => 'SignalChild|Undef',
+ is => 'rw',
+);
+
+sub BUILD {
+ my $self = shift;
+
+ $self->sigchild_watcher(
+ SignalChild->new(
+ pid => $self->wheel()->PID(),
+ observers => [
+ {
+ observer => $self,
+ role => 'sigchld',
+ },
+ ],
+ )
+ );
+}
+
+# Rethrow our signal event.
+sub on_sigchld_signal {
+ my ($self, $args) = @_;
+ $self->emit(
+ event => 'signal',
+ args => $args,
+ );
+}
+
+1;
View
67 docs/eg-07-wheel-run.pl
@@ -0,0 +1,67 @@
+#!/usr/bin/perl
+
+# Demonstrate how wheels may be encapsulated in thin,
+# configuration-only subclasses.
+
+{
+ package Runner;
+ use Moose;
+ extends 'Stage';
+ use WheelRun;
+
+ has wheel => (
+ isa => 'WheelRun|Undef',
+ is => 'rw',
+ );
+
+ sub BUILD {
+ my $self = shift;
+
+ $self->wheel(
+ WheelRun->new(
+ Program => "$^X -wle 'print qq[pid(\$\$) moo(\$_)] for 1..10; exit'",
+ observers => [
+ {
+ observer => $self,
+ role => 'child',
+ },
+ ],
+ )
+ );
+ }
+
+ sub on_child_stdin {
+ print "stdin flushed\n";
+ }
+
+ sub on_child_stdout {
+ my ($self, $args) = @_;
+ print "stdout: $args->{output}\n";
+ }
+
+ sub on_child_stderr {
+ my ($self, $args) = @_;
+ print "stderr: $args->{output}\n";
+ }
+
+ sub on_child_error {
+ my ($self, $args) = @_;
+ return if $args->{operation} eq "read";
+ print "$args->{operation} error $args->{errnum}: $args->{errstr}\n";
+ }
+
+ sub on_child_close {
+ my ($self, $args) = @_;
+ print "child closed all output\n";
+ }
+
+ sub on_child_signal {
+ my ($self, $args) = @_;
+ print "child $args->{pid} exited: $args->{exit}\n";
+ $self->wheel(undef);
+ }
+}
+
+my $runner = Runner->new();
+POE::Kernel->run();
+exit;
View
12 docs/requirements.otl
@@ -1,4 +1,4 @@
-[_] 31% Framework Requirements
+[_] 30% Framework Requirements
About
This document summarizes the best ideas from the patterns document.
The patterns document tries to enumerate all available options.
@@ -137,7 +137,7 @@
[_] 0% Receiver data is not visible to the sender.
[_] 0% Message-scoped resources should be stored in the message's continuation.
[_] 0% Message cancelation triggers associated resource cleanup.
- [_] 22% Common primitive classes must be provided.
+ [_] 20% Common primitive classes must be provided.
[_] 29% Low-level event watchers.
[X] 100% I/O
[X] 100% Handle
@@ -150,10 +150,16 @@
[_] 0% Signals
[_] 0% Signal
[_] 0% What else?
- [_] 16% Primitive program pieces.
+ [_] 11% Primitive program pieces.
[_] 0% Application
[_] 0% Server
[_] 0% Stream
[_] 0% Process
+ [_] 0% Resolver
+ [_] 0% TcpClient
+ [_] 0% TcpServer
[X] 100% UdpPeer
[_] 0% What else? Probably a lot!
+ [_] % POE Interfaces
+ [_] % Wheel event to callback.
+ [_] % Postback to callback.

0 comments on commit 05d2e40

Please sign in to comment.
Something went wrong with that request. Please try again.