Skip to content

Commit

Permalink
Merge branch 'topic/chunked-input'
Browse files Browse the repository at this point in the history
  • Loading branch information
miyagawa committed Feb 9, 2010
2 parents ff11b86 + 736f54f commit 626a0d4
Show file tree
Hide file tree
Showing 12 changed files with 434 additions and 17 deletions.
1 change: 1 addition & 0 deletions benchmarks/ab.pl
Expand Up @@ -23,6 +23,7 @@
[ 'Coro' ],
[ 'Danga::Socket' ],
[ 'POE' ],
[ 'Nomo' ],
);

my @backends;
Expand Down
55 changes: 54 additions & 1 deletion lib/HTTP/Message/PSGI.pm
Expand Up @@ -33,7 +33,18 @@ sub req_to_psgi {
# Plack::Request :/
utf8::downgrade $$uri;

open my $input, "<", \do { $req->content };
my $input;
my $content = $req->content;
if (ref $content eq 'CODE') {
if (defined $req->content_length) {
$input = HTTP::Message::PSGI::ChunkedInput->new($content);
} else {
$req->header("Transfer-Encoding" => "chunked");
$input = HTTP::Message::PSGI::ChunkedInput->new($content, 1);
}
} else {
open $input, "<", \$content;
}

my $env = {
PATH_INFO => URI::Escape::uri_unescape($uri->path),
Expand Down Expand Up @@ -138,6 +149,48 @@ sub HTTP::Response::from_psgi {
res_from_psgi(@_);
}

package
HTTP::Message::PSGI::ChunkedInput;

sub new {
my($class, $content, $chunked) = @_;

my $content_cb;
if ($chunked) {
my $done;
$content_cb = sub {
my $chunk = $content->();
return if $done;
unless (defined $chunk) {
$done = 1;
return "0\015\012\015\012";
}
return '' unless length $chunk;
return sprintf('%x', length $chunk) . "\015\012$chunk\015\012";
};
} else {
$content_cb = $content;
}

bless { content => $content_cb }, $class;
}

sub read {
my $self = shift;

my $chunk = $self->{content}->();
return 0 unless defined $chunk;

$_[0] = '';
substr($_[0], $_[2] || 0, length $chunk) = $chunk;

return length $chunk;
}

sub close { }

package HTTP::Message::PSGI;

1;

__END__
Expand Down
28 changes: 19 additions & 9 deletions lib/HTTP/Server/PSGI.pm
Expand Up @@ -171,16 +171,26 @@ sub handle_connection {
}
}
$buf = substr $buf, $reqlen;
if ($env->{CONTENT_LENGTH}) {
# TODO can $conn seek to the begining of body and then set to 'psgi.input'?
while (length $buf < $env->{CONTENT_LENGTH}) {
$self->read_timeout($conn, \$buf, $env->{CONTENT_LENGTH} - length($buf), length($buf), $self->{timeout})
or return;
}
}

open my $input, "<", \$buf;
$env->{'psgi.input'} = $input;
$env->{'psgi.input'} = Plack::Util::inline_object
read => sub {
my(undef, $length, $offset) = @_;
my $read;
if (my $buflen = length $buf) {
$read = $length < $buflen ? $length : $buflen;
$_[0] = substr $buf, 0, $read;
$buf = substr $buf, $read;
$length -= $read;
$offset += $read;
}
if ($length > 0) {
my $rlen = $self->read_timeout($conn, \$_[0], $length, $offset, $self->{timeout});
$read += $rlen if $rlen;
}
return $read;
},
close => sub { };

$res = Plack::Util::run_app $app, $env;
last;
}
Expand Down
150 changes: 150 additions & 0 deletions lib/Plack/Loader/GatewayCGI.pm
@@ -0,0 +1,150 @@
# This library is free software; you can redistribute it and/or modify
# it under the same terms as Perl itself.

package Plack::Loader::GatewayCGI;

use strict;
use warnings;

our $VERSION = '0.0001';

use IO::Socket::INET;
use Plack::Request;
use LWP::UserAgent;

use parent qw( Plack::Loader );

our $LIVETIME = 90;

sub run {
my ( $self, $server, $builder ) = @_;

if ( $server->isa('Plack::Handler::CGI') || $server->isa('Plack::Server::CGI') ) {
$server->run( $builder->() );
}
else {
my $cgiserver = $self->load('CGI');
my ( $host, $port ) = $self->gethostandport( $server );
my $proxy = $self->make_proxy( $host, $port );

if ( $self->live_server( $host, $port ) ) {
$cgiserver->run( $proxy );
}
else {
my $pid = fork();
if ( $pid ) {
$cgiserver->run( $proxy );
}
elsif ( $pid == 0 ) {
$self->run_server( $server, $builder );
}
else {
die "Cannot running backend server.";
}
}
}
}

sub live_server {
my ( $self, $host, $port ) = @_;

my $sock = IO::Socket::INET->new(
PeerAddr => $host || '127.0.0.1',
PeerHost => $port,
Proto => 'tcp',
Timeout => 10,
);

if ( $sock ) {
$sock->close;
return 1;
}

return 0;
}

our %CONFIG_GETTER = (
AnyEvent => sub { return @{ $_[0] }{qw( host port )} },
Coro => sub { return @{ $_[0] }{qw( host port )} },
POE => sub { return @{ $_[0] }{qw( host port )} },
ServerSimple => sub { return @{ $_[0] }{qw( host port )} },
Standalone => sub {
my ( $server ) = @_;
if ( $server->can('_server') ) {
return @{ $server->{'args'} }{qw( host port )},
}
else {
return @{ $server }{qw( host port )},
}
},
);

