-
Notifications
You must be signed in to change notification settings - Fork 14.5k
KAFKA-19297: Refactor AsyncKafkaConsumer's use of Java Streams APIs in critical sections #19917
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-19297: Refactor AsyncKafkaConsumer's use of Java Streams APIs in critical sections #19917
Conversation
…n critical sections
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR. A few comments to address.
...nts/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Utils.java
Outdated
Show resolved
Hide resolved
if (requestState.isExpired()) | ||
expiredRequests.add(requestState); | ||
} | ||
|
||
expiredRequests.forEach(TopicMetadataRequestState::expire); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Couldn't you just not build expiredRequests
and call requestState.expire()
at L91?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because the request’s expire()
method removes itself from inflightRequests
, won't that throw an exception because the for loop's underlying Iterator
wasn't used to remove the entry?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well, I take it back. Apparently it's not an error to remove it directly from the collection while iterating over the collection 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've removed the superfluous list buildup.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Apparently it's not an error to remove it directly from the collection while iterating over the collection
Agree it doesn't seem to fail with the tests we have, but still I would expect it may not be safe to modify the collection from within the loop (and may lead to ConcurrentModification exception at some point?).
I would say we play safe and use an iterator, makes sense?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
@AndrewJSchofield @lianetm—Anything else I need to fix before we can merge? Thanks! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @kirktrue!
Node node = entry.getKey(); | ||
FetchSessionHandler.FetchRequestData fetchRequestData = entry.getValue().build(); | ||
map.put(node, fetchRequestData); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Node node = entry.getKey(); | |
FetchSessionHandler.FetchRequestData fetchRequestData = entry.getValue().build(); | |
map.put(node, fetchRequestData); | |
map.put(entry.getKey(), entry.getValue().build()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was just to make the code a little clearer about the exact processing and types in use. I'll make it more succinct.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed as requested.
if (!revokePausedPartitions.isEmpty()) { | ||
log.info("The pause flag in partitions [{}] will be removed due to revocation.", revokePausedPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(", "))); | ||
if (!revokePausedPartitions.isEmpty() && log.isInfoEnabled()) { | ||
log.info("The pause flag in partitions [{}] will be removed due to revocation.", Utils.topicPartitionString(revokePausedPartitions)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Given that er end up printing the string in between [], can't we simply print the set directly?
log.info("The pause flag in partitions [{}] will be removed due to revocation.", Utils.topicPartitionString(revokePausedPartitions)); | |
log.info("The pause flag in partitions {} will be removed due to revocation.", revokePausedPartitions); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're right. Changed.
log.info("Revoking previously assigned partitions {}", revokedPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(", "))); | ||
|
||
if (log.isInfoEnabled()) | ||
log.info("Revoking previously assigned partitions {}", Utils.topicPartitionString(revokedPartitions)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here also I expect we can simplify and just print the set? Only difference is that it will include the brackets, which is ok I expect?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep. Changed.
if (!(ce instanceof CompletableApplicationEvent)) | ||
continue; | ||
|
||
CompletableApplicationEvent<?> cae = (CompletableApplicationEvent<?>) ce; | ||
|
||
if (cae.requireSubscriptionMetadata()) | ||
subscriptionMetadataEvent.add(cae); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe simplify ?
if (!(ce instanceof CompletableApplicationEvent)) | |
continue; | |
CompletableApplicationEvent<?> cae = (CompletableApplicationEvent<?>) ce; | |
if (cae.requireSubscriptionMetadata()) | |
subscriptionMetadataEvent.add(cae); | |
if (ce instanceof CompletableApplicationEvent && ((CompletableApplicationEvent<?>) ce).requireSubscriptionMetadata()) { | |
subscriptionMetadataEvent.add((CompletableApplicationEvent<?>) ce); | |
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
log.info("The pause flag in partitions [{}] will be removed due to revocation.", revokePausedPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(", "))); | ||
|
||
if (!revokePausedPartitions.isEmpty() && log.isInfoEnabled()) | ||
log.info("The pause flag in partitions [{}] will be removed due to revocation.", Utils.topicPartitionString(revokePausedPartitions)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
similar to above, if we are adding [] to the string of partitions, couldn't we just print the set?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
log.info("The pause flag in partitions [{}] will be removed due to partition lost.", lostPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(", "))); | ||
|
||
if (!lostPausedPartitions.isEmpty() && log.isInfoEnabled()) | ||
log.info("The pause flag in partitions [{}] will be removed due to partition lost.", Utils.topicPartitionString(lostPartitions)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
@@ -310,7 +311,9 @@ private void process(final AssignmentChangeEvent event) { | |||
manager.updateTimerAndMaybeCommit(event.currentTimeMs()); | |||
} | |||
|
|||
log.info("Assigned to partition(s): {}", event.partitions().stream().map(TopicPartition::toString).collect(Collectors.joining(", "))); | |||
if (log.isInfoEnabled()) | |||
log.info("Assigned to partition(s): {}", Utils.topicPartitionString(event.partitions())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok with the change, even though this one is not on any hot-path I expect, right? (just API calls to consumer.assign that do a single trip to the background, no requests).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
True, it's just cleaner now.
.collect(Collectors.toList()); | ||
List<CompletableEvent<?>> events = new ArrayList<>(); | ||
|
||
for (CompletableEvent<?> event : tracked) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we add a comment saying that we're intentionally using for loop over the Streams API for perf, as this runs on every iteration of the background thread? (I'm afraid folks will be tempted to change this simple loops back to streams api)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed. I added one here and similar comments in a few other places.
|
||
if (event.future().completeExceptionally(error)) { | ||
log.debug("Event {} completed exceptionally since the consumer is closing", event); | ||
count++; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
incrementing here is not exactly the same that it used to happen before, right?
Before the PR, the count inc would happen after the peek(expireEvent)
, so it would count events that were expired but not completedExceptionally (the ones going through line 202, where now we don't increment the count)
This count doesn't seem used anyways, but better to be accurate and consistent with what it's done in the reap
func (there the count includes all expired events, no matter if we completed them exceptionally or not)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it's different. I assert it's more accurate, but I changed it to mimic the existing counting.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @kirktrue! Took another pass, just one comment
if (requestState.isExpired()) | ||
expiredRequests.add(requestState); | ||
} | ||
|
||
expiredRequests.forEach(TopicMetadataRequestState::expire); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Apparently it's not an error to remove it directly from the collection while iterating over the collection
Agree it doesn't seem to fail with the tests we have, but still I would expect it may not be safe to modify the collection from within the loop (and may lead to ConcurrentModification exception at some point?).
I would say we play safe and use an iterator, makes sense?
… via an iterator This is to prevent possible state issues that could arise from removing elements of the list as it's being iterated over.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks! LGTM.
…n critical sections (#19917) Profiling has shown that using the Collections Streams API approach adds unnecessary overhead compared to a traditional for loop. Minor revisions to the code have been made to use simpler constructs to improve performance. Reviewers: Lianet Magrans <lmagrans@confluent.io>, Andrew Schofield <aschofield@confluent.io>
Profiling has shown that using the Collections Streams API approach adds
unnecessary overhead compared to a traditional for loop. Minor revisions
to the code have been made to use simpler constructs to improve
performance.
Reviewers: Lianet Magrans lmagrans@confluent.io, Andrew Schofield
aschofield@confluent.io