feat(AWS Kafka): Support functions[].events[].filterPatterns (#11645)

jasonrowsell committed Jan 5, 2023
1 parent b7d6af6 commit 6a5e8d9
Showing 4 changed files with 88 additions and 0 deletions.
22 changes: 22 additions & 0 deletions docs/providers/aws/events/kafka.md
@@ -144,6 +144,28 @@ functions:
enabled: false
```

## Setting filter patterns

This configuration lets you filter events before they are passed to a Lambda function for processing. By default, up to 5 filter criteria are accepted per event source, and this limit can be raised to a maximum of 10 with a quota extension. If an event matches at least one of the specified filter patterns, the Lambda function processes it. For more information, see the [AWS Event Filtering documentation](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html).

The following example demonstrates using this property to process only records published to the Kafka cluster whose `eventName` field equals `INSERT`.

```yml
functions:
compute:
handler: handler.compute
events:
- kafka:
accessConfigurations:
saslScram512Auth: arn:aws:secretsmanager:us-east-1:01234567890:secret:MyBrokerSecretName
topic: MySelfManagedKafkaTopic
bootstrapServers:
- abc3.xyz.com:9092
- abc2.xyz.com:9092
filterPatterns:
- eventName: INSERT
```

## IAM Permissions

The Serverless Framework automatically configures the most minimal set of IAM permissions for you. However, you can still add permissions if you need to. Read the official [AWS documentation](https://docs.aws.amazon.com/lambda/latest/dg/kafka-smaa.html) for more information about IAM permissions for Kafka events.
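If you do need extra permissions (for example, when the consumer must read a customer-managed secret), they can be declared alongside the provider configuration. A hypothetical sketch, not taken from this commit (the statement and ARN below are illustrative):

```yml
provider:
  name: aws
  iam:
    role:
      statements:
        # Hypothetical extra permission for a customer-managed broker secret
        - Effect: Allow
          Action:
            - secretsmanager:GetSecretValue
          Resource: arn:aws:secretsmanager:us-east-1:01234567890:secret:MyBrokerSecretName
```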
3 changes: 3 additions & 0 deletions docs/providers/aws/guide/serverless.yml.md
@@ -1043,6 +1043,9 @@ functions:
enabled: false
# Optional, specifies the consumer group ID to be used when consuming from Kafka. If not provided, a random UUID will be generated
consumerGroupId: MyConsumerGroupId
# Optional, specifies event pattern content filtering
filterPatterns:
- eventName: INSERT
```

### RabbitMQ
10 changes: 10 additions & 0 deletions lib/plugins/aws/package/compile/events/kafka.js
@@ -97,6 +97,7 @@ class AwsCompileKafkaEvents {
maxLength: 200,
pattern: '[a-zA-Z0-9-/*:_+=.@-]*',
},
filterPatterns: { $ref: '#/definitions/filterPatterns' },
},
additionalProperties: false,
required: ['accessConfigurations', 'bootstrapServers', 'topic'],
@@ -258,6 +259,15 @@
kafkaResource.Properties.StartingPositionTimestamp = startingPositionTimestamp;
}

const filterPatterns = event.kafka.filterPatterns;
if (filterPatterns) {
kafkaResource.Properties.FilterCriteria = {
Filters: filterPatterns.map((pattern) => ({
Pattern: JSON.stringify(pattern),
})),
};
}

cfTemplate.Resources[kafkaEventLogicalId] = kafkaResource;
});
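The hunk above serializes each entry of `filterPatterns` into the `FilterCriteria.Filters` array expected by the `AWS::Lambda::EventSourceMapping` resource. A minimal standalone sketch of that mapping (the helper name `compileFilterCriteria` is hypothetical, not part of the plugin):

```javascript
// Hypothetical standalone helper mirroring the mapping in the hunk above:
// each filter pattern object becomes one JSON-stringified `Pattern` entry.
const compileFilterCriteria = (filterPatterns) => ({
  Filters: filterPatterns.map((pattern) => ({
    Pattern: JSON.stringify(pattern),
  })),
});

const criteria = compileFilterCriteria([{ eventName: 'INSERT' }]);
console.log(criteria.Filters[0].Pattern); // prints {"eventName":"INSERT"}
```

Stringifying each pattern individually matches the shape the event filtering service expects: `Filters` is a list of objects whose `Pattern` value is a JSON string, not a nested object.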

53 changes: 53 additions & 0 deletions test/unit/lib/plugins/aws/package/compile/events/kafka.test.js
@@ -20,6 +20,7 @@ describe('test/unit/lib/plugins/aws/package/compile/events/kafka.test.js', () =>
const startingPosition = 'LATEST';
const batchSize = 5000;
const maximumBatchingWindow = 20;
const filterPatterns = [{ eventName: 'INSERT' }];

describe('when there are kafka events defined', () => {
let minimalEventSourceMappingResource;
Expand Down Expand Up @@ -54,6 +55,7 @@ describe('test/unit/lib/plugins/aws/package/compile/events/kafka.test.js', () =>
maximumBatchingWindow,
enabled,
startingPosition,
filterPatterns,
},
},
],
@@ -125,6 +127,15 @@ describe('test/unit/lib/plugins/aws/package/compile/events/kafka.test.js', () =>
FunctionName: {
'Fn::GetAtt': [naming.getLambdaLogicalId('other'), 'Arn'],
},
FilterCriteria: {
Filters: [
{
Pattern: JSON.stringify({
eventName: 'INSERT',
}),
},
],
},
});
});
});
@@ -618,6 +629,48 @@ describe('test/unit/lib/plugins/aws/package/compile/events/kafka.test.js', () =>
cfTemplate.Resources[awsNaming.getKafkaEventLogicalId('basic', 'TestingTopic')];
expect(eventSourceMappingResource.DependsOn).to.deep.equal([]);
});

it('should correctly compile EventSourceMapping resource properties for filterPatterns', async () => {
const { awsNaming, cfTemplate } = await runServerless({
fixture: 'function',
configExt: {
functions: {
basic: {
role: { 'Fn::ImportValue': 'MyImportedRole' },
events: [
{
kafka: {
topic,
bootstrapServers: ['abc.xyz:9092'],
accessConfigurations: { saslScram256Auth: saslScram256AuthArn },
filterPatterns: [{ value: { a: [1, 2] } }, { value: [3] }],
},
},
],
},
},
},
command: 'package',
});

const eventSourceMappingResource =
cfTemplate.Resources[awsNaming.getKafkaEventLogicalId('basic', 'TestingTopic')];

expect(eventSourceMappingResource.Properties.FilterCriteria).to.deep.equal({
Filters: [
{
Pattern: JSON.stringify({
value: { a: [1, 2] },
}),
},
{
Pattern: JSON.stringify({
value: [3],
}),
},
],
});
});
});

describe('when no kafka events are defined', () => {
