From f25d2687778c0f06f39e1bd9e22728656f4600d0 Mon Sep 17 00:00:00 2001 From: jievince <38901892+jievince@users.noreply.github.com> Date: Wed, 10 Nov 2021 10:44:08 +0800 Subject: [PATCH 1/4] support sst ingest for geo --- .gitignore | 7 +++ nebula-exchange/pom.xml | 5 ++ .../nebula/exchange/processor/Processor.scala | 48 ++++++++++++++++++- 3 files changed, 58 insertions(+), 2 deletions(-) diff --git a/.gitignore b/.gitignore index 3877c83a..1af985e2 100644 --- a/.gitignore +++ b/.gitignore @@ -29,6 +29,13 @@ target/ .idea/ .eclipse/ *.iml +.project +.bloop +.metals +.settings +.vscode +.classpath +.factorypath spark-importer.ipr spark-importer.iws diff --git a/nebula-exchange/pom.xml b/nebula-exchange/pom.xml index 57251449..8f1b6cb7 100644 --- a/nebula-exchange/pom.xml +++ b/nebula-exchange/pom.xml @@ -736,6 +736,11 @@ clickhouse-jdbc 0.2.5 + + org.locationtech.jts + jts-core + 1.16.1 + diff --git a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/Processor.scala b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/Processor.scala index 0cf028f3..cf15cbb2 100644 --- a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/Processor.scala +++ b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/Processor.scala @@ -1,3 +1,4 @@ + /* Copyright (c) 2020 vesoft inc. All rights reserved. * * This source code is licensed under Apache 2.0 License. @@ -5,13 +6,15 @@ package com.vesoft.nebula.exchange.processor -import com.vesoft.nebula.{Date, DateTime, NullType, Time, Value} +import com.vesoft.nebula.{Date, DateTime, NullType, Time, Value, Geography, Coordinate, Point, LineString, Polygon} import com.vesoft.nebula.exchange.utils.NebulaUtils.DEFAULT_EMPTY_VALUE import com.vesoft.nebula.exchange.utils.{HDFSUtils, NebulaUtils} import com.vesoft.nebula.meta.PropertyType import org.apache.spark.sql.Row import org.apache.spark.sql.types.{IntegerType, LongType, StringType} +import org.locationtech.jts.io.WKBWriter; + /** * processor is a converter. * It is responsible for converting the dataframe row data into Nebula Graph's vertex or edge, @@ -155,7 +158,11 @@ trait Processor extends Serializable { row.get(index).toString.toLong } case PropertyType.GEOGRAPHY => { - throw new IllegalArgumentException("sst import does not support GEOGRAPHY property yet.") + val wkt = row.get(index).toString + org.locationtech.jts.geom.Geometry jtsGeom = new org.locationtech.jts.io + .WKTReader(2, ByteOrderValues.LITTLE_ENDIAN) + .read(wkt); + convertJTSGeometryToGeography(jtsGeom) } } } @@ -172,4 +179,41 @@ trait Processor extends Serializable { case StringType => row.getString(index).toLong } } + + def convertJTSGeometryToGeography(jtsGeom: org.locationtech.jts.geom.Geometry): Geography = { + jtsGeom.getGeometryType() match { + case Geometry.TYPENAME_POINT => { + val jtsCoord = jtsGeom.getCoordinate() + Geography.ptVal(new Point(new Coordinate(jtsCoord.x, jtsCoord.y))) + } + case Geometry.TYPENAME_LINESTRING => { + val jtsCoordList = jtsGeom.getCoordinates().asScala + val coordList = scala.collection.mutable.ListBuffer.empty[Coordinate] + for (var jtsCoord <- jtsCoordList) { + coordList += jtsCoord + } + Geography.lsVal(new LineString(coordList.asJava)) + } + case Geometry.TYPENAME_POLYGON => { + val coordListList = ListBuffer[ListBuffer[Coordinate]] + val jtsShell = jtsGeom.getExteriorRing() + val coordList = ListBuffer[Coordinate] + for (var jtsCoord <- jtsShell.getCoordinates()) { + coordList += new Coordinate(jtsCoord.x, jtsCoord.y) + } + coordListList += coordList + + val jtsHolesNum = jtsGeom.getNumInteriorRing() + for (var i <- 0 to jtsHolesNum) { + val coordList = ListBuffer[Coordinate] + val jtsHole = jtsGeom.getInteriorRingN(i) + for (var jtsCoord <- jtsHole.geteCoordinates()) { + coordList += new Coordinate(jtsCoord.x, jtsCoord.y) + } + coordListList += coordList + } + Geography.pgVal(new Polygon(coordListList.asJava)) + } + } + } } From 48a861013290685ecd88450d98c361f66672eea6 Mon Sep 17 00:00:00 2001 From: jievince <38901892+jievince@users.noreply.github.com> Date: Wed, 10 Nov 2021 10:56:44 +0800 Subject: [PATCH 2/4] fix --- .../nebula/exchange/processor/Processor.scala | 52 +++++++++---------- 1 file changed, 26 insertions(+), 26 deletions(-) diff --git a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/Processor.scala b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/Processor.scala index cf15cbb2..058ea0cd 100644 --- a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/Processor.scala +++ b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/Processor.scala @@ -12,9 +12,8 @@ import com.vesoft.nebula.exchange.utils.{HDFSUtils, NebulaUtils} import com.vesoft.nebula.meta.PropertyType import org.apache.spark.sql.Row import org.apache.spark.sql.types.{IntegerType, LongType, StringType} - -import org.locationtech.jts.io.WKBWriter; - +import scala.collection.JavaConverters._ +import scala.collection.mutable.ListBuffer /** * processor is a converter. * It is responsible for converting the dataframe row data into Nebula Graph's vertex or edge, @@ -159,9 +158,7 @@ trait Processor extends Serializable { } case PropertyType.GEOGRAPHY => { val wkt = row.get(index).toString - org.locationtech.jts.geom.Geometry jtsGeom = new org.locationtech.jts.io - .WKTReader(2, ByteOrderValues.LITTLE_ENDIAN) - .read(wkt); + val jtsGeom = new org.locationtech.jts.io.WKTReader().read(wkt); convertJTSGeometryToGeography(jtsGeom) } } @@ -182,37 +179,40 @@ trait Processor extends Serializable { def convertJTSGeometryToGeography(jtsGeom: org.locationtech.jts.geom.Geometry): Geography = { jtsGeom.getGeometryType() match { - case Geometry.TYPENAME_POINT => { - val jtsCoord = jtsGeom.getCoordinate() + case "Point" => { + val jtsPoint = jtsGeom.asInstanceOf[org.locationtech.jts.geom.Point] + val jtsCoord = jtsPoint.getCoordinate() Geography.ptVal(new Point(new Coordinate(jtsCoord.x, jtsCoord.y))) } - case Geometry.TYPENAME_LINESTRING => { - val jtsCoordList = jtsGeom.getCoordinates().asScala - val coordList = scala.collection.mutable.ListBuffer.empty[Coordinate] - for (var jtsCoord <- jtsCoordList) { - coordList += jtsCoord + case "LineString" => { + val jtsLineString = jtsGeom.asInstanceOf[org.locationtech.jts.geom.LineString] + val jtsCoordList = jtsLineString.getCoordinates() + var coordList = new ListBuffer[Coordinate]() + for (jtsCoord <- jtsCoordList) { + coordList += new Coordinate(jtsCoord.x, jtsCoord.y) } Geography.lsVal(new LineString(coordList.asJava)) } - case Geometry.TYPENAME_POLYGON => { - val coordListList = ListBuffer[ListBuffer[Coordinate]] - val jtsShell = jtsGeom.getExteriorRing() - val coordList = ListBuffer[Coordinate] - for (var jtsCoord <- jtsShell.getCoordinates()) { + case "Polygon" => { + val jtsPolygon = jtsGeom.asInstanceOf[org.locationtech.jts.geom.Polygon] + var coordListList = new java.util.ArrayList[java.util.List[Coordinate]]() + val jtsShell = jtsPolygon.getExteriorRing() + var coordList = new ListBuffer[Coordinate]() + for (jtsCoord <- jtsShell.getCoordinates()) { coordList += new Coordinate(jtsCoord.x, jtsCoord.y) } - coordListList += coordList + coordListList.add(coordList.asJava) - val jtsHolesNum = jtsGeom.getNumInteriorRing() - for (var i <- 0 to jtsHolesNum) { - val coordList = ListBuffer[Coordinate] - val jtsHole = jtsGeom.getInteriorRingN(i) - for (var jtsCoord <- jtsHole.geteCoordinates()) { + val jtsHolesNum = jtsPolygon.getNumInteriorRing() + for (i <- 0 until jtsHolesNum) { + var coordList = new ListBuffer[Coordinate]() + val jtsHole = jtsPolygon.getInteriorRingN(i) + for (jtsCoord <- jtsHole.getCoordinates()) { coordList += new Coordinate(jtsCoord.x, jtsCoord.y) } - coordListList += coordList + coordListList.add(coordList.asJava) } - Geography.pgVal(new Polygon(coordListList.asJava)) + Geography.pgVal(new Polygon(coordListList)) } } } From ef914f8f579d812e3e832f3ccd917ab348672c72 Mon Sep 17 00:00:00 2001 From: jievince <38901892+jievince@users.noreply.github.com> Date: Wed, 10 Nov 2021 18:31:21 +0800 Subject: [PATCH 3/4] add sst test for geo --- .../exchange/processor/ProcessorSuite.scala | 22 ++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/nebula-exchange/src/test/scala/com/vesoft/nebula/exchange/processor/ProcessorSuite.scala b/nebula-exchange/src/test/scala/com/vesoft/nebula/exchange/processor/ProcessorSuite.scala index 6c1d7526..1e139c60 100644 --- a/nebula-exchange/src/test/scala/com/vesoft/nebula/exchange/processor/ProcessorSuite.scala +++ b/nebula-exchange/src/test/scala/com/vesoft/nebula/exchange/processor/ProcessorSuite.scala @@ -6,7 +6,7 @@ package scala.com.vesoft.nebula.exchange.processor import com.vesoft.nebula.exchange.processor.Processor -import com.vesoft.nebula.{Date, DateTime, NullType, Time, Value} +import com.vesoft.nebula.{Date, DateTime, NullType, Time, Value, Geography, Coordinate, Point, LineString, Polygon} import com.vesoft.nebula.meta.PropertyType import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema import org.apache.spark.sql.types.{ @@ -139,6 +139,26 @@ class ProcessorSuite extends Processor { val nullValue = new Value() nullValue.setNVal(NullType.__NULL__) assert(extraValueForSST(row, "col14", map).equals(nullValue)) + + // POINT(3 8) + val geogPoint = Geography.ptVal(new Point(new Coordinate(3, 8))) + assert(extraValueForClient(row, "col15", map).equals(geogPoint)) + // LINESTRING(3 8, 4.7 73.23) + var line = new java.util.ArrayList[Coordinate]() + line.add(new Coordinate(3, 8)) + line.add(new Coordinate(4.7, 73.23)) + val geogLineString = Geography.lsVal(new LineString(line)) + assert(extraValueForClient(row, "col16", map).equals(geogLineString)) + // POLYGON((0 1, 1 2, 2 3, 0 1)) + var shell: java.util.List[Coordinate] = new java.util.ArrayList[Coordinate]() + shell.add(new Coordinate(0, 1)) + shell.add(new Coordinate(1, 2)) + shell.add(new Coordinate(2, 3)) + shell.add(new Coordinate(0, 1)) + var rings = new java.util.ArrayList[java.util.List[Coordinate]]() + rings.add(shell) + val geogPolygon = Geography.pgVal(new Polygon(rings)) + assert(extraValueForClient(row, "col17", map).equals(geogPolygon)) } /** From 6f76388aed4db0bf2b729ed32ff1f594b3f5ccab Mon Sep 17 00:00:00 2001 From: jievince <38901892+jievince@users.noreply.github.com> Date: Wed, 10 Nov 2021 20:35:45 +0800 Subject: [PATCH 4/4] debug --- .../nebula/exchange/processor/Processor.scala | 24 +++++++++---------- .../exchange/processor/ProcessorSuite.scala | 14 ++++++----- 2 files changed, 20 insertions(+), 18 deletions(-) diff --git a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/Processor.scala b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/Processor.scala index 058ea0cd..f33faf31 100644 --- a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/Processor.scala +++ b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/Processor.scala @@ -158,7 +158,7 @@ trait Processor extends Serializable { } case PropertyType.GEOGRAPHY => { val wkt = row.get(index).toString - val jtsGeom = new org.locationtech.jts.io.WKTReader().read(wkt); + val jtsGeom = new org.locationtech.jts.io.WKTReader().read(wkt) convertJTSGeometryToGeography(jtsGeom) } } @@ -178,16 +178,16 @@ trait Processor extends Serializable { } def convertJTSGeometryToGeography(jtsGeom: org.locationtech.jts.geom.Geometry): Geography = { - jtsGeom.getGeometryType() match { + jtsGeom.getGeometryType match { case "Point" => { val jtsPoint = jtsGeom.asInstanceOf[org.locationtech.jts.geom.Point] - val jtsCoord = jtsPoint.getCoordinate() + val jtsCoord = jtsPoint.getCoordinate Geography.ptVal(new Point(new Coordinate(jtsCoord.x, jtsCoord.y))) } case "LineString" => { val jtsLineString = jtsGeom.asInstanceOf[org.locationtech.jts.geom.LineString] - val jtsCoordList = jtsLineString.getCoordinates() - var coordList = new ListBuffer[Coordinate]() + val jtsCoordList = jtsLineString.getCoordinates + val coordList = new ListBuffer[Coordinate]() for (jtsCoord <- jtsCoordList) { coordList += new Coordinate(jtsCoord.x, jtsCoord.y) } @@ -195,19 +195,19 @@ trait Processor extends Serializable { } case "Polygon" => { val jtsPolygon = jtsGeom.asInstanceOf[org.locationtech.jts.geom.Polygon] - var coordListList = new java.util.ArrayList[java.util.List[Coordinate]]() - val jtsShell = jtsPolygon.getExteriorRing() - var coordList = new ListBuffer[Coordinate]() - for (jtsCoord <- jtsShell.getCoordinates()) { + val coordListList = new java.util.ArrayList[java.util.List[Coordinate]]() + val jtsShell = jtsPolygon.getExteriorRing + val coordList = new ListBuffer[Coordinate]() + for (jtsCoord <- jtsShell.getCoordinates) { coordList += new Coordinate(jtsCoord.x, jtsCoord.y) } coordListList.add(coordList.asJava) - val jtsHolesNum = jtsPolygon.getNumInteriorRing() + val jtsHolesNum = jtsPolygon.getNumInteriorRing for (i <- 0 until jtsHolesNum) { - var coordList = new ListBuffer[Coordinate]() + val coordList = new ListBuffer[Coordinate]() val jtsHole = jtsPolygon.getInteriorRingN(i) - for (jtsCoord <- jtsHole.getCoordinates()) { + for (jtsCoord <- jtsHole.getCoordinates) { coordList += new Coordinate(jtsCoord.x, jtsCoord.y) } coordListList.add(coordList.asJava) diff --git a/nebula-exchange/src/test/scala/com/vesoft/nebula/exchange/processor/ProcessorSuite.scala b/nebula-exchange/src/test/scala/com/vesoft/nebula/exchange/processor/ProcessorSuite.scala index 1e139c60..a4c0d539 100644 --- a/nebula-exchange/src/test/scala/com/vesoft/nebula/exchange/processor/ProcessorSuite.scala +++ b/nebula-exchange/src/test/scala/com/vesoft/nebula/exchange/processor/ProcessorSuite.scala @@ -142,23 +142,25 @@ class ProcessorSuite extends Processor { // POINT(3 8) val geogPoint = Geography.ptVal(new Point(new Coordinate(3, 8))) - assert(extraValueForClient(row, "col15", map).equals(geogPoint)) + val geogPointExpect = extraValueForSST(row, "col15", map) + + assert(geogPointExpect.equals(geogPoint)) // LINESTRING(3 8, 4.7 73.23) - var line = new java.util.ArrayList[Coordinate]() + val line = new java.util.ArrayList[Coordinate]() line.add(new Coordinate(3, 8)) line.add(new Coordinate(4.7, 73.23)) val geogLineString = Geography.lsVal(new LineString(line)) - assert(extraValueForClient(row, "col16", map).equals(geogLineString)) + assert(extraValueForSST(row, "col16", map).equals(geogLineString)) // POLYGON((0 1, 1 2, 2 3, 0 1)) - var shell: java.util.List[Coordinate] = new java.util.ArrayList[Coordinate]() + val shell: java.util.List[Coordinate] = new java.util.ArrayList[Coordinate]() shell.add(new Coordinate(0, 1)) shell.add(new Coordinate(1, 2)) shell.add(new Coordinate(2, 3)) shell.add(new Coordinate(0, 1)) - var rings = new java.util.ArrayList[java.util.List[Coordinate]]() + val rings = new java.util.ArrayList[java.util.List[Coordinate]]() rings.add(shell) val geogPolygon = Geography.pgVal(new Polygon(rings)) - assert(extraValueForClient(row, "col17", map).equals(geogPolygon)) + assert(extraValueForSST(row, "col17", map).equals(geogPolygon)) } /**