diff --git a/lib/sidekiq-status.rb b/lib/sidekiq-status.rb index 047f09b..8a6375b 100644 --- a/lib/sidekiq-status.rb +++ b/lib/sidekiq-status.rb @@ -29,6 +29,10 @@ def status(job_id) status.to_sym unless status.nil? end + def cancel(job_id, job_unix_time = nil) + delete_and_unschedule(job_id, job_unix_time) + end + STATUS.each do |name| class_eval(<<-END, __FILE__, __LINE__) def #{name}?(job_id) diff --git a/lib/sidekiq-status/storage.rb b/lib/sidekiq-status/storage.rb index 837ee78..78ec2f2 100644 --- a/lib/sidekiq-status/storage.rb +++ b/lib/sidekiq-status/storage.rb @@ -27,6 +27,43 @@ def store_status(id, status, expiration = nil) store_for_id id, {status: status}, expiration end + # Unschedules the job and deletes the Status + # @param [String] id job id + # @param [Num] job_unix_time, unix timestamp for the scheduled job + def delete_and_unschedule(job_id, job_unix_time = nil) + Sidekiq.redis do |conn| + scheduled_jobs = conn.zrange "schedule", 0, -1, {withscores: true} + matching_index = scan_scheduled_jobs_for_jid scheduled_jobs, job_id, job_unix_time + + job_found = matching_index > -1 + if job_found + conn.zrem "schedule", scheduled_jobs[matching_index] + conn.del job_id + end + job_found + end + end + + # Searches the schedule Array for the job_id + # @param [Array] scheduled_jobs, results of Redis schedule key + # @param [String] id job id + # @param [Num] job_unix_time, unix timestamp for the scheduled job + def scan_scheduled_jobs_for_jid(scheduled_jobs, job_id, job_unix_time = nil) + ## schedule is an array ordered by a (float) unix timestamp for the posting time. + ## Better would be to binary search on the time: # jobs_same_time = scheduled_jobs.bsearch {|x| x[1] == unix_time_scheduled } + ## Unfortunately Ruby 2.0's bsearch won't help here because it does not return a range of elements (would only return first-matching), nor does it return an index. + ## Instead we will scan through all elements until timestamp matches and check elements after: + scheduled_jobs.each_with_index do |schedule_listing, i| + checking_result = listing_matches_job(schedule_listing, job_id, job_unix_time) + if checking_result.nil? + return -1 # Is nil when we've exhaused potential candidates + elsif checking_result + return i + end + end + -1 # Not found + end + # Gets a single valued from job status hash # @param [String] id job id # @param [String] Symbol field fetched field name @@ -45,4 +82,24 @@ def read_hash_for_id(id) conn.hgetall id end end + + private + + # Searches the schedule Array for the job_id + # @param [Array] schedule_listing, a particular entry from the Redis schedule Array + # @param [String] id job id + # @param [Num] job_unix_time, unix timestamp for the scheduled job + def listing_matches_job(schedule_listing, job_id, job_unix_time = nil) + if(job_unix_time.nil? || schedule_listing[1] == job_unix_time) + # A Little skecthy, I know, but the structure of these internal JSON + # is predefined in such a way where this will not catch unintentional elements, + # and this is notably faster than performing JSON.parse() for every listing: + if schedule_listing[0].include?("\"jid\":\"#{job_id}") + return true + end + elsif(schedule_listing[1] > job_unix_time) + return nil #Not found. Can break (due to ordering) + end + false + end end diff --git a/spec/lib/sidekiq-status_spec.rb b/spec/lib/sidekiq-status_spec.rb index b383404..1e0a1ea 100644 --- a/spec/lib/sidekiq-status_spec.rb +++ b/spec/lib/sidekiq-status_spec.rb @@ -5,6 +5,7 @@ let!(:redis) { Sidekiq.redis { |conn| conn } } let!(:job_id) { SecureRandom.hex(12) } let!(:job_id_1) { SecureRandom.hex(12) } + let!(:unused_id) { SecureRandom.hex(12) } # Clean Redis before each test # Seems like flushall has no effect on recently published messages, @@ -77,6 +78,45 @@ end end + describe ".cancel" do + it "cancels a job by id" do + SecureRandom.should_receive(:hex).twice.and_return(job_id, job_id_1) + start_server do + job = LongJob.perform_in(3600) + job.should == job_id + second_job = LongJob.perform_in(3600) + second_job.should == job_id_1 + + initial_schedule = redis.zrange "schedule", 0, -1, {withscores: true} + initial_schedule.size.should be 2 + initial_schedule.select {|scheduled_job| JSON.parse(scheduled_job[0])["jid"] == job_id }.size.should be 1 + + Sidekiq::Status.cancel(job_id).should be_true + Sidekiq::Status.cancel(unused_id).should be_false # Unused, therefore unfound => false + + remaining_schedule = redis.zrange "schedule", 0, -1, {withscores: true} + remaining_schedule.size.should == (initial_schedule.size - 1) + remaining_schedule.select {|scheduled_job| JSON.parse(scheduled_job[0])["jid"] == job_id }.size.should be 0 + end + end + + it "does not cancel a job with correct id but wrong time" do + SecureRandom.should_receive(:hex).once.and_return(job_id) + start_server do + scheduled_time = Time.now.to_i + 3600 + returned_job_id = LongJob.perform_at(scheduled_time) + returned_job_id.should == job_id + + initial_schedule = redis.zrange "schedule", 0, -1, {withscores: true} + initial_schedule.size.should == 1 + Sidekiq::Status.cancel(returned_job_id, (scheduled_time + 1)).should be_false # wrong time, therefore unfound => false + (redis.zrange "schedule", 0, -1, {withscores: true}).size.should be 1 + Sidekiq::Status.cancel(returned_job_id, (scheduled_time)).should be_true # same id, same time, deletes + (redis.zrange "schedule", 0, -1, {withscores: true}).size.should be_zero + end + end + end + context "keeps normal Sidekiq functionality" do it "does jobs with and without included worker module" do SecureRandom.should_receive(:hex).exactly(4).times.and_return(job_id, job_id, job_id_1, job_id_1)