Remove nested retriers from producer #962
Conversation
In case we get a connection timeout, we currently don't refresh metadata in all cases. Fixes #950.
Co-authored-by: Sam <me@smartin.io>
This really just unwraps the error, as it would otherwise be wrapped in a KafkaJSNumberOfRetriesExceeded error (illustrated in the sketch below).
Co-authored-by: Sam <me@smartin.io>
…ios/kafkajs into remove-nested-retriers-from-producer
Co-authored-by: Sam <me@smartin.io>
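For a rough idea of what "unwrapping the error" means, here is an illustrative sketch (not the actual kafkajs producer code; it assumes KafkaJSNumberOfRetriesExceeded keeps the underlying error on an originalError property) of surfacing the real cause instead of the retry wrapper:

// Illustration only, not the actual kafkajs producer code.
// Assumption: KafkaJSNumberOfRetriesExceeded exposes the underlying error via
// `originalError`, so callers can react to the real cause (e.g. a connection
// timeout) instead of the generic "retries exceeded" wrapper.
const sendWithUnwrappedErrors = async send => {
  try {
    return await send()
  } catch (e) {
    if (e.name === 'KafkaJSNumberOfRetriesExceeded' && e.originalError) {
      throw e.originalError
    }
    throw e
  }
}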
@@ -393,14 +394,14 @@ describe('Cluster > BrokerPool', () => {
      expect(broker.isConnected()).toEqual(true)
    })

-   it('recreates the connection on connection errors', async () => {
+   it('recreates the connection on ILLEGAL_SASL_STATE error', async () => {
I'm not 100% sure about this part. The comment seems to indicate that we only recreate the connection because of ILLEGAL_SASL_STATE errors, but this test indicates that we want to do it on any connection error. Yet on ECONNREFUSED we don't actually recreate the connection, as we bail out before reaching that.
That said, recreating the connection on any connection error sounds like madness, as there's plenty of state there that needs to be managed. For example, the RequestQueue lives there, and any in-flight requests would need to be rejected (sketched below). So I don't think this was actually correct, even if we had a test that was trying to test for that.
Yes, I think the intention was to cover SASL errors
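To illustrate the state-management concern raised above, here is a minimal sketch (not KafkaJS's actual connection code) of why tearing down and recreating a connection on arbitrary errors is costly: the connection tracks its in-flight requests, and every one of them has to be rejected before the socket can be replaced.

// Minimal sketch of the problem, not KafkaJS's implementation: the connection
// owns a map of in-flight requests, so replacing it on any error means every
// pending request has to be explicitly rejected first.
class SketchConnection {
  constructor() {
    this.inflight = new Map() // correlationId -> { resolve, reject }
    this.nextCorrelationId = 0
  }

  send(payload) {
    return new Promise((resolve, reject) => {
      const correlationId = this.nextCorrelationId++
      this.inflight.set(correlationId, { resolve, reject, payload })
      // ...write the request to the socket here...
    })
  }

  destroy(reason) {
    // Orphaned requests would hang forever if they were not rejected here.
    for (const { reject } of this.inflight.values()) {
      reject(new Error(`Connection destroyed: ${reason}`))
    }
    this.inflight.clear()
  }
}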
This has been running in a fairly high-throughput service by @smartinio for about 24 hours now without any issues, including dealing with a Kafka cluster redeploy (the client reconnected without a bunch of pointless retries towards the same host).
There used to be a retrier in the messageProducer that would handle certain errors from sendMessage by doing cluster operations like refreshing metadata or reconnecting the cluster. sendMessages also needed to have a retrier, because it makes potentially many requests to different brokers and needs to retry only the failed requests (i.e. it needs to own the retry semantics). This was solved by having a hardcoded retrier inside sendMessages.

The problem with this is that if there's an error that leads to retries inside sendMessages, it may also get retried at the messageProducer level. So if you have 5 retries on the messageProducer, it could lead to up to 25 retries in total.

Closes #958, as this handles the issue by having sendMessage refresh metadata itself, instead of bailing and letting the messageProducer handle it.

Fixes #950
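To make the multiplication concrete, here is a small standalone sketch (a hypothetical retry helper, not the kafkajs retry module) of how nesting two retriers multiplies the number of attempts instead of adding them:

// Hypothetical helper, for illustration only: each retrier makes the initial
// call plus `retries` retries, so nesting them multiplies the attempt counts
// (6 x 6 = 36 calls with 5 retries at each level) instead of adding them.
const retry = retries => async fn => {
  for (let attempt = 0; ; attempt++) {
    try {
      return await fn()
    } catch (e) {
      if (attempt >= retries) throw e
    }
  }
}

const innerRetrier = retry(5) // stands in for the hardcoded retrier inside sendMessages
const outerRetrier = retry(5) // stands in for the retrier at the messageProducer level

let calls = 0
const alwaysFailingSend = async () => {
  calls++
  throw new Error('broker unreachable')
}

outerRetrier(() => innerRetrier(alwaysFailingSend)).catch(() => {
  console.log(`total calls: ${calls}`) // 36 — every outer attempt runs a full inner retry cycle
})

Removing the outer retry around sendMessages keeps it to a single retry cycle, which is the effect of this change.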