Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(AWS Lambda): Add support for consumer group ID to kafka event #11345

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/providers/aws/events/kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ In order to configure lambda to trigger via `kafka` events, you must provide thr
- `topic` to consume messages from
- `bootstrapServers` - an array of bootstrap server addresses for your Kafka cluster

Optionally, you can provide the following properties:

- `consumerGroupId` - the consumer group id to use for consuming messages
Phil-Pinkowski marked this conversation as resolved.
Show resolved Hide resolved

## Authentication

You must authenticate your Lambda with a self-managed Apache Kafka cluster using one of;
Expand Down Expand Up @@ -72,6 +76,7 @@ functions:
accessConfigurations:
saslScram512Auth: arn:aws:secretsmanager:us-east-1:01234567890:secret:MyBrokerSecretName
topic: MySelfManagedKafkaTopic
consumerGroupId: MyConsumerGroupId
bootstrapServers:
- abc3.xyz.com:9092
- abc2.xyz.com:9092
Expand All @@ -91,6 +96,7 @@ functions:
clientCertificateTlsAuth: arn:aws:secretsmanager:us-east-1:01234567890:secret:ClientCertificateTLS
serverRootCaCertificate: arn:aws:secretsmanager:us-east-1:01234567890:secret:ServerRootCaCertificate
topic: MySelfManagedMTLSKafkaTopic
consumerGroupId: MyConsumerGroupId
bootstrapServers:
- abc3.xyz.com:9092
- abc2.xyz.com:9092
Expand Down
20 changes: 20 additions & 0 deletions docs/providers/aws/events/msk.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,26 @@ functions:
startingPosition: LATEST
```

Optionally, you can provide the following properties:

- `consumerGroupId` - the consumer group id to use for consuming messages

For example:

```yml
functions:
compute:
handler: handler.compute
events:
- msk:
arn: arn:aws:kafka:region:XXXXXX:cluster/MyCluster/xxxx-xxxxx-xxxx
topic: mytopic
batchSize: 1000
maximumBatchingWindow: 30
startingPosition: LATEST
consumerGroupId: MyConsumerGroupId
```

## Enabling and disabling MSK event

The `msk` event also supports `enabled` parameter, which is used to control if the event source mapping is active. Setting it to `false` will pause polling for and processing new messages.
Expand Down
4 changes: 4 additions & 0 deletions docs/providers/aws/guide/serverless.yml.md
Original file line number Diff line number Diff line change
Expand Up @@ -970,6 +970,8 @@ functions:
enabled: false
# Optional, arn of the secret key for authenticating with the brokers in your MSK cluster.
saslScram512: arn:aws:secretsmanager:region:XXXXXX:secret:AmazonMSK_xxxxxx
# Optional, specifies the consumer group ID to be used when consuming from Kafka. If not provided, a random UUID will be generated
consumerGroupId: MyConsumerGroupId
```

