Skip to content

Commit

Permalink
Merge pull request #352 from clarakosi/MassMessage
Browse files Browse the repository at this point in the history
base_executor: Improve logging and retry failed message commits
  • Loading branch information
Pchelolo committed Nov 17, 2020
2 parents cd8f40e + 26e50af commit 16c2430
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 2 deletions.
29 changes: 28 additions & 1 deletion lib/base_executor.js
Expand Up @@ -100,9 +100,32 @@ class BaseExecutor {
);

this._commitTimeout = null;
// In order ti filter out the pending messages faster make them offset->msg map
this._pendingMsgs = new Set();
this._pendingCommits = new Map();
this._selfCheck = setInterval(() => {
for (const message of this._pendingMsgs.entries()) {
// If pending message is older than 1 minute then log error
if ((Date.now() - message.dequeue_timestamp) > 60000) {
this._logger.log('error/commit_check', () => ({
message: 'Commit failed',
offset: message.offset,
description: 'Pending message is older than 1 minute'
}));
}
}
for (const commitQueue of this._pendingCommits.values()) {
commitQueue.forEach(message => {
// If pending commit is older than 1 minute then log error
if ((Date.now() - message.dequeue_timestamp) > 60000) {
this._logger.log('error/commit_check', () => ({
message: 'Commit failed',
offset: message.offset,
description: 'Pending commit is older than 1 minute'
}));
}
});
}
}, 60000);
this._consuming = false;
this._connected = false;
}
Expand Down Expand Up @@ -167,6 +190,7 @@ class BaseExecutor {
if (handler) {
// We're pushing it to pending messages only if it matched so that items
// that don't match don't count against the concurrency limit.
msg.dequeue_timestamp = Date.now();
this._pendingMsgs.add(msg);
// Note: we don't return the promise here since we wanna process messages
// asynchronously from consuming them to be able to fill up the pendingMsg
Expand Down Expand Up @@ -278,6 +302,7 @@ class BaseExecutor {

_notifyFinished(finishedMsg) {
this._pendingMsgs.delete(finishedMsg);
finishedMsg.dequeue_timestamp = Date.now();
if (this._pendingCommits.has(finishedMsg.topic)) {
this._pendingCommits.get(finishedMsg.topic).push(finishedMsg);
} else {
Expand Down Expand Up @@ -322,6 +347,8 @@ class BaseExecutor {
}
return P.all(toCommit.map(message => this.consumer.commitMessageAsync(message)
.catch((e) => {
// Add the message back to pending commits to re-try commit on next pass
this._pendingCommits.get(message.topic).push(message);
this._logger.log('error/commit', () => ({
message: 'Commit failed',
offset: message.offset,
Expand Down
2 changes: 1 addition & 1 deletion package.json
@@ -1,6 +1,6 @@
{
"name": "change-propagation",
"version": "0.10.3",
"version": "0.10.4",
"description": "Listens to events from Kafka and delivers them",
"main": "server.js",
"repository": {
Expand Down

0 comments on commit 16c2430

Please sign in to comment.