Skip to content

Commit

Permalink
Fix two issues related to the writer.
Browse files Browse the repository at this point in the history
* The prototype on the write() method wasn't correct.
* DESTROYing the writer didn't close the stream.
  • Loading branch information
stash committed Oct 8, 2010
1 parent fd15f89 commit 12e413e
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 35 deletions.
2 changes: 2 additions & 0 deletions Changes
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ Revision history for Perl extension Feersum
0.980 0.980
psgix.io support! psgix.io support!
Add missing JSON::XS test-dep. Add missing JSON::XS test-dep.
Fix: write() prototype was incorrect.
Fix: writer not flushing on DESTROY.


0.971 Wed Oct 6 16:21:00 2010 -0700 0.971 Wed Oct 6 16:21:00 2010 -0700
Fix the feersum script. Fix the feersum script.
Expand Down
83 changes: 49 additions & 34 deletions Feersum.xs
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ static void feersum_start_response
(pTHX_ struct feer_conn *c, SV *message, AV *headers, int streaming); (pTHX_ struct feer_conn *c, SV *message, AV *headers, int streaming);
static int feersum_write_whole_body (pTHX_ struct feer_conn *c, SV *body); static int feersum_write_whole_body (pTHX_ struct feer_conn *c, SV *body);
static void feersum_handle_psgi_response(pTHX_ struct feer_conn *c, SV *ret); static void feersum_handle_psgi_response(pTHX_ struct feer_conn *c, SV *ret);
static int feersum_close_handle(pTHX_ struct feer_conn *c, bool is_writer);


static void start_read_watcher(struct feer_conn *c); static void start_read_watcher(struct feer_conn *c);
static void stop_read_watcher(struct feer_conn *c); static void stop_read_watcher(struct feer_conn *c);
Expand Down Expand Up @@ -1702,6 +1703,39 @@ feersum_handle_psgi_response(pTHX_ struct feer_conn *c, SV *ret)
} }
} }


static int
feersum_close_handle (pTHX_ struct feer_conn *c, bool is_writer)
{
int RETVAL;
if (is_writer) {
trace("close writer fd=%d, c=%p, refcnt=%d\n", c->fd, c, SvREFCNT(c->self));
if (c->poll_write_cb) {
SvREFCNT_dec(c->poll_write_cb);
c->poll_write_cb = NULL;
}
if (c->responding < RESPOND_SHUTDOWN) {
finish_wbuf(c);
conn_write_ready(c);
c->responding = RESPOND_SHUTDOWN;
}
RETVAL = 1;
}
else {
trace("close reader fd=%d, c=%p\n", c->fd, c);
// TODO: ref-dec poll_read_cb
if (c->rbuf) {
SvREFCNT_dec(c->rbuf);
c->rbuf = NULL;
}
RETVAL = shutdown(c->fd, SHUT_RD); // TODO: respect keep-alive
c->receiving = RECEIVE_SHUTDOWN;
}

// disassociate the handle from the conn
SvREFCNT_dec(c->self);
return RETVAL;
}

static void static void
call_died (pTHX_ struct feer_conn *c, const char *cb_type) call_died (pTHX_ struct feer_conn *c, const char *cb_type)
{ {
Expand Down Expand Up @@ -2053,16 +2087,22 @@ fileno (feer_conn_handle *hdl)


void void
DESTROY (SV *self) DESTROY (SV *self)
ALIAS:
Feersum::Connection::Reader::DESTROY = 1
Feersum::Connection::Writer::DESTROY = 2
PPCODE: PPCODE:
{ {
feer_conn_handle *hdl = sv_2feer_conn_handle(self, 0); feer_conn_handle *hdl = sv_2feer_conn_handle(self, 0);

if (hdl == NULL) { if (hdl == NULL) {
trace3("DESTROY handle (closed) class=%s\n", HvNAME(SvSTASH(SvRV(ST(0))))); trace3("DESTROY handle (closed) class=%s\n",
HvNAME(SvSTASH(SvRV(self))));
} }
else { else {
struct feer_conn *c = (struct feer_conn *)hdl; struct feer_conn *c = (struct feer_conn *)hdl;
trace3("DESTROY handle fd=%d, class=%s\n", c->fd, HvNAME(SvSTASH(SvRV(ST(0))))); trace3("DESTROY handle fd=%d, class=%s\n", c->fd,
SvREFCNT_dec(c->self); HvNAME(SvSTASH(SvRV(self))));
feersum_close_handle(aTHX_ c, (ix == 2));
} }
} }


Expand Down Expand Up @@ -2153,16 +2193,17 @@ read (feer_conn_handle *hdl, SV *buf, size_t len, ...)
} }


