Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add support for starting position AT_TIMESTAMP to MSK events #12034

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion docs/providers/aws/events/msk.md
Expand Up @@ -47,7 +47,8 @@ functions:

For the MSK event integration, you can set the `batchSize`, which effects how many messages can be processed in a single Lambda invocation. The default `batchSize` is 100, and the max `batchSize` is 10000.
Likewise `maximumBatchingWindow` can be set to determine the amount of time the Lambda spends gathering records before invoking the function. The default is 0, but **if you set `batchSize` to more than 10, you must set `maximumBatchingWindow` to at least 1**. The maximum is 300.
In addition, you can also configure `startingPosition`, which controls the position at which Lambda should start consuming messages from MSK topic. It supports two possible values, `TRIM_HORIZON` and `LATEST`, with `TRIM_HORIZON` being the default.
In addition, you can also configure `startingPosition`, which controls the position at which Lambda should start consuming messages from the topic. It supports three possible values, `TRIM_HORIZON`, `LATEST` and `AT_TIMESTAMP`, with `TRIM_HORIZON` being the default.
When `startingPosition` is configured as `AT_TIMESTAMP`, `startingPositionTimestamp` is also mandatory and is specified in Unix time seconds.

In the following example, we specify that the `compute` function should have an `msk` event configured with `batchSize` of 1000, `maximumBatchingWindow` to 30 seconds and `startingPosition` equal to `LATEST`.

Expand Down
4 changes: 3 additions & 1 deletion docs/providers/aws/guide/serverless.yml.md
Expand Up @@ -977,8 +977,10 @@ functions:
batchSize: 100
# Optional, must be in 0-300 range (seconds)
maximumBatchingWindow: 30
# Optional, can be set to LATEST or TRIM_HORIZON
# Optional, can be set to LATEST, AT_TIMESTAMP or TRIM_HORIZON
startingPosition: LATEST
# Mandatory when startingPosition is AT_TIMESTAMP, must be in Unix time seconds
startingPositionTimestamp: 10000123
# (default: true)
enabled: false
# Optional, arn of the secret key for authenticating with the brokers in your MSK cluster.
Expand Down
17 changes: 16 additions & 1 deletion lib/plugins/aws/package/compile/events/msk/index.js
Expand Up @@ -2,6 +2,7 @@

const getMskClusterNameToken = require('./get-msk-cluster-name-token');
const resolveLambdaTarget = require('../../../../utils/resolve-lambda-target');
const ServerlessError = require('../../../../../../serverless-error');
const _ = require('lodash');

class AwsCompileMSKEvents {
Expand Down Expand Up @@ -38,7 +39,10 @@ class AwsCompileMSKEvents {
},
startingPosition: {
type: 'string',
enum: ['LATEST', 'TRIM_HORIZON'],
enum: ['LATEST', 'TRIM_HORIZON', 'AT_TIMESTAMP'],
},
startingPositionTimestamp: {
type: 'number',
},
topic: {
type: 'string',
Expand Down Expand Up @@ -88,6 +92,13 @@ class AwsCompileMSKEvents {
const maximumBatchingWindow = event.msk.maximumBatchingWindow;
const enabled = event.msk.enabled;
const startingPosition = event.msk.startingPosition || 'TRIM_HORIZON';
const startingPositionTimestamp = event.msk.startingPositionTimestamp;
if (startingPosition === 'AT_TIMESTAMP' && startingPositionTimestamp == null) {
throw new ServerlessError(
`You must specify startingPositionTimestamp for function: ${functionName} when startingPosition is AT_TIMESTAMP.`,
'FUNCTION_MSK_STARTING_POSITION_TIMESTAMP_INVALID'
);
}
const saslScram512 = event.msk.saslScram512;
const consumerGroupId = event.msk.consumerGroupId;
const filterPatterns = event.msk.filterPatterns;
Expand Down Expand Up @@ -115,6 +126,10 @@ class AwsCompileMSKEvents {
},
};

if (startingPositionTimestamp != null) {
mskResource.Properties.StartingPositionTimestamp = startingPositionTimestamp;
}

if (batchSize) {
mskResource.Properties.BatchSize = batchSize;
}
Expand Down
Expand Up @@ -11,7 +11,8 @@ describe('AwsCompileMSKEvents', () => {
const arn = 'arn:aws:kafka:us-east-1:111111111111:cluster/ClusterName/a1a1a1a1a1a1a1a1a';
const topic = 'TestingTopic';
const enabled = false;
const startingPosition = 'LATEST';
const startingPosition = 'AT_TIMESTAMP';
const startingPositionTimestamp = 123;
const batchSize = 5000;
const maximumBatchingWindow = 10;
const saslScram512 =
Expand Down Expand Up @@ -56,6 +57,7 @@ describe('AwsCompileMSKEvents', () => {
maximumBatchingWindow,
enabled,
startingPosition,
startingPositionTimestamp,
saslScram512,
consumerGroupId,
filterPatterns,
Expand Down Expand Up @@ -121,6 +123,7 @@ describe('AwsCompileMSKEvents', () => {
Enabled: enabled,
EventSourceArn: arn,
StartingPosition: startingPosition,
StartingPositionTimestamp: startingPositionTimestamp,
SourceAccessConfigurations: sourceAccessConfigurations,
Topics: [topic],
FunctionName: {
Expand All @@ -145,6 +148,40 @@ describe('AwsCompileMSKEvents', () => {
},
});
});

describe('when startingPosition is AT_TIMESTAMP', () => {
it('if startingPosition is not provided, it should fail to compile EventSourceMapping resource properties', async () => {
await expect(
runServerless({
fixture: 'function',
configExt: {
functions: {
other: {
events: [
{
msk: {
topic,
arn,
batchSize,
maximumBatchingWindow,
enabled,
startingPosition,
saslScram512,
consumerGroupId,
filterPatterns,
},
},
],
},
},
},
command: 'package',
})
).to.be.rejected.and.eventually.contain({
code: 'FUNCTION_MSK_STARTING_POSITION_TIMESTAMP_INVALID',
});
});
});
});

describe('when no msk events are defined', () => {
Expand Down