Skip to content

Commit

Permalink
Merge pull request justinweiss#16 from autohaus24/master
Browse files Browse the repository at this point in the history
Add more redis methods (peek, list_range), implemented encode/decode redis simulation
  • Loading branch information
justinweiss committed Jun 28, 2011
2 parents 1ee3a47 + 4800271 commit 2372464
Show file tree
Hide file tree
Showing 8 changed files with 257 additions and 89 deletions.
16 changes: 5 additions & 11 deletions lib/resque_unit/assertions.rb
Expand Up @@ -21,9 +21,9 @@ def assert_not_queued(klass = nil, args = nil, message = nil, &block)
queue = if block_given?
snapshot = Resque.size(queue_name)
yield
Resque.queue(queue_name)[snapshot..-1]
Resque.all(queue_name)[snapshot..-1]
else
Resque.queue(queue_name)
Resque.all(queue_name)
end

assert_with_custom_message(!in_queue?(queue, klass, args),
Expand All @@ -43,9 +43,9 @@ def assert_job_created(queue_name, klass, args = nil, message = nil, &block)
queue = if block_given?
snapshot = Resque.size(queue_name)
yield
Resque.queue(queue_name)[snapshot..-1]
Resque.all(queue_name)[snapshot..-1]
else
Resque.queue(queue_name)
Resque.all(queue_name)
end

assert_with_custom_message(in_queue?(queue, klass, args),
Expand Down Expand Up @@ -74,13 +74,7 @@ def in_queue?(queue, klass, args = nil)
end

def matching_jobs(queue, klass, args = nil)
queue = Resque.queue(queue) if queue.is_a? Symbol
if args # retrieve the elements that match klass and args in the queue
args = Resque.normalized_args(args)
queue.select {|e| e[:klass] == klass && e[:args] == args}
else # if no args were passed, retrieve all queued jobs that match klass
queue.select {|e| e[:klass] == klass}
end
queue.select {|e| e["class"] == klass.to_s && (!args || e["args"] == Resque.decode(Resque.encode(args)))}
end

end
6 changes: 6 additions & 0 deletions lib/resque_unit/helpers.rb
Expand Up @@ -3,6 +3,9 @@ module Helpers
# Given a Ruby object, returns a string suitable for storage in a
# queue.
def encode(object)
if object.is_a?(Array)
return object.map{|o| encode(o) }
end
if defined? Yajl
Yajl::Encoder.encode(object)
else
Expand All @@ -13,6 +16,9 @@ def encode(object)
# Given a string, returns a Ruby object.
def decode(object)
return unless object
if object.is_a?(Array)
return object.map{|o| decode(o) }
end

if defined? Yajl
begin
Expand Down
90 changes: 60 additions & 30 deletions lib/resque_unit/resque.rb
Expand Up @@ -24,13 +24,43 @@ def self.queues
end

# Returns an array of all the jobs that have been queued. Each
# element is of the form +{:klass => klass, :args => args}+ where
# element is of the form +{"class" => klass, "args" => args}+ where
# +klass+ is the job's class and +args+ is an array of the arguments
# passed to the job.
def queue(queue_name)
queues[queue_name]
end

# Return an array of all jobs' payloads for queue
# Elements are decoded
def all(queue_name)
result = list_range(queue_name, 0, size(queue_name))
result.is_a?(Array) ? result : [ result ]
end

# Returns an array of jobs' payloads for queue.
#
# start and count should be integer and can be used for pagination.
# start is the item to begin, count is how many items to return.
#
# To get the 3rd page of a 30 item, paginatied list one would use:
# Resque.peek('my_list', 59, 30)
def peek(queue_name, start = 0, count = 1)
list_range(queue_name, start, count)
end

# Gets a range of jobs' payloads from queue.
# Returns single element if count equal 1
# Elements are decoded
def list_range(key, start = 0, count = 1)
data = if count == 1
queues[key][start]
else
queues[key][start...start + count] || []
end
decode(data)
end

# Yes, all Resque hooks!
def enable_hooks!
@hooks_enabled = true
Expand All @@ -45,20 +75,22 @@ def run!
old_queue = @queue.dup
self.reset!

old_queue.each do |k, v|
while job = v.shift
@hooks_enabled ? perform_with_hooks(job) : perform_without_hooks(job)

old_queue.each do |queue_name, queue|
decode(queue).each do |job_payload|
@hooks_enabled ? perform_with_hooks(job_payload) : perform_without_hooks(job_payload)
end
end
end

# Executes all jobs in the given queue in an undefined order.
def run_for!(queue_name)
jobs = self.queue(queue_name)
jobs_payloads = all(queue_name)

self.reset!(queue_name)

while job = jobs.shift
@hooks_enabled ? perform_with_hooks(job) : perform_without_hooks(job)
jobs_payloads.each do |job_payload|
@hooks_enabled ? perform_with_hooks(job_payload) : perform_without_hooks(job_payload)
end
end

Expand All @@ -83,11 +115,7 @@ def enqueue(klass, *args)
queue_name = queue_for(klass)
# Behaves like Resque, raise if no queue was specifed
raise NoQueueError.new("Jobs must be placed onto a queue.") unless queue_name
enqueue_unit(queue_name, {:klass => klass, :args => normalized_args(args) })
end

def normalized_args(args)
decode(encode(args))
enqueue_unit(queue_name, {"class" => klass.name, "args" => args })
end

# :nodoc:
Expand All @@ -103,54 +131,56 @@ def empty_queues?
end

def enqueue_unit(queue_name, hash)
queue(queue_name) << hash
klass = constantize(hash["class"])
queue(queue_name) << encode(hash)
if @hooks_enabled
Plugin.after_enqueue_hooks(hash[:klass]).each do |hook|
hash[:klass].send(hook, *hash[:args])
Plugin.after_enqueue_hooks(klass).each do |hook|
klass.send(hook, *hash["args"])
end
end
queue(queue_name).size
end

# Call perform on the job class
def perform_without_hooks(job)
job[:klass].perform(*job[:args])
def perform_without_hooks(job_payload)
constantize(job_payload["class"]).perform(*job_payload["args"])
end

# Call perform on the job class, and adds support for Resque hooks.
def perform_with_hooks(job)
before_hooks = Resque::Plugin.before_hooks(job[:klass])
around_hooks = Resque::Plugin.around_hooks(job[:klass])
after_hooks = Resque::Plugin.after_hooks(job[:klass])
failure_hooks = Resque::Plugin.failure_hooks(job[:klass])
def perform_with_hooks(job_payload)
job_class = constantize(job_payload["class"])
before_hooks = Resque::Plugin.before_hooks(job_class)
around_hooks = Resque::Plugin.around_hooks(job_class)
after_hooks = Resque::Plugin.after_hooks(job_class)
failure_hooks = Resque::Plugin.failure_hooks(job_class)

begin
# Execute before_perform hook. Abort the job gracefully if
# Resque::DontPerform is raised.
begin
before_hooks.each do |hook|
job[:klass].send(hook, *job[:args])
job_class.send(hook, *job_payload["args"])
end
rescue Resque::Job::DontPerform
return false
end

# Execute the job. Do it in an around_perform hook if available.
if around_hooks.empty?
perform_without_hooks(job)
perform_without_hooks(job_payload)
job_was_performed = true
else
# We want to nest all around_perform plugins, with the last one
# finally calling perform
stack = around_hooks.reverse.inject(nil) do |last_hook, hook|
if last_hook
lambda do
job[:klass].send(hook, *job[:args]) { last_hook.call }
job_class.send(hook, *job_payload["args"]) { last_hook.call }
end
else
lambda do
job[:klass].send(hook, *job[:args]) do
result = perform_without_hooks(job)
job_class.send(hook, *job_payload["args"]) do
result = perform_without_hooks(job_payload)
job_was_performed = true
result
end
Expand All @@ -162,7 +192,7 @@ def perform_with_hooks(job)

# Execute after_perform hook
after_hooks.each do |hook|
job[:klass].send(hook, *job[:args])
job_class.send(hook, *job_payload["args"])
end

# Return true if the job was performed
Expand All @@ -171,15 +201,15 @@ def perform_with_hooks(job)
# If an exception occurs during the job execution, look for an
# on_failure hook then re-raise.
rescue => e
failure_hooks.each { |hook| job[:klass].send(hook, e, *job[:args]) }
failure_hooks.each { |hook| job_class.send(hook, e, *job_payload["args"]) }
raise e
end
end

class Job
extend Helpers
def self.create(queue, klass_name, *args)
Resque.enqueue_unit(queue, {:klass => constantize(klass_name), :args => decode(encode(args))})
Resque.enqueue_unit(queue, {"class" => constantize(klass_name), "args" => args})
end
end

Expand Down
13 changes: 5 additions & 8 deletions lib/resque_unit/scheduler.rb
Expand Up @@ -19,17 +19,14 @@ def enqueue_in(number_of_seconds_from_now, klass, *args)
end

def enqueue_with_timestamp(timestamp, klass, *args)
enqueue_unit(queue_for(klass), {:klass => klass, :args => decode(encode(args)), :timestamp => timestamp})
enqueue_unit(queue_for(klass), {"class" => klass, "args" => args, "timestamp" => timestamp})
end

def remove_delayed(klass, *args)
queue = Resque.queue(queue_for(klass))
if args # retrieve the elements that match klass and args in the queue
args = Resque.normalized_args(args)
queue.delete_if { |e| e[:klass] == klass && e[:args] == args }
else # if no args were passed, retrieve all queued jobs that match klass
queue.delete_if {|e| e[:klass] == klass}
end
# points to real queue
encoded_job_payloads = Resque.queue(queue_for(klass))
args ||= []
encoded_job_payloads.delete_if { |e| e = Resque.decode(e); e["class"] == klass.to_s && e["args"] == args }
end
end

Expand Down
4 changes: 3 additions & 1 deletion lib/resque_unit/scheduler_assertions.rb
@@ -1,5 +1,7 @@
# These are a group of assertions you can use in your unit tests to
# verify that your code is using resque-scheduler correctly.
require 'time'

module ResqueUnit::SchedulerAssertions

# Asserts that +klass+ has been queued into its appropriate queue at
Expand Down Expand Up @@ -41,7 +43,7 @@ def assert_not_queued_in(expected_time_difference, klass, args = nil, message =
def in_timestamped_queue?(queue_name, expected_timestamp, klass, args = nil)
# check if we have any matching jobs with a timestamp less than
# expected_timestamp
!matching_jobs(Resque.queue(queue_name), klass, args).select {|e| e[:timestamp] && e[:timestamp] <= expected_timestamp}.empty?
!matching_jobs(Resque.all(queue_name), klass, args).select {|e| e["timestamp"] && Time.parse(e["timestamp"]) <= expected_timestamp}.empty?
end

end
61 changes: 61 additions & 0 deletions test/resque_test.rb
@@ -0,0 +1,61 @@
require 'test_helper'

class ResqueTest < Test::Unit::TestCase

def setup
Resque.reset!
end

context "with one queued job" do
setup do
Resque.enqueue(MediumPriorityJob, "some args")
@job_payload = {"args"=>["some args"], "class"=>"MediumPriorityJob"}
end

should "return job payload when peek method called with count equal 1" do
assert_equal @job_payload, Resque.peek(MediumPriorityJob.queue, 0, 1)
end

should "return array of jobs' payloads when peek method called with count different than 1" do
assert_equal [@job_payload], Resque.peek(MediumPriorityJob.queue, 0, 5)
end
end

context "with few queued jobs" do
setup do
Resque.enqueue(MediumPriorityJob, "1")
Resque.enqueue(MediumPriorityJob, "2")
Resque.enqueue(MediumPriorityJob, "3")
end

should "return jobs' payloads 2 and 3 when peek method called with start equal 1 and count equal 2" do
assert_equal [["2"], ["3"]], Resque.peek(MediumPriorityJob.queue, 1, 2).map{|h| h["args"]}
end

should "return empty array when peek method called with start higher than queue size" do
assert_equal [], Resque.peek(MediumPriorityJob.queue, 4, 2)
end

should "return empty array when peek method called with count equal 0" do
assert_equal [], Resque.peek(MediumPriorityJob.queue, 0, 0)
end

should "return all jobs' payloads when all method called" do
assert Resque.all(MediumPriorityJob.queue).length == 3, "should return all 3 elements"
end
end

context "without queued jobs" do
should "return nil when peek method called with count equal 1" do
assert_equal nil, Resque.peek(MediumPriorityJob.queue, 0, 1)
end

should "return empty array when peek method called with count not equal 1" do
assert_equal [], Resque.peek(MediumPriorityJob.queue, 0, 999)
end

should "return empty array when all method called" do
assert_equal [], Resque.all(MediumPriorityJob.queue)
end
end
end

0 comments on commit 2372464

Please sign in to comment.