Skip to content

Commit

Permalink
Move IO::Handle pumper into XS
Browse files Browse the repository at this point in the history
  • Loading branch information
stash committed Sep 30, 2010
1 parent b80ead5 commit 5d2d6ce
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 50 deletions.
133 changes: 91 additions & 42 deletions Feersum.xs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ static void sched_request_callback(struct feer_conn *c);
static void call_died (pTHX_ struct feer_conn *c, const char *cb_type);
static void call_request_callback(struct feer_conn *c);
static void call_poll_callback (struct feer_conn *c, bool is_write);
static void pump_io_handle (struct feer_conn *c, SV *io);

static void conn_write_ready (struct feer_conn *c);
static void respond_with_server_error(struct feer_conn *c, const char *msg, STRLEN msg_len, int code);
Expand Down Expand Up @@ -174,7 +175,6 @@ static struct rinq *request_ready_rinq = NULL;

static AV *psgi_ver;
static SV *psgi_serv10, *psgi_serv11, *crlf_sv;
static SV *pump_io_cv = NULL;

// TODO: make this thread-local if and when there are multiple C threads:
struct ev_loop *feersum_ev_loop = NULL;
Expand Down Expand Up @@ -255,6 +255,7 @@ add_sv_to_wbuf(struct feer_conn *c, SV *sv)
else {
sv = SvREFCNT_inc(sv);
}

m->iov[idx].iov_base = SvPV(sv, cur);
m->iov[idx].iov_len = cur;
m->sv[idx] = sv;
Expand Down Expand Up @@ -597,7 +598,11 @@ try_conn_write(EV_P_ struct ev_io *w, int revents)
return;
}

call_poll_callback(c, 1);
if (c->poll_write_cb_is_io_handle)
pump_io_handle(c, c->poll_write_cb);
else
call_poll_callback(c, 1);

