Permalink
Browse files

Trying to track down the truncated responses that ab reports

  • Loading branch information...
1 parent 06a1c2b commit df99a8bb5cc8fd5723419c25ecdf88a16d1123ff @stash committed Oct 1, 2010
Showing with 160 additions and 14 deletions.
  1. +31 −8 Feersum.xs
  2. +1 −0 MANIFEST
  3. +5 −0 lib/Feersum.pm
  4. +15 −6 t/05-streaming.t
  5. +108 −0 xt/50-psgi-simple-stress.t
View
@@ -399,7 +399,6 @@ static int
prep_socket(int fd)
{
int flags;
- struct linger linger = { .l_onoff = 0, .l_linger = 0 };
// make it non-blocking
flags = O_NONBLOCK;
@@ -421,12 +420,19 @@ prep_socket(int fd)
return -1;
// disable lingering
+ struct linger linger = { .l_onoff = 0, .l_linger = 0 };
if (setsockopt(fd, SOL_SOCKET, SO_LINGER, &linger, sizeof(linger)))
return -1;
return 0;
}
+static void
+make_blocking(int fd)
+{
+ fcntl(fd, F_SETFL, 0);
+}
+
static struct feer_conn *
new_feer_conn (EV_P_ int conn_fd, struct sockaddr *sa)
{
@@ -739,7 +745,7 @@ try_write_finished:
goto try_write_shutdown;
case RESPOND_STREAMING:
if (c->poll_write_cb) goto try_write_again;
- goto try_write_paused;
+ else goto try_write_paused;
case RESPOND_SHUTDOWN:
goto try_write_shutdown;
default:
@@ -754,8 +760,12 @@ try_write_paused:
try_write_shutdown:
trace3("write SHUTDOWN %d, refcnt=%d, state=%d\n", c->fd, SvREFCNT(c->self), c->responding);
c->responding = RESPOND_SHUTDOWN;
- shutdown(c->fd, SHUT_WR);
stop_write_watcher(c);
+ make_blocking(c->fd);
+ //shutdown(c->fd, SHUT_WR);
+ if(close(c->fd))
+ perror("close socket at shutdown");
+ c->fd = 0;
try_write_cleanup:
SvREFCNT_dec(c->self);
@@ -851,10 +861,13 @@ try_read_error:
trace("READ ERROR %d, refcnt=%d\n", w->fd, SvREFCNT(c->self));
c->receiving = RECEIVE_SHUTDOWN;
c->responding = RESPOND_SHUTDOWN;
- shutdown(c->fd, SHUT_RDWR);
stop_read_watcher(c);
stop_read_timer(c);
stop_write_watcher(c);
+ //shutdown(c->fd, SHUT_RDWR);
+ if (close(c->fd))
+ perror("close on read error");
+ c->fd = 0;
goto try_read_cleanup;
try_read_bad:
@@ -911,9 +924,15 @@ conn_read_timeout (EV_P_ ev_timer *w, int revents)
}
else {
trace("read timeout while writing %d\n",c->fd);
- shutdown(c->fd, SHUT_RDWR);
- c->responding = RESPOND_SHUTDOWN;
stop_write_watcher(c);
+ stop_read_watcher(c);
+ stop_read_timer(c);
+ make_blocking(c->fd);
+ //shutdown(c->fd, SHUT_RDWR);
+ if(close(c->fd))
+ perror("close socket at read timeout");
+ c->fd = 0;
+ c->responding = RESPOND_SHUTDOWN;
}
read_timeout_cleanup:
@@ -2024,7 +2043,7 @@ _close (feer_conn_handle *hdl)
c->receiving = RECEIVE_SHUTDOWN;
break;
case 2:
- trace("close writer fd=%d, c=%p\n", c->fd, c);
+ trace("close writer fd=%d, c=%p, refcnt=%d\n", c->fd, c, SvREFCNT(c->self));
if (c->poll_write_cb) {
SvREFCNT_dec(c->poll_write_cb);
c->poll_write_cb = NULL;
@@ -2145,7 +2164,11 @@ DESTROY (struct feer_conn *c)
if (c->sa) Safefree(c->sa);
- if (c->fd) close(c->fd);
+ if (c->fd) {
+ make_blocking(c->fd);
+ if(close(c->fd))
+ perror("close socket at destruction");
+ }
if (c->poll_write_cb) SvREFCNT_dec(c->poll_write_cb);
View
@@ -37,3 +37,4 @@ t/52-psgi-iohandle.t
t/60-plack.t
t/Utils.pm
typemap
+xt/50-psgi-simple-stress.t
View
@@ -417,6 +417,11 @@ This could lead to a DoS attack on a Feersum server. Suggested remedy is to
only run Feersum behind some other web server and to use that to limit the
entity size.
+Something isn't getting set right with the TCP socket options and the last
+chunk in a streamed response is sometimes lost. This happens more frequently
+under high concurrency. Fiddling with TCP_NODELAY and SO_LINGER don't seem to
+help. Maybe threads are needed to do blocking close() and shutdown() calls?
+
=head1 SEE ALSO
http://en.wikipedia.org/wiki/Feersum_Endjinn
View
@@ -1,8 +1,8 @@
#!perl
use warnings;
use strict;
-use constant CLIENTS_11 => 15;
-use constant CLIENTS_10 => 15;
+use constant CLIENTS_11 => 25;
+use constant CLIENTS_10 => 25;
use constant CLIENTS => CLIENTS_11 + CLIENTS_10;
use Test::More tests => 7 + 21 * CLIENTS_11 + 22 * CLIENTS_10;
use Test::Exception;
@@ -40,39 +40,44 @@ $evh->request_handler(sub {
ok !$r->can('write'), "write method removed from connection object";
$cv->begin;
- my $w = $r->start_streaming("200 OK", ['Content-Type' => 'text/plain', 'X-Client' => $cnum]);
+ my $w = $r->start_streaming("200 OK", ['Content-Type' => 'text/plain', 'X-Client' => $cnum, 'X-Fileno' => $r->fileno ]);
$started++;
isa_ok($w, 'Feersum::Connection::Writer', "got a writer $cnum");
isa_ok($w, 'Feersum::Connection::Handle', "... it's a handle $cnum");
my $n = 0;
+ my $wrote_third = 0;
my $t; $t = AE::timer rand()/5,rand()/5, sub {
$n++;
eval {
- ok blessed($w), "still blessed? $cnum";
if ($n == 1) {
+ ok blessed($w), "still blessed? $cnum";
# cover PADTMP case
$w->write("$cnum Hello streaming world! chunk ".
($n==1?"one":"WTF")."\n");
pass "wrote chunk $n $cnum";
}
elsif ($n == 2) {
+ ok blessed($w), "still blessed? $cnum";
# cover PADMY case
my $d = "$cnum Hello streaming world! chunk ".
($n==1?"WTF":"'two'")."\n";
$w->write($d);
pass "wrote chunk $n $cnum";
}
elsif ($n == 3) {
+ ok blessed($w), "still blessed? $cnum";
my $buf = "$cnum Hello streaming world! chunk three\n";
$w->poll_cb(sub {
my $w2 = shift;
isa_ok($w2, 'Feersum::Connection::Writer',
"got another writer $cnum");
$w2->write($buf);
$w2->poll_cb(undef); # unset
+ $wrote_third = 1;
});
}
- else {
+ elsif ($wrote_third) {
+ ok blessed($w), "still blessed? $cnum";
$w->close();
pass "async writer finished $cnum";
dies_ok {
@@ -121,7 +126,11 @@ sub client {
"$cnum Hello streaming world! chunk one",
"$cnum Hello streaming world! chunk 'two'",
"$cnum Hello streaming world! chunk three",
- ], "$cnum got all three lines";
+ ], "$cnum got all three lines"
+ or do {
+ warn "descriptor ".$headers->{'x-fileno'}." failed!";
+ exit 2;
+ };
$cv->end;
undef $h;
};
View
@@ -0,0 +1,108 @@
+#!perl
+use warnings;
+use strict;
+use constant PARALLEL => 15;
+use Test::More qw/no_plan/;
+use lib 't'; use Utils;
+use POSIX ();
+
+BEGIN { use_ok('Feersum') };
+
+my ($socket,$port) = get_listen_socket();
+ok $socket, "made listen socket";
+
+my $APP = <<'EOAPP';
+ my $app = sub {
+ my $env = shift;
+ return [
+ 200,
+ ['Content-Type' => 'text/plain'],
+ ['Hello',' ','World']
+ ];
+ };
+EOAPP
+
+my $app = eval $APP;
+ok $app, 'got an app' || diag $@;
+
+POSIX::setsid;
+my $ppid = $$;
+my $pid = fork();
+if (!defined($pid)) {
+ die "can't fork: $!";
+}
+elsif ($pid == 0) {
+ my $evh = Feersum->new();
+ {
+ no warnings 'redefine';
+ *Feersum::DIED = sub {
+ my $err = shift;
+ warn "DIED: $err";
+ kill 9, -$ppid;
+ POSIX::_exit(2);
+ };
+ }
+ $evh->use_socket($socket);
+ $evh->psgi_request_handler($app);
+ my $quit; $quit = AE::signal 'QUIT', sub {
+ $evh->graceful_shutdown();
+ };
+ AE::cv->recv;
+ scope_guard { POSIX::_exit(0) };
+}
+
+sleep 1;
+
+my $cv = AE::cv;
+my $requests = 0;
+my $responses = 0;
+my $total_latency = 0.0;
+
+sub cli ($);
+sub cli ($) {
+ my $n = shift;
+# diag "($n) starting req";
+ $cv->begin;
+ my $r_start = AE::time;
+ my $h; $h = simple_client GET => '/',
+ name => "($n)",
+ sub {
+ my ($body, $headers) = @_;
+ scope_guard { $cv->end };
+# is $headers->{'Status'}, 200, "($n) Response OK";
+# is $headers->{'content-type'}, 'text/plain', "... ($n) is text";
+# is $body, 'Hello World', "... ($n) correct body";
+# is $headers->{'content-length'}, 11;
+ $total_latency += AE::time - $r_start;
+ $cv->croak("extra crap!") if length($h->{rbuf});
+ undef $h;
+ if ($headers->{'Status'}) {
+ $responses++;
+ cli $n;
+ }
+ };
+ $requests++;
+}
+
+for my $n (1 .. PARALLEL) {
+ cli $n;
+}
+
+my $t; $t = AE::timer 15, 0, sub {
+ $cv->croak("time's up!");
+};
+
+my $started = AE::time();
+eval { $cv->recv };
+diag $@ if $@;
+my $finished = AE::time();
+
+pass "clients done, waitpid";
+kill 9, $pid;
+waitpid $pid, 0;
+
+my $taken = $finished-$started;
+print "resp/sec: ".sprintf('%0.4f r/s',$responses/$taken)."\n";
+print "overall/req ".sprintf('%0.2f ms/r',$taken*1000.0/$responses)."\n";
+print "latency/req ".sprintf('%0.2f ms/r',$total_latency*1000.0/$responses)."\n";
+pass "all done";

0 comments on commit df99a8b

Please sign in to comment.