Skip to content

Commit

Permalink
Merge pull request #2216 from pomadchin/fix/upsampling
Browse files Browse the repository at this point in the history
Fix ETL upsampling bug
  • Loading branch information
lossyrob committed Jun 5, 2017
2 parents 936460f + 5f0e329 commit 86d7d9c
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 25 deletions.
29 changes: 19 additions & 10 deletions spark-etl/src/main/scala/geotrellis/spark/etl/Etl.scala
Expand Up @@ -16,12 +16,13 @@

package geotrellis.spark.etl

import geotrellis.raster.CellGrid
import geotrellis.raster.{CellGrid, CellSize}
import geotrellis.raster.crop.CropMethods
import geotrellis.raster.merge.TileMergeMethods
import geotrellis.raster.prototype.TilePrototypeMethods
import geotrellis.raster.reproject._
import geotrellis.raster.resample.{NearestNeighbor, ResampleMethod}
import geotrellis.raster.reproject.Reproject.{Options => RasterReprojectOptions}
import geotrellis.raster.resample.ResampleMethod
import geotrellis.raster.stitch.Stitcher
import geotrellis.spark._
import geotrellis.spark.io._
Expand Down Expand Up @@ -170,23 +171,31 @@ case class Etl(conf: EtlConf, @transient modules: Seq[TypedModule] = Etl.default

output.reprojectMethod match {
case PerTileReproject =>
val reprojected = rdd.reproject(destCrs)
val floatMD = { // collecting floating metadata allows detecting upsampling
val (_, md) = reprojected.collectMetadata(FloatingLayoutScheme(output.tileSize))
md.copy(cellType = targetCellType.getOrElse(md.cellType))
def reprojected(targetCellSize: Option[CellSize] = None) = {
val reprojectedRdd = rdd.reproject(destCrs, RasterReprojectOptions(method = method, targetCellSize = targetCellSize))

val floatMD = { // collecting floating metadata allows detecting upsampling
val (_, md) = reprojectedRdd.collectMetadata(FloatingLayoutScheme(output.tileSize))
md.copy(cellType = targetCellType.getOrElse(md.cellType))
}

reprojectedRdd -> floatMD
}

scheme match {
case Left(scheme: ZoomedLayoutScheme) if output.maxZoom.isDefined =>
val LayoutLevel(zoom, layoutDefinition) = scheme.levelForZoom(output.maxZoom.get)
zoom -> resizingTileRDD(reprojected, floatMD, layoutDefinition)
val (reprojectedRdd, floatMD) = reprojected(Some(layoutDefinition.cellSize))
zoom -> resizingTileRDD(reprojectedRdd, floatMD, layoutDefinition)

case Left(scheme) => // True for both FloatinglayoutScheme and ZoomedlayoutScheme
val (reprojectedRdd, floatMD) = reprojected()
val LayoutLevel(zoom, layoutDefinition) = scheme.levelFor(floatMD.extent, floatMD.cellSize)
zoom -> resizingTileRDD(reprojected, floatMD, layoutDefinition)
zoom -> resizingTileRDD(reprojectedRdd, floatMD, layoutDefinition)

case Right(layoutDefinition) =>
0 -> resizingTileRDD(reprojected, floatMD, layoutDefinition)
val (reprojectedRdd, floatMD) = reprojected()
0 -> resizingTileRDD(reprojectedRdd, floatMD, layoutDefinition)
}

case BufferedReproject =>
Expand All @@ -200,7 +209,7 @@ case class Etl(conf: EtlConf, @transient modules: Seq[TypedModule] = Etl.default
scheme match {
case Left(layoutScheme: ZoomedLayoutScheme) if output.maxZoom.isDefined =>
val LayoutLevel(zoom, layoutDefinition) = layoutScheme.levelForZoom(output.maxZoom.get)
zoom -> tiled.reproject(destCrs, layoutDefinition, method)._2
zoom -> tiled.reproject(destCrs, layoutDefinition, RasterReprojectOptions(method = method, targetCellSize = Some(layoutDefinition.cellSize)))._2

case Left(layoutScheme) =>
tiled.reproject(destCrs, layoutScheme, method)
Expand Down
29 changes: 20 additions & 9 deletions spark/src/main/scala/geotrellis/spark/ingest/Ingest.scala
Expand Up @@ -19,6 +19,7 @@ package geotrellis.spark.ingest
import geotrellis.proj4._
import geotrellis.raster._
import geotrellis.raster.resample.{ResampleMethod, NearestNeighbor}
import geotrellis.raster.reproject.Reproject.{Options => RasterReprojectOptions}
import geotrellis.spark._
import geotrellis.spark.pyramid.Pyramid
import geotrellis.spark.reproject._
Expand Down Expand Up @@ -68,20 +69,30 @@ object Ingest {
partitioner: Option[Partitioner] = None,
bufferSize: Option[Int] = None,
maxZoom: Option[Int] = None,
tileSize: Option[Int] = Some(256))
tileSize: Option[Int] = None)
(sink: (TileLayerRDD[K], Int) => Unit): Unit = {
val (_, tileLayerMetadata) = (maxZoom, tileSize) match {
case (Some(zoom), Some(tileSize)) => sourceTiles.collectMetadata(destCRS, tileSize, zoom)
case _ => sourceTiles.collectMetadata(FloatingLayoutScheme(512))

val (_, tileLayerMetadata) = tileSize match {
case Some(ts) => sourceTiles.collectMetadata(FloatingLayoutScheme(ts))
case _ => sourceTiles.collectMetadata(FloatingLayoutScheme(256))
}
val tiledRdd = sourceTiles.tileToLayout(tileLayerMetadata, resampleMethod).cache()

val tiledRdd = sourceTiles.tileToLayout(tileLayerMetadata, resampleMethod).cache()
val contextRdd = new ContextRDD(tiledRdd, tileLayerMetadata)
val (zoom, tileLayerRdd) =
bufferSize match {
case Some(bs) => contextRdd.reproject(destCRS, layoutScheme, bs)
case None => contextRdd.reproject(destCRS, layoutScheme)

val (zoom, tileLayerRdd) = (layoutScheme, maxZoom) match {
case (layoutScheme: ZoomedLayoutScheme, Some(mz)) =>
val LayoutLevel(zoom, layoutDefinition) = layoutScheme.levelForZoom(mz)
(zoom, bufferSize match {
case Some(bs) => contextRdd.reproject(destCRS, layoutDefinition, bs, options = RasterReprojectOptions(method = resampleMethod, targetCellSize = Some(layoutDefinition.cellSize)))._2
case _ => contextRdd.reproject(destCRS, layoutDefinition, options = RasterReprojectOptions(method = resampleMethod, targetCellSize = Some(layoutDefinition.cellSize)))._2
})

case _ => bufferSize match {
case Some(bs) => contextRdd.reproject(destCRS, layoutScheme, bs, resampleMethod)
case _ => contextRdd.reproject(destCRS, layoutScheme, resampleMethod)
}
}

tileLayerRdd.persist(cacheLevel)

Expand Down
27 changes: 22 additions & 5 deletions spark/src/main/scala/geotrellis/spark/ingest/MultibandIngest.scala
Expand Up @@ -19,6 +19,7 @@ package geotrellis.spark.ingest
import geotrellis.proj4._
import geotrellis.raster._
import geotrellis.raster.resample.{NearestNeighbor, ResampleMethod}
import geotrellis.raster.reproject.Reproject.{Options => RasterReprojectOptions}
import geotrellis.spark._
import geotrellis.spark.pyramid._
import geotrellis.spark.reproject._
Expand All @@ -42,15 +43,31 @@ object MultibandIngest {
partitioner: Option[Partitioner] = None,
bufferSize: Option[Int] = None,
maxZoom: Option[Int] = None,
tileSize: Option[Int] = Some(256))
tileSize: Option[Int] = None)
(sink: (MultibandTileLayerRDD[K], Int) => Unit): Unit = {
val (_, tileLayerMetadata) = (maxZoom, tileSize) match {
case (Some(zoom), Some(tileSize)) => sourceTiles.collectMetadata(destCRS, tileSize, zoom)
case _ => sourceTiles.collectMetadata(FloatingLayoutScheme(512))

val (_, tileLayerMetadata) = tileSize match {
case Some(ts) => sourceTiles.collectMetadata(FloatingLayoutScheme(ts))
case _ => sourceTiles.collectMetadata(FloatingLayoutScheme(256))
}

val tiledRdd = sourceTiles.tileToLayout(tileLayerMetadata, resampleMethod).cache()
val contextRdd = new ContextRDD(tiledRdd, tileLayerMetadata)
val (zoom, tileLayerRdd) = bufferSize.fold(contextRdd.reproject(destCRS, layoutScheme))(contextRdd.reproject(destCRS, layoutScheme, _))

val (zoom, tileLayerRdd) = (layoutScheme, maxZoom) match {
case (layoutScheme: ZoomedLayoutScheme, Some(mz)) =>
val LayoutLevel(zoom, layoutDefinition) = layoutScheme.levelForZoom(mz)
(zoom, bufferSize match {
case Some(bs) => contextRdd.reproject(destCRS, layoutDefinition, bs, options = RasterReprojectOptions(method = resampleMethod, targetCellSize = Some(layoutDefinition.cellSize)))._2
case _ => contextRdd.reproject(destCRS, layoutDefinition, options = RasterReprojectOptions(method = resampleMethod, targetCellSize = Some(layoutDefinition.cellSize)))._2
})

case _ => bufferSize match {
case Some(bs) => contextRdd.reproject(destCRS, layoutScheme, bs, resampleMethod)
case _ => contextRdd.reproject(destCRS, layoutScheme, resampleMethod)
}
}

tileLayerRdd.persist(cacheLevel)

def buildPyramid(zoom: Int, rdd: MultibandTileLayerRDD[K]): List[(Int, MultibandTileLayerRDD[K])] = {
Expand Down
Expand Up @@ -63,6 +63,12 @@ class TileRDDReprojectMethods[
def reproject(destCrs: CRS, layoutScheme: LayoutScheme, bufferSize: Int): (Int, RDD[(K, V)] with Metadata[TileLayerMetadata[K]]) =
reproject(destCrs, layoutScheme, bufferSize, Options.DEFAULT)

def reproject(destCrs: CRS, layoutDefinition: LayoutDefinition, bufferSize: Int, options: Options): (Int, RDD[(K, V)] with Metadata[TileLayerMetadata[K]]) =
TileRDDReproject(self, destCrs, Right(layoutDefinition), bufferSize, options)

def reproject(destCrs: CRS, layoutDefinition: LayoutDefinition, bufferSize: Int): (Int, RDD[(K, V)] with Metadata[TileLayerMetadata[K]]) =
TileRDDReproject(self, destCrs, Right(layoutDefinition), bufferSize, Options.DEFAULT)

def reproject(destCrs: CRS, layoutDefinition: LayoutDefinition, options: Options): (Int, RDD[(K, V)] with Metadata[TileLayerMetadata[K]]) =
TileRDDReproject(self, destCrs, Right(layoutDefinition), options)

Expand Down
Expand Up @@ -33,7 +33,7 @@ class IngestSpec extends FunSpec
describe("Ingest") {
it("should read GeoTiff with overrided input CRS") {
val source = HadoopGeoTiffRDD.spatial(new Path(inputHome, "all-ones.tif"), HadoopGeoTiffRDD.Options(crs = Some(CRS.fromEpsgCode(3857))))
// val source = sc.hadoopGeoTiffRDD(new Path(inputHome, "all-ones.tif"), sc.defaultTiffExtensions, crs = "EPSG:3857")
// val source = sc.hadoopGeoTiffRDD(new Path(inputHome, "all-ones.tif"), sc.defaultTiffExtensions, crs = "EPSG:3857")
source.take(1).toList.map { case (k, _) => k.crs.proj4jCrs.getName }.head shouldEqual "EPSG:3857"
}

Expand Down

0 comments on commit 86d7d9c

Please sign in to comment.