Permalink
Browse files

Experimentally coerce callbacks from contextual parameters.

  • Loading branch information...
1 parent 4dcae46 commit 9479f27167e894e18f36a4a7fbbb72155549156a @rcaputo committed Jun 28, 2011
View
42 eg/Proxy.pm
@@ -2,37 +2,37 @@ package Proxy;
# vim: ts=2 sw=2 noexpandtab
use Moose;
-extends 'Reflex::Base';
-use Reflex::Callbacks qw(make_null_handler);
+extends "Reflex::Base";
+use Reflex::Callbacks "make_null_handler";
-has handle_a => ( is => 'rw', isa => 'FileHandle', required => 1 );
-has handle_b => ( is => 'rw', isa => 'FileHandle', required => 1 );
+has client => ( is => "rw", isa => "FileHandle", required => 1 );
+has server => ( is => "rw", isa => "FileHandle", required => 1 );
-has active => ( is => 'ro', isa => 'Bool', default => 1 );
+has active => ( is => "ro", isa => "Bool", default => 1 );
-make_null_handler("on_handle_a_closed");
-make_null_handler("on_handle_b_closed");
-make_null_handler("on_handle_a_error");
-make_null_handler("on_handle_b_error");
+make_null_handler("on_client_closed");
+make_null_handler("on_server_closed");
+make_null_handler("on_client_error");
+make_null_handler("on_server_error");
-with 'Reflex::Role::Streaming' => {
- att_active => 'active',
- att_handle => 'handle_a',
+with "Reflex::Role::Streaming" => {
+ att_active => "active",
+ att_handle => "client",
};
-with 'Reflex::Role::Streaming' => {
- att_active => 'active',
- att_handle => 'handle_b',
+with "Reflex::Role::Streaming" => {
+ att_active => "active",
+ att_handle => "server",
};
-sub on_handle_a_data {
- my ($self, $arg) = @_;
- $self->put_handle_b($arg->{data});
+sub on_client_data {
+ my ($self, $arg) = @_;
+ $self->put_server($arg->{data});
}
-sub on_handle_b_data {
- my ($self, $arg) = @_;
- $self->put_handle_a($arg->{data});
+sub on_server_data {
+ my ($self, $arg) = @_;
+ $self->put_client($arg->{data});
}
1;
View
4 eg/eg-40-proxy.pl
@@ -16,8 +16,8 @@
# Proxy. Data appearing at either end is written to the other.
my $p = Proxy->new(
- handle_a => $socket_1b,
- handle_b => $socket_2b,
+ client => $socket_1b,
+ server => $socket_2b,
);
my $s1 = Reflex::Stream->new( handle => $socket_1a );
View
4 lib/Reflex/Interval.pm
@@ -5,8 +5,8 @@ use Moose;
extends 'Reflex::Base';
use Reflex::Callbacks qw(make_emitter);
-has interval => ( isa => 'Num', is => 'ro' );
-has auto_repeat => ( isa => 'Bool', is => 'ro', default => 1 );
+has interval => ( isa => 'Num', is => 'rw' );
+has auto_repeat => ( isa => 'Bool', is => 'rw', default => 1 );
has auto_start => ( isa => 'Bool', is => 'ro', default => 1 );
with 'Reflex::Role::Interval' => {
View
5 lib/Reflex/Role/Interval.pm
@@ -20,7 +20,10 @@ role {
my $att_interval = $p->att_interval();
my $cb_tick = $p->cb_tick();
- requires $att_auto_repeat, $att_auto_start, $att_interval, $cb_tick;
+ requires $att_interval, $cb_tick;
+
+ has $att_auto_repeat => ( is => 'ro', isa => 'Bool', default => 1 );
+ has $att_auto_start => ( is => 'ro', isa => 'Bool', default => 1 );
my $method_repeat = $p->method_repeat();
my $method_stop = $p->method_stop();
View
49 lib/Reflex/Role/Reactive.pm
@@ -46,6 +46,7 @@ sub _create_singleton_session {
_start => sub {
# No-op to satisfy assertions.
+ $_[KERNEL]->alias_set("alias_" . $_[SESSION]->ID);
undef;
},
_stop => sub {
@@ -150,6 +151,7 @@ sub session_id {
has watchers => (
isa => 'HashRef',
is => 'rw',
+ lazy => 1,
default => sub { {} },
);
@@ -158,6 +160,7 @@ has watchers => (
has watchers_by_event => (
isa => 'HashRef',
is => 'rw',
+ lazy => 1,
default => sub { {} },
);
@@ -166,12 +169,14 @@ has watchers_by_event => (
has watched_object_events => (
isa => 'HashRef',
is => 'rw',
+ lazy => 1,
default => sub { {} },
);
has watched_objects => (
isa => 'HashRef',
is => 'rw',
+ lazy => 1,
default => sub { {} },
);
@@ -265,23 +270,31 @@ after BUILD => sub {
CALLBACK: while (my ($param, $value) = each %$args) {
next unless $param =~ /^on_(\S+)/;
+ my $event = $1;
+
if (ref($value) eq "CODE") {
$value = Reflex::Callback::CodeRef->new(
object => $self,
code_ref => $value,
);
}
+ elsif (ref($value) eq "ARRAY") {
+ $value = Reflex::Callback::Method->new(
+ object => $value->[0],
+ method_name => $value->[1],
+ );
+ }
# There is an object, so we have a watcher.
if ($value->object()) {
- $value->object()->watch($self, $1 => $value);
+ $value->object()->watch($self, $event => $value);
next CALLBACK;
}
# TODO - Who is the watcher?
# TODO - Optimization! watch() takes multiple event/callback
# pairs. We can combine them into a hash and call watch() once.
- $self->watch($self, $1 => $value);
+ $self->watch($self, $event => $value);
next CALLBACK;
}
@@ -300,6 +313,31 @@ sub watch {
while (my ($event, $callback) = each %callbacks) {
$event =~ s/^on_//;
+ if (ref $callback) {
+ if (blessed($callback)) {
+ unless ($callback->isa('Reflex::Callback')) {
+ croak "Can't use $callback as a callback";
+ }
+ }
+ elsif (ref($callback) eq "CODE") {
+ # Coerce sub{} into Reflex::Callback.
+ $callback = Reflex::Callback::CodeRef->new(
+ object => $self,
+ code_ref => $callback,
+ );
+ }
+ else {
+ croak "Can't use $callback as a callback."
+ }
+ }
+ else {
+ # Coerce method name into a callback.
+ $callback = Reflex::Callback::Method->new(
+ object => $self,
+ method_name => $callback,
+ );
+ }
+
my $interest = {
callback => $callback,
event => $event,
@@ -312,7 +350,7 @@ sub watch {
weaken $self->watched_objects()->{$watched_id};
# Keep this object's session alive.
- $POE::Kernel::poe_kernel->refcount_increment($self->session_id, "in_use");
+ #$POE::Kernel::poe_kernel->refcount_increment($self->session_id, "in_use");
}
push @{$self->watched_object_events()->{$watched_id}->{$event}}, $interest;
@@ -428,6 +466,7 @@ sub emit {
# This event isn't watched.
my $deliver_event = $event;
+ #warn $deliver_event;
unless (exists $self->watchers_by_event()->{$deliver_event}) {
if ($self->promise()) {
$self->promise()->deliver($event, $callback_args);
@@ -551,7 +590,7 @@ sub ignore {
delete $self->watched_objects()->{$watched_id};
# Decrement the session's use count.
- $POE::Kernel::poe_kernel->refcount_decrement($self->session_id, "in_use");
+ #$POE::Kernel::poe_kernel->refcount_decrement($self->session_id, "in_use");
}
$watched->_stop_watchers($self, \@events);
}
@@ -562,7 +601,7 @@ sub ignore {
$watched->_stop_watchers($self);
# Decrement the session's use count.
- $POE::Kernel::poe_kernel->refcount_decrement($self->session_id, "in_use");
+ #$POE::Kernel::poe_kernel->refcount_decrement($self->session_id, "in_use");
}
}
View
2 lib/Reflex/Role/Readable.pm
@@ -79,7 +79,7 @@ role {
# Turn off watcher during destruction.
after DEMOLISH => sub {
my $self = shift;
- $POE::Kernel::poe_kernel->select_read($self->$att_handle(), undef);
+ $self->$method_stop();
};
};
View
10 lib/Reflex/Role/SigCatcher.pm
@@ -117,6 +117,11 @@ role {
# Be in the session associated with this object.
return unless $self->call_gate($method_pause);
+ $POE::Kernel::poe_kernel->refcount_decrement(
+ $self->session_id(),
+ "signals_keep_alive"
+ );
+
$POE::Kernel::poe_kernel->sig($self->$att_signal(), undef);
};
@@ -126,6 +131,11 @@ role {
# Be in the session associated with this object.
return unless $self->call_gate($method_resume);
+ $POE::Kernel::poe_kernel->refcount_increment(
+ $self->session_id(),
+ "signals_keep_alive"
+ );
+
$POE::Kernel::poe_kernel->sig(
$self->$att_signal(), "signal_happened", ref($self)
);
View
24 lib/Reflex/Role/Timeout.pm
@@ -14,11 +14,13 @@ method_parameter method_stop => qw( stop att_delay _ );
role {
my $p = shift;
- my $att_delay = $p->att_delay();
- my $auto_start = $p->att_auto_start();
- my $cb_timeout = $p->cb_timeout();
+ my $att_delay = $p->att_delay();
+ my $att_auto_start = $p->att_auto_start();
+ my $cb_timeout = $p->cb_timeout();
- requires $att_delay, $auto_start, $cb_timeout;
+ requires $att_delay, $cb_timeout;
+
+ has $att_auto_start => ( is => 'ro', isa => 'Bool', default => 1 );
my $method_reset = $p->method_reset();
my $method_start = $p->method_start();
@@ -36,7 +38,7 @@ role {
after BUILD => sub {
my ($self, $args) = @_;
- $self->$method_start() if $self->$auto_start();
+ $self->$method_start() if $self->$att_auto_start();
};
my $code_start = sub {
@@ -106,12 +108,12 @@ Reflex::Role::Timeout - set a wakeup callback for a relative delay
has auto_start => ( isa => 'Bool', is => 'ro', default => 1 );
with 'Reflex::Role::Timeout' => {
- delay => "delay",
- cb_timeout => "on_done",
- auto_start => "auto_start",
- method_start => "start",
- method_stop => "stop",
- method_reset => "reset",
+ delay => "delay",
+ cb_timeout => "on_done",
+ att_auto_start => "auto_start",
+ method_start => "start",
+ method_stop => "stop",
+ method_reset => "reset",
};
1;

0 comments on commit 9479f27

Please sign in to comment.