Skip to content

KAFKA-19383: Handle the deleted topics when applying ClearElrRecord #19916

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 25 additions & 8 deletions metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,26 +95,43 @@ public void replay(PartitionChangeRecord record) {
topicDelta.replay(record);
}

private void maybeReplayClearElrRecord(Uuid topicId, ClearElrRecord record) {
// Only apply the record if the topic is not deleted.
if (!deletedTopicIds.contains(topicId)) {
TopicDelta topicDelta = getOrCreateTopicDelta(topicId);
topicDelta.replay(record);
}
}

// When replaying the ClearElrRecord, we need to first find the latest topic ID associated with the topic(s) because
// multiple topic IDs for the same topic in a TopicsDelta is possible in the event of topic deletion and recreation.
// Second, we should not add the topicDelta if the given topic ID has been deleted. So that we don't leak the
// deleted topic ID.
public void replay(ClearElrRecord record) {
if (!record.topicName().isEmpty()) {
Uuid topicId;
if (image.getTopic(record.topicName()) != null) {
topicId = image.getTopic(record.topicName()).id();
} else {
Uuid topicId = null;
// CreatedTopics contains the latest topic IDs. It should be checked first in case the topic is deleted and
// created in the same batch.
if (createdTopics.containsKey(record.topicName())) {
topicId = createdTopics.get(record.topicName());
} else if (image.getTopic(record.topicName()) != null) {
topicId = image.getTopic(record.topicName()).id();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we guaranteed that image.getTopic(record.topicName()) is not null? The previous code checked that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. Updated with a new UT.

}

if (topicId == null) {
throw new RuntimeException("Unable to clear elr for topic with name " +
record.topicName() + ": no such topic found.");
}
TopicDelta topicDelta = getOrCreateTopicDelta(topicId);
topicDelta.replay(record);

maybeReplayClearElrRecord(topicId, record);
} else {
// Update all the existing topics
image.topicsById().forEach((topicId, image) -> {
TopicDelta topicDelta = getOrCreateTopicDelta(topicId);
topicDelta.replay(record);
maybeReplayClearElrRecord(topicId, record);
});
createdTopicIds().forEach((topicId -> {
maybeReplayClearElrRecord(topicId, record);
}));
}
}

Expand Down
223 changes: 223 additions & 0 deletions metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,229 @@ public void testClearElrRecords() {
assertEquals(0, image.getTopic(barId).partitions().get(0).lastKnownElr.length);
}

@Test
public void testClearElrRecordOnNonExistingTopic() {
TopicsImage image = TopicsImage.EMPTY;

List<ApiMessageAndVersion> topicRecords = new ArrayList<>();
topicRecords.addAll(List.of(
new ApiMessageAndVersion(
new ClearElrRecord().setTopicName("foo"),
CLEAR_ELR_RECORD.highestSupportedVersion()
))
);
TopicsDelta delta = new TopicsDelta(image);
assertThrows(RuntimeException.class, () -> RecordTestUtils.replayAll(delta, topicRecords));
}

@Test
public void testClearElrRecords_All_ForDeletedTopics() {
Uuid fooId = Uuid.fromString("0hHJ3X5ZQ-CFfQ5xgpj90w");
Uuid fooId2 = Uuid.randomUuid();
Uuid barId = Uuid.fromString("f62ptyETTjet8SL5ZeREiw");
Uuid barId2 = Uuid.randomUuid();

List<TopicImage> topics = new ArrayList<>();
topics.add(
newTopicImage(
"foo",
fooId,
newPartition(new int[] {0, 1, 2, 3})
)
);
TopicsImage image = new TopicsImage(newTopicsByIdMap(topics),
newTopicsByNameMap(topics));

List<ApiMessageAndVersion> topicRecords = new ArrayList<>();
topicRecords.add(
new ApiMessageAndVersion(
new PartitionRecord().setTopicId(fooId).
setPartitionId(0).
setLeader(0).
setIsr(List.of(1, 2, 3)),
PARTITION_RECORD.highestSupportedVersion()
)
);

TopicsDelta delta = new TopicsDelta(image);
RecordTestUtils.replayAll(delta, topicRecords);
image = delta.apply();

topicRecords = new ArrayList<>();
/* Test the following:
1. Topic foo is deleted and created in the same delta, the clear elr applies on the new topic
2. Topic bar is created, deleted, then created in the same delta, the clear elr applies on the new topic
*/
topicRecords.addAll(List.of(
new ApiMessageAndVersion(
new RemoveTopicRecord().setTopicId(fooId),
REMOVE_TOPIC_RECORD.highestSupportedVersion()
),
new ApiMessageAndVersion(
new TopicRecord().setTopicId(fooId2).
setName("foo"),
TOPIC_RECORD.highestSupportedVersion()
),
new ApiMessageAndVersion(
new PartitionRecord().setTopicId(fooId2).setPartitionId(0).
setIsr(List.of(0, 1)).
setEligibleLeaderReplicas(List.of(2)).
setLastKnownElr(List.of(3)),
PARTITION_CHANGE_RECORD.highestSupportedVersion()
),
new ApiMessageAndVersion(
new TopicRecord().setTopicId(barId).
setName("bar"),
TOPIC_RECORD.highestSupportedVersion()
),
new ApiMessageAndVersion(
new PartitionRecord().setTopicId(barId).
setPartitionId(0).
setLeader(0).
setIsr(List.of(1, 2, 3)),
PARTITION_RECORD.highestSupportedVersion()
),
new ApiMessageAndVersion(
new RemoveTopicRecord().setTopicId(barId),
REMOVE_TOPIC_RECORD.highestSupportedVersion()
),
new ApiMessageAndVersion(
new TopicRecord().setTopicId(barId2).
setName("bar"),
TOPIC_RECORD.highestSupportedVersion()
),
new ApiMessageAndVersion(
new PartitionRecord().setTopicId(barId2).
setPartitionId(0).
setLeader(0).
setIsr(List.of(1)).
setEligibleLeaderReplicas(List.of(2)).
setLastKnownElr(List.of(3)),
PARTITION_RECORD.highestSupportedVersion()
),
new ApiMessageAndVersion(
new ClearElrRecord(),
CLEAR_ELR_RECORD.highestSupportedVersion()
))
);
delta = new TopicsDelta(image);
RecordTestUtils.replayAll(delta, topicRecords);
image = delta.apply();
assertEquals(2, image.topicsById().size());
assertEquals(2, image.topicsByName().size());

assertEquals(0, image.getTopic(fooId2).partitions().get(0).elr.length);
assertEquals(0, image.getTopic(fooId2).partitions().get(0).lastKnownElr.length);
assertEquals(0, image.getTopic(barId2).partitions().get(0).elr.length);
assertEquals(0, image.getTopic(barId2).partitions().get(0).lastKnownElr.length);
}

@Test
public void testClearElrRecords_Single_ForDeletedTopics() {
Uuid fooId = Uuid.fromString("0hHJ3X5ZQ-CFfQ5xgpj90w");
Uuid fooId2 = Uuid.randomUuid();
Uuid barId = Uuid.fromString("f62ptyETTjet8SL5ZeREiw");
Uuid barId2 = Uuid.randomUuid();

List<TopicImage> topics = new ArrayList<>();
topics.add(
newTopicImage(
"foo",
fooId,
newPartition(new int[] {0, 1, 2, 3})
)
);
TopicsImage image = new TopicsImage(newTopicsByIdMap(topics),
newTopicsByNameMap(topics));

List<ApiMessageAndVersion> topicRecords = new ArrayList<>();
topicRecords.add(
new ApiMessageAndVersion(
new PartitionRecord().setTopicId(fooId).
setPartitionId(0).
setLeader(0).
setIsr(List.of(1, 2, 3)),
PARTITION_RECORD.highestSupportedVersion()
)
);

TopicsDelta delta = new TopicsDelta(image);
RecordTestUtils.replayAll(delta, topicRecords);
image = delta.apply();

topicRecords = new ArrayList<>();
/* Test the following:
1. Topic foo is deleted and created in the same delta, the clear elr applies on the new topic
2. Topic bar is created, deleted, then created in the same delta, the clear elr applies on the new topic
*/
topicRecords.addAll(List.of(
new ApiMessageAndVersion(
new RemoveTopicRecord().setTopicId(fooId),
REMOVE_TOPIC_RECORD.highestSupportedVersion()
),
new ApiMessageAndVersion(
new TopicRecord().setTopicId(fooId2).
setName("foo"),
TOPIC_RECORD.highestSupportedVersion()
),
new ApiMessageAndVersion(
new PartitionRecord().setTopicId(fooId2).setPartitionId(0).
setIsr(List.of(0, 1)).
setEligibleLeaderReplicas(List.of(2)).
setLastKnownElr(List.of(3)),
PARTITION_CHANGE_RECORD.highestSupportedVersion()
),
new ApiMessageAndVersion(
new TopicRecord().setTopicId(barId).
setName("bar"),
TOPIC_RECORD.highestSupportedVersion()
),
new ApiMessageAndVersion(
new PartitionRecord().setTopicId(barId).
setPartitionId(0).
setLeader(0).
setIsr(List.of(1, 2, 3)),
PARTITION_RECORD.highestSupportedVersion()
),
new ApiMessageAndVersion(
new RemoveTopicRecord().setTopicId(barId),
REMOVE_TOPIC_RECORD.highestSupportedVersion()
),
new ApiMessageAndVersion(
new TopicRecord().setTopicId(barId2).
setName("bar"),
TOPIC_RECORD.highestSupportedVersion()
),
new ApiMessageAndVersion(
new PartitionRecord().setTopicId(barId2).
setPartitionId(0).
setLeader(0).
setIsr(List.of(1)).
setEligibleLeaderReplicas(List.of(2)).
setLastKnownElr(List.of(3)),
PARTITION_RECORD.highestSupportedVersion()
),
new ApiMessageAndVersion(
new ClearElrRecord().setTopicName("foo"),
CLEAR_ELR_RECORD.highestSupportedVersion()
),
new ApiMessageAndVersion(
new ClearElrRecord().setTopicName("bar"),
CLEAR_ELR_RECORD.highestSupportedVersion()
))
);
delta = new TopicsDelta(image);
RecordTestUtils.replayAll(delta, topicRecords);
image = delta.apply();
assertEquals(2, image.topicsById().size());
assertEquals(2, image.topicsByName().size());

assertEquals(0, image.getTopic(fooId2).partitions().get(0).elr.length);
assertEquals(0, image.getTopic(fooId2).partitions().get(0).lastKnownElr.length);
assertEquals(0, image.getTopic(barId2).partitions().get(0).elr.length);
assertEquals(0, image.getTopic(barId2).partitions().get(0).lastKnownElr.length);
}

@Test
public void testClearElrRecordForNonExistTopic() {
TopicsImage image = new TopicsImage(newTopicsByIdMap(List.of()),
Expand Down