Skip to content

Commit

Permalink
Added documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
harishreedharan authored and tdas committed Oct 21, 2014
1 parent 172358d commit 5182ffb
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,24 @@ import org.apache.spark.streaming.storage.WriteAheadLogManager._
import org.apache.spark.streaming.util.{Clock, SystemClock}
import org.apache.spark.util.Utils

/**
* This class manages write ahead log files.
* - Writes records (bytebuffers) to periodically rotating log files.
* - Recovers the log files and the reads the recovered records upon failures.
* - Cleans up old log files.
*
* Uses [[org.apache.spark.streaming.storage.WriteAheadLogWriter]] to write
* and [[org.apache.spark.streaming.storage.WriteAheadLogReader]] to read.
*
*@param logDirectory Directory when rotating log files will be created.
* @param hadoopConf Hadoop configuration for reading/writing log files.
* @param rollingIntervalSecs The interval in seconds with which logs will be rolled over.
* Default is one minute.
* @param maxFailures Max number of failures that is tolerated for every attempt to write to log.
* Default is three.
* @param callerName Optional name of the class who is using this manager.
* @param clock Optional clock that is used to check for rotation interval.
*/
private[streaming] class WriteAheadLogManager(
logDirectory: String,
hadoopConf: Configuration,
Expand All @@ -37,6 +55,7 @@ private[streaming] class WriteAheadLogManager(

initializeOrRecover()

/** Write a byte buffer to the log file */
def writeToLog(byteBuffer: ByteBuffer): FileSegment = synchronized {
var fileSegment: FileSegment = null
var failures = 0
Expand All @@ -49,17 +68,27 @@ private[streaming] class WriteAheadLogManager(
} catch {
case ex: Exception =>
lastException = ex
logWarning("Failed to ...")
logWarning("Failed to write to write ahead log")
resetWriter()
failures += 1
}
}
if (fileSegment == null) {
logError(s"Failed to write to write ahead log after $failures failures")
throw lastException
}
fileSegment
}

/**
* Read all the existing logs from the log directory.
*
* Note that this is typically called when the caller is initializing and wants
* to recover past state from the write ahead logs (that is, before making any writes).
* If this is called after writes have been made using this manager, then it may not return
* the latest the records. This does not deal with currently active log files, and
* hence the implementation is kept simple.
*/
def readFromLog(): Iterator[ByteBuffer] = synchronized {
val logFilesToRead = pastLogs.map{ _.path} ++ Option(currentLogPath)
logInfo("Reading from the logs: " + logFilesToRead.mkString("\n"))
Expand All @@ -73,7 +102,7 @@ private[streaming] class WriteAheadLogManager(
* Delete the log files that are older than the threshold time.
*
* Its important to note that the threshold time is based on the time stamps used in the log
* files, and is therefore based on the local system time. So if there is coordination necessary
* files, which is usually based on the local system time. So if there is coordination necessary
* between the node calculating the threshTime (say, driver node), and the local system time
* (say, worker node), the caller has to take account of possible time skew.
*/
Expand All @@ -92,7 +121,7 @@ private[streaming] class WriteAheadLogManager(
logDebug(s"Cleared log file $logInfo")
} catch {
case ex: Exception =>
logWarning(s"Error clearing log file $logInfo", ex)
logWarning(s"Error clearing write ahead log file $logInfo", ex)
}
}
logInfo(s"Cleared log files in $logDirectory older than $threshTime")
Expand All @@ -102,14 +131,16 @@ private[streaming] class WriteAheadLogManager(
}
}

/** Stop the manager, close any open log writer */
def stop(): Unit = synchronized {
if (currentLogWriter != null) {
currentLogWriter.close()
}
executionContext.shutdown()
logInfo("Stopped log manager")
logInfo("Stopped write ahead log manager")
}

/** Get the current log writer while taking care of rotation */
private def getLogWriter(currentTime: Long): WriteAheadLogWriter = synchronized {
if (currentLogWriter == null || currentTime > currentLogWriterStopTime) {
resetWriter()
Expand All @@ -126,6 +157,7 @@ private[streaming] class WriteAheadLogManager(
currentLogWriter
}

/** Initialize the log directory or recover existing logs inside the directory */
private def initializeOrRecover(): Unit = synchronized {
val logDirectoryPath = new Path(logDirectory)
val fileSystem = logDirectoryPath.getFileSystem(hadoopConf)
Expand All @@ -134,12 +166,12 @@ private[streaming] class WriteAheadLogManager(
val logFileInfo = logFilesTologInfo(fileSystem.listStatus(logDirectoryPath).map { _.getPath })
pastLogs.clear()
pastLogs ++= logFileInfo
logInfo(s"Recovered ${logFileInfo.size} log files from $logDirectory")
logInfo(s"Recovered ${logFileInfo.size} write ahead log files from $logDirectory")
logDebug(s"Recovered files are:\n${logFileInfo.map(_.path).mkString("\n")}")
} else {
fileSystem.mkdirs(logDirectoryPath,
FsPermission.createImmutable(Integer.parseInt("770", 8).toShort))
logInfo(s"Created ${logDirectory} for log files")
logInfo(s"Created ${logDirectory} for write ahead log files")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ import java.nio.ByteBuffer

import org.apache.hadoop.conf.Configuration

/**
* A random access reader for reading write ahead log files written using
* [[org.apache.spark.streaming.storage.WriteAheadLogWriter]]. Given the file segment info,
* this reads the record (bytebuffer) from the log file.
*/
private[streaming] class WriteAheadLogRandomReader(path: String, conf: Configuration)
extends Closeable {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ import java.nio.ByteBuffer
import org.apache.hadoop.conf.Configuration
import org.apache.spark.Logging

/**
* A reader for reading write ahead log files written using
* [[org.apache.spark.streaming.storage.WriteAheadLogWriter]]. This reads
* the records (bytebuffers) in the log file sequentially and return them as an
* iterator of bytebuffers.
*/
private[streaming] class WriteAheadLogReader(path: String, conf: Configuration)
extends Iterator[ByteBuffer] with Closeable with Logging {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,21 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem}
import org.apache.spark.streaming.storage.FileSegment

private[streaming] class WriteAheadLogWriter(path: String, conf: Configuration) extends Closeable {
/**
* A writer for writing byte-buffers to a write ahead log file.
*/
private[streaming] class WriteAheadLogWriter(path: String, hadoopConf: Configuration)
extends Closeable {
private val underlyingStream: Either[DataOutputStream, FSDataOutputStream] = {
val uri = new URI(path)
val defaultFs = FileSystem.getDefaultUri(conf).getScheme
val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme
val isDefaultLocal = defaultFs == null || defaultFs == "file"

if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == "file") {
assert(!new File(uri.getPath).exists)
Left(new DataOutputStream(new BufferedOutputStream(new FileOutputStream(uri.getPath))))
} else {
Right(HdfsUtils.getOutputStream(path, conf))
Right(HdfsUtils.getOutputStream(path, hadoopConf))
}
}

Expand All @@ -49,9 +53,7 @@ private[streaming] class WriteAheadLogWriter(path: String, conf: Configuration)
private var closed = false


// Data is always written as:
// - Length - Long
// - Data - of length = Length
/** Write the bytebuffer to the log file */
def write(data: ByteBuffer): FileSegment = synchronized {
assertOpen()
data.rewind() // Rewind to ensure all data in the buffer is retrieved
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,28 @@
*/
package org.apache.spark.streaming.storage

import java.io.{DataInputStream, FileInputStream, File, RandomAccessFile}
import java.io.{DataInputStream, File, FileInputStream, RandomAccessFile}
import java.nio.ByteBuffer

import scala.util.Random

import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._
import scala.language.implicitConversions
import scala.util.Random

import com.google.common.io.Files
import org.apache.hadoop.conf.Configuration
import org.scalatest.{BeforeAndAfter, FunSuite}
import org.scalatest.concurrent.Eventually._

import com.google.common.io.Files
import org.apache.commons.io.FileUtils
import org.apache.hadoop.conf.Configuration

import org.apache.spark.streaming.util.ManualClock
import org.apache.spark.util.Utils
import WriteAheadLogSuite._

/**
* This testsuite tests all classes related to write ahead logs.
*/
class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {

val hadoopConf = new Configuration()
Expand Down Expand Up @@ -163,8 +169,25 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {
assert(dataToWrite.toList === readData.toList)
}

test("WriteAheadLogManager - cleanup old logs") {
// Write data with manager, recover with new manager and verify
val dataToWrite = generateRandomData(100)
val fakeClock = new ManualClock
val manager = new WriteAheadLogManager(tempDirectory.toString, hadoopConf,
rollingIntervalSecs = 1, callerName = "WriteAheadLogSuite", clock = fakeClock)
dataToWrite.foreach { item =>
fakeClock.addToTime(500) // half second for each
manager.writeToLog(item)
}
val logFiles = getLogFilesInDirectory(tempDirectory)
assert(logFiles.size > 1)
manager.cleanupOldLogs(fakeClock.currentTime() / 2)
eventually(timeout(1 second), interval(10 milliseconds)) {
assert(getLogFilesInDirectory(tempDirectory).size < logFiles.size)
}
}

// TODO (Hari, TD): Test different failure conditions of writers and readers.
// - Failure in the middle of write
// - Failure while reading incomplete/corrupt file
}

Expand Down

0 comments on commit 5182ffb

Please sign in to comment.