Skip to content

Commit

Permalink
Add dequeue_timestamp to pending messages and commits
Browse files Browse the repository at this point in the history
  • Loading branch information
clarakosi committed Nov 4, 2020
1 parent 19245d9 commit 26e50af
Showing 1 changed file with 4 additions and 2 deletions.
6 changes: 4 additions & 2 deletions lib/base_executor.js
Expand Up @@ -105,7 +105,7 @@ class BaseExecutor {
this._selfCheck = setInterval(() => {
for (const message of this._pendingMsgs.entries()) {
// If pending message is older than 1 minute then log error
if ((Date.now() - message.timestamp) > 60000) {
if ((Date.now() - message.dequeue_timestamp) > 60000) {
this._logger.log('error/commit_check', () => ({
message: 'Commit failed',
offset: message.offset,
Expand All @@ -116,7 +116,7 @@ class BaseExecutor {
for (const commitQueue of this._pendingCommits.values()) {
commitQueue.forEach(message => {
// If pending commit is older than 1 minute then log error
if ((Date.now() - message.timestamp) > 60000) {
if ((Date.now() - message.dequeue_timestamp) > 60000) {
this._logger.log('error/commit_check', () => ({
message: 'Commit failed',
offset: message.offset,
Expand Down Expand Up @@ -190,6 +190,7 @@ class BaseExecutor {
if (handler) {
// We're pushing it to pending messages only if it matched so that items
// that don't match don't count against the concurrency limit.
msg.dequeue_timestamp = Date.now();
this._pendingMsgs.add(msg);
// Note: we don't return the promise here since we wanna process messages
// asynchronously from consuming them to be able to fill up the pendingMsg
Expand Down Expand Up @@ -301,6 +302,7 @@ class BaseExecutor {

_notifyFinished(finishedMsg) {
this._pendingMsgs.delete(finishedMsg);
finishedMsg.dequeue_timestamp = Date.now();
if (this._pendingCommits.has(finishedMsg.topic)) {
this._pendingCommits.get(finishedMsg.topic).push(finishedMsg);
} else {
Expand Down

0 comments on commit 26e50af

Please sign in to comment.