diff --git a/s3/src/main/scala/geotrellis/spark/io/s3/cog/S3COGLayerWriter.scala b/s3/src/main/scala/geotrellis/spark/io/s3/cog/S3COGLayerWriter.scala index 3774e3dc11..ca177f47f3 100644 --- a/s3/src/main/scala/geotrellis/spark/io/s3/cog/S3COGLayerWriter.scala +++ b/s3/src/main/scala/geotrellis/spark/io/s3/cog/S3COGLayerWriter.scala @@ -1,9 +1,6 @@ package geotrellis.spark.io.s3.cog -import com.amazonaws.services.s3.model.AmazonS3Exception import geotrellis.raster._ -import geotrellis.raster.crop._ -import geotrellis.raster.merge._ import geotrellis.raster.io.geotiff.GeoTiff import geotrellis.raster.io.geotiff.writer.GeoTiffWriter import geotrellis.spark._ @@ -13,13 +10,13 @@ import geotrellis.spark.io.s3.{S3AttributeStore, S3Client, S3RDDWriter, makePath import geotrellis.spark.io.cog._ import geotrellis.spark.io.cog.vrt.VRT import geotrellis.spark.io.cog.vrt.VRT.IndexedSimpleSource -import scala.util.Try import spray.json.JsonFormat import com.amazonaws.services.s3.model.{AmazonS3Exception, ObjectMetadata, PutObjectRequest} import java.io.ByteArrayInputStream +import scala.util.Try import scala.reflect.ClassTag class S3COGLayerWriter( @@ -56,7 +53,7 @@ class S3COGLayerWriter( case _ => false } - for { (zoomRange, cogs) <- cogLayer.layers.toSeq.sortBy(_._1)(Ordering[ZoomRange].reverse) } { + for((zoomRange, cogs) <- cogLayer.layers.toSeq.sortBy(_._1)(Ordering[ZoomRange].reverse)) { val vrt = VRT(cogLayer.metadata.tileLayerMetadata(zoomRange.minZoom)) // Make RDD[(String, GeoTiff[T])] @@ -77,9 +74,7 @@ class S3COGLayerWriter( (s"${keyPath(key)}.${Extension}", cog) } - .foreachPartition { partition => - asyncWriter.write(getS3Client(), partition, mergeFunc, Some(retryCheck)) - } + .foreachPartition { partition => asyncWriter.write(getS3Client(), partition, mergeFunc, Some(retryCheck)) } // Save Accumulator val bytes = diff --git a/s3/src/test/scala/geotrellis/spark/io/s3/cog/COGS3SpaceTimeSpec.scala b/s3/src/test/scala/geotrellis/spark/io/s3/cog/COGS3SpaceTimeSpec.scala index 52f14d1b20..b5af431561 100644 --- a/s3/src/test/scala/geotrellis/spark/io/s3/cog/COGS3SpaceTimeSpec.scala +++ b/s3/src/test/scala/geotrellis/spark/io/s3/cog/COGS3SpaceTimeSpec.scala @@ -35,7 +35,8 @@ class COGS3SpaceTimeSpec with COGSpaceTimeKeyIndexMethods with TestEnvironment with COGTestFiles - with COGCoordinateSpaceTimeSpec { + with COGCoordinateSpaceTimeSpec + with COGLayerUpdateSpaceTimeTileSpec { registerAfterAll { () => MockS3Client.reset() diff --git a/spark/src/main/scala/geotrellis/spark/io/AsyncWriter.scala b/spark/src/main/scala/geotrellis/spark/io/AsyncWriter.scala index a1ad373eef..f8213ddcab 100644 --- a/spark/src/main/scala/geotrellis/spark/io/AsyncWriter.scala +++ b/spark/src/main/scala/geotrellis/spark/io/AsyncWriter.scala @@ -17,7 +17,8 @@ package geotrellis.spark.io import java.util.concurrent.Executors -import scala.util.Try + +import scala.util.{Failure, Success, Try} import scalaz.concurrent.{Strategy, Task} import scalaz.stream.{Process, nondeterminism} @@ -51,11 +52,11 @@ abstract class AsyncWriter[Client, V, E](threads: Int) extends Serializable { mergeFunc match { case Some(fn) => // TODO: match on this failure to retry reads - val oldValue = readRecord(client, key).get - (key, fn(oldValue, newValue)) - - case None => - newRecord + readRecord(client, key) match { + case Success(oldValue) => (key, fn(oldValue, newValue)) + case Failure(_) => newRecord + } + case None => newRecord } }