Skip to content

Commit

Permalink
PSGI streaming response handler
Browse files Browse the repository at this point in the history
  • Loading branch information
stash committed Sep 6, 2010
1 parent a380521 commit 3f40dc5
Show file tree
Hide file tree
Showing 3 changed files with 174 additions and 5 deletions.
31 changes: 26 additions & 5 deletions Feersum.xs
Expand Up @@ -1295,9 +1295,24 @@ feersum_handle_psgi_response(pTHX_ struct feer_conn *c, SV *ret)
call_died(aTHX_ c, "request");
}
}
// else if (SvROK(ret) && SvTYPE(SvRV(ret)) == SVt_PVCV) {
// // TODO streaming mode
// }
else if (SvROK(ret) && SvTYPE(SvRV(ret)) == SVt_PVCV) {
dSP;
ENTER;
SAVETMPS;
PUSHMARK(SP);
SV *conn_sv = sv_2mortal(feer_conn_2sv(c));
XPUSHs(conn_sv);
XPUSHs(ret);
PUTBACK;
call_method("_initiate_streaming_psgi", G_DISCARD|G_EVAL|G_VOID);
SPAGAIN;
if (SvTRUE(ERRSV)) {
call_died(aTHX_ c, "initiate streaming");
}
PUTBACK;
FREETMPS;
LEAVE;
}
else {
sv_setpv(ERRSV, "Unsupported PSGI response");
call_died(aTHX_ c, "request");
Expand Down Expand Up @@ -1335,7 +1350,6 @@ call_request_callback (struct feer_conn *c)
SAVETMPS;
PUSHMARK(SP);


if (request_cb_is_psgi) {
SV *env = feersum_env(aTHX_ c,newHV());
// SV *conn_sv = feer_conn_2sv(c);
Expand All @@ -1360,15 +1374,22 @@ call_request_callback (struct feer_conn *c)
call_died(aTHX_ c, "request");
}

SV *psgi_response;
if (request_cb_is_psgi) {
feersum_handle_psgi_response(aTHX_ c, POPs);
psgi_response = *sp;
SvREFCNT_inc(psgi_response);
}

trace("leaving request callback\n");
PUTBACK;
FREETMPS;
LEAVE;

if (request_cb_is_psgi) {
feersum_handle_psgi_response(aTHX_ c, psgi_response);
SvREFCNT_dec(psgi_response);
}

c->in_callback--;
}

Expand Down
20 changes: 20 additions & 0 deletions lib/Feersum/Connection.pm
Expand Up @@ -23,6 +23,26 @@ sub initiate_streaming {
goto &$streamer;
}

sub _initiate_streaming_psgi {
my ($self, $streamer) = @_;
@_ = (sub {
my $strm = shift;
if ($#$strm == 2) {
$self->start_response($strm->[0],$strm->[1],0);
$self->write_whole_body(ref($strm->[2]) ? $strm->[2] : \$strm->[2]);
}
elsif ($#$strm == 1) {
$self->start_response($strm->[0],$strm->[1],1);
return $self->write_handle;
}
else {
die "streaming responder expected array";
}
return;
});
goto &$streamer;
}

1;
__END__
Expand Down
128 changes: 128 additions & 0 deletions t/51-psgi-streaming.t
@@ -0,0 +1,128 @@
#!perl
use warnings;
use strict;
use Test::More tests => 21;
use lib 't'; use Utils;
use AnyEvent::HTTP;

BEGIN { use_ok('Feersum') };

my ($socket,$port) = get_listen_socket();
ok $socket, "made listen socket";

my $evh = Feersum->new();
{
no warnings 'redefine';
*Feersum::DIED = sub {
my $err = shift;
fail "Died during request handler: $err";
};
}
$evh->use_socket($socket);

{
package Message;
my $n = 0;
sub new { return bless {}, 'Message' }
sub to_json { ++$n; return qq({"message":"O hai $n"}) }
}

sub wait_for_new_message {
my $cb = shift;
my $t; $t = AE::timer rand(0.5),0,sub {
$cb->(Message->new());
undef $t; # cancel circular-ref
};
return;
}

# from the PSGI::FAQ
my $APP = <<'EOAPP';
my $app = sub {
my $env = shift;
unless ($env->{'psgi.streaming'}) {
die "This application needs psgi.streaming support";
}
Test::More::pass "called app";
return sub {
Test::More::pass "called streamer";
my $respond = shift;
wait_for_new_message(sub {
my $message = shift;
my $body = [ $message->to_json ];
Test::More::pass "sending response";
undef $env;
$respond->([200, ['Content-Type', 'application/json'], $body]);
Test::More::pass "sent response";
});
};
};
EOAPP

my $app = eval $APP;
ok $app, 'got an app' || diag $@;
$evh->psgi_request_handler($app);

returning_body: {
my $cv = AE::cv;

$cv->begin;
http_request GET => "http://localhost:$port/", timeout => 300, sub {
my ($body, $headers) = @_;
is $headers->{'Status'}, 200, "Response OK";
is $headers->{'content-type'}, 'application/json', "... is JSON";
is $body, q({"message":"O hai 1"}), '... correct body';
$cv->end;
};

$cv->recv;
pass "all done app 1";
}

my $APP2 = <<'EOAPP';
my $app2 = sub {
my $env = shift;
unless ($env->{'psgi.streaming'}) {
die "This application needs psgi.streaming support";
}
Test::More::pass "called app2";
return sub {
Test::More::pass "called streamer2";
my $respond = shift;
wait_for_new_message(sub {
my $message = shift;
Test::More::pass "sending response2";
my $w = $respond->([200, ['Content-Type', 'application/json']]);
Test::More::pass "started response2";
$w->write($message->to_json);
Test::More::pass "done response2";
$w->close;
undef $env;
});
};
};
EOAPP

my $app2 = eval $APP2;
ok $app2, 'got app 2' || diag $@;
$evh->psgi_request_handler($app2);

using_writer: {
my $cv = AE::cv;

$cv->begin;
http_request GET => "http://localhost:$port/", timeout => 300, sub {
my ($body, $headers) = @_;
is $headers->{'Status'}, 200, "Response OK";
is $headers->{'content-type'}, 'application/json', "... is JSON";
ok(
($body eq "15\r\n".q({"message":"O hai 2"})."\r\n0\r\n\r\n"
or
$body eq q({"message":"O hai 2"})),
'... correct body (maybe AE::HTTP doesn\'t decode chunked)');
$cv->end;
};

$cv->recv;
pass "all done app 2";
}

0 comments on commit 3f40dc5

Please sign in to comment.