Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

Already on GitHub? Sign in to your account

Fix transactional client #6

Merged
merged 3 commits into from Mar 28, 2012
View
@@ -125,8 +125,11 @@ def get(key, opts = {})
nil
end
- # nil result, force next get to jump from current server
- @counter = @gets_per_server unless val
+ # nil result without :close and :abort, force next get to jump from
+ # current server
+ if !val && @shuffle && !opts[:close] && !opts[:abort]
+ @counter = @gets_per_server
+ end
val
end
@@ -58,9 +58,15 @@ def get(key, opts = {})
close_last_transaction
- queue = read_from_error_queue? ? key + "_errors" : key
+ if read_from_error_queue?
+ queue = key + "_errors"
+ job = client.get_from_last(queue, opts.merge(:open => true))
+ else
+ queue = key
+ job = client.get(queue, opts.merge(:open => true))
+ end
- if job = client.get(queue, opts.merge(:open => true))
+ if job
@job = job.is_a?(RetryableJob) ? job : RetryableJob.new(0, job)
@last_read_queue = queue
@current_queue = key
@@ -75,7 +81,7 @@ def current_try
def close_last_transaction #:nodoc:
return unless @last_read_queue
- client.get_from_last(@last_read_queue + "/close")
+ client.get_from_last(@last_read_queue, :close => true)
@last_read_queue = @current_queue = @job = nil
end
@@ -17,9 +17,9 @@ def get_job
it "processes normal jobs" do
returns = [:mcguffin]
stub(@raw_kestrel_client).get(@queue, anything) { returns.shift }
- stub(@raw_kestrel_client).get(@queue + "_errors", anything)
+ stub(@raw_kestrel_client).get_from_last(@queue + "_errors", anything)
- mock(@raw_kestrel_client).get_from_last(@queue + "/close")
+ mock(@raw_kestrel_client).get_from_last(@queue, :close => true)
get_job.should == :mcguffin
@kestrel.current_try.should == 1
@@ -28,10 +28,10 @@ def get_job
it "processes successful retries" do
returns = [Kestrel::Client::Transactional::RetryableJob.new(1, :mcguffin)]
- stub(@raw_kestrel_client).get(@queue + "_errors", anything) { returns.shift }
+ stub(@raw_kestrel_client).get_from_last(@queue + "_errors", anything) { returns.shift }
stub(@raw_kestrel_client).get(@queue, anything)
- mock(@raw_kestrel_client).get_from_last(@queue + "_errors/close")
+ mock(@raw_kestrel_client).get_from_last(@queue + "_errors", :close => true)
get_job.should == :mcguffin
@kestrel.current_try.should == 2
@@ -41,13 +41,13 @@ def get_job
it "processes normal jobs that should retry" do
returns = [:mcguffin]
stub(@raw_kestrel_client).get(@queue, anything) { returns.shift }
- stub(@raw_kestrel_client).get(@queue + "_errors", anything)
+ stub(@raw_kestrel_client).get_from_last(@queue + "_errors", anything)
mock(@raw_kestrel_client).set(@queue + "_errors", anything) do |q,j|
j.retries.should == 1
j.job.should == :mcguffin
end
- mock(@raw_kestrel_client).get_from_last(@queue + "/close")
+ mock(@raw_kestrel_client).get_from_last(@queue, :close => true)
get_job.should == :mcguffin
@kestrel.current_try.should == 1
@@ -58,13 +58,13 @@ def get_job
it "processes retries that should retry" do
returns = [Kestrel::Client::Transactional::RetryableJob.new(1, :mcguffin)]
- stub(@raw_kestrel_client).get(@queue + "_errors", anything) { returns.shift }
+ stub(@raw_kestrel_client).get_from_last(@queue + "_errors", :open => true) { returns.shift }
stub(@raw_kestrel_client).get(@queue, anything)
mock(@raw_kestrel_client).set(@queue + "_errors", anything) do |q,j|
j.retries.should == 2
j.job.should == :mcguffin
end
- mock(@raw_kestrel_client).get_from_last(@queue + "_errors/close")
+ mock(@raw_kestrel_client).get_from_last(@queue + "_errors", :close => true)
get_job.should == :mcguffin
@kestrel.current_try.should == 2
@@ -75,10 +75,10 @@ def get_job
it "processes retries that should give up" do
returns = [Kestrel::Client::Transactional::RetryableJob.new(Kestrel::Client::Transactional::DEFAULT_RETRIES - 1, :mcguffin)]
- stub(@raw_kestrel_client).get(@queue + "_errors", anything) { returns.shift }
+ stub(@raw_kestrel_client).get_from_last(@queue + "_errors", :open => true) { returns.shift }
stub(@raw_kestrel_client).get(@queue, anything)
mock(@raw_kestrel_client).set.never
- mock(@raw_kestrel_client).get_from_last(@queue + "_errors/close")
+ mock(@raw_kestrel_client).get_from_last(@queue + "_errors", :close => true)
get_job.should == :mcguffin
@kestrel.current_try.should == Kestrel::Client::Transactional::DEFAULT_RETRIES
@@ -102,20 +102,20 @@ def get_job
it "is nil when the primary queue is empty and selected" do
mock(@kestrel).rand { Kestrel::Client::Transactional::ERROR_PROCESSING_RATE + 0.05 }
mock(@raw_kestrel_client).get(@queue, anything) { nil }
- mock(@raw_kestrel_client).get(@queue + "_errors", anything).never
+ mock(@raw_kestrel_client).get(@queue + "_errors", :open => true).never
@kestrel.get(@queue).should be_nil
end
it "is nil when the error queue is empty and selected" do
mock(@kestrel).rand { Kestrel::Client::Transactional::ERROR_PROCESSING_RATE - 0.05 }
mock(@raw_kestrel_client).get(@queue, anything).never
- mock(@raw_kestrel_client).get(@queue + "_errors", anything) { nil }
+ mock(@raw_kestrel_client).get_from_last(@queue + "_errors", :open => true) { nil }
@kestrel.get(@queue).should be_nil
end
it "returns the payload of a RetryableJob" do
stub(@kestrel).rand { 0 }
- mock(@raw_kestrel_client).get(@queue + "_errors", anything) do
+ mock(@raw_kestrel_client).get_from_last(@queue + "_errors", anything) do
Kestrel::Client::Transactional::RetryableJob.new(1, :mcmuffin)
end
@@ -126,18 +126,18 @@ def get_job
stub(@raw_kestrel_client).get(@queue, anything) { :mcguffin }
@kestrel.get(@queue)
- mock(@raw_kestrel_client).get_from_last(@queue + "/close")
+ mock(@raw_kestrel_client).get_from_last(@queue, :close => true)
@kestrel.get(@queue)
end
it "closes an open transaction with retries" do
stub(@kestrel).rand { 0 }
- stub(@raw_kestrel_client).get(@queue + "_errors", anything) do
+ stub(@raw_kestrel_client).get_from_last(@queue + "_errors", :open => true) do
Kestrel::Client::Transactional::RetryableJob.new(1, :mcmuffin)
end
@kestrel.get(@queue)
- mock(@raw_kestrel_client).get_from_last(@queue + "_errors/close")
+ mock(@raw_kestrel_client).get_from_last(@queue + "_errors", :close => true)
@kestrel.get(@queue)
end
@@ -186,7 +186,7 @@ def get_job
it "increments the retry count and re-enqueues the retried job" do
stub(@kestrel).rand { 0 }
- stub(@raw_kestrel_client).get(@queue + "_errors", anything) do
+ stub(@raw_kestrel_client).get_from_last(@queue + "_errors", anything) do
Kestrel::Client::Transactional::RetryableJob.new(1, :mcmuffin)
end
@@ -201,11 +201,11 @@ def get_job
it "does not enqueue the retried job after too many tries" do
stub(@kestrel).rand { 0 }
- stub(@raw_kestrel_client).get(@queue + "_errors", anything) do
+ stub(@raw_kestrel_client).get_from_last(@queue + "_errors", :open => true) do
Kestrel::Client::Transactional::RetryableJob.new(Kestrel::Client::Transactional::DEFAULT_RETRIES - 1, :mcmuffin)
end
mock(@raw_kestrel_client).set(@queue + "_errors", anything).never
- mock(@raw_kestrel_client).get_from_last(@queue + "_errors/close")
+ mock(@raw_kestrel_client).get_from_last(@queue + "_errors", :close => true)
@kestrel.get(@queue)
lambda { @kestrel.retry }.should raise_error(Kestrel::Client::Transactional::RetriesExceeded)
end
@@ -214,18 +214,18 @@ def get_job
stub(@raw_kestrel_client).get(@queue, anything) { :mcguffin }
@kestrel.get(@queue)
- mock(@raw_kestrel_client).get_from_last(@queue + "/close")
+ mock(@raw_kestrel_client).get_from_last(@queue, :close => true)
@kestrel.retry
end
it "closes an open transaction with retries" do
stub(@kestrel).rand { 0 }
- stub(@raw_kestrel_client).get(@queue + "_errors", anything) do
+ stub(@raw_kestrel_client).get_from_last(@queue + "_errors", :open => true) do
Kestrel::Client::Transactional::RetryableJob.new(1, :mcmuffin)
end
@kestrel.get(@queue)
- mock(@raw_kestrel_client).get_from_last(@queue + "_errors/close")
+ mock(@raw_kestrel_client).get_from_last(@queue + "_errors", :close => true)
@kestrel.retry
end
end
@@ -244,26 +244,26 @@ def get_job
describe "#close_last_transaction" do
it "does nothing if there is no last transaction" do
- mock(@raw_kestrel_client).get_from_last(@queue + "/close").never
- mock(@raw_kestrel_client).get_from_last(@queue + "_errors/close").never
+ mock(@raw_kestrel_client).get_from_last(@queue, :close => true).never
+ mock(@raw_kestrel_client).get_from_last(@queue + "_errors", :close => true).never
@kestrel.send(:close_last_transaction)
end
it "closes the normal queue if the job was pulled off of the normal queue" do
mock(@kestrel).read_from_error_queue? { false }
mock(@raw_kestrel_client).get(@queue, :open => true) { :mcguffin }
- mock(@raw_kestrel_client).get_from_last(@queue + "/close")
- mock(@raw_kestrel_client).get_from_last(@queue + "_errors/close").never
+ mock(@raw_kestrel_client).get_from_last(@queue, :close => true)
+ mock(@raw_kestrel_client).get_from_last(@queue + "_errors", :close => true).never
@kestrel.get(@queue).should == :mcguffin
@kestrel.send(:close_last_transaction)
end
it "closes the error queue if the job was pulled off of the error queue" do
mock(@kestrel).read_from_error_queue? { true }
- mock(@raw_kestrel_client).get(@queue + "_errors", anything) { Kestrel::Client::Transactional::RetryableJob.new 1, :mcguffin }
- mock(@raw_kestrel_client).get_from_last(@queue + "/close").never
- mock(@raw_kestrel_client).get_from_last(@queue + "_errors/close")
+ mock(@raw_kestrel_client).get_from_last(@queue + "_errors", :open => true) { Kestrel::Client::Transactional::RetryableJob.new 1, :mcguffin }
+ mock(@raw_kestrel_client).get_from_last(@queue, :close => true).never
+ mock(@raw_kestrel_client).get_from_last(@queue + "_errors", :close => true)
@kestrel.get(@queue).should == :mcguffin
@kestrel.send(:close_last_transaction)