// callback didn't write anything:
if (!c->wbuf_rinq) goto try_write_again;
}
Expand Down Expand Up @@ -1413,7 +1418,8 @@ feersum_write_whole_body (pTHX_ struct feer_conn *c, SV *body)
SV **elt = av_fetch(abody, i, 0);
if (elt == NULL) continue;
SV *sv = *elt;
if (SvMAGICAL(sv)) sv = newSVsv(sv); // copy to remove magic
// copy to remove magic
if (SvMAGICAL(sv)) sv = sv_2mortal(newSVsv(sv));
if (!SvOK(sv)) continue;
if (SvROK(sv)) sv = SvRV(sv);
cur = add_sv_to_wbuf(c,sv);
Expand Down Expand Up @@ -1522,8 +1528,8 @@ call_died (pTHX_ struct feer_conn *c, const char *cb_type)
static void
call_request_callback (struct feer_conn *c)
{
dSP;
dTHX;
dSP;
int flags;
c->in_callback++;

Expand Down Expand Up @@ -1578,13 +1584,10 @@ call_request_callback (struct feer_conn *c)
static void
call_poll_callback (struct feer_conn *c, bool is_write)
{
dSP;
dTHX;
dSP;

SV *cb = (is_write) ? c->poll_write_cb : NULL;
SV *io;
SV *ret;
int flags;

if (!cb) return;

Expand All @@ -1596,27 +1599,9 @@ call_poll_callback (struct feer_conn *c, bool is_write)
ENTER;
SAVETMPS;
PUSHMARK(SP);

if (is_write && c->poll_write_cb_is_io_handle) {
flags = G_EVAL;
// Can't do this in the BOOT section, unfortunately:
if (pump_io_cv == NULL) {
CV *pump = get_cv("Feersum::Connection::_pump_io", 0);
pump_io_cv = newRV_inc((SV*)pump);
}
cb = pump_io_cv;
// it's an RV so copy is light
XPUSHs(sv_2mortal(newSVsv(c->poll_write_cb)));
ret = newSV(0);
XPUSHs(ret);
}
else {
flags = G_DISCARD|G_EVAL|G_VOID;
XPUSHs(sv_2mortal(new_feer_conn_handle(c, is_write)));
}

XPUSHs(sv_2mortal(new_feer_conn_handle(c, is_write)));
PUTBACK;
call_sv(cb, flags);
call_sv(cb, G_DISCARD|G_EVAL|G_VOID);
SPAGAIN;

trace("called %s poll callback, errsv? %d\n",
Expand All @@ -1625,27 +1610,91 @@ call_poll_callback (struct feer_conn *c, bool is_write)
if (SvTRUE(ERRSV)) {
call_died(aTHX_ c, is_write ? "write poll" : "read poll");
}
else if (is_write && c->poll_write_cb_is_io_handle) {
if (!SvOK(ret)) {
SvREFCNT_dec(c->poll_write_cb);
c->poll_write_cb = NULL;
finish_wbuf(c);
c->responding = RESPOND_SHUTDOWN;
}
else {
if (c->is_http11)
add_chunk_sv_to_wbuf(c, ret);
else
add_sv_to_wbuf(c, ret);

trace("leaving %s poll callback\n", is_write ? "write" : "read");
PUTBACK;
FREETMPS;
LEAVE;

c->in_callback--;
}

static void
pump_io_handle (struct feer_conn *c, SV *io)
{
dTHX;
dSP;
SV *ret = NULL;

if (!io) return;

c->in_callback++;

trace("pump io handle %d\n", c->fd);

ENTER;
SAVETMPS;

// Emulate `local $/ = \4096;`
SV *old_rs = PL_rs;
PL_rs = sv_2mortal(newRV_noinc(newSViv(4096)));
sv_setsv(get_sv("/", GV_ADD), PL_rs);

PUSHMARK(SP);
XPUSHs(c->poll_write_cb);
PUTBACK;
call_method("getline", G_SCALAR|G_EVAL);
SPAGAIN;

trace("called getline on io handle, errsv? %d %d\n",
SvTRUE(ERRSV) ? 1 : 0, c->fd);

if (SvTRUE(ERRSV)) {
call_died(aTHX_ c, "getline on io handle");
goto done_pump_io;
}

ret = POPs;
if (SvMAGICAL(ret))
ret = sv_2mortal(newSVsv(ret));

if (!SvOK(ret)) {
// returned undef, so call the close method out of niceity
PUSHMARK(SP);
XPUSHs(c->poll_write_cb);
PUTBACK;
call_method("close", G_VOID|G_DISCARD|G_EVAL);
SPAGAIN;

if (SvTRUE(ERRSV)) {
STRLEN len;
const char *err = SvPV(ERRSV,len);
trouble("Couldn't close body IO handle: %.*s",len,err);
}
SvREFCNT_dec(ret);

SvREFCNT_dec(c->poll_write_cb);
c->poll_write_cb = NULL;
finish_wbuf(c);
c->responding = RESPOND_SHUTDOWN;

goto done_pump_io;
}

trace("leaving %s poll callback\n", is_write ? "write" : "read");
if (c->is_http11)
add_chunk_sv_to_wbuf(c, ret);
else
add_sv_to_wbuf(c, ret);

done_pump_io:
trace("leaving pump io handle %d\n", c->fd);

PUTBACK;
FREETMPS;
LEAVE;

PL_rs = old_rs;
sv_setsv(get_sv("/", GV_ADD), old_rs);

c->in_callback--;
}

Expand Down
7 changes: 0 additions & 7 deletions lib/Feersum/Connection.pm
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,6 @@ sub _initiate_streaming_psgi {
goto &$streamer;
}

sub _pump_io {
local $/ = \4096;
$_[1] = $_[0]->getline();
$_[0]->close() unless defined $_[1];
return;
}

1;
__END__
Expand Down
3 changes: 2 additions & 1 deletion t/52-psgi-iohandle.t
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!perl
use warnings;
use strict;
use Test::More tests => 30;
use Test::More tests => 33;
use lib 't'; use Utils;
use File::Temp qw/tempfile/;
use Encode qw/decode_utf8/;
Expand All @@ -26,6 +26,7 @@ $evh->use_socket($socket);
sub new { return bless {lines => $_[1]}, __PACKAGE__ }
sub getline {
my $self = shift;
Test::More::ok(ref($/) && ${$/} == 4096, '$/ is \4096');
return shift @{$self->{lines}};
}
sub close {}
Expand Down

0 comments on commit 5d2d6ce

Please sign in to comment.