Skip to content

Commit

Permalink
more snippets
Browse files Browse the repository at this point in the history
  • Loading branch information
pomadchin authored and echeipesh committed Jan 15, 2018
1 parent 7f27522 commit f8e9982
Show file tree
Hide file tree
Showing 12 changed files with 191 additions and 113 deletions.
@@ -0,0 +1,7 @@
package geotrellis.spark.pipeline.ast

trait Exp[T] {
def lit(i: Int): T
def neg(t: T): T
def add(l: T, r: T): T
}
@@ -0,0 +1,16 @@
package geotrellis.spark.pipeline.ast

import geotrellis.spark.pipeline._
import geotrellis.spark.pipeline.json.Transform
import org.apache.spark.rdd.RDD

case class Group[T](
tags: List[String],
tag: String,
`type`: String = "transform.group"
) extends Transform {
def eval(labeledRDDs: List[(String, RDD[T])]): List[(String, RDD[T])] = {
val (grdds, rdds) = labeledRDDs.partition { case (l, _) => tags.contains(l) }
grdds.map { case (_, rdd) => tag -> rdd } ::: rdds
}
}
@@ -0,0 +1,14 @@
package geotrellis.spark.pipeline.ast.multiband.spatial

import geotrellis.spark.pipeline

import geotrellis.raster.Tile
import geotrellis.vector.ProjectedExtent

class Group {
def apply(tags: List[String],
tag: String,
`type`: String = "transform.group"): Group = {

}
}
@@ -0,0 +1,81 @@
package geotrellis.spark.pipeline.interpreter

import geotrellis.spark.pipeline.json._
import org.apache.spark.rdd.RDD

object TypedObject {
def read(clazz: String, `type`: String) = {
val isSpatial = `type`.indexOfSlice("spatial") > 0
val isSingleband = `type`.indexOfSlice("singleband") > 0
}

}

object Interpreter {
sealed trait Tree
final case class Leaf(clazz: String) extends Tree
final case class Node(clazz: String, ts: List[Tree]) extends Tree

def interpretUntyped = {
// everything should keep ordering
val reads: List[Read] = List()
val transformations: List[Transform] = List()
val writes: List[Write] = List()

val inputs: List[(String, RDD[Any])] = reads.map { r =>
// make instance of a class and typed; after that it's possible to erase types again.
// Class.forName(r.`type`).newInstance
// read =>
null: (String, RDD[Any])
}

// along with common transform operations there can be arguable aggregate functions,
// to rename multiple inputs
// or to merge them into multiband input

val reorogonizedInputs = transformations.flatMap {
case t: TransformGroup =>
// make instance of a class and typed; after that it's possible to erase types again.
// Class.forName(r.`type`).newInstance
// List[(String, RDD[Any])] => List[(String, RDD[Any])] function applied
// casting of RDD can be incapsulated into this functions
null: List[(String, RDD[Any])]

case t: TransformMerge =>
// make instance of a class and typed; after that it's possible to erase types again.
// Class.forName(r.`type`).newInstance
// List[(String, RDD[Any])] => List[(String, RDD[Any])] function applied
// casting of RDD can be incapsulated into this functions
null: List[(String, RDD[Any])]

// no transofmration steps applied
case _ => null: List[(String, RDD[Any])]
}

val generalTransformations: List[(String, RDD[Any])] = reorogonizedInputs.map {
case p @ (tag, rdd) =>
transformations.foldLeft(p) { case (acc, tr: Transform) =>
// make instance of a class and typed; after that it's possible to erase types again.
// Class.forName(r.`type`).newInstance
// List[(String, RDD[Any])] => List[(String, RDD[Any])] functions applied
// casting of RDD can be incapsulated into this functions
// String as a first tuple argument can be used to be sure that transformation can be applied
// runtime exceptions can happen: class not found, or type can't be casted
// shapeless.cast function can be used(?)

// tr.instance.apply(acc)

null: (String, RDD[Any])
}
}

writes.collect { case w: Write =>
// make instance of a class and typed; after that it's possible to erase types again.
// Class.forName(r.`type`).newInstance
// List[(String, RDD[Any])] => Boolean // Unit

()
}

}
}
@@ -1,4 +1,6 @@
package geotrellis.spark.pipeline
package geotrellis.spark.pipeline.json

