Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add event filtering patterns to stream event type #10285

19 changes: 19 additions & 0 deletions docs/providers/aws/events/streams.md
Original file line number Diff line number Diff line change
Expand Up @@ -329,3 +329,22 @@ functions:
arn: arn:aws:dynamodb:region:XXXXXX:table/foo/stream/1970-01-01T00:00:00.000
tumblingWindowInSeconds: 30
```

## Setting filter patterns

This configuration allows customers to filter event before lambda invocation. It accepts up to 5 filter criterion by default and up to 10 with quota extension. If one event matches at least 1 pattern, lambda will process it.

For more details and examples of filter patterns, please see the [AWS event filtering documentation](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html)

Note: Serverless only sets this property if you explicitly add it to the stream configuration (see an example below). The following example will only process inserted items in the DynamoDB table (it will skip removed and modified items).

```yml
functions:
handleInsertedDynamoDBItem:
handler: handler.preprocess
events:
- stream:
arn: arn:aws:dynamodb:region:XXXXXX:table/foo/stream/1970-01-01T00:00:00.000
filterPatterns:
- eventName: [INSERT]
```
2 changes: 2 additions & 0 deletions docs/providers/aws/guide/serverless.yml.md
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,8 @@ functions:
startingPosition: LATEST
enabled: true
functionResponseType: ReportBatchItemFailures
filterPatterns:
- partitionKey: [ 1 ]
- msk:
arn: arn:aws:kafka:us-east-1:111111111111:cluster/ClusterName/a1a1a1a1a1a1a1a1a # ARN of MSK Cluster
topic: kafkaTopic # name of Kafka topic to consume from
Expand Down
14 changes: 14 additions & 0 deletions lib/plugins/aws/package/compile/events/stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@ class AwsCompileStreamEvents {
required: ['onFailure'],
},
tumblingWindowInSeconds: { type: 'integer', minimum: 0, maximum: 900 },
filterPatterns: {
type: 'array',
minItems: 1,
maxItems: 10,
items: { type: 'object' },
},
},
additionalProperties: false,
anyOf: [
Expand Down Expand Up @@ -273,6 +279,14 @@ class AwsCompileStreamEvents {
};
}

if (event.stream.filterPatterns) {
streamResource.Properties.FilterCriteria = {
Filters: event.stream.filterPatterns.map((pattern) => ({
Pattern: JSON.stringify(pattern),
fredericbarthelet marked this conversation as resolved.
Show resolved Hide resolved
})),
};
}

const newStreamObject = {
[streamLogicalId]: streamResource,
};
Expand Down
39 changes: 39 additions & 0 deletions test/unit/lib/plugins/aws/package/compile/events/stream.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1724,4 +1724,43 @@ describe('AwsCompileStreamEvents #2', () => {
);
});
});
describe('with filterPatterns', () => {
let eventSourceMappingResource;

before(async () => {
const { awsNaming, cfTemplate } = await runServerless({
fixture: 'function',
configExt: {
functions: {
basic: {
events: [
{
stream: {
arn: 'arn:aws:dynamodb:region:account:table/foo/stream/1',
filterPatterns: [{ eventName: ['INSERT'] }, { eventName: ['MODIFY'] }],
},
},
],
},
},
},
command: 'package',
});
const streamLogicalId = awsNaming.getStreamLogicalId('basic', 'dynamodb', 'foo');
eventSourceMappingResource = cfTemplate.Resources[streamLogicalId];
});

it('should wrap patterns within FilterCriteria property', () => {
expect(eventSourceMappingResource.Properties.FilterCriteria).to.deep.equal({
Filters: [
{
Pattern: JSON.stringify({ eventName: ['INSERT'] }),
},
{
Pattern: JSON.stringify({ eventName: ['MODIFY'] }),
},
],
});
});
});
});