Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Comparing changes

Choose two branches to see what's changed or to start a new pull request. If you need to, you can also compare across forks.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks.
  • 4 commits
  • 3 files changed
  • 0 commit comments
  • 2 contributors
Commits on Oct 22, 2011
@eric eric Pass proper arguments to Client#get_from_last in Transactional client
This prevents clients from prematurely jumping between servers by not passing
the "/close" argument as part of the queue name.
f08c5d5
@eric eric Don't trigger a reconnect on /close or error queue 0b5394a
@eric eric Only reset the counter if we were shufflin' 5da8546
Commits on Mar 28, 2012
@bitbckt bitbckt Merge pull request #6 from eric/fix-transactional-close
Fix transactional client
846a4fd
View
7 lib/kestrel/client.rb
@@ -125,8 +125,11 @@ def get(key, opts = {})
nil
end
- # nil result, force next get to jump from current server
- @counter = @gets_per_server unless val
+ # nil result without :close and :abort, force next get to jump from
+ # current server
+ if !val && @shuffle && !opts[:close] && !opts[:abort]
+ @counter = @gets_per_server
+ end
val
end
View
12 lib/kestrel/client/transactional.rb
@@ -58,9 +58,15 @@ def get(key, opts = {})
close_last_transaction
- queue = read_from_error_queue? ? key + "_errors" : key
+ if read_from_error_queue?
+ queue = key + "_errors"
+ job = client.get_from_last(queue, opts.merge(:open => true))
+ else
+ queue = key
+ job = client.get(queue, opts.merge(:open => true))
+ end
- if job = client.get(queue, opts.merge(:open => true))
+ if job
@job = job.is_a?(RetryableJob) ? job : RetryableJob.new(0, job)
@last_read_queue = queue
@current_queue = key
@@ -75,7 +81,7 @@ def current_try
def close_last_transaction #:nodoc:
return unless @last_read_queue
- client.get_from_last(@last_read_queue + "/close")
+ client.get_from_last(@last_read_queue, :close => true)
@last_read_queue = @current_queue = @job = nil
end
View
58 spec/kestrel/client/transactional_spec.rb
@@ -17,9 +17,9 @@ def get_job
it "processes normal jobs" do
returns = [:mcguffin]
stub(@raw_kestrel_client).get(@queue, anything) { returns.shift }
- stub(@raw_kestrel_client).get(@queue + "_errors", anything)
+ stub(@raw_kestrel_client).get_from_last(@queue + "_errors", anything)
- mock(@raw_kestrel_client).get_from_last(@queue + "/close")
+ mock(@raw_kestrel_client).get_from_last(@queue, :close => true)
get_job.should == :mcguffin
@kestrel.current_try.should == 1
@@ -28,10 +28,10 @@ def get_job
it "processes successful retries" do
returns = [Kestrel::Client::Transactional::RetryableJob.new(1, :mcguffin)]
- stub(@raw_kestrel_client).get(@queue + "_errors", anything) { returns.shift }
+ stub(@raw_kestrel_client).get_from_last(@queue + "_errors", anything) { returns.shift }
stub(@raw_kestrel_client).get(@queue, anything)
- mock(@raw_kestrel_client).get_from_last(@queue + "_errors/close")
+ mock(@raw_kestrel_client).get_from_last(@queue + "_errors", :close => true)
get_job.should == :mcguffin
@kestrel.current_try.should == 2
@@ -41,13 +41,13 @@ def get_job
it "processes normal jobs that should retry" do
returns = [:mcguffin]
stub(@raw_kestrel_client).get(@queue, anything) { returns.shift }
- stub(@raw_kestrel_client).get(@queue + "_errors", anything)
+ stub(@raw_kestrel_client).get_from_last(@queue + "_errors", anything)
mock(@raw_kestrel_client).set(@queue + "_errors", anything) do |q,j|
j.retries.should == 1
j.job.should == :mcguffin
end
- mock(@raw_kestrel_client).get_from_last(@queue + "/close")
+ mock(@raw_kestrel_client).get_from_last(@queue, :close => true)
get_job.should == :mcguffin
@kestrel.current_try.should == 1
@@ -58,13 +58,13 @@ def get_job
it "processes retries that should retry" do
returns = [Kestrel::Client::Transactional::RetryableJob.new(1, :mcguffin)]
- stub(@raw_kestrel_client).get(@queue + "_errors", anything) { returns.shift }
+ stub(@raw_kestrel_client).get_from_last(@queue + "_errors", :open => true) { returns.shift }
stub(@raw_kestrel_client).get(@queue, anything)
mock(@raw_kestrel_client).set(@queue + "_errors", anything) do |q,j|
j.retries.should == 2
j.job.should == :mcguffin
end
- mock(@raw_kestrel_client).get_from_last(@queue + "_errors/close")
+ mock(@raw_kestrel_client).get_from_last(@queue + "_errors", :close => true)
get_job.should == :mcguffin
@kestrel.current_try.should == 2
@@ -75,10 +75,10 @@ def get_job
it "processes retries that should give up" do
returns = [Kestrel::Client::Transactional::RetryableJob.new(Kestrel::Client::Transactional::DEFAULT_RETRIES - 1, :mcguffin)]
- stub(@raw_kestrel_client).get(@queue + "_errors", anything) { returns.shift }
+ stub(@raw_kestrel_client).get_from_last(@queue + "_errors", :open => true) { returns.shift }
stub(@raw_kestrel_client).get(@queue, anything)
mock(@raw_kestrel_client).set.never
- mock(@raw_kestrel_client).get_from_last(@queue + "_errors/close")
+ mock(@raw_kestrel_client).get_from_last(@queue + "_errors", :close => true)
get_job.should == :mcguffin
@kestrel.current_try.should == Kestrel::Client::Transactional::DEFAULT_RETRIES
@@ -102,20 +102,20 @@ def get_job
it "is nil when the primary queue is empty and selected" do
mock(@kestrel).rand { Kestrel::Client::Transactional::ERROR_PROCESSING_RATE + 0.05 }
mock(@raw_kestrel_client).get(@queue, anything) { nil }
- mock(@raw_kestrel_client).get(@queue + "_errors", anything).never
+ mock(@raw_kestrel_client).get(@queue + "_errors", :open => true).never
@kestrel.get(@queue).should be_nil
end
it "is nil when the error queue is empty and selected" do
mock(@kestrel).rand { Kestrel::Client::Transactional::ERROR_PROCESSING_RATE - 0.05 }
mock(@raw_kestrel_client).get(@queue, anything).never
- mock(@raw_kestrel_client).get(@queue + "_errors", anything) { nil }
+ mock(@raw_kestrel_client).get_from_last(@queue + "_errors", :open => true) { nil }
@kestrel.get(@queue).should be_nil
end
it "returns the payload of a RetryableJob" do
stub(@kestrel).rand { 0 }
- mock(@raw_kestrel_client).get(@queue + "_errors", anything) do
+ mock(@raw_kestrel_client).get_from_last(@queue + "_errors", anything) do
Kestrel::Client::Transactional::RetryableJob.new(1, :mcmuffin)
end
@@ -126,18 +126,18 @@ def get_job
stub(@raw_kestrel_client).get(@queue, anything) { :mcguffin }
@kestrel.get(@queue)
- mock(@raw_kestrel_client).get_from_last(@queue + "/close")
+ mock(@raw_kestrel_client).get_from_last(@queue, :close => true)
@kestrel.get(@queue)
end
it "closes an open transaction with retries" do
stub(@kestrel).rand { 0 }
- stub(@raw_kestrel_client).get(@queue + "_errors", anything) do
+ stub(@raw_kestrel_client).get_from_last(@queue + "_errors", :open => true) do
Kestrel::Client::Transactional::RetryableJob.new(1, :mcmuffin)
end
@kestrel.get(@queue)
- mock(@raw_kestrel_client).get_from_last(@queue + "_errors/close")
+ mock(@raw_kestrel_client).get_from_last(@queue + "_errors", :close => true)
@kestrel.get(@queue)
end
@@ -186,7 +186,7 @@ def get_job
it "increments the retry count and re-enqueues the retried job" do
stub(@kestrel).rand { 0 }
- stub(@raw_kestrel_client).get(@queue + "_errors", anything) do
+ stub(@raw_kestrel_client).get_from_last(@queue + "_errors", anything) do
Kestrel::Client::Transactional::RetryableJob.new(1, :mcmuffin)
end
@@ -201,11 +201,11 @@ def get_job
it "does not enqueue the retried job after too many tries" do
stub(@kestrel).rand { 0 }
- stub(@raw_kestrel_client).get(@queue + "_errors", anything) do
+ stub(@raw_kestrel_client).get_from_last(@queue + "_errors", :open => true) do
Kestrel::Client::Transactional::RetryableJob.new(Kestrel::Client::Transactional::DEFAULT_RETRIES - 1, :mcmuffin)
end
mock(@raw_kestrel_client).set(@queue + "_errors", anything).never
- mock(@raw_kestrel_client).get_from_last(@queue + "_errors/close")
+ mock(@raw_kestrel_client).get_from_last(@queue + "_errors", :close => true)
@kestrel.get(@queue)
lambda { @kestrel.retry }.should raise_error(Kestrel::Client::Transactional::RetriesExceeded)
end
@@ -214,18 +214,18 @@ def get_job
stub(@raw_kestrel_client).get(@queue, anything) { :mcguffin }
@kestrel.get(@queue)
- mock(@raw_kestrel_client).get_from_last(@queue + "/close")
+ mock(@raw_kestrel_client).get_from_last(@queue, :close => true)
@kestrel.retry
end
it "closes an open transaction with retries" do
stub(@kestrel).rand { 0 }
- stub(@raw_kestrel_client).get(@queue + "_errors", anything) do
+ stub(@raw_kestrel_client).get_from_last(@queue + "_errors", :open => true) do
Kestrel::Client::Transactional::RetryableJob.new(1, :mcmuffin)
end
@kestrel.get(@queue)
- mock(@raw_kestrel_client).get_from_last(@queue + "_errors/close")
+ mock(@raw_kestrel_client).get_from_last(@queue + "_errors", :close => true)
@kestrel.retry
end
end
@@ -244,16 +244,16 @@ def get_job
describe "#close_last_transaction" do
it "does nothing if there is no last transaction" do
- mock(@raw_kestrel_client).get_from_last(@queue + "/close").never
- mock(@raw_kestrel_client).get_from_last(@queue + "_errors/close").never
+ mock(@raw_kestrel_client).get_from_last(@queue, :close => true).never
+ mock(@raw_kestrel_client).get_from_last(@queue + "_errors", :close => true).never
@kestrel.send(:close_last_transaction)
end
it "closes the normal queue if the job was pulled off of the normal queue" do
mock(@kestrel).read_from_error_queue? { false }
mock(@raw_kestrel_client).get(@queue, :open => true) { :mcguffin }
- mock(@raw_kestrel_client).get_from_last(@queue + "/close")
- mock(@raw_kestrel_client).get_from_last(@queue + "_errors/close").never
+ mock(@raw_kestrel_client).get_from_last(@queue, :close => true)
+ mock(@raw_kestrel_client).get_from_last(@queue + "_errors", :close => true).never
@kestrel.get(@queue).should == :mcguffin
@kestrel.send(:close_last_transaction)
@@ -261,9 +261,9 @@ def get_job
it "closes the error queue if the job was pulled off of the error queue" do
mock(@kestrel).read_from_error_queue? { true }
- mock(@raw_kestrel_client).get(@queue + "_errors", anything) { Kestrel::Client::Transactional::RetryableJob.new 1, :mcguffin }
- mock(@raw_kestrel_client).get_from_last(@queue + "/close").never
- mock(@raw_kestrel_client).get_from_last(@queue + "_errors/close")
+ mock(@raw_kestrel_client).get_from_last(@queue + "_errors", :open => true) { Kestrel::Client::Transactional::RetryableJob.new 1, :mcguffin }
+ mock(@raw_kestrel_client).get_from_last(@queue, :close => true).never
+ mock(@raw_kestrel_client).get_from_last(@queue + "_errors", :close => true)
@kestrel.get(@queue).should == :mcguffin
@kestrel.send(:close_last_transaction)

No commit comments for this range

Something went wrong with that request. Please try again.