Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Massive redoing of Reflex roles and some of the objects that use them.

For example, Reflex::Handle is deprecated in favor of Reflex::Stream,
which is implemented now in terms of Reflex::Role::Streaming.
  • Loading branch information...
commit 08bafbd2b824ae0ceed83ea9e70757929899527d 1 parent c1c573e
@rcaputo authored
View
6 eg/EchoStream.pm
@@ -2,18 +2,18 @@ package EchoStream;
use Moose;
extends 'Reflex::Stream';
-sub on_stream_data {
+sub on_handle_data {
my ($self, $args) = @_;
$self->put($args->{data});
}
-sub on_stream_failure {
+sub on_handle_error {
my ($self, $args) = @_;
warn "$args->{errfun} error $args->{errnum}: $args->{errstr}\n";
$self->emit( event => "stopped", args => {} );
}
-sub on_stream_closed {
+sub on_handle_closed {
my ($self, $args) = @_;
$self->emit( event => "stopped", args => {} );
}
View
15 eg/eg-04-inheritance.pl
@@ -9,16 +9,16 @@
# the composition architectures in past examples.
{
- package Reflex::UdpPeer::Echo;
+ package Reflex::Udp::Echo;
use Moose;
extends 'Reflex::UdpPeer';
- sub on_udppeer_datagram {
+ sub on_socket_datagram {
my ($self, $args) = @_;
my $data = $args->{datagram};
if ($data =~ /^\s*shutdown\s*$/) {
- $self->destruct();
+ $self->stop_socket_readable();
return;
}
@@ -28,7 +28,7 @@
);
}
- sub on_udppeer_error {
+ sub on_socket_error {
my ($self, $args) = @_;
warn "$args->{op} error $args->{errnum}: $args->{errstr}";
$self->destruct();
@@ -38,7 +38,12 @@
# Main.
my $port = 12345;
-my $peer = Reflex::UdpPeer::Echo->new( port => $port );
+my $peer = Reflex::Udp::Echo->new(
+ socket => IO::Socket::INET->new(
+ LocalPort => $port,
+ Proto => 'udp',
+ )
+);
print "UDP echo service is listening on port $port.\n";
Reflex::Object->run_all();
exit;
View
14 eg/eg-05-composition.pl
@@ -9,7 +9,7 @@
# Reflex::UdpPeer object rather than inheriting from that class.
{
- package Reflex::UdpPeer::Echo;
+ package Reflex::Udp::Echo;
use Moose;
extends 'Reflex::Object';
use Reflex::UdpPeer;
@@ -20,12 +20,17 @@
);
has peer => (
- isa => 'Reflex::UdpPeer|Undef',
+ isa => 'Maybe[Reflex::UdpPeer]',
is => 'rw',
traits => ['Reflex::Trait::Observed'],
setup => sub {
my $self = shift;
- Reflex::UdpPeer->new(port => $self->port());
+ Reflex::UdpPeer->new(
+ socket => IO::Socket::INET->new(
+ LocalPort => $self->port(),
+ Proto => 'udp',
+ )
+ )
},
);
@@ -46,6 +51,7 @@
sub on_peer_error {
my ($self, $args) = @_;
+
warn "$args->{op} error $args->{errnum}: $args->{errstr}";
$self->peer(undef);
}
@@ -54,7 +60,7 @@
# Main.
my $port = 12345;
-my $peer = Reflex::UdpPeer::Echo->new( port => $port );
+my $peer = Reflex::Udp::Echo->new( port => $port );
print "UDP echo service is listening on port $port.\n";
Reflex::Object->run_all();
exit;
View
47 eg/eg-06-moose-roles.pl
@@ -4,42 +4,51 @@
use strict;
use lib qw(../lib);
-# An object's emitted events can also trigger methods in the subclass.
# This example creates a UDP echo server using a role rather than
# inheritance.
{
- package Reflex::UdpPeer::Echo;
+ package Reflex::Udp::Echo;
use Moose;
extends 'Reflex::Object';
- with 'Reflex::Role::UdpPeer';
+ use IO::Socket::INET;
- sub on_udppeer_datagram {
- my ($self, $args) = @_;
- my $data = $args->{datagram};
+ has socket => (
+ is => 'ro',
+ isa => 'FileHandle',
+ required => 1,
+ );
- if ($data =~ /^\s*shutdown\s*$/) {
- $self->destruct();
+ with 'Reflex::Role::Recving' => {
+ handle => 'socket',
+
+ # Expose send_socket() as send().
+ -alias => { send_socket => 'send' },
+ -excludes => 'send_socket'
+
+ };
+
+ sub on_socket_datagram {
+ my ($self, $arg) = @_;
+
+ if ($arg->{datagram} =~ /^\s*shutdown\s*$/) {
+ $self->stop_socket_readable();
return;
}
- $self->send(
- datagram => $data,
- remote_addr => $args->{remote_addr},
- );
- }
-
- sub on_udppeer_error {
- my ($self, $args) = @_;
- warn "$args->{op} error $args->{errnum}: $args->{errstr}";
- $self->destruct();
+ $self->send(%$arg);
}
}
# Main.
my $port = 12345;
-my $peer = Reflex::UdpPeer::Echo->new( port => $port );
+my $peer = Reflex::Udp::Echo->new(
+ socket => IO::Socket::INET->new(
+ LocalPort => $port,
+ Proto => 'udp',
+ )
+);
print "UDP echo service is listening on port $port.\n";
Reflex::Object->run_all();
exit;
View
15 eg/roles/Stream.pm
@@ -1,15 +0,0 @@
-package Stream;
-use Moose;
-extends 'Reflex::Object';
-
-has handle => ( is => 'rw', isa => 'FileHandle', required => 1 );
-
-with 'Reflex::Role::Streaming' => {
- handle => 'handle',
-
- # Expose put_handle() as put().
- -alias => { put_handle => 'put' },
- -excludes => 'put_handle',
-};
-
-1;
View
8 eg/roles/proxy.pl
@@ -1,7 +1,9 @@
use Moose;
use Socket qw(AF_UNIX SOCK_STREAM PF_UNSPEC);
use Proxy;
-use Stream;
+use Reflex::Stream;
+1;
+
# Socket pair 1. Writes to either end are readable at the other.
my ($socket_1a, $socket_1b);
@@ -17,8 +19,8 @@
handle_b => $socket_2b,
);
-my $s1 = Stream->new( handle => $socket_1a );
-my $s2 = Stream->new( handle => $socket_2a );
+my $s1 = Reflex::Stream->new( handle => $socket_1a );
+my $s2 = Reflex::Stream->new( handle => $socket_2a );
# Write data to Socket 1a.
# It will appear on Socket 1b, via the socketpair.
View
6 lib/Reflex/Callback/Promise.pm
@@ -20,8 +20,10 @@ sub next {
my $queue = $self->queue();
- # TODO - Probably should bail out if the event loop ends.
- $POE::Kernel::poe_kernel->run_one_timeslice() while @$queue < 1;
+ # Run while the queue is empty and POE has things to do.
+ 1 while (
+ @$queue < 1 and $POE::Kernel::poe_kernel->run_one_timeslice()
+ );
return shift @$queue;
}
View
2  lib/Reflex/Connector.pm
@@ -39,7 +39,7 @@ sub BUILD {
my $packed_address;
if ($handle->isa("IO::Socket::INET")) {
- # TODO - Non-bollocking resolver.
+ # TODO - Need a non-bollocking resolver.
my $inet_address = inet_aton($self->remote_addr());
$packed_address = pack_sockaddr_in($self->remote_port(), $inet_address);
}
View
2  lib/Reflex/POE/Wheel/Run.pm
@@ -4,7 +4,7 @@ extends 'Reflex::POE::Wheel';
use POE::Wheel::Run;
# These are class methods, returning static class data.
-# TODO - How does Moose do this?
+# TODO - What's the proper way to do this with Moose?
sub event_to_index {
return(
View
20 lib/Reflex/Role/Object.pm
@@ -13,12 +13,15 @@ END {
our @CARP_NOT = (__PACKAGE__);
# Singleton POE::Session.
-# TODO - Extract the POE bits into another role.
+# TODO - Extract the POE bits into another role if we want to support
+# other event loops at the top level rather than beneath POE.
-# TODO - How to prevent this from being redefined?
+# TODO - How to prevent these from being redefined?
+# TODO - Such as if POE is loaded elsewhere first?
+#
#sub POE::Kernel::ASSERT_DEFAULT () { 1 }
+#sub POE::Kernel::CATCH_EXCEPTIONS () { 0 }
-sub POE::Kernel::CATCH_EXCEPTIONS () { 0 }
use POE;
use Reflex::POE::Session;
@@ -92,7 +95,7 @@ my $singleton_session_id = POE::Session->create(
return if Reflex::POE::Session->deliver($_[SENDER]->ID, $event, $args);
# Unhandled event.
- # TODO - Anything special?
+ # TODO - Should anything special be done in this case?
},
### Support POE::Wheel classes.
@@ -243,8 +246,8 @@ sub BUILD {
}
# TODO - Who is the watcher?
- # TODO - watch() takes multiple event/callback pairs. We can
- # combine them into a hash and call watch() once.
+ # TODO - Optimization! watch() takes multiple event/callback
+ # pairs. We can combine them into a hash and call watch() once.
$self->watch($self, $1 => $value);
next CALLBACK;
}
@@ -332,8 +335,9 @@ sub _is_watched {
sub emit {
my ($self, @args) = @_;
- # TODO - Checking arguments is tedious, but check_args() method
- # calls add up.
+ # TODO - Is there a better way to check parameters? Checking them
+ # in custom code is tedious. Calling check_args() is relatively
+ # slow. Can we have our peanut butter and our chocolate together?
my $args = $self->check_args(
\@args,
View
35 lib/Reflex/Role/Readable.pm
@@ -3,7 +3,8 @@ use MooseX::Role::Parameterized;
use Reflex::Util::Methods qw(emit_an_event);
# TODO - Reflex::Role::Readable and Writable are nearly identical.
-# Can they be abstracted further?
+# Can they be abstracted further? Possibly composed as parameterized
+# instances of a common base role?
use Scalar::Util qw(weaken);
@@ -44,6 +45,15 @@ parameter method_resume => (
lazy => 1,
);
+parameter method_stop => {
+ isa => 'Str',
+ default => sub {
+ my $self = shift;
+ "stop_" . $self->handle() . "_readable";
+ },
+ lazy => 1,
+};
+
role {
my $p = shift;
@@ -51,8 +61,6 @@ role {
my $active = $p->active();
my $cb_name = $p->cb_ready();
- my $pause_name = $p->method_pause();
- my $resume_name = $p->method_resume();
my $setup_name = "_setup_${h}_readable";
method $setup_name => sub {
@@ -72,16 +80,21 @@ role {
$POE::Kernel::poe_kernel->select_pause_read($self->$h());
};
- method $pause_name => sub {
+ method $p->method_pause() => sub {
my $self = shift;
$POE::Kernel::poe_kernel->select_pause_read($self->$h());
};
- method $resume_name => sub {
+ method $p->method_resume() => sub {
my $self = shift;
$POE::Kernel::poe_kernel->select_resume_read($self->$h());
};
+ method $p->method_stop() => sub {
+ my $self = shift;
+ $POE::Kernel::poe_kernel->select_read($self->$h(), undef);
+ };
+
after BUILD => sub {
my ($self, $arg) = @_;
$self->$setup_name($arg);
@@ -90,7 +103,7 @@ role {
# Turn off watcher during destruction.
after DEMOLISH => sub {
my $self = shift;
- $POE::Kernel::poe_kernel->select_read($self->h(), undef);
+ $POE::Kernel::poe_kernel->select_read($self->$h(), undef);
};
# Part of the POE/Reflex contract.
@@ -100,7 +113,7 @@ role {
};
# Default callbacks that re-emit their parameters.
- method $cb_name => emit_an_event("${h}_readable");
+ method $cb_name => emit_an_event("readable");
};
1;
@@ -151,6 +164,7 @@ would generates these methods by default:
cb_ready => "on_XYZ_readable",
method_pause => "pause_XYZ_readable",
method_resume => "resume_XYZ_readable",
+ method_stop => "stop_XYZ_readable",
This naming convention allows the role to be used for more than one
handle in the same class. Each handle will have its own name, and the
@@ -196,6 +210,13 @@ the watcher. It is "pause_${handle}_readable" by default.
C<method_resume> may be used to resume paused readability watchers, or
to activate them if they are started in an inactive state.
+=head3 method_stop
+
+C<method_stop> may be used to stop readability watchers. These
+watchers may not be restarted once they've been stopped. If you want
+to pause and resume watching, see C<method_pause> and
+C<method_resume>.
+
=head1 EXAMPLES
TODO - I'm sure there are some.
View
28 lib/Reflex/Role/Recving.pm
@@ -4,7 +4,7 @@ use Reflex::Util::Methods qw(emit_an_event);
parameter handle => (
isa => 'Str',
- default => 'handle',
+ default => 'socket',
);
parameter cb_datagram => (
@@ -34,6 +34,15 @@ parameter method_send => (
lazy => 1,
);
+parameter method_stop => (
+ isa => 'Str',
+ default => sub {
+ my $self = shift;
+ "stop_" . $self->handle();
+ },
+ lazy => 1,
+);
+
parameter max_datagram_size => (
isa => 'Int',
is => 'rw',
@@ -52,6 +61,11 @@ role {
handle => $h,
};
+ method $p->method_stop() => sub {
+ my $self = shift;
+ $self->$h(undef);
+ };
+
method "on_${h}_readable" => sub {
my ($self, $args) = @_;
@@ -108,8 +122,8 @@ role {
};
# Default callbacks that re-emit their parameters.
- method $cb_datagram => emit_an_event("${h}_data");
- method $cb_error => emit_an_event("${h}_error");
+ method $cb_datagram => emit_an_event("datagram");
+ method $cb_error => emit_an_event("error");
};
1;
@@ -117,10 +131,6 @@ role {
__END__
-sub destruct {
- my $self = shift;
- $self->handle(undef);
-}
1;
@@ -128,13 +138,15 @@ __END__
=head1 NAME
-Reflex::Role::UdpPeer - Add non-blocking UDP networking to an object.
+Reflex::Role::Recving - Mix standard send/recv code into a class.
=head1 SYNOPSIS
This UDP echo service comes from a more complete program,
eg/eg-06-moose-roles.pl in Reflex's tarball.
+TODO - New!
+
package Reflex::UdpPeer::Echo;
use Moose;
with 'Reflex::Role::UdpPeer';
View
28 lib/Reflex/Role/Streaming.pm
@@ -27,6 +27,15 @@ parameter cb_error => (
lazy => 1,
);
+parameter cb_closed => (
+ isa => 'Str',
+ default => sub {
+ my $self = shift;
+ "on_" . $self->handle() . "_closed";
+ },
+ lazy => 1,
+);
+
parameter method_put => (
isa => 'Str',
default => sub {
@@ -42,6 +51,7 @@ role {
my $h = $p->handle();
my $cb_data = $p->cb_data();
my $cb_error = $p->cb_error();
+ my $cb_closed = $p->cb_closed();
with 'Reflex::Role::Readable' => {
handle => $h,
@@ -62,13 +72,20 @@ role {
my ($self, $arg) = @_;
my $octet_count = sysread($arg->{handle}, my $buffer = "", 65536);
+
+ # Got data.
if ($octet_count) {
$self->$cb_data({ data => $buffer });
return;
}
- return if defined $octet_count;
+ # EOF
+ if (defined $octet_count) {
+ $self->$cb_closed({ });
+ return;
+ }
+ # Quelle erreur!
$self->cb_error(
{
errnum => ($! + 0),
@@ -78,10 +95,10 @@ role {
);
};
- method $self->method_put() => sub {
+ method $p->method_put() => sub {
my ($self, @chunks) = @_;
- # TODO - Benchmark string vs. array.
+ # TODO - Benchmark string vs. array buffering.
use bytes;
@@ -125,8 +142,9 @@ role {
};
# Default callbacks that re-emit their parameters.
- method $cb_data => emit_an_event("${h}_data");
- method $cb_error => emit_an_event("${h}_error");
+ method $cb_data => emit_an_event("data");
+ method $cb_error => emit_an_event("error");
+ method $cb_closed => emit_an_event("closed");
};
1;
View
35 lib/Reflex/Role/Writable.pm
@@ -3,7 +3,8 @@ use MooseX::Role::Parameterized;
use Reflex::Util::Methods qw(emit_an_event);
# TODO - Reflex::Role::Readable and Writable are nearly identical.
-# Can they be abstracted further?
+# Can they be abstracted further? Possibly composed as parameterized
+# instances of a common base role?
use Scalar::Util qw(weaken);
@@ -44,6 +45,15 @@ parameter method_resume => (
lazy => 1,
);
+parameter method_stop => {
+ isa => 'Str',
+ default => sub {
+ my $self = shift;
+ "stop_" . $self->handle() . "_writable";
+ },
+ lazy => 1,
+};
+
role {
my $p = shift;
@@ -51,8 +61,6 @@ role {
my $active = $p->active();
my $cb_name = $p->cb_ready();
- my $pause_name = $p->method_pause();
- my $resume_name = $p->method_resume();
my $setup_name = "_setup_${h}_writable";
method $setup_name => sub {
@@ -72,16 +80,21 @@ role {
$POE::Kernel::poe_kernel->select_pause_write($self->$h());
};
- method $pause_name => sub {
+ method $p->method_pause() => sub {
my $self = shift;
$POE::Kernel::poe_kernel->select_pause_read($self->$h());
};
- method $resume_name => sub {
+ method $p->method_resume() => sub {
my $self = shift;
$POE::Kernel::poe_kernel->select_resume_read($self->$h());
};
+ method $p->method_stop() => sub {
+ my $self = shift;
+ $POE::Kernel::poe_kernel->select_read($self->$h(), undef);
+ };
+
after BUILD => sub {
my ($self, $arg) = @_;
$self->$setup_name($arg);
@@ -90,7 +103,7 @@ role {
# Turn off watcher during destruction.
after DEMOLISH => sub {
my $self = shift;
- $POE::Kernel::poe_kernel->select_write($self->h(), undef);
+ $POE::Kernel::poe_kernel->select_write($self->$h(), undef);
};
# Part of the POE/Reflex contract.
@@ -100,7 +113,7 @@ role {
};
# Default callbacks that re-emit their parameters.
- method $cb_name => emit_an_event("${h}_writable");
+ method $cb_name => emit_an_event("writable");
};
1;
@@ -151,6 +164,7 @@ named "XYZ" would generates these methods by default:
cb_ready => "on_XYZ_writable",
method_pause => "pause_XYZ_writable",
method_resume => "resume_XYZ_writable",
+ method_stop => "stop_XYZ_writable",
This naming convention allows the role to be used for more than one
handle in the same class. Each handle will have its own name, and the
@@ -196,6 +210,13 @@ the watcher. It is "pause_${handle}_writable" by default.
C<method_resume> may be used to resume paused writability watchers, or
to activate them if they are started in an inactive state.
+=head3 method_stop
+
+C<method_stop> may be used to stop readability watchers. These
+watchers may not be restarted once they've been stopped. If you want
+to pause and resume watching, see C<method_pause> and
+C<method_resume>.
+
=head1 EXAMPLES
TODO - I'm sure there are some.
View
234 lib/Reflex/Stream.pm
@@ -1,139 +1,21 @@
package Reflex::Stream;
use Moose;
-extends 'Reflex::Handle';
-
-# TODO - I've seen output buffers done two ways. First as a string
-# that's appended to on push and lopped on srite. Second as an array
-# of chunks. The theory behind using arrays is that shift is faster
-# than substr($string, 0, 1024) = "". Or even 4-arg substr(). We
-# should comparatively benchmark them. Meanwhile, I'm going to use
-# the big string buffer for simplicity.
-#
-# Stored as a string reference so we can modify it without calling
-# accessors for silly things.
-
-# TODO - Buffer put() if not connected. Flush them after connect.
-
-has out_buffer => (
- is => 'rw',
- isa => 'ScalarRef',
- default => sub { my $x = ""; \$x },
-);
-
-sub put {
- my ($self, @chunks) = @_;
-
- # TODO - Benchmark string vs. array.
-
- my $out_buffer = $self->out_buffer();
- if (length $$out_buffer) {
- $$out_buffer .= $_ foreach @chunks;
- return;
- }
-
- # Try to flush 'em all.
- while (@chunks) {
- my $next = shift @chunks;
- my $octet_count = syswrite($self->handle(), $next);
-
- # Hard error.
- unless (defined $octet_count) {
- $self->_emit_failure("syswrite");
- return;
- }
-
- use bytes;
-
- # Wrote it all! Whooooo!
- next if $octet_count == length $next;
-
- # Wrote less than all. Save the rest, and turn on write
- # multiplexing.
-
- $$out_buffer = substr($next, $octet_count);
- $$out_buffer .= $_ foreach @chunks;
- $self->wr(1);
- return;
- }
-
- # Flushed it all. Yay!
- return;
-}
-
-sub on_handle_readable {
- my ($self, $args) = @_;
-
- my $in_buffer = "";
- my $octet_count = sysread($args->{handle}, $in_buffer, 65536);
-
- # Hard error.
- unless (defined $octet_count) {
- $self->_emit_failure("sysread");
- $self->rd(0);
- return;
- }
-
- # Closure.
- unless ($octet_count) {
- # TODO - It's getting a little tedious to specify empty args for
- # events that don't include data.
- $self->emit(event => "closed", args => {} );
- $self->rd(0);
- return;
- }
-
- $self->emit(
- event => "data",
- args => {
- data => $in_buffer
- },
- );
-
- return;
-}
-
-sub on_handle_writable {
- my ($self, $args) = @_;
-
- my $out_buffer = $self->out_buffer();
- my $octet_count = syswrite($args->{handle}, $$out_buffer);
-
- unless (defined $octet_count) {
- $self->_emit_failure("syswrite");
- $self->wr(0);
- return;
- }
-
- sue bytes;
-
- # Wrote it all! Whooooo!
- if ($octet_count == length $$out_buffer) {
- $$out_buffer = "";
- $self->wr(0);
- return;
- }
+extends 'Reflex::Object';
- # Only wrote some. Remove that.
- substr($$out_buffer, 0, $octet_count) = "";
- return;
-}
-
-sub _emit_failure {
- my ($self, $errfun) = @_;
-
- $self->emit(
- event => "failure",
- args => {
- data => undef, # TODO - Indicates fail another way.
- errnum => ($!+0),
- errstr => "$!",
- errfun => $errfun,
- },
- );
+has handle => (
+ is => 'rw',
+ isa => 'FileHandle',
+ required => 1
+);
- return;
-}
+with 'Reflex::Role::Streaming' => {
+ handle => 'handle',
+ method_put => 'put',
+ cb_error => 'on_stream_error',
+ cb_data => 'on_stream_data',
+ cb_closed => 'on_stream_closed',
+};
1;
@@ -153,18 +35,18 @@ Reflex::Collection.
use Moose;
extends 'Reflex::Stream';
- sub on_stream_data {
+ sub on_handle_data {
my ($self, $args) = @_;
$self->put($args->{data});
}
- sub on_stream_failure {
+ sub on_handle_error {
my ($self, $args) = @_;
warn "$args->{errfun} error $args->{errnum}: $args->{errstr}\n";
$self->emit( event => "stopped", args => {} );
}
- sub on_stream_closed {
+ sub on_handle_closed {
my ($self, $args) = @_;
$self->emit( event => "stopped", args => {} );
}
@@ -196,32 +78,67 @@ promise. This incomplte example comes from eg/eg-38-promise-client.pl:
=head1 DESCRIPTION
Reflex::Stream reads from and writes to a file handle, most often a
-socket. It uses Reflex::Handle to read data from the handle when it
-arrives, and to write data to the handle as space becomes available.
-Data that cannot be written right away will be buffered until
-Reflex::Handle says the handle can accept more.
+socket. It is almost entirely implemented in Reflex::Role::Streaming.
+That role's documentation contains important details that won't be
+covered here.
=head2 Public Attributes
-Reflex::Stream inherits attributes from Reflex::Handle. Please see
-the other module for the latest documentation.
+=head3 handle
-One Reflex::Handle attribute to be wary of is rd(). It defaults to
-false, so Reflex::Stream objects don't start off ready to read data.
-This is subject to change.
-
-No other public attributes are defined.
+Reflex::Stream implements a single attribute, handle, that must be set
+to the stream's file handle (which can be a socket or something).
=head2 Public Methods
-Reflex::Stream adds its own public methods to those that may be
-inherited by Refex::Handle.
+Reflex::Role::Streaming provides all of Reflex::Stream's methods.
+Reflex::Stream however renames them to make more sense in a class.
=head3 put
The put() method writes one or more chunks of raw octets to the
stream's handle. Any data that cannot be written immediately will be
-buffered until Reflex::Handle says it's safe to write again.
+buffered until Reflex::Role::Streaming can write it later.
+
+Please see L<Reflex::Role::Streaming/method_put> for details.
+
+=head2 Callbacks
+
+=head3 on_stream_closed
+
+Subclasses may define on_stream_closed() to be notified when the
+remote end of the stream has closed for output. No further data will
+be received after receipt of this callback.
+
+on_stream_closed() receives no parameters of note.
+
+The default on_stream_closed() callback will emit a "closed" event.
+
+=head3 on_stream_data
+
+on_stream_data() will be called whenever Reflex::Stream receives data.
+It will include one named parameter in $_[1], "data", containing raw
+octets received from the stream.
+
+ sub on_stream_data {
+ my ($self, $param) = @_;
+ print "Got data: $param->{data}\n";
+ }
+
+The default on_stream_data() callback will emit a "data" event.
+
+=head3 on_stream_error
+
+on_stream_error() will be called if an error occurs reading from or
+writing to the stream's handle. Its parameters are the usual for
+Reflex:
+
+ sub on_stream_error {
+ my ($self, $param) = @_;
+ print "$param->{errfun} error $param->{errnum}: $param->{errstr}\n";
+ }
+
+The default on_stream_error() callback will emit a "error" event.
=head2 Public Events
@@ -232,35 +149,28 @@ Reflex::Stream emits stream-related events, naturally.
The "closed" event indicates that the stream is closed. This is most
often caused by the remote end of a socket closing their connection.
+See L</on_stream_closed> for more details.
+
=head3 data
The "data" event is emitted when a stream produces data to work with.
It includes a single parameter, also "data", containing the raw octets
read from the handle.
-=head3 failure
-
-Reflex::Stream emits "failure" when any of a number of calls fails.
-This event's parameters include:
-
-=over 2
-
-=item * data - Undefined, since no data could be read.
-
-=item * errnum - The numeric value of $! at the time of error.
+See L</on_stream_data> for more details.
-=item * errstr - The string value of $! at the time of error.
+=head3 error
-=item * errfun - A brief description of the function call that failed.
+Reflex::Stream emits "error" when any of a number of calls fails.
-=back
+See L</on_stream_error> for more details.
=head1 EXAMPLES
eg/EchoStream.pm in the distribution is the same EchoStream that
appears in the SYNOPSIS.
-eg/eg-38-promise-client.pl shows a lengthy condvar-esque usage of
+eg/eg-38-promise-client.pl shows a lengthy inline usage of
Reflex::Stream and a few other classes.
=head1 SEE ALSO
View
47 lib/Reflex/UdpPeer.pm
@@ -1,10 +1,23 @@
package Reflex::UdpPeer;
use Moose;
extends 'Reflex::Object';
-with 'Reflex::Role::UdpPeer';
-# Composes Reflex::Role::udpPeer into a class.
-# Does nothing of its own.
+has socket => (
+ is => 'rw',
+ isa => 'Maybe[FileHandle]',
+ required => 1,
+);
+
+with 'Reflex::Role::Recving' => {
+ handle => 'socket',
+
+ # Expose role methods with more sensible names for a class.
+ -alias => {
+ send_socket => 'send',
+ stop_socket => 'stop',
+ },
+ -excludes => [ qw(send_socket stop_socket) ],
+};
1;
@@ -16,18 +29,20 @@ Reflex::UdpPeer - Base class for non-blocking UDP networking peers.
=head1 SYNOPSIS
+TODO - Rewritten. Need to rewrite docs, too.
+
Inherit it.
- package Reflex::UdpPeer::Echo;
+ package Reflex::Udp::Echo;
use Moose;
extends 'Reflex::UdpPeer';
- sub on_udppeer_datagram {
+ sub on_socket_datagram {
my ($self, $args) = @_;
my $data = $args->{datagram};
if ($data =~ /^\s*shutdown\s*$/) {
- $self->destruct();
+ $self->stop_socket_readable();
return;
}
@@ -37,7 +52,7 @@ Inherit it.
);
}
- sub on_udppeer_error {
+ sub on_socket_error {
my ($self, $args) = @_;
warn "$args->{op} error $args->{errnum}: $args->{errstr}";
$self->destruct();
@@ -45,23 +60,25 @@ Inherit it.
Use it as a helper.
- package Reflex::UdpPeer::Echo;
+ package Reflex::Udp::Echo;
use Moose;
extends 'Reflex::Object';
use Reflex::UdpPeer;
- has port => (
- isa => 'Int',
- is => 'ro',
- );
+ has port => ( isa => 'Int', is => 'ro' );
has peer => (
- isa => 'Reflex::UdpPeer|Undef',
+ isa => 'Maybe[Reflex::UdpPeer]',
is => 'rw',
traits => ['Reflex::Trait::Observed'],
setup => sub {
my $self = shift;
- Reflex::UdpPeer->new(port => $self->port());
+ Reflex::UdpPeer->new(
+ socket => IO::Socket::INET->new(
+ LocalPort => $self->port(),
+ Proto => 'udp',
+ )
+ )
},
);
@@ -88,7 +105,7 @@ Use it as a helper.
Compose objects with its base role.
- # See L<Reflex::Role::UdpPeer>.
+ # See L<Reflex::Role::Recving>.
Use it as a promise (like a condvar), or set callbacks in its
constructor.
View
14 lib/Reflex/Util/Methods.pm
@@ -10,12 +10,10 @@ our @EXPORT_OK = qw(emit_an_event);
sub emit_an_event {
my ($event_name) = @_;
- return(
- $cb_name => sub {
- my ($self, $args) = @_;
- $self->emit(event => $event_name, args => $args);
- }
- );
+ return sub {
+ my ($self, $args) = @_;
+ $self->emit(event => $event_name, args => $args);
+ };
}
1;
@@ -50,8 +48,8 @@ Reflex::Util::Methods - helper functions to generate methods
# (Lots of stuff omitted here.)
# Default callbacks that re-emit their parameters.
- method $cb_datagram => emit_an_event("${h}_data");
- method $cb_error => emit_an_event("${h}_error");
+ method $cb_datagram => emit_an_event("data");
+ method $cb_error => emit_an_event("error");
};
=head1 DESCRIPTION
Please sign in to comment.
Something went wrong with that request. Please try again.