Created the LayerType type and added the optional layerType parameter to the LayerHeaders

Signed-off-by: Jacob Bouffard <jbouffard@azavea.com>

Added the layerType parameter to the rest of the LayerHeaders

Signed-off-by: Jacob Bouffard <jbouffard@azavea.com>

Fixed bug in HBaseLayerHeader

Signed-off-by: Jacob Bouffard <jbouffard@azavea.com>

Created the LayerType Json format

Signed-off-by: Jacob Bouffard <jbouffard@azavea.com>

Improved the toString method for LayerType objects

Signed-off-by: Jacob Bouffard <jbouffard@azavea.com>

Fixed bug where the old LayerHeaders could not be read

Signed-off-by: Jacob Bouffard <jbouffard@azavea.com>

Cleaned up the code

Signed-off-by: Jacob Bouffard <jbouffard@azavea.com>

Cleaned up the code some more

Signed-off-by: Jacob Bouffard <jbouffard@azavea.com>
Jacob Bouffard committed May 1, 2018
1 parent 71bfb89 commit 29b3903
Showing 9 changed files with 146 additions and 38 deletions.
@@ -16,14 +16,15 @@

 package geotrellis.spark.io.accumulo

-import geotrellis.spark.io.LayerHeader
+import geotrellis.spark.io.{LayerHeader, LayerType, AvroLayerType}

 import spray.json._

 case class AccumuloLayerHeader(
   keyClass: String,
   valueClass: String,
-  tileTable: String
+  tileTable: String,
+  layerType: LayerType = AvroLayerType
 ) extends LayerHeader {
   def format = "accumulo"
 }
@@ -35,16 +36,26 @@ object AccumuloLayerHeader {
       "format" -> JsString(md.format),
       "keyClass" -> JsString(md.keyClass),
       "valueClass" -> JsString(md.valueClass),
-      "tileTable" -> JsString(md.tileTable)
+      "tileTable" -> JsString(md.tileTable),
+      "layerType" -> md.layerType.toJson
     )

   def read(value: JsValue): AccumuloLayerHeader =
-    value.asJsObject.getFields("keyClass", "valueClass", "tileTable") match {
+    value.asJsObject.getFields("keyClass", "valueClass", "tileTable", "layerType") match {
+      case Seq(JsString(keyClass), JsString(valueClass), JsString(tileTable), layerType) =>
+        AccumuloLayerHeader(
+          keyClass,
+          valueClass,
+          tileTable,
+          layerType.convertTo[LayerType]
+        )
       case Seq(JsString(keyClass), JsString(valueClass), JsString(tileTable)) =>
         AccumuloLayerHeader(
           keyClass,
           valueClass,
-          tileTable)
+          tileTable,
+          AvroLayerType
+        )
       case _ =>
         throw new DeserializationException(s"AccumuloLayerHeader expected, got: $value")
     }
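For reference, a minimal round-trip sketch of the new Accumulo encoding (illustrative values; it assumes the write/read pair above backs an implicit spray-json format in the companion object, which the collapsed lines elide):

import spray.json._
import geotrellis.spark.io.accumulo._

// layerType is left at its default here, so it serializes as "AvroLayerType":
// {"format":"accumulo","keyClass":"...","valueClass":"...","tileTable":"tiles","layerType":"AvroLayerType"}
val header = AccumuloLayerHeader(
  keyClass = "geotrellis.spark.SpatialKey",
  valueClass = "geotrellis.raster.Tile",
  tileTable = "tiles"
)
val json = header.toJson
assert(json.convertTo[AccumuloLayerHeader] == header)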
@@ -16,15 +16,16 @@

 package geotrellis.spark.io.cassandra

-import geotrellis.spark.io.LayerHeader
+import geotrellis.spark.io.{LayerHeader, LayerType, AvroLayerType}

 import spray.json._

 case class CassandraLayerHeader(
   keyClass: String,
   valueClass: String,
   keyspace: String,
-  tileTable: String
+  tileTable: String,
+  layerType: LayerType = AvroLayerType
 ) extends LayerHeader {
   def format = "cassandra"
 }
@@ -37,17 +38,28 @@ object CassandraLayerHeader {
       "keyClass" -> JsString(md.keyClass),
       "valueClass" -> JsString(md.valueClass),
       "keyspace" -> JsString(md.keyspace),
-      "tileTable" -> JsString(md.tileTable)
+      "tileTable" -> JsString(md.tileTable),
+      "layerType" -> md.layerType.toJson
     )

   def read(value: JsValue): CassandraLayerHeader =
-    value.asJsObject.getFields("keyClass", "valueClass", "keyspace", "tileTable") match {
+    value.asJsObject.getFields("keyClass", "valueClass", "keyspace", "tileTable", "layerType") match {
+      case Seq(JsString(keyClass), JsString(valueClass), JsString(keyspace), JsString(tileTable), layerType) =>
+        CassandraLayerHeader(
+          keyClass,
+          valueClass,
+          keyspace,
+          tileTable,
+          layerType.convertTo[LayerType]
+        )
       case Seq(JsString(keyClass), JsString(valueClass), JsString(keyspace), JsString(tileTable)) =>
         CassandraLayerHeader(
           keyClass,
           valueClass,
           keyspace,
-          tileTable)
+          tileTable,
+          AvroLayerType
+        )
       case _ =>
         throw new DeserializationException(s"CassandraLayerHeader expected, got: $value")
     }
@@ -40,7 +40,7 @@ class CassandraLayerUpdater(
     V: AvroRecordCodec: ClassTag,
     M: JsonFormat: Component[?, Bounds[K]]: Mergable
   ](id: LayerId, rdd: RDD[(K, V)] with Metadata[M], keyBounds: KeyBounds[K], mergeFunc: (V, V) => V): Unit = {
-    val CassandraLayerHeader(_, _, keyspace, table) = attributeStore.readHeader[CassandraLayerHeader](id)
+    val CassandraLayerHeader(_, _, keyspace, table, _) = attributeStore.readHeader[CassandraLayerHeader](id)
     val layerWriter = new CassandraLayerWriter(attributeStore, instance, keyspace, table)
     layerWriter.update(id, rdd, mergeFunc)
   }
@@ -50,7 +50,7 @@ class CassandraLayerUpdater(
     V: AvroRecordCodec: ClassTag,
     M: JsonFormat: Component[?, Bounds[K]]: Mergable
   ](id: LayerId, rdd: RDD[(K, V)] with Metadata[M]): Unit = {
-    val CassandraLayerHeader(_, _, keyspace, table) = attributeStore.readHeader[CassandraLayerHeader](id)
+    val CassandraLayerHeader(_, _, keyspace, table, _) = attributeStore.readHeader[CassandraLayerHeader](id)
     val layerWriter = new CassandraLayerWriter(attributeStore, instance, keyspace, table)
     layerWriter.overwrite(id, rdd)
   }
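These updater changes are purely mechanical: adding a fifth field to the case class changes the arity of the compiler-generated extractor, so every destructuring pattern at a call site has to grow a wildcard. A sketch with illustrative values:

val header = CassandraLayerHeader("K", "V", "geotrellis", "tiles")

// val CassandraLayerHeader(_, _, keyspace, table) = header  // no longer compiles: wrong arity
val CassandraLayerHeader(_, _, keyspace, table, _) = header  // the new layerType slot is discarded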
@@ -16,14 +16,15 @@

 package geotrellis.spark.io.hbase

-import geotrellis.spark.io.LayerHeader
+import geotrellis.spark.io.{LayerHeader, LayerType, AvroLayerType}

 import spray.json._

 case class HBaseLayerHeader(
   keyClass: String,
   valueClass: String,
-  tileTable: String
+  tileTable: String,
+  layerType: LayerType = AvroLayerType
 ) extends LayerHeader {
   def format = "hbase"
 }
@@ -35,16 +36,26 @@ object HBaseLayerHeader {
       "format" -> JsString(md.format),
       "keyClass" -> JsString(md.keyClass),
       "valueClass" -> JsString(md.valueClass),
-      "tileTable" -> JsString(md.tileTable)
+      "tileTable" -> JsString(md.tileTable),
+      "layerType" -> md.layerType.toJson
     )

   def read(value: JsValue): HBaseLayerHeader =
-    value.asJsObject.getFields("keyClass", "valueClass", "tileTable") match {
+    value.asJsObject.getFields("keyClass", "valueClass", "tileTable", "layerType") match {
+      case Seq(JsString(keyClass), JsString(valueClass), JsString(tileTable), layerType) =>
+        HBaseLayerHeader(
+          keyClass,
+          valueClass,
+          tileTable,
+          layerType.convertTo[LayerType]
+        )
       case Seq(JsString(keyClass), JsString(valueClass), JsString(tileTable)) =>
         HBaseLayerHeader(
           keyClass,
           valueClass,
-          tileTable)
+          tileTable,
+          AvroLayerType
+        )
       case _ =>
         throw new DeserializationException(s"HBaseLayerHeader expected, got: $value")
     }
24 changes: 16 additions & 8 deletions s3/src/main/scala/geotrellis/spark/io/s3/S3LayerHeader.scala
@@ -17,15 +17,16 @@
 package geotrellis.spark.io.s3

 import geotrellis.raster.Tile
-import geotrellis.spark.io.LayerHeader
+import geotrellis.spark.io.{LayerHeader, LayerType, AvroLayerType}

 import spray.json._

 case class S3LayerHeader(
   keyClass: String,
   valueClass: String,
   bucket: String,
-  key: String
+  key: String,
+  layerType: LayerType = AvroLayerType
 ) extends LayerHeader {
   def format = "s3"
 }
@@ -38,21 +39,28 @@ object S3LayerHeader {
       "keyClass" -> JsString(md.keyClass),
       "valueClass" -> JsString(md.valueClass),
       "bucket" -> JsString(md.bucket.toString),
-      "key" -> JsString(md.key.toString)
+      "key" -> JsString(md.key.toString),
+      "layerType" -> md.layerType.toJson
     )

   def read(value: JsValue): S3LayerHeader =
-    value.asJsObject.getFields("keyClass", "valueClass", "bucket", "key") match {
-      case Seq(JsString(keyClass), JsString(valueClass), JsString(bucket), JsString(key)) =>
+    value.asJsObject.getFields("keyClass", "valueClass", "bucket", "key", "layerType") match {
+      case Seq(JsString(keyClass), JsString(valueClass), JsString(bucket), JsString(key), layerType) =>
         S3LayerHeader(
           keyClass,
           valueClass,
-          bucket, key)
-      case Seq(JsString(keyClass), JsString(bucket), JsString(key)) =>
+          bucket,
+          key,
+          layerType.convertTo[LayerType]
+        )
+      case Seq(JsString(keyClass), JsString(bucket), JsString(key), JsString(layerType)) =>
         S3LayerHeader(
           keyClass,
           classOf[Tile].getCanonicalName,
-          bucket, key)
+          bucket,
+          key,
+          AvroLayerType
+        )

       case other =>
         throw new DeserializationException(s"S3LayerHeader expected, got: $other")
14 changes: 12 additions & 2 deletions spark/src/main/scala/geotrellis/spark/io/LayerHeader.scala
@@ -24,6 +24,7 @@ trait LayerHeader {
   def format: String
   def keyClass: String
   def valueClass: String
+  def layerType: LayerType
 }

 object LayerHeader {
@@ -32,16 +33,25 @@ object LayerHeader {
     JsObject(
       "format" -> JsString(md.format),
       "keyClass" -> JsString(md.keyClass),
-      "valueClass" -> JsString(md.valueClass)
+      "valueClass" -> JsString(md.valueClass),
+      "layerType" -> md.layerType.toJson
     )

   def read(value: JsValue): LayerHeader =
-    value.asJsObject.getFields("format", "keyClass", "valueClass") match {
+    value.asJsObject.getFields("format", "keyClass", "valueClass", "layerType") match {
+      case Seq(JsString(_format), JsString(_keyClass), JsString(_valueClass), _layerType) =>
+        new LayerHeader {
+          val format = _format
+          val keyClass = _keyClass
+          val valueClass = _valueClass
+          def layerType = _layerType.convertTo[LayerType]
+        }
       case Seq(JsString(_format), JsString(_keyClass), JsString(_valueClass)) =>
         new LayerHeader {
           val format = _format
           val keyClass = _keyClass
           val valueClass = _valueClass
+          def layerType = AvroLayerType
         }
       case _ =>
         throw new DeserializationException(s"LayerHeader expected, got: $value")
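The second case above is the fix described by "Fixed bug where the old LayerHeaders could not be read": a header written before this commit has no "layerType" member, so getFields returns only three values and the layer type falls back to AvroLayerType. A hedged sketch of that path (assuming this read method backs an implicit RootJsonFormat[LayerHeader], a wrapper the diff elides):

import spray.json._
import geotrellis.spark.io._

val legacy = """{"format": "hdfs", "keyClass": "K", "valueClass": "V"}""".parseJson
val header = legacy.convertTo[LayerHeader]
assert(header.layerType == AvroLayerType)  // missing field defaults to Avro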
33 changes: 33 additions & 0 deletions spark/src/main/scala/geotrellis/spark/io/LayerType.scala
@@ -0,0 +1,33 @@
+package geotrellis.spark.io
+
+import spray.json._
+
+
+trait LayerType {
+  lazy val name = this.getClass.getName.split("\\$").last.split("\\.").last
+  override def toString = name
+}
+
+object LayerType {
+  def fromString(str: String): LayerType =
+    str match {
+      case AvroLayerType.name => AvroLayerType
+      case COGLayerType.name => COGLayerType
+      case _ => throw new Exception(s"Could not derive LayerType from given string: $str")
+    }
+
+  implicit object LayerTypeFormat extends RootJsonFormat[LayerType] {
+    def write(layerType: LayerType) = JsString(layerType.name)
+
+    def read(value: JsValue): LayerType =
+      value match {
+        case JsString(layerType) =>
+          LayerType.fromString(layerType)
+        case v =>
+          throw new DeserializationException(s"LayerType expected, got $v")
+      }
+  }
+}
+
+case object AvroLayerType extends LayerType
+case object COGLayerType extends LayerType
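A usage sketch for the new type: name strips the package prefix and the trailing $ from the case object's class name, so AvroLayerType.name is simply "AvroLayerType", and the JSON codec round-trips through that string:

import spray.json._
import geotrellis.spark.io._

assert(AvroLayerType.name == "AvroLayerType")
assert(LayerType.fromString("COGLayerType") == COGLayerType)

// Upcast to LayerType so implicit search finds LayerTypeFormat, which is
// defined for LayerType rather than the singleton type COGLayerType.type:
val js: JsValue = (COGLayerType: LayerType).toJson  // JsString("COGLayerType")
assert(js.convertTo[LayerType] == COGLayerType)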
@@ -16,13 +16,14 @@

 package geotrellis.spark.io.file

-import geotrellis.spark.io.LayerHeader
+import geotrellis.spark.io.{LayerHeader, LayerType, AvroLayerType}
 import spray.json._

 case class FileLayerHeader(
   keyClass: String,
   valueClass: String,
-  path: String
+  path: String,
+  layerType: LayerType = AvroLayerType
 ) extends LayerHeader {
   def format = "file"
 }
@@ -34,16 +35,26 @@ object FileLayerHeader {
       "format" -> JsString(md.format),
       "keyClass" -> JsString(md.keyClass),
       "valueClass" -> JsString(md.valueClass),
-      "path" -> JsString(md.path)
+      "path" -> JsString(md.path),
+      "layerType" -> md.layerType.toJson
     )

   def read(value: JsValue): FileLayerHeader =
-    value.asJsObject.getFields("keyClass", "valueClass", "path") match {
+    value.asJsObject.getFields("keyClass", "valueClass", "path", "layerType") match {
+      case Seq(JsString(keyClass), JsString(valueClass), JsString(path), layerType) =>
+        FileLayerHeader(
+          keyClass,
+          valueClass,
+          path,
+          layerType.convertTo[LayerType]
+        )
+
       case Seq(JsString(keyClass), JsString(valueClass), JsString(path)) =>
         FileLayerHeader(
           keyClass,
           valueClass,
-          path
+          path,
+          AvroLayerType
         )

       case _ =>
@@ -16,15 +16,16 @@

 package geotrellis.spark.io.hadoop

-import geotrellis.spark.io.LayerHeader
+import geotrellis.spark.io.{LayerHeader, LayerType, AvroLayerType}

 import java.net.URI
 import spray.json._

 case class HadoopLayerHeader(
   keyClass: String,
   valueClass: String,
-  path: URI
+  path: URI,
+  layerType: LayerType = AvroLayerType
 ) extends LayerHeader {
   def format = "hdfs"
 }
@@ -36,16 +37,27 @@ object HadoopLayerHeader {
       "format" -> JsString(md.format),
       "keyClass" -> JsString(md.keyClass),
       "valueClass" -> JsString(md.valueClass),
-      "path" -> JsString(md.path.toString)
+      "path" -> JsString(md.path.toString),
+      "layerType" -> md.layerType.toJson
     )

   def read(value: JsValue): HadoopLayerHeader =
-    value.asJsObject.getFields("keyClass", "valueClass", "path") match {
+    value.asJsObject.getFields("keyClass", "valueClass", "path", "layerType") match {
+      case Seq(JsString(keyClass), JsString(valueClass), JsString(path), layerType) =>
+        HadoopLayerHeader(
+          keyClass,
+          valueClass,
+          new URI(path),
+          layerType.convertTo[LayerType]
+        )
+
       case Seq(JsString(keyClass), JsString(valueClass), JsString(path)) =>
         HadoopLayerHeader(
           keyClass,
           valueClass,
-          new URI(path))
+          new URI(path),
+          AvroLayerType
+        )
       case _ =>
         throw new DeserializationException(s"HadoopLayerMetadata expected, got: $value")
     }
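Because layerType defaults to AvroLayerType in every header's constructor, existing construction sites keep compiling unchanged; only COG-aware code needs to pass the new argument. A sketch with illustrative values and path:

import java.net.URI
import geotrellis.spark.io._
import geotrellis.spark.io.hadoop._

val avro = HadoopLayerHeader("K", "V", new URI("hdfs://host/layers"))  // defaults to AvroLayerType
val cog  = HadoopLayerHeader("K", "V", new URI("hdfs://host/layers"), COGLayerType)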
