Skip to content

Commit

Permalink
MINOR Refactored the existing CheckpointFile in core module, moved to…
Browse files Browse the repository at this point in the history
… server-common module and introduced it as SnapshotFile. (apache#11060)

MINOR Refactored the existing CheckpointFile in core module, moved to server-common module.

Refactored CheckpointFile to server-common module as a Java class and it is reused by LeaderCheckpointFile, OffsetCheckpointFile.
This will be used by CommittedOffsetsFile which checkpoints remote log metadata partitions with respective offsets in the default RemoteLogMetadataManager implementation.
Existing tests are available for LeaderCheckpointFile, OffsetCheckpointFile.

Reviewers: Jun Rao <junrao@gmail.com>
  • Loading branch information
satishd authored and Ralph Debusmann committed Dec 22, 2021
1 parent 849c8ca commit a1f2cb9
Show file tree
Hide file tree
Showing 7 changed files with 274 additions and 163 deletions.
142 changes: 0 additions & 142 deletions core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server.checkpoints

import kafka.server.LogDirFailureChannel
import org.apache.kafka.common.errors.KafkaStorageException
import org.apache.kafka.server.common.CheckpointFile
import CheckpointFile.EntryFormatter

import java.io._
import scala.collection.Seq
import scala.jdk.CollectionConverters._

class CheckpointFileWithFailureHandler[T](val file: File,
version: Int,
formatter: EntryFormatter[T],
logDirFailureChannel: LogDirFailureChannel,
logDir: String) {
private val checkpointFile = new CheckpointFile[T](file, version, formatter)

def write(entries: Iterable[T]): Unit = {
try {
checkpointFile.write(entries.toSeq.asJava)
} catch {
case e: IOException =>
val msg = s"Error while writing to checkpoint file ${file.getAbsolutePath}"
logDirFailureChannel.maybeAddOfflineLogDir(logDir, msg, e)
throw new KafkaStorageException(msg, e)
}
}

def read(): Seq[T] = {
try {
checkpointFile.read().asScala
} catch {
case e: IOException =>
val msg = s"Error while reading checkpoint file ${file.getAbsolutePath}"
logDirFailureChannel.maybeAddOfflineLogDir(logDir, msg, e)
throw new KafkaStorageException(msg, e)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@
*/
package kafka.server.checkpoints

import java.io._
import java.util.regex.Pattern

import kafka.server.LogDirFailureChannel
import kafka.server.epoch.EpochEntry
import org.apache.kafka.server.common.CheckpointFile.EntryFormatter

import java.io._
import java.util.Optional
import java.util.regex.Pattern
import scala.collection._

trait LeaderEpochCheckpoint {
Expand All @@ -36,15 +37,15 @@ object LeaderEpochCheckpointFile {

def newFile(dir: File): File = new File(dir, LeaderEpochCheckpointFilename)

object Formatter extends CheckpointFileFormatter[EpochEntry] {
object Formatter extends EntryFormatter[EpochEntry] {

override def toLine(entry: EpochEntry): String = s"${entry.epoch} ${entry.startOffset}"
override def toString(entry: EpochEntry): String = s"${entry.epoch} ${entry.startOffset}"

override def fromLine(line: String): Option[EpochEntry] = {
override def fromString(line: String): Optional[EpochEntry] = {
WhiteSpacesPattern.split(line) match {
case Array(epoch, offset) =>
Some(EpochEntry(epoch.toInt, offset.toLong))
case _ => None
Optional.of(EpochEntry(epoch.toInt, offset.toLong))
case _ => Optional.empty()
}
}

Expand All @@ -65,7 +66,7 @@ object LeaderEpochCheckpointFile {
class LeaderEpochCheckpointFile(val file: File, logDirFailureChannel: LogDirFailureChannel = null) extends LeaderEpochCheckpoint {
import LeaderEpochCheckpointFile._

val checkpoint = new CheckpointFile[EpochEntry](file, CurrentVersion, Formatter, logDirFailureChannel, file.getParentFile.getParent)
val checkpoint = new CheckpointFileWithFailureHandler[EpochEntry](file, CurrentVersion, Formatter, logDirFailureChannel, file.getParentFile.getParent)

def write(epochs: Iterable[EpochEntry]): Unit = checkpoint.write(epochs)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,29 +16,30 @@
*/
package kafka.server.checkpoints

import java.io._
import java.util.regex.Pattern

import kafka.server.LogDirFailureChannel
import kafka.server.epoch.EpochEntry
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.server.common.CheckpointFile.EntryFormatter

import java.io._
import java.util.Optional
import java.util.regex.Pattern
import scala.collection._

object OffsetCheckpointFile {
private val WhiteSpacesPattern = Pattern.compile("\\s+")
private[checkpoints] val CurrentVersion = 0

object Formatter extends CheckpointFileFormatter[(TopicPartition, Long)] {
override def toLine(entry: (TopicPartition, Long)): String = {
object Formatter extends EntryFormatter[(TopicPartition, Long)] {
override def toString(entry: (TopicPartition, Long)): String = {
s"${entry._1.topic} ${entry._1.partition} ${entry._2}"
}

override def fromLine(line: String): Option[(TopicPartition, Long)] = {
override def fromString(line: String): Optional[(TopicPartition, Long)] = {
WhiteSpacesPattern.split(line) match {
case Array(topic, partition, offset) =>
Some(new TopicPartition(topic, partition.toInt), offset.toLong)
case _ => None
Optional.of(new TopicPartition(topic, partition.toInt), offset.toLong)
case _ => Optional.empty()
}
}
}
Expand All @@ -61,7 +62,7 @@ trait OffsetCheckpoint {
* -----checkpoint file end----------
*/
class OffsetCheckpointFile(val file: File, logDirFailureChannel: LogDirFailureChannel = null) {
val checkpoint = new CheckpointFile[(TopicPartition, Long)](file, OffsetCheckpointFile.CurrentVersion,
val checkpoint = new CheckpointFileWithFailureHandler[(TopicPartition, Long)](file, OffsetCheckpointFile.CurrentVersion,
OffsetCheckpointFile.Formatter, logDirFailureChannel, file.getParent)

def write(offsets: Map[TopicPartition, Long]): Unit = checkpoint.write(offsets)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import kafka.utils.Logging
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test

class LeaderEpochCheckpointFileTest extends Logging {
class LeaderEpochCheckpointFileWithFailureHandlerTest extends Logging {

@Test
def shouldPersistAndOverwriteAndReloadFile(): Unit ={
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import org.mockito.Mockito

import scala.collection.Map

class OffsetCheckpointFileTest extends Logging {
class OffsetCheckpointFileWithFailureHandlerTest extends Logging {

@Test
def shouldPersistAndOverwriteAndReloadFile(): Unit = {
Expand Down Expand Up @@ -93,7 +93,7 @@ class OffsetCheckpointFileTest extends Logging {
def shouldThrowIfVersionIsNotRecognised(): Unit = {
val file = TestUtils.tempFile()
val logDirFailureChannel = new LogDirFailureChannel(10)
val checkpointFile = new CheckpointFile(file, OffsetCheckpointFile.CurrentVersion + 1,
val checkpointFile = new CheckpointFileWithFailureHandler(file, OffsetCheckpointFile.CurrentVersion + 1,
OffsetCheckpointFile.Formatter, logDirFailureChannel, file.getParent)
checkpointFile.write(Seq(new TopicPartition("foo", 5) -> 10L))
assertThrows(classOf[KafkaStorageException], () => new OffsetCheckpointFile(checkpointFile.file, logDirFailureChannel).read())
Expand Down
Loading

0 comments on commit a1f2cb9

Please sign in to comment.