Skip to content

Commit

Permalink
feat(AWS Kafka): Support maximumBatchingWindow
Browse files Browse the repository at this point in the history
  • Loading branch information
rm-hull committed Jan 30, 2022
1 parent d341b6b commit e90c114
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 1 deletion.
25 changes: 25 additions & 0 deletions docs/providers/aws/events/kafka.md
Expand Up @@ -141,3 +141,28 @@ functions:
## IAM Permissions

The Serverless Framework will automatically configure the most minimal set of IAM permissions for you. However you can still add additional permissions if you need to. Read the official [AWS documentation](https://docs.aws.amazon.com/lambda/latest/dg/kafka-smaa.html) for more information about IAM Permissions for Kafka events.

## Setting the BatchSize, MaximumBatchingWindow and StartingPosition

You can set the `batchSize`, which effects how many messages can be processed in a single Lambda invocation. The default `batchSize` is 100, and the max `batchSize` is 10000.
Likewise `maximumBatchingWindow` can be set to determine the amount of time the Lambda spends gathering records before invoking the function. The default is 0, but **if you set `batchSize` to more than 10, you must set `maximumBatchingWindow` to at least 1**. The maximum is 300.
In addition, you can also configure `startingPosition`, which controls the position at which Lambda should start consuming messages from the topic. It supports two possible values, `TRIM_HORIZON` and `LATEST`, with `TRIM_HORIZON` being the default.

In the following example, we specify that the `compute` function should have a `kafka` event configured with `batchSize` of 1000, `maximumBatchingWindow` of 30 seconds and `startingPosition` equal to `LATEST`.

```yml
functions:
compute:
handler: handler.compute
events:
- kafka:
accessConfigurations:
saslScram512Auth: arn:aws:secretsmanager:us-east-1:01234567890:secret:MyBrokerSecretName
topic: MySelfManagedKafkaTopic
bootstrapServers:
- abc3.xyz.com:9092
- abc2.xyz.com:9092
batchSize: 1000
maximumBatchingWindow: 30
startingPosition: LATEST
```
29 changes: 29 additions & 0 deletions docs/providers/aws/guide/serverless.yml.md
Expand Up @@ -928,6 +928,35 @@ functions:
enabled: false
```

### Kafka

[Kakfa events](../events/kafka.md):

```yaml
functions:
hello:
# ...
events:
- kafka:
# See main kafka documentation for various access configuration settings
accessConfigurations:
# ...
# An array of bootstrap server addresses
bootstrapServers:
- abc3.xyz.com:9092
- abc2.xyz.com:9092
# name of Kafka topic to consume from
topic: MySelfManagedKafkaTopic
# Optional, must be in 1-10000 range
batchSize: 100
# Optional, must be in 0-300 range (seconds)
maximumBatchingWindow: 30
# Optional, can be set to LATEST or TRIM_HORIZON
startingPosition: LATEST
# (default: true)
enabled: false
```

### Alexa

[Alexa Skill events](../events/alexa-skill.md) and [Alexa Smart Home events](../events/alexa-smart-home.md):
Expand Down
11 changes: 10 additions & 1 deletion lib/plugins/aws/package/compile/events/kafka.js
Expand Up @@ -67,6 +67,11 @@ class AwsCompileKafkaEvents {
minimum: 1,
maximum: 10000,
},
maximumBatchingWindow: {
type: 'number',
minimum: 0,
maximum: 300,
},
enabled: {
type: 'boolean',
},
Expand Down Expand Up @@ -149,7 +154,7 @@ class AwsCompileKafkaEvents {
}

hasKafkaEvent = true;
const { topic, batchSize, enabled } = event.kafka;
const { topic, batchSize, maximumBatchingWindow, enabled } = event.kafka;
const startingPosition = event.kafka.startingPosition || 'TRIM_HORIZON';

const kafkaEventLogicalId = this.provider.naming.getKafkaEventLogicalId(
Expand Down Expand Up @@ -231,6 +236,10 @@ class AwsCompileKafkaEvents {
kafkaResource.Properties.BatchSize = batchSize;
}

if (maximumBatchingWindow) {
kafkaResource.Properties.MaximumBatchingWindowInSeconds = maximumBatchingWindow;
}

if (enabled != null) {
kafkaResource.Properties.Enabled = enabled;
}
Expand Down
Expand Up @@ -19,6 +19,7 @@ describe('test/unit/lib/plugins/aws/package/compile/events/kafka.test.js', () =>
const enabled = false;
const startingPosition = 'LATEST';
const batchSize = 5000;
const maximumBatchingWindow = 20;

describe('when there are kafka events defined', () => {
let minimalEventSourceMappingResource;
Expand Down Expand Up @@ -50,6 +51,7 @@ describe('test/unit/lib/plugins/aws/package/compile/events/kafka.test.js', () =>
bootstrapServers: ['abc.xyz:9092'],
accessConfigurations: { saslScram256Auth: saslScram256AuthArn },
batchSize,
maximumBatchingWindow,
enabled,
startingPosition,
},
Expand Down Expand Up @@ -105,6 +107,7 @@ describe('test/unit/lib/plugins/aws/package/compile/events/kafka.test.js', () =>
it('should correctly compile EventSourceMapping resource with all parameters', () => {
expect(allParamsEventSourceMappingResource.Properties).to.deep.equal({
BatchSize: batchSize,
MaximumBatchingWindowInSeconds: maximumBatchingWindow,
Enabled: enabled,
SelfManagedEventSource: {
Endpoints: {
Expand Down

0 comments on commit e90c114

Please sign in to comment.