Skip to content

Commit

Permalink
KAFKA-14920: Address timeouts and out of order sequences (apache#14033)
Browse files Browse the repository at this point in the history
When creating a verification state entry, we also store sequence and epoch. On subsequent requests, we will take the latest epoch seen and the earliest sequence seen. That way, if we try to append a sequence after the earliest seen sequence, we can block that and retry. This addresses potential OutOfOrderSequence loops caused by errors during verification (coordinator loading, timeouts, etc).

Reviewers:  David Jacot <david.jacot@gmail.com>,  Artem Livshits <alivshits@confluent.io>
  • Loading branch information
jolshan committed Jul 24, 2023
1 parent 84691b1 commit 38781f9
Show file tree
Hide file tree
Showing 10 changed files with 229 additions and 63 deletions.
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/cluster/Partition.scala
Expand Up @@ -577,9 +577,9 @@ class Partition(val topicPartition: TopicPartition,
}

// Returns a verification guard object if we need to verify. This starts or continues the verification process. Otherwise return null.
def maybeStartTransactionVerification(producerId: Long): Object = {
def maybeStartTransactionVerification(producerId: Long, sequence: Int, epoch: Short): Object = {
leaderLogIfLocal match {
case Some(log) => log.maybeStartTransactionVerification(producerId)
case Some(log) => log.maybeStartTransactionVerification(producerId, sequence, epoch)
case None => throw new NotLeaderOrFollowerException();
}
}
Expand Down
22 changes: 16 additions & 6 deletions core/src/main/scala/kafka/log/UnifiedLog.scala
Expand Up @@ -581,18 +581,27 @@ class UnifiedLog(@volatile var logStartOffset: Long,
* Maybe create and return the verification guard object for the given producer ID if the transaction is not yet ongoing.
* Creation starts the verification process. Otherwise return null.
*/
def maybeStartTransactionVerification(producerId: Long): Object = lock synchronized {
def maybeStartTransactionVerification(producerId: Long, sequence: Int, epoch: Short): Object = lock synchronized {
if (hasOngoingTransaction(producerId))
null
else
getOrMaybeCreateVerificationGuard(producerId, true)
maybeCreateVerificationGuard(producerId, sequence, epoch)
}

/**
* Maybe create the VerificationStateEntry for the given producer ID -- if an entry is present, return its verification guard, otherwise, return null.
* Maybe create the VerificationStateEntry for the given producer ID -- always return the verification guard
*/
def getOrMaybeCreateVerificationGuard(producerId: Long, createIfAbsent: Boolean = false): Object = lock synchronized {
val entry = producerStateManager.verificationStateEntry(producerId, createIfAbsent)
def maybeCreateVerificationGuard(producerId: Long,
sequence: Int,
epoch: Short): Object = lock synchronized {
producerStateManager.maybeCreateVerificationStateEntry(producerId, sequence, epoch).verificationGuard
}

/**
* If an VerificationStateEntry is present for the given producer ID, return its verification guard, otherwise, return null.
*/
def verificationGuard(producerId: Long): Object = lock synchronized {
val entry = producerStateManager.verificationStateEntry(producerId)
if (entry != null) entry.verificationGuard else null
}

Expand Down Expand Up @@ -1042,7 +1051,8 @@ class UnifiedLog(@volatile var logStartOffset: Long,
}

private def batchMissingRequiredVerification(batch: MutableRecordBatch, requestVerificationGuard: Object): Boolean = {
producerStateManager.producerStateManagerConfig().transactionVerificationEnabled() && (requestVerificationGuard != getOrMaybeCreateVerificationGuard(batch.producerId) || requestVerificationGuard == null)
producerStateManager.producerStateManagerConfig().transactionVerificationEnabled() &&
(requestVerificationGuard != verificationGuard(batch.producerId) || requestVerificationGuard == null)
}

/**
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/scala/kafka/server/ReplicaManager.scala
Expand Up @@ -862,7 +862,8 @@ class ReplicaManager(val config: KafkaConfig,

if (transactionalBatches.nonEmpty) {
// We return verification guard if the partition needs to be verified. If no state is present, no need to verify.
val verificationGuard = getPartitionOrException(topicPartition).maybeStartTransactionVerification(records.firstBatch.producerId)
val firstBatch = records.firstBatch
val verificationGuard = getPartitionOrException(topicPartition).maybeStartTransactionVerification(firstBatch.producerId, firstBatch.baseSequence, firstBatch.producerEpoch)
if (verificationGuard != null) {
verificationGuards.put(topicPartition, verificationGuard)
unverifiedEntries.put(topicPartition, records)
Expand Down
8 changes: 4 additions & 4 deletions core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
Expand Up @@ -998,7 +998,7 @@ class PartitionTest extends AbstractPartitionTest {
new SimpleRecord("k3".getBytes, "v3".getBytes)),
baseOffset = 0L,
producerId = 2L)
val verificationGuard = partition.maybeStartTransactionVerification(2L)
val verificationGuard = partition.maybeStartTransactionVerification(2L, 0, 0)
partition.appendRecordsToLeader(records, origin = AppendOrigin.CLIENT, requiredAcks = 0, RequestLocal.withThreadConfinedCaching, verificationGuard)

def fetchOffset(isolationLevel: Option[IsolationLevel], timestamp: Long): TimestampAndOffset = {
Expand Down Expand Up @@ -3390,20 +3390,20 @@ class PartitionTest extends AbstractPartitionTest {
assertThrows(classOf[InvalidRecordException], () => partition.appendRecordsToLeader(transactionRecords(), origin = AppendOrigin.CLIENT, requiredAcks = 1, RequestLocal.withThreadConfinedCaching))

// Before appendRecordsToLeader is called, ReplicaManager will call maybeStartTransactionVerification. We should get a non-null verification object.
val verificationGuard = partition.maybeStartTransactionVerification(producerId)
val verificationGuard = partition.maybeStartTransactionVerification(producerId, 3, 0)
assertNotNull(verificationGuard)

// With the wrong verification guard, append should fail.
assertThrows(classOf[InvalidRecordException], () => partition.appendRecordsToLeader(transactionRecords(),
origin = AppendOrigin.CLIENT, requiredAcks = 1, RequestLocal.withThreadConfinedCaching, Optional.of(new Object)))

// We should return the same verification object when we still need to verify. Append should proceed.
val verificationGuard2 = partition.maybeStartTransactionVerification(producerId)
val verificationGuard2 = partition.maybeStartTransactionVerification(producerId, 3, 0)
assertEquals(verificationGuard, verificationGuard2)
partition.appendRecordsToLeader(transactionRecords(), origin = AppendOrigin.CLIENT, requiredAcks = 1, RequestLocal.withThreadConfinedCaching, verificationGuard)

// We should no longer need a verification object. Future appends without verification guard will also succeed.
val verificationGuard3 = partition.maybeStartTransactionVerification(producerId)
val verificationGuard3 = partition.maybeStartTransactionVerification(producerId, 3, 0)
assertNull(verificationGuard3)
partition.appendRecordsToLeader(transactionRecords(), origin = AppendOrigin.CLIENT, requiredAcks = 1, RequestLocal.withThreadConfinedCaching)
}
Expand Down
57 changes: 48 additions & 9 deletions core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
Expand Up @@ -200,7 +200,8 @@ class ProducerStateManagerTest {
val producerEpoch = 0.toShort
val offset = 992342L
val seq = 0
val producerAppendInfo = new ProducerAppendInfo(partition, producerId, ProducerStateEntry.empty(producerId), AppendOrigin.CLIENT)
val producerAppendInfo = new ProducerAppendInfo(partition, producerId, ProducerStateEntry.empty(producerId), AppendOrigin.CLIENT,
stateManager.maybeCreateVerificationStateEntry(producerId, seq, producerEpoch))

val firstOffsetMetadata = new LogOffsetMetadata(offset, 990000L, 234224)
producerAppendInfo.appendDataBatch(producerEpoch, seq, seq, time.milliseconds(),
Expand Down Expand Up @@ -388,7 +389,8 @@ class ProducerStateManagerTest {
partition,
producerId,
ProducerStateEntry.empty(producerId),
AppendOrigin.CLIENT
AppendOrigin.CLIENT,
stateManager.maybeCreateVerificationStateEntry(producerId, 0, producerEpoch)
)
val firstOffsetMetadata = new LogOffsetMetadata(startOffset, segmentBaseOffset, 50 * relativeOffset)
producerAppendInfo.appendDataBatch(producerEpoch, 0, 0, time.milliseconds(),
Expand Down Expand Up @@ -1089,37 +1091,74 @@ class ProducerStateManagerTest {

@Test
def testEntryForVerification(): Unit = {
val originalEntry = stateManager.verificationStateEntry(producerId, true)
val originalEntry = stateManager.maybeCreateVerificationStateEntry(producerId, 0, 0)
val originalEntryVerificationGuard = originalEntry.verificationGuard()

def verifyEntry(producerId: Long, newEntry: VerificationStateEntry): Unit = {
val entry = stateManager.verificationStateEntry(producerId, false)
val entry = stateManager.verificationStateEntry(producerId)
assertEquals(originalEntryVerificationGuard, entry.verificationGuard)
assertEquals(entry.verificationGuard, newEntry.verificationGuard)
}

// If we already have an entry, reuse it.
val updatedEntry = stateManager.verificationStateEntry(producerId, true)
val updatedEntry = stateManager.maybeCreateVerificationStateEntry(producerId, 0, 0)
verifyEntry(producerId, updatedEntry)

// Add the transactional data and clear the entry.
append(stateManager, producerId, 0, 0, offset = 0, isTransactional = true)
stateManager.clearVerificationStateEntry(producerId)
assertNull(stateManager.verificationStateEntry(producerId, false))
assertNull(stateManager.verificationStateEntry(producerId))
}

@Test
def testSequenceAndEpochInVerificationEntry(): Unit = {
val originalEntry = stateManager.maybeCreateVerificationStateEntry(producerId, 1, 0)
val originalEntryVerificationGuard = originalEntry.verificationGuard()

def verifyEntry(producerId: Long, newEntry: VerificationStateEntry, expectedSequence: Int, expectedEpoch: Short): Unit = {
val entry = stateManager.verificationStateEntry(producerId)
assertEquals(originalEntryVerificationGuard, entry.verificationGuard)
assertEquals(entry.verificationGuard, newEntry.verificationGuard)
assertEquals(expectedSequence, entry.lowestSequence)
assertEquals(expectedEpoch, entry.epoch)
}
verifyEntry(producerId, originalEntry, 1, 0)

// If we see a lower sequence, update to the lower one.
val updatedEntry = stateManager.maybeCreateVerificationStateEntry(producerId, 0, 0)
verifyEntry(producerId, updatedEntry, 0, 0)

// If we see a new epoch that is higher, update the sequence.
val updatedEntryNewEpoch = stateManager.maybeCreateVerificationStateEntry(producerId, 2, 1)
verifyEntry(producerId, updatedEntryNewEpoch, 2, 1)

// Ignore a lower epoch.
val updatedEntryOldEpoch = stateManager.maybeCreateVerificationStateEntry(producerId, 0, 0)
verifyEntry(producerId, updatedEntryOldEpoch, 2, 1)
}

@Test
def testThrowOutOfOrderSequenceWithVerificationSequenceCheck(): Unit = {
val originalEntry = stateManager.maybeCreateVerificationStateEntry(producerId, 0, 0)

// Trying to append with a higher sequence should fail
assertThrows(classOf[OutOfOrderSequenceException], () => append(stateManager, producerId, 0, 4, offset = 0, isTransactional = true))

assertEquals(originalEntry, stateManager.verificationStateEntry(producerId))
}

@Test
def testVerificationStateEntryExpiration(): Unit = {
val originalEntry = stateManager.verificationStateEntry(producerId, true)
val originalEntry = stateManager.maybeCreateVerificationStateEntry(producerId, 0, 0)

// Before timeout we do not remove. Note: Accessing the verification entry does not update the time.
time.sleep(producerStateManagerConfig.producerIdExpirationMs / 2)
stateManager.removeExpiredProducers(time.milliseconds())
assertEquals(originalEntry, stateManager.verificationStateEntry(producerId, false))
assertEquals(originalEntry, stateManager.verificationStateEntry(producerId))

time.sleep((producerStateManagerConfig.producerIdExpirationMs / 2) + 1)
stateManager.removeExpiredProducers(time.milliseconds())
assertNull(stateManager.verificationStateEntry(producerId, false))
assertNull(stateManager.verificationStateEntry(producerId))
}

private def testLoadFromCorruptSnapshot(makeFileCorrupt: FileChannel => Unit): Unit = {
Expand Down
37 changes: 19 additions & 18 deletions core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
Expand Up @@ -3677,7 +3677,7 @@ class UnifiedLogTest {
val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
val log = createLog(logDir, logConfig, producerStateManagerConfig = producerStateManagerConfig)
assertFalse(log.hasOngoingTransaction(producerId))
assertNull(log.getOrMaybeCreateVerificationGuard(producerId))
assertNull(log.verificationGuard(producerId))

val idempotentRecords = MemoryRecords.withIdempotentRecords(
CompressionType.NONE,
Expand All @@ -3688,15 +3688,6 @@ class UnifiedLogTest {
new SimpleRecord("2".getBytes)
)

val verificationGuard = log.maybeStartTransactionVerification(producerId)
assertNotNull(verificationGuard)

log.appendAsLeader(idempotentRecords, leaderEpoch = 0)
assertFalse(log.hasOngoingTransaction(producerId))

// Since we wrote idempotent records, we keep verification guard.
assertEquals(verificationGuard, log.getOrMaybeCreateVerificationGuard(producerId))

val transactionalRecords = MemoryRecords.withTransactionalRecords(
CompressionType.NONE,
producerId,
Expand All @@ -3706,13 +3697,23 @@ class UnifiedLogTest {
new SimpleRecord("2".getBytes)
)

val verificationGuard = log.maybeStartTransactionVerification(producerId, sequence + 2, producerEpoch)
assertNotNull(verificationGuard)

log.appendAsLeader(idempotentRecords, leaderEpoch = 0)
assertFalse(log.hasOngoingTransaction(producerId))

// Since we wrote idempotent records, we keep verification guard.
assertEquals(verificationGuard, log.verificationGuard(producerId))

// Now write the transactional records
log.appendAsLeader(transactionalRecords, leaderEpoch = 0, verificationGuard = verificationGuard)
assertTrue(log.hasOngoingTransaction(producerId))
// Verification guard should be cleared now.
assertNull(log.getOrMaybeCreateVerificationGuard(producerId))
assertNull(log.verificationGuard(producerId))

// A subsequent maybeStartTransactionVerification will be empty since we are already verified.
assertNull(log.maybeStartTransactionVerification(producerId))
assertNull(log.maybeStartTransactionVerification(producerId, sequence + 2, producerEpoch))

val endTransactionMarkerRecord = MemoryRecords.withEndTransactionMarker(
producerId,
Expand All @@ -3722,10 +3723,10 @@ class UnifiedLogTest {

log.appendAsLeader(endTransactionMarkerRecord, origin = AppendOrigin.COORDINATOR, leaderEpoch = 0)
assertFalse(log.hasOngoingTransaction(producerId))
assertNull(log.getOrMaybeCreateVerificationGuard(producerId))
assertNull(log.verificationGuard(producerId))

// A new maybeStartTransactionVerification will not be empty, as we need to verify the next transaction.
val newVerificationGuard = log.maybeStartTransactionVerification(producerId)
val newVerificationGuard = log.maybeStartTransactionVerification(producerId, sequence + 3, producerEpoch)
assertNotNull(newVerificationGuard)
assertNotEquals(verificationGuard, newVerificationGuard)
}
Expand All @@ -3739,7 +3740,7 @@ class UnifiedLogTest {
val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
val log = createLog(logDir, logConfig, producerStateManagerConfig = producerStateManagerConfig)

val verificationGuard = log.maybeStartTransactionVerification(producerId)
val verificationGuard = log.maybeStartTransactionVerification(producerId, 0, producerEpoch)
assertNotNull(verificationGuard)

val endTransactionMarkerRecord = MemoryRecords.withEndTransactionMarker(
Expand All @@ -3750,7 +3751,7 @@ class UnifiedLogTest {

log.appendAsLeader(endTransactionMarkerRecord, origin = AppendOrigin.COORDINATOR, leaderEpoch = 0)
assertFalse(log.hasOngoingTransaction(producerId))
assertNull(log.getOrMaybeCreateVerificationGuard(producerId))
assertNull(log.verificationGuard(producerId))
}

@Test
Expand All @@ -3763,7 +3764,7 @@ class UnifiedLogTest {
val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
val log = createLog(logDir, logConfig, producerStateManagerConfig = producerStateManagerConfig)
assertFalse(log.hasOngoingTransaction(producerId))
assertNull(log.getOrMaybeCreateVerificationGuard(producerId))
assertNull(log.verificationGuard(producerId))

val transactionalRecords = MemoryRecords.withTransactionalRecords(
CompressionType.NONE,
Expand All @@ -3774,7 +3775,7 @@ class UnifiedLogTest {
new SimpleRecord("2".getBytes)
)

val verificationGuard = log.maybeStartTransactionVerification(producerId)
val verificationGuard = log.maybeStartTransactionVerification(producerId, sequence, producerEpoch)
// Append should not throw error.
log.appendAsLeader(transactionalRecords, leaderEpoch = 0, verificationGuard = verificationGuard)
}
Expand Down

0 comments on commit 38781f9

Please sign in to comment.