Skip to content

Commit

Permalink
feat(AWS Kafka): Support consumerGroupId option (#11345)
Browse files Browse the repository at this point in the history
  • Loading branch information
Phil-Pinkowski committed Aug 26, 2022
1 parent 9c2ebb7 commit 9bb3f11
Show file tree
Hide file tree
Showing 7 changed files with 96 additions and 1 deletion.
6 changes: 6 additions & 0 deletions docs/providers/aws/events/kafka.md
Expand Up @@ -21,6 +21,10 @@ In order to configure lambda to trigger via `kafka` events, you must provide thr
- `topic` to consume messages from
- `bootstrapServers` - an array of bootstrap server addresses for your Kafka cluster

Optionally, you can provide the following properties:

- `consumerGroupId` - the consumer group id to use for consuming messages

## Authentication

You must authenticate your Lambda with a self-managed Apache Kafka cluster using one of;
Expand Down Expand Up @@ -72,6 +76,7 @@ functions:
accessConfigurations:
saslScram512Auth: arn:aws:secretsmanager:us-east-1:01234567890:secret:MyBrokerSecretName
topic: MySelfManagedKafkaTopic
consumerGroupId: MyConsumerGroupId
bootstrapServers:
- abc3.xyz.com:9092
- abc2.xyz.com:9092
Expand All @@ -91,6 +96,7 @@ functions:
clientCertificateTlsAuth: arn:aws:secretsmanager:us-east-1:01234567890:secret:ClientCertificateTLS
serverRootCaCertificate: arn:aws:secretsmanager:us-east-1:01234567890:secret:ServerRootCaCertificate
topic: MySelfManagedMTLSKafkaTopic
consumerGroupId: MyConsumerGroupId
bootstrapServers:
- abc3.xyz.com:9092
- abc2.xyz.com:9092
Expand Down
20 changes: 20 additions & 0 deletions docs/providers/aws/events/msk.md
Expand Up @@ -64,6 +64,26 @@ functions:
startingPosition: LATEST
```

Optionally, you can provide the following properties:

- `consumerGroupId` - the consumer group id to use for consuming messages

For example:

```yml
functions:
compute:
handler: handler.compute
events:
- msk:
arn: arn:aws:kafka:region:XXXXXX:cluster/MyCluster/xxxx-xxxxx-xxxx
topic: mytopic
batchSize: 1000
maximumBatchingWindow: 30
startingPosition: LATEST
consumerGroupId: MyConsumerGroupId
```

## Enabling and disabling MSK event

The `msk` event also supports `enabled` parameter, which is used to control if the event source mapping is active. Setting it to `false` will pause polling for and processing new messages.
Expand Down
4 changes: 4 additions & 0 deletions docs/providers/aws/guide/serverless.yml.md
Expand Up @@ -970,6 +970,8 @@ functions:
enabled: false
# Optional, arn of the secret key for authenticating with the brokers in your MSK cluster.
saslScram512: arn:aws:secretsmanager:region:XXXXXX:secret:AmazonMSK_xxxxxx
# Optional, specifies the consumer group ID to be used when consuming from Kafka. If not provided, a random UUID will be generated
consumerGroupId: MyConsumerGroupId
```

### ActiveMQ
Expand Down Expand Up @@ -1025,6 +1027,8 @@ functions:
startingPosition: LATEST
# (default: true)
enabled: false
# Optional, specifies the consumer group ID to be used when consuming from Kafka. If not provided, a random UUID will be generated
consumerGroupId: MyConsumerGroupId
```

### RabbitMQ
Expand Down
13 changes: 12 additions & 1 deletion lib/plugins/aws/package/compile/events/kafka.js
Expand Up @@ -89,6 +89,11 @@ class AwsCompileKafkaEvents {
topic: {
type: 'string',
},
consumerGroupId: {
type: 'string',
maxLength: 200,
pattern: '[a-zA-Z0-9-/*:_+=.@-]*',
},
},
additionalProperties: false,
required: ['accessConfigurations', 'bootstrapServers', 'topic'],
Expand Down Expand Up @@ -142,7 +147,7 @@ class AwsCompileKafkaEvents {
}

hasKafkaEvent = true;
const { topic, batchSize, maximumBatchingWindow, enabled } = event.kafka;
const { topic, batchSize, maximumBatchingWindow, enabled, consumerGroupId } = event.kafka;
const startingPosition = event.kafka.startingPosition || 'TRIM_HORIZON';

const kafkaEventLogicalId = this.provider.naming.getKafkaEventLogicalId(
Expand Down Expand Up @@ -232,6 +237,12 @@ class AwsCompileKafkaEvents {
kafkaResource.Properties.Enabled = enabled;
}

if (consumerGroupId) {
kafkaResource.Properties.SelfManagedKafkaEventSourceConfig = {
ConsumerGroupId: consumerGroupId,
};
}

cfTemplate.Resources[kafkaEventLogicalId] = kafkaResource;
});

Expand Down
12 changes: 12 additions & 0 deletions lib/plugins/aws/package/compile/events/msk/index.js
Expand Up @@ -42,6 +42,11 @@ class AwsCompileMSKEvents {
type: 'string',
},
saslScram512: { $ref: '#/definitions/awsArnString' },
consumerGroupId: {
type: 'string',
maxLength: 200,
pattern: '[a-zA-Z0-9-/*:_+=.@-]*',
},
},
additionalProperties: false,
required: ['arn', 'topic'],
Expand Down Expand Up @@ -81,6 +86,7 @@ class AwsCompileMSKEvents {
const enabled = event.msk.enabled;
const startingPosition = event.msk.startingPosition || 'TRIM_HORIZON';
const saslScram512 = event.msk.saslScram512;
const consumerGroupId = event.msk.consumerGroupId;

const mskClusterNameToken = getMskClusterNameToken(eventSourceArn);
const mskEventLogicalId = this.provider.naming.getMSKEventLogicalId(
Expand Down Expand Up @@ -114,6 +120,12 @@ class AwsCompileMSKEvents {
mskResource.Properties.MaximumBatchingWindowInSeconds = maximumBatchingWindow;
}

if (consumerGroupId) {
mskResource.Properties.AmazonManagedKafkaEventSourceConfig = {
ConsumerGroupId: consumerGroupId,
};
}

if (enabled != null) {
mskResource.Properties.Enabled = enabled;
}
Expand Down
37 changes: 37 additions & 0 deletions test/unit/lib/plugins/aws/package/compile/events/kafka.test.js
Expand Up @@ -457,6 +457,43 @@ describe('test/unit/lib/plugins/aws/package/compile/events/kafka.test.js', () =>
await runCompileEventSourceMappingTest(eventConfig);
});

it('should correctly compile EventSourceMapping resource properties for ConsumerGroupId', async () => {
const eventConfig = {
event: {
topic,
bootstrapServers: ['abc.xyz:9092'],
accessConfigurations: {
clientCertificateTlsAuth: clientCertificateTlsAuthArn,
},
consumerGroupId: 'my-consumer-group-id',
},
resource: (awsNaming) => {
return {
SelfManagedEventSource: {
Endpoints: {
KafkaBootstrapServers: ['abc.xyz:9092'],
},
},
SourceAccessConfigurations: [
{
Type: 'CLIENT_CERTIFICATE_TLS_AUTH',
URI: clientCertificateTlsAuthArn,
},
],
StartingPosition: 'TRIM_HORIZON',
Topics: [topic],
FunctionName: {
'Fn::GetAtt': [awsNaming.getLambdaLogicalId('basic'), 'Arn'],
},
SelfManagedKafkaEventSourceConfig: {
ConsumerGroupId: 'my-consumer-group-id',
},
};
},
};
await runCompileEventSourceMappingTest(eventConfig);
});

it('should update default IAM role with EC2 statement when VPC accessConfiguration is provided', async () => {
const { cfTemplate } = await runServerless({
fixture: 'function',
Expand Down
Expand Up @@ -16,6 +16,7 @@ describe('AwsCompileMSKEvents', () => {
const maximumBatchingWindow = 10;
const saslScram512 =
'arn:aws:secretsmanager:us-east-1:111111111111:secret:AmazonMSK_a1a1a1a1a1a1a1a1';
const consumerGroupId = 'TestConsumerGroupId';
const sourceAccessConfigurations = [
{
Type: 'SASL_SCRAM_512_AUTH',
Expand Down Expand Up @@ -55,6 +56,7 @@ describe('AwsCompileMSKEvents', () => {
enabled,
startingPosition,
saslScram512,
consumerGroupId,
},
},
],
Expand Down Expand Up @@ -122,6 +124,9 @@ describe('AwsCompileMSKEvents', () => {
FunctionName: {
'Fn::GetAtt': [naming.getLambdaLogicalId('other'), 'Arn'],
},
AmazonManagedKafkaEventSourceConfig: {
ConsumerGroupId: consumerGroupId,
},
});
});
});
Expand Down

0 comments on commit 9bb3f11

Please sign in to comment.