Skip to content

Commit

Permalink
Refactoring aroundware, Part V: Moved all the aroundware examples ove…
Browse files Browse the repository at this point in the history
…r to use the new aroundware, doing necessary cleanup along the way.

* BarrierAroundware now store [req, resp] in the successes / failures hashes
* mongo things now work with both old and future em-mongo gems, at the cost of a big conditional 'if' statement in the file
* Added enqueue_acceptor to let you enqueue activities that take a block without yielding a deferrable
  • Loading branch information
Philip (flip) Kromer committed Jul 31, 2011
1 parent 6395370 commit 20122fb
Show file tree
Hide file tree
Showing 10 changed files with 212 additions and 65 deletions.
2 changes: 1 addition & 1 deletion Gemfile
@@ -1,3 +1,3 @@
source "http://rubygems.org"

gemspec
gemspec
25 changes: 14 additions & 11 deletions examples/async_aroundware_demo.rb
Expand Up @@ -2,7 +2,6 @@
$: << File.dirname(__FILE__)+'/../lib'

require 'goliath'
require 'goliath/synchrony/response_receiver'
require 'em-synchrony/em-http'
require 'yajl/json_gem'

Expand All @@ -13,11 +12,11 @@
#
# To run this, start the 'test_rig.rb' server on port 9002:
#
# ./examples/test_rig.rb -sv -p 9002
# bundle exec ./examples/test_rig.rb -sv -p 9002
#
# And then start this server on port 9000:
#
# ./async_aroundware_demo.rb -sv -p 9000
# bundle exec ./examples/barrier_aroundware_demo.rb -sv -p 9000
#
# Now curl the async_aroundware_demo_multi:
#
Expand All @@ -43,27 +42,31 @@

BASE_URL = 'http://localhost:9002/'

class MyResponseReceiver < Goliath::Synchrony::MultiReceiver
class RemoteRequestBarrier
include Goliath::Rack::BarrierAroundware
attr_accessor :sleep_1

def pre_process
# Request with delay_1 and drop_1 -- note: 'aget', because we want execution to continue
req = EM::HttpRequest.new(BASE_URL).aget(:query => { :delay => env.params['delay_1'], :drop => env.params['drop_1'] })
add :sleep_1, req
enqueue :sleep_1, req
return Goliath::Connection::AsyncResponse
end

def post_process
# unify the results with the results of the API call
responses[:callback].each{|name, resp| body[:results][name] = JSON.parse(resp.response) }
responses[:errback ].each{|name, err| body[:errors][name] = err.error }
[status, headers, JSON.generate(body)]
if successes.include?(:sleep_1) then body[:results][:sleep_1] = JSON.parse(sleep_1.response)
else body[:errors][:sleep_1] = sleep_1.error ; end
[status, headers, JSON.pretty_generate(body)]
end
end

class AsyncAroundwareDemo < Goliath::API
class BarrierAroundwareDemo < Goliath::API
use Goliath::Rack::Params
use Goliath::Rack::Validation::NumericRange, {:key => 'delay_1', :default => 1.0, :max => 5.0, :min => 0.0, :as => Float}
use Goliath::Rack::Validation::NumericRange, {:key => 'delay_2', :default => 0.5, :max => 5.0, :min => 0.0, :as => Float}
#
use Goliath::Rack::AsyncAroundware, MyResponseReceiver
use Goliath::Rack::BarrierAroundwareFactory, RemoteRequestBarrier

def response(env)
# Request with delay_2 and drop_2 -- note: 'get', because we want execution to proceed linearly
Expand All @@ -72,7 +75,7 @@ def response(env)
body = { :results => {}, :errors => {} }

if resp.response_header.status.to_i != 0
body[:results][:sleep_2] = JSON.parse(resp.response)
body[:results][:sleep_2] = JSON.parse(resp.response) rescue 'parsing failed'
else
body[:errors ][:sleep_2] = resp.error
end
Expand Down
149 changes: 125 additions & 24 deletions examples/auth_and_rate_limit.rb
Expand Up @@ -4,46 +4,108 @@
require 'em-mongo'
require 'em-http'
require 'em-synchrony/em-http'
require 'em-synchrony/em-mongo'
require 'yajl/json_gem'

require 'goliath/deprecated/mongo_receiver' # has the aroundware logic for talking to mongodb
require File.join(File.dirname(__FILE__), 'http_log') # Use the HttpLog as our actual endpoint, but include this in the middleware

