Skip to content

Commit a7a89d2

Browse files
KAFKA-19383: Handle the deleted topics when applying ClearElrRecord
1 parent cc25d21 commit a7a89d2

File tree

2 files changed

+250
-8
lines changed

2 files changed

+250
-8
lines changed

metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -95,26 +95,43 @@ public void replay(PartitionChangeRecord record) {
9595
topicDelta.replay(record);
9696
}
9797

98+
private void maybeReplayClearElrRecord(Uuid topicId, ClearElrRecord record) {
99+
// Only apply the record if the topic is not deleted.
100+
if (!deletedTopicIds.contains(topicId)) {
101+
TopicDelta topicDelta = getOrCreateTopicDelta(topicId);
102+
topicDelta.replay(record);
103+
}
104+
}
105+
106+
// When replaying the ClearElrRecord, we need to first find the latest topic ID associated with the topic(s) because
107+
// multiple topic IDs for the same topic in a TopicsDelta is possible in the event of topic deletion and recreation.
108+
// Second, we should not add the topicDelta if the given topic ID has been deleted. So that we don't leak the
109+
// deleted topic ID.
98110
public void replay(ClearElrRecord record) {
99111
if (!record.topicName().isEmpty()) {
100-
Uuid topicId;
101-
if (image.getTopic(record.topicName()) != null) {
102-
topicId = image.getTopic(record.topicName()).id();
103-
} else {
112+
Uuid topicId = null;
113+
// CreatedTopics contains the latest topic IDs. It should be checked first in case the topic is deleted and
114+
// created in the same batch.
115+
if (createdTopics.containsKey(record.topicName())) {
104116
topicId = createdTopics.get(record.topicName());
117+
} else if (image.getTopic(record.topicName()) != null) {
118+
topicId = image.getTopic(record.topicName()).id();
105119
}
120+
106121
if (topicId == null) {
107122
throw new RuntimeException("Unable to clear elr for topic with name " +
108123
record.topicName() + ": no such topic found.");
109124
}
110-
TopicDelta topicDelta = getOrCreateTopicDelta(topicId);
111-
topicDelta.replay(record);
125+
126+
maybeReplayClearElrRecord(topicId, record);
112127
} else {
113128
// Update all the existing topics
114129
image.topicsById().forEach((topicId, image) -> {
115-
TopicDelta topicDelta = getOrCreateTopicDelta(topicId);
116-
topicDelta.replay(record);
130+
maybeReplayClearElrRecord(topicId, record);
117131
});
132+
createdTopicIds().forEach((topicId -> {
133+
maybeReplayClearElrRecord(topicId, record);
134+
}));
118135
}
119136
}
120137

metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java

Lines changed: 225 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -428,6 +428,231 @@ public void testClearElrRecords() {
428428
assertEquals(0, image.getTopic(barId).partitions().get(0).lastKnownElr.length);
429429
}
430430

431+
@Test
432+
public void testClearElrRecordOnNonExistingTopic() {
433+
TopicsImage image = TopicsImage.EMPTY;
434+
435+
List<ApiMessageAndVersion> topicRecords = new ArrayList<>();
436+
topicRecords.addAll(List.of(
437+
new ApiMessageAndVersion(
438+
new ClearElrRecord().setTopicName("foo"),
439+
CLEAR_ELR_RECORD.highestSupportedVersion()
440+
))
441+
);
442+
TopicsDelta delta = new TopicsDelta(image, __ -> null);
443+
assertThrows(RuntimeException.class, () -> RecordTestUtils.replayAll(delta, topicRecords));
444+
}
445+
446+
@Test
447+
public void testClearElrRecords_All_ForDeletedTopics() {
448+
Uuid fooId = Uuid.fromString("0hHJ3X5ZQ-CFfQ5xgpj90w");
449+
Uuid fooId2 = Uuid.randomUuid();
450+
Uuid barId = Uuid.fromString("f62ptyETTjet8SL5ZeREiw");
451+
Uuid barId2 = Uuid.randomUuid();
452+
453+
List<TopicImage> topics = new ArrayList<>();
454+
topics.add(
455+
newTopicImage(
456+
"foo",
457+
fooId,
458+
NULL_MIRROR_TOPIC_STATE,
459+
newPartition(new int[] {0, 1, 2, 3})
460+
)
461+
);
462+
TopicsImage image = new TopicsImage(newTopicsByIdMap(topics),
463+
newTopicsByNameMap(topics), ImmutableMap.empty(), ImmutableMap.empty());
464+
465+
List<ApiMessageAndVersion> topicRecords = new ArrayList<>();
466+
topicRecords.add(
467+
new ApiMessageAndVersion(
468+
new PartitionRecord().setTopicId(fooId).
469+
setPartitionId(0).
470+
setLeader(0).
471+
setIsr(List.of(1, 2, 3)),
472+
PARTITION_RECORD.highestSupportedVersion()
473+
)
474+
);
475+
476+
TopicsDelta delta = new TopicsDelta(image, __ -> null);
477+
RecordTestUtils.replayAll(delta, topicRecords);
478+
image = delta.apply();
479+
480+
topicRecords = new ArrayList<>();
481+
/* Test the following:
482+
1. Topic foo is deleted and created in the same delta, the clear elr applies on the new topic
483+
2. Topic bar is created, deleted, then created in the same delta, the clear elr applies on the new topic
484+
*/
485+
topicRecords.addAll(List.of(
486+
new ApiMessageAndVersion(
487+
new RemoveTopicRecord().setTopicId(fooId),
488+
REMOVE_TOPIC_RECORD.highestSupportedVersion()
489+
),
490+
new ApiMessageAndVersion(
491+
new TopicRecord().setTopicId(fooId2).
492+
setName("foo"),
493+
TOPIC_RECORD.highestSupportedVersion()
494+
),
495+
new ApiMessageAndVersion(
496+
new PartitionRecord().setTopicId(fooId2).setPartitionId(0).
497+
setIsr(List.of(0, 1)).
498+
setEligibleLeaderReplicas(List.of(2)).
499+
setLastKnownElr(List.of(3)),
500+
PARTITION_CHANGE_RECORD.highestSupportedVersion()
501+
),
502+
new ApiMessageAndVersion(
503+
new TopicRecord().setTopicId(barId).
504+
setName("bar"),
505+
TOPIC_RECORD.highestSupportedVersion()
506+
),
507+
new ApiMessageAndVersion(
508+
new PartitionRecord().setTopicId(barId).
509+
setPartitionId(0).
510+
setLeader(0).
511+
setIsr(List.of(1, 2, 3)),
512+
PARTITION_RECORD.highestSupportedVersion()
513+
),
514+
new ApiMessageAndVersion(
515+
new RemoveTopicRecord().setTopicId(barId),
516+
REMOVE_TOPIC_RECORD.highestSupportedVersion()
517+
),
518+
new ApiMessageAndVersion(
519+
new TopicRecord().setTopicId(barId2).
520+
setName("bar"),
521+
TOPIC_RECORD.highestSupportedVersion()
522+
),
523+
new ApiMessageAndVersion(
524+
new PartitionRecord().setTopicId(barId2).
525+
setPartitionId(0).
526+
setLeader(0).
527+
setIsr(List.of(1)).
528+
setEligibleLeaderReplicas(List.of(2)).
529+
setLastKnownElr(List.of(3)),
530+
PARTITION_RECORD.highestSupportedVersion()
531+
),
532+
new ApiMessageAndVersion(
533+
new ClearElrRecord(),
534+
CLEAR_ELR_RECORD.highestSupportedVersion()
535+
))
536+
);
537+
delta = new TopicsDelta(image, __ -> null);
538+
RecordTestUtils.replayAll(delta, topicRecords);
539+
image = delta.apply();
540+
assertEquals(2, image.topicsById().size());
541+
assertEquals(2, image.topicsByName().size());
542+
543+
assertEquals(0, image.getTopic(fooId2).partitions().get(0).elr.length);
544+
assertEquals(0, image.getTopic(fooId2).partitions().get(0).lastKnownElr.length);
545+
assertEquals(0, image.getTopic(barId2).partitions().get(0).elr.length);
546+
assertEquals(0, image.getTopic(barId2).partitions().get(0).lastKnownElr.length);
547+
}
548+
549+
@Test
550+
public void testClearElrRecords_Single_ForDeletedTopics() {
551+
Uuid fooId = Uuid.fromString("0hHJ3X5ZQ-CFfQ5xgpj90w");
552+
Uuid fooId2 = Uuid.randomUuid();
553+
Uuid barId = Uuid.fromString("f62ptyETTjet8SL5ZeREiw");
554+
Uuid barId2 = Uuid.randomUuid();
555+
556+
List<TopicImage> topics = new ArrayList<>();
557+
topics.add(
558+
newTopicImage(
559+
"foo",
560+
fooId,
561+
NULL_MIRROR_TOPIC_STATE,
562+
newPartition(new int[] {0, 1, 2, 3})
563+
)
564+
);
565+
TopicsImage image = new TopicsImage(newTopicsByIdMap(topics),
566+
newTopicsByNameMap(topics), ImmutableMap.empty(), ImmutableMap.empty());
567+
568+
List<ApiMessageAndVersion> topicRecords = new ArrayList<>();
569+
topicRecords.add(
570+
new ApiMessageAndVersion(
571+
new PartitionRecord().setTopicId(fooId).
572+
setPartitionId(0).
573+
setLeader(0).
574+
setIsr(List.of(1, 2, 3)),
575+
PARTITION_RECORD.highestSupportedVersion()
576+
)
577+
);
578+
579+
TopicsDelta delta = new TopicsDelta(image, __ -> null);
580+
RecordTestUtils.replayAll(delta, topicRecords);
581+
image = delta.apply();
582+
583+
topicRecords = new ArrayList<>();
584+
/* Test the following:
585+
1. Topic foo is deleted and created in the same delta, the clear elr applies on the new topic
586+
2. Topic bar is created, deleted, then created in the same delta, the clear elr applies on the new topic
587+
*/
588+
topicRecords.addAll(List.of(
589+
new ApiMessageAndVersion(
590+
new RemoveTopicRecord().setTopicId(fooId),
591+
REMOVE_TOPIC_RECORD.highestSupportedVersion()
592+
),
593+
new ApiMessageAndVersion(
594+
new TopicRecord().setTopicId(fooId2).
595+
setName("foo"),
596+
TOPIC_RECORD.highestSupportedVersion()
597+
),
598+
new ApiMessageAndVersion(
599+
new PartitionRecord().setTopicId(fooId2).setPartitionId(0).
600+
setIsr(List.of(0, 1)).
601+
setEligibleLeaderReplicas(List.of(2)).
602+
setLastKnownElr(List.of(3)),
603+
PARTITION_CHANGE_RECORD.highestSupportedVersion()
604+
),
605+
new ApiMessageAndVersion(
606+
new TopicRecord().setTopicId(barId).
607+
setName("bar"),
608+
TOPIC_RECORD.highestSupportedVersion()
609+
),
610+
new ApiMessageAndVersion(
611+
new PartitionRecord().setTopicId(barId).
612+
setPartitionId(0).
613+
setLeader(0).
614+
setIsr(List.of(1, 2, 3)),
615+
PARTITION_RECORD.highestSupportedVersion()
616+
),
617+
new ApiMessageAndVersion(
618+
new RemoveTopicRecord().setTopicId(barId),
619+
REMOVE_TOPIC_RECORD.highestSupportedVersion()
620+
),
621+
new ApiMessageAndVersion(
622+
new TopicRecord().setTopicId(barId2).
623+
setName("bar"),
624+
TOPIC_RECORD.highestSupportedVersion()
625+
),
626+
new ApiMessageAndVersion(
627+
new PartitionRecord().setTopicId(barId2).
628+
setPartitionId(0).
629+
setLeader(0).
630+
setIsr(List.of(1)).
631+
setEligibleLeaderReplicas(List.of(2)).
632+
setLastKnownElr(List.of(3)),
633+
PARTITION_RECORD.highestSupportedVersion()
634+
),
635+
new ApiMessageAndVersion(
636+
new ClearElrRecord().setTopicName("foo"),
637+
CLEAR_ELR_RECORD.highestSupportedVersion()
638+
),
639+
new ApiMessageAndVersion(
640+
new ClearElrRecord().setTopicName("bar"),
641+
CLEAR_ELR_RECORD.highestSupportedVersion()
642+
))
643+
);
644+
delta = new TopicsDelta(image, __ -> null);
645+
RecordTestUtils.replayAll(delta, topicRecords);
646+
image = delta.apply();
647+
assertEquals(2, image.topicsById().size());
648+
assertEquals(2, image.topicsByName().size());
649+
650+
assertEquals(0, image.getTopic(fooId2).partitions().get(0).elr.length);
651+
assertEquals(0, image.getTopic(fooId2).partitions().get(0).lastKnownElr.length);
652+
assertEquals(0, image.getTopic(barId2).partitions().get(0).elr.length);
653+
assertEquals(0, image.getTopic(barId2).partitions().get(0).lastKnownElr.length);
654+
}
655+
431656
@Test
432657
public void testClearElrRecordForNonExistTopic() {
433658
TopicsImage image = new TopicsImage(newTopicsByIdMap(List.of()),

0 commit comments

Comments
 (0)