Skip to content

Commit

Permalink
Set, serialize, and deserialize Active Job scheduled_at as Time; de…
Browse files Browse the repository at this point in the history
…serialize `enqueued_at` as Time; deprecate setting `scheduled_at=` with numeric/epoch

Co-authored-by: Adam Pahlevi <adam.pahlevi@gmail.com>
  • Loading branch information
bensheldon and adamnoto committed Sep 27, 2023
1 parent 60002c3 commit c919b47
Show file tree
Hide file tree
Showing 10 changed files with 92 additions and 16 deletions.
7 changes: 7 additions & 0 deletions activejob/CHANGELOG.md
@@ -1,3 +1,10 @@
* Set `scheduled_at` attribute as a Time object instead of epoch seconds, and serialize and deserialize the value
when enqueued. Assigning a numeric/epoch value to scheduled_at= is deprecated; use a Time object instead.

Deserializes `enqueued_at` as a Time instead of ISO8601 String.

*Ben Sheldon*

* Clarify the backoff strategy for the recommended `:wait` option when retrying jobs

`wait: :exponentially_longer` is waiting polynomially longer, so it is now recommended to use `wait: :polynomially_longer` to keep the same behavior.
Expand Down
30 changes: 24 additions & 6 deletions activejob/lib/active_job/core.rb
Expand Up @@ -12,8 +12,10 @@ module Core
attr_accessor :arguments
attr_writer :serialized_arguments

# Timestamp when the job should be performed
attr_accessor :scheduled_at
# Time when the job should be performed
attr_reader :scheduled_at

attr_reader :_scheduled_at_time # :nodoc:

# Job Identifier
attr_accessor :job_id
Expand Down Expand Up @@ -94,6 +96,8 @@ def initialize(*arguments)
@arguments = arguments
@job_id = SecureRandom.uuid
@queue_name = self.class.queue_name
@scheduled_at = nil
@_scheduled_at_time = nil
@priority = self.class.priority
@executions = 0
@exception_executions = {}
Expand All @@ -115,7 +119,8 @@ def serialize
"exception_executions" => exception_executions,
"locale" => I18n.locale.to_s,
"timezone" => timezone,
"enqueued_at" => Time.now.utc.iso8601(9)
"enqueued_at" => Time.now.utc.iso8601(9),
"scheduled_at" => _scheduled_at_time ? _scheduled_at_time.utc.iso8601(9) : nil,
}
end

Expand Down Expand Up @@ -155,19 +160,32 @@ def deserialize(job_data)
self.exception_executions = job_data["exception_executions"]
self.locale = job_data["locale"] || I18n.locale.to_s
self.timezone = job_data["timezone"] || Time.zone&.name
self.enqueued_at = job_data["enqueued_at"]
self.enqueued_at = Time.iso8601(job_data["enqueued_at"]) if job_data["enqueued_at"]
self.scheduled_at = Time.iso8601(job_data["scheduled_at"]) if job_data["scheduled_at"]
end

# Configures the job with the given options.
def set(options = {}) # :nodoc:
self.scheduled_at = options[:wait].seconds.from_now.to_f if options[:wait]
self.scheduled_at = options[:wait_until].to_f if options[:wait_until]
self.scheduled_at = options[:wait].seconds.from_now if options[:wait]
self.scheduled_at = options[:wait_until] if options[:wait_until]
self.queue_name = self.class.queue_name_from_part(options[:queue]) if options[:queue]
self.priority = options[:priority].to_i if options[:priority]

self
end

def scheduled_at=(value)
@_scheduled_at_time = if value&.is_a?(Numeric)
ActiveJob.deprecator.warn(<<~MSG.squish)
Assigning a numeric/epoch value to scheduled_at is deprecated. Use a Time object instead.

This comment has been minimized.

Copy link
@excid3

excid3 Sep 27, 2023

Contributor

@bensheldon @adamnoto I think I ran into a bug with this commit?

I have a job that I enqueue and the test for it is raising this warning.

InboundWebhooks::IncinerationJob.set(wait: 7.days).perform_later(self)
inbound_webhook = inbound_webhooks(:fake_service)
assert_enqueued_with job: InboundWebhooks::IncinerationJob, args: [inbound_webhook] do
   inbound_webhook.processed!
end

Test passes like it always has, but now hits that deprecation warning.

Poking around the assert_enqueued_with test helper, it seems like the job it finds has at option set to a float still.

