Permalink
Browse files

Swallow (and attempt to retry) Redis timeout errors when updating sta…

…ts. This is so code which calls #stats does not conflate job failures with stats failures.
  • Loading branch information...
1 parent 939006e commit 0a58b7f4f6921e32a8fccc2b49820398f5e23555 @jonhyman jonhyman committed Feb 24, 2014
Showing with 46 additions and 21 deletions.
  1. +46 −21 lib/sidekiq/processor.rb
View
67 lib/sidekiq/processor.rb
@@ -92,38 +92,45 @@ def identity
end
def stats(worker, msg, queue)
- redis do |conn|
- conn.multi do
- conn.sadd('workers', identity)
- conn.setex("worker:#{identity}:started", EXPIRY, Time.now.to_s)
- hash = {:queue => queue, :payload => msg, :run_at => Time.now.to_i }
- conn.setex("worker:#{identity}", EXPIRY, Sidekiq.dump_json(hash))
+ # Do not conflate errors from the job with errors caused by updating stats so calling code can react appropriately
+ retry_and_suppress_redis_timeouts do
+ redis do |conn|
+ conn.multi do
+ conn.sadd('workers', identity)
+ conn.setex("worker:#{identity}:started", EXPIRY, Time.now.to_s)
+ hash = {:queue => queue, :payload => msg, :run_at => Time.now.to_i }
+ conn.setex("worker:#{identity}", EXPIRY, Sidekiq.dump_json(hash))
+ end
end
end
begin
yield
rescue Exception
- redis do |conn|
- failed = "stat:failed:#{Time.now.utc.to_date}"
- result = conn.multi do
- conn.incrby("stat:failed", 1)
- conn.incrby(failed, 1)
+ retry_and_suppress_redis_timeouts do
+ redis do |conn|
+ failed = "stat:failed:#{Time.now.utc.to_date}"
+ result = conn.multi do
+ conn.incrby("stat:failed", 1)
+ conn.incrby(failed, 1)
+ end
+ conn.expire(failed, STATS_TIMEOUT) if result.last == 1
end
- conn.expire(failed, STATS_TIMEOUT) if result.last == 1
end
raise
ensure
- redis do |conn|
- processed = "stat:processed:#{Time.now.utc.to_date}"
- result = conn.multi do
- conn.srem("workers", identity)
- conn.del("worker:#{identity}")
- conn.del("worker:#{identity}:started")
- conn.incrby("stat:processed", 1)
- conn.incrby(processed, 1)
+ retry_and_suppress_redis_timeouts do
+ redis do |conn|
+ processed = "stat:processed:#{Time.now.utc.to_date}"
+ result = conn.multi do
+ conn.srem("workers", identity)
+ conn.del("worker:#{identity}")
+ conn.del("worker:#{identity}:started")
+ conn.incrby("stat:processed", 1)
+ conn.incrby(processed, 1)
+ end
+ conn.expire(processed, STATS_TIMEOUT) if result.last == 1
end
- conn.expire(processed, STATS_TIMEOUT) if result.last == 1
end
end
end
@@ -137,5 +144,23 @@ def stats(worker, msg, queue)
def cloned(ary)
Marshal.load(Marshal.dump(ary))
end
+
+ # If there is a Redis::TimeoutError, the block passed to this method will be retried up to max_retries times.
+ # All exceptions will be swallowed and logged.
+ def retry_and_suppress_redis_timeouts(max_retries = 2)
+ retry_count = 0
+ begin
+ yield
+ rescue Redis::TimeoutError
+ retry_count += 1
+ if retry_count <= max_retries
+ retry
+ else
+ Sidekiq.logger.info {"Exhausted #{max_retries} retries due to Redis timeouts: #{e.inspect}"}
+ end
+ rescue StandardError => e
+ Sidekiq.logger.info {"Suppressing error #{e.inspect}"}
+ end
+ end
end
end

0 comments on commit 0a58b7f

Please sign in to comment.