Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Add timeouts to groups

  • Loading branch information...
commit 788b14f6f96395bc51995083f729a4a6dc82f5fa 1 parent 0bdbd39
@carllerche carllerche authored
View
6 lib/kirk/client.rb
@@ -2,6 +2,8 @@
module Kirk
class Client
+ class TimeoutError < RuntimeError ; end
+
require 'kirk/client/group'
require 'kirk/client/response'
require 'kirk/client/request'
@@ -51,8 +53,8 @@ def client
end
end
- def process(request)
- client.send Exchange.build(request)
+ def process(exchange)
+ client.send exchange
end
def stop
View
16 lib/kirk/client/exchange.rb
@@ -23,6 +23,7 @@ def prepare!(request)
self.response = Response.new(!handler.respond_to?(:on_response_body))
self.method = request.method
self.url = request.url.to_s
+ self.timeout = request.group.timeout
if request.headers
request.headers.each do |name, val|
@@ -77,15 +78,14 @@ def onException(ex)
end
response.exception = true
- group.respond(response)
+ group.respond(self, response)
end
- # def onExpire
- # # p [ :onExpire ]
- # if handler.respond_to?(:on_timeout)
- # handler.on_timeout
- # end
- # end
+ def onExpire
+ if handler.respond_to?(:on_timeout)
+ handler.on_timeout
+ end
+ end
def onRequestComplete
if handler.respond_to?(:on_request_complete)
@@ -100,7 +100,7 @@ def onResponseComplete
# Need to return the response after the handler
# is done with it
- group.respond(response)
+ group.respond(self, response)
end
def onResponseContent(buf)
View
29 lib/kirk/client/group.rb
@@ -12,6 +12,7 @@ def initialize(client = Client.new, options = {})
@requests_count = 0
@responses = []
+ @in_progress = []
if @options[:host]
@host = @options.delete(:host).chomp('/')
@@ -50,8 +51,8 @@ def request(method = nil, url = nil, handler = nil, body = nil, headers = {})
request
end
- def respond(response)
- @queue.put(response)
+ def respond(exchange, response)
+ @queue.put([exchange, response])
end
%w/get post put delete head/.each do |method|
@@ -63,25 +64,37 @@ def #{method}(url, handler = nil, body = nil, headers = {})
end
def process(request)
- @client.process(request)
+ exchange = Exchange.build(request)
+
+ @in_progress << exchange
+ @client.process(exchange)
+
@requests_count += 1
end
def get_responses
while @requests_count > 0
- if resp = @queue.poll(timeout, TimeUnit::SECONDS)
- @responses << resp
+ exchange, resp = @queue.poll(timeout, TimeUnit::SECONDS)
+
+ if resp
+ @responses << resp
@requests_count -= 1
else
- raise "timed out"
+ @in_progress.each do |ex|
+ ex.cancel
+ end
+
+ @in_progress.each do |ex|
+ ex.wait_for_done
+ end
+
+ raise TimeoutError, "timed out"
end
end
completed
end
- private
-
def completed
complete.call if complete
end
View
31 spec/client/client_spec.rb
@@ -1,5 +1,6 @@
require 'spec_helper'
require 'kirk/client'
+require 'thread'
java_import org.eclipse.jetty.util.thread.QueuedThreadPool
@@ -81,16 +82,21 @@ def on_response_complete(response)
it "allows to use simplified syntax" do
class MyHandler
class << self
- attr_accessor :response_count
+ attr_accessor :response_count, :lock
end
+ self.lock = Mutex.new
self.response_count = 0
def on_response_complete(response)
- self.class.response_count += 1
+ self.class.lock.synchronize do
+ self.class.response_count += 1
+ end
end
end
+
h = MyHandler.new
+
group = Kirk::Client.group(:host => "localhost:9090") do |g|
g.get '/', h, 'get.body', {'X-Request-Method' => 'get'}
g.put '/', h, 'put.body', {'X-Request-Method' => 'put'}
@@ -99,11 +105,17 @@ def on_response_complete(response)
end
responses = parse_responses(group.responses)
- expected = ["DELETE delete delete.body", "GET get get.body",
- "POST post post.body", "PUT put put.body"]
+ expected = [ "DELETE delete delete.body",
+ "GET get get.body",
+ "POST post post.body",
+ "PUT put put.body" ]
+
responses.map do |r|
+
[r["REQUEST_METHOD"], r["HTTP_X_REQUEST_METHOD"], r["rack.input"]].join ' '
+
end.sort.should == expected
+
MyHandler.response_count.should == 4
end
@@ -366,6 +378,17 @@ def dispatch(job)
group.client.should == client
end
+ it "times out sanely" do
+ start(lambda { |env| sleep 2; [ 200, {}, 'Hello' ] })
+
+ lambda {
+ Kirk::Client.group(:host => "localhost:9090", :timeout => 1) do |g|
+ g.get "/"
+ end
+ }.should raise_error(Kirk::Client::TimeoutError)
+ sleep 1.1
+ end
+
def start_default_app
start(lambda { |env| [ 200, { 'Content-Type' => 'text/plain' }, [ "Hello" ] ] })
end
Please sign in to comment.
Something went wrong with that request. Please try again.