diff --git a/lib/lambdakiq/job.rb b/lib/lambdakiq/job.rb index 7dd0a45..926cddc 100644 --- a/lib/lambdakiq/job.rb +++ b/lib/lambdakiq/job.rb @@ -72,7 +72,11 @@ def perform_error(e) @error = e else instrument :retry_stopped, error: e - delete_message + if should_redrive? + @error = e + else + delete_message + end end end @@ -97,10 +101,16 @@ def max_receive_count? executions > retry_limit end + def job_retry + [active_job.lambdakiq_retry, Lambdakiq.config.max_retries, 12].compact.min + end + def retry_limit - config_retry = [Lambdakiq.config.max_retries, 12].min - [ (active_job.lambdakiq_retry || config_retry), - (queue.max_receive_count - 1) ].min + [job_retry, (queue.max_receive_count - 1)].min + end + + def should_redrive? + !queue.redrive_policy.nil? && job_retry >= queue.max_receive_count end def fifo_delay? diff --git a/lib/lambdakiq/queue.rb b/lib/lambdakiq/queue.rb index d1968ae..1eafd89 100644 --- a/lib/lambdakiq/queue.rb +++ b/lib/lambdakiq/queue.rb @@ -22,11 +22,11 @@ def attributes end def redrive_policy - @redrive_policy ||= JSON.parse(attributes['RedrivePolicy']) + @redrive_policy ||= attributes['RedrivePolicy'] ? JSON.parse(attributes['RedrivePolicy']) : nil end def max_receive_count - redrive_policy['maxReceiveCount'].to_i + redrive_policy&.dig('maxReceiveCount')&.to_i || 1 end def fifo? diff --git a/test/cases/job_test.rb b/test/cases/job_test.rb index daa6df8..57b4e2a 100644 --- a/test/cases/job_test.rb +++ b/test/cases/job_test.rb @@ -52,7 +52,6 @@ class JobTest < LambdakiqSpec expect(perform_buffer_last_value).must_equal 'ErrorJob with: "test"' expect(logger).must_include 'Performing TestHelper::Jobs::ErrorJob' expect(logger).must_include 'Error performing TestHelper::Jobs::ErrorJob' - # binding.pry ; return expect(logged_metric('retry_stopped.active_job')).must_be_nil enqueue_retry = logged_metric('enqueue_retry.active_job') expect(enqueue_retry).must_be :present? @@ -80,9 +79,9 @@ class JobTest < LambdakiqSpec expect(logger).must_include 'Error performing TestHelper::Jobs::ErrorJob' end - it 'must delete message for failed jobs at the end of the queue/message max receive count' do - # See ClientHelpers for setting queue to max receive count of 8. - event = event_basic attributes: { ApproximateReceiveCount: '8' }, job_class: 'TestHelper::Jobs::ErrorJob' + it 'must delete message for failed jobs after the first try when queues do not have a redrive policy' do + client.stub_responses(:get_queue_attributes, { attributes: {} }) + event = event_basic job_class: 'TestHelper::Jobs::ErrorJob' response = Lambdakiq::Job.handler(event) assert_response response, failures: false expect(delete_message).must_be :present? @@ -92,6 +91,24 @@ class JobTest < LambdakiqSpec expect(logged_metric('enqueue_retry.active_job')).must_be_nil retry_stopped = logged_metric('retry_stopped.active_job') expect(retry_stopped).must_be :present? + expect(retry_stopped['Executions']).must_equal 1 + expect(retry_stopped['ExceptionName']).must_equal 'RuntimeError' + end + + it 'must not delete message for failed jobs and instead return a failure at the end of the queue/message max receive count' do + # See ClientHelpers for setting queue to max receive count of 8. + event = event_basic attributes: { ApproximateReceiveCount: '8' }, job_class: 'TestHelper::Jobs::ErrorJob', messageId: message_id + response = Lambdakiq::Job.handler(event) + + assert_response response, failures: true, identifiers: [message_id] + expect(change_message_visibility).must_be_nil + expect(delete_message).must_be_nil + expect(perform_buffer_last_value).must_equal 'ErrorJob with: "test"' + expect(logger).must_include 'Performing TestHelper::Jobs::ErrorJob' + expect(logger).must_include 'Error performing TestHelper::Jobs::ErrorJob' + expect(logged_metric('enqueue_retry.active_job')).must_be_nil + retry_stopped = logged_metric('retry_stopped.active_job') + expect(retry_stopped).must_be :present? expect(retry_stopped['Executions']).must_equal 8 expect(retry_stopped['ExceptionName']).must_equal 'RuntimeError' end diff --git a/test/cases/jobs/basic_job_nofifo_job_test.rb b/test/cases/jobs/basic_job_nofifo_job_test.rb index c39433c..a31111a 100644 --- a/test/cases/jobs/basic_job_nofifo_job_test.rb +++ b/test/cases/jobs/basic_job_nofifo_job_test.rb @@ -6,7 +6,7 @@ class BasicJobNofifoTest < LambdakiqSpec expect(sent_message).must_be :present? end - it 'message body has no fifo queue nave vs fifo super class ' do + it 'message body has no fifo queue name vs fifo super class ' do expect(sent_message_body['queue_name']).must_equal 'lambdakiq-JobsQueue-TESTING123' end diff --git a/test/cases/queue_test.rb b/test/cases/queue_test.rb index 5efb081..5e5fc9e 100644 --- a/test/cases/queue_test.rb +++ b/test/cases/queue_test.rb @@ -1,13 +1,20 @@ require 'test_helper' class QueueTest < LambdakiqSpec - let(:queue) { Lambdakiq.client.queues[queue_name] } + let(:fifo_queue) { Lambdakiq.client.queues[queue_name] } + let(:non_fifo_queue) { Lambdakiq.client.queues['non-fifo-queue'] } it '#fifo?' do - expect(queue.fifo?).must_equal true + expect(fifo_queue.fifo?).must_equal true + expect(non_fifo_queue.fifo?).must_equal false end - it '#max_receive_count' do - expect(queue.max_receive_count).must_equal 8 + it '#max_receive_count returns the queue redrive policy maxReceiveCount' do + expect(fifo_queue.max_receive_count).must_equal 8 + end + + it '#max_receive_count returns 1 when the queue does not have a redrive policy' do + client.stub_responses(:get_queue_attributes, { attributes: {} }) + expect(fifo_queue.max_receive_count).must_equal 1 end end