Permalink
Browse files

Experimental queued I/O model, like a pared down STREAMS system.

Meant to address ordering issues when I/O races with notifications
like EOF and SIGCHLD, which can happen in pipe/fork/exec situations.
  • Loading branch information...
rcaputo committed May 18, 2011
1 parent 8cbeb92 commit 20c1bb9c51c40c519dd311f1932b47be926972c5
View
@@ -0,0 +1,61 @@
+#!/usr/bin/env perl
+
+use Moose;
+use Reflex::Decoder::Line;
+
+use constant NEWLINE => '<nl>';
+
+my $decoder = Reflex::Decoder::Line->new( newline => NEWLINE );
+$decoder->push_stream("test ", "line ", "one", NEWLINE);
+$decoder->push_stream("test ", "line ", "two", NEWLINE);
+
+# Fails because you can't push a mix of datagrams and streams onto a
+# streams-only decoder.
+#$decoder->push_datagram("datagram one", "datagram two", NEWLINE);
+
+$decoder->push_stream("test ", "line ", "three", NEWLINE);
+$decoder->push_stream("test ", "line ", "four", NEWLINE);
+
+$decoder->push_eof();
+
+use YAML; print YAML::Dump($decoder);
+while (my $next = $decoder->shift()) {
+ use YAML; print YAML::Dump($next);
+ last if $next->isa('Reflex::Codec::Message::Empty');
+}
+
+__END__
+
+% perl -I../lib eg-02-encoding.pl
+--- !!perl/hash:Reflex::Decoder::Line
+messages:
+ - !!perl/hash:Reflex::Codec::Message::Stream
+ is_combinable: 1
+ octets: 'test line one<nl>test line two<nl>test line three<nl>test line four<nl>'
+ priority: 500
+ - !!perl/hash:Reflex::Codec::Message::Eof
+ is_combinable: 0
+ priority: 500
+newline: '<nl>'
+--- !!perl/hash:Reflex::Codec::Message::Datagram
+is_combinable: 0
+octets: test line one
+priority: 500
+--- !!perl/hash:Reflex::Codec::Message::Datagram
+is_combinable: 0
+octets: test line two
+priority: 500
+--- !!perl/hash:Reflex::Codec::Message::Datagram
+is_combinable: 0
+octets: test line three
+priority: 500
+--- !!perl/hash:Reflex::Codec::Message::Datagram
+is_combinable: 0
+octets: test line four
+priority: 500
+--- !!perl/hash:Reflex::Codec::Message::Eof
+is_combinable: 0
+priority: 500
+--- !!perl/hash:Reflex::Codec::Message::Empty
+is_combinable: 0
+priority: 500
@@ -0,0 +1,15 @@
+package Reflex::Codec::Message;
+
+use Moose;
+
+has is_combinable => ( is => 'ro', isa => 'Bool', default => 0 );
+
+# TODO - Currently unused, but eventually "push" will honor priority.
+
+has priority => (
+ is => 'rw',
+ isa => 'Int',
+ default => 500,
+);
+
+1;
@@ -0,0 +1,12 @@
+package Reflex::Codec::Message::Datagram;
+
+use Moose;
+extends 'Reflex::Codec::Message';
+
+has octets => (
+ is => 'rw',
+ isa => 'Str',
+ default => '',
+);
+
+1;
@@ -0,0 +1,6 @@
+package Reflex::Codec::Message::Empty;
+
+use Moose;
+extends 'Reflex::Codec::Message';
+
+1;
@@ -0,0 +1,6 @@
+package Reflex::Codec::Message::Eof;
+
+use Moose;
+extends 'Reflex::Codec::Message';
+
+1;
@@ -0,0 +1,21 @@
+package Reflex::Codec::Message::Stream;
+
+use Moose;
+extends 'Reflex::Codec::Message';
+
+has '+is_combinable' => ( default => 1 );
+
+has octets => (
+ is => 'rw',
+ isa => 'Str',
+ default => '',
+ traits => [ 'String' ],
+ handles => {
+ append => 'append',
+ match => 'match',
+ substr => 'substr',
+ length => 'length',
+ },
+);
+
+1;
View
@@ -0,0 +1,44 @@
+package Reflex::Decoder::Line;
+
+use Moose;
+with 'Reflex::Role::Decoding';
+with 'Reflex::Role::Decoding::Stream';
+
+use Reflex::Codec::Message::Empty;
+
+has newline => ( is => 'rw', isa => 'Str', default => "\x0D\x0A" );
+
+# <doy>
+# # probably the best that's possible at the moment
+# my $header = $obj->match(qr/^(stuff)/);
+# $obj->substr(0, length($header), '');
+# <doy>
+# other than converting it to a scalarref and writing the method by hand
+
+sub shift {
+ my $self = shift;
+
+ return Reflex::Codec::Message::Empty->new() unless
+
+ my $next = $self->messages()->[0];
+ return $self->next_message() unless $next->isa(
+ 'Reflex::Codec::Message::Stream'
+ );
+
+ my $newline = $self->newline();
+ return Reflex::Codec::Message::Incomplete->new() unless (
+ my (@matches) = $next->match(qr/^(.*?)\Q$newline\E/)
+ );
+
+ if ($next->length() > length($matches[0]) + length($newline)) {
+ $next->substr(0, length($matches[0]) + length($newline), '');
+ }
+ else {
+ # Discard our empties.
+ $self->next_message();
+ }
+
+ return Reflex::Codec::Message::Datagram->new(octets => $matches[0]);
+}
+
+1;
@@ -0,0 +1,22 @@
+package Reflex::Role::Decoding;
+use Reflex::Role;
+use Reflex::Codec::Message::Stream;
+use Reflex::Codec::Message::Datagram;
+
+role {
+ my $p = shift;
+
+ has messages => (
+ is => 'rw',
+ isa => 'ArrayRef[Reflex::Codec::Message]',
+ traits => ['Array'],
+ default => sub { [] },
+ handles => {
+ push_message => 'push',
+ has_message => 'count',
+ next_message => 'shift',
+ },
+ );
+};
+
+1;
@@ -0,0 +1,46 @@
+package Reflex::Role::Decoding::Stream;
+use Reflex::Role;
+use Reflex::Codec::Message::Stream;
+use Reflex::Codec::Message::Eof;
+
+role {
+ my $p = shift;
+
+ method push_stream => sub {
+ my $self = shift;
+
+ return unless @_;
+ return unless length(my $data = join "", @_);
+
+ if (
+ $self->has_message() and
+ $self->messages()->[-1]->is_combinable()
+ ) {
+ $self->messages()->[-1]->append($data);
+ return;
+ }
+
+ $self->push_message(
+ Reflex::Codec::Message::Stream->new( octets => $data )
+ );
+
+ return;
+ };
+
+ method push_eof => sub {
+ my $self = shift;
+
+ # Already got one, thanks.
+ return if (
+ $self->has_message() and
+ $self->messages()->[-1]->isa('Reflex::Codec::Message::Eof')
+ );
+
+ $self->push_message(Reflex::Codec::Message::Eof->new());
+
+ undef;
+ };
+};
+
+1;
+

0 comments on commit 20c1bb9

Please sign in to comment.