From 3f40dc5d5f0dd27f3df0455cacf3fe94549a731c Mon Sep 17 00:00:00 2001 From: Stash Date: Mon, 6 Sep 2010 16:14:41 -0700 Subject: [PATCH] PSGI streaming response handler --- Feersum.xs | 31 +++++++-- lib/Feersum/Connection.pm | 20 ++++++ t/51-psgi-streaming.t | 128 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 174 insertions(+), 5 deletions(-) create mode 100644 t/51-psgi-streaming.t diff --git a/Feersum.xs b/Feersum.xs index 5c23fce..6460648 100644 --- a/Feersum.xs +++ b/Feersum.xs @@ -1295,9 +1295,24 @@ feersum_handle_psgi_response(pTHX_ struct feer_conn *c, SV *ret) call_died(aTHX_ c, "request"); } } -// else if (SvROK(ret) && SvTYPE(SvRV(ret)) == SVt_PVCV) { -// // TODO streaming mode -// } + else if (SvROK(ret) && SvTYPE(SvRV(ret)) == SVt_PVCV) { + dSP; + ENTER; + SAVETMPS; + PUSHMARK(SP); + SV *conn_sv = sv_2mortal(feer_conn_2sv(c)); + XPUSHs(conn_sv); + XPUSHs(ret); + PUTBACK; + call_method("_initiate_streaming_psgi", G_DISCARD|G_EVAL|G_VOID); + SPAGAIN; + if (SvTRUE(ERRSV)) { + call_died(aTHX_ c, "initiate streaming"); + } + PUTBACK; + FREETMPS; + LEAVE; + } else { sv_setpv(ERRSV, "Unsupported PSGI response"); call_died(aTHX_ c, "request"); @@ -1335,7 +1350,6 @@ call_request_callback (struct feer_conn *c) SAVETMPS; PUSHMARK(SP); - if (request_cb_is_psgi) { SV *env = feersum_env(aTHX_ c,newHV()); // SV *conn_sv = feer_conn_2sv(c); @@ -1360,8 +1374,10 @@ call_request_callback (struct feer_conn *c) call_died(aTHX_ c, "request"); } + SV *psgi_response; if (request_cb_is_psgi) { - feersum_handle_psgi_response(aTHX_ c, POPs); + psgi_response = *sp; + SvREFCNT_inc(psgi_response); } trace("leaving request callback\n"); @@ -1369,6 +1385,11 @@ call_request_callback (struct feer_conn *c) FREETMPS; LEAVE; + if (request_cb_is_psgi) { + feersum_handle_psgi_response(aTHX_ c, psgi_response); + SvREFCNT_dec(psgi_response); + } + c->in_callback--; } diff --git a/lib/Feersum/Connection.pm b/lib/Feersum/Connection.pm index 35a7904..b0917ce 100644 --- a/lib/Feersum/Connection.pm +++ b/lib/Feersum/Connection.pm @@ -23,6 +23,26 @@ sub initiate_streaming { goto &$streamer; } +sub _initiate_streaming_psgi { + my ($self, $streamer) = @_; + @_ = (sub { + my $strm = shift; + if ($#$strm == 2) { + $self->start_response($strm->[0],$strm->[1],0); + $self->write_whole_body(ref($strm->[2]) ? $strm->[2] : \$strm->[2]); + } + elsif ($#$strm == 1) { + $self->start_response($strm->[0],$strm->[1],1); + return $self->write_handle; + } + else { + die "streaming responder expected array"; + } + return; + }); + goto &$streamer; +} + 1; __END__ diff --git a/t/51-psgi-streaming.t b/t/51-psgi-streaming.t new file mode 100644 index 0000000..e08b394 --- /dev/null +++ b/t/51-psgi-streaming.t @@ -0,0 +1,128 @@ +#!perl +use warnings; +use strict; +use Test::More tests => 21; +use lib 't'; use Utils; +use AnyEvent::HTTP; + +BEGIN { use_ok('Feersum') }; + +my ($socket,$port) = get_listen_socket(); +ok $socket, "made listen socket"; + +my $evh = Feersum->new(); +{ + no warnings 'redefine'; + *Feersum::DIED = sub { + my $err = shift; + fail "Died during request handler: $err"; + }; +} +$evh->use_socket($socket); + +{ + package Message; + my $n = 0; + sub new { return bless {}, 'Message' } + sub to_json { ++$n; return qq({"message":"O hai $n"}) } +} + +sub wait_for_new_message { + my $cb = shift; + my $t; $t = AE::timer rand(0.5),0,sub { + $cb->(Message->new()); + undef $t; # cancel circular-ref + }; + return; +} + +# from the PSGI::FAQ +my $APP = <<'EOAPP'; + my $app = sub { + my $env = shift; + unless ($env->{'psgi.streaming'}) { + die "This application needs psgi.streaming support"; + } + Test::More::pass "called app"; + return sub { + Test::More::pass "called streamer"; + my $respond = shift; + wait_for_new_message(sub { + my $message = shift; + my $body = [ $message->to_json ]; + Test::More::pass "sending response"; + undef $env; + $respond->([200, ['Content-Type', 'application/json'], $body]); + Test::More::pass "sent response"; + }); + }; + }; +EOAPP + +my $app = eval $APP; +ok $app, 'got an app' || diag $@; +$evh->psgi_request_handler($app); + +returning_body: { + my $cv = AE::cv; + + $cv->begin; + http_request GET => "http://localhost:$port/", timeout => 300, sub { + my ($body, $headers) = @_; + is $headers->{'Status'}, 200, "Response OK"; + is $headers->{'content-type'}, 'application/json', "... is JSON"; + is $body, q({"message":"O hai 1"}), '... correct body'; + $cv->end; + }; + + $cv->recv; + pass "all done app 1"; +} + +my $APP2 = <<'EOAPP'; + my $app2 = sub { + my $env = shift; + unless ($env->{'psgi.streaming'}) { + die "This application needs psgi.streaming support"; + } + Test::More::pass "called app2"; + return sub { + Test::More::pass "called streamer2"; + my $respond = shift; + wait_for_new_message(sub { + my $message = shift; + Test::More::pass "sending response2"; + my $w = $respond->([200, ['Content-Type', 'application/json']]); + Test::More::pass "started response2"; + $w->write($message->to_json); + Test::More::pass "done response2"; + $w->close; + undef $env; + }); + }; + }; +EOAPP + +my $app2 = eval $APP2; +ok $app2, 'got app 2' || diag $@; +$evh->psgi_request_handler($app2); + +using_writer: { + my $cv = AE::cv; + + $cv->begin; + http_request GET => "http://localhost:$port/", timeout => 300, sub { + my ($body, $headers) = @_; + is $headers->{'Status'}, 200, "Response OK"; + is $headers->{'content-type'}, 'application/json', "... is JSON"; + ok( + ($body eq "15\r\n".q({"message":"O hai 2"})."\r\n0\r\n\r\n" + or + $body eq q({"message":"O hai 2"})), + '... correct body (maybe AE::HTTP doesn\'t decode chunked)'); + $cv->end; + }; + + $cv->recv; + pass "all done app 2"; +}