Browse files

read timeouts

  • Loading branch information...
1 parent 0874283 commit d236d167c086b913060110f2cd7086e3f8b99bc3 @stash committed May 17, 2010
Showing with 332 additions and 20 deletions.
  1. +85 −15 Feersum.xs
  2. +9 −4 TODO
  3. +90 −1 lib/Feersum.pm
  4. +148 −0 t/08-read-timeout.t
View
100 Feersum.xs
@@ -19,9 +19,11 @@
#endif
#define CRLFx2 CRLF CRLF
+// if you change these, also edit the LIMITS section in the POD
#define MAX_HEADERS 64
#define MAX_HEADER_NAME_LEN 128
-#define MAX_BODY_LENGTH IV_MAX - 1
+#define MAX_BODY_LENGTH 2147483647
+#define READ_CHUNK 4096
// Setting this to true will wait for writability before calling write() (will
// try to immediately write otherwise)
@@ -59,6 +61,7 @@ struct feer_req {
// enough to hold a 64-bit signed integer (which is 20+1 chars) plus nul
#define CONN_LABEL_LENGTH 24
+#define RESPOND_NOT_STARTED 0
#define RESPOND_NORMAL 1
#define RESPOND_STREAMING 2
#define RESPOND_SHUTDOWN 3
@@ -74,6 +77,7 @@ struct feer_conn {
int fd;
struct ev_io read_ev_io;
struct ev_io write_ev_io;
+ struct ev_timer read_ev_timer;
struct ev_loop *loop;
SV *rbuf, *wbuf;
@@ -82,9 +86,6 @@ struct feer_conn {
size_t expected_cl;
size_t received_cl;
- // SV *drain_cb; // async "done writing" callback
- // SV *read_cb; // async "data available" callback
-
int16_t in_callback;
int16_t responding;
int16_t receiving;
@@ -96,6 +97,7 @@ typedef struct feer_conn feer_conn_handle; // for typemap
static void try_conn_write(EV_P_ struct ev_io *w, int revents);
static void try_conn_read(EV_P_ struct ev_io *w, int revents);
+static void conn_read_timeout(EV_P_ struct ev_timer *w, int revents);
static bool process_request_headers(struct feer_conn *c, int body_offset);
static void sched_request_callback(struct feer_conn *c);
static void call_request_callback(struct feer_conn *c);
@@ -120,6 +122,7 @@ static SV *request_cb_cv = NULL;
static SV *shutdown_cb_cv = NULL;
static bool shutting_down = 0;
static int active_conns = 0;
+static double read_timeout = 5.0;
static ev_io accept_w;
static ev_prepare ep;
@@ -282,9 +285,10 @@ new_feer_conn (EV_P_ int conn_fd)
ev_io_init(&c->write_ev_io, try_conn_write, conn_fd, EV_WRITE);
c->write_ev_io.data = (void *)c;
-#ifdef DEBUG
- sv_dump(self);
-#endif
+ ev_init(&c->read_ev_timer, conn_read_timeout);
+ c->read_ev_timer.repeat = read_timeout;
+ c->read_ev_timer.data = (void *)c;
+
trace("made conn fd=%d self=%p, c=%p, cur=%d, len=%d\n",
c->fd, self, c, SvCUR(self), SvLEN(self));
@@ -467,8 +471,6 @@ try_parse_http(struct feer_conn *c, size_t last_read)
(SvCUR(c->rbuf)-last_read));
}
-#define READ_CHUNK 4096
-
static void
try_conn_read(EV_P_ ev_io *w, int revents)
{
@@ -478,6 +480,7 @@ try_conn_read(EV_P_ ev_io *w, int revents)
if (c->receiving == RECEIVE_SHUTDOWN) {
ev_io_stop(EV_A, w);
+ ev_timer_stop(EV_A, &c->read_ev_timer);
return;
}
@@ -486,9 +489,6 @@ try_conn_read(EV_P_ ev_io *w, int revents)
if (!c->rbuf) {
trace("init rbuf for %d\n",w->fd);
c->rbuf = newSV(2*READ_CHUNK + 1);
-#ifdef DEBUG
- sv_dump(c->rbuf);
-#endif
}
if (SvLEN(c->rbuf) - SvCUR(c->rbuf) < READ_CHUNK) {
@@ -519,14 +519,14 @@ try_conn_read(EV_P_ ev_io *w, int revents)
if (ret == -2) goto try_read_again;
if (process_request_headers(c, ret))
- goto try_read_again;
+ goto try_read_again_reset_timer;
else
goto dont_read_again;
}
else if (c->receiving == RECEIVE_BODY) {
c->received_cl += got_n;
if (c->received_cl < c->expected_cl)
- goto try_read_again;
+ goto try_read_again_reset_timer;
// body is complete
sched_request_callback(c);
goto dont_read_again;
@@ -548,8 +548,12 @@ dont_read_again:
c->receiving = RECEIVE_SHUTDOWN;
shutdown(c->fd, SHUT_RD); // TODO: respect keep-alive
ev_io_stop(EV_A, w);
+ ev_timer_stop(EV_A, &c->read_ev_timer);
return;
+try_read_again_reset_timer:
+ trace("(reset read timer) %d\n", w->fd);
+ ev_timer_again(EV_A, &c->read_ev_timer);
try_read_again:
trace("read again %d\n", w->fd);
if (!ev_is_active(w)) {
@@ -559,6 +563,45 @@ try_read_again:
}
static void
+conn_read_timeout (EV_P_ ev_timer *w, int revents)
+{
+ dCONN;
+
+ trace("read timeout %d\n", c->fd);
+ if (revents != EV_TIMER || c->receiving == RECEIVE_SHUTDOWN) {
+ return;
+ }
+
+ c->receiving = RECEIVE_SHUTDOWN;
+ ev_io_stop(EV_A, &c->read_ev_io);
+
+ // always stop since, for efficiency, we set this up as a recurring timer.
+ ev_timer_stop(EV_A, w);
+
+
+ if (c->responding == RESPOND_NOT_STARTED) {
+ shutdown(c->fd, SHUT_RD);
+ const char *msg;
+ if (c->receiving == RECEIVE_HEADERS) {
+ msg = "Headers took too long.";
+ }
+ else {
+ msg = "Timeout reading body.";
+ }
+ respond_with_server_error(c, msg, 0, 408);
+ return;
+ }
+ else {
+ shutdown(c->fd, SHUT_RDWR);
+ c->responding = RESPOND_SHUTDOWN;
+ }
+
+ // TODO: trigger the Reader poll callback with an error, if present
+
+ SvREFCNT_dec(c->self);
+}
+
+static void
accept_cb (EV_P_ ev_io *w, int revents)
{
struct sockaddr_in sa;
@@ -585,6 +628,8 @@ accept_cb (EV_P_ ev_io *w, int revents)
// XXX: good idea to read right away?
// try_conn_read(EV_A, &c->read_ev_io, EV_READ);
ev_io_start(EV_A, &c->read_ev_io);
+ // alternative to ev_timer_start, from the libev man-page
+ ev_timer_again(EV_A, &c->read_ev_timer);
}
}
@@ -791,7 +836,7 @@ respond_with_server_error (struct feer_conn *c, const char *msg, STRLEN msg_len,
{
SV *tmp;
- if (c->responding) {
+ if (c->responding != RESPOND_NOT_STARTED) {
trouble("Tried to send server error but already responding!");
return;
}
@@ -994,6 +1039,26 @@ graceful_shutdown (SV *self, SV *cb)
close(accept_w.fd);
}
+double
+read_timeout (SV *self, ...)
+ PROTOTYPE: $;$
+ CODE:
+{
+ if (items <= 1) {
+ RETVAL = read_timeout;
+ }
+ else if (items == 2) {
+ SV *duration = ST(1);
+ NV new_read_timeout = SvNV(duration);
+ if (!(new_read_timeout > 0.0)) {
+ croak("must set a positive (non-zero) value for the timeout");
+ }
+ read_timeout = (double) new_read_timeout;
+ }
+}
+ OUTPUT:
+ RETVAL
+
void
DESTROY (SV *self)
PPCODE:
@@ -1453,6 +1518,11 @@ DESTROY (struct feer_conn *c)
}
if (c->fd) close(c->fd);
+ if (ev_is_active(&c->read_ev_timer)) {
+ trace("... hmm, read timer was still active");
+ ev_timer_stop(c->loop, &c->read_ev_timer);
+ }
+
active_conns--;
if (shutting_down && active_conns <= 0) {
View
13 TODO
@@ -5,19 +5,24 @@ Timeouts
psgi.input streaming
- * add a "read_cb()" method to the psgi.input handle as an extension? EV
+ * add a "poll_cb()" method to the psgi.input handle as an extension? EV
gets to schedule the watcher in that case rather than bleeding the fd to
the handler.
* related: Connection: close bodies
* related: Transfer-Encoding: chunked bodies
-allow streaming responses to be HTTP/1.0 "Connection: close" rather than
-HTTP/1.1 "Transfer-Encoding: chunked".
+streamed responses
- * maybe match up the streaming flavour with the request version?
+ * add a poll_cb() method to the writer handle.
+ * allow streaming responses to be HTTP/1.0 "Connection: close" rather than
+ HTTP/1.1 "Transfer-Encoding: chunked". match up the streaming flavour
+ with the request version?
Handle requests that don't require a body (optional entities).
+ * Related: allow overriding the "if entity has a C-L, wait for it" during request
+ start (not sure if there's a PSGI-compatible way to do this)
+
Better EV-callback error handling.
Lazy I/O watcher initialization?
View
91 lib/Feersum.pm
@@ -133,7 +133,62 @@ any C<write> or C<start_response> calls.
=head1 METHODS
-TODO
+B<Notice> some methods are not documented yet.
+
+=over 4
+
+=item use_socket ($sock)
+
+Use the file-descriptor attached to a listen-socket to accept connections. If
+you create this using IO::Socket::INET, you should keep a reference to $sock
+to prevent closing the descriptor during garbage-collection.
+
+=item request_handler ($code->($connection))
+
+Sets the global request handler. Any previous handler is replaced.
+
+The handler callback is passed a C<Feersum::Connection> object. See above for
+how to use this for now.
+
+B<Subject to change>: if the request has an entity body then the handler will
+be called B<only> after receiving the body in its entirety. The headers
+*must* specify a Content-Length of the body otherwise the request will be
+rejected. The maximum size is hard coded to 2147483647 bytes (this may be
+considered a bug).
+
+=item read_timeout
+
+=item read_timeout ($duration)
+
+Get or set the global read timeout.
+
+Feersum will wait about this long to receive all headers of a request (within
+the tollerances provided by libev). If an entity body is part of the request
+(e.g. POST or PUT) it will wait this long between successful C<read()> system
+calls.
+
+=item DIED
+
+Not really a method so much as a static function. Works similar to
+EV's/AnyEvent's error handler.
+
+To install a handler:
+
+ no strict 'refs';
+ *{Feersum::DIED} = sub { warn "nuts $_[0]" };
+
+Will get called for any errors that happen before the request handler callback
+is called, when the request handler callback throws an exception and
+potentially for other not-in-a-request-context errors.
+
+It will not get called for read timeouts that occur while waiting for a
+complete header (and also, until Feersum supports otherwise, time-outs while
+waiting for a request entity body).
+
+Any exceptions thrown in the handler will generate a warning and not
+propagated.
+
+=back
=cut
@@ -210,6 +265,40 @@ package Feersum;
1;
__END__
+=head1 LIMITS
+
+=over 4
+
+=item body length
+
+2147483647 - about 2GiB.
+
+=item request headers
+
+64
+
+=item request header name length
+
+128 bytes
+
+=item bytes read per read() system call
+
+4096 bytes
+
+=back
+
+=head1 BUGS
+
+Keep-alive is ignored completely.
+
+Chunked-encoding responses can be sent to HTTP/1.0 clients, which is only part
+of the HTTP/1.1 spec.
+
+Currently there's no way to limit the request entity length of a POST/PUT/etc.
+This could lead to a DoS attack on a Feersum server. Suggested remedy is to
+only run Feersum behind some other web server and to use that to limit the
+entity size.
+
=head1 SEE ALSO
http://en.wikipedia.org/wiki/Feersum_Endjinn
View
148 t/08-read-timeout.t
@@ -0,0 +1,148 @@
+#!perl
+use warnings;
+use strict;
+use constant CLIENTS => 5;
+use constant POST_CLIENTS => 5;
+use constant GOOD_CLIENTS => 5;
+use Test::More tests =>
+ 17 + 4*CLIENTS + 4*POST_CLIENTS + 3*GOOD_CLIENTS;
+use Test::Exception;
+use Test::Differences;
+use Scalar::Util qw/blessed/;
+use lib 't'; use Utils;
+
+BEGIN { use_ok('Feersum') };
+
+my ($socket,$port) = get_listen_socket();
+ok $socket, "made listen socket";
+ok $socket->fileno, "has a fileno";
+
+my $evh = Feersum->new();
+lives_ok { $evh->use_socket($socket) };
+$evh->request_handler(sub {
+ my $r = shift;
+ my %env;
+ $r->env(\%env);
+ ok $env{HTTP_X_GOOD_CLIENT}, "got a request from a good client";
+ $r->send_response(200, ["Content-Type" => "text/plain"], "thx.");
+});
+
+my $default = $evh->read_timeout;
+is $default, 5.0, "default timeout is 5 seconds";
+
+dies_ok { $evh->read_timeout(-1.0) } "can't set a negative number";
+is $evh->read_timeout, 5.0;
+
+dies_ok {
+ no warnings 'numeric';
+ $evh->read_timeout("this isn't a number");
+} "can't set a string as the timeout";
+is $evh->read_timeout, 5.0;
+
+lives_ok { $evh->read_timeout(6+1) } "IV is OK";
+is $evh->read_timeout, 7.0, "new timeout set";
+
+lives_ok { $evh->read_timeout("8.0") } "NV-as-string is OK";
+is $evh->read_timeout, 8.0, "new timeout set";
+
+lives_ok { $evh->read_timeout($default) } "NV is OK";
+is $evh->read_timeout, $default, "reset to default";
+
+use AnyEvent::Handle;
+
+my $cv = AE::cv;
+my $CRLF = "\015\012";
+
+sub start_client {
+ my $n = shift;
+ my $on_conn = shift || sub {};
+ $cv->begin;
+ my $h;
+ my $done = sub {
+ $cv->end;
+ $h->destroy;
+ };
+ $h = AnyEvent::Handle->new(
+ connect => ['127.0.0.1',$port],
+ on_error => sub {
+ my $hdl = shift;
+ fail "handle error";
+ $hdl->destroy;
+ $cv->croak(join(" ",@_));
+ },
+ on_connect => sub {
+ pass "connected $n";
+ $on_conn->($h);
+ },
+ on_read => sub {
+ diag "ignoring extra bytes $n";
+ },
+ );
+ $h->push_read(line => "$CRLF$CRLF", sub {
+ my $header = $_[1];
+ like $header, qr{^HTTP/1\.\d 408 Request Timeout}, "got a timeout response $n";
+ my $cl;
+ if ($header =~ m{^Content-Length: (\d+)}m) {
+ $cl = $1;
+ pass "got a c-l header $n";
+ }
+ else {
+ fail "no c-l header?! $n";
+ $done->();
+ }
+
+ if ($cl == 0) {
+ pass "alright, empty error body $n";
+ $done->();
+ }
+ else {
+ $h->push_read(chunk => $cl, sub {
+ pass "got error body $n";
+ $done->();
+ });
+ }
+ });
+ return $h;
+}
+
+sub post_client {
+ my $n = shift;
+ my $t;
+ start_client("(post $n)", sub {
+ my $h = shift;
+ $h->push_write("POST / HTTP/1.0$CRLF");
+ $h->push_write("Content-Length: 8$CRLF$CRLF");
+ $t = AE::timer 3,0,sub {
+ $h->push_write("o hai"); # 5 out of 8 bytes
+ undef $t; # keep ref
+ };
+ });
+}
+
+sub good_client {
+ my $n = "(good $_[0])";
+ $cv->begin;
+ my $h; $h = http_get "http://localhost:$port/rad",
+ headers => {'X-Good-Client' => 1},
+ sub {
+ my ($body,$headers) = @_;
+ is $headers->{Status}, 200, "got 200 $n";
+ is $body, "thx.", "got body $n";
+ $cv->end;
+ undef $h; # keep ref
+ };
+}
+
+my $t; $t = AE::timer 20, 0, sub {
+ fail "TOO LONG";
+ $cv->croak("TOO LONG");
+};
+
+$cv->begin;
+good_client($_) for (1 .. GOOD_CLIENTS);
+start_client("(get $_)") for (1 .. CLIENTS);
+post_client($_) for (1 .. POST_CLIENTS);
+$cv->end;
+
+lives_ok { $cv->recv } "no client errors";
+pass "all done";

0 comments on commit d236d16

Please sign in to comment.