diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala index 4a6f3006922cd..3f3143b820247 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala @@ -17,12 +17,11 @@ package org.apache.spark.streaming.util import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{LocalFileSystem, FSDataInputStream, FSDataOutputStream, Path} +import org.apache.hadoop.fs._ private[streaming] object HdfsUtils { def getOutputStream(path: String, conf: Configuration): FSDataOutputStream = { - // HDFS is not thread-safe when getFileSystem is called, so synchronize on that val dfsPath = new Path(path) val dfs = getFileSystemForPath(dfsPath, conf) // If the file exists and we have append support, append instead of creating a new file @@ -61,7 +60,9 @@ private[streaming] object HdfsUtils { blockLocs.map(_.flatMap(_.getHosts)) } - def getFileSystemForPath(path: Path, conf: Configuration) = synchronized { + def getFileSystemForPath(path: Path, conf: Configuration): FileSystem = synchronized { + // For local file systems, return the raw local file system, so that calls to flush() + // actually flush the stream.
val fs = path.getFileSystem(conf) fs match { case localFs: LocalFileSystem => localFs.getRawFileSystem diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala index f0c552e9593c4..70d234320be7c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala @@ -37,7 +37,7 @@ import WriteAheadLogManager._ * Uses [[org.apache.spark.streaming.util.WriteAheadLogWriter]] to write * and [[org.apache.spark.streaming.util.WriteAheadLogReader]] to read. * - *@param logDirectory Directory when rotating log files will be created. + * @param logDirectory Directory where rotating log files will be created. * @param hadoopConf Hadoop configuration for reading/writing log files. * @param rollingIntervalSecs The interval in seconds with which logs will be rolled over. * Default is one minute. @@ -57,7 +57,7 @@ private[streaming] class WriteAheadLogManager( private val pastLogs = new ArrayBuffer[LogInfo] private val callerNameTag = - if (callerName != null && callerName.nonEmpty) s" for $callerName" else "" + if (callerName.nonEmpty) s" for $callerName" else "" private val threadpoolName = s"WriteAheadLogManager $callerNameTag" implicit private val executionContext = ExecutionContext.fromExecutorService( Utils.newDaemonFixedThreadPool(1, threadpoolName))