Permalink
Browse files

add examples and update the README

  • Loading branch information...
1 parent 0318ca2 commit fc2745e2ca5075f3ddbb517912008e2be4e42a1d @richardhundt committed Jul 3, 2011
Showing with 931 additions and 0 deletions.
  1. +142 −0 README.pod
  2. +184 −0 eg/backend_httpd.pl
  3. +174 −0 eg/bad_backend_httpd.pl
  4. +36 −0 eg/echo_server.pl
  5. +78 −0 eg/http_client.pl
  6. +142 −0 eg/httpd.pl
  7. +175 −0 eg/webapp_httpd.pl
View
@@ -45,6 +45,148 @@ transformations are applied to the data stream.
Each component represents a symmetric coroutine with input/output channels
with which a component communicates with its upstream and downstream peers.
+=head1 EXAMPLE HTTP DAEMON
+
+This is taken from the examples directory (eg/httpd.pl) and implements a
+very simple HTTP Daemon.
+
+ {
+ package My::Parser;
+ use strict;
+ use HTTP::Request;
+ use base qw/Hive::Node/;
+
+ sub intake : In;
+ sub output : Out;
+
+ sub execute {
+ my $self = shift;
+ my $intake = $self->intake;
+ my $output = $self->output;
+ while ( ) {
+ my $buff = '';
+ my $line = $intake->get;
+ last unless defined $line;
+ while ( ) {
+ last unless defined $line;
+ $buff .= $line;
+ last if $line eq "\r\n";
+ $line = $intake->get;
+ }
+ if ( $buff ) {
+ $output->put( HTTP::Request->parse( $buff ) );
+ } else {
+ last;
+ }
+ }
+ $self->output->put( undef );
+ }
+ }
+
+ {
+ package My::Strify;
+
+ use strict;
+ use base qw/Hive::Node/;
+
+ sub intake : In;
+ sub output : Out;
+ sub execute {
+ my $self = shift;
+ my $intake = $self->intake;
+ my $output = $self->output;
+ while ( ) {
+ my $resp = $intake->get;
+ last unless defined $resp;
+ $resp->content_length( length( $resp->content ) );
+ $output->put( $resp->as_string("\r\n") );
+ }
+ $self->output->put( undef );
+ }
+ }
+
+ {
+ package My::WebApp;
+
+ use strict;
+
+ use HTTP::Response;
+
+ use base qw/Hive::Node/;
+ sub intake : In;
+ sub output : Out;
+ sub execute {
+ my $self = shift;
+ my $count = 0;
+ my $intake = $self->intake;
+ my $output = $self->output;
+ while ( ) {
+ my $req = $intake->get;
+ last unless defined $req;
+ my $rsp = HTTP::Response->new;
+ $rsp->code( 200 );
+ $rsp->protocol( 'HTTP/1.0' );
+ $rsp->header( Connection => 'keep-alive' );
+ $rsp->content(<<HTML);
+ <html>
+ <body><h2>Hello World! $count</h2></body>
+ </html>
+ HTML
+ $output->put( $rsp );
+ $count++;
+ }
+ $self->output->put( undef );
+ }
+ }
+
+ {
+ package My::HTTPD;
+ use strict;
+
+ use Coro;
+ use Coro::Socket;
+ use Hive::Read;
+ use Hive::Write;
+ use Hive::Muldex;
+
+ use base qw/Hive::Node/;
+
+ sub output : Out;
+ sub intake : In;
+
+ sub execute {
+ my ( $self, %opts ) = @_;
+ my $listen = Coro::Socket->new(
+ LocalHost => 'localhost',
+ LocalPort => 8080,
+ ReuseAddr => 1,
+ Listen => 1024,
+ ) or die $!;
+
+ while ( ) {
+ my $socket = $listen->accept;
+ next unless $socket;
+ my $reader = Hive::Read->new( "\r\n" );
+ my $writer = Hive::Write->new;
+
+ my $webapp = My::WebApp->new;
+ my $parser = My::Parser->new;
+ my $strify = My::Strify->new;
+
+ $reader->handle->put( $socket );
+ $writer->handle->put( $socket );
+
+ $reader->output->to( $parser->intake );
+ $parser->output->to( $webapp->intake );
+ $webapp->output->to( $strify->intake );
+ $strify->output->to( $writer->intake );
+ }
+ }
+ }
+
+ my $httpd = My::HTTPD->new;
+ $httpd->join;
+
=head1 AUTHOR
Richard Hundt
View
@@ -0,0 +1,184 @@
+use lib './lib';
+
+use strict;
+use warnings;
+
+use Coro;
+
+my $PORT = $ARGV[0] || 8188;
+
+{
+ package My::Parser;
+ use strict;
+ use HTTP::Request;
+ use base qw/Hive::Node/;
+
+ sub intake : In;
+ sub output : Out;
+ sub execute {
+ my $self = shift;
+ my $intake = $self->intake;
+ my $output = $self->output;
+ while ( ) {
+ my $buff = '';
+ my $line = $intake->get;
+ last unless defined $line;
+ while ( ) {
+ last unless defined $line;
+ $buff .= $line;
+ last if $line eq "\r\n";
+ $line = $intake->get;
+ }
+ if ( $buff ) {
+ $output->put( HTTP::Request->parse( $buff ) );
+ } else {
+ last;
+ }
+ }
+ $self->output->put( undef );
+ }
+}
+
+{
+ package My::Strify;
+
+ use strict;
+ use base qw/Hive::Node/;
+
+ sub intake : In;
+ sub output : Out;
+ sub execute {
+ my $self = shift;
+ my $intake = $self->intake;
+ my $output = $self->output;
+ while ( ) {
+ my $resp = $intake->get;
+ last unless defined $resp;
+ $resp->content_length( length( $resp->content ) );
+ $output->put( $resp->as_string("\r\n") );
+ }
+ $self->output->put( undef );
+ }
+}
+
+{
+ package My::Write;
+
+ use strict;
+
+ use Errno;
+ use Coro::Event;
+ use Coro::Handle ();
+
+ use base qw/Hive::Node/;
+
+ sub handle : In;
+ sub intake : In;
+
+ sub execute {
+ my $self = shift;
+ my $file = $self->handle->get;
+ unless ( UNIVERSAL::isa( $file, 'Coro::Handle' ) ) {
+ $file = Coro::Handle::unblock( $file );
+ }
+ my $data;
+ while ( ) {
+ $data = $self->intake->get;
+ last unless defined $data;
+ $file->print( $data );
+ }
+ $file->close;
+ }
+}
+
+
+{
+ package My::WebApp;
+
+ use strict;
+
+ use HTTP::Response;
+
+ use base qw/Hive::Node/;
+ sub intake : In;
+ sub output : Out;
+ sub execute {
+ my $self = shift;
+ my $socket = shift;
+ my $intake = $self->intake;
+ my $output = $self->output;
+ while ( ) {
+ my $req = $intake->get;
+ last unless defined $req;
+ warn "got request: ".$req->as_string();
+
+ my $len = $req->header('Content-Length');
+ my $got;
+ if ( $len ) {
+ my $buf = '';
+ while ($len > 0) {
+ $got = $socket->read($buf, $len);
+ last unless defined $got;
+ $len -= $got;
+ }
+ }
+ warn "done reading: got => '$got', len => '$len'";
+
+ my $rsp = HTTP::Response->new;
+ $rsp->code( 201 );
+ $rsp->protocol( 'HTTP/1.0' );
+ $rsp->header( Connection => 'keep-alive' );
+ $output->put( $rsp );
+ }
+ $self->output->put( undef );
+ }
+}
+
+{
+ package My::HTTPD;
+ use strict;
+
+ use Coro;
+ use Coro::Event;
+ use Coro::Socket;
+ use Hive::Read;
+ use Hive::Write;
+ use Hive::Muldex;
+
+ use base qw/Hive::Node/;
+
+ sub output : Out;
+ sub intake : In;
+
+ sub execute {
+ my ( $self, %opts ) = @_;
+ my $listen = Coro::Socket->new(
+ LocalHost => 'localhost',
+ LocalPort => $PORT,
+ ReuseAddr => 1,
+ Listen => 1024,
+ ) or die $!;
+
+ while ( ) {
+ my $socket = $listen->accept;
+ next unless $socket;
+ my $reader = Hive::Read->new( "\r\n" );
+ my $writer = My::Write->new;
+
+ my $webapp = My::WebApp->new( $socket );
+ my $parser = My::Parser->new;
+ my $strify = My::Strify->new;
+
+ $reader->handle->put( $socket );
+ $writer->handle->put( $socket );
+
+ $reader->output->to( $parser->intake );
+ $parser->output->to( $webapp->intake );
+ $webapp->output->to( $strify->intake );
+ $strify->output->to( $writer->intake );
+ }
+ }
+}
+
+my $httpd = My::HTTPD->new;
+$httpd->join;
Oops, something went wrong. Retry.

0 comments on commit fc2745e

Please sign in to comment.