Skip to content

Commit

Permalink
Merge pull request #616 from jbouffard/improvement/partitioner
Browse files Browse the repository at this point in the history
Partition Strategies and Preservation
  • Loading branch information
Jacob Bouffard committed Jan 17, 2018
2 parents 4e976ac + d45d132 commit 7091b88
Show file tree
Hide file tree
Showing 21 changed files with 724 additions and 196 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package geopyspark.geotrellis

import org.apache.spark._


/**
 * Base type for objects that know how to build a Spark `Partitioner`.
 *
 * The abstract class itself never reads `numPartitions`; the concrete
 * strategies declared alongside it use the value to override the
 * `partitions` argument of `producePartitioner` when it is defined.
 *
 * @param numPartitions optional partition count supplied when the strategy is created
 */
abstract class PartitionStrategy(numPartitions: Option[Int]) {
/**
 * Builds the partitioner for this strategy.
 *
 * @param partitions partition count supplied by the caller of this method
 * @return the partitioner wrapped in an `Option`; the signature permits `None`
 */
def producePartitioner(partitions: Int): Option[Partitioner]
}


/**
 * Partition strategy that yields a Spark `HashPartitioner`.
 *
 * @param numPartitions explicit partition count; when `None`, the count passed
 *                      to `producePartitioner` is used instead
 */
class HashPartitionStrategy(val numPartitions: Option[Int]) extends PartitionStrategy(numPartitions) {
  /**
   * Creates a `HashPartitioner`, preferring the configured count over `partitions`.
   *
   * @param partitions fallback partition count used when none was configured
   * @return always a `Some` holding the new partitioner
   */
  def producePartitioner(partitions: Int): Option[Partitioner] = {
    val count = numPartitions.getOrElse(partitions)
    Some(new HashPartitioner(count))
  }
}

object HashPartitionStrategy {
  /**
   * Factory taking a boxed, possibly-null partition count.
   *
   * @param numPartitions boxed partition count; `null` is mapped to `None`
   * @return a strategy holding `Some(count)` for a non-null argument, `None` otherwise
   */
  def apply(numPartitions: Integer): HashPartitionStrategy = {
    // Option(...) turns a null reference into None and anything else into Some.
    val requested: Option[Int] = Option(numPartitions).map(_.intValue)
    new HashPartitionStrategy(requested)
  }
}


/**
 * Partition strategy that yields a `SpatialPartitioner`.
 *
 * @param numPartitions explicit partition count; when `None`, the count passed
 *                      to `producePartitioner` is used instead
 * @param bits          value forwarded unchanged to `SpatialPartitioner`
 */
class SpatialPartitionStrategy(val numPartitions: Option[Int], val bits: Int) extends PartitionStrategy(numPartitions) {
  /**
   * Creates a `SpatialPartitioner`, preferring the configured count over `partitions`.
   *
   * @param partitions fallback partition count used when none was configured
   * @return always a `Some` holding the new partitioner
   */
  def producePartitioner(partitions: Int): Option[Partitioner] = {
    val count = numPartitions.getOrElse(partitions)
    Some(SpatialPartitioner(count, bits))
  }
}

object SpatialPartitionStrategy {
  /**
   * Factory taking a boxed, possibly-null partition count.
   *
   * @param numPartitions boxed partition count; `null` is mapped to `None`
   * @param bits          value passed through to the strategy
   * @return a strategy holding `Some(count)` for a non-null argument, `None` otherwise
   */
  def apply(numPartitions: Integer, bits: Int): SpatialPartitionStrategy = {
    // Option(...) turns a null reference into None and anything else into Some.
    val requested: Option[Int] = Option(numPartitions).map(_.intValue)
    new SpatialPartitionStrategy(requested, bits)
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,26 @@ class ProjectedRasterLayer(val rdd: RDD[(ProjectedExtent, MultibandTile)]) exten
).toJson.compactPrint
}

