Skip to content

Commit

Permalink
Merge pull request #663 from locationtech-labs/feature/update-vectorpipe
Browse files Browse the repository at this point in the history
 VectorPipe Improvements
  • Loading branch information
echeipesh committed Jun 12, 2018
2 parents 84aec51 + fc26417 commit 1e7b11a
Show file tree
Hide file tree
Showing 9 changed files with 147 additions and 135 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# Vim
*.swp

# Byte-compiled / optimized / DLL files
__pycache__/
Expand Down
8 changes: 4 additions & 4 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@ install:
./sbt "project geotrellis-backend" assembly &&
cp geotrellis/target/scala-2.11/geotrellis-backend-assembly-*.jar ../geopyspark/jars &&
popd
- if [ ! -f archives/spark-2.1.1-bin-hadoop2.7.tgz ]; then
- if [ ! -f archives/spark-2.3.1-bin-hadoop2.7.tgz ]; then
pushd archives;
wget http://d3kbcqa49mib13.cloudfront.net/spark-2.1.1-bin-hadoop2.7.tgz;
wget http://apache.cs.utah.edu/spark/spark-2.3.1/spark-2.3.1-bin-hadoop2.7.tgz;
popd;
fi
- tar -xvf archives/spark-2.1.1-bin-hadoop2.7.tgz
- tar -xvf archives/spark-2.3.1-bin-hadoop2.7.tgz
- pip3 install -r requirements.txt
- pip3 install pyproj
- pip3 install pylint
Expand All @@ -58,7 +58,7 @@ cache:
- $HOME/.cache/pip

script:
- export SPARK_HOME=./spark-2.1.1-bin-hadoop2.7/
- export SPARK_HOME=./spark-2.3.1-bin-hadoop2.7/
- export JAVA_HOME=/usr/lib/jvm/java-8-oracle
- ./test_script.sh
- pylint geopyspark
Expand Down
2 changes: 1 addition & 1 deletion geopyspark-backend/vectorpipe/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ libraryDependencies ++= Seq(
"org.locationtech.geotrellis" %% "geotrellis-s3-testkit" % Version.geotrellis,
"org.locationtech.geotrellis" %% "geotrellis-spark" % Version.geotrellis,
"org.typelevel" %% "cats" % "0.9.0",
"com.azavea" %% "vectorpipe" % "0.2.2"
"com.azavea" %% "vectorpipe" % "0.3.0"
)

