Skip to content

Commit

Permalink
Revert "Merge pyramid tiles during combine step"
Browse files Browse the repository at this point in the history
This reverts commit 3d4be0b.
  • Loading branch information
echeipesh committed Aug 7, 2017
1 parent 4a83f22 commit a53edca
Showing 1 changed file with 23 additions and 31 deletions.
54 changes: 23 additions & 31 deletions spark/src/main/scala/geotrellis/spark/pyramid/Pyramid.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package geotrellis.spark.pyramid

import geotrellis.spark._
import geotrellis.spark.tiling._
import geotrellis.vector.Extent
import geotrellis.raster._
import geotrellis.raster.merge._
import geotrellis.raster.resample._
Expand Down Expand Up @@ -88,38 +87,31 @@ object Pyramid extends LazyLogging {
.setComponent(nextLayout)
.setComponent(nextKeyBounds)

val targetTileCols = nextLayout.tileLayout.tileCols
val targetTileRows = nextLayout.tileLayout.tileRows

val createTile: ((Extent, Extent, V)) => V = { case (targetExtent, baseExtent, baseTile) =>
val targetTile = baseTile.prototype(targetTileCols, targetTileRows)
targetTile.merge(targetExtent, baseExtent, baseTile, resampleMethod)
}

def includeTile: (V, (Extent, Extent, V)) => V = { case (targetTile, (targetExtent, baseExtent, baseTile)) =>
targetTile.merge(targetExtent, baseExtent, baseTile, resampleMethod)
}

def combineTile: (V, V) => V = { case (left: V, right: V) =>
left.merge(right)
}
// Functions for combine step
def createTiles(tile: (K, V)): Seq[(K, V)] = Seq(tile)
def mergeTiles1(tiles: Seq[(K, V)], tile: (K, V)): Seq[(K, V)] = tiles :+ tile
def mergeTiles2(tiles1: Seq[(K, V)], tiles2: Seq[(K, V)]): Seq[(K, V)] = tiles1 ++ tiles2

val nextRdd = {
val transformedRdd = rdd.mapPartitions( _.map { case (baseKey, tile) =>
val baseExtent = sourceLayout.mapTransform(baseKey)
val newSpatialComponent = nextLayout.mapTransform(baseExtent.center)
val targetKey = baseKey.setComponent(newSpatialComponent)
val targetExtent = nextLayout.mapTransform(targetKey)


(targetKey, (targetExtent, baseExtent, tile))
}, preservesPartitioning = true)

partitioner.fold(
transformedRdd.combineByKey(createTile, includeTile, combineTile)
)(
transformedRdd.combineByKey(createTile, includeTile, combineTile, _)
)
val transformedRdd = rdd
.map { case (key, tile) =>
val extent = sourceLayout.mapTransform(key)
val newSpatialKey = nextLayout.mapTransform(extent.center)
(key.setComponent(newSpatialKey), (key, tile))
}

partitioner
.fold(transformedRdd.combineByKey(createTiles, mergeTiles1, mergeTiles2))(transformedRdd.combineByKey(createTiles _, mergeTiles1 _, mergeTiles2 _, _))
.mapPartitions ( partition => partition.map { case (newKey: K, seq: Seq[(K, V)]) =>
val newExtent = nextLayout.mapTransform(newKey)
val newTile = seq.head._2.prototype(nextLayout.tileLayout.tileCols, nextLayout.tileLayout.tileRows)

for ((oldKey, tile) <- seq) {
val oldExtent = sourceLayout.mapTransform(oldKey)
newTile.merge(newExtent, oldExtent, tile, resampleMethod)
}
(newKey, newTile: V)
}, preservesPartitioning = true)
}

nextZoom -> new ContextRDD(nextRdd, nextMetadata)
Expand Down

0 comments on commit a53edca

Please sign in to comment.