Skip to content

Commit

Permalink
Use SeekableFileInput on HdfsFileStorage
Browse files (browse the repository at this point in the history)
  • Loading branch information
andrewsmartin authored and nevillelyh committed Oct 21, 2016
1 parent 9f363fc commit 36efa33
Showing 1 changed file with 5 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import java.net.URI
import java.nio.file.Path

import com.spotify.scio.io.FileStorage
import org.apache.avro.file.SeekableInput
import org.apache.avro.file.{SeekableFileInput, SeekableInput}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, PathFilter}
import org.apache.hadoop.io.compress.CompressionCodecFactory
Expand Down Expand Up @@ -62,21 +62,9 @@ private class HdfsFileStorage(protected val path: String) extends FileStorage {
}
}

/**
 * Opens `path` through the Hadoop `FileSystem` API and exposes the resulting stream as an
 * Avro `SeekableInput`.
 *
 * The file system is looked up from `path.toUri` with a fresh `Configuration`, and the stream
 * is opened eagerly, before the returned object is handed back to the caller.
 *
 * @param path location of the file to read
 * @return a `SeekableInput` delegating to the opened Hadoop input stream
 */
override protected def getAvroSeekableInput(path: Path): SeekableInput = {
  val hadoopPath = new org.apache.hadoop.fs.Path(path.toString)
  val fileSystem = FileSystem.get(path.toUri, new Configuration())
  val stream = fileSystem.open(hadoopPath)

  new SeekableInput {
    // Current position is whatever the underlying stream reports.
    override def tell(): Long = stream.getPos

    // The length is re-queried from the file system's content summary on every call.
    override def length(): Long = fileSystem.getContentSummary(hadoopPath).getLength

    override def seek(pos: Long): Unit = stream.seek(pos)

    override def read(buf: Array[Byte], offset: Int, count: Int): Int =
      stream.read(buf, offset, count)

    // Only the stream is closed here; the FileSystem handle is left untouched.
    override def close(): Unit = stream.close()
  }
}
/**
 * Builds an Avro `SeekableInput` for `path`, backed by a `java.io.File`.
 *
 * The path's string form is wrapped in a Hadoop `Path`, and that path's URI is used to
 * construct the `File` passed to `SeekableFileInput`.
 *
 * NOTE(review): `java.io.File(URI)` only accepts absolute, hierarchical `file:` URIs and throws
 * `IllegalArgumentException` otherwise -- confirm every path reaching this method converts to
 * such a URI.
 *
 * @param path location of the file to read
 * @return a `SeekableFileInput` over the resolved file
 */
override protected def getAvroSeekableInput(path: Path): SeekableInput = {
  val fileUri = new org.apache.hadoop.fs.Path(path.toString).toUri
  val localFile = new File(fileUri)
  new SeekableFileInput(localFile)
}

}

0 comments on commit 36efa33

Please sign in to comment.