Skip to content
This repository has been archived by the owner on Apr 1, 2023. It is now read-only.

Commit

Permalink
Add functionality for cancelling scheduled Sidekiq jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
Robinson7D committed Jul 26, 2013
1 parent 8890464 commit 14fc1b6
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 0 deletions.
4 changes: 4 additions & 0 deletions lib/sidekiq-status.rb
Expand Up @@ -29,6 +29,10 @@ def status(job_id)
status.to_sym unless status.nil?
end

def cancel(job_id, job_unix_time = nil)
delete_and_unschedule(job_id, job_unix_time)
end

STATUS.each do |name|
class_eval(<<-END, __FILE__, __LINE__)
def #{name}?(job_id)
Expand Down
57 changes: 57 additions & 0 deletions lib/sidekiq-status/storage.rb
Expand Up @@ -27,6 +27,43 @@ def store_status(id, status, expiration = nil)
store_for_id id, {status: status}, expiration
end

# Unschedules the job and deletes the Status
# @param [String] id job id
# @param [Num] job_unix_time, unix timestamp for the scheduled job
def delete_and_unschedule(job_id, job_unix_time = nil)
Sidekiq.redis do |conn|
scheduled_jobs = conn.zrange "schedule", 0, -1, {withscores: true}
matching_index = scan_scheduled_jobs_for_jid scheduled_jobs, job_id, job_unix_time

job_found = matching_index > -1
if job_found
conn.zrem "schedule", scheduled_jobs[matching_index]
conn.del job_id
end
job_found
end
end

# Searches the schedule Array for the job_id
# @param [Array] scheduled_jobs, results of Redis schedule key
# @param [String] id job id
# @param [Num] job_unix_time, unix timestamp for the scheduled job
def scan_scheduled_jobs_for_jid(scheduled_jobs, job_id, job_unix_time = nil)
## schedule is an array ordered by a (float) unix timestamp for the posting time.
## Better would be to binary search on the time: # jobs_same_time = scheduled_jobs.bsearch {|x| x[1] == unix_time_scheduled }
## Unfortunately Ruby 2.0's bsearch won't help here because it does not return a range of elements (would only return first-matching), nor does it return an index.
## Instead we will scan through all elements until timestamp matches and check elements after:
scheduled_jobs.each_with_index do |schedule_listing, i|
checking_result = listing_matches_job(schedule_listing, job_id, job_unix_time)
if checking_result.nil?
return -1 # Is nil when we've exhaused potential candidates
elsif checking_result
return i
end
end
-1 # Not found
end

# Gets a single valued from job status hash
# @param [String] id job id
# @param [String] Symbol field fetched field name
Expand All @@ -45,4 +82,24 @@ def read_hash_for_id(id)
conn.hgetall id
end
end

private

# Searches the schedule Array for the job_id
# @param [Array] schedule_listing, a particular entry from the Redis schedule Array
# @param [String] id job id
# @param [Num] job_unix_time, unix timestamp for the scheduled job
def listing_matches_job(schedule_listing, job_id, job_unix_time = nil)
if(job_unix_time.nil? || schedule_listing[1] == job_unix_time)
# A Little skecthy, I know, but the structure of these internal JSON
# is predefined in such a way where this will not catch unintentional elements,
# and this is notably faster than performing JSON.parse() for every listing:
if schedule_listing[0].include?("\"jid\":\"#{job_id}")
return true
end
elsif(schedule_listing[1] > job_unix_time)
return nil #Not found. Can break (due to ordering)
end
false
end
end
40 changes: 40 additions & 0 deletions spec/lib/sidekiq-status_spec.rb
Expand Up @@ -5,6 +5,7 @@
let!(:redis) { Sidekiq.redis { |conn| conn } }
let!(:job_id) { SecureRandom.hex(12) }
let!(:job_id_1) { SecureRandom.hex(12) }
let!(:unused_id) { SecureRandom.hex(12) }

# Clean Redis before each test
# Seems like flushall has no effect on recently published messages,
Expand Down Expand Up @@ -77,6 +78,45 @@
end
end

describe ".cancel" do
it "cancels a job by id" do
SecureRandom.should_receive(:hex).twice.and_return(job_id, job_id_1)
start_server do
job = LongJob.perform_in(3600)
job.should == job_id
second_job = LongJob.perform_in(3600)
second_job.should == job_id_1

initial_schedule = redis.zrange "schedule", 0, -1, {withscores: true}
initial_schedule.size.should be 2
initial_schedule.select {|scheduled_job| JSON.parse(scheduled_job[0])["jid"] == job_id }.size.should be 1

Sidekiq::Status.cancel(job_id).should be_true
Sidekiq::Status.cancel(unused_id).should be_false # Unused, therefore unfound => false

remaining_schedule = redis.zrange "schedule", 0, -1, {withscores: true}
remaining_schedule.size.should == (initial_schedule.size - 1)
remaining_schedule.select {|scheduled_job| JSON.parse(scheduled_job[0])["jid"] == job_id }.size.should be 0
end
end

it "does not cancel a job with correct id but wrong time" do
SecureRandom.should_receive(:hex).once.and_return(job_id)
start_server do
scheduled_time = Time.now.to_i + 3600
returned_job_id = LongJob.perform_at(scheduled_time)
returned_job_id.should == job_id

initial_schedule = redis.zrange "schedule", 0, -1, {withscores: true}
initial_schedule.size.should == 1
Sidekiq::Status.cancel(returned_job_id, (scheduled_time + 1)).should be_false # wrong time, therefore unfound => false
(redis.zrange "schedule", 0, -1, {withscores: true}).size.should be 1
Sidekiq::Status.cancel(returned_job_id, (scheduled_time)).should be_true # same id, same time, deletes
(redis.zrange "schedule", 0, -1, {withscores: true}).size.should be_zero
end
end
end

context "keeps normal Sidekiq functionality" do
it "does jobs with and without included worker module" do
SecureRandom.should_receive(:hex).exactly(4).times.and_return(job_id, job_id, job_id_1, job_id_1)
Expand Down

0 comments on commit 14fc1b6

Please sign in to comment.