diff --git a/Gemfile b/Gemfile index 857c670d..c80ee369 100644 --- a/Gemfile +++ b/Gemfile @@ -1,3 +1,3 @@ source "http://rubygems.org" -gemspec \ No newline at end of file +gemspec diff --git a/examples/async_aroundware_demo.rb b/examples/async_aroundware_demo.rb index e42318b7..64b6000a 100755 --- a/examples/async_aroundware_demo.rb +++ b/examples/async_aroundware_demo.rb @@ -2,7 +2,6 @@ $: << File.dirname(__FILE__)+'/../lib' require 'goliath' -require 'goliath/synchrony/response_receiver' require 'em-synchrony/em-http' require 'yajl/json_gem' @@ -13,11 +12,11 @@ # # To run this, start the 'test_rig.rb' server on port 9002: # -# ./examples/test_rig.rb -sv -p 9002 +# bundle exec ./examples/test_rig.rb -sv -p 9002 # # And then start this server on port 9000: # -# ./async_aroundware_demo.rb -sv -p 9000 +# bundle exec ./examples/barrier_aroundware_demo.rb -sv -p 9000 # # Now curl the async_aroundware_demo_multi: # @@ -43,27 +42,31 @@ BASE_URL = 'http://localhost:9002/' -class MyResponseReceiver < Goliath::Synchrony::MultiReceiver +class RemoteRequestBarrier + include Goliath::Rack::BarrierAroundware + attr_accessor :sleep_1 + def pre_process # Request with delay_1 and drop_1 -- note: 'aget', because we want execution to continue req = EM::HttpRequest.new(BASE_URL).aget(:query => { :delay => env.params['delay_1'], :drop => env.params['drop_1'] }) - add :sleep_1, req + enqueue :sleep_1, req + return Goliath::Connection::AsyncResponse end def post_process # unify the results with the results of the API call - responses[:callback].each{|name, resp| body[:results][name] = JSON.parse(resp.response) } - responses[:errback ].each{|name, err| body[:errors][name] = err.error } - [status, headers, JSON.generate(body)] + if successes.include?(:sleep_1) then body[:results][:sleep_1] = JSON.parse(sleep_1.response) + else body[:errors][:sleep_1] = sleep_1.error ; end + [status, headers, JSON.pretty_generate(body)] end end -class AsyncAroundwareDemo < Goliath::API +class 
BarrierAroundwareDemo < Goliath::API use Goliath::Rack::Params use Goliath::Rack::Validation::NumericRange, {:key => 'delay_1', :default => 1.0, :max => 5.0, :min => 0.0, :as => Float} use Goliath::Rack::Validation::NumericRange, {:key => 'delay_2', :default => 0.5, :max => 5.0, :min => 0.0, :as => Float} # - use Goliath::Rack::AsyncAroundware, MyResponseReceiver + use Goliath::Rack::BarrierAroundwareFactory, RemoteRequestBarrier def response(env) # Request with delay_2 and drop_2 -- note: 'get', because we want execution to proceed linearly @@ -72,7 +75,7 @@ def response(env) body = { :results => {}, :errors => {} } if resp.response_header.status.to_i != 0 - body[:results][:sleep_2] = JSON.parse(resp.response) + body[:results][:sleep_2] = JSON.parse(resp.response) rescue 'parsing failed' else body[:errors ][:sleep_2] = resp.error end diff --git a/examples/auth_and_rate_limit.rb b/examples/auth_and_rate_limit.rb index edf9f263..0873d4fc 100755 --- a/examples/auth_and_rate_limit.rb +++ b/examples/auth_and_rate_limit.rb @@ -4,46 +4,108 @@ require 'em-mongo' require 'em-http' require 'em-synchrony/em-http' +require 'em-synchrony/em-mongo' require 'yajl/json_gem' -require 'goliath/deprecated/mongo_receiver' # has the aroundware logic for talking to mongodb require File.join(File.dirname(__FILE__), 'http_log') # Use the HttpLog as our actual endpoint, but include this in the middleware +# # Usage: # -# First launch a dummy responder, like hello_world.rb or test_rig.rb: -# ruby ./examples/hello_world.rb -sv -p 8080 -e prod & +# First launch the test rig: +# bundle exec ./examples/test_rig.rb -sv -p 8080 -e prod & # # Then launch this script -# ruby ./examples/auth_and_rate_limit.rb -sv -p 9000 --config $PWD/examples/config/auth_and_rate_limit.rb +# bundle exec ./examples/auth_and_rate_limit.rb -sv -p 9000 --config $PWD/examples/config/auth_and_rate_limit.rb +# +# The auth info is returned in the headers: +# +# curl -vv 
'http://127.0.0.1:9000/?_apikey=i_am_busy&drop=false' ; echo +# ...snip... +# < X-RateLimit-MaxRequests: 1000 +# < X-RateLimit-Requests: 999 +# < X-RateLimit-Reset: 1312059600 +# +# This user will hit the rate limit after 10 requests: +# +# for foo in 1 2 3 4 5 6 7 8 9 10 11 12 ; do echo -ne $foo "\t" ; curl 'http://127.0.0.1:9000/?_apikey=i_am_limited' ; echo ; done +# 1 {"Special":"Header","Params":"_apikey: i_am_awesome|drop: false","Path":"/","Headers":"User-Agent: ... +# ... +# 11 [:error, "Your request rate (11) is over your limit (10)"] +# +# You can test the barrier (both delays are in fractional seconds): +# * drop=true will drop the request at the remote host +# * auth_db_delay will fake a slow response from the mongo +# * delay will cause a slow response from the remote host +# +# time curl -vv 'http://127.0.0.1:9000/?_apikey=i_am_awesome&drop=false&delay=0.4&auth_db_delay=0.3' +# ... +# X-Tracer: ... received_usage_info: 0.06, received_sleepy: 299.52, received_downstream_resp: 101.67, ..., total: 406.09 +# ... +# real 0m0.416s user 0m0.002s sys 0m0.003s pct 1.24 +# +# This shows the mongodb response returning quickly, the fake DB delay returning +# after 300ms, and the downstream response returning after an additional 101 ms. +# The total request took 416ms of wall-clock time +# +# This will hold up even in the face of many concurrent connections. Relaunch in +# production (you may have to edit the config/auth_and_rate_limit scripts): +# +# bundle exec ./examples/auth_and_rate_limit.rb -sv -p 9000 -e prod --config $PWD/examples/config/auth_and_rate_limit.rb +# +# On my laptop, with 20 concurrent requests (each firing two db gets, a 400 ms +# http get, and two db writes), the median/90%ile times were 431ms / 457ms: +# +# time ab -c20 -n20 'http://127.0.0.1:9000/?_apikey=i_am_awesome&drop=false&delay=0.4&auth_db_delay=0.3' +# ... 
+# Percentage of the requests served within a certain time (ms) +# 50% 431 +# 90% 457 +# real 0m0.460s user 0m0.001s sys 0m0.003s pct 0.85 +# +# With 100 concurrent requests, the request latency starts to climb but the +# throughput and variance hold up: +# +# time ab -c100 -n100 'http://127.0.0.1:9000/?_apikey=i_am_awesome&drop=false&delay=0.4&auth_db_delay=0.3' +# ... +# Percentage of the requests served within a certain time (ms) +# 50% 640 +# 90% 673 +# real 0m0.679s user 0m0.002s sys 0m0.007s pct 1.33 # # Tracks and enforces account and rate limit policies. # -# Before the request: +# This is like a bouncer who lets you order a drink while he checks your ID. We +# proxy your request to a backend server and get your account/usage info; if +# your ID is good there's no further wait on a response. +# +# This works through the magic of BarrierAroundware: # -# * validates the apikey exists -# * launches requests for the account and current usage (hourly rate limit, etc) +# 1) In pre_process (before the request): +# * validate an apikey was given; if not, raise (returning directly) +# * launch requests for the account and rate limit usage # -# It then passes the request down the middleware chain; execution resumes only -# when both the remote request and the auth info have returned. 
+# 2) BarrierAroundwareFactory passes the request down the middleware chain # -# After remote request and auth info return: +# 3) post_process resumes only when both proxied request & auth info are complete # -# * Check the account exists and is valid -# * Check the rate limit is OK +# 4) The post_process method then +# - Checks the account exists and is valid +# - Checks the rate limit is OK # -# If it passes all those checks, the request goes through; otherwise we raise an -# error that Goliath::Rack::Validator turns into a 4xx response +# 5) If it passes all those checks, the request goes through; otherwise we raise +# an error that Goliath::Rack::Validator turns into a 4xx response # # WARNING: Since this passes ALL requests through to the responder, it's only # suitable for idempotent requests (GET, typically). You may need to handle # POST/PUT/DELETE requests differently. # # -class AuthReceiver < Goliath::Synchrony::MongoReceiver +class AuthBarrier + include Goliath::Rack::BarrierAroundware include Goliath::Validation - include Goliath::Rack::Validator + attr_reader :db attr_accessor :account_info, :usage_info # time period to aggregate stats over, in seconds @@ -53,16 +115,34 @@ class MissingApikeyError < BadRequestError ; end class RateLimitExceededError < ForbiddenError ; end class InvalidApikeyError < UnauthorizedError ; end + def initialize(env, db_name) + @db = env.config[db_name] + super(env) + end + def pre_process + env.trace('pre_process_beg') validate_apikey! 
- first('AccountInfo', { :_id => apikey }){|res| self.account_info = res } - first('UsageInfo', { :_id => usage_id }){|res| self.usage_info = res } + + # the results of the afirst deferrable will be set right into account_info (and the request into successes) + enqueue_mongo_request(:account_info, { :_id => apikey }) + enqueue_mongo_request(:usage_info, { :_id => usage_id }) + + # Fake out a delay in the database response if auth_db_delay is given + if (auth_db_delay = env.params['auth_db_delay'].to_f) > 0 + enqueue_acceptor(:sleepy){|acc| EM.add_timer(auth_db_delay){ acc.succeed } } + end + env.trace('pre_process_end') + return Goliath::Connection::AsyncResponse end def post_process env.trace('post_process_beg') - env.logger.info [account_info, usage_info].inspect + + # When post_process resumes, the db requests and the response are here! + # [:account_info, :usage_info, :status, :headers, :body].each{|attr| env.logger.info(("%23s\t%s" % [attr, self.send(attr).inspect[0..200]])) } + self.account_info ||= {} self.usage_info ||= {} @@ -81,6 +161,25 @@ def post_process end end + if defined?(EM::Mongo::Cursor) + # em-mongo > 0.3.6 gives us a deferrable back. nice and clean. + def enqueue_mongo_request(handle, query) + enqueue handle, db.collection(handle).afirst(query) + end + else + # em-mongo <= 0.3.6 makes us fake a deferrable response. + def enqueue_mongo_request(handle, query) + enqueue_acceptor(handle) do |acc| + db.collection(handle).afind(query){|resp| acc.succeed(resp.first) } + end + end + end + + def accept_response(handle, *args) + env.trace("received_#{handle}") + super(handle, *args) + end + # =========================================================================== def validate_apikey! @@ -96,19 +195,21 @@ def check_apikey! end def check_rate_limit! 
- return true if usage_info['calls'].to_f <= account_info['max_call_rate'].to_f - raise RateLimitExceededError + rate = usage_info['calls'].to_i + 1 + limit = account_info['max_call_rate'].to_i + return true if rate <= limit + raise RateLimitExceededError, "Your request rate (#{rate}) is over your limit (#{limit})" end def charge_usage - update('UsageInfo', { :_id => usage_id }, + db.collection(:usage_info).update({ :_id => usage_id }, { '$inc' => { :calls => 1 } }, :upsert => true) end def inject_headers headers.merge!({ 'X-RateLimit-MaxRequests' => account_info['max_call_rate'].to_s, - 'X-RateLimit-Requests' => usage_info['calls'].to_s, + 'X-RateLimit-Requests' => usage_info['calls'].to_i.to_s, 'X-RateLimit-Reset' => timebin_end.to_s, }) end @@ -139,5 +240,5 @@ def timebin_end class AuthAndRateLimit < HttpLog use Goliath::Rack::Tracer, 'X-Tracer' use Goliath::Rack::Params # parse & merge query and body parameters - use Goliath::Rack::AsyncAroundware, AuthReceiver, 'api_auth_db' + use Goliath::Rack::BarrierAroundwareFactory, AuthBarrier, 'api_auth_db' end diff --git a/examples/config/auth_and_rate_limit.rb b/examples/config/auth_and_rate_limit.rb index a043c976..3cbd5946 100644 --- a/examples/config/auth_and_rate_limit.rb +++ b/examples/config/auth_and_rate_limit.rb @@ -11,20 +11,23 @@ timebin = ((Time.now.to_i / 3600).floor * 3600) # This user's calls should all go through - config['api_auth_db'].collection('AccountInfo').save({ + config['api_auth_db'].collection(:account_info).save({ :_id => 'i_am_awesome', 'valid' => true, 'max_call_rate' => 1_000_000 }) # this user's account is disabled - config['api_auth_db'].collection('AccountInfo').save({ + config['api_auth_db'].collection(:account_info).save({ :_id => 'i_am_lame', 'valid' => false, 'max_call_rate' => 1_000 }) # this user has not been seen, but will very quickly hit their limit - config['api_auth_db'].collection('AccountInfo').save({ + config['api_auth_db'].collection(:account_info).save({ :_id => 
'i_am_limited', 'valid' => true, 'max_call_rate' => 10 }) + config['api_auth_db'].collection(:usage_info).save({ + :_id => "i_am_limited-#{timebin}", 'calls' => 0 }) # fakes a user with a bunch of calls already made this hour -- two more = no yuo - config['api_auth_db'].collection('AccountInfo').save({ + config['api_auth_db'].collection(:account_info).save({ :_id => 'i_am_busy', 'valid' => true, 'max_call_rate' => 1_000 }) - config['api_auth_db'].collection('UsageInfo').save({ + config['api_auth_db'].collection(:usage_info).save({ :_id => "i_am_busy-#{timebin}", 'calls' => 999 }) + end diff --git a/examples/rasterize/rasterize_and_shorten.rb b/examples/rasterize/rasterize_and_shorten.rb index 441b988c..74fd1aae 100755 --- a/examples/rasterize/rasterize_and_shorten.rb +++ b/examples/rasterize/rasterize_and_shorten.rb @@ -1,11 +1,10 @@ #!/usr/bin/env ruby $: << File.dirname(__FILE__)+'/../../lib' require File.dirname(__FILE__)+'/rasterize' +require File.dirname(__FILE__)+'/../favicon' require 'goliath' require 'em-synchrony/em-http' -require 'goliath/deprecated/async_aroundware' -require 'goliath/deprecated/response_receiver' require 'postrank-uri' # @@ -13,18 +12,21 @@ # generate a shortened link, stuffing it in the header. Both requests happen # simultaneously. 
# -class ShortenURL < Goliath::Synchrony::MultiReceiver +class ShortenURL + include Goliath::Rack::BarrierAroundware SHORTENER_URL_BASE = 'http://is.gd/create.php' + attr_accessor :shortened_url def pre_process target_url = PostRank::URI.clean(env.params['url']) shortener_request = EM::HttpRequest.new(SHORTENER_URL_BASE).aget(:query => { :format => 'simple', :url => target_url }) - enqueue :shortener, shortener_request + enqueue :shortened_url, shortener_request + return Goliath::Connection::AsyncResponse end def post_process - if successes[:shortener] - headers['X-Shortened-URI'] = successes[:shortener].response + if shortened_url + headers['X-Shortened-URI'] = shortened_url.response end [status, headers, body] end @@ -32,8 +34,9 @@ def post_process class RasterizeAndShorten < Rasterize use Goliath::Rack::Params + use Favicon, File.expand_path(File.dirname(__FILE__)+"/../public/favicon.ico") use Goliath::Rack::Validation::RequestMethod, %w(GET) use Goliath::Rack::Validation::RequiredParam, {:key => 'url'} # - use Goliath::Rack::AsyncAroundware, ShortenURL + use Goliath::Rack::BarrierAroundwareFactory, ShortenURL end diff --git a/goliath.gemspec b/goliath.gemspec index cd24882a..91820147 100644 --- a/goliath.gemspec +++ b/goliath.gemspec @@ -29,7 +29,7 @@ Gem::Specification.new do |s| s.add_development_dependency 'rake', '0.8.7' s.add_development_dependency 'rspec', '>2.0' s.add_development_dependency 'nokogiri' - s.add_development_dependency 'em-http-request', '= 1.0.0.beta.1' + s.add_development_dependency 'em-http-request', '>= 1.0.0.beta.1' s.add_development_dependency 'em-mongo', '~> 0.3.6' s.add_development_dependency 'yajl-ruby' s.add_development_dependency 'rack-rewrite' diff --git a/lib/goliath/deprecated/mongo_receiver.rb b/lib/goliath/deprecated/mongo_receiver.rb index 085ba7fd..d6898e05 100644 --- a/lib/goliath/deprecated/mongo_receiver.rb +++ b/lib/goliath/deprecated/mongo_receiver.rb @@ -1,4 +1,5 @@ require 'goliath/deprecated/response_receiver' 
+require 'em-synchrony/em-mongo' module Goliath module Synchrony @@ -39,18 +40,29 @@ def enqueue(handle, req_id) # ... requests aren't deferrables so they're tracked in @pending_queries end - def find(collection, selector={}, opts={}, &block) - @pending_queries += 1 - db.collection(collection).find(selector, opts) do |result| - yield result - @pending_queries -= 1 - self.succeed if finished? + if defined?(EM::Mongo::Cursor) + def find(collection, selector={}, opts={}, &block) + @pending_queries += 1 + db.collection(collection).afind(selector, opts).to_a.callback do |result| + yield result + @pending_queries -= 1 + self.succeed if finished? + end + end + else + def find(collection, selector={}, opts={}, &block) + @pending_queries += 1 + db.collection(collection).afind(selector, opts) do |result| + yield result + @pending_queries -= 1 + self.succeed if finished? + end end end def first(collection, selector={}, opts={}, &block) opts[:limit] = 1 - find(collection, selector, opts) do |result| + self.find(collection, selector, opts) do |result| yield result.first end end diff --git a/lib/goliath/rack/barrier_aroundware.rb b/lib/goliath/rack/barrier_aroundware.rb index d755d45f..3438c978 100644 --- a/lib/goliath/rack/barrier_aroundware.rb +++ b/lib/goliath/rack/barrier_aroundware.rb @@ -31,12 +31,13 @@ def initialize(env) # * call the setter for that handle if any (on receipt of :shortened_url, # calls self.shortened_url = resp) # * check progress -- succeeds (transferring controll) if nothing is pending. - def accept_response(handle, resp_succ, resp, fiber=nil) + def accept_response(handle, resp_succ, resp, req=nil, fiber=nil) raise "received response for a non-pending request!" if not pending_requests.include?(handle) pending_requests.delete(handle) - resp_succ ? (successes[handle] = resp) : (failures[handle] = resp) + resp_succ ? 
(successes[handle] = [req, resp]) : (failures[handle] = [req, resp]) self.send("#{handle}=", resp) if self.respond_to?("#{handle}=") check_progress(fiber) + resp end # Add a deferred request to the pending pool, and set a callback to @@ -44,8 +45,33 @@ def accept_response(handle, resp_succ, resp, fiber=nil) def enqueue(handle, deferred_req) fiber = Fiber.current add_to_pending(handle) - deferred_req.callback{ safely(env){ accept_response(handle, true, deferred_req, fiber) } } - deferred_req.errback{ safely(env){ accept_response(handle, false, deferred_req, fiber) } } + deferred_req.callback{|resp| safely(env){ accept_response(handle, true, resp, deferred_req, fiber) } } + deferred_req.errback{|resp| safely(env){ accept_response(handle, false, resp, deferred_req, fiber) } } + end + + # Do you have a method that uses a block, not a deferrable? This method + # gives you a deferrable 'acceptor' and enqueues it -- simply call + # #succeed (or #fail) on the acceptor from within the block, passing it + # your desired response. + # + # @example + # # sleep for 1.0 seconds and then complete + # enqueue_acceptor(:sleepy)do |acc| + # EM.add_timer(1.0){ acc.succeed } + # end + # + # @example + # # a database lookup that takes a block + # enqueue_acceptor(:bob) do |acc| + # db.collection(:users).afind(:username => :bob) do |resp| + # acc.succeed(resp.first) + # end + # end + # + def enqueue_acceptor(handle) + acceptor = EM::DefaultDeferrable.new + yield(acceptor) + enqueue handle, acceptor end # Register a pending request. 
If you call this from outside #enqueue, you diff --git a/lib/goliath/rack/barrier_aroundware_factory.rb b/lib/goliath/rack/barrier_aroundware_factory.rb index 2ebaca5a..cceb30dd 100644 --- a/lib/goliath/rack/barrier_aroundware_factory.rb +++ b/lib/goliath/rack/barrier_aroundware_factory.rb @@ -44,15 +44,14 @@ module Rack # # class AwesomeApiWithShortening < Goliath::API # use Goliath::Rack::Params - # use Goliath::Rack::AroundwareFactory, ShortenUrl + # use Goliath::Rack::BarrierAroundwareFactory, ShortenUrl # def response(env) # # ... do something awesome # end # end # - class BarrierAroundwareFactory + class BarrierAroundwareFactory < Goliath::Rack::SimpleAroundwareFactory include Goliath::Rack::Validator - include Goliath::Rack::AroundwareFactory # Put aroundware in the middle of the async_callback chain: # * save the old callback chain; @@ -72,7 +71,7 @@ def hook_into_callback_chain(env, aroundware) new_resp = safely(env){ aroundware.post_process } async_callback.call(new_resp) end - + env['async.callback'] = downstream_callback aroundware.add_to_pending(:downstream_resp) aroundware.callback(&invoke_upstream_chain) diff --git a/lib/goliath/rack/simple_aroundware_factory.rb b/lib/goliath/rack/simple_aroundware_factory.rb index 29f51cf5..f180067c 100644 --- a/lib/goliath/rack/simple_aroundware_factory.rb +++ b/lib/goliath/rack/simple_aroundware_factory.rb @@ -65,7 +65,7 @@ class SimpleAroundwareFactory # end # # class AwesomeApiWithShortening < Goliath::API - # use Goliath::Rack::AroundwareFactory, Awesomizer2011, 3 + # use Goliath::Rack::SimpleAroundwareFactory, Awesomizer2011, 3 # # ... stuff ... # end #