Skip to content

Commit

Permalink
Merge pull request #518 from jbouffard/feature/filter-by-times
Browse files Browse the repository at this point in the history
Filter By Times
  • Loading branch information
Jacob Bouffard committed Nov 15, 2017
2 parents 72e490b + 969e206 commit 151b6c3
Show file tree
Hide file tree
Showing 4 changed files with 249 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import scala.util.{Either, Left, Right}
import scala.collection.JavaConverters._

import java.util.ArrayList
import java.time.{ZonedDateTime, ZoneId}

import spray.json._

Expand Down Expand Up @@ -153,6 +154,31 @@ class TemporalRasterLayer(val rdd: RDD[(TemporalProjectedExtent, MultibandTile)]

/** Collects the keys of the underlying RDD and passes them to
  * `PythonTranslator.toPython`, returning the resulting list of byte arrays.
  *
  * NOTE(review): the `ProtoTemporalProjectedExtent` type argument suggests each
  * byte array is a protobuf-encoded key - confirm against `PythonTranslator`.
  */
def collectKeys(): java.util.ArrayList[Array[Byte]] =
PythonTranslator.toPython[TemporalProjectedExtent, ProtoTemporalProjectedExtent](rdd.keys.collect)

/** Keeps only the records whose key instant lies inside at least one of the
  * given time ranges.
  *
  * `times` is read as consecutive (start, end) pairs of date-time strings in
  * the format accepted by `ZonedDateTime.parse`. When the list has an odd
  * number of elements, the trailing element `t` forms the range `[t, t]`.
  * Both ends of every range are inclusive.
  *
  * @param times flat list of date-time strings, paired up in order
  * @return a new `TemporalRasterLayer` holding only the matching records; it is
  *         empty when nothing matches or when `times` is empty
  */
def filterByTimes(
  times: java.util.ArrayList[String]
): TemporalRasterLayer = {
  val toEpochMillis = (time: String) => ZonedDateTime.parse(time).toInstant.toEpochMilli

  // `grouped(2)` only ever yields groups of one or two elements.
  val timeBoundaries: Array[(Long, Long)] =
    times
      .asScala
      .grouped(2)
      .map {
        case Seq(start, end) =>
          (toEpochMillis(start), toEpochMillis(end))
        case Seq(single) =>
          // Parse once and reuse the value for both ends of the range.
          val instant = toEpochMillis(single)
          (instant, instant)
      }.toArray

  // `exists` stops at the first matching range and does not allocate an
  // intermediate array for every record, unlike `filter(...).size != 0`.
  val filteredRDD =
    rdd.filter { case (key, _) =>
      timeBoundaries.exists { case (start, end) =>
        start <= key.instant && key.instant <= end
      }
    }

  TemporalRasterLayer(filteredRDD)
}
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import geotrellis.raster.render._
import geotrellis.raster.resample.{ResampleMethod, PointResampleMethod, Resample}
import geotrellis.spark._
import geotrellis.spark.costdistance.IterativeCostDistance
import geotrellis.spark.filter._
import geotrellis.spark.io._
import geotrellis.spark.io.json._
import geotrellis.spark.mapalgebra.local._
Expand Down Expand Up @@ -472,6 +473,36 @@ class TemporalTiledRasterLayer(
case None => Seq()
}
}.collect().toMap

/** Filters the layer down to the keys that fall inside at least one of the
  * given time ranges, using `filterByKeyBounds`.
  *
  * `times` is read as consecutive (start, end) pairs of date-time strings in
  * the format accepted by `ZonedDateTime.parse`; a trailing unpaired element
  * is used as both the start and the end of its range. Spatially, every range
  * spans the bounds built from `rdd.metadata.gridBounds`.
  *
  * @param times flat list of date-time strings, paired up in order
  * @return a new `TemporalTiledRasterLayer` at the same zoom level
  */
