Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Merge pull request #6946 from threedaymonk/queue-refs

Ensure that queued jobs are marshallable
  • Loading branch information...
commit a967487cbd7760a7108edd2328b18c137edc1d6d 2 parents eedca4a + b44104a
@tenderlove tenderlove authored
View
7 railties/lib/rails/queueing.rb
@@ -22,6 +22,13 @@ def jobs
@que.dup
end
+ # Marshal and unmarshal job before pushing it onto the queue. This will
+ # raise an exception on any attempts in tests to push jobs that can't (or
+ # shouldn't) be marshalled.
+ def push(job)
+ super Marshal.load(Marshal.dump(job))
+ end
+
# Drain the queue, running all jobs in a different thread. This method
# may not be available on production queues.
def drain
View
85 railties/test/application/queue_test.rb
@@ -30,46 +30,68 @@ def app_const
assert_kind_of Rails::Queueing::Queue, Rails.queue
end
- test "in development mode, an enqueued job will be processed in a separate thread" do
- app("development")
+ class ThreadTrackingJob
+ def initialize
+ @origin = Thread.current.object_id
+ end
+
+ def run
+ @target = Thread.current.object_id
+ end
- job = Struct.new(:origin, :target).new(Thread.current)
- def job.run
- self.target = Thread.current
+ def ran_in_different_thread?
+ @origin != @target
end
+ def ran?
+ @target
+ end
+ end
+
+ test "in development mode, an enqueued job will be processed in a separate thread" do
+ app("development")
+
+ job = ThreadTrackingJob.new
Rails.queue.push job
sleep 0.1
- assert job.target, "The job was run"
- assert_not_equal job.origin, job.target
+ assert job.ran?, "Expected job to be run"
+ assert job.ran_in_different_thread?, "Expected job to run in a different thread"
end
test "in test mode, explicitly draining the queue will process it in a separate thread" do
app("test")
- job = Struct.new(:origin, :target).new(Thread.current)
- def job.run
- self.target = Thread.current
+ Rails.queue.push ThreadTrackingJob.new
+ job = Rails.queue.jobs.last
+ Rails.queue.drain
+
+ assert job.ran?, "Expected job to be run"
+ assert job.ran_in_different_thread?, "Expected job to run in a different thread"
+ end
+
+ class IdentifiableJob
+ def initialize(id)
+ @id = id
end
- Rails.queue.push job
- Rails.queue.drain
+ def ==(other)
+ other.same_id?(@id)
+ end
+
+ def same_id?(other_id)
+ other_id == @id
+ end
- assert job.target, "The job was run"
- assert_not_equal job.origin, job.target
+ def run
+ end
end
test "in test mode, the queue can be observed" do
app("test")
- job = Struct.new(:id) do
- def run
- end
- end
-
jobs = (1..10).map do |id|
- job.new(id)
+ IdentifiableJob.new(id)
end
jobs.each do |job|
@@ -79,6 +101,29 @@ def run
assert_equal jobs, Rails.queue.jobs
end
+ test "in test mode, adding an unmarshallable job will raise an exception" do
+ app("test")
+ anonymous_class_instance = Struct.new(:run).new
+ assert_raises TypeError do
+ Rails.queue.push anonymous_class_instance
+ end
+ end
+
+ test "attempting to marshal a queue will raise an exception" do
+ app("test")
+ assert_raises TypeError do
+ Marshal.dump Rails.queue
+ end
+ end
+
+ test "attempting to add a reference to itself to the queue will raise an exception" do
+ app("test")
+ job = {reference: Rails.queue}
+ assert_raises TypeError do
+ Rails.queue.push job
+ end
+ end
+
def setup_custom_queue
add_to_env_config "production", <<-RUBY
require "my_queue"
View
85 railties/test/queueing/test_queue_test.rb
@@ -2,22 +2,18 @@
require 'rails/queueing'
class TestQueueTest < ActiveSupport::TestCase
- class Job
- def initialize(&block)
- @block = block
- end
+ def setup
+ @queue = Rails::Queueing::TestQueue.new
+ end
+ class ExceptionRaisingJob
def run
- @block.call if @block
+ raise
end
end
- def setup
- @queue = Rails::Queueing::TestQueue.new
- end
-
def test_drain_raises
- @queue.push Job.new { raise }
+ @queue.push ExceptionRaisingJob.new
assert_raises(RuntimeError) { @queue.drain }
end
@@ -27,41 +23,80 @@ def test_jobs
assert_equal [1,2], @queue.jobs
end
+ class EquivalentJob
+ def initialize
+ @initial_id = self.object_id
+ end
+
+ def run
+ end
+
+ def ==(other)
+ other.same_initial_id?(@initial_id)
+ end
+
+ def same_initial_id?(other_id)
+ other_id == @initial_id
+ end
+ end
+
def test_contents
assert @queue.empty?
- job = Job.new
+ job = EquivalentJob.new
@queue.push job
refute @queue.empty?
assert_equal job, @queue.pop
end
- def test_order
- processed = []
+ class ProcessingJob
+ def self.clear_processed
+ @processed = []
+ end
+
+ def self.processed
+ @processed
+ end
+
+ def initialize(object)
+ @object = object
+ end
+
+ def run
+ self.class.processed << @object
+ end
+ end
- job1 = Job.new { processed << 1 }
- job2 = Job.new { processed << 2 }
+ def test_order
+ ProcessingJob.clear_processed
+ job1 = ProcessingJob.new(1)
+ job2 = ProcessingJob.new(2)
@queue.push job1
@queue.push job2
@queue.drain
- assert_equal [1,2], processed
+ assert_equal [1,2], ProcessingJob.processed
end
- def test_drain
- t = nil
- ran = false
+ class ThreadTrackingJob
+ attr_reader :thread_id
- job = Job.new do
- ran = true
- t = Thread.current
+ def run
+ @thread_id = Thread.current.object_id
end
- @queue.push job
+ def ran?
+ @thread_id
+ end
+ end
+
+ def test_drain
+ @queue.push ThreadTrackingJob.new
+ job = @queue.jobs.last
@queue.drain
assert @queue.empty?
- assert ran, "The job runs synchronously when the queue is drained"
- assert_not_equal t, Thread.current
+ assert job.ran?, "The job runs synchronously when the queue is drained"
+ assert_not_equal job.thread_id, Thread.current.object_id
end
end
Please sign in to comment.
Something went wrong with that request. Please try again.