merge master

commit b56d97ef7287787dba2f03a27042f6026ebc823f (2 parents: c5402fd + 08d277c)
@mperham authored
Showing with 47 additions and 19 deletions.
  1. +2 −0  Changes.md
  2. +45 −19 lib/sidekiq/processor.rb
Changes.md (+2 −0)
@@ -24,6 +24,8 @@ Please see [Upgrading.md](Upgrading.md) for upgrade notes.
 -----------

 - Auto-prune jobs older than one hour from the Workers page [#1508]
+- Fix issue where a job could be lost when an exception occurs updating
+  Redis stats before the job executes [#1511]

 2.17.6
 -----------
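
The entry for [#1511] is the substance of this merge. In the old Processor#stats (see the lib/sidekiq/processor.rb diff below), the pre-job bookkeeping write to Redis ran outside any rescue, so a transient Redis error raised before `yield` aborted processing of a job that had already been pulled off its queue. The following is a minimal, hypothetical sketch of the before/after shape, not Sidekiq's actual processor code; `record_stats!`, `perform_unguarded`, and `perform_guarded` are illustrative stand-in names.

# Hypothetical, simplified illustration of the failure mode described above.
# `record_stats!` stands in for the pre-job Redis bookkeeping in Processor#stats.
def record_stats!
  raise "Redis connection timed out"  # simulate a transient stats failure
end

# Before the fix: the stats error propagates before `yield`,
# so the job block never executes even though the job was already dequeued.
def perform_unguarded
  record_stats!
  yield
end

# After the fix (the shape of the processor.rb change below): retry the stats
# write a couple of times, then suppress the error so the job still runs.
def perform_guarded(max_retries = 2)
  attempts = 0
  begin
    record_stats!
  rescue => e
    attempts += 1
    retry if attempts <= max_retries
    warn "suppressed stats error: #{e.inspect}"
  end
  yield
end

perform_guarded { puts "job ran" }               # prints "job ran"
perform_unguarded { puts "job ran" } rescue nil  # job block never runs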
lib/sidekiq/processor.rb (+45 −19)
@@ -74,36 +74,44 @@ def identity
     end

     def stats(worker, msg, queue)
-      redis do |conn|
-        conn.multi do
-          conn.sadd('workers', identity)
-          hash = {:queue => queue, :payload => msg, :run_at => Time.now.to_i }
-          conn.setex("worker:#{identity}", EXPIRY, Sidekiq.dump_json(hash))
+      # Do not conflate errors from the job with errors caused by updating
+      # stats so calling code can react appropriately
+      retry_and_suppress_exceptions do
+        hash = Sidekiq.dump_json({:queue => queue, :payload => msg, :run_at => Time.now.to_i })
+        redis do |conn|
+          conn.multi do
+            conn.sadd('workers', identity)
+            conn.setex("worker:#{identity}", EXPIRY, hash)
+          end
         end
       end

       begin
         yield
       rescue Exception
-        redis do |conn|
-          failed = "stat:failed:#{Time.now.utc.to_date}"
-          result = conn.multi do
-            conn.incrby("stat:failed", 1)
-            conn.incrby(failed, 1)
+        retry_and_suppress_exceptions do
+          redis do |conn|
+            failed = "stat:failed:#{Time.now.utc.to_date}"
+            result = conn.multi do
+              conn.incrby("stat:failed", 1)
+              conn.incrby(failed, 1)
+            end
+            conn.expire(failed, STATS_TIMEOUT) if result.last == 1
           end
-          conn.expire(failed, STATS_TIMEOUT) if result.last == 1
         end
         raise
       ensure
-        redis do |conn|
-          processed = "stat:processed:#{Time.now.utc.to_date}"
-          result = conn.multi do
-            conn.srem("workers", identity)
-            conn.del("worker:#{identity}")
-            conn.incrby("stat:processed", 1)
-            conn.incrby(processed, 1)
+        retry_and_suppress_exceptions do
+          redis do |conn|
+            processed = "stat:processed:#{Time.now.utc.to_date}"
+            result = conn.multi do
+              conn.srem("workers", identity)
+              conn.del("worker:#{identity}")
+              conn.incrby("stat:processed", 1)
+              conn.incrby(processed, 1)
+            end
+            conn.expire(processed, STATS_TIMEOUT) if result.last == 1
          end
-          conn.expire(processed, STATS_TIMEOUT) if result.last == 1
         end
       end
     end
@@ -117,5 +125,23 @@ def stats(worker, msg, queue)
     def cloned(ary)
       Marshal.load(Marshal.dump(ary))
     end
+
+    # If an exception occurs in the block passed to this method, that block will be retried up to max_retries times.
+    # All exceptions will be swallowed and logged.
+    def retry_and_suppress_exceptions(max_retries = 2)
+      retry_count = 0
+      begin
+        yield
+      rescue => e
+        retry_count += 1
+        if retry_count <= max_retries
+          Sidekiq.logger.debug {"Suppressing and retrying error: #{e.inspect}"}
+          sleep(1)
+          retry
+        else
+          Sidekiq.logger.info {"Exhausted #{max_retries} retries due to Redis timeouts: #{e.inspect}"}
+        end
+      end
+    end
   end
 end
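
For reference, the new helper can be exercised on its own. The sketch below lifts `retry_and_suppress_exceptions` out of the processor for illustration, with a plain `Logger` standing in for `Sidekiq.logger` (its only dependency here), to show the semantics: a failing block is retried up to `max_retries` times with a one-second pause, and if it still fails the error is logged and swallowed rather than re-raised, so the surrounding job processing continues.

require 'logger'

LOGGER = Logger.new($stdout)  # stand-in for Sidekiq.logger

# Same logic as the helper added in this commit, extracted for illustration.
def retry_and_suppress_exceptions(max_retries = 2)
  retry_count = 0
  begin
    yield
  rescue => e
    retry_count += 1
    if retry_count <= max_retries
      LOGGER.debug { "Suppressing and retrying error: #{e.inspect}" }
      sleep(1)  # brief pause before retrying, as in the helper
      retry
    else
      LOGGER.info { "Exhausted #{max_retries} retries due to Redis timeouts: #{e.inspect}" }
    end
  end
end

attempts = 0
retry_and_suppress_exceptions do
  attempts += 1
  raise "simulated Redis timeout"
end
puts attempts            # => 3 (initial try plus two retries)
puts "still running"     # execution continues; the error was suppressed

Note that exceptions raised by the job itself are still re-raised by the `raise` in the `rescue Exception` branch of `stats`, so only failures in the stats bookkeeping are retried and suppressed.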