Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions docs/events/msk.md
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,26 @@ functions:
a: [1]
```

## Provisioned Poller Configuration

AWS recently added support for directly controlling the polling configuration for MSK ESMs. See the official [AWS documentation](https://docs.aws.amazon.com/lambda/latest/dg/msk-scaling-modes.html) for more information. To use that feature, add the following section to your `serverless.yml` function config:

Note: at least one of `minimumPollers` or `maximumPollers` must be provided.

```yml
---
functions:
compute:
handler: handler.compute
events:
- msk:
arn: arn:aws:kafka:region:XXXXXX:cluster/MyCluster/xxxx-xxxxx-xxxx
topic: mytopic
provisionedPollerConfig:
minimumPollers: 2
maximumPollers: 5
```

## IAM Permissions

The Serverless Framework will automatically configure the most minimal set of IAM permissions for you. However you can still add additional permissions if you need to. Read the official [AWS documentation](https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html) for more information about IAM Permissions for MSK events.
13 changes: 13 additions & 0 deletions lib/plugins/aws/package/compile/events/msk/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class AwsCompileMSKEvents {
pattern: '[a-zA-Z0-9-/*:_+=.@-]*',
},
filterPatterns: { $ref: '#/definitions/filterPatterns' },
provisionedPollerConfig: { $ref: '#/definitions/mskProvisionedPollerConfig' },
},
additionalProperties: false,
required: ['arn', 'topic'],
Expand Down Expand Up @@ -102,6 +103,7 @@ class AwsCompileMSKEvents {
const saslScram512 = event.msk.saslScram512;
const consumerGroupId = event.msk.consumerGroupId;
const filterPatterns = event.msk.filterPatterns;
const provisionedPollerConfig = event.msk.provisionedPollerConfig;

const mskClusterNameToken = getMskClusterNameToken(eventSourceArn);
const mskEventLogicalId = this.provider.naming.getMSKEventLogicalId(
Expand Down Expand Up @@ -165,6 +167,17 @@ class AwsCompileMSKEvents {
})),
};
}
if (provisionedPollerConfig) {
mskResource.Properties.ProvisionedPollerConfig = {};
if (provisionedPollerConfig.minimumPollers != null) {
mskResource.Properties.ProvisionedPollerConfig.MinimumPollers =
provisionedPollerConfig.minimumPollers;
}
if (provisionedPollerConfig.maximumPollers != null) {
mskResource.Properties.ProvisionedPollerConfig.MaximumPollers =
provisionedPollerConfig.maximumPollers;
}
}

mskStatement.Resource.push(eventSourceArn);

Expand Down
7 changes: 7 additions & 0 deletions lib/plugins/aws/provider.js
Original file line number Diff line number Diff line change
Expand Up @@ -810,6 +810,13 @@ class AwsProvider {
maxItems: 10,
items: { type: 'object' },
},
mskProvisionedPollerConfig: {
type: 'object',
properties: {
minimumPollers: { type: 'integer', minimum: 1, maximum: 1000 },
maximumPollers: { type: 'integer', minimum: 1, maximum: 1000 },
},
},
},
provider: {
properties: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ describe('AwsCompileMSKEvents', () => {
},
];
const filterPatterns = [{ value: { a: [1, 2] } }, { value: [3] }];
const provisionedPollerConfig = {
minimumPollers: 2,
maximumPollers: 5,
};

describe('when there are msk events defined', () => {
let minimalEventSourceMappingResource;
Expand Down Expand Up @@ -61,6 +65,7 @@ describe('AwsCompileMSKEvents', () => {
saslScram512,
consumerGroupId,
filterPatterns,
provisionedPollerConfig,
},
},
],
Expand Down Expand Up @@ -146,6 +151,10 @@ describe('AwsCompileMSKEvents', () => {
},
],
},
ProvisionedPollerConfig: {
MinimumPollers: 2,
MaximumPollers: 5,
},
});
});

Expand Down
1 change: 1 addition & 0 deletions types/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,7 @@ export interface AWS {
saslScram512?: AwsArnString;
consumerGroupId?: string;
filterPatterns?: FilterPatterns;
provisionedPollerConfig?: {maximumPollers?: number, minimumPollers?: number}
};
}
| {
Expand Down