diff --git a/activejob/CHANGELOG.md b/activejob/CHANGELOG.md index 5a0f372c8db8a..53e51e5222655 100644 --- a/activejob/CHANGELOG.md +++ b/activejob/CHANGELOG.md @@ -1,3 +1,10 @@ +* Set `scheduled_at` attribute as a Time object instead of epoch seconds, and serialize and deserialize the value + when enqueued. Assigning a numeric/epoch value to scheduled_at= is deprecated; use a Time object instead. + + Deserializes `enqueued_at` as a Time instead of ISO8601 String. + + *Ben Sheldon* + * Clarify the backoff strategy for the recommended `:wait` option when retrying jobs `wait: :exponentially_longer` is waiting polynomially longer, so it is now recommended to use `wait: :polynomially_longer` to keep the same behavior. diff --git a/activejob/lib/active_job/core.rb b/activejob/lib/active_job/core.rb index 1743327486391..3a6c4cd280fbc 100644 --- a/activejob/lib/active_job/core.rb +++ b/activejob/lib/active_job/core.rb @@ -12,8 +12,10 @@ module Core attr_accessor :arguments attr_writer :serialized_arguments - # Timestamp when the job should be performed - attr_accessor :scheduled_at + # Time when the job should be performed + attr_reader :scheduled_at + + attr_reader :_scheduled_at_time # :nodoc: # Job Identifier attr_accessor :job_id @@ -94,6 +96,8 @@ def initialize(*arguments) @arguments = arguments @job_id = SecureRandom.uuid @queue_name = self.class.queue_name + @scheduled_at = nil + @_scheduled_at_time = nil @priority = self.class.priority @executions = 0 @exception_executions = {} @@ -115,7 +119,8 @@ def serialize "exception_executions" => exception_executions, "locale" => I18n.locale.to_s, "timezone" => timezone, - "enqueued_at" => Time.now.utc.iso8601(9) + "enqueued_at" => Time.now.utc.iso8601(9), + "scheduled_at" => _scheduled_at_time ? _scheduled_at_time.utc.iso8601(9) : nil, } end @@ -155,19 +160,32 @@ def deserialize(job_data) self.exception_executions = job_data["exception_executions"] self.locale = job_data["locale"] || I18n.locale.to_s self.timezone = job_data["timezone"] || Time.zone&.name - self.enqueued_at = job_data["enqueued_at"] + self.enqueued_at = Time.iso8601(job_data["enqueued_at"]) if job_data["enqueued_at"] + self.scheduled_at = Time.iso8601(job_data["scheduled_at"]) if job_data["scheduled_at"] end # Configures the job with the given options. def set(options = {}) # :nodoc: - self.scheduled_at = options[:wait].seconds.from_now.to_f if options[:wait] - self.scheduled_at = options[:wait_until].to_f if options[:wait_until] + self.scheduled_at = options[:wait].seconds.from_now if options[:wait] + self.scheduled_at = options[:wait_until] if options[:wait_until] self.queue_name = self.class.queue_name_from_part(options[:queue]) if options[:queue] self.priority = options[:priority].to_i if options[:priority] self end + def scheduled_at=(value) + @_scheduled_at_time = if value&.is_a?(Numeric) + ActiveJob.deprecator.warn(<<~MSG.squish) + Assigning a numeric/epoch value to scheduled_at is deprecated. Use a Time object instead. + MSG + Time.at(value) + else + value + end + @scheduled_at = value + end + private def serialize_arguments_if_needed(arguments) if arguments_serialized? diff --git a/activejob/lib/active_job/enqueuing.rb b/activejob/lib/active_job/enqueuing.rb index 357dcd71d0e7b..a805a53f62c3d 100644 --- a/activejob/lib/active_job/enqueuing.rb +++ b/activejob/lib/active_job/enqueuing.rb @@ -23,7 +23,7 @@ def perform_all_later(*jobs) adapter_jobs.each do |job| job.successfully_enqueued = false if job.scheduled_at - queue_adapter.enqueue_at(job, job.scheduled_at) + queue_adapter.enqueue_at(job, job._scheduled_at_time.to_f) else queue_adapter.enqueue(job) end @@ -92,7 +92,7 @@ def enqueue(options = {}) run_callbacks :enqueue do if scheduled_at - queue_adapter.enqueue_at self, scheduled_at + queue_adapter.enqueue_at self, _scheduled_at_time.to_f else queue_adapter.enqueue self end diff --git a/activejob/lib/active_job/log_subscriber.rb b/activejob/lib/active_job/log_subscriber.rb index 0d39048b3a965..5982fe3ef2441 100644 --- a/activejob/lib/active_job/log_subscriber.rb +++ b/activejob/lib/active_job/log_subscriber.rb @@ -76,7 +76,7 @@ def enqueue_all(event) def perform_start(event) info do job = event.payload[:job] - "Performing #{job.class.name} (Job ID: #{job.job_id}) from #{queue_name(event)} enqueued at #{job.enqueued_at}" + args_info(job) + "Performing #{job.class.name} (Job ID: #{job.job_id}) from #{queue_name(event)} enqueued at #{job.enqueued_at.utc.iso8601(9)}" + args_info(job) end end subscribe_log_level :perform_start, :info diff --git a/activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb b/activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb index 9cf90b38f849a..55b578ae7cd63 100644 --- a/activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb @@ -54,7 +54,7 @@ def enqueue_all(jobs) # :nodoc: "wrapped" => job_class, "queue" => queue, "args" => scheduled_jobs.map { |job| [job.serialize] }, - "at" => scheduled_jobs.map { |job| job.scheduled_at } + "at" => scheduled_jobs.map { |job| job.scheduled_at&.to_f } ) enqueued_count += jids.compact.size end diff --git a/activejob/lib/active_job/queue_adapters/test_adapter.rb b/activejob/lib/active_job/queue_adapters/test_adapter.rb index 975ba9a7dfbb5..6e1c8ec6a2bbc 100644 --- a/activejob/lib/active_job/queue_adapters/test_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/test_adapter.rb @@ -59,7 +59,7 @@ def filtered?(job) end def filtered_time?(job) - job.scheduled_at > at.to_f if at && job.scheduled_at + job.scheduled_at > at if at && job.scheduled_at end def filtered_queue?(job) diff --git a/activejob/lib/active_job/test_helper.rb b/activejob/lib/active_job/test_helper.rb index aee3fb7a9c160..149fc8a47c4b1 100644 --- a/activejob/lib/active_job/test_helper.rb +++ b/activejob/lib/active_job/test_helper.rb @@ -730,7 +730,7 @@ def deserialize_args_for_assertion(job) def instantiate_job(payload, skip_deserialize_arguments: false) job = payload[:job].deserialize(payload) - job.scheduled_at = payload[:at].to_f if payload.key?(:at) + job.scheduled_at = payload[:at] if payload.key?(:at) job.send(:deserialize_arguments_if_needed) unless skip_deserialize_arguments job end diff --git a/activejob/test/cases/job_serialization_test.rb b/activejob/test/cases/job_serialization_test.rb index 4cf90412ac25b..f2b98958cdb5d 100644 --- a/activejob/test/cases/job_serialization_test.rb +++ b/activejob/test/cases/job_serialization_test.rb @@ -65,13 +65,50 @@ class JobSerializationTest < ActiveSupport::TestCase end end - test "serializes enqueued_at with full precision" do + test "serializes and deserializes enqueued_at with full precision" do freeze_time serialized = HelloJob.new.serialize assert_kind_of String, serialized["enqueued_at"] enqueued_at = HelloJob.deserialize(serialized).enqueued_at - assert_equal Time.now.utc, Time.iso8601(enqueued_at) + assert_kind_of Time, enqueued_at + assert_equal Time.now.utc, enqueued_at + end + + test "serializes and deserializes scheduled_at as Time" do + freeze_time + current_time = Time.now + + job = HelloJob.new + job.scheduled_at = current_time + serialized_job = job.serialize + assert_kind_of String, serialized_job["enqueued_at"] + assert_equal current_time.utc.iso8601(9), serialized_job["enqueued_at"] + + deserialized_job = HelloJob.new + deserialized_job.deserialize(serialized_job) + assert_equal current_time, deserialized_job.scheduled_at + + assert_equal job.serialize, deserialized_job.serialize + end + + test "deprecates and coerces numerical scheduled_at attribute to Time when serialized and deserialized" do + freeze_time + current_time = Time.now + + job = HelloJob.new + assert_deprecated(ActiveJob.deprecator) do + job.scheduled_at = current_time.to_f + end + + serialized_job = job.serialize + assert_kind_of String, serialized_job["scheduled_at"] + assert_equal current_time.utc.iso8601(9), serialized_job["scheduled_at"] + + deserialized_job = HelloJob.new + deserialized_job.deserialize(serialized_job) + assert_equal current_time, deserialized_job.scheduled_at + assert_equal job.serialize, deserialized_job.serialize end end diff --git a/activejob/test/cases/queuing_test.rb b/activejob/test/cases/queuing_test.rb index 675496cbcb82a..2413987df33eb 100644 --- a/activejob/test/cases/queuing_test.rb +++ b/activejob/test/cases/queuing_test.rb @@ -5,6 +5,7 @@ require "jobs/enqueue_error_job" require "jobs/multiple_kwargs_job" require "active_support/core_ext/numeric/time" +require "minitest/mock" class QueuingTest < ActiveSupport::TestCase setup do @@ -35,7 +36,7 @@ class QueuingTest < ActiveSupport::TestCase test "job returned by perform_at has the timestamp available" do job = HelloJob.set(wait_until: Time.utc(2014, 1, 1)).perform_later - assert_equal Time.utc(2014, 1, 1).to_f, job.scheduled_at + assert_equal Time.utc(2014, 1, 1), job.scheduled_at rescue NotImplementedError skip end @@ -71,6 +72,19 @@ class QueuingTest < ActiveSupport::TestCase assert_equal ["Jamie says hello", "Job with argument1: John, argument2: 42"], JobBuffer.values.sort end + test "perform_all_later enqueues jobs with schedules" do + scheduled_job_1 = HelloJob.new("Scheduled 2014") + scheduled_job_1.set(wait_until: Time.utc(2014, 1, 1)) + + scheduled_job_2 = HelloJob.new("Scheduled 2015") + scheduled_job_2.scheduled_at = Time.utc(2015, 1, 1) + + ActiveJob.perform_all_later(scheduled_job_1, scheduled_job_2) + assert_equal ["Scheduled 2014 says hello", "Scheduled 2015 says hello"], JobBuffer.values.sort + rescue NotImplementedError + skip + end + test "perform_all_later instrumentation" do jobs = HelloJob.new("Jamie"), HelloJob.new("John") called = false diff --git a/activejob/test/jobs/retry_job.rb b/activejob/test/jobs/retry_job.rb index 33a415c471f90..a1b5a0f13ef08 100644 --- a/activejob/test/jobs/retry_job.rb +++ b/activejob/test/jobs/retry_job.rb @@ -38,7 +38,7 @@ class RetryJob < ActiveJob::Base before_enqueue do |job| if job.arguments.include?(:log_scheduled_at) && job.scheduled_at - JobBuffer.add("Next execution scheduled at #{job.scheduled_at}") + JobBuffer.add("Next execution scheduled at #{job.scheduled_at.to_f}") end end