Merged
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -2,6 +2,12 @@

See this http://keepachangelog.com link for information on how we want this document formatted.

## v2.0.0

#### Changed

- Leverage new `ReportBatchItemFailures` feature of SQS.

## v1.0.2, v1.0.3, v1.0.4

#### Fixed
2 changes: 1 addition & 1 deletion Gemfile.lock
@@ -1,7 +1,7 @@
PATH
remote: .
specs:
lambdakiq (1.0.4)
lambdakiq (2.0.0)
activejob
aws-sdk-sqs
concurrent-ruby
12 changes: 4 additions & 8 deletions README.md
@@ -144,6 +144,8 @@ JobsLambda:
Properties:
Queue: !GetAtt JobsQueue.Arn
BatchSize: 1
FunctionResponseTypes:
- ReportBatchItemFailures
MemorySize: 1792
PackageType: Image
Policies:
@@ -160,9 +162,9 @@ JobsLambda:
Here are some key aspects of our `JobsLambda` resource above:

- The `Events` property uses the [SQS Type](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/sam-property-function-sqs.html).
- Our [BatchSize](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/sam-property-function-sqs.html#sam-function-sqs-batchsize) is set to one so we can handle retries more easily without worrying about idempotency in larger batches.
- The [BatchSize](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/sam-property-function-sqs.html#sam-function-sqs-batchsize) can be any number you like. A smaller batch means more Lambda concurrency; a larger batch means some jobs could take longer. The jobs function `Timeout` must be lower than the `JobsQueue`'s `VisibilityTimeout` property. When the batch size is one, the queue's visibility timeout is generally one second more.
- You must use the `ReportBatchItemFailures` response type. Lambdakiq assumes we are [reporting batch item failures](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting). This is a new feature of Lambda's SQS event source introduced in [November 2021](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting).
- The `Metadata`'s Docker properties must be the same as our web function except for the `DockerTag`. This is needed for the image to be shared. This works around a known [SAM issue](https://github.com/aws/aws-sam-cli/issues/2466) vs using the `ImageConfig` property.
- The jobs function `Timeout` must be lower than the `JobsQueue`'s `VisibilityTimeout` property. When the batch size is one, the queue's visibility is generally one second more.
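
To make the `ReportBatchItemFailures` contract concrete, here is a minimal sketch of the response a handler could return for a partially failed batch. The `FakeJob` struct and the `batch_response` helper are hypothetical stand-ins, not Lambdakiq's classes; only the `batchItemFailures` and `itemIdentifier` keys come from the AWS contract.

```ruby
require 'json'

# Hypothetical stand-in for a processed job: the SQS message ID plus any error.
FakeJob = Struct.new(:message_id, :error)

# Build the partial batch response: list only the messages that failed.
def batch_response(jobs)
  failures = jobs.select(&:error).map { |j| { itemIdentifier: j.message_id } }
  { batchItemFailures: failures }
end

jobs = [
  FakeJob.new('msg-1', nil),                     # succeeded
  FakeJob.new('msg-2', RuntimeError.new('boom')) # failed, should be retried
]

puts JSON.generate(batch_response(jobs))
# prints {"batchItemFailures":[{"itemIdentifier":"msg-2"}]}
```

An empty `batchItemFailures` array tells Lambda the whole batch succeeded, so only the listed messages become visible on the queue again.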

🎉 Deploy your application and have fun with ActiveJob on SQS & Lambda.

@@ -192,12 +194,6 @@ end

- `retry` - Overrides the default Lambdakiq `max_retries` for this one job.

## Concurrency & Limits

AWS SQS is highly scalable with [few limits](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-quotas.html). As the number of jobs in SQS increases, so should the number of concurrent functions processing that work. However, as the article ["Why isn't my Lambda function with an Amazon SQS event source scaling optimally?"](https://aws.amazon.com/premiumsupport/knowledge-center/lambda-sqs-scaling/) describes, it is possible that errors will affect your concurrency.

To help keep your queue and workers scalable, reduce the errors raised by your jobs. You can also reduce the retry count.

## Observability with CloudWatch

Get ready to gain way more insights into your ActiveJobs using AWS' [CloudWatch](https://aws.amazon.com/cloudwatch/) service. Every AWS service, including SQS & Lambda, publishes detailed [CloudWatch Metrics](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/working_with_metrics.html). This gem leverages [CloudWatch Embedded Metrics](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch_Embedded_Metric_Format.html) to add detailed ActiveJob metrics to that system. You can mix and match these data points to build your own [CloudWatch Dashboards](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch_Dashboards.html). If needed, any combination can be used to trigger [CloudWatch Alarms](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/AlarmThatSendsEmail.html). Much like Sumo Logic, you can search & query for data using [CloudWatch Logs Insights](https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/AnalyzingLogData.html).
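
As a rough illustration of what a CloudWatch Embedded Metric Format log line looks like, here is a minimal sketch. The `emf_line` helper and the `Lambdakiq` namespace are assumptions made for this example and are not taken from the gem; the property names mirror the keys asserted in this PR's tests (`AppName`, `JobName`, `Duration`, `JobId`).

```ruby
require 'json'

# Hypothetical helper: build one Embedded Metric Format log line for a job run.
def emf_line(namespace:, dimensions:, duration_ms:, properties: {}, now: Time.now)
  payload = {
    '_aws' => {
      'Timestamp' => (now.to_f * 1000).to_i, # epoch milliseconds
      'CloudWatchMetrics' => [{
        'Namespace' => namespace,
        'Dimensions' => [dimensions.keys],   # one dimension set
        'Metrics' => [{ 'Name' => 'Duration', 'Unit' => 'Milliseconds' }]
      }]
    },
    'Duration' => duration_ms
  }
  # Dimension values and extra searchable properties live at the top level.
  JSON.generate(payload.merge(dimensions).merge(properties))
end

puts emf_line(
  namespace: 'Lambdakiq', # assumed namespace, for illustration only
  dimensions: { 'AppName' => 'Dummy', 'JobName' => 'TestHelper::Jobs::BasicJob' },
  duration_ms: 12,
  properties: { 'JobId' => 'example-job-id' }
)
```

Writing such a line to standard output from a Lambda function is enough for CloudWatch to extract the `Duration` metric, while the remaining keys stay queryable in Logs Insights.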
10 changes: 0 additions & 10 deletions lib/lambdakiq/error.rb
@@ -2,16 +2,6 @@ module Lambdakiq
class Error < StandardError
end

class JobError < Error
attr_reader :original_exception, :job

def initialize(error)
@original_exception = error
super(error.message)
set_backtrace Rails.backtrace_cleaner.clean(error.backtrace)
end
end

class FifoDelayError < Error
def initialize(error)
super
13 changes: 9 additions & 4 deletions lib/lambdakiq/job.rb
@@ -9,9 +9,9 @@ def handler(event)
records = Event.records(event)
jobs = records.map { |record| new(record) }
jobs.each(&:perform)
jwerror = jobs.detect{ |j| j.error }
return unless jwerror
raise JobError.new(jwerror.error)
failed_jobs = jobs.select { |j| j.error }
item_failures = failed_jobs.map { |j| { itemIdentifier: j.provider_job_id } }
{ batchItemFailures: item_failures }
end

end
@@ -40,10 +40,14 @@ def executions
active_job.executions
end

def provider_job_id
active_job.provider_job_id
end

def perform
if fifo_delay?
fifo_delay
raise FifoDelayError, active_job.job_id
return
end
execute
end
@@ -104,6 +108,7 @@ def fifo_delay?
end

def fifo_delay
@error = FifoDelayError.new(active_job.job_id)
params = client_params.merge visibility_timeout: record.fifo_delay_visibility_timeout
client.change_message_visibility(params)
end
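
With this change the FIFO delay path records an error instead of raising, so the delayed message is reported as a batch item failure while its visibility timeout is pushed out. A minimal sketch of how the remaining delay could be computed, inferred from the tests in this PR (a 10 minute delay sent 2 minutes ago yields roughly 8 minutes). `remaining_delay_seconds` is a hypothetical helper, not Lambdakiq's method, and it assumes `SentTimestamp` is in epoch milliseconds.

```ruby
# Hypothetical helper: seconds left before a delayed FIFO job may run.
# Assumes sent_timestamp_ms is the SQS SentTimestamp in epoch milliseconds.
def remaining_delay_seconds(delay_seconds:, sent_timestamp_ms:, now: Time.now)
  sent_at = Time.at(sent_timestamp_ms.to_i / 1000.0)
  elapsed = now - sent_at
  # Never return a negative timeout; zero means the job can run right away.
  [(delay_seconds - elapsed).ceil, 0].max
end

now = Time.at(1_000_000)
two_minutes_ago_ms = (1_000_000 - 120) * 1000

puts remaining_delay_seconds(delay_seconds: 600, sent_timestamp_ms: two_minutes_ago_ms, now: now)
# prints 480
```

A result of zero corresponds to the test case where `SentTimestamp` is too far in the past and the job is performed immediately.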
2 changes: 1 addition & 1 deletion lib/lambdakiq/version.rb
@@ -1,3 +1,3 @@
module Lambdakiq
VERSION = '1.0.4'
VERSION = '2.0.0'
end
60 changes: 40 additions & 20 deletions test/cases/job_test.rb
@@ -3,13 +3,14 @@
class JobTest < LambdakiqSpec

it '#active_job - a deserialize representation of what will be executed' do
aj = job.active_job
e = event_basic messageId: message_id
aj = job(event: e).active_job
expect(aj).must_be_instance_of TestHelper::Jobs::BasicJob
expect(aj.job_id).must_equal '527cd37e-08f4-4aa8-9834-a46220cdc5a3'
expect(aj.queue_name).must_equal queue_name
expect(aj.enqueued_at).must_equal '2020-11-30T13:07:36Z'
expect(aj.executions).must_equal 0
expect(aj.provider_job_id).must_equal '9081fe74-bc79-451f-a03a-2fe5c6e2f807'
expect(aj.provider_job_id).must_equal message_id
end

it '#active_job - executions uses ApproximateReceiveCount' do
@@ -19,7 +20,8 @@ class JobTest < LambdakiqSpec
end

it 'must perform basic job' do
Lambdakiq::Job.handler(event_basic)
response = Lambdakiq::Job.handler(event_basic)
assert_response response, failures: false
expect(delete_message).must_be :present?
expect(change_message_visibility).must_be_nil
expect(perform_buffer_last_value).must_equal 'BasicJob with: "test"'
@@ -28,21 +30,23 @@ class JobTest < LambdakiqSpec
end

it 'logs cloudwatch embedded metrics' do
Lambdakiq::Job.handler(event_basic)
response = Lambdakiq::Job.handler(event_basic(messageId: message_id))
assert_response response, failures: false
metric = logged_metric('perform.active_job')
expect(metric).must_be :present?
expect(metric['AppName']).must_equal 'Dummy'
expect(metric['JobName']).must_equal 'TestHelper::Jobs::BasicJob'
expect(metric['Duration']).must_equal 0
expect(metric['JobId']).must_equal '527cd37e-08f4-4aa8-9834-a46220cdc5a3'
expect(metric['QueueName']).must_equal 'lambdakiq-JobsQueue-TESTING123.fifo'
expect(metric['MessageId']).must_equal '9081fe74-bc79-451f-a03a-2fe5c6e2f807'
expect(metric['MessageId']).must_equal message_id
expect(metric['JobArg1']).must_equal 'test'
end

it 'must change message visibility to next value for failed jobs' do
event = event_basic attributes: { ApproximateReceiveCount: '7' }, job_class: 'TestHelper::Jobs::ErrorJob'
expect(->{ Lambdakiq::Job.handler(event) }).must_raise 'HELL'
event = event_basic attributes: { ApproximateReceiveCount: '7' }, job_class: 'TestHelper::Jobs::ErrorJob', messageId: message_id
response = Lambdakiq::Job.handler(event)
assert_response response, failures: true, identifiers: [message_id]
expect(change_message_visibility).must_be :present?
expect(change_message_visibility_params[:visibility_timeout]).must_equal 1416
expect(perform_buffer_last_value).must_equal 'ErrorJob with: "test"'
@@ -57,10 +61,20 @@ class JobTest < LambdakiqSpec
end

it 'wraps returned errors with no backtrace which avoids excessive/duplicate cloudwatch logging' do
event = event_basic job_class: 'TestHelper::Jobs::ErrorJob'
error = expect(->{ Lambdakiq::Job.handler(event) }).must_raise 'HELL'
expect(error.class.name).must_equal 'Lambdakiq::JobError'
expect(error.backtrace).must_equal []
event = event_basic job_class: 'TestHelper::Jobs::ErrorJob', messageId: message_id
response = Lambdakiq::Job.handler(event)
assert_response response, failures: true, identifiers: [message_id]
expect(perform_buffer_last_value).must_equal 'ErrorJob with: "test"'
expect(logger).must_include 'Performing TestHelper::Jobs::ErrorJob'
expect(logger).must_include 'Error performing TestHelper::Jobs::ErrorJob'
end

it 'can handle batches with partial failures' do
event = event_basic
error = event_basic job_class: 'TestHelper::Jobs::ErrorJob', messageId: message_id
event['Records'].push error['Records'].first
response = Lambdakiq::Job.handler(event)
assert_response response, failures: true, identifiers: [message_id]
expect(perform_buffer_last_value).must_equal 'ErrorJob with: "test"'
expect(logger).must_include 'Performing TestHelper::Jobs::ErrorJob'
expect(logger).must_include 'Error performing TestHelper::Jobs::ErrorJob'
@@ -69,7 +83,8 @@ class JobTest < LambdakiqSpec
it 'must delete message for failed jobs at the end of the queue/message max receive count' do
# See ClientHelpers for setting queue to max receive count of 8.
event = event_basic attributes: { ApproximateReceiveCount: '8' }, job_class: 'TestHelper::Jobs::ErrorJob'
Lambdakiq::Job.handler(event)
response = Lambdakiq::Job.handler(event)
assert_response response, failures: false
expect(delete_message).must_be :present?
expect(perform_buffer_last_value).must_equal 'ErrorJob with: "test"'
expect(logger).must_include 'Performing TestHelper::Jobs::ErrorJob'
@@ -82,8 +97,9 @@ class JobTest < LambdakiqSpec
end

it 'must not perform and allow fifo queue to use message visibility as delay' do
event = event_basic_delay minutes: 6
error = expect(->{ Lambdakiq::Job.handler(event) }).must_raise 'HELL'
event = event_basic_delay minutes: 6, overrides: { messageId: message_id }
response = Lambdakiq::Job.handler(event)
assert_response response, failures: true, identifiers: [message_id]
expect(delete_message).must_be :blank?
expect(change_message_visibility).must_be :present?
expect(change_message_visibility_params[:visibility_timeout]).must_be_close_to 6.minutes, 1
@@ -92,8 +108,9 @@ class JobTest < LambdakiqSpec
end

it 'must not perform and allow fifo queue to use message visibility as delay (using SentTimestamp)' do
event = event_basic_delay minutes: 10, timestamp: 2.minutes.ago.strftime('%s%3N')
error = expect(->{ Lambdakiq::Job.handler(event) }).must_raise 'HELL'
event = event_basic_delay minutes: 10, timestamp: 2.minutes.ago.strftime('%s%3N'), overrides: { messageId: message_id }
response = Lambdakiq::Job.handler(event)
assert_response response, failures: true, identifiers: [message_id]
expect(delete_message).must_be :blank?
expect(change_message_visibility).must_be :present?
expect(change_message_visibility_params[:visibility_timeout]).must_be_close_to 8.minutes, 1
@@ -103,7 +120,8 @@ class JobTest < LambdakiqSpec

it 'must perform and allow fifo queue to use message visibility as delay but not when SentTimestamp is too far in the past' do
event = event_basic_delay minutes: 2, timestamp: 3.minutes.ago.strftime('%s%3N')
Lambdakiq::Job.handler(event)
response = Lambdakiq::Job.handler(event)
assert_response response, failures: false
expect(delete_message).must_be :present?
expect(change_message_visibility).must_be_nil
expect(perform_buffer_last_value).must_equal 'BasicJob with: "test"'
@@ -113,16 +131,18 @@ class JobTest < LambdakiqSpec

it 'must use `lambdakiq_options` retry options set to 0 and not retry job' do
event = event_basic job_class: 'TestHelper::Jobs::ErrorJobNoRetry'
Lambdakiq::Job.handler(event)
response = Lambdakiq::Job.handler(event)
assert_response response, failures: false
expect(delete_message).must_be :present?
expect(perform_buffer_last_value).must_equal 'ErrorJobNoRetry with: "test"'
expect(logger).must_include 'Performing TestHelper::Jobs::ErrorJobNoRetry'
expect(logger).must_include 'Error performing TestHelper::Jobs::ErrorJobNoRetry'
end

it 'must use `lambdakiq_options` retry options set to 1 and retry job' do
event = event_basic job_class: 'TestHelper::Jobs::ErrorJobOneRetry'
error = expect(->{ Lambdakiq::Job.handler(event) }).must_raise 'HELL'
event = event_basic job_class: 'TestHelper::Jobs::ErrorJobOneRetry', messageId: message_id
response = Lambdakiq::Job.handler(event)
assert_response response, failures: true, identifiers: [message_id]
expect(delete_message).must_be :blank?
expect(perform_buffer_last_value).must_equal 'ErrorJobOneRetry with: "test"'
expect(change_message_visibility).must_be :present?
3 changes: 2 additions & 1 deletion test/test_helper.rb
@@ -21,7 +21,8 @@ class LambdakiqSpec < Minitest::Spec
TestHelper::EventHelpers,
TestHelper::QueueHelpers,
TestHelper::LogHelpers,
TestHelper::PerformHelpers
TestHelper::PerformHelpers,
TestHelper::ResponseHelpers

before do
client_reset!
13 changes: 10 additions & 3 deletions test/test_helper/event_helpers.rb
@@ -1,17 +1,20 @@
require 'test_helper/events/base'
require 'test_helper/events/basic'
require 'securerandom'

module TestHelper
module EventHelpers

MESSAGE_ID = '9081fe74-bc79-451f-a03a-2fe5c6e2f807'.freeze

private

def event_basic(overrides = {})
Events::Basic.create(overrides)
end

def event_basic_delay(minutes: 5, timestamp: Time.current.strftime('%s%3N'))
Events::Basic.create(
def event_basic_delay(minutes: 5, timestamp: Time.current.strftime('%s%3N'), overrides: {})
Events::Basic.create({
attributes: { SentTimestamp: timestamp },
messageAttributes: {
delay_seconds: {
@@ -21,7 +24,11 @@ def event_basic_delay(minutes: 5, timestamp: Time.current.strftime('%s%3N'))
dataType: 'String'
}
}
)
}.merge(overrides))
end

def message_id
MESSAGE_ID
end

end
1 change: 1 addition & 0 deletions test/test_helper/events/base.rb
@@ -6,6 +6,7 @@ class Base
self.event = Hash.new

def self.create(overrides = {})
overrides[:messageId] ||= SecureRandom.uuid
job_class = overrides.delete(:job_class)
event.deep_dup.tap do |e|
e['Records'].each do |r|
2 changes: 1 addition & 1 deletion test/test_helper/events/basic.rb
@@ -6,7 +6,7 @@ class Basic < Base
{
"Records": [
{
"messageId": "9081fe74-bc79-451f-a03a-2fe5c6e2f807",
"messageId": "abcdefgh-ijkl-mnop-qrst-uvwxyz012345",
"receiptHandle": "AQEBgbn8GmF1fMo4z3IIqlJYymS6e7NBynwE+LsQlzjjdcKtSIomGeKMe0noLC9UDShUSe8bzr0s+pby03stHNRv1hgg4WRB5YT4aO0dwOuio7LvMQ/VW88igQtWmca78K6ixnU9X5Sr6J+/+WMvjBgIdvO0ycAM2tyJ1nxRHs/krUoLo/bFCnnwYh++T5BLQtFjFGrRkPjWnzjAbLWKU6Hxxr5lkHSxGhjfAoTCOjhi9crouXaWD+H1uvoGx/O/ZXaeMNjKIQoKjhFguwbEpvrq2Pfh2x9nRgBP3cKa9qw4Q3oFQ0MiQAvnK+UO8cCnsKtD",
"body": "{\"job_class\":\"TestHelper::Jobs::BasicJob\",\"job_id\":\"527cd37e-08f4-4aa8-9834-a46220cdc5a3\",\"provider_job_id\":null,\"queue_name\":\"lambdakiq-JobsQueue-TESTING123.fifo\",\"priority\":null,\"arguments\":[\"test\"],\"executions\":0,\"exception_executions\":{},\"locale\":\"en\",\"timezone\":\"UTC\",\"enqueued_at\":\"2020-11-30T13:07:36Z\"}",
"attributes": {
27 changes: 27 additions & 0 deletions test/test_helper/response_helpers.rb
@@ -0,0 +1,27 @@
module TestHelper
module ResponseHelpers
extend ActiveSupport::Concern

private

def assert_response(response, failures: false, identifiers: [])
expect(response).must_be_instance_of Hash
expect(response[:batchItemFailures]).must_be_instance_of Array
if failures
assert_response_failures response, identifiers: identifiers
else
expect(response[:batchItemFailures]).must_be :empty?
end
end

def assert_response_failures(response, identifiers: [])
expect(response[:batchItemFailures]).wont_be :empty?
return if identifiers.blank?
expect(response[:batchItemFailures].length).must_equal identifiers.length
response[:batchItemFailures].each_with_index do |failure, index|
expect(failure[:itemIdentifier]).must_equal identifiers[index]
end
end

end
end