Skip to content
This repository has been archived by the owner on Sep 17, 2024. It is now read-only.

Commit

Permalink
Merge pull request #20 from dimko/patch-3
Browse files Browse the repository at this point in the history
Fixed scheduled jobs locking
  • Loading branch information
krasnoukhov committed Jan 22, 2014
2 parents fe0d701 + 681d9af commit 870f1bc
Show file tree
Hide file tree
Showing 6 changed files with 199 additions and 167 deletions.
14 changes: 7 additions & 7 deletions lib/sidekiq-middleware.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
require "digest"
require "sidekiq-middleware/version"
require "sidekiq-middleware/core_ext"
require "sidekiq-middleware/unique_key"
require "sidekiq-middleware/server/unique_jobs"
require "sidekiq-middleware/client/unique_jobs"
require "sidekiq-middleware/middleware"
require 'digest/md5'
require 'sidekiq-middleware/version'
require 'sidekiq-middleware/core_ext'
require 'sidekiq-middleware/worker'
require 'sidekiq-middleware/server/unique_jobs'
require 'sidekiq-middleware/client/unique_jobs'
require 'sidekiq-middleware/middleware'
24 changes: 10 additions & 14 deletions lib/sidekiq-middleware/client/unique_jobs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,37 +2,33 @@ module Sidekiq
module Middleware
module Client
class UniqueJobs
HASH_KEY_EXPIRATION = 30 * 60

def call(worker_class, item, queue)
worker_class = worker_class.constantize if worker_class.is_a?(String)

enabled, expiration = worker_class.get_sidekiq_options['unique'],
(worker_class.get_sidekiq_options['expiration'] || HASH_KEY_EXPIRATION)
enabled = worker_class.unique_enabled?(item)

if enabled
unique, payload = false, item.clone.slice(*%w(class queue args at))
expiration = worker_class.unique_exiration
job_id = item['jid']
unique = false

# Enabled unique scheduled
if enabled == :all && payload.has_key?('at')
# Scheduled
if item.has_key?('at')
# Use expiration period as specified in configuration,
# but relative to job schedule time
expiration += (payload['at'].to_i - Time.now.to_i)
payload.delete('at')
expiration += (item['at'].to_i - Time.now.to_i)
end

payload_hash = Sidekiq::Middleware::UniqueKey.generate(worker_class, payload)
unique_key = worker_class.unique_digest(item)

Sidekiq.redis do |conn|
conn.watch(payload_hash)
conn.watch(unique_key)

locked_job_id = conn.get(payload_hash)
locked_job_id = conn.get(unique_key)
if locked_job_id && locked_job_id != job_id
conn.unwatch
else
unique = conn.multi do
conn.setex(payload_hash, expiration, job_id)
conn.setex(unique_key, expiration, job_id)
end
end
end
Expand Down
35 changes: 14 additions & 21 deletions lib/sidekiq-middleware/server/unique_jobs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,34 +2,27 @@ module Sidekiq
module Middleware
module Server
class UniqueJobs

def call(worker_instance, item, queue)
manual = worker_instance.class.get_sidekiq_options['manual']
worker_class = worker_instance.class
enabled = worker_class.unique_enabled?(item)

begin
if enabled
begin
yield
ensure
unless worker_class.unique_manual?
clear(worker_class, item)
end
end
else
yield
ensure
clear(worker_instance, item, queue) unless manual
end
end

def unique_lock_key(worker_instance, item, queue)
# Only enforce uniqueness across class, queue, args, and at.
# Useful when middleware uses the payload to store metadata.
enabled, payload = worker_instance.class.get_sidekiq_options['unique'],
item.clone.slice(*%w(class queue args at))

# Enabled unique scheduled
if enabled == :all && payload.has_key?('at')
payload.delete('at')
def clear(worker_class, item)
Sidekiq.redis do |conn|
conn.del worker_class.unique_digest(item)
end

Sidekiq::Middleware::UniqueKey.generate(worker_instance.class, payload)
end

def clear(worker_instance, item, queue)
enabled = worker_instance.class.get_sidekiq_options['unique']
Sidekiq.redis { |conn| conn.del unique_lock_key(worker_instance, item, queue) } if enabled
end
end
end
Expand Down
14 changes: 0 additions & 14 deletions lib/sidekiq-middleware/unique_key.rb

This file was deleted.

39 changes: 39 additions & 0 deletions lib/sidekiq-middleware/worker.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
module Sidekiq
module Middleware
module Worker
UNIQUE_EXPIRATION = 30 * 60 # 30 minutes

def unique_digest(item)
if respond_to?(:lock)
args = item['args']
lock(*args)
else
dumped = Sidekiq.dump_json(item.slice('class', 'queue', 'args'))
digest = Digest::MD5.hexdigest(dumped)

"locks:unique:#{digest}"
end
end

def unique_exiration
get_sidekiq_options['expiration'] || UNIQUE_EXPIRATION
end

def unique_enabled?(item)
enabled = get_sidekiq_options['unique']
if item.has_key?('at') && enabled != :all
enabled = false
end
enabled
end

def unique_manual?
get_sidekiq_options['manual']
end
end
end
end

Sidekiq::Worker::ClassMethods.class_eval do
include Sidekiq::Middleware::Worker
end
Loading

0 comments on commit 870f1bc

Please sign in to comment.