-
-
Notifications
You must be signed in to change notification settings - Fork 527
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Allow to handle consumer retry failure at the user level #643
Conversation
I don't think test failure related to this change. |
Could you provide an example use-case for this change? I don't exactly see what would be the purpose of this, especially since as of the current implementation, there's no way for the user to actually affect the consumer (stopping it from restarting, for example). As it is currently, it's simply a hook that allows you to run some arbitrary code on crashes, which we already have in the form of the |
@Nevon from what i understand this change will allow for users to stop auto restart on system fail of the consumer if they wish. As We had few case cases which were caused by network issues (hard to constantly reproduce) where we want to stop consumer, wait while system recovers and then create new consumer. The behaviour was we trigger disconnect which does not resolve then when we are back we get old consumer reconnected but runner thinks that consumer is disconnected so basically bad state of the consumer all of that due to above I think that this |
I understand. So the goal is to allow users to bail out of the restart cycle. I can see how that would be useful, and I think it's a reasonable thing to allow for. In that case, we should change the implementation a bit though. Currently it's just a synchronous function that receives no arguments and returns nothing, which means that pretty much the only thing you can do in there is I would say that we should change it to be an async function that receives the error as an argument and then communicates back to the caller whether or not it should restart. Something like: if (e.name === 'KafkaJSNumberOfRetriesExceeded' || e.retriable === true) {
const shouldRestart = !retry.restartOnFailure || (await retry.restartOnFailure(e)).catch(error => {
logger.error('Caught error when invoking user-provided "restartOnFailure" callback. Defaulting to restarting.', {
error: error.message || error,
originalError: e.message || e,
groupId,
})
return true
})
if (shouldRestart) {
const retryTime = e.retryTime || retry.initialRetryTime || initialRetryTime
logger.error(`Restarting the consumer in ${retryTime}ms`, {
retryCount: e.retryCount,
retryTime,
groupId,
})
setTimeout(() => restart(onCrash), retryTime)
}
} This would allow you to stop the restarting depending on the error, as well as clean up any resources you might have (database connections etc.) or wait for ongoing processes to finish. |
@Nevon you approach looks better :) I have updated PR, let me know if anything else should be added |
I fixed a syntax error in the code that I proposed, and also pushed some tests to verify that it actually does what it says on the tin. |
Was this option released? I really needed it! |
Initial implementation for:
#69
Allow to handle consumer retry failure at the user level.
If you have any suggesting/improvement/changes let me know. Would be good to have this, or something similar soon :)