
Commit

Merge pull request #2288 from jamesmcclain/feature/timeseries
Time Series
echeipesh committed Jul 27, 2017
2 parents (1f937c7 + 3025283), commit 8220302
Showing 8 changed files with 568 additions and 10 deletions.
1 change: 1 addition & 0 deletions spark/src/main/scala/geotrellis/spark/package.scala
@@ -61,6 +61,7 @@ package object spark
with summary.Implicits
with summary.polygonal.Implicits
with tiling.Implicits
with timeseries.Implicits
with viewshed.Implicits {
type TileLayerRDD[K] = RDD[(K, Tile)] with Metadata[TileLayerMetadata[K]]
object TileLayerRDD {
13 changes: 13 additions & 0 deletions spark/src/main/scala/geotrellis/spark/timeseries/Implicits.scala
@@ -0,0 +1,13 @@
package geotrellis.spark.timeseries

import geotrellis.spark._

import org.apache.spark.rdd.RDD


object Implicits extends Implicits

trait Implicits {
implicit class withRDDTimeSeriesMethods(val self: TileLayerRDD[SpaceTimeKey])
extends RDDTimeSeriesMethods
}
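
Because the package object above now mixes in timeseries.Implicits, a plain import geotrellis.spark._ is enough to expose these extension methods. A minimal usage sketch (not part of the commit; the layer and aoi values are assumed to exist):

import geotrellis.spark._
import geotrellis.vector._

import java.time.ZonedDateTime

// Hypothetical inputs: a space-time layer and an area of interest.
def printSums(layer: TileLayerRDD[SpaceTimeKey], aoi: MultiPolygon): Unit = {
  // One entry per temporal key: the sum of the masked cell values at that instant.
  val series: Map[ZonedDateTime, Double] = layer.sumSeries(aoi)
  series.toSeq
    .sortBy { case (t, _) => t.toInstant.toEpochMilli }
    .foreach { case (t, total) => println(s"$t -> $total") }
}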
@@ -0,0 +1,279 @@
package geotrellis.spark.timeseries

import geotrellis.raster._
import geotrellis.raster.histogram._
import geotrellis.raster.summary.polygonal._
import geotrellis.spark._
import geotrellis.spark.mask.Mask.Options
import geotrellis.util.annotations.experimental
import geotrellis.util.MethodExtensions
import geotrellis.vector._

import java.time.ZonedDateTime


/**
* @define experimental <span class="badge badge-red" style="float: right;">EXPERIMENTAL</span>@experimental
*/
@experimental object RDDTimeSeriesFunctions {

/**
* $experimental
*/
@experimental def histogramProjection(tile: Tile): StreamingHistogram =
StreamingHistogram.fromTile(tile)

/**
* $experimental
*/
@experimental def histogramReduction(left: StreamingHistogram, right: StreamingHistogram): StreamingHistogram =
left + right

/**
* $experimental
*/
@experimental def meanReduction(left: MeanResult, right: MeanResult): MeanResult =
left + right

/**
* $experimental
*/
@experimental def maxReduction(left: Double, right: Double): Double =
scala.math.max(left, right)

/**
* $experimental
*/
@experimental def minReduction(left: Double, right: Double): Double =
scala.math.min(left, right)

/**
* $experimental
*/
@experimental def sumReduction(left: Double, right: Double): Double =
left + right
}
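
Each *Series method below pairs one of these per-tile projections with a matching reduction; the reduction has to be associative and commutative because it is ultimately applied through reduceByKey. A small local illustration of that contract (not commit code; tiles is a hypothetical collection of tiles sharing one time step):

import geotrellis.raster._
import geotrellis.raster.histogram._
import geotrellis.spark.timeseries.RDDTimeSeriesFunctions

// Project every tile to a StreamingHistogram, then merge the results;
// the same projection/reduction pair that histogramSeries hands to TimeSeries.
def combinedHistogram(tiles: Seq[Tile]): StreamingHistogram =
  tiles
    .map(RDDTimeSeriesFunctions.histogramProjection)
    .reduce(RDDTimeSeriesFunctions.histogramReduction)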

/**
* @define experimental <span class="badge badge-red" style="float: right;">EXPERIMENTAL</span>@experimental
*/
@experimental abstract class RDDTimeSeriesMethods
extends MethodExtensions[TileLayerRDD[SpaceTimeKey]] {

/**
* $experimental
*/
@experimental def sumSeries(
polygon: MultiPolygon,
options: Options
): Map[ZonedDateTime, Double] =
sumSeries(List(polygon), options)

/**
* $experimental
*/
@experimental def sumSeries(
polygon: MultiPolygon
): Map[ZonedDateTime, Double] =
sumSeries(List(polygon), Options.DEFAULT)

/**
* $experimental
*/
@experimental def sumSeries(
polygons: Traversable[MultiPolygon]
): Map[ZonedDateTime, Double] =
sumSeries(polygons, Options.DEFAULT)

/**
* $experimental
*/
@experimental def sumSeries(
polygons: Traversable[MultiPolygon],
options: Options
): Map[ZonedDateTime, Double] = {
TimeSeries(
self,
SumDoubleSummary.handleFullTile,
RDDTimeSeriesFunctions.sumReduction,
polygons,
options
)
.collect()
.toMap
}

/**
* $experimental
*/
@experimental def minSeries(
polygon: MultiPolygon,
options: Options
): Map[ZonedDateTime, Double] =
minSeries(List(polygon), options)

/**
* $experimental
*/
@experimental def minSeries(
polygon: MultiPolygon
): Map[ZonedDateTime, Double] =
minSeries(List(polygon), Options.DEFAULT)

/**
* $experimental
*/
@experimental def minSeries(
polygons: Traversable[MultiPolygon]
): Map[ZonedDateTime, Double] =
minSeries(polygons, Options.DEFAULT)

/**
* $experimental
*/
@experimental def minSeries(
polygons: Traversable[MultiPolygon],
options: Options
): Map[ZonedDateTime, Double] = {
TimeSeries(
self,
MinDoubleSummary.handleFullTile,
RDDTimeSeriesFunctions.minReduction,
polygons,
options
)
.collect()
.toMap
}

/**
* $experimental
*/
@experimental def maxSeries(
polygon: MultiPolygon,
options: Options
): Map[ZonedDateTime, Double] =
maxSeries(List(polygon), options)

/**
* $experimental
*/
@experimental def maxSeries(
polygon: MultiPolygon
): Map[ZonedDateTime, Double] =
maxSeries(List(polygon), Options.DEFAULT)

/**
* $experimental
*/
@experimental def maxSeries(
polygons: Traversable[MultiPolygon]
): Map[ZonedDateTime, Double] =
maxSeries(polygons, Options.DEFAULT)

/**
* $experimental
*/
@experimental def maxSeries(
polygons: Traversable[MultiPolygon],
options: Options
): Map[ZonedDateTime, Double] = {
TimeSeries(
self,
MaxDoubleSummary.handleFullTile,
RDDTimeSeriesFunctions.maxReduction,
polygons,
options
)
.collect()
.toMap
}

/**
* $experimental
*/
@experimental def meanSeries(
polygon: MultiPolygon,
options: Options = Options.DEFAULT
): Map[ZonedDateTime, Double] =
meanSeries(List(polygon), options)

/**
* $experimental
*/
@experimental def meanSeries(
polygon: MultiPolygon
): Map[ZonedDateTime, Double] =
meanSeries(polygon, Options.DEFAULT)

/**
* $experimental
*/
@experimental def meanSeries(
polygons: Traversable[MultiPolygon]
): Map[ZonedDateTime, Double] =
meanSeries(polygons, Options.DEFAULT)

/**
* $experimental
*/
@experimental def meanSeries(
polygons: Traversable[MultiPolygon],
options: Options
): Map[ZonedDateTime, Double] = {
TimeSeries(
self,
MeanResult.fromFullTileDouble,
RDDTimeSeriesFunctions.meanReduction,
polygons,
options
)
.mapValues({ mr => mr.mean })
.collect()
.toMap
}
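
The min/mean/max variants compose naturally when several statistics are wanted per time step, though each call below runs its own Spark job over the layer. A sketch under the assumption that layer and fields exist:

import geotrellis.spark._
import geotrellis.vector._

import java.time.ZonedDateTime

case class DateStats(time: ZonedDateTime, min: Double, mean: Double, max: Double)

// Hypothetical helper: one DateStats row per time step found in the layer.
def summarize(layer: TileLayerRDD[SpaceTimeKey], fields: Seq[MultiPolygon]): Seq[DateStats] = {
  val mins  = layer.minSeries(fields)
  val means = layer.meanSeries(fields)
  val maxs  = layer.maxSeries(fields)
  means.keys.toSeq.map { t => DateStats(t, mins(t), means(t), maxs(t)) }
}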

/**
* $experimental
*/
@experimental def histogramSeries(
polygon: MultiPolygon,
options: Options = Options.DEFAULT
): Map[ZonedDateTime, Histogram[Double]] =
histogramSeries(List(polygon), options)

/**
* $experimental
*/
@experimental def histogramSeries(
polygon: MultiPolygon
): Map[ZonedDateTime, Histogram[Double]] =
histogramSeries(polygon, Options.DEFAULT)

/**
* $experimental
*/
@experimental def histogramSeries(
polygons: Traversable[MultiPolygon]
): Map[ZonedDateTime, Histogram[Double]] =
histogramSeries(polygons, Options.DEFAULT)

/**
* $experimental
*/
@experimental def histogramSeries(
polygons: Traversable[MultiPolygon],
options: Options
): Map[ZonedDateTime, Histogram[Double]] = {
TimeSeries(
self,
RDDTimeSeriesFunctions.histogramProjection,
RDDTimeSeriesFunctions.histogramReduction,
polygons,
options
)
.collect()
.toMap
}

}
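
histogramSeries returns one Histogram[Double] per time step, so distribution statistics can be read off without another pass over the layer. A sketch assuming a hypothetical layer and aoi, and assuming the mean() accessor on geotrellis.raster histograms:

import geotrellis.spark._
import geotrellis.vector._

import java.time.ZonedDateTime

// Mean cell value per time step, recovered from the per-date histograms.
def meansFromHistograms(
  layer: TileLayerRDD[SpaceTimeKey],
  aoi: MultiPolygon
): Map[ZonedDateTime, Option[Double]] =
  layer.histogramSeries(aoi).mapValues(_.mean())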
40 changes: 40 additions & 0 deletions spark/src/main/scala/geotrellis/spark/timeseries/TimeSeries.scala
@@ -0,0 +1,40 @@
package geotrellis.spark.timeseries

import geotrellis.raster._
import geotrellis.spark._
import geotrellis.spark.mask._
import geotrellis.util.annotations.experimental
import geotrellis.vector._

import org.apache.log4j.Logger
import org.apache.spark.rdd._

import scala.reflect.ClassTag

import java.time.ZonedDateTime


/**
* Given a TileLayerRDD[SpaceTimeKey], some masking geometry, and a
* reduction operator, produce a time series.
*
* @author James McClain
* @define experimental <span class="badge badge-red" style="float: right;">EXPERIMENTAL</span>@experimental
*/
@experimental object TimeSeries {

@experimental def apply[R: ClassTag](
layer: TileLayerRDD[SpaceTimeKey],
projection: Tile => R,
reduction: (R, R) => R,
geoms: Traversable[MultiPolygon],
options: Mask.Options = Mask.Options.DEFAULT
): RDD[(ZonedDateTime, R)] = {

layer
.mask(geoms, options)
.map({ case (key: SpaceTimeKey, tile: Tile) => (key.time, projection(tile)) })
.reduceByKey(reduction)
}

}
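
TimeSeries.apply can also be driven directly with a custom projection/reduction pair. A sketch (not part of the commit) that counts non-NoData cells per time step for hypothetical layer and geoms values; because masking turns cells outside the geometries into NoData, this effectively counts cells inside the query polygons:

import geotrellis.raster._
import geotrellis.spark._
import geotrellis.spark.timeseries.TimeSeries
import geotrellis.vector._
import org.apache.spark.rdd.RDD

import java.time.ZonedDateTime

def dataCellCounts(
  layer: TileLayerRDD[SpaceTimeKey],
  geoms: Traversable[MultiPolygon]
): RDD[(ZonedDateTime, Long)] =
  TimeSeries(
    layer,
    (tile: Tile) => {                 // projection: count the data cells in one tile
      var n = 0L
      tile.foreach { z => if (isData(z)) n += 1 }
      n
    },
    (a: Long, b: Long) => a + b,      // reduction: associative, commutative sum
    geoms
  )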
@@ -45,19 +45,23 @@ class RDDCostDistanceMethodsSpec extends FunSpec

val points = List(Point(2.5+5.0, 2.5))

it("The costdistance Method Should Work (1/2)") {
val expected = IterativeCostDistance(rdd, points).collect.toList
val actual = rdd.costdistance(points).collect.toList
describe("Cost-Distance Extension Methods") {

actual should be (expected)
}
it("The costdistance Method Should Work (1/2)") {
val expected = IterativeCostDistance(rdd, points).collect.toList
val actual = rdd.costdistance(points).collect.toList

actual should be (expected)
}

it("The costdistance Method Should Work (2/2)") {
val resolution = IterativeCostDistance.computeResolution(rdd)
val expected = IterativeCostDistance(rdd, points, resolution).collect.toList
val actual = rdd.costdistance(points, resolution).collect.toList

it("The costdistance Method Should Work (2/2)") {
val resolution = IterativeCostDistance.computeResolution(rdd)
val expected = IterativeCostDistance(rdd, points, resolution).collect.toList
val actual = rdd.costdistance(points, resolution).collect.toList
actual should be (expected)
}

actual should be (expected)
}

}
