From 46a8a2877b18600abb608da5afcb3bef13e882f0 Mon Sep 17 00:00:00 2001
From: vamossagar12
Date: Wed, 26 Jul 2023 07:21:23 +0530
Subject: [PATCH] KAFKA-15218: Avoid NPE thrown while deleting topic and fetch from follower concurrently (#14051)

When a topic is deleted, we first clear the remoteReplicasMap in
stopPartitions. At that point a fetch request from a follower may still
arrive, and the leader checks whether the replica is eligible to be added
to the ISR; this check then throws an NPE. Although the NPE is harmless
because the topic has already been deleted, it is better to avoid it.

Reviewers: Luke Chen
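
Editorial note, not part of the applied change: below is a minimal, self-contained
Scala sketch of the guard pattern the fix applies. The names ReplicaState,
followerStates and isFollowerIsrEligible are hypothetical stand-ins for
Partition's internals; the point is to look the follower up as an Option and
treat a missing entry as "not ISR-eligible" instead of dereferencing a map
entry that a concurrent topic deletion may already have cleared.

    import java.util.concurrent.ConcurrentHashMap

    final case class ReplicaState(brokerEpoch: Option[Long])

    object IsrEligibilitySketch {
      // Stand-in for the per-partition follower state map; cleared on topic deletion.
      private val followerStates = new ConcurrentHashMap[Int, ReplicaState]()

      // Returns None when the follower is unknown, e.g. after a concurrent delete.
      private def getReplica(followerId: Int): Option[ReplicaState] =
        Option(followerStates.get(followerId))

      def isFollowerIsrEligible(followerId: Int, cachedBrokerEpoch: Option[Long]): Boolean =
        getReplica(followerId) match {
          case None =>
            // Without the guard this path would dereference the missing entry and
            // throw an NPE; returning false is safe because the topic no longer exists.
            false
          case Some(state) =>
            state.brokerEpoch == cachedBrokerEpoch
        }

      def main(args: Array[String]): Unit = {
        followerStates.put(1, ReplicaState(Some(5L)))
        println(isFollowerIsrEligible(1, Some(5L)))  // true
        followerStates.clear()                       // simulate topic deletion
        println(isFollowerIsrEligible(1, Some(5L)))  // false instead of an NPE
      }
    }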
---
 .../main/scala/kafka/cluster/Partition.scala  |  9 +++-
 .../unit/kafka/cluster/PartitionTest.scala    | 54 ++++++++++++++++++-
 2 files changed, 60 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 970f5146b7cc..7ea4c075128e 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -998,7 +998,14 @@ class Partition(val topicPartition: TopicPartition,
       // 3. Its metadata cached broker epoch matches its Fetch request broker epoch. Or the Fetch
       //    request broker epoch is -1 which bypasses the epoch verification.
       case kRaftMetadataCache: KRaftMetadataCache =>
-        val storedBrokerEpoch = remoteReplicasMap.get(followerReplicaId).stateSnapshot.brokerEpoch
+        val mayBeReplica = getReplica(followerReplicaId)
+        // The topic is already deleted and we don't have any replica information. In this case, we can return false
+        // so as to avoid an NPE.
+        if (mayBeReplica.isEmpty) {
+          warn(s"The replica state of replica ID:[$followerReplicaId] doesn't exist in the leader node. It might be because the topic is already deleted.")
+          return false
+        }
+        val storedBrokerEpoch = mayBeReplica.get.stateSnapshot.brokerEpoch
         val cachedBrokerEpoch = kRaftMetadataCache.getAliveBrokerEpoch(followerReplicaId)
         !kRaftMetadataCache.isBrokerFenced(followerReplicaId) &&
           !kRaftMetadataCache.isBrokerShuttingDown(followerReplicaId) &&
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index b451dd857841..131db5e9032c 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -37,7 +37,7 @@ import org.apache.kafka.metadata.LeaderRecoveryState
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
 import org.mockito.ArgumentMatchers
-import org.mockito.ArgumentMatchers.{any, anyString}
+import org.mockito.ArgumentMatchers.{any, anyBoolean, anyInt, anyLong, anyString}
 import org.mockito.Mockito._
 import org.mockito.invocation.InvocationOnMock
 
@@ -55,7 +55,7 @@ import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
 import org.apache.kafka.server.util.{KafkaScheduler, MockTime}
 import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
-import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, EpochEntry, FetchIsolation, FetchParams, LogAppendInfo, LogDirFailureChannel, LogReadInfo, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, EpochEntry, FetchIsolation, FetchParams, LogAppendInfo, LogDirFailureChannel, LogOffsetMetadata, LogReadInfo, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
 
@@ -1290,6 +1290,56 @@ class PartitionTest extends AbstractPartitionTest {
     )
   }
 
+  @Test
+  def testIsReplicaIsrEligibleWithEmptyReplicaMap(): Unit = {
+    val mockMetadataCache = mock(classOf[KRaftMetadataCache])
+    val partition = spy(new Partition(topicPartition,
+      replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
+      interBrokerProtocolVersion = interBrokerProtocolVersion,
+      localBrokerId = brokerId,
+      () => defaultBrokerEpoch(brokerId),
+      time,
+      alterPartitionListener,
+      delayedOperations,
+      mockMetadataCache,
+      logManager,
+      alterPartitionManager))
+
+    when(offsetCheckpoints.fetch(ArgumentMatchers.anyString, ArgumentMatchers.eq(topicPartition)))
+      .thenReturn(None)
+    val log = logManager.getOrCreateLog(topicPartition, topicId = None)
+    seedLogData(log, numRecords = 6, leaderEpoch = 4)
+
+    val controllerEpoch = 0
+    val leaderEpoch = 5
+    val remoteBrokerId = brokerId + 1
+    val replicas = List[Integer](brokerId, remoteBrokerId).asJava
+
+    partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
+
+    val initializeTimeMs = time.milliseconds()
+    assertTrue(partition.makeLeader(
+      new LeaderAndIsrPartitionState()
+        .setControllerEpoch(controllerEpoch)
+        .setLeader(brokerId)
+        .setLeaderEpoch(leaderEpoch)
+        .setIsr(List[Integer](brokerId).asJava)
+        .setPartitionEpoch(1)
+        .setReplicas(replicas)
+        .setIsNew(true),
+      offsetCheckpoints, None), "Expected become leader transition to succeed")
+
+    doAnswer(_ => {
+      // simulate the topic being deleted at this moment
+      partition.delete()
+      val replica = new Replica(remoteBrokerId, topicPartition)
+      partition.updateFollowerFetchState(replica, mock(classOf[LogOffsetMetadata]), 0, initializeTimeMs, 0, 0)
+      mock(classOf[LogReadInfo])
+    }).when(partition).fetchRecords(any(), any(), anyLong(), anyInt(), anyBoolean(), anyBoolean())
+
+    assertDoesNotThrow(() => fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = 3L))
+  }
+
   @Test
   def testInvalidAlterPartitionRequestsAreNotRetried(): Unit = {
     val log = logManager.getOrCreateLog(topicPartition, topicId = None)