Skip to content

Commit

Permalink
support to exclude specific tags or edges when migrate space (#93)
Browse files Browse the repository at this point in the history
* support to exclude specific tags or edges when migrate space

* update variable name
  • Loading branch information
Nicole00 committed Apr 18, 2023
1 parent ffc5d4c commit 1e01411
Showing 1 changed file with 65 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import scala.collection.mutable.ListBuffer
* --conf spark.executor.extraClassPath=./ \
* --jars commons-cli-1.4.jar \
* --class com.vesoft.nebula.examples.connector.Nebula2Nebula example-3.0-SNAPSHOT-jar-with-dependencies.jar \
* -sourceMeta "192.168.8.171:9559" -sourceSpace "source" -limit 2 -targetMeta "192.168.8.171:9559" -targetGraph "192.168.8.171:9669" -targetSpace "target" -batch 2
* -sourceMeta "192.168.8.171:9559" -sourceSpace "source" -limit 2 -targetMeta "192.168.8.171:9559" -targetGraph "192.168.8.171:9669" -targetSpace "target" -batch 2 -timeout 50000 -u root -passwd nebula
*
*/
object Nebula2Nebula {
Expand All @@ -55,18 +55,38 @@ object Nebula2Nebula {

val sourceMetaOption =
new Option("sourceMeta", "sourceMetaAddress", true, "source nebulagraph metad address")
sourceMetaOption.setRequired(true)
val sourceSpaceOption =
new Option("sourceSpace", "sourceSpace", true, "source nebulagraph space name")
sourceSpaceOption.setRequired(true)
val limitOption =
new Option("limit", "limit", true, "records for one reading request for reading")
limitOption.setRequired(true)
val targetMetaOption =
new Option("targetMeta", "targetMetaAddress", true, "target nebulagraph metad address")
targetMetaOption.setRequired(true)
val targetGraphOption =
new Option("targetGraph", "targetGraphAddress", true, "target nebulagraph graphd address")
targetGraphOption.setRequired(true)
val targetSpaceOption =
new Option("targetSpace", "targetSpace", true, "target nebulagraph space name")
val batchOption = new Option("batch", "batch", true, "batch size for one insert request")
targetSpaceOption.setRequired(true)
val batchOption = new Option("batch", "batch", true, "batch size for one insert request")
batchOption.setRequired(true)
val writeParallelOption = new Option("p", "parallel", true, "parallel for writing data")
writeParallelOption.setRequired(true)
val timeoutOption = new Option("timeout", "timeout", true, "timeout for java client");
timeoutOption.setRequired(true)
val userOption = new Option("u", "user", true, "user")
userOption.setRequired(true)
val passwdOption = new Option("passwd", "password", true, "password")
passwdOption.setRequired(true)

// filter out some tags /edges
val excludeTagsOption =
new Option("excludeTags", "excludeTags", true, "filter out these tags, separate with `,`")
val excludeEdgesOption =
new Option("excludeEdges", "excludeEdges", true, "filter out these edges, separate with `,`")

val options = new Options
options.addOption(sourceMetaOption)
Expand All @@ -77,13 +97,19 @@ object Nebula2Nebula {
options.addOption(targetSpaceOption)
options.addOption(batchOption)
options.addOption(writeParallelOption)
options.addOption(timeoutOption)
options.addOption(userOption)
options.addOption(passwdOption)
options.addOption(excludeTagsOption)
options.addOption(excludeEdgesOption)

var cli: CommandLine = null
val cliParser: CommandLineParser = new DefaultParser
val helpFormatter = new HelpFormatter

try cli = cliParser.parse(options, args)
catch {
try {
cli = cliParser.parse(options, args)
} catch {
case e: ParseException =>
helpFormatter.printHelp(">>>> options", options)
e.printStackTrace()
Expand All @@ -98,27 +124,45 @@ object Nebula2Nebula {
val targetSpace: String = cli.getOptionValue("targetSpace")
val batch: Int = cli.getOptionValue("batch").toInt
val parallel: Int = cli.getOptionValue("p").toInt
val timeout: Int = cli.getOptionValue("timeout").toInt
val user: String = cli.getOptionValue("u")
val passed: String = cli.getOptionValue("passwd")
val excludeTags: List[String] =
if (cli.hasOption("excludeTags")) cli.getOptionValue("excludeTags").split(",").toList
else List()
val excludeEdges: List[String] =
if (cli.hasOption("excludeEdges")) cli.getOptionValue("excludeEdges").split(",").toList
else List()

// common config
val sourceConnectConfig =
NebulaConnectionConfig
.builder()
.withMetaAddress(sourceMetaAddr)
.withConenctionRetry(2)
.withTimeout(timeout)
.build()

val targetConnectConfig =
NebulaConnectionConfig
.builder()
.withMetaAddress(targetMetaAddr)
.withGraphAddress(targetGraphAddr)
.withTimeout(timeout)
.withConenctionRetry(2)
.build()

val metaHostAndPort = sourceMetaAddr.split(":")
val (tags, edges, partitions) =
var (tags, edges, partitions) =
getTagsAndEdges(metaHostAndPort(0), metaHostAndPort(1).toInt, sourceSpace)

if (excludeTags.nonEmpty) {
tags = tags.dropWhile(ele => excludeTags.contains(ele))
}
if (excludeEdges.nonEmpty) {
edges = edges.dropWhile(ele => excludeEdges.contains(ele))
}

tags.foreach(tag => {
syncTag(spark,
sourceConnectConfig,
Expand All @@ -129,7 +173,9 @@ object Nebula2Nebula {
targetSpace,
batch,
tag,
parallel)
parallel,
user,
passed)
})

edges.foreach(edge => {
Expand All @@ -142,9 +188,10 @@ object Nebula2Nebula {
targetSpace,
batch,
edge,
parallel)
parallel,
user,
passed)
})

}

def getTagsAndEdges(metaHost: String,
Expand Down Expand Up @@ -176,7 +223,9 @@ object Nebula2Nebula {
targetSpace: String,
batch: Int,
tag: String,
writeParallel: Int): Unit = {
writeParallel: Int,
user: String,
passwd: String): Unit = {
val nebulaReadVertexConfig: ReadNebulaConfig = ReadNebulaConfig
.builder()
.withSpace(sourceSpace)
Expand All @@ -194,6 +243,8 @@ object Nebula2Nebula {
val nebulaWriteVertexConfig: WriteNebulaVertexConfig = WriteNebulaVertexConfig
.builder()
.withSpace(targetSpace)
.withUser(user)
.withPasswd(passwd)
.withTag(tag)
.withVidField("_vertexId")
.withBatch(batch)
Expand All @@ -211,7 +262,9 @@ object Nebula2Nebula {
targetSpace: String,
batch: Int,
edge: String,
writeParallel: Int): Unit = {
writeParallel: Int,
user: String,
passwd: String): Unit = {
val nebulaReadEdgeConfig: ReadNebulaConfig = ReadNebulaConfig
.builder()
.withSpace(sourceSpace)
Expand All @@ -229,6 +282,8 @@ object Nebula2Nebula {
val nebulaWriteEdgeConfig: WriteNebulaEdgeConfig = WriteNebulaEdgeConfig
.builder()
.withSpace(targetSpace)
.withUser(user)
.withPasswd(passwd)
.withEdge(edge)
.withSrcIdField("_srcId")
.withDstIdField("_dstId")
Expand Down

0 comments on commit 1e01411

Please sign in to comment.