Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Commit experimental things for others to consider.

  • Loading branch information...
commit a94504ff16cdb05c11469e20c4dcd38b5eab9832 1 parent 4dba602
@rcaputo authored
View
83 eg/proto/Sidecar.pm
@@ -0,0 +1,83 @@
+package Sidecar;
+# vim: ts=2 sw=2 noexpandtab
+
+# "Sidecar" is what I call a subprocess that handles a particular
+# object. The analogy is to motorcycle sidecars.
+
+use warnings;
+use strict;
+
+use Storable qw(nfreeze thaw);
+
+sub BUILD {
+ use IPC::Run qw(start);
+ use Symbol qw(gensym);
+
+ my ($fh_in, $fh_out, $fh_err) = (gensym(), gensym(), gensym());
+
+ my $ipc_run = start(
+ $cmd,
+ '<pipe', $fh_in,
+ '>pipe', $fh_out,
+ '2>pipe', $fh_err,
+ ) or die "IPC::Run start() failed: $? ($!)";
+
+ return($ipc_run, $fh_in, $fh_out, $fh_err);
+}
+
+sub _sidecar_drive {
+ my $self = shift;
+
+ my $buffer = "";
+ my $read_length;
+
+ binmode(STDIN);
+ binmode(STDOUT);
+ select STDOUT; $| = 1;
+
+ use bytes;
+
+ while (1) {
+ if (defined $read_length) {
+ if (length($buffer) >= $read_length) {
+ my $request = thaw(substr($buffer, 0, $read_length, ""));
+ $read_length = undef;
+
+ my ($request_id, $context, $method, @args) = @$request;
+
+ my $streamable;
+
+ if ($context eq "array") {
+ my (@return) = eval { $self->$method(@args); };
+ $streamable = nfreeze( [ $request_id, $context, $@, @return ] );
+ }
+ elsif ($context eq "scalar") {
+ my $return = eval { $self->$method(@args); };
+ $streamable = nfreeze( [ $request_id, $context, $@, $return ] );
+ }
+ else {
+ eval { $self->$method(@args); undef; };
+ $streamable = nfreeze( [ $request_id, $context, $@ ] );
+ }
+
+ my $stream = length($streamable) . chr(0) . $streamable;
+
+ my $octets_wrote = syswrite(STDOUT, $stream);
+ die $! unless $octets_wrote == length($stream);
+
+ next;
+ }
+ }
+ elsif ($buffer =~ s/^(\d+)\0//) {
+ $read_length = $1;
+ next;
+ }
+
+ my $octets_read = sysread(STDIN, $buffer, 4096, length($buffer));
+ last unless $octets_read;
+ }
+
+ exit 0;
+}
+
+1;
View
22 eg/proto/eg-52-subclassed-timeout.pl
@@ -0,0 +1,22 @@
+#!/usr/bin/env perl
+# vim: ts=2 sw=2 noexpandtab
+
+use warnings;
+use strict;
+use lib qw(./lib ../lib ./eg);
+
+{
+ package Foo;
+ use Moose;
+ extends 'Reflex::Timeout';
+ use ExampleHelpers qw(eg_say);
+
+ sub on_done {
+ eg_say "custom got timeout";
+ $_[0]->reset();
+ }
+}
+
+my $to = Foo->new(delay => 1);
+Reflex->run_all();
+exit;
View
107 eg/proto/eg-61-run-collection.pl
@@ -0,0 +1,107 @@
+# vim: ts=2 sw=2 noexpandtab
+# This is a quick, one-off implementation of a one-shot worker pool.
+# Give it some jobs, and it'll run them all in parallel. It will
+# return results in the order of completion.
+#
+# It doesn't use the proposed collection promise.
+# It doesn't limit simultaneous workers.
+# It doesn't implement a generic Enterprise Integration Pattern.
+# In short, it does almost nothing generically useful.
+#
+# It does, however, act as an example of Reflex::POE::Wheel::Run used
+# for a practical purpose.
+
+use lib qw(../lib);
+
+# Start a parallel runner with a list of jobs.
+# ParallelRunner's implementation is below.
+
+my $pr = ParallelRunner->new(
+ jobs => [
+ [ \&adder, 1, 2, 3 ],
+ [ \&multiplier, 4, 5, 6 ],
+ ]
+);
+
+# Consume results until we're done.
+
+while (my $event = $pr->next()) {
+ use YAML;
+ print YAML::Dump($event->{arg}{result});
+}
+
+exit;
+
+# Jobs to run.
+
+sub adder {
+ use Time::HiRes qw(sleep); sleep rand();
+
+ my $accumulator = 0;
+ $accumulator += $_ foreach @_;
+ return [ adder => $accumulator ];
+}
+
+sub multiplier {
+ use Time::HiRes qw(sleep); sleep rand();
+
+ my $accumulator = 1;
+ $accumulator *= $_ foreach @_;
+ return [ multiplier => $accumulator ];
+}
+
+# Implementation of the ParallelRunner.
+
+BEGIN {
+ package ParallelRunner;
+
+ use Moose;
+ extends 'Reflex::Base';
+ use Reflex::Collection;
+ use Reflex::POE::Wheel::Run;
+ use Reflex::Callbacks;
+
+ use POE::Filter::Line;
+ use POE::Filter::Reference;
+
+ has jobs => (
+ isa => 'ArrayRef[ArrayRef]',
+ is => 'ro',
+ );
+
+ has_many workers => ( handles => { remember_worker => "remember" } );
+
+ sub BUILD {
+ my ($self, $args) = @_;
+
+ foreach my $job (@{$self->jobs()}) {
+ my ($coderef, @parameters) = @$job;
+
+ $self->remember_worker(
+ Reflex::POE::Wheel::Run->new(
+ Program => sub {
+ my $f = POE::Filter::Reference->new();
+ my $output = $f->put( [ $coderef->(@parameters) ] );
+ syswrite(STDOUT, $_) foreach @$output;
+ close STDOUT;
+ },
+ StdoutFilter => POE::Filter::Reference->new(),
+ cb_role($self, "child"),
+ )
+ );
+ }
+ }
+
+ sub on_child_stderr {
+ warn "child reported: $_[1]{output}\n";
+ }
+
+ sub on_child_stdout {
+ my ($self, $args) = @_;
+
+ $self->emit(
+ event => 'result',
+ args => { result => $args->{output} },
+ );
+ }
+}
View
45 eg/proto/leonerd-resolver-poe.pl
@@ -0,0 +1,45 @@
+#!/usr/bin/perl
+# vim: ts=2 sw=2 noexpandtab
+
+use strict;
+use warnings;
+use feature qw( say );
+
+use Socket qw( AF_INET unpack_sockaddr_in inet_ntoa );
+use Socket::GetAddrInfo qw( :newapi getaddrinfo );
+
+sub format_addr {
+ my ($port, $inaddr) = unpack_sockaddr_in $_[0];
+ sprintf "%s:%d", inet_ntoa($inaddr), $port;
+}
+
+use POE qw( Session Kernel Wheel::ReadWrite Wheel::Run Filter::Reference );
+
+{
+ my $wheel_resolver;
+
+ POE::Session->create(
+ inline_states => {
+ _start => sub {
+ $wheel_resolver = POE::Wheel::Run->new(
+ Program => sub {
+ my ($err, @addrs) =
+ getaddrinfo("localhost", "www", {family => AF_INET});
+ die "$err" if $err;
+ print @{POE::Filter::Reference->new->put([$addrs[0]])};
+ },
+ StdoutFilter => POE::Filter::Reference->new,
+ StdoutEvent => 'resolver_input',
+ StderrEvent => 'resolver_error',
+ );
+ },
+
+ resolver_input =>
+ sub { say "POE resolved " . format_addr($_[ARG0]->{addr}) },
+ resolver_error => sub { say "POE resolver error $_[ARG0]" },
+ },
+ );
+}
+
+POE::Kernel->run;
+
View
22 eg/proto/sidecar.pl
@@ -0,0 +1,22 @@
+#!/usr/bin/env perl
+# vim: ts=2 sw=2 noexpandtab
+
+use Moose;
+use Sidecar;
+
+{
+ package BlockingService;
+
+ use Moose;
+
+ has counter => ( is => 'rw', isa => 'Int', default => 0 );
+
+ sub next {
+ my $self = shift;
+
+ return "pid($$) counter = " . $self->counter( $self->counter() + 1 );
+ }
+}
+
+my $scbs = Sidecar->new(class => 'BlockingService');
+
View
70 eg/proto/test-observer.pl
@@ -0,0 +1,70 @@
+#!/usr/bin/perl
+# vim: ts=2 sw=2 noexpandtab
+
+use warnings;
+use strict;
+use lib qw(../lib);
+
+# Demonstrate how wheels may be encapsulated in thin,
+# configuration-only subclasses.
+
+{
+ package Runner;
+ use Moose;
+ extends 'Reflex::Base';
+ use Reflex::POE::Wheel::Run;
+ use Reflex::Trait::Watched qw(watches);
+
+ watches child => (
+ isa => 'Maybe[Reflex::POE::Wheel::Run]',
+ );
+
+ sub BUILD {
+ my $self = shift;
+ $self->child(
+ Reflex::POE::Wheel::Run->new(
+ Program => "$^X -wle '\$|=1; while (<STDIN>) { chomp; print qq[pid(\$\$) moo(\$_)] } exit'",
+ )
+ );
+
+ $self->child()->put("one", "two", "three", "last");
+ }
+
+ sub on_child_stdin {
+ print "stdin flushed\n";
+ }
+
+ sub on_child_stdout {
+ my ($self, $args) = @_;
+ print "stdout: $args->{output}\n";
+ $self->child()->kill() if $args->{output} =~ /moo\(last\)/;
+ }
+
+ sub on_child_stderr {
+ my ($self, $args) = @_;
+ print "stderr: $args->{output}\n";
+ }
+
+ sub on_child_error {
+ my ($self, $args) = @_;
+ return if $args->{operation} eq "read";
+ print "$args->{operation} error $args->{errnum}: $args->{errstr}\n";
+ }
+
+ sub on_child_close {
+ my ($self, $args) = @_;
+ print "child closed all output\n";
+ }
+
+ sub on_child_signal {
+ my ($self, $args) = @_;
+ print "child $args->{pid} exited: $args->{exit}\n";
+ $self->child(undef);
+ }
+}
+
+# Main.
+
+my $runner = Runner->new();
+Reflex->run_all();
+exit;
View
47 lib/Reflex/Encoder/Line.pm
@@ -0,0 +1,47 @@
+package Reflex::Encoder::Line;
+# vim: ts=2 sw=2 noexpandtab
+
+use Moose;
+with 'Reflex::Role::Encoding';
+use Reflex::Codec::Message;
+
+use Scalar::Util qw(blessed);
+
+has newline => ( is => 'rw', isa => 'Str', default => "\x0D\x0A" );
+
+# Data translation is lazy.
+# Translation happens when data is shifted off the encoding buffer.
+# The original data is available as long as possible.
+# Protocol swapping requires this.
+
+
+sub push_data {
+ my $self = shift;
+
+ if (
+ $self->_has_messages() and
+ $self->messages()->[-1]->is_combinable()
+ ) {
+ $self->messages()->[-1]->append_data(@_);
+ return;
+ }
+
+ $self->push_message(Reflex::Codec::Message::Stream->new(data => $_))
+ foreach @_;
+
+ return;
+}
+
+sub shift {
+ my $self = shift;
+
+ return unless defined(my $next = $self->_shift());
+
+ if (defined(my $next_data = $next->data())) {
+ $next->data($next_data . $self->newline());
+ }
+
+ return $next;
+}
+
+1;
View
23 lib/Reflex/Role/Decoding/Datagram.pm
@@ -0,0 +1,23 @@
+package Reflex::Role::Decoding::Datagram;
+# vim: ts=2 sw=2 noexpandtab
+
+use Reflex::Role;
+use Reflex::Codec::Message::Datagram;
+
+role {
+ my $p = shift;
+
+ method push_datagram => sub {
+ my $self = shift;
+
+ return unless @_;
+
+ $self->push_message(
+ Reflex::Codec::Message::Datagram->new( octets => $_ )
+ ) foreach @_;
+
+ return;
+ };
+};
+
+1;
View
44 lib/Reflex/Role/Encoding.pm
@@ -0,0 +1,44 @@
+package Reflex::Role::Encoding;
+# vim: ts=2 sw=2 noexpandtab
+
+use Reflex::Role;
+use Reflex::Codec::Message;
+use Reflex::Codec::Message::Eof;
+
+role {
+ my $p = shift;
+
+ has buffer => (
+ is => 'rw',
+ isa => 'ArrayRef[Reflex::Codec::Message]',
+ traits => ['Array'],
+ default => sub { [] },
+ handles => {
+ _push => 'push',
+ _shift => 'shift',
+ _has_message => 'count',
+ },
+ );
+
+ method push_eof => sub {
+ my $self = shift;
+ $self->push(Reflex::Codec::Message::Eof->new());
+ };
+
+ method push_message => sub {
+ my ($self, $message) = @_;
+
+ if (
+ $self->_has_message() and
+ $message->is_combinable() and
+ $self->messages()->[-1]->is_combinable()
+ ) {
+ $self->messages()->[-1]->absorb($message);
+ return;
+ }
+
+ $self->_push($message);
+ };
+};
+
+1;

0 comments on commit a94504f

Please sign in to comment.
Something went wrong with that request. Please try again.