Merge pull request #628 from jbouffard/feature/geotools/shapefiles
Shapefile Reading API
Jacob Bouffard committed Feb 6, 2018
2 parents 2f945e2 + 2f8766e commit 542e037
Showing 23 changed files with 649 additions and 3 deletions.
3 changes: 2 additions & 1 deletion deploy.sh
@@ -5,4 +5,5 @@ set -x

 cd geopyspark-backend \
 && ./sbt -Dbintray.user=$BINTRAY_USER -Dbintray.pass=$BINTRAY_PASS "project geotrellis-backend" publish \
-&& ./sbt -Dbintray.user=$BINTRAY_USER -Dbintray.pass=$BINTRAY_PASS "project vectorpipe" publish
+&& ./sbt -Dbintray.user=$BINTRAY_USER -Dbintray.pass=$BINTRAY_PASS "project vectorpipe" publish \
+&& ./sbt -Dbintray.user=$BINTRAY_USER -Dbintray.pass=$BINTRAY_PASS "project geotools" publish
7 changes: 6 additions & 1 deletion geopyspark-backend/build.sbt
@@ -53,7 +53,12 @@ lazy val vectorPipeProject = Project("vectorpipe", file("vectorpipe"))
   .settings(publishSettings: _*)
   .dependsOn(utilProject)
 
+lazy val geotoolsProject = Project("geotools", file("geotools"))
+  .settings(commonSettings: _*)
+  .settings(publishSettings: _*)
+  .dependsOn(utilProject)
+
 lazy val geotrellisProject = Project("geotrellis-backend", file("geotrellis"))
   .settings(commonSettings: _*)
   .settings(publishSettings: _*)
-  .dependsOn(utilProject, vectorPipeProject)
+  .dependsOn(utilProject, vectorPipeProject, geotoolsProject)
36 changes: 36 additions & 0 deletions geopyspark-backend/geotools/build.sbt
@@ -0,0 +1,36 @@
name := "geotools-backend"

resolvers ++= Seq(
  "osgeo" at "http://download.osgeo.org/webdav/geotools",
  "geosolutions" at "http://maven.geo-solutions.it/",
  "Geotoolkit Repo" at "http://maven.geotoolkit.org",
  "Location Tech GeoTrellis Snapshots" at "https://repo.locationtech.org/content/repositories/geotrellis-snapshots",
  "Location Tech GeoTrellis Releases" at "https://repo.locationtech.org/content/groups/releases",
  Resolver.mavenLocal
)

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.2.0" % "provided",
  "org.locationtech.geotrellis" %% "geotrellis-s3" % Version.geotrellis,
  "org.locationtech.geotrellis" %% "geotrellis-s3-testkit" % Version.geotrellis,
  "org.locationtech.geotrellis" %% "geotrellis-spark" % Version.geotrellis,
  "org.locationtech.geotrellis" %% "geotrellis-geotools" % Version.geotrellis,
  "org.locationtech.geotrellis" %% "geotrellis-shapefile" % Version.geotrellis,
  "de.javakaffee" % "kryo-serializers" % "0.38" exclude("com.esotericsoftware", "kryo"),
  "javax.media" % "jai_core" % "1.1.3" % Test from "http://download.osgeo.org/webdav/geotools/javax/media/jai_core/1.1.3/jai_core-1.1.3.jar"
)

assemblyMergeStrategy in assembly := {
  case "reference.conf" => MergeStrategy.concat
  case "application.conf" => MergeStrategy.concat
  case "META-INF/MANIFEST.MF" => MergeStrategy.discard
  case "META-INF\\MANIFEST.MF" => MergeStrategy.discard
  case "META-INF/ECLIPSEF.RSA" => MergeStrategy.discard
  case "META-INF/ECLIPSEF.SF" => MergeStrategy.discard
  case x if x.startsWith("META-INF/services") => MergeStrategy.concat
  case _ => MergeStrategy.first
}

PB.targets in Compile := Seq(
  scalapb.gen() -> (sourceManaged in Compile).value
)
@@ -0,0 +1,8 @@
syntax = "proto3";

package protos;

message ProtoSimpleFeature {
  bytes geom = 1;
  map<string, string> metadata = 2;
}
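
ScalaPB compiles this message into the ProtoSimpleFeature case class used throughout the Scala side. A minimal sketch of building one by hand, assuming the generated protos.simpleFeatureMessages package; the point geometry and "name" attribute are purely illustrative:

import com.google.protobuf.ByteString
import geotrellis.vector._
import geotrellis.vector.io.wkb.WKB
import protos.simpleFeatureMessages.ProtoSimpleFeature

object ProtoSimpleFeatureExample {
  def main(args: Array[String]): Unit = {
    // The geometry travels as WKB bytes; attribute values as strings.
    val message = ProtoSimpleFeature(
      geom = ByteString.copyFrom(WKB.write(Point(1.0, 2.0))),
      metadata = Map("name" -> "example"))
    println(message.toByteArray.length)
  }
}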
@@ -0,0 +1,25 @@
package geopyspark.geotools

import geotrellis.spark.io.hadoop._

import org.apache.spark._
import org.apache.hadoop.fs.Path

import java.net.URI


object HadoopUtils {
  def listFiles(sc: SparkContext, uris: Array[URI], extensions: Seq[String]): Array[String] =
    uris.flatMap { uri => listFiles(sc, uri, extensions) }

  def listFiles(sc: SparkContext, uri: URI, extensions: Seq[String]): Array[String] = {
    val path: Path = new Path(uri)
    val conf = sc.hadoopConfiguration.withInputDirectory(path, extensions)

    // List everything under the path, keeping only files whose names
    // end with one of the requested extensions.
    HdfsUtils
      .listFiles(path, conf)
      .map { _.toString }
      .filter { path => extensions.exists { e => path.endsWith(e) } }
      .toArray
  }
}
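
A minimal usage sketch for the helper above, assuming a local SparkContext; the HDFS directory is hypothetical:

import geopyspark.geotools.HadoopUtils
import org.apache.spark.{SparkConf, SparkContext}
import java.net.URI

object ListShapefilesExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("list-shapefiles").setMaster("local[*]"))
    // List every .shp file under a (hypothetical) HDFS directory.
    val paths = HadoopUtils.listFiles(sc, new URI("hdfs://namenode/data/shapefiles"), Seq("shp"))
    paths.foreach(println)
    sc.stop()
  }
}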
@@ -0,0 +1,52 @@
package geopyspark.geotools

import geotrellis.spark.io.s3._

import org.apache.spark._

import com.amazonaws.services.s3.model.{ListObjectsRequest, ObjectListing}

import java.net.URI

import scala.collection.JavaConverters._
import scala.collection.mutable


object S3Utils {
  def listKeys(uris: Array[URI], extensions: Seq[String], s3Client: S3Client): Array[String] =
    uris.flatMap { uri => listKeys(uri, extensions, s3Client) }

  def listKeys(uri: URI, extensions: Seq[String], s3Client: S3Client): Array[String] =
    listKeys(uri.getHost, uri.getPath.tail, extensions, s3Client)

  def listKeys(
    s3bucket: String,
    s3prefix: String,
    extensions: Seq[String],
    s3Client: S3Client
  ): Array[String] = {
    val objectRequest = (new ListObjectsRequest)
      .withBucketName(s3bucket)
      .withPrefix(s3prefix)

    // Keep only keys with a matching extension, rewritten as https URLs
    // that GeoTools' shapefile datastore can open.
    listKeys(objectRequest, s3Client)
      .filter { path => extensions.exists { e => path.endsWith(e) } }
      .map { key => s"https://s3.amazonaws.com/${s3bucket}/${key}" }
  }

  // Copied from the GeoTrellis codebase
  def listKeys(listObjectsRequest: ListObjectsRequest, s3Client: S3Client): Array[String] = {
    var listing: ObjectListing = null
    val result = mutable.ListBuffer[String]()
    do {
      listing = s3Client.listObjects(listObjectsRequest)
      // Avoid including "directories" in the input split; they can cause 403 errors on GET.
      result ++= listing.getObjectSummaries.asScala.map(_.getKey).filterNot(_ endsWith "/")
      listObjectsRequest.setMarker(listing.getNextMarker)
    } while (listing.isTruncated)

    result.toArray
  }
}
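
A minimal usage sketch; the bucket and prefix are hypothetical, and S3Client.DEFAULT resolves credentials through the standard AWS provider chain:

import geopyspark.geotools.S3Utils
import geotrellis.spark.io.s3.S3Client
import java.net.URI

object ListKeysExample {
  def main(args: Array[String]): Unit = {
    // Returns https://s3.amazonaws.com/... URLs for every matching key.
    val urls = S3Utils.listKeys(new URI("s3://my-bucket/shapefiles"), Seq("shp"), S3Client.DEFAULT)
    urls.foreach(println)
  }
}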
@@ -0,0 +1,20 @@
package geopyspark.geotools.kryo

import geotrellis.spark.io.kryo._

import com.esotericsoftware.kryo.Kryo
import org.opengis.feature.simple.{SimpleFeature, SimpleFeatureType}
import org.geotools.feature.simple.SimpleFeatureImpl

import de.javakaffee.kryoserializers._


class ExpandedKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) = {
    // Register GeoTools SimpleFeature classes (and the unmodifiable
    // collections they hold) on top of GeoTrellis' stock registrations.
    UnmodifiableCollectionsSerializer.registerSerializers(kryo)
    kryo.register(classOf[SimpleFeature])
    kryo.register(classOf[SimpleFeatureImpl])
    kryo.register(classOf[SimpleFeatureType])
    super.registerClasses(kryo)
  }
}
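
An application opts into this registrator through Spark configuration; a minimal sketch (the app name is illustrative):

import org.apache.spark.{SparkConf, SparkContext}

object KryoConfExample {
  def main(args: Array[String]): Unit = {
    // Point Spark's Kryo serializer at the expanded registrator so
    // SimpleFeature instances can cross executor boundaries.
    val conf = new SparkConf()
      .setAppName("shapefile-read")
      .setMaster("local[*]")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", "geopyspark.geotools.kryo.ExpandedKryoRegistrator")
    val sc = new SparkContext(conf)
    sc.stop()
  }
}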
@@ -0,0 +1,4 @@
package geopyspark


package object geotools extends protobufs.Implicits
@@ -0,0 +1,6 @@
package geopyspark.geotools.protobufs


object Implicits extends Implicits

trait Implicits extends SimpleFeatureProtoBuf
@@ -0,0 +1,30 @@
package geopyspark.geotools.protobufs

import geopyspark.util._
import geotrellis.vector._
import geotrellis.vector.io.wkb.WKB

import protos.simpleFeatureMessages._

import scala.collection.JavaConverters._

import com.google.protobuf.ByteString


trait SimpleFeatureProtoBuf {
  implicit def featureProtoBufCodec = new ProtoBufCodec[Feature[Geometry, Map[String, AnyRef]], ProtoSimpleFeature] {
    def encode(feature: Feature[Geometry, Map[String, AnyRef]]): ProtoSimpleFeature = {
      val geom = feature.geom
      val data = feature.data

      // Geometry travels as WKB bytes; attribute values are flattened to strings.
      val geomBytes: ByteString = ByteString.copyFrom(WKB.write(geom))
      val dataMap: Map[String, String] = data.mapValues { v => if (v == null) "" else v.toString }

      ProtoSimpleFeature(geom = geomBytes, metadata = dataMap)
    }

    // TODO: Implement a decoding method
    def decode(message: ProtoSimpleFeature): Feature[Geometry, Map[String, AnyRef]] =
      ???
  }
}
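
A minimal sketch of encoding one feature with this codec, assuming the implicit is in scope via the geopyspark.geotools package object; the geometry and attribute are illustrative:

import geopyspark.geotools._
import geotrellis.vector._

object EncodeFeatureExample {
  def main(args: Array[String]): Unit = {
    val feature = Feature(Point(1.0, 2.0), Map[String, AnyRef]("name" -> "example"))
    // encode produces the ProtoSimpleFeature message defined above.
    val message = featureProtoBufCodec.encode(feature)
    println(message.metadata)
  }
}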
@@ -0,0 +1,50 @@
package geopyspark.geotools.shapefile

import geopyspark.geotools._
import geotrellis.spark.io.hadoop._

import org.geotools.data.simple
import org.geotools.data.shapefile._
import org.opengis.feature.simple._

import org.apache.spark._
import org.apache.spark.rdd._
import org.apache.hadoop.fs.Path

import java.io.File
import java.net.{URI, URL}

import scala.collection.mutable


object HadoopShapefileRDD {
  def createSimpleFeaturesRDD(
    sc: SparkContext,
    uris: Array[URI],
    extensions: Seq[String],
    numPartitions: Int
  ): RDD[SimpleFeature] =
    createSimpleFeaturesRDD(sc, HadoopUtils.listFiles(sc, uris, extensions), numPartitions)

  def createSimpleFeaturesRDD(
    sc: SparkContext,
    paths: Array[String],
    numPartitions: Int
  ): RDD[SimpleFeature] = {
    val urls = sc.parallelize(paths, numPartitions).map { new URL(_) }

    // Each task opens its shapefile, drains the feature iterator into
    // memory, and closes the store before returning the features.
    urls.flatMap { url =>
      val ds = new ShapefileDataStore(url)
      val ftItr = ds.getFeatureSource.getFeatures.features

      try {
        val simpleFeatures = mutable.ListBuffer[SimpleFeature]()
        while (ftItr.hasNext) simpleFeatures += ftItr.next()
        simpleFeatures.toList
      } finally {
        ftItr.close()
        ds.dispose()
      }
    }
  }
}
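
A minimal usage sketch, again assuming a local SparkContext and a hypothetical HDFS directory of shapefiles:

import geopyspark.geotools.shapefile.HadoopShapefileRDD
import org.apache.spark.{SparkConf, SparkContext}
import java.net.URI

object HadoopShapefileExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("hadoop-shapefiles").setMaster("local[*]"))
    val features = HadoopShapefileRDD.createSimpleFeaturesRDD(
      sc, Array(new URI("hdfs://namenode/data/shapefiles")), Seq("shp"), numPartitions = 8)
    println(features.count())
    sc.stop()
  }
}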
@@ -0,0 +1,70 @@
package geopyspark.geotools.shapefile

import geopyspark.geotools._
import geotrellis.spark.io.s3._

import org.apache.spark._
import org.apache.spark.rdd._

import org.geotools.data.DataStoreFinder
import org.geotools.data.simple.SimpleFeatureStore
import org.geotools.feature._
import org.geotools.feature.simple._

import org.opengis.feature.simple._
import org.opengis.feature.`type`.Name

import com.amazonaws.auth._
import com.amazonaws.services.s3.AmazonS3Client
import com.amazonaws.services.s3.model.{ListObjectsRequest, ObjectListing}

import java.net.URI

import scala.collection.JavaConverters._
import scala.collection.mutable


class IteratorWrapper[I, T](iter: I)(_hasNext: I => Boolean, _next: I => T, _close: I => Unit) extends Iterator[T] {
  def hasNext = {
    val has = _hasNext(iter)
    // Close the underlying GeoTools iterator as soon as it is exhausted.
    if (!has) _close(iter)
    has
  }
  def next = _next(iter)
}


object S3ShapefileRDD {
  def createSimpleFeaturesRDD(
    sc: SparkContext,
    uris: Array[URI],
    extensions: Seq[String],
    s3Client: S3Client,
    numPartitions: Int
  ): RDD[SimpleFeature] =
    createSimpleFeaturesRDD(sc, S3Utils.listKeys(uris, extensions, s3Client), numPartitions)

  def createSimpleFeaturesRDD(
    sc: SparkContext,
    urlArray: Array[String],
    numPartitions: Int
  ): RDD[SimpleFeature] = {
    val urlRdd: RDD[String] = sc.parallelize(urlArray, numPartitions)
    urlRdd.mapPartitions { urls =>
      urls.flatMap { url =>
        // GeoTools selects the shapefile datastore from the "url" parameter.
        val datastoreParams = Map("url" -> url).asJava
        val shpDS = DataStoreFinder.getDataStore(datastoreParams)
        require(shpDS != null, "Could not build ShapefileDataStore")

        // Stream the features of every layer in the store, closing each
        // feature iterator once it has been drained.
        shpDS.getNames.asScala.flatMap { name: Name =>
          val features =
            shpDS
              .getFeatureSource(name)
              .getFeatures
              .features
          new IteratorWrapper(features)(_.hasNext, _.next, _.close)
        }
      }
    }
  }
}
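
A minimal usage sketch against a hypothetical S3 location, using the default client:

import geopyspark.geotools.shapefile.S3ShapefileRDD
import geotrellis.spark.io.s3.S3Client
import org.apache.spark.{SparkConf, SparkContext}
import java.net.URI

object S3ShapefileExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("s3-shapefiles").setMaster("local[*]"))
    val features = S3ShapefileRDD.createSimpleFeaturesRDD(
      sc, Array(new URI("s3://my-bucket/shapefiles")), Seq("shp"), S3Client.DEFAULT, numPartitions = 8)
    println(features.count())
    sc.stop()
  }
}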
@@ -0,0 +1,58 @@
package geopyspark.geotools.shapefile

import geopyspark.geotools._
import geopyspark.util._

import protos.simpleFeatureMessages._

import geotrellis.vector._
import geotrellis.spark.io.s3._
import geotrellis.spark.io.s3.testkit._
import geotrellis.geotools._

import org.apache.spark._
import org.apache.spark.rdd._
import org.apache.spark.api.java.JavaRDD

import org.opengis.feature.simple._

import java.net.URI

import scala.collection.JavaConverters._


object ShapefileRDD {
  def get(
    sc: SparkContext,
    paths: java.util.ArrayList[String],
    extensions: java.util.ArrayList[String],
    numPartitions: Int,
    s3Client: String
  ): JavaRDD[Array[Byte]] = {
    val uris: Array[URI] = paths.asScala.map { path => new URI(path) }.toArray

    val client =
      s3Client match {
        case null => S3Client.DEFAULT
        case "default" => S3Client.DEFAULT
        case "mock" => new MockS3Client()
        case s => throw new Exception(s"Unknown S3Client specified, ${s}")
      }

    // Dispatch on the scheme of the first URI: s3 paths are read through
    // the S3 reader, everything else through Hadoop.
    val simpleFeaturesRDD: RDD[SimpleFeature] =
      uris.head.getScheme match {
        case "s3" =>
          S3ShapefileRDD.createSimpleFeaturesRDD(sc, uris, extensions.asScala, client, numPartitions)
        case _ =>
          HadoopShapefileRDD.createSimpleFeaturesRDD(sc, uris, extensions.asScala, numPartitions)
      }

    val featuresRDD: RDD[Feature[Geometry, Map[String, AnyRef]]] =
      simpleFeaturesRDD.map { SimpleFeatureToFeature(_) }

    // Serialize each feature to protobuf bytes for the Python side.
    PythonTranslator.toPython[Feature[Geometry, Map[String, AnyRef]], ProtoSimpleFeature](featuresRDD)
  }
}
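
get appears to be the entry point the Python bindings call over Py4J, which is why it takes java.util.ArrayList arguments and returns protobuf-encoded bytes. A minimal sketch of invoking it directly from Scala; the path is hypothetical:

import geopyspark.geotools.shapefile.ShapefileRDD
import org.apache.spark.{SparkConf, SparkContext}
import java.util

object ShapefileGetExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("shapefile-get").setMaster("local[*]"))
    val paths = new util.ArrayList[String]()
    paths.add("s3://my-bucket/shapefiles/roads.shp")
    val extensions = new util.ArrayList[String]()
    extensions.add("shp")
    // "default" selects S3Client.DEFAULT; "mock" selects the testkit client.
    val bytes = ShapefileRDD.get(sc, paths, extensions, 8, "default")
    println(bytes.count())
    sc.stop()
  }
}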
