Skip to content

Commit

Permalink
Merge pull request #568 from jbouffard/feature/partitioners
Browse files Browse the repository at this point in the history
Expose Partitioners in the API
  • Loading branch information
Jacob Bouffard committed Jan 8, 2018
2 parents ce5e03f + 650ff98 commit 410f60c
Show file tree
Hide file tree
Showing 14 changed files with 196 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,7 @@ object Constants {

// Unit-conversion constants, presumably "per degree at the equator" (inferred from the names).
// NOTE(review): 365217.6 ft is roughly 111,320 m, so METERSATEQUATOR = 11320 looks like it is
// missing a digit -- confirm against the code that consumes these values before changing it.
final val METERSATEQUATOR = 11320
final val FEETATEQUATOR = 365217.6

// Names of the supported partitioners. The values are the same strings used by the Python
// `Partitioner` enum (HASH_PARTITIONER / SPATIAL_PARTITIONER).
final val HASH = "HashPartitioner"
final val SPATIAL = "SpatialPartitioner"
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,9 @@ abstract class RasterLayer[K](implicit ev0: ClassTag[K], ev1: Component[K, Proje
protected def reproject(targetCRS: String, layoutDefinition: LayoutDefinition, resampleMethod: ResampleMethod): TiledRasterLayer[_]
protected def withRDD(result: RDD[(K, MultibandTile)]): RasterLayer[K]

def merge(numPartitions: Integer): RasterLayer[K] =
def merge(numPartitions: Integer, partitioner: String): RasterLayer[K] =
numPartitions match {
case i: Integer => withRDD(rdd.merge(Some(new HashPartitioner(i))))
case i: Integer => withRDD(rdd.merge(Some(TileLayer.getPartitioner(i, partitioner))))
case null => withRDD(rdd.merge())
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package geopyspark.geotrellis

import geotrellis.spark._
import geotrellis.spark.io.index._
import geotrellis.spark.io.index.zcurve._
import geotrellis.util._

import org.apache.spark._

import scala.reflect._


/** A Spark `Partitioner` that assigns keys to partitions by the Z-order (Morton) index of
  * their `SpatialKey` component.
  *
  * The partition of a key is `(Z2(col, row).z >> bits) % partitions`: the low `bits` bits of
  * the Z-index are discarded, so keys whose indices differ only in those bits land in the
  * same partition.
  *
  * Instances are compared by `partitions` and `bits`. Spark compares partitioners with `==`
  * to decide whether an RDD is already partitioned as requested; without a value-based
  * `equals`, two identically configured instances would never match and every
  * `partitionBy`/`merge` would shuffle again.
  *
  * @tparam K          key type providing a `SpatialKey` component
  * @param  partitions number of partitions to distribute keys over
  * @param  bits       number of low-order Z-index bits to drop before taking the modulus
  */
class SpatialPartitioner[K: SpatialComponent](partitions: Int, private val bits: Int) extends Partitioner {
  def numPartitions: Int = partitions

  /** Returns the partition for `key`, which must be an instance of `K`.
    * The cast is unchecked (K is erased), so a key of another type fails, if at all,
    * only when its spatial component is extracted.
    */
  def getPartition(key: Any): Int = {
    val k = key.asInstanceOf[K]
    val SpatialKey(col, row) = k.getComponent[SpatialKey]
    ((Z2(col, row).z >> bits) % partitions).toInt
  }

  // Equality deliberately ignores K: the type parameter is erased at runtime and the
  // partition function is fully described by (partitions, bits).
  override def equals(other: Any): Boolean =
    other match {
      case that: SpatialPartitioner[_] =>
        that.numPartitions == numPartitions && that.bits == bits
      case _ =>
        false
    }

  override def hashCode: Int = 31 * partitions + bits
}

/** Factory methods for [[SpatialPartitioner]]. */
object SpatialPartitioner {
  /** Number of low-order Z-index bits dropped when the caller does not specify `bits`. */
  private final val DefaultBits = 8

  /** Creates a partitioner over `partitions` partitions using the default bit shift. */
  def apply[K: SpatialComponent](partitions: Int): SpatialPartitioner[K] =
    apply[K](partitions, DefaultBits)

  /** Creates a partitioner over `partitions` partitions that drops `bits` low-order bits. */
  def apply[K: SpatialComponent](partitions: Int, bits: Int): SpatialPartitioner[K] =
    new SpatialPartitioner[K](partitions, bits)
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,9 @@ class SpatialTiledRasterLayer(
SpatialTiledRasterLayer(zoom, tileLayer)
}

def pyramid(resampleMethod: ResampleMethod): Array[TiledRasterLayer[SpatialKey]] = {
def pyramid(resampleMethod: ResampleMethod, partitioner: String): Array[TiledRasterLayer[SpatialKey]] = {
require(! rdd.metadata.bounds.isEmpty, "Can not pyramid an empty RDD")
val part = rdd.partitioner.getOrElse(new HashPartitioner(rdd.partitions.length))
val part = TileLayer.getPartitioner(rdd.partitions.length, partitioner)
val (baseZoom, scheme) =
zoomLevel match {
case Some(zoom) =>
Expand Down Expand Up @@ -458,7 +458,8 @@ object SpatialTiledRasterLayer {
fillValue: Double,
cellType: String,
options: Rasterizer.Options,
numPartitions: Integer
numPartitions: Integer,
layerPartitioner: String
): SpatialTiledRasterLayer = {
val geomRDD = geomWKB.map { WKB.read }
val fullEnvelope = geomRDD.map(_.envelope).reduce(_ combine _)
Expand All @@ -471,7 +472,8 @@ object SpatialTiledRasterLayer {
cellType,
fullEnvelope,
options,
numPartitions)
numPartitions,
layerPartitioner)
}

def rasterizeGeometry(
Expand All @@ -482,7 +484,8 @@ object SpatialTiledRasterLayer {
fillValue: Double,
cellType: String,
options: Rasterizer.Options,
numPartitions: Integer
numPartitions: Integer,
layerPartitioner: String
): SpatialTiledRasterLayer = {
val geoms = geomWKB.asScala.map(WKB.read)
val fullEnvelope = geoms.map(_.envelope).reduce(_ combine _)
Expand All @@ -496,7 +499,8 @@ object SpatialTiledRasterLayer {
cellType,
fullEnvelope,
options,
numPartitions)
numPartitions,
layerPartitioner)
}

def rasterizeGeometry(
Expand All @@ -507,7 +511,8 @@ object SpatialTiledRasterLayer {
requestedCellType: String,
extent: Extent,
options: Rasterizer.Options,
numPartitions: Integer
numPartitions: Integer,
layerPartitioner: String
): SpatialTiledRasterLayer = {
import geotrellis.raster.rasterize.Rasterizer.Options

Expand All @@ -516,7 +521,8 @@ object SpatialTiledRasterLayer {
val LayoutLevel(z, ld) = ZoomedLayoutScheme(srcCRS).levelForZoom(requestedZoom)
val maptrans = ld.mapTransform
val gb @ GridBounds(cmin, rmin, cmax, rmax) = maptrans(extent)
val partitioner = new HashPartitioner(Option(numPartitions).map(_.toInt).getOrElse(math.max(gb.size / 512, 1)))
val partitions = Option(numPartitions).map(_.toInt).getOrElse(math.max(gb.size / 512, 1))
val partitioner = TileLayer.getPartitioner(partitions, layerPartitioner)

val tiles = RasterizeRDD.fromGeometry(
geoms = geomRDD,
Expand All @@ -537,7 +543,8 @@ object SpatialTiledRasterLayer {
requestedCellType: String,
options: Rasterizer.Options,
numPartitions: Integer,
zIndexCellType: String
zIndexCellType: String,
layerPartitioner: String
): SpatialTiledRasterLayer = {
import geotrellis.raster.rasterize.Rasterizer.Options

Expand All @@ -553,7 +560,8 @@ object SpatialTiledRasterLayer {
val LayoutLevel(z, ld) = ZoomedLayoutScheme(srcCRS).levelForZoom(requestedZoom)
val maptrans = ld.mapTransform
val gb @ GridBounds(cmin, rmin, cmax, rmax) = maptrans(fullEnvelope)
val partitioner = new HashPartitioner(Option(numPartitions).map(_.toInt).getOrElse(math.max(gb.size / 512, 1)))
val partitions = Option(numPartitions).map(_.toInt).getOrElse(math.max(gb.size / 512, 1))
val partitioner = TileLayer.getPartitioner(partitions, layerPartitioner)

val tiles =
RasterizeRDD.fromFeatureWithZIndex(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,9 +224,9 @@ class TemporalTiledRasterLayer(
TemporalTiledRasterLayer(zoom, tileLayer)
}

def pyramid(resampleMethod: ResampleMethod): Array[TiledRasterLayer[SpaceTimeKey]] = {
def pyramid(resampleMethod: ResampleMethod, partitioner: String): Array[TiledRasterLayer[SpaceTimeKey]] = {
require(! rdd.metadata.bounds.isEmpty, "Can not pyramid an empty RDD")
val part = rdd.partitioner.getOrElse(new HashPartitioner(rdd.partitions.length))
val part = TileLayer.getPartitioner(rdd.partitions.length, partitioner)
val (baseZoom, scheme) =
zoomLevel match {
case Some(zoom) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,12 @@ object TileLayer {
case NODATACELLS => TargetCell.NoData
}

/** Builds the Spark `Partitioner` identified by `partitioner`.
  *
  * @param partitions  number of partitions the returned partitioner distributes keys over
  * @param partitioner name of the partitioner; must equal `HASH` or `SPATIAL`
  * @return a `HashPartitioner` for `HASH`, a `SpatialPartitioner` for `SPATIAL`
  * @throws IllegalArgumentException if `partitioner` is null or not a supported name
  */
def getPartitioner(partitions: Int, partitioner: String): Partitioner =
  partitioner match {
    case HASH => new HashPartitioner(partitions)
    // NOTE(review): the key type of SpatialPartitioner is left to inference here --
    // confirm the resolved SpatialComponent is valid for every layer key type in use.
    case SPATIAL => SpatialPartitioner(partitions)
    // Fail with a descriptive error instead of an opaque scala.MatchError.
    case other =>
      throw new IllegalArgumentException(
        s"Unknown partitioner '$other'; expected '$HASH' or '$SPATIAL'")
  }

def combineBands[K: ClassTag, L <: TileLayer[K]: ClassTag](
sc: SparkContext,
layers: ArrayList[L]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ abstract class TiledRasterLayer[K: SpatialComponent: JsonFormat: ClassTag: Bound
def rdd: RDD[(K, MultibandTile)] with Metadata[TileLayerMetadata[K]]
def zoomLevel: Option[Int]

def repartition(numPartitions: Int): TiledRasterLayer[K] =
withRDD(rdd.partitionBy(new HashPartitioner(numPartitions)))
def repartition(numPartitions: Int, partitioner: String): TiledRasterLayer[K] =
withRDD(rdd.partitionBy(TileLayer.getPartitioner(numPartitions, partitioner)))

def bands(band: Int): TiledRasterLayer[K] =
withRDD(rdd.mapValues { multibandTile => multibandTile.subsetBands(band) })
Expand Down Expand Up @@ -119,7 +119,7 @@ abstract class TiledRasterLayer[K: SpatialComponent: JsonFormat: ClassTag: Bound
resampleMethod: ResampleMethod
): TiledRasterLayer[K]

def pyramid(resampleMethod: ResampleMethod): Array[_] // Array[TiledRasterLayer[K]]
def pyramid(resampleMethod: ResampleMethod, partitioner: String): Array[_] // Array[TiledRasterLayer[K]]

def focal(
operation: String,
Expand Down Expand Up @@ -378,13 +378,13 @@ abstract class TiledRasterLayer[K: SpatialComponent: JsonFormat: ClassTag: Bound
withRDD(result.mapValues { tiles => MultibandTile(tiles) } )
}

def merge(numPartitions: Integer): TiledRasterLayer[K] =
def merge(numPartitions: Integer, partitioner: String): TiledRasterLayer[K] =
numPartitions match {
case i: Integer => withRDD(
ContextRDD(
rdd
.asInstanceOf[RDD[(K, MultibandTile)]]
.merge(Some(new HashPartitioner(i))),
.merge(Some(TileLayer.getPartitioner(i, partitioner))),
rdd.metadata
)
)
Expand Down
15 changes: 14 additions & 1 deletion geopyspark/geotrellis/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
'Operation', 'Neighborhood', 'ClassificationStrategy', 'CellType', 'ColorRamp',
'DEFAULT_MAX_TILE_SIZE', 'DEFAULT_PARTITION_BYTES', 'DEFAULT_CHUNK_SIZE',
'DEFAULT_GEOTIFF_TIME_TAG', 'DEFAULT_GEOTIFF_TIME_FORMAT', 'DEFAULT_S3_CLIENT',
'StorageMethod', 'ColorSpace', 'Compression', 'Unit']
'StorageMethod', 'ColorSpace', 'Compression', 'Unit', 'Partitioner']


"""The NoData value for ints in GeoTrellis."""
Expand Down Expand Up @@ -285,8 +285,21 @@ class Compression(Enum):
NO_COMPRESSION = "NoCompression"
DEFLATE_COMPRESSION = "DeflateCompression"


class Unit(Enum):
"""Represents the units of elevation."""

METERS = "Meters"
FEET = "Feet"


class Partitioner(Enum):
"""Partitioners that can be used to repartition a layer.
There are currently two supported Partitioners:
- ``HASH_PARTITIONER`` Spark's HashPartitioner
- ``SPATIAL_PARTITIONER`` partitions data based on the key of each element.
"""

HASH_PARTITIONER = "HashPartitioner"
SPATIAL_PARTITIONER = "SpatialPartitioner"
43 changes: 31 additions & 12 deletions geopyspark/geotrellis/layer.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@
StorageMethod,
ColorSpace,
Compression,
NO_DATA_INT
NO_DATA_INT,
Partitioner
)
from geopyspark.geotrellis.neighborhood import Neighborhood

Expand Down Expand Up @@ -587,7 +588,7 @@ def collect_keys(self):
else:
return [temporal_projected_extent_decoder(key) for key in self.srdd.collectKeys()]

def merge(self, num_partitions=None):
def merge(self, num_partitions=None, partitioner=Partitioner.HASH_PARTITIONER):
"""Merges the ``Tile`` of each ``K`` together to produce a single ``Tile``.
This method will reduce each value by its key within the layer to produce a single
Expand All @@ -604,12 +605,16 @@ def merge(self, num_partitions=None):
num_partitions (int, optional): The number of partitions that the resulting
layer should be partitioned with. If ``None``, then the ``num_partitions``
will be the number of partitions the layer currently has.
partitioner (str or :class:`~geopyspark.geotrellis.constants.Partitioner`, optional):
The partitioner that should be used to repartition the data. The default is
``Partitioner.HASH_PARTITIONER``.
Returns:
:class:`~geopyspark.geotrellis.layer.RasterLayer`
"""

result = self.srdd.merge(num_partitions)
partitioner = Partitioner(partitioner).value
result = self.srdd.merge(num_partitions, partitioner)

return RasterLayer(self.layer_type, result)

Expand Down Expand Up @@ -932,7 +937,7 @@ def collect_keys(self):
else:
return [space_time_key_decoder(key) for key in self.srdd.collectKeys()]

def merge(self, num_partitions=None):
def merge(self, num_partitions=None, partitioner=Partitioner.HASH_PARTITIONER):
"""Merges the ``Tile`` of each ``K`` together to produce a single ``Tile``.
This method will reduce each value by its key within the layer to produce a single
Expand All @@ -949,12 +954,16 @@ def merge(self, num_partitions=None):
num_partitions (int, optional): The number of partitions that the resulting
layer should be partitioned with. If ``None``, then the ``num_partitions``
will be the number of partitions the layer currently has.
partitioner (str or :class:`~geopyspark.geotrellis.constants.Partitioner`, optional):
The partitioner that should be used to repartition the data. The default is
``Partitioner.HASH_PARTITIONER``.
Returns:
:class:`~geopyspark.geotrellis.layer.TiledRasterLayer`
"""

result = self.srdd.merge(num_partitions)
partitioner = Partitioner(partitioner).value
result = self.srdd.merge(num_partitions, partitioner)

return TiledRasterLayer(self.layer_type, result)

Expand Down Expand Up @@ -1190,18 +1199,24 @@ def reproject(self, target_crs, resample_method=ResampleMethod.NEAREST_NEIGHBOR)

return TiledRasterLayer(self.layer_type, srdd)

def repartition(self, num_partitions=None):
"""Repartition underlying RDD using HashPartitioner.
If ``num_partitions`` is None, existing number of partitions will be used.
def repartition(self, num_partitions=None, partitioner=Partitioner.HASH_PARTITIONER):
"""Repartition underlying RDD using the given partitioner.
Args:
num_partitions(int, optional): Desired number of partitions
num_partitions(int, optional): Desired number of partitions. If ``None``,
then the existing number of partitions will be used.
partitioner (str or :class:`~geopyspark.geotrellis.constants.Partitioner`, optional):
The partitioner that should be used to repartition the data. The default is
``Partitioner.HASH_PARTITIONER``.
Returns:
:class:`~geopyspark.geotrellis.layer.TiledRasterLayer`
"""

num_partitions = num_partitions or self.getNumPartitions()
return TiledRasterLayer(self.layer_type, self.srdd.repartition(num_partitions))
partitioner = Partitioner(partitioner).value

return TiledRasterLayer(self.layer_type, self.srdd.repartition(num_partitions, partitioner))

def lookup(self, col, row):
"""Return the value(s) in the image of a particular ``SpatialKey`` (given by col and row).
Expand Down Expand Up @@ -1286,13 +1301,16 @@ def tile_to_layout(self, layout, target_crs=None, resample_method=ResampleMethod

return TiledRasterLayer(self.layer_type, srdd)

def pyramid(self, resample_method=ResampleMethod.NEAREST_NEIGHBOR):
def pyramid(self, resample_method=ResampleMethod.NEAREST_NEIGHBOR, partitioner=Partitioner.HASH_PARTITIONER):
"""Creates a layer ``Pyramid`` where the resolution is halved per level.
Args:
resample_method (str or :class:`~geopyspark.geotrellis.constants.ResampleMethod`, optional):
The resample method to use when building the pyramid.
Default is ``ResampleMethod.NEAREST_NEIGHBOR``.
partitioner (str or :class:`~geopyspark.geotrellis.constants.Partitioner`, optional):
The partitioner that should be used to repartition the data. The default is
``Partitioner.HASH_PARTITIONER``.
Returns:
:class:`~geopyspark.geotrellis.layer.Pyramid`.
Expand All @@ -1302,7 +1320,8 @@ def pyramid(self, resample_method=ResampleMethod.NEAREST_NEIGHBOR):
"""

resample_method = ResampleMethod(resample_method)
result = self.srdd.pyramid(resample_method)
partitioner = Partitioner(partitioner).value
result = self.srdd.pyramid(resample_method, partitioner)
return Pyramid([TiledRasterLayer(self.layer_type, srdd) for srdd in result])

def focal(self, operation, neighborhood=None, param_1=None, param_2=None, param_3=None):
Expand Down

0 comments on commit 410f60c

Please sign in to comment.