Skip to content

Commit

Permalink
Active Job: async adapter should always run jobs immediately if `im…
Browse files Browse the repository at this point in the history
…mediate` set

This is an internal fix, not user facing. I noticed it while working on #48585.

The `async` adapter has an `immediate` option, which should only be used in tests. This option should tell the adapter to run jobs inline. This works correctly with `perform_later`, but it does not work with `enqueue_at`, which is what other internal mechanisms such as `retry_job` use.

This PR fixes this bug.
  • Loading branch information
ghiculescu committed Jul 2, 2023
1 parent f46d345 commit 639dab5
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 10 deletions.
10 changes: 6 additions & 4 deletions activejob/Rakefile
Expand Up @@ -36,8 +36,9 @@ namespace :test do
Rake::TestTask.new(adapter => "test:env:#{adapter}") do |t|
t.description = "Run adapter tests for #{adapter}"
t.libs << "test"
t.test_files = FileList["test/cases/**/*_test.rb"].reject {
|x| x.include?("delayed_job") && adapter != "delayed_job"
t.test_files = FileList["test/cases/**/*_test.rb"].reject { |x|
(x.include?("delayed_job") && adapter != "delayed_job") ||
(x.include?("async") && adapter != "async")
}
t.verbose = true
t.warning = true
Expand All @@ -46,8 +47,9 @@ namespace :test do

namespace :isolated do
task adapter => "test:env:#{adapter}" do
Dir.glob("#{__dir__}/test/cases/**/*_test.rb").reject {
|x| x.include?("delayed_job") && adapter != "delayed_job"
Dir.glob("#{__dir__}/test/cases/**/*_test.rb").reject { |x|
(x.include?("delayed_job") && adapter != "delayed_job") ||
(x.include?("async") && adapter != "async")
}.all? do |file|
sh(Gem.ruby, "-w", "-I#{__dir__}/lib", "-I#{__dir__}/test", file)
end || raise("Failures")
Expand Down
2 changes: 1 addition & 1 deletion activejob/lib/active_job/queue_adapters/async_adapter.rb
Expand Up @@ -95,7 +95,7 @@ def enqueue(job, queue_name:)

def enqueue_at(job, timestamp, queue_name:)
delay = timestamp - Time.current.to_f
if delay > 0
if !immediate && delay > 0
Concurrent::ScheduledTask.execute(delay, args: [job], executor: executor, &:perform)
else
enqueue(job, queue_name: queue_name)
Expand Down
22 changes: 22 additions & 0 deletions activejob/test/cases/async_adapter_test.rb
@@ -0,0 +1,22 @@
# frozen_string_literal: true

require "helper"
require "active_job/queue_adapters/async_adapter"
require "jobs/hello_job"

class AsyncAdapterTest < ActiveSupport::TestCase
setup do
JobBuffer.clear
ActiveJob::Base.queue_adapter.immediate = true
end

test "in immediate run, perform_later runs immediately" do
HelloJob.perform_later "Alex"
assert_match(/Alex/, JobBuffer.last_value)
end

test "in immediate run, enqueue with wait: runs immediately" do
HelloJob.set(wait_until: Date.tomorrow.noon).perform_later "Alex"
assert_match(/Alex/, JobBuffer.last_value)
end
end
2 changes: 1 addition & 1 deletion activejob/test/cases/logging_test.rb
Expand Up @@ -280,8 +280,8 @@ def test_enqueue_retry_logging_on_retry_job
def test_retry_stopped_logging
perform_enqueued_jobs do
RetryJob.perform_later "CustomCatchError", 6
assert_match(/Stopped retrying RetryJob \(Job ID: .*?\) due to a CustomCatchError.*, which reoccurred on \d+ attempts\./, @logger.messages)
end
assert_match(/Stopped retrying RetryJob \(Job ID: .*?\) due to a CustomCatchError.*, which reoccurred on \d+ attempts\./, @logger.messages)
end

def test_retry_stopped_logging_without_block
Expand Down
4 changes: 4 additions & 0 deletions activejob/test/helper.rb
Expand Up @@ -19,3 +19,7 @@
require "active_support/testing/autorun"

require_relative "../../tools/test_common"

def adapter_is?(*adapter_class_symbols)
adapter_class_symbols.map(&:to_s).include? ActiveJob::Base.queue_adapter_name
end
4 changes: 0 additions & 4 deletions activejob/test/support/integration/test_case_helpers.rb
Expand Up @@ -27,10 +27,6 @@ def clear_jobs
jobs_manager.clear_jobs
end

def adapter_is?(*adapter_class_symbols)
adapter_class_symbols.map(&:to_s).include? ActiveJob::Base.queue_adapter_name
end

def wait_for_jobs_to_finish_for(seconds = 60)
Timeout.timeout(seconds) do
while !job_executed do
Expand Down

0 comments on commit 639dab5

Please sign in to comment.