ETL Refactor #1553

Merged
merged 22 commits into from Aug 19, 2016

Conversation

@pomadchin
Member

pomadchin commented Jun 17, 2016

Move ETL to use json configuration

  • classes to read from / write to json
  • refactor etl input / output plugins to work with new inputs
  • update documentation
  • deprecate scallop
  • test on a real ingest (chatta-demo ingest / emr)
  • refactor landsat demo to work with this pr
  • refactor chatta demo to work with this pr
  • move output config into a separate file
  • improve json validation (better exceptions and schema validation)
  • address @echeipesh's comments

pomadchin added some commits Jun 17, 2016

@pomadchin

Member

pomadchin commented Jun 17, 2016

Complete JSON example:

datasets.json:

[  
   {  
      "name":"test",
      "ingestType":{  
         "format":"geotiff",
         "inputCredentials":"inputCredentials name",
         "output":"hadoop",
         "outputCredentials":"outputCredentials name",
         "input":"hadoop"
      },
      "path":{  
         "input":"input",
         "output":"output"
      },
      "cache":"NONE",
      "ingestOptions":{  
         "breaks":"0:ffffe5ff;0.1:f7fcb9ff;0.2:d9f0a3ff;0.3:addd8eff;0.4:78c679ff;0.5:41ab5dff;0.6:238443ff;0.7:006837ff;1:004529ff",
         "reprojectMethod":"buffered",
         "cellSize":{  
            "width":256.0,
            "height":256.0
         },
         "encoding":"geotiff",
         "tileSize":256,
         "layoutExtent":{  
            "xmin":1.0,
            "ymin":2.0,
            "xmax":3.0,
            "ymax":4.0
         },
         "resolutionThreshold":0.1,
         "pyramid":true,
         "resampleMethod":"nearest-neighbor",
         "keyIndexMethod":{  
            "type":"zorder"
         },
         "layoutScheme":"tms",
         "cellType":"int8",
         "crs":"+proj=merc +a=6378137 +b=6378137 +lat_ts=0.0 +lon_0=0.0 +x_0=0.0 +y_0=0 +k=1.0 +units=m +nadgrids=@null +wktext +no_defs "
      }
   }
]

credentials.json:

{
  "accumulo": [{
    "name": "name",
    "zookeepers": "zookeepers",
    "instance": "instance",
    "user": "user",
    "password": "password"
  }],
  "cassandra": [{
    "name": "name",
    "allowRemoteDCsForLocalConsistencyLevel": false,
    "localDc": "datacenter1",
    "usedHostsPerRemoteDc": 0,
    "hosts": "hosts",
    "replicationStrategy": "SimpleStrategy",
    "replicationFactor": 1,
    "user": "user",
    "password": "password"
  }],
  "s3": [],
  "hadoop": []
}
@lossyrob

Member

lossyrob commented Jun 17, 2016

Does this allow for additional backends to be supplied via code in client projects?

pomadchin added some commits Jun 20, 2016

@pomadchin pomadchin changed the title from [WIP] ETL Refactor to ETL Refactor Jun 20, 2016

@pomadchin

Member

pomadchin commented Jun 20, 2016

should include #1494

@pomadchin pomadchin changed the title from ETL Refactor to [WIP] ETL Refactor Jun 22, 2016

@RickMohr

RickMohr commented Jun 22, 2016

Will there be validation for the input JSON? If I misspell resampleMethod as resampleMetod by mistake, it would be fantastic to get an "Unknown property - resampleMetod" error rather than a silent, baffling result.

(There are JSON schema validators for most languages, listed at http://json-schema.org/implementations.html. I don't see one for Scala, but for Java https://github.com/everit-org/json-schema looks active.)

@pomadchin

Member

pomadchin commented Jun 22, 2016

@RickMohr A good point, sure; moreover, our json validation should be improved too. Thanks for the notice.

@pomadchin pomadchin referenced this pull request Jun 28, 2016

Merged

ETL Refactor #5

7 of 7 tasks complete
--layer nlcd-tms --crs EPSG:3857 --pyramid --layoutScheme {tms | floating}
### Ingest tiles from local fs or hdfs into s3 storage command
`datasets.json`:

@echeipesh

echeipesh Jun 29, 2016

Contributor

Would it make sense to have the input dataset and output config in separate files? That way you could have multiple sources and only one destination, e.g. --output azavea-datahub.json

@pomadchin

pomadchin Jun 29, 2016

Member

@echeipesh a very good point

@pomadchin

Member

pomadchin commented Jul 8, 2016

@RickMohr have you got any comments? (I decided to use https://github.com/daveclayton/json-schema-validator; it looks like it has a larger community and is actively supported right now)
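
For reference, a minimal sketch of how that validator could be wired in, assuming the com.github.fge JsonSchemaFactory / JsonLoader API and a hypothetical input-schema.json resource; not the exact code in this PR:

import com.github.fge.jackson.JsonLoader
import com.github.fge.jsonschema.main.JsonSchemaFactory

object SchemaValidation {
  // Hypothetical schema resource name; the PR's actual schema files may differ.
  private val schema =
    JsonSchemaFactory.byDefault().getJsonSchema(JsonLoader.fromResource("/input-schema.json"))

  // Throws with the validator's messages if the given json does not match the schema.
  def validate(json: String): Unit = {
    val report = schema.validate(JsonLoader.fromString(json))
    if (!report.isSuccess)
      throw new IllegalArgumentException(report.toString)
  }
}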

@pomadchin pomadchin changed the title from [WIP] ETL Refactor to ETL Refactor Jul 8, 2016

import spray.json.DefaultJsonProtocol._
import spray.json._
class EtlConf(val credentials: Credentials, val datasets: List[Input], val output: Output) {

@echeipesh

echeipesh Jul 8, 2016

Contributor

I think it's a cool idea to be able to have multiple inputs into an ETL job, but EtlJob only takes one input, so it doesn't seem like having a list of them here is used?

|
|Usage: geotrellis-etl [options]
|
| --datasets <value>

@echeipesh

echeipesh Jul 8, 2016

Contributor

should be input, just to be consistent with what was there before and with output

@@ -0,0 +1,24 @@
package geotrellis.spark.etl.config
case class Credentials(accumulo: List[Accumulo], cassandra: List[Cassandra], s3: List[S3], hadoop: List[Hadoop]) {

@echeipesh

echeipesh Jul 8, 2016

Contributor

The job of this class seems to be to parse out the backend-specific parameters; that is something that should be handled by the constructor or companion object of each particular back-end.

@echeipesh

Contributor

echeipesh commented Jul 8, 2016

Ok, my biggest comment is on the handling of credentials.json. In reality it serves two related purposes: to provide credentials and to provide some back-end specific configuration, and that's great. We just need to make it a lookup table that we can call out to from the input.json. Currently it calls out the type of the back-end as part of the JSON structure, which is kind of strange. Ideally this is a name -> config table like:

{
  "backend-profiles": [
    {
      "name": "accumulo-gis",
      "type": "Accumulo",
      "zookeepers": "zookeepers",
      "instance": "instance",
      "user": "user",
      "password": "password"
    },
    {
      "name": "cassandra-gis",
      "type": "Cassandra"
      "allowRemoteDCsForLocalConsistencyLevel": false,
      "localDc": "datacenter1",
      "usedHostsPerRemoteDc": 0,
      "hosts": "hosts",
      "replicationStrategy": "SimpleStrategy",
      "replicationFactor": 1,
      "user": "user",
      "password": "password"
    }
  ]
}

Now, a related problem is that an input needs to be able to refine those configurations by adding some kind of path/table/schema selection and configure an InputPlugin that will actually fetch me some bytes. It can look like:
input.json:

   {
      "name":"nlcd-tms",
      "format":"{geotiff | temporal-geotiff}",
      "backend":{
        "type":"file",
        "profile": "datahub-local",
        "path": "file:///Data/nlcd/tiles"
      },
      "NoData": 0,
      "cache":"NONE"
   }

Here we're going to ask for everything that affects reading the tiles: what format they're in, if we should cache them, if we should force a NoData value while reading them (because nobody cares to set it correctly when writing the files) and, most importantly, the backend to use. The backend JSON object will select the credentials/config profile and add some information that identifies a dataset in there.

I think you could get a little fancy by basically merging the profile with the backend block, which would allow you to shift configuration from the profile into the input section, but let's say that's not justified here.

Then the final comment is that the output is actually what determines the index and layout used, so those sections need to be moved there:

{
  "backend": {
      "type": "S3",
      "profile": "datahub-s3",
      "prefix": "/catalog"
  },
  "reprojectMethod":"buffered",
  "pyramid":true,
  "resampleMethod":"nearest-neighbor",
  "keyIndexMethod":{
    "type":"zorder"
  },
  "layoutScheme": { // just map the fields directly to zoomed and floating layout scheme objects
     "type": "zoomed",
     "tileSize": 256,
     "crs":"EPSG:3857"
  }
}

Hopefully by dealing with backend profiles this way you can even generate a case class that you can pass to the input module, like an AccumuloBackend that will have all the fields set. You could parse them by partially reading the JSON object to get the "type" and then using the correct format to read it fully, and shove them into a Map[String, Backend] or something like that.

I like the idea of the CustomBackend you have and that fits pretty nicely into this flow as well.
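
A hedged sketch of that parse-by-"type" approach with spray-json, using made-up profile case classes (the real ones in this PR differ):

import spray.json._
import spray.json.DefaultJsonProtocol._

// Illustrative profile types only.
sealed trait BackendProfile { def name: String }
case class AccumuloProfile(name: String, zookeepers: String, instance: String,
                           user: String, password: String) extends BackendProfile
case class HadoopProfile(name: String) extends BackendProfile

object BackendProfileFormat extends RootJsonFormat[BackendProfile] {
  implicit val accumuloFormat: RootJsonFormat[AccumuloProfile] = jsonFormat5(AccumuloProfile)
  implicit val hadoopFormat: RootJsonFormat[HadoopProfile]     = jsonFormat1(HadoopProfile)

  // Partially read the object to get its "type", then use the matching format.
  def read(value: JsValue): BackendProfile =
    value.asJsObject.fields.get("type") match {
      case Some(JsString("Accumulo")) => value.convertTo[AccumuloProfile]
      case Some(JsString("Hadoop"))   => value.convertTo[HadoopProfile]
      case other                      => deserializationError(s"unsupported backend type: $other")
    }

  def write(profile: BackendProfile): JsValue = serializationError("not needed for this sketch")
}

object ProfileTable {
  // Build the name -> profile lookup table from the "backend-profiles" array.
  def fromJson(json: String): Map[String, BackendProfile] =
    json.parseJson.asJsObject.fields("backend-profiles") match {
      case JsArray(elements) => elements.map(BackendProfileFormat.read).map(p => p.name -> p).toMap
      case other             => deserializationError(s"expected an array, got $other")
    }
}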

"instance",
"user",
"password"
]

@RickMohr

RickMohr Jul 8, 2016

Consider adding

"additionalProperties": false

(Explained e.g. at https://spacetelescope.github.io/understanding-json-schema/reference/object.html)

@pomadchin

pomadchin Jul 8, 2016

Member

@RickMohr probably a description field would be good to add as well, thanks!

@RickMohr

RickMohr commented Jul 8, 2016

Thanks for adding validation!

@lossyrob

Member

lossyrob commented Jul 29, 2016

@pomadchin can you update? I think we're ready for this to be merged, yeah?

@pomadchin

Member

pomadchin commented Jul 30, 2016

@lossyrob yes, sure

@pomadchin

Member

pomadchin commented Jul 30, 2016

Merged master, though I still need to test on a demo how / whether the hbase modules work.

sealed trait BackendProfile {
val name: String
def `type`: String

@echeipesh

echeipesh Aug 4, 2016

Contributor

This can actually be BackendType instead of a string
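
A rough sketch of what that could look like (the case objects follow type names mentioned elsewhere in this PR, e.g. HadoopType, but the exact set here is assumed):

sealed trait BackendType
case object AccumuloType extends BackendType
case object HadoopType   extends BackendType
case object FileType     extends BackendType
case class UserDefinedBackendType(name: String) extends BackendType

sealed trait BackendProfile {
  val name: String
  def `type`: BackendType   // typed, instead of a raw String
}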

case class UserDefinedBackendType(name: String) extends BackendType
case class UserDefinedBackendInputType(name: String) extends BackendInputType

@echeipesh

echeipesh Aug 4, 2016

Contributor

this is unused and seems like an artifact of an earlier layout

@@ -0,0 +1,6 @@
package geotrellis.spark.etl.config
case class IngestOutputType(

@echeipesh

echeipesh Aug 4, 2016

Contributor

unused class

@pomadchin

pomadchin Aug 4, 2016

Member

@echeipesh good catch, it's from previous revisions; thanks!

@@ -0,0 +1,3 @@
package geotrellis.spark.etl.config
case class Backend(`type`: BackendType, path: String, profile: Option[String] = None)

@echeipesh

echeipesh Aug 4, 2016

Contributor

This really begs for profile to be an Option[BackendProfile]; see the comment in the parser for details.

val outputParsed = output.parseJson.convertTo[Output]
inputsParsed.map { inputParsed =>

@echeipesh

echeipesh Aug 4, 2016

Contributor

This is basically trying to do a lookup from profile name to profile instance. At this point you have parsed the profiles already. It would be just as easy to give that as a constructor argument to the JsonFormat[Input] instance defined here, which would be used for parsing. This would also allow you to upgrade Backend to hold the actual profile.
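
A hedged sketch of that idea with minimal, made-up Input/Backend shapes (the PR's real fields differ), assuming spray-json:

import spray.json._

// Minimal illustrative types; the real classes in this PR have more fields.
sealed trait BackendProfile { def name: String }
case class Backend(`type`: String, path: String, profile: Option[BackendProfile])
case class Input(name: String, format: String, backend: Backend)

// Passing the already-parsed profile table to the format lets Backend carry
// the resolved BackendProfile instance rather than just its name.
class InputJsonFormat(profiles: Map[String, BackendProfile]) extends RootJsonFormat[Input] {
  private def str(v: JsValue): String = v match {
    case JsString(s) => s
    case other       => deserializationError(s"expected a string, got $other")
  }

  def read(value: JsValue): Input = {
    val fields  = value.asJsObject.fields
    val backend = fields("backend").asJsObject.fields
    Input(
      name   = str(fields("name")),
      format = str(fields("format")),
      backend = Backend(
        `type`  = str(backend("type")),
        path    = str(backend("path")),
        profile = backend.get("profile").map(p => profiles(str(p))) // resolve by name
      )
    )
  }

  def write(input: Input): JsValue = serializationError("only reading is needed in this sketch")
}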

case _ => throw new Exception("unsupported keyIndexMethod definition")
}
def getKeyIndexMethod[K] = _getKeyIndexMethod.asInstanceOf[KeyIndexMethod[K]]

@echeipesh

echeipesh Aug 4, 2016

Contributor

Code style: no need for a private def here, you should be able to do:

def getKeyIndexMethod[K] = {
  (keyIndexMethod.`type`, keyIndexMethod.temporalResolution) match {
    case ("rowmajor", None)    => RowMajorKeyIndexMethod
    case ("hilbert", None)     => HilbertKeyIndexMethod
    case ("hilbert", Some(tr)) => HilbertKeyIndexMethod(tr.toInt)
    case ("zorder", None)      => ZCurveKeyIndexMethod
    case ("zorder", Some(tr))  => ZCurveKeyIndexMethod.byMilliseconds(tr)
    case _                     => throw new Exception("unsupported keyIndexMethod definition")
  }
}.asInstanceOf[KeyIndexMethod[K]]

@echeipesh

echeipesh Aug 4, 2016

Contributor

Actually I would really expect Scala type inference to figure it out without asInstanceOf; could be wrong there.

cache: Option[StorageLevel] = None,
noData: Option[Double] = None
) {
def params = getParams(backend.`type`, backend.path)

@echeipesh

echeipesh Aug 4, 2016

Contributor

This branches on the backend type. Should Backend have a type hierarchy? In that case this could be more clearly expressed as overriding an abstract method.
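
For illustration, a hedged sketch of that hierarchy (type and field names here are made up, not the PR's):

// Each backend subtype supplies its own params, replacing the match on backend.`type`.
sealed trait Backend {
  def path: String
  def params: Map[String, String]
}

case class HadoopBackend(path: String) extends Backend {
  def params = Map("path" -> path)
}

case class AccumuloBackend(path: String, instance: String) extends Backend {
  def params = Map("table" -> path, "instance" -> instance)
}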

@@ -0,0 +1,7 @@
package geotrellis.spark.etl.config
case class IngestType(

@echeipesh

echeipesh Aug 4, 2016

Contributor

Unused

def getInstance = HBaseInstance(zookeepers.split(","), master)
}
case class BackendProfiles(backendProfiles: BackendProfile*)

@echeipesh

echeipesh Aug 4, 2016

Contributor

I'm unclear whether this is useful; it only seems to be used in tests?

Map("keyspace" -> keyspace, "table" -> table)
}
case HadoopType | FileType => Map("path" -> p)
case UserDefinedBackendType(s) => Map("input" -> s)

@echeipesh

echeipesh Aug 4, 2016

Contributor

UserDefinedBackendType doesn't get to use the path?

@pomadchin

pomadchin Aug 11, 2016

Member

I wanted to put more semantics there: in the case of hadoop and file we have "path", but in the case of some custom backend (see the landsat emr demo pr) it can be just any kind of input (as mentioned: params to load landsat tiles).

@echeipesh

Contributor

echeipesh commented Aug 4, 2016

Most of the comments in round two are around making a Backend type hierarchy to clean up all those match statements, and upgrading Strings to instances of the objects they name.

A really cool thing this allows is getting rid of the params altogether. Plugins could get just instances of Backend and BackendProfile and cast them to the type they must be. This would get rid of all the string parsing. The whole params: Map[String, String] is an artifact of parsing command-line arguments, which don't really have the deep structure that JSON does. This is a lot of code changes, so it can probably be done as a round 2 PR into ETL.
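
As a rough illustration of that direction (the type and plugin names below, like AccumuloProfile and AccumuloInput, are made up for the sketch, not the PR's actual classes):

// Illustrative types only.
sealed trait BackendProfile { def name: String }
case class AccumuloProfile(name: String, zookeepers: String, instance: String) extends BackendProfile

case class Backend(path: String, profile: Option[BackendProfile])

trait InputPlugin {
  // Plugins receive typed objects instead of a Map[String, String] of params.
  def apply(backend: Backend): Unit
}

class AccumuloInput extends InputPlugin {
  def apply(backend: Backend): Unit = {
    // Narrow to the profile this plugin expects; no string parsing involved.
    val profile = backend.profile.collect { case p: AccumuloProfile => p }
      .getOrElse(sys.error("AccumuloInput requires an Accumulo profile"))
    println(s"reading from ${profile.instance}, table ${backend.path}")
  }
}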

import geotrellis.spark.etl.config._
case class EtlJob(conf: EtlConf) extends Serializable {

@echeipesh

echeipesh Aug 4, 2016

Contributor

Once backends generate their own params, what is the purpose of this class? We could just pass EtlConf at that point, but does that lose meaning?

pomadchin added some commits Aug 11, 2016

@pomadchin

Member

pomadchin commented Aug 11, 2016

Includes a rather important hbase deps fix ("org.codehaus.jackson" % "jackson-core-asl" % "1.8.3" intransitive()).
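
For context, that presumably corresponds to an sbt dependency along these lines in the hbase module (the exact build location is assumed):

// Pin jackson-core-asl and exclude its transitive dependencies to avoid version clashes.
libraryDependencies += "org.codehaus.jackson" % "jackson-core-asl" % "1.8.3" intransitive()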

@pomadchin

Member

pomadchin commented Aug 12, 2016

Without the last dirty fix:

Exception in thread "main" java.lang.ClassCastException: geotrellis.spark.io.index.ZCurveKeyIndexMethod$ cannot be cast to geotrellis.spark.io.index.KeyIndexMethod
    at geotrellis.spark.etl.config.Output.getKeyIndexMethod(Output.scala:45)
    at geotrellis.spark.etl.accumulo.SpatialAccumuloOutput.writer(SpatialAccumuloOutput.scala:13)
    at geotrellis.spark.etl.OutputPlugin$class.apply(OutputPlugin.scala:19)
    at geotrellis.spark.etl.accumulo.SpatialAccumuloOutput.apply(SpatialAccumuloOutput.scala:11)
    at geotrellis.spark.etl.Etl.savePyramid$1(Etl.scala:178)
    at geotrellis.spark.etl.Etl.save(Etl.scala:192)
    at geotrellis.spark.etl.Etl$$anonfun$ingest$1.apply(Etl.scala:47)
    at geotrellis.spark.etl.Etl$$anonfun$ingest$1.apply(Etl.scala:39)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at geotrellis.spark.etl.Etl$.ingest(Etl.scala:39)
    at geotrellis.chatta.ChattaIngest$delayedInit$body.apply(ChattaIngest.scala:15)
    at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App$$anonfun$main$1.apply(App.scala:71)
    at scala.App$$anonfun$main$1.apply(App.scala:71)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
    at scala.App$class.main(App.scala:71)
    at geotrellis.chatta.ChattaIngest$.main(ChattaIngest.scala:12)
    at geotrellis.chatta.ChattaIngest.main(ChattaIngest.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:674)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

pomadchin added some commits Aug 12, 2016

@echeipesh

Contributor

echeipesh commented Aug 19, 2016

💯 Thank you for pushing this for so long, looks really good.

@echeipesh echeipesh merged commit 2077839 into locationtech:master Aug 19, 2016

1 check passed

continuous-integration/travis-ci/pr The Travis CI build passed

@lossyrob lossyrob added this to the 1.0 milestone Oct 18, 2016
