Permalink
Browse files

Using WATCH/MULTI/EXEC to implement LPOPRPUSH to prevent data loss be…

…tween the worker pop and job perform
  • Loading branch information...
1 parent 96bb1b2 commit d23b9e5b4f3df7db15d066c2b86a9dcd06b0c693 @redsquirrel committed Mar 2, 2012
Showing with 63 additions and 3 deletions.
  1. +27 −1 lib/resque.rb
  2. +7 −2 lib/resque/job.rb
  3. +11 −0 lib/resque/worker.rb
  4. +12 −0 test/job_hooks_test.rb
  5. +6 −0 test/resque_test.rb
View
@@ -155,7 +155,33 @@ def push(queue, item)
#
# Returns a Ruby object.
def pop(queue)
- decode redis.lpop("queue:#{queue}")
+ decode lpoprpush("queue:#{queue}", "backup-queue:#{queue}")
+ end
+
+ def lpoprpush(from, to)
+ payload = nil
+ begin
+ redis.watch(from)
+ payload = redis.lindex(from, 0)
+ if payload.nil?
+ redis.unwatch
+ return nil
+ end
+ redis.multi
+ redis.lpop(from)
+
+ # Doing this to ensure that we can later LREM in remove_backup because
+ # Ruby 1.8.* orders hashes with string/symbol keys differently.
+ payload = encode(decode(payload))
+
+ redis.rpush(to, payload)
+ end until redis.exec
+ payload
+ end
+
+ # Removes a job off a backup-queue.
+ def remove_backup(queue, payload)
+ redis.lrem("backup-queue:#{queue}", 1, encode(payload))
end
# Returns an integer representing the size of a queue.
View
@@ -119,7 +119,7 @@ def perform
# Execute the job. Do it in an around_perform hook if available.
if around_hooks.empty?
- job.perform(*job_args)
+ do_perform(job, job_args)
job_was_performed = true
else
# We want to nest all around_perform plugins, with the last one
@@ -132,7 +132,7 @@ def perform
else
lambda do
job.send(hook, *job_args) do
- result = job.perform(*job_args)
+ result = do_perform(job, job_args)
job_was_performed = true
result
end
@@ -158,6 +158,11 @@ def perform
end
end
+ def do_perform(job, job_args)
+ Resque.remove_backup(queue, payload)
+ job.perform(*job_args)
+ end
+
# Returns the actual class constant represented in this job's payload.
def payload_class
@payload_class ||= constantize(@payload['class'])
View
@@ -144,6 +144,7 @@ def work(interval = 5.0, &block)
done_working
@child = nil
+ restore_unprocessed_messages
else
break if interval.zero?
log! "Sleeping for #{interval} seconds"
@@ -238,6 +239,7 @@ def startup
enable_gc_optimizations
register_signal_handlers
prune_dead_workers
+ restore_unprocessed_messages
run_hook :before_first_fork
register_worker
@@ -246,6 +248,15 @@ def startup
$stdout.sync = true
end
+
+ def restore_unprocessed_messages
+ @queues.each do |queue|
+ while Resque.lpoprpush("backup-queue:#{queue}", "queue:#{queue}")
+ #just moving the backup queue stuff onto the queue again
+ end
+ end
+ end
+
# Enables GC Optimizations if you're running REE.
# http://www.rubyenterpriseedition.com/faq.html#adapt_apps_for_cow
def enable_gc_optimizations
View
@@ -420,4 +420,16 @@ def self.on_failure_record_history(exception, history)
"oh no"
]
end
+
+
+ class ::VerifyBackupRemovalJob
+ def self.perform(ignored); end
+ end
+
+ test "the backup job is removed from the job's backup queue" do
+ Resque.redis.rpush("backup-queue:testqueue", Resque.encode("class" => "VerifyBackupRemovalJob", "args" => ["foo" => "bar"]))
+ perform_job("VerifyBackupRemovalJob", "foo" => "bar")
+ assert_equal [], Resque.redis.lrange("backup-queue:testqueue", 0, -1)
+ end
+
end
View
@@ -169,6 +169,12 @@
assert_equal nil, Resque.pop(:people)
end
+
+ test "can backup items when they are pulled off a queue" do
+ Resque.pop(:people)
+ assert_equal({ 'name' => 'chris' }, Resque.decode(Resque.redis.lpop("backup-queue:people")))
+ end
+
test "knows how big a queue is" do
assert_equal 3, Resque.size(:people)

5 comments on commit d23b9e5

amiel commented on d23b9e5 Mar 3, 2012

Would it be helpful to have more tests around this?

Owner

redsquirrel replied Mar 4, 2012

amiel replied Mar 5, 2012

@redsquirrel I wrote some tests for and fixed a bug in Worker#restore_unprocessed_messages on the airplane leaving #rbonales before I noticed that you dropped it.

I'm not sure if you'll go that direction again. In case it could be helpful:

amiel/resque@d23b9e5...192749e

Owner

redsquirrel replied Mar 13, 2012

I've sent you a pull request to you multi branch since that's what I based my stuff from.

Please sign in to comment.