Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also compare across forks.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks.
base fork: resque/resque
...
head fork: efficientcloud/resque
compare: master
Checking mergeability… Don’t worry, you can still create the pull request.
  • 1 commit
  • 5 files changed
  • 0 commit comments
  • 1 contributor
Commits on Jul 04, 2011
Nate Pickens use blocking pop when running redis version >= 1.3.1 76bbb55
View
45 lib/resque.rb
@@ -31,6 +31,8 @@ module Resque
# 5. An instance of `Redis`, `Redis::Client`, `Redis::DistRedis`,
# or `Redis::Namespace`.
def redis=(server)
+ @server = server
+
case server
when String
if server =~ /redis\:\/\//
@@ -68,6 +70,15 @@ def redis_id
end
end
+ # Establishes a new Redis connection. Useful for testing a blocking
+ # pop (fork child process, establish new connection, and push onto a
+ # queue while the parent process blocks on that queue).
+ def reconnect
+ return self.redis unless defined?(@server)
+ self.redis = @server
+ self.redis
+ end
+
# The `before_first_fork` hook will be run in the **parent** process
# only once, before forking to run the first job. Be careful- any
# changes you make will be permanent for the lifespan of the
@@ -162,6 +173,20 @@ def pop(queue)
decode redis.lpop("queue:#{queue}")
end
+ # Pops a job off the first non-empty of an array of queues, blocking
+ # if all queues are empty. A timeout of 0 will cause the call to block
+ # forever (or until the connection is reset)--not recommended.
+ #
+ # Returns a queue name and a Ruby object, or nil if `timeout` seconds
+ # passed with all queues remaining empty.
+ def bpop(queues, timeout)
+ args = queues.map { |q| "queue:#{q}" }
+ args << timeout
+
+ queue, raw_payload = redis.blpop(*args)
+ queue ? [queue.split(":").last, decode(raw_payload)] : nil
+ end
+
# Returns an integer representing the size of a queue.
# Queue name should be a string.
def size(queue)
@@ -275,11 +300,25 @@ def queue_from_class(klass)
# This method will return a `Resque::Job` object or a non-true value
# depending on whether a job can be obtained. You should pass it the
- # precise name of a queue: case matters.
+ # precise name of a queue: case matters. If a timeout is supplied,
+ # blocking will occur for that many seconds or until a job is found on
+ # (one of) the queue(s).
#
# This method is considered part of the `stable` API.
- def reserve(queue)
- Job.reserve(queue)
+ def reserve(queues, timeout = nil)
+ queues = [queues] unless queues.kind_of?(Array)
+
+ if timeout
+ queue, payload = bpop(queues, timeout.to_i)
+ return Job.new(queue, payload) if payload
+ else
+ queues.each do |queue|
+ payload = Resque.pop(queue)
+ return Job.new(queue, payload) if payload
+ end
+ end
+
+ nil
end
# Validates if the given klass could be a valid Resque job
View
6 lib/resque/tasks.rb
@@ -24,7 +24,11 @@
worker.log "Starting worker #{worker}"
- worker.work(ENV['INTERVAL'] || 5) # interval, will block
+ # Use blocking pop if Redis version >= 1.3.1
+ redis_version = Resque.redis.info["redis_version"]
+ can_block = redis_version && redis_version.to_f >= 1.3 && redis_version.split(".").last.to_i != 0
+
+ worker.work(:blocking => can_block, :interval => ENV['INTERVAL'] || 5) # interval, will block
end
desc "Start multiple Resque workers. Should only be used in dev mode."
View
55 lib/resque/worker.rb
@@ -102,21 +102,38 @@ def validate_queues
# 2. Work loop: Jobs are pulled from a queue and processed.
# 3. Teardown: This worker is unregistered.
#
- # Can be passed a float representing the polling frequency.
- # The default is 5 seconds, but for a semi-active site you may
- # want to use a smaller value.
+ # Options:
+ # :interval In polling mode, how often to poll; in blocking
+ # mode, how often to timeout the blocking and return
+ # to the main loop (to check if this worker has been
+ # paused or shutdown).
+ # :blocking Whether or not the Redis call to grab a job should
+ # block if all the queues are empty. Can only be used
+ # if running Redis version >= 1.3.1.
+ # :die_if_idle Whether or not this worker should die as soon as
+ # there are no jobs to process. Useful for testing.
+ #
+ # The default interval is 5 seconds, but for a semi-active site you
+ # may want to use a smaller value when not using blocking.
#
# Also accepts a block which will be passed the job as soon as it
# has completed processing. Useful for testing.
- def work(interval = 5.0, &block)
- interval = Float(interval)
+ def work(options = {:interval => 5.0}, &block)
$0 = "resque: Starting"
startup
+ # Support old way of passing in a simple polling interval.
+ unless options.kind_of?(Hash)
+ options = {:interval => options}
+ options[:die_if_idle] = options[:interval].to_i == 0
+ end
+
+ options[:interval] = options[:interval].to_f
+
loop do
break if shutdown?
- if not paused? and job = reserve
+ if not @paused and job = reserve(options[:blocking] ? options[:interval] : nil)
log "got: #{job.inspect}"
run_hook :before_fork, job
working_on job
@@ -134,13 +151,14 @@ def work(interval = 5.0, &block)
done_working
@child = nil
else
- break if interval.zero?
- log! "Sleeping for #{interval} seconds"
- procline paused? ? "Paused" : "Waiting for #{@queues.join(',')}"
- sleep interval
+ break if options[:die_if_idle]
+ next if options[:blocking] && !@paused
+
+ log! "Sleeping for #{options[:interval]}"
+ procline @paused ? "Paused" : "Waiting for #{@queues.join(',')}"
+ sleep options[:interval]
end
end
-
ensure
unregister_worker
end
@@ -178,16 +196,11 @@ def perform(job)
# Attempts to grab a job off one of the provided queues. Returns
# nil if no job can be found.
- def reserve
- queues.each do |queue|
- log! "Checking #{queue}"
- if job = Resque::Job.reserve(queue)
- log! "Found job on #{queue}"
- return job
- end
- end
-
- nil
+ def reserve(timeout = nil)
+ log! "Checking #{queues.join(', ')}"
+ job = Resque.reserve(queues, timeout)
+ log! "Found job on #{job.queue}" if job
+ job
rescue Exception => e
log "Error reserving job: #{e.inspect}"
log e.backtrace.join("\n")
View
34 test/resque_test.rb
@@ -256,4 +256,38 @@
Resque.inline = false
end
end
+
+ test "can block on a single empty queue until a job is queued" do
+ if child = Kernel.fork
+ begin
+ assert_kind_of Resque::Job, Resque.reserve(:jobs, 1)
+ ensure
+ Process.wait(child)
+ end
+ else
+ # Open a new socket so we're not stuck waiting behind the blocking
+ # call made by the parent process.
+ Resque.reconnect
+ Resque::Job.create(:jobs, SomeJob)
+ exit!
+ end
+ end
+
+ test "can block on multiple empty queues until a job is queued" do
+ if child = Kernel.fork
+ begin
+ assert_kind_of Resque::Job, Resque.reserve([:queue1, :queue2], 5)
+ assert_kind_of Resque::Job, Resque.reserve([:queue1, :queue2], 5)
+ ensure
+ Process.wait(child)
+ end
+ else
+ Resque.reconnect
+
+ Resque::Job.create(:queue2, SomeJob)
+ Resque::Job.create(:queue1, SomeJob)
+
+ exit!
+ end
+ end
end
View
28 test/worker_test.rb
@@ -325,8 +325,34 @@
workerA.work(0)
assert $AFTER_FORK_CALLED
end
-
+
test "returns PID of running process" do
assert_equal @worker.to_s.split(":")[1].to_i, @worker.pid
end
+
+ test "can block on empty queues until a job is queued" do
+ if child = Kernel.fork
+ begin
+ worker = Resque::Worker.new(:queue1, :queue2)
+
+ processed_jobs = 0
+
+ worker.work(:blocking => true, :interval => 1, :die_if_idle => true) do |job|
+ assert_kind_of Resque::Job, job
+ processed_jobs +=1
+ end
+
+ assert_equal 2, processed_jobs
+ ensure
+ Process.wait(child)
+ end
+ else
+ Resque.reconnect
+
+ Resque::Job.create(:queue2, SomeJob)
+ Resque::Job.create(:queue1, SomeJob)
+
+ exit!
+ end
+ end
end

No commit comments for this range

Something went wrong with that request. Please try again.