From: /Users/chris/.asdf/installs/ruby/3.2.2/lib/ruby/gems/3.2.0/gems/activejob-7.1.0.rc1/lib/active_job/test_helper.rb @ line 441 :

    436:         message << potential_matches.map { |job| job["job_class"] }.join(", ")
    437:       else
    438:         message << "\n\nPotential matches: #{matching_class.join("\n")}"
    439:       end
    440:
 => 441:       binding.irb
    442:       assert matching_job, message
    443:       instantiate_job(matching_job)
    444:     end
    445:
    446:     # Asserts that the job has been performed with the given arguments.

irb(#<InboundWebhookTest:0x000000...):001> matching_job
=>
{"job_class"=>"InboundWebhooks::IncinerationJob",
 "job_id"=>"a012f478-e680-491b-9ca8-5f51b3dbd1f5",
 "provider_job_id"=>nil,
 "queue_name"=>"default",
 "priority"=>nil,
 "arguments"=>[{"_aj_globalid"=>"gid://jumpstart-app/InboundWebhook/585423320"}],
 "executions"=>0,
 "exception_executions"=>{},
 "locale"=>"en",
 "timezone"=>"UTC",
 "enqueued_at"=>"2023-09-27T14:56:26.496494000Z",
 "scheduled_at"=>"2023-10-04T14:56:26.496147000Z",
 :job=>InboundWebhooks::IncinerationJob,
 :args=>[{"_aj_globalid"=>"gid://jumpstart-app/InboundWebhook/585423320"}],
 :queue=>"default",
 :priority=>nil,
 :at=>1696431386.496147}

This comment has been minimized.

Copy link
@rafaelfranca

rafaelfranca Sep 27, 2023

Member

I can see the deprecation in the test suite as well. I'll fix it.

This comment has been minimized.

Copy link
@excid3

excid3 Sep 27, 2023

Contributor

Thanks @rafaelfranca!

This comment has been minimized.

Copy link
@rafaelfranca

rafaelfranca Sep 27, 2023

Member

It should be fixed now

This comment has been minimized.

Copy link
@excid3

excid3 Sep 27, 2023

Contributor

Confirmed that fixed it. 👍

MSG
Time.at(value)
else
value
end
@scheduled_at = value
end

private
def serialize_arguments_if_needed(arguments)
if arguments_serialized?
Expand Down
4 changes: 2 additions & 2 deletions activejob/lib/active_job/enqueuing.rb
Expand Up @@ -23,7 +23,7 @@ def perform_all_later(*jobs)
adapter_jobs.each do |job|
job.successfully_enqueued = false
if job.scheduled_at
queue_adapter.enqueue_at(job, job.scheduled_at)
queue_adapter.enqueue_at(job, job._scheduled_at_time.to_f)
else
queue_adapter.enqueue(job)
end
Expand Down Expand Up @@ -92,7 +92,7 @@ def enqueue(options = {})

run_callbacks :enqueue do
if scheduled_at
queue_adapter.enqueue_at self, scheduled_at
queue_adapter.enqueue_at self, _scheduled_at_time.to_f
else
queue_adapter.enqueue self
end
Expand Down
2 changes: 1 addition & 1 deletion activejob/lib/active_job/log_subscriber.rb
Expand Up @@ -76,7 +76,7 @@ def enqueue_all(event)
def perform_start(event)
info do
job = event.payload[:job]
"Performing #{job.class.name} (Job ID: #{job.job_id}) from #{queue_name(event)} enqueued at #{job.enqueued_at}" + args_info(job)
"Performing #{job.class.name} (Job ID: #{job.job_id}) from #{queue_name(event)} enqueued at #{job.enqueued_at.utc.iso8601(9)}" + args_info(job)
end
end
subscribe_log_level :perform_start, :info
Expand Down
2 changes: 1 addition & 1 deletion activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb
Expand Up @@ -54,7 +54,7 @@ def enqueue_all(jobs) # :nodoc:
"wrapped" => job_class,
"queue" => queue,
"args" => scheduled_jobs.map { |job| [job.serialize] },
"at" => scheduled_jobs.map { |job| job.scheduled_at }
"at" => scheduled_jobs.map { |job| job.scheduled_at&.to_f }
)
enqueued_count += jids.compact.size
end
Expand Down
2 changes: 1 addition & 1 deletion activejob/lib/active_job/queue_adapters/test_adapter.rb
Expand Up @@ -59,7 +59,7 @@ def filtered?(job)
end

def filtered_time?(job)
job.scheduled_at > at.to_f if at && job.scheduled_at
job.scheduled_at > at if at && job.scheduled_at
end

def filtered_queue?(job)
Expand Down
2 changes: 1 addition & 1 deletion activejob/lib/active_job/test_helper.rb
Expand Up @@ -730,7 +730,7 @@ def deserialize_args_for_assertion(job)

