Skip to content

Commit

Permalink
Merge pull request #621 from jbouffard/bug-fix/accumulo-reading
Browse files Browse the repository at this point in the history
Fix for Reading Layers and Values from Accumulo
  • Loading branch information
Jacob Bouffard committed Jan 24, 2018
2 parents 8f6970a + ea779bf commit 5396a1d
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 45 deletions.
12 changes: 0 additions & 12 deletions docs/guides/catalog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -533,15 +533,3 @@ When writing a workflow that places heavy demand on :class:`~geopyspark.geotrell
layer_name='spatial-layer',
tiled_raster_layer=spatial_tiled_layer,
store=store)
gps.query(uri="file:///tmp/spatial-catalog-2",
layer_name="spatial-layer",
layer_zoom=11,
store=store)
gps.read_value(uri="file:///tmp/spatial-catalog-2",
layer_name="spatial-layer",
layer_zoom=11,
col=min_key.col,
row=min_key.row,
store=store)
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import scala.collection.mutable
class LayerReaderWrapper(sc: SparkContext) {

def query(
attributeStore: AttributeStore,
catalogUri: String,
layerName: String,
zoom: Int,
Expand All @@ -42,10 +41,12 @@ class LayerReaderWrapper(sc: SparkContext) {
numPartitions: Integer
): TiledRasterLayer[_] = {
val id = LayerId(layerName, zoom)
val attributeStore = AttributeStore(catalogUri)

val spatialQuery: Option[Geometry] = Option(queryGeometryBytes).map(WKB.read)
val queryCRS: Option[CRS] = TileLayer.getCRS(projQuery)
val header = attributeStore.readHeader[LayerHeader](id)
val layerReader: FilteringLayerReader[LayerId] = LayerReader(attributeStore, catalogUri)(sc)
val header: LayerHeader = attributeStore.readHeader[LayerHeader](id)
val layerReader: FilteringLayerReader[LayerId] = LayerReader(catalogUri)(sc)

//val pyZoom: Option[Int] = ??? // is this top level zoom or zoom with None ?

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ import geopyspark.util.PythonTranslator
/**
* General interface for reading.
*/
class ValueReaderWrapper(attributeStore: AttributeStore, uri: String) {
val valueReader: ValueReader[LayerId] = ValueReader(attributeStore, uri)
class ValueReaderWrapper(uri: String) {
val attributeStore = AttributeStore(uri)
val valueReader: ValueReader[LayerId] = ValueReader(uri)

def getValueClass(id: LayerId): String =
attributeStore.readHeader[LayerHeader](id).valueClass
Expand Down
34 changes: 6 additions & 28 deletions geopyspark/geotrellis/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ def read_value(uri,
layer_zoom,
col,
row,
zdt=None,
store=None):
zdt=None):
"""Reads a single ``Tile`` from a GeoTrellis catalog.
Unlike other functions in this module, this will not return a ``TiledRasterLayer``, but rather a
GeoPySpark formatted raster.
Expand All @@ -63,19 +62,12 @@ def read_value(uri,
zdt (``datetime.datetime``): The time stamp of the tile if the data is spatial-temporal.
This is represented as a ``datetime.datetime.`` instance. The default value is,
``None``. If ``None``, then only the spatial area will be queried.
store (str or :class:`~geopyspark.geotrellis.catalog.AttributeStore`, optional):
``AttributeStore`` instance or URI for layer metadata lookup.
Returns:
:class:`~geopyspark.geotrellis.Tile`
"""

if store:
store = AttributeStore.build(store)
else:
store = AttributeStore.cached(uri)

reader = ValueReader(uri, layer_name, layer_zoom, store)
reader = ValueReader(uri, layer_name, layer_zoom)
return reader.read(col, row, zdt)


Expand All @@ -84,25 +76,19 @@ class ValueReader(object):
Suitable for use in TMS service because it does not have Spark overhead.
"""

def __init__(self, uri, layer_name, zoom=None, store=None):
if store:
self.store = AttributeStore.build(store)
else:
self.store = AttributeStore.cached(uri)
def __init__(self, uri, layer_name, zoom=None):

self.layer_name = layer_name
self.zoom = zoom
pysc = get_spark_context()
scala_store = self.store.wrapper.attributeStore()
ValueReaderWrapper = pysc._gateway.jvm.geopyspark.geotrellis.io.ValueReaderWrapper
self.wrapper = ValueReaderWrapper(scala_store, uri)
self.wrapper = ValueReaderWrapper(uri)

def read(self, col, row, zdt=None, zoom=None):
"""Reads a single ``Tile`` from a GeoTrellis catalog.
When requesting a tile that does not exist, ``None`` will be returned.
Args:
col (int): The col number of the tile within the layout. Cols run east to west.
row (int): The row number of the tile within the layout. Row run north to south.
zdt (``datetime.datetime``): The time stamp of the tile if the data is spatial-temporal.
Expand Down Expand Up @@ -134,8 +120,7 @@ def query(uri,
query_geom=None,
time_intervals=None,
query_proj=None,
num_partitions=None,
store=None):
num_partitions=None):
"""Queries a single, zoom layer from a GeoTrellis catalog given spatial and/or time parameters.
Note:
Expand Down Expand Up @@ -179,16 +164,10 @@ def query(uri,
then the returned ``TiledRasterLayer`` could contain incorrect values. If ``None``,
then the geometry and layer are assumed to be in the same projection.
num_partitions (int, optional): Sets RDD partition count when reading from catalog.
store (str or :class:`~geopyspark.geotrellis.catalog.AttributeStore`, optional):
``AttributeStore`` instance or URI for layer metadata lookup.
Returns:
:class:`~geopyspark.geotrellis.layer.TiledRasterLayer`
"""
if store:
store = AttributeStore.build(store)
else:
store = AttributeStore.cached(uri)

pysc = get_spark_context()
layer_zoom = layer_zoom or 0
Expand Down Expand Up @@ -218,8 +197,7 @@ def query(uri,
time_intervals = []

reader = pysc._gateway.jvm.geopyspark.geotrellis.io.LayerReaderWrapper(pysc._jsc.sc())
scala_store = store.wrapper.attributeStore()
srdd = reader.query(scala_store, uri,
srdd = reader.query(uri,
layer_name, layer_zoom,
query_geom, time_intervals, query_proj,
num_partitions)
Expand Down

0 comments on commit 5396a1d

Please sign in to comment.