-
-
Notifications
You must be signed in to change notification settings - Fork 527
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Accept max in-flight requests on the connection #216
Accept max in-flight requests on the connection #216
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Stoked to see this PR @tulios! It's a cool feature. I don't see any real blockers, just left a few thoughts and suggestions.
src/network/requestQueue/index.js
Outdated
return request | ||
} | ||
|
||
push(request) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Currently we advance through the queue essentially by chaining callbacks - when one requests succeeds/fails we add the next pending message to the queue. If for whatever reason an error or bug prevented the expected end condition from being met we could encounter some sort of a deadlock. I do think that in most cases the timeout will account for this, but it does make me wonder if it'd make sense to have some sort of "cleanup" process running on a setInterval to ensure the queue is advancing. Just a thought, I don't think it's necessarily something we have to implement now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This case is covered, besides the timeout, all requests are rejected on socket errors/timeout (https://github.com/tulios/kafkajs/blob/master/src/network/connection.js#L116), the request queue will reject all inflight and pending requests.
@ianwsperber @Nevon I addressed all the comments, please take another look |
const current = this[PRIVATE.STATE].toString() | ||
|
||
throw new KafkaJSNonRetriableError( | ||
`Invalid state, can't transition from ${current} to ${next.toString()}` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So basically we only provide next
for logging purposes, since we only validate accepted
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, just to improve the error message
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for making those updates @tulios! Looks good to me 👍
This PR adds support to max in-flight requests. The option is set to
null
now so we can test it out, I want to understand the impact of more or less concurrent requests before we commit to a default value.