-
Notifications
You must be signed in to change notification settings - Fork 103
/
sqs-message-remover.js
102 lines (88 loc) · 3.19 KB
/
sqs-message-remover.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
'use strict';
const get = require('lodash.get');
const {
deleteSQSMessage,
sqs
} = require('@cumulus/common/aws');
const {
getSfEventMessageObject,
getSfEventStatus,
isFailedSfStatus,
isSfExecutionEvent,
isTerminalSfStatus
} = require('@cumulus/common/cloudwatch-event');
const log = require('@cumulus/common/log');
/**
* Determine if the SQS queue update is needed for the event
*
* Return false if:
* - Event has no workflow status
* - Workflow is not in a terminal state
* - Event message has no 'meta.eventSource' property or eventSource type is not 'sqs'
* - Event message property meta.eventSource.deleteCompletedMessage is not true
* - Event message property meta.eventSource.workflow_name is not the same as meta.workflow_name
*
* @param {Object} event - A workflow execution event
* @returns {boolean} True if SQS queue update is needed
*/
function isSqsQueueUpdateNeeded(event) {
const eventStatus = getSfEventStatus(event);
const eventMessage = getSfEventMessageObject(event, 'input', '{}');
if (!isSfExecutionEvent(event)
|| !isTerminalSfStatus(eventStatus)
|| get(eventMessage, 'meta.eventSource.type') !== 'sqs'
|| get(eventMessage, 'meta.eventSource.deleteCompletedMessage', false) !== true
|| get(eventMessage, 'meta.eventSource.workflow_name') === null
|| get(eventMessage, 'meta.eventSource.workflow_name') !== get(eventMessage, 'meta.workflow_name')) {
return false;
}
return true;
}
/**
* Update SQS queue when workflow of the message is completed
*
* @param {Object} event - Cloudwatch event
* @returns {Promise} A promise indicating function completion
*/
async function updateSqsQueue(event) {
if (!isSqsQueueUpdateNeeded(event)) return Promise.resolve('Not a valid event for updating SQS queue');
const eventStatus = getSfEventStatus(event);
const eventMessage = getSfEventMessageObject(event, 'input', '{}');
const {
queueUrl,
receiptHandle
} = eventMessage.meta.eventSource;
if (isFailedSfStatus(eventStatus)) {
// update visibilityTimeout to 5s so the message can be retried
log.debug(`update message ${receiptHandle} queue ${queueUrl} visibilityTimeout to 5s`);
const params = {
QueueUrl: queueUrl,
ReceiptHandle: receiptHandle,
VisibilityTimeout: 5
};
await sqs().changeMessageVisibility(params).promise();
} else {
// delete SQS message from the source queue when the workflow succeeded
log.debug(`remove message ${receiptHandle} from queue ${queueUrl}`);
await deleteSQSMessage(queueUrl, receiptHandle);
}
return Promise.resolve();
}
/**
* Lambda handler for sqsMessageRemover Lambda
*
* This Lambda is triggered via a [Cloudwatch rule for any Step Function execution status
* changes] (https://docs.aws.amazon.com/step-functions/latest/dg/cw-events.html).
* It works together with sqsMessageConsumer. sqsMessageConsumer lambda
* consumes message from SQS queue, and sqsMessageRemover deletes the message from the SQS
* queue when the workflow triggered by the message is executed successfully.
*
* @param {Object} event - Cloudwatch event
* @returns {Promise}
*/
function handler(event) {
return updateSqsQueue(event);
}
module.exports = {
handler
};