From 26e50af2b695db5f90bb6527e028fd95fad289b1 Mon Sep 17 00:00:00 2001 From: Clara Andrew-Wani Date: Wed, 4 Nov 2020 15:58:42 -0500 Subject: [PATCH] Add dequeue_timestamp to pending messages and commits --- lib/base_executor.js | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/lib/base_executor.js b/lib/base_executor.js index f046d6c..4a425dc 100644 --- a/lib/base_executor.js +++ b/lib/base_executor.js @@ -105,7 +105,7 @@ class BaseExecutor { this._selfCheck = setInterval(() => { for (const message of this._pendingMsgs.entries()) { // If pending message is older than 1 minute then log error - if ((Date.now() - message.timestamp) > 60000) { + if ((Date.now() - message.dequeue_timestamp) > 60000) { this._logger.log('error/commit_check', () => ({ message: 'Commit failed', offset: message.offset, @@ -116,7 +116,7 @@ class BaseExecutor { for (const commitQueue of this._pendingCommits.values()) { commitQueue.forEach(message => { // If pending commit is older than 1 minute then log error - if ((Date.now() - message.timestamp) > 60000) { + if ((Date.now() - message.dequeue_timestamp) > 60000) { this._logger.log('error/commit_check', () => ({ message: 'Commit failed', offset: message.offset, @@ -190,6 +190,7 @@ class BaseExecutor { if (handler) { // We're pushing it to pending messages only if it matched so that items // that don't match don't count against the concurrency limit. + msg.dequeue_timestamp = Date.now(); this._pendingMsgs.add(msg); // Note: we don't return the promise here since we wanna process messages // asynchronously from consuming them to be able to fill up the pendingMsg @@ -301,6 +302,7 @@ class BaseExecutor { _notifyFinished(finishedMsg) { this._pendingMsgs.delete(finishedMsg); + finishedMsg.dequeue_timestamp = Date.now(); if (this._pendingCommits.has(finishedMsg.topic)) { this._pendingCommits.get(finishedMsg.topic).push(finishedMsg); } else {