Skip to content

Commit

Permalink
Optimize RasterizeRDD API and implemintation (+1 squashed commit)
Browse files Browse the repository at this point in the history
Squashed commits:
[f449338] Add scaladocs for rdd.rasterizeWithValue (+1 squashed commit)
Squashed commits:
[a42724a] fix: Pass through Rasterizer.Options correctly
  • Loading branch information
echeipesh committed Aug 31, 2017
1 parent fda3f99 commit 3aed6de
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package geotrellis.spark.rasterize

import geotrellis.raster._
import geotrellis.raster.rasterize._
//import geotrellis.raster.rasterize.Rasterizer
import geotrellis.spark._
import geotrellis.spark.tiling._
import geotrellis.spark.rasterize._
Expand All @@ -31,20 +30,25 @@ import org.apache.spark.rdd._
* Extension methods for invoking the rasterizer on RDD of Geometry objects.
*/
trait GeometryRDDRasterizeMethods[G <: Geometry] extends MethodExtensions[RDD[G]] {

/**
* Rasterize an RDD of Geometry objects into a tiled raster RDD.
* Cells not intersecting any geometry will left as NODATA.
* Value will be converted to type matching specified [[CellType]].
*
* @param value Cell value for cells intersecting a geometry
* @param layout Raster layer layout for the result of rasterization
* @param cellType [[CellType]] for creating raster tiles
* @param options Rasterizer options for cell intersection rules
* @param partitioner Partitioner for result RDD
*/
def rasterizeWithValue(
value: Double,
layout: LayoutDefinition
)(
cellType: CellType = IntConstantNoDataCellType,
includePartial: Boolean = true,
sampleType: PixelSampleType = PixelIsPoint,
partitioner: Partitioner = new HashPartitioner(self.getNumPartitions)
cellType: CellType,
layout: LayoutDefinition,
options: Rasterizer.Options = Rasterizer.Options.DEFAULT,
partitioner: Option[Partitioner] = None
): RDD[(SpatialKey, Tile)] with Metadata[LayoutDefinition] = {
RasterizeRDD.fromGeometry(self, value, layout, cellType,
RasterizeRDD.Options(
rasterizerOptions = Rasterizer.Options(includePartial, sampleType),
partitioner = Some(partitioner)
)
)
RasterizeRDD.fromGeometry(self, value, cellType, layout, options, partitioner)
}
}
101 changes: 41 additions & 60 deletions spark/src/main/scala/geotrellis/spark/rasterize/RasterizeRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,58 +11,60 @@ import org.apache.spark.rdd._
import scala.collection.immutable.VectorBuilder

