Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Fetching contributors…

Cannot retrieve contributors at this time

533 lines (443 sloc) 16.648 kb
package Starman::Server;
use strict;
use base 'Net::Server::PreFork';
use Data::Dump qw(dump);
use Socket qw(IPPROTO_TCP TCP_NODELAY);
use IO::Socket qw(:crlf);
use HTTP::Parser::XS qw(parse_http_request);
use HTTP::Status qw(status_message);
use HTTP::Date qw(time2str);
use Symbol;
use Plack::Util;
use Plack::TempBuffer;
use constant DEBUG => $ENV{STARMAN_DEBUG} || 0;
use constant CHUNKSIZE => 64 * 1024;
use constant READ_TIMEOUT => 5;
my $null_io = do { open my $io, "<", \""; $io };
use Net::Server::SIG qw(register_sig);
# Override Net::Server's HUP handling - just restart all the workers and that's about it
sub sig_hup {
my $self = shift;
$self->hup_children;
}
sub run {
my($self, $app, $options) = @_;
$self->{app} = $app;
$self->{options} = $options;
my %extra = ();
if ( $options->{pid} ) {
$extra{pid_file} = $options->{pid};
}
if ( $options->{daemonize} ) {
$extra{setsid} = $extra{background} = 1;
}
if (! exists $options->{keepalive}) {
$options->{keepalive} = 1;
}
if (! exists $options->{keepalive_timeout}) {
$options->{keepalive_timeout} = 1;
}
my($host, $port, $proto);
for my $listen (@{$options->{listen} || [ "$options->{host}:$options->{port}" ]}) {
if ($listen =~ /:/) {
my($h, $p) = split /:/, $listen, 2;
push @$host, $h || '*';
push @$port, $p;
push @$proto, 'tcp';
} else {
push @$host, 'localhost';
push @$port, $listen;
push @$proto, 'unix';
}
}
my $workers = $options->{workers} || 5;
local @ARGV = (@{$options->{argv} || []});
$self->SUPER::run(
port => $port,
host => $host,
proto => $proto,
serialize => 'flock',
log_level => DEBUG ? 4 : 2,
($options->{error_log} ? ( log_file => $options->{error_log} ) : () ),
min_servers => $options->{min_servers} || $workers,
min_spare_servers => $options->{min_spare_servers} || $workers - 1,
max_spare_servers => $options->{max_spare_servers} || $workers - 1,
max_servers => $options->{max_servers} || $workers,
max_requests => $options->{max_requests} || 1000,
user => $options->{user} || $>,
group => $options->{group} || $),
listen => $options->{backlog} || 1024,
check_for_waiting => 1,
no_client_stdout => 1,
%extra
);
}
sub pre_loop_hook {
my $self = shift;
my $host = $self->{server}->{host}->[0];
my $port = $self->{server}->{port}->[0];
$self->{options}{server_ready}->({
host => $host,
port => $port,
proto => $port =~ /unix/ ? 'unix' : 'http',
server_software => 'Starman',
}) if $self->{options}{server_ready};
register_sig(
TTIN => sub { $self->{server}->{$_}++ for qw( min_servers max_servers ) },
TTOU => sub { $self->{server}->{$_}-- for qw( min_servers max_servers ) },
QUIT => sub { $self->server_close(1) },
);
}
sub server_close {
my($self, $quit) = @_;
if ($quit) {
$self->log(2, $self->log_time . " Received QUIT. Running a graceful shutdown\n");
$self->{server}->{$_} = 0 for qw( min_servers max_servers );
$self->hup_children;
while (1) {
Net::Server::SIG::check_sigs();
$self->coordinate_children;
last if !keys %{$self->{server}{children}};
sleep 1;
}
$self->log(2, $self->log_time . " Worker processes cleaned up\n");
}
$self->SUPER::server_close();
}
sub run_parent {
my $self = shift;
$0 = "starman master " . join(" ", @{$self->{options}{argv} || []});
no warnings 'redefine';
local *Net::Server::PreFork::register_sig = sub {
my %args = @_;
delete $args{QUIT};
Net::Server::SIG::register_sig(%args);
};
$self->SUPER::run_parent(@_);
}
# The below methods run in the child process
sub child_init_hook {
my $self = shift;
srand();
if ($self->{options}->{psgi_app_builder}) {
DEBUG && warn "[$$] Initializing the PSGI app\n";
$self->{app} = $self->{options}->{psgi_app_builder}->();
}
$0 = "starman worker " . join(" ", @{$self->{options}{argv} || []});
}
sub post_accept_hook {
my $self = shift;
$self->{client} = {
headerbuf => '',
inputbuf => '',
keepalive => 1,
};
}
sub process_request {
my $self = shift;
my $conn = $self->{server}->{client};
if ($conn->NS_proto eq 'TCP') {
setsockopt($conn, IPPROTO_TCP, TCP_NODELAY, 1)
or die $!;
}
while ( $self->{client}->{keepalive} ) {
last if !$conn->connected;
# Read until we see all headers
last if !$self->_read_headers;
my $env = {
REMOTE_ADDR => $self->{server}->{peeraddr},
REMOTE_HOST => $self->{server}->{peerhost} || $self->{server}->{peeraddr},
SERVER_NAME => $self->{server}->{sockaddr} || 0, # XXX: needs to be resolved?
SERVER_PORT => $self->{server}->{sockport} || 0,
SCRIPT_NAME => '',
'psgi.version' => [ 1, 1 ],
'psgi.errors' => *STDERR,
'psgi.url_scheme' => 'http',
'psgi.nonblocking' => Plack::Util::FALSE,
'psgi.streaming' => Plack::Util::TRUE,
'psgi.run_once' => Plack::Util::FALSE,
'psgi.multithread' => Plack::Util::FALSE,
'psgi.multiprocess' => Plack::Util::TRUE,
'psgix.io' => $conn,
'psgix.input.buffered' => Plack::Util::TRUE,
'psgix.harakiri' => Plack::Util::TRUE,
};
# Parse headers
my $reqlen = parse_http_request(delete $self->{client}->{headerbuf}, $env);
if ( $reqlen == -1 ) {
# Bad request
DEBUG && warn "[$$] Bad request\n";
$self->_http_error(400, { SERVER_PROTOCOL => "HTTP/1.0" });
last;
}
# Initialize PSGI environment
# Determine whether we will keep the connection open after the request
my $connection = delete $env->{HTTP_CONNECTION};
my $proto = $env->{SERVER_PROTOCOL};
if ( $proto && $proto eq 'HTTP/1.0' ) {
if ( $connection && $connection =~ /^keep-alive$/i ) {
# Keep-alive only with explicit header in HTTP/1.0
$self->{client}->{keepalive} = 1;
}
else {
$self->{client}->{keepalive} = 0;
}
}
elsif ( $proto && $proto eq 'HTTP/1.1' ) {
if ( $connection && $connection =~ /^close$/i ) {
$self->{client}->{keepalive} = 0;
}
else {
# Keep-alive assumed in HTTP/1.1
$self->{client}->{keepalive} = 1;
}
# Do we need to send 100 Continue?
if ( $env->{HTTP_EXPECT} ) {
if ( $env->{HTTP_EXPECT} eq '100-continue' ) {
syswrite $conn, 'HTTP/1.1 100 Continue' . $CRLF . $CRLF;
DEBUG && warn "[$$] Sent 100 Continue response\n";
}
else {
DEBUG && warn "[$$] Invalid Expect header, returning 417\n";
$self->_http_error( 417, $env );
last;
}
}
unless ($env->{HTTP_HOST}) {
# No host, bad request
DEBUG && warn "[$$] Bad request, HTTP/1.1 without Host header\n";
$self->_http_error( 400, $env );
last;
}
}
unless ($self->{options}->{keepalive}) {
DEBUG && warn "[$$] keep-alive is disabled. Closing the connection after this request\n";
$self->{client}->{keepalive} = 0;
}
$self->_prepare_env($env);
# Run PSGI apps
my $res = Plack::Util::run_app($self->{app}, $env);
if (ref $res eq 'CODE') {
$res->(sub { $self->_finalize_response($env, $_[0]) });
} else {
$self->_finalize_response($env, $res);
}
DEBUG && warn "[$$] Request done\n";
if ( $self->{client}->{keepalive} ) {
# If we still have data in the input buffer it may be a pipelined request
if ( $self->{client}->{inputbuf} ) {
if ( $self->{client}->{inputbuf} =~ /^(?:GET|HEAD)/ ) {
if ( DEBUG ) {
warn "Pipelined GET/HEAD request in input buffer: "
. dump( $self->{client}->{inputbuf} ) . "\n";
}
# Continue processing the input buffer
next;
}
else {
# Input buffer just has junk, clear it
if ( DEBUG ) {
warn "Clearing junk from input buffer: "
. dump( $self->{client}->{inputbuf} ) . "\n";
}
$self->{client}->{inputbuf} = '';
}
}
DEBUG && warn "[$$] Waiting on previous connection for keep-alive request...\n";
my $sel = IO::Select->new($conn);
last unless $sel->can_read($self->{options}->{keepalive_timeout});
}
}
DEBUG && warn "[$$] Closing connection\n";
}
sub _read_headers {
my $self = shift;
eval {
local $SIG{ALRM} = sub { die "Timed out\n"; };
alarm( READ_TIMEOUT );
while (1) {
# Do we have a full header in the buffer?
# This is before sysread so we don't read if we have a pipelined request
# waiting in the buffer
last if defined $self->{client}->{inputbuf} && $self->{client}->{inputbuf} =~ /$CRLF$CRLF/s;
# If not, read some data
my $read = sysread $self->{server}->{client}, my $buf, CHUNKSIZE;
if ( !defined $read || $read == 0 ) {
die "Read error: $!\n";
}
if ( DEBUG ) {
warn "[$$] Read $read bytes: " . dump($buf) . "\n";
}
$self->{client}->{inputbuf} .= $buf;
}
};
alarm(0);
if ( $@ ) {
if ( $@ =~ /Timed out/ ) {
DEBUG && warn "[$$] Client connection timed out\n";
return;
}
if ( $@ =~ /Read error/ ) {
DEBUG && warn "[$$] Read error: $!\n";
return;
}
}
# Pull out the complete header into a new buffer
$self->{client}->{headerbuf} = $self->{client}->{inputbuf};
# Save any left-over data, possibly body data or pipelined requests
$self->{client}->{inputbuf} =~ s/.*?$CRLF$CRLF//s;
return 1;
}
sub _http_error {
my ( $self, $code, $env ) = @_;
my $status = $code || 500;
my $msg = status_message($status);
my $res = [
$status,
[ 'Content-Type' => 'text/plain', 'Content-Length' => length($msg) ],
[ $msg ],
];
$self->{client}->{keepalive} = 0;
$self->_finalize_response($env, $res);
}
sub _prepare_env {
my($self, $env) = @_;
my $get_chunk = sub {
if ($self->{client}->{inputbuf}) {
my $chunk = delete $self->{client}->{inputbuf};
return ($chunk, length $chunk);
}
my $read = sysread $self->{server}->{client}, my($chunk), CHUNKSIZE;
return ($chunk, $read);
};
my $chunked = do { no warnings; lc delete $env->{HTTP_TRANSFER_ENCODING} eq 'chunked' };
if (my $cl = $env->{CONTENT_LENGTH}) {
my $buf = Plack::TempBuffer->new($cl);
while ($cl > 0) {
my($chunk, $read) = $get_chunk->();
if ( !defined $read || $read == 0 ) {
die "Read error: $!\n";
}
$cl -= $read;
$buf->print($chunk);
}
$env->{'psgi.input'} = $buf->rewind;
} elsif ($chunked) {
my $buf = Plack::TempBuffer->new;
my $chunk_buffer = '';
my $length;
DECHUNK:
while (1) {
my($chunk, $read) = $get_chunk->();
$chunk_buffer .= $chunk;
while ( $chunk_buffer =~ s/^(([0-9a-fA-F]+).*\015\012)// ) {
my $trailer = $1;
my $chunk_len = hex $2;
if ($chunk_len == 0) {
last DECHUNK;
} elsif (length $chunk_buffer < $chunk_len + 2) {
$chunk_buffer = $trailer . $chunk_buffer;
last;
}
$buf->print(substr $chunk_buffer, 0, $chunk_len, '');
$chunk_buffer =~ s/^\015\012//;
$length += $chunk_len;
}
last unless $read && $read > 0;
}
$env->{CONTENT_LENGTH} = $length;
$env->{'psgi.input'} = $buf->rewind;
} else {
$env->{'psgi.input'} = $null_io;
}
}
sub _finalize_response {
my($self, $env, $res) = @_;
if ($env->{'psgix.harakiri.commit'}) {
$self->{client}->{keepalive} = 0;
$self->{client}->{harakiri} = 1;
}
my $protocol = $env->{SERVER_PROTOCOL};
my $status = $res->[0];
my $message = status_message($status);
my(@headers, %headers);
push @headers, "$protocol $status $message";
# Switch on Transfer-Encoding: chunked if we don't know Content-Length.
my $chunked;
my $headers = $res->[1];
for (my $i = 0; $i < @$headers; $i += 2) {
my $k = $headers->[$i];
my $v = $headers->[$i + 1];
next if $k eq 'Connection';
push @headers, "$k: $v";
$headers{lc $k} = $v;
}
if ( $protocol eq 'HTTP/1.1' ) {
if ( !exists $headers{'content-length'} ) {
if ( $status !~ /^1\d\d|[23]04$/ ) {
DEBUG && warn "[$$] Using chunked transfer-encoding to send unknown length body\n";
push @headers, 'Transfer-Encoding: chunked';
$chunked = 1;
}
}
elsif ( my $te = $headers{'transfer-encoding'} ) {
if ( $te eq 'chunked' ) {
DEBUG && warn "[$$] Chunked transfer-encoding set for response\n";
$chunked = 1;
}
}
} else {
if ( !exists $headers{'content-length'} ) {
DEBUG && warn "[$$] Disabling keep-alive after sending unknown length body on $protocol\n";
$self->{client}->{keepalive} = 0;
}
}
if ( ! $headers{date} ) {
push @headers, "Date: " . time2str( time() );
}
# Should we keep the connection open?
if ( $self->{client}->{keepalive} ) {
push @headers, 'Connection: keep-alive';
} else {
push @headers, 'Connection: close';
}
my $conn = $self->{server}->{client};
# Buffer the headers so they are sent with the first write() call
# This reduces the number of TCP packets we are sending
syswrite $conn, join( $CRLF, @headers, '' ) . $CRLF;
if (defined $res->[2]) {
Plack::Util::foreach($res->[2], sub {
my $buffer = $_[0];
if ($chunked) {
my $len = length $buffer;
return unless $len;
$buffer = sprintf( "%x", $len ) . $CRLF . $buffer . $CRLF;
}
syswrite $conn, $buffer;
DEBUG && warn "[$$] Wrote " . length($buffer) . " bytes\n";
});
syswrite $conn, "0$CRLF$CRLF" if $chunked;
} else {
return Plack::Util::inline_object
write => sub {
my $buffer = $_[0];
if ($chunked) {
my $len = length $buffer;
return unless $len;
$buffer = sprintf( "%x", $len ) . $CRLF . $buffer . $CRLF;
}
syswrite $conn, $buffer;
DEBUG && warn "[$$] Wrote " . length($buffer) . " bytes\n";
},
close => sub {
syswrite $conn, "0$CRLF$CRLF" if $chunked;
};
}
}
sub post_client_connection_hook {
my $self = shift;
if ($self->{client}->{harakiri}) {
exit;
}
}
1;
Jump to Line
Something went wrong with that request. Please try again.