Skip to content

Commit

Permalink
KAFKA-9946; Partition deletion event should only be sent if deletion …
Browse files Browse the repository at this point in the history
…was requested in the StopReplica request (apache#8609)

This patch fixes a regression in the `StopReplica` response handling. We should only send the event on receiving the `StopReplica` response if we had requested deletion in the request.

Reviewers: Lucas Bradstreet <lucas@confluent.io>, Jason Gustafson <jason@confluent.io>
  • Loading branch information
dajac authored and jwijgerd committed May 14, 2020
1 parent f715a62 commit c5ed1ee
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 24 deletions.
45 changes: 25 additions & 20 deletions core/src/main/scala/kafka/controller/ControllerChannelManager.scala
Expand Up @@ -550,25 +550,20 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
else if (config.interBrokerProtocolVersion >= KAFKA_2_2_IV0) 1
else 0

def stopReplicaPartitionDeleteResponseCallback(brokerId: Int)(response: AbstractResponse): Unit = {
def responseCallback(brokerId: Int, isPartitionDeleted: TopicPartition => Boolean)
(response: AbstractResponse): Unit = {
val stopReplicaResponse = response.asInstanceOf[StopReplicaResponse]
val partitionErrorsForDeletingTopics = stopReplicaResponse.partitionErrors.asScala.iterator.filter { pe =>
controllerContext.isTopicDeletionInProgress(pe.topicName)
}.map(pe => new TopicPartition(pe.topicName, pe.partitionIndex) -> Errors.forCode(pe.errorCode)).toMap

val partitionErrorsForDeletingTopics = mutable.Map.empty[TopicPartition, Errors]
stopReplicaResponse.partitionErrors.asScala.foreach { pe =>
val tp = new TopicPartition(pe.topicName, pe.partitionIndex)
if (controllerContext.isTopicDeletionInProgress(pe.topicName) &&
isPartitionDeleted(tp)) {
partitionErrorsForDeletingTopics += tp -> Errors.forCode(pe.errorCode)
}
}
if (partitionErrorsForDeletingTopics.nonEmpty)
sendEvent(TopicDeletionStopReplicaResponseReceived(brokerId, stopReplicaResponse.error, partitionErrorsForDeletingTopics))
}

def sendStopReplicaRequest(brokerId: Int,
brokerEpoch: Long,
deletePartitions: Boolean,
topicStates: mutable.Map[String, StopReplicaTopicState]): Unit = {
val stopReplicaRequestBuilder = new StopReplicaRequest.Builder(
stopReplicaRequestVersion, controllerId, controllerEpoch, brokerEpoch,
deletePartitions, topicStates.values.toBuffer.asJava)
val callback = stopReplicaPartitionDeleteResponseCallback(brokerId) _
sendRequest(brokerId, stopReplicaRequestBuilder, callback)
sendEvent(TopicDeletionStopReplicaResponseReceived(brokerId, stopReplicaResponse.error,
partitionErrorsForDeletingTopics))
}

stopReplicaRequestMap.foreach { case (brokerId, partitionStates) =>
Expand All @@ -590,7 +585,11 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,

stateChangeLog.info(s"Sending StopReplica request for ${partitionStates.size} " +
s"replicas to broker $brokerId")
sendStopReplicaRequest(brokerId, brokerEpoch, false, stopReplicaTopicState)
val stopReplicaRequestBuilder = new StopReplicaRequest.Builder(
stopReplicaRequestVersion, controllerId, controllerEpoch, brokerEpoch,
false, stopReplicaTopicState.values.toBuffer.asJava)
sendRequest(brokerId, stopReplicaRequestBuilder,
responseCallback(brokerId, tp => partitionStates.get(tp).exists(_.deletePartition)))
} else {
var numPartitionStateWithDelete = 0
var numPartitionStateWithoutDelete = 0
Expand All @@ -613,13 +612,19 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
if (topicStatesWithDelete.nonEmpty) {
stateChangeLog.info(s"Sending StopReplica request (delete = true) for " +
s"$numPartitionStateWithDelete replicas to broker $brokerId")
sendStopReplicaRequest(brokerId, brokerEpoch, true, topicStatesWithDelete)
val stopReplicaRequestBuilder = new StopReplicaRequest.Builder(
stopReplicaRequestVersion, controllerId, controllerEpoch, brokerEpoch,
true, topicStatesWithDelete.values.toBuffer.asJava)
sendRequest(brokerId, stopReplicaRequestBuilder, responseCallback(brokerId, _ => true))
}

if (topicStatesWithoutDelete.nonEmpty) {
stateChangeLog.info(s"Sending StopReplica request (delete = false) for " +
s"$numPartitionStateWithoutDelete replicas to broker $brokerId")
sendStopReplicaRequest(brokerId, brokerEpoch, false, topicStatesWithoutDelete)
val stopReplicaRequestBuilder = new StopReplicaRequest.Builder(
stopReplicaRequestVersion, controllerId, controllerEpoch, brokerEpoch,
false, topicStatesWithoutDelete.values.toBuffer.asJava)
sendRequest(brokerId, stopReplicaRequestBuilder)
}
}
}
Expand Down
Expand Up @@ -426,8 +426,15 @@ class ControllerChannelManagerTest {

@Test
def testStopReplicaRequestsWhileTopicQueuedForDeletion(): Unit = {
for (apiVersion <- ApiVersion.allVersions) {
testStopReplicaRequestsWhileTopicQueuedForDeletion(apiVersion)
}
}

private def testStopReplicaRequestsWhileTopicQueuedForDeletion(interBrokerProtocolVersion: ApiVersion): Unit = {
val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3)
val batch = new MockControllerBrokerRequestBatch(context)
val config = createConfig(interBrokerProtocolVersion)
val batch = new MockControllerBrokerRequestBatch(context, config)

val partitions = Map(
new TopicPartition("foo", 0) -> LeaderAndDelete(1, true),
Expand Down Expand Up @@ -456,7 +463,7 @@ class ControllerChannelManagerTest {
assertEquals(1, sentStopReplicaRequests.size)

val stopReplicaRequest = sentStopReplicaRequests.head
assertEquals(partitionStates(partitions, context.topicsQueuedForDeletion),
assertEquals(partitionStates(partitions, context.topicsQueuedForDeletion, stopReplicaRequest.version),
stopReplicaRequest.partitionStates().asScala)

// No events will be sent after the response returns
Expand All @@ -466,8 +473,15 @@ class ControllerChannelManagerTest {

@Test
def testStopReplicaRequestsWhileTopicDeletionStarted(): Unit = {
for (apiVersion <- ApiVersion.allVersions) {
testStopReplicaRequestsWhileTopicDeletionStarted(apiVersion)
}
}

private def testStopReplicaRequestsWhileTopicDeletionStarted(interBrokerProtocolVersion: ApiVersion): Unit = {
val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3)
val batch = new MockControllerBrokerRequestBatch(context)
val config = createConfig(interBrokerProtocolVersion)
val batch = new MockControllerBrokerRequestBatch(context, config)

val partitions = Map(
new TopicPartition("foo", 0) -> LeaderAndDelete(1, true),
Expand Down Expand Up @@ -496,7 +510,7 @@ class ControllerChannelManagerTest {
assertEquals(1, sentStopReplicaRequests.size)

val stopReplicaRequest = sentStopReplicaRequests.head
assertEquals(partitionStates(partitions, context.topicsQueuedForDeletion),
assertEquals(partitionStates(partitions, context.topicsQueuedForDeletion, stopReplicaRequest.version),
stopReplicaRequest.partitionStates().asScala)

// When the topic is being deleted, we should provide a callback which sends
Expand All @@ -512,6 +526,53 @@ class ControllerChannelManagerTest {
assertEquals(partitions.keys.filter(_.topic == "foo"), includedPartitions)
}

@Test
def testStopReplicaRequestWithoutDeletePartitionWhileTopicDeletionStarted(): Unit = {
for (apiVersion <- ApiVersion.allVersions) {
testStopReplicaRequestWithoutDeletePartitionWhileTopicDeletionStarted(apiVersion)
}
}

private def testStopReplicaRequestWithoutDeletePartitionWhileTopicDeletionStarted(interBrokerProtocolVersion: ApiVersion): Unit = {
val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3)
val config = createConfig(interBrokerProtocolVersion)
val batch = new MockControllerBrokerRequestBatch(context, config)

val partitions = Map(
new TopicPartition("foo", 0) -> LeaderAndDelete(1, false),
new TopicPartition("foo", 1) -> LeaderAndDelete(2, false),
new TopicPartition("bar", 1) -> LeaderAndDelete(3, false)
)

context.queueTopicDeletion(Set("foo"))
context.beginTopicDeletion(Set("foo"))

batch.newBatch()
partitions.foreach { case (partition, LeaderAndDelete(leaderAndIsr, deletePartition)) =>
context.partitionLeadershipInfo.put(partition, LeaderIsrAndControllerEpoch(leaderAndIsr, controllerEpoch))
batch.addStopReplicaRequestForBrokers(Seq(2), partition, deletePartition)
}
batch.sendRequestsToBrokers(controllerEpoch)

assertEquals(0, batch.sentEvents.size)
assertEquals(1, batch.sentRequests.size)
assertTrue(batch.sentRequests.contains(2))

val sentRequests = batch.sentRequests(2)
assertEquals(1, sentRequests.size)

val sentStopReplicaRequests = batch.collectStopReplicaRequestsFor(2)
assertEquals(1, sentStopReplicaRequests.size)

val stopReplicaRequest = sentStopReplicaRequests.head
assertEquals(partitionStates(partitions, context.topicsQueuedForDeletion, stopReplicaRequest.version),
stopReplicaRequest.partitionStates().asScala)

// No events should be fired
applyStopReplicaResponseCallbacks(Errors.NONE, batch.sentRequests(2).toList)
assertEquals(0, batch.sentEvents.size)
}

@Test
def testMixedDeleteAndNotDeleteStopReplicaRequests(): Unit = {
testMixedDeleteAndNotDeleteStopReplicaRequests(ApiVersion.latestVersion,
Expand Down

0 comments on commit c5ed1ee

Please sign in to comment.