Skip to content

Commit

Permalink
Merge commit 'origin/impl-prefork'
Browse files Browse the repository at this point in the history
Conflicts:
	benchmarks/ab.pl
	lib/Plack/Server/Standalone.pm
  • Loading branch information
miyagawa committed Oct 4, 2009
2 parents f52e954 + 0b8a70d commit 9f69577
Show file tree
Hide file tree
Showing 4 changed files with 156 additions and 64 deletions.
2 changes: 1 addition & 1 deletion benchmarks/ab.pl
Expand Up @@ -15,7 +15,7 @@
my $url = 'http://127.0.0.1/';

my @backends = grep eval "require Plack::Server::$_; 1",
qw( AnyEvent Standalone ServerSimple Coro Danga::Socket );
qw( AnyEvent Standalone Standalone::Prefork ServerSimple Coro Danga::Socket );

warn "Testing implementations: ", join(", ", @backends), "\n";

Expand Down
168 changes: 105 additions & 63 deletions lib/Plack/Server/Standalone.pm
Expand Up @@ -4,16 +4,15 @@ use warnings;

use Plack;
use Plack::HTTPParser qw( parse_http_request );
use Fcntl qw(F_SETFL FNDELAY);
use IO::Socket::INET;
use HTTP::Date;
use HTTP::Status;
use List::Util qw(max sum);
use Plack::Util;
use Plack::Middleware::ContentLength;
use POSIX qw(EAGAIN);
use POSIX qw(EINTR);
use Socket qw(IPPROTO_TCP TCP_NODELAY);
use Time::HiRes qw(time);
use Time::HiRes qw(alarm time);

use constant MAX_REQUEST_SIZE => 131072;
use constant MSWin32 => $^O eq 'MSWin32';
Expand All @@ -23,42 +22,75 @@ our $HasSendFile = !$ENV{PLACK_NO_SENDFILE} && do {
eval { require Sys::Sendfile; 1 };
};

our $HasServerStarter = do {
local $@;
eval { require Server::Starter; 1 };
};

