KAFKA-19397: Ensure consistent metadata usage in produce request and response #19964
Conversation
@junrao I redeployed the soak with the fix. I will report back if the problem reoccurs.
@OmniaGM thanks for this patch. a couple of comments are left. PTAL
// topic name in the response might be empty.
ProducerBatch batch = batches.entrySet().stream()
    .filter(entry ->
        entry.getKey().same(new TopicIdPartition(r.topicId(), p.index(), r.name()))
Should we create the TopicIdPartition outside the stream to avoid creating many temporary objects?
Moved the initialisation out of the stream.
ProducerBatch batch = batches.entrySet().stream()
    .filter(entry ->
        entry.getKey().same(new TopicIdPartition(r.topicId(), p.index(), r.name()))
    ).map(Map.Entry::getValue).findFirst().orElse(null);
It is possible to have a null batch, right? For example, the topic is recreated after the batch is generated.
We always had this potential for the batch to be null, which is why I raised the comment here #19964 (comment) asking whether we should throw an IllegalStateException. I updated this to fail with IllegalStateException instead of leaving it like this.
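For reference, a minimal sketch of what the updated lookup could look like at this point in the thread (the exception message is an assumption, and later comments replace the stream with a plain map lookup):

TopicIdPartition tpId = new TopicIdPartition(r.topicId(), p.index(), r.name());  // built once, outside the stream
ProducerBatch batch = batches.entrySet().stream()
    .filter(entry -> entry.getKey().same(tpId))
    .map(Map.Entry::getValue)
    .findFirst()
    .orElseThrow(() -> new IllegalStateException(
        "Produce response contains partition " + tpId + " with no matching in-flight batch"));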
I have added some tests in ProducerSendWhileDeletionTest to cover recreation while producing as well; hope this will be enough to cover these cases.
Needs a rebase, but LGTM
- Metadata doesn't have the full view of topic names to ids during rebootstrap of the client or when a topic has been deleted/recreated. The solution is to pass down the topic id and stop trying to figure it out later in the logic.
@OmniaGM : Thanks for the updated PR. A few more comments.
    return topicId.equals(tpId.topicId) &&
        topicPartition.partition() == tpId.partition();
} else {
    return topicPartition.equals(tpId.topicPartition());
In the rare case that Sender::topicIdsForBatches returns 0 topic id (e.g. topic is deleted), we will pass along topicName -> 0 to handleProduceResponse(). The response will include empty topic and 0 topic id. It's important that we find a match in this case to avoid IllegalStateException. I am thinking that we should first try to do the comparison on topic name, if it's not empty. Otherwise, just do the comparison on topic id even if it's zero.
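A sketch of the ordering being suggested here, for illustration only (a later comment in this thread concludes the existing check is already fine, so this is not the merged code):

public boolean same(TopicIdPartition tpId) {
    // Compare on the name first, when both sides actually carry one.
    if (!topicPartition.topic().isEmpty() && !tpId.topic().isEmpty()) {
        return topicPartition.equals(tpId.topicPartition());
    }
    // Otherwise fall back to the id, even if it is Uuid.ZERO_UUID.
    return topicId.equals(tpId.topicId) && topicPartition.partition() == tpId.partition();
}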
What about changing ProducerBatch to use TopicIdPartition instead of TopicPartition? This would simplify things instead of all of these checks and metadata lookups; I also wouldn't need to filter batches to find the topicId.
In the rare case that Sender::topicIdsForBatches returns 0 topic id (e.g. topic is deleted), we will pass along topicName -> 0 to handleProduceResponse(). The response will include empty topic and 0 topic id. It's important that we find a match in this case to avoid IllegalStateException. I am thinking that we should first try to do the comparison on topic name, if it's not empty. Otherwise, just do the comparison on topic id even if it's zero.
Thinking out loud here: shouldn't this also be true for TopicIdPartition::equals, since we might hit this rare situation in other places in the code where the topic id partition metadata doesn't exist fully? Especially since right now not all of Kafka is topic-id aware yet.
Actually, what I said about topic deletion wasn't correct. sendProduceRequest() is called in the Sender thread and at that point, the metadata cached in the client won't change. We drain ProducerBatch based on the topic/partition in the metadata. The topicIds map created is based on the same metadata. So, if a partition is included in a produce request, the topicIds should always include that topic (assuming the topic id is supported on the server side), even though the server may have deleted that topic. So, this code is fine.
Regarding whether to use TopicIdPartition in ProducerBatch: in the rare case, topicId could change over time for the same topic. So, we probably can't store TopicIdPartition as a final field when ProducerBatch is created. The easiest way to do this is probably to bind the topicId when the ProducerBatch is drained, based on the metadata at that time. This is more or less what this PR does.
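A minimal sketch of that drain-time binding idea (assuming topicIds is the name-to-id map built from the drain's metadata snapshot, as with Sender::topicIdsForBatches above; exactly where this runs in the Sender is an assumption):

// tp is the TopicPartition of the batch being drained; topicIds is the
// name -> id map captured from the same metadata snapshot used for the drain.
Uuid topicId = topicIds.getOrDefault(tp.topic(), Uuid.ZERO_UUID);
TopicIdPartition tpId = new TopicIdPartition(topicId, tp);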
    topicMetadata(admin, topic).topicId()
  } else Uuid.ZERO_UUID
  // don't wait for the physical delete
  deleteTopicWithAdminRaw(admin, topic)
Hmm, deleteTopicWithAdminRaw() doesn't wait for the metadata propagation to the brokers. However, the producer only sees the deleted topic after the metadata is propagated. Is this test effective?
Deleting a topic usually removes the metadata first and is faster at that; deleteTopic without the admin raw, however, waits for the partitions to be physically deleted, which might take longer.
// We need to find batch based on topic id and partition index only as
// topic name in the response might be empty.
TopicIdPartition tpId = new TopicIdPartition(r.topicId(), p.index(), r.name());
ProducerBatch batch = batches.entrySet().stream()
This changes a map lookup to an iteration. Could we do some produce perf test (with multiple topic/partitions) to verify there is no performance degradation?
An alternative is for handleProduceResponse() to take a Map<TopicPartition, ProducerBatch> and a Map<Uuid, String>. If the response has a non-zero topicId, we look up the second map to find the topic name and then use the first map to find the batch. Otherwise, we look up the first map using the topic name.
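A rough sketch of that alternative (the parameter names and the way the response is walked are assumptions for illustration, not the PR's final code):

private void handleProduceResponse(ProduceResponseData data,
                                   Map<TopicPartition, ProducerBatch> batchesByPartition,
                                   Map<Uuid, String> topicNamesById) {
    data.responses().forEach(r -> r.partitionResponses().forEach(p -> {
        // Prefer the topic id carried in the response; only fall back to the
        // (possibly empty) name when the id is zero.
        String topic = r.topicId().equals(Uuid.ZERO_UUID)
            ? r.name()
            : topicNamesById.getOrDefault(r.topicId(), r.name());
        ProducerBatch batch = batchesByPartition.get(new TopicPartition(topic, p.index()));
        // ... complete the batch / handle errors as before ...
    }));
}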
This changes a map lookup to an iteration. Could we do some produce perf test (with multiple topic/partitions) to verify there is no performance degradation?
The perf results for this, with 5,000,000 records sent:
| | pr | trunk |
|---|---|---|
| records/sec | 2709.9 | 2454.8 |
| MB/sec | 2.65 | 2.40 |
| avg latency | 11110.81 ms | 12260.82 ms |
| max latency | 47484.00 ms | 46715.00 ms |
| 50th | 10147 ms | 11341 ms |
| 95th | 22677 ms | 24620 ms |
| 99th | 27950 ms | 30265 ms |
| 99.9th | 33468 ms | 36318 ms |
An alternative is for handleProduceResponse() to take a Map<TopicPartition, ProducerBatch> and a Map<UUID, String>. If the response has non-zero topicId, we look up the second map to find the topic name and then use the first map to find the batch. Otherwise, we look up the first map using the topic name.
I updated the code to do this instead, as we seem to already have all this data ready to pass to handleProduceResponse. Running the perf test one last time with this update and will report back on it.
For 5,000,000 records sent, with record-size 1024, linger.ms=100, batch.size=10000, on a topic with 1000 partitions and 3 replicas:
| | old implementation in the PR | second approach | trunk |
|---|---|---|---|
| records/sec | 2709.9 | 2316.2 | 2454.8 |
| MB/sec | 2.65 | 2.26 | 2.40 |
| avg latency | 11110.81 ms | 12993.45 ms | 12260.82 ms |
| max latency | 47484.00 ms | 51446.00 ms | 46715.00 ms |
| 50th | 10147 ms | 11952 ms | 11341 ms |
| 95th | 22677 ms | 26238 ms | 24620 ms |
| 99th | 27950 ms | 32202 ms | 30265 ms |
| 99.9th | 33468 ms | 38580 ms | 36318 ms |
….java Co-authored-by: Kirk True <kirk@kirktrue.pro>
@OmniaGM could you please fix the conflicts?
recreateTopicFuture.join();
producerFutures.forEach(CompletableFuture::join);
assertTrue(Math.abs(maxNumRecreatTopicAttempts - topicIds.size()) <= 5);
assertEquals(20, numSuccess.intValue());
20 => 2 * numRecords ?
})).toList();
recreateTopicFuture.join();
producerFutures.forEach(CompletableFuture::join);
assertTrue(Math.abs(maxNumRecreatTopicAttempts - topicIds.size()) <= 5);
Hmm, why doesn't each topic creation create a new topic Id?
It happens when the delete takes longer; when the topic gets created again we hit TopicExistsException.
Could we create admin with cluster.admin(Map.of(), true)? This way, all admin requests are sent to the controller. Since the controller always has the latest metadata, it seems that we should never hit TopicExistsException.
or we could keep recreating the topic until there are "enough" new topic ids?
var recreateTopicFuture = CompletableFuture.supplyAsync(() -> {
    var topicIds = new HashSet<Uuid>();
    while (topicIds.size() < maxNumRecreatTopicAttempts) {
        try (var admin = cluster.admin()) {
            if (admin.listTopics().names().get().contains(topic)) {
                admin.deleteTopics(List.of(topic)).all().get();
            }
            topicIds.add(admin.createTopics(List.of(new NewTopic(topic, 1, (short) 1))).topicId(topic).get());
        } catch (Exception e) {
            // ignore
        }
    }
    return topicIds;
});
var topicIds = recreateTopicFuture.join();
assertEquals(maxNumRecreatTopicAttempts, topicIds.size());
Could we create admin with cluster.admin(Map.of(), true)? This way, all admin requests are sent to the controller. Since the controller always has the latest metadata, it seems that we should never hit TopicExistsException.
I don't believe we can use the admin client with controller bootstrap for create, delete, or even list topics, as these use LeastLoadedNodeProvider instead of ControllerNodeProvider. Recreating until we have enough topic ids is the better approach here, I think.
Also, regarding the title of the PR, "Not relaying on metadata to map between topic id and name": we are still relying on the metadata to map topic id and name. We just want to use consistent metadata between generating the produce request and handling the produce response.
@OmniaGM : Thanks for the updated PR. A couple more comments. Also, are the test failures related to this PR?
Thanks for the PR @OmniaGM!
Left a few comments, mostly minor.
int maxNumRecreatTopicAttempts = 10;
List<Uuid> topicIds = new CopyOnWriteArrayList<>();
var recreateTopicFuture = CompletableFuture.runAsync(() -> {
    for (int i = 1; i <= maxNumRecreatTopicAttempts; i++) {
Nitpicky, but is there a reason not to start i at 0? I don't mind, I just want to make sure I'm not missing something.
I was following the rest of the pattern in the file, which appends numbers from 1 to 10 to the record value.
I will run the perf test one more time, this time with some topic recreation in the background, while I am waiting for the pipeline to finish, to make sure everything is okay.
Last perf test results posted above.
@OmniaGM : Thanks for the updated PR. One more comment.
    if (metadata != null) {
        numSuccess.incrementAndGet();
    }
}).get();
It's possible for get() to throw an exception like TopicNotExist, right? If that happens, the assertion on numRecords will fail.
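One possible way the test could tolerate such failures, purely as a sketch (the retry loop, and names like producer, record, and numSuccess following the snippet above, are assumptions; the PR's actual handling is not shown here):

// Retry the send so a transient error while the topic is being deleted/recreated
// (e.g. an unknown-topic error surfaced by get()) doesn't break the numSuccess count.
boolean sent = false;
while (!sent) {
    try {
        producer.send(record, (metadata, exception) -> {
            if (metadata != null) {
                numSuccess.incrementAndGet();
            }
        }).get();
        sent = true;
    } catch (Exception e) {
        // expected occasionally during delete/recreate; retry the send
    }
}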