From 36efa33ba9dab8b424775581696bc007ae8085f7 Mon Sep 17 00:00:00 2001
From: Andrew Martin
Date: Thu, 20 Oct 2016 13:32:56 -0400
Subject: [PATCH] Use SeekableFileInput on HdfsFileStorage

---
Review notes (free text after the three-dash line; not part of the commit
message):
  * The new getAvroSeekableInput builds a java.io.File from a Hadoop Path URI
    and wraps it in Avro's SeekableFileInput, which reads a local file.
    java.io.File(URI) throws IllegalArgumentException unless the URI is
    absolute, hierarchical and uses the "file" scheme. TODO confirm this still
    works for the non-local (e.g. hdfs://) inputs the removed
    FileSystem-backed implementation was opening.
  * `File` is used by the added code, but no java.io.File import is visible in
    these hunks. Confirm it is already imported in the target file.
  * Line breaks and indentation in this patch were reconstructed from the hunk
    counts; verify with `git apply --check` before applying.

 .../spotify/scio/hdfs/HdfsFileStorage.scala | 22 +++++--------------
 1 file changed, 5 insertions(+), 17 deletions(-)

diff --git a/scio-hdfs/src/main/scala/com/spotify/scio/hdfs/HdfsFileStorage.scala b/scio-hdfs/src/main/scala/com/spotify/scio/hdfs/HdfsFileStorage.scala
index 5de48ad119..7954200d80 100644
--- a/scio-hdfs/src/main/scala/com/spotify/scio/hdfs/HdfsFileStorage.scala
+++ b/scio-hdfs/src/main/scala/com/spotify/scio/hdfs/HdfsFileStorage.scala
@@ -22,7 +22,7 @@ import java.net.URI
 import java.nio.file.Path
 
 import com.spotify.scio.io.FileStorage
-import org.apache.avro.file.SeekableInput
+import org.apache.avro.file.{SeekableFileInput, SeekableInput}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, PathFilter}
 import org.apache.hadoop.io.compress.CompressionCodecFactory
@@ -62,21 +62,9 @@ private class HdfsFileStorage(protected val path: String) extends FileStorage {
     }
   }
 
-  override protected def getAvroSeekableInput(path: Path): SeekableInput =
-    new SeekableInput {
-      private val hPath = new org.apache.hadoop.fs.Path(path.toString)
-      private val fs = FileSystem.get(path.toUri, new Configuration())
-      private val in = fs.open(hPath)
-
-      override def tell(): Long = in.getPos
-
-      override def length(): Long = fs.getContentSummary(hPath).getLength
-
-      override def seek(p: Long): Unit = in.seek(p)
-
-      override def read(b: Array[Byte], off: Int, len: Int): Int = in.read(b, off, len)
-
-      override def close(): Unit = in.close()
-    }
+  override protected def getAvroSeekableInput(path: Path): SeekableInput = {
+    val hPath = new org.apache.hadoop.fs.Path(path.toString)
+    new SeekableFileInput(new File(hPath.toUri))
+  }
 
 }