Skip to content

KAFKA-19440: Handle top-level errors in AlterShareGroupOffsets RPC #20049

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jul 3, 2025

Conversation

AndrewJSchofield
Copy link
Member

@AndrewJSchofield AndrewJSchofield commented Jun 26, 2025

While testing the code in #19820, it
became clear that the error handling problems were due to the underlying
Admin API. This PR fixes the error handling for top-level errors in the
AlterShareGroupOffsets RPC.

Reviewers: Apoorv Mittal apoorvmittal10@gmail.com, Lan Ding
isDing_L@163.com, TaiJuWu tjwu1217@gmail.com

*/
public KafkaFuture<Void> all() {
return this.future.thenApply(topicPartitionErrorsMap -> {
List<TopicPartition> partitionsFailed = topicPartitionErrorsMap.entrySet()
.stream()
.filter(e -> e.getValue() != Errors.NONE)
.filter(e -> e.getValue() != null)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the patch, just out of curiosity, would the AlterConsumerGroupOffsets RPC have the same issue?

return this.future.thenApply(topicPartitionErrorsMap -> {
List<TopicPartition> partitionsFailed = topicPartitionErrorsMap.entrySet()
.stream()
.filter(e -> e.getValue() != Errors.NONE)
.map(Map.Entry::getKey)
.collect(Collectors.toList());
for (Errors error : topicPartitionErrorsMap.values()) {
if (error != Errors.NONE) {
throw error.exception(
"Failed altering group offsets for the following partitions: " + partitionsFailed);
}
}

Copy link
Member Author

@AndrewJSchofield AndrewJSchofield Jun 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll check it out before I merge, but the important difference here is in KafkaApis.scala. For DeleteSGO, it already handled a non-zero error code. For AlterSGO, that code was missing.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, sorry. I didn't read your comment accurately. The difference is that AlterShareGroupOffsets can successfully pass back an error code, which is why this is in terms of ApiException rather than Errors. For AlterConsumerGroupOffsets, the RPC is actually OffsetCommit and this does not have an ErrorMessage field at all. So, it cannot be fixed for consumer groups until we have a version bump on the OffsetCommit RPC.

*/
public KafkaFuture<Void> all() {
return this.future.thenApply(topicPartitionErrorsMap -> {
List<TopicPartition> partitionsFailed = topicPartitionErrorsMap.entrySet()
.stream()
.filter(e -> e.getValue() != Errors.NONE)
.filter(e -> e.getValue() != null)
.map(Map.Entry::getKey)
.collect(Collectors.toList());
Copy link
Collaborator

@TaiJuWu TaiJuWu Jun 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this change to immutable?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure it matters here. The list in internal to this method, and it is just converted to a string and appended to the exception message.

@DL1231
Copy link
Contributor

DL1231 commented Jul 2, 2025

@AndrewJSchofield The last build failed with AuthorizerIntegrationTest failed. Could you please check?

@AndrewJSchofield
Copy link
Member Author

@AndrewJSchofield The last build failed with AuthorizerIntegrationTest failed. Could you please check?

Interesting. I fixed a mistake and I expect the test needs fixing too. Will do.

Copy link
Contributor

@DL1231 DL1231 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the patch, LGTM.

Copy link
Collaborator

@TaiJuWu TaiJuWu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM.

Copy link
Contributor

@apoorvmittal10 apoorvmittal10 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR, LGTM!

@AndrewJSchofield AndrewJSchofield merged commit 729f9cc into apache:trunk Jul 3, 2025
25 checks passed
@AndrewJSchofield AndrewJSchofield deleted the KAFKA-19440 branch July 3, 2025 10:01
jiafu1115 pushed a commit to jiafu1115/kafka that referenced this pull request Jul 3, 2025
…pache#20049)

While testing the code in apache#19820, it
became clear that the error handling problems were due to the underlying
Admin API. This PR fixes the error handling for top-level errors in the
AlterShareGroupOffsets RPC.

Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, Lan Ding
 <isDing_L@163.com>, TaiJuWu <tjwu1217@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants