
Use `windowsByBytes` in HDFS

jamesmcclain authored and echeipesh committed Oct 3, 2017
1 parent 4c24f24 commit 306f891f3f5df58fe5ac28158146ef9158c302bf
@@ -116,7 +116,7 @@ private [geotrellis] trait GeoTiffInfoReader extends LazyLogging {
* Calculates a split of segments to minimize segment reads.
*
* Returns RDD of pairs: (URI, Array[GridBounds])
* where GridBounds are gird bounds of a particular segment,
* where GridBounds are grid bounds of a particular segment,
* each segment can only be in a single partition.
* */
def segmentsByPartitionBytes(
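
The doc comment above describes the packing strategy only in words. Here is a minimal, hypothetical sketch of that idea (the name groupSegmentsBySize and the (segment, byteCount) pairs are illustrative, not the actual GeoTrellis API): greedily fill a partition until adding the next segment would exceed the byte budget, so every segment ends up in exactly one partition.

// Illustrative only: pack (segment, byteCount) pairs greedily so that no
// group exceeds maxBytes and each segment lands in exactly one group.
def groupSegmentsBySize[T](segments: Seq[(T, Long)], maxBytes: Long): Seq[Seq[T]] = {
  val (done, last, _) =
    segments.foldLeft((Vector.empty[Vector[T]], Vector.empty[T], 0L)) {
      case ((groups, current, size), (segment, bytes)) =>
        if (current.nonEmpty && size + bytes > maxBytes)
          (groups :+ current, Vector(segment), bytes)   // start a new group
        else
          (groups, current :+ segment, size + bytes)    // keep filling this one
    }
  if (last.nonEmpty) done :+ last else done
}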
@@ -0,0 +1,62 @@
/*
* Copyright 2016 Azavea
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package geotrellis.spark.io.hadoop

import geotrellis.raster.io.geotiff.reader.GeoTiffReader
import geotrellis.raster.io.geotiff.reader.GeoTiffReader.GeoTiffInfo
import geotrellis.spark.io._
import geotrellis.spark.io.hadoop._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD


case class HadoopGeoTiffInfoReader(
path: String,
config: SerializableConfiguration,
extensions: Seq[String],
decompress: Boolean = false,
streaming: Boolean = true
) extends GeoTiffInfoReader {

lazy val geoTiffInfo: List[(String, GeoTiffInfo)] = {
HdfsUtils
.listFiles(new Path(path), config.value)
.map({ path => path.toString })
.filter({ path => extensions.exists({ e => path.endsWith(e) }) })
.map({ uri => (uri, getGeoTiffInfo(uri)) })
}

/** Returns an RDD of URIs to TIFFs, since GeoTiffInfo is not serializable. */
def geoTiffInfoRdd(implicit sc: SparkContext): RDD[String] = {
sc.parallelize(
HdfsUtils
.listFiles(new Path(path), config.value)
.map({ path => path.toString })
.filter({ path => extensions.exists({ e => path.endsWith(e) }) })
)
}

def getGeoTiffInfo(uri: String): GeoTiffInfo = {
val path = new Path(uri)
val rr = HdfsRangeReader(path, config.value)
GeoTiffReader.readGeoTiffInfo(rr, decompress, streaming)
}
}
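
For context, the usage below mirrors how HadoopGeoTiffRDD (further down in this commit) drives the new reader. The extension list is an arbitrary example, and the assumption that windowsByBytes takes an implicit SparkContext is taken from that call site rather than guaranteed by the trait's signature.

import geotrellis.raster.GridBounds

// Sketch, mirroring the call site in HadoopGeoTiffRDD below: list windows of at
// most 256x256 cells per GeoTiff, grouped into roughly 128 MB partitions.
def exampleWindows(path: String, conf: SerializableConfiguration)
                  (implicit sc: SparkContext): RDD[(String, Array[GridBounds])] = {
  val reader = HadoopGeoTiffInfoReader(path, conf, extensions = Seq(".tif", ".tiff"))
  reader.windowsByBytes(128L * 1024 * 1024, 256)
}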
@@ -23,22 +23,25 @@ import geotrellis.raster.io.geotiff.reader.GeoTiffReader
import geotrellis.raster.io.geotiff.tags.TiffTags
import geotrellis.spark._
import geotrellis.spark.io.hadoop.formats._
import geotrellis.spark.io.hadoop._
import geotrellis.spark.io.RasterReader
import geotrellis.util.StreamingByteReader
import geotrellis.util.{LazyLogging, StreamingByteReader}
import geotrellis.vector.ProjectedExtent

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

import java.net.URI
import java.nio.ByteBuffer


/**
* Allows for reading of whole or windowed GeoTiffs as RDD[(K, V)]s through the Hadoop FileSystem API.
*/
object HadoopGeoTiffRDD {
object HadoopGeoTiffRDD extends LazyLogging {
final val GEOTIFF_TIME_TAG_DEFAULT = "TIFFTAG_DATETIME"
final val GEOTIFF_TIME_FORMAT_DEFAULT = "yyyy:MM:dd HH:mm:ss"

@@ -63,6 +66,7 @@ object HadoopGeoTiffRDD {
timeFormat: String = GEOTIFF_TIME_FORMAT_DEFAULT,
maxTileSize: Option[Int] = None,
numPartitions: Option[Int] = None,
partitionBytes: Option[Long] = Some(128l * 1024 * 1024),
chunkSize: Option[Int] = None
) extends RasterReader.Options

@@ -88,22 +92,54 @@ object HadoopGeoTiffRDD {
* @param uriToKey function to transform the input key based on the URI information.
* @param options An instance of [[Options]] that contains any user defined or default settings.
*/
def apply[I, K, V](path: Path, uriToKey: (URI, I) => K, options: Options)(implicit sc: SparkContext, rr: RasterReader[Options, (I, V)]): RDD[(K, V)] = {
val conf = configuration(path, options)
options.maxTileSize match {
case Some(tileSize) =>
def apply[I, K, V](
path: Path,
uriToKey: (URI, I) => K,
options: Options
)(implicit sc: SparkContext, rr: RasterReader[Options, (I, V)]): RDD[(K, V)] = {

val conf = new SerializableConfiguration(configuration(path, options))
val path2 = path.toString
lazy val sourceGeoTiffInfo = HadoopGeoTiffInfoReader(path2, conf, options.tiffExtensions)

(options.maxTileSize, options.partitionBytes) match {
case (_, Some(partitionBytes)) => {
val windows: RDD[(String, Array[GridBounds])] =
sourceGeoTiffInfo.windowsByBytes(partitionBytes, options.maxTileSize.getOrElse(1<<10))

windows.persist()

val windowCount = windows.count.toInt

logger.info(s"Repartition into ${windowCount} partitions.")

val repartition =
if (windowCount > windows.partitions.length) windows.repartition(windowCount)
else windows

val result = repartition.flatMap { case (path, windowBounds) =>
rr.readWindows(windowBounds, sourceGeoTiffInfo.getGeoTiffInfo(path), options).map { case (k, v) =>
uriToKey(new URI(path), k) -> v
}
}

windows.unpersist()
result
}
case (Some(_), _) =>
val pathsAndDimensions: RDD[(Path, (Int, Int))] =
sc.newAPIHadoopRDD(
conf,
conf.value,
classOf[TiffTagsInputFormat],
classOf[Path],
classOf[TiffTags]
).mapValues { tiffTags => (tiffTags.cols, tiffTags.rows) }

apply[I, K, V](pathsAndDimensions, uriToKey, options)
case None =>

case _ =>
sc.newAPIHadoopRDD(
conf,
conf.value,
classOf[BytesFileInputFormat],
classOf[Path],
classOf[Array[Byte]]
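
The net effect of the new branch: whenever partitionBytes is set (and it now defaults to 128 MB), the reader plans windows per GeoTiff with windowsByBytes, repartitions so each group of windows gets its own task, and reads them with readWindows. A hedged usage sketch follows; it assumes the implicit RasterReader[Options, (ProjectedExtent, Tile)] instance for singleband rasters resolves from implicit scope, as it does for the existing spatial read path, and that the remaining Options fields keep their defaults.

import geotrellis.raster.Tile
import geotrellis.vector.ProjectedExtent
import org.apache.hadoop.fs.Path
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Sketch: read each GeoTiff under `path` in ~64 MB groups of windows,
// keyed by each window's ProjectedExtent.
def readWindowed(path: Path)(implicit sc: SparkContext): RDD[(ProjectedExtent, Tile)] =
  HadoopGeoTiffRDD.apply[ProjectedExtent, ProjectedExtent, Tile](
    path,
    (_: java.net.URI, key: ProjectedExtent) => key,           // keep the key as-is
    HadoopGeoTiffRDD.Options(partitionBytes = Some(64L * 1024 * 1024))
  )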
@@ -130,21 +166,21 @@ object HadoopGeoTiffRDD {
* Creates a RDD[(K, V)] whose K and V depends on the type of the GeoTiff that is going to be read in.
*
* @param pathsToDimensions RDD keyed by GeoTiff path with (cols, rows) tuple as value.
* @param uriToKey function to transform input key basing on the URI information.
* @param uriToKey A function to transform the input key based on the URI information.
* @param options An instance of [[Options]] that contains any user defined or default settings.
*/
def apply[I, K, V](pathsToDimensions: RDD[(Path, (Int, Int))], uriToKey: (URI, I) => K, options: Options)
(implicit rr: RasterReader[Options, (I, V)]): RDD[(K, V)] = {
def apply[I, K, V](
pathsToDimensions: RDD[(Path, (Int, Int))],
uriToKey: (URI, I) => K,
options: Options
)(implicit rr: RasterReader[Options, (I, V)]): RDD[(K, V)] = {

val conf = new SerializableConfiguration(pathsToDimensions.sparkContext.hadoopConfiguration)

val windows: RDD[(Path, GridBounds)] =
pathsToDimensions
.flatMap { case (objectRequest, (cols, rows)) =>
val path: Path = objectRequest
val config: Configuration = conf.value
val rangeReader = HdfsRangeReader(path, config)
val layout = GeoTiffReader.readGeoTiffInfo(rangeReader, false, true).segmentLayout.tileLayout
val info = HadoopGeoTiffInfoReader(objectRequest.toString, conf, options.tiffExtensions)
val layout = info.getGeoTiffInfo(objectRequest.toString).segmentLayout.tileLayout

RasterReader
.listWindows(cols, rows, options.maxTileSize.getOrElse(1<<10), layout.tileCols, layout.tileRows)
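
RasterReader.listWindows (unchanged by this commit) is what turns a raster's dimensions into window bounds. The hypothetical helper below sketches that idea under the simplifying assumption that windows step by whole tiles up to roughly maxSize cells per side; the name and exact snapping rules are illustrative, not the library's.

import geotrellis.raster.GridBounds

// Illustrative only: tile-aligned windows covering a cols x rows raster,
// each spanning at most maxSize cells per side (rounded down to whole tiles).
def exampleListWindows(cols: Int, rows: Int, maxSize: Int, tileCols: Int, tileRows: Int): Seq[GridBounds] = {
  val colStep = math.max(tileCols, (maxSize / tileCols) * tileCols)
  val rowStep = math.max(tileRows, (maxSize / tileRows) * tileRows)
  for {
    colMin <- 0 until cols by colStep
    rowMin <- 0 until rows by rowStep
  } yield GridBounds(colMin, rowMin, math.min(colMin + colStep, cols) - 1, math.min(rowMin + rowStep, rows) - 1)
}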
