Skip to content

Commit

Permalink
feat: add nestedErrors when using allSettled
Browse files Browse the repository at this point in the history
  • Loading branch information
willfarrell committed Jan 4, 2022
1 parent 8705af0 commit bb910c8
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 66 deletions.
97 changes: 33 additions & 64 deletions packages/sqs-partial-batch-failure/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,37 +17,6 @@ const defaults = {
const sqsPartialBatchFailureMiddleware = (opts = {}) => {
const options = { ...defaults, ...opts }

// needs to be async for aws-sdk v3
const getQueueUrl = (eventSourceARN) => {
const [, , , , accountId, queueName] = eventSourceARN.split(':')
// aws-sdk v2
const urlParts = client.endpoint // possible alt, https://${client.config.endpoint}/
// aws-sdk v3
// const urlParts = await client.config.endpoint() // Missing `href` and a promise ... Why AWS, just why ... ?

return `${urlParts.protocol}//${urlParts.hostname}${urlParts.path}${accountId}/${queueName}`
}

const deleteSqsMessages = async (fulfilledRecords) => {
if (!fulfilledRecords?.length) return null

const Entries = getEntries(fulfilledRecords)
const { eventSourceARN } = fulfilledRecords[0]
const QueueUrl = getQueueUrl(eventSourceARN)

const promises = []

// deleteMessageBatch only supports 10 messages at a time
// https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_DeleteMessageBatch.html
let i; let j; const chunk = 10
for (i = 0, j = Entries.length; i < j; i += chunk) {
const chunkedEntries = Entries.slice(i, i + chunk)
promises.push(client.deleteMessageBatch({ Entries: chunkedEntries, QueueUrl }).promise())
}

return Promise.allSettled(promises)
}

let client
if (canPrefetch(options)) {
client = createPrefetchClient(options)
Expand All @@ -62,23 +31,21 @@ const sqsPartialBatchFailureMiddleware = (opts = {}) => {
event: { Records },
response
} = request
const rejectedReasons = getRejectedReasons(response)

const { fulfilledRecordEntries, rejectedRecordErrors } = splitRecords(response)

// If all messages were processed successfully, continue and let the messages be deleted by Lambda's native functionality
if (!rejectedReasons.length) return
if (!rejectedRecordErrors.length) return

/*
** Since we're dealing with batch records, we need to manually delete messages from the queue.
** On function failure, the remaining undeleted messages will automatically be retried and then
** eventually be automatically put on the DLQ if they continue to fail.
** If we didn't manually delete the successful messages, the entire batch would be retried/DLQd.
*/
const fulfilledRecords = getFulfilledRecords(Records, response)
await deleteSqsMessages(fulfilledRecords)
await client
.deleteMessageBatch({
Entries: fulfilledRecordEntries,
QueueUrl: getQueueUrl(client, Records[0].eventSourceARN)
})
.promise()

const errorMessage = getErrorMessage(rejectedReasons)
const error = new Error(errorMessage)
error.originalErrors = rejectedReasons
const error = new Error(`Failed to process SQS messages`)
error.nestedErrors = rejectedRecordErrors
throw error
}

Expand All @@ -87,30 +54,32 @@ const sqsPartialBatchFailureMiddleware = (opts = {}) => {
}
}

const getRejectedReasons = (response) => {
const filteredRes = response
.filter((r) => r.status === 'rejected')
.map((r) => r.reason)
const splitRecords = (response) => {
const fulfilledRecordEntries = []
const rejectedRecordErrors = []
response.forEach((record, idx) => {
if (record.status === 'rejected') {
rejectedRecordErrors.push(record.reason)
} else {
fulfilledRecordEntries.push({
Id: idx.toString(),
ReceiptHandle: record.receiptHandle
})
}
})
return { fulfilledRecordEntries, rejectedRecordErrors }

return filteredRes
}

const getFulfilledRecords = (records, response) => {
const fulfilledRecords = records.filter(
(r, index) => response[index].status === 'fulfilled'
)
// needs to be async for aws-sdk v3
const getQueueUrl = (client, eventSourceARN) => {
const [, , , , accountId, queueName] = eventSourceARN.split(':')
// aws-sdk v2
const urlParts = client.endpoint // possible alt, https://${client.config.endpoint}/
// aws-sdk v3
// const urlParts = await client.config.endpoint() // Missing `href` and a promise ... Why AWS, just why ... ?

return fulfilledRecords
return `${urlParts.protocol}//${urlParts.hostname}${urlParts.path}${accountId}/${queueName}`
}

const getEntries = (fulfilledRecords) => {
return fulfilledRecords.map((fulfilledRecord, key) => ({
Id: key.toString(),
ReceiptHandle: fulfilledRecord.receiptHandle
}))
}

const getErrorMessage = (rejectedReasons) => {
return rejectedReasons.map(error => error.message).join('\n')
}
module.exports = sqsPartialBatchFailureMiddleware
8 changes: 6 additions & 2 deletions packages/util/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,12 @@ const getInternal = async (variables, request) => {
values = await Promise.allSettled(promises)
const errors = values
.filter((res) => res.status === 'rejected')
.map((res) => res.reason.message)
if (errors.length) throw new Error(JSON.stringify(errors))
.map((res) => res.reason)
if (errors.length) {
const error = new Error('Failed to resolve internal values')
error.nestedErrors = errors
throw error
}
return keys.reduce(
(obj, key, index) => ({ ...obj, [sanitizeKey(key)]: values[index].value }),
{}
Expand Down

0 comments on commit bb910c8

Please sign in to comment.