Permalink
Browse files

Refactoring aroundware, Part III: added new SimpleAroundware and Barr…

…ierAroundware, to replace the soon-deprecated (but still functional) AsyncAroundware and cronies.

* In AsyncMiddleware, moved callback hook into own method making it easier to follow; also, post_process executes in a safely{} block, avoiding a source of hung calls.
* Created two pairs of base modules for aroundware: SimpleAroundware and SimpleAroundwareFactory handle the case where you *may* want to share information across pre- and post-processing, but don't need to have a barrier to clear pending calls.
* BarrierAroundware and BarrierAroundwareFactory respect the same interface, but add equivalent functionality to EM::Multi.
  - Any deferrable you #enqueue goes into a pending_requests pool; once #pre_process returns, the downstream callback's response also goes in the pending_requests pool.
  - The BarrierAroundware's post_process method will not resume until all pending_requests (the aroundware's and the downstream response) have completed.
  - You're free to at any time also call #perform!, which concurrently waits for the pending pool to clear and then resume.
  - Completed requests are added to the succeses or failures hash as appropriate; and passed to the instance setter named for that handle if any (so, enqueue(:shortened_url, su_req) will eventually call self.shortened_url = su_req on completion).
In a following commit, I'll move AsyncAroundware, ResponseReceiver, MongoReciever to a deprecated/ directory and explain the differences
  • Loading branch information...
1 parent 80fc1d1 commit 0f6ed2aa45802c0d419ed529051dbde74aa833ea Philip (flip) Kromer committed Jul 30, 2011
@@ -3,8 +3,10 @@ module Rack
class AsyncAroundware
include Goliath::Rack::Validator
- # Called by the framework to create the middleware. Any extra args passed
- # to the use statement are sent to each response_receiver_klass as it is created.
+ # Called by the framework to create the middleware.
+ #
+ # Any extra args passed to the use statement are sent to each
+ # aroundware_klass as it is created.
#
# @example
# class Awesomizer2011 < Goliath::Rack::MultiReceiver
@@ -21,74 +23,78 @@ class AsyncAroundware
# end
#
# @param app [#call] the downstream app
- # @param response_receiver_klass a class that quacks like a
+ # @param aroundware_klass a class that quacks like a
# Goliath::Rack::ResponseReceiver and an EM::Deferrable
- # @param *args [Array] extra args to pass to the response_receiver
- # @return [Goliath::Rack::AsyncMiddleware]
- def initialize app, response_receiver_klass, *args
+ # @param *args [Array] extra args to pass to the aroundware
+ # @return [Goliath::Rack::AsyncAroundware]
+ def initialize app, aroundware_klass, *args
@app = app
- @response_receiver_klass = response_receiver_klass
- @response_receiver_args = args
+ @aroundware_klass = aroundware_klass
+ @aroundware_args = args
end
- # This coordinates a response_receiver to process a request. We hook the
- # response_receiver in the middle of the async_callback chain:
- # * send the downstream response to the response_receiver, whether received directly
+ # Coordinates aroundware to process a request.
+ #
+ # We hook the aroundware in the middle of the async_callback chain:
+ # * send the downstream response to the aroundware, whether received directly
# from @app.call or via async callback
- # * have the upstream callback chain be invoked when the response_receiver completes
+ # * have the upstream callback chain be invoked when the aroundware completes
#
# @param env [Goliath::Env] The goliath environment
# @return [Array] The [status_code, headers, body] tuple
def call(env)
- response_receiver = new_response_receiver(env)
+ aroundware = new_aroundware(env)
- response_receiver_resp = response_receiver.pre_process
+ aroundware_resp = aroundware.pre_process
- hook_into_callback_chain(env, response_receiver)
+ hook_into_callback_chain(env, aroundware)
downstream_resp = @app.call(env)
- # if downstream resp is final, pass it to the response_receiver; it will invoke
+ # if downstream resp is final, pass it to the aroundware; it will invoke
# the callback chain at its leisure. Our response is *always* async.
if final_response?(downstream_resp)
- safely(env){ response_receiver.call(downstream_resp) }
+ aroundware.call(downstream_resp)
end
return Goliath::Connection::AsyncResponse
end
- # Generate a response_receiver to process the request, using request env & any args
- # passed to this Response_ReceiverMiddleware at creation
- #
- # @param env [Goliath::Env] The goliath environment
- # @return [Goliath::Rack::AsyncResponse_Receiver] The response_receiver to process this request
- def new_response_receiver(env)
- @response_receiver_klass.new(env, *@response_receiver_args)
- end
-
- # Put response_receiver in the middle of the async_callback chain:
+ # Put aroundware in the middle of the async_callback chain:
# * save the old callback chain;
- # * have the downstream callback send results to the response_receiver (possibly
+ # * have the downstream callback send results to the aroundware (possibly
# completing it)
- # * set the old callback chain to fire when the response_receiver completes
- def hook_into_callback_chain(env, response_receiver)
+ # * set the old callback chain to fire when the aroundware completes
+ def hook_into_callback_chain(env, aroundware)
async_callback = env['async.callback']
- # The response from the downstream app is accepted by the response_receiver...
- downstream_callback = Proc.new{|resp| safely(env){ response_receiver.call(resp) } }
+ # The response from the downstream app is accepted by the aroundware...
+ downstream_callback = Proc.new do |resp|
+ safely(env){ aroundware.call(resp) }
+ end
env['async.callback'] = downstream_callback
- # .. but the upstream chain is only invoked when the response_receiver completes
+ # .. but the upstream chain is only invoked when the aroundware completes
invoke_upstream_chain = Proc.new do
- response_receiver_resp = safely(env){ response_receiver.post_process }
- async_callback.call(response_receiver_resp)
+ new_resp = safely(env){ aroundware.post_process }
+ async_callback.call(new_resp)
end
- response_receiver.callback(&invoke_upstream_chain)
- response_receiver.errback(&invoke_upstream_chain)
+ aroundware.callback(&invoke_upstream_chain)
+ aroundware.errback(&invoke_upstream_chain)
end
def final_response?(resp)
resp != Goliath::Connection::AsyncResponse
end
+
+ # Generate a aroundware to process the request, using request env & any args
+ # passed to this AsyncAroundware at creation
+ #
+ # @param env [Goliath::Env] The goliath environment
+ # @return [Goliath::Rack::ResponseReceiver] The response_receiver to process this request
+ def new_aroundware(env)
+ @aroundware_klass.new(env, *@aroundware_args)
+ end
+
end
end
end
@@ -41,6 +41,8 @@ module Rack
# next request. Everything that you need to store needs to be stored in
# local variables.
module AsyncMiddleware
+ include Goliath::Rack::Validator
+
# Called by the framework to create the middleware.
#
# @param app [Proc] The application
@@ -65,19 +67,38 @@ def initialize(app)
# @param env [Goliath::Env] The goliath environment
# @return [Array] The [status_code, headers, body] tuple
def call(env, *args)
- async_cb = env['async.callback']
- env['async.callback'] = Proc.new do |status, headers, body|
- async_cb.call(post_process(env, status, headers, body, *args))
- end
+ hook_into_callback_chain(env, *args)
- status, headers, body = @app.call(env)
+ downstream_resp = @app.call(env)
- if status == Goliath::Connection::AsyncResponse.first
- [status, headers, body]
- else
+ if final_response?(downstream_resp)
+ status, headers, body = downstream_resp
post_process(env, status, headers, body, *args)
+ else
+ return Goliath::Connection::AsyncResponse
+ end
+ end
+
+ # Put a callback block in the middle of the async_callback chain:
+ # * save the old callback chain;
+ # * have the downstream callback send results to our proc...
+ # * which fires old callback chain when it completes
+ def hook_into_callback_chain(env, *args)
+ async_callback = env['async.callback']
+
+ # The response from the downstream app is sent to post_process
+ # and then directly up the callback chain
+ downstream_callback = Proc.new do |status, headers, body|
+ new_resp = safely(env){ post_process(env, status, headers, body, *args) }
+ async_callback.call(new_resp)
end
+
+ env['async.callback'] = downstream_callback
+ end
+
+ def final_response?(resp)
+ resp != Goliath::Connection::AsyncResponse
end
# Override this method in your middleware to perform any
@@ -88,6 +109,7 @@ def call(env, *args)
def post_process(env, status, headers, body)
[status, headers, body]
end
+
end
end
end
@@ -0,0 +1,81 @@
+module Goliath
+ module Rack
+
+ #
+ # The strategy here is similar to that of EM::Multi. Figuring out what goes
+ # on there will help you understand this.
+ #
+ module BarrierAroundware
+ include EventMachine::Deferrable
+ include Goliath::Rack::SimpleAroundware
+
+ # Pool with handles of pending requests
+ attr_reader :pending_requests
+ # Pool with handles of sucessful requests
+ attr_reader :successes
+ # Pool with handles of failed requests
+ attr_reader :failures
+
+ # @param env [Goliath::Env] The request environment
+ # @return [Goliath::Rack::AsyncBarrier]
+ def initialize(env)
+ @env = env
+ @pending_requests = Set.new
+ @successes = {}
+ @failures = {}
+ end
+
+ # On receipt of an async result,
+ # * remove the tracking handle from pending_requests
+ # * and file the response in either successes or failures as appropriate
+ # * call the setter for that handle if any (on receipt of :shortened_url,
+ # calls self.shortened_url = resp)
+ # * check progress -- succeeds (transferring controll) if nothing is pending.
+ def accept_response(handle, resp_succ, resp, fiber=nil)
+ raise "received response for a non-pending request!" if not pending_requests.include?(handle)
+ pending_requests.delete(handle)
+ resp_succ ? (successes[handle] = resp) : (failures[handle] = resp)
+ self.send("#{handle}=", resp) if self.respond_to?("#{handle}=")
+ check_progress(fiber)
+ end
+
+ # Add a deferred request to the pending pool, and set a callback to
+ # #accept_response when the request completes
+ def enqueue(handle, deferred_req)
+ fiber = Fiber.current
+ add_to_pending(handle)
+ deferred_req.callback{ safely(env){ accept_response(handle, true, deferred_req, fiber) } }
+ deferred_req.errback{ safely(env){ accept_response(handle, false, deferred_req, fiber) } }
+ end
+
+ # Register a pending request. If you call this from outside #enqueue, you
+ # must construct callbacks that eventually invoke accept_response
+ def add_to_pending(handle)
+ set_deferred_status(nil) # we're not done yet, even if we were
+ @pending_requests << handle
+ end
+
+ def finished?
+ pending_requests.empty?
+ end
+
+ # Perform will yield (allowing other processes to continue) until all
+ # pending responses complete. You're free to enqueue responses, call
+ # perform,
+ def perform
+ Fiber.yield unless finished?
+ end
+
+ protected
+
+ def check_progress(fiber)
+ if finished?
+ succeed
+ # continue processing
+ fiber.resume(self) if fiber && fiber.alive? && fiber != Fiber.current
+ end
+ end
+
+ end
+ end
+end
@@ -0,0 +1,84 @@
+module Goliath
+ module Rack
+ #
+ # Include this to enable middleware that can perform pre- and
+ # post-processing, optionally having multiple responses pending.
+ #
+ # For internal reasons, you can't do the following as you would in Rack:
+ #
+ # def call(env)
+ # # ... do pre-processing
+ # status, headers, body = @app.call(env)
+ # new_body = make_totally_awesome(body) ## !! BROKEN !!
+ # [status, headers, new_body]
+ # end
+ #
+ # This class creates a "aroundware" helper to do that kind of "around"
+ # processing. Goliath proceeds asynchronously, but will still "unwind" the
+ # request by walking up the callback chain. Delegating out to the
+ # aroundware also lets you carry state around -- the ban on instance
+ # variables no longer applies, as each aroundware is unique per request.
+ #
+ # @example
+ # class ShortenUrl
+ # attr_accessor :shortened_url
+ # include Goliath::Rack::BarrierAroundware
+ #
+ # def pre_process
+ # target_url = PostRank::URI.clean(env.params['url'])
+ # shortener_request = EM::HttpRequest.new('http://is.gd/create.php').aget(:query => { :format => 'simple', :url => target_url })
+ # enqueue :shortened_url, shortener_request
+ # Goliath::Connection::AsyncResponse
+ # end
+ #
+ # # by the time you get here, the AroundwareFactory will have populated
+ # # the [status, headers, body] and the shortener_request will have
+ # # populated the shortened_url attribute.
+ # def post_process
+ # if succeeded?(:shortened_url)
+ # headers['X-Shortened-URI'] = shortened_url
+ # end
+ # [status, headers, body]
+ # end
+ # end
+ #
+ # class AwesomeApiWithShortening < Goliath::API
+ # use Goliath::Rack::Params
+ # use Goliath::Rack::AroundwareFactory, ShortenUrl
+ # def response(env)
+ # # ... do something awesome
+ # end
+ # end
+ #
+ class BarrierAroundwareFactory
+ include Goliath::Rack::Validator
+ include Goliath::Rack::AroundwareFactory
+
+ # Put aroundware in the middle of the async_callback chain:
+ # * save the old callback chain;
+ # * have the downstream callback send results to the aroundware (possibly
+ # completing it)
+ # * set the old callback chain to fire when the aroundware completes
+ def hook_into_callback_chain(env, aroundware)
+ async_callback = env['async.callback']
+
+ # The response from the downstream app is accepted by the aroundware...
+ downstream_callback = Proc.new do |resp|
+ safely(env){ aroundware.accept_response(:downstream_resp, true, resp) }
+ end
+
+ # .. but the upstream chain is only invoked when the aroundware completes
+ invoke_upstream_chain = Proc.new do
+ new_resp = safely(env){ aroundware.post_process }
+ async_callback.call(new_resp)
+ end
+
+ env['async.callback'] = downstream_callback
+ aroundware.add_to_pending(:downstream_resp)
+ aroundware.callback(&invoke_upstream_chain)
+ aroundware.errback(&invoke_upstream_chain)
+ end
+
+ end
+ end
+end
Oops, something went wrong.

0 comments on commit 0f6ed2a

Please sign in to comment.