[bugfix] Fix regression with the upgrade to 2.8.x: Count corrently the number of partitions in ProduceRequest #1645
[bugfix] Fix regression with the upgrade to 2.8.x: Count corrently the number of partitions in ProduceRequest #1645
Conversation
…e number of partitions in ProduceRequest
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you add a test to avoid the regression?
Codecov Report
@@ Coverage Diff @@
## master #1645 +/- ##
=========================================
Coverage 15.77% 15.78%
- Complexity 612 613 +1
=========================================
Files 164 164
Lines 12249 12254 +5
Branches 1124 1124
=========================================
+ Hits 1932 1934 +2
- Misses 10158 10162 +4
+ Partials 159 158 -1
|
Sure, I will. |
I also have a couple of other follow up PRs regarding the upgrade of the kafka clients, in order to improve performances. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice catch @eolivelli.
final int numPartitions = produceRequest | ||
.data() | ||
.topicData() | ||
.stream() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Streams can be less efficient than iterating directly and adding to a mutable variable. Given that this code is on the hot path, can we use a for
loop?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
While a generally agree on this point, the new Java model for the wire protocol is using this pattern with multiple layers (topics > partitions).
Doing 2 'for' loops will require in any case a similar amount of resources, especially if the data structures are small. The gain probably is negligible.
Please note that we could create some utility method and refactor the codebase. I didn't do it because I wasn't sure that we would be axtuslly able to make the code more readable or efficient.
The code in this form is pretty compact and readable.
I am leaning towards keeping it like this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just took a look again. It only affects the time to send produce response and it might be a little hard to add the test. I will merge this fix first.
…e number of partitions in ProduceRequest (#1645) ### Motivation With the upgrade to Kafka 2.8.x we introduced a regression in the handleProduceRequest method, we are not counting correctly the number of partitions. If the client sends data for more than one partition for the same topic, then the result is a REQUEST_TIMED_OUT because we are calling "completeOne.run()" once per partition, but we are using the number of topics as the expected number of "completeOne" calls. ### Modifications Count correctly the number of partitions. (cherry picked from commit 9d0f57b)
Motivation
With the upgrade to Kafka 2.8.x we introduced a regression in the handleProduceRequest method, we are not counting
correctly the number of partitions.
If the client sends data for more than one partition for the same topic, then the result is a REQUEST_TIMED_OUT because we are calling "completeOne.run()" once per partition, but we are using the number of topics as the expected number of "completeOne" calls.
Modifications
Count correctly the number of partitions.
Verifying this change
This change is a trivial rework / code cleanup without any test coverage.
Documentation
Check the box below.
Need to update docs?
doc-required
(If you need help on updating docs, create a doc issue)
no-need-doc
(Please explain why)
doc
(If this PR contains doc changes)