From e4f55701e53a56581587465eee33c6b6a96cb8aa Mon Sep 17 00:00:00 2001 From: Nicole00 Date: Tue, 25 Jan 2022 17:17:19 +0800 Subject: [PATCH] add vertex without tag for sst --- .../exchange/processor/VerticesProcessor.scala | 17 +++++++++++------ .../exchange/processor/VerticesProcessor.scala | 17 +++++++++++------ .../exchange/processor/VerticesProcessor.scala | 17 +++++++++++------ 3 files changed, 33 insertions(+), 18 deletions(-) diff --git a/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala b/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala index b14fe98e..2bcd7160 100644 --- a/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala +++ b/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala @@ -5,7 +5,7 @@ package com.vesoft.nebula.exchange.processor -import java.nio.ByteOrder +import java.nio.{ByteBuffer, ByteOrder} import com.vesoft.exchange.common.{ErrorHandler, GraphProvider, MetaProvider, VidType} import com.vesoft.exchange.common.{KeyPolicy, Vertex, Vertices} @@ -119,6 +119,7 @@ class VerticesProcessor(spark: SparkSession, val spaceVidLen = metaProvider.getSpaceVidLen(space) val tagItem = metaProvider.getTagItem(space, tagName) + val emptyValue = ByteBuffer.allocate(0).array() var sstKeyValueData = data .dropDuplicates(tagConfig.vertexField) @@ -126,7 +127,10 @@ class VerticesProcessor(spark: SparkSession, iter.map { row => encodeVertex(row, partitionNum, vidType, spaceVidLen, tagItem, fieldTypeMap) } - }(Encoders.tuple(Encoders.BINARY, Encoders.BINARY)) + }(Encoders.tuple(Encoders.BINARY, Encoders.BINARY, Encoders.BINARY)) + .flatMap(line => { + List((line._1, emptyValue), (line._2, line._3)) + })(Encoders.tuple(Encoders.BINARY, Encoders.BINARY)) // repartition dataframe according to nebula part, to make sure sst files for one part has no overlap if (tagConfig.repartitionWithNebula) { @@ -222,7 +226,7 @@ class VerticesProcessor(spark: SparkSession, vidType: VidType.Value, spaceVidLen: Int, tagItem: TagItem, - fieldTypeMap: Map[String, Int]): (Array[Byte], Array[Byte]) = { + fieldTypeMap: Map[String, Int]): (Array[Byte], Array[Byte], Array[Byte]) = { // check if vertex id is valid, if not, throw AssertException isVertexValid(row, tagConfig, false, vidType == VidType.STRING) @@ -256,14 +260,15 @@ class VerticesProcessor(spark: SparkSession, } else { vertexId.getBytes() } - val codec = new NebulaCodecImpl() - val vertexKey = codec.vertexKey(spaceVidLen, partitionId, vidBytes, tagItem.getTag_id) + val codec = new NebulaCodecImpl() + val vertexKey = codec.vertexKey(spaceVidLen, partitionId, vidBytes, tagItem.getTag_id) + val orphanVertexKey = codec.orphanVertexKey(spaceVidLen, partitionId, vidBytes) val values = for { property <- fieldKeys if property.trim.length != 0 } yield extraValueForSST(row, property, fieldTypeMap) .asInstanceOf[AnyRef] val vertexValue = codec.encodeTag(tagItem, nebulaKeys.asJava, values.asJava) - (vertexKey, vertexValue) + (orphanVertexKey, vertexKey, vertexValue) } } diff --git a/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala b/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala index 448a67fc..61a134b1 100644 --- a/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala +++ b/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala @@ -5,7 +5,7 @@ package com.vesoft.nebula.exchange.processor -import java.nio.ByteOrder +import java.nio.{ByteBuffer, ByteOrder} import com.vesoft.exchange.common.{ErrorHandler, GraphProvider, MetaProvider, VidType} import com.vesoft.exchange.common.{KeyPolicy, Vertex, Vertices} @@ -121,6 +121,7 @@ class VerticesProcessor(spark: SparkSession, val spaceVidLen = metaProvider.getSpaceVidLen(space) val tagItem = metaProvider.getTagItem(space, tagName) + val emptyValue = ByteBuffer.allocate(0).array() var sstKeyValueData = data .dropDuplicates(tagConfig.vertexField) @@ -128,7 +129,10 @@ class VerticesProcessor(spark: SparkSession, iter.map { row => encodeVertex(row, partitionNum, vidType, spaceVidLen, tagItem, fieldTypeMap) } - }(Encoders.tuple(Encoders.BINARY, Encoders.BINARY)) + }(Encoders.tuple(Encoders.BINARY, Encoders.BINARY, Encoders.BINARY)) + .flatMap(line => { + List((line._1, emptyValue), (line._2, line._3)) + })(Encoders.tuple(Encoders.BINARY, Encoders.BINARY)) // repartition dataframe according to nebula part, to make sure sst files for one part has no overlap if (tagConfig.repartitionWithNebula) { @@ -241,7 +245,7 @@ class VerticesProcessor(spark: SparkSession, vidType: VidType.Value, spaceVidLen: Int, tagItem: TagItem, - fieldTypeMap: Map[String, Int]): (Array[Byte], Array[Byte]) = { + fieldTypeMap: Map[String, Int]): (Array[Byte], Array[Byte], Array[Byte]) = { // check if vertex id is valid, if not, throw AssertException isVertexValid(row, tagConfig, false, vidType == VidType.STRING) @@ -275,14 +279,15 @@ class VerticesProcessor(spark: SparkSession, } else { vertexId.getBytes() } - val codec = new NebulaCodecImpl() - val vertexKey = codec.vertexKey(spaceVidLen, partitionId, vidBytes, tagItem.getTag_id) + val codec = new NebulaCodecImpl() + val vertexKey = codec.vertexKey(spaceVidLen, partitionId, vidBytes, tagItem.getTag_id) + val orphanVertexKey = codec.orphanVertexKey(spaceVidLen, partitionId, vidBytes) val values = for { property <- fieldKeys if property.trim.length != 0 } yield extraValueForSST(row, property, fieldTypeMap) .asInstanceOf[AnyRef] val vertexValue = codec.encodeTag(tagItem, nebulaKeys.asJava, values.asJava) - (vertexKey, vertexValue) + (orphanVertexKey, vertexKey, vertexValue) } } diff --git a/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala b/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala index 8e22fcf0..2d262561 100644 --- a/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala +++ b/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala @@ -5,7 +5,7 @@ package com.vesoft.nebula.exchange.processor -import java.nio.ByteOrder +import java.nio.{ByteBuffer, ByteOrder} import com.vesoft.exchange.common.{ErrorHandler, GraphProvider, MetaProvider, VidType} import com.vesoft.exchange.common.{KeyPolicy, Vertex, Vertices} @@ -121,6 +121,7 @@ class VerticesProcessor(spark: SparkSession, val spaceVidLen = metaProvider.getSpaceVidLen(space) val tagItem = metaProvider.getTagItem(space, tagName) + val emptyValue = ByteBuffer.allocate(0).array() var sstKeyValueData = data .dropDuplicates(tagConfig.vertexField) @@ -128,7 +129,10 @@ class VerticesProcessor(spark: SparkSession, iter.map { row => encodeVertex(row, partitionNum, vidType, spaceVidLen, tagItem, fieldTypeMap) } - }(Encoders.tuple(Encoders.BINARY, Encoders.BINARY)) + }(Encoders.tuple(Encoders.BINARY, Encoders.BINARY, Encoders.BINARY)) + .flatMap(line => { + List((line._1, emptyValue), (line._2, line._3)) + })(Encoders.tuple(Encoders.BINARY, Encoders.BINARY)) // repartition dataframe according to nebula part, to make sure sst files for one part has no overlap if (tagConfig.repartitionWithNebula) { @@ -241,7 +245,7 @@ class VerticesProcessor(spark: SparkSession, vidType: VidType.Value, spaceVidLen: Int, tagItem: TagItem, - fieldTypeMap: Map[String, Int]): (Array[Byte], Array[Byte]) = { + fieldTypeMap: Map[String, Int]): (Array[Byte], Array[Byte], Array[Byte]) = { // check if vertex id is valid, if not, throw AssertException isVertexValid(row, tagConfig, false, vidType == VidType.STRING) @@ -275,14 +279,15 @@ class VerticesProcessor(spark: SparkSession, } else { vertexId.getBytes() } - val codec = new NebulaCodecImpl() - val vertexKey = codec.vertexKey(spaceVidLen, partitionId, vidBytes, tagItem.getTag_id) + val codec = new NebulaCodecImpl() + val vertexKey = codec.vertexKey(spaceVidLen, partitionId, vidBytes, tagItem.getTag_id) + val orphanVertexKey = codec.orphanVertexKey(spaceVidLen, partitionId, vidBytes) val values = for { property <- fieldKeys if property.trim.length != 0 } yield extraValueForSST(row, property, fieldTypeMap) .asInstanceOf[AnyRef] val vertexValue = codec.encodeTag(tagItem, nebulaKeys.asJava, values.asJava) - (vertexKey, vertexValue) + (orphanVertexKey, vertexKey, vertexValue) } }