Skip to content
Browse files

Missing git add :trollface:

  • Loading branch information...
1 parent adff4a7 commit 602000be90e9935f4f4ee5acc096725d7b7c33e5 @wycats wycats committed Apr 26, 2012
View
65 railties/lib/rails/queueing.rb
@@ -0,0 +1,65 @@
+module Rails
+ module Queueing
+ # In test mode, the Rails queue is backed by an Array so that assertions
+ # can be made about its contents. The test queue provides a +contents+
+ # method to make assertions about the queue's contents and a +drain+
+ # method to drain the queue and run the jobs.
+ #
+ # Jobs are run in a separate thread to catch mistakes where code
+ # assumes that the job is run in the same thread.
+ class TestQueue
+ attr_reader :contents
+
+ def initialize
+ @contents = []
+ end
+
+ def drain
+ # run the jobs in a separate thread so assumptions of synchronous
+ # jobs are caught in test mode.
+ t = Thread.new do
@josevalim
Ruby on Rails member
josevalim added a note Apr 27, 2012

The downside is that, if the job fails, we won't get an exception. So we should have a way to "propagate" the exception to the main thread or turn Thread.abort_on_exception to true in testing environment.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
+ while job = @contents.pop
+ job.run
+ end
+ end
+ t.join
+ end
+
+ # implement the Queue API
+ def push(object)
+ @contents << object
+ end
+ end
+
+ # The threaded consumer will run jobs in a background thread in
+ # development mode or in a VM where running jobs on a thread in
+ # production mode makes sense.
+ #
+ # When the process exits, the consumer pushes a nil onto the
+ # queue and joins the thread, which will ensure that all jobs
+ # are executed before the process finally dies.
+ class ThreadedConsumer
+ def self.start(queue)
+ new(queue).start
+ end
+
+ def initialize(queue)
+ @queue = queue
+ end
+
+ def start
+ @thread = Thread.new do
@josevalim
Ruby on Rails member
josevalim added a note Apr 27, 2012

Should we consider wrapping job.run in a begin/rescue block to ensure the thread won't die? Then we can log the failures as well.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
+ while job = @queue.pop
+ job.run
+ end
+ end
+ self
+ end
+
+ def shutdown
+ @queue.push nil
+ @thread.join
+ end
+ end
+ end
+end
View
114 railties/test/application/queue_test.rb
@@ -0,0 +1,114 @@
+require 'isolation/abstract_unit'
+require 'rack/test'
+
+module ApplicationTests
+ class GeneratorsTest < ActiveSupport::TestCase
+ include ActiveSupport::Testing::Isolation
+
+ def setup
+ build_app
+ boot_rails
+ end
+
+ def teardown
+ teardown_app
+ end
+
+ def app_const
+ @app_const ||= Class.new(Rails::Application)
+ end
+
+ test "the queue is a TestQueue in test mode" do
+ app("test")
+ assert_kind_of Rails::Queueing::TestQueue, Rails.application.queue
+ assert_kind_of Rails::Queueing::TestQueue, Rails.queue
+ end
+
+ test "the queue is a Queue in development mode" do
+ app("development")
+ assert_kind_of Queue, Rails.application.queue
+ assert_kind_of Queue, Rails.queue
+ end
+
+ test "in development mode, an enqueued job will be processed in a separate thread" do
+ app("development")
+ current = Thread.current
+
+ job = Struct.new(:origin, :target).new(Thread.current)
+ def job.run
+ self.target = Thread.current
+ end
+
+ Rails.queue.push job
+ sleep 0.1
@mperham
mperham added a note Apr 27, 2012

You'll find that your test suite quickly slows to a crawl with sleep and has non-deterministic failures due to performance edge cases where 0.1 sec is not enough time.

