Skip to content

Commit

Permalink
Non-Merging Updater For Hbase
Browse files Browse the repository at this point in the history
  • Loading branch information
James McClain authored and echeipesh committed Oct 4, 2017
1 parent 6a2243c commit fcd5da4
Showing 1 changed file with 43 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,41 @@ class HBaseLayerUpdater(
layerReader: HBaseLayerReader
) extends LayerUpdater[LayerId] with LazyLogging {

protected def _overwrite[
K: AvroRecordCodec: Boundable: JsonFormat: ClassTag,
V: AvroRecordCodec: ClassTag,
M: JsonFormat: GetComponent[?, Bounds[K]]: Mergable
](
id: LayerId,
rdd: RDD[(K, V)] with Metadata[M],
keyBounds: KeyBounds[K]
): Unit = {
_update(id, rdd, keyBounds, None)
}

protected def _update[
K: AvroRecordCodec: Boundable: JsonFormat: ClassTag,
V: AvroRecordCodec: ClassTag,
M: JsonFormat: GetComponent[?, Bounds[K]]: Mergable
](id: LayerId, rdd: RDD[(K, V)] with Metadata[M], keyBounds: KeyBounds[K], mergeFunc: (V, V) => V) = {
](
id: LayerId,
rdd: RDD[(K, V)] with Metadata[M],
keyBounds: KeyBounds[K],
mergeFunc: (V, V) => V
): Unit = {
_update(id, rdd, keyBounds, Some(mergeFunc))
}

def _update[
K: AvroRecordCodec: Boundable: JsonFormat: ClassTag,
V: AvroRecordCodec: ClassTag,
M: JsonFormat: GetComponent[?, Bounds[K]]: Mergable
](
id: LayerId,
rdd: RDD[(K, V)] with Metadata[M],
keyBounds: KeyBounds[K],
mergeFunc: Option[(V, V) => V]
) = {
if (!attributeStore.layerExists(id)) throw new LayerNotFoundError(id)

val LayerAttributes(header, metadata, keyIndex, writerSchema) = try {
Expand Down Expand Up @@ -74,14 +104,18 @@ class HBaseLayerUpdater(
metadata.merge(rdd.metadata)

val updatedRdd: RDD[(K, V)] =
existingTiles
.fullOuterJoin(rdd)
.flatMapValues {
case (Some(layerTile), Some(updateTile)) => Some(mergeFunc(layerTile, updateTile))
case (Some(layerTile), _) => Some(layerTile)
case (_, Some(updateTile)) => Some(updateTile)
case _ => None
}
mergeFunc match {
case Some(mergeFunc) =>
existingTiles
.fullOuterJoin(rdd)
.flatMapValues {
case (Some(layerTile), Some(updateTile)) => Some(mergeFunc(layerTile, updateTile))
case (Some(layerTile), _) => Some(layerTile)
case (_, Some(updateTile)) => Some(updateTile)
case _ => None
}
case None => rdd
}

val codec = KeyValueRecordCodec[K, V]
val schema = codec.schema
Expand Down

0 comments on commit fcd5da4

Please sign in to comment.