diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 970f5146b7cc..7ea4c075128e 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -998,7 +998,14 @@ class Partition(val topicPartition: TopicPartition,
       // 3. Its metadata cached broker epoch matches its Fetch request broker epoch. Or the Fetch
       //    request broker epoch is -1 which bypasses the epoch verification.
       case kRaftMetadataCache: KRaftMetadataCache =>
-        val storedBrokerEpoch = remoteReplicasMap.get(followerReplicaId).stateSnapshot.brokerEpoch
+        val mayBeReplica = getReplica(followerReplicaId)
+        // The topic is already deleted and we don't have any replica information. In this case, we can return false
+        // so as to avoid NPE
+        if (mayBeReplica.isEmpty) {
+          warn(s"The replica state of replica ID:[$followerReplicaId] doesn't exist in the leader node. It might be because the topic is already deleted.")
+          return false
+        }
+        val storedBrokerEpoch = mayBeReplica.get.stateSnapshot.brokerEpoch
         val cachedBrokerEpoch = kRaftMetadataCache.getAliveBrokerEpoch(followerReplicaId)
         !kRaftMetadataCache.isBrokerFenced(followerReplicaId) &&
           !kRaftMetadataCache.isBrokerShuttingDown(followerReplicaId) &&
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index b451dd857841..131db5e9032c 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -37,7 +37,7 @@ import org.apache.kafka.metadata.LeaderRecoveryState
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
 import org.mockito.ArgumentMatchers
-import org.mockito.ArgumentMatchers.{any, anyString}
+import org.mockito.ArgumentMatchers.{any, anyBoolean, anyInt, anyLong, anyString}
 import org.mockito.Mockito._
 import org.mockito.invocation.InvocationOnMock
 
@@ -55,7 +55,7 @@ import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
 import org.apache.kafka.server.util.{KafkaScheduler, MockTime}
 import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
-import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, EpochEntry, FetchIsolation, FetchParams, LogAppendInfo, LogDirFailureChannel, LogReadInfo, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, EpochEntry, FetchIsolation, FetchParams, LogAppendInfo, LogDirFailureChannel, LogOffsetMetadata, LogReadInfo, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
 
@@ -1290,6 +1290,56 @@ class PartitionTest extends AbstractPartitionTest {
     )
   }
 
+  @Test
+  def testIsReplicaIsrEligibleWithEmptyReplicaMap(): Unit = {
+    val mockMetadataCache = mock(classOf[KRaftMetadataCache])
+    val partition = spy(new Partition(topicPartition,
+      replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
+      interBrokerProtocolVersion = interBrokerProtocolVersion,
+      localBrokerId = brokerId,
+      () => defaultBrokerEpoch(brokerId),
+      time,
+      alterPartitionListener,
+      delayedOperations,
+      mockMetadataCache,
+      logManager,
+      alterPartitionManager))
+
+    when(offsetCheckpoints.fetch(ArgumentMatchers.anyString, ArgumentMatchers.eq(topicPartition)))
+      .thenReturn(None)
+    val log = logManager.getOrCreateLog(topicPartition, topicId = None)
+    seedLogData(log, numRecords = 6, leaderEpoch = 4)
+
+    val controllerEpoch = 0
+    val leaderEpoch = 5
+    val remoteBrokerId = brokerId + 1
+    val replicas = List[Integer](brokerId, remoteBrokerId).asJava
+
+    partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
+
+    val initializeTimeMs = time.milliseconds()
+    assertTrue(partition.makeLeader(
+      new LeaderAndIsrPartitionState()
+        .setControllerEpoch(controllerEpoch)
+        .setLeader(brokerId)
+        .setLeaderEpoch(leaderEpoch)
+        .setIsr(List[Integer](brokerId).asJava)
+        .setPartitionEpoch(1)
+        .setReplicas(replicas)
+        .setIsNew(true),
+      offsetCheckpoints, None), "Expected become leader transition to succeed")
+
+    doAnswer(_ => {
+      // simulate topic is deleted at the moment
+      partition.delete()
+      val replica = new Replica(remoteBrokerId, topicPartition)
+      partition.updateFollowerFetchState(replica, mock(classOf[LogOffsetMetadata]), 0, initializeTimeMs, 0, 0)
+      mock(classOf[LogReadInfo])
+    }).when(partition).fetchRecords(any(), any(), anyLong(), anyInt(), anyBoolean(), anyBoolean())
+
+    assertDoesNotThrow(() => fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = 3L))
+  }
+
   @Test
   def testInvalidAlterPartitionRequestsAreNotRetried(): Unit = {
     val log = logManager.getOrCreateLog(topicPartition, topicId = None)