Merge pull request #499 from jbouffard/feature/aggregate-by-cell
Aggregate_by_cell Method
Jacob Bouffard committed Sep 19, 2017
2 parents a0043fa + d972ae3 commit 22976e5
Showing 5 changed files with 241 additions and 0 deletions.
@@ -25,6 +25,7 @@ object Constants {

final val MEAN = "Mean"
final val SUM = "Sum"
final val VARIANCE = "Variance"
final val STANDARDDEVIATION = "StandardDeviation"
final val SLOPE = "Slope"
final val ASPECT = "Aspect"
@@ -11,6 +11,7 @@ import geotrellis.raster._
import geotrellis.raster.distance._
import geotrellis.raster.io.geotiff._
import geotrellis.raster.io.geotiff.compression._
import geotrellis.raster.mapalgebra.local._
import geotrellis.raster.rasterize._
import geotrellis.raster.render._
import geotrellis.raster.resample.ResampleMethod
@@ -44,6 +45,9 @@ import org.apache.spark.SparkContext._
import java.util.ArrayList
import scala.reflect._
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

import spire.syntax.cfor._


abstract class TiledRasterLayer[K: SpatialComponent: JsonFormat: ClassTag] extends TileLayer[K] {
@@ -286,6 +290,39 @@ abstract class TiledRasterLayer[K: SpatialComponent: JsonFormat: ClassTag] extends TileLayer[K] {
case multi: MultiPolygon => singleTileLayerRDD.polygonalSumDouble(multi)
}

def aggregateByCell(operation: String): TiledRasterLayer[K] = {
val bands: RDD[(K, Array[ArrayBuffer[Tile]])] =
rdd.combineByKey(
(multi: MultibandTile) => {
val arr = Array.ofDim[ArrayBuffer[Tile]](multi.bandCount)

cfor(0)(_ < arr.size, _ + 1) { x =>
arr(x) = ArrayBuffer(multi.band(x))
}

arr
},
(acc: Array[ArrayBuffer[Tile]], multi: MultibandTile) =>
acc.zip(multi.bands.toBuffer).map {
case (arr: ArrayBuffer[Tile], tile: Tile) => tile +=: arr
},
(acc1: Array[ArrayBuffer[Tile]], acc2: Array[ArrayBuffer[Tile]]) =>
acc1.zip(acc2) map { case (x, y) => x ++=: y }
)

val result: RDD[(K, Array[Tile])] =
operation match {
case SUM => bands.mapValues { x => x.map { tiles => tiles.reduce( _ localAdd _ ) } }
case MIN => bands.mapValues { x => x.map(Min(_)) }
case MAX => bands.mapValues { x => x.map(Max(_)) }
case MEAN => bands.mapValues { x => x.map(Mean(_)) }
case VARIANCE => bands.mapValues { x => x.map(Variance(_)) }
case STANDARDDEVIATION => bands.mapValues { x => x.map { tiles => Sqrt(Variance(tiles)) } }
}

withRDD(result.mapValues { tiles => MultibandTile(tiles) } )
}

def isFloatingPointLayer(): Boolean = rdd.metadata.cellType.isFloatingPoint

protected def withRDD(result: RDD[(K, MultibandTile)]): TiledRasterLayer[K]
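The combineByKey above collects, per key, the tiles of each band into a buffer and then reduces each buffer cell-wise with the chosen local operation. As an illustration of the intended per-cell semantics only (not the Scala implementation itself), the same reduction can be sketched with NumPy on two hypothetical single-band tiles:

import numpy as np

# Two single-band tiles recorded under the same key (hypothetical values).
tile_a = np.array([[1.0, 2.0], [3.0, 4.0]])
tile_b = np.array([[2.0, 2.0], [1.0, 5.0]])

# Stack so axis 0 runs over the tiles that share the key, then reduce cell-wise.
stacked = np.stack([tile_a, tile_b])

cell_sum = stacked.sum(axis=0)            # SUM
cell_min = stacked.min(axis=0)            # MIN
cell_max = stacked.max(axis=0)            # MAX
cell_mean = stacked.mean(axis=0)          # MEAN
cell_var = stacked.var(axis=0, ddof=1)    # VARIANCE (sample variance; undefined for one tile)
cell_std = stacked.std(axis=0, ddof=1)    # STANDARD_DEVIATION

The ddof=1 in the variance and standard deviation calls matches the sample-variance behaviour the Python tests below expect, which is also why a key backed by a single tile yields NoData for those two operations.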
1 change: 1 addition & 0 deletions geopyspark/geotrellis/constants.py
@@ -131,6 +131,7 @@ class Operation(Enum):
MIN = 'Min'
ASPECT = 'Aspect'
SLOPE = 'Slope'
VARIANCE = 'Variance'
STANDARD_DEVIATION = 'StandardDeviation'


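Since Operation is a standard Python Enum, the string value added here and the enum member are interchangeable, which is what lets aggregate_by_cell below accept either form. A minimal sketch, not part of the diff:

from geopyspark.geotrellis.constants import Operation

# The member can be recovered from its string value; aggregate_by_cell
# relies on this when it calls Operation(operation) on its argument.
assert Operation('Variance') is Operation.VARIANCE
assert Operation.VARIANCE.value == 'Variance'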
46 changes: 46 additions & 0 deletions geopyspark/geotrellis/layer.py
@@ -944,6 +944,52 @@ def tile_func(cells, cell_type, no_data_value):
self.layer_metadata,
self.zoom_level)

def aggregate_by_cell(self, operation):
"""Computes an aggregate summary for each cell of all of the values for each key.
The ``operation`` given is a local map algebra function that will be applied
to all values that share the same key. If there are multiple copies of the same
key in the layer, then this method will reduce all instances of the ``(K, Tile)``
pairs into a single element. This resulting ``(K, Tile)``\'s ``Tile`` will contain the
aggregate summaries of each cell of the reduced ``Tile``\s that had the same ``K``.
Note:
Not all :class:`~geoypspark.geotrellis.constants.Operation`\s are supported.
Only ``SUM``, ``MIN``, ``MAX``, ``MEAN``, ``VARIANCE``, AND ``STANDARD_DEVIATION``
can be used.
Note:
If calculating ``VARIANCE`` or ``STANDARD_DEVIATION``, then any ``K``
that is a single copy will have a resulting ``Tile`` that is filled with
``NoData`` values. This is because the variance of a single element is
undefined.
Args:
operation (str or :class:`~geopyspark.geotrellis.constants.Operation`): The aggregate
operation to be performed.
Returns:
:class:`~geopyspark.geotrellis.layer.TiledRasterLayer`
"""

aggregate_operations = [
Operation.SUM,
Operation.MEAN,
Operation.MIN,
Operation.MAX,
Operation.VARIANCE,
Operation.STANDARD_DEVIATION
]

operation = Operation(operation)

if operation not in aggregate_operations:
raise ValueError("Cannot perform aggregation with this operation", operation)

result = self.srdd.aggregateByCell(operation.value)

return TiledRasterLayer(self.layer_type, result)

def to_geotiff_rdd(self,
storage_method=StorageMethod.STRIPED,
rows_per_strip=None,
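For completeness, a brief usage sketch of the new method against an existing layer; ``tiled_layer`` is a placeholder name for whatever TiledRasterLayer you already have:

from geopyspark.geotrellis.constants import Operation

# `tiled_layer` stands in for an existing TiledRasterLayer with duplicate keys.
summed = tiled_layer.aggregate_by_cell(Operation.SUM)

# The plain string value works as well, since the method coerces it to an Operation.
averaged = tiled_layer.aggregate_by_cell('Mean')

# Operations outside the supported set (e.g. Operation.SLOPE) raise a ValueError.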
156 changes: 156 additions & 0 deletions geopyspark/tests/tiled_layer_tests/aggregate_cells_test.py
@@ -0,0 +1,156 @@
import numpy as np
import os
import pytest
import unittest

from geopyspark.geotrellis import SpatialKey, Extent, Tile
from geopyspark.geotrellis.layer import TiledRasterLayer
from geopyspark.tests.base_test_class import BaseTestClass
from geopyspark.geotrellis.constants import LayerType, Operation


class AggregateCellsTest(BaseTestClass):
first = np.array([[
[1.0, 2.0, 3.0, 4.0, 5.0],
[1.0, 2.0, 3.0, 4.0, 5.0],
[1.0, 2.0, 3.0, 4.0, 5.0],
[1.0, 2.0, 3.0, 4.0, 5.0],
[1.0, 2.0, 3.0, 4.0, 5.0]]])

second = np.array([[
[1.0, 1.0, 1.0, 1.0, 1.0],
[2.0, 2.0, 2.0, 2.0, 2.0],
[3.0, 3.0, 3.0, 3.0, 3.0],
[4.0, 4.0, 4.0, 4.0, 4.0],
[5.0, 5.0, 5.0, 5.0, 5.0]]])

cells_1 = np.array([first, second])
cells_2 = np.array([second, first])

tile_1 = Tile.from_numpy_array(cells_1, -1.0)
tile_2 = Tile.from_numpy_array(cells_2, -1.0)

layer = [(SpatialKey(0, 0), tile_1),
(SpatialKey(1, 0), tile_1),
(SpatialKey(1, 0), tile_2),
(SpatialKey(0, 1), tile_1),
(SpatialKey(1, 1), tile_1)]
rdd = BaseTestClass.pysc.parallelize(layer)

extent = {'xmin': 0.0, 'ymin': 0.0, 'xmax': 33.0, 'ymax': 33.0}
layout = {'layoutCols': 2, 'layoutRows': 2, 'tileCols': 5, 'tileRows': 5}
metadata = {'cellType': 'float32ud-1.0',
'extent': extent,
'crs': '+proj=longlat +datum=WGS84 +no_defs ',
'bounds': {
'minKey': {'col': 0, 'row': 0},
'maxKey': {'col': 1, 'row': 1}},
'layoutDefinition': {
'extent': extent,
'tileLayout': {'tileCols': 5, 'tileRows': 5, 'layoutCols': 2, 'layoutRows': 2}}}

raster_rdd = TiledRasterLayer.from_numpy_rdd(LayerType.SPATIAL, rdd, metadata)

@pytest.fixture(autouse=True)
def tearDown(self):
yield
BaseTestClass.pysc._gateway.close()

def test_aggregate_sum(self):
result = self.raster_rdd.aggregate_by_cell(operation=Operation.SUM)
expected = np.array([self.first + self.second, self.first + self.second])

self.assertTrue((result.lookup(1, 0)[0].cells == expected).all())
self.assertTrue((result.lookup(0, 0)[0].cells[0] == self.first).all())
self.assertTrue((result.lookup(0, 0)[0].cells[1] == self.second).all())

def test_aggregate_min(self):
result = self.raster_rdd.aggregate_by_cell(operation=Operation.MIN)

band = np.array([[
[1, 1, 1, 1, 1],
[1, 2, 2, 2, 2],
[1, 2, 3, 3, 3],
[1, 2, 3, 4, 4],
[1, 2, 3, 4, 5]]])

expected = np.array([band, band])

self.assertTrue((result.lookup(1, 0)[0].cells == expected).all())
self.assertTrue((result.lookup(0, 0)[0].cells[0] == self.first).all())
self.assertTrue((result.lookup(0, 0)[0].cells[1] == self.second).all())

def test_aggregate_max(self):
result = self.raster_rdd.aggregate_by_cell(operation=Operation.MAX)

band = np.array([[
[1, 2, 3, 4, 5],
[2, 2, 3, 4, 5],
[3, 3, 3, 4, 5],
[4, 4, 4, 4, 5],
[5, 5, 5, 5, 5]]])

expected = np.array([band, band])

self.assertTrue((result.lookup(1, 0)[0].cells == expected).all())
self.assertTrue((result.lookup(0, 0)[0].cells[0] == self.first).all())
self.assertTrue((result.lookup(0, 0)[0].cells[1] == self.second).all())

def test_aggregate_mean(self):
result = self.raster_rdd.aggregate_by_cell(Operation.MEAN)

band = np.array([[
[1, 1.5, 2, 2.5, 3],
[1.5, 2, 2.5, 3, 3.5],
[2, 2.5, 3, 3.5, 4],
[2.5, 3, 3.5, 4, 4.5],
[3, 3.5, 4, 4.5, 5]]])

expected = np.array([band, band])

self.assertTrue((result.lookup(1, 0)[0].cells == expected).all())
self.assertTrue((result.lookup(0, 0)[0].cells[0] == self.first).all())
self.assertTrue((result.lookup(0, 0)[0].cells[1] == self.second).all())

def test_aggregate_variance(self):
result = self.raster_rdd.aggregate_by_cell(Operation.VARIANCE)

band = np.array([[
[1, 1.5, 2, 2.5, 3],
[1.5, 2, 2.5, 3, 3.5],
[2, 2.5, 3, 3.5, 4],
[2.5, 3, 3.5, 4, 4.5],
[3, 3.5, 4, 4.5, 5]]])

expected = np.array([
((self.first - band) ** 2) + ((self.second - band) ** 2),
((self.first - band) ** 2) + ((self.second - band) ** 2)
])
expected_2 = np.full((5, 5), -1.0)

self.assertTrue((result.lookup(1, 0)[0].cells == expected).all())
self.assertTrue((result.lookup(0, 0)[0].cells == expected_2).all())

def test_aggregate_std(self):
result = self.raster_rdd.aggregate_by_cell(Operation.STANDARD_DEVIATION)

band = np.array([[
[1, 1.5, 2, 2.5, 3],
[1.5, 2, 2.5, 3, 3.5],
[2, 2.5, 3, 3.5, 4],
[2.5, 3, 3.5, 4, 4.5],
[3, 3.5, 4, 4.5, 5]]])

expected = np.array([
(((self.first - band) ** 2) + ((self.second - band) ** 2)) ** (1/2),
(((self.first - band) ** 2) + ((self.second - band) ** 2)) ** (1/2)
])
expected_2 = np.full((5, 5), -1.0)

self.assertTrue((result.lookup(1, 0)[0].cells == expected).all())
self.assertTrue((result.lookup(0, 0)[0].cells == expected_2).all())


if __name__ == "__main__":
unittest.main()
BaseTestClass.pysc.stop()
