Skip to content

Commit

Permalink
Support IO::Handle-like responses from PSGI handlers.
Browse files Browse the repository at this point in the history
  • Loading branch information
stash committed Sep 28, 2010
1 parent 9c7974d commit 8419b0b
Show file tree
Hide file tree
Showing 4 changed files with 262 additions and 28 deletions.
115 changes: 89 additions & 26 deletions Feersum.xs
Expand Up @@ -1420,6 +1420,27 @@ feersum_write_whole_body (pTHX_ struct feer_conn *c, SV *body)
return RETVAL;
}

static void
feersum_start_psgi_streaming(pTHX_ struct feer_conn *c, SV *streamer)
{
dSP;
ENTER;
SAVETMPS;
PUSHMARK(SP);
SV *conn_sv = sv_2mortal(feer_conn_2sv(c));
XPUSHs(conn_sv);
XPUSHs(streamer);
PUTBACK;
call_method("_initiate_streaming_psgi", G_DISCARD|G_EVAL|G_VOID);
SPAGAIN;
if (SvTRUE(ERRSV)) {
call_died(aTHX_ c, "initiate streaming");
}
PUTBACK;
FREETMPS;
LEAVE;
}

static void
feersum_handle_psgi_response(pTHX_ struct feer_conn *c, SV *ret)
{
Expand All @@ -1428,33 +1449,43 @@ feersum_handle_psgi_response(pTHX_ struct feer_conn *c, SV *ret)
if (av_len(rav)+1 == 3) { // standard response triplet
trace("PSGI response triplet, c=%p av=%p len=%d\n", c, rav, av_len(rav));
SV *msg = av_shift(rav);
SV *hdrs = SvRV(av_shift(rav)); // XXX no typecheck for speed
feersum_start_response(aTHX_ c, msg, (AV*)hdrs, 0);
SV *body = av_shift(rav); // XXX no typecheck for speed
feersum_write_whole_body(aTHX_ c, body);
SV *hdrs = av_shift(rav);
SV *body = av_shift(rav);

if (!(SvROK(hdrs) && SvTYPE(SvRV(hdrs)) == SVt_PVAV)) {
sv_setpvs(ERRSV, "Headers must be an array-ref");
call_died(aTHX_ c, "request");
}
hdrs = SvRV(hdrs);

if (SvROK(body) && SvTYPE(SvRV(body)) == SVt_PVAV) {
feersum_start_response(aTHX_ c, msg, (AV*)hdrs, 0);
feersum_write_whole_body(aTHX_ c, body);
}
else if (SvROK(body)) { // assume it's an IO-handle
feersum_start_response(aTHX_ c, msg, (AV*)hdrs, 1);
SV *pump = (SV*)Perl_get_cvn_flags(aTHX_
STR_WITH_LEN("Feersum::Connection::_pump_io"),
GV_NOADD_NOINIT);
AV *pair = newAV();
av_push(pair, newRV_inc(pump));
body = newRV(SvRV(body));
av_push(pair, body);
c->poll_write_cb = (SV*)pair;
conn_write_ready(c);
}
else {
sv_setpvs(ERRSV, "Expected array-ref or object body");
call_died(aTHX_ c, "request");
}
}
else {
sv_setpvs(ERRSV, "Unsupported PSGI array response");
call_died(aTHX_ c, "request");
}
}
else if (SvROK(ret) && SvTYPE(SvRV(ret)) == SVt_PVCV) {
dSP;
ENTER;
SAVETMPS;
PUSHMARK(SP);
SV *conn_sv = sv_2mortal(feer_conn_2sv(c));
XPUSHs(conn_sv);
XPUSHs(ret);
PUTBACK;
call_method("_initiate_streaming_psgi", G_DISCARD|G_EVAL|G_VOID);
SPAGAIN;
if (SvTRUE(ERRSV)) {
call_died(aTHX_ c, "initiate streaming");
}
PUTBACK;
FREETMPS;
LEAVE;
feersum_start_psgi_streaming(aTHX_ c, ret);
}
else {
sv_setpvs(ERRSV, "Unsupported PSGI response");
Expand Down Expand Up @@ -1540,23 +1571,40 @@ call_poll_callback (struct feer_conn *c, bool is_write)
{
dSP;
dTHX;

bool write_iohandle = 0;
SV *cb = (is_write) ? c->poll_write_cb : NULL;
SV *io;
SV *ret;
int flags;

SV *cbrv = (is_write) ? c->poll_write_cb : NULL;
if (!cbrv) return;
if (!cb) return;

c->in_callback++;

trace("%s poll callback c=%p cbrv=%p\n",
is_write ? "write" : "read", c, cbrv);
is_write ? "write" : "read", c, cb);

ENTER;
SAVETMPS;

PUSHMARK(SP);
XPUSHs(sv_2mortal(new_feer_conn_handle(c, is_write)));

if (is_write && SvTYPE(cb) == SVt_PVAV) {
write_iohandle = 1;
flags = G_EVAL;
cb = *(av_fetch((AV*)c->poll_write_cb, 0, 0));
io = *(av_fetch((AV*)c->poll_write_cb, 1, 0));
XPUSHs(sv_2mortal(newSVsv(io))); // it's an RV so copy is light
ret = newSV(0);
XPUSHs(ret);
}
else {
flags = G_DISCARD|G_EVAL|G_VOID;
XPUSHs(sv_2mortal(new_feer_conn_handle(c, is_write)));
}

PUTBACK;
call_sv(cbrv, G_DISCARD|G_EVAL|G_VOID);
call_sv(cb, flags);
SPAGAIN;

trace("called %s poll callback, errsv? %d\n",
Expand All @@ -1565,6 +1613,21 @@ call_poll_callback (struct feer_conn *c, bool is_write)
if (SvTRUE(ERRSV)) {
call_died(aTHX_ c, is_write ? "write poll" : "read poll");
}
else if (write_iohandle) {
if (!SvOK(ret)) {
SvREFCNT_dec(c->poll_write_cb);
c->poll_write_cb = NULL;
finish_wbuf(c);
c->responding = RESPOND_SHUTDOWN;
}
else {
if (c->is_http11)
add_chunk_sv_to_wbuf(c, ret);
else
add_sv_to_wbuf(c, ret);
}
SvREFCNT_dec(ret);
}

trace("leaving %s poll callback\n", is_write ? "write" : "read");
PUTBACK;
Expand Down
4 changes: 2 additions & 2 deletions TODO
Expand Up @@ -15,9 +15,9 @@ psgi.input streaming

IO::Handle-like responses

* pump the getline() method when connection-writable (v1.0)
* check if it's got a real file descriptor, optimize (libeio or similar
* check if it's got a real file descriptor? optimize (libeio or similar
for non-blocking sendfile?) (v1.1)
* wait for readability using an ev watcher? (v1.1)

streamed responses

Expand Down
7 changes: 7 additions & 0 deletions lib/Feersum/Connection.pm
Expand Up @@ -43,6 +43,13 @@ sub _initiate_streaming_psgi {
goto &$streamer;
}

sub _pump_io {
local $/ = \4096;
$_[1] = $_[0]->getline();
$_[0]->close() unless defined $_[1];
return;
}

1;
__END__
Expand Down
164 changes: 164 additions & 0 deletions t/52-psgi-iohandle.t
@@ -0,0 +1,164 @@
#!perl
use warnings;
use strict;
use Test::More tests => 30;
use lib 't'; use Utils;
use File::Temp qw/tempfile/;
use Encode qw/decode_utf8/;

BEGIN { use_ok('Feersum') };

my ($socket,$port) = get_listen_socket();
ok $socket, "made listen socket";

my $evh = Feersum->new();
{
no warnings 'redefine';
*Feersum::DIED = sub {
my $err = shift;
fail "Died during request handler: $err";
};
}
$evh->use_socket($socket);

{
package FakeIOHandle;
sub new { return bless {lines => $_[1]}, __PACKAGE__ }
sub getline {
my $self = shift;
return shift @{$self->{lines}};
}
sub close {}
}

my $APP = <<'EOAPP';
my $app = sub {
my $env = shift;
Test::More::pass "called app";
my $io = FakeIOHandle->new([
"line one\n",
"line two\n"
]);
return [200,['Content-Type'=>'text/plain'],$io];
};
EOAPP

my $app = eval $APP;
ok $app, 'got an app' || diag $@;
$evh->psgi_request_handler($app);

returning_mock: {
my $cv = AE::cv;

$cv->begin;
my $h; $h = simple_client GET => '/', sub {
my ($body, $headers) = @_;
is $headers->{'Status'}, 200, "Response OK";
is $headers->{'content-type'}, 'text/plain';
is $body, qq(line one\nline two\n);
$cv->end;
undef $h;
};

$cv->recv;
pass "all done app 1";
}

my ($tempfh, $tempname) = tempfile(UNLINK=>1);
print $tempfh "temp line one\n";
print $tempfh "temp line two\n";
close $tempfh;

my $APP2 = <<'EOAPP';
my $app2 = sub {
my $env = shift;
Test::More::pass "called app2";
open my $io, '<', $tempname;
return [200,['Content-Type'=>'text/plain'],$io];
};
EOAPP

my $app2 = eval $APP2;
ok $app2, 'got app 2' || diag $@;
$evh->psgi_request_handler($app2);

returning_glob: {
my $cv = AE::cv;
$cv->begin;
my $h; $h = simple_client GET => '/', sub {
my ($body, $headers) = @_;
is $headers->{'Status'}, 200, "Response OK";
is $headers->{'content-type'}, 'text/plain';
is $body, qq(temp line one\ntemp line two\n);
$cv->end;
undef $h;
};
$cv->recv;
}

pass "all done app 2";

my $APP3 = <<'EOAPP';
my $app3 = sub {
my $env = shift;
Test::More::pass "called app3";
require IO::File;
my $io = IO::File->new($tempname,"r");
return [200,['Content-Type'=>'text/plain'],$io];
};
EOAPP

my $app3 = eval $APP3;
ok $app3, 'got app 3' || diag $@;
$evh->psgi_request_handler($app3);

returning_io_file: {
my $cv = AE::cv;
$cv->begin;
my $h; $h = simple_client GET => '/', sub {
my ($body, $headers) = @_;
is $headers->{'Status'}, 200, "Response OK";
is $headers->{'content-type'}, 'text/plain', "C-T";
is $body, qq(temp line one\ntemp line two\n), "body";
$cv->end;
undef $h;
};
$cv->recv;
}

pass "all done app 3";

{
open my $fh, '>:encoding(UTF-16LE)',$tempname;
print $fh "\x{2603}\n"; # U+2603 SNOWMAN, UTF-8: E2 98 83
close $fh;
}

my $APP4 = <<'EOAPP';
my $app4 = sub {
my $env = shift;
Test::More::pass "called app4";
open my $io, '<:encoding(UTF-16LE)',$tempname;
return [200,['Content-Type'=>'text/plain; charset=UTF-8'],$io];
};
EOAPP

my $app4 = eval $APP4;
ok $app4, 'got app 4' || diag $@;
$evh->psgi_request_handler($app4);

returning_perlio_layer: {
my $cv = AE::cv;
$cv->begin;
my $h; $h = simple_client GET => '/', sub {
my ($body, $headers) = @_;
is $headers->{'Status'}, 200, "Response OK";
is $headers->{'content-type'}, 'text/plain; charset=UTF-8', "C-T";
is decode_utf8($body), qq(\x{2603}\n), "utf8 body";
$cv->end;
undef $h;
};
$cv->recv;
}

pass "all done app 4";

0 comments on commit 8419b0b

Please sign in to comment.