Move Functionality in File and Hadoop
James McClain authored and echeipesh committed Oct 4, 2017
1 parent 37f6ed4 commit e506d88
Showing 6 changed files with 222 additions and 150 deletions.
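
Taken together, the commit moves the layer update and overwrite machinery out of FileLayerUpdater and HadoopLayerUpdater and into the corresponding writers, leaving the updaters as thin delegates. Below is a minimal sketch of what the writer-driven update path might look like for the file backend; the catalog path, layer name and zoom are placeholders, the merge function is a trivial stand-in, and an active SparkContext plus a previously written layer are assumed.

```scala
import geotrellis.raster.Tile
import geotrellis.spark._
import geotrellis.spark.io._
import geotrellis.spark.io.file._
import org.apache.spark.SparkContext

// Sketch only: update an existing file-backend layer by merging incoming tiles.
// "/tmp/catalog" and LayerId("example-layer", 10) are placeholders; the layer must
// already exist in the catalog and a SparkContext must be in scope.
def mergeIntoLayer(updates: TileLayerRDD[SpatialKey])(implicit sc: SparkContext): Unit = {
  val catalogPath    = "/tmp/catalog"
  val attributeStore = FileAttributeStore(catalogPath)
  val writer         = new FileLayerWriter(attributeStore, catalogPath)
  val id             = LayerId("example-layer", 10)

  // With this commit the merge-on-update logic lives in the writer itself, so a
  // caller can update a layer without going through FileLayerUpdater at all.
  // Any (Tile, Tile) => Tile merge function can be supplied; keeping the incoming
  // tile is the simplest possible choice.
  writer.update(id, updates, (existing: Tile, incoming: Tile) => incoming)
}
```
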
spark/src/main/scala/geotrellis/spark/io/file/FileLayerReader.scala
@@ -41,6 +41,7 @@ class FileLayerReader(
)(implicit sc: SparkContext) extends FilteringLayerReader[LayerId] with LazyLogging {

val defaultNumPartitions = sc.defaultParallelism
def sparkContext: SparkContext = sc

def read[
K: AvroRecordCodec: Boundable: JsonFormat: ClassTag,
spark/src/main/scala/geotrellis/spark/io/file/FileLayerUpdater.scala
@@ -39,97 +39,30 @@ class FileLayerUpdater(
layerReader: FileLayerReader
) extends LayerUpdater[LayerId] with LazyLogging {

protected def _overwrite[
val layerWriter = new FileLayerWriter(attributeStore, catalogPath)
implicit val sc: SparkContext = layerReader.sparkContext

def update[
K: AvroRecordCodec: Boundable: JsonFormat: ClassTag,
V: AvroRecordCodec: ClassTag,
M: JsonFormat: GetComponent[?, Bounds[K]]: Mergable
](
id: LayerId,
rdd: RDD[(K, V)] with Metadata[M],
keyBounds: KeyBounds[K]
): Unit = {
_update(id, rdd, keyBounds, None)
}
](id: LayerId, rdd: RDD[(K, V)] with Metadata[M], mergeFunc: (V, V) => V): Unit =
layerWriter.update(id, rdd, mergeFunc)

protected def _update[
def update[
K: AvroRecordCodec: Boundable: JsonFormat: ClassTag,
V: AvroRecordCodec: ClassTag,
M: JsonFormat: GetComponent[?, Bounds[K]]: Mergable
](
id: LayerId,
rdd: RDD[(K, V)] with Metadata[M],
keyBounds: KeyBounds[K],
mergeFunc: (V, V) => V
): Unit = {
_update(id, rdd, keyBounds, Some(mergeFunc))
}
](id: LayerId, rdd: RDD[(K, V)] with Metadata[M]): Unit =
layerWriter.update(id, rdd)

def _update[
def overwrite[
K: AvroRecordCodec: Boundable: JsonFormat: ClassTag,
V: AvroRecordCodec: ClassTag,
M: JsonFormat: GetComponent[?, Bounds[K]]: Mergable
](
id: LayerId,
rdd: RDD[(K, V)] with Metadata[M],
keyBounds: KeyBounds[K],
mergeFunc: Option[(V, V) => V]
): Unit = {
if (!attributeStore.layerExists(id)) throw new LayerNotFoundError(id)
val LayerAttributes(header, metadata, keyIndex, writerSchema) = try {
attributeStore.readLayerAttributes[FileLayerHeader, M, K](id)
} catch {
case e: AttributeNotFoundError => throw new LayerUpdateError(id).initCause(e)
}

val path = header.path

if (!(keyIndex.keyBounds contains keyBounds))
throw new LayerOutOfKeyBoundsError(id, keyIndex.keyBounds)

val maxWidth = Index.digits(keyIndex.toIndex(keyIndex.keyBounds.maxKey))
val keyPath = KeyPathGenerator(catalogPath, path, keyIndex, maxWidth)
val layerPath = new File(catalogPath, path).getAbsolutePath

logger.info(s"Saving updated RDD for layer ${id} to $path")
val existingTiles =
if(schemaHasChanged[K, V](writerSchema)) {
logger.warn(s"RDD schema has changed, this requires rewriting the entire layer.")
layerReader
.read[K, V, M](id)

} else {
val query =
new LayerQuery[K, M]
.where(Intersects(rdd.metadata.getComponent[Bounds[K]].get))

layerReader.read[K, V, M](id, query, layerReader.defaultNumPartitions, filterIndexOnly = true)
}

val updatedMetadata: M =
metadata.merge(rdd.metadata)

val updatedRdd: RDD[(K, V)] =
mergeFunc match {
case Some(mergeFunc) =>
existingTiles
.fullOuterJoin(rdd)
.flatMapValues {
case (Some(layerTile), Some(updateTile)) => Some(mergeFunc(layerTile, updateTile))
case (Some(layerTile), _) => Some(layerTile)
case (_, Some(updateTile)) => Some(updateTile)
case _ => None
}
case None => rdd
}

val codec = KeyValueRecordCodec[K, V]
val schema = codec.schema
](id: LayerId, rdd: RDD[(K, V)] with Metadata[M]): Unit =
layerWriter.overwrite(id, rdd)

// Write updated metadata, and the possibly updated schema
// Only really need to write the metadata and schema
attributeStore.writeLayerAttributes(id, header, updatedMetadata, keyIndex, schema)
FileRDDWriter.write[K, V](updatedRdd, layerPath, keyPath)
}
}

object FileLayerUpdater {
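
From a caller's point of view FileLayerUpdater keeps working; its methods now simply forward to FileLayerWriter. A brief sketch of that equivalence under the same placeholder assumptions as above (the updater's three-argument constructor is inferred from the context shown in this hunk):

```scala
import geotrellis.spark._
import geotrellis.spark.io._
import geotrellis.spark.io.file._
import org.apache.spark.SparkContext

// Sketch: FileLayerUpdater.update is now a thin wrapper over FileLayerWriter.update.
def updateViaUpdater(updates: TileLayerRDD[SpatialKey])(implicit sc: SparkContext): Unit = {
  val catalogPath    = "/tmp/catalog"                // placeholder catalog location
  val attributeStore = FileAttributeStore(catalogPath)
  val id             = LayerId("example-layer", 10)  // placeholder layer id

  val reader  = FileLayerReader(attributeStore, catalogPath)(sc)
  val updater = new FileLayerUpdater(attributeStore, catalogPath, reader)

  updater.update(id, updates)   // forwards to the writer internally
  // equivalently, going through the writer directly:
  // new FileLayerWriter(attributeStore, catalogPath).update(id, updates)
}
```
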
103 changes: 102 additions & 1 deletion spark/src/main/scala/geotrellis/spark/io/file/FileLayerWriter.scala
@@ -16,15 +16,18 @@

package geotrellis.spark.io.file

import geotrellis.raster._
import geotrellis.spark._
import geotrellis.spark.io._
import geotrellis.spark.io.avro._
import geotrellis.spark.io.avro.codecs._
import geotrellis.spark.io.index._
import geotrellis.raster._
import geotrellis.spark.merge._
import geotrellis.util._

import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext

import spray.json._

import scala.reflect._
@@ -48,6 +51,104 @@ class FileLayerWriter(
catalogPath: String
) extends LayerWriter[LayerId] with LazyLogging {

// Layer Updating
protected def _overwrite[
K: AvroRecordCodec: Boundable: JsonFormat: ClassTag,
V: AvroRecordCodec: ClassTag,
M: JsonFormat: GetComponent[?, Bounds[K]]: Mergable
](
sc: SparkContext,
id: LayerId,
rdd: RDD[(K, V)] with Metadata[M],
keyBounds: KeyBounds[K]
): Unit = {
_update(sc, id, rdd, keyBounds, None)
}

protected def _update[
K: AvroRecordCodec: Boundable: JsonFormat: ClassTag,
V: AvroRecordCodec: ClassTag,
M: JsonFormat: GetComponent[?, Bounds[K]]: Mergable
](
sc: SparkContext,
id: LayerId,
rdd: RDD[(K, V)] with Metadata[M],
keyBounds: KeyBounds[K],
mergeFunc: (V, V) => V
): Unit = {
_update(sc, id, rdd, keyBounds, Some(mergeFunc))
}

def _update[
K: AvroRecordCodec: Boundable: JsonFormat: ClassTag,
V: AvroRecordCodec: ClassTag,
M: JsonFormat: GetComponent[?, Bounds[K]]: Mergable
](
sc: SparkContext,
id: LayerId,
rdd: RDD[(K, V)] with Metadata[M],
keyBounds: KeyBounds[K],
mergeFunc: Option[(V, V) => V]
): Unit = {
if (!attributeStore.layerExists(id)) throw new LayerNotFoundError(id)
val LayerAttributes(header, metadata, keyIndex, writerSchema) = try {
attributeStore.readLayerAttributes[FileLayerHeader, M, K](id)
} catch {
case e: AttributeNotFoundError => throw new LayerUpdateError(id).initCause(e)
}

val path = header.path

if (!(keyIndex.keyBounds contains keyBounds))
throw new LayerOutOfKeyBoundsError(id, keyIndex.keyBounds)

val maxWidth = Index.digits(keyIndex.toIndex(keyIndex.keyBounds.maxKey))
val keyPath = KeyPathGenerator(catalogPath, path, keyIndex, maxWidth)
val layerPath = new File(catalogPath, path).getAbsolutePath
val layerReader = FileLayerReader(attributeStore, catalogPath)(sc)

logger.info(s"Saving updated RDD for layer ${id} to $path")
val existingTiles =
if(schemaHasChanged[K, V](writerSchema)) {
logger.warn(s"RDD schema has changed, this requires rewriting the entire layer.")
layerReader
.read[K, V, M](id)

} else {
val query =
new LayerQuery[K, M]
.where(Intersects(rdd.metadata.getComponent[Bounds[K]].get))

layerReader.read[K, V, M](id, query, layerReader.defaultNumPartitions, filterIndexOnly = true)
}

val updatedMetadata: M =
metadata.merge(rdd.metadata)

val updatedRdd: RDD[(K, V)] =
mergeFunc match {
case Some(mergeFunc) =>
existingTiles
.fullOuterJoin(rdd)
.flatMapValues {
case (Some(layerTile), Some(updateTile)) => Some(mergeFunc(layerTile, updateTile))
case (Some(layerTile), _) => Some(layerTile)
case (_, Some(updateTile)) => Some(updateTile)
case _ => None
}
case None => rdd
}

val codec = KeyValueRecordCodec[K, V]
val schema = codec.schema

// Write updated metadata, and the possibly updated schema
// Only really need to write the metadata and schema
attributeStore.writeLayerAttributes(id, header, updatedMetadata, keyIndex, schema)
FileRDDWriter.write[K, V](updatedRdd, layerPath, keyPath)
}

// Layer Writing
protected def _write[
K: AvroRecordCodec: JsonFormat: ClassTag,
V: AvroRecordCodec: ClassTag,
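
The core of the relocated _update is the per-key merge rule applied after the full outer join of existing and incoming tiles. Restated as a standalone function, generic in the value type (combine is a hypothetical helper name, not GeoTrellis API):

```scala
// Sketch of the merge rule from _update above: where a key exists on both sides the
// supplied mergeFunc decides the result; otherwise whichever side is present wins.
def combine[V](existing: Option[V], incoming: Option[V], mergeFunc: (V, V) => V): Option[V] =
  (existing, incoming) match {
    case (Some(e), Some(i)) => Some(mergeFunc(e, i))
    case (Some(e), None)    => Some(e)
    case (None,    Some(i)) => Some(i)
    case (None,    None)    => None
  }
```
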
spark/src/main/scala/geotrellis/spark/io/hadoop/HadoopLayerReader.scala
@@ -47,6 +47,7 @@ class HadoopLayerReader(
extends FilteringLayerReader[LayerId] with LazyLogging {

val defaultNumPartitions = sc.defaultParallelism
def sparkContext: SparkContext = sc

def read[
K: AvroRecordCodec: Boundable: JsonFormat: ClassTag,
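
Both readers now expose their SparkContext, which is what lets the slimmed-down updaters declare an implicit context and build readers and writers without taking a SparkContext of their own. A minimal illustration of the new accessor (FileLayerReader shown; HadoopLayerReader gains the same method):

```scala
import geotrellis.spark.io.file.FileLayerReader
import org.apache.spark.SparkContext

// As in the updated FileLayerUpdater and HadoopLayerUpdater: recover the SparkContext
// from an existing reader instead of requiring one as a separate argument.
def contextOf(layerReader: FileLayerReader): SparkContext = layerReader.sparkContext
```
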
spark/src/main/scala/geotrellis/spark/io/hadoop/HadoopLayerUpdater.scala
@@ -39,85 +39,28 @@ class HadoopLayerUpdater(
layerCopier: HadoopLayerCopier
) extends LayerUpdater[LayerId] with LazyLogging {

protected def _overwrite[
implicit val sc = layerReader.sparkContext

def update[
K: AvroRecordCodec: Boundable: JsonFormat: ClassTag,
V: AvroRecordCodec: ClassTag,
M: JsonFormat: GetComponent[?, Bounds[K]]: Mergable
](
id: LayerId,
rdd: RDD[(K, V)] with Metadata[M],
keyBounds: KeyBounds[K]
): Unit = {
_update(id, rdd, keyBounds, None)
}
](id: LayerId, rdd: RDD[(K, V)] with Metadata[M], mergeFunc: (V, V) => V): Unit =
layerWriter.update(id, rdd, mergeFunc)

protected def _update[
def update[
K: AvroRecordCodec: Boundable: JsonFormat: ClassTag,
V: AvroRecordCodec: ClassTag,
M: JsonFormat: GetComponent[?, Bounds[K]]: Mergable
](
id: LayerId,
rdd: RDD[(K, V)] with Metadata[M],
keyBounds: KeyBounds[K],
mergeFunc: (V, V) => V
): Unit = {
_update(id, rdd, keyBounds, Some(mergeFunc))
}
](id: LayerId, rdd: RDD[(K, V)] with Metadata[M]): Unit =
layerWriter.update(id, rdd)

def _update[
def overwrite[
K: AvroRecordCodec: Boundable: JsonFormat: ClassTag,
V: AvroRecordCodec: ClassTag,
M: JsonFormat: GetComponent[?, Bounds[K]]: Mergable
](
id: LayerId,
rdd: RDD[(K, V)] with Metadata[M],
keyBounds: KeyBounds[K],
mergeFunc: Option[(V, V) => V]
): Unit = {
if (!attributeStore.layerExists(id)) throw new LayerNotFoundError(id)
val LayerAttributes(header, metadata, keyIndex, writerSchema) = try {
attributeStore.readLayerAttributes[HadoopLayerHeader,M, K](id)
} catch {
case e: AttributeNotFoundError => throw new LayerUpdateError(id).initCause(e)
}

if (!(keyIndex.keyBounds contains keyBounds))
throw new LayerOutOfKeyBoundsError(id, keyIndex.keyBounds)

logger.warn(s"MapFiles cannot be updated, so this requires rewriting the entire layer.")

val entireLayer = layerReader.read[K, V, M](id)

val updatedMetadata: M =
metadata.merge(rdd.metadata)

val fn = mergeFunc match {
case Some(fn) => fn
case None => { (v1: V, v2: V) => v2 }
}

val updatedRdd: RDD[(K, V)] =
entireLayer
.fullOuterJoin(rdd)
.flatMapValues {
case (Some(layerTile), Some(updateTile)) => Some(fn(layerTile, updateTile))
case (Some(layerTile), _) => Some(layerTile)
case (_, Some(updateTile)) => Some(updateTile)
case _ => None
}

val updated = ContextRDD(updatedRdd, updatedMetadata)

val tmpId = id.createTemporaryId
logger.info(s"Saving updated RDD to temporary id $tmpId")
layerWriter.write(tmpId, updated, keyIndex)
logger.info(s"Deleting layer $id")
layerDeleter.delete(id)
logger.info(s"Copying in $tmpId to $id")
layerCopier.copy[K, V, M](tmpId, id)
logger.info(s"Deleting temporary layer at $tmpId")
layerDeleter.delete(tmpId)
}
](id: LayerId, rdd: RDD[(K, V)] with Metadata[M]): Unit =
layerWriter.overwrite(id, rdd)
}

object HadoopLayerUpdater {
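
The Hadoop updater gets the same treatment: the temporary-layer write, delete and copy sequence is gone from this class, and update and overwrite now forward to HadoopLayerWriter. A hedged usage sketch follows, assuming a HadoopLayerWriter constructed elsewhere (the writer's own changes are not part of the hunks shown here), with placeholder layer name and zoom:

```scala
import geotrellis.raster.Tile
import geotrellis.spark._
import geotrellis.spark.io._
import geotrellis.spark.io.hadoop.HadoopLayerWriter
import org.apache.spark.SparkContext

// Sketch only: merge incoming tiles into an existing Hadoop-backend layer.
// `layerWriter` is assumed to be built elsewhere and the layer must already exist.
def updateHadoopLayer(layerWriter: HadoopLayerWriter, updates: TileLayerRDD[SpatialKey])
                     (implicit sc: SparkContext): Unit = {
  val id = LayerId("example-layer", 10)
  // Any (Tile, Tile) => Tile merge function may be supplied; keep the incoming tile here.
  layerWriter.update(id, updates, (existing: Tile, incoming: Tile) => incoming)
}
```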