From d8a3722998416e439a3803fdb3654c63a9ce7a86 Mon Sep 17 00:00:00 2001 From: Ben Griffiths Date: Thu, 22 Jun 2023 09:07:47 +0100 Subject: [PATCH] feat: Add support for starting position AT_TIMESTAMP to MSK events - Resolves #12033 - Add support for a StartingPosition of AT_TIMESTAMP to MSK event source mappings. When this configuration is used, StartingPositionTimestamp must also be provided, in Unix Time seconds, which is consistent with the existing support in the `kafka` event - Support for this starting position has recently been added to AWS Lambda event source mappings using MSK (https://aws.amazon.com/about-aws/whats-new/2023/06/aws-lambda-starting-timestamp-kafka-sources/) - Since `runServerless` tests are expensive, test the happy path in the existing test of a valid configuration rather than adding an explicit separate test for it, but add a new test to explicitly verify the unhappy path in which a timestamp at which to start is not provided --- docs/providers/aws/events/msk.md | 3 +- docs/providers/aws/guide/serverless.yml.md | 4 +- .../aws/package/compile/events/msk/index.js | 17 +++++++- .../package/compile/events/msk/index.test.js | 39 ++++++++++++++++++- 4 files changed, 59 insertions(+), 4 deletions(-) diff --git a/docs/providers/aws/events/msk.md b/docs/providers/aws/events/msk.md index 7b3f4f8ac11..e595dade651 100644 --- a/docs/providers/aws/events/msk.md +++ b/docs/providers/aws/events/msk.md @@ -47,7 +47,8 @@ functions: For the MSK event integration, you can set the `batchSize`, which effects how many messages can be processed in a single Lambda invocation. The default `batchSize` is 100, and the max `batchSize` is 10000. Likewise `maximumBatchingWindow` can be set to determine the amount of time the Lambda spends gathering records before invoking the function. The default is 0, but **if you set `batchSize` to more than 10, you must set `maximumBatchingWindow` to at least 1**. The maximum is 300. -In addition, you can also configure `startingPosition`, which controls the position at which Lambda should start consuming messages from MSK topic. It supports two possible values, `TRIM_HORIZON` and `LATEST`, with `TRIM_HORIZON` being the default. +In addition, you can also configure `startingPosition`, which controls the position at which Lambda should start consuming messages from the topic. It supports three possible values, `TRIM_HORIZON`, `LATEST` and `AT_TIMESTAMP`, with `TRIM_HORIZON` being the default. +When `startingPosition` is configured as `AT_TIMESTAMP`, `startingPositionTimestamp` is also mandatory and is specified in Unix time seconds. In the following example, we specify that the `compute` function should have an `msk` event configured with `batchSize` of 1000, `maximumBatchingWindow` to 30 seconds and `startingPosition` equal to `LATEST`. diff --git a/docs/providers/aws/guide/serverless.yml.md b/docs/providers/aws/guide/serverless.yml.md index c4452b1bc62..947e4d1d1e6 100644 --- a/docs/providers/aws/guide/serverless.yml.md +++ b/docs/providers/aws/guide/serverless.yml.md @@ -977,8 +977,10 @@ functions: batchSize: 100 # Optional, must be in 0-300 range (seconds) maximumBatchingWindow: 30 - # Optional, can be set to LATEST or TRIM_HORIZON + # Optional, can be set to LATEST, AT_TIMESTAMP or TRIM_HORIZON startingPosition: LATEST + # Mandatory when startingPosition is AT_TIMESTAMP, must be in Unix time seconds + startingPositionTimestamp: 10000123 # (default: true) enabled: false # Optional, arn of the secret key for authenticating with the brokers in your MSK cluster. diff --git a/lib/plugins/aws/package/compile/events/msk/index.js b/lib/plugins/aws/package/compile/events/msk/index.js index 4efbed5a63d..72deed94eda 100644 --- a/lib/plugins/aws/package/compile/events/msk/index.js +++ b/lib/plugins/aws/package/compile/events/msk/index.js @@ -2,6 +2,7 @@ const getMskClusterNameToken = require('./get-msk-cluster-name-token'); const resolveLambdaTarget = require('../../../../utils/resolve-lambda-target'); +const ServerlessError = require('../../../../../../serverless-error'); const _ = require('lodash'); class AwsCompileMSKEvents { @@ -38,7 +39,10 @@ class AwsCompileMSKEvents { }, startingPosition: { type: 'string', - enum: ['LATEST', 'TRIM_HORIZON'], + enum: ['LATEST', 'TRIM_HORIZON', 'AT_TIMESTAMP'], + }, + startingPositionTimestamp: { + type: 'number', }, topic: { type: 'string', @@ -88,6 +92,13 @@ class AwsCompileMSKEvents { const maximumBatchingWindow = event.msk.maximumBatchingWindow; const enabled = event.msk.enabled; const startingPosition = event.msk.startingPosition || 'TRIM_HORIZON'; + const startingPositionTimestamp = event.msk.startingPositionTimestamp; + if (startingPosition === 'AT_TIMESTAMP' && startingPositionTimestamp == null) { + throw new ServerlessError( + `You must specify startingPositionTimestamp for function: ${functionName} when startingPosition is AT_TIMESTAMP.`, + 'FUNCTION_MSK_STARTING_POSITION_TIMESTAMP_INVALID' + ); + } const saslScram512 = event.msk.saslScram512; const consumerGroupId = event.msk.consumerGroupId; const filterPatterns = event.msk.filterPatterns; @@ -115,6 +126,10 @@ class AwsCompileMSKEvents { }, }; + if (startingPositionTimestamp != null) { + mskResource.Properties.StartingPositionTimestamp = startingPositionTimestamp; + } + if (batchSize) { mskResource.Properties.BatchSize = batchSize; } diff --git a/test/unit/lib/plugins/aws/package/compile/events/msk/index.test.js b/test/unit/lib/plugins/aws/package/compile/events/msk/index.test.js index f8a560fab8e..216a636b1cc 100644 --- a/test/unit/lib/plugins/aws/package/compile/events/msk/index.test.js +++ b/test/unit/lib/plugins/aws/package/compile/events/msk/index.test.js @@ -11,7 +11,8 @@ describe('AwsCompileMSKEvents', () => { const arn = 'arn:aws:kafka:us-east-1:111111111111:cluster/ClusterName/a1a1a1a1a1a1a1a1a'; const topic = 'TestingTopic'; const enabled = false; - const startingPosition = 'LATEST'; + const startingPosition = 'AT_TIMESTAMP'; + const startingPositionTimestamp = 123; const batchSize = 5000; const maximumBatchingWindow = 10; const saslScram512 = @@ -56,6 +57,7 @@ describe('AwsCompileMSKEvents', () => { maximumBatchingWindow, enabled, startingPosition, + startingPositionTimestamp, saslScram512, consumerGroupId, filterPatterns, @@ -121,6 +123,7 @@ describe('AwsCompileMSKEvents', () => { Enabled: enabled, EventSourceArn: arn, StartingPosition: startingPosition, + StartingPositionTimestamp: startingPositionTimestamp, SourceAccessConfigurations: sourceAccessConfigurations, Topics: [topic], FunctionName: { @@ -145,6 +148,40 @@ describe('AwsCompileMSKEvents', () => { }, }); }); + + describe('when startingPosition is AT_TIMESTAMP', () => { + it('if startingPosition is not provided, it should fail to compile EventSourceMapping resource properties', async () => { + await expect( + runServerless({ + fixture: 'function', + configExt: { + functions: { + other: { + events: [ + { + msk: { + topic, + arn, + batchSize, + maximumBatchingWindow, + enabled, + startingPosition, + saslScram512, + consumerGroupId, + filterPatterns, + }, + }, + ], + }, + }, + }, + command: 'package', + }) + ).to.be.rejected.and.eventually.contain({ + code: 'FUNCTION_MSK_STARTING_POSITION_TIMESTAMP_INVALID', + }); + }); + }); }); describe('when no msk events are defined', () => {