Commit e4f5570

add vertex without tag for sst
Nicole00 committed Jan 25, 2022
1 parent 1fc042b commit e4f5570
Showing 3 changed files with 33 additions and 18 deletions.
File 1 of 3:
@@ -5,7 +5,7 @@
 
 package com.vesoft.nebula.exchange.processor
 
-import java.nio.ByteOrder
+import java.nio.{ByteBuffer, ByteOrder}
 
 import com.vesoft.exchange.common.{ErrorHandler, GraphProvider, MetaProvider, VidType}
 import com.vesoft.exchange.common.{KeyPolicy, Vertex, Vertices}
@@ -119,14 +119,18 @@ class VerticesProcessor(spark: SparkSession,
 
     val spaceVidLen = metaProvider.getSpaceVidLen(space)
     val tagItem = metaProvider.getTagItem(space, tagName)
+    val emptyValue = ByteBuffer.allocate(0).array()
 
     var sstKeyValueData = data
       .dropDuplicates(tagConfig.vertexField)
       .mapPartitions { iter =>
         iter.map { row =>
           encodeVertex(row, partitionNum, vidType, spaceVidLen, tagItem, fieldTypeMap)
         }
-      }(Encoders.tuple(Encoders.BINARY, Encoders.BINARY))
+      }(Encoders.tuple(Encoders.BINARY, Encoders.BINARY, Encoders.BINARY))
+      .flatMap(line => {
+        List((line._1, emptyValue), (line._2, line._3))
+      })(Encoders.tuple(Encoders.BINARY, Encoders.BINARY))
 
     // repartition dataframe according to nebula part, to make sure sst files for one part has no overlap
     if (tagConfig.repartitionWithNebula) {
@@ -222,7 +226,7 @@ class VerticesProcessor(spark: SparkSession,
                    vidType: VidType.Value,
                    spaceVidLen: Int,
                    tagItem: TagItem,
-                   fieldTypeMap: Map[String, Int]): (Array[Byte], Array[Byte]) = {
+                   fieldTypeMap: Map[String, Int]): (Array[Byte], Array[Byte], Array[Byte]) = {
     // check if vertex id is valid, if not, throw AssertException
     isVertexValid(row, tagConfig, false, vidType == VidType.STRING)

@@ -256,14 +260,15 @@
     } else {
       vertexId.getBytes()
     }
-    val codec = new NebulaCodecImpl()
-    val vertexKey = codec.vertexKey(spaceVidLen, partitionId, vidBytes, tagItem.getTag_id)
+    val codec           = new NebulaCodecImpl()
+    val vertexKey       = codec.vertexKey(spaceVidLen, partitionId, vidBytes, tagItem.getTag_id)
+    val orphanVertexKey = codec.orphanVertexKey(spaceVidLen, partitionId, vidBytes)
     val values = for {
       property <- fieldKeys if property.trim.length != 0
     } yield
       extraValueForSST(row, property, fieldTypeMap)
         .asInstanceOf[AnyRef]
     val vertexValue = codec.encodeTag(tagItem, nebulaKeys.asJava, values.asJava)
-    (vertexKey, vertexValue)
+    (orphanVertexKey, vertexKey, vertexValue)
   }
 }
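
Note: the heart of the change is visible in the second hunk. encodeVertex now returns a triple (orphanVertexKey, vertexKey, vertexValue), and the new flatMap expands every vertex into two SST records: a tag-less "orphan" key with an empty value, plus the usual tag key with the encoded properties. Below is a minimal, self-contained sketch of that expansion; the byte arrays are hypothetical stand-ins, not real NebulaCodecImpl output.

import java.nio.ByteBuffer

// Sketch of the flatMap expansion added in this commit. The keys below are
// illustrative stand-ins for what NebulaCodecImpl would produce.
object OrphanKeyExpansionSketch {
  def main(args: Array[String]): Unit = {
    // Stand-in for the triple returned by the new encodeVertex:
    // (orphanVertexKey, vertexKey, vertexValue).
    val orphanVertexKey = Array[Byte](0x01, 0x0A)
    val vertexKey       = Array[Byte](0x01, 0x0A, 0x02)
    val vertexValue     = Array[Byte](0x10, 0x20, 0x30)

    val emptyValue = ByteBuffer.allocate(0).array()

    // One encoded vertex becomes two key-value records: the orphan (tag-less)
    // vertex key with an empty value, and the tag key with the property data.
    val records = List(
      (orphanVertexKey, emptyValue),
      (vertexKey, vertexValue)
    )

    records.foreach { case (k, v) =>
      println(s"key=${k.mkString("[", ",", "]")} valueLen=${v.length}")
    }
  }
}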
File 2 of 3:
@@ -5,7 +5,7 @@
 
 package com.vesoft.nebula.exchange.processor
 
-import java.nio.ByteOrder
+import java.nio.{ByteBuffer, ByteOrder}
 
 import com.vesoft.exchange.common.{ErrorHandler, GraphProvider, MetaProvider, VidType}
 import com.vesoft.exchange.common.{KeyPolicy, Vertex, Vertices}
@@ -121,14 +121,18 @@ class VerticesProcessor(spark: SparkSession,
 
     val spaceVidLen = metaProvider.getSpaceVidLen(space)
     val tagItem = metaProvider.getTagItem(space, tagName)
+    val emptyValue = ByteBuffer.allocate(0).array()
 
     var sstKeyValueData = data
       .dropDuplicates(tagConfig.vertexField)
       .mapPartitions { iter =>
         iter.map { row =>
           encodeVertex(row, partitionNum, vidType, spaceVidLen, tagItem, fieldTypeMap)
         }
-      }(Encoders.tuple(Encoders.BINARY, Encoders.BINARY))
+      }(Encoders.tuple(Encoders.BINARY, Encoders.BINARY, Encoders.BINARY))
+      .flatMap(line => {
+        List((line._1, emptyValue), (line._2, line._3))
+      })(Encoders.tuple(Encoders.BINARY, Encoders.BINARY))
 
     // repartition dataframe according to nebula part, to make sure sst files for one part has no overlap
     if (tagConfig.repartitionWithNebula) {
@@ -241,7 +245,7 @@ class VerticesProcessor(spark: SparkSession,
                    vidType: VidType.Value,
                    spaceVidLen: Int,
                    tagItem: TagItem,
-                   fieldTypeMap: Map[String, Int]): (Array[Byte], Array[Byte]) = {
+                   fieldTypeMap: Map[String, Int]): (Array[Byte], Array[Byte], Array[Byte]) = {
     // check if vertex id is valid, if not, throw AssertException
     isVertexValid(row, tagConfig, false, vidType == VidType.STRING)

@@ -275,14 +279,15 @@
     } else {
       vertexId.getBytes()
     }
-    val codec = new NebulaCodecImpl()
-    val vertexKey = codec.vertexKey(spaceVidLen, partitionId, vidBytes, tagItem.getTag_id)
+    val codec           = new NebulaCodecImpl()
+    val vertexKey       = codec.vertexKey(spaceVidLen, partitionId, vidBytes, tagItem.getTag_id)
+    val orphanVertexKey = codec.orphanVertexKey(spaceVidLen, partitionId, vidBytes)
     val values = for {
       property <- fieldKeys if property.trim.length != 0
     } yield
       extraValueForSST(row, property, fieldTypeMap)
         .asInstanceOf[AnyRef]
     val vertexValue = codec.encodeTag(tagItem, nebulaKeys.asJava, values.asJava)
-    (vertexKey, vertexValue)
+    (orphanVertexKey, vertexKey, vertexValue)
   }
 }
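
Note: the unchanged comment in the second hunk explains why the data is later repartitioned by Nebula part: if each Spark partition holds exactly one Nebula part, each SST file covers a single part and the sorted key ranges of the files cannot overlap. Below is a hedged sketch of that grouping, assuming a simple hash-modulo mapping from vertex id to part; the real mapping used by Exchange may differ.

// Toy grouping of vertex ids by Nebula part. The hash-modulo mapping is an
// assumption for illustration only; Nebula parts are 1-based.
object RepartitionSketch {
  def toPart(vid: String, partitionNum: Int): Int =
    math.abs(vid.hashCode % partitionNum) + 1

  def main(args: Array[String]): Unit = {
    val vids         = Seq("100", "101", "102", "103", "104", "105")
    val partitionNum = 3

    // Each group below would become one Spark partition and therefore one
    // SST file whose key range is disjoint from the others.
    vids.groupBy(toPart(_, partitionNum)).toSeq.sortBy(_._1).foreach {
      case (part, ids) => println(s"part $part -> ${ids.mkString(", ")}")
    }
  }
}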
File 3 of 3:
@@ -5,7 +5,7 @@
 
 package com.vesoft.nebula.exchange.processor
 
-import java.nio.ByteOrder
+import java.nio.{ByteBuffer, ByteOrder}
 
 import com.vesoft.exchange.common.{ErrorHandler, GraphProvider, MetaProvider, VidType}
 import com.vesoft.exchange.common.{KeyPolicy, Vertex, Vertices}
@@ -121,14 +121,18 @@ class VerticesProcessor(spark: SparkSession,
 
     val spaceVidLen = metaProvider.getSpaceVidLen(space)
     val tagItem = metaProvider.getTagItem(space, tagName)
+    val emptyValue = ByteBuffer.allocate(0).array()
 
     var sstKeyValueData = data
       .dropDuplicates(tagConfig.vertexField)
       .mapPartitions { iter =>
         iter.map { row =>
           encodeVertex(row, partitionNum, vidType, spaceVidLen, tagItem, fieldTypeMap)
         }
-      }(Encoders.tuple(Encoders.BINARY, Encoders.BINARY))
+      }(Encoders.tuple(Encoders.BINARY, Encoders.BINARY, Encoders.BINARY))
+      .flatMap(line => {
+        List((line._1, emptyValue), (line._2, line._3))
+      })(Encoders.tuple(Encoders.BINARY, Encoders.BINARY))
 
     // repartition dataframe according to nebula part, to make sure sst files for one part has no overlap
     if (tagConfig.repartitionWithNebula) {
@@ -241,7 +245,7 @@ class VerticesProcessor(spark: SparkSession,
                    vidType: VidType.Value,
                    spaceVidLen: Int,
                    tagItem: TagItem,
-                   fieldTypeMap: Map[String, Int]): (Array[Byte], Array[Byte]) = {
+                   fieldTypeMap: Map[String, Int]): (Array[Byte], Array[Byte], Array[Byte]) = {
     // check if vertex id is valid, if not, throw AssertException
     isVertexValid(row, tagConfig, false, vidType == VidType.STRING)

@@ -275,14 +279,15 @@
     } else {
       vertexId.getBytes()
     }
-    val codec = new NebulaCodecImpl()
-    val vertexKey = codec.vertexKey(spaceVidLen, partitionId, vidBytes, tagItem.getTag_id)
+    val codec           = new NebulaCodecImpl()
+    val vertexKey       = codec.vertexKey(spaceVidLen, partitionId, vidBytes, tagItem.getTag_id)
+    val orphanVertexKey = codec.orphanVertexKey(spaceVidLen, partitionId, vidBytes)
     val values = for {
       property <- fieldKeys if property.trim.length != 0
     } yield
       extraValueForSST(row, property, fieldTypeMap)
         .asInstanceOf[AnyRef]
     val vertexValue = codec.encodeTag(tagItem, nebulaKeys.asJava, values.asJava)
-    (vertexKey, vertexValue)
+    (orphanVertexKey, vertexKey, vertexValue)
   }
 }
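
Note: what makes the new key an "orphan" key is that codec.orphanVertexKey takes no tag id, so the key identifies the vertex itself rather than one of its tags; writing it with an empty value is what lets a vertex exist in storage even when it has no tag attached. Below is a toy illustration of the two key shapes under that reading; the byte layout is simplified and is not the real NebulaCodecImpl encoding.

import java.nio.ByteBuffer

object KeyShapeSketch {
  // Pad or truncate the vertex id to the space's fixed vid length.
  def fixVid(vid: Array[Byte], spaceVidLen: Int): Array[Byte] =
    vid.padTo(spaceVidLen, 0.toByte).take(spaceVidLen)

  // Toy tag key: part id + fixed-length vid + tag id (simplified layout).
  def toyVertexKey(spaceVidLen: Int, partId: Int, vid: Array[Byte], tagId: Int): Array[Byte] =
    ByteBuffer
      .allocate(4 + spaceVidLen + 4)
      .putInt(partId)
      .put(fixVid(vid, spaceVidLen))
      .putInt(tagId)
      .array()

  // Toy orphan key: same prefix, but no tag id at the end.
  def toyOrphanVertexKey(spaceVidLen: Int, partId: Int, vid: Array[Byte]): Array[Byte] =
    ByteBuffer
      .allocate(4 + spaceVidLen)
      .putInt(partId)
      .put(fixVid(vid, spaceVidLen))
      .array()

  def main(args: Array[String]): Unit = {
    val vid = "player100".getBytes()
    println(s"tag key bytes:    ${toyVertexKey(16, 1, vid, 2).length}")
    println(s"orphan key bytes: ${toyOrphanVertexKey(16, 1, vid).length}")
  }
}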