def filterByTimes(
  times: java.util.ArrayList[String]
): TemporalTiledRasterLayer = {
  val spatialBounds: KeyBounds[SpatialKey] = KeyBounds(rdd.metadata.gridBounds)

  // Builds key bounds covering the whole spatial grid between two instants.
  def boundsBetween(start: ZonedDateTime, end: ZonedDateTime): KeyBounds[SpaceTimeKey] =
    KeyBounds(
      SpaceTimeKey(spatialBounds.minKey.col, spatialBounds.minKey.row, start),
      SpaceTimeKey(spatialBounds.maxKey.col, spatialBounds.maxKey.row, end)
    )

  val timeBoundaries: Array[KeyBounds[SpaceTimeKey]] =
    times
      .asScala
      .grouped(2)
      .map {
        case scala.collection.mutable.Buffer(a, b) =>
          boundsBetween(ZonedDateTime.parse(a), ZonedDateTime.parse(b))
        case scala.collection.mutable.Buffer(a) =>
          val instant = ZonedDateTime.parse(a)
          boundsBetween(instant, instant)
      }.toArray

  TemporalTiledRasterLayer(zoomLevel, rdd.filterByKeyBounds(timeBoundaries))
}
}


Expand Down
68 changes: 67 additions & 1 deletion geopyspark/geotrellis/layer.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import json
import datetime
import shapely.wkb
import pytz
from shapely.geometry import Polygon, MultiPolygon, Point
from geopyspark.geotrellis.protobufcodecs import (multibandtile_decoder,
projected_extent_decoder,
Expand Down Expand Up @@ -724,6 +725,39 @@ def __str__(self):
# Returns a short description of the layer that embeds its ``layer_type``.
def __repr__(self):
return "RasterLayer(layer_type={})".format(self.layer_type)

def filter_by_times(self, time_intervals):
    """Filters a ``SPACETIME`` layer by keeping only the values whose keys fall within
    the given time interval(s).

    Args:
        time_intervals (``[datetime.datetime]``): A list of the time intervals to query.
            This list can have one or multiple elements. If just a single element, then only
            exact matches with that given time will be kept. If there are multiple times given,
            then they are each paired together so that they form ranges of time. In the case
            where there are an odd number of elements, then the remaining time will be treated
            as a single query and not a range.

            Timezone-aware values are converted to UTC; naive values are assumed to
            already be in UTC. The list itself is not modified.

    Note:
        If nothing intersects the given ``time_intervals``, then the returned ``RasterLayer``
        will be empty.

    Returns:
        :class:`~geopyspark.geotrellis.layer.RasterLayer`

    Raises:
        TypeError: If the layer is of type ``SPATIAL``.
    """

    if self.layer_type == LayerType.SPATIAL:
        raise TypeError("Only layers of type SPACETIME can be filtered by time")

    # Build a new list instead of overwriting ``time_intervals`` in place: the
    # caller's datetimes must not be replaced by strings, otherwise reusing the
    # same list for a second call fails on ``str.tzinfo``.
    formatted_times = []

    for time in time_intervals:
        if time.tzinfo:
            utc_time = time.astimezone(pytz.utc)
        else:
            utc_time = time.replace(tzinfo=pytz.utc)

        formatted_times.append(utc_time.isoformat())

    result = self.srdd.filterByTimes(formatted_times)

    return RasterLayer(self.layer_type, result)


class TiledRasterLayer(CachableLayer, TileLayer):
"""Wraps a RDD of tiled, GeoTrellis rasters.
Expand Down Expand Up @@ -1480,6 +1514,39 @@ def normalize(self, new_min, new_max, old_min=None, old_max=None):

return TiledRasterLayer(self.layer_type, srdd)

def filter_by_times(self, time_intervals):
    """Filters a ``SPACETIME`` layer by keeping only the values whose keys fall within
    the given time interval(s).

    Args:
        time_intervals (``[datetime.datetime]``): A list of the time intervals to query.
            This list can have one or multiple elements. If just a single element, then only
            exact matches with that given time will be kept. If there are multiple times given,
            then they are each paired together so that they form ranges of time. In the case
            where there are an odd number of elements, then the remaining time will be treated
            as a single query and not a range.

            Timezone-aware values are converted to UTC; naive values are assumed to
            already be in UTC. The list itself is not modified.

    Note:
        If nothing intersects the given ``time_intervals``, then the returned
        ``TiledRasterLayer`` will be empty.

    Returns:
        :class:`~geopyspark.geotrellis.layer.TiledRasterLayer`

    Raises:
        TypeError: If the layer is of type ``SPATIAL``.
    """

    if self.layer_type == LayerType.SPATIAL:
        raise TypeError("Only layers of type SPACETIME can be filtered by time")

    # Build a new list instead of overwriting ``time_intervals`` in place: the
    # caller's datetimes must not be replaced by strings, otherwise reusing the
    # same list for a second call fails on ``str.tzinfo``.
    formatted_times = []

    for time in time_intervals:
        if time.tzinfo:
            utc_time = time.astimezone(pytz.utc)
        else:
            utc_time = time.replace(tzinfo=pytz.utc)

        formatted_times.append(utc_time.isoformat())

    result = self.srdd.filterByTimes(formatted_times)

    return TiledRasterLayer(self.layer_type, result)

def get_point_values(self, points, resample_method=None):
"""Returns the values of the layer at given points.
Expand Down Expand Up @@ -1613,7 +1680,6 @@ def to_datetime(instant):
else:
raise TypeError("Expected a list or dict. Instead got", type(points))


@staticmethod
def _process_polygonal_summary(geometry, operation):
if isinstance(geometry, (Polygon, MultiPolygon)):
Expand Down
125 changes: 125 additions & 0 deletions geopyspark/tests/filter_by_times_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
import os
import datetime
import unittest
import numpy as np

import pytest

from geopyspark.geotrellis import SpatialKey, Tile, _convert_to_unix_time
from geopyspark.tests.base_test_class import BaseTestClass
from geopyspark.geotrellis import Extent, ProjectedExtent, SpaceTimeKey, SpatialKey, TemporalProjectedExtent
from geopyspark.geotrellis.layer import RasterLayer, TiledRasterLayer
from geopyspark.geotrellis.constants import LayerType


class FilterByTimesTest(BaseTestClass):
    """Checks ``filter_by_times`` on a ``RasterLayer`` and a ``TiledRasterLayer``.

    Both fixture layers hold twelve records: four per instant, for the three
    instants ``time_1``, ``time_2`` and ``time_3``, all sharing the same tile.
    """

    # Every record uses the same 5x5 tile filled with ones.
    band = np.ones((5, 5))
    tile = Tile.from_numpy_array(band)

    _time_format = '%Y-%m-%dT%H:%M:%SZ'

    time_1 = datetime.datetime.strptime("2016-08-24T09:00:00Z", _time_format)
    time_2 = datetime.datetime.strptime("2017-08-24T09:00:00Z", _time_format)
    time_3 = datetime.datetime.strptime("2017-10-17T09:00:00Z", _time_format)

    # Tiled fixture: a 2x2 grid of keys for each instant, ordered by instant.
    layer = [
        (SpaceTimeKey(0, 0, time_1), tile),
        (SpaceTimeKey(1, 0, time_1), tile),
        (SpaceTimeKey(0, 1, time_1), tile),
        (SpaceTimeKey(1, 1, time_1), tile),

        (SpaceTimeKey(0, 0, time_2), tile),
        (SpaceTimeKey(1, 0, time_2), tile),
        (SpaceTimeKey(0, 1, time_2), tile),
        (SpaceTimeKey(1, 1, time_2), tile),

        (SpaceTimeKey(0, 0, time_3), tile),
        (SpaceTimeKey(1, 0, time_3), tile),
        (SpaceTimeKey(0, 1, time_3), tile),
        (SpaceTimeKey(1, 1, time_3), tile),
    ]

    rdd = BaseTestClass.pysc.parallelize(layer)

    extent = {'xmin': 0.0, 'ymin': 0.0, 'xmax': 33.0, 'ymax': 33.0}
    layout = {'layoutCols': 2, 'layoutRows': 2, 'tileCols': 5, 'tileRows': 5}
    metadata = {
        'cellType': 'float32ud-1.0',
        'extent': extent,
        'crs': '+proj=longlat +datum=WGS84 +no_defs ',
        'bounds': {
            'minKey': {'col': 0, 'row': 0, 'instant': _convert_to_unix_time(time_1)},
            'maxKey': {'col': 1, 'row': 1, 'instant': _convert_to_unix_time(time_3)}
        },
        'layoutDefinition': {
            'extent': extent,
            'tileLayout': {'tileCols': 5, 'tileRows': 5, 'layoutCols': 2, 'layoutRows': 2}
        }
    }

    tiled_raster_rdd = TiledRasterLayer.from_numpy_rdd(LayerType.SPACETIME, rdd, metadata)

    # Untiled fixture: four extents for each instant, ordered by instant.
    layer2 = [
        (TemporalProjectedExtent(Extent(0, 0, 1, 1), epsg=3857, instant=time_1), tile),
        (TemporalProjectedExtent(Extent(1, 0, 2, 1), epsg=3857, instant=time_1), tile),
        (TemporalProjectedExtent(Extent(0, 1, 1, 2), epsg=3857, instant=time_1), tile),
        (TemporalProjectedExtent(Extent(1, 1, 2, 2), epsg=3857, instant=time_1), tile),

        (TemporalProjectedExtent(Extent(1, 0, 2, 1), epsg=3857, instant=time_2), tile),
        (TemporalProjectedExtent(Extent(1, 0, 2, 1), epsg=3857, instant=time_2), tile),
        (TemporalProjectedExtent(Extent(0, 1, 1, 2), epsg=3857, instant=time_2), tile),
        (TemporalProjectedExtent(Extent(1, 1, 2, 2), epsg=3857, instant=time_2), tile),

        (TemporalProjectedExtent(Extent(1, 0, 2, 1), epsg=3857, instant=time_3), tile),
        (TemporalProjectedExtent(Extent(1, 0, 2, 1), epsg=3857, instant=time_3), tile),
        (TemporalProjectedExtent(Extent(0, 1, 1, 2), epsg=3857, instant=time_3), tile),
        (TemporalProjectedExtent(Extent(1, 1, 2, 2), epsg=3857, instant=time_3), tile),
    ]

    rdd2 = BaseTestClass.pysc.parallelize(layer2)
    raster_rdd = RasterLayer.from_numpy_rdd(LayerType.SPACETIME, rdd2)

    @pytest.fixture(autouse=True)
    def tearDown(self):
        """Runs the test, then closes ``BaseTestClass.pysc._gateway``."""
        yield
        BaseTestClass.pysc._gateway.close()

    def _assert_same_records(self, expected, actual):
        """Asserts both record lists have equal length, keys and tile cells, in order."""
        self.assertEqual(len(expected), len(actual))

        for want, got in zip(expected, actual):
            self.assertEqual(want[0], got[0])
            self.assertTrue((want[1].cells == got[1].cells).all())

    def test_filter_temporal_projected_extent_single_time(self):
        # A single instant keeps only the four ``time_1`` records.
        filtered = self.raster_rdd.filter_by_times([self.time_1])

        self._assert_same_records(self.layer2[:4], filtered.to_numpy_rdd().collect())

    def test_filter_temporal_projected_extent_multi_intervals(self):
        # The range time_2..time_3 keeps the last eight records.
        filtered = self.raster_rdd.filter_by_times([self.time_2, self.time_3])

        self._assert_same_records(self.layer2[4:], filtered.to_numpy_rdd().collect())

    def test_filter_spacetime_key_single_time(self):
        # A single instant keeps only the four ``time_3`` records.
        filtered = self.tiled_raster_rdd.filter_by_times([self.time_3])

        self._assert_same_records(self.layer[8:], filtered.to_numpy_rdd().collect())

    def test_filter_spacetime_key_multi_intervals(self):
        # The range time_1..time_2 keeps the first eight records.
        filtered = self.tiled_raster_rdd.filter_by_times([self.time_1, self.time_2])

        self._assert_same_records(self.layer[:8], filtered.to_numpy_rdd().collect())


# Run the tests through unittest when this file is executed as a script.
if __name__ == "__main__":
unittest.main()

0 comments on commit 151b6c3

Please sign in to comment.