import geotrellis.spark.pipeline.PipelineConstructor

trait PipelineExpr {
def ~(other: PipelineExpr): PipelineConstructor = this :: other :: Nil
Expand Down
@@ -1,4 +1,4 @@
package geotrellis.spark.pipeline
package geotrellis.spark.pipeline.json

case class PipelineKeyIndexMethod(
`type`: String,
Expand Down
@@ -1,17 +1,9 @@
package geotrellis.spark.pipeline

import java.net.URI
package geotrellis.spark.pipeline.json

import geotrellis.proj4.CRS
import geotrellis.raster.CellGrid
import geotrellis.spark.Metadata
import geotrellis.util.Component
import geotrellis.vector.ProjectedExtent
import org.apache.spark.rdd.RDD

import scala.reflect._
import scala.reflect.runtime.universe._
import scala.util.Try
import java.net.URI

trait Read extends PipelineExpr {
val profile: String
Expand All @@ -25,51 +17,37 @@ trait Read extends PipelineExpr {
def getURI = new URI(uri)
def getCRS = crs.map(c => Try(CRS.fromName(c)) getOrElse CRS.fromString(c))
def getTag = tag.getOrElse("default")

def eval[I, V]: RDD[(I, V)]
}

case class ReadHadoop(
`type`: String,
profile: String,
uri: String,
crs: Option[String] = None,
tag: Option[String] = None,
maxTileSize: Option[Int] = None,
partitions: Option[Int] = None,
clip: Boolean = false,
`type`: String = "read.hadoop"
) extends Read {
def eval[I, V]: RDD[(I, V)] = {
null
}
}
clip: Boolean = false
) extends Read

case class ReadS3(
`type`: String,
profile: String,
uri: String,
crs: Option[String] = None,
tag: Option[String] = None,
maxTileSize: Option[Int] = None,
partitions: Option[Int] = None,
clip: Boolean = false,
`type`: String = "read.s3"
) extends Read {
def eval[I, V]: RDD[(I, V)] = {
null
}
}
clip: Boolean = false
) extends Read

case class ReadFile(
`type`: String,
profile: String,
uri: String,
crs: Option[String] = None,
tag: Option[String] = None,
maxTileSize: Option[Int] = None,
partitions: Option[Int] = None,
clip: Boolean = false,
`type`: String = "read.file"
) extends Read {
def eval[I, V]: RDD[(I, V)] = {
null
}
}
clip: Boolean = false
) extends Read
@@ -1,4 +1,4 @@
package geotrellis.spark.pipeline
package geotrellis.spark.pipeline.json

