diff --git a/athena/queues/new-message-in-thread/buffer-email.js b/athena/queues/new-message-in-thread/buffer-email.js index 5748e93c27..1cb45b60e3 100644 --- a/athena/queues/new-message-in-thread/buffer-email.js +++ b/athena/queues/new-message-in-thread/buffer-email.js @@ -164,7 +164,26 @@ const bufferMessageNotificationEmail = async ( thread.id }` ); - const job = await bufferNewMessageEmailQueue.getJob(recipient.email); + let job = await bufferNewMessageEmailQueue.getJob(recipient.email); + // Try to remove the job if it exists. This fails sometimes due to a + // race condition where the getJob returns a failed or active job. + // We circumvent that issue by simply pretending like no job exists + // and adding a new one + // Ref: withspectrum/spectrum#3189 + if (job) { + debug(`timeout exists for ${recipient.email}, clearing`); + try { + await job.remove(); + } catch (err) { + try { + await job.finished(); + // Note(@mxstbr): This throws if the job fails to complete + // but we don't care if that happens, so we ignore it + } catch (err) {} + + job = null; + } + } if (!job) { debug( `creating new timeout for ${ @@ -185,9 +204,6 @@ const bufferMessageNotificationEmail = async ( ); } else { const timeout = job.data; - // If we already have a timeout going - debug(`timeout exists for ${recipient.email}, clearing`); - await job.remove(); debug(`adding new thread to ${recipient.email}'s threads`); timeout.threads.push(thread); timeout.notifications.push(notification); diff --git a/shared/bull/types.js b/shared/bull/types.js index d1f897d0a7..0d0d91fb8d 100644 --- a/shared/bull/types.js +++ b/shared/bull/types.js @@ -23,6 +23,7 @@ export type Job = {| id: string, data: JobData, remove: () => Promise, + finished: () => Promise, |}; type JobOptions = {|