Skip to content

Commit

Permalink
CUMULUS-1242: Throttle priority executions (#985)
Browse files Browse the repository at this point in the history
* refactor sf-starter lambda to use async handler & add stub functions for throttling priority executions

* update sf-starter lambda unit tests

* add reference to SemaphoresTable for sqs2sf lambda

* add test to show that sf-starter increments priority semaphore

* add DynamoDb.put() helper

* export DynamoDb utility from common package

* refactor sf-semaphore-down tests to use DynamoDb.put

* add unit tests for sf-starter lambda

* add more unit tests

* fix thrown error message for failed increment semaphore operation

* update localstack to 0.9.2 to fix SQS testing issues

* update handling of SQS message visibilityTimeout and allow 0 as a valid value

* update sf-starter unit test to check number of SQS messages remaining

* add sf-starter test for handling messages of multiple priority levels

* separate handlers for processing normal and priority messages from SQS & add unit tests for incrementAndDispatch()

* fix variable reference

* update handling of visibility timeout for receiveSQSMessages

* fix webpack entry for sf-semaphore-down lambda

* revert localstack to version 0.8.7

* refactor sf-starter to improve testability

* update unit tests

* fix error handling for missing priority key

* remove unnecessary DynamoDb.put() helper

* fix eslint issues

* fix Consumer unit tests

* add function docs

* add sqs2sfThrottle lambda to deployment

* remove redundant logging

* update CHANGELOG

* Update CHANGELOG.md

Co-Authored-By: laurenfrederick <lauren@element84.com>

* Update packages/api/lambdas/sf-starter.js

Co-Authored-By: Marc <yjpa7145@users.noreply.github.com>

* Update packages/api/lambdas/sf-starter.js

Co-Authored-By: Marc <yjpa7145@users.noreply.github.com>

* refactor incrementAndDispatch

* fix unit test

* explicitly disallow 0 as a timeLimit for sqs2sf lambda

* fix internals of incrementAndDispatch & update test to mock startExecution() instead of dispatch()

* remove unused import
  • Loading branch information
markdboyd committed May 17, 2019
1 parent cc09461 commit 2d25c3d
Show file tree
Hide file tree
Showing 12 changed files with 530 additions and 66 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,14 @@ If running Cumulus within a VPC and extended downtime is acceptable, we recommen

## Added

- **CUMULUS-1242** - Added `sqs2sfThrottle` lambda. The lambda reads SQS messages for queued executions and uses semaphores to only start new executions if the maximum number of executions defined for the priority key (`cumulus_meta.priorityKey`) has not been reached. Any SQS messages that are read but not used to start executions remain in the queue.

- **CUMULUS-1240**
- Added `sfSemaphoreDown` lambda. This lambda receives SNS messages and for each message it decrements the semaphore used to track the number of running executions if:
- the message is for a completed/failed workflow AND
- the message contains a level of priority (`cumulus_meta.priorityKey`)
- Added `sfSemaphoreDown` lambda as a subscriber to the `sfTracker` SNS topic

- **CUMULUS-1265**
- Added `apiConfigs` configuration option to configure API Gateway to be private
- All internal lambdas configured to run inside the VPC by default
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ services:
- 4567-4570:4567-4570
- 4572-4582:4572-4582
- 8080:8080

locales:
image: elasticsearch:5.6
environment:
Expand Down
11 changes: 10 additions & 1 deletion packages/api/config/lambdas.yml
Original file line number Diff line number Diff line change
@@ -1,9 +1,18 @@
sqs2sf:
handler: index.handler
handler: index.sqs2sfHandler
timeout: 200
memory: 128
source: 'node_modules/@cumulus/api/dist/sfStarter/'
launchInVpc: true

sqs2sfThrottle:
handler: index.sqs2sfThrottleHandler
timeout: 200
memory: 128
source: 'node_modules/@cumulus/api/dist/sfStarter/'
launchInVpc: true
tables:
- SemaphoresTable

sns2elasticsearch:
handler: index.handler
Expand Down
159 changes: 141 additions & 18 deletions packages/api/lambdas/sf-starter.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,21 @@
'use strict';

const uuidv4 = require('uuid/v4');
const { sfn } = require('@cumulus/common/aws');
const get = require('lodash.get');
const {
aws: {
dynamodbDocClient,
sfn
},
errors: {
ResourcesLockedError
},
log,
Semaphore,
util: {
isNil
}
} = require('@cumulus/common');
const { Consumer } = require('@cumulus/ingest/consumer');

/**
Expand All @@ -27,33 +41,142 @@ function dispatch(message) {
}

/**
* This is an SQS Queue consumer.
* Increment the priority semaphore.
*
* It reads messages from a given sqs queue based on the configuration provided
* in the event object
* @param {string} key - Key for the priority semaphore
* @param {number} maximum - Maximum number of executions allowed for this semaphore
* @returns {Promise}
* @throws {Error}
*/
// Raise the semaphore tracking running executions for a priority key.
// Resolves when the semaphore was incremented; rejects (after logging the
// at-capacity case) when the maximum for the key has already been reached
// or the underlying DynamoDB call fails.
async function incrementPrioritySemaphore(key, maximum) {
  const prioritySemaphore = new Semaphore(dynamodbDocClient(), process.env.SemaphoresTable);

  try {
    await prioritySemaphore.up(key, maximum);
  } catch (error) {
    // Log the expected "at capacity" case, but always rethrow so the
    // caller knows no execution slot was acquired.
    if (error instanceof ResourcesLockedError) {
      log.info(`Unable to start new execution: the maximum number of executions for ${key} are already running.`);
    }
    throw error;
  }
}

/**
 * Acquire a priority-semaphore slot, then start the queued execution.
 *
 * The semaphore increment happens first; if it rejects (e.g. the maximum
 * number of executions for the priority key is already running), the error
 * propagates and `dispatch()` is never invoked, leaving the message to be
 * retried from the queue.
 *
 * @param {Object} queueMessage - SQS message
 * @returns {Promise} - resolves with the result of `dispatch()`
 * @throws {Error} - when the priority key or its execution limit is missing,
 *   or when the semaphore cannot be incremented
 */
async function incrementAndDispatch(queueMessage) {
  const meta = get(queueMessage, 'Body.cumulus_meta', {});
  const { priorityKey } = meta;

  if (isNil(priorityKey)) {
    throw new Error('cumulus_meta.priorityKey not set in message');
  }

  const executionLimit = get(meta, `priorityLevels.${priorityKey}.maxExecutions`);
  if (isNil(executionLimit)) {
    throw new Error(`Could not determine maximum executions for priority ${priorityKey}`);
  }

  await incrementPrioritySemaphore(priorityKey, executionLimit);
  return dispatch(queueMessage);
}

/**
* This is an SQS queue consumer.
*
* It reads messages from a given SQS queue based on the configuration provided
* in the event object.
*
* The default is to read 1 message from a given queueUrl and quit after 240
* seconds
* seconds.
*
* @param {Object} event - lambda input message
* @param {string} event.queueUrl - AWS SQS url
* @param {string} event.messageLimit - number of messages to read from SQS for
* this execution (default 1)
* @param {string} event.timeLimit - how many seconds the lambda function will
* remain active and query the queue (default 240 s)
* @param {Object} _context - lambda context
* @param {function} cb - lambda callback
* @returns {undefined} - undefined
* @param {function} dispatchFn - the function to dispatch to process each message
* @param {number} visibilityTimeout - how many seconds messages received from
* the queue will be invisible before they can be read again
* @returns {Promise} - A promise resolving to how many executions were started
* @throws {Error}
*/
function handler(event, _context, cb) {
async function handleEvent(event, dispatchFn, visibilityTimeout) {
const messageLimit = event.messageLimit || 1;
const timeLimit = event.timeLimit || 240;

if (event.queueUrl) {
const con = new Consumer(event.queueUrl, messageLimit, timeLimit);
con.consume(dispatch)
.then((r) => cb(null, r))
.catch(cb);
} else cb(new Error('queueUrl is missing'));
const timeLimit = get(event, 'timeLimit', 240);

if (!event.queueUrl) {
throw new Error('queueUrl is missing');
}

if (timeLimit <= 0) {
throw new Error('timeLimit must be greater than 0');
}

const consumer = new Consumer({
queueUrl: event.queueUrl,
messageLimit,
timeLimit,
visibilityTimeout
});
return consumer.consume(dispatchFn);
}
module.exports = { handler };

/**
 * Lambda entry point for ordinary (non-throttled) SQS queues.
 *
 * Delegates to `handleEvent()` with the plain `dispatch` function, so every
 * consumed message starts an execution without any semaphore check.
 *
 * @param {Object} event - Lambda input message from SQS
 * @returns {Promise} - resolves to the number of executions started
 * @throws {Error}
 */
async function sqs2sfHandler(event) {
  return await handleEvent(event, dispatch);
}

/**
 * Consume priority SQS messages via the semaphore-guarded dispatcher.
 *
 * Kept as a separate, exported wrapper so tests can inject a custom
 * `visibilityTimeout` when reading messages from SQS.
 *
 * @param {Object} event - Lambda input message from SQS
 * @param {number} visibilityTimeout - optional visibility timeout (seconds)
 *   applied to received SQS messages
 * @returns {Promise} - resolves to the number of executions started
 * @throws {Error}
 */
function handleThrottledEvent(event, visibilityTimeout) {
  return handleEvent(event, incrementAndDispatch, visibilityTimeout);
}

/**
 * Lambda entry point for throttled (priority) SQS queues.
 *
 * Thin alias over `handleThrottledEvent()` using its default visibility
 * timeout; exists so the deployment can reference a stable handler name.
 *
 * @param {Object} event - Lambda input message from SQS
 * @returns {Promise} - resolves to the number of executions started
 * @throws {Error}
 */
async function sqs2sfThrottleHandler(event) {
  return await handleThrottledEvent(event);
}

// sqs2sfHandler / sqs2sfThrottleHandler are the Lambda entry points (see
// packages/api/config/lambdas.yml); the remaining exports are exposed for
// unit testing.
module.exports = {
incrementAndDispatch,
sqs2sfHandler,
sqs2sfThrottleHandler,
handleEvent,
handleThrottledEvent
};
91 changes: 67 additions & 24 deletions packages/api/tests/lambdas/test-sf-semaphore-down.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,6 @@ const createSnsWorkflowMessage = ({

let manager;

const setSemaphoreValue = (key, value) =>
aws.dynamodbDocClient().put({
TableName: process.env.SemaphoresTable,
Item: {
key,
semvalue: value
}
}).promise();

test.before(async () => {
process.env.SemaphoresTable = randomId('semaphoreTable');
manager = new Manager({
Expand All @@ -57,6 +48,7 @@ test.beforeEach(async (t) => {
aws.dynamodbDocClient(),
process.env.SemaphoresTable
);
t.context.client = aws.dynamodbDocClient();
});

test.after.always(() => manager.deleteTable());
Expand Down Expand Up @@ -98,10 +90,17 @@ test('getSemaphoreDecrementTasks() returns empty array for SNS message with empt
});

test('sfSemaphoreDown lambda does nothing for a workflow message with no priority info', async (t) => {
const { semaphore } = t.context;
const { client, semaphore } = t.context;
const key = randomId('low');

await setSemaphoreValue(key, 1);
await client.put({
TableName: process.env.SemaphoresTable,
Item: {
key,
semvalue: 1
}
}).promise();

await handler({
Records: [
createSnsWorkflowMessage({
Expand All @@ -115,10 +114,17 @@ test('sfSemaphoreDown lambda does nothing for a workflow message with no priorit
});

test('sfSemaphoreDown lambda does nothing for a workflow message with no status', async (t) => {
const { semaphore } = t.context;
const { client, semaphore } = t.context;
const key = randomId('low');

await setSemaphoreValue(key, 1);
await client.put({
TableName: process.env.SemaphoresTable,
Item: {
key,
semvalue: 1
}
}).promise();

await handler({
Records: [
createSnsWorkflowMessage({
Expand All @@ -132,10 +138,17 @@ test('sfSemaphoreDown lambda does nothing for a workflow message with no status'
});

test('sfSemaphoreDown lambda does nothing for a workflow message for a running workflow', async (t) => {
const { semaphore } = t.context;
const { client, semaphore } = t.context;
const key = randomId('low');

await setSemaphoreValue(key, 1);
await client.put({
TableName: process.env.SemaphoresTable,
Item: {
key,
semvalue: 1
}
}).promise();

await handler({
Records: [
createSnsWorkflowMessage({
Expand Down Expand Up @@ -163,11 +176,17 @@ test('sfSemaphoreDown lambda throws error when attempting to decrement empty sem
});

test('sfSemaphoreDown lambda decrements priority semaphore for completed workflow message', async (t) => {
const { semaphore } = t.context;
const { client, semaphore } = t.context;
const key = randomId('low');

// arbitrarily set semaphore so it can be decremented
await setSemaphoreValue(key, 1);
await client.put({
TableName: process.env.SemaphoresTable,
Item: {
key,
semvalue: 1
}
}).promise();

await handler({
Records: [
Expand All @@ -183,11 +202,17 @@ test('sfSemaphoreDown lambda decrements priority semaphore for completed workflo
});

test('sfSemaphoreDown lambda decrements priority semaphore for failed workflow message', async (t) => {
const { semaphore } = t.context;
const { client, semaphore } = t.context;
const key = randomId('low');

// arbitrarily set semaphore so it can be decremented
await setSemaphoreValue(key, 1);
await client.put({
TableName: process.env.SemaphoresTable,
Item: {
key,
semvalue: 1
}
}).promise();

await handler({
Records: [
Expand All @@ -203,11 +228,17 @@ test('sfSemaphoreDown lambda decrements priority semaphore for failed workflow m
});

test('sfSemaphoreDown lambda handles multiple updates to a single semaphore', async (t) => {
const { semaphore } = t.context;
const { client, semaphore } = t.context;
const key = randomId('low');

// Arbitrarily set semaphore value so it can be decremented
await setSemaphoreValue(key, 3);
await client.put({
TableName: process.env.SemaphoresTable,
Item: {
key,
semvalue: 3
}
}).promise();

await handler({
Records: [
Expand All @@ -227,13 +258,25 @@ test('sfSemaphoreDown lambda handles multiple updates to a single semaphore', as
});

test('sfSemaphoreDown lambda updates multiple semaphores', async (t) => {
const { semaphore } = t.context;
const { client, semaphore } = t.context;
const lowPriorityKey = randomId('low');
const medPriorityKey = randomId('med');

await Promise.all([
setSemaphoreValue(lowPriorityKey, 3),
setSemaphoreValue(medPriorityKey, 3)
client.put({
TableName: process.env.SemaphoresTable,
Item: {
key: lowPriorityKey,
semvalue: 3
}
}).promise(),
client.put({
TableName: process.env.SemaphoresTable,
Item: {
key: medPriorityKey,
semvalue: 3
}
}).promise()
]);

await handler({
Expand Down
Loading

0 comments on commit 2d25c3d

Please sign in to comment.