CONFLUENT: Fix exception on trying to retrieve tierable segments after leader failover (apache#270)

After leader failover, segments that have already been tiered may no longer be considered tierable. The criteria for whether a segment is tierable depend on several factors, among them whether the segment has already been rolled and whether it has been flushed to disk (i.e. the recovery point has advanced). Because each replica performs these operations independently, the previous leader may have tiered a segment that is not yet eligible to be tiered on the new leader. This is not a problem in itself; we only need to ensure that the archiver sees no tierable segments after the failover until the tiering criteria are met again.

This patch adds a check for this state and avoids retrieving the list of candidate segments, which would otherwise throw an exception because the from offset would be greater than the to offset when calling MergedLog#localLogSegments.
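As a rough, self-contained sketch of the failure mode and the guard (Segment, localLogSegments and tierableCandidates below are simplified stand-ins, not the real MergedLog API shown in the diff further down):

object TierableGuardSketch {
  final case class Segment(baseOffset: Long)

  // Simplified stand-in for MergedLog#localLogSegments: requires a valid range.
  def localLogSegments(segments: Seq[Segment], from: Long, to: Long): Seq[Segment] = {
    require(from <= to, s"Invalid log segment range: from=$from > to=$to")
    segments.filter(s => s.baseOffset >= from && s.baseOffset < to)
  }

  // The guard added by this patch: if the new leader's upper bound still trails the
  // offsets the previous leader already tiered, report no tierable segments instead
  // of asking for a segment range whose "from" exceeds its "to".
  def tierableCandidates(segments: Seq[Segment], firstUntieredOffset: Long, upperBoundOffset: Long): Seq[Segment] =
    if (firstUntieredOffset > upperBoundOffset) Seq.empty
    else localLogSegments(segments, firstUntieredOffset, upperBoundOffset)

  def main(args: Array[String]): Unit = {
    val segments = Seq(Segment(0L), Segment(100L), Segment(200L))
    // Previous leader tiered up to offset 200; the new leader has only flushed up to offset 150.
    println(tierableCandidates(segments, firstUntieredOffset = 200L, upperBoundOffset = 150L)) // List()
  }
}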
dhruvilshah3 committed Apr 10, 2019
1 parent 1a8c75a commit b192b40
Showing 2 changed files with 65 additions and 2 deletions.
7 changes: 6 additions & 1 deletion core/src/main/scala/kafka/log/MergedLog.scala
@@ -287,8 +287,13 @@ class MergedLog(private[log] val localLog: Log,
// 3. is the current active segment: we only tier immutable segments (that have been rolled already)
// 4. the segment end offset is less than the recovery point. This ensures we only upload segments that have been fsync'd.
val upperBoundOffset = Utils.min(firstUnstableOffset.map(_.messageOffset).getOrElse(logEndOffset), highWatermark, recoveryPoint)
val candidateSegments = localLogSegments(firstUntieredOffset, upperBoundOffset).toArray

// After a leader failover, it is possible that the upperBoundOffset has not moved to the point where the previous
// leader tiered segments. No segments are tierable until the upperBoundOffset catches up with the tiered segments.
if (firstUntieredOffset > upperBoundOffset)
return Iterable.empty

val candidateSegments = localLogSegments(firstUntieredOffset, upperBoundOffset).toArray
candidateSegments.lastOption match {
case Some(lastSegment) =>
nextLocalLogSegment(lastSegment) match {
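For intuition, a rough worked example of the upper bound computed in the hunk above (the numbers are illustrative assumptions, not values from this patch), showing why a newly elected leader can report no tierable segments until its recovery point and high watermark catch up with what the previous leader tiered:

object UpperBoundExample {
  def main(args: Array[String]): Unit = {
    val logEndOffset = 1000L
    val firstUnstableOffset: Option[Long] = None // no open transactions, so fall back to the log end offset
    val highWatermark = 800L
    val recoveryPoint = 500L                     // the new leader has not flushed past offset 500 yet

    // Mirrors Utils.min(firstUnstableOffset.map(_.messageOffset).getOrElse(logEndOffset), highWatermark, recoveryPoint)
    val upperBoundOffset = Seq(firstUnstableOffset.getOrElse(logEndOffset), highWatermark, recoveryPoint).min

    // Offset up to which the previous leader had already tiered segments.
    val firstUntieredOffset = 700L

    // 700 > 500, so the new check returns no candidates instead of calling localLogSegments(700, 500).
    println(s"upperBoundOffset=$upperBoundOffset, tierable=${firstUntieredOffset <= upperBoundOffset}")
  }
}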
60 changes: 59 additions & 1 deletion core/src/test/scala/unit/kafka/log/MergedLogTest.scala
@@ -272,7 +272,6 @@ class MergedLogTest {
assertEquals(List(EpochEntry(0, 100)), log.leaderEpochCache.get.epochEntries)
}


@Test
def testSizeOfLogWithOverlap(): Unit = {
val logConfig = LogTest.createLogConfig(segmentBytes = Int.MaxValue, tierEnable = true, tierLocalHotsetBytes = 1)
@@ -286,6 +285,65 @@ class MergedLogTest {
assertEquals(expectedLogSize, log.size)
}

@Test
def testTierableSegments(): Unit = {
val noopScheduler = new Scheduler { // noopScheduler allows us to roll segments without scheduling a background flush
override def startup(): Unit = ()
override def shutdown(): Unit = ()
override def isStarted: Boolean = true
override def schedule(name: String, fun: () => Unit, delay: Long, period: Long, unit: TimeUnit): Unit = ()
}

val logConfig = LogTest.createLogConfig(segmentBytes = Int.MaxValue, tierEnable = true, tierLocalHotsetBytes = 1)
val log = createMergedLog(logConfig, scheduler = noopScheduler)
val messagesToWrite = 10
for (_ <- 0 until messagesToWrite) {
val segmentStr = "foo"
val messageStr = "bar"
def createRecords = TestUtils.singletonRecords(("test" + segmentStr + messageStr).getBytes)
log.appendAsLeader(createRecords, 0)
log.roll()
}

val tierPartitionState = tierMetadataManager.tierPartitionState(log.topicPartition).get
val epoch = 0
val tieredSegments = log.localLogSegments.take(2)

// append an init message
tierPartitionState.onCatchUpComplete()
tierPartitionState.append(new TierTopicInitLeader(log.topicPartition,
epoch, java.util.UUID.randomUUID(), 0))

// append metadata for tiered segments
tieredSegments.foreach { segment =>
val tierObjectMetadata = new TierObjectMetadata(log.topicPartition,
epoch,
segment.baseOffset,
(segment.readNextOffset - segment.baseOffset - 1).toInt,
segment.readNextOffset,
segment.largestTimestamp,
segment.size,
true,
false,
0.toByte)
val appendResult = tierPartitionState.append(tierObjectMetadata)
assertEquals(AppendResult.ACCEPTED, appendResult)
}

// no segments should be tierable yet, as recovery point and highwatermark have not moved
assertEquals(0, log.tierableLogSegments.size)

// no segments are tierable after recovery point and highwatermark move to the end of first tiered segment
log.onHighWatermarkIncremented(tieredSegments.head.readNextOffset - 1)
log.flush(1)
assertEquals(0, log.tierableLogSegments.size)

// all non tiered segments become tierable after recovery point and highwatermark move to the end of the log
log.onHighWatermarkIncremented(log.logEndOffset)
log.flush(log.logEndOffset)
assertEquals(log.localLogSegments.size - tieredSegments.size - 1, log.tierableLogSegments.size)
}

private def logRanges(log: MergedLog): LogRanges = {
val tierPartitionState = tierMetadataManager.tierPartitionState(log.topicPartition).get