object RasterizeRDD {
case class Options(
rasterizerOptions: Rasterizer.Options = Rasterizer.Options.DEFAULT,
partitioner: Option[Partitioner] = None
)

object Options {
val DEFAULT = Options()
}

/** Rasterize geometries using a constant value */
/**
* Rasterize an RDD of Geometry objects into a tiled raster RDD.
* Cells not intersecting any geometry will left as NODATA.
* Value will be converted to type matching specified [[CellType]].
*
* @param value Cell value for cells intersecting a geometry
* @param layout Raster layer layout for the result of rasterization
* @param cellType [[CellType]] for creating raster tiles
* @param options Rasterizer options for cell intersection rules
* @param partitioner Partitioner for result RDD
*/
def fromGeometry[G <: Geometry](
geoms: RDD[G],
value: Double,
cellType: CellType,
layout: LayoutDefinition,
ct: CellType,
options: Options
options: Rasterizer.Options = Rasterizer.Options.DEFAULT,
partitioner: Option[Partitioner] = None
): RDD[(SpatialKey, Tile)] with Metadata[LayoutDefinition] = {
val intValue = d2i(value)
val dblValue = value
val rasterExtent = RasterExtent(layout.extent, layout.layoutCols, layout.layoutRows)
val partitioner = options.partitioner.getOrElse(new HashPartitioner(geoms.getNumPartitions))
val layoutRasterExtent = RasterExtent(layout.extent, layout.layoutCols, layout.layoutRows)
val layoutRasterizerOptions = Rasterizer.Options(includePartial=true, sampleType=PixelIsArea)

// key the geometry to intersecting tiles so it can be rasterized in the map-side combine
val keyed: RDD[(SpatialKey, (Geometry, RasterExtent))] =
geoms.flatMap { geom =>
var buff = Map.empty[SpatialKey, (Geometry, RasterExtent)]
geom.foreach(rasterExtent, options.rasterizerOptions){ (col, row) =>
/** Key geometry by spatial keys of intersecting tiles */
def keyGeom(geom: Geometry): List[(SpatialKey, (Geometry, SpatialKey))] = {
val mask = ArrayTile.empty(BitCellType, layout.layoutCols, layout.layoutRows)
var buff = List.empty[(SpatialKey, (Geometry, SpatialKey))]
// Visit all layout tiles intersected by the geometry
geom.foreach(layoutRasterExtent, layoutRasterizerOptions){ (col, row) =>
if (mask.get(col, row) == 0) { // have not seen this key before
val key = SpatialKey(col, row)
val keyRasterExtent = RasterExtent(layout.mapTransform(key), layout.tileCols, layout.tileRows)
// have to check because MultiLine can cause repeat visits from same geometry
if (! buff.contains(key))
buff = buff.updated(key, (geom, keyRasterExtent))
buff = (key, (geom, key)) :: buff
}
buff.toSeq
mask.set(col, row, 1)
}
buff
}

// key the geometry to intersecting tiles so it can be rasterized in the map-side combine
val keyed: RDD[(SpatialKey, (Geometry, SpatialKey))] =
geoms.flatMap { geom => keyGeom(geom) }

val createTile = (value: (Geometry, RasterExtent)) => {
val (geom, re) = value
val tile = ArrayTile.empty(ct, re.cols, re.rows)
if (ct.isFloatingPoint)
geom.foreach(re){ tile.setDouble(_, _, dblValue) }
else
geom.foreach(re){ tile.set(_, _, intValue) }
val createTile = (tup: (Geometry, SpatialKey)) => {
val (geom, key) = tup
val tile = ArrayTile.empty(cellType, layout.tileCols, layout.tileRows)
val re = RasterExtent(layout.mapTransform(key), layout.tileCols, layout.tileRows)
geom.foreach(re, options){ tile.setDouble(_, _, value) }
tile: MutableArrayTile
}

val updateTile = (tile: MutableArrayTile, value: (Geometry, RasterExtent)) => {
val (geom, re) = value
if (ct.isFloatingPoint)
geom.foreach(re){ tile.setDouble(_, _, dblValue) }
else
geom.foreach(re){ tile.set(_, _, intValue) }
val updateTile = (tile: MutableArrayTile, tup: (Geometry, SpatialKey)) => {
val (geom, key) = tup
val re = RasterExtent(layout.mapTransform(key), layout.tileCols, layout.tileRows)
geom.foreach(re, options){ tile.setDouble(_, _, value) }
tile: MutableArrayTile
}

Expand All @@ -74,30 +76,9 @@ object RasterizeRDD {
createCombiner = createTile,
mergeValue = updateTile,
mergeCombiners = mergeTiles,
partitioner
partitioner.getOrElse(new HashPartitioner(geoms.getNumPartitions))
)

ContextRDD(tiles.asInstanceOf[RDD[(SpatialKey, Tile)]], layout)
}

// /** Rasterize geometries using a constant value */
// def fromGeometry[G <: Geometry, T: Numeric](
// geoms: RDD[G],
// value: T,
// layout: LayoutDefinition,
// ct: CellType,
// options: Options
// ): RDD[(SpatialKey, Tile)] with Metadata[LayoutDefinition] = {
// fromGeometry(geoms, layout, ct, value, new HashPartitioner(geoms.sparkContext.defaultParallelism), options)
// }

// /** Rasterize geometries using a constant value */
// def fromGeometry[G <: Geometry, T: Numeric](
// geoms: RDD[G],
// value: T
// layout: LayoutDefinition,
// ct: CellType,
// ): RDD[(SpatialKey, Tile)] with Metadata[LayoutDefinition] = {
// fromGeometry(geoms, layout, ct, value, new HashPartitioner(geoms.sparkContext.defaultParallelism), Options.DEFAULT)
// }
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class RasterizeRDDSpec extends FunSpec with Matchers
val layout = TileLayout(3,3,256,256)
val ld = LayoutDefinition(septaExtent, layout)

val rasterizedRdd = linesRdd.rasterizeWithValue(1, ld)(cellType = IntConstantNoDataCellType)
val rasterizedRdd = linesRdd.rasterizeWithValue(1, IntConstantNoDataCellType, ld)
val actual = rasterizedRdd.stitch()

// rasterizing a single 768x768 tile would actuall produce numerical differencies
Expand Down Expand Up @@ -81,7 +81,7 @@ class RasterizeRDDSpec extends FunSpec with Matchers
val ld = LayoutDefinition(huc10.envelope, layout)

val polyRdd = sc.parallelize(huc10.polygons)
val rasterizedRdd = polyRdd.rasterizeWithValue(1, ld)(cellType = IntConstantNoDataCellType)
val rasterizedRdd = polyRdd.rasterizeWithValue(1, IntConstantNoDataCellType, ld)
val actual = rasterizedRdd.stitch()

val expected: Tile = {
Expand All @@ -97,7 +97,9 @@ class RasterizeRDDSpec extends FunSpec with Matchers
1)
}
}.stitch

info("MD: " + rasterizedRdd.metadata.tileLayout.toString)
info("Expected" + expected.dimensions.toString)
info("Actual: " + actual.tile.dimensions.toString)
tilesEqual(actual.tile, expected)
}
}

0 comments on commit 3aed6de

Please sign in to comment.