def instantiate_job(payload, skip_deserialize_arguments: false)
job = payload[:job].deserialize(payload)
job.scheduled_at = payload[:at].to_f if payload.key?(:at)
job.scheduled_at = payload[:at] if payload.key?(:at)
job.send(:deserialize_arguments_if_needed) unless skip_deserialize_arguments
job
end
Expand Down
41 changes: 39 additions & 2 deletions activejob/test/cases/job_serialization_test.rb
Expand Up @@ -65,13 +65,50 @@ class JobSerializationTest < ActiveSupport::TestCase
end
end

test "serializes enqueued_at with full precision" do
test "serializes and deserializes enqueued_at with full precision" do
freeze_time

serialized = HelloJob.new.serialize
assert_kind_of String, serialized["enqueued_at"]

enqueued_at = HelloJob.deserialize(serialized).enqueued_at
assert_equal Time.now.utc, Time.iso8601(enqueued_at)
assert_kind_of Time, enqueued_at
assert_equal Time.now.utc, enqueued_at
end

test "serializes and deserializes scheduled_at as Time" do
freeze_time
current_time = Time.now

job = HelloJob.new
job.scheduled_at = current_time
serialized_job = job.serialize
assert_kind_of String, serialized_job["enqueued_at"]
assert_equal current_time.utc.iso8601(9), serialized_job["enqueued_at"]

deserialized_job = HelloJob.new
deserialized_job.deserialize(serialized_job)
assert_equal current_time, deserialized_job.scheduled_at

assert_equal job.serialize, deserialized_job.serialize
end

test "deprecates and coerces numerical scheduled_at attribute to Time when serialized and deserialized" do
freeze_time
current_time = Time.now

job = HelloJob.new
assert_deprecated(ActiveJob.deprecator) do
job.scheduled_at = current_time.to_f
end

serialized_job = job.serialize
assert_kind_of String, serialized_job["scheduled_at"]
assert_equal current_time.utc.iso8601(9), serialized_job["scheduled_at"]

deserialized_job = HelloJob.new
deserialized_job.deserialize(serialized_job)
assert_equal current_time, deserialized_job.scheduled_at
assert_equal job.serialize, deserialized_job.serialize
end
end
16 changes: 15 additions & 1 deletion activejob/test/cases/queuing_test.rb
Expand Up @@ -5,6 +5,7 @@
require "jobs/enqueue_error_job"
require "jobs/multiple_kwargs_job"
require "active_support/core_ext/numeric/time"
require "minitest/mock"

class QueuingTest < ActiveSupport::TestCase
setup do
Expand Down Expand Up @@ -35,7 +36,7 @@ class QueuingTest < ActiveSupport::TestCase

test "job returned by perform_at has the timestamp available" do
job = HelloJob.set(wait_until: Time.utc(2014, 1, 1)).perform_later
assert_equal Time.utc(2014, 1, 1).to_f, job.scheduled_at
assert_equal Time.utc(2014, 1, 1), job.scheduled_at
rescue NotImplementedError
skip
end
Expand Down Expand Up @@ -71,6 +72,19 @@ class QueuingTest < ActiveSupport::TestCase
assert_equal ["Jamie says hello", "Job with argument1: John, argument2: 42"], JobBuffer.values.sort
end

test "perform_all_later enqueues jobs with schedules" do
scheduled_job_1 = HelloJob.new("Scheduled 2014")
scheduled_job_1.set(wait_until: Time.utc(2014, 1, 1))

scheduled_job_2 = HelloJob.new("Scheduled 2015")
scheduled_job_2.scheduled_at = Time.utc(2015, 1, 1)

ActiveJob.perform_all_later(scheduled_job_1, scheduled_job_2)
assert_equal ["Scheduled 2014 says hello", "Scheduled 2015 says hello"], JobBuffer.values.sort
rescue NotImplementedError
skip
end

test "perform_all_later instrumentation" do
jobs = HelloJob.new("Jamie"), HelloJob.new("John")
called = false
Expand Down
2 changes: 1 addition & 1 deletion activejob/test/jobs/retry_job.rb
Expand Up @@ -38,7 +38,7 @@ class RetryJob < ActiveJob::Base

before_enqueue do |job|
if job.arguments.include?(:log_scheduled_at) && job.scheduled_at
JobBuffer.add("Next execution scheduled at #{job.scheduled_at}")
JobBuffer.add("Next execution scheduled at #{job.scheduled_at.to_f}")
end
end

Expand Down

0 comments on commit c919b47

Please sign in to comment.