sub gethostandport {
my ( $self, $server ) = @_;

for my $impl ( keys %CONFIG_GETTER ) {
my @classes = (
"Plack::Handler::${impl}",
"Plack::Server::${impl}",
);

for my $class ( @classes ) {
if ( $server->isa($class) ) {
return $CONFIG_GETTER{$impl}->( $server );
}
}
}

die "Cannot getting server host and port.";
}

sub run_server {
my ( $self, $server, $builder ) = @_;

my $pid = fork;
if ( $pid ) {
sleep $LIVETIME;
warn "Killing backend server (pid: ${pid})";
kill INT => $pid;
waitpid( $pid, 0 );
warn "Killed backend server.";
}
elsif ( $pid == 0 ) {
warn "Backend server start.";
$server->run( $builder->() );
}
else {
die "Cannot fork server killer";
}
}

sub make_proxy {
my ( $self, $host, $port ) = @_;
my $ua = LWP::UserAgent->new;

return sub {
my $req = Plack::Request->new(shift);

for ( qw( Connection Keep-Alive Proxy-Authenticate Proxy-Authorization
TE Trailers Transfer-Encoding Upgrade Proxy-Connection Public ) ) {
$req->headers->remove_header($_);
}

$req->headers->scan(sub {
my ( $key, $value ) = @_;
$req->headers->remove_header($key);
});

my $uri = $req->uri;
$uri->host( $host );
$uri->port( $port );

my $res = $ua->request( HTTP::Request->new(
$req->method, $uri, $req->headers, $req->body,
) );

return $req->new_response( $res->code, $res->headers, $res->content )->finalize;
};
}

1;
83 changes: 83 additions & 0 deletions lib/Plack/Middleware/Dechunk.pm
@@ -0,0 +1,83 @@
package Plack::Middleware::Dechunk;
use strict;
use parent qw(Plack::Middleware);

use Plack::TempBuffer;
use constant CHUNK_SIZE => 1024 * 32;

sub call {
my($self, $env) = @_;
no warnings;
if ( $env->{HTTP_TRANSFER_ENCODING} eq 'chunked'
&& ($env->{REQUEST_METHOD} eq 'POST' || $env->{REQUEST_METHOD} eq 'PUT')) {
$self->dechunk_input($env);
}

$self->app->($env);
}

sub dechunk_input {
my($self, $env) = @_;

my $buffer = Plack::TempBuffer->new;
my $chunk_buffer;
my $length;

DECHUNK:
while (1) {
my $read = $env->{'psgi.input'}->read($chunk_buffer, CHUNK_SIZE, length $chunk_buffer);

while ( $chunk_buffer =~ s/^(([0-9a-fA-F]+).*\015\012)// ) {
my $trailer = $1;
my $chunk_len = hex $2;

if ($chunk_len == 0) {
last DECHUNK;
} elsif (length $chunk_buffer < $chunk_len) {
$chunk_buffer = $trailer . $chunk_buffer;
last;
}

$buffer->print(substr $chunk_buffer, 0, $chunk_len, '');
$chunk_buffer =~ s/^\015\012//;

$length += $chunk_len;
}

last unless $read && $read > 0;
}

delete $env->{HTTP_TRANSFER_ENCODING};
$env->{CONTENT_LENGTH} = $length;
$env->{'psgi.input'} = $buffer->rewind;
}

1;

__END__
=head1 NAME
Plack::Middleware::Dechunk - Decode chunked (TE: chunked) request body
=head1 SYNOPSIS
# This should be used in Servers as a library
=head1 DESCRIPTION
This middleware checks if an incoming request is chunked, and in that
case decodes the request body and buffers the whole output, and sets
the IO (either with PerlIO or with a temp filehandle) to
C<psgi.input>. It also sets I<Content-Length> header so your
application can work transparently.
=head1 AUTHOR
Tatsuhiko Miyagawa
=head1 SEE ALSO
L<HTTP::Body>
=cut
25 changes: 21 additions & 4 deletions lib/Plack/TempBuffer.pm
Expand Up @@ -11,15 +11,30 @@ sub new {

# $MaxMemoryBufferSize = 0 -> Always temp file
# $MaxMemoryBufferSize = -1 -> Always PerlIO
if ($length && $MaxMemoryBufferSize >= 0 && $length > $MaxMemoryBufferSize) {
Plack::Util::load_class('File', $class)->new($length);
my $backend;
if ($MaxMemoryBufferSize < 0) {
$backend = "PerlIO";
} elsif ($MaxMemoryBufferSize == 0) {
$backend = "File";
} elsif (!$length) {
$backend = "Auto";
} elsif ($length > $MaxMemoryBufferSize) {
$backend = "File";
} else {
Plack::Util::load_class('PerlIO', $class)->new;
$backend = "PerlIO";
}

$class->create($backend, $length, $MaxMemoryBufferSize);
}

sub create {
my($class, $backend, $length, $max) = @_;
Plack::Util::load_class($backend, $class)->new($length, $max);
}

sub print;
sub rewind;
sub size;

1;

Expand All @@ -33,7 +48,9 @@ Plack::TempBuffer - temporary buffer to save bytes
my $buf = Plack::TempBuffer->new($length);
$buf->print($bytes);
my $fh = $buf->rewind;
my $size = $buf->size;
my $fh = $buf->rewind;
=head1 DESCRIPTION
Expand Down

0 comments on commit 626a0d4

Please sign in to comment.