Permalink
Browse files

Refactoring async_aroundware, Part II. Refactored to make the control…

… flow clearer. This is mostly cosmetic, and sets the stage for a new interface that will reduce the distinction between AsyncAroundware and normal middleware.
  • Loading branch information...
1 parent 8df93be commit 80fc1d12ebc1ade2c1f7f511a1f4ac69f62e9d03 Philip (flip) Kromer committed Jul 30, 2011
Showing with 44 additions and 14 deletions.
  1. +27 −12 lib/goliath/rack/async_aroundware.rb
  2. +17 −2 lib/goliath/synchrony/response_receiver.rb
@@ -7,7 +7,7 @@ class AsyncAroundware
# to the use statement are sent to each response_receiver_klass as it is created.
#
# @example
- # class MyResponseReceiver < Goliath::Rack::MultiReceiver
+ # class Awesomizer2011 < Goliath::Rack::MultiReceiver
# def initialize(env, aq)
# @awesomeness_quotient = aq
# super(env)
@@ -16,7 +16,7 @@ class AsyncAroundware
# end
#
# class AwesomeApiWithShortening < Goliath::API
- # use Goliath::Rack::AsyncAroundware, MyResponseReceiver, 3
+ # use Goliath::Rack::AsyncAroundware, Awesomizer2011, 3
# # ... stuff ...
# end
#
@@ -33,7 +33,7 @@ def initialize app, response_receiver_klass, *args
# This coordinates a response_receiver to process a request. We hook the
# response_receiver in the middle of the async_callback chain:
- # * send the downstream response to the barrier, whether received directly
+ # * send the downstream response to the response_receiver, whether received directly
# from @app.call or via async callback
# * have the upstream callback chain be invoked when the response_receiver completes
#
@@ -42,12 +42,18 @@ def initialize app, response_receiver_klass, *args
def call(env)
response_receiver = new_response_receiver(env)
- hook_into_callback_chain(env, response_receiver)
-
response_receiver_resp = response_receiver.pre_process
+ hook_into_callback_chain(env, response_receiver)
+
downstream_resp = @app.call(env)
- response_receiver.call(downstream_resp)
+
+ # if downstream resp is final, pass it to the response_receiver; it will invoke
+ # the callback chain at its leisure. Our response is *always* async.
+ if final_response?(downstream_resp)
+ safely(env){ response_receiver.call(downstream_resp) }
+ end
+ return Goliath::Connection::AsyncResponse
end
# Generate a response_receiver to process the request, using request env & any args
@@ -59,20 +65,29 @@ def new_response_receiver(env)
@response_receiver_klass.new(env, *@response_receiver_args)
end
- # put response_receiver in the middle of the async_callback chain:
+ # Put response_receiver in the middle of the async_callback chain:
# * save the old callback chain;
# * have the downstream callback send results to the response_receiver (possibly
# completing it)
# * set the old callback chain to fire when the response_receiver completes
def hook_into_callback_chain(env, response_receiver)
async_callback = env['async.callback']
- env['async.callback'] = response_receiver
- response_receiver.callback{ do_postprocess(env, async_callback, response_receiver) }
- response_receiver.errback{ do_postprocess(env, async_callback, response_receiver) }
+
+ # The response from the downstream app is accepted by the response_receiver...
+ downstream_callback = Proc.new{|resp| safely(env){ response_receiver.call(resp) } }
+ env['async.callback'] = downstream_callback
+
+ # .. but the upstream chain is only invoked when the response_receiver completes
+ invoke_upstream_chain = Proc.new do
+ response_receiver_resp = safely(env){ response_receiver.post_process }
+ async_callback.call(response_receiver_resp)
+ end
+ response_receiver.callback(&invoke_upstream_chain)
+ response_receiver.errback(&invoke_upstream_chain)
end
- def do_postprocess(env, async_callback, response_receiver)
- safely(env){ async_callback.call(response_receiver.post_process) }
+ def final_response?(resp)
+ resp != Goliath::Connection::AsyncResponse
end
end
end
@@ -24,18 +24,33 @@ def post_process
[status, headers, body]
end
+ # Virtual setter for the downstream middleware/endpoint response
+ def downstream_resp=(status_headers_body)
+ @status, @headers, @body = status_headers_body
+ end
+
# Invoked by the async_callback chain. Stores the [status, headers, body]
# for post_process'ing
def call shb
return shb if shb.first == Goliath::Connection::AsyncResponse.first
- @status, @headers, @body = shb
- succeed if finished?
+ self.downstream_resp = shb
+ check_progress
end
# Have we received a response?
def response_received?
!! @status
end
+
+ protected
+
+ def check_progress(fiber)
+ if finished?
+ succeed
+ # continue processing
+ fiber.resume(self) if fiber && fiber.alive? && fiber != Fiber.current
+ end
+ end
end
class MultiReceiver < EM::Synchrony::Multi

0 comments on commit 80fc1d1

Please sign in to comment.