From 20e62190cf49b0422ab960a41520f89d72c0b340 Mon Sep 17 00:00:00 2001
From: James McClain
Date: Tue, 12 Sep 2017 16:11:21 -0400
Subject: [PATCH] Remove Full Outer Join From S3

---
 .../spark/io/s3/S3LayerWriter.scala           | 16 +-----
 .../geotrellis/spark/io/s3/S3RDDReader.scala  |  4 ++
 .../geotrellis/spark/io/s3/S3RDDWriter.scala  | 50 ++++++++++++++++++-
 3 files changed, 53 insertions(+), 17 deletions(-)

diff --git a/s3/src/main/scala/geotrellis/spark/io/s3/S3LayerWriter.scala b/s3/src/main/scala/geotrellis/spark/io/s3/S3LayerWriter.scala
index f711426c97..e6f9e18ec5 100644
--- a/s3/src/main/scala/geotrellis/spark/io/s3/S3LayerWriter.scala
+++ b/s3/src/main/scala/geotrellis/spark/io/s3/S3LayerWriter.scala
@@ -134,27 +134,13 @@ class S3LayerWriter(
 
     val updatedMetadata: M = metadata.merge(rdd.metadata)
 
-    val updatedRdd: RDD[(K, V)] =
-      mergeFunc match {
-        case Some(mergeFunc) =>
-          existingTiles
-            .fullOuterJoin(rdd)
-            .flatMapValues {
-              case (Some(layerTile), Some(updateTile)) => Some(mergeFunc(layerTile, updateTile))
-              case (Some(layerTile), _) => Some(layerTile)
-              case (_, Some(updateTile)) => Some(updateTile)
-              case _ => None
-            }
-        case None => rdd
-      }
-
     val codec = KeyValueRecordCodec[K, V]
     val schema = codec.schema
 
     // Write updated metadata, and the possibly updated schema
     // Only really need to write the metadata and schema
     attributeStore.writeLayerAttributes(id, header, updatedMetadata, keyIndex, schema)
-    rddWriter.write(updatedRdd, bucket, keyPath)
+    rddWriter.update(rdd, bucket, keyPath, Some(writerSchema), mergeFunc)
   }
 
   // Layer Writing
diff --git a/s3/src/main/scala/geotrellis/spark/io/s3/S3RDDReader.scala b/s3/src/main/scala/geotrellis/spark/io/s3/S3RDDReader.scala
index b5f9c04ede..dd431a667f 100644
--- a/s3/src/main/scala/geotrellis/spark/io/s3/S3RDDReader.scala
+++ b/s3/src/main/scala/geotrellis/spark/io/s3/S3RDDReader.scala
@@ -26,15 +26,19 @@ import geotrellis.spark.util.KryoWrapper
 
 import scalaz.concurrent.{Strategy, Task}
 import scalaz.std.vector._
 import scalaz.stream.{Process, nondeterminism}
+
 import com.amazonaws.services.s3.model.AmazonS3Exception
+
 import org.apache.avro.Schema
 import org.apache.commons.io.IOUtils
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
 import com.typesafe.config.ConfigFactory
+
 import java.util.concurrent.Executors
 
