Skip to content

Commit

Permalink
KAFKA-14337; Correctly remove topicsWithCollisionChars after topic de…
Browse files Browse the repository at this point in the history
…letion (apache#12790)

In apache#11910 , we added a feature to prevent topics with conflicting metrics names from being created. We added a map to store the normalized topic name to the topic names, but we didn't remove it correctly while deleting topics. This PR fixes this bug and add a test.

Reviewers: Igor Soarez <i@soarez.me>, dengziming <dengziming1993@gmail.com>, Jason Gustafson <jason@confluent.io>
  • Loading branch information
showuon authored and rutvijmehta-harness committed Feb 9, 2024
1 parent fb29519 commit b750385
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.MetadataResponse
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.function.Executable
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
Expand Down Expand Up @@ -470,6 +471,30 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
TestUtils.verifyTopicDeletion(zkClientOrNull, testTopicName, 1, brokers)
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testTopicWithCollidingCharDeletionAndCreateAgain(quorum: String): Unit = {
// create the topic with colliding chars
val topicWithCollidingChar = "test.a"
val createOpts = new TopicCommandOptions(Array("--partitions", "1",
"--replication-factor", "1",
"--topic", topicWithCollidingChar))
createAndWaitTopic(createOpts)

// delete the topic
val deleteOpts = new TopicCommandOptions(Array("--topic", topicWithCollidingChar))

if (!isKRaftTest()) {
val deletePath = DeleteTopicsTopicZNode.path(topicWithCollidingChar)
assertFalse(zkClient.pathExists(deletePath), "Delete path for topic shouldn't exist before deletion.")
}
topicService.deleteTopic(deleteOpts)
TestUtils.verifyTopicDeletion(zkClientOrNull, topicWithCollidingChar, 1, brokers)

val createTopic: Executable = () => createAndWaitTopic(createOpts)
assertDoesNotThrow(createTopic)
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testDeleteInternalTopic(quorum: String): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ static Map<String, String> translateCreationConfigs(CreateableTopicConfigCollect
* Since we reject topic creations that would collide, under normal conditions the
* sets in this map should only have a size of 1. However, if the cluster was
* upgraded from a version prior to KAFKA-13743, it may be possible to have more
* values here, since collidiing topic names will be "grandfathered in."
* values here, since colliding topic names will be "grandfathered in."
*/
private final TimelineHashMap<String, TimelineHashSet<String>> topicsWithCollisionChars;

Expand Down Expand Up @@ -527,7 +527,7 @@ public void replay(RemoveTopicRecord record) {
if (colliding != null) {
colliding.remove(topic.name);
if (colliding.isEmpty()) {
topicsWithCollisionChars.remove(topic.name);
topicsWithCollisionChars.remove(normalizedName);
}
}
}
Expand Down

0 comments on commit b750385

Please sign in to comment.