diff --git a/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogManager.scala b/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogManager.scala
index c70ecb0da4e54..6ba0e9abee8f9 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogManager.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogManager.scala
@@ -13,6 +13,24 @@ import org.apache.spark.streaming.storage.WriteAheadLogManager._
 import org.apache.spark.streaming.util.{Clock, SystemClock}
 import org.apache.spark.util.Utils
 
+/**
+ * This class manages write ahead log files.
+ * - Writes records (bytebuffers) to periodically rotating log files.
+ * - Recovers the log files and then reads the recovered records upon failures.
+ * - Cleans up old log files.
+ *
+ * Uses [[org.apache.spark.streaming.storage.WriteAheadLogWriter]] to write
+ * and [[org.apache.spark.streaming.storage.WriteAheadLogReader]] to read.
+ *
+ * @param logDirectory Directory where rotating log files will be created.
+ * @param hadoopConf Hadoop configuration for reading/writing log files.
+ * @param rollingIntervalSecs The interval in seconds at which logs will be rolled over.
+ *                            Default is one minute.
+ * @param maxFailures Max number of failures tolerated for every attempt to write to the log.
+ *                    Default is three.
+ * @param callerName Optional name of the class that is using this manager.
+ * @param clock Optional clock that is used to check for the rotation interval.
+ */
 private[streaming] class WriteAheadLogManager(
     logDirectory: String,
     hadoopConf: Configuration,
@@ -37,6 +55,7 @@ private[streaming] class WriteAheadLogManager(
 
   initializeOrRecover()
 
+  /** Write a byte buffer to the log file */
   def writeToLog(byteBuffer: ByteBuffer): FileSegment = synchronized {
     var fileSegment: FileSegment = null
     var failures = 0
@@ -49,17 +68,27 @@ private[streaming] class WriteAheadLogManager(
       } catch {
         case ex: Exception =>
           lastException = ex
-          logWarning("Failed to ...")
+          logWarning("Failed to write to write ahead log")
           resetWriter()
           failures += 1
       }
     }
     if (fileSegment == null) {
+      logError(s"Failed to write to write ahead log after $failures failures")
       throw lastException
     }
     fileSegment
   }
 
+  /**
+   * Read all the existing logs from the log directory.
+   *
+   * Note that this is typically called when the caller is initializing and wants
+   * to recover past state from the write ahead logs (that is, before making any writes).
+   * If this is called after writes have been made using this manager, then it may not return
+   * the latest records. This does not deal with currently active log files, and
+   * hence the implementation is kept simple.
+   */
   def readFromLog(): Iterator[ByteBuffer] = synchronized {
     val logFilesToRead = pastLogs.map{ _.path} ++ Option(currentLogPath)
     logInfo("Reading from the logs: " + logFilesToRead.mkString("\n"))
@@ -73,7 +102,7 @@ private[streaming] class WriteAheadLogManager(
    * Delete the log files that are older than the threshold time.
    *
    * Its important to note that the threshold time is based on the time stamps used in the log
-   * files, and is therefore based on the local system time. So if there is coordination necessary
+   * files, which is usually based on the local system time. So if there is coordination necessary
    * between the node calculating the threshTime (say, driver node), and the local system time
    * (say, worker node), the caller has to take account of possible time skew.
    */
@@ -92,7 +121,7 @@ private[streaming] class WriteAheadLogManager(
         logDebug(s"Cleared log file $logInfo")
       } catch {
         case ex: Exception =>
-          logWarning(s"Error clearing log file $logInfo", ex)
+          logWarning(s"Error clearing write ahead log file $logInfo", ex)
       }
     }
     logInfo(s"Cleared log files in $logDirectory older than $threshTime")
@@ -102,14 +131,16 @@ private[streaming] class WriteAheadLogManager(
     }
   }
 
+  /** Stop the manager, close any open log writer */
   def stop(): Unit = synchronized {
     if (currentLogWriter != null) {
       currentLogWriter.close()
     }
     executionContext.shutdown()
-    logInfo("Stopped log manager")
+    logInfo("Stopped write ahead log manager")
   }
 
+  /** Get the current log writer while taking care of rotation */
   private def getLogWriter(currentTime: Long): WriteAheadLogWriter = synchronized {
     if (currentLogWriter == null || currentTime > currentLogWriterStopTime) {
       resetWriter()
@@ -126,6 +157,7 @@ private[streaming] class WriteAheadLogManager(
     currentLogWriter
   }
 
+  /** Initialize the log directory or recover existing logs inside the directory */
   private def initializeOrRecover(): Unit = synchronized {
     val logDirectoryPath = new Path(logDirectory)
     val fileSystem = logDirectoryPath.getFileSystem(hadoopConf)
@@ -134,12 +166,12 @@ private[streaming] class WriteAheadLogManager(
       val logFileInfo = logFilesTologInfo(fileSystem.listStatus(logDirectoryPath).map { _.getPath })
       pastLogs.clear()
       pastLogs ++= logFileInfo
-      logInfo(s"Recovered ${logFileInfo.size} log files from $logDirectory")
+      logInfo(s"Recovered ${logFileInfo.size} write ahead log files from $logDirectory")
       logDebug(s"Recovered files are:\n${logFileInfo.map(_.path).mkString("\n")}")
     } else {
       fileSystem.mkdirs(logDirectoryPath,
         FsPermission.createImmutable(Integer.parseInt("770", 8).toShort))
-      logInfo(s"Created ${logDirectory} for log files")
+      logInfo(s"Created ${logDirectory} for write ahead log files")
     }
   }
 
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogRandomReader.scala b/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogRandomReader.scala
index 3df024834f7a4..912c4308aa8e5 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogRandomReader.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogRandomReader.scala
@@ -21,6 +21,11 @@ import java.nio.ByteBuffer
 
 import org.apache.hadoop.conf.Configuration
 
+/**
+ * A random access reader for reading write ahead log files written using
+ * [[org.apache.spark.streaming.storage.WriteAheadLogWriter]]. Given the file segment info,
+ * this reads the record (bytebuffer) from the log file.
+ */
 private[streaming] class WriteAheadLogRandomReader(path: String, conf: Configuration)
   extends Closeable {
 
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogReader.scala b/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogReader.scala
index 5e0dc1d49a89a..28b5d352cee01 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogReader.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogReader.scala
@@ -22,6 +22,12 @@ import java.nio.ByteBuffer
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.spark.Logging
 
+/**
+ * A reader for reading write ahead log files written using
+ * [[org.apache.spark.streaming.storage.WriteAheadLogWriter]]. This reads
+ * the records (bytebuffers) in the log file sequentially and returns them as an
+ * iterator of bytebuffers.
+ */
 private[streaming] class WriteAheadLogReader(path: String, conf: Configuration)
   extends Iterator[ByteBuffer] with Closeable with Logging {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogWriter.scala b/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogWriter.scala
index 68a1172d7d282..d4e417cc21faa 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogWriter.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogWriter.scala
@@ -26,17 +26,21 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem}
 import org.apache.spark.streaming.storage.FileSegment
 
-private[streaming] class WriteAheadLogWriter(path: String, conf: Configuration) extends Closeable {
+/**
+ * A writer for writing byte-buffers to a write ahead log file.
+ */
+private[streaming] class WriteAheadLogWriter(path: String, hadoopConf: Configuration)
+  extends Closeable {
   private val underlyingStream: Either[DataOutputStream, FSDataOutputStream] = {
     val uri = new URI(path)
-    val defaultFs = FileSystem.getDefaultUri(conf).getScheme
+    val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme
     val isDefaultLocal = defaultFs == null || defaultFs == "file"
 
     if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == "file") {
       assert(!new File(uri.getPath).exists)
       Left(new DataOutputStream(new BufferedOutputStream(new FileOutputStream(uri.getPath))))
     } else {
-      Right(HdfsUtils.getOutputStream(path, conf))
+      Right(HdfsUtils.getOutputStream(path, hadoopConf))
     }
   }
 
@@ -49,9 +53,7 @@ private[streaming] class WriteAheadLogWriter(path: String, conf: Configuration)
 
   private var closed = false
 
-  // Data is always written as:
-  //  - Length - Long
-  //  - Data - of length = Length
+  /** Write the bytebuffer to the log file */
   def write(data: ByteBuffer): FileSegment = synchronized {
     assertOpen()
     data.rewind() // Rewind to ensure all data in the buffer is retrieved
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/storage/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/storage/WriteAheadLogSuite.scala
index 88b2b5095ceb6..726393b3dbc86 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/storage/WriteAheadLogSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/storage/WriteAheadLogSuite.scala
@@ -16,22 +16,28 @@
  */
 package org.apache.spark.streaming.storage
 
-import java.io.{DataInputStream, FileInputStream, File, RandomAccessFile}
+import java.io.{DataInputStream, File, FileInputStream, RandomAccessFile}
 import java.nio.ByteBuffer
 
-import scala.util.Random
-
 import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.duration._
 import scala.language.implicitConversions
+import scala.util.Random
 
-import com.google.common.io.Files
-import org.apache.hadoop.conf.Configuration
 import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.scalatest.concurrent.Eventually._
+
+import com.google.common.io.Files
 import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
+
 import org.apache.spark.streaming.util.ManualClock
 import org.apache.spark.util.Utils
 import WriteAheadLogSuite._
 
+/**
+ * This test suite tests all classes related to write ahead logs.
+ */
 class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {
 
   val hadoopConf = new Configuration()
@@ -163,8 +169,25 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {
     assert(dataToWrite.toList === readData.toList)
   }
 
+  test("WriteAheadLogManager - cleanup old logs") {
+    // Write data with the manager, then clean up old logs and verify they get deleted
+    val dataToWrite = generateRandomData(100)
+    val fakeClock = new ManualClock
+    val manager = new WriteAheadLogManager(tempDirectory.toString, hadoopConf,
+      rollingIntervalSecs = 1, callerName = "WriteAheadLogSuite", clock = fakeClock)
+    dataToWrite.foreach { item =>
+      fakeClock.addToTime(500) // half second for each
+      manager.writeToLog(item)
+    }
+    val logFiles = getLogFilesInDirectory(tempDirectory)
+    assert(logFiles.size > 1)
+    manager.cleanupOldLogs(fakeClock.currentTime() / 2)
+    eventually(timeout(1 second), interval(10 milliseconds)) {
+      assert(getLogFilesInDirectory(tempDirectory).size < logFiles.size)
+    }
+  }
+
   // TODO (Hari, TD): Test different failure conditions of writers and readers.
-  //  - Failure in the middle of write
   //  - Failure while reading incomplete/corrupt file
 
 }
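As a usage illustration (not part of the patch), here is a minimal sketch of how the WriteAheadLogManager API introduced above fits together: write records, recover them with readFromLog() on restart, and prune old files with cleanupOldLogs(). The log directory path and record contents are hypothetical, the optional constructor parameters are left at their defaults, and since the class is private[streaming], code like this would have to live inside the org.apache.spark.streaming package.

import java.nio.ByteBuffer

import org.apache.hadoop.conf.Configuration

import org.apache.spark.streaming.storage.WriteAheadLogManager

object WriteAheadLogManagerExample {
  def main(args: Array[String]): Unit = {
    val hadoopConf = new Configuration()
    val logDirectory = "/tmp/wal-example" // hypothetical directory for the rotating log files

    // Write a few records; each write returns a FileSegment describing where the record landed.
    val manager = new WriteAheadLogManager(logDirectory, hadoopConf)
    (1 to 3).foreach { i =>
      val segment = manager.writeToLog(ByteBuffer.wrap(s"record-$i".getBytes("UTF-8")))
      println(s"Wrote record $i to segment $segment")
    }
    manager.stop()

    // On restart, a fresh manager recovers the existing log files; per the scaladoc above,
    // readFromLog() should be called before any new writes are made through this instance.
    val recovered = new WriteAheadLogManager(logDirectory, hadoopConf)
    recovered.readFromLog().foreach { buffer =>
      val bytes = new Array[Byte](buffer.remaining())
      buffer.get(bytes)
      println("Recovered: " + new String(bytes, "UTF-8"))
    }

    // Delete log files whose timestamps are older than the given threshold time.
    recovered.cleanupOldLogs(System.currentTimeMillis() - 60 * 1000)
    recovered.stop()
  }
}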