Merge pull request #2102 from pomadchin/feature/generic-raster-writers
Rasters write support to HDFS / S3
lossyrob committed Apr 3, 2017
2 parents d9ca7d3 + cf14df3 commit 490c6c2
Showing 8 changed files with 340 additions and 1 deletion.
48 changes: 48 additions & 0 deletions spark/src/main/scala/geotrellis/spark/io/hadoop/HadoopGeoTiffReader.scala
@@ -0,0 +1,48 @@
/*
* Copyright 2016 Azavea
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package geotrellis.spark.io.hadoop

import geotrellis.raster.io.geotiff.reader._
import geotrellis.raster.io.geotiff.{MultibandGeoTiff, SinglebandGeoTiff}
import geotrellis.vector.Extent

import org.apache.commons.io.IOUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.SparkContext

object HadoopGeoTiffReader {
  def readSingleband(path: Path)(implicit sc: SparkContext): SinglebandGeoTiff = readSingleband(path, decompress = true, streaming = false, None, sc.hadoopConfiguration)
  def readSingleband(path: Path, decompress: Boolean, streaming: Boolean, extent: Option[Extent], conf: Configuration): SinglebandGeoTiff =
    HdfsUtils.read(path, conf) { is =>
      val geoTiff = GeoTiffReader.readSingleband(IOUtils.toByteArray(is), decompress, streaming)
      extent match {
        case Some(e) => geoTiff.crop(e)
        case _ => geoTiff
      }
    }

  def readMultiband(path: Path)(implicit sc: SparkContext): MultibandGeoTiff = readMultiband(path, decompress = true, streaming = false, None, sc.hadoopConfiguration)
  def readMultiband(path: Path, decompress: Boolean, streaming: Boolean, extent: Option[Extent], conf: Configuration): MultibandGeoTiff =
    HdfsUtils.read(path, conf) { is =>
      val geoTiff = GeoTiffReader.readMultiband(IOUtils.toByteArray(is), decompress, streaming)
      extent match {
        case Some(e) => geoTiff.crop(e)
        case _ => geoTiff
      }
    }
}
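The one-argument overloads pull the Hadoop configuration from an implicit SparkContext; the five-argument overloads take it explicitly and crop to an extent when one is given. A minimal usage sketch, not part of the diff (the HDFS path and the enclosing method are hypothetical):

import geotrellis.spark.io.hadoop.HadoopGeoTiffReader
import geotrellis.vector.Extent
import org.apache.hadoop.fs.Path
import org.apache.spark.SparkContext

def readExamples(implicit sc: SparkContext): Unit = {
  // Whole-file read, configuration taken from the implicit SparkContext.
  val full = HadoopGeoTiffReader.readSingleband(new Path("hdfs:///data/aspect.tif"))

  // Streaming read restricted to a window; passing Some(extent) triggers geoTiff.crop(e).
  val window = Extent(0.0, 0.0, 10.0, 10.0)
  val cropped = HadoopGeoTiffReader.readSingleband(
    new Path("hdfs:///data/aspect.tif"),
    decompress = false, streaming = true, Some(window), sc.hadoopConfiguration)
}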
29 changes: 29 additions & 0 deletions spark/src/main/scala/geotrellis/spark/io/hadoop/HadoopJpgReader.scala
@@ -0,0 +1,29 @@
/*
* Copyright 2016 Azavea
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package geotrellis.spark.io.hadoop

import geotrellis.raster.render.Jpg

import org.apache.commons.io.IOUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.SparkContext

object HadoopJpgReader {
  def read(path: Path)(implicit sc: SparkContext): Jpg = read(path, sc.hadoopConfiguration)
  def read(path: Path, conf: Configuration): Jpg = HdfsUtils.read(path, conf) { is => Jpg(IOUtils.toByteArray(is)) }
}
28 changes: 28 additions & 0 deletions spark/src/main/scala/geotrellis/spark/io/hadoop/HadoopPngReader.scala
@@ -0,0 +1,28 @@
/*
* Copyright 2016 Azavea
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package geotrellis.spark.io.hadoop

import geotrellis.raster.render.Png
import org.apache.commons.io.IOUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.SparkContext

object HadoopPngReader {
  def read(path: Path)(implicit sc: SparkContext): Png = read(path, sc.hadoopConfiguration)
  def read(path: Path, conf: Configuration): Png = HdfsUtils.read(path, conf) { is => Png(IOUtils.toByteArray(is)) }
}
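Both image readers are thin wrappers over HdfsUtils.read that hand the raw bytes to the Jpg/Png constructors. A sketch of both call styles, not part of the diff (paths hypothetical):

import geotrellis.spark.io.hadoop.{HadoopJpgReader, HadoopPngReader}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// With an explicit Configuration, no SparkContext is needed at all.
val conf = new Configuration()
val png = HadoopPngReader.read(new Path("hdfs:///tiles/0.png"), conf)
val jpg = HadoopJpgReader.read(new Path("hdfs:///tiles/0.jpg"), conf)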
28 changes: 28 additions & 0 deletions spark/src/main/scala/geotrellis/spark/io/hadoop/HadoopRasterMethods.scala
@@ -0,0 +1,28 @@
/*
* Copyright 2016 Azavea
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package geotrellis.spark.io.hadoop

import geotrellis.util.MethodExtensions

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.SparkContext

trait HadoopRasterMethods[T] extends MethodExtensions[T] {
  def write(path: Path)(implicit sc: SparkContext): Unit = write(path, sc.hadoopConfiguration)
  def write(path: Path, conf: Configuration): Unit
}
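This is the standard MethodExtensions hook: an instance only implements the two-argument write, and the one-argument form delegates with sc.hadoopConfiguration. A hypothetical sketch of extending some other bytes-like type the same way the Implicits changes below do:

import geotrellis.spark.io.hadoop.{HadoopRasterMethods, HdfsUtils}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Hypothetical: implicit classes must live in an object or trait that callers import.
object BytesImplicits {
  implicit class withBytesHadoopWriteMethods(val self: Array[Byte]) extends HadoopRasterMethods[Array[Byte]] {
    def write(path: Path, conf: Configuration): Unit =
      HdfsUtils.write(path, conf) { _.write(self) }
  }
}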
53 changes: 53 additions & 0 deletions spark/src/main/scala/geotrellis/spark/io/hadoop/HdfsUtils.scala
@@ -18,6 +18,7 @@ package geotrellis.spark.io.hadoop

import geotrellis.util.LazyLogging

import org.apache.hadoop.io.compress.CompressionCodecFactory
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.hadoop.mapreduce.Job
@@ -230,4 +231,56 @@ object HdfsUtils extends LazyLogging {
      }
    }
  }

  def write(path: Path, conf: Configuration)(dosWrite: DataOutputStream => Unit): Unit = {
    val fs = path.getFileSystem(conf)

    val os = {
      val factory = new CompressionCodecFactory(conf)
      val codec = factory.getCodec(path)

      if (codec == null) {
        logger.debug(s"No codec found for $path, writing without compression.")
        fs.create(path)
      } else {
        codec.createOutputStream(fs.create(path))
      }
    }
    try {
      val dos = new DataOutputStream(os)
      try {
        dosWrite(dos)
      } finally {
        dos.close()
      }
    } finally {
      os.close()
    }
  }

  def read[T](path: Path, conf: Configuration)(disRead: DataInputStream => T): T = {
    val fs = path.getFileSystem(conf)

    val is = {
      val factory = new CompressionCodecFactory(conf)
      val codec = factory.getCodec(path)

      if (codec == null) {
        logger.debug(s"No codec found for $path, reading without compression.")
        fs.open(path)
      } else {
        codec.createInputStream(fs.open(path))
      }
    }
    try {
      val dis = new DataInputStream(is)
      try {
        disRead(dis)
      } finally {
        dis.close()
      }
    } finally {
      is.close()
    }
  }
}
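Codec selection happens purely by file extension through CompressionCodecFactory, so the same call writes plain or compressed bytes depending on the path suffix. A sketch, not part of the diff (paths hypothetical):

import geotrellis.spark.io.hadoop.HdfsUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

val conf = new Configuration()

// ".gz" resolves to GzipCodec, so the bytes land gzip-compressed.
HdfsUtils.write(new Path("hdfs:///out/payload.bin.gz"), conf) { dos =>
  dos.write("hello".getBytes("UTF-8"))
}

// No codec matches ".bin", so the bytes are written as-is.
HdfsUtils.write(new Path("hdfs:///out/payload.bin"), conf) { dos =>
  dos.write("hello".getBytes("UTF-8"))
}

Reading back through HdfsUtils.read decompresses symmetrically, which is what the gzip round-trip tests below rely on.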
22 changes: 22 additions & 0 deletions spark/src/main/scala/geotrellis/spark/io/hadoop/Implicits.scala
@@ -16,6 +16,13 @@

package geotrellis.spark.io.hadoop

import geotrellis.raster.CellGrid
import geotrellis.raster.io.geotiff.GeoTiff
import geotrellis.raster.io.geotiff.writer.GeoTiffWriter
import geotrellis.raster.render.{Jpg, Png}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark._
import org.apache.spark.rdd._

@@ -25,4 +32,19 @@ trait Implicits {
  implicit class HadoopSparkContextMethodsWrapper(val sc: SparkContext) extends HadoopSparkContextMethods
  implicit class withSaveBytesToHadoopMethods[K](rdd: RDD[(K, Array[Byte])]) extends SaveBytesToHadoopMethods[K](rdd)
  implicit class withSaveToHadoopMethods[K,V](rdd: RDD[(K,V)]) extends SaveToHadoopMethods[K, V](rdd)

  implicit class withJpgHadoopWriteMethods(val self: Jpg) extends HadoopRasterMethods[Jpg] {
    def write(path: Path, conf: Configuration): Unit =
      HdfsUtils.write(path, conf) { _.write(self.bytes) }
  }

  implicit class withPngHadoopWriteMethods(val self: Png) extends HadoopRasterMethods[Png] {
    def write(path: Path, conf: Configuration): Unit =
      HdfsUtils.write(path, conf) { _.write(self.bytes) }
  }

  implicit class withGeoTiffHadoopWriteMethods[T <: CellGrid](val self: GeoTiff[T]) extends HadoopRasterMethods[GeoTiff[T]] {
    def write(path: Path, conf: Configuration): Unit =
      HdfsUtils.write(path, conf) { new GeoTiffWriter(self, _).write() }
  }
}
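With these implicits in scope (assuming the geotrellis.spark.io.hadoop package object mixes in Implicits, as elsewhere in the project), write becomes method syntax on Jpg, Png, and GeoTiff[T] directly. A sketch, not part of the diff (paths and the enclosing method hypothetical):

import geotrellis.raster.io.geotiff.SinglebandGeoTiff
import geotrellis.spark.io.hadoop._
import org.apache.hadoop.fs.Path
import org.apache.spark.SparkContext

def save(tiff: SinglebandGeoTiff)(implicit sc: SparkContext): Unit = {
  tiff.write(new Path("hdfs:///out/aspect.tif"))    // plain GeoTiff
  tiff.write(new Path("hdfs:///out/aspect.tif.gz")) // gzipped via the codec factory
}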
1 change: 0 additions & 1 deletion spark/src/main/scala/geotrellis/spark/package.scala
@@ -24,7 +24,6 @@ import geotrellis.spark.tiling._
import geotrellis.spark.ingest._
import geotrellis.spark.crop._
import geotrellis.spark.filter._

import org.apache.spark.{Partitioner, SparkContext}
import org.apache.spark.rdd._
import spire.syntax.cfor._
132 changes: 132 additions & 0 deletions spark/src/test/scala/geotrellis/spark/io/hadoop/HadoopRasterMethodsSpec.scala
@@ -0,0 +1,132 @@
/*
* Copyright 2016 Azavea
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package geotrellis.spark.io.hadoop

import geotrellis.raster.io.geotiff._
import geotrellis.raster.testkit._
import geotrellis.raster.{IntCellType, MultibandTile}
import geotrellis.spark.io.hadoop
import geotrellis.spark.testkit.TestEnvironment

import org.apache.hadoop.fs.Path
import org.scalatest._

import java.io._

class HadoopRasterMethodsSpec extends FunSpec
  with Matchers
  with BeforeAndAfterAll
  with RasterMatchers
  with TileBuilders
  with TestEnvironment {

  describe ("writing Rasters without errors and with correct tiles, crs and extent using Hadoop FSData{Input|Output} stream") {
    def expandGeoTiff(geoTiff: MultibandGeoTiff) =
      MultibandGeoTiff(
        MultibandTile(
          geoTiff.tile.bands ++
          geoTiff.tile.bands ++
          geoTiff.tile.bands
        ),
        geoTiff.extent,
        geoTiff.crs
      )

    val (tempTiff, tempPng, tempJpg) = (
      File.createTempFile("geotiff-writer", ".tif"),
      File.createTempFile("geotiff-writer", ".png"),
      File.createTempFile("geotiff-writer", ".jpg")
    )

    val (pathTiff, pathPng, pathJpg) = (tempTiff.getPath, tempPng.getPath, tempJpg.getPath)
    val (pathTiffGz, pathPngGz, pathJpgGz) = (s"${tempTiff.getPath}.gz", s"${tempPng.getPath}.gz", s"${tempJpg.getPath}.gz")
    val existencePath = "raster-test/data/aspect.tif"

it("should write GeoTiff with tags") {
val geoTiff = MultibandGeoTiff(existencePath)

val expected = geoTiff.tile
val expectedTags = geoTiff.tags

geoTiff.write(new Path(pathTiff))

val actualTiff = hadoop.HadoopGeoTiffReader.readMultiband(new Path(pathTiff))
val actual = actualTiff.tile
val actualTags = actualTiff.tags

actual should be (expected)
actualTags should be (expectedTags)
}

it("should write GeoTiff with tags with gzip") {
val geoTiff = MultibandGeoTiff(existencePath)

val expected = geoTiff.tile
val expectedTags = geoTiff.tags

geoTiff.write(new Path(pathTiffGz))

val actualTiff = hadoop.HadoopGeoTiffReader.readMultiband(new Path(pathTiffGz))
val actual = actualTiff.tile
val actualTags = actualTiff.tags

actual should be (expected)
actualTags should be (expectedTags)
}

it("should write Png") {
val geoTiff = expandGeoTiff(MultibandGeoTiff(existencePath))

val expected = geoTiff.tile.convert(IntCellType).renderPng()
expected.write(new Path(pathPng))

val actual = hadoop.HadoopPngReader.read(new Path(pathPng))

actual.bytes should be (expected.bytes)
}

it("should write Png with gzip") {
val geoTiff = expandGeoTiff(MultibandGeoTiff(existencePath))
val expected = geoTiff.tile.convert(IntCellType).renderPng()
expected.write(new Path(pathPngGz))

val actual = hadoop.HadoopPngReader.read(new Path(pathPngGz))

actual.bytes should be (expected.bytes)
}

it("should write Jpg") {
val geoTiff = expandGeoTiff(MultibandGeoTiff(existencePath))
val expected = geoTiff.tile.convert(IntCellType).renderJpg()
expected.write(new Path(pathJpg))

val actual = hadoop.HadoopPngReader.read(new Path(pathJpg))

actual.bytes should be (expected.bytes)
}

it("should write Jpg with gzip") {
val geoTiff = expandGeoTiff(MultibandGeoTiff(existencePath))
val expected = geoTiff.tile.convert(IntCellType).renderJpg()
expected.write(new Path(pathJpgGz))

val actual = hadoop.HadoopPngReader.read(new Path(pathJpgGz))

actual.bytes should be (expected.bytes)
}
}
}
