From a3edecf0c6b4e066bde2de8095582432d9fdd635 Mon Sep 17 00:00:00 2001 From: Michael Date: Tue, 19 Oct 2021 14:10:46 +0100 Subject: [PATCH] feat(AWS Lambda): Support for Amazon MQ RabbitMQ events (#9919) Co-authored-by: Michael Co-authored-by: Michael --- docs/providers/aws/events/activemq.md | 10 +- docs/providers/aws/events/rabbitmq.md | 75 +++++++ lib/plugins/aws/lib/naming.js | 8 + .../aws/package/compile/events/rabbitmq.js | 142 +++++++++++++ lib/plugins/index.js | 1 + .../test/integration-setup/cloudformation.yml | 66 +++++- scripts/test/integration-setup/index.js | 66 +++++- test/README.md | 4 +- .../programmatic/functionRabbitmq/core.js | 43 ++++ .../functionRabbitmq/package.json | 5 + .../functionRabbitmq/serverless.yml | 18 ++ .../aws/infra-dependent/rabbitmq.test.js | 116 ++++++++++ .../package/compile/events/rabbitmq.test.js | 198 ++++++++++++++++++ test/utils/cloudformation.js | 3 + 14 files changed, 736 insertions(+), 19 deletions(-) create mode 100644 docs/providers/aws/events/rabbitmq.md create mode 100644 lib/plugins/aws/package/compile/events/rabbitmq.js create mode 100644 test/fixtures/programmatic/functionRabbitmq/core.js create mode 100644 test/fixtures/programmatic/functionRabbitmq/package.json create mode 100644 test/fixtures/programmatic/functionRabbitmq/serverless.yml create mode 100644 test/integration/aws/infra-dependent/rabbitmq.test.js create mode 100644 test/unit/lib/plugins/aws/package/compile/events/rabbitmq.test.js diff --git a/docs/providers/aws/events/activemq.md b/docs/providers/aws/events/activemq.md index 00fe421f880..d6b108dfa79 100644 --- a/docs/providers/aws/events/activemq.md +++ b/docs/providers/aws/events/activemq.md @@ -1,19 +1,19 @@ -### [Read this on the main serverless docs site](https://www.serverless.com/framework/docs/providers/aws/events/mq) +### [Read this on the main serverless docs site](https://www.serverless.com/framework/docs/providers/aws/events/activemq) -# MQ +# ActiveMQ -An Active MQ message broker can be used as an event source for AWS Lambda. +An ActiveMQ message broker can be used as an event source for AWS Lambda. ## Simple event definition diff --git a/docs/providers/aws/events/rabbitmq.md b/docs/providers/aws/events/rabbitmq.md new file mode 100644 index 00000000000..569d6d3a328 --- /dev/null +++ b/docs/providers/aws/events/rabbitmq.md @@ -0,0 +1,75 @@ + + + + +### [Read this on the main serverless docs site](https://www.serverless.com/framework/docs/providers/aws/events/rabbitmq) + + + +# RabbitMQ + +A RabbitMQ message broker can be used as an event source for AWS Lambda. + +## Simple event definition + +In the following example, we specify that the `compute` function should be triggered whenever there are new messages available to consume from defined RabbitMQ `queue`. + +In order to configure `rabbitmq` event, you have to provide three required properties: + +- `basicAuthArn`, which is a [AWS Secrets Manager](https://aws.amazon.com/secrets-manager/) ARN for credentials required to do basic auth to allow Lambda to connect to your message broker +- `queue` to consume messages from. +- `arn` arn for your Amazon MQ message broker + +```yml +functions: + compute: + handler: handler.compute + events: + - rabbitmq: + arn: arn:aws:mq:us-east-1:0000:broker:ExampleMQBroker:b-xxx-xxx + queue: queue-name + basicAuthArn: arn:aws:secretsmanager:us-east-1:01234567890:secret:MySecret +``` + +## Enabling and disabling RabbitMQ event + +The `rabbitmq` event also supports `enabled` parameter, which is used to control if the event source mapping is active. Setting it to `false` will pause polling for and processing new messages. + +In the following example, we specify that the `compute` function's `rabbitmq` event should be disabled. + +```yml +functions: + compute: + handler: handler.compute + events: + - rabbitmq: + arn: arn:aws:mq:us-east-1:0000:broker:ExampleMQBroker:b-xxx-xxx + queue: queue-name + enabled: false + basicAuthArn: arn:aws:secretsmanager:us-east-1:01234567890:secret:MySecret +``` + +## Specifying batch size + +You can also specify `batchSize` of number of items to retrieve in a single batch. If not specified, this will default to `100`. + +```yml +functions: + compute: + handler: handler.compute + events: + - rabbitmq: + arn: arn:aws:mq:us-east-1:0000:broker:ExampleMQBroker:b-xxx-xxx + queue: queue-name + batchSize: 5000 + basicAuthArn: arn:aws:secretsmanager:us-east-1:01234567890:secret:MySecret +``` + +## IAM Permissions + +The Serverless Framework will automatically configure the most minimal set of IAM permissions for you. However you can still add additional permissions if you need to. Read the official [AWS documentation](https://docs.aws.amazon.com/lambda/latest/dg/with-mq.html#events-mq-permissions) for more information about IAM Permissions for Amazon MQ events. diff --git a/lib/plugins/aws/lib/naming.js b/lib/plugins/aws/lib/naming.js index 031e6b62ee1..c5d2d5e6a24 100644 --- a/lib/plugins/aws/lib/naming.js +++ b/lib/plugins/aws/lib/naming.js @@ -448,6 +448,14 @@ module.exports = { return `${normalizedFunctionName}EventSourceMappingActiveMQ${normalizedQueueName}`; }, + // RabbitMQ + getRabbitMQEventLogicalId(functionName, queueName) { + const normalizedFunctionName = this.getNormalizedFunctionName(functionName); + // QueueName is trimmed to 150 chars to avoid going over 255 character limit + const normalizedQueueName = this.normalizeNameToAlphaNumericOnly(queueName).slice(0, 150); + return `${normalizedFunctionName}EventSourceMappingRabbitMQ${normalizedQueueName}`; + }, + // ALB getAlbTargetGroupLogicalId(functionName, albId, multiValueHeaders) { return `${this.getNormalizedFunctionName(functionName)}Alb${ diff --git a/lib/plugins/aws/package/compile/events/rabbitmq.js b/lib/plugins/aws/package/compile/events/rabbitmq.js new file mode 100644 index 00000000000..a1ee60ff78a --- /dev/null +++ b/lib/plugins/aws/package/compile/events/rabbitmq.js @@ -0,0 +1,142 @@ +'use strict'; + +class AwsCompileRabbitMQEvents { + constructor(serverless) { + this.serverless = serverless; + this.provider = this.serverless.getProvider('aws'); + + this.hooks = { + 'package:compileEvents': this.compileRabbitMQEvents.bind(this), + }; + + this.serverless.configSchemaHandler.defineFunctionEvent('aws', 'rabbitmq', { + type: 'object', + properties: { + arn: { + anyOf: [ + { + type: 'string', + pattern: 'arn:[a-z-]+:mq:[a-z0-9-]+:\\d+:broker:[A-Za-z0-9/_+=.@-]+:b-[a-z0-9-]+', + }, + { $ref: '#/definitions/awsCfImport' }, + { $ref: '#/definitions/awsCfRef' }, + ], + }, + basicAuthArn: { + anyOf: [ + { $ref: '#/definitions/awsSecretsManagerArnString' }, + { $ref: '#/definitions/awsCfImport' }, + { $ref: '#/definitions/awsCfRef' }, + ], + }, + batchSize: { + type: 'number', + minimum: 1, + maximum: 10000, + }, + enabled: { + type: 'boolean', + }, + queue: { + type: 'string', + }, + }, + additionalProperties: false, + required: ['basicAuthArn', 'arn', 'queue'], + }); + } + + compileRabbitMQEvents() { + this.serverless.service.getAllFunctions().forEach((functionName) => { + const functionObj = this.serverless.service.getFunction(functionName); + const cfTemplate = this.serverless.service.provider.compiledCloudFormationTemplate; + + // It is required to add the following statement in order to be able to connect to RabbitMQ cluster + const ec2Statement = { + Effect: 'Allow', + Action: [ + 'ec2:CreateNetworkInterface', + 'ec2:DescribeNetworkInterfaces', + 'ec2:DescribeVpcs', + 'ec2:DeleteNetworkInterface', + 'ec2:DescribeSubnets', + 'ec2:DescribeSecurityGroups', + ], + Resource: '*', + }; + + // The omission of kms:Decrypt is intentional, since we won't know + // which resources should be valid to decrypt. It's also probably + // not best practice to allow '*' for this. + const secretsManagerStatement = { + Effect: 'Allow', + Action: ['secretsmanager:GetSecretValue'], + Resource: [], + }; + + const brokerStatement = { + Effect: 'Allow', + Action: ['mq:DescribeBroker'], + Resource: [], + }; + + let hasMQEvent = false; + + functionObj.events.forEach((event) => { + if (!event.rabbitmq) return; + + hasMQEvent = true; + const { basicAuthArn, arn, batchSize, enabled, queue } = event.rabbitmq; + + const mqEventLogicalId = this.provider.naming.getRabbitMQEventLogicalId( + functionName, + queue + ); + const lambdaLogicalId = this.provider.naming.getLambdaLogicalId(functionName); + const dependsOn = this.provider.resolveFunctionIamRoleResourceName(functionObj) || []; + + const mqResource = { + Type: 'AWS::Lambda::EventSourceMapping', + DependsOn: dependsOn, + Properties: { + FunctionName: { + 'Fn::GetAtt': [lambdaLogicalId, 'Arn'], + }, + EventSourceArn: arn, + Queues: [queue], + SourceAccessConfigurations: [ + { + Type: 'BASIC_AUTH', + URI: basicAuthArn, + }, + ], + }, + }; + + if (batchSize) { + mqResource.Properties.BatchSize = batchSize; + } + + if (enabled != null) { + mqResource.Properties.Enabled = enabled; + } + + brokerStatement.Resource.push(arn); + secretsManagerStatement.Resource.push(basicAuthArn); + cfTemplate.Resources[mqEventLogicalId] = mqResource; + }); + + // https://docs.aws.amazon.com/lambda/latest/dg/with-mq.html#events-mq-permissions + if (cfTemplate.Resources.IamRoleLambdaExecution && hasMQEvent) { + const statement = + cfTemplate.Resources.IamRoleLambdaExecution.Properties.Policies[0].PolicyDocument + .Statement; + statement.push(secretsManagerStatement); + statement.push(brokerStatement); + statement.push(ec2Statement); + } + }); + } +} + +module.exports = AwsCompileRabbitMQEvents; diff --git a/lib/plugins/index.js b/lib/plugins/index.js index 1232a41e407..397e7d0450e 100644 --- a/lib/plugins/index.js +++ b/lib/plugins/index.js @@ -42,6 +42,7 @@ module.exports = [ require('./aws/package/compile/events/stream.js'), require('./aws/package/compile/events/kafka.js'), require('./aws/package/compile/events/activemq.js'), + require('./aws/package/compile/events/rabbitmq.js'), require('./aws/package/compile/events/msk/index.js'), require('./aws/package/compile/events/alb/index.js'), require('./aws/package/compile/events/alexaSkill.js'), diff --git a/scripts/test/integration-setup/cloudformation.yml b/scripts/test/integration-setup/cloudformation.yml index 16840a7be0e..e4f8aa3e206 100644 --- a/scripts/test/integration-setup/cloudformation.yml +++ b/scripts/test/integration-setup/cloudformation.yml @@ -6,13 +6,23 @@ Parameters: Description: Name of MSK Cluster ActiveMQBrokerName: Type: String - Description: Name of Active MQ Broker + Description: Name of ActiveMQ Broker ActiveMQUser: Type: String - Description: Name of Active MQ User + Description: Name of ActiveMQ User ActiveMQPassword: Type: String - Description: Password of Active MQ User + Description: Password of ActiveMQ User + NoEcho: true + RabbitMQBrokerName: + Type: String + Description: Name of RabbitMQ Broker + RabbitMQUser: + Type: String + Description: Name of RabbitMQ User + RabbitMQPassword: + Type: String + Description: Password of RabbitMQ User NoEcho: true ClusterConfigurationArn: Type: String @@ -132,7 +142,7 @@ Resources: ActiveMQSecurityGroup: Type: AWS::EC2::SecurityGroup Properties: - GroupDescription: Security Group for ApacheMQ + GroupDescription: Security Group for ActiveMQ VpcId: !Ref VPC SecurityGroupIngress: - IpProtocol: tcp @@ -163,6 +173,36 @@ Resources: Password: !Ref ActiveMQPassword Username: !Ref ActiveMQUser + RabbitMQSecurityGroup: + Type: AWS::EC2::SecurityGroup + Properties: + GroupDescription: Security Group for RabbitMQ + VpcId: !Ref VPC + SecurityGroupIngress: + - IpProtocol: tcp + FromPort: 5671 + ToPort: 5671 + CidrIp: 0.0.0.0/0 + + RabbitMQBroker: + Type: 'AWS::AmazonMQ::Broker' + Properties: + AutoMinorVersionUpgrade: 'false' + BrokerName: !Ref RabbitMQBrokerName + DeploymentMode: SINGLE_INSTANCE + EngineType: RABBITMQ + EngineVersion: '3.8.11' + HostInstanceType: mq.t3.micro + PubliclyAccessible: 'false' + SecurityGroups: + - !Ref RabbitMQSecurityGroup + SubnetIds: + - !Ref PrivateSubnetA + Users: + - ConsoleAccess: 'true' + Password: !Ref RabbitMQPassword + Username: !Ref RabbitMQUser + MSKCluster: Type: 'AWS::MSK::Cluster' Properties: @@ -225,17 +265,29 @@ Outputs: Value: !GetAtt VPC.DefaultSecurityGroup ActiveMQBrokerArn: - Description: Created Active MQ Broker ARN + Description: Created ActiveMQ Broker ARN Value: !GetAtt ActiveMQBroker.Arn ActiveMQBrokerId: - Description: Created Active MQ Broker Id + Description: Created ActiveMQ Broker Id Value: !Ref ActiveMQBroker ActiveMQSecurityGroup: - Description: Security Group for Active MQ + Description: Security Group for ActiveMQ Value: !Ref ActiveMQSecurityGroup + RabbitMQBrokerArn: + Description: Created RabbitMQ Broker ARN + Value: !GetAtt RabbitMQBroker.Arn + + RabbitMQBrokerId: + Description: Created RabbitMQ Broker Id + Value: !Ref RabbitMQBroker + + RabbitMQSecurityGroup: + Description: Security Group for RabbitMQ + Value: !Ref RabbitMQSecurityGroup + MSKCluster: Description: Created MSK Cluster Value: !Ref MSKCluster diff --git a/scripts/test/integration-setup/index.js b/scripts/test/integration-setup/index.js index 0d0fbf2c7be..2a2266c6477 100755 --- a/scripts/test/integration-setup/index.js +++ b/scripts/test/integration-setup/index.js @@ -12,14 +12,15 @@ const path = require('path'); const { SHARED_INFRA_TESTS_CLOUDFORMATION_STACK, SHARED_INFRA_TESTS_ACTIVE_MQ_CREDENTIALS_NAME, + SHARED_INFRA_TESTS_RABBITMQ_CREDENTIALS_NAME, } = require('../../../test/utils/cloudformation'); -const ensureMQCredentialsSecret = async () => { +const ensureActiveMQCredentialsSecret = async () => { const ssmMqCredentials = { username: process.env.SLS_INTEGRATION_TESTS_ACTIVE_MQ_USER, password: process.env.SLS_INTEGRATION_TESTS_ACTIVE_MQ_PASSWORD, }; - log.notice('Creating SecretsManager Active MQ Credentials secret...'); + log.notice('Creating SecretsManager ActiveMQ Credentials secret...'); try { await awsRequest('SecretsManager', 'createSecret', { Name: SHARED_INFRA_TESTS_ACTIVE_MQ_CREDENTIALS_NAME, @@ -32,7 +33,26 @@ const ensureMQCredentialsSecret = async () => { } }; -const activeMqBrokerName = 'integration-tests-active-mq-broker'; +const ensureRabbitMQCredentialsSecret = async () => { + const ssmMqCredentials = { + username: process.env.SLS_INTEGRATION_TESTS_RABBITMQ_USER, + password: process.env.SLS_INTEGRATION_TESTS_RABBITMQ_PASSWORD, + }; + log.notice('Creating SecretsManager RabbitMQ Credentials secret...'); + try { + await awsRequest('SecretsManager', 'createSecret', { + Name: SHARED_INFRA_TESTS_RABBITMQ_CREDENTIALS_NAME, + SecretString: JSON.stringify(ssmMqCredentials), + }); + } catch (e) { + if (!e.code === 'ResourceExistsException') { + throw e; + } + } +}; + +const activeMqBrokerName = 'integration-tests-activemq-broker'; +const rabbitMqBrokerName = 'integration-tests-rabbitmq-broker'; async function handleInfrastructureCreation() { const [cfnTemplate, kafkaServerProperties] = await Promise.all([ @@ -40,7 +60,8 @@ async function handleInfrastructureCreation() { fsp.readFile(path.join(__dirname, 'kafka.server.properties')), ]); - await ensureMQCredentialsSecret(); + await ensureActiveMQCredentialsSecret(); + await ensureRabbitMQCredentialsSecret(); const clusterName = 'integration-tests-msk-cluster'; const clusterConfName = 'integration-tests-msk-cluster-configuration'; @@ -70,6 +91,15 @@ async function handleInfrastructureCreation() { ParameterKey: 'ActiveMQPassword', ParameterValue: process.env.SLS_INTEGRATION_TESTS_ACTIVE_MQ_PASSWORD, }, + { ParameterKey: 'RabbitMQBrokerName', ParameterValue: rabbitMqBrokerName }, + { + ParameterKey: 'RabbitMQUser', + ParameterValue: process.env.SLS_INTEGRATION_TESTS_RABBITMQ_USER, + }, + { + ParameterKey: 'RabbitMQPassword', + ParameterValue: process.env.SLS_INTEGRATION_TESTS_RABBITMQ_PASSWORD, + }, { ParameterKey: 'ClusterConfigurationArn', ParameterValue: clusterConfigurationArn }, { ParameterKey: 'ClusterConfigurationRevision', @@ -87,7 +117,8 @@ async function handleInfrastructureCreation() { async function handleInfrastructureUpdate() { log.notice('Updating integration tests CloudFormation stack...'); - await ensureMQCredentialsSecret(); + await ensureActiveMQCredentialsSecret(); + await ensureRabbitMQCredentialsSecret(); const cfnTemplate = await fsp.readFile(path.join(__dirname, 'cloudformation.yml'), 'utf8'); @@ -106,6 +137,15 @@ async function handleInfrastructureUpdate() { ParameterKey: 'ActiveMQPassword', ParameterValue: process.env.SLS_INTEGRATION_TESTS_ACTIVE_MQ_PASSWORD, }, + { ParameterKey: 'RabbitMQBrokerName', ParameterValue: rabbitMqBrokerName }, + { + ParameterKey: 'RabbitMQUser', + ParameterValue: process.env.SLS_INTEGRATION_TESTS_RABBITMQ_USER, + }, + { + ParameterKey: 'RabbitMQPassword', + ParameterValue: process.env.SLS_INTEGRATION_TESTS_RABBITMQ_PASSWORD, + }, { ParameterKey: 'ClusterConfigurationArn', UsePreviousValue: true }, { ParameterKey: 'ClusterConfigurationRevision', @@ -146,6 +186,22 @@ async function handleInfrastructureUpdate() { return; } + if (!process.env.SLS_INTEGRATION_TESTS_RABBITMQ_USER) { + log.error( + '"SLS_INTEGRATION_TESTS_RABBITMQ_USER" env variable has to be set when provisioning integration infrastructure' + ); + process.exitCode = 1; + return; + } + + if (!process.env.SLS_INTEGRATION_TESTS_RABBITMQ_PASSWORD) { + log.error( + '"SLS_INTEGRATION_TESTS_RABBITMQ_PASSWORD" env variable has to be set when provisioning integration infrastructure' + ); + process.exitCode = 1; + return; + } + let describeResponse; log.notice('Checking if integration tests CloudFormation stack already exists...'); diff --git a/test/README.md b/test/README.md index ef0ee90fb77..dfa6118f5bf 100644 --- a/test/README.md +++ b/test/README.md @@ -68,8 +68,8 @@ AWS_ACCESS_KEY_ID=XXX AWS_SECRET_ACCESS_KEY=xxx npx mocha test/integration/{chos Due to the fact that some of the tests require a bit more complex infrastructure setup which might be lengthy, two additional commands has been made available: -- `integration-test-setup-infrastructure` - used for setting up all needed intrastructure dependencies -- `integration-test-teardown-infrastructure` - used for tearing down the infrastructure setup by the above command +- `integration-test-setup` - used for setting up all needed intrastructure dependencies +- `integration-test-teardown` - used for tearing down the infrastructure setup by the above command Such tests take advantage of `isDependencyStackAvailable` util to check if all needed dependencies are ready. If not, it skips the given test suite. diff --git a/test/fixtures/programmatic/functionRabbitmq/core.js b/test/fixtures/programmatic/functionRabbitmq/core.js new file mode 100644 index 00000000000..4c823006610 --- /dev/null +++ b/test/fixtures/programmatic/functionRabbitmq/core.js @@ -0,0 +1,43 @@ +'use strict'; + +// NOTE: `amqplib` is bundled into the deployment package +// eslint-disable-next-line import/no-unresolved +const amqp = require('amqplib'); + +function consumer(event, context, callback) { + const functionName = 'consumer'; + const messages = []; + + Object.keys(event.rmqMessagesByQueue).forEach((queueKey) => { + const queue = event.rmqMessagesByQueue[queueKey]; + queue.forEach((message) => { + messages.push(Buffer.from(message.data, 'base64').toString()); + }); + }); + // eslint-disable-next-line no-console + console.log(functionName, JSON.stringify(messages)); + + return callback(null, event); +} + +async function producer() { + const connectOptions = { + protocol: 'amqps', + hostname: process.env.RABBITMQ_HOST, + port: 5671, + username: process.env.RABBITMQ_USERNAME, + password: process.env.RABBITMQ_PASSWORD, + }; + + const connection = await amqp.connect(connectOptions); + const channel = await connection.createChannel(); + const queueName = process.env.QUEUE_NAME; + await channel.assertQueue(queueName); + await channel.sendToQueue(queueName, Buffer.from('Hello from RabbitMQ Integration test!')); + + return { + statusCode: 200, + }; +} + +module.exports = { producer, consumer }; diff --git a/test/fixtures/programmatic/functionRabbitmq/package.json b/test/fixtures/programmatic/functionRabbitmq/package.json new file mode 100644 index 00000000000..0a22a3db431 --- /dev/null +++ b/test/fixtures/programmatic/functionRabbitmq/package.json @@ -0,0 +1,5 @@ +{ + "dependencies": { + "amqplib": "^0.8.0" + } +} diff --git a/test/fixtures/programmatic/functionRabbitmq/serverless.yml b/test/fixtures/programmatic/functionRabbitmq/serverless.yml new file mode 100644 index 00000000000..ce4f94dddd0 --- /dev/null +++ b/test/fixtures/programmatic/functionRabbitmq/serverless.yml @@ -0,0 +1,18 @@ +service: service + +configValidationMode: error +frameworkVersion: '*' + +# VPC and Events configuration is added dynamically during test run +# Because it has to be provisioned separately via CloudFormation stack + +provider: + name: aws + runtime: nodejs14.x + versionFunctions: false + +functions: + producer: + handler: core.producer + consumer: + handler: core.consumer diff --git a/test/integration/aws/infra-dependent/rabbitmq.test.js b/test/integration/aws/infra-dependent/rabbitmq.test.js new file mode 100644 index 00000000000..2c5c7a30c33 --- /dev/null +++ b/test/integration/aws/infra-dependent/rabbitmq.test.js @@ -0,0 +1,116 @@ +'use strict'; + +const { expect } = require('chai'); +const log = require('log').get('serverless:test'); +const fixtures = require('../../../fixtures/programmatic'); +const { confirmCloudWatchLogs } = require('../../../utils/misc'); +const { + isDependencyStackAvailable, + getDependencyStackOutputMap, + SHARED_INFRA_TESTS_RABBITMQ_CREDENTIALS_NAME, +} = require('../../../utils/cloudformation'); + +const awsRequest = require('@serverless/test/aws-request'); +const crypto = require('crypto'); +const { deployService, removeService } = require('../../../utils/integration'); + +describe('AWS - RabbitMQ Integration Test', function () { + this.timeout(1000 * 60 * 100); // Involves time-taking deploys + let stackName; + let serviceDir; + const stage = 'dev'; + + const queueName = `testqueue${crypto.randomBytes(8).toString('hex')}`; + + before(async () => { + const isDepsStackAvailable = await isDependencyStackAvailable(); + if (!isDepsStackAvailable) { + throw new Error('CloudFormation stack with integration test dependencies not found.'); + } + + const outputMap = await getDependencyStackOutputMap(); + + log.notice('Getting RabbitMQ Credentials ARN'); + const getSecretValueResponse = await awsRequest('SecretsManager', 'getSecretValue', { + SecretId: SHARED_INFRA_TESTS_RABBITMQ_CREDENTIALS_NAME, + }); + const { username: mqUsername, password: mqPassword } = JSON.parse( + getSecretValueResponse.SecretString + ); + + const describeBrokerResponse = await awsRequest('MQ', 'describeBroker', { + BrokerId: outputMap.get('RabbitMQBrokerId'), + }); + const amqpEndpoint = describeBrokerResponse.BrokerInstances[0].Endpoints.find((endpoint) => + endpoint.startsWith('amqp') + ); + + const serviceData = await fixtures.setup('functionRabbitmq', { + configExt: { + functions: { + producer: { + vpc: { + subnetIds: [outputMap.get('PrivateSubnetA')], + securityGroupIds: [outputMap.get('RabbitMQSecurityGroup')], + }, + environment: { + QUEUE_NAME: queueName, + RABBITMQ_PASSWORD: mqPassword, + RABBITMQ_USERNAME: mqUsername, + RABBITMQ_HOST: amqpEndpoint.split(':')[1].slice(2), + }, + }, + consumer: { + vpc: { + subnetIds: [outputMap.get('PrivateSubnetA')], + securityGroupIds: [outputMap.get('RabbitMQSecurityGroup')], + }, + events: [ + { + rabbitmq: { + arn: outputMap.get('RabbitMQBrokerArn'), + queue: queueName, + basicAuthArn: getSecretValueResponse.ARN, + }, + }, + ], + }, + }, + }, + }); + + ({ servicePath: serviceDir } = serviceData); + + const serviceName = serviceData.serviceConfig.service; + stackName = `${serviceName}-${stage}`; + await deployService(serviceDir); + }); + + after(async () => { + if (serviceDir) { + await removeService(serviceDir); + } + }); + + it('correctly processes messages from RabbitMQ queue', async () => { + const functionName = 'consumer'; + const message = 'Hello from RabbitMQ Integration test!'; + + const events = await confirmCloudWatchLogs( + `/aws/lambda/${stackName}-${functionName}`, + async () => + await awsRequest('Lambda', 'invoke', { + FunctionName: `${stackName}-producer`, + InvocationType: 'RequestResponse', + }), + { + checkIsComplete: (soFarEvents) => + soFarEvents.reduce((data, event) => data + event.message, '').includes(message), + } + ); + + const logs = events.reduce((data, event) => data + event.message, ''); + expect(logs).to.include(functionName); + expect(logs).to.include(message); + }); +}); diff --git a/test/unit/lib/plugins/aws/package/compile/events/rabbitmq.test.js b/test/unit/lib/plugins/aws/package/compile/events/rabbitmq.test.js new file mode 100644 index 00000000000..fb061de6d40 --- /dev/null +++ b/test/unit/lib/plugins/aws/package/compile/events/rabbitmq.test.js @@ -0,0 +1,198 @@ +'use strict'; + +const chai = require('chai'); +const runServerless = require('../../../../../../../utils/run-serverless'); + +const { expect } = chai; + +chai.use(require('chai-as-promised')); + +describe('test/unit/lib/plugins/aws/package/compile/events/rabbitmq.test.js', () => { + const brokerArn = 'arn:aws:mq:us-east-1:0000:broker:ExampleMQBroker:b-xxx-xxx'; + const basicAuthArn = 'arn:aws:secretsmanager:us-east-1:01234567890:secret:MyBrokerSecretName'; + const queue = 'TestingQueue'; + const enabled = false; + const batchSize = 5000; + + describe('when there are rabbitmq events defined', () => { + let minimalEventSourceMappingResource; + let allParamsEventSourceMappingResource; + let defaultIamRole; + let naming; + + before(async () => { + const { awsNaming, cfTemplate } = await runServerless({ + fixture: 'function', + configExt: { + functions: { + basic: { + events: [ + { + rabbitmq: { + queue, + arn: brokerArn, + basicAuthArn, + }, + }, + ], + }, + other: { + events: [ + { + rabbitmq: { + queue, + arn: brokerArn, + basicAuthArn, + batchSize, + enabled, + }, + }, + ], + }, + }, + }, + command: 'package', + }); + naming = awsNaming; + minimalEventSourceMappingResource = + cfTemplate.Resources[naming.getRabbitMQEventLogicalId('basic', queue)]; + allParamsEventSourceMappingResource = + cfTemplate.Resources[naming.getRabbitMQEventLogicalId('other', queue)]; + defaultIamRole = cfTemplate.Resources.IamRoleLambdaExecution; + }); + + it('should correctly compile EventSourceMapping resource properties with minimal configuration', () => { + expect(minimalEventSourceMappingResource.Properties).to.deep.equal({ + EventSourceArn: brokerArn, + SourceAccessConfigurations: [ + { + Type: 'BASIC_AUTH', + URI: basicAuthArn, + }, + ], + Queues: [queue], + FunctionName: { + 'Fn::GetAtt': [naming.getLambdaLogicalId('basic'), 'Arn'], + }, + }); + }); + + it('should update default IAM role with DescribeBroker statement', () => { + expect(defaultIamRole.Properties.Policies[0].PolicyDocument.Statement).to.deep.include({ + Effect: 'Allow', + Action: ['mq:DescribeBroker'], + Resource: [brokerArn], + }); + }); + + it('should update default IAM role with SecretsManager statement', () => { + expect(defaultIamRole.Properties.Policies[0].PolicyDocument.Statement).to.deep.include({ + Effect: 'Allow', + Action: ['secretsmanager:GetSecretValue'], + Resource: [basicAuthArn], + }); + }); + + it('should correctly compile EventSourceMapping resource DependsOn ', () => { + expect(minimalEventSourceMappingResource.DependsOn).to.equal('IamRoleLambdaExecution'); + expect(allParamsEventSourceMappingResource.DependsOn).to.equal('IamRoleLambdaExecution'); + }); + + it('should correctly compile EventSourceMapping resource with all parameters', () => { + expect(allParamsEventSourceMappingResource.Properties).to.deep.equal({ + EventSourceArn: brokerArn, + BatchSize: batchSize, + Enabled: enabled, + SourceAccessConfigurations: [ + { + Type: 'BASIC_AUTH', + URI: basicAuthArn, + }, + ], + Queues: [queue], + FunctionName: { + 'Fn::GetAtt': [naming.getLambdaLogicalId('other'), 'Arn'], + }, + }); + }); + + it('should update default IAM role with EC2 statement', async () => { + expect(defaultIamRole.Properties.Policies[0].PolicyDocument.Statement).to.deep.include({ + Effect: 'Allow', + Action: [ + 'ec2:CreateNetworkInterface', + 'ec2:DescribeNetworkInterfaces', + 'ec2:DescribeVpcs', + 'ec2:DeleteNetworkInterface', + 'ec2:DescribeSubnets', + 'ec2:DescribeSecurityGroups', + ], + Resource: '*', + }); + }); + }); + + describe('configuring rabbitmq events', () => { + it('should not add dependsOn for imported role', async () => { + const { awsNaming, cfTemplate } = await runServerless({ + fixture: 'function', + configExt: { + functions: { + basic: { + role: { 'Fn::ImportValue': 'MyImportedRole' }, + events: [ + { + rabbitmq: { + queue, + arn: brokerArn, + basicAuthArn, + }, + }, + ], + }, + }, + }, + command: 'package', + }); + + const eventSourceMappingResource = + cfTemplate.Resources[awsNaming.getRabbitMQEventLogicalId('basic', queue)]; + expect(eventSourceMappingResource.DependsOn).to.deep.equal([]); + }); + }); + + describe('when no rabbitmq events are defined', () => { + it('should not modify the default IAM role', async () => { + const { cfTemplate } = await runServerless({ + fixture: 'function', + command: 'package', + }); + + const defaultIamRole = cfTemplate.Resources.IamRoleLambdaExecution; + expect(defaultIamRole.Properties.Policies[0].PolicyDocument.Statement).not.to.deep.include({ + Effect: 'Allow', + Action: ['mq:DescribeBroker'], + Resource: [brokerArn], + }); + + expect(defaultIamRole.Properties.Policies[0].PolicyDocument.Statement).not.to.deep.include({ + Effect: 'Allow', + Action: ['secretsmanager:GetSecretValue'], + Resource: [basicAuthArn], + }); + + expect(defaultIamRole.Properties.Policies[0].PolicyDocument.Statement).not.to.deep.include({ + Effect: 'Allow', + Action: [ + 'ec2:CreateNetworkInterface', + 'ec2:DescribeNetworkInterfaces', + 'ec2:DescribeVpcs', + 'ec2:DeleteNetworkInterface', + 'ec2:DescribeSubnets', + 'ec2:DescribeSecurityGroups', + ], + Resource: '*', + }); + }); + }); +}); diff --git a/test/utils/cloudformation.js b/test/utils/cloudformation.js index 0d6bf23818d..a3ca7fad0d3 100644 --- a/test/utils/cloudformation.js +++ b/test/utils/cloudformation.js @@ -5,6 +5,8 @@ const awsRequest = require('@serverless/test/aws-request'); const SHARED_INFRA_TESTS_CLOUDFORMATION_STACK = 'integration-tests-deps-stack'; const SHARED_INFRA_TESTS_ACTIVE_MQ_CREDENTIALS_NAME = 'integration-tests-active-mq-broker-credentials'; +const SHARED_INFRA_TESTS_RABBITMQ_CREDENTIALS_NAME = + 'integration-tests-rabbitmq-broker-credentials'; function findStacks(name, status) { const params = {}; @@ -104,6 +106,7 @@ module.exports = { getStackOutputMap, SHARED_INFRA_TESTS_CLOUDFORMATION_STACK, SHARED_INFRA_TESTS_ACTIVE_MQ_CREDENTIALS_NAME, + SHARED_INFRA_TESTS_RABBITMQ_CREDENTIALS_NAME, isDependencyStackAvailable, getDependencyStackOutputMap, };