See how girl_friday's async_test helper uses a callback with timeout to make this code effectively synchronous and deterministic. https://github.com/mperham/girl_friday/blob/master/test/test_girl_friday_queue.rb#L10

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
+
+ assert job.target, "The job was run"
+ assert_not_equal job.origin, job.target
+ end
+
+ test "in test mode, explicitly draining the queue will process it in a separate thread" do
+ app("test")
+ current = Thread.current
+
+ job = Struct.new(:origin, :target).new(Thread.current)
+ def job.run
+ self.target = Thread.current
+ end
+
+ Rails.queue.push job
+ Rails.queue.drain
+
+ assert job.target, "The job was run"
+ assert_not_equal job.origin, job.target
+ end
+
+ test "in test mode, the queue can be observed" do
+ app("test")
+
+ job = Class.new(Struct.new(:id)) do
+ def run
+ end
+ end
+
+ jobs = (1..10).map do |id|
+ job.new(id)
+ end
+
+ jobs.each do |job|
+ Rails.queue.push job
+ end
+
+ assert_equal jobs, Rails.queue.contents
+ end
+
+ test "a custom queue implementation can be provided" do
+ add_to_env_config "production", <<-RUBY
+ require "my_queue"
+ config.queue = MyQueue
+ RUBY
+
+ app_file "lib/my_queue.rb", <<-RUBY
+ class MyQueue
+ def push(job)
+ job.run
+ end
+ end
+ RUBY
+
+ app("production")
+
+ assert_kind_of MyQueue, Rails.queue
+
+ job = Class.new(Struct.new(:id, :ran)) do
+ def run
+ self.ran = true
+ end
+ end
+
+ job1 = job.new(1)
+ Rails.queue.push job1
+
+ assert_equal true, job1.ran
+ end
+ end
+end
View
44 railties/test/queueing/test_queue_test.rb
@@ -0,0 +1,44 @@
+require 'abstract_unit'
+require 'rails/queueing'
+
+class TestQueueTest < ActiveSupport::TestCase
+ class Job
+ attr_reader :id
+ def initialize(id, &block)
+ @id = id
+ @block = block
+ end
+
+ def run
+ @block.call if @block
+ end
+ end
+
+ def setup
+ @queue = Rails::Queueing::TestQueue.new
+ end
+
+ def test_contents
+ assert_equal [], @queue.contents
+ job = Job.new(1)
+ @queue.push job
+ assert_equal [job], @queue.contents
+ end
+
+ def test_drain
+ t = nil
+ ran = false
+
+ job = Job.new(1) do
+ ran = true
+ t = Thread.current
+ end
+
+ @queue.push job
+ @queue.drain
+
+ assert_equal [], @queue.contents
+ assert ran, "The job runs synchronously when the queue is drained"
+ assert_not_equal t, Thread.current
+ end
+end
View
65 railties/test/queueing/threaded_consumer_test.rb
@@ -0,0 +1,65 @@
+require 'abstract_unit'
+require 'rails/queueing'
+
+class TestThreadConsumer < ActiveSupport::TestCase
+ class Job
+ attr_reader :id
+ def initialize(id, &block)
+ @id = id
+ @block = block
+ end
+
+ def run
+ @block.call if @block
+ end
+ end
+
+ def setup
+ @queue = Queue.new
+ @consumer = Rails::Queueing::ThreadedConsumer.start(@queue)
+ end
+
+ def teardown
+ @queue.push nil
+ end
+
+ test "the jobs are executed" do
+ ran = false
+
+ job = Job.new(1) do
+ ran = true
+ end
+
+ @queue.push job
+ sleep 0.1
+ assert_equal true, ran
+ end
+
+ test "the jobs are not executed synchronously" do
+ ran = false
+
+ job = Job.new(1) do
+ sleep 0.1
+ ran = true
+ end
+
+ @queue.push job
+ assert_equal false, ran
+ end
+
+ test "shutting down the queue synchronously drains the jobs" do
+ ran = false
+
+ job = Job.new(1) do
+ sleep 0.1
+ ran = true
+ end
+
+ @queue.push job
+ assert_equal false, ran
+
+ @consumer.shutdown
+
+ assert_equal true, ran
+ end
+end

8 comments on commit 602000b

@bokmann

No trollface ended... Two commits filters out trolls that aren't paying attention. Maybe the core team should do that on purpose with potentially controversial changes!

Nice addition. I'd like a standardized way to know if a job finishes, fails, etc. Perhaps queue.push could return a receipt that had a state machine of pending -> running -> completed | failed

@josevalim
Ruby on Rails member
@bokmann

Yes, but isn't that receipt part of the api? And isn't the benefit of this not the memory queue itself, but standardizing the api?

I just realized this is my chance for a core contribution. I'll submit a pullup, but it'll be a week away - I'm leaving this evening for a much needed vacation. I'm thinking Queue.push returns an object that responds to .state and .errors. Implementations can choose to return other things, depending on how they want to differentiate themselves.

@josevalim
Ruby on Rails member
@bokmann

Thanks for the clarity of thought. Given how short-lived that receipt would be on the request/response cycle its probably not worth hanging onto it anyway.

@jrochkind

While I understand the main intent is for something like resque in production (workers in another process entirely, one way or another), I like that it looks pretty straightforward to replace the simple single-thread built-in ThreadedConsumer with a multi-threaded thread pool based on Celluloid and worker pools. For cases where a thread-based consumer is appropriate, but one needs more than a single thread worker.

Might be worth keeping that use case in mind too.

(Using DCell, a Celluloid-based solution could even easily switch to workers in a seperate process too, even as runtime configuration).

@mperham

@jrochkind "ThreadedConsumer with a multi-threaded thread pool based on Celluloid and worker pools" is exactly what Sidekiq is. http://mperham.github.com/sidekiq/ I'm very happy to see this and it should make background processing with Rails even easier.

@jrochkind

@mperham, yeah it looks like Sidekiq uses redis to communicate between job-generators and workers. What I was considering was an in-process thread pool; or a seperate process thread pool that uses DCell to communicate between job-generators and worker pool. So not the same thing, but, year, clearly the point of this is to make a variety of implementations possible behind the same api. +1

Please sign in to comment.
Something went wrong with that request. Please try again.