Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

Update Kafka Client to 2.8.0 #1588

Merged

Conversation

eolivelli
Copy link
Contributor

@eolivelli eolivelli commented Nov 23, 2022

Summary of changes:

  • Bump client dependency from 2.1.x to 2.8.x
  • The Java Model for all the Request/Response changed to a Fluent/Builder style, this requires many changes
  • NotLeaderForPartitionException -> NotLeaderOrFollowerException
  • Copy new version of MockTime from Kafka codebase
  • ListOffsetRequest -> ListOffsetsRequest, ListOffsetResponse -> ListOffsetsResponse
  • LeaveGroup -> now support multiple members

@github-actions
Copy link

@eolivelli:Thanks for your contribution. For this PR, do we need to update docs?
(The PR template contains info about doc, which helps others know more about the changes. Can you provide doc-related info in this and future PR descriptions? Thanks)

@github-actions github-actions bot added the doc-info-missing This pr needs to mark a document option in description label Nov 23, 2022
@Demogorgon314
Copy link
Member

Overall, LGTM. I will wait for the unit's test to pass.

@eolivelli eolivelli mentioned this pull request Nov 30, 2022
2 tasks
@eolivelli eolivelli marked this pull request as ready for review December 1, 2022 10:33
@eolivelli
Copy link
Contributor Author

eolivelli commented Dec 1, 2022

@Demogorgon314 @BewareMyPower this patch is finally ready for review.
All the tests are passing locally on my laptop and Codacy is happy

@codecov
Copy link

codecov bot commented Dec 1, 2022

Codecov Report

Merging #1588 (64ce7c8) into master (b4dbd6d) will decrease coverage by 0.35%.
The diff coverage is 0.00%.

Impacted file tree graph

@@             Coverage Diff              @@
##             master    #1588      +/-   ##
============================================
- Coverage     15.34%   14.98%   -0.36%     
  Complexity      592      592              
============================================
  Files           164      164              
  Lines         11913    12199     +286     
  Branches       1102     1112      +10     
============================================
  Hits           1828     1828              
- Misses         9931    10217     +286     
  Partials        154      154              
Impacted Files Coverage Δ
...streamnative/pulsar/handlers/kop/AdminManager.java 0.40% <0.00%> (-0.01%) ⬇️
...ative/pulsar/handlers/kop/KafkaCommandDecoder.java 0.34% <0.00%> (+<0.01%) ⬆️
...ative/pulsar/handlers/kop/KafkaRequestHandler.java 1.09% <0.00%> (-0.07%) ⬇️
...ive/pulsar/handlers/kop/MessagePublishContext.java 0.00% <0.00%> (ø)
...ndlers/kop/coordinator/group/GroupCoordinator.java 0.00% <0.00%> (ø)
.../handlers/kop/coordinator/group/GroupMetadata.java 51.78% <0.00%> (-0.19%) ⬇️
...andlers/kop/coordinator/group/JoinGroupResult.java 0.00% <0.00%> (ø)
...rs/kop/coordinator/transaction/PendingRequest.java 0.00% <0.00%> (ø)
...s/kop/coordinator/transaction/ResponseContext.java 0.00% <0.00%> (ø)
...r/transaction/TransactionMarkerChannelHandler.java 0.00% <0.00%> (ø)
... and 9 more

@BewareMyPower BewareMyPower changed the title Update Kafka Client to 2.8.0 - PREVIEW Update Kafka Client to 2.8.0 Dec 1, 2022
@eolivelli
Copy link
Contributor Author

CI passed

Comment on lines 1804 to 1811
Set<String> members = new HashSet<>();
if (!data.memberId().isEmpty()) {
// old clients
members.add(data.memberId());
}
data.members().forEach(memberIdentity -> {
members.add(memberIdentity.memberId());
});
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can use functional style here

        Set<String> members = data.members().stream()
                .map(LeaveGroupRequestData.MemberIdentity::memberId)
                .collect(Collectors.toSet());
        if (!data.memberId().isEmpty()) {
            // old clients
            members.add(data.memberId());
        }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure

