Skip to content

Commit

Permalink
Active Job refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
cristianbica committed Sep 3, 2014
1 parent 5db4e7f commit 1e237b4
Show file tree
Hide file tree
Showing 33 changed files with 441 additions and 256 deletions.
13 changes: 4 additions & 9 deletions actionmailer/lib/action_mailer/message_delivery.rb
Expand Up @@ -98,15 +98,10 @@ def deliver #:nodoc:

def enqueue_delivery(delivery_method, options={})
args = @mailer.name, @mail_method.to_s, delivery_method.to_s, *@args
enqueue_method = :enqueue
if options[:at]
enqueue_method = :enqueue_at
args.unshift options[:at]
elsif options[:in]
enqueue_method = :enqueue_in
args.unshift options[:in]
end
ActionMailer::DeliveryJob.send enqueue_method, *args
set_options = {}
set_options[:wait_until] = options[:at] if options[:at]
set_options[:wait] = options[:in] if options[:in]
ActionMailer::DeliveryJob.set(set_options).perform_later(*args)
end
end
end
24 changes: 12 additions & 12 deletions actionmailer/test/message_delivery_test.rb
Expand Up @@ -3,8 +3,10 @@
require 'active_job'
require 'minitest/mock'
require 'mailers/delayed_mailer'
require 'active_support/core_ext/numeric/time'

class MessageDeliveryTest < ActiveSupport::TestCase
include ActiveJob::TestHelper

setup do
@previous_logger = ActiveJob::Base.logger
Expand All @@ -13,6 +15,8 @@ class MessageDeliveryTest < ActiveSupport::TestCase
ActiveJob::Base.logger = Logger.new(nil)
@mail = DelayedMailer.test_message(1, 2, 3)
ActionMailer::Base.deliveries.clear
ActiveJob::Base.queue_adapter.perform_enqueued_at_jobs = true
ActiveJob::Base.queue_adapter.perform_enqueued_jobs = true
end

teardown do
Expand Down Expand Up @@ -70,33 +74,29 @@ def test_should_enqueue_and_run_correctly_in_activejob
ActionMailer::Base.deliveries.clear
end

test 'should enqueue the email with :deliver delivery method' do
ret = ActionMailer::DeliveryJob.stub :enqueue, ->(*args){ args } do
test 'should enqueue the email with :deliver_now delivery method' do
assert_performed_with(job: ActionMailer::DeliveryJob, args: ['DelayedMailer', 'test_message', 'deliver_now', 1, 2, 3]) do
@mail.deliver_later
end
assert_equal ['DelayedMailer', 'test_message', 'deliver_now', 1, 2, 3], ret
end

test 'should enqueue the email with :deliver! delivery method' do
ret = ActionMailer::DeliveryJob.stub :enqueue, ->(*args){ args } do
test 'should enqueue the email with :deliver_now! delivery method' do
assert_performed_with(job: ActionMailer::DeliveryJob, args: ['DelayedMailer', 'test_message', 'deliver_now!', 1, 2, 3]) do
@mail.deliver_later!
end
assert_equal ['DelayedMailer', 'test_message', 'deliver_now!', 1, 2, 3], ret
end

test 'should enqueue a delivery with a delay' do
ret = ActionMailer::DeliveryJob.stub :enqueue_in, ->(*args){ args } do
@mail.deliver_later in: 600
assert_performed_with(job: ActionMailer::DeliveryJob, args: ['DelayedMailer', 'test_message', 'deliver_now', 1, 2, 3]) do
@mail.deliver_later in: 600.seconds
end
assert_equal [600, 'DelayedMailer', 'test_message', 'deliver_now', 1, 2, 3], ret
end

test 'should enqueue a delivery at a specific time' do
later_time = Time.now.to_i + 3600
ret = ActionMailer::DeliveryJob.stub :enqueue_at, ->(*args){ args } do
later_time = Time.now.to_f + 3600
assert_performed_with(job: ActionMailer::DeliveryJob, at: later_time, args: ['DelayedMailer', 'test_message', 'deliver_now', 1, 2, 3]) do
@mail.deliver_later at: later_time
end
assert_equal [later_time, 'DelayedMailer', 'test_message', 'deliver_now', 1, 2, 3], ret
end

end
6 changes: 3 additions & 3 deletions activejob/README.md
Expand Up @@ -43,15 +43,15 @@ end
Enqueue a job like so:

```ruby
MyJob.enqueue record # Enqueue a job to be performed as soon the queueing system is free.
MyJob.perform_later record # Enqueue a job to be performed as soon the queueing system is free.
```

