Commit ef8db09
Merge pull request alteryx#17 from harishreedharan/driver-ha-wal
Use Minicluster for WAL tests.
tdas committed Oct 22, 2014
2 parents 4ab602a + 7e40e56 commit ef8db09
Showing 7 changed files with 132 additions and 120 deletions.
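Note: the commit message says the WAL tests now run against a Hadoop minicluster; the test file itself is not rendered in this view. A minimal sketch of that pattern, assuming Hadoop 2.x's MiniDFSCluster.Builder API (walDir and the WAL calls are placeholders, not the suite's actual code):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.hdfs.MiniDFSCluster

    // Start an in-process HDFS cluster, exercise the WAL against it, shut it down.
    val conf = new Configuration()
    val cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build()
    try {
      cluster.waitClusterUp()
      // Point the WAL at a directory inside the mini cluster's namespace.
      val walDir = cluster.getFileSystem().getUri().toString + "/wal-test"
      // ... create a WriteAheadLogManager on walDir, writeToLog, readFromLog ...
    } finally {
      cluster.shutdown()
    }
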
6 changes: 6 additions & 0 deletions pom.xml
@@ -406,6 +406,12 @@
       <artifactId>akka-slf4j_${scala.binary.version}</artifactId>
       <version>${akka.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-minicluster</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>${akka.group}</groupId>
       <artifactId>akka-testkit_${scala.binary.version}</artifactId>
5 changes: 5 additions & 0 deletions streaming/pom.xml
@@ -68,6 +68,11 @@
       <artifactId>junit-interface</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-minicluster</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
   <build>
     <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
HdfsUtils.scala
@@ -25,10 +25,7 @@ private[streaming] object HdfsUtils {
     // HDFS is not thread-safe when getFileSystem is called, so synchronize on that

     val dfsPath = new Path(path)
-    val dfs =
-      this.synchronized {
-        dfsPath.getFileSystem(conf)
-      }
+    val dfs = getFileSystemForPath(dfsPath, conf)
     // If the file exists and we have append support, append instead of creating a new file
     val stream: FSDataOutputStream = {
       if (dfs.isFile(dfsPath)) {
@@ -46,27 +43,26 @@ private[streaming] object HdfsUtils {

   def getInputStream(path: String, conf: Configuration): FSDataInputStream = {
     val dfsPath = new Path(path)
-    val dfs = this.synchronized {
-      dfsPath.getFileSystem(conf)
-    }
+    val dfs = getFileSystemForPath(dfsPath, conf)
     val instream = dfs.open(dfsPath)
     instream
   }

   def checkState(state: Boolean, errorMsg: => String) {
-    if(!state) {
+    if (!state) {
       throw new IllegalStateException(errorMsg)
     }
   }

   def getBlockLocations(path: String, conf: Configuration): Option[Array[String]] = {
     val dfsPath = new Path(path)
-    val dfs =
-      this.synchronized {
-        dfsPath.getFileSystem(conf)
-      }
+    val dfs = getFileSystemForPath(dfsPath, conf)
     val fileStatus = dfs.getFileStatus(dfsPath)
     val blockLocs = Option(dfs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen))
     blockLocs.map(_.flatMap(_.getHosts))
   }
+
+  def getFileSystemForPath(path: Path, conf: Configuration) = synchronized {
+    path.getFileSystem(conf)
+  }
 }
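
Note: the new getFileSystemForPath helper centralizes the synchronization that was previously repeated at each call site; Hadoop's shared FileSystem cache is the state the comment at the top of getOutputStream refers to. Call sites reduce to a one-liner, as a sketch (the path below is a made-up example):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path

    // Resolve the FileSystem through the single synchronized accessor
    // instead of synchronizing around getFileSystem at every call site.
    val path = new Path("hdfs://nn:8020/spark/wal/log-0-5000")  // hypothetical path
    val fs = HdfsUtils.getFileSystemForPath(path, new Configuration())
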
WriteAheadLogManager.scala
@@ -63,14 +63,18 @@ private[streaming] class WriteAheadLogManager(
     Utils.newDaemonFixedThreadPool(1, threadpoolName))
   override protected val logName = s"WriteAheadLogManager $callerNameTag"

-  private var currentLogPath: String = null
+  private var currentLogPath: Option[String] = None
   private var currentLogWriter: WriteAheadLogWriter = null
   private var currentLogWriterStartTime: Long = -1L
   private var currentLogWriterStopTime: Long = -1L

   initializeOrRecover()

-  /** Write a byte buffer to the log file */
+  /**
+   * Write a byte buffer to the log file. This method synchronously writes the data in the
+   * ByteBuffer to HDFS. When this method returns, the data is guaranteed to have been flushed
+   * to HDFS, and will be available for readers to read.
+   */
   def writeToLog(byteBuffer: ByteBuffer): FileSegment = synchronized {
     var fileSegment: FileSegment = null
     var failures = 0
@@ -99,13 +103,13 @@
    * Read all the existing logs from the log directory.
    *
    * Note that this is typically called when the caller is initializing and wants
-   * to recover past state from the write ahead logs (that is, before making any writes).
+   * to recover past state from the write ahead logs (that is, before making any writes).
    * If this is called after writes have been made using this manager, then it may not return
    * the latest the records. This does not deal with currently active log files, and
    * hence the implementation is kept simple.
    */
   def readFromLog(): Iterator[ByteBuffer] = synchronized {
-    val logFilesToRead = pastLogs.map{ _.path} ++ Option(currentLogPath)
+    val logFilesToRead = pastLogs.map{ _.path} ++ currentLogPath
     logInfo("Reading from the logs: " + logFilesToRead.mkString("\n"))
     logFilesToRead.iterator.map { file =>
       logDebug(s"Creating log reader with $file")
@@ -130,7 +134,7 @@
     oldLogFiles.foreach { logInfo =>
       try {
         val path = new Path(logInfo.path)
-        val fs = hadoopConf.synchronized { path.getFileSystem(hadoopConf) }
+        val fs = HdfsUtils.getFileSystemForPath(path, hadoopConf)
         fs.delete(path, true)
         synchronized { pastLogs -= logInfo }
         logDebug(s"Cleared log file $logInfo")
@@ -159,34 +163,30 @@
   private def getLogWriter(currentTime: Long): WriteAheadLogWriter = synchronized {
     if (currentLogWriter == null || currentTime > currentLogWriterStopTime) {
       resetWriter()
-      if (currentLogPath != null) {
-        pastLogs += LogInfo(currentLogWriterStartTime, currentLogWriterStopTime, currentLogPath)
+      currentLogPath.foreach {
+        pastLogs += LogInfo(currentLogWriterStartTime, currentLogWriterStopTime, _)
       }
       currentLogWriterStartTime = currentTime
       currentLogWriterStopTime = currentTime + (rollingIntervalSecs * 1000)
       val newLogPath = new Path(logDirectory,
         timeToLogFile(currentLogWriterStartTime, currentLogWriterStopTime))
-      currentLogPath = newLogPath.toString
-      currentLogWriter = new WriteAheadLogWriter(currentLogPath, hadoopConf)
+      currentLogPath = Some(newLogPath.toString)
+      currentLogWriter = new WriteAheadLogWriter(currentLogPath.get, hadoopConf)
     }
     currentLogWriter
   }

   /** Initialize the log directory or recover existing logs inside the directory */
   private def initializeOrRecover(): Unit = synchronized {
     val logDirectoryPath = new Path(logDirectory)
-    val fileSystem = logDirectoryPath.getFileSystem(hadoopConf)
+    val fileSystem = HdfsUtils.getFileSystemForPath(logDirectoryPath, hadoopConf)

     if (fileSystem.exists(logDirectoryPath) && fileSystem.getFileStatus(logDirectoryPath).isDir) {
       val logFileInfo = logFilesTologInfo(fileSystem.listStatus(logDirectoryPath).map { _.getPath })
       pastLogs.clear()
       pastLogs ++= logFileInfo
       logInfo(s"Recovered ${logFileInfo.size} write ahead log files from $logDirectory")
       logDebug(s"Recovered files are:\n${logFileInfo.map(_.path).mkString("\n")}")
     } else {
       fileSystem.mkdirs(logDirectoryPath,
         FsPermission.createImmutable(Integer.parseInt("770", 8).toShort))
       logInfo(s"Created ${logDirectory} for write ahead log files")
     }
   }
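
Note: changing currentLogPath from a nullable String to Option[String] is what lets readFromLog drop its Option(...) wrapper and getLogWriter drop the explicit null check: concatenating an Option onto a collection appends zero or one element. A standalone illustration with made-up paths:

    // With currentLogPath an Option, ++ appends the current file only if one exists.
    val pastPaths = Seq("/wal/log-0-5000", "/wal/log-5000-10000")    // hypothetical
    val currentLogPath: Option[String] = Some("/wal/log-10000-15000")
    val logFilesToRead = pastPaths ++ currentLogPath
    // => Seq("/wal/log-0-5000", "/wal/log-5000-10000", "/wal/log-10000-15000")
    // With currentLogPath = None, the current file is simply omitted.
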
WriteAheadLogReader.scala
@@ -56,7 +56,7 @@ private[streaming] class WriteAheadLogReader(path: String, conf: Configuration)
         close()
         false
       case e: Exception =>
-        logDebug("Error reading next item, EOF reached", e)
+        logWarning("Error while trying to read data from HDFS.", e)
         close()
         throw e
     }
WriteAheadLogWriter.scala
@@ -30,18 +30,8 @@ import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem}
  */
 private[streaming] class WriteAheadLogWriter(path: String, hadoopConf: Configuration)
   extends Closeable {
-  private val underlyingStream: Either[DataOutputStream, FSDataOutputStream] = {
-    val uri = new URI(path)
-    val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme
-    val isDefaultLocal = defaultFs == null || defaultFs == "file"
-
-    if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == "file") {
-      assert(!new File(uri.getPath).exists)
-      Left(new DataOutputStream(new BufferedOutputStream(new FileOutputStream(uri.getPath))))
-    } else {
-      Right(HdfsUtils.getOutputStream(path, hadoopConf))
-    }
-  }
+  private lazy val stream = HdfsUtils.getOutputStream(path, hadoopConf)

   private lazy val hadoopFlushMethod = {
     val cls = classOf[FSDataOutputStream]
@@ -77,21 +67,14 @@ private[streaming] class WriteAheadLogWriter(path: String, hadoopConf: Configuration)
     stream.close()
   }

-  private def stream(): DataOutputStream = {
-    underlyingStream.fold(x => x, x => x)
-  }
-
   private def getPosition(): Long = {
-    underlyingStream match {
-      case Left(localStream) => localStream.size
-      case Right(dfsStream) => dfsStream.getPos()
-    }
+    stream.getPos()
   }

   private def flush() {
-    underlyingStream match {
-      case Left(localStream) => localStream.flush
-      case Right(dfsStream) => hadoopFlushMethod.foreach { _.invoke(dfsStream) }
-    }
+    hadoopFlushMethod.foreach {
+      _.invoke(stream)
+    }
   }
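
Note: the body of hadoopFlushMethod is truncated in this view. Based on the classOf[FSDataOutputStream] line above and the foreach { _.invoke(stream) } call in flush(), it is a reflective method lookup, presumably along these lines (an assumption about its shape, not a copy of the actual code):

    import scala.util.Try
    import org.apache.hadoop.fs.FSDataOutputStream

    // Prefer hflush() (Hadoop 2.x), fall back to sync() (Hadoop 1.x); None if
    // neither exists, which makes flush() a silent no-op on such versions.
    val cls = classOf[FSDataOutputStream]
    val hadoopFlushMethod =
      Try(cls.getMethod("hflush")).orElse(Try(cls.getMethod("sync"))).toOption
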