Skip to content
Browse files

Use Conn:close streaming for 1.0 clients

  • Loading branch information...
1 parent d78dd5e commit 36088da955a75d3489434fae63e5e0058a62e0dc @stash committed Sep 20, 2010
Showing with 91 additions and 28 deletions.
  1. +40 −13 Feersum.xs
  2. +2 −4 TODO
  3. +19 −0 lib/Feersum/Connection.pm
  4. +22 −7 t/05-streaming.t
  5. +8 −4 t/Utils.pm
View
53 Feersum.xs
@@ -109,6 +109,8 @@ struct feer_conn {
int16_t in_callback;
int16_t responding;
int16_t receiving;
+
+ bool is_http11;
};
typedef struct feer_conn feer_conn_handle; // for typemap
@@ -135,6 +137,7 @@ static void respond_with_server_error(struct feer_conn *c, const char *msg, STRL
static STRLEN add_sv_to_wbuf (struct feer_conn *c, SV *sv);
static STRLEN add_const_to_wbuf (struct feer_conn *c, const char const *str, size_t str_len);
+static void finish_wbuf (struct feer_conn *c);
static void add_chunk_sv_to_wbuf (struct feer_conn *c, SV *sv);
static void add_placeholder_to_wbuf (struct feer_conn *c, SV **sv, struct iovec **iov_ref);
@@ -274,21 +277,23 @@ add_placeholder_to_wbuf(struct feer_conn *c, SV **sv, struct iovec **iov_ref)
*iov_ref = &m->iov[idx];
}
+static void
+finish_wbuf(struct feer_conn *c)
+{
+ if (!c->is_http11) return; // nothing required
+ add_const_to_wbuf(c, "0\r\n\r\n", 5); // terminating chunk
+}
+
#define update_wbuf_placeholder(c,sv,iov) iov->iov_base = SvPV(sv, iov->iov_len)
static void
add_chunk_sv_to_wbuf(struct feer_conn *c, SV *sv)
{
- if (!sv) {
- add_const_to_wbuf(c, "0\r\n\r\n", 5);
- return;
- }
SV *chunk;
struct iovec *chunk_iov;
add_placeholder_to_wbuf(c, &chunk, &chunk_iov);
STRLEN cur = add_sv_to_wbuf(c, sv);
add_const_to_wbuf(c, CRLF, 2);
-
sv_setpvf(chunk, "%x" CRLF, cur);
update_wbuf_placeholder(c, chunk, chunk_iov);
}
@@ -425,7 +430,7 @@ new_feer_conn (EV_P_ int conn_fd)
c->fd = conn_fd;
if (prep_socket(c->fd)) {
perror("prep_socket");
- trace("prep_socket failed for %d", c->fd);
+ trace("prep_socket failed for %d\n", c->fd);
}
// TODO: these initializations should be Lazy
@@ -555,8 +560,11 @@ try_conn_write(EV_P_ struct ev_io *w, int revents)
int i;
if (!c->wbuf_rinq) {
+ if (c->responding == RESPOND_SHUTDOWN)
+ goto try_write_finished;
+
if (!c->poll_write_cb) {
- trace("tried to write with an empty buffer %d\n",w->fd);
+ trace("tried to write with an empty buffer %d resp=%d\n",w->fd,c->responding);
ev_io_stop(EV_A, w);
return;
}
@@ -840,9 +848,11 @@ process_request_headers (struct feer_conn *c, int body_offset)
const char *err;
struct feer_req *req = c->req;
- trace("body follows headers, making new rbuf\n");
+ trace("processing headers %d minor_version=%d\n",c->fd,req->minor_version);
bool body_is_required;
+ c->is_http11 = (req->minor_version == 1);
+
c->receiving = RECEIVE_BODY;
if (str_eq("GET", 3, req->method, req->method_len) ||
@@ -923,6 +933,7 @@ got_bad_request:
got_cl:
c->expected_cl = (ssize_t)expected;
c->received_cl = SvCUR(c->rbuf);
+ trace("expecting body %d size=%d have=%d\n",c->fd, c->expected_cl,c->received_cl);
// don't have enough bytes to schedule immediately?
if (c->expected_cl && c->received_cl < c->expected_cl) {
@@ -1248,7 +1259,7 @@ feersum_start_response (pTHX_ struct feer_conn *c, SV *message, AV *headers,
message = sv_2mortal(newSVpvf("%d %.*s",code,len,ptr));
}
- add_const_to_wbuf(c, streaming ? "HTTP/1.1 " : "HTTP/1.0 ", 9);
+ add_const_to_wbuf(c, c->is_http11 ? "HTTP/1.1 " : "HTTP/1.0 ", 9);
add_sv_to_wbuf(c, message);
add_const_to_wbuf(c, CRLF, 2);
@@ -1279,7 +1290,10 @@ feersum_start_response (pTHX_ struct feer_conn *c, SV *message, AV *headers,
}
if (streaming) {
- add_const_to_wbuf(c, "Transfer-Encoding: chunked" CRLFx2, 30);
+ if (c->is_http11)
+ add_const_to_wbuf(c, "Transfer-Encoding: chunked" CRLFx2, 30);
+ else
+ add_const_to_wbuf(c, "Connection: close" CRLFx2, 21);
}
conn_write_ready(c);
@@ -1700,7 +1714,7 @@ write (feer_conn_handle *hdl, SV *body, ...)
if (c->responding != RESPOND_STREAMING)
croak("can only call write in streaming mode");
- if (!SvOK(body)) {
+ if (!body || !SvOK(body)) {
XSRETURN_IV(0);
}
@@ -1715,7 +1729,12 @@ write (feer_conn_handle *hdl, SV *body, ...)
}
}
(void)SvPV(body, RETVAL);
- add_chunk_sv_to_wbuf(c, body);
+
+ if (c->is_http11)
+ add_chunk_sv_to_wbuf(c, body);
+ else
+ add_sv_to_wbuf(c, body);
+
conn_write_ready(c);
}
OUTPUT:
@@ -1742,7 +1761,7 @@ _close (feer_conn_handle *hdl)
SvREFCNT_dec(c->poll_write_cb);
c->poll_write_cb = NULL;
}
- add_chunk_sv_to_wbuf(c, NULL);
+ finish_wbuf(c);
conn_write_ready(c);
c->responding = RESPOND_SHUTDOWN;
RETVAL = 1;
@@ -1809,6 +1828,14 @@ write_whole_body (struct feer_conn *c, SV *body)
OUTPUT:
RETVAL
+void
+force_http10 (struct feer_conn *c)
+ PROTOTYPE: $
+ ALIAS:
+ force_http11 = 1
+ PPCODE:
+ c->is_http11 = ix;
+
SV *
_handle (struct feer_conn *c)
PROTOTYPE: $
View
6 TODO
@@ -22,10 +22,6 @@ IO::Handle-like responses
streamed responses
- * allow streaming responses to be HTTP/1.0 "Connection: close" rather than
- HTTP/1.1 "Transfer-Encoding: chunked". Match up the streaming flavour
- with the request version. Varnish seems to support this and nginx may as
- well. http://www.varnish-cache.org/trac/ticket/400 (v1.0)
* instead of an implicit "low water mark" of 0 for the poll_cb
writer-object callback, a configurable number of bytes can be used.
@@ -70,3 +66,5 @@ WebSocket support (v1.1)
openssl? use the guts of Digest::MD5 somehow?)
* make this a separate module since if it brings in an openssl deps.
* will this work for PSGI? psgix.web_socket or something?
+
+Release t/Utils.pm's "simple_client" as "anyevent::anotherhttp" or something?
View
19 lib/Feersum/Connection.pm
@@ -112,6 +112,25 @@ immediately called). The sub is, in turn, passed a code reference. The code
reference will return a Writer object when called with C<< $code, \@header >>
parameters. See L<Feersum> for examples.
+=item C<< $o->force_http10 >>
+
+=item C<< $o->force_http11 >>
+
+Force the response to use HTTP/1.0 or HTTP/1.1, respectively.
+
+Normally, if the request was made with 1.1 then Feersum uses HTTP/1.1 for the
+response, otherwise HTTP/1.0 is used (this includes requests made with the
+HTTP "0.9" non-declaration).
+
+For streaming under HTTP/1.1 C<Transfer-Encoding: chunked> is used, otherwise
+a C<Connection: close> stream-style is used (with the usual non-guarantees
+about delivery). You may know about certain user-agents that
+support/don't-support T-E:chunked, so this is how you can override that.
+
+Supposedly clients and a lot of proxies support the C<Connection: close>
+stream-style, see support in Varnish at
+http://www.varnish-cache.org/trac/ticket/400
+
=back
=head1 AUTHOR
View
29 t/05-streaming.t
@@ -1,8 +1,10 @@
#!perl
use warnings;
use strict;
-use constant CLIENTS => 15;
-use Test::More tests => 7 + 21 * CLIENTS;
+use constant CLIENTS_11 => 15;
+use constant CLIENTS_10 => 15;
+use constant CLIENTS => CLIENTS_11 + CLIENTS_10;
+use Test::More tests => 7 + 22 * CLIENTS_11 + 23 * CLIENTS_10;
use Test::Exception;
use lib 't'; use Utils;
@@ -96,29 +98,42 @@ lives_ok {
sub client {
my $cnum = sprintf("%04d",shift);
+ my $is_chunked = shift || 0;
$cv->begin;
my $h; $h = simple_client GET => '/foo',
name => $cnum,
- timeout => 3,
+ timeout => 15,
+ proto => $is_chunked ? '1.1' : '1.0',
headers => {
"Accept" => "*/*",
'X-Client' => $cnum,
},
sub {
my ($body, $headers) = @_;
- is $headers->{Status}, 200, "$cnum got 200";
- is $headers->{'transfer-encoding'}, "chunked", "$cnum got chunked!";
+ is $headers->{Status}, 200, "$cnum got 200"
+ or diag $headers->{Reason};
+ if ($is_chunked) {
+ is $headers->{HTTPVersion}, '1.1';
+ is $headers->{'transfer-encoding'}, "chunked", "$cnum got chunked!";
+ }
+ else {
+ is $headers->{HTTPVersion}, '1.0';
+ ok !exists $headers->{'transfer-encoding'}, "$cnum not chunked!";
+ is $headers->{'connection'}, 'close', "$cnum conn closed";
+ }
is_deeply [split /\n/,$body], [
"$cnum Hello streaming world! chunk one",
"$cnum Hello streaming world! chunk 'two'",
"$cnum Hello streaming world! chunk three",
- ], "$cnum got all three chunks";
+ ], "$cnum got all three lines";
$cv->end;
undef $h;
};
}
-client($_) for (1..CLIENTS);
+
+client(1000+$_,1) for (1..CLIENTS_11);
+client(2000+$_,0) for (1..CLIENTS_10); # HTTP/1.0 style
$cv->recv;
is $started, CLIENTS, 'handlers started';
View
12 t/Utils.pm
@@ -126,14 +126,18 @@ sub simple_client ($$;@) {
}
}
+ $hdrs{'content-length'} = 0 if ($hdrs{Status} == 204);
+
if (exists $hdrs{'content-length'}) {
return $done->() unless ($hdrs{'content-length'});
+# Test::More::diag "$name waiting for C-L body";
$h->push_read(chunk => $hdrs{'content-length'}, sub {
$buf = $_[1];
return $done->();
});
}
elsif (($hdrs{'transfer-encoding'}||'') eq 'chunked') {
+# Test::More::diag "$name waiting for T-E:chunked body";
my $len = 0;
my ($chunk_reader, $chunk_handler);
$chunk_handler = sub {
@@ -162,10 +166,10 @@ sub simple_client ($$;@) {
};
$h->push_read(line => $CRLF, $chunk_reader);
}
- elsif ($hdrs{Status} == 204) {
- return $done->();
- }
- elsif ($hdrs{Proto} eq '1.0' or ($hdrs{connection}||'') eq 'close') {
+ elsif ($hdrs{HTTPVersion} eq '1.0' or
+ ($hdrs{connection}||'') eq 'close')
+ {
+# Test::More::diag "$name waiting for conn:close body";
$h->on_read(sub {
$buf .= substr($_[0]->{rbuf},0,length($_[0]->{rbuf}),'');
});

0 comments on commit 36088da

Please sign in to comment.
Something went wrong with that request. Please try again.