Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support sst ingest for geo #22

Merged
merged 4 commits into from
Nov 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,13 @@ target/
.idea/
.eclipse/
*.iml
.project
.bloop
.metals
.settings
.vscode
.classpath
.factorypath

spark-importer.ipr
spark-importer.iws
Expand Down
5 changes: 5 additions & 0 deletions nebula-exchange/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -736,6 +736,11 @@
<artifactId>clickhouse-jdbc</artifactId>
<version>0.2.5</version>
</dependency>
<dependency>
<groupId>org.locationtech.jts</groupId>
<artifactId>jts-core</artifactId>
<version>1.16.1</version>
</dependency>
</dependencies>
<repositories>
<repository>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@

/* Copyright (c) 2020 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

package com.vesoft.nebula.exchange.processor

import com.vesoft.nebula.{Date, DateTime, NullType, Time, Value}
import com.vesoft.nebula.{Date, DateTime, NullType, Time, Value, Geography, Coordinate, Point, LineString, Polygon}
import com.vesoft.nebula.exchange.utils.NebulaUtils.DEFAULT_EMPTY_VALUE
import com.vesoft.nebula.exchange.utils.{HDFSUtils, NebulaUtils}
import com.vesoft.nebula.meta.PropertyType
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, LongType, StringType}

import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer
/**
* processor is a converter.
* It is responsible for converting the dataframe row data into Nebula Graph's vertex or edge,
Expand Down Expand Up @@ -155,7 +157,9 @@ trait Processor extends Serializable {
row.get(index).toString.toLong
}
case PropertyType.GEOGRAPHY => {
throw new IllegalArgumentException("sst import does not support GEOGRAPHY property yet.")
val wkt = row.get(index).toString
val jtsGeom = new org.locationtech.jts.io.WKTReader().read(wkt)
convertJTSGeometryToGeography(jtsGeom)
}
}
}
Expand All @@ -172,4 +176,44 @@ trait Processor extends Serializable {
case StringType => row.getString(index).toLong
}
}

def convertJTSGeometryToGeography(jtsGeom: org.locationtech.jts.geom.Geometry): Geography = {
jtsGeom.getGeometryType match {
case "Point" => {
val jtsPoint = jtsGeom.asInstanceOf[org.locationtech.jts.geom.Point]
val jtsCoord = jtsPoint.getCoordinate
Geography.ptVal(new Point(new Coordinate(jtsCoord.x, jtsCoord.y)))
}
case "LineString" => {
val jtsLineString = jtsGeom.asInstanceOf[org.locationtech.jts.geom.LineString]
val jtsCoordList = jtsLineString.getCoordinates
val coordList = new ListBuffer[Coordinate]()
for (jtsCoord <- jtsCoordList) {
coordList += new Coordinate(jtsCoord.x, jtsCoord.y)
}
Geography.lsVal(new LineString(coordList.asJava))
}
case "Polygon" => {
val jtsPolygon = jtsGeom.asInstanceOf[org.locationtech.jts.geom.Polygon]
val coordListList = new java.util.ArrayList[java.util.List[Coordinate]]()
val jtsShell = jtsPolygon.getExteriorRing
val coordList = new ListBuffer[Coordinate]()
for (jtsCoord <- jtsShell.getCoordinates) {
coordList += new Coordinate(jtsCoord.x, jtsCoord.y)
}
coordListList.add(coordList.asJava)

val jtsHolesNum = jtsPolygon.getNumInteriorRing
for (i <- 0 until jtsHolesNum) {
val coordList = new ListBuffer[Coordinate]()
val jtsHole = jtsPolygon.getInteriorRingN(i)
for (jtsCoord <- jtsHole.getCoordinates) {
coordList += new Coordinate(jtsCoord.x, jtsCoord.y)
}
coordListList.add(coordList.asJava)
}
Geography.pgVal(new Polygon(coordListList))
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
package scala.com.vesoft.nebula.exchange.processor

import com.vesoft.nebula.exchange.processor.Processor
import com.vesoft.nebula.{Date, DateTime, NullType, Time, Value}
import com.vesoft.nebula.{Date, DateTime, NullType, Time, Value, Geography, Coordinate, Point, LineString, Polygon}
import com.vesoft.nebula.meta.PropertyType
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.types.{
Expand Down Expand Up @@ -139,6 +139,28 @@ class ProcessorSuite extends Processor {
val nullValue = new Value()
nullValue.setNVal(NullType.__NULL__)
assert(extraValueForSST(row, "col14", map).equals(nullValue))

// POINT(3 8)
val geogPoint = Geography.ptVal(new Point(new Coordinate(3, 8)))
val geogPointExpect = extraValueForSST(row, "col15", map)

assert(geogPointExpect.equals(geogPoint))
// LINESTRING(3 8, 4.7 73.23)
val line = new java.util.ArrayList[Coordinate]()
line.add(new Coordinate(3, 8))
line.add(new Coordinate(4.7, 73.23))
val geogLineString = Geography.lsVal(new LineString(line))
assert(extraValueForSST(row, "col16", map).equals(geogLineString))
// POLYGON((0 1, 1 2, 2 3, 0 1))
val shell: java.util.List[Coordinate] = new java.util.ArrayList[Coordinate]()
shell.add(new Coordinate(0, 1))
shell.add(new Coordinate(1, 2))
shell.add(new Coordinate(2, 3))
shell.add(new Coordinate(0, 1))
val rings = new java.util.ArrayList[java.util.List[Coordinate]]()
rings.add(shell)
val geogPolygon = Geography.pgVal(new Polygon(rings))
assert(extraValueForSST(row, "col17", map).equals(geogPolygon))
}

/**
Expand Down