Skip to content

Commit

Permalink
Merge pull request #142 from jamesmcclain/feature/reinstate
Browse files Browse the repository at this point in the history
Reinstate Mask Operations
  • Loading branch information
echeipesh committed May 1, 2017
2 parents d29ecb7 + 9822ccd commit 50c416d
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,24 @@ package geopyspark.geotrellis

import geopyspark.geotrellis.GeoTrellisUtils._

import geotrellis.util._
import geotrellis.proj4._
import geotrellis.vector._
import geotrellis.vector.io.wkt.WKT
import geotrellis.raster._
import geotrellis.raster.rasterize._
import geotrellis.raster.render._
import geotrellis.raster.resample.ResampleMethod
import geotrellis.spark._
import geotrellis.spark.pyramid._
import geotrellis.spark.reproject._
import geotrellis.spark.costdistance.IterativeCostDistance
import geotrellis.spark.io._
import geotrellis.spark.io.json._
import geotrellis.spark.io.avro._
import geotrellis.spark.io.json._
import geotrellis.spark.mapalgebra.focal._
import geotrellis.spark.mask.Mask
import geotrellis.spark.pyramid._
import geotrellis.spark.reproject._
import geotrellis.spark.tiling._
import geotrellis.util._
import geotrellis.vector._
import geotrellis.vector.io.wkt.WKT

import spray.json._
import spray.json.DefaultJsonProtocol._
Expand Down Expand Up @@ -49,6 +50,19 @@ abstract class TiledRasterRDD[K: SpatialComponent: AvroRecordCodec: JsonFormat:

def layerMetadata: String = rdd.metadata.toJson.prettyPrint

def mask(wkts: java.util.ArrayList[String]): TiledRasterRDD[_] = {
val geometries: Seq[MultiPolygon] = wkts
.asScala.map({ wkt => WKT.read(wkt) })
.flatMap({
case p: Polygon => Some(MultiPolygon(p))
case m: MultiPolygon => Some(m)
case _ => None
})
mask(geometries)
}

protected def mask(geometries: Seq[MultiPolygon]): TiledRasterRDD[_]

def reproject(
extent: java.util.Map[String, Double],
layout: java.util.Map[String, Int],
Expand Down Expand Up @@ -287,6 +301,20 @@ class SpatialTiledRasterRDD(
SpatialTiledRasterRDD(None, multibandRDD)
}

def mask(geometries: Seq[MultiPolygon]): TiledRasterRDD[_] = {
val options = Mask.Options.DEFAULT
val singleBand = ContextRDD(
rdd.map({ case (k, v) => (k, v.band(0)) }),
rdd.metadata
)
val result = Mask(singleBand, geometries, options)
val multiBand = MultibandTileLayerRDD(
result.map({ case (k, v) => (k, MultibandTile(v)) }),
result.metadata
)
SpatialTiledRasterRDD(None, multiBand)
}

def stitch: (Array[Byte], String) = {
val contextRDD = ContextRDD(
rdd.map({ case (k, v) => (k, v.band(0)) }),
Expand All @@ -306,7 +334,7 @@ class SpatialTiledRasterRDD(
rdd.metadata
)

implicit def convertion(k: SpaceTimeKey): SpatialKey =
implicit def conversion(k: SpaceTimeKey): SpatialKey =
k.spatialKey

implicit val _sc = sc
Expand Down Expand Up @@ -336,6 +364,20 @@ class TemporalTiledRasterRDD(
val rdd: RDD[(SpaceTimeKey, MultibandTile)] with Metadata[TileLayerMetadata[SpaceTimeKey]]
) extends TiledRasterRDD[SpaceTimeKey] {

def mask(geometries: Seq[MultiPolygon]): TiledRasterRDD[_] = {
val options = Mask.Options.DEFAULT
val singleBand = ContextRDD(
rdd.map({ case (k, v) => (k, v.band(0)) }),
rdd.metadata
)
val result = Mask(singleBand, geometries, options)
val multiBand = MultibandTileLayerRDD(
result.map({ case (k, v) => (k, MultibandTile(v)) }),
result.metadata
)
TemporalTiledRasterRDD(None, multiBand)
}

def reproject(
layout: Either[LayoutScheme, LayoutDefinition],
crs: CRS,
Expand Down
17 changes: 17 additions & 0 deletions geopyspark/geotrellis/rdd.py
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,23 @@ def stitch(self):
ser = self.geopysc.create_value_serializer(tup._2(), TILE)
return ser.loads(tup._1())[0]

def mask(self, geometries):
"""Performs cost distance of a TileLayer.
Args:
geometries (list): A list of shapely geometries to use as masks.
Note:
All geometries must be in the same CRS as the TileLayer.
Returns:
:class:`~geopyspark.geotrellis.rdd.TiledRasterRDD`
"""
wkts = [shapely.wkt.dumps(g) for g in geometries]
srdd = self.srdd.mask(wkts)

return TiledRasterRDD(self.geopysc, self.rdd_type, srdd)

def cost_distance(self, geometries, max_distance):
"""Performs cost distance of a TileLayer.
Expand Down
18 changes: 7 additions & 11 deletions geopyspark/tests/mask_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from shapely.geometry import Polygon
from geopyspark.tests.base_test_class import BaseTestClass
from geopyspark.geotrellis.rdd import TiledRasterRDD
from geopyspark.geotrellis.constants import SPATIAL


Expand Down Expand Up @@ -38,20 +39,15 @@ class MaskTest(BaseTestClass):
'tileLayout': layout}}

geometries = [Polygon([(17, 17), (42, 17), (42, 42), (17, 42)])]
raster_rdd = TiledRasterRDD.from_numpy_rdd(BaseTestClass.geopysc, SPATIAL, rdd, metadata)

@pytest.mark.skip('Mask is currently deprecated.')
def test_python_mask(self):
result = python_mask(self.rdd, self.metadata, self.geometries)
n = result.map(lambda kv: np.sum(kv[1]['data'])).reduce(lambda a,b: a + b)
self.assertEqual(n, -50)
@pytest.fixture(autouse=True)
def tearDown(self):
yield
BaseTestClass.geopysc.pysc._gateway.close()

@pytest.mark.skip('Mask is currently deprecated.')
def test_geotrellis_mask(self):
result = geotrellis_mask(geopysc=self.geopysc,
rdd_type=SPATIAL,
keyed_rdd=self.rdd,
metadata=self.metadata,
geometries=self.geometries)
result = self.raster_rdd.mask(geometries=self.geometries).to_numpy_rdd()
n = result.map(lambda kv: np.sum(kv[1]['data'])).reduce(lambda a,b: a + b)
self.assertEqual(n, 25.0)

Expand Down

0 comments on commit 50c416d

Please sign in to comment.