Skip to content

Commit

Permalink
feat(AWS Lambda): Support for Amazon MQ RabbitMQ events (#9919)
Browse files Browse the repository at this point in the history
Co-authored-by: Michael <michael@Michaels-MacBook-Pro.local>
Co-authored-by: Michael <michael@ip-192-168-158-61.eu-west-1.compute.internal>
  • Loading branch information
3 people committed Oct 19, 2021
1 parent cfd828e commit a3edecf
Show file tree
Hide file tree
Showing 14 changed files with 736 additions and 19 deletions.
10 changes: 5 additions & 5 deletions docs/providers/aws/events/activemq.md
@@ -1,19 +1,19 @@
<!--
title: Serverless Framework - AWS Lambda Events - MQ
title: Serverless Framework - AWS Lambda Events - ActiveMQ
menuText: ActiveMQ
description: Setting up AWS Active MQ Events with AWS Lambda via the Serverless Framework
description: Setting up AWS ActiveMQ Events with AWS Lambda via the Serverless Framework
layout: Doc
-->

<!-- DOCS-SITE-LINK:START automatically generated -->

### [Read this on the main serverless docs site](https://www.serverless.com/framework/docs/providers/aws/events/mq)
### [Read this on the main serverless docs site](https://www.serverless.com/framework/docs/providers/aws/events/activemq)

<!-- DOCS-SITE-LINK:END -->

# MQ
# ActiveMQ

An Active MQ message broker can be used as an event source for AWS Lambda.
An ActiveMQ message broker can be used as an event source for AWS Lambda.

## Simple event definition

Expand Down
75 changes: 75 additions & 0 deletions docs/providers/aws/events/rabbitmq.md
@@ -0,0 +1,75 @@
<!--
title: Serverless Framework - AWS Lambda Events - RabbitMQ
menuText: RabbitMQ
description: Setting up AWS RabbitMQ Events with AWS Lambda via the Serverless Framework
layout: Doc
-->

<!-- DOCS-SITE-LINK:START automatically generated -->

### [Read this on the main serverless docs site](https://www.serverless.com/framework/docs/providers/aws/events/rabbitmq)

<!-- DOCS-SITE-LINK:END -->

# RabbitMQ

A RabbitMQ message broker can be used as an event source for AWS Lambda.

## Simple event definition

In the following example, we specify that the `compute` function should be triggered whenever there are new messages available to consume from defined RabbitMQ `queue`.

In order to configure `rabbitmq` event, you have to provide three required properties:

- `basicAuthArn`, which is a [AWS Secrets Manager](https://aws.amazon.com/secrets-manager/) ARN for credentials required to do basic auth to allow Lambda to connect to your message broker
- `queue` to consume messages from.
- `arn` arn for your Amazon MQ message broker

```yml
functions:
compute:
handler: handler.compute
events:
- rabbitmq:
arn: arn:aws:mq:us-east-1:0000:broker:ExampleMQBroker:b-xxx-xxx
queue: queue-name
basicAuthArn: arn:aws:secretsmanager:us-east-1:01234567890:secret:MySecret
```

## Enabling and disabling RabbitMQ event

The `rabbitmq` event also supports `enabled` parameter, which is used to control if the event source mapping is active. Setting it to `false` will pause polling for and processing new messages.

In the following example, we specify that the `compute` function's `rabbitmq` event should be disabled.

```yml
functions:
compute:
handler: handler.compute
events:
- rabbitmq:
arn: arn:aws:mq:us-east-1:0000:broker:ExampleMQBroker:b-xxx-xxx
queue: queue-name
enabled: false
basicAuthArn: arn:aws:secretsmanager:us-east-1:01234567890:secret:MySecret
```

## Specifying batch size

You can also specify `batchSize` of number of items to retrieve in a single batch. If not specified, this will default to `100`.

```yml
functions:
compute:
handler: handler.compute
events:
- rabbitmq:
arn: arn:aws:mq:us-east-1:0000:broker:ExampleMQBroker:b-xxx-xxx
queue: queue-name
batchSize: 5000
basicAuthArn: arn:aws:secretsmanager:us-east-1:01234567890:secret:MySecret
```

## IAM Permissions

The Serverless Framework will automatically configure the most minimal set of IAM permissions for you. However you can still add additional permissions if you need to. Read the official [AWS documentation](https://docs.aws.amazon.com/lambda/latest/dg/with-mq.html#events-mq-permissions) for more information about IAM Permissions for Amazon MQ events.
8 changes: 8 additions & 0 deletions lib/plugins/aws/lib/naming.js
Expand Up @@ -448,6 +448,14 @@ module.exports = {
return `${normalizedFunctionName}EventSourceMappingActiveMQ${normalizedQueueName}`;
},

// RabbitMQ
getRabbitMQEventLogicalId(functionName, queueName) {
const normalizedFunctionName = this.getNormalizedFunctionName(functionName);
// QueueName is trimmed to 150 chars to avoid going over 255 character limit
const normalizedQueueName = this.normalizeNameToAlphaNumericOnly(queueName).slice(0, 150);
return `${normalizedFunctionName}EventSourceMappingRabbitMQ${normalizedQueueName}`;
},

// ALB
getAlbTargetGroupLogicalId(functionName, albId, multiValueHeaders) {
return `${this.getNormalizedFunctionName(functionName)}Alb${
Expand Down
142 changes: 142 additions & 0 deletions lib/plugins/aws/package/compile/events/rabbitmq.js
@@ -0,0 +1,142 @@
'use strict';

class AwsCompileRabbitMQEvents {
constructor(serverless) {
this.serverless = serverless;
this.provider = this.serverless.getProvider('aws');

this.hooks = {
'package:compileEvents': this.compileRabbitMQEvents.bind(this),
};

this.serverless.configSchemaHandler.defineFunctionEvent('aws', 'rabbitmq', {
type: 'object',
properties: {
arn: {
anyOf: [
{
type: 'string',
pattern: 'arn:[a-z-]+:mq:[a-z0-9-]+:\\d+:broker:[A-Za-z0-9/_+=.@-]+:b-[a-z0-9-]+',
},
{ $ref: '#/definitions/awsCfImport' },
{ $ref: '#/definitions/awsCfRef' },
],
},
basicAuthArn: {
anyOf: [
{ $ref: '#/definitions/awsSecretsManagerArnString' },
{ $ref: '#/definitions/awsCfImport' },
{ $ref: '#/definitions/awsCfRef' },
],
},
batchSize: {
type: 'number',
minimum: 1,
maximum: 10000,
},
enabled: {
type: 'boolean',
},
queue: {
type: 'string',
},
},
additionalProperties: false,
required: ['basicAuthArn', 'arn', 'queue'],
});
}

compileRabbitMQEvents() {
this.serverless.service.getAllFunctions().forEach((functionName) => {
const functionObj = this.serverless.service.getFunction(functionName);
const cfTemplate = this.serverless.service.provider.compiledCloudFormationTemplate;

// It is required to add the following statement in order to be able to connect to RabbitMQ cluster
const ec2Statement = {
Effect: 'Allow',
Action: [
'ec2:CreateNetworkInterface',
'ec2:DescribeNetworkInterfaces',
'ec2:DescribeVpcs',
'ec2:DeleteNetworkInterface',
'ec2:DescribeSubnets',
'ec2:DescribeSecurityGroups',
],
Resource: '*',
};

// The omission of kms:Decrypt is intentional, since we won't know
// which resources should be valid to decrypt. It's also probably
// not best practice to allow '*' for this.
const secretsManagerStatement = {
Effect: 'Allow',
Action: ['secretsmanager:GetSecretValue'],
Resource: [],
};

const brokerStatement = {
Effect: 'Allow',
Action: ['mq:DescribeBroker'],
Resource: [],
};

let hasMQEvent = false;

functionObj.events.forEach((event) => {
if (!event.rabbitmq) return;

hasMQEvent = true;
const { basicAuthArn, arn, batchSize, enabled, queue } = event.rabbitmq;

const mqEventLogicalId = this.provider.naming.getRabbitMQEventLogicalId(
functionName,
queue
);
const lambdaLogicalId = this.provider.naming.getLambdaLogicalId(functionName);
const dependsOn = this.provider.resolveFunctionIamRoleResourceName(functionObj) || [];

const mqResource = {
Type: 'AWS::Lambda::EventSourceMapping',
DependsOn: dependsOn,
Properties: {
FunctionName: {
'Fn::GetAtt': [lambdaLogicalId, 'Arn'],
},
EventSourceArn: arn,
Queues: [queue],
SourceAccessConfigurations: [
{
Type: 'BASIC_AUTH',
URI: basicAuthArn,
},
],
},
};

if (batchSize) {
mqResource.Properties.BatchSize = batchSize;
}

if (enabled != null) {
mqResource.Properties.Enabled = enabled;
}

brokerStatement.Resource.push(arn);
secretsManagerStatement.Resource.push(basicAuthArn);
cfTemplate.Resources[mqEventLogicalId] = mqResource;
});

// https://docs.aws.amazon.com/lambda/latest/dg/with-mq.html#events-mq-permissions
if (cfTemplate.Resources.IamRoleLambdaExecution && hasMQEvent) {
const statement =
cfTemplate.Resources.IamRoleLambdaExecution.Properties.Policies[0].PolicyDocument
.Statement;
statement.push(secretsManagerStatement);
statement.push(brokerStatement);
statement.push(ec2Statement);
}
});
}
}

module.exports = AwsCompileRabbitMQEvents;
1 change: 1 addition & 0 deletions lib/plugins/index.js
Expand Up @@ -42,6 +42,7 @@ module.exports = [
require('./aws/package/compile/events/stream.js'),
require('./aws/package/compile/events/kafka.js'),
require('./aws/package/compile/events/activemq.js'),
require('./aws/package/compile/events/rabbitmq.js'),
require('./aws/package/compile/events/msk/index.js'),
require('./aws/package/compile/events/alb/index.js'),
require('./aws/package/compile/events/alexaSkill.js'),
Expand Down
66 changes: 59 additions & 7 deletions scripts/test/integration-setup/cloudformation.yml
Expand Up @@ -6,13 +6,23 @@ Parameters:
Description: Name of MSK Cluster
ActiveMQBrokerName:
Type: String
Description: Name of Active MQ Broker
Description: Name of ActiveMQ Broker
ActiveMQUser:
Type: String
Description: Name of Active MQ User
Description: Name of ActiveMQ User
ActiveMQPassword:
Type: String
Description: Password of Active MQ User
Description: Password of ActiveMQ User
NoEcho: true
RabbitMQBrokerName:
Type: String
Description: Name of RabbitMQ Broker
RabbitMQUser:
Type: String
Description: Name of RabbitMQ User
RabbitMQPassword:
Type: String
Description: Password of RabbitMQ User
NoEcho: true
ClusterConfigurationArn:
Type: String
Expand Down Expand Up @@ -132,7 +142,7 @@ Resources:
ActiveMQSecurityGroup:
Type: AWS::EC2::SecurityGroup
Properties:
GroupDescription: Security Group for ApacheMQ
GroupDescription: Security Group for ActiveMQ
VpcId: !Ref VPC
SecurityGroupIngress:
- IpProtocol: tcp
Expand Down Expand Up @@ -163,6 +173,36 @@ Resources:
Password: !Ref ActiveMQPassword
Username: !Ref ActiveMQUser

RabbitMQSecurityGroup:
Type: AWS::EC2::SecurityGroup
Properties:
GroupDescription: Security Group for RabbitMQ
VpcId: !Ref VPC
SecurityGroupIngress:
- IpProtocol: tcp
FromPort: 5671
ToPort: 5671
CidrIp: 0.0.0.0/0

RabbitMQBroker:
Type: 'AWS::AmazonMQ::Broker'
Properties:
AutoMinorVersionUpgrade: 'false'
BrokerName: !Ref RabbitMQBrokerName
DeploymentMode: SINGLE_INSTANCE
EngineType: RABBITMQ
EngineVersion: '3.8.11'
HostInstanceType: mq.t3.micro
PubliclyAccessible: 'false'
SecurityGroups:
- !Ref RabbitMQSecurityGroup
SubnetIds:
- !Ref PrivateSubnetA
Users:
- ConsoleAccess: 'true'
Password: !Ref RabbitMQPassword
Username: !Ref RabbitMQUser

MSKCluster:
Type: 'AWS::MSK::Cluster'
Properties:
Expand Down Expand Up @@ -225,17 +265,29 @@ Outputs:
Value: !GetAtt VPC.DefaultSecurityGroup

ActiveMQBrokerArn:
Description: Created Active MQ Broker ARN
Description: Created ActiveMQ Broker ARN
Value: !GetAtt ActiveMQBroker.Arn

ActiveMQBrokerId:
Description: Created Active MQ Broker Id
Description: Created ActiveMQ Broker Id
Value: !Ref ActiveMQBroker

ActiveMQSecurityGroup:
Description: Security Group for Active MQ
Description: Security Group for ActiveMQ
Value: !Ref ActiveMQSecurityGroup

RabbitMQBrokerArn:
Description: Created RabbitMQ Broker ARN
Value: !GetAtt RabbitMQBroker.Arn

RabbitMQBrokerId:
Description: Created RabbitMQ Broker Id
Value: !Ref RabbitMQBroker

RabbitMQSecurityGroup:
Description: Security Group for RabbitMQ
Value: !Ref RabbitMQSecurityGroup

MSKCluster:
Description: Created MSK Cluster
Value: !Ref MSKCluster
Expand Down

0 comments on commit a3edecf

Please sign in to comment.