KAFKA-19397: Ensure consistent metadata usage in produce request and response #19964

Open · wants to merge 13 commits into trunk
Conversation

@OmniaGM (Contributor) commented Jun 13, 2025

  • Metadata doesn't have the full view of the topic name → id mapping during
    client rebootstrap, or when a topic has been deleted and recreated. The
    solution is to pass the topic id down and stop trying to derive it later
    in the logic.
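The idea can be sketched standalone (the types below are simplified stand-ins for Kafka's TopicIdPartition and metadata snapshot, not the real classes): resolve the topic id once from the metadata snapshot used to build the produce request, and carry it with the batch so the response handler never re-resolves the name → id mapping.

```java
import java.util.Map;
import java.util.UUID;

public class TopicIdBindingSketch {
    // Simplified stand-in for Kafka's TopicIdPartition.
    record TopicIdPartition(UUID topicId, String topic, int partition) {}

    static final UUID ZERO_UUID = new UUID(0L, 0L);

    // Bind the topic id once, from the metadata snapshot used to build the request.
    static TopicIdPartition bindAtSendTime(Map<String, UUID> topicIdsSnapshot,
                                           String topic, int partition) {
        return new TopicIdPartition(
            topicIdsSnapshot.getOrDefault(topic, ZERO_UUID), topic, partition);
    }

    public static void main(String[] args) {
        Map<String, UUID> snapshot = Map.of("test-topic", UUID.randomUUID());
        TopicIdPartition tp = bindAtSendTime(snapshot, "test-topic", 0);
        // The response handler matches on the bound id; no fresh metadata lookup,
        // so a rebootstrap or delete/recreate in between cannot change the mapping.
        System.out.println(tp.topicId().equals(snapshot.get("test-topic"))); // prints "true"
    }
}
```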

@junrao (Contributor) left a comment

@OmniaGM : Thanks for the PR. Overall, it looks good. A few comments below. Also, could we add a unit test?

@lucasbru : Could you test this PR with the stream job?

@github-actions github-actions bot removed the triage PRs from the community label Jun 15, 2025
@lucasbru (Member)

@junrao I redeployed the soak with the fix. I will report back if the problem reoccurs

@lucasbru (Member)

@OmniaGM @junrao I have been running the soak for 24h, and it's looking good.

@chia7712 (Member) left a comment

@OmniaGM Thanks for this patch. A couple of comments are left. PTAL.

// topic name in the response might be empty.
ProducerBatch batch = batches.entrySet().stream()
.filter(entry ->
entry.getKey().same(new TopicIdPartition(r.topicId(), p.index(), r.name()))
Member:

Should we create the TopicIdPartition outside the stream to avoid creating many temporary objects?

@OmniaGM (Author):

Moved the initialisation out of the stream.

ProducerBatch batch = batches.entrySet().stream()
.filter(entry ->
entry.getKey().same(new TopicIdPartition(r.topicId(), p.index(), r.name()))
).map(Map.Entry::getValue).findFirst().orElse(null);
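The suggestion in this exchange — build the lookup key once, outside the stream — can be illustrated standalone; the types below are simplified stand-ins, not the real Kafka classes:

```java
import java.util.Map;
import java.util.Optional;

public class HoistProbeSketch {
    // Simplified stand-in key type (the real code uses TopicIdPartition).
    record Key(String topicId, int partition) {}

    // Build the probe key once, outside the stream, so each filtered entry
    // compares against the same object instead of allocating a new one per element.
    static Optional<String> findBatch(Map<Key, String> batches, String topicId, int partition) {
        Key probe = new Key(topicId, partition); // created once
        return batches.entrySet().stream()
                .filter(e -> e.getKey().equals(probe))
                .map(Map.Entry::getValue)
                .findFirst();
    }

    public static void main(String[] args) {
        Map<Key, String> batches = Map.of(
            new Key("id-1", 0), "batch-a",
            new Key("id-1", 1), "batch-b");
        System.out.println(findBatch(batches, "id-1", 1).orElse("none")); // prints "batch-b"
    }
}
```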
Member:

It is possible to have a null batch, right? For example, if the topic is recreated after the batch is generated.

@OmniaGM (Author) commented Jun 17, 2025:

We always had this potential for batch to be null; this is why I raised the comment here #19964 (comment) asking whether we should throw IllegalStateException. I updated this to fail with IllegalStateException instead of leaving it as it was.

@github-actions github-actions bot added core Kafka Broker and removed small Small PRs labels Jun 17, 2025
@OmniaGM (Author) commented Jun 17, 2025

I have added some tests in ProducerSendWhileDeletionTest to cover recreation while producing as well; I hope this will be enough to cover these cases.

@kirktrue (Contributor) left a comment

Thanks for the PR @OmniaGM!

A few very minor comments.

Thanks!

@lucasbru (Member) left a comment

Needs a rebase, but LGTM

OmniaGM added 2 commits June 18, 2025 14:56
- Metadata doesn't have the full view of topicNames to ids during rebootstrap of client or when topic has been deleted/recreated. The solution is to pass down topic id and stop trying to figure it out later in the logic.
@junrao (Contributor) left a comment

@OmniaGM : Thanks for the updated PR. A few more comments.

return topicId.equals(tpId.topicId) &&
topicPartition.partition() == tpId.partition();
} else {
return topicPartition.equals(tpId.topicPartition());
Contributor:

In the rare case that Sender::topicIdsForBatches returns 0 topic id (e.g. topic is deleted), we will pass along topicName -> 0 to handleProduceResponse(). The response will include empty topic and 0 topic id. It's important that we find a match in this case to avoid IllegalStateException. I am thinking that we should first try to do the comparison on topic name, if it's not empty. Otherwise, just do the comparison on topic id even if it's zero.
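A rough sketch of the matching rule proposed above (compare on topic name when the response carries one, otherwise fall back to the topic id, even a zero one); the types are simplified stand-ins for the real Kafka classes:

```java
import java.util.UUID;

public class SameMatchSketch {
    static final UUID ZERO_UUID = new UUID(0L, 0L);

    record TopicIdPartition(UUID topicId, String topic, int partition) {
        // Name-first matching: if the other side carries a topic name, trust it;
        // otherwise compare topic ids, which may both be zero for old brokers.
        boolean same(TopicIdPartition other) {
            if (partition != other.partition)
                return false;
            if (other.topic != null && !other.topic.isEmpty())
                return topic.equals(other.topic);
            return topicId.equals(other.topicId);
        }
    }

    public static void main(String[] args) {
        TopicIdPartition local = new TopicIdPartition(ZERO_UUID, "orders", 0);
        // Response from an old broker: name present, id zero -> still matches.
        TopicIdPartition fromResponse = new TopicIdPartition(ZERO_UUID, "orders", 0);
        System.out.println(local.same(fromResponse)); // prints "true"
    }
}
```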

@OmniaGM (Author):

What about changing ProducerBatch to use TopicIdPartition instead of TopicPartition? This would simplify things, replacing all of these checks and metadata lookups, and I also wouldn't need to filter batches to find the topicId.

@OmniaGM (Author) commented Jun 23, 2025:

> In the rare case that Sender::topicIdsForBatches returns 0 topic id (e.g. topic is deleted), we will pass along topicName -> 0 to handleProduceResponse(). The response will include empty topic and 0 topic id. It's important that we find a match in this case to avoid IllegalStateException. I am thinking that we should first try to do the comparison on topic name, if it's not empty. Otherwise, just do the comparison on topic id even if it's zero.

Thinking out loud here: shouldn't this also be true for TopicIdPartition::equals, since we might hit this rare situation in other places in the code where the topic id partition metadata doesn't fully exist? Especially since not all of Kafka is topic-id aware yet.

Contributor:

Actually, what I said about topic deletion wasn't correct. sendProduceRequest() is called in the Sender thread and at that point, the metadata cached in the client won't change. We drain ProducerBatch based on the topic/partition in the metadata. The topicIds map created is based on the same metadata. So, if a partition is included in a produce request, the topicIds should always include that topic (assuming the topic Id is supported on the server side), even though the server may have deleted that topic. So, this code is fine.

Regarding whether to use TopicIdPartition in ProducerBatch. In the rare case, topicId could change over time for the same topic. So, we probably can't store TopicIdPartition as a final field when ProducerBatch is created. The easiest way to do this is probably to bind the topicId when the ProducerBatch is drained, based on the metadata at that time. This is more or less what this PR does.

topicMetadata(admin, topic).topicId()
} else Uuid.ZERO_UUID
// don't wait for the physical delete
deleteTopicWithAdminRaw(admin, topic)
Contributor:

Hmm, deleteTopicWithAdminRaw() doesn't wait for the metadata propagation to the brokers. However, the producer only sees the deleted topic after the metadata is propagated. Is this test effective?

@OmniaGM (Author) commented Jun 22, 2025:

Deleting a topic is usually fast because the metadata is deleted first; however, deleteTopic without the raw admin waits for the hard deletion of the partitions, which can take longer.

// We need to find batch based on topic id and partition index only as
// topic name in the response might be empty.
TopicIdPartition tpId = new TopicIdPartition(r.topicId(), p.index(), r.name());
ProducerBatch batch = batches.entrySet().stream()
Contributor:

This changes a map lookup to an iteration. Could we do some produce perf test (with multiple topic/partitions) to verify there is no performance degradation?

Contributor:

An alternative is for handleProduceResponse() to take a Map<TopicPartition, ProducerBatch> and a Map<UUID, String>. If the response has non-zero topicId, we look up the second map to find the topic name and then use the first map to find the batch. Otherwise, we look up the first map using the topic name.
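The alternative described above keeps every lookup O(1): the batch map is keyed by TopicPartition, and a small id → name map built from the same metadata resolves the name when the response carries a non-zero topic id. A minimal sketch with simplified stand-in types (not the real Kafka classes):

```java
import java.util.Map;
import java.util.UUID;

public class TwoMapLookupSketch {
    static final UUID ZERO_UUID = new UUID(0L, 0L);

    // Simplified stand-in for Kafka's TopicPartition.
    record TopicPartition(String topic, int partition) {}

    // If the response carries a non-zero topic id, resolve the name through the
    // id -> name map first; otherwise use the topic name from the response directly.
    static String findBatch(Map<TopicPartition, String> batchesByPartition,
                            Map<UUID, String> topicNamesById,
                            UUID responseTopicId, String responseTopicName, int partition) {
        String topic = responseTopicId.equals(ZERO_UUID)
            ? responseTopicName
            : topicNamesById.get(responseTopicId);
        return batchesByPartition.get(new TopicPartition(topic, partition));
    }

    public static void main(String[] args) {
        UUID id = UUID.randomUUID();
        Map<TopicPartition, String> batches = Map.of(new TopicPartition("orders", 0), "batch-a");
        Map<UUID, String> names = Map.of(id, "orders");
        // Topic-id-aware response: empty name, non-zero id.
        System.out.println(findBatch(batches, names, id, "", 0));          // prints "batch-a"
        // Pre-topic-id response: name present, zero id.
        System.out.println(findBatch(batches, names, ZERO_UUID, "orders", 0)); // prints "batch-a"
    }
}
```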

@OmniaGM (Author):

> This changes a map lookup to an iteration. Could we do some produce perf test (with multiple topic/partitions) to verify there is no performance degradation?
The perf results:

5000000 records sent

| metric | pr | trunk |
| --- | --- | --- |
| records/sec | 2709.9 | 2454.8 |
| MB/sec | 2.65 | 2.40 |
| avg latency | 11110.81 ms | 12260.82 ms |
| max latency | 47484.00 ms | 46715.00 ms |
| 50th | 10147 ms | 11341 ms |
| 95th | 22677 ms | 24620 ms |
| 99th | 27950 ms | 30265 ms |
| 99.9th | 33468 ms | 36318 ms |

> An alternative is for handleProduceResponse() to take a Map<TopicPartition, ProducerBatch> and a Map<UUID, String>. If the response has non-zero topicId, we look up the second map to find the topic name and then use the first map to find the batch. Otherwise, we look up the first map using the topic name.

I updated the code to do this instead, as we seem to already have all of this data ready to send to handleProduceResponse. I am running the perf test one last time with this update and will report back.

@OmniaGM (Author):

For 5000000 records sent, record-size 1024, linger.ms=100, batch.size=10000; topic with 1000 partitions, 3 replicas:

| metric | old implementation in the PR | second approach | trunk |
| --- | --- | --- | --- |
| records/sec | 2709.9 | 2316.2 | 2454.8 |
| MB/sec | 2.65 | 2.26 | 2.40 |
| avg latency | 11110.81 ms | 12993.45 ms | 12260.82 ms |
| max latency | 47484.00 ms | 51446.00 ms | 46715.00 ms |
| 50th | 10147 ms | 11952 ms | 11341 ms |
| 95th | 22677 ms | 26238 ms | 24620 ms |
| 99th | 27950 ms | 32202 ms | 30265 ms |
| 99.9th | 33468 ms | 38580 ms | 36318 ms |

@github-actions github-actions bot added the small Small PRs label Jun 22, 2025
@chia7712 (Member)

@OmniaGM could you please fix the conflicts?


recreateTopicFuture.join();
producerFutures.forEach(CompletableFuture::join);
assertTrue(Math.abs(maxNumRecreatTopicAttempts - topicIds.size()) <= 5);
assertEquals(20, numSuccess.intValue());
Contributor:

20 => 2 * numRecords ?

})).toList();
recreateTopicFuture.join();
producerFutures.forEach(CompletableFuture::join);
assertTrue(Math.abs(maxNumRecreatTopicAttempts - topicIds.size()) <= 5);
Member:

Hmm, why doesn't each topic creation create a new topic Id?

@OmniaGM (Author):

It happens when the delete takes longer; when the topic gets created again, we hit TopicExistsException.

Contributor:

Could we create admin with cluster.admin(Map.of(), true)? This way, all admin requests are sent to the controller. Since the controller always has the latest metadata, it seems that we should never hit TopicExistsException.

Member:

or we could keep recreating the topic until there are "enough" new topic ids?

        var recreateTopicFuture = CompletableFuture.supplyAsync(() -> {
            var topicIds = new HashSet<Uuid>();
            while (topicIds.size() < maxNumRecreatTopicAttempts) {
                try (var admin = cluster.admin()) {
                    if (admin.listTopics().names().get().contains(topic)) {
                        admin.deleteTopics(List.of(topic)).all().get();
                    }
                    topicIds.add(admin.createTopics(List.of(new NewTopic(topic, 1, (short) 1))).topicId(topic).get());
                } catch (Exception e) {
                    // ignore
                }
            }
            return topicIds;
        });
        var topicIds = recreateTopicFuture.join();
        assertEquals(maxNumRecreatTopicAttempts, topicIds.size());

@OmniaGM (Author):

> Could we create admin with cluster.admin(Map.of(), true)? This way, all admin requests are sent to the controller. Since the controller always has the latest metadata, it seems that we should never hit TopicExistsException.

I don't believe we can use an admin client with controller bootstrap for create, delete, or even list topics, as these use LeastLoadedNodeProvider instead of ControllerNodeProvider.

Recreating until we have enough topic ids is the better approach here, I think.

@junrao (Contributor) commented Jun 23, 2025

Also, regarding the title of the PR. "Not relaying on metadata to map between topic id and name".

We are still relying on the metadata to map topic id and name. We just want to use consistent metadata between generating the produce request and handing the produce response.

@OmniaGM OmniaGM changed the title KAFKA-19397: Not relaying on metadata to map between topic id and name. KAFKA-19397: Ensure consistent metadata usage in produce request and response Jun 26, 2025
@junrao (Contributor) left a comment

@OmniaGM : Thanks for the updated PR. A couple of more comments. Also, are the test failures related to this PR?

@kirktrue (Contributor) left a comment

Thanks for the PR @OmniaGM!

Left a few comments, mostly minor.

int maxNumRecreatTopicAttempts = 10;
List<Uuid> topicIds = new CopyOnWriteArrayList<>();
var recreateTopicFuture = CompletableFuture.runAsync(() -> {
for (int i = 1; i <= maxNumRecreatTopicAttempts; i++) {
Contributor:

Nitpicky, but is there a reason not to start i at 0? I don't mind, I just want to make sure I'm not missing something.

@OmniaGM (Author):

I was following the pattern in the rest of the file, which appends numbers from 1 to 10 to the record value.

@OmniaGM (Author) commented Jun 27, 2025

I will run the perf test one more time, this time with some topic recreation in the background while I am waiting for the pipeline to finish, to ensure everything is okay.

@OmniaGM (Author) commented Jun 27, 2025

Last perf test results:
5000000 records sent, 2378.4 records/sec (2.32 MB/sec), 12647.78 ms avg latency, 55251.00 ms max latency, 11555 ms 50th, 25864 ms 95th, 32131 ms 99th, 39096 ms 99.9th.

@junrao (Contributor) left a comment

@OmniaGM : Thanks for the updated PR. One more comment.

if (metadata != null) {
numSuccess.incrementAndGet();
}
}).get();
Contributor:

It's possible for get() to throw an exception like TopicNotExist, right? If that happens, the assertion on numRecords will fail.
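The concern is general to mixing a success-counting callback with a blocking get(): if the future completes exceptionally, get() throws and can abort the loop before the assertion on the counter is reachable. A Kafka-free sketch of the pattern, using CompletableFuture as a stand-in for the producer's send future:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

public class CallbackCountSketch {
    public static void main(String[] args) {
        AtomicInteger numSuccess = new AtomicInteger();
        for (int i = 0; i < 3; i++) {
            boolean fail = (i == 1); // simulate one send failing, e.g. topic deleted mid-test
            CompletableFuture<String> future = fail
                ? CompletableFuture.failedFuture(new RuntimeException("UnknownTopicOrPartition"))
                : CompletableFuture.completedFuture("metadata-" + i);
            try {
                future.get();              // throws ExecutionException on failure...
                numSuccess.incrementAndGet();
            } catch (Exception e) {
                // ...so swallow (or retry) here, or the loop aborts early and the
                // assertion on numSuccess can never be satisfied.
            }
        }
        System.out.println(numSuccess.get()); // prints "2" (one of three sends failed)
    }
}
```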
