Permalink
Browse files

Generalized Blocking middleware and introduced psgix.block.response a…

…nd .body handler
  • Loading branch information...
1 parent aa62cfa commit 905a5c361ae21ea21d5549cbef7f1e036facf96f @miyagawa committed Oct 20, 2009
Showing with 57 additions and 36 deletions.
  1. +2 −2 eg/demo.pl
  2. +8 −4 lib/Tatsumaki/Handler.pm
  3. +0 −30 lib/Tatsumaki/Middleware/Blocking.pm
  4. +47 −0 lib/Tatsumaki/Middleware/BlockingFallback.pm
View
@@ -171,8 +171,8 @@ package main;
use Plack::Middleware::Static;
$app = Plack::Middleware::Static->wrap($app, path => qr/^\/static/, root => dirname(__FILE__));
-use Tatsumaki::Middleware::Blocking;
-$app = Tatsumaki::Middleware::Blocking->wrap($app);
+use Tatsumaki::Middleware::BlockingFallback;
+$app = Tatsumaki::Middleware::BlockingFallback->wrap($app);
# TODO this should be an external services module
use Try::Tiny;
@@ -86,14 +86,17 @@ sub run {
}
if ($self->is_asynchronous) {
- my $cv = AE::cv;
- $self->condvar($cv);
- $self->request->env->{'tatsumaki.block'} = sub { $cv->recv };
+ $self->condvar(my $cv = AE::cv);
+ $self->request->env->{'psgix.block.response'} = sub { $cv->recv };
return sub {
my $start_response = shift;
$cv->cb(sub {
my $w = $start_response->($_[0]->recv);
- $self->writer($w) if $w;
+ if ($w) {
+ $self->writer($w);
+ $self->condvar(my $cv2 = AE::cv);
+ $self->request->env->{'psgix.block.body'} = sub { $cv2->recv };
+ }
});
$self->$method(@{$self->args});
};
@@ -178,6 +181,7 @@ sub finish {
$self->flush(1);
if ($self->writer) {
$self->writer->close;
+ $self->condvar->send;
} elsif ($self->condvar) {
$self->condvar->send($self->response->finalize);
}
@@ -1,30 +0,0 @@
-package Tatsumaki::Middleware::Blocking;
-use strict;
-use base qw(Plack::Middleware);
-use Carp ();
-use Plack::Util;
-
-# Run asnynchronous Tatsumaki app in a blocking mode. See also Middleware::Writer
-sub call {
- my($self, $env) = @_;
-
- my $caller_supports_streaming = $env->{'psgi.streaming'};
- $env->{'psgi.streaming'} = Plack::Util::TRUE;
-
- my $res = $self->app->($env);
- return $res if $caller_supports_streaming;
-
- if (ref $res eq 'CODE') {
- $env->{'psgi.errors'}->print("psgi.nonblocking is off: running $env->{PATH_INFO} in a blocking mode\n");
- $res->(sub { $res = shift });
- $env->{'tatsumaki.block'}->();
- }
-
- unless (defined $res->[2]) {
- Carp::croak("stream_write is not supported on this server");
- }
-
- return $res;
-}
-
-1;
@@ -0,0 +1,47 @@
+package Tatsumaki::Middleware::BlockingFallback;
+use strict;
+use base qw(Plack::Middleware);
+use Carp ();
+use Plack::Util;
+use Scalar::Util ();
+
+# Run asnynchronous PSGI app in a blocking mode. See also Middleware::Writer
+sub call {
+ my($self, $env) = @_;
+
+ my $caller_supports_streaming = $env->{'psgi.streaming'};
+ $env->{'psgi.streaming'} = Plack::Util::TRUE;
+
+ my $res = $self->app->($env);
+ return $res if $caller_supports_streaming;
+
+ if (ref $res eq 'CODE') {
+ $env->{'psgi.errors'}->print("psgi.nonblocking is off: running $env->{PATH_INFO} in a blocking mode\n");
+ my $use_writer;
+ $res->(sub {
+ $res = shift;
+ unless (defined $res->[2]) {
+ $use_writer = 1;
+ my($closed, @body);
+ $res->[2] = \@body;
+ my $writer;
+ my $ref_up = $writer = Plack::Util::inline_object
+ poll_cb => sub { $_[0]->($writer) until $closed },
+ write => sub { push @body, $_[0] },
+ close => sub { $closed => 1 };
+
+ Scalar::Util::weaken($writer);
+ return $writer;
+ }
+ });
+
+ $env->{'psgix.block.response'}->() if $env->{'psgix.block.response'};
+ if ($use_writer) {
+ $env->{'psgix.block.body'}->() if $env->{'psgix.block.body'};
+ }
+ }
+
+ return $res;
+}
+
+1;

0 comments on commit 905a5c3

Please sign in to comment.