Permalink
Browse files

Merge remote-tracking branch 'blocking/blocking_reserve'

Conflicts:
	test/worker_test.rb
  • Loading branch information...
2 parents 248dd88 + ad7a8d2 commit 2e58bafd7c77d296527642d75ab3539b78ae76bc @wishdev committed Dec 15, 2011
Showing with 90 additions and 4 deletions.
  1. +7 −0 lib/resque.rb
  2. +9 −0 lib/resque/job.rb
  3. +35 −4 lib/resque/worker.rb
  4. +39 −0 test/worker_test.rb
View
@@ -158,6 +158,13 @@ def pop(queue)
decode redis.lpop("queue:#{queue}")
end
+ # Blocking pops a job off a list of queues. Queuelist should be an array.
+ #
+ # Returns a Ruby object.
+ def bpop(queuelist, timeout)
+ redis.blpop(*queuelist.map {|queue| "queue:#{queue}"}, timeout)
+ end
+
# Returns an integer representing the size of a queue.
# Queue name should be a string.
def size(queue)
View
@@ -98,6 +98,15 @@ def self.reserve(queue)
new(queue, payload)
end
+ # Given a list of queue names, returns an instance or Resque::Job
+ # or blocks until a job becomes available on any of the queues, or
+ # timeout is reached.
+ def self.blocking_reserve(queues, timeout)
+ return unless payload = Resque.bpop(queues, timeout) # payload = ["namespace:queue:job", payload]
+ queue = payload[0].split(':').last
+ new(queue, decode(payload[1]))
+ end
+
# Attempts to perform the work represented by this job instance.
# Calls #perform on the class given in the payload with the
# arguments given in the payload.
View
@@ -106,6 +106,15 @@ def validate_queues
end
end
+ # Check to see if the work can use blocking_reserve, depending
+ # on what queues are given.
+ # - If one queue, blocking_reserve is always used
+ # - If multiple queues, blocking_reserve is only used when
+ # not using Redis::Distributed
+ def blockable?
+ @queues.size == 1 || !redis.respond_to?(:nodes)
+ end
+
# This is the main workhorse method. Called on a Worker instance,
# it begins the worker life cycle.
#
@@ -127,10 +136,13 @@ def work(interval = 5.0, &block)
$0 = "resque: Starting"
startup
+ blocking = interval > 0 && blockable?
+
loop do
break if shutdown?
- if not paused? and job = reserve
+ procline "Blocked reserving for #{@queues.join(', ')}" if blocking
+ if not paused? and job = blocking ? blocking_reserve(interval.to_i) : reserve
log "got: #{job.inspect}"
job.worker = self
run_hook :before_fork, job
@@ -150,9 +162,11 @@ def work(interval = 5.0, &block)
@child = nil
else
break if interval.zero?
- log! "Sleeping for #{interval} seconds"
- procline paused? ? "Paused" : "Waiting for #{@queues.join(',')}"
- sleep interval
+ unless blocking
+ log! "Sleeping for #{interval} seconds"
+ procline paused? ? "Paused" : "Waiting for #{@queues.join(',')}"
+ sleep interval
+ end
end
end
@@ -210,6 +224,23 @@ def reserve
raise e
end
+ # Blocking-attempts to grab a job off one of the queues, or blocks
+ # until a job arrives or timeout expires. Returns nil if no job can
+ # be found.
+ def blocking_reserve(timeout)
+ log! "Checking #{queues.join(', ')} (blocking, timeout = #{timeout})"
+ if job = Resque::Job.blocking_reserve(queues, timeout)
+ log! "Found job on #{job.queue}"
+ return job
+ end
+
+ nil
+ rescue Exception => e
+ log "Error block-reserving job: #{e.inspect}"
+ log e.backtrace.join("\n")
+ raise e
+ end
+
# Returns a list of queues to use when searching for a job.
# A splat ("*") means you want every queue (in alpha order) - this
# can be useful for dynamically adding new queues.
View
@@ -402,4 +402,43 @@ def self.exception
assert_equal queue2, Resque::Failure.all(0)['queue']
assert_equal 1, Resque::Failure.count
end
+
+ test "can blocking grab a job from its queues" do
+ job = @worker.blocking_reserve(1)
+ assert_not_nil job
+ assert_equal({"args"=>[20, "/tmp"], "class"=>"SomeJob"}, job.payload)
+ assert_equal 0, Resque.size(:jobs)
+ end
+
+ test "can blocking grab nothing from an empty queue" do
+ worker = Resque::Worker.new(:empty)
+ job = worker.blocking_reserve(1)
+ assert_nil job
+ end
+
+ test "can do blocking work" do
+ shutdown_thread = Thread.new do
+ sleep 2
+ @worker.shutdown
+ end
+
+ @worker.work(1)
+
+ shutdown_thread.join
+ end
+
+ test "can blocking reserve from multiple queues" do
+ Resque::Job.create(:high, GoodJob)
+ Resque::Job.create(:critical, GoodJob)
+
+ worker = Resque::Worker.new(:critical, :high)
+
+ worker.blocking_reserve(5)
+ assert_equal 1, Resque.size(:high)
+ assert_equal 0, Resque.size(:critical)
+
+ worker.blocking_reserve(5)
+ assert_equal 0, Resque.size(:high)
+ end
+
end

0 comments on commit 2e58baf

Please sign in to comment.