Skip to content

Commit

Permalink
Remove underlying stream from the WALWriter.
Browse files Browse the repository at this point in the history
  • Loading branch information
harishreedharan committed Oct 22, 2014
1 parent 4ab602a commit 5c70d1f
Showing 1 changed file with 4 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,8 @@ import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem}
*/
private[streaming] class WriteAheadLogWriter(path: String, hadoopConf: Configuration)
extends Closeable {
private val underlyingStream: Either[DataOutputStream, FSDataOutputStream] = {
val uri = new URI(path)
val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme
val isDefaultLocal = defaultFs == null || defaultFs == "file"

if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == "file") {
assert(!new File(uri.getPath).exists)
Left(new DataOutputStream(new BufferedOutputStream(new FileOutputStream(uri.getPath))))
} else {
Right(HdfsUtils.getOutputStream(path, hadoopConf))
}
}
private lazy val stream = HdfsUtils.getOutputStream(path, hadoopConf)

private lazy val hadoopFlushMethod = {
val cls = classOf[FSDataOutputStream]
Expand Down Expand Up @@ -77,21 +67,14 @@ private[streaming] class WriteAheadLogWriter(path: String, hadoopConf: Configura
stream.close()
}

private def stream(): DataOutputStream = {
underlyingStream.fold(x => x, x => x)
}

private def getPosition(): Long = {
underlyingStream match {
case Left(localStream) => localStream.size
case Right(dfsStream) => dfsStream.getPos()
}
stream.getPos()
}

private def flush() {
underlyingStream match {
case Left(localStream) => localStream.flush
case Right(dfsStream) => hadoopFlushMethod.foreach { _.invoke(dfsStream) }
hadoopFlushMethod.foreach {
_.invoke(stream)
}
}

Expand Down

0 comments on commit 5c70d1f

Please sign in to comment.