
Merge pull request #1511 from mperham/feature/redis-stats-protection

Swallow (and attempt to retry) Redis timeout errors when updating stats....
Merge commit 48446bb2e8750a4495e565fa61ca87c59c64b55d (parents 939006e and cf641cb), committed by @mperham on Feb 25, 2014
Showing with 47 additions and 21 deletions.
  1. +1 −0 Changes.md
  2. +46 −21 lib/sidekiq/processor.rb
Changes.md
@@ -2,6 +2,7 @@
-----------
- Auto-prune jobs older than one hour from the Workers page [#1508]
+- Fix ReliableFetch issue where a job could be lost when an exception occurs updating Redis stats before the job executes [#1511]
2.17.6
-----------
lib/sidekiq/processor.rb
@@ -92,38 +92,45 @@ def identity
    end
    def stats(worker, msg, queue)
-      redis do |conn|
-        conn.multi do
-          conn.sadd('workers', identity)
-          conn.setex("worker:#{identity}:started", EXPIRY, Time.now.to_s)
-          hash = {:queue => queue, :payload => msg, :run_at => Time.now.to_i }
-          conn.setex("worker:#{identity}", EXPIRY, Sidekiq.dump_json(hash))
+      # Do not conflate errors from the job with errors caused by updating stats so calling code can react appropriately
+      retry_and_suppress_exceptions do
+        redis do |conn|
+          conn.multi do
+            conn.sadd('workers', identity)
+            conn.setex("worker:#{identity}:started", EXPIRY, Time.now.to_s)
+            hash = {:queue => queue, :payload => msg, :run_at => Time.now.to_i }
+            conn.setex("worker:#{identity}", EXPIRY, Sidekiq.dump_json(hash))
+          end
        end
      end
      begin
        yield
      rescue Exception
-        redis do |conn|
-          failed = "stat:failed:#{Time.now.utc.to_date}"
-          result = conn.multi do
-            conn.incrby("stat:failed", 1)
-            conn.incrby(failed, 1)
+        retry_and_suppress_exceptions do
+          redis do |conn|
+            failed = "stat:failed:#{Time.now.utc.to_date}"
+            result = conn.multi do
+              conn.incrby("stat:failed", 1)
+              conn.incrby(failed, 1)
+            end
+            conn.expire(failed, STATS_TIMEOUT) if result.last == 1
          end
-          conn.expire(failed, STATS_TIMEOUT) if result.last == 1
        end
        raise
      ensure
-        redis do |conn|
-          processed = "stat:processed:#{Time.now.utc.to_date}"
-          result = conn.multi do
-            conn.srem("workers", identity)
-            conn.del("worker:#{identity}")
-            conn.del("worker:#{identity}:started")
-            conn.incrby("stat:processed", 1)
-            conn.incrby(processed, 1)
+        retry_and_suppress_exceptions do
+          redis do |conn|
+            processed = "stat:processed:#{Time.now.utc.to_date}"
+            result = conn.multi do
+              conn.srem("workers", identity)
+              conn.del("worker:#{identity}")
+              conn.del("worker:#{identity}:started")
+              conn.incrby("stat:processed", 1)
+              conn.incrby(processed, 1)
+            end
+            conn.expire(processed, STATS_TIMEOUT) if result.last == 1
          end
-          conn.expire(processed, STATS_TIMEOUT) if result.last == 1
        end
      end
    end
@@ -137,5 +144,23 @@ def stats(worker, msg, queue)
    def cloned(ary)
      Marshal.load(Marshal.dump(ary))
    end
+
+    # If an exception occurs in the block passed to this method, that block will be retried up to max_retries times.
+    # All exceptions will be swallowed and logged.
+    def retry_and_suppress_exceptions(max_retries = 2)
+      retry_count = 0
+      begin
+        yield
+      rescue => e
+        retry_count += 1
+        if retry_count <= max_retries
+          Sidekiq.logger.debug {"Suppressing and retrying error: #{e.inspect}"}
+          sleep(1)
+          retry
+        else
+          Sidekiq.logger.info {"Exhausted #{max_retries} retries due to Redis timeouts: #{e.inspect}"}
+        end
+      end
+    end
  end
end
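
As a footnote to the diff above, here is a minimal, self-contained sketch of the retry-and-suppress pattern this commit introduces. It is not Sidekiq code: FakeTimeoutError and flaky_stats_update are hypothetical stand-ins for a Redis round trip that keeps timing out. It illustrates why stats bookkeeping wrapped this way can no longer surface as a failure of the job itself: the block is retried a bounded number of times, each error is logged, and the final error is swallowed rather than re-raised.

require 'logger'

LOGGER = Logger.new($stdout)

# Hypothetical stand-in for a Redis timeout (not Sidekiq's real error class).
class FakeTimeoutError < StandardError; end

# Same shape as the helper added in this commit: retry the block up to
# max_retries times, log each failure, and never let the error escape.
def retry_and_suppress_exceptions(max_retries = 2)
  retry_count = 0
  begin
    yield
  rescue => e
    retry_count += 1
    if retry_count <= max_retries
      LOGGER.debug("Suppressing and retrying error: #{e.inspect}")
      sleep(1)
      retry
    else
      LOGGER.info("Exhausted #{max_retries} retries: #{e.inspect}")
    end
  end
end

# Hypothetical stats update that times out on every attempt.
def flaky_stats_update
  raise FakeTimeoutError, 'Waited 1.0 seconds'
end

# The timeout is retried, logged, and finally swallowed; execution continues,
# so the surrounding work (the `yield` in Processor#stats) is unaffected.
retry_and_suppress_exceptions { flaky_stats_update }
puts 'still running: the stats failure did not become a job failure'

Compare this with Processor#stats in the diff: each of the three Redis bookkeeping blocks is wrapped in retry_and_suppress_exceptions, while the begin/rescue Exception/raise around yield still re-raises genuine job errors.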