STRLEN STRLEN
write (feer_conn_handle *hdl, SV *body, ...) write (feer_conn_handle *hdl, ...)
PROTOTYPE: $;$
CODE: CODE:
{ {
if (c->made_raw) XSRETURN_UNDEF; if (c->made_raw) XSRETURN_UNDEF;
if (c->responding != RESPOND_STREAMING) if (c->responding != RESPOND_STREAMING)
croak("can only call write in streaming mode"); croak("can only call write in streaming mode");


if (!body || !SvOK(body)) { SV *body = (items == 2) ? ST(1) : &PL_sv_undef;
if (!body || !SvOK(body))
XSRETURN_IV(0); XSRETURN_IV(0);
}


trace("write fd=%d c=%p, body=%p\n", c->fd, c, body); trace("write fd=%d c=%p, body=%p\n", c->fd, c, body);
if (SvROK(body)) { if (SvROK(body)) {
Expand Down Expand Up @@ -2247,35 +2288,9 @@ close (feer_conn_handle *hdl)
{ {
if (c->made_raw) XSRETURN_UNDEF; if (c->made_raw) XSRETURN_UNDEF;


switch (ix) { assert(ix);
case 1: RETVAL = feersum_close_handle(aTHX_ c, (ix == 2));
trace("close reader fd=%d, c=%p\n", c->fd, c);
// TODO: ref-dec poll_read_cb
if (c->rbuf) {
SvREFCNT_dec(c->rbuf);
c->rbuf = NULL;
}
RETVAL = shutdown(c->fd, SHUT_RD); // TODO: respect keep-alive
c->receiving = RECEIVE_SHUTDOWN;
break;
case 2:
trace("close writer fd=%d, c=%p, refcnt=%d\n", c->fd, c, SvREFCNT(c->self));
if (c->poll_write_cb) {
SvREFCNT_dec(c->poll_write_cb);
c->poll_write_cb = NULL;
}
finish_wbuf(c);
conn_write_ready(c);
c->responding = RESPOND_SHUTDOWN;
RETVAL = 1;
break;
default:
croak("cannot call _close directly");
}

// disassociate the handle from the conn
SvUVX(hdl_sv) = 0; SvUVX(hdl_sv) = 0;
SvREFCNT_dec(c->self);
} }
OUTPUT: OUTPUT:
RETVAL RETVAL
Expand Down
1 change: 1 addition & 0 deletions MANIFEST
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ t/08-read-timeout.t
t/09-magic.t t/09-magic.t
t/10-respond-304.t t/10-respond-304.t
t/11-runner.t t/11-runner.t
t/12-close-on-drop.t
t/50-psgi-simple.t t/50-psgi-simple.t
t/51-psgi-streaming.t t/51-psgi-streaming.t
t/52-psgi-iohandle.t t/52-psgi-iohandle.t
Expand Down
7 changes: 7 additions & 0 deletions lib/Feersum.pm
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -191,11 +191,15 @@ it will be immediately flushed to the socket.
my $n = 0; my $n = 0;
$w->poll_cb(sub { $w->poll_cb(sub {
$_[0]->write(get_next_chunk()); $_[0]->write(get_next_chunk());
# will also unset the poll_cb:
$_[0]->close if ($n++ >= 100); $_[0]->close if ($n++ >= 100);
}); });
}; };
}; };
Note that C<< $w->close() >> will be called when the last reference to the
writer is dropped.
=head2 PSGI extensions =head2 PSGI extensions
=over 4 =over 4
Expand Down Expand Up @@ -319,6 +323,9 @@ stop the callback from getting called.
$_[0]->close if ($n++ >= 100); $_[0]->close if ($n++ >= 100);
}); });
Note that C<< $w->close() >> will be called when the last reference to the
writer is dropped.
=head1 METHODS =head1 METHODS
These are methods on the global Feersum singleton. These are methods on the global Feersum singleton.
Expand Down
3 changes: 2 additions & 1 deletion lib/Feersum/Connection/Handle.pm
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -125,7 +125,8 @@ C<psgix.body.scalar_refs> in the PSGI env hash.
=item C<< $w->close() >> =item C<< $w->close() >>
Close the HTTP response (which triggers the "T-E: chunked" terminating chunk Close the HTTP response (which triggers the "T-E: chunked" terminating chunk
to be sent). to be sent). This method is implicitly called when the last reference to the
writer is dropped.
=item C<< $w->poll_cb(sub { .... }) >> =item C<< $w->poll_cb(sub { .... }) >>
Expand Down
64 changes: 64 additions & 0 deletions t/12-close-on-drop.t
Original file line number Original file line Diff line number Diff line change
@@ -0,0 +1,64 @@
#!perl
use warnings;
use strict;
use Test::More tests => 14;
use Test::Exception;
use lib 't'; use Utils;

BEGIN { use_ok('Feersum') };

my ($socket,$port) = get_listen_socket();
ok $socket, "made listen socket";

my $evh = Feersum->new();

{
no warnings 'redefine';
*Feersum::DIED = sub {
my $err = shift;
fail "Died during request handler: $err";
};
}

$evh->request_handler(sub {
my $r = shift;
ok $r, 'got request';
my $w = $r->start_streaming(200, []);
$w->write("hello ");
$w->write("world!\n");
lives_ok {
undef $w;
} 'no death on undef';
});

lives_ok {
$evh->use_socket($socket);
} 'assigned socket';

my $cv = AE::cv;

sub client {
my $cnum = shift;
my $is_chunked = shift || 0;
$cv->begin;
my $h; $h = simple_client GET => '/foo',
name => "client $cnum",
timeout => 15,
proto => $is_chunked ? '1.1' : '1.0',
headers => {"Accept" => "*/*"},
sub {
my ($body, $headers) = @_;
is $headers->{Status}, 200, "client $cnum got 200"
or diag $headers->{Reason};
is $body, "hello world!\n", "client $cnum body";
$cv->end;
undef $h;
};
}


client(1,'chunked');
client(2);

$cv->recv;
pass "all done";

0 comments on commit 12e413e

Please sign in to comment.