Remove Full Outer Join From S3
James McClain authored and echeipesh committed Oct 4, 2017
1 parent 473946d commit 20e6219
Showing 3 changed files with 53 additions and 17 deletions.
16 changes: 1 addition & 15 deletions s3/src/main/scala/geotrellis/spark/io/s3/S3LayerWriter.scala
@@ -134,27 +134,13 @@ class S3LayerWriter(
    val updatedMetadata: M =
      metadata.merge(rdd.metadata)

    val updatedRdd: RDD[(K, V)] =
      mergeFunc match {
        case Some(mergeFunc) =>
          existingTiles
            .fullOuterJoin(rdd)
            .flatMapValues {
              case (Some(layerTile), Some(updateTile)) => Some(mergeFunc(layerTile, updateTile))
              case (Some(layerTile), _) => Some(layerTile)
              case (_, Some(updateTile)) => Some(updateTile)
              case _ => None
            }
        case None => rdd
      }

    val codec = KeyValueRecordCodec[K, V]
    val schema = codec.schema

    // Write updated metadata, and the possibly updated schema
    // Only really need to write the metadata and schema
    attributeStore.writeLayerAttributes(id, header, updatedMetadata, keyIndex, schema)
    rddWriter.write(updatedRdd, bucket, keyPath)
    rddWriter.update(rdd, bucket, keyPath, Some(writerSchema), mergeFunc)
  }

  // Layer Writing
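For reference, the join-based merge deleted above reduces to the following self-contained sketch (the object and method names are hypothetical, not part of the commit): keys present on both sides are merged with the merge function, and keys present on only one side pass through unchanged.

import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

object OuterJoinMergeSketch {
  // Stand-alone version of the removed merge logic.
  def outerJoinMerge[K: ClassTag, V: ClassTag](
    existing: RDD[(K, V)],
    updates: RDD[(K, V)],
    mergeFunc: (V, V) => V
  ): RDD[(K, V)] =
    existing
      .fullOuterJoin(updates)
      .flatMapValues {
        case (Some(oldV), Some(newV)) => Some(mergeFunc(oldV, newV)) // both sides present: merge
        case (Some(oldV), None)       => Some(oldV)                  // only in the existing layer
        case (None, Some(newV))       => Some(newV)                  // only in the update
        case (None, None)             => None                        // never produced by fullOuterJoin
      }
}

Presumably the cost being removed is that existingTiles materializes the whole layer as an RDD just to feed the join; the S3RDDWriter changes below instead read back only the S3 objects that the update actually touches.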
4 changes: 4 additions & 0 deletions s3/src/main/scala/geotrellis/spark/io/s3/S3RDDReader.scala
@@ -26,15 +26,19 @@ import geotrellis.spark.util.KryoWrapper
import scalaz.concurrent.{Strategy, Task}
import scalaz.std.vector._
import scalaz.stream.{Process, nondeterminism}

import com.amazonaws.services.s3.model.AmazonS3Exception

import org.apache.avro.Schema
import org.apache.commons.io.IOUtils
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

import com.typesafe.config.ConfigFactory

import java.util.concurrent.Executors


trait S3RDDReader {

  def getS3Client: () => S3Client
50 changes: 48 additions & 2 deletions s3/src/main/scala/geotrellis/spark/io/s3/S3RDDWriter.scala
@@ -19,10 +19,16 @@ package geotrellis.spark.io.s3
import geotrellis.spark.io._
import geotrellis.spark.io.avro._
import geotrellis.spark.io.avro.codecs.KeyValueRecordCodec
import geotrellis.spark.util.KryoWrapper

import com.amazonaws.services.s3.model.{AmazonS3Exception, ObjectMetadata, PutObjectRequest, PutObjectResult}

import org.apache.avro.Schema
import org.apache.commons.io.IOUtils
import org.apache.spark.rdd.RDD

import com.typesafe.config.ConfigFactory

import scalaz.concurrent.{Strategy, Task}
import scalaz.stream.{Process, nondeterminism}

@@ -31,6 +37,7 @@ import java.util.concurrent.Executors

import scala.reflect._


trait S3RDDWriter {

  def getS3Client: () => S3Client
@@ -41,6 +48,18 @@ trait S3RDDWriter {
    keyPath: K => String,
    putObjectModifier: PutObjectRequest => PutObjectRequest = { p => p },
    threads: Int = ConfigFactory.load().getThreads("geotrellis.s3.threads.rdd.write")
  ): Unit = {
    update(rdd, bucket, keyPath, None, None, putObjectModifier, threads)
  }

  private[s3] def update[K: AvroRecordCodec: ClassTag, V: AvroRecordCodec: ClassTag](
    rdd: RDD[(K, V)],
    bucket: String,
    keyPath: K => String,
    writerSchema: Option[Schema],
    mergeFunc: Option[(V, V) => V],
    putObjectModifier: PutObjectRequest => PutObjectRequest = { p => p },
    threads: Int = ConfigFactory.load().getThreads("geotrellis.s3.threads.rdd.write")
  ): Unit = {
    val codec = KeyValueRecordCodec[K, V]
    val schema = codec.schema
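The codec and schema above determine the payload of each S3 object. A minimal sketch of the encode/decode round trip, assuming only the AvroEncoder and KeyValueRecordCodec calls that appear elsewhere in this diff (the wrapper object is hypothetical):

import geotrellis.spark.io.avro._
import geotrellis.spark.io.avro.codecs.KeyValueRecordCodec

object AvroRoundTripSketch {
  // Encode a batch of rows with the codec's own schema, then decode it back;
  // fromBinary can also be given an older writer schema read from the attribute store.
  def roundTrip[K: AvroRecordCodec, V: AvroRecordCodec](rows: Vector[(K, V)]): Vector[(K, V)] = {
    val codec = KeyValueRecordCodec[K, V]
    val bytes: Array[Byte] = AvroEncoder.toBinary(rows)(codec)
    AvroEncoder.fromBinary(codec.schema, bytes)(codec)
  }
}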
@@ -56,19 +75,46 @@ trait S3RDDWriter {
      // on a key type that may no longer be valid for the key type of the resulting RDD.
      rdd.groupBy({ row => keyPath(row._1) }, numPartitions = rdd.partitions.length)

    val _recordCodec = KeyValueRecordCodec[K, V]
    val kwWriterSchema = KryoWrapper(writerSchema)

    pathsToTiles.foreachPartition { partition =>
      if(partition.nonEmpty) {
        import geotrellis.spark.util.TaskUtils._
        val getS3Client = _getS3Client
        val s3client: S3Client = getS3Client()
        val schema = kwWriterSchema.value.getOrElse(_recordCodec.schema)

        val requests: Process[Task, PutObjectRequest] =
          Process.unfold(partition) { iter =>
            if (iter.hasNext) {
              val recs = iter.next()
              val key = recs._1
              val pairs = recs._2.toVector
              val bytes = AvroEncoder.toBinary(pairs)(_codec)
              val rows1: Vector[(K, V)] = recs._2.toVector
              val rows2: Vector[(K, V)] =
                if (mergeFunc != None) {
                  try {
                    val bytes = IOUtils.toByteArray(s3client.getObject(bucket, key).getObjectContent)
                    AvroEncoder.fromBinary(schema, bytes)(_recordCodec)
                  } catch {
                    case e: AmazonS3Exception if e.getStatusCode == 404 => Vector.empty
                  }
                }
                else Vector.empty
              val outRows: Vector[(K, V)] =
                mergeFunc match {
                  case Some(fn) =>
                    (rows2 ++ rows1)
                      .groupBy({ case (k, v) => k })
                      .map({ case (k, kvs) =>
                        val vs = kvs.map({ case (k, v) => v }).toSeq
                        val v: V = vs.tail.foldLeft(vs.head)(fn)
                        (k, v) })
                      .toVector
                  case None => rows1
                }

              val bytes = AvroEncoder.toBinary(outRows)(_codec)
              val metadata = new ObjectMetadata()
              metadata.setContentLength(bytes.length)
              val is = new ByteArrayInputStream(bytes)
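Two pieces of the read-modify-write loop above are worth isolating. First, a 404 from getObject is treated as "no existing rows", so fresh writes and updates share one code path. Second, the merge concatenates existing rows with incoming rows, groups by key, and folds each group with the merge function; existing rows come first in the fold, so a right-biased function makes incoming values win. A collections-only sketch of that merge (helper names hypothetical):

object PerKeyMergeSketch {
  // Collections-only version of the writer's per-key merge:
  // existing rows (read back from S3) first, incoming rows last, folded per key.
  def mergeRows[K, V](existing: Vector[(K, V)], incoming: Vector[(K, V)])
                     (fn: (V, V) => V): Vector[(K, V)] =
    (existing ++ incoming)
      .groupBy { case (k, _) => k }
      .map { case (k, kvs) =>
        val vs = kvs.map { case (_, v) => v }
        k -> vs.reduceLeft(fn)
      }
      .toVector

  def main(args: Array[String]): Unit = {
    // A right-biased merge function keeps the incoming value on key collisions.
    val merged = mergeRows(Vector(1 -> "old"), Vector(1 -> "new", 2 -> "fresh"))((_, incoming) => incoming)
    println(merged.sortBy(_._1)) // Vector((1,new), (2,fresh))
  }
}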
