Skip to content

Commit

Permalink
Splitting the resque extension's methods up a bit
Browse files Browse the repository at this point in the history
  • Loading branch information
meatballhat committed Feb 18, 2014
1 parent b09d0a2 commit 0aba925
Show file tree
Hide file tree
Showing 4 changed files with 426 additions and 398 deletions.
1 change: 1 addition & 0 deletions lib/resque/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ module Resque
module Scheduler
autoload :Cli, 'resque/scheduler/cli'
autoload :Extension, 'resque/scheduler/extension'
autoload :Util, 'resque/scheduler/util'

private

Expand Down
270 changes: 270 additions & 0 deletions lib/resque/scheduler/delaying_extensions.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,270 @@
# vim:fileencoding=utf-8
require 'resque'
require_relative 'plugin'
require_relative '../scheduler'

module Resque
module Scheduler
module DelayingExtensions
# This method is nearly identical to +enqueue+ only it also
# takes a timestamp which will be used to schedule the job
# for queueing. Until timestamp is in the past, the job will
# sit in the schedule list.
def enqueue_at(timestamp, klass, *args)
validate(klass)
enqueue_at_with_queue(
queue_from_class(klass), timestamp, klass, *args
)
end

# Identical to +enqueue_at+, except you can also specify
# a queue in which the job will be placed after the
# timestamp has passed. It respects Resque.inline option, by
# creating the job right away instead of adding to the queue.
def enqueue_at_with_queue(queue, timestamp, klass, *args)
return false unless plugin.run_before_schedule_hooks(klass, *args)

if Resque.inline? || timestamp.to_i < Time.now.to_i
# Just create the job and let resque perform it right away with
# inline. If the class is a custom job class, call self#scheduled
# on it. This allows you to do things like
# Resque.enqueue_at(timestamp, CustomJobClass, :opt1 => val1).
# Otherwise, pass off to Resque.
if klass.respond_to?(:scheduled)
klass.scheduled(queue, klass.to_s, *args)
else
Resque::Job.create(queue, klass, *args)
end
else
delayed_push(timestamp, job_to_hash_with_queue(queue, klass, args))
end

plugin.run_after_schedule_hooks(klass, *args)
end

# Identical to enqueue_at but takes number_of_seconds_from_now
# instead of a timestamp.
def enqueue_in(number_of_seconds_from_now, klass, *args)
enqueue_at(Time.now + number_of_seconds_from_now, klass, *args)
end

# Identical to +enqueue_in+, except you can also specify
# a queue in which the job will be placed after the
# number of seconds has passed.
def enqueue_in_with_queue(queue, number_of_seconds_from_now,
klass, *args)
enqueue_at_with_queue(queue, Time.now + number_of_seconds_from_now,
klass, *args)
end

# Used internally to stuff the item into the schedule sorted list.
# +timestamp+ can be either in seconds or a datetime object Insertion
# if O(log(n)). Returns true if it's the first job to be scheduled at
# that time, else false
def delayed_push(timestamp, item)
# First add this item to the list for this timestamp
redis.rpush("delayed:#{timestamp.to_i}", encode(item))

# Store the timestamps at with this item occurs
redis.sadd("timestamps:#{encode(item)}", "delayed:#{timestamp.to_i}")

# Now, add this timestamp to the zsets. The score and the value are
# the same since we'll be querying by timestamp, and we don't have
# anything else to store.
redis.zadd :delayed_queue_schedule, timestamp.to_i, timestamp.to_i
end

# Returns an array of timestamps based on start and count
def delayed_queue_peek(start, count)
result = redis.zrange(:delayed_queue_schedule, start,
start + count - 1)
Array(result).map(&:to_i)
end

# Returns the size of the delayed queue schedule
def delayed_queue_schedule_size
redis.zcard :delayed_queue_schedule
end

# Returns the number of jobs for a given timestamp in the delayed queue
# schedule
def delayed_timestamp_size(timestamp)
redis.llen("delayed:#{timestamp.to_i}").to_i
end

# Returns an array of delayed items for the given timestamp
def delayed_timestamp_peek(timestamp, start, count)
if 1 == count
r = list_range "delayed:#{timestamp.to_i}", start, count
r.nil? ? [] : [r]
else
list_range "delayed:#{timestamp.to_i}", start, count
end
end

# Returns the next delayed queue timestamp
# (don't call directly)
def next_delayed_timestamp(at_time = nil)
items = redis.zrangebyscore(
:delayed_queue_schedule, '-inf', (at_time || Time.now).to_i,
limit: [0, 1]
)
timestamp = items.nil? ? nil : Array(items).first
timestamp.to_i unless timestamp.nil?
end