+
 trait S3RDDReader {
 
   def getS3Client: () => S3Client
diff --git a/s3/src/main/scala/geotrellis/spark/io/s3/S3RDDWriter.scala b/s3/src/main/scala/geotrellis/spark/io/s3/S3RDDWriter.scala
index 4947b1d7d4..0431d0aa73 100644
--- a/s3/src/main/scala/geotrellis/spark/io/s3/S3RDDWriter.scala
+++ b/s3/src/main/scala/geotrellis/spark/io/s3/S3RDDWriter.scala
@@ -19,10 +19,16 @@ package geotrellis.spark.io.s3
 import geotrellis.spark.io._
 import geotrellis.spark.io.avro._
 import geotrellis.spark.io.avro.codecs.KeyValueRecordCodec
+import geotrellis.spark.util.KryoWrapper
 
 import com.amazonaws.services.s3.model.{AmazonS3Exception, ObjectMetadata, PutObjectRequest, PutObjectResult}
+
+import org.apache.avro.Schema
+import org.apache.commons.io.IOUtils
 import org.apache.spark.rdd.RDD
+
 import com.typesafe.config.ConfigFactory
+
 import scalaz.concurrent.{Strategy, Task}
 import scalaz.stream.{Process, nondeterminism}
@@ -31,6 +37,7 @@ import java.util.concurrent.Executors
 
 import scala.reflect._
 
+
 trait S3RDDWriter {
 
   def getS3Client: () => S3Client
@@ -41,6 +48,18 @@ trait S3RDDWriter {
     keyPath: K => String,
     putObjectModifier: PutObjectRequest => PutObjectRequest = { p => p },
     threads: Int = ConfigFactory.load().getThreads("geotrellis.s3.threads.rdd.write")
+  ): Unit = {
+    update(rdd, bucket, keyPath, None, None, putObjectModifier, threads)
+  }
+
+  private[s3] def update[K: AvroRecordCodec: ClassTag, V: AvroRecordCodec: ClassTag](
+    rdd: RDD[(K, V)],
+    bucket: String,
+    keyPath: K => String,
+    writerSchema: Option[Schema],
+    mergeFunc: Option[(V, V) => V],
+    putObjectModifier: PutObjectRequest => PutObjectRequest = { p => p },
+    threads: Int = ConfigFactory.load().getThreads("geotrellis.s3.threads.rdd.write")
   ): Unit = {
     val codec = KeyValueRecordCodec[K, V]
     val schema = codec.schema
@@ -56,19 +75,46 @@ trait S3RDDWriter {
     // on a key type that may no longer by valid for the key type of the resulting RDD.
       rdd.groupBy({ row => keyPath(row._1) }, numPartitions = rdd.partitions.length)
 
+    val _recordCodec = KeyValueRecordCodec[K, V]
+    val kwWriterSchema = KryoWrapper(writerSchema)
+
     pathsToTiles.foreachPartition { partition =>
       if(partition.nonEmpty) {
         import geotrellis.spark.util.TaskUtils._
         val getS3Client = _getS3Client
         val s3client: S3Client = getS3Client()
+        val schema = kwWriterSchema.value.getOrElse(_recordCodec.schema)
 
         val requests: Process[Task, PutObjectRequest] =
           Process.unfold(partition) { iter =>
             if (iter.hasNext) {
               val recs = iter.next()
               val key = recs._1
-              val pairs = recs._2.toVector
-              val bytes = AvroEncoder.toBinary(pairs)(_codec)
+              val rows1: Vector[(K,V)] = recs._2.toVector
+              val rows2: Vector[(K,V)] =
+                if (mergeFunc != None) {
+                  try {
+                    val bytes = IOUtils.toByteArray(s3client.getObject(bucket, key).getObjectContent)
+                    AvroEncoder.fromBinary(schema, bytes)(_recordCodec)
+                  } catch {
+                    case e: AmazonS3Exception if e.getStatusCode == 404 => Vector.empty
+                  }
+                }
+                else Vector.empty
+              val outRows: Vector[(K, V)] =
+                mergeFunc match {
+                  case Some(fn) =>
+                    (rows2 ++ rows1)
+                      .groupBy({ case (k,v) => k })
+                      .map({ case (k, kvs) =>
+                        val vs = kvs.map({ case (k,v) => v }).toSeq
+                        val v: V = vs.tail.foldLeft(vs.head)(fn)
+                        (k, v) })
+                      .toVector
+                  case None => rows1
+                }
+
+              val bytes = AvroEncoder.toBinary(outRows)(_codec)
               val metadata = new ObjectMetadata()
               metadata.setContentLength(bytes.length)
               val is = new ByteArrayInputStream(bytes)