Permalink
Browse files

Add the beginnings of a socket listener, a stream reader/writer, and …

…an example TCP server.
  • Loading branch information...
1 parent 4adc50c commit de14455620a1df3005cd609af130780b093f126f @rcaputo committed Apr 11, 2010
@@ -0,0 +1,81 @@
+# A TCP echo server.
+
+use lib qw(./lib ../lib);
+
+{
+ package TcpEchoSession;
+ use Moose;
+ extends 'Reflex::Stream';
+
+ sub on_my_stream {
+ my ($self, $args) = @_;
+ $self->put($args->{data});
+ }
+
+ sub on_my_fail {
+ my ($self, $args) = @_;
+ warn "$args->{errfun} error $args->{errnum}: $args->{errstr}\n";
+ $self->emit( event => "shutdown", args => {} );
+ }
+
+ sub on_my_close {
+ my ($self, $args) = @_;
+ $self->emit( event => "shutdown", args => {} );
+ }
+}
+
+{
+ package TcpEchoServer;
+
+ use Moose;
+ extends 'Reflex::Listener';
+ use Reflex::Callbacks qw(cb_method);
+
+ has clients => (
+ is => 'rw',
+ isa => 'HashRef[Reflex::Stream]',
+ default => sub { {} },
+ );
+
+ sub on_my_accepted {
+ my ($self, $args) = @_;
+
+ # TODO - We're developing a pattern here:
+ # 1. Create a managed object,
+ # 2. The new managed object will contain a weak manager reference.
+ # 3. The manager enters the object into a hash, keyed on object.
+ # 4. Later, the object will tell the manager when it shuts down.
+
+ my $client = TcpEchoSession->new(
+ handle => $args->{socket},
+ rd => 1,
+ );
+
+ $self->observe(
+ $client,
+ shutdown => cb_method($self, "on_client_shutdown")
+ );
+
+ $self->clients()->{$client} = $client;
+ }
+
+ sub on_my_fail {
+ my ($self, $args) = @_;
+ warn "$args->{errfun} error $args->{errnum}: $args->{errstr}\n";
+ }
+
+ sub on_client_shutdown {
+ my ($self, $args) = @_;
+ delete $self->clients()->{$args->{_sender}};
+ }
+}
+
+TcpEchoServer->new(
+ handle => IO::Socket::INET->new(
+ LocalAddr => '127.0.0.1',
+ LocalPort => 12345,
+ Listen => 5,
+ Reuse => 1,
+ ),
+ rd => 1,
+)->run_all();
@@ -3,8 +3,6 @@ package Reflex::Callback::CodeRef;
use Moose;
extends 'Reflex::Callback';
-use POE::Kernel; # for $poe_kernel
-
has code_ref => (
is => 'ro',
isa => 'CodeRef',
View
@@ -7,26 +7,27 @@ extends 'Reflex::Object';
use Scalar::Util qw(weaken);
has handle => (
- isa => 'IO::Handle',
+ isa => 'FileHandle',
is => 'rw',
+ # TODO - On change, stop the old handle and start the new one.
);
has rd => (
- isa => 'Bool',
- is => 'rw',
- # TODO - On set, change the handle's watcher state.
+ isa => 'Bool',
+ is => 'rw',
+ trigger => \&_changed_rd,
);
has wr => (
- isa => 'Bool',
- is => 'rw',
- # TODO - On set, change the handle's watcher state.
+ isa => 'Bool',
+ is => 'rw',
+ trigger => \&_changed_wr,
);
has ex => (
- isa => 'Bool',
- is => 'rw',
- # TODO - On set, change the handle's watcher state.
+ isa => 'Bool',
+ is => 'rw',
+ trigger => \&_changed_ex,
);
sub BUILD {
@@ -38,22 +39,71 @@ sub start {
my $self = shift;
return unless $self->call_gate("start");
+ # TODO - Repeated code between this and the _changed_rd() etc.
+ # methods. Repeating code is bad, but it's more efficient. Is
+ # there an efficient way to avoid the repetition?
+
my $envelope = [ $self ];
weaken $envelope->[0];
$POE::Kernel::poe_kernel->select_read(
- $self->handle(), 'select_ready', $envelope, 'read'
+ $self->handle(), 'select_ready', $envelope, 'readable'
) if $self->rd();
$POE::Kernel::poe_kernel->select_write(
- $self->handle(), 'select_ready', $envelope, 'write'
+ $self->handle(), 'select_ready', $envelope, 'writable'
) if $self->wr();
$POE::Kernel::poe_kernel->select_expedite(
- $self->handle(), 'select_ready', $envelope, 'expedite'
+ $self->handle(), 'select_ready', $envelope, 'exception'
) if $self->ex();
}
+sub _changed_rd {
+ my ($self, $value) = @_;
+ return unless $self->call_gate("_changed_rd", $value);
+ if ($value) {
+ my $envelope = [ $self ];
+ weaken $envelope->[0];
+ $POE::Kernel::poe_kernel->select_read(
+ $self->handle(), 'select_ready', $envelope, 'readable'
+ );
+ }
+ else {
+ $POE::Kernel::poe_kernel->select_read($self->handle(), undef);
+ }
+}
+
+sub _changed_wr {
+ my ($self, $value) = @_;
+ return unless $self->call_gate("_changed_rd", $value);
+ if ($value) {
+ my $envelope = [ $self ];
+ weaken $envelope->[0];
+ $POE::Kernel::poe_kernel->select_read(
+ $self->handle(), 'select_ready', $envelope, 'writable'
+ );
+ }
+ else {
+ $POE::Kernel::poe_kernel->select_read($self->handle(), undef);
+ }
+}
+
+sub _changed_ex {
+ my ($self, $value) = @_;
+ return unless $self->call_gate("_changed_rd", $value);
+ if ($value) {
+ my $envelope = [ $self ];
+ weaken $envelope->[0];
+ $POE::Kernel::poe_kernel->select_read(
+ $self->handle(), 'select_ready', $envelope, 'exception'
+ );
+ }
+ else {
+ $POE::Kernel::poe_kernel->select_read($self->handle(), undef);
+ }
+}
+
sub stop {
my $self = shift;
return unless $self->call_gate("stop");
@@ -63,6 +113,7 @@ sub stop {
$POE::Kernel::poe_kernel->select_expedite($self->handle(), undef) if $self->ex();
}
+# Part of the POE/Reflex contract.
sub _deliver {
my ($self, $handle, $mode) = @_;
$self->emit(
@@ -79,7 +130,6 @@ sub DEMOLISH {
}
no Moose;
-__PACKAGE__->meta()->make_immutable();
1;
View
@@ -0,0 +1,34 @@
+# A listen/accept server.
+
+package Reflex::Listener;
+use Moose;
+extends 'Reflex::Handle';
+
+sub on_my_readable {
+ my ($self, $args) = @_;
+
+ my $peer = accept(my ($socket), $args->{handle});
+ if ($peer) {
+ $self->emit(
+ event => "accepted",
+ args => {
+ peer => $peer,
+ socket => $socket,
+ }
+ );
+ return;
+ }
+
+ $self->emit(
+ event => "fail",
+ args => {
+ peer => undef,
+ socket => undef,
+ errnum => ($!+0),
+ errstr => "$!",
+ errfun => "accept",
+ },
+ );
+}
+
+1;
View
@@ -6,9 +6,6 @@ with 'Reflex::Role::Object';
# Composes the Reflex::Role::Object into a class.
# Does nothing of its own.
-no Moose;
-__PACKAGE__->meta()->make_immutable();
-
1;
__END__
View
@@ -29,9 +29,6 @@ sub stop_watching {
$self->name(undef);
}
-no Moose;
-__PACKAGE__->meta()->make_immutable();
-
1;
__END__
View
@@ -87,9 +87,6 @@ sub _deliver {
);
}
-no Moose;
-__PACKAGE__->meta()->make_immutable();
-
1;
__END__
@@ -107,9 +107,6 @@ sub on_sigchld_signal {
);
}
-no Moose;
-__PACKAGE__->meta()->make_immutable();
-
1;
__END__
View
@@ -346,6 +346,11 @@ sub emit {
my $event = $args->{event};
my $callback_args = $args->{args} || {};
+ # TODO - Needs consideration:
+ # TODO - Weaken?
+ # TODO - Underscores for Reflex parameters?
+ $callback_args->{_sender} = $self;
+
# Look for self-handling of the event.
# TODO - can() calls are also candidates for caching.
# (AKA: Cache as cache can()?)
@@ -484,6 +489,7 @@ sub ignore {
$observed->stop_observers($self, \@events);
}
else {
+ use Carp qw(cluck); cluck "whaaaa" unless defined $observed;
delete $self->watched_object_events()->{$observed};
delete $self->watched_objects()->{$observed};
$observed->stop_observers($self);
@@ -542,10 +548,6 @@ sub wait {
return $self->promise()->wait();
}
-
-no Moose;
-#__PACKAGE__->meta()->make_immutable();
-
1;
__END__
@@ -36,7 +36,7 @@ after 'BUILD' => sub {
undef;
};
-sub on_remote_read {
+sub on_remote_readable {
my ($self, $args) = @_;
my $remote_address = recv(
@@ -64,6 +64,7 @@ sub send {
[ ],
);
+ # Success!
return if send(
$self->handle()->handle(), # TODO - Ugh!
$args->{datagram},
@@ -86,9 +87,6 @@ sub destruct {
$self->handle(undef);
}
-no Moose;
-#__PACKAGE__->meta()->make_immutable();
-
1;
__END__
View
@@ -100,9 +100,6 @@ sub DEMOLISH {
}
}
-no Moose;
-__PACKAGE__->meta()->make_immutable();
-
1;
__END__
Oops, something went wrong.

0 comments on commit de14455

Please sign in to comment.