#
# Usage:
#
# First launch a dummy responder, like hello_world.rb or test_rig.rb:
# ruby ./examples/hello_world.rb -sv -p 8080 -e prod &
# First launch the test rig:
# bundle exec ./examples/test_rig.rb -sv -p 8080 -e prod &
#
# Then launch this script
# ruby ./examples/auth_and_rate_limit.rb -sv -p 9000 --config $PWD/examples/config/auth_and_rate_limit.rb
# bundle exec ./examples/auth_and_rate_limit.rb -sv -p 9000 --config $PWD/examples/config/auth_and_rate_limit.rb
#
# The auth info is returned in the headers:
#
# curl -vv 'http://127.0.0.1:9000/?_apikey=i_am_busy&drop=false' ; echo
# ...snip...
# < X-RateLimit-MaxRequests: 1000
# < X-RateLimit-Requests: 999
# < X-RateLimit-Reset: 1312059600
#
# This user will hit the rate limit after 10 requests:
#
# for foo in 1 2 3 4 5 6 7 8 9 10 11 12 ; do echo -ne $foo "\t" ; curl 'http://127.0.0.1:9000/?_apikey=i_am_limited' ; echo ; done
# 1 {"Special":"Header","Params":"_apikey: i_am_awesome|drop: false","Path":"/","Headers":"User-Agent: ...
# ...
# 11 [:error, "Your request rate (11) is over your limit (10)"]
#
# You can test the barrier (both delays are in fractional seconds):
# * drop=true will drop the request at the remote host
# * auth_db_delay will fake a slow response from the mongo
# * delay will cause a slow response from the remote host
#
# time curl -vv 'http://127.0.0.1:9000/?_apikey=i_am_awesome&drop=false&delay=0.4&auth_db_delay=0.3'
# ...
# X-Tracer: ... received_usage_info: 0.06, received_sleepy: 299.52, received_downstream_resp: 101.67, ..., total: 406.09
# ...
# real 0m0.416s user 0m0.002s sys 0m0.003s pct 1.24
#
# This shows the mongodb response returning quickly, the fake DB delay returning
# after 300ms, and the downstream response returning after an additional 101 ms.
# The total request took 416ms of wall-clock time
#
# This will hold up even in the face of many concurrent connections. Relaunch in
# production (you may have to edit the config/auth_and_rate_limit scripts):
#
# bundle exec ./examples/auth_and_rate_limit.rb -sv -p 9000 -e prod --config $PWD/examples/config/auth_and_rate_limit.rb
#
# On my laptop, with 20 concurrent requests (each firing two db gets, a 400 ms
# http get, and two db writes), the median/90%ile times were 431ms / 457ms:
#
# time ab -c20 -n20 'http://127.0.0.1:9000/?_apikey=i_am_awesome&drop=false&delay=0.4&auth_db_delay=0.3'
# ...
# Percentage of the requests served within a certain time (ms)
# 50% 431
# 90% 457
# real 0m0.460s user 0m0.001s sys 0m0.003s pct 0.85
#
# With 100 concurrent requests, the request latency starts to drop but the
# throughput and variance stand up:
#
# time ab -c100 -n100 'http://127.0.0.1:9000/?_apikey=i_am_awesome&drop=false&delay=0.4&auth_db_delay=0.3'
# ...
# Percentage of the requests served within a certain time (ms)
# 50% 640
# 90% 673
# real 0m0.679s user 0m0.002s sys 0m0.007s pct 1.33
#

