Skip to content

Commit

Permalink
Minor changes based on PR comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
tdas committed Oct 24, 2014
1 parent d29fddd commit 55514e2
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,11 @@
package org.apache.spark.streaming.util

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{LocalFileSystem, FSDataInputStream, FSDataOutputStream, Path}
import org.apache.hadoop.fs._

private[streaming] object HdfsUtils {

def getOutputStream(path: String, conf: Configuration): FSDataOutputStream = {
// HDFS is not thread-safe when getFileSystem is called, so synchronize on that
val dfsPath = new Path(path)
val dfs = getFileSystemForPath(dfsPath, conf)
// If the file exists and we have append support, append instead of creating a new file
Expand Down Expand Up @@ -61,7 +60,9 @@ private[streaming] object HdfsUtils {
blockLocs.map(_.flatMap(_.getHosts))
}

def getFileSystemForPath(path: Path, conf: Configuration) = synchronized {
def getFileSystemForPath(path: Path, conf: Configuration): FileSystem = synchronized {
// For local file systems, return the raw loca file system, such calls to flush()
// actually flushes the stream.
val fs = path.getFileSystem(conf)
fs match {
case localFs: LocalFileSystem => localFs.getRawFileSystem
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import WriteAheadLogManager._
* Uses [[org.apache.spark.streaming.util.WriteAheadLogWriter]] to write
* and [[org.apache.spark.streaming.util.WriteAheadLogReader]] to read.
*
*@param logDirectory Directory when rotating log files will be created.
* @param logDirectory Directory when rotating log files will be created.
* @param hadoopConf Hadoop configuration for reading/writing log files.
* @param rollingIntervalSecs The interval in seconds with which logs will be rolled over.
* Default is one minute.
Expand All @@ -57,7 +57,7 @@ private[streaming] class WriteAheadLogManager(

private val pastLogs = new ArrayBuffer[LogInfo]
private val callerNameTag =
if (callerName != null && callerName.nonEmpty) s" for $callerName" else ""
if (callerName.nonEmpty) s" for $callerName" else ""
private val threadpoolName = s"WriteAheadLogManager $callerNameTag"
implicit private val executionContext = ExecutionContext.fromExecutorService(
Utils.newDaemonFixedThreadPool(1, threadpoolName))
Expand Down

0 comments on commit 55514e2

Please sign in to comment.