Skip to content

Commit

Permalink
feat(AWS MSK): Support functions[].events[].filterPatterns (#11636)
Browse files Browse the repository at this point in the history
  • Loading branch information
shreyance-jain committed Dec 28, 2022
1 parent 906ea31 commit 63584a9
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 0 deletions.
20 changes: 20 additions & 0 deletions docs/providers/aws/events/msk.md
Expand Up @@ -116,6 +116,26 @@ functions:
saslScram512: arn:aws:secretsmanager:region:XXXXXX:secret:AmazonMSK_xxxxxx
```

## Setting filter patterns

This configuration allows customers to filter event before lambda invocation. It accepts up to 5 filter criterion by default and up to 10 with quota extension. If one event matches at least 1 pattern, lambda will process it.

For more details and examples of filter patterns, please see the [AWS event filtering documentation](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html)

Note: Serverless only sets this property if you explicitly add it to the `msk` configuration (see an example below). The following example will only process records that are published in the MSK cluster where field `a` is equal to 1 or 2.

```yml
functions:
handleInsertedDynamoDBItem:
handler: handler.preprocess
events:
- stream:
arn: arn:aws:dynamodb:region:XXXXXX:table/foo/stream/1970-01-01T00:00:00.000
filterPatterns:
- value:
a: [1, 2]
```

## IAM Permissions

The Serverless Framework will automatically configure the most minimal set of IAM permissions for you. However you can still add additional permissions if you need to. Read the official [AWS documentation](https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html) for more information about IAM Permissions for MSK events.
4 changes: 4 additions & 0 deletions docs/providers/aws/guide/serverless.yml.md
Expand Up @@ -979,6 +979,10 @@ functions:
saslScram512: arn:aws:secretsmanager:region:XXXXXX:secret:AmazonMSK_xxxxxx
# Optional, specifies the consumer group ID to be used when consuming from Kafka. If not provided, a random UUID will be generated
consumerGroupId: MyConsumerGroupId
# Optional, specifies event pattern content filtering
filterPatterns:
- value:
a: [1, 2]
```

### ActiveMQ
Expand Down
10 changes: 10 additions & 0 deletions lib/plugins/aws/package/compile/events/msk/index.js
Expand Up @@ -47,6 +47,7 @@ class AwsCompileMSKEvents {
maxLength: 200,
pattern: '[a-zA-Z0-9-/*:_+=.@-]*',
},
filterPatterns: { $ref: '#/definitions/filterPatterns' },
},
additionalProperties: false,
required: ['arn', 'topic'],
Expand Down Expand Up @@ -87,6 +88,7 @@ class AwsCompileMSKEvents {
const startingPosition = event.msk.startingPosition || 'TRIM_HORIZON';
const saslScram512 = event.msk.saslScram512;
const consumerGroupId = event.msk.consumerGroupId;
const filterPatterns = event.msk.filterPatterns;

const mskClusterNameToken = getMskClusterNameToken(eventSourceArn);
const mskEventLogicalId = this.provider.naming.getMSKEventLogicalId(
Expand Down Expand Up @@ -140,6 +142,14 @@ class AwsCompileMSKEvents {
mskResource.Properties.SourceAccessConfigurations = secureAccessConfigurations;
}

if (filterPatterns) {
mskResource.Properties.FilterCriteria = {
Filters: filterPatterns.map((pattern) => ({
Pattern: JSON.stringify(pattern),
})),
};
}

mskStatement.Resource.push(eventSourceArn);

cfTemplate.Resources[mskEventLogicalId] = mskResource;
Expand Down
16 changes: 16 additions & 0 deletions test/unit/lib/plugins/aws/package/compile/events/msk/index.test.js
Expand Up @@ -23,6 +23,7 @@ describe('AwsCompileMSKEvents', () => {
URI: saslScram512,
},
];
const filterPatterns = [{ value: { a: [1, 2] } }, { value: [3] }];

describe('when there are msk events defined', () => {
let minimalEventSourceMappingResource;
Expand Down Expand Up @@ -57,6 +58,7 @@ describe('AwsCompileMSKEvents', () => {
startingPosition,
saslScram512,
consumerGroupId,
filterPatterns,
},
},
],
Expand Down Expand Up @@ -127,6 +129,20 @@ describe('AwsCompileMSKEvents', () => {
AmazonManagedKafkaEventSourceConfig: {
ConsumerGroupId: consumerGroupId,
},
FilterCriteria: {
Filters: [
{
Pattern: JSON.stringify({
value: { a: [1, 2] },
}),
},
{
Pattern: JSON.stringify({
value: [3],
}),
},
],
},
});
});
});
Expand Down

0 comments on commit 63584a9

Please sign in to comment.