# Tracks and enforces account and rate limit policies.
#
# Before the request:
# This is like a bouncer who lets you order a drink while he checks your ID. We
# proxy your request to a backend server and get your account/usage info; if
# your ID is good there's no further wait on a response.
#
# This works through the magic of BarrierAroundware:
#
# * validates the apikey exists
# * launches requests for the account and current usage (hourly rate limit, etc)
# 1) In pre_process (before the request):
# * validate an apikey was given; if not, raise (returning directly)
# * launch requests for the account and rate limit usage
#
# It then passes the request down the middleware chain; execution resumes only
# when both the remote request and the auth info have returned.
# 2) BarrierAroundwareFactory passes the request down the middleware chain
#
# After remote request and auth info return:
# 3) post_process resumes only when both proxied request & auth info are complete
#
# * Check the account exists and is valid
# * Check the rate limit is OK
# 4) The post_process method then
# - Checks the account exists and is valid
# - Checks the rate limit is OK
#
# If it passes all those checks, the request goes through; otherwise we raise an
# error that Goliath::Rack::Validator turns into a 4xx response
# 5) If it passes all those checks, the request goes through; otherwise we raise
# an error that Goliath::Rack::Validator turns into a 4xx response
#
# WARNING: Since this passes ALL requests through to the responder, it's only
# suitable for idempotent requests (GET, typically). You may need to handle
# POST/PUT/DELETE requests differently.
#
#
class AuthReceiver < Goliath::Synchrony::MongoReceiver
class AuthBarrier
include Goliath::Rack::BarrierAroundware
include Goliath::Validation
include Goliath::Rack::Validator
attr_reader :db
attr_accessor :account_info, :usage_info

# time period to aggregate stats over, in seconds
Expand All @@ -53,16 +115,34 @@ class MissingApikeyError < BadRequestError ; end
class RateLimitExceededError < ForbiddenError ; end
class InvalidApikeyError < UnauthorizedError ; end

def initialize(env, db_name)
@db = env.config[db_name]
super(env)
end

def pre_process
env.trace('pre_process_beg')
validate_apikey!
first('AccountInfo', { :_id => apikey }){|res| self.account_info = res }
first('UsageInfo', { :_id => usage_id }){|res| self.usage_info = res }

# the results of the afirst deferrable will be set right into account_info (and the request into successes)
enqueue_mongo_request(:account_info, { :_id => apikey })
enqueue_mongo_request(:usage_info, { :_id => usage_id })

# Fake out a delay in the database response if auth_db_delay is given
if (auth_db_delay = env.params['auth_db_delay'].to_f) > 0
enqueue_acceptor(:sleepy){|acc| EM.add_timer(auth_db_delay){ acc.succeed } }
end

env.trace('pre_process_end')
return Goliath::Connection::AsyncResponse
end

def post_process
env.trace('post_process_beg')
env.logger.info [account_info, usage_info].inspect

# When post_process resumes, the db requests and the response are here!
# [:account_info, :usage_info, :status, :headers, :body].each{|attr| env.logger.info(("%23s\t%s" % [attr, self.send(attr).inspect[0..200]])) }

self.account_info ||= {}
self.usage_info ||= {}

Expand All @@ -81,6 +161,25 @@ def post_process
end
end

if defined?(EM::Mongo::Cursor)
# em-mongo > 0.3.6 gives us a deferrable back. nice and clean.
def enqueue_mongo_request(handle, query)
enqueue handle, db.collection(handle).afirst(query)
end
else
# em-mongo <= 0.3.6 makes us fake a deferrable response.
def enqueue_mongo_request(handle, query)
enqueue_acceptor(handle) do |acc|
db.collection(handle).afind(query){|resp| acc.succeed(resp.first) }
end
end
end

def accept_response(handle, *args)
env.trace("received_#{handle}")
super(handle, *args)
end

# ===========================================================================

def validate_apikey!
Expand All @@ -96,19 +195,21 @@ def check_apikey!
end

def check_rate_limit!
return true if usage_info['calls'].to_f <= account_info['max_call_rate'].to_f
raise RateLimitExceededError
rate = usage_info['calls'].to_i + 1
limit = account_info['max_call_rate'].to_i
return true if rate <= limit
raise RateLimitExceededError, "Your request rate (#{rate}) is over your limit (#{limit})"
end

