Merge pull request #560 from jbouffard/fix/geotiff-api
geotiff.get API, Logic, And DocString Changes
Jacob Bouffard committed Nov 29, 2017
2 parents 54d8716 + 2ddb58b commit fe5588d
Showing 5 changed files with 117 additions and 75 deletions.
@@ -47,4 +47,7 @@ object Constants {

final val STRIPED = "Striped"
final val TILED = "Tiled"

final val INTKEYS = Array("max_tile_size", "num_partitions", "chunk_size")
final val STRINGKEYS = Array("crs", "time_tag", "time_format", "delimiter", "s3_client")
}
@@ -16,7 +16,6 @@ import org.apache.spark.rdd._

import scala.collection.JavaConverters._
import collection.JavaConversions._
import java.util.Map


object GeoTrellisUtils {
@@ -25,18 +24,19 @@ object GeoTrellisUtils {
def seqToIterable[T](seq: Seq[T]): java.util.Iterator[T] = seq.toIterator.asJava

def convertToScalaMap(
javaMap: java.util.Map[String, Any],
stringValues: Array[String]
): (scala.collection.Map[String, String], scala.collection.Map[String, Int]) = {
javaMap: java.util.Map[String, Any]
): (Map[String, String], Map[String, Int]) = {
val scalaMap = javaMap.asScala

val intMap =
scalaMap.filterKeys(x => !(stringValues.contains(x)))
scalaMap.filterKeys(x => INTKEYS.contains(x))
.mapValues(x => x.asInstanceOf[Int])
.toMap

val stringMap =
scalaMap.filterKeys(x => stringValues.contains(x))
scalaMap.filterKeys(x => STRINGKEYS.contains(x))
.mapValues(x => x.asInstanceOf[String])
.toMap

(stringMap, intMap)
}
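For readers following the gateway boundary from the Python side, the key split above can be sketched as follows. The option names come from the new INTKEYS/STRINGKEYS arrays in Constants.scala, but convert_options itself is a hypothetical mirror of convertToScalaMap, not part of this change:

    # Hypothetical Python mirror of convertToScalaMap (illustration only).
    INT_KEYS = ("max_tile_size", "num_partitions", "chunk_size")
    STRING_KEYS = ("crs", "time_tag", "time_format", "delimiter", "s3_client")

    def convert_options(options):
        """Split a mixed options dict into (string_map, int_map)."""
        int_map = {k: v for k, v in options.items() if k in INT_KEYS}
        string_map = {k: v for k, v in options.items() if k in STRING_KEYS}
        return string_map, int_map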
@@ -8,30 +8,29 @@ import geotrellis.spark.io.hadoop._
import geotrellis.spark.io.s3._
import geotrellis.spark.io.s3.testkit._

import scala.collection.JavaConverters._
import scala.collection.JavaConversions._

import java.net.URI
import java.util.Map
import scala.reflect._

import org.apache.spark._
import org.apache.hadoop.fs.Path


object GeoTiffRDD {
import Constants._

object HadoopGeoTiffRDDOptions {
def default = HadoopGeoTiffRDD.Options.DEFAULT

def setValues(javaMap: java.util.Map[String, Any]): HadoopGeoTiffRDD.Options = {
val stringValues = Array("time_tag", "time_format", "crs")

val (stringMap, intMap) = GeoTrellisUtils.convertToScalaMap(javaMap, stringValues)

def setValues(
intMap: Map[String, Int],
stringMap: Map[String, String],
partitionBytes: Option[Long]
): HadoopGeoTiffRDD.Options = {
val crs: Option[CRS] =
if (stringMap.contains("crs"))
Some(CRS.fromName(stringMap("crs")))
TileLayer.getCRS(stringMap("crs"))
else
None

@@ -41,28 +40,23 @@ object GeoTiffRDD {
timeFormat = stringMap.getOrElse("time_format", default.timeFormat),
maxTileSize = intMap.get("max_tile_size"),
numPartitions = intMap.get("num_partitions"),
chunkSize = intMap.get("chunk_size"))
partitionBytes = partitionBytes,
chunkSize = intMap.get("chunk_size")
)
}
}

object S3GeoTiffRDDOptions {
def default = S3GeoTiffRDD.Options.DEFAULT

def setValues(javaMap: java.util.Map[String, Any]): S3GeoTiffRDD.Options = {
val stringValues = Array("time_tag", "time_format", "s3_client", "crs")
val (stringMap, intMap) = GeoTrellisUtils.convertToScalaMap(javaMap, stringValues)

def setValues(
intMap: Map[String, Int],
stringMap: Map[String, String],
partitionBytes: Option[Long]
): S3GeoTiffRDD.Options = {
val crs: Option[CRS] =
if (stringMap.contains("crs"))
Some(CRS.fromName(stringMap("crs")))
else
None

val partitionBytes =
if (javaMap.contains("partition_bytes") && !intMap.contains("max_tile_size"))
Some(javaMap.get("partition_bytes").asInstanceOf[Long])
else if (!intMap.contains("max_tile_size"))
default.partitionBytes
TileLayer.getCRS(stringMap("crs"))
else
None

@@ -86,31 +80,40 @@ object GeoTiffRDD {
numPartitions = intMap.get("num_partitions"),
partitionBytes = partitionBytes,
chunkSize = intMap.get("chunk_size"),
getS3Client = getS3Client)
delimiter = stringMap.get("delimiter"),
getS3Client = getS3Client
)
}
}
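The delimiter and s3_client options wired through above apply only to S3 reads. A usage sketch from the Python side; the bucket and prefix are placeholders, and "mock" is intended for tests:

    from geopyspark.geotrellis.geotiff import get
    from geopyspark.geotrellis.constants import LayerType

    # Placeholder bucket/prefix; the "mock" S3 client is for unit tests
    # and debugging only.
    layer = get(LayerType.SPATIAL,
                "s3://some-bucket/rasters/",
                delimiter="/",      # S3 object-listing delimiter
                s3_client="mock")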

def get(
sc: SparkContext,
keyType: String,
paths: java.util.List[String],
options: java.util.Map[String, Any]
options: java.util.Map[String, Any],
partitionBytes: String
): RasterLayer[_] = {
val uris = paths.map{ path => new URI(path) }
val (stringMap, intMap) = GeoTrellisUtils.convertToScalaMap(options)
val bytes = Some(partitionBytes.toLong)

uris
.map { uri =>
uri.getScheme match {
case S3 =>
if (options isEmpty)
getS3GeoTiffRDD(sc, keyType, uri, S3GeoTiffRDDOptions.default)
else
getS3GeoTiffRDD(sc, keyType, uri, S3GeoTiffRDDOptions.setValues(options))
getS3GeoTiffRDD(
sc,
keyType,
uri,
S3GeoTiffRDDOptions.setValues(intMap, stringMap, bytes)
)
case _ =>
if (options isEmpty)
getHadoopGeoTiffRDD(sc, keyType, new Path(uri), HadoopGeoTiffRDDOptions.default)
else
getHadoopGeoTiffRDD(sc, keyType, new Path(uri), HadoopGeoTiffRDDOptions.setValues(options))
getHadoopGeoTiffRDD(
sc,
keyType,
new Path(uri),
HadoopGeoTiffRDDOptions.setValues(intMap, stringMap, bytes)
)
}
}
.reduce{ (r1, r2) =>
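Because get maps over every URI, dispatches on its scheme, and then reduces the per-URI layers into a single layer, one call can mix local, HDFS, and S3 sources. A sketch from the Python side (all paths are placeholders):

    from geopyspark.geotrellis.geotiff import get
    from geopyspark.geotrellis.constants import LayerType

    # Placeholders: none of these paths exist.
    layer = get(LayerType.SPATIAL,
                ["file:///tmp/local.tif",        # handled by the Hadoop reader
                 "hdfs:///data/tiles/",          # handled by the Hadoop reader
                 "s3://some-bucket/scene.tif"])  # handled by the S3 reader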
28 changes: 27 additions & 1 deletion geopyspark/geotrellis/constants.py
@@ -3,13 +3,39 @@


__all__ = ['NO_DATA_INT', 'LayerType', 'IndexingMethod', 'ResampleMethod', 'TimeUnit',
'Operation', 'Neighborhood', 'ClassificationStrategy', 'CellType', 'ColorRamp']
'Operation', 'Neighborhood', 'ClassificationStrategy', 'CellType', 'ColorRamp',
'DEFAULT_MAX_TILE_SIZE', 'DEFAULT_PARTITION_BYTES', 'DEFAULT_CHUNK_SIZE',
'DEFAULT_GEOTIFF_TIME_TAG', 'DEFAULT_GEOTIFF_TIME_FORMAT', 'DEFAULT_S3_CLIENT']


"""The NoData value for ints in GeoTrellis."""
NO_DATA_INT = -2147483648


"""The default size of each tile in the resulting layer."""
DEFAULT_MAX_TILE_SIZE = 256


"""The default byte size of each partition."""
DEFAULT_PARTITION_BYTES = 128 * 1024 * 1024


"""The default number of bytes that should be read in at a time."""
DEFAULT_CHUNK_SIZE = 65536


"""The default name of the GeoTiff tag that contains the timestamp for the tile."""
DEFAULT_GEOTIFF_TIME_TAG = "TIFFTAG_DATETIME"


"""The default pattern that will be parsed from the timeTag."""
DEFAULT_GEOTIFF_TIME_FORMAT = "yyyy:MM:dd HH:mm:ss"


"""The default S3 Client to use when reading layers in."""
DEFAULT_S3_CLIENT = "default"


class LayerType(Enum):
"""The type of the key within the tuple of the wrapped RDD."""

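A quick sanity check of the new defaults, assuming the values defined above:

    from geopyspark.geotrellis.constants import (
        DEFAULT_MAX_TILE_SIZE, DEFAULT_PARTITION_BYTES,
        DEFAULT_CHUNK_SIZE, DEFAULT_GEOTIFF_TIME_FORMAT)

    assert DEFAULT_MAX_TILE_SIZE == 256                  # pixels per tile edge
    assert DEFAULT_PARTITION_BYTES == 128 * 1024 * 1024  # 134,217,728 bytes
    assert DEFAULT_CHUNK_SIZE == 65536                   # 64 KB reads
    assert DEFAULT_GEOTIFF_TIME_FORMAT == "yyyy:MM:dd HH:mm:ss"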
82 changes: 46 additions & 36 deletions geopyspark/geotrellis/geotiff.py
@@ -1,8 +1,13 @@
"""This module contains functions that create ``RasterLayer`` from files."""

from functools import reduce
from geopyspark import get_spark_context
from geopyspark.geotrellis.constants import LayerType
from geopyspark.geotrellis.constants import (LayerType,
DEFAULT_MAX_TILE_SIZE,
DEFAULT_PARTITION_BYTES,
DEFAULT_CHUNK_SIZE,
DEFAULT_GEOTIFF_TIME_TAG,
DEFAULT_GEOTIFF_TIME_FORMAT,
DEFAULT_S3_CLIENT)
from geopyspark.geotrellis.layer import RasterLayer


@@ -12,12 +17,14 @@
def get(layer_type,
uri,
crs=None,
max_tile_size=None,
max_tile_size=DEFAULT_MAX_TILE_SIZE,
num_partitions=None,
chunk_size=None,
time_tag=None,
time_format=None,
s3_client=None):
chunk_size=DEFAULT_CHUNK_SIZE,
partition_bytes=DEFAULT_PARTITION_BYTES,
time_tag=DEFAULT_GEOTIFF_TIME_TAG,
time_format=DEFAULT_GEOTIFF_TIME_FORMAT,
delimiter=None,
s3_client=DEFAULT_S3_CLIENT):
"""Creates a ``RasterLayer`` from GeoTiffs that are located on the local file system, ``HDFS``,
or ``S3``.
@@ -29,41 +36,41 @@ def get(layer_type,
Note:
All of the GeoTiffs must have the same spatial type.
uri (str): The path to a given file/directory.
crs (str, optional): The CRS that the output tiles should be
in. The CRS must be in the well-known name format. If ``None``,
then the CRS that the tiles were originally in will be used.
max_tile_size (int, optional): The max size of each tile in the
resulting Layer. If the size is smaller than a read in tile,
then that tile will be broken into tiles of the specified
size. If ``None``, then the whole tile will be read in.
num_partitions (int, optional): The number of repartitions Spark
uri (str or [str]): The path or list of paths to the desired tile(s)/directory(ies).
crs (str or int, optional): The CRS that the output tiles should be
in. If ``None``, then the CRS that the tiles were originally in
will be used.
max_tile_size (int or None, optional): The max size of each tile in the
resulting Layer. If the size is smaller than the read in tile,
then that tile will be broken into smaller sections of the given
size. Defaults to :const:`~geopyspark.geotrellis.constants.DEFAULT_MAX_TILE_SIZE`.
If ``None``, then the whole tile will be read in.
num_partitions (int, optional): The number of partitions Spark
will make when the data is repartitioned. If ``None``, then the
data will not be repartitioned.
partition_bytes (int, optional): The desired number of bytes per partition.
This will ensure that at least one item is assigned to each partition.
If ``None`` and ``max_tile_size`` is not set, then the default size per
partition is 128 Mb.
Note:
This option is only available when reading from S3.
If ``max_tile_size`` is also specified, then this parameter
will be ignored.
Note:
This option is incompatible with the ``max_tile_size`` option.
If both are set, then ``max_tile_size`` will be used instead of
``partition_bytes``.
partition_bytes (int, optional): The desired number of bytes per
partition. This is will ensure that at least one item is assigned for
each partition. Defaults to :const:`~geopyspark.geotrellis.constants.DEFAULT_PARTITION_BYTES`.
chunk_size (int, optional): How many bytes of the file should be
read in at a time. If ``None``, then files will be read in 65536
byte chunks.
read in at a time. Defaults to :const:`~geopyspark.geotrellis.constants.DEFAULT_CHUNK_SIZE`.
time_tag (str, optional): The name of the tiff tag that contains
the time stamp for the tile. If ``None``, then the default value
is: ``TIFFTAG_DATETIME``.
time_format (str, optional): The pattern of the time stamp for
java.time.format.DateTimeFormatter to parse. If ``None``,
then the default value is: ``yyyy:MM:dd HH:mm:ss``.
the time stamp for the tile.
Defaults to :const:`~geopyspark.geotrellis.constants.DEFAULT_GEOTIFF_TIME_TAG`.
time_format (str, optional): The pattern of the time stamp to be parsed.
Defaults to :const:`~geopyspark.geotrellis.constants.DEFAULT_GEOTIFF_TIME_FORMAT`.
delimiter (str, optional): The delimiter to use for S3 object listings.
Note:
This parameter will only be used when reading from S3.
s3_client (str, optional): Which ``S3Client`` to use when reading
GeoTiffs from S3. There are currently two options: ``default`` and
``mock``. If ``None``, ``default`` is used.
``mock``. Defaults to :const:`~geopyspark.geotrellis.constants.DEFAULT_S3_CLIENT`.
Note:
``mock`` should only be used in unit tests and debugging.
@@ -73,21 +80,24 @@ def get(layer_type,
"""

inputs = {k:v for k, v in locals().items() if v is not None}
pysc = get_spark_context()

pysc = get_spark_context()
geotiff_rdd = pysc._gateway.jvm.geopyspark.geotrellis.io.geotiff.GeoTiffRDD

key = LayerType(inputs.pop('layer_type'))._key_name(False)
partition_bytes = str(inputs.pop('partition_bytes'))

if isinstance(uri, list):
srdd = geotiff_rdd.get(pysc._jsc.sc(),
key,
inputs.pop('uri'),
inputs)
inputs,
partition_bytes)
else:
srdd = geotiff_rdd.get(pysc._jsc.sc(),
key,
[inputs.pop('uri')],
inputs)
inputs,
partition_bytes)

return RasterLayer(layer_type, srdd)
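Putting it together, a representative call under the revised API; the directory and CRS below are illustrative, not taken from this changeset:

    from geopyspark.geotrellis.geotiff import get
    from geopyspark.geotrellis.constants import LayerType

    # "/data/scenes/" is a placeholder directory of GeoTiffs.
    raster_layer = get(LayerType.SPATIAL,
                       "file:///data/scenes/",
                       crs="EPSG:3857",     # target CRS, well-known name
                       max_tile_size=512,   # overrides DEFAULT_MAX_TILE_SIZE
                       num_partitions=100)  # repartition after reading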