### ActiveMQ
Expand Down Expand Up @@ -1025,6 +1027,8 @@ functions:
startingPosition: LATEST
# (default: true)
enabled: false
# Optional, specifies the consumer group ID to be used when consuming from Kafka. If not provided, a random UUID will be generated
consumerGroupId: MyConsumerGroupId
```

### RabbitMQ
Expand Down
13 changes: 12 additions & 1 deletion lib/plugins/aws/package/compile/events/kafka.js
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ class AwsCompileKafkaEvents {
topic: {
type: 'string',
},
consumerGroupId: {
type: 'string',
maxLength: 200,
pattern: '[a-zA-Z0-9-/*:_+=.@-]*',
},
},
additionalProperties: false,
required: ['accessConfigurations', 'bootstrapServers', 'topic'],
Expand Down Expand Up @@ -142,7 +147,7 @@ class AwsCompileKafkaEvents {
}

hasKafkaEvent = true;
const { topic, batchSize, maximumBatchingWindow, enabled } = event.kafka;
const { topic, batchSize, maximumBatchingWindow, enabled, consumerGroupId } = event.kafka;
const startingPosition = event.kafka.startingPosition || 'TRIM_HORIZON';

const kafkaEventLogicalId = this.provider.naming.getKafkaEventLogicalId(
Expand Down Expand Up @@ -232,6 +237,12 @@ class AwsCompileKafkaEvents {
kafkaResource.Properties.Enabled = enabled;
}

if (consumerGroupId) {
kafkaResource.Properties.SelfManagedKafkaEventSourceConfig = {
ConsumerGroupId: consumerGroupId,
};
}

cfTemplate.Resources[kafkaEventLogicalId] = kafkaResource;
});

Expand Down
12 changes: 12 additions & 0 deletions lib/plugins/aws/package/compile/events/msk/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ class AwsCompileMSKEvents {
type: 'string',
},
saslScram512: { $ref: '#/definitions/awsArnString' },
consumerGroupId: {
type: 'string',
maxLength: 200,
pattern: '[a-zA-Z0-9-/*:_+=.@-]*',
},
},
additionalProperties: false,
required: ['arn', 'topic'],
Expand Down Expand Up @@ -81,6 +86,7 @@ class AwsCompileMSKEvents {
const enabled = event.msk.enabled;
const startingPosition = event.msk.startingPosition || 'TRIM_HORIZON';
const saslScram512 = event.msk.saslScram512;
const consumerGroupId = event.msk.consumerGroupId;

const mskClusterNameToken = getMskClusterNameToken(eventSourceArn);
const mskEventLogicalId = this.provider.naming.getMSKEventLogicalId(
Expand Down Expand Up @@ -114,6 +120,12 @@ class AwsCompileMSKEvents {
mskResource.Properties.MaximumBatchingWindowInSeconds = maximumBatchingWindow;
}

if (consumerGroupId) {
mskResource.Properties.AmazonManagedKafkaEventSourceConfig = {
ConsumerGroupId: consumerGroupId,
};
}

if (enabled != null) {
mskResource.Properties.Enabled = enabled;
}
Expand Down
37 changes: 37 additions & 0 deletions test/unit/lib/plugins/aws/package/compile/events/kafka.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,43 @@ describe('test/unit/lib/plugins/aws/package/compile/events/kafka.test.js', () =>
await runCompileEventSourceMappingTest(eventConfig);
});

it('should correctly compile EventSourceMapping resource properties for ConsumerGroupId', async () => {
const eventConfig = {
event: {
topic,
bootstrapServers: ['abc.xyz:9092'],
accessConfigurations: {
clientCertificateTlsAuth: clientCertificateTlsAuthArn,
},
consumerGroupId: 'my-consumer-group-id',
},
resource: (awsNaming) => {
return {
SelfManagedEventSource: {
Endpoints: {
KafkaBootstrapServers: ['abc.xyz:9092'],
},
},
SourceAccessConfigurations: [
{
Type: 'CLIENT_CERTIFICATE_TLS_AUTH',
URI: clientCertificateTlsAuthArn,
},
],
StartingPosition: 'TRIM_HORIZON',
Topics: [topic],
FunctionName: {
'Fn::GetAtt': [awsNaming.getLambdaLogicalId('basic'), 'Arn'],
},
SelfManagedKafkaEventSourceConfig: {
ConsumerGroupId: 'my-consumer-group-id',
},
};
},
};
await runCompileEventSourceMappingTest(eventConfig);
});

it('should update default IAM role with EC2 statement when VPC accessConfiguration is provided', async () => {
const { cfTemplate } = await runServerless({
fixture: 'function',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ describe('AwsCompileMSKEvents', () => {
const maximumBatchingWindow = 10;
const saslScram512 =
'arn:aws:secretsmanager:us-east-1:111111111111:secret:AmazonMSK_a1a1a1a1a1a1a1a1';
const consumerGroupId = 'TestConsumerGroupId';
const sourceAccessConfigurations = [
{
Type: 'SASL_SCRAM_512_AUTH',
Expand Down Expand Up @@ -55,6 +56,7 @@ describe('AwsCompileMSKEvents', () => {
enabled,
startingPosition,
saslScram512,
consumerGroupId,
},
},
],
Expand Down Expand Up @@ -122,6 +124,9 @@ describe('AwsCompileMSKEvents', () => {
FunctionName: {
'Fn::GetAtt': [naming.getLambdaLogicalId('other'), 'Arn'],
},
AmazonManagedKafkaEventSourceConfig: {
ConsumerGroupId: consumerGroupId,
},
});
});
});
Expand Down