KAFKA-15218: Avoid NPE thrown while deleting topic and fetch from follower concurrently (apache#14051)

When a topic is deleted, stopPartitions first clears the remoteReplicasMap. A fetch request from a follower can arrive at the same time and check whether the replica is eligible to be added to the ISR, at which point an NPE is thrown. This is harmless because the topic has already been deleted, but it is better to avoid the exception altogether.
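
The fix boils down to treating the replica lookup as optional and bailing out before dereferencing it. Below is a minimal, self-contained Scala sketch of that guard pattern, assuming simplified stand-ins (PartitionLike, ReplicaState, and aliveBrokerEpochs are illustrative names, not Kafka's actual classes, and the epoch comparison only roughly approximates the real eligibility logic):

import java.util.concurrent.ConcurrentHashMap

// Simplified, hypothetical stand-ins for Partition's replica bookkeeping;
// the real classes live in kafka.cluster and are not reproduced here.
final case class ReplicaState(brokerEpoch: Option[Long])
final case class Replica(brokerId: Int, state: ReplicaState)

class PartitionLike {
  private val remoteReplicasMap = new ConcurrentHashMap[Int, Replica]()

  // Stand-in for KRaftMetadataCache.getAliveBrokerEpoch.
  private val aliveBrokerEpochs = Map(1 -> 1L, 2 -> 1L)

  def addReplica(replicaId: Int, brokerEpoch: Long): Unit =
    remoteReplicasMap.put(replicaId, Replica(replicaId, ReplicaState(Some(brokerEpoch))))

  // Topic deletion clears the replica map, which is what races with an in-flight fetch.
  def delete(): Unit = remoteReplicasMap.clear()

  // Returning an Option instead of dereferencing the map result directly is the
  // essence of the fix: a concurrent deletion may have emptied the map already.
  def getReplica(replicaId: Int): Option[Replica] =
    Option(remoteReplicasMap.get(replicaId))

  def isReplicaIsrEligible(followerReplicaId: Int): Boolean = {
    val maybeReplica = getReplica(followerReplicaId)
    if (maybeReplica.isEmpty) {
      // No replica state: the topic may already be deleted, so "not eligible" is a safe answer.
      println(s"No replica state for replica $followerReplicaId; the topic may already be deleted.")
      return false
    }
    val storedBrokerEpoch = maybeReplica.get.state.brokerEpoch
    val cachedBrokerEpoch = aliveBrokerEpochs.get(followerReplicaId)
    // Rough approximation of the real check: the stored epoch must match the cached
    // one, or be -1, which bypasses epoch verification.
    storedBrokerEpoch == cachedBrokerEpoch || storedBrokerEpoch.contains(-1L)
  }
}

object NpeGuardDemo extends App {
  val partition = new PartitionLike
  partition.addReplica(2, brokerEpoch = 1L)
  println(partition.isReplicaIsrEligible(2)) // true: replica known and epochs match

  partition.delete() // topic deleted while a follower fetch is still in flight
  println(partition.isReplicaIsrEligible(2)) // false, and no NullPointerException
}

The design point mirrored here is that the replica map can be emptied by a concurrent topic deletion between the follower's fetch arriving and the ISR-eligibility check running, so every lookup against it has to tolerate an absent entry.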

Reviewers: Luke Chen <showuon@gmail.com>
vamossagar12 authored and rreddy-22 committed Sep 20, 2023
1 parent d83fde1 commit e07c4ee
Showing 2 changed files with 60 additions and 3 deletions.
9 changes: 8 additions & 1 deletion core/src/main/scala/kafka/cluster/Partition.scala
@@ -998,7 +998,14 @@ class Partition(val topicPartition: TopicPartition,
// 3. Its metadata cached broker epoch matches its Fetch request broker epoch. Or the Fetch
// request broker epoch is -1 which bypasses the epoch verification.
case kRaftMetadataCache: KRaftMetadataCache =>
-val storedBrokerEpoch = remoteReplicasMap.get(followerReplicaId).stateSnapshot.brokerEpoch
+val mayBeReplica = getReplica(followerReplicaId)
+// The topic is already deleted and we don't have any replica information. In this case, we can return false
+// so as to avoid NPE
+if (mayBeReplica.isEmpty) {
+  warn(s"The replica state of replica ID:[$followerReplicaId] doesn't exist in the leader node. It might be because the topic is already deleted.")
+  return false
+}
+val storedBrokerEpoch = mayBeReplica.get.stateSnapshot.brokerEpoch
val cachedBrokerEpoch = kRaftMetadataCache.getAliveBrokerEpoch(followerReplicaId)
!kRaftMetadataCache.isBrokerFenced(followerReplicaId) &&
!kRaftMetadataCache.isBrokerShuttingDown(followerReplicaId) &&
54 changes: 52 additions & 2 deletions core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -37,7 +37,7 @@ import org.apache.kafka.metadata.LeaderRecoveryState
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
import org.mockito.ArgumentMatchers
-import org.mockito.ArgumentMatchers.{any, anyString}
+import org.mockito.ArgumentMatchers.{any, anyBoolean, anyInt, anyLong, anyString}
import org.mockito.Mockito._
import org.mockito.invocation.InvocationOnMock

@@ -55,7 +55,7 @@ import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.util.{KafkaScheduler, MockTime}
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
-import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, EpochEntry, FetchIsolation, FetchParams, LogAppendInfo, LogDirFailureChannel, LogReadInfo, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, EpochEntry, FetchIsolation, FetchParams, LogAppendInfo, LogDirFailureChannel, LogOffsetMetadata, LogReadInfo, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource

@@ -1290,6 +1290,56 @@ class PartitionTest extends AbstractPartitionTest {
)
}

@Test
def testIsReplicaIsrEligibleWithEmptyReplicaMap(): Unit = {
val mockMetadataCache = mock(classOf[KRaftMetadataCache])
val partition = spy(new Partition(topicPartition,
replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
interBrokerProtocolVersion = interBrokerProtocolVersion,
localBrokerId = brokerId,
() => defaultBrokerEpoch(brokerId),
time,
alterPartitionListener,
delayedOperations,
mockMetadataCache,
logManager,
alterPartitionManager))

when(offsetCheckpoints.fetch(ArgumentMatchers.anyString, ArgumentMatchers.eq(topicPartition)))
.thenReturn(None)
val log = logManager.getOrCreateLog(topicPartition, topicId = None)
seedLogData(log, numRecords = 6, leaderEpoch = 4)

val controllerEpoch = 0
val leaderEpoch = 5
val remoteBrokerId = brokerId + 1
val replicas = List[Integer](brokerId, remoteBrokerId).asJava

partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)

val initializeTimeMs = time.milliseconds()
assertTrue(partition.makeLeader(
new LeaderAndIsrPartitionState()
.setControllerEpoch(controllerEpoch)
.setLeader(brokerId)
.setLeaderEpoch(leaderEpoch)
.setIsr(List[Integer](brokerId).asJava)
.setPartitionEpoch(1)
.setReplicas(replicas)
.setIsNew(true),
offsetCheckpoints, None), "Expected become leader transition to succeed")

doAnswer(_ => {
// simulate the topic being deleted at this moment
partition.delete()
val replica = new Replica(remoteBrokerId, topicPartition)
partition.updateFollowerFetchState(replica, mock(classOf[LogOffsetMetadata]), 0, initializeTimeMs, 0, 0)
mock(classOf[LogReadInfo])
}).when(partition).fetchRecords(any(), any(), anyLong(), anyInt(), anyBoolean(), anyBoolean())

assertDoesNotThrow(() => fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = 3L))
}

@Test
def testInvalidAlterPartitionRequestsAreNotRetried(): Unit = {
val log = logManager.getOrCreateLog(topicPartition, topicId = None)
