Skip to content

Commit

Permalink
feat: Add support for starting position AT_TIMESTAMP to MSK events
Browse files Browse the repository at this point in the history
- Resolves #12033

- Add support for a StartingPosition of AT_TIMESTAMP to MSK event
source mappings. When this configuration is used,
StartingPositionTimestamp must also be provided, in Unix Time seconds,
which is consistent with the existing support in the `kafka` event

- Support for this starting position has recently been added to AWS
Lambda event source mappings using MSK
(https://aws.amazon.com/about-aws/whats-new/2023/06/aws-lambda-starting-timestamp-kafka-sources/)

- Since `runServerless` tests are expensive, test the happy path in the
existing test of a valid configuration rather than adding an explicit
separate test for it, but add a new test to explicitly verify the unhappy
path in which a timestamp at which to start is not provided
  • Loading branch information
griffithsbs committed Jun 22, 2023
1 parent a50773b commit d8a3722
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 4 deletions.
3 changes: 2 additions & 1 deletion docs/providers/aws/events/msk.md
Expand Up @@ -47,7 +47,8 @@ functions:

For the MSK event integration, you can set the `batchSize`, which effects how many messages can be processed in a single Lambda invocation. The default `batchSize` is 100, and the max `batchSize` is 10000.
Likewise `maximumBatchingWindow` can be set to determine the amount of time the Lambda spends gathering records before invoking the function. The default is 0, but **if you set `batchSize` to more than 10, you must set `maximumBatchingWindow` to at least 1**. The maximum is 300.
In addition, you can also configure `startingPosition`, which controls the position at which Lambda should start consuming messages from MSK topic. It supports two possible values, `TRIM_HORIZON` and `LATEST`, with `TRIM_HORIZON` being the default.
In addition, you can also configure `startingPosition`, which controls the position at which Lambda should start consuming messages from the topic. It supports three possible values, `TRIM_HORIZON`, `LATEST` and `AT_TIMESTAMP`, with `TRIM_HORIZON` being the default.
When `startingPosition` is configured as `AT_TIMESTAMP`, `startingPositionTimestamp` is also mandatory and is specified in Unix time seconds.

In the following example, we specify that the `compute` function should have an `msk` event configured with `batchSize` of 1000, `maximumBatchingWindow` to 30 seconds and `startingPosition` equal to `LATEST`.

Expand Down
4 changes: 3 additions & 1 deletion docs/providers/aws/guide/serverless.yml.md
Expand Up @@ -977,8 +977,10 @@ functions:
batchSize: 100
# Optional, must be in 0-300 range (seconds)
maximumBatchingWindow: 30
# Optional, can be set to LATEST or TRIM_HORIZON
# Optional, can be set to LATEST, AT_TIMESTAMP or TRIM_HORIZON
startingPosition: LATEST
# Mandatory when startingPosition is AT_TIMESTAMP, must be in Unix time seconds
startingPositionTimestamp: 10000123
# (default: true)
enabled: false
# Optional, arn of the secret key for authenticating with the brokers in your MSK cluster.
Expand Down
17 changes: 16 additions & 1 deletion lib/plugins/aws/package/compile/events/msk/index.js
Expand Up @@ -2,6 +2,7 @@

const getMskClusterNameToken = require('./get-msk-cluster-name-token');
const resolveLambdaTarget = require('../../../../utils/resolve-lambda-target');
const ServerlessError = require('../../../../../../serverless-error');
const _ = require('lodash');

class AwsCompileMSKEvents {
Expand Down Expand Up @@ -38,7 +39,10 @@ class AwsCompileMSKEvents {
},
startingPosition: {
type: 'string',
enum: ['LATEST', 'TRIM_HORIZON'],
enum: ['LATEST', 'TRIM_HORIZON', 'AT_TIMESTAMP'],
},
startingPositionTimestamp: {
type: 'number',
},
topic: {
type: 'string',
Expand Down Expand Up @@ -88,6 +92,13 @@ class AwsCompileMSKEvents {
const maximumBatchingWindow = event.msk.maximumBatchingWindow;
const enabled = event.msk.enabled;
const startingPosition = event.msk.startingPosition || 'TRIM_HORIZON';
const startingPositionTimestamp = event.msk.startingPositionTimestamp;
if (startingPosition === 'AT_TIMESTAMP' && startingPositionTimestamp == null) {
throw new ServerlessError(
`You must specify startingPositionTimestamp for function: ${functionName} when startingPosition is AT_TIMESTAMP.`,
'FUNCTION_MSK_STARTING_POSITION_TIMESTAMP_INVALID'
);
}
const saslScram512 = event.msk.saslScram512;
const consumerGroupId = event.msk.consumerGroupId;
const filterPatterns = event.msk.filterPatterns;
Expand Down Expand Up @@ -115,6 +126,10 @@ class AwsCompileMSKEvents {
},
};

if (startingPositionTimestamp != null) {
mskResource.Properties.StartingPositionTimestamp = startingPositionTimestamp;
}

if (batchSize) {
mskResource.Properties.BatchSize = batchSize;
}
Expand Down
Expand Up @@ -11,7 +11,8 @@ describe('AwsCompileMSKEvents', () => {
const arn = 'arn:aws:kafka:us-east-1:111111111111:cluster/ClusterName/a1a1a1a1a1a1a1a1a';
const topic = 'TestingTopic';
const enabled = false;
const startingPosition = 'LATEST';
const startingPosition = 'AT_TIMESTAMP';
const startingPositionTimestamp = 123;
const batchSize = 5000;
const maximumBatchingWindow = 10;
const saslScram512 =
Expand Down Expand Up @@ -56,6 +57,7 @@ describe('AwsCompileMSKEvents', () => {
maximumBatchingWindow,
enabled,
startingPosition,
startingPositionTimestamp,
saslScram512,
consumerGroupId,
filterPatterns,
Expand Down Expand Up @@ -121,6 +123,7 @@ describe('AwsCompileMSKEvents', () => {
Enabled: enabled,
EventSourceArn: arn,
StartingPosition: startingPosition,
StartingPositionTimestamp: startingPositionTimestamp,
SourceAccessConfigurations: sourceAccessConfigurations,
Topics: [topic],
FunctionName: {
Expand All @@ -145,6 +148,40 @@ describe('AwsCompileMSKEvents', () => {
},
});
});

describe('when startingPosition is AT_TIMESTAMP', () => {
it('if startingPosition is not provided, it should fail to compile EventSourceMapping resource properties', async () => {
await expect(
runServerless({
fixture: 'function',
configExt: {
functions: {
other: {
events: [
{
msk: {
topic,
arn,
batchSize,
maximumBatchingWindow,
enabled,
startingPosition,
saslScram512,
consumerGroupId,
filterPatterns,
},
},
],
},
},
},
command: 'package',
})
).to.be.rejected.and.eventually.contain({
code: 'FUNCTION_MSK_STARTING_POSITION_TIMESTAMP_INVALID',
});
});
});
});

describe('when no msk events are defined', () => {
Expand Down

0 comments on commit d8a3722

Please sign in to comment.