KAFKA-19297: Refactor AsyncKafkaConsumer's use of Java Streams APIs in critical sections #19917

Merged
merged 12 commits into from
Jun 18, 2025

Contributor

@kirktrue kirktrue commented Jun 6, 2025

Profiling has shown that using the Collections Streams API approach adds
unnecessary overhead compared to a traditional for loop. Minor revisions
to the code have been made to use simpler constructs to improve
performance.

Reviewers: Lianet Magrans <lmagrans@confluent.io>, Andrew Schofield <aschofield@confluent.io>
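
To illustrate the kind of rewrite the description refers to, here is a minimal sketch. The class `LoopVsStream` and its methods are hypothetical and are not taken from the Kafka code base. Both methods return the same list; the loop version avoids setting up a stream pipeline on every call, which is the overhead the description attributes to the Streams approach.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical example, not code from AsyncKafkaConsumer.
public class LoopVsStream {

    // Stream-based version: concise, but builds a pipeline on every call.
    static List<Integer> evensWithStream(List<Integer> values) {
        return values.stream()
                .filter(v -> v % 2 == 0)
                .collect(Collectors.toList());
    }

    // Loop-based version: same result using a plain for loop.
    static List<Integer> evensWithLoop(List<Integer> values) {
        List<Integer> result = new ArrayList<>();
        for (Integer v : values) {
            if (v % 2 == 0)
                result.add(v);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> values = List.of(1, 2, 3, 4, 5, 6);
        System.out.println(evensWithStream(values));
        System.out.println(evensWithLoop(values));
    }
}
```

Whether the difference matters depends on how often the code runs; the sketch only shows that the two forms are interchangeable in behavior.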

@github-actions github-actions bot added triage PRs from the community consumer clients labels Jun 6, 2025
@AndrewJSchofield AndrewJSchofield added ci-approved and removed triage PRs from the community labels Jun 7, 2025
Member

@AndrewJSchofield AndrewJSchofield left a comment

Thanks for the PR. A few comments to address.

if (requestState.isExpired())
expiredRequests.add(requestState);
}

expiredRequests.forEach(TopicMetadataRequestState::expire);
Member

Couldn't you skip building expiredRequests and just call requestState.expire() at L91?

Contributor Author

@kirktrue kirktrue Jun 9, 2025

Because the request’s expire() method removes itself from inflightRequests, won't that throw an exception because the for loop's underlying Iterator wasn't used to remove the entry?

Contributor Author

Well, I take it back. Apparently it's not an error to remove it directly from the collection while iterating over the collection 🤔

Contributor Author

I've removed the superfluous list buildup.

Member

Apparently it's not an error to remove it directly from the collection while iterating over the collection

Agreed, it doesn't seem to fail with the tests we have, but I would still expect that it may not be safe to modify the collection from within the loop (and that it may lead to a ConcurrentModificationException at some point?).

I would say we play it safe and use an iterator. Makes sense?

Contributor Author

Done.
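
The behavior discussed in this thread can be reproduced with a small sketch. It assumes a `java.util.ArrayList`; the actual type of inflightRequests is not shown in this conversation, and the class and method names below are hypothetical. With an ArrayList, removing an element directly inside a for-each loop usually throws ConcurrentModificationException, but removing the second-to-last element happens not to, because the iterator's `hasNext()` then returns false before the modification check runs. The fail-fast check is documented as best effort only, which is why removing through the Iterator is the safe choice.

```java
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.List;

// Hypothetical example; the real inflightRequests collection may be a different type.
public class IteratorRemoval {

    // Removes matching elements through the Iterator, which is always safe.
    static List<String> removeWithIterator(List<String> items, String toRemove) {
        List<String> copy = new ArrayList<>(items);
        Iterator<String> it = copy.iterator();
        while (it.hasNext()) {
            if (it.next().equals(toRemove))
                it.remove();
        }
        return copy;
    }

    // Removes directly from the list inside a for-each loop.
    // Returns true if a ConcurrentModificationException was thrown.
    static boolean removeInsideForEachThrows(List<String> items, String toRemove) {
        List<String> copy = new ArrayList<>(items);
        try {
            for (String item : copy) {
                if (item.equals(toRemove))
                    copy.remove(item);
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        List<String> items = List.of("a", "b", "c");
        System.out.println(removeInsideForEachThrows(items, "a")); // first element
        System.out.println(removeInsideForEachThrows(items, "b")); // second-to-last element
        System.out.println(removeWithIterator(items, "a"));
    }
}
```

A test suite that only ever removes the second-to-last element would pass with the direct removal, which is consistent with the observation that the existing tests did not fail.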

Contributor Author

@AndrewJSchofield @lianetm—Anything else I need to fix before we can merge? Thanks!

Member

@lianetm lianetm left a comment

Thanks @kirktrue!

Comment on lines 485 to 487
Node node = entry.getKey();
FetchSessionHandler.FetchRequestData fetchRequestData = entry.getValue().build();
map.put(node, fetchRequestData);
Member

Suggested change
- Node node = entry.getKey();
- FetchSessionHandler.FetchRequestData fetchRequestData = entry.getValue().build();
- map.put(node, fetchRequestData);
+ map.put(entry.getKey(), entry.getValue().build());

Contributor Author

This was just to make the code a little clearer about the exact processing and types in use. I'll make it more succinct.

Contributor Author

Changed as requested.

- if (!revokePausedPartitions.isEmpty()) {
- log.info("The pause flag in partitions [{}] will be removed due to revocation.", revokePausedPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(", ")));
+ if (!revokePausedPartitions.isEmpty() && log.isInfoEnabled()) {
+ log.info("The pause flag in partitions [{}] will be removed due to revocation.", Utils.topicPartitionString(revokePausedPartitions));
Member

Given that we end up printing the string between [], can't we simply print the set directly?

Suggested change
- log.info("The pause flag in partitions [{}] will be removed due to revocation.", Utils.topicPartitionString(revokePausedPartitions));
+ log.info("The pause flag in partitions {} will be removed due to revocation.", revokePausedPartitions);

Contributor Author

You're right. Changed.
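
The equivalence behind this suggestion can be checked with a short sketch. The `Partition` class below is a hypothetical stand-in for Kafka's TopicPartition (only its `toString()` matters here), and the other names are made up for illustration. A Java collection's default `toString()` already produces a bracketed, comma-separated list, so it matches the hand-built string. With an SLF4J-style `{}` placeholder, the argument is only converted to a string when the message is actually formatted, so passing the set directly should also avoid the eager string building.

```java
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical example; Partition stands in for Kafka's TopicPartition.
public class PrintSetDirectly {

    static final class Partition {
        final String topic;
        final int partition;

        Partition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public String toString() {
            return topic + "-" + partition;
        }
    }

    // Hand-built string: join the elements and wrap them in brackets.
    static String joinedWithBrackets(Set<Partition> partitions) {
        return "[" + partitions.stream()
                .map(Partition::toString)
                .collect(Collectors.joining(", ")) + "]";
    }

    // Direct version: rely on the collection's own toString().
    static String direct(Set<Partition> partitions) {
        return String.valueOf(partitions);
    }

    // Insertion-ordered sample data so the output order is predictable.
    static Set<Partition> sample() {
        Set<Partition> partitions = new LinkedHashSet<>();
        partitions.add(new Partition("orders", 0));
        partitions.add(new Partition("orders", 1));
        return partitions;
    }

    public static void main(String[] args) {
        System.out.println(joinedWithBrackets(sample()));
        System.out.println(direct(sample()));
    }
}
```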

- log.info("Revoking previously assigned partitions {}", revokedPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(", ")));

+ if (log.isInfoEnabled())
+ log.info("Revoking previously assigned partitions {}", Utils.topicPartitionString(revokedPartitions));
Member

@lianetm lianetm Jun 11, 2025

Here, too, I expect we can simplify and just print the set. The only difference is that the output will include the brackets, which I expect is OK?

Contributor Author

Yep. Changed.

Comment on lines 374 to 380
if (!(ce instanceof CompletableApplicationEvent))
continue;

CompletableApplicationEvent<?> cae = (CompletableApplicationEvent<?>) ce;

if (cae.requireSubscriptionMetadata())
subscriptionMetadataEvent.add(cae);
Member

Maybe simplify?

Suggested change
- if (!(ce instanceof CompletableApplicationEvent))
- continue;
- CompletableApplicationEvent<?> cae = (CompletableApplicationEvent<?>) ce;
- if (cae.requireSubscriptionMetadata())
- subscriptionMetadataEvent.add(cae);
+ if (ce instanceof CompletableApplicationEvent && ((CompletableApplicationEvent<?>) ce).requireSubscriptionMetadata()) {
+ subscriptionMetadataEvent.add((CompletableApplicationEvent<?>) ce);
+ }

Contributor Author

Done.
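
As a sanity check on the suggestion, the sketch below compares the two shapes side by side. All types here (`Event`, `PlainEvent`, `CompletableEvent`) are hypothetical simplifications rather than the Kafka classes; the point is only that the early-continue form and the combined-condition form select the same elements.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical example with simplified stand-in types.
public class InstanceofFilter {

    interface Event {}

    static class PlainEvent implements Event {}

    static class CompletableEvent implements Event {
        private final boolean needsMetadata;

        CompletableEvent(boolean needsMetadata) {
            this.needsMetadata = needsMetadata;
        }

        boolean requireSubscriptionMetadata() {
            return needsMetadata;
        }
    }

    // Original shape: skip non-matching types early, then cast once.
    static List<CompletableEvent> withContinue(List<Event> events) {
        List<CompletableEvent> result = new ArrayList<>();
        for (Event e : events) {
            if (!(e instanceof CompletableEvent))
                continue;
            CompletableEvent ce = (CompletableEvent) e;
            if (ce.requireSubscriptionMetadata())
                result.add(ce);
        }
        return result;
    }

    // Suggested shape: one combined condition, casting inline.
    static List<CompletableEvent> withCombinedCondition(List<Event> events) {
        List<CompletableEvent> result = new ArrayList<>();
        for (Event e : events) {
            if (e instanceof CompletableEvent && ((CompletableEvent) e).requireSubscriptionMetadata()) {
                result.add((CompletableEvent) e);
            }
        }
        return result;
    }

    static List<Event> sample() {
        return List.of(new PlainEvent(), new CompletableEvent(true), new CompletableEvent(false));
    }

    public static void main(String[] args) {
        List<Event> events = sample();
        System.out.println(withContinue(events).equals(withCombinedCondition(events)));
    }
}
```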

- log.info("The pause flag in partitions [{}] will be removed due to revocation.", revokePausedPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(", ")));

+ if (!revokePausedPartitions.isEmpty() && log.isInfoEnabled())
+ log.info("The pause flag in partitions [{}] will be removed due to revocation.", Utils.topicPartitionString(revokePausedPartitions));
Member

Similar to the above: if we are adding [] around the string of partitions, couldn't we just print the set?

Contributor Author

Done.

- log.info("The pause flag in partitions [{}] will be removed due to partition lost.", lostPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(", ")));

+ if (!lostPausedPartitions.isEmpty() && log.isInfoEnabled())
+ log.info("The pause flag in partitions [{}] will be removed due to partition lost.", Utils.topicPartitionString(lostPartitions));
Member

ditto

Contributor Author

Done

@@ -310,7 +311,9 @@ private void process(final AssignmentChangeEvent event) {
manager.updateTimerAndMaybeCommit(event.currentTimeMs());
}

- log.info("Assigned to partition(s): {}", event.partitions().stream().map(TopicPartition::toString).collect(Collectors.joining(", ")));
+ if (log.isInfoEnabled())
+ log.info("Assigned to partition(s): {}", Utils.topicPartitionString(event.partitions()));
Member

OK with the change, even though I expect this one is not on any hot path, right? (It's just API calls to consumer.assign that make a single trip to the background, with no requests.)

Contributor Author

True, it's just cleaner now.

- .collect(Collectors.toList());
+ List<CompletableEvent<?>> events = new ArrayList<>();
+
+ for (CompletableEvent<?> event : tracked) {
Member

Should we add a comment saying that we're intentionally using a for loop instead of the Streams API for performance, as this runs on every iteration of the background thread? (I'm afraid folks will be tempted to change these simple loops back to the Streams API.)

Contributor Author

Agreed. I added one here and similar comments in a few other places.


if (event.future().completeExceptionally(error)) {
log.debug("Event {} completed exceptionally since the consumer is closing", event);
count++;
Member

Incrementing here is not exactly the same as what happened before, right?

Before the PR, the count increment happened after the peek(expireEvent), so it counted events that were expired but not completed exceptionally (the ones going through line 202, where we no longer increment the count).

This count doesn't seem to be used anyway, but it's better to be accurate and consistent with what is done in the reap function (there, the count includes all expired events, whether or not we completed them exceptionally).

Contributor Author

Yes, it's different. I assert it's more accurate, but I changed it to mimic the existing counting.
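
The two counting conventions discussed here can be contrasted in a short sketch. The class and method names are hypothetical and the futures are plain `CompletableFuture` instances rather than Kafka events. The sketch relies on the fact that `CompletableFuture.completeExceptionally` returns true only when that call is what moved the future to a completed state, so a future that was already complete is visited but not counted by the stricter convention.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

// Hypothetical example contrasting two ways of counting expired events.
public class ExpiredEventCount {

    // Returns {visited, newlyFailed}:
    //   visited     = every expired future seen (the pre-existing convention)
    //   newlyFailed = only futures that this call completed exceptionally
    static int[] expireAll(List<CompletableFuture<String>> expired) {
        int visited = 0;
        int newlyFailed = 0;
        for (CompletableFuture<String> future : expired) {
            visited++;
            if (future.completeExceptionally(new TimeoutException("expired")))
                newlyFailed++;
        }
        return new int[] {visited, newlyFailed};
    }

    // One pending future and one that has already completed normally.
    static List<CompletableFuture<String>> sample() {
        CompletableFuture<String> pending = new CompletableFuture<>();
        CompletableFuture<String> alreadyDone = CompletableFuture.completedFuture("ok");
        return List.of(pending, alreadyDone);
    }

    public static void main(String[] args) {
        int[] counts = expireAll(sample());
        System.out.println("visited=" + counts[0] + ", newlyFailed=" + counts[1]);
    }
}
```

With the sample data, the two counters differ by one, which is the discrepancy the review comment points out.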

@kirktrue kirktrue requested a review from lianetm June 17, 2025 12:25
Member

@lianetm lianetm left a comment

Thanks @kirktrue! Took another pass, just one comment

if (requestState.isExpired())
expiredRequests.add(requestState);
}

expiredRequests.forEach(TopicMetadataRequestState::expire);
Member

Apparently it's not an error to remove it directly from the collection while iterating over the collection

Agreed, it doesn't seem to fail with the tests we have, but I would still expect that it may not be safe to modify the collection from within the loop (and that it may lead to a ConcurrentModificationException at some point?).

I would say we play it safe and use an iterator. Makes sense?

… via an iterator

This is to prevent possible state issues that could arise from removing elements of the list as it's being iterated over.
Member

@lianetm lianetm left a comment

Thanks! LGTM.

@lianetm lianetm merged commit adcf10c into apache:trunk Jun 18, 2025
25 checks passed
lianetm pushed a commit that referenced this pull request Jun 18, 2025
…n critical sections (#19917)

Profiling has shown that using the Collections Streams API approach adds
unnecessary overhead compared to a traditional for loop. Minor revisions
to the code have been made to use simpler constructs to improve
performance.

Reviewers: Lianet Magrans <lmagrans@confluent.io>, Andrew Schofield
 <aschofield@confluent.io>
@kirktrue kirktrue deleted the KAFKA-19297-refactor-away-streams branch June 18, 2025 15:24

3 participants