sub new {
my($class, %args) = @_;
bless {
my $self = bless {
host => $args{host} || 0,
port => $args{port} || 8080,
timeout => $args{timeout} || 300,
max_keepalive_reqs => $args{max_keepalive_reqs} || 100,
keepalive_timeout => $args{keepalive_timeout} || 5,
}, $class;

# setup immediately if there is a superdaemon
if ($HasServerStarter && $ENV{SERVER_STARTER_PORT}) {
$self->{listen_sock} = IO::Socket::INET->new(
Proto => 'tcp',
) or die "failed to create socket:$!";
my ($hostport, $fd) = %{Server::Starter::server_ports()};
print STDERR "HOSTPORT: $hostport, fd: $fd\n";
if ($hostport =~ /(.*):(\d+)/) {
$self->{host} = $1;
$self->{port} = $2;
} else {
$self->{port} = $hostport;
}
$self->{listen_sock}->fdopen($fd, 'w')
or die "failed to bind to listening socket:$!";
}

$self;
}

sub run {
my($self, $app) = @_;
$self->setup_listener();
$self->accept_loop($app);
}

$app = Plack::Middleware::ContentLength->wrap($app);

my $listen_sock = IO::Socket::INET->new(
sub setup_listener {
my $self = shift;
$self->{listen_sock} ||= IO::Socket::INET->new(
Listen => SOMAXCONN,
LocalPort => $self->{port},
LocalAddr => $self->{host},
Proto => 'tcp',
ReuseAddr => 1,
) or die "failed to listen to port $self->{port}:$!";

warn "Accepting connections at http://$self->{host}:$self->{port}/\n";
while (1) {
}

sub accept_loop {
# TODO handle $max_reqs_per_child
my($self, $app, $max_reqs_per_child) = @_;
my $proc_req_count = 0;

$app = Plack::Middleware::ContentLength->wrap($app);

while (! defined $max_reqs_per_child || $proc_req_count < $max_reqs_per_child) {
local $SIG{PIPE} = 'IGNORE';
if (my $conn = $listen_sock->accept) {
unless (MSWin32) {
$conn->fcntl(F_SETFL, FNDELAY)
or die "fcntl(FNDELAY) failed:$!";
}
if (my $conn = $self->{listen_sock}->accept) {
$conn->setsockopt(IPPROTO_TCP, TCP_NODELAY, 1)
or die "setsockopt(TCP_NODELAY) failed:$!";
# we do not compare $req_count with $self->{max_keepalive_reqs} here, since it is an advisory variable and can be overridden by applications
for (my $req_count = 1; ; ++$req_count) {
my $req_count = 0;
while (1) {
++$req_count;
++$proc_req_count;
my $env = {
SERVER_PORT => $self->{port},
SERVER_NAME => $self->{host},
Expand All @@ -73,7 +105,11 @@ sub run {
};

# no need to take care of pipelining since this module is a HTTP/1.0 server
$self->handle_connection($env, $conn, $app, $req_count)
my $may_keepalive = $req_count < $self->{max_keepalive_reqs};
if ($may_keepalive && $max_reqs_per_child && $proc_req_count >= $max_reqs_per_child) {
$may_keepalive = undef;
}
$self->handle_connection($env, $conn, $app, $may_keepalive, $req_count != 0)
or last;
# TODO add special cases for clients with broken keep-alive support, as well as disabling keep-alive for HTTP/1.0 proxies
}
Expand All @@ -82,17 +118,28 @@ sub run {
}

sub handle_connection {
my($self, $env, $conn, $app, $req_count) = @_;
my($self, $env, $conn, $app, $use_keepalive, $is_keepalive) = @_;

my $buf = '';
my $res = [ 400, [ 'Content-Type' => 'text/plain' ], [ 'Bad Request' ] ];

while (1) {
my $rlen = $self->read_timeout($conn, \$buf, MAX_REQUEST_SIZE - length($buf), length($buf), $req_count == 1 || length($buf) != 0 ? $self->{timeout} : $self->{keepalive_timeout})
or return;
my $rlen = $self->read_timeout(
$conn, \$buf, MAX_REQUEST_SIZE - length($buf), length($buf),
$is_keepalive || length($buf) != 0
? $self->{timeout} : $self->{keepalive_timeout},
) or return;
my $reqlen = parse_http_request($buf, $env);
if ($reqlen >= 0) {
# handle request
if ($use_keepalive) {
if (my $c = $env->{HTTP_CONNECTION}) {
$use_keepalive = undef
unless $c =~ /^\s*keep-alive\s*/i;
} else {
$use_keepalive = undef;
}
}
$buf = substr $buf, $reqlen;
if ($env->{CONTENT_LENGTH}) {
# TODO can $conn seek to the begining of body and then set to 'psgi.input'?
Expand Down Expand Up @@ -122,21 +169,22 @@ sub handle_connection {
);

Plack::Util::header_iter($res->[1], sub {
my($k, $v) = @_;
push @lines, "$k: $v\r\n";
my ($k, $v) = @_;
if (lc $k eq 'connection') {
$conn_value = $v;
$use_keepalive = undef
if $use_keepalive && lc $v ne 'keep-alive';
} else {
push @lines, "$k: $v\015\012";
}
});

my $has_cl = Plack::Util::header_exists($res->[1], 'Content-Length');

if ($req_count < $self->{max_keepalive_reqs} && $has_cl && ! defined($conn_value) && ($env->{HTTP_CONNECTION} || '') =~ /keep-alive/i) {
unshift @lines, "Connection: keep-alive\r\n";
$conn_value = "keep-alive";
if ($use_keepalive) {
$use_keepalive = undef
unless Plack::Util::header_exists($res->[1], 'Content-Length');
}
unshift @lines, "HTTP/1.0 $res->[0] @{[ HTTP::Status::status_message($res->[0]) ]}\r\n";
push @lines, "\r\n";
push @lines, "Connection: keep-alive\015\012"
if $use_keepalive;
unshift @lines, "HTTP/1.0 $res->[0] @{[ HTTP::Status::status_message($res->[0]) ]}\015\012";
push @lines, "\015\012";

$self->write_all($conn, join('', @lines), $self->{timeout})
or return;
Expand Down Expand Up @@ -168,49 +216,42 @@ sub handle_connection {
}
}
}
defined($conn_value) && $conn_value =~ /keep-alive/i;
$use_keepalive;
}

# returns 1 if socket is ready, undef on timeout
sub wait_socket {
my ($self, $sock, $is_write, $wait_until) = @_;
do {
my $vec = '';
vec($vec, $sock->fileno, 1) = 1;
if (select($is_write ? undef : $vec, $is_write ? $vec : undef, undef,
max($wait_until - time, 0)) > 0) {
return 1;
sub do_timeout {
my ($self, $cb, $timeout) = @_;
local $SIG{ALRM} = sub {};
my $wait_until = time + $timeout;
alarm($timeout);
my $ret;
while (1) {
if ($ret = $cb->()) {
last;
} elsif (! (! defined($ret) && $! == EINTR)) {
undef $ret;
last;
}
} while (time < $wait_until);
return;
# got EINTR
my $left = $wait_until - time;
last if $left <= 0;
alarm($left + 0.1);
}
alarm(0);
$ret;
}

# returns (positive) number of bytes read, or undef if the socket is to be closed
sub read_timeout {
my ($self, $sock, $buf, $len, $off, $timeout) = @_;
my $wait_until = time + $timeout;
while ($self->wait_socket($sock, undef, $wait_until)) {
if (my $ret = $sock->sysread($$buf, $len, $off)) {
return $ret;
} elsif (! (! defined($ret) && $! == EAGAIN)) {
last;
}
}
return;
$self->do_timeout(sub { $sock->sysread($$buf, $len, $off) }, $timeout);
}

# returns (positive) number of bytes written, or undef if the socket is to be closed
sub write_timeout {
my ($self, $sock, $buf, $len, $off, $timeout) = @_;
my $wait_until = time + $timeout;
while ($self->wait_socket($sock, 1, $wait_until)) {
if (my $ret = $sock->syswrite($buf, $len, $off)) {
return $ret;
} elsif (! (! defined($ret) && $! == EAGAIN)) {
last;
}
}
return;
$self->do_timeout(sub { $sock->syswrite($buf, $len, $off) }, $timeout);
}

# writes all data in buf and returns number of bytes written or undef if failed
Expand All @@ -232,9 +273,10 @@ sub sendfile_all {
my $len = -s $fd;
die "TODO" unless defined $len;
while ($off < $len) {
return
unless $self->wait_socket($sock, 1, time + $timeout);
my $r = Sys::Sendfile::sendfile($sock, $fd, $len - $off, $off);
my $r = $self->do_timeout(
sub { Sys::Sendfile::sendfile($sock, $fd, $len - $off, $off) },
$timeout,
);
return
unless defined $r;
$off += $r;
Expand Down
34 changes: 34 additions & 0 deletions lib/Plack/Server/Standalone/Prefork.pm
@@ -0,0 +1,34 @@
package Plack::Server::Standalone::Prefork;
use strict;
use warnings;

use base qw(Plack::Server::Standalone);
use Parallel::Prefork;

sub new {
my($class, %args) = @_;
my $self = $class->SUPER::new(%args);
$self->{max_workers} = $args{max_workers} || 10;
$self->{max_reqs_per_child} = $args{max_reqs_per_child} || 100;
$self;
}

sub run {
my($self, $app) = @_;
$self->setup_listener();
my $pm = Parallel::Prefork->new({
max_workers => $self->{max_workers},
trap_signals => {
TERM => 'TERM',
HUP => 'TERM',
},
});
while ($pm->signal_received ne 'TERM') {
$pm->start and next;
$self->accept_loop($app, $self->{max_reqs_per_child});
$pm->finish;
}
$pm->wait_all_children;
}

1;
16 changes: 16 additions & 0 deletions t/Plack-Server/standalone-prefork.t
@@ -0,0 +1,16 @@
use strict;
use warnings;
use Test::More;
use Test::Requires {
'HTTP::Parser::XS' => 0,
'Parallel::Prefork' => 0.04,
};

use FindBin;
use Plack;
use Plack::Test::Suite;
$Plack::Test::Suite::BaseDir = "$FindBin::Bin/..";

Plack::Test::Suite->run_server_tests('Standalone::Prefork');
done_testing();

0 comments on commit 9f69577

Please sign in to comment.