def tileToLayout(tileLayerMetadata: String, resampleMethod: ResampleMethod): TiledRasterLayer[SpatialKey] = {
def tileToLayout(
tileLayerMetadata: String,
resampleMethod: ResampleMethod,
partitionStrategy: PartitionStrategy
): TiledRasterLayer[SpatialKey] = {
val md = tileLayerMetadata.parseJson.convertTo[TileLayerMetadata[SpatialKey]]
new SpatialTiledRasterLayer(None, MultibandTileLayerRDD(rdd.tileToLayout(md, resampleMethod), md))
val options = getTilerOptions(resampleMethod, partitionStrategy)

new SpatialTiledRasterLayer(None, MultibandTileLayerRDD(rdd.tileToLayout(md, options), md))
}

def tileToLayout(layoutDefinition: LayoutDefinition, resampleMethod: ResampleMethod): TiledRasterLayer[SpatialKey] = {
def tileToLayout(
layoutDefinition: LayoutDefinition,
resampleMethod: ResampleMethod,
partitionStrategy: PartitionStrategy
): TiledRasterLayer[SpatialKey] = {
val sms = RasterSummary.collect[ProjectedExtent, SpatialKey](rdd)
require(sms.length == 1, s"Multiple raster CRS layers found: ${sms.map(_.crs).toList}")
val sm = sms.head

val sm = sms.head
val metadata = TileLayerMetadata[SpatialKey](
sm.cellType,
layoutDefinition,
Expand All @@ -69,15 +79,24 @@ class ProjectedRasterLayer(val rdd: RDD[(ProjectedExtent, MultibandTile)]) exten
sm.bounds.setSpatialBounds(layoutDefinition.mapTransform(sm.extent))
)

SpatialTiledRasterLayer(None, MultibandTileLayerRDD(rdd.tileToLayout(metadata, resampleMethod), metadata))
val options = getTilerOptions(resampleMethod, partitionStrategy)

SpatialTiledRasterLayer(None, MultibandTileLayerRDD(rdd.tileToLayout(metadata, options), metadata))
}

def tileToLayout(layoutType: LayoutType, resampleMethod: ResampleMethod): TiledRasterLayer[SpatialKey] = {
def tileToLayout(
layoutType: LayoutType,
resampleMethod: ResampleMethod,
partitionStrategy: PartitionStrategy
): TiledRasterLayer[SpatialKey] = {
val sms = RasterSummary.collect[ProjectedExtent, SpatialKey](rdd)
require(sms.length == 1, s"Multiple raster CRS layers found: ${sms.map(_.crs).toList}")

val sm = sms.head
val (metadata, zoom) = sm.toTileLayerMetadata(layoutType)
val tiled = rdd.tileToLayout(metadata, resampleMethod)
val options = getTilerOptions(resampleMethod, partitionStrategy)
val tiled = rdd.tileToLayout(metadata, options)

new SpatialTiledRasterLayer(zoom, MultibandTileLayerRDD(tiled, metadata))
}

Expand All @@ -86,9 +105,15 @@ class ProjectedRasterLayer(val rdd: RDD[(ProjectedExtent, MultibandTile)]) exten
new ProjectedRasterLayer(rdd.reproject(crs, resampleMethod))
}

def reproject(targetCRS: String, layoutType: LayoutType, resampleMethod: ResampleMethod): TiledRasterLayer[SpatialKey] = {
def reproject(
targetCRS: String,
layoutType: LayoutType,
resampleMethod: ResampleMethod,
partitionStrategy: PartitionStrategy
): TiledRasterLayer[SpatialKey] = {
val crs = TileLayer.getCRS(targetCRS).get
val tiled = tileToLayout(LocalLayout(256), resampleMethod).rdd
val tiled = tileToLayout(LocalLayout(256), resampleMethod, partitionStrategy).rdd

layoutType match {
case GlobalLayout(tileSize, null, threshold) =>
val scheme = new ZoomedLayoutScheme(crs, tileSize, threshold)
Expand All @@ -109,10 +134,12 @@ class ProjectedRasterLayer(val rdd: RDD[(ProjectedExtent, MultibandTile)]) exten
def reproject(
target_crs: String,
layoutDefinition: LayoutDefinition,
resampleMethod: ResampleMethod
resampleMethod: ResampleMethod,
partitionStrategy: PartitionStrategy
): TiledRasterLayer[SpatialKey] = {
val tiled = tileToLayout(layoutDefinition, resampleMethod).rdd
val tiled = tileToLayout(layoutDefinition, resampleMethod, partitionStrategy).rdd
val (zoom, reprojected) = TileRDDReproject(tiled, TileLayer.getCRS(target_crs).get, Right(layoutDefinition), resampleMethod)

SpatialTiledRasterLayer(Some(zoom), reprojected)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ import scala.collection.immutable.HashMap
abstract class RasterLayer[K](implicit ev0: ClassTag[K], ev1: Component[K, ProjectedExtent]) extends TileLayer[K] with Serializable {
def rdd: RDD[(K, MultibandTile)]

/**
 * Returns a layer wrapping the result of calling `repartition(numPartitions)`
 * on this layer's RDD.
 *
 * @param numPartitions partition count handed to the RDD's `repartition`
 */
def repartition(numPartitions: Int): RasterLayer[K] = {
  val reshuffled = rdd.repartition(numPartitions)
  withRDD(reshuffled)
}

/**
 * Partitions this layer's RDD with the partitioner produced by `partitionStrategy`.
 *
 * The strategy is asked for a partitioner using the RDD's current partition
 * count as the fallback size. If `partitionStrategy` is `null`, or the strategy
 * yields no partitioner, the layer is returned unchanged instead of failing on
 * a null dereference or `Option.get`.
 *
 * @param partitionStrategy strategy used to build the partitioner; may be `null`
 * @return a layer wrapping the re-partitioned RDD, or this layer when there is
 *         no partitioner to apply
 */
def partitionBy(partitionStrategy: PartitionStrategy): RasterLayer[K] =
  Option(partitionStrategy).flatMap(_.producePartitioner(rdd.getNumPartitions)) match {
    case Some(partitioner) => withRDD(rdd.partitionBy(partitioner))
    // Nothing to partition by: keep the existing partitioning.
    case None => this
  }

def toProtoRDD(): JavaRDD[Array[Byte]]

def collectKeys(): java.util.ArrayList[Array[Byte]]
Expand All @@ -57,18 +62,45 @@ abstract class RasterLayer[K](implicit ev0: ClassTag[K], ev1: Component[K, Proje
def convertDataType(newType: String): RasterLayer[_] =
withRDD(rdd.map { x => (x._1, x._2.convert(CellType.fromName(newType))) })

protected def tileToLayout(tileLayerMetadata: String, resampleMethod: ResampleMethod): TiledRasterLayer[_]
def tileToLayout(layoutType: LayoutType, resampleMethod: ResampleMethod): TiledRasterLayer[_]
protected def tileToLayout(layoutDefinition: LayoutDefinition, resampleMethod: ResampleMethod): TiledRasterLayer[_]
def tileToLayout(
tileLayerMetadata: String,
resampleMethod: ResampleMethod,
partitionStrategy: PartitionStrategy
): TiledRasterLayer[_]

def tileToLayout(
layoutType: LayoutType,
resampleMethod: ResampleMethod,
partitionStrategy: PartitionStrategy
): TiledRasterLayer[_]

def tileToLayout(
layoutDefinition: LayoutDefinition,
resampleMethod: ResampleMethod,
partitionStrategy: PartitionStrategy
): TiledRasterLayer[_]

def reproject(targetCRS: String, resampleMethod: ResampleMethod): RasterLayer[K]

def reproject(
targetCRS: String,
layoutType: LayoutType,
resampleMethod: ResampleMethod,
partitionStrategy: PartitionStrategy
): TiledRasterLayer[_]

def reproject(
targetCRS: String,
layoutDefinition: LayoutDefinition,
resampleMethod: ResampleMethod,
partitionStrategy: PartitionStrategy
): TiledRasterLayer[_]

protected def reproject(targetCRS: String, resampleMethod: ResampleMethod): RasterLayer[K]
protected def reproject(targetCRS: String, layoutType: LayoutType, resampleMethod: ResampleMethod): TiledRasterLayer[_]
protected def reproject(targetCRS: String, layoutDefinition: LayoutDefinition, resampleMethod: ResampleMethod): TiledRasterLayer[_]
protected def withRDD(result: RDD[(K, MultibandTile)]): RasterLayer[K]

def merge(numPartitions: Integer, partitioner: String): RasterLayer[K] =
numPartitions match {
case i: Integer => withRDD(rdd.merge(Some(TileLayer.getPartitioner(i, partitioner))))
def merge(partitionStrategy: PartitionStrategy): RasterLayer[K] =
partitionStrategy match {
case ps: PartitionStrategy => withRDD(rdd.merge(ps.producePartitioner(rdd.getNumPartitions)))
case null => withRDD(rdd.merge())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import scala.reflect._
class SpatialPartitioner[K: SpatialComponent](partitions: Int, bits: Int) extends Partitioner {
def numPartitions: Int = partitions

/** Exposes the `bits` constructor argument of this partitioner. */
def getBits: Int = bits

def getPartition(key: Any): Int = {
val k = key.asInstanceOf[K]
val SpatialKey(col, row) = k.getComponent[SpatialKey]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ class SpatialTiledRasterLayer(
def tileToLayout(
layoutDefinition: LayoutDefinition,
zoom: Option[Int],
resampleMethod: ResampleMethod
resampleMethod: ResampleMethod,
partitionStrategy: PartitionStrategy
): TiledRasterLayer[SpatialKey] = {
val baseTransform = rdd.metadata.layout.mapTransform
val targetTransform = layoutDefinition.mapTransform
Expand All @@ -115,15 +116,22 @@ class SpatialTiledRasterLayer(
bounds = KeyBounds(targetTransform(rdd.metadata.extent))
)

val options = getTilerOptions(resampleMethod, partitionStrategy)
val tileLayer =
MultibandTileLayerRDD(projectedRDD.tileToLayout(retiledLayerMetadata, resampleMethod), retiledLayerMetadata)
MultibandTileLayerRDD(projectedRDD.tileToLayout(retiledLayerMetadata, options), retiledLayerMetadata)

SpatialTiledRasterLayer(zoom, tileLayer)
}

def pyramid(resampleMethod: ResampleMethod, partitioner: String): Array[TiledRasterLayer[SpatialKey]] = {
def pyramid(resampleMethod: ResampleMethod, partitionStrategy: PartitionStrategy): Array[TiledRasterLayer[SpatialKey]] = {
require(! rdd.metadata.bounds.isEmpty, "Can not pyramid an empty RDD")
val part = TileLayer.getPartitioner(rdd.partitions.length, partitioner)

val partitioner =
partitionStrategy match {
case ps: PartitionStrategy => ps.producePartitioner(rdd.getNumPartitions)
case null => None
}

val (baseZoom, scheme) =
zoomLevel match {
case Some(zoom) =>
Expand All @@ -136,7 +144,7 @@ class SpatialTiledRasterLayer(

Pyramid.levelStream(
rdd, scheme, baseZoom, 0,
Pyramid.Options(resampleMethod=resampleMethod, partitioner=part)
Pyramid.Options(resampleMethod=resampleMethod, partitioner=partitioner)
).map{ x =>
SpatialTiledRasterLayer(Some(x._1), x._2)
}.toArray
Expand All @@ -147,7 +155,8 @@ class SpatialTiledRasterLayer(
neighborhood: String,
param1: Double,
param2: Double,
param3: Double
param3: Double,
partitionStrategy: PartitionStrategy
): TiledRasterLayer[SpatialKey] = {
val singleTileLayerRDD: TileLayerRDD[SpatialKey] = TileLayerRDD(
rdd.mapValues({ v => v.band(0) }),
Expand All @@ -163,7 +172,13 @@ class SpatialTiledRasterLayer(
val cellSize = rdd.metadata.layout.cellSize
val op: ((Tile, Option[GridBounds]) => Tile) = getOperation(operation, _neighborhood, cellSize, param1)

val result: TileLayerRDD[SpatialKey] = FocalOperation(singleTileLayerRDD, _neighborhood)(op)
val result: TileLayerRDD[SpatialKey] =
partitionStrategy match {
case ps: PartitionStrategy =>
FocalOperation(singleTileLayerRDD, _neighborhood, ps.producePartitioner(rdd.getNumPartitions))(op)
case null =>
FocalOperation(singleTileLayerRDD, _neighborhood, None)(op)
}

val multibandRDD: MultibandTileLayerRDD[SpatialKey] =
MultibandTileLayerRDD(result.mapValues{ x => MultibandTile(x) }, result.metadata)
Expand Down Expand Up @@ -458,8 +473,7 @@ object SpatialTiledRasterLayer {
fillValue: Double,
cellType: String,
options: Rasterizer.Options,
numPartitions: Integer,
layerPartitioner: String
partitionStrategy: PartitionStrategy
): SpatialTiledRasterLayer = {
val geomRDD = geomWKB.map { WKB.read }
val fullEnvelope = geomRDD.map(_.envelope).reduce(_ combine _)
Expand All @@ -472,8 +486,7 @@ object SpatialTiledRasterLayer {
cellType,
fullEnvelope,
options,
numPartitions,
layerPartitioner)
partitionStrategy)
}

def rasterizeGeometry(
Expand All @@ -484,8 +497,7 @@ object SpatialTiledRasterLayer {
fillValue: Double,
cellType: String,
options: Rasterizer.Options,
numPartitions: Integer,
layerPartitioner: String
partitionStrategy: PartitionStrategy
): SpatialTiledRasterLayer = {
val geoms = geomWKB.asScala.map(WKB.read)
val fullEnvelope = geoms.map(_.envelope).reduce(_ combine _)
Expand All @@ -499,8 +511,7 @@ object SpatialTiledRasterLayer {
cellType,
fullEnvelope,
options,
numPartitions,
layerPartitioner)
partitionStrategy)
}

def rasterizeGeometry(
Expand All @@ -511,8 +522,7 @@ object SpatialTiledRasterLayer {
requestedCellType: String,
extent: Extent,
options: Rasterizer.Options,
numPartitions: Integer,
layerPartitioner: String
partitionStrategy: PartitionStrategy
): SpatialTiledRasterLayer = {
import geotrellis.raster.rasterize.Rasterizer.Options

Expand All @@ -521,8 +531,12 @@ object SpatialTiledRasterLayer {
val LayoutLevel(z, ld) = ZoomedLayoutScheme(srcCRS).levelForZoom(requestedZoom)
val maptrans = ld.mapTransform
val gb @ GridBounds(cmin, rmin, cmax, rmax) = maptrans(extent)
val partitions = Option(numPartitions).map(_.toInt).getOrElse(math.max(gb.size / 512, 1))
val partitioner = TileLayer.getPartitioner(partitions, layerPartitioner)

val partitioner =
partitionStrategy match {
case ps: PartitionStrategy => ps.producePartitioner(math.max(gb.size / 512, 1))
case null => None
}

val tiles = RasterizeRDD.fromGeometry(
geoms = geomRDD,
Expand All @@ -531,7 +545,9 @@ object SpatialTiledRasterLayer {
value = fillValue,
options = Option(options).getOrElse(Options.DEFAULT),
partitioner = partitioner)

val metadata = TileLayerMetadata(cellType, ld, maptrans(gb), srcCRS, KeyBounds(gb))

SpatialTiledRasterLayer(Some(requestedZoom),
MultibandTileLayerRDD(tiles.mapValues(MultibandTile(_)), metadata))
}
Expand All @@ -542,9 +558,8 @@ object SpatialTiledRasterLayer {
requestedZoom: Int,
requestedCellType: String,
options: Rasterizer.Options,
numPartitions: Integer,
zIndexCellType: String,
layerPartitioner: String
partitionStrategy: PartitionStrategy
): SpatialTiledRasterLayer = {
import geotrellis.raster.rasterize.Rasterizer.Options

Expand All @@ -560,8 +575,12 @@ object SpatialTiledRasterLayer {
val LayoutLevel(z, ld) = ZoomedLayoutScheme(srcCRS).levelForZoom(requestedZoom)
val maptrans = ld.mapTransform
val gb @ GridBounds(cmin, rmin, cmax, rmax) = maptrans(fullEnvelope)
val partitions = Option(numPartitions).map(_.toInt).getOrElse(math.max(gb.size / 512, 1))
val partitioner = TileLayer.getPartitioner(partitions, layerPartitioner)

val partitioner =
partitionStrategy match {
case ps: PartitionStrategy => ps.producePartitioner(math.max(gb.size / 512, 1))
case null => None
}

val tiles =
RasterizeRDD.fromFeatureWithZIndex(
Expand Down

0 comments on commit 7091b88

Please sign in to comment.