# Returns the next item to be processed for a given timestamp, nil if
# done. (don't call directly)
# +timestamp+ can either be in seconds or a datetime
def next_item_for_timestamp(timestamp)
key = "delayed:#{timestamp.to_i}"

encoded_item = redis.lpop(key)
redis.srem("timestamps:#{encoded_item}", key)
item = decode(encoded_item)

# If the list is empty, remove it.
clean_up_timestamp(key, timestamp)
item
end

# Clears all jobs created with enqueue_at or enqueue_in
def reset_delayed_queue
Array(redis.zrange(:delayed_queue_schedule, 0, -1)).each do |item|
key = "delayed:#{item}"
items = redis.lrange(key, 0, -1)
redis.pipelined do
items.each { |ts_item| redis.del("timestamps:#{ts_item}") }
end
redis.del key
end

redis.del :delayed_queue_schedule
end

# Given an encoded item, remove it from the delayed_queue
def remove_delayed(klass, *args)
search = encode(job_to_hash(klass, args))
timestamps = redis.smembers("timestamps:#{search}")

replies = redis.pipelined do
timestamps.each do |key|
redis.lrem(key, 0, search)
redis.srem("timestamps:#{search}", key)
end
end

return 0 if replies.nil? || replies.empty?
replies.each_slice(2).map(&:first).inject(:+)
end

# Given an encoded item, enqueue it now
def enqueue_delayed(klass, *args)
hash = job_to_hash(klass, args)
remove_delayed(klass, *args).times do
Resque::Scheduler.enqueue_from_config(hash)
end
end

# Given a block, remove jobs that return true from a block
#
# This allows for removal of delayed jobs that have arguments matching
# certain criteria
def remove_delayed_selection
fail ArgumentError, 'Please supply a block' unless block_given?

destroyed = 0
# There is no way to search Redis list entries for a partial match,
# so we query for all delayed job tasks and do our matching after
# decoding the payload data
jobs = Resque.redis.keys('delayed:*')
jobs.each do |job|
index = Resque.redis.llen(job) - 1
while index >= 0
payload = Resque.redis.lindex(job, index)
decoded_payload = decode(payload)
if yield(decoded_payload['args'])
removed = redis.lrem job, 0, payload
destroyed += removed
index -= removed
else
index -= 1
end
end
end
destroyed
end

# Given a timestamp and job (klass + args) it removes all instances and
# returns the count of jobs removed.
#
# O(N) where N is the number of jobs scheduled to fire at the given
# timestamp
def remove_delayed_job_from_timestamp(timestamp, klass, *args)
key = "delayed:#{timestamp.to_i}"
encoded_job = encode(job_to_hash(klass, args))

redis.srem("timestamps:#{encoded_job}", key)
count = redis.lrem(key, 0, encoded_job)
clean_up_timestamp(key, timestamp)

count
end

def count_all_scheduled_jobs
total_jobs = 0
Array(redis.zrange(:delayed_queue_schedule, 0, -1)).each do |ts|
total_jobs += redis.llen("delayed:#{ts}").to_i
end
total_jobs
end

# Discover if a job has been delayed.
# Examples
# Resque.delayed?(MyJob)
# Resque.delayed?(MyJob, id: 1)
# Returns true if the job has been delayed
def delayed?(klass, *args)
!scheduled_at(klass, *args).empty?
end

# Returns delayed jobs schedule timestamp for +klass+, +args+.
def scheduled_at(klass, *args)
search = encode(job_to_hash(klass, args))
redis.smembers("timestamps:#{search}").map do |key|
key.tr('delayed:', '').to_i
end
end

private

def job_to_hash(klass, args)
{ class: klass.to_s, args: args, queue: queue_from_class(klass) }
end

def job_to_hash_with_queue(queue, klass, args)
{ class: klass.to_s, args: args, queue: queue }
end

def clean_up_timestamp(key, timestamp)
# If the list is empty, remove it.

# Use a watch here to ensure nobody adds jobs to this delayed
# queue while we're removing it.
redis.watch key
if 0 == redis.llen(key).to_i
redis.multi do
redis.del key
redis.zrem :delayed_queue_schedule, timestamp.to_i
end
else
redis.unwatch
end
end

def plugin
Resque::Scheduler::Plugin
end
end
end
end
Loading

0 comments on commit 0aba925

Please sign in to comment.