Skip to content

Commit

Permalink
Added StreamBytes
Browse files Browse the repository at this point in the history
Made a few changes to StreamBytes and ByteReader

Renamed the files in S3 util and made a few changes to them
  • Loading branch information
jbouffard committed Oct 13, 2016
1 parent 1297b44 commit 789d79e
Show file tree
Hide file tree
Showing 5 changed files with 146 additions and 114 deletions.
Expand Up @@ -4,20 +4,21 @@ import geotrellis.util._
import geotrellis.spark.io.s3._
import com.amazonaws.services.s3.model._

import spire.syntax.cfor._

import java.nio.{ByteOrder, ByteBuffer, Buffer}
import scala.language.implicitConversions

class S3BytesByteReader(
val request: GetObjectRequest,
val client: AmazonS3Client)
extends S3Bytes {
extends S3StreamBytes with ByteReader {

private var chunk = getMappedArray
private def offset = chunk.head._1
private def array = chunk.head._2
private def length = array.length
def length = array.length

private var byteBuffer = getByteBuffer
var byteBuffer = getByteBuffer

private val byteOrder: ByteOrder = {
val order =
Expand All @@ -30,82 +31,101 @@ class S3BytesByteReader(
order
}

/** Absolute position: the current chunk's start offset plus the position inside the local buffer. */
final def position: Int = (offset + byteBuffer.position).toInt

/**
 * Moves the reader to the absolute position `newPoint`.
 *
 * If the position lies inside the chunk currently held, only the local
 * buffer is repositioned; otherwise a new chunk starting at `newPoint` is
 * fetched and the buffer is rewound to its beginning.
 */
final def position(newPoint: Int): Buffer =
  if (isContained(newPoint)) {
    // The local buffer is indexed relative to the start of the chunk, so the
    // absolute position has to be translated before it is applied.
    byteBuffer.position((newPoint - offset).toInt)
  } else {
    chunk = getMappedArray(newPoint)
    byteBuffer = getByteBuffer(array)
    byteBuffer.position(0)
  }

/** Reads one byte, fetching the next chunk first when the buffer is exhausted. */
final def get: Byte = {
  ensureAvailable(1)
  byteBuffer.get
}

/**
 * Copies every unread byte of the local buffer into a new array, leaving the
 * buffer fully consumed.
 */
private def fillArray: Array[Byte] = {
  // Bulk get: looping on `byteBuffer.remaining` would shrink the bound on
  // every iteration and stop after roughly half of the bytes.
  val arr = Array.ofDim[Byte](byteBuffer.remaining)
  byteBuffer.get(arr)
  arr
}

/**
 * Guarantees that at least `size` unread bytes are buffered, when the
 * underlying object still has that many.
 *
 * When the buffer runs short, the unread tail is kept, the bytes that follow
 * it are fetched, and both are joined into a single chunk keyed by the
 * absolute position of the first unread byte, so that `offset`, `array` and
 * `position` stay consistent with the buffer contents.
 */
private def ensureAvailable(size: Int): Unit =
  if (byteBuffer.remaining < size) {
    val start = position
    val leftover = fillArray
    val fetched = getArray(start + leftover.length)
    chunk = Map(start.toLong -> (leftover ++ fetched))
    byteBuffer = getByteBuffer(array)
  }

/** Reads a two-byte char in the reader's byte order. */
final def getChar: Char = {
  ensureAvailable(2)
  byteBuffer.getChar
}

/** Reads a two-byte short in the reader's byte order. */
final def getShort: Short = {
  ensureAvailable(2)
  byteBuffer.getShort
}

/** Reads a four-byte int in the reader's byte order. */
final def getInt: Int = {
  ensureAvailable(4)
  byteBuffer.getInt
}

/** Reads a four-byte float in the reader's byte order. */
final def getFloat: Float = {
  ensureAvailable(4)
  byteBuffer.getFloat
}

/** Reads an eight-byte double in the reader's byte order. */
final def getDouble: Double = {
  ensureAvailable(8)
  byteBuffer.getDouble
}

/** Reads an eight-byte long in the reader's byte order. */
def getLong: Long = {
  ensureAvailable(8)
  byteBuffer.getLong
}

/** Wraps the current chunk's bytes in a new buffer (position 0) using the reader's byte order. */
final def getByteBuffer: ByteBuffer = ByteBuffer.wrap(array).order(byteOrder)

/** Wraps `arr` in a new buffer using the reader's byte order. */
final def getByteBuffer(arr: Array[Byte]) = ByteBuffer.wrap(arr).order(byteOrder)

/** True when the absolute position `newPosition` falls inside the chunk currently held. */
final def isContained(newPosition: Int): Boolean =
  newPosition >= offset && newPosition < offset + length
}

object S3BytesByteReader {
Expand All @@ -114,18 +134,4 @@ object S3BytesByteReader {

def apply(request: GetObjectRequest, client: AmazonS3Client): S3BytesByteReader =
new S3BytesByteReader(request, client)

@inline implicit def toByteReader(s3BBR: S3BytesByteReader): ByteReader = {
new ByteReader() {
override def get = s3BBR.get
override def getChar = s3BBR.getChar
override def getShort = s3BBR.getShort
override def getInt = s3BBR.getInt
override def getFloat = s3BBR.getFloat
override def getDouble = s3BBR.getDouble
override def position: Int = s3BBR.position
override def position(i: Int): Buffer = s3BBR.position(i)
override def getByteBuffer = s3BBR.byteBuffer
}
}
}

This file was deleted.

36 changes: 36 additions & 0 deletions s3/src/main/scala/geotrellis/spark/io/s3/util/S3StreamBytes.scala
@@ -0,0 +1,36 @@
package geotrellis.spark.io.s3.util

import geotrellis.util.StreamBytes
import geotrellis.spark.io.s3._

import scala.collection.mutable.Queue
import com.amazonaws.services.s3.model._

/**
 * [[StreamBytes]] implementation backed by a single S3 object, identified by
 * the bucket name and key of `request` and read through `client`.
 */
trait S3StreamBytes extends StreamBytes {
  def client: AmazonS3Client
  def request: GetObjectRequest

  /** Fetches the object's metadata from S3; every call issues a new request. */
  def metadata =
    client.getObjectMetadata(request.getBucketName, request.getKey)

  /**
   * Content length of the object in bytes.
   *
   * Cached after the first access: `StreamBytes.getArray` consults the length
   * for every chunk it reads, and re-fetching the metadata each time would
   * cost one remote call per chunk.
   */
  lazy val objectLength: Long = metadata.getContentLength

  /**
   * Opens a stream over the requested range of the object.
   * NOTE(review): whether `end` is inclusive is decided by `client.readRange`,
   * which is not visible here - confirm against that method.
   */
  def readStream(start: Int, end: Int): S3ObjectInputStream =
    client.readRange(start, end, request).getObjectContent
}


/** Process-wide FIFO holding chunks, each a map from a start offset to that chunk's bytes. */
object S3Queue {
  private val pending = Queue.empty[Map[Long, Array[Byte]]]

  /** Number of chunks currently queued. */
  def size: Int = pending.length

  /** True when no chunk is queued. */
  def isEmpty: Boolean = pending.isEmpty

  /** Appends `chunk` to the back of the queue. */
  def saveChunk(chunk: Map[Long, Array[Byte]]): Unit = {
    pending.enqueue(chunk)
  }

  /** Removes and returns the oldest queued chunk; throws if the queue is empty. */
  def getChunk: Map[Long, Array[Byte]] = {
    val oldest = pending.dequeue()
    oldest
  }
}
23 changes: 19 additions & 4 deletions util/src/main/scala/geotrellis/util/ByteReader.scala
Expand Up @@ -4,28 +4,43 @@ import java.nio.{Buffer, ByteBuffer}
import scala.language.implicitConversions

/**
 * Minimal cursor-style reader over a sequence of bytes, mirroring the
 * relative `get*` accessors of `java.nio.ByteBuffer`.
 */
trait ByteReader {
  /** Number of bytes the reader currently exposes (the buffer capacity for the ByteBuffer-backed reader). */
  def length: Int

  /** Whether `point` falls inside the range of bytes the reader currently holds. */
  def isContained(point: Int): Boolean

  /** Current read position. */
  def position: Int

  /** Moves the read position to `i`. */
  def position(i: Int): Buffer

  // Relative reads: each returns the value at the current position and
  // advances past it.
  def get: Byte
  def getChar: Char
  def getShort: Short
  def getInt: Int
  def getFloat: Float
  def getDouble: Double
  def getLong: Long

  /** A `ByteBuffer` view of the reader's bytes. */
  def getByteBuffer: ByteBuffer
}

object ByteReader {
/**
 * Views a `ByteBuffer` as a [[ByteReader]]. Every call is delegated to the
 * given buffer, so reads through the reader advance the buffer itself.
 */
implicit def byteBuffer2ByteReader(byteBuffer: ByteBuffer): ByteReader = {
  new ByteReader() {
    def length = byteBuffer.capacity

    // Valid byte indices are 0 until capacity; the upper bound is exclusive,
    // matching S3BytesByteReader.isContained.
    def isContained(point: Int) =
      point >= 0 && point < byteBuffer.capacity

    def position: Int = byteBuffer.position
    def position(i: Int): Buffer = byteBuffer.position(i)

    def get = byteBuffer.get
    def getChar = byteBuffer.getChar
    def getShort = byteBuffer.getShort
    def getInt = byteBuffer.getInt
    def getFloat = byteBuffer.getFloat
    def getDouble = byteBuffer.getDouble
    def getLong = byteBuffer.getLong

    def getByteBuffer = byteBuffer
  }
}
Expand Down
47 changes: 47 additions & 0 deletions util/src/main/scala/geotrellis/util/StreamBytes.scala
@@ -0,0 +1,47 @@
package geotrellis.util

import java.io.InputStream
import java.nio.ByteBuffer

/**
 * Reads an object of known length in chunks, through an `InputStream` that
 * the implementor opens over a requested byte range.
 */
trait StreamBytes {
  // Number of bytes requested when the caller does not specify a length.
  private val chunkSize = 256000

  /** Offset just past the bytes returned by the most recent `getArray` call. */
  var streamPosition = 0

  /** Total length of the underlying object in bytes. */
  def objectLength: Long

  /** Opens a stream over the bytes from `start` up to `end`. */
  def readStream(start: Int, end: Int): InputStream

  /** True when `size` exceeds the length of the object. */
  def pastLength(size: Int): Boolean =
    size > objectLength

  /** Reads a default-sized chunk starting at `streamPosition`. */
  def getArray: Array[Byte] =
    getArray(streamPosition)

  /** Reads a default-sized chunk starting at `start`. */
  def getArray(start: Int): Array[Byte] =
    getArray(start, chunkSize)

  /**
   * Reads up to `length` bytes starting at `start`, clamped to the end of the
   * object, and moves `streamPosition` just past the bytes returned.
   *
   * An empty array is returned, without opening a stream, when `start` is at
   * or past the end of the object or `length` is not positive. If the stream
   * ends before the expected number of bytes arrives, the unread tail of the
   * array is left zero-filled.
   */
  def getArray(start: Int, length: Int): Array[Byte] = {
    // Computed in Long so that `start + length` cannot overflow, and floored
    // at zero so a start beyond the object never yields a negative size.
    val available = math.max(0L, objectLength - start)
    val chunk = math.max(0L, math.min(length.toLong, available)).toInt

    val arr = Array.ofDim[Byte](chunk)

    if (chunk > 0) {
      // `readStream` takes an end position, not a length.
      val stream = readStream(start, start + chunk)
      try {
        // A single read may deliver fewer bytes than requested (usual for
        // network streams), so keep reading until the chunk is full or the
        // stream is exhausted.
        var filled = 0
        var exhausted = false
        while (!exhausted && filled < chunk) {
          val count = stream.read(arr, filled, chunk - filled)
          if (count < 0) exhausted = true else filled += count
        }
      } finally {
        stream.close()
      }
    }

    streamPosition = start + chunk

    arr
  }

  /** Like `getArray`, with the chunk keyed by its start offset. */
  def getMappedArray: Map[Long, Array[Byte]] =
    getMappedArray(streamPosition, chunkSize)

  /** Like `getArray(start)`, with the chunk keyed by its start offset. */
  def getMappedArray(start: Int): Map[Long, Array[Byte]] =
    getMappedArray(start, chunkSize)

  /** Like `getArray(start, length)`, with the chunk keyed by its start offset. */
  def getMappedArray(start: Int, length: Int): Map[Long, Array[Byte]] =
    Map(start.toLong -> getArray(start, length))
}

0 comments on commit 789d79e

Please sign in to comment.