Skip to content
Browse files

remove BlockingFallback and inline the blocking recv in the Handler c…

…ode.

This should now work with Plack topic/streaming-blocking branch.
  • Loading branch information...
1 parent 96c17d3 commit e1c93db2b44571acc15939c8407f6aab7f855a30 @miyagawa committed Jan 5, 2010
Showing with 6 additions and 53 deletions.
  1. +0 −2 lib/Tatsumaki/Application.pm
  2. +6 −3 lib/Tatsumaki/Handler.pm
  3. +0 −48 lib/Tatsumaki/Middleware/BlockingFallback.pm
View
2 lib/Tatsumaki/Application.pm
@@ -6,7 +6,6 @@ use Tatsumaki::Request;
use Try::Tiny;
use Plack::Middleware::Static;
-use Tatsumaki::Middleware::BlockingFallback;
use overload q(&{}) => sub { shift->psgi_app }, fallback => 1;
@@ -81,7 +80,6 @@ sub compile_psgi_app {
$app = Plack::Middleware::Static->wrap($app, path => sub { s/^\/static\/// }, root => $self->static_path);
}
- $app = Tatsumaki::Middleware::BlockingFallback->wrap($app);
$app;
}
View
9 lib/Tatsumaki/Handler.pm
@@ -108,7 +108,6 @@ sub run {
if ($self->is_asynchronous) {
$self->condvar(my $cv = AE::cv);
- $self->request->env->{'psgix.block.response'} = sub { $cv->recv };
return sub {
my $start_response = shift;
$cv->cb(sub {
@@ -119,7 +118,6 @@ sub run {
if (!$res->[2] && $w) {
$self->writer($w);
$self->condvar(my $cv2 = AE::cv);
- $self->request->env->{'psgix.block.body'} = sub { $cv2->recv };
}
} catch {
$start_response->($catch->());
@@ -132,6 +130,11 @@ sub run {
} catch {
$cv->croak($_);
};
+
+ unless ($self->request->env->{'psgi.nonblocking'}) {
+ $self->log("Running an async handler in a blocking server. MQ based app should cause a deadlock.\n");
+ $self->condvar->recv for 1..2; # response and writer
+ }
};
} else {
my $res = try {
@@ -210,7 +213,7 @@ sub flush {
my $res = $self->response->finalize;
delete $res->[2]; # gimme a writer
$self->condvar->send($res);
- $self->writer or Carp::croak("Can't get writer object back: you need servers with psgi.nonblocking");
+ $self->writer or Carp::croak("Can't get a writer object back: you need servers with psgi.streaming");
$self->flush();
}
}
View
48 lib/Tatsumaki/Middleware/BlockingFallback.pm
@@ -1,48 +0,0 @@
-package Tatsumaki::Middleware::BlockingFallback;
-use strict;
-use parent qw(Plack::Middleware);
-use Carp ();
-use Plack::Util;
-use Scalar::Util ();
-
-# Run asnynchronous PSGI app in a blocking mode. See also Middleware::Writer
-sub call {
- my($self, $env) = @_;
-
- my $caller_supports_streaming = $env->{'psgi.streaming'};
- $env->{'psgi.streaming'} = Plack::Util::TRUE;
-
- my $res = $self->app->($env);
- return $res if $caller_supports_streaming;
-
- if (ref $res eq 'CODE') {
- $env->{'psgi.errors'}->print("psgi.streaming is off: running $env->{PATH_INFO} in a blocking mode\n");
- my $use_writer;
- $res->(sub {
- $res = shift;
- unless (defined $res->[2]) {
- $env->{'psgi.errors'}->print("Buffering the output of $env->{PATH_INFO}: This might cause a deadlock\n");
- $use_writer = 1;
- my($closed, @body);
- $res->[2] = \@body;
- my $writer;
- my $ref_up = $writer = Plack::Util::inline_object
- poll_cb => sub { $_[0]->($writer) until $closed },
- write => sub { push @body, $_[0] },
- close => sub { $closed => 1 };
-
- Scalar::Util::weaken($writer);
- return $writer;
- }
- });
-
- $env->{'psgix.block.response'}->() if $env->{'psgix.block.response'};
- if ($use_writer) {
- $env->{'psgix.block.body'}->() if $env->{'psgix.block.body'};
- }
- }
-
- return $res;
-}
-
-1;

0 comments on commit e1c93db

Please sign in to comment.
Something went wrong with that request. Please try again.