Skip to content

Commit

Permalink
Use generic ValueReader
Browse files Browse the repository at this point in the history
  • Loading branch information
pomadchin committed Dec 30, 2017
1 parent 3ca3213 commit d72fefa
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 95 deletions.
32 changes: 10 additions & 22 deletions s3/src/main/scala/geotrellis/spark/io/s3/cog/S3COGValueReader.scala
Expand Up @@ -44,31 +44,19 @@ class S3COGValueReader(
def reader[
K: JsonFormat: SpatialComponent : ClassTag,
V <: CellGrid: TiffMethods
](layerId: LayerId): Reader[K, V] = new Reader[K, V] {
val COGLayerStorageMetadata(cogLayerMetadata, keyIndexes) =
attributeStore.read[COGLayerStorageMetadata[K]](LayerId(layerId.name, 0), "cog_metadata")

val tiffMethods: TiffMethods[V] = implicitly[TiffMethods[V]]

def read(key: K): V = {
val (zoomRange, spatialKey, overviewIndex, gridBounds) =
cogLayerMetadata.getReadDefinition(key.getComponent[SpatialKey], layerId.zoom)

val baseKeyIndex = keyIndexes(zoomRange)

val maxWidth = Index.digits(baseKeyIndex.toIndex(baseKeyIndex.keyBounds.maxKey))
val path = (k: K) => s"$bucket/$prefix/${Index.encode(baseKeyIndex.toIndex(k), maxWidth)}.${Extension}"

val uri = new URI(s"s3://${path(key.setComponent(spatialKey))}")

try {
val tiff = tiffMethods.readTiff(uri, overviewIndex)
tiffMethods.cropTiff(tiff, gridBounds)
} catch {
](layerId: LayerId): Reader[K, V] = {
def keyPath(key: K, maxWidth: Int, baseKeyIndex: KeyIndex[K], zoomRange: ZoomRange): String =
s"$bucket/$prefix/${Index.encode(baseKeyIndex.toIndex(key), maxWidth)}.${Extension}"

baseReader[K, V](
layerId,
keyPath,
path => new URI(s"s3://${path}"),
key => {
case e: AmazonS3Exception if e.getStatusCode == 404 =>
throw new ValueNotFoundError(key, layerId)
}
}
)
}
}

Expand Up @@ -20,15 +20,57 @@ import geotrellis.raster._
import geotrellis.raster.resample._
import geotrellis.spark._
import geotrellis.spark.io._
import geotrellis.spark.io.index._
import geotrellis.util._

import spray.json._
import java.net.URI

import scala.reflect._

trait COGValueReader[ID] {
val attributeStore: AttributeStore

implicit def getByteReader(uri: URI): ByteReader
implicit def idLayerId(id: ID): LayerId

def reader[
K: JsonFormat : SpatialComponent : ClassTag,
V <: CellGrid : TiffMethods
](layerId: LayerId): Reader[K, V]

/** Produce a key value reader for a specific layer, prefetching layer metadata once at construction time */
def reader[K: JsonFormat: SpatialComponent: ClassTag, V <: CellGrid: TiffMethods](layerId: ID): Reader[K, V]
def baseReader[
K: JsonFormat: SpatialComponent: ClassTag,
V <: CellGrid: TiffMethods
](
layerId: ID,
keyPath: (K, Int, KeyIndex[K], ZoomRange) => String, // Key, maxWidth, toIndex, zoomRange
fullPath: String => URI,
exceptionHandler: K => PartialFunction[Throwable, Nothing] = { key: K => { case e: Throwable => throw e }: PartialFunction[Throwable, Nothing] }
): Reader[K, V] = new Reader[K, V] {
val COGLayerStorageMetadata(cogLayerMetadata, keyIndexes) =
attributeStore.read[COGLayerStorageMetadata[K]](LayerId(layerId.name, 0), "cog_metadata")

val tiffMethods: TiffMethods[V] = implicitly[TiffMethods[V]]

def read(key: K): V = {
val (zoomRange, spatialKey, overviewIndex, gridBounds) =
cogLayerMetadata.getReadDefinition(key.getComponent[SpatialKey], layerId.zoom)

val baseKeyIndex = keyIndexes(zoomRange)

val maxWidth = Index.digits(baseKeyIndex.toIndex(baseKeyIndex.keyBounds.maxKey))
val uri = fullPath(keyPath(key.setComponent(spatialKey), maxWidth, baseKeyIndex, zoomRange))

try {
val tiff = tiffMethods.readTiff(uri, overviewIndex)
tiffMethods.cropTiff(tiff, gridBounds)
} catch {
case th: Throwable => exceptionHandler(key)(th)
}
}
}

def overzoomingReader[
K: JsonFormat: SpatialComponent: ClassTag,
Expand Down
Expand Up @@ -27,6 +27,8 @@ import spray.json._
import scala.reflect._

trait OverzoomingCOGValueReader extends COGValueReader[LayerId] {
implicit def idLayerId(id: LayerId): LayerId = id

def overzoomingReader[
K: JsonFormat: SpatialComponent: ClassTag,
V <: CellGrid: TiffMethods: ? => TileResampleMethods[V]
Expand Down
Expand Up @@ -29,7 +29,6 @@ import spray.json._
import scala.reflect.ClassTag

import java.net.URI
import java.io.File

class FileCOGValueReader(
val attributeStore: AttributeStore,
Expand All @@ -40,29 +39,11 @@ class FileCOGValueReader(

def reader[
K: JsonFormat : SpatialComponent : ClassTag,
V <: CellGrid: TiffMethods
](layerId: LayerId): Reader[K, V] = new Reader[K, V] {
val COGLayerStorageMetadata(cogLayerMetadata, keyIndexes) =
attributeStore.read[COGLayerStorageMetadata[K]](LayerId(layerId.name, 0), "cog_metadata")
V <: CellGrid : TiffMethods
](layerId: LayerId): Reader[K, V] = {
def keyPath(key: K, maxWidth: Int, baseKeyIndex: KeyIndex[K], zoomRange: ZoomRange): String =
(KeyPathGenerator(catalogPath, s"${layerId.name}/${zoomRange.slug}", baseKeyIndex, maxWidth) andThen (_ ++ s".$Extension"))(key)

val tiffMethods: TiffMethods[V] = implicitly[TiffMethods[V]]

def read(key: K): V = {
val (zoomRange, spatialKey, overviewIndex, gridBounds) =
cogLayerMetadata.getReadDefinition(key.getComponent[SpatialKey], layerId.zoom)

val baseKeyIndex = keyIndexes(zoomRange)

val maxWidth = Index.digits(baseKeyIndex.toIndex(baseKeyIndex.keyBounds.maxKey))
val keyPath =
KeyPathGenerator(catalogPath, s"${layerId.name}/${zoomRange.slug}", baseKeyIndex, maxWidth) andThen (_ ++ s".$Extension")

Filesystem.ensureDirectory(new File(catalogPath, s"${layerId.name}/${zoomRange.slug}").getAbsolutePath)

val uri = new URI(keyPath(key.setComponent(spatialKey)))
val tiff = tiffMethods.readTiff(uri, overviewIndex)

tiffMethods.cropTiff(tiff, gridBounds)
}
baseReader[K, V](layerId, keyPath, new URI(_))
}
}
Expand Up @@ -20,17 +20,15 @@ import geotrellis.raster._
import geotrellis.spark._
import geotrellis.spark.io._
import geotrellis.spark.io.cog._
import geotrellis.spark.io.hadoop.formats.FilterMapFileInputFormat
import geotrellis.spark.io.index.{Index, KeyIndex}
import geotrellis.util._

import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{BigIntWritable, BytesWritable, MapFile}
import spray.json._
import com.github.blemale.scaffeine.{Cache, Scaffeine}

import scala.collection.immutable.Vector
import scala.reflect.ClassTag

class HadoopCOGValueReader(
Expand All @@ -42,52 +40,17 @@ class HadoopCOGValueReader(

implicit def getByteReader(uri: URI): ByteReader = byteReader(uri)

val readers: Cache[(LayerId, Path), MapFile.Reader] =
Scaffeine()
.recordStats()
.maximumSize(maxOpenFiles.toLong)
.removalListener[(LayerId, Path), MapFile.Reader] { case (_, v, _) => v.close() }
.build[(LayerId, Path), MapFile.Reader]

private def predicate(row: (Path, BigInt, BigInt), index: BigInt): Boolean =
(index >= row._2) && ((index <= row._3) || (row._3 == -1))

def reader[
K: JsonFormat: SpatialComponent: ClassTag,
V <: CellGrid: TiffMethods
](layerId: LayerId): Reader[K, V] = new Reader[K, V] {

val COGLayerStorageMetadata(cogLayerMetadata, keyIndexes) =
attributeStore.read[COGLayerStorageMetadata[K]](LayerId(layerId.name, 0), "cog_metadata")

val tiffMethods: TiffMethods[V] = implicitly[TiffMethods[V]]

val ranges: Vector[(Path, BigInt, BigInt)] =
FilterMapFileInputFormat.layerRanges(catalogPath, conf)

def read(key: K): V = {
val (zoomRange, spatialKey, overviewIndex, gridBounds) =
cogLayerMetadata.getReadDefinition(key.getComponent[SpatialKey], layerId.zoom)

val baseKeyIndex = keyIndexes(zoomRange)

val index = baseKeyIndex.toIndex(key.setComponent(spatialKey))

val valueWritable: BytesWritable =
ranges
.find(row => predicate(row, index))
.map { case (path, _, _) =>
readers.get((layerId, path), _ => new MapFile.Reader(path, conf))
}
.getOrElse(throw new ValueNotFoundError(key, layerId))
.get(new BigIntWritable(index.toByteArray), new BytesWritable())
.asInstanceOf[BytesWritable]

if (valueWritable == null) throw new ValueNotFoundError(key, layerId)
val bytes = valueWritable.getBytes
val tiff = tiffMethods.readTiff(bytes, overviewIndex)

tiffMethods.cropTiff(tiff, gridBounds)
}
](layerId: LayerId): Reader[K, V] = {
def keyPath(key: K, maxWidth: Int, baseKeyIndex: KeyIndex[K], zoomRange: ZoomRange): String =
s"$catalogPath/${Index.encode(baseKeyIndex.toIndex(key), maxWidth)}.${Extension}"

baseReader[K, V](
layerId,
keyPath,
path => new URI(s"hdfs://${path}")
)
}
}

0 comments on commit d72fefa

Please sign in to comment.