Skip to content

Loading…

Add ability to remove individual scheduled jobs [#248] #257

Closed
wants to merge 1 commit into from

7 participants

@jonhinson

No description provided.

@fred

That could be a very interesting feature..

@linjunpop

It could be very useful, when this PR will be merged?

@mperham mperham commented on the diff
lib/sidekiq/client.rb
@@ -49,7 +49,10 @@ def self.push(item)
payload = Sidekiq.dump_json(item)
Sidekiq.redis do |conn|
if item['at']
- pushed = (conn.zadd('schedule', item['at'].to_s, payload) == 1)
+ _, pushed = conn.multi do
+ conn.zadd('schedule', item['at'].to_s, item['at'].to_s)
+ conn.rpush("schedule:#{item['at']}", payload)
@mperham Owner
mperham added a note

I don't understand why the rpush is necessary.

When we want to remove the job, we would be unable to remove it based only on the score as there may be multiple jobs scheduled for the same time. Thus, we store the payload in the list with key "schedule:timestamp" and can remove it like so: removed = Sidekiq.redis { |conn| conn.lrem("schedule:#{timestamp}", 0, payload) }

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@mperham
Owner

I would prefer that scheduled jobs check at runtime to see if they should exit early or not.

Consider the case where you schedule an email to a user 5 days from now. 2 days from now, they go into their settings and select "don't send me email, ever". Should checking that box cause the application to go through the Sidekiq queue and remove any scheduled email jobs? I think the better design is to have the email job check if it has the correct permissions and settings when it executes.

@fred

also agree with that, I've changed my code to do the extra checking on the job itself.
removing from queue would be good, but not a required feature, you can always add that logic to your job class.

@jonhinson

@mperham Consider an application such as google calendar that has reminders for events. Say you set 3 reminders per event. If the event changes N times, then you'd have N * 3 "phantom" jobs that don't have permission to run. It seems a little more explicit and clear to simply remove the old jobs and create the new ones.

@mperham
Owner

Yeah, given that jobs don't have a unique identity, e.g. an 'id' property, we must resort to hacks to find the job to remove. On top of this, removing a job feels akin to cache invalidation: very easy to get wrong within your application code.

I'm sticking with my original thesis: jobs should check if they are still relevant/useful when run.

@mperham mperham closed this
@rmello

Now you seem to be using SecureRandom to generate an unique ID to each item. It should be easy to store the payload on a Hash linked to the jid. Is the issue still relevant?

@mperham
Owner

There is now an API for enumerating and removing jobs from the retry and named queues. See sidekiq/api

@rmello

What about the "schedule" queue? I guess it should have a special class (like the retry queue), right?

@mperham
Owner

@rmello Yeah, I realized that oversight last night. I'll fix it.

@rmello

It would be a great plus if one could pass the time range on the enumerator to allow enumerating only on a specific "time frame" scheduled jobs (avoiding the parse of all the scheduled JSON jobs - since afaik it will not be possible to directly access a job through its jid). Dont know if it will be performance relevant - Just a thought :)

@mperham
Owner

Sidekiq 2.5 includes the new API. I did not add the time range feature but would welcome a PR adding it.

@jhilden

Is there any documentation yet on how to use this new feature?

@mperham
Owner

Not yet. You can read through api.rb, there's some rdoc with basic examples.

@tmaier tmaier referenced this pull request in lardawge/carrierwave_backgrounder
Merged

Fail silently when record can't be found in workers. #123

@yanismydj

I just wanted to see if there was any progress on this perhaps

@mperham
Owner

@yanismydj Define this.

@yanismydj

Sorry I should have been more specific. I was curious about the status of documentation on this. I ended up looking through api.rb and figured it out. Thanks @mperham

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Jun 19, 2012
  1. @jonhinson
Showing with 132 additions and 16 deletions.
  1. +29 −1 lib/sidekiq/client.rb
  2. +25 −3 lib/sidekiq/scheduled.rb
  3. +19 −4 lib/sidekiq/web.rb
  4. +9 −0 test/test_client.rb
  5. +5 −2 test/test_retry.rb
  6. +42 −4 test/test_scheduling.rb
  7. +2 −1 test/test_web.rb
  8. +1 −1 web/views/scheduled.slim
View
30 lib/sidekiq/client.rb
@@ -49,7 +49,10 @@ def self.push(item)
payload = Sidekiq.dump_json(item)
Sidekiq.redis do |conn|
if item['at']
- pushed = (conn.zadd('schedule', item['at'].to_s, payload) == 1)
+ _, pushed = conn.multi do
+ conn.zadd('schedule', item['at'].to_s, item['at'].to_s)
+ conn.rpush("schedule:#{item['at']}", payload)
@mperham Owner
mperham added a note

I don't understand why the rpush is necessary.

When we want to remove the job, we would be unable to remove it based only on the score as there may be multiple jobs scheduled for the same time. Thus, we store the payload in the list with key "schedule:timestamp" and can remove it like so: removed = Sidekiq.redis { |conn| conn.lrem("schedule:#{timestamp}", 0, payload) }

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
+ end
else
_, pushed = conn.multi do
conn.sadd('queues', queue)
@@ -70,5 +73,30 @@ def self.push(item)
def self.enqueue(klass, *args)
klass.perform_async(*args)
end
+
+ def self.unschedule(timestamp, klass, *args)
+ timestamp = timestamp.to_f
+
+ item = {'class' => klass.to_s, 'args' => args, 'at' => timestamp}
+ item = klass.get_sidekiq_options.merge(item)
+ item['retry'] = !!item['retry']
+ payload = Sidekiq.dump_json(item)
+
+ removed = Sidekiq.redis { |conn| conn.lrem("schedule:#{timestamp}", 0, payload) }
+ remove_scheduled_queue(timestamp)
+ !! removed
+ end
+
+ def self.remove_scheduled_queue(timestamp)
+ key = "schedule:#{timestamp.to_f}"
+ Sidekiq.redis do |conn|
+ if 0 == conn.llen(key)
+ conn.multi do
+ conn.del(key)
+ conn.zrem('schedule', timestamp)
+ end
+ end
+ end
+ end
end
end
View
28 lib/sidekiq/scheduled.rb
@@ -27,9 +27,13 @@ def poll(first_time=false)
now = Time.now.to_f.to_s
Sidekiq.redis do |conn|
SETS.each do |sorted_set|
- (messages, _) = conn.multi do
- conn.zrangebyscore(sorted_set, '-inf', now)
- conn.zremrangebyscore(sorted_set, '-inf', now)
+ if sorted_set == 'schedule'
+ messages = scheduled_messages
+ else
+ (messages, _) = conn.multi do
+ conn.zrangebyscore(sorted_set, '-inf', now)
+ conn.zremrangebyscore(sorted_set, '-inf', now)
+ end
end
messages.each do |message|
@@ -55,6 +59,24 @@ def add_jitter
end
end
+ def scheduled_messages
+ [].tap do |messages|
+ while timestamp = find_next_timestamp
+ while message = Sidekiq.redis { |conn| conn.lpop("schedule:#{timestamp}") }
+ messages << message
+ end
+ Sidekiq::Client.remove_scheduled_queue(timestamp)
+ end
+ end
+ end
+
+ def find_next_timestamp
+ timestamp = Sidekiq.redis { |conn| conn.zrangebyscore('schedule', '-inf', Time.now.to_f, :limit => [0, 1]) }
+ if timestamp.is_a?(Array)
+ timestamp = timestamp.first
+ end
+ end
+
end
end
end
View
23 lib/sidekiq/web.rb
@@ -77,10 +77,20 @@ def scheduled(count=50)
end
def zcontents(name, count)
+ jobs = []
Sidekiq.redis do |conn|
results = conn.zrange(name, 0, count, :withscores => true)
- results.map { |msg, score| [Sidekiq.load_json(msg), score] }
+ if name == 'retry'
+ results.each { |msg, score| jobs << [Sidekiq.load_json(msg), score] }
+ else
+ results.each do |msg, score|
+ conn.lrange("schedule:#{score}", 0, count).each do |job|
+ jobs << [Sidekiq.load_json(job), score]
+ end
+ end
+ end
end
+ jobs
end
def queues
@@ -176,8 +186,7 @@ def display_args(args, count=100)
halt 404 unless params[:score]
halt 404 unless params['delete']
params[:score].each do |score|
- s = score.to_f
- process_score('schedule', s, :delete)
+ process_score('schedule', score, :delete)
end
redirect root_path
end
@@ -220,7 +229,13 @@ def process_score(set, score, operation)
end
when :delete
Sidekiq.redis do |conn|
- conn.zremrangebyscore(set, score, score)
+ if set == 'retry'
+ conn.zremrangebyscore(set, score, score)
+ else
+ score, msg = score.split("_")
+ msg = Sidekiq.load_json(msg)
+ Sidekiq::Client.unschedule(score, msg['class'].constantize, *msg['args'])
+ end
end
end
end
View
9 test/test_client.rb
@@ -115,5 +115,14 @@ class QueuedWorker
@redis.expect :smembers, ['bob'], ['workers']
assert_equal ['bob'], Sidekiq::Client.registered_workers
end
+
+ it 'removes scheduled jobs' do
+ @redis.expect :lrem, 1, ['schedule:123.0', 0, String]
+ @redis.expect :llen, 0, ['schedule:123.0']
+ @redis.expect :zrem, 1, ['schedule', 123.0]
+ removed = Sidekiq::Client.unschedule(123, MyWorker, 1, 2)
+ assert removed
+ @redis.verify
+ end
end
end
View
7 test/test_retry.rb
@@ -112,11 +112,14 @@ def @redis.with; yield self; end
it 'should poll like a bad mother...SHUT YO MOUTH' do
fake_msg = Sidekiq.dump_json({ 'class' => 'Bob', 'args' => [1,2], 'queue' => 'someq' })
@redis.expect :multi, [[fake_msg], 1], []
- @redis.expect :multi, [[], nil], []
+ timestamp = Time.now
+ @redis.expect :zrangebyscore, [], ['schedule', '-inf', timestamp.to_f, { :limit => [0, 1] }]
@redis.expect :rpush, 1, ['queue:someq', fake_msg]
inst = Sidekiq::Scheduled::Poller.new
- inst.poll
+ Time.stub :now, timestamp do
+ inst.poll
+ end
@redis.verify
end
View
46 test/test_scheduling.rb
@@ -9,6 +9,7 @@ class TestScheduling < MiniTest::Unit::TestCase
Sidekiq.instance_variable_set(:@redis, @redis)
def @redis.with; yield self; end
+ def @redis.multi; [yield] * 2 if block_given?; end
end
class ScheduledWorker
@@ -18,16 +19,53 @@ def perform(x)
end
it 'schedules a job via interval' do
- @redis.expect :zadd, 1, ['schedule', String, String]
- assert_equal true, ScheduledWorker.perform_in(600, 'mike')
+ timestamp = Time.now.to_f
+ Time.stub :now, timestamp do
+ @redis.expect :zadd, 1, ['schedule', String, String]
+ @redis.expect :rpush, 1, ["schedule:#{(timestamp + 600)}", String]
+ assert_equal true, ScheduledWorker.perform_in(600, 'mike')
+ end
@redis.verify
end
it 'schedules a job via timestamp' do
- @redis.expect :zadd, 1, ['schedule', String, String]
- assert_equal true, ScheduledWorker.perform_in(5.days.from_now, 'mike')
+ Time.stub :now, Time.now do
+ @redis.expect :zadd, 1, ['schedule', String, String]
+ @redis.expect :rpush, 1, ["schedule:#{(5.days.from_now.to_f)}", String]
+ assert_equal true, ScheduledWorker.perform_in(5.days.from_now, 'mike')
+ end
@redis.verify
end
end
+ describe 'poller' do
+ before do
+ @redis = MiniTest::Mock.new
+ Sidekiq.instance_variable_set(:@redis, @redis)
+
+ def @redis.with; yield self; end
+ end
+
+ it 'should poll like a bad mother...SHUT YO MOUTH' do
+ fake_msg = Sidekiq.dump_json({ 'class' => 'Bob', 'args' => [1,2], 'queue' => 'someq' })
+ @redis.expect :multi, [[], nil], []
+ timestamp = Time.now
+ @redis.expect :zrangebyscore, [123], ['schedule', '-inf', timestamp.to_f, { :limit => [0, 1] }]
+ @redis.expect :zrangebyscore, [], ['schedule', '-inf', timestamp.to_f, { :limit => [0, 1] }]
+ @redis.expect :lpop, fake_msg, ['schedule:123']
+ @redis.expect :lpop, nil, ['schedule:123']
+
+ @redis.expect :multi, [[], nil], []
+ @redis.expect :llen, 0, ['schedule:123.0']
+
+ @redis.expect :rpush, 1, ['queue:someq', fake_msg]
+
+ inst = Sidekiq::Scheduled::Poller.new
+ Time.stub :now, timestamp do
+ inst.poll(false)
+ end
+
+ @redis.verify
+ end
+ end
end
View
3 test/test_web.rb
@@ -135,7 +135,8 @@ def add_scheduled
'at' => Time.now.to_f }
score = Time.now.to_f
Sidekiq.redis do |conn|
- conn.zadd('schedule', score, Sidekiq.dump_json(msg))
+ conn.zadd('schedule', score, score)
+ conn.rpush("schedule:#{score}", Sidekiq.dump_json(msg))
end
[msg, score]
end
View
2 web/views/scheduled.slim
@@ -13,7 +13,7 @@ h1 Scheduled Jobs
- @scheduled.each do |(msg, score)|
tr
td
- input type='checkbox' name='score[]' value='#{score}'
+ input type='checkbox' name='score[]' value='#{score}_#{Sidekiq.dump_json(msg)}'
td== relative_time(Time.at(score))
td
a href="#{root_path}queues/#{msg['queue']}" #{msg['queue']}
Something went wrong with that request. Please try again.