From 90aab38597d43c7b7a339d3a5537915d2ba90e22 Mon Sep 17 00:00:00 2001 From: Guilherme Martins Crocetti Date: Mon, 20 Sep 2021 22:35:23 -0300 Subject: [PATCH] feat: Support 'tumblingWindowInSeconds' for stream events --- docs/providers/aws/events/streams.md | 23 ++++++++ .../aws/package/compile/events/stream.js | 10 +++- .../aws/package/compile/events/stream.test.js | 59 +++++++++++++++++++ 3 files changed, 90 insertions(+), 2 deletions(-) diff --git a/docs/providers/aws/events/streams.md b/docs/providers/aws/events/streams.md index 12fed255ccb..6dca19d65ba 100644 --- a/docs/providers/aws/events/streams.md +++ b/docs/providers/aws/events/streams.md @@ -320,3 +320,26 @@ functions: ``` For more information, read this [AWS blog post](https://aws.amazon.com/blogs/compute/increasing-real-time-stream-processing-performance-with-amazon-kinesis-data-streams-enhanced-fan-out-and-aws-lambda/) or this [AWS documentation](https://docs.aws.amazon.com/streams/latest/dev/introduction-to-enhanced-consumers.html). + +## Setting TumblingWindowInSeconds + +This configuration allows customers to aggregate values in near-realtime, allowing state to by passed forward by Lambda invocations. A event source created with this property adds several new attributes to the events delivered to the Lambda function. + +- **window**: beginning and ending timestamps of the tumbling window; +- **state**: an object containing state of a previous execution. Initially empty can contain up to **1mb** of data; +- **isFinalInvokeForWindow**: indicates if this is the last execution for the tumbling window; +- **isWindowTerminatedEarly**: happens only when the state object exceeds maximum allowed size of 1mb. + +For more information and examples, read the [AWS release announcement](https://aws.amazon.com/blogs/compute/using-aws-lambda-for-streaming-analytics/) + +Note: Serverless only sets this property if you explicitly add it to the stream configuration (see example below). + +```yml +functions: + preprocess: + handler: handler.preprocess + events: + - stream: + arn: arn:aws:dynamodb:region:XXXXXX:table/foo/stream/1970-01-01T00:00:00.000 + tumblingWindowInSeconds: 30 +``` diff --git a/lib/plugins/aws/package/compile/events/stream.js b/lib/plugins/aws/package/compile/events/stream.js index 622206a5945..03264140bcb 100644 --- a/lib/plugins/aws/package/compile/events/stream.js +++ b/lib/plugins/aws/package/compile/events/stream.js @@ -68,6 +68,7 @@ class AwsCompileStreamEvents { additionalProperties: false, required: ['onFailure'], }, + tumblingWindowInSeconds: { type: 'integer', minimum: 0, maximum: 900 }, }, additionalProperties: false, anyOf: [ @@ -203,11 +204,11 @@ class AwsCompileStreamEvents { DependsOn: dependsOn, Properties: { BatchSize, - ParallelizationFactor, + Enabled, EventSourceArn, FunctionName: resolveLambdaTarget(functionName, functionObj), + ParallelizationFactor, StartingPosition, - Enabled, }, }; @@ -242,6 +243,11 @@ class AwsCompileStreamEvents { event.stream.maximumRecordAgeInSeconds; } + if (event.stream.tumblingWindowInSeconds != null) { + streamResource.Properties.TumblingWindowInSeconds = + event.stream.tumblingWindowInSeconds; + } + if (event.stream.destinations) { let OnFailureDestinationArn; diff --git a/test/unit/lib/plugins/aws/package/compile/events/stream.test.js b/test/unit/lib/plugins/aws/package/compile/events/stream.test.js index 910e424d64c..4966c3fda9b 100644 --- a/test/unit/lib/plugins/aws/package/compile/events/stream.test.js +++ b/test/unit/lib/plugins/aws/package/compile/events/stream.test.js @@ -1665,4 +1665,63 @@ describe('AwsCompileStreamEvents #2', () => { ]); }); }); + describe('with TumblingWindowInSeconds enabled', () => { + let eventSourceMappingKinesisResource; + let eventSourceMappingDynamoDBResource; + let eventSourceMappingNoTumblingResource; + + before(async () => { + const { awsNaming, cfTemplate } = await runServerless({ + fixture: 'function', + configExt: { + functions: { + foo: { + events: [ + { + stream: { + arn: 'arn:aws:kinesis:us-east-1:123456789012:stream/myKinesisStream', + tumblingWindowInSeconds: 30, + }, + }, + { + stream: { + arn: 'arn:aws:dynamodb:region:account:table/myDDBstream/stream/1', + tumblingWindowInSeconds: 50, + }, + }, + { + stream: { + arn: 'arn:aws:dynamodb:region:account:table/noTumblingStream/stream/1', + }, + }, + ], + }, + }, + }, + command: 'package', + }); + const kinesisLogicalId = awsNaming.getStreamLogicalId('foo', 'kinesis', 'myKinesisStream'); + const dynamoLogicalId = awsNaming.getStreamLogicalId('foo', 'dynamodb', 'myDDBstream'); + const noTumblingLogicalId = awsNaming.getStreamLogicalId( + 'foo', + 'dynamodb', + 'noTumblingStream' + ); + + eventSourceMappingKinesisResource = cfTemplate.Resources[kinesisLogicalId]; + eventSourceMappingDynamoDBResource = cfTemplate.Resources[dynamoLogicalId]; + eventSourceMappingNoTumblingResource = cfTemplate.Resources[noTumblingLogicalId]; + }); + + it('should have TumblingWindowInSeconds property', () => { + expect(eventSourceMappingKinesisResource.Properties.TumblingWindowInSeconds).to.equal(30); + expect(eventSourceMappingDynamoDBResource.Properties.TumblingWindowInSeconds).to.equal(50); + }); + + it('should not have TumblingWindowInSeconds property', () => { + expect(eventSourceMappingNoTumblingResource.Properties).to.not.have.property( + 'TumblingWindowInSeconds' + ); + }); + }); });