def charge_usage
update('UsageInfo', { :_id => usage_id },
db.collection(:usage_info).update({ :_id => usage_id },
{ '$inc' => { :calls => 1 } }, :upsert => true)
end

def inject_headers
headers.merge!({
'X-RateLimit-MaxRequests' => account_info['max_call_rate'].to_s,
'X-RateLimit-Requests' => usage_info['calls'].to_s,
'X-RateLimit-Requests' => usage_info['calls'].to_i.to_s,
'X-RateLimit-Reset' => timebin_end.to_s,
})
end
Expand Down Expand Up @@ -139,5 +240,5 @@ def timebin_end
class AuthAndRateLimit < HttpLog
use Goliath::Rack::Tracer, 'X-Tracer'
use Goliath::Rack::Params # parse & merge query and body parameters
use Goliath::Rack::AsyncAroundware, AuthReceiver, 'api_auth_db'
use Goliath::Rack::BarrierAroundwareFactory, AuthBarrier, 'api_auth_db'
end
13 changes: 8 additions & 5 deletions examples/config/auth_and_rate_limit.rb
Expand Up @@ -11,20 +11,23 @@
timebin = ((Time.now.to_i / 3600).floor * 3600)

# This user's calls should all go through
config['api_auth_db'].collection('AccountInfo').save({
config['api_auth_db'].collection(:account_info).save({
:_id => 'i_am_awesome', 'valid' => true, 'max_call_rate' => 1_000_000 })

# this user's account is disabled
config['api_auth_db'].collection('AccountInfo').save({
config['api_auth_db'].collection(:account_info).save({
:_id => 'i_am_lame', 'valid' => false, 'max_call_rate' => 1_000 })

# this user has not been seen, but will very quickly hit their limit
config['api_auth_db'].collection('AccountInfo').save({
config['api_auth_db'].collection(:account_info).save({
:_id => 'i_am_limited', 'valid' => true, 'max_call_rate' => 10 })
config['api_auth_db'].collection(:usage_info).save({
:_id => "i_am_limited-#{timebin}", 'calls' => 0 })

# fakes a user with a bunch of calls already made this hour -- two more = no yuo
config['api_auth_db'].collection('AccountInfo').save({
config['api_auth_db'].collection(:account_info).save({
:_id => 'i_am_busy', 'valid' => true, 'max_call_rate' => 1_000 })
config['api_auth_db'].collection('UsageInfo').save({
config['api_auth_db'].collection(:usage_info).save({
:_id => "i_am_busy-#{timebin}", 'calls' => 999 })

end
17 changes: 10 additions & 7 deletions examples/rasterize/rasterize_and_shorten.rb
@@ -1,39 +1,42 @@
#!/usr/bin/env ruby
$: << File.dirname(__FILE__)+'/../../lib'
require File.dirname(__FILE__)+'/rasterize'
require File.dirname(__FILE__)+'/../favicon'

require 'goliath'
require 'em-synchrony/em-http'
require 'goliath/deprecated/async_aroundware'
require 'goliath/deprecated/response_receiver'
require 'postrank-uri'

#
# Aroundware: while the Rasterize API is processing, this uses http://is.gd to
# generate a shortened link, stuffing it in the header. Both requests happen
# simultaneously.
#
class ShortenURL < Goliath::Synchrony::MultiReceiver
class ShortenURL
include Goliath::Rack::BarrierAroundware
SHORTENER_URL_BASE = 'http://is.gd/create.php'
attr_accessor :shortened_url

def pre_process
target_url = PostRank::URI.clean(env.params['url'])
shortener_request = EM::HttpRequest.new(SHORTENER_URL_BASE).aget(:query => { :format => 'simple', :url => target_url })
enqueue :shortener, shortener_request
enqueue :shortened_url, shortener_request
return Goliath::Connection::AsyncResponse
end

def post_process
if successes[:shortener]
headers['X-Shortened-URI'] = successes[:shortener].response
if shortened_url
headers['X-Shortened-URI'] = shortened_url.response
end
[status, headers, body]
end
end

class RasterizeAndShorten < Rasterize
use Goliath::Rack::Params
use Favicon, File.expand_path(File.dirname(__FILE__)+"/../public/favicon.ico")
use Goliath::Rack::Validation::RequestMethod, %w(GET)
use Goliath::Rack::Validation::RequiredParam, {:key => 'url'}
#
use Goliath::Rack::AsyncAroundware, ShortenURL
use Goliath::Rack::BarrierAroundwareFactory, ShortenURL
end
2 changes: 1 addition & 1 deletion goliath.gemspec
Expand Up @@ -29,7 +29,7 @@ Gem::Specification.new do |s|
s.add_development_dependency 'rake', '0.8.7'
s.add_development_dependency 'rspec', '>2.0'
s.add_development_dependency 'nokogiri'
s.add_development_dependency 'em-http-request', '= 1.0.0.beta.1'
s.add_development_dependency 'em-http-request', '>= 1.0.0.beta.1'
s.add_development_dependency 'em-mongo', '~> 0.3.6'
s.add_development_dependency 'yajl-ruby'
s.add_development_dependency 'rack-rewrite'
Expand Down

0 comments on commit 20122fb

Please sign in to comment.