Skip to content

Commit

Permalink
MINOR: KRaftMetadataCache.getPartitionInfo must set all relevant fields
Browse files Browse the repository at this point in the history
Fix a case where KRaftMetadataCache.getPartitionInfo was not setting all the PartitionInfo fields it
should have been. Add a regression test.

Reviewers: Colin P. McCabe <cmccabe@apache.org>
  • Loading branch information
ahuang98 committed Apr 17, 2023
1 parent 2c1cf03 commit 7159f6c
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 1 deletion.
Expand Up @@ -228,6 +228,7 @@ class KRaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging w
flatMap(_.node(listenerName.value()).asScala).toSeq
}

// Does NOT include offline replica metadata
override def getPartitionInfo(topicName: String, partitionId: Int): Option[UpdateMetadataPartitionState] = {
Option(_currentImage.topics().getTopic(topicName)).
flatMap(topic => Option(topic.partitions().get(partitionId))).
Expand All @@ -238,7 +239,8 @@ class KRaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging w
setLeader(partition.leader).
setLeaderEpoch(partition.leaderEpoch).
setIsr(Replicas.toList(partition.isr)).
setZkVersion(partition.partitionEpoch)))
setZkVersion(partition.partitionEpoch).
setReplicas(Replicas.toList(partition.replicas))))
}

override def numPartitions(topicName: String): Option[Int] = {
Expand Down
59 changes: 59 additions & 0 deletions core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
Expand Up @@ -706,4 +706,63 @@ class MetadataCacheTest {
assertEquals(100L, metadataCache.getAliveBrokerEpoch(0).getOrElse(-1L))
assertEquals(-1L, metadataCache.getAliveBrokerEpoch(1).getOrElse(-1L))
}

@ParameterizedTest
@MethodSource(Array("cacheProvider"))
def testGetPartitionInfo(cache: MetadataCache): Unit = {
val topic = "topic"
val partitionIndex = 0
val controllerEpoch = 1
val leader = 0
val leaderEpoch = 0
val isr = asList[Integer](2, 3, 0)
val zkVersion = 3
val replicas = asList[Integer](2, 3, 0, 1, 4)
val offlineReplicas = asList[Integer](0)

val partitionStates = Seq(new UpdateMetadataPartitionState()
.setTopicName(topic)
.setPartitionIndex(partitionIndex)
.setControllerEpoch(controllerEpoch)
.setLeader(leader)
.setLeaderEpoch(leaderEpoch)
.setIsr(isr)
.setZkVersion(zkVersion)
.setReplicas(replicas)
.setOfflineReplicas(offlineReplicas))

val version = ApiKeys.UPDATE_METADATA.latestVersion

val controllerId = 2
val securityProtocol = SecurityProtocol.PLAINTEXT
val listenerName = ListenerName.forSecurityProtocol(securityProtocol)
val brokers = Seq(new UpdateMetadataBroker()
.setId(0)
.setRack("rack1")
.setEndpoints(Seq(new UpdateMetadataEndpoint()
.setHost("foo")
.setPort(9092)
.setSecurityProtocol(securityProtocol.id)
.setListener(listenerName.value)).asJava))
val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, controllerId, controllerEpoch, brokerEpoch,
partitionStates.asJava, brokers.asJava, util.Collections.emptyMap(), false).build()
MetadataCacheTest.updateCache(cache, updateMetadataRequest)

val partitionState = cache.getPartitionInfo(topic, partitionIndex).get
assertEquals(topic, partitionState.topicName())
assertEquals(partitionIndex, partitionState.partitionIndex())
if (cache.isInstanceOf[ZkMetadataCache]) {
assertEquals(controllerEpoch, partitionState.controllerEpoch())
} else {
assertEquals(-1, partitionState.controllerEpoch())
}
assertEquals(leader, partitionState.leader())
assertEquals(leaderEpoch, partitionState.leaderEpoch())
assertEquals(isr, partitionState.isr())
assertEquals(zkVersion, partitionState.zkVersion())
assertEquals(replicas, partitionState.replicas())
if (cache.isInstanceOf[ZkMetadataCache]) {
assertEquals(offlineReplicas, partitionState.offlineReplicas())
}
}
}

0 comments on commit 7159f6c

Please sign in to comment.