diff --git a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/Processor.scala b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/Processor.scala index ce2ab1ae..0dc444f3 100644 --- a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/Processor.scala +++ b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/Processor.scala @@ -13,9 +13,8 @@ import com.vesoft.nebula.exchange.utils.{HDFSUtils, NebulaUtils} import com.vesoft.nebula.meta.PropertyType import org.apache.spark.sql.Row import org.apache.spark.sql.types.{IntegerType, LongType, StringType} - -import org.locationtech.jts.io.WKBWriter; - +import scala.collection.JavaConverters._ +import scala.collection.mutable.ListBuffer /** * processor is a converter. * It is responsible for converting the dataframe row data into Nebula Graph's vertex or edge, @@ -160,9 +159,7 @@ trait Processor extends Serializable { } case PropertyType.GEOGRAPHY => { val wkt = row.get(index).toString - org.locationtech.jts.geom.Geometry jtsGeom = new org.locationtech.jts.io - .WKTReader(2, ByteOrderValues.LITTLE_ENDIAN) - .read(wkt); + val jtsGeom = new org.locationtech.jts.io.WKTReader().read(wkt); convertJTSGeometryToGeography(jtsGeom) } } @@ -183,37 +180,40 @@ trait Processor extends Serializable { def convertJTSGeometryToGeography(jtsGeom: org.locationtech.jts.geom.Geometry): Geography = { jtsGeom.getGeometryType() match { - case Geometry.TYPENAME_POINT => { - val jtsCoord = jtsGeom.getCoordinate() + case "Point" => { + val jtsPoint = jtsGeom.asInstanceOf[org.locationtech.jts.geom.Point] + val jtsCoord = jtsPoint.getCoordinate() Geography.ptVal(new Point(new Coordinate(jtsCoord.x, jtsCoord.y))) } - case Geometry.TYPENAME_LINESTRING => { - val jtsCoordList = jtsGeom.getCoordinates().asScala - val coordList = scala.collection.mutable.ListBuffer.empty[Coordinate] - for (var jtsCoord <- jtsCoordList) { - coordList += jtsCoord + case "LineString" => { + val jtsLineString = jtsGeom.asInstanceOf[org.locationtech.jts.geom.LineString] + val jtsCoordList = jtsLineString.getCoordinates() + var coordList = new ListBuffer[Coordinate]() + for (jtsCoord <- jtsCoordList) { + coordList += new Coordinate(jtsCoord.x, jtsCoord.y) } Geography.lsVal(new LineString(coordList.asJava)) } - case Geometry.TYPENAME_POLYGON => { - val coordListList = ListBuffer[ListBuffer[Coordinate]] - val jtsShell = jtsGeom.getExteriorRing() - val coordList = ListBuffer[Coordinate] - for (var jtsCoord <- jtsShell.getCoordinates()) { + case "Polygon" => { + val jtsPolygon = jtsGeom.asInstanceOf[org.locationtech.jts.geom.Polygon] + var coordListList = new java.util.ArrayList[java.util.List[Coordinate]]() + val jtsShell = jtsPolygon.getExteriorRing() + var coordList = new ListBuffer[Coordinate]() + for (jtsCoord <- jtsShell.getCoordinates()) { coordList += new Coordinate(jtsCoord.x, jtsCoord.y) } - coordListList += coordList + coordListList.add(coordList.asJava) - val jtsHolesNum = jtsGeom.getNumInteriorRing() - for (var i <- 0 to jtsHolesNum) { - val coordList = ListBuffer[Coordinate] - val jtsHole = jtsGeom.getInteriorRingN(i) - for (var jtsCoord <- jtsHole.geteCoordinates()) { + val jtsHolesNum = jtsPolygon.getNumInteriorRing() + for (i <- 0 until jtsHolesNum) { + var coordList = new ListBuffer[Coordinate]() + val jtsHole = jtsPolygon.getInteriorRingN(i) + for (jtsCoord <- jtsHole.getCoordinates()) { coordList += new Coordinate(jtsCoord.x, jtsCoord.y) } - coordListList += coordList + coordListList.add(coordList.asJava) } - Geography.pgVal(new Polygon(coordListList.asJava)) + Geography.pgVal(new Polygon(coordListList)) } } }