trait Reindex extends PipelineExpr {
val name: String
Expand All @@ -8,41 +8,41 @@ trait Reindex extends PipelineExpr {
}

case class ReindexHadoop(
`type`: String,
name: String,
profile: String,
uri: String,
keyIndexMethod: PipelineKeyIndexMethod,
`type`: String = "reindex.hadoop"
keyIndexMethod: PipelineKeyIndexMethod
) extends Reindex

case class ReindexS3(
`type`: String,
name: String,
profile: String,
uri: String,
keyIndexMethod: PipelineKeyIndexMethod,
`type`: String = "reindex.s3"
keyIndexMethod: PipelineKeyIndexMethod
) extends Reindex

case class ReindexAccumulo(
`type`: String,
name: String,
profile: String,
uri: String,
keyIndexMethod: PipelineKeyIndexMethod,
`type`: String = "reindex.accumulo"
keyIndexMethod: PipelineKeyIndexMethod
) extends Reindex

case class ReindexCassandra(
`type`: String,
name: String,
profile: String,
uri: String,
keyIndexMethod: PipelineKeyIndexMethod,
`type`: String = "reindex.cassandra"
keyIndexMethod: PipelineKeyIndexMethod
) extends Reindex

case class ReindexHBase(
`type`: String,
name: String,
profile: String,
uri: String,
keyIndexMethod: PipelineKeyIndexMethod,
`type`: String = "reindex.hbase"
keyIndexMethod: PipelineKeyIndexMethod
) extends Reindex
@@ -1,16 +1,15 @@
package geotrellis.spark.pipeline
package geotrellis.spark.pipeline.json

import geotrellis.proj4.CRS
import geotrellis.raster.{CellGrid, CellSize, CellType}
import geotrellis.raster.crop.CropMethods
import geotrellis.raster.merge.TileMergeMethods
import geotrellis.raster.prototype.TilePrototypeMethods
import geotrellis.raster.reproject.TileReprojectMethods
import geotrellis.raster.resample.{NearestNeighbor, PointResampleMethod, ResampleMethod}
import geotrellis.raster.resample.{NearestNeighbor, PointResampleMethod}
import geotrellis.raster.stitch.Stitcher
import geotrellis.raster.{CellGrid, CellSize, CellType}
import geotrellis.spark.tiling.{FloatingLayoutScheme, LayoutDefinition, LayoutLevel, LayoutScheme, TilerKeyMethods, ZoomedLayoutScheme}
import geotrellis.spark._
import geotrellis.spark.{Boundable, Metadata, SpatialComponent}
import geotrellis.spark.{Boundable, Metadata, SpatialComponent, _}
import geotrellis.util.Component
import geotrellis.vector.ProjectedExtent
import org.apache.spark.rdd.RDD
Expand All @@ -20,59 +19,38 @@ import scala.util.Try

trait Transform extends PipelineExpr

trait TransformHomo {

}

/** Rename Inputs into groups */
case class TransformGroup(
`type`: String,
tags: List[String],
tag: String,
`type`: String = "transform.group"
) extends Transform {
def eval[I, V](labeledRDDs: List[(String, RDD[(I, V)])]): List[(String, RDD[(I, V)])] = {
val (grdds, rdds) = labeledRDDs.partition { case (l, _) => tags.contains(l) }
grdds.map { case (_, rdd) => tag -> rdd } ::: rdds
}
}
tag: String
) extends Transform

/** Merge inputs into a single Multiband RDD */
/*case class TransformMerge(
case class TransformMerge(
`type`: String,
tags: List[String],
tag: String,
`type`: String = "transform.merge"
) extends Transform {
def eval[I, V, V2](rdds: List[RDD[(I, V)]]): List[RDD[(I, V2)]] = null
}*/
tag: String
) extends Transform

case class TransformMap(
`type`: String,
func: String, // function name
tag: Option[String] = None,
`type`: String = "transform.map"
) extends Transform {
def eval[I, V](rdd: RDD[(I, V)]): RDD[(I, V)] = {
//Class.forName(func).newInstance().asInstanceOf[PipelineFunction]
null
}
}
tag: Option[String] = None
) extends Transform

case class TransformPerTileReproject(
crs: String,
`type`: String = "transform.reproject.per-tile"
`type`: String,
crs: String
) extends Transform {
def getCRS = Try(CRS.fromName(crs)) getOrElse CRS.fromString(crs)

def eval[
I: Component[?, ProjectedExtent],
V <: CellGrid: (? => TileReprojectMethods[V])
](rdd: RDD[(I, V)]): RDD[(I, V)] = rdd.reproject(getCRS)
}

case class TransformBufferedReproject(
`type`: String,
crs: String,
resampleMethod: PointResampleMethod = NearestNeighbor,
maxZoom: Option[Int] = None,
`type`: String = "transform.reproject.buffered"
maxZoom: Option[Int] = None
) extends Transform {
def getCRS = Try(CRS.fromName(crs)) getOrElse CRS.fromString(crs)

Expand Down

0 comments on commit f8e9982

Please sign in to comment.