assemblyMergeStrategy in assembly := {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,59 +17,45 @@ import DefaultJsonProtocol._


class FeaturesCollection(
val points: RDD[Feature[Point, ElementMeta]],
val lines: RDD[Feature[Line, ElementMeta]],
val polygons: RDD[Feature[Polygon, ElementMeta]],
val multiPolygons: RDD[Feature[MultiPolygon, ElementMeta]]
val nodes: RDD[Feature[Geometry, ElementMeta]],
val ways: RDD[Feature[Geometry, ElementMeta]],
val relations: RDD[Feature[Geometry, ElementMeta]]
) {

def toProtoPoints: JavaRDD[Array[Byte]] =
PythonTranslator.toPython[Feature[Geometry, ElementMeta], ProtoFeature](points.asInstanceOf[RDD[Feature[Geometry, ElementMeta]]])
def toProtoNodes: JavaRDD[Array[Byte]] =
PythonTranslator.toPython[Feature[Geometry, ElementMeta], ProtoFeature](nodes)

def toProtoLines: JavaRDD[Array[Byte]] =
PythonTranslator.toPython[Feature[Geometry, ElementMeta], ProtoFeature](lines.asInstanceOf[RDD[Feature[Geometry, ElementMeta]]])
def toProtoWays: JavaRDD[Array[Byte]] =
PythonTranslator.toPython[Feature[Geometry, ElementMeta], ProtoFeature](ways)

def toProtoPolygons: JavaRDD[Array[Byte]] =
PythonTranslator.toPython[Feature[Geometry, ElementMeta], ProtoFeature](polygons.asInstanceOf[RDD[Feature[Geometry, ElementMeta]]])
def toProtoRelations: JavaRDD[Array[Byte]] =
PythonTranslator.toPython[Feature[Geometry, ElementMeta], ProtoFeature](relations)

def toProtoMultiPolygons: JavaRDD[Array[Byte]] =
PythonTranslator.toPython[Feature[Geometry, ElementMeta], ProtoFeature](multiPolygons.asInstanceOf[RDD[Feature[Geometry, ElementMeta]]])

def getPointTags: String =
if (points.isEmpty)
def getNodeTags: String =
if (nodes.isEmpty)
Map[String, String]().toJson.compactPrint
else
points
nodes
.map { _.data.tags }
.reduce { _ ++: _ }
.toJson
.compactPrint

def getLineTags: String =
if (lines.isEmpty)
def getWayTags: String =
if (ways.isEmpty)
Map[String, String]().toJson.compactPrint
else
lines
ways
.map { _.data.tags }
.reduce { _ ++: _ }
.toJson
.compactPrint

def getPolygonTags: String =
if (polygons.isEmpty)
def getRelationTags: String =
if (relations.isEmpty)
Map[String, String]().toJson.compactPrint
else
polygons
.map { _.data.tags }
.reduce { _ ++: _ }
.toJson
.compactPrint

def getMultiPolygonTags: String =
if (multiPolygons.isEmpty)
Map[String, String]().toJson.compactPrint
else
multiPolygons
relations
.map { _.data.tags }
.reduce { _ ++: _ }
.toJson
Expand All @@ -78,14 +64,10 @@ class FeaturesCollection(


object FeaturesCollection {
def apply(features: Features): FeaturesCollection =
new FeaturesCollection(features.points, features.lines, features.polygons, features.multiPolys)

def apply(
points: RDD[Feature[Point, ElementMeta]],
lines: RDD[Feature[Line, ElementMeta]],
polygons: RDD[Feature[Polygon, ElementMeta]],
multiPolygons: RDD[Feature[MultiPolygon, ElementMeta]]
nodes: RDD[Feature[Geometry, ElementMeta]],
ways: RDD[Feature[Geometry, ElementMeta]],
relations: RDD[Feature[Geometry, ElementMeta]]
): FeaturesCollection =
new FeaturesCollection(points, lines, polygons, multiPolygons)
new FeaturesCollection(nodes, ways, relations)
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,64 @@ package geopyspark.vectorpipe.io
import geopyspark.util._
import geopyspark.vectorpipe._

import geotrellis.vector.Extent

import vectorpipe._
import vectorpipe.osm._
import vectorpipe.osm.{OSMReader => OSM}

import org.apache.spark._
import org.apache.spark.sql._

import scala.util.{Failure, Success => S}


object OSMReader {
def fromORC(
ss: SparkSession,
source: String
sourcePath: String,
targetExtent: java.util.Map[String, Double]
): FeaturesCollection = {
val (ns, ws, rs) = osm.fromORC(source)(ss).get
FeaturesCollection(osm.features(ns, ws, rs))
val reader =
targetExtent match {
case null => OSM(sourcePath)(ss)
case _ =>
OSM(
sourcePath,
Extent(targetExtent.get("xmin"), targetExtent.get("ymin"), targetExtent.get("xmax"), targetExtent.get("ymax"))
)(ss)
}

FeaturesCollection(
reader.nodeFeaturesRDD,
reader.wayFeaturesRDD,
reader.relationFeaturesRDD
)
}

def fromDataFrame(
elements: Dataset[Row]
elements: Dataset[Row],
cachePath: String,
targetExtent: java.util.Map[String, Double]
): FeaturesCollection = {
val (ns, ws, rs) = osm.fromDataFrame(elements)
FeaturesCollection(osm.features(ns, ws, rs))
val reader =
targetExtent match {
case null => new OSM(elements, None)(elements.sparkSession)
case _ =>
new OSM(
elements,
Some(
Extent(
targetExtent.get("xmin"),
targetExtent.get("ymin"),
targetExtent.get("xmax"),
targetExtent.get("ymax")
)
)
)(elements.sparkSession)
}

FeaturesCollection(
reader.nodeFeaturesRDD,
reader.wayFeaturesRDD,
reader.relationFeaturesRDD
)
}
}
24 changes: 8 additions & 16 deletions geopyspark/tests/vector_pipe/rasterizer_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,7 @@ def tearDown(self):
def test_rasterization(self):
features = osm_reader.from_orc(file_path("zerns.orc"))

lines = features.get_line_features_rdd()
polys = features.get_polygon_features_rdd()

mapped_lines = lines.map(lambda feature: Feature(feature.geometry, CellValue(1, 1)))
lines = features.get_way_features_rdd()

def assign_cellvalues(feature):
tags = feature.properties.tags.values()
Expand All @@ -32,23 +29,19 @@ def assign_cellvalues(feature):
elif "en:Zern's Farmer's Market" in tags:
return Feature(feature.geometry, CellValue(3, 3))
else:
return Feature(feature.geometry, CellValue(2, 2))
return Feature(feature.geometry, CellValue(1, 1))

mapped_polys = polys.map(lambda feature: assign_cellvalues(feature))
mapped_lines = lines.map(lambda feature: assign_cellvalues(feature))

unioned = BaseTestClass.pysc.union((mapped_lines, mapped_polys))
result = rasterize_features(unioned, 4326, 12, cell_type=CellType.INT8)
result = rasterize_features(mapped_lines, 4326, 12, cell_type=CellType.INT8)

self.assertEqual(result.get_min_max(), (1, 4))
self.assertEqual(result.count(), 1)

def test_rasterization_with_partitioner(self):
features = osm_reader.from_orc(file_path("zerns.orc"))

lines = features.get_line_features_rdd()
polys = features.get_polygon_features_rdd()

mapped_lines = lines.map(lambda feature: Feature(feature.geometry, CellValue(1, 1)))
lines = features.get_way_features_rdd()

def assign_cellvalues(feature):
tags = feature.properties.tags.values()
Expand All @@ -58,12 +51,11 @@ def assign_cellvalues(feature):
elif "en:Zern's Farmer's Market" in tags:
return Feature(feature.geometry, CellValue(3, 3))
else:
return Feature(feature.geometry, CellValue(2, 2))
return Feature(feature.geometry, CellValue(1, 1))

mapped_polys = polys.map(lambda feature: assign_cellvalues(feature))
mapped_lines = lines.map(lambda feature: assign_cellvalues(feature))

unioned = BaseTestClass.pysc.union((mapped_lines, mapped_polys))
result = rasterize_features(unioned,
result = rasterize_features(mapped_lines,
4326,
12,
cell_type=CellType.INT8,
Expand Down
21 changes: 8 additions & 13 deletions geopyspark/tests/vector_pipe/reading_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,32 +16,27 @@ def tearDown(self):

def test_reading_rdds_from_orc(self):

self.assertTrue(self.features.get_point_features_rdd().isEmpty())
self.assertTrue(self.features.get_multipolygon_features_rdd().isEmpty())

self.assertEqual(self.features.get_line_features_rdd().count(), 5)
self.assertEqual(self.features.get_polygon_features_rdd().count(), 5)
self.assertTrue(self.features.get_node_features_rdd().count(), 1)
self.assertEqual(self.features.get_way_features_rdd().count(), 13)
self.assertEqual(self.features.get_relation_features_rdd().count(), 0)

def test_reading_tags_from_orc(self):
ex_point_tags = ['traffic_signals', 'backward']
ex_line_tags = ['Big Rd', 'PA 73', 'Layfield Rd', 'Jackson', '19525']
ex_polygon_tags = ["en:Zern's Farmer's Market",
'E Philadelphia Avenue',
'LakePond',
'Douglass Park',
'Gilbertsville']

self.assertEqual(self.features.get_point_tags(), {})
self.assertEqual(self.features.get_multipolygon_tags(), {})
point_tags = self.features.get_node_tags().values()
line_tags = self.features.get_way_tags().values()

line_tags = self.features.get_line_tags().values()
polygon_tags = self.features.get_polygon_tags().values()
for tag in ex_point_tags:
self.assertTrue(tag in point_tags)

for tag in ex_line_tags:
self.assertTrue(tag in line_tags)

for tag in ex_polygon_tags:
self.assertTrue(tag in polygon_tags)


if __name__ == "__main__":
unittest.main()

0 comments on commit 1e7b11a

Please sign in to comment.