Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Move queue classes to ActiveSupport

  • Loading branch information...
commit 8577687fcb9da20868a5ea50aea36427270d4485 1 parent ae00ade
Santiago Pastorino spastorino authored
4 actionmailer/test/abstract_unit.rb
View
@@ -11,7 +11,7 @@
require 'minitest/autorun'
require 'action_mailer'
require 'action_mailer/test_case'
-require 'rails/queueing'
+require 'active_support/queueing'
silence_warnings do
# These external dependencies have warnings :/
@@ -27,7 +27,7 @@
FIXTURE_LOAD_PATH = File.expand_path('fixtures', File.dirname(__FILE__))
ActionMailer::Base.view_paths = FIXTURE_LOAD_PATH
-ActionMailer::Base.queue = Rails::Queueing::SynchronousQueue.new
+ActionMailer::Base.queue = ActiveSupport::SynchronousQueue.new
class MockSMTP
def self.deliveries
4 actionmailer/test/base_test.rb
View
@@ -3,13 +3,13 @@
require 'set'
require 'action_dispatch'
+require 'active_support/queueing'
require 'active_support/time'
require 'mailers/base_mailer'
require 'mailers/proc_mailer'
require 'mailers/asset_mailer'
require 'mailers/async_mailer'
-require 'rails/queueing'
class BaseTest < ActiveSupport::TestCase
def teardown
@@ -433,7 +433,7 @@ def stub_queue(klass, queue)
end
test "delivering message asynchronously" do
- testing_queue = Rails::Queueing::TestQueue.new
+ testing_queue = ActiveSupport::TestQueue.new
AsyncMailer.delivery_method = :test
AsyncMailer.deliveries.clear
stub_queue(AsyncMailer, testing_queue).welcome.deliver
116 activesupport/lib/active_support/queueing.rb
View
@@ -0,0 +1,116 @@
+require 'delegate'
+require 'thread'
+
+module ActiveSupport
+ # A Queue that simply inherits from STDLIB's Queue. Everytime this
+ # queue is used, Rails automatically sets up a ThreadedConsumer
+ # to consume it.
+ class Queue < ::Queue
+ end
+
+ class SynchronousQueue < ::Queue
+ def push(job)
+ result = nil
+ Thread.new { result = job.run }.join
+ result
+ end
+ alias << push
+ alias enq push
+ end
+
+ # In test mode, the Rails queue is backed by an Array so that assertions
+ # can be made about its contents. The test queue provides a +jobs+
+ # method to make assertions about the queue's contents and a +drain+
+ # method to drain the queue and run the jobs.
+ #
+ # Jobs are run in a separate thread to catch mistakes where code
+ # assumes that the job is run in the same thread.
+ class TestQueue < ::Queue
+ # Get a list of the jobs off this queue. This method may not be
+ # available on production queues.
+ def jobs
+ @que.dup
+ end
+
+ # Marshal and unmarshal job before pushing it onto the queue. This will
+ # raise an exception on any attempts in tests to push jobs that can't (or
+ # shouldn't) be marshalled.
+ def push(job)
+ super Marshal.load(Marshal.dump(job))
+ end
+
+ # Drain the queue, running all jobs in a different thread. This method
+ # may not be available on production queues.
+ def drain
+ # run the jobs in a separate thread so assumptions of synchronous
+ # jobs are caught in test mode.
+ Thread.new { pop.run until empty? }.join
+ end
+ end
+
+ # A container for multiple queues. This class delegates to a default Queue
+ # so that <tt>Rails.queue.push</tt> and friends will Just Work. To use this class
+ # with multiple queues:
+ #
+ # # In your configuration:
+ # Rails.queue[:image_queue] = SomeQueue.new
+ # Rails.queue[:mail_queue] = SomeQueue.new
+ #
+ # # In your app code:
+ # Rails.queue[:mail_queue].push SomeJob.new
+ #
+ class QueueContainer < DelegateClass(::Queue)
+ def initialize(default_queue)
+ @queues = { :default => default_queue }
+ super(default_queue)
+ end
+
+ def [](queue_name)
+ @queues[queue_name]
+ end
+
+ def []=(queue_name, queue)
+ @queues[queue_name] = queue
+ end
+ end
+
+ # The threaded consumer will run jobs in a background thread in
+ # development mode or in a VM where running jobs on a thread in
+ # production mode makes sense.
+ #
+ # When the process exits, the consumer pushes a nil onto the
+ # queue and joins the thread, which will ensure that all jobs
+ # are executed before the process finally dies.
+ class ThreadedQueueConsumer
+ def self.start(queue, logger=nil)
+ new(queue, logger).start
+ end
+
+ def initialize(queue, logger=nil)
+ @queue = queue
+ @logger = logger
+ end
+
+ def start
+ @thread = Thread.new do
+ while job = @queue.pop
+ begin
+ job.run
+ rescue Exception => e
+ handle_exception e
+ end
+ end
+ end
+ self
+ end
+
+ def shutdown
+ @queue.push nil
+ @thread.join
+ end
+
+ def handle_exception(e)
+ @logger.error "Job Error: #{e.message}\n#{e.backtrace.join("\n")}" if @logger
+ end
+ end
+end
28 activesupport/test/queueing/container_test.rb
View
@@ -0,0 +1,28 @@
+require 'abstract_unit'
+require 'active_support/queueing'
+
+module ActiveSupport
+ class ContainerTest < ActiveSupport::TestCase
+ def test_delegates_to_default
+ q = Queue.new
+ container = QueueContainer.new q
+ job = Object.new
+
+ container.push job
+ assert_equal job, q.pop
+ end
+
+ def test_access_default
+ q = Queue.new
+ container = QueueContainer.new q
+ assert_equal q, container[:default]
+ end
+
+ def test_assign_queue
+ container = QueueContainer.new Object.new
+ q = Object.new
+ container[:foo] = q
+ assert_equal q, container[:foo]
+ end
+ end
+end
4 railties/test/queueing/test_queue_test.rb → activesupport/test/queueing/test_queue_test.rb
View
@@ -1,9 +1,9 @@
require 'abstract_unit'
-require 'rails/queueing'
+require 'active_support/queueing'
class TestQueueTest < ActiveSupport::TestCase
def setup
- @queue = Rails::Queueing::TestQueue.new
+ @queue = ActiveSupport::TestQueue.new
end
class ExceptionRaisingJob
18 railties/test/queueing/threaded_consumer_test.rb → ...vesupport/test/queueing/threaded_consumer_test.rb
View
@@ -1,5 +1,6 @@
require 'abstract_unit'
-require 'rails/queueing'
+require 'active_support/queueing'
+require "active_support/log_subscriber/test_helper"
class TestThreadConsumer < ActiveSupport::TestCase
class Job
@@ -15,8 +16,9 @@ def run
end
def setup
- @queue = Rails::Queueing::Queue.new
- @consumer = Rails::Queueing::ThreadedConsumer.start(@queue)
+ @queue = ActiveSupport::Queue.new
+ @logger = ActiveSupport::LogSubscriber::TestHelper::MockLogger.new
+ @consumer = ActiveSupport::ThreadedQueueConsumer.start(@queue, @logger)
end
def teardown
@@ -64,10 +66,6 @@ def teardown
end
test "log job that raises an exception" do
- require "active_support/log_subscriber/test_helper"
- logger = ActiveSupport::LogSubscriber::TestHelper::MockLogger.new
- Rails.logger = logger
-
job = Job.new(1) do
raise "RuntimeError: Error!"
end
@@ -75,13 +73,13 @@ def teardown
@queue.push job
sleep 0.1
- assert_equal 1, logger.logged(:error).size
- assert_match(/Job Error: RuntimeError: Error!/, logger.logged(:error).last)
+ assert_equal 1, @logger.logged(:error).size
+ assert_match(/Job Error: RuntimeError: Error!/, @logger.logged(:error).last)
end
test "test overriding exception handling" do
@consumer.shutdown
- @consumer = Class.new(Rails::Queueing::ThreadedConsumer) do
+ @consumer = Class.new(ActiveSupport::ThreadedQueueConsumer) do
attr_reader :last_error
def handle_exception(e)
@last_error = e.message
4 guides/source/configuring.textile
View
@@ -111,9 +111,9 @@ end
* +config.middleware+ allows you to configure the application's middleware. This is covered in depth in the "Configuring Middleware":#configuring-middleware section below.
-* +config.queue+ configures a different queue implementation for the application. Defaults to +Rails::Queueing::Queue+. Note that, if the default queue is changed, the default +queue_consumer+ is not going to be initialized, it is up to the new queue implementation to handle starting and shutting down its own consumer(s).
+* +config.queue+ configures a different queue implementation for the application. Defaults to +ActiveSupport::SynchronousQueue+. Note that, if the default queue is changed, the default +queue_consumer+ is not going to be initialized, it is up to the new queue implementation to handle starting and shutting down its own consumer(s).
-* +config.queue_consumer+ configures a different consumer implementation for the default queue. Defaults to +Rails::Queueing::ThreadedConsumer+.
+* +config.queue_consumer+ configures a different consumer implementation for the default queue. Defaults to +ActiveSupport::ThreadedQueueConsumer+.
* +config.reload_classes_only_on_change+ enables or disables reloading of classes only when tracked files change. By default tracks everything on autoload paths and is set to true. If +config.cache_classes+ is true, this option is ignored.
1  railties/lib/rails.rb
View
@@ -22,7 +22,6 @@
module Rails
autoload :Info, 'rails/info'
autoload :InfoController, 'rails/info_controller'
- autoload :Queueing, 'rails/queueing'
class << self
def application
3  railties/lib/rails/application.rb
View
@@ -1,4 +1,5 @@
require 'fileutils'
+require 'active_support/queueing'
require 'rails/engine'
module Rails
@@ -188,7 +189,7 @@ def config #:nodoc:
end
def queue #:nodoc:
- @queue ||= Queueing::Container.new(build_queue)
+ @queue ||= ActiveSupport::QueueContainer.new(build_queue)
end
def build_queue #:nodoc:
5 railties/lib/rails/application/configuration.rb
View
@@ -1,5 +1,6 @@
require 'active_support/core_ext/kernel/reporting'
require 'active_support/file_update_checker'
+require 'active_support/queueing'
require 'rails/engine/configuration'
module Rails
@@ -41,8 +42,8 @@ def initialize(*)
@exceptions_app = nil
@autoflush_log = true
@log_formatter = ActiveSupport::Logger::SimpleFormatter.new
- @queue = Rails::Queueing::SynchronousQueue
- @queue_consumer = Rails::Queueing::ThreadedConsumer
+ @queue = ActiveSupport::SynchronousQueue
+ @queue_consumer = ActiveSupport::ThreadedQueueConsumer
@eager_load = nil
@assets = ActiveSupport::OrderedOptions.new
4 railties/lib/rails/application/finisher.rb
View
@@ -97,8 +97,8 @@ module Finisher
end
initializer :activate_queue_consumer do |app|
- if config.queue == Rails::Queueing::Queue
- app.queue_consumer = config.queue_consumer.start(app.queue)
+ if config.queue == ActiveSupport::Queue
+ app.queue_consumer = config.queue_consumer.start(app.queue, Rails.logger)
at_exit { app.queue_consumer.shutdown }
end
end
2  railties/lib/rails/generators/rails/app/templates/config/environments/production.rb.tt
View
@@ -82,5 +82,5 @@
# Default the production mode queue to an synchronous queue. You will probably
# want to replace this with an out-of-process queueing solution.
- # config.queue = Rails::Queueing::SynchronousQueue
+ # config.queue = ActiveSupport::SynchronousQueue
end
2  railties/lib/rails/generators/rails/app/templates/config/environments/test.rb.tt
View
@@ -40,5 +40,5 @@
config.active_support.deprecation = :stderr
# Use the testing queue.
- config.queue = Rails::Queueing::TestQueue
+ config.queue = ActiveSupport::TestQueue
end
115 railties/lib/rails/queueing.rb
View
@@ -1,115 +0,0 @@
-require "thread"
-require 'delegate'
-
-module Rails
- module Queueing
- # A container for multiple queues. This class delegates to a default Queue
- # so that <tt>Rails.queue.push</tt> and friends will Just Work. To use this class
- # with multiple queues:
- #
- # # In your configuration:
- # Rails.queue[:image_queue] = SomeQueue.new
- # Rails.queue[:mail_queue] = SomeQueue.new
- #
- # # In your app code:
- # Rails.queue[:mail_queue].push SomeJob.new
- #
- class Container < DelegateClass(::Queue)
- def initialize(default_queue)
- @queues = { :default => default_queue }
- super(default_queue)
- end
-
- def [](queue_name)
- @queues[queue_name]
- end
-
- def []=(queue_name, queue)
- @queues[queue_name] = queue
- end
- end
-
- # A Queue that simply inherits from STDLIB's Queue. Everytime this
- # queue is used, Rails automatically sets up a ThreadedConsumer
- # to consume it.
- class Queue < ::Queue
- end
-
- class SynchronousQueue < ::Queue
- def push(job)
- job.run
- end
- alias << push
- alias enq push
- end
-
- # In test mode, the Rails queue is backed by an Array so that assertions
- # can be made about its contents. The test queue provides a +jobs+
- # method to make assertions about the queue's contents and a +drain+
- # method to drain the queue and run the jobs.
- #
- # Jobs are run in a separate thread to catch mistakes where code
- # assumes that the job is run in the same thread.
- class TestQueue < ::Queue
- # Get a list of the jobs off this queue. This method may not be
- # available on production queues.
- def jobs
- @que.dup
- end
-
- # Marshal and unmarshal job before pushing it onto the queue. This will
- # raise an exception on any attempts in tests to push jobs that can't (or
- # shouldn't) be marshalled.
- def push(job)
- super Marshal.load(Marshal.dump(job))
- end
-
- # Drain the queue, running all jobs in a different thread. This method
- # may not be available on production queues.
- def drain
- # run the jobs in a separate thread so assumptions of synchronous
- # jobs are caught in test mode.
- Thread.new { pop.run until empty? }.join
- end
- end
-
- # The threaded consumer will run jobs in a background thread in
- # development mode or in a VM where running jobs on a thread in
- # production mode makes sense.
- #
- # When the process exits, the consumer pushes a nil onto the
- # queue and joins the thread, which will ensure that all jobs
- # are executed before the process finally dies.
- class ThreadedConsumer
- def self.start(queue)
- new(queue).start
- end
-
- def initialize(queue)
- @queue = queue
- end
-
- def start
- @thread = Thread.new do
- while job = @queue.pop
- begin
- job.run
- rescue Exception => e
- handle_exception e
- end
- end
- end
- self
- end
-
- def shutdown
- @queue.push nil
- @thread.join
- end
-
- def handle_exception(e)
- Rails.logger.error "Job Error: #{e.message}\n#{e.backtrace.join("\n")}"
- end
- end
- end
-end
6 railties/test/application/initializers/frameworks_test.rb
View
@@ -52,19 +52,19 @@ def teardown
test "uses the default queue for ActionMailer" do
require "#{app_path}/config/environment"
- assert_kind_of Rails::Queueing::Container, ActionMailer::Base.queue
+ assert_kind_of ActiveSupport::QueueContainer, ActionMailer::Base.queue
end
test "allows me to configure queue for ActionMailer" do
app_file "config/environments/development.rb", <<-RUBY
AppTemplate::Application.configure do
- Rails.queue[:mailer] = Rails::Queueing::TestQueue.new
+ Rails.queue[:mailer] = ActiveSupport::TestQueue.new
config.action_mailer.queue = Rails.queue[:mailer]
end
RUBY
require "#{app_path}/config/environment"
- assert_kind_of Rails::Queueing::TestQueue, ActionMailer::Base.queue
+ assert_kind_of ActiveSupport::TestQueue, ActionMailer::Base.queue
end
test "does not include url helpers as action methods" do
16 railties/test/application/queue_test.rb
View
@@ -19,14 +19,14 @@ def app_const
test "the queue is a TestQueue in test mode" do
app("test")
- assert_kind_of Rails::Queueing::TestQueue, Rails.application.queue[:default]
- assert_kind_of Rails::Queueing::TestQueue, Rails.queue[:default]
+ assert_kind_of ActiveSupport::TestQueue, Rails.application.queue[:default]
+ assert_kind_of ActiveSupport::TestQueue, Rails.queue[:default]
end
test "the queue is a SynchronousQueue in development mode" do
app("development")
- assert_kind_of Rails::Queueing::SynchronousQueue, Rails.application.queue[:default]
- assert_kind_of Rails::Queueing::SynchronousQueue, Rails.queue[:default]
+ assert_kind_of ActiveSupport::SynchronousQueue, Rails.application.queue[:default]
+ assert_kind_of ActiveSupport::SynchronousQueue, Rails.queue[:default]
end
class ThreadTrackingJob
@@ -47,7 +47,7 @@ def ran?
end
end
- test "in development mode, an enqueued job will be processed in the same thread" do
+ test "in development mode, an enqueued job will be processed in a separate thread" do
app("development")
job = ThreadTrackingJob.new
@@ -55,7 +55,7 @@ def ran?
sleep 0.1
assert job.ran?, "Expected job to be run"
- refute job.ran_in_different_thread?, "Expected job to run in the same thread"
+ assert job.ran_in_different_thread?, "Expected job to run in the same thread"
end
test "in test mode, explicitly draining the queue will process it in a separate thread" do
@@ -160,12 +160,12 @@ def run
test "a custom consumer implementation can be provided" do
add_to_env_config "production", <<-RUBY
require "my_queue_consumer"
- config.queue = Rails::Queueing::Queue
+ config.queue = ActiveSupport::Queue
config.queue_consumer = MyQueueConsumer
RUBY
app_file "lib/my_queue_consumer.rb", <<-RUBY
- class MyQueueConsumer < Rails::Queueing::ThreadedConsumer
+ class MyQueueConsumer < ActiveSupport::ThreadedQueueConsumer
attr_reader :started
def start
30 railties/test/queueing/container_test.rb
View
@@ -1,30 +0,0 @@
-require 'abstract_unit'
-require 'rails/queueing'
-
-module Rails
- module Queueing
- class ContainerTest < ActiveSupport::TestCase
- def test_delegates_to_default
- q = Queue.new
- container = Container.new q
- job = Object.new
-
- container.push job
- assert_equal job, q.pop
- end
-
- def test_access_default
- q = Queue.new
- container = Container.new q
- assert_equal q, container[:default]
- end
-
- def test_assign_queue
- container = Container.new Object.new
- q = Object.new
- container[:foo] = q
- assert_equal q, container[:foo]
- end
- end
- end
-end
Please sign in to comment.
Something went wrong with that request. Please try again.