.filter(p -> p.partitionIndex() == topicPartition1.partition())
.findFirst()
.get()
.errorCode(),
Errors.UNKNOWN_TOPIC_OR_PARTITION);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Errors.UNKNOWN_TOPIC_OR_PARTITION);
Errors.UNKNOWN_TOPIC_OR_PARTITION.code());

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch

.filter(p -> p.partitionIndex() == topicPartition2.partition())
.findFirst()
.get()
.errorCode(),
Errors.UNKNOWN_TOPIC_OR_PARTITION);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Errors.UNKNOWN_TOPIC_OR_PARTITION);
Errors.UNKNOWN_TOPIC_OR_PARTITION.code());

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

Errors.UNKNOWN_TOPIC_OR_PARTITION);

// invalid partition id 1.
CompletableFuture<AbstractResponse> invalidResponse2 = new CompletableFuture<>();
checkInvalidPartition(invalidResponse2, topicName, 1);
TopicPartition topicPartition2 = new TopicPartition(topicName, 1);
AbstractResponse response2 = invalidResponse2.get();
assertEquals(((OffsetCommitResponse) response2).responseData().get(topicPartition2),
assertEquals(((OffsetCommitResponse) response2)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
assertEquals(((OffsetCommitResponse) response2)
assertEquals(((OffsetCommitResponse) response2)

Adjust indent.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

@@ -680,6 +680,7 @@ public void testDescribeConfigFailed() throws PulsarAdminException {
try {
describeConfigsResult.all().get();
} catch (Exception ex) {
log.error("Error", ex);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
log.error("Error", ex);

Remove this line because the exception is expected. Or just use log.info("Error: {}", ex.getMessage(). BTW, I found here we should add a fail() in the try block.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@BewareMyPower I didn't add "fail()" for this PR, because with "fail()" actually the test will fail.
We can address this problem in the test in a follow up work.
it is not strictly related to this patch

@BewareMyPower
Copy link
Collaborator

My review is done, PTAL.

Copy link
Contributor Author

@eolivelli eolivelli left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @BewareMyPower for you quick feedback
I have addressed all of your comments.

@@ -680,6 +680,7 @@ public void testDescribeConfigFailed() throws PulsarAdminException {
try {
describeConfigsResult.all().get();
} catch (Exception ex) {
log.error("Error", ex);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

Comment on lines 1804 to 1811
Set<String> members = new HashSet<>();
if (!data.memberId().isEmpty()) {
// old clients
members.add(data.memberId());
}
data.members().forEach(memberIdentity -> {
members.add(memberIdentity.memberId());
});
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure

@eolivelli
Copy link
Contributor Author

@BewareMyPower what about the "snyk" failures ? before my last commit it was passing

1 similar comment
@eolivelli
Copy link
Contributor Author

@BewareMyPower what about the "snyk" failures ? before my last commit it was passing

Copy link
Member

@Demogorgon314 Demogorgon314 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Just left some small comments.

) {
return validateGroupStatus(groupId, ApiKeys.LEAVE_GROUP).map(CompletableFuture::completedFuture
).orElseGet(() -> groupManager.getGroup(groupId).map(group -> group.inLock(() -> {
if (group.is(Dead) || !group.has(memberId)) {
if (group.is(Dead)) {
return CompletableFuture.completedFuture(Errors.UNKNOWN_MEMBER_ID);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error should be Errors.COORDINATOR_NOT_AVAILABLE?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

previously it was UNKNOWN_MEMBER_ID, I didn't want to change the behaviour

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I just found the Kafka 2.8 code is using Errors.COORDINATOR_NOT_AVAILABLE. We can change it in next PR if necessary.

https://github.com/apache/kafka/blob/2.8/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L470-L471

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let me follow up with another patch on this

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Demogorgon314 finally I updated this value in this PR

Copy link
Contributor Author

@eolivelli eolivelli left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have addresses your comments.
I hope I haven't missed any of them

thank you @Demogorgon314 @BewareMyPower

) {
return validateGroupStatus(groupId, ApiKeys.LEAVE_GROUP).map(CompletableFuture::completedFuture
).orElseGet(() -> groupManager.getGroup(groupId).map(group -> group.inLock(() -> {
if (group.is(Dead) || !group.has(memberId)) {
if (group.is(Dead)) {
return CompletableFuture.completedFuture(Errors.UNKNOWN_MEMBER_ID);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

previously it was UNKNOWN_MEMBER_ID, I didn't want to change the behaviour

.filter(p -> p.partitionIndex() == topicPartition1.partition())
.findFirst()
.get()
.errorCode(),
Errors.UNKNOWN_TOPIC_OR_PARTITION);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch

Errors.UNKNOWN_TOPIC_OR_PARTITION);

// invalid partition id 1.
CompletableFuture<AbstractResponse> invalidResponse2 = new CompletableFuture<>();
checkInvalidPartition(invalidResponse2, topicName, 1);
TopicPartition topicPartition2 = new TopicPartition(topicName, 1);
AbstractResponse response2 = invalidResponse2.get();
assertEquals(((OffsetCommitResponse) response2).responseData().get(topicPartition2),
assertEquals(((OffsetCommitResponse) response2)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

.filter(p -> p.partitionIndex() == topicPartition2.partition())
.findFirst()
.get()
.errorCode(),
Errors.UNKNOWN_TOPIC_OR_PARTITION);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

@@ -680,6 +680,7 @@ public void testDescribeConfigFailed() throws PulsarAdminException {
try {
describeConfigsResult.all().get();
} catch (Exception ex) {
log.error("Error", ex);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@BewareMyPower I didn't add "fail()" for this PR, because with "fail()" actually the test will fail.
We can address this problem in the test in a follow up work.
it is not strictly related to this patch

@eolivelli
Copy link
Contributor Author

The tests that failed on CI pass on my laptop

Demogorgon314
Demogorgon314 previously approved these changes Dec 2, 2022
@Demogorgon314 Demogorgon314 added no-need-doc This pr does not need any document release/2.11 and removed doc-info-missing This pr needs to mark a document option in description labels Dec 2, 2022
BewareMyPower
BewareMyPower previously approved these changes Dec 2, 2022
@eolivelli
Copy link
Contributor Author

@BewareMyPower is the failed test a flaky test ?
it doesn't fail locally for me

@eolivelli
Copy link
Contributor Author

I have rebased on master

@Demogorgon314
Copy link
Member

@eolivelli I can reproduce this failed test in my locally.

@eolivelli
Copy link
Contributor Author

@eolivelli I can reproduce this failed test in my locally.

I will run the test many times. thanks for your feedback.
The test is pretty straight forward

@eolivelli
Copy link
Contributor Author

@Demogorgon314 I have fixed the test.
it seems that the 2.8.0 client eagerly pre-fetches the messages and so it is able to pre-fetch the records before we delete the partition.

@eolivelli
Copy link
Contributor Author

@BewareMyPower @Demogorgon314 can you please restart CI ?
I believe that this time we hit a flaky test.

@eolivelli
Copy link
Contributor Author

the same test did not fail here
#1606

@eolivelli
Copy link
Contributor Author

All tests passed.

@Demogorgon314 Demogorgon314 merged commit 66efad6 into streamnative:master Dec 4, 2022
@eolivelli eolivelli deleted the impl/upgrade-kafka-dep-280 branch December 4, 2022 14:30
michaeljmarshall pushed a commit to michaeljmarshall/kop that referenced this pull request Dec 13, 2022
Summary of changes:
- Bump client dependency from 2.1.x to 2.8.x
- The Java Model for all the Request/Response changed to a
Fluent/Builder style, this requires many changes
- NotLeaderForPartitionException -> NotLeaderOrFollowerException
- Copy new version of MockTime from Kafka codebase
- ListOffsetRequest -> ListOffsetsRequest, ListOffsetResponse ->
ListOffsetsResponse
- LeaveGroup -> now support multiple members

(cherry picked from commit 66efad6)
Demogorgon314 pushed a commit that referenced this pull request Feb 6, 2023
Summary of changes:
- Bump client dependency from 2.1.x to 2.8.x
- The Java Model for all the Request/Response changed to a
Fluent/Builder style, this requires many changes
- NotLeaderForPartitionException -> NotLeaderOrFollowerException
- Copy new version of MockTime from Kafka codebase
- ListOffsetRequest -> ListOffsetsRequest, ListOffsetResponse ->
ListOffsetsResponse
- LeaveGroup -> now support multiple members

(cherry picked from commit 66efad6)
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants