Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Refactoring aroundware, Part V: Moved all the aroundware examples ove…

…r to use the new aroundware, doing necessary cleanup along the way.

* BarrierAroundware now store [req, resp] in the successes / failures hashes
* mongo things now work with both old and future em-mongo gems, at the cost of a big conditional 'if' statement in the file
* Added enqueue_acceptor to let you enqueue activities that take a block without yielding a deferrable
  • Loading branch information...
commit 20122fb36ccb76fc7a61ae3ca8c4457ca3640350 1 parent 6395370
Philip (flip) Kromer authored
View
2  Gemfile
@@ -1,3 +1,3 @@
source "http://rubygems.org"
-gemspec
+gemspec
View
25 examples/async_aroundware_demo.rb
@@ -2,7 +2,6 @@
$: << File.dirname(__FILE__)+'/../lib'
require 'goliath'
-require 'goliath/synchrony/response_receiver'
require 'em-synchrony/em-http'
require 'yajl/json_gem'
@@ -13,11 +12,11 @@
#
# To run this, start the 'test_rig.rb' server on port 9002:
#
-# ./examples/test_rig.rb -sv -p 9002
+# bundle exec ./examples/test_rig.rb -sv -p 9002
#
# And then start this server on port 9000:
#
-# ./async_aroundware_demo.rb -sv -p 9000
+# bundle exec ./examples/barrier_aroundware_demo.rb -sv -p 9000
#
# Now curl the async_aroundware_demo_multi:
#
@@ -43,27 +42,31 @@
BASE_URL = 'http://localhost:9002/'
-class MyResponseReceiver < Goliath::Synchrony::MultiReceiver
+class RemoteRequestBarrier
+ include Goliath::Rack::BarrierAroundware
+ attr_accessor :sleep_1
+
def pre_process
# Request with delay_1 and drop_1 -- note: 'aget', because we want execution to continue
req = EM::HttpRequest.new(BASE_URL).aget(:query => { :delay => env.params['delay_1'], :drop => env.params['drop_1'] })
- add :sleep_1, req
+ enqueue :sleep_1, req
+ return Goliath::Connection::AsyncResponse
end
def post_process
# unify the results with the results of the API call
- responses[:callback].each{|name, resp| body[:results][name] = JSON.parse(resp.response) }
- responses[:errback ].each{|name, err| body[:errors][name] = err.error }
- [status, headers, JSON.generate(body)]
+ if successes.include?(:sleep_1) then body[:results][:sleep_1] = JSON.parse(sleep_1.response)
+ else body[:errors][:sleep_1] = sleep_1.error ; end
+ [status, headers, JSON.pretty_generate(body)]
end
end
-class AsyncAroundwareDemo < Goliath::API
+class BarrierAroundwareDemo < Goliath::API
use Goliath::Rack::Params
use Goliath::Rack::Validation::NumericRange, {:key => 'delay_1', :default => 1.0, :max => 5.0, :min => 0.0, :as => Float}
use Goliath::Rack::Validation::NumericRange, {:key => 'delay_2', :default => 0.5, :max => 5.0, :min => 0.0, :as => Float}
#
- use Goliath::Rack::AsyncAroundware, MyResponseReceiver
+ use Goliath::Rack::BarrierAroundwareFactory, RemoteRequestBarrier
def response(env)
# Request with delay_2 and drop_2 -- note: 'get', because we want execution to proceed linearly
@@ -72,7 +75,7 @@ def response(env)
body = { :results => {}, :errors => {} }
if resp.response_header.status.to_i != 0
- body[:results][:sleep_2] = JSON.parse(resp.response)
+ body[:results][:sleep_2] = JSON.parse(resp.response) rescue 'parsing failed'
else
body[:errors ][:sleep_2] = resp.error
end
View
149 examples/auth_and_rate_limit.rb
@@ -4,46 +4,108 @@
require 'em-mongo'
require 'em-http'
require 'em-synchrony/em-http'
+require 'em-synchrony/em-mongo'
require 'yajl/json_gem'
-require 'goliath/deprecated/mongo_receiver' # has the aroundware logic for talking to mongodb
require File.join(File.dirname(__FILE__), 'http_log') # Use the HttpLog as our actual endpoint, but include this in the middleware
+#
# Usage:
#
-# First launch a dummy responder, like hello_world.rb or test_rig.rb:
-# ruby ./examples/hello_world.rb -sv -p 8080 -e prod &
+# First launch the test rig:
+# bundle exec ./examples/test_rig.rb -sv -p 8080 -e prod &
#
# Then launch this script
-# ruby ./examples/auth_and_rate_limit.rb -sv -p 9000 --config $PWD/examples/config/auth_and_rate_limit.rb
+# bundle exec ./examples/auth_and_rate_limit.rb -sv -p 9000 --config $PWD/examples/config/auth_and_rate_limit.rb
+#
+# The auth info is returned in the headers:
+#
+# curl -vv 'http://127.0.0.1:9000/?_apikey=i_am_busy&drop=false' ; echo
+# ...snip...
+# < X-RateLimit-MaxRequests: 1000
+# < X-RateLimit-Requests: 999
+# < X-RateLimit-Reset: 1312059600
+#
+# This user will hit the rate limit after 10 requests:
+#
+# for foo in 1 2 3 4 5 6 7 8 9 10 11 12 ; do echo -ne $foo "\t" ; curl 'http://127.0.0.1:9000/?_apikey=i_am_limited' ; echo ; done
+# 1 {"Special":"Header","Params":"_apikey: i_am_awesome|drop: false","Path":"/","Headers":"User-Agent: ...
+# ...
+# 11 [:error, "Your request rate (11) is over your limit (10)"]
+#
+# You can test the barrier (both delays are in fractional seconds):
+# * drop=true will drop the request at the remote host
+# * auth_db_delay will fake a slow response from the mongo
+# * delay will cause a slow response from the remote host
+#
+# time curl -vv 'http://127.0.0.1:9000/?_apikey=i_am_awesome&drop=false&delay=0.4&auth_db_delay=0.3'
+# ...
+# X-Tracer: ... received_usage_info: 0.06, received_sleepy: 299.52, received_downstream_resp: 101.67, ..., total: 406.09
+# ...
+# real 0m0.416s user 0m0.002s sys 0m0.003s pct 1.24
+#
+# This shows the mongodb response returning quickly, the fake DB delay returning
+# after 300ms, and the downstream response returning after an additional 101 ms.
+# The total request took 416ms of wall-clock time
+#
+# This will hold up even in the face of many concurrent connections. Relaunch in
+# production (you may have to edit the config/auth_and_rate_limit scripts):
+#
+# bundle exec ./examples/auth_and_rate_limit.rb -sv -p 9000 -e prod --config $PWD/examples/config/auth_and_rate_limit.rb
+#
+# On my laptop, with 20 concurrent requests (each firing two db gets, a 400 ms
+# http get, and two db writes), the median/90%ile times were 431ms / 457ms:
+#
+# time ab -c20 -n20 'http://127.0.0.1:9000/?_apikey=i_am_awesome&drop=false&delay=0.4&auth_db_delay=0.3'
+# ...
+# Percentage of the requests served within a certain time (ms)
+# 50% 431
+# 90% 457
+# real 0m0.460s user 0m0.001s sys 0m0.003s pct 0.85
+#
+# With 100 concurrent requests, the request latency starts to drop but the
+# throughput and variance stand up:
+#
+# time ab -c100 -n100 'http://127.0.0.1:9000/?_apikey=i_am_awesome&drop=false&delay=0.4&auth_db_delay=0.3'
+# ...
+# Percentage of the requests served within a certain time (ms)
+# 50% 640
+# 90% 673
+# real 0m0.679s user 0m0.002s sys 0m0.007s pct 1.33
#
# Tracks and enforces account and rate limit policies.
#
-# Before the request:
+# This is like a bouncer who lets you order a drink while he checks your ID. We
+# proxy your request to a backend server and get your account/usage info; if
+# your ID is good there's no further wait on a response.
+#
+# This works through the magic of BarrierAroundware:
#
-# * validates the apikey exists
-# * launches requests for the account and current usage (hourly rate limit, etc)
+# 1) In pre_process (before the request):
+# * validate an apikey was given; if not, raise (returning directly)
+# * launch requests for the account and rate limit usage
#
-# It then passes the request down the middleware chain; execution resumes only
-# when both the remote request and the auth info have returned.
+# 2) BarrierAroundwareFactory passes the request down the middleware chain
#
-# After remote request and auth info return:
+# 3) post_process resumes only when both proxied request & auth info are complete
#
-# * Check the account exists and is valid
-# * Check the rate limit is OK
+# 4) The post_process method then
+# - Checks the account exists and is valid
+# - Checks the rate limit is OK
#
-# If it passes all those checks, the request goes through; otherwise we raise an
-# error that Goliath::Rack::Validator turns into a 4xx response
+# 5) If it passes all those checks, the request goes through; otherwise we raise
+# an error that Goliath::Rack::Validator turns into a 4xx response
#
# WARNING: Since this passes ALL requests through to the responder, it's only
# suitable for idempotent requests (GET, typically). You may need to handle
# POST/PUT/DELETE requests differently.
#
#
-class AuthReceiver < Goliath::Synchrony::MongoReceiver
+class AuthBarrier
+ include Goliath::Rack::BarrierAroundware
include Goliath::Validation
- include Goliath::Rack::Validator
+ attr_reader :db
attr_accessor :account_info, :usage_info
# time period to aggregate stats over, in seconds
@@ -53,16 +115,34 @@ class MissingApikeyError < BadRequestError ; end
class RateLimitExceededError < ForbiddenError ; end
class InvalidApikeyError < UnauthorizedError ; end
+ def initialize(env, db_name)
+ @db = env.config[db_name]
+ super(env)
+ end
+
def pre_process
+ env.trace('pre_process_beg')
validate_apikey!
- first('AccountInfo', { :_id => apikey }){|res| self.account_info = res }
- first('UsageInfo', { :_id => usage_id }){|res| self.usage_info = res }
+
+ # the results of the afirst deferrable will be set right into account_info (and the request into successes)
+ enqueue_mongo_request(:account_info, { :_id => apikey })
+ enqueue_mongo_request(:usage_info, { :_id => usage_id })
+
+ # Fake out a delay in the database response if auth_db_delay is given
+ if (auth_db_delay = env.params['auth_db_delay'].to_f) > 0
+ enqueue_acceptor(:sleepy){|acc| EM.add_timer(auth_db_delay){ acc.succeed } }
+ end
+
env.trace('pre_process_end')
+ return Goliath::Connection::AsyncResponse
end
def post_process
env.trace('post_process_beg')
- env.logger.info [account_info, usage_info].inspect
+
+ # When post_process resumes, the db requests and the response are here!
+ # [:account_info, :usage_info, :status, :headers, :body].each{|attr| env.logger.info(("%23s\t%s" % [attr, self.send(attr).inspect[0..200]])) }
+
self.account_info ||= {}
self.usage_info ||= {}
@@ -81,6 +161,25 @@ def post_process
end
end
+ if defined?(EM::Mongo::Cursor)
+ # em-mongo > 0.3.6 gives us a deferrable back. nice and clean.
+ def enqueue_mongo_request(handle, query)
+ enqueue handle, db.collection(handle).afirst(query)
+ end
+ else
+ # em-mongo <= 0.3.6 makes us fake a deferrable response.
+ def enqueue_mongo_request(handle, query)
+ enqueue_acceptor(handle) do |acc|
+ db.collection(handle).afind(query){|resp| acc.succeed(resp.first) }
+ end
+ end
+ end
+
+ def accept_response(handle, *args)
+ env.trace("received_#{handle}")
+ super(handle, *args)
+ end
+
# ===========================================================================
def validate_apikey!
@@ -96,19 +195,21 @@ def check_apikey!
end
def check_rate_limit!
- return true if usage_info['calls'].to_f <= account_info['max_call_rate'].to_f
- raise RateLimitExceededError
+ rate = usage_info['calls'].to_i + 1
+ limit = account_info['max_call_rate'].to_i
+ return true if rate <= limit
+ raise RateLimitExceededError, "Your request rate (#{rate}) is over your limit (#{limit})"
end
def charge_usage
- update('UsageInfo', { :_id => usage_id },
+ db.collection(:usage_info).update({ :_id => usage_id },
{ '$inc' => { :calls => 1 } }, :upsert => true)
end
def inject_headers
headers.merge!({
'X-RateLimit-MaxRequests' => account_info['max_call_rate'].to_s,
- 'X-RateLimit-Requests' => usage_info['calls'].to_s,
+ 'X-RateLimit-Requests' => usage_info['calls'].to_i.to_s,
'X-RateLimit-Reset' => timebin_end.to_s,
})
end
@@ -139,5 +240,5 @@ def timebin_end
class AuthAndRateLimit < HttpLog
use Goliath::Rack::Tracer, 'X-Tracer'
use Goliath::Rack::Params # parse & merge query and body parameters
- use Goliath::Rack::AsyncAroundware, AuthReceiver, 'api_auth_db'
+ use Goliath::Rack::BarrierAroundwareFactory, AuthBarrier, 'api_auth_db'
end
View
13 examples/config/auth_and_rate_limit.rb
@@ -11,20 +11,23 @@
timebin = ((Time.now.to_i / 3600).floor * 3600)
# This user's calls should all go through
- config['api_auth_db'].collection('AccountInfo').save({
+ config['api_auth_db'].collection(:account_info).save({
:_id => 'i_am_awesome', 'valid' => true, 'max_call_rate' => 1_000_000 })
# this user's account is disabled
- config['api_auth_db'].collection('AccountInfo').save({
+ config['api_auth_db'].collection(:account_info).save({
:_id => 'i_am_lame', 'valid' => false, 'max_call_rate' => 1_000 })
# this user has not been seen, but will very quickly hit their limit
- config['api_auth_db'].collection('AccountInfo').save({
+ config['api_auth_db'].collection(:account_info).save({
:_id => 'i_am_limited', 'valid' => true, 'max_call_rate' => 10 })
+ config['api_auth_db'].collection(:usage_info).save({
+ :_id => "i_am_limited-#{timebin}", 'calls' => 0 })
# fakes a user with a bunch of calls already made this hour -- two more = no yuo
- config['api_auth_db'].collection('AccountInfo').save({
+ config['api_auth_db'].collection(:account_info).save({
:_id => 'i_am_busy', 'valid' => true, 'max_call_rate' => 1_000 })
- config['api_auth_db'].collection('UsageInfo').save({
+ config['api_auth_db'].collection(:usage_info).save({
:_id => "i_am_busy-#{timebin}", 'calls' => 999 })
+
end
View
17 examples/rasterize/rasterize_and_shorten.rb
@@ -1,11 +1,10 @@
#!/usr/bin/env ruby
$: << File.dirname(__FILE__)+'/../../lib'
require File.dirname(__FILE__)+'/rasterize'
+require File.dirname(__FILE__)+'/../favicon'
require 'goliath'
require 'em-synchrony/em-http'
-require 'goliath/deprecated/async_aroundware'
-require 'goliath/deprecated/response_receiver'
require 'postrank-uri'
#
@@ -13,18 +12,21 @@
# generate a shortened link, stuffing it in the header. Both requests happen
# simultaneously.
#
-class ShortenURL < Goliath::Synchrony::MultiReceiver
+class ShortenURL
+ include Goliath::Rack::BarrierAroundware
SHORTENER_URL_BASE = 'http://is.gd/create.php'
+ attr_accessor :shortened_url
def pre_process
target_url = PostRank::URI.clean(env.params['url'])
shortener_request = EM::HttpRequest.new(SHORTENER_URL_BASE).aget(:query => { :format => 'simple', :url => target_url })
- enqueue :shortener, shortener_request
+ enqueue :shortened_url, shortener_request
+ return Goliath::Connection::AsyncResponse
end
def post_process
- if successes[:shortener]
- headers['X-Shortened-URI'] = successes[:shortener].response
+ if shortened_url
+ headers['X-Shortened-URI'] = shortened_url.response
end
[status, headers, body]
end
@@ -32,8 +34,9 @@ def post_process
class RasterizeAndShorten < Rasterize
use Goliath::Rack::Params
+ use Favicon, File.expand_path(File.dirname(__FILE__)+"/../public/favicon.ico")
use Goliath::Rack::Validation::RequestMethod, %w(GET)
use Goliath::Rack::Validation::RequiredParam, {:key => 'url'}
#
- use Goliath::Rack::AsyncAroundware, ShortenURL
+ use Goliath::Rack::BarrierAroundwareFactory, ShortenURL
end
View
2  goliath.gemspec
@@ -29,7 +29,7 @@ Gem::Specification.new do |s|
s.add_development_dependency 'rake', '0.8.7'
s.add_development_dependency 'rspec', '>2.0'
s.add_development_dependency 'nokogiri'
- s.add_development_dependency 'em-http-request', '= 1.0.0.beta.1'
+ s.add_development_dependency 'em-http-request', '>= 1.0.0.beta.1'
s.add_development_dependency 'em-mongo', '~> 0.3.6'
s.add_development_dependency 'yajl-ruby'
s.add_development_dependency 'rack-rewrite'
View
26 lib/goliath/deprecated/mongo_receiver.rb
@@ -1,4 +1,5 @@
require 'goliath/deprecated/response_receiver'
+require 'em-synchrony/em-mongo'
module Goliath
module Synchrony
@@ -39,18 +40,29 @@ def enqueue(handle, req_id)
# ... requests aren't deferrables so they're tracked in @pending_queries
end
- def find(collection, selector={}, opts={}, &block)
- @pending_queries += 1
- db.collection(collection).find(selector, opts) do |result|
- yield result
- @pending_queries -= 1
- self.succeed if finished?
+ if defined?(EM::Mongo::Cursor)
+ def find(collection, selector={}, opts={}, &block)
+ @pending_queries += 1
+ db.collection(collection).afind(selector, opts).to_a.callback do |result|
+ yield result
+ @pending_queries -= 1
+ self.succeed if finished?
+ end
+ end
+ else
+ def find(collection, selector={}, opts={}, &block)
+ @pending_queries += 1
+ db.collection(collection).afind(selector, opts) do |result|
+ yield result
+ @pending_queries -= 1
+ self.succeed if finished?
+ end
end
end
def first(collection, selector={}, opts={}, &block)
opts[:limit] = 1
- find(collection, selector, opts) do |result|
+ self.find(collection, selector, opts) do |result|
yield result.first
end
end
View
34 lib/goliath/rack/barrier_aroundware.rb
@@ -31,12 +31,13 @@ def initialize(env)
# * call the setter for that handle if any (on receipt of :shortened_url,
# calls self.shortened_url = resp)
# * check progress -- succeeds (transferring controll) if nothing is pending.
- def accept_response(handle, resp_succ, resp, fiber=nil)
+ def accept_response(handle, resp_succ, resp, req=nil, fiber=nil)
raise "received response for a non-pending request!" if not pending_requests.include?(handle)
pending_requests.delete(handle)
- resp_succ ? (successes[handle] = resp) : (failures[handle] = resp)
+ resp_succ ? (successes[handle] = [req, resp]) : (failures[handle] = [req, resp])
self.send("#{handle}=", resp) if self.respond_to?("#{handle}=")
check_progress(fiber)
+ resp
end
# Add a deferred request to the pending pool, and set a callback to
@@ -44,8 +45,33 @@ def accept_response(handle, resp_succ, resp, fiber=nil)
def enqueue(handle, deferred_req)
fiber = Fiber.current
add_to_pending(handle)
- deferred_req.callback{ safely(env){ accept_response(handle, true, deferred_req, fiber) } }
- deferred_req.errback{ safely(env){ accept_response(handle, false, deferred_req, fiber) } }
+ deferred_req.callback{|resp| safely(env){ accept_response(handle, true, resp, deferred_req, fiber) } }
+ deferred_req.errback{|resp| safely(env){ accept_response(handle, false, resp, deferred_req, fiber) } }
+ end
+
+ # Do you have a method that uses a block, not a deferrable? This method
+ # gives you a deferrable 'acceptor' and enqueues it -- simply call
+ # #succeed (or #fail) on the acceptor from within the block, passing it
+ # your desired response.
+ #
+ # @example
+ # # sleep for 1.0 seconds and then complete
+ # enqueue_acceptor(:sleepy)do |acc|
+ # EM.add_timer(1.0){ acc.succeed }
+ # end
+ #
+ # @example
+ # # a database lookup that takes a block
+ # enqueue_acceptor(:bob) do |acc|
+ # db.collection(:users).afind(:username => :bob) do |resp|
+ # acc.succeed(resp.first)
+ # end
+ # end
+ #
+ def enqueue_acceptor(handle)
+ acceptor = EM::DefaultDeferrable.new
+ yield(acceptor)
+ enqueue handle, acceptor
end
# Register a pending request. If you call this from outside #enqueue, you
View
7 lib/goliath/rack/barrier_aroundware_factory.rb
@@ -44,15 +44,14 @@ module Rack
#
# class AwesomeApiWithShortening < Goliath::API
# use Goliath::Rack::Params
- # use Goliath::Rack::AroundwareFactory, ShortenUrl
+ # use Goliath::Rack::BarrierAroundwareFactory, ShortenUrl
# def response(env)
# # ... do something awesome
# end
# end
#
- class BarrierAroundwareFactory
+ class BarrierAroundwareFactory < Goliath::Rack::SimpleAroundwareFactory
include Goliath::Rack::Validator
- include Goliath::Rack::AroundwareFactory
# Put aroundware in the middle of the async_callback chain:
# * save the old callback chain;
@@ -72,7 +71,7 @@ def hook_into_callback_chain(env, aroundware)
new_resp = safely(env){ aroundware.post_process }
async_callback.call(new_resp)
end
-
+
env['async.callback'] = downstream_callback
aroundware.add_to_pending(:downstream_resp)
aroundware.callback(&invoke_upstream_chain)
View
2  lib/goliath/rack/simple_aroundware_factory.rb
@@ -65,7 +65,7 @@ class SimpleAroundwareFactory
# end
#
# class AwesomeApiWithShortening < Goliath::API
- # use Goliath::Rack::AroundwareFactory, Awesomizer2011, 3
+ # use Goliath::Rack::SimpleAroundwareFactory, Awesomizer2011, 3
# # ... stuff ...
# end
#
Please sign in to comment.
Something went wrong with that request. Please try again.