Permalink
Browse files

added maximum concurrency option to typhoeus

  • Loading branch information...
1 parent 278febe commit f53a7481c63cedb38829b6da76f89ef8aed408c4 @pauldix pauldix committed Nov 10, 2009
Showing with 61 additions and 21 deletions.
  1. +34 −16 lib/typhoeus/hydra.rb
  2. +27 −5 spec/typhoeus/hydra_spec.rb
View
@@ -1,48 +1,57 @@
module Typhoeus
class Hydra
- def initialize(initial_pool_size = 10)
+ def initialize(options = {})
@memoize_requests = true
@multi = Multi.new
@easy_pool = []
+ initial_pool_size = options[:initial_pool_size] || 10
+ @max_concurrency = options[:max_concurrency] || 200
initial_pool_size.times { @easy_pool << Easy.new }
@stubs = []
@memoized_requests = {}
@retrieved_from_cache = {}
+ @queued_requests = []
+ @running_requests = 0
end
def self.hydra
@hydra ||= new
end
-
+
def self.hydra=(val)
@hydra = val
end
-
+
def clear_stubs
@stubs = []
end
-
+
def fire_and_forget
+ @queued_requests.each {|r| queue(r, false)}
@multi.fire_and_forget
end
- def queue(request)
+ def queue(request, obey_concurrency_limit = true)
return if assign_to_stub(request)
- if request.method == :get
- if @memoize_requests && @memoized_requests.has_key?(request.url)
- if response = @retrieved_from_cache[request.url]
- request.response = response
- request.call_handlers
+ if @running_requests >= @max_concurrency && obey_concurrency_limit
+ @queued_requests << request
+ else
+ if request.method == :get
+ if @memoize_requests && @memoized_requests.has_key?(request.url)
+ if response = @retrieved_from_cache[request.url]
+ request.response = response
+ request.call_handlers
+ else
+ @memoized_requests[request.url] << request
+ end
else
- @memoized_requests[request.url] << request
+ @memoized_requests[request.url] = [] if @memoize_requests
+ get_from_cache_or_queue(request)
end
else
- @memoized_requests[request.url] = [] if @memoize_requests
get_from_cache_or_queue(request)
end
- else
- get_from_cache_or_queue(request)
end
end
@@ -57,7 +66,7 @@ def run
@memoized_requests = {}
@retrieved_from_cache = {}
end
-
+
def disable_memoization
@memoize_requests = false
end
@@ -105,6 +114,7 @@ def get_from_cache_or_queue(request)
private :get_from_cache_or_queue
def get_easy_object(request)
+ @running_requests += 1
easy = @easy_pool.pop || Easy.new
easy.url = request.url
easy.method = request.method
@@ -113,10 +123,12 @@ def get_easy_object(request)
easy.request_body = request.body if request.body
easy.timeout = request.timeout if request.timeout
easy.on_success do |easy|
+ queue_next
handle_request(request, response_from_easy(easy, request))
release_easy_object(easy)
end
easy.on_failure do |easy|
+ queue_next
handle_request(request, response_from_easy(easy, request))
release_easy_object(easy)
end
@@ -125,6 +137,12 @@ def get_easy_object(request)
end
private :get_easy_object
+ def queue_next
+ @running_requests -= 1
+ queue(@queued_requests.pop) unless @queued_requests.empty?
+ end
+ private :queue_next
+
def release_easy_object(easy)
easy.reset
@easy_pool.push easy
@@ -175,7 +193,7 @@ def add_request(request)
def and_return(val)
@response = val
end
-
+
def matches?(request)
if url.kind_of?(String)
request.method == method && request.url == url
@@ -23,7 +23,7 @@ def set(key, object, timeout = 0)
it "has a singleton" do
Typhoeus::Hydra.hydra.should be_a Typhoeus::Hydra
end
-
+
it "has a setter for the singleton" do
Typhoeus::Hydra.hydra = :foo
Typhoeus::Hydra.hydra.should == :foo
@@ -50,15 +50,15 @@ def set(key, object, timeout = 0)
@on_complete_handler_called.should be_true
@response.request.should == @request
end
-
+
it "stubs requests to URIs matching a pattern" do
@hydra.stub(:get, /foo/).and_return(@response)
@hydra.queue(@request)
@hydra.run
@on_complete_handler_called.should be_true
@response.request.should == @request
end
-
+
it "can clear stubs" do
@hydra.clear_stubs
end
@@ -110,7 +110,7 @@ def set(key, object, timeout = 0)
first.handled_response.should == second.handled_response
(Time.now - start_time).should < 1.2 # if it had run twice it would be ~ 2 seconds
end
-
+
it "can turn off memoization for GET requests" do
hydra = Typhoeus::Hydra.new
hydra.disable_memoization
@@ -222,8 +222,10 @@ def set(key, object, timeout = 0)
@responses.size.should == 3
(Time.now - start_time).should < 3.3
end
-
+
it "should fire and forget" do
+ # this test is totally hacky. I have no clue how to make it verify. I just look at the test servers
+ # to verify that stuff is running
hydra = Typhoeus::Hydra.new
first = Typhoeus::Request.new("http://localhost:3000/first?delay=1")
second = Typhoeus::Request.new("http://localhost:3001/second?delay=2")
@@ -233,5 +235,25 @@ def set(key, object, timeout = 0)
third = Typhoeus::Request.new("http://localhost:3002/third?delay=3")
hydra.queue third
hydra.fire_and_forget
+ sleep 3 # have to do this or future tests may break.
+ end
+
+ it "should take the maximum number of concurrent reqeusts as an argument" do
+ hydra = Typhoeus::Hydra.new(:max_concurrency => 2)
+ first = Typhoeus::Request.new("http://localhost:3000/first?delay=1")
+ second = Typhoeus::Request.new("http://localhost:3001/second?delay=1")
+ third = Typhoeus::Request.new("http://localhost:3002/third?delay=1")
+ hydra.queue first
+ hydra.queue second
+ hydra.queue third
+
+ start_time = Time.now
+ hydra.run
+ finish_time = Time.now
+
+ first.response.code.should == 200
+ second.response.code.should == 200
+ third.response.code.should == 200
+ (finish_time - start_time).should > 2.0
end
end

0 comments on commit f53a748

Please sign in to comment.