Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure failed messages go to DLQ #31

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
18 changes: 14 additions & 4 deletions lib/lambdakiq/job.rb
Expand Up @@ -72,7 +72,11 @@ def perform_error(e)
@error = e
else
instrument :retry_stopped, error: e
delete_message
if should_redrive?
@error = e
else
delete_message
end
end
end

Expand All @@ -97,10 +101,16 @@ def max_receive_count?
executions > retry_limit
end

def job_retry
[active_job.lambdakiq_retry, Lambdakiq.config.max_retries, 12].compact.min
end

def retry_limit
config_retry = [Lambdakiq.config.max_retries, 12].min
[ (active_job.lambdakiq_retry || config_retry),
(queue.max_receive_count - 1) ].min
[job_retry, (queue.max_receive_count - 1)].min
end

def should_redrive?
!queue.redrive_policy.nil? && job_retry >= queue.max_receive_count
end

def fifo_delay?
Expand Down
4 changes: 2 additions & 2 deletions lib/lambdakiq/queue.rb
Expand Up @@ -22,11 +22,11 @@ def attributes
end

def redrive_policy
@redrive_policy ||= JSON.parse(attributes['RedrivePolicy'])
@redrive_policy ||= attributes['RedrivePolicy'] ? JSON.parse(attributes['RedrivePolicy']) : nil
end

def max_receive_count
redrive_policy['maxReceiveCount'].to_i
redrive_policy&.dig('maxReceiveCount')&.to_i || 1
end

def fifo?
Expand Down
25 changes: 21 additions & 4 deletions test/cases/job_test.rb
Expand Up @@ -52,7 +52,6 @@ class JobTest < LambdakiqSpec
expect(perform_buffer_last_value).must_equal 'ErrorJob with: "test"'
expect(logger).must_include 'Performing TestHelper::Jobs::ErrorJob'
expect(logger).must_include 'Error performing TestHelper::Jobs::ErrorJob'
# binding.pry ; return
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for cleaning this up BTW.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no worries, glad to help

expect(logged_metric('retry_stopped.active_job')).must_be_nil
enqueue_retry = logged_metric('enqueue_retry.active_job')
expect(enqueue_retry).must_be :present?
Expand Down Expand Up @@ -80,9 +79,9 @@ class JobTest < LambdakiqSpec
expect(logger).must_include 'Error performing TestHelper::Jobs::ErrorJob'
end

it 'must delete message for failed jobs at the end of the queue/message max receive count' do
# See ClientHelpers for setting queue to max receive count of 8.
event = event_basic attributes: { ApproximateReceiveCount: '8' }, job_class: 'TestHelper::Jobs::ErrorJob'
it 'must delete message for failed jobs after the first try when queues do not have a redrive policy' do
client.stub_responses(:get_queue_attributes, { attributes: {} })
event = event_basic job_class: 'TestHelper::Jobs::ErrorJob'
response = Lambdakiq::Job.handler(event)
assert_response response, failures: false
expect(delete_message).must_be :present?
Expand All @@ -92,6 +91,24 @@ class JobTest < LambdakiqSpec
expect(logged_metric('enqueue_retry.active_job')).must_be_nil
retry_stopped = logged_metric('retry_stopped.active_job')
expect(retry_stopped).must_be :present?
expect(retry_stopped['Executions']).must_equal 1
expect(retry_stopped['ExceptionName']).must_equal 'RuntimeError'
end

it 'must not delete message for failed jobs and instead return a failure at the end of the queue/message max receive count' do
# See ClientHelpers for setting queue to max receive count of 8.
event = event_basic attributes: { ApproximateReceiveCount: '8' }, job_class: 'TestHelper::Jobs::ErrorJob', messageId: message_id
response = Lambdakiq::Job.handler(event)

assert_response response, failures: true, identifiers: [message_id]
expect(change_message_visibility).must_be_nil
expect(delete_message).must_be_nil
expect(perform_buffer_last_value).must_equal 'ErrorJob with: "test"'
expect(logger).must_include 'Performing TestHelper::Jobs::ErrorJob'
expect(logger).must_include 'Error performing TestHelper::Jobs::ErrorJob'
expect(logged_metric('enqueue_retry.active_job')).must_be_nil
retry_stopped = logged_metric('retry_stopped.active_job')
expect(retry_stopped).must_be :present?
expect(retry_stopped['Executions']).must_equal 8
expect(retry_stopped['ExceptionName']).must_equal 'RuntimeError'
end
Expand Down
2 changes: 1 addition & 1 deletion test/cases/jobs/basic_job_nofifo_job_test.rb
Expand Up @@ -6,7 +6,7 @@ class BasicJobNofifoTest < LambdakiqSpec
expect(sent_message).must_be :present?
end

it 'message body has no fifo queue nave vs fifo super class ' do
it 'message body has no fifo queue name vs fifo super class ' do
expect(sent_message_body['queue_name']).must_equal 'lambdakiq-JobsQueue-TESTING123'
end

Expand Down
15 changes: 11 additions & 4 deletions test/cases/queue_test.rb
@@ -1,13 +1,20 @@
require 'test_helper'

class QueueTest < LambdakiqSpec
let(:queue) { Lambdakiq.client.queues[queue_name] }
let(:fifo_queue) { Lambdakiq.client.queues[queue_name] }
let(:non_fifo_queue) { Lambdakiq.client.queues['non-fifo-queue'] }

it '#fifo?' do
expect(queue.fifo?).must_equal true
expect(fifo_queue.fifo?).must_equal true
expect(non_fifo_queue.fifo?).must_equal false
end

it '#max_receive_count' do
expect(queue.max_receive_count).must_equal 8
it '#max_receive_count returns the queue redrive policy maxReceiveCount' do
expect(fifo_queue.max_receive_count).must_equal 8
end

it '#max_receive_count returns 1 when the queue does not have a redrive policy' do
client.stub_responses(:get_queue_attributes, { attributes: {} })
expect(fifo_queue.max_receive_count).must_equal 1
end
end