Permalink
Browse files

Only digest 'class', 'queue', 'args', and 'at' keys for the payload.

  • Loading branch information...
1 parent 3df69f8 commit e9aa277a5028c34ff0f9eb21920d0eaeaa947529 @bnorton bnorton committed Oct 13, 2012
@@ -5,23 +5,19 @@ class UniqueJobs
HASH_KEY_EXPIRATION = 30 * 60
def call(worker_class, item, queue)
- enabled = worker_class.get_sidekiq_options['unique']
+ enabled, expiration = worker_class.get_sidekiq_options['unique'],
+ (worker_class.get_sidekiq_options['expiration'] || HASH_KEY_EXPIRATION)
if enabled
- unique = false
+ unique, payload = false, item.clone.slice(%w(class queue args at))
# Enabled unique scheduled
- if enabled == :all && item.has_key?('at')
- expiration = worker_class.get_sidekiq_options['expiration'] || (item['at'].to_i - Time.new.to_i)
- payload = item.clone
+ if enabled == :all && payload.has_key?('at')
+ expiration = (payload['at'].to_i - Time.now.to_i)
payload.delete('at')
- payload.delete('jid')
- else
- expiration = worker_class.get_sidekiq_options['expiration'] || HASH_KEY_EXPIRATION
- payload = item.clone
- payload.delete('jid')
end
- payload_hash = Digest::MD5.hexdigest(Sidekiq.dump_json(Hash[payload.sort]))
+
+ payload_hash = Digest::MD5.hexdigest(Sidekiq.dump_json(payload))
Sidekiq.redis do |conn|
conn.watch(payload_hash)
@@ -40,7 +36,6 @@ def call(worker_class, item, queue)
yield
end
end
-
end
end
end
@@ -0,0 +1,11 @@
+class Hash
+ def slice(*items)
+ items = items.to_a.flatten
+
+ {}.tap do |hash|
+ items.each do |item|
+ hash[item] = self[item] if self[item]
+ end
+ end
+ end
+end
@@ -18,22 +18,18 @@ def call(worker_instance, item, queue)
end
def clear(worker_instance, item, queue)
- enabled = worker_instance.class.get_sidekiq_options['unique']
+ # Only enforce uniqueness across class, queue, args, and at.
+ # Useful when middleware uses the payload to store metadata.
+ enabled, payload = worker_instance.class.get_sidekiq_options['unique'],
+ item.clone.slice(%w(class queue args at))
# Enabled unique scheduled
- if enabled == :all && item.has_key?('at')
- payload = item.clone
+ if enabled == :all && payload.has_key?('at')
payload.delete('at')
- payload.delete('jid')
- else
- payload = item.clone
- payload.delete('jid')
end
- payload_hash = Digest::MD5.hexdigest(Sidekiq.dump_json(Hash[payload.sort]))
-
- Sidekiq.redis { |conn| conn.del(payload_hash) }
- end
+ Sidekiq.redis { |conn| conn.del Digest::MD5.hexdigest(Sidekiq.dump_json(payload)) }
+ end
end
end
end
View
@@ -8,6 +8,9 @@
require 'minitest/pride'
require 'minitest/autorun'
+require 'celluloid'
+Celluloid.logger = nil
+
require 'sidekiq'
require 'sidekiq/util'
require 'sidekiq-middleware'
View
@@ -0,0 +1,28 @@
+require 'helper'
+require 'sidekiq-middleware/core_ext'
+
+class TestCoreExt < MiniTest::Unit::TestCase
+ describe 'for an empty array' do
+ it 'should be an ampty hash' do
+ assert_equal({}, {:foo => "bar"}.slice([]))
+ end
+ end
+
+ describe 'for items not in the hash' do
+ it 'should be an empty hash' do
+ assert_equal({}, {:foo => "bar", :foobar => "baz"}.slice(:baz, :foobaz))
+ end
+ end
+
+ describe 'for items in the hash' do
+ it 'should be the attributes' do
+ assert_equal({:foo => "bar"}, {:foo => "bar", :foobar => "baz"}.slice(:foo))
+ end
+ end
+
+ describe 'when all items are in the hash' do
+ it 'should be the hash' do
+ assert_equal({:foo => "bar", :foobar => "baz"}, {:foo => "bar", :foobar => "baz"}.slice(:foo, :foobar))
+ end
+ end
+end
View
@@ -10,7 +10,6 @@ class TestUniqueJobs < MiniTest::Unit::TestCase
before do
@boss = MiniTest::Mock.new
@processor = ::Sidekiq::Processor.new(@boss)
- Celluloid.logger = nil
Sidekiq.redis = REDIS
Sidekiq.redis {|c| c.flushdb }
@@ -29,6 +28,11 @@ def perform(x)
assert_equal 1, Sidekiq.redis { |c| c.llen('queue:unique_queue') }
end
+ it 'discards non critical information about the message' do
+ 5.times { Sidekiq::Client.push('class' => UniqueWorker, 'args' => ['critical'], 'sent_at' => Time.now.to_f, 'non' => 'critical') }
+ assert_equal 1, Sidekiq.redis { |c| c.llen('queue:unique_queue') }
+ end
+
class NotUniqueWorker
include Sidekiq::Worker
sidekiq_options queue: :not_unique_queue, unique: false
@@ -53,17 +57,27 @@ def perform(x)
it 'does not duplicate scheduled messages with enabled unique option' do
5.times { |t| UniqueScheduledWorker.perform_in((t+1)*60, 'args') }
- assert_equal 1, Sidekiq.redis { |c| c.zrangebyscore('schedule', '-inf', '+inf').length }
+ assert_equal 1, Sidekiq.redis { |c| c.zcard('schedule') }
end
- it 'once schedules job in future with enabled forever option' do
+ it 'allows the job to reschedule itself with enabled forever option' do
5.times {
- msg = Sidekiq.dump_json({ 'class' => UniqueScheduledWorker.to_s, 'args' => ['forever'] })
+ msg = Sidekiq.dump_json('class' => UniqueScheduledWorker.to_s, 'args' => ['something'])
+ @boss.expect(:processor_done!, nil, [@processor])
+ @processor.process(msg, 'default')
+ @boss.verify
+ }
+ assert_equal 1, Sidekiq.redis { |c| c.zcard('schedule') }
+ end
+
+ it 'discards non critical information about the message' do
+ 5.times {|i|
+ msg = Sidekiq.dump_json('class' => UniqueScheduledWorker.to_s, 'args' => ['something'], 'sent_at' => (Time.now + i*60).to_f)
@boss.expect(:processor_done!, nil, [@processor])
@processor.process(msg, 'default')
@boss.verify
}
- assert_equal 1, Sidekiq.redis { |c| c.zrangebyscore('schedule', '-inf', '+inf').length }
+ assert_equal 1, Sidekiq.redis { |c| c.zcard('schedule') }
end
end
-end
+end

0 comments on commit e9aa277

Please sign in to comment.