```ruby
MyJob.enqueue_at Date.tomorrow.noon, record # Enqueue a job to be performed tomorrow at noon.
MyJob.set(wait_until: Date.tomorrow.noon).perform_later(record) # Enqueue a job to be performed tomorrow at noon.
```

```ruby
MyJob.enqueue_in 1.week, record # Enqueue a job to be performed 1 week from now.
MyJob.set(wait: 1.week).perform_later(record) # Enqueue a job to be performed 1 week from now.
```

That's it!
Expand Down
1 change: 1 addition & 0 deletions activejob/lib/active_job.rb
Expand Up @@ -31,6 +31,7 @@ module ActiveJob

autoload :Base
autoload :QueueAdapters
autoload :ConfiguredJob
autoload :TestCase
autoload :TestHelper
end
4 changes: 2 additions & 2 deletions activejob/lib/active_job/base.rb
@@ -1,19 +1,19 @@
require 'active_job/core'
require 'active_job/queue_adapter'
require 'active_job/queue_name'
require 'active_job/enqueuing'
require 'active_job/execution'
require 'active_job/callbacks'
require 'active_job/identifier'
require 'active_job/logging'

module ActiveJob
class Base
include Core
include QueueAdapter
include QueueName
include Enqueuing
include Execution
include Callbacks
include Identifier
include Logging

ActiveSupport.run_load_hooks(:active_job, self)
Expand Down
18 changes: 18 additions & 0 deletions activejob/lib/active_job/configured_job.rb
@@ -0,0 +1,18 @@
module ActiveJob
class ConfiguredJob #:nodoc:
def initialize(job_class, options={})
@options = options
@options[:in] = @options.delete(:wait) if @options[:wait]
@options[:at] = @options.delete(:wait_until) if @options[:wait_until]
@job_class = job_class
end

def perform_now(*args)
@job_class.new(*args).perform_now
end

def perform_later(*args)
@job_class.new(*args).enqueue @options
end
end
end
89 changes: 89 additions & 0 deletions activejob/lib/active_job/core.rb
@@ -0,0 +1,89 @@
module ActiveJob
module Core
extend ActiveSupport::Concern

included do
# Job arguments
attr_accessor :arguments
attr_writer :serialized_arguments

# Timestamp when the job should be performed
attr_accessor :scheduled_at

# Job Identifier
attr_accessor :job_id

# Queue on which the job should be run on.
attr_writer :queue_name
end

module ClassMethods
# Creates a new job instance from a hash created with +serialize+
def deserialize(job_data)
job = job_data['job_class'].constantize.new
job.job_id = job_data['job_id']
job.queue_name = job_data['queue_name']
job.serialized_arguments = job_data['arguments']
job
end

# Creates a job preconfigured with the given options. You can call
# perform_later with the job arguments to enqueue the job with the
# preconfigured options
#
# ==== Options
# * <tt>:wait</tt> - Enqueues the job with the specified delay
# * <tt>:wait_until</tt> - Enqueues the job at the time specified
# * <tt>:queue</tt> - Enqueues the job on the specified queue
#
# ==== Examples
#
# VideoJob.set(queue: :some_queue).perform_later(Video.last)
# VideoJob.set(wait: 5.minutes).perform_later(Video.last)
# VideoJob.set(wait_until: Time.tomorroe).perform_later(Video.last)
# VideoJob.set(queue: :some_queue, wait: 5.minutes).perform_later(Video.last)
# VideoJob.set(queue: :some_queue, wait_until: Time.tomorroe).perform_later(Video.last)
def set(options={})
ConfiguredJob.new(self, options)
end
end

# Creates a new job instance. Takes as arguments the arguments that
# will be passed to the perform method.
def initialize(*arguments)
@arguments = arguments
@job_id = SecureRandom.uuid
@queue_name = self.class.queue_name
end

# Returns a hash with the job data that can safely be passed to the
# queueing adapter.
def serialize
{
'job_class' => self.class.name,
'job_id' => job_id,
'queue_name' => queue_name,
'arguments' => serialize_arguments(arguments)
}
end

private
def deserialize_arguments_if_needed
if defined?(@serialized_arguments) && @serialized_arguments.present?
@arguments = deserialize_arguments(@serialized_arguments)
@serialized_arguments = nil
end
end

def serialize_arguments(serialized_args)
Arguments.serialize(serialized_args)
end

def deserialize_arguments(serialized_args)
Arguments.deserialize(serialized_args)
end
end
end



98 changes: 51 additions & 47 deletions activejob/lib/active_job/enqueuing.rb
Expand Up @@ -12,60 +12,64 @@ module ClassMethods
#
# Returns an instance of the job class queued with args available in
# Job#arguments.
def enqueue(*args)
new(args).tap do |job|
job.run_callbacks :enqueue do
queue_adapter.enqueue self, job.job_id, *Arguments.serialize(args)
end
end
end

# Enqueue a job to be performed at +interval+ from now.
#
# enqueue_in(1.week, "mike")
#
# Returns an instance of the job class queued with args available in
# Job#arguments and the timestamp in Job#enqueue_at.
def enqueue_in(interval, *args)
enqueue_at interval.seconds.from_now, *args
def perform_later(*args)
job_or_instantiate(*args).enqueue
end

# Enqueue a job to be performed at an explicit point in time.
#
# enqueue_at(Date.tomorrow.midnight, "mike")
#
# Returns an instance of the job class queued with args available in
# Job#arguments and the timestamp in Job#enqueue_at.
def enqueue_at(timestamp, *args)
new(args).tap do |job|
job.enqueued_at = timestamp

job.run_callbacks :enqueue do
queue_adapter.enqueue_at self, timestamp.to_f, job.job_id, *Arguments.serialize(args)
end
protected
def job_or_instantiate(*args)
args.first.is_a?(self) ? args.first : new(*args)
end
end
end

included do
attr_accessor :arguments
attr_accessor :enqueued_at
end

def initialize(arguments = nil)
@arguments = arguments
end

def retry_now
self.class.enqueue(*arguments)
# Reschedule the job to be re-executed. This is usefull in combination
# with the +rescue_from+ option. When you rescue an exception from your job
# you can ask Active Job to retry performing your job.
#
# ==== Options
# * <tt>:in</tt> - Enqueues the job with the specified delay
# * <tt>:at</tt> - Enqueues the job at the time specified
# * <tt>:queue</tt> - Enqueues the job on the specified queue
#
# ==== Examples
#
# class SiteScrapperJob < ActiveJob::Base
# rescue_from(ErrorLoadingSite) do
# retry_job queue: :low_priority
# end
# def perform(*args)
# # raise ErrorLoadingSite if cannot scrape
# end
# end
def retry_job(options={})
enqueue options
end

def retry_in(interval)
self.class.enqueue_in interval, *arguments
end

def retry_at(timestamp)
self.class.enqueue_at timestamp, *arguments
# Equeue the job to be performed by the queue adapter.
#
# ==== Options
# * <tt>:in</tt> - Enqueues the job with the specified delay
# * <tt>:at</tt> - Enqueues the job at the time specified
# * <tt>:queue</tt> - Enqueues the job on the specified queue
#
# ==== Examples
#
# my_job_instance.enqueue
# my_job_instance.enqueue in: 5.minutes
# my_job_instance.enqueue queue: :important
# my_job_instance.enqueue at: Date.tomorrow.midnight
def enqueue(options={})
self.scheduled_at = options[:in].seconds.from_now.to_f if options[:in]
self.scheduled_at = options[:at].to_f if options[:at]
self.queue_name = self.class.queue_name_from_part(options[:queue]) if options[:queue]
run_callbacks :enqueue do
if self.scheduled_at
self.class.queue_adapter.enqueue_at self, self.scheduled_at
else
self.class.queue_adapter.enqueue self
end
end
self
end
end
end
32 changes: 20 additions & 12 deletions activejob/lib/active_job/execution.rb
Expand Up @@ -4,15 +4,29 @@
module ActiveJob
module Execution
extend ActiveSupport::Concern
include ActiveSupport::Rescuable

included do
include ActiveSupport::Rescuable
end
module ClassMethods
# Performs the job immediately.
#
# MyJob.perform_now("mike")
#
def perform_now(*args)
job_or_instantiate(*args).perform_now
end

def execute(job_id, *serialized_args)
self.job_id = job_id
self.arguments = deserialize_arguments(serialized_args)
def execute(job_data) #:nodoc:
job = deserialize(job_data)
job.perform_now
end
end

# Performs the job immediately. The job is not sent to the queueing adapter
# and will block the execution until it's finished.
#
# MyJob.new(*args).perform_now
def perform_now
deserialize_arguments_if_needed
run_callbacks :perform do
perform(*arguments)
end
Expand All @@ -23,11 +37,5 @@ def execute(job_id, *serialized_args)
def perform(*)
fail NotImplementedError
end

private
def deserialize_arguments(serialized_args)
Arguments.deserialize(serialized_args)
end

end
end

0 comments on commit 1e237b4

Please sign in to comment.