diff --git a/exchange-common/src/main/scala/com/vesoft/exchange/common/ErrorHandler.scala b/exchange-common/src/main/scala/com/vesoft/exchange/common/ErrorHandler.scala
index d27994aa..360409c5 100644
--- a/exchange-common/src/main/scala/com/vesoft/exchange/common/ErrorHandler.scala
+++ b/exchange-common/src/main/scala/com/vesoft/exchange/common/ErrorHandler.scala
@@ -46,8 +46,9 @@ object ErrorHandler {
     */
   def save(buffer: ArrayBuffer[String], path: String): Unit = {
     LOG.info(s"create reload path $path")
-    val fileSystem = FileSystem.get(new Configuration())
+    val targetPath = new Path(path)
+    val fileSystem = targetPath.getFileSystem(new Configuration())
     val errors = if (fileSystem.exists(targetPath)) {
       // For kafka, the error ngql need to append to a same file instead of overwrite
       fileSystem.append(targetPath)
@@ -72,7 +73,8 @@ object ErrorHandler {
    *@return true if path exists
    */
  def existError(path: String): Boolean = {
-    val fileSystem = FileSystem.get(new Configuration())
+    val errorPath  = new Path(path)
+    val fileSystem = errorPath.getFileSystem(new Configuration())
     fileSystem.exists(new Path(path))
   }
 }
diff --git a/exchange-common/src/main/scala/com/vesoft/exchange/common/config/Configs.scala b/exchange-common/src/main/scala/com/vesoft/exchange/common/config/Configs.scala
index 1dc77d37..4f7e481c 100644
--- a/exchange-common/src/main/scala/com/vesoft/exchange/common/config/Configs.scala
+++ b/exchange-common/src/main/scala/com/vesoft/exchange/common/config/Configs.scala
@@ -238,7 +238,7 @@ object Configs {
   private[this] val DEFAULT_EXECUTION_RETRY    = 3
   private[this] val DEFAULT_EXECUTION_TIMEOUT  = Integer.MAX_VALUE
   private[this] val DEFAULT_EXECUTION_INTERVAL = 3000
-  private[this] val DEFAULT_ERROR_OUTPUT_PATH  = "/tmp/nebula.writer.errors/"
+  private[this] val DEFAULT_ERROR_OUTPUT_PATH  = "file:///tmp/nebula.writer.errors/"
   private[this] val DEFAULT_ERROR_MAX_BATCH_SIZE = Int.MaxValue
   private[this] val DEFAULT_RATE_LIMIT   = 1024
   private[this] val DEFAULT_RATE_TIMEOUT = 100
@@ -312,8 +312,14 @@ object Configs {
     val executionEntry = ExecutionConfigEntry(executionTimeout, executionRetry, executionInterval)
     LOG.info(s"Execution Config ${executionEntry}")

-    val errorConfig = getConfigOrNone(nebulaConfig, "error")
-    val errorPath   = getOrElse(errorConfig, "output", DEFAULT_ERROR_OUTPUT_PATH)
+    val errorConfig = getConfigOrNone(nebulaConfig, "error")
+    var errorPath   = getOrElse(errorConfig, "output", DEFAULT_ERROR_OUTPUT_PATH)
+    if (!errorPath.startsWith("hdfs://")) {
+      if (!errorPath.startsWith("file://")) {
+        errorPath = s"file://${errorPath}"
+      }
+    }
+
     val errorMaxSize = getOrElse(errorConfig, "max", DEFAULT_ERROR_MAX_BATCH_SIZE)
     val errorEntry   = ErrorConfigEntry(errorPath, errorMaxSize)
diff --git a/exchange-common/src/main/scala/com/vesoft/exchange/common/processor/ReloadProcessor.scala b/exchange-common/src/main/scala/com/vesoft/exchange/common/processor/ReloadProcessor.scala
index b954453c..0b40d40d 100644
--- a/exchange-common/src/main/scala/com/vesoft/exchange/common/processor/ReloadProcessor.scala
+++ b/exchange-common/src/main/scala/com/vesoft/exchange/common/processor/ReloadProcessor.scala
@@ -10,7 +10,7 @@ import com.vesoft.exchange.common.GraphProvider
 import com.vesoft.exchange.common.config.Configs
 import com.vesoft.exchange.common.writer.NebulaGraphClientWriter
 import org.apache.log4j.Logger
-import org.apache.spark.TaskContext
+import org.apache.spark.{SparkEnv, TaskContext}
 import org.apache.spark.sql.{DataFrame, Row}
 import org.apache.spark.util.LongAccumulator

@@ -55,8 +55,10 @@ class ReloadProcessor(data: DataFrame,
       }
     }
     if (errorBuffer.nonEmpty) {
-      ErrorHandler.save(errorBuffer,
-                        s"${config.errorConfig.errorPath}/reload.${TaskContext.getPartitionId()}")
+      ErrorHandler.save(
+        errorBuffer,
+        s"${config.errorConfig.errorPath}/${SparkEnv.get.blockManager.conf.getAppId}/reload.${TaskContext
+          .getPartitionId()}")
       errorBuffer.clear()
     }
     LOG.info(s"data reload in partition ${TaskContext
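Note on the ErrorHandler and Configs changes above: FileSystem.get(new Configuration()) always returns the filesystem for fs.defaultFS, so error files could only land on whichever filesystem the cluster defaults to. Resolving the filesystem from the path itself, combined with forcing a scheme onto the configured error path, makes local and HDFS error outputs both work. A minimal sketch of the combined behavior — normalizeErrorPath is a hypothetical helper restating the inline logic from Configs.parse, and the paths are illustrative:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path

    // Restates the new normalization in Configs.parse: a path with no
    // explicit scheme is treated as a local path.
    def normalizeErrorPath(raw: String): String =
      if (raw.startsWith("hdfs://") || raw.startsWith("file://")) raw
      else s"file://$raw"

    // With the scheme present, Path.getFileSystem resolves the filesystem
    // from the path itself rather than from fs.defaultFS.
    val conf = new Configuration()
    val fs   = new Path(normalizeErrorPath("/tmp/nebula.writer.errors")).getFileSystem(conf)
    println(fs.getUri.getScheme) // file

An "hdfs://..." path resolves the same way, provided an HDFS client is on the classpath.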
diff --git a/exchange-common/src/test/scala/com/vesoft/exchange/common/config/ConfigsSuite.scala b/exchange-common/src/test/scala/com/vesoft/exchange/common/config/ConfigsSuite.scala
index 5278496d..90fcf70d 100644
--- a/exchange-common/src/test/scala/com/vesoft/exchange/common/config/ConfigsSuite.scala
+++ b/exchange-common/src/test/scala/com/vesoft/exchange/common/config/ConfigsSuite.scala
@@ -73,7 +73,7 @@ class ConfigsSuite {
     assert(executionConfig.timeout == Integer.MAX_VALUE)

     assert(errorConfig.errorMaxSize == 32)
-    assert(errorConfig.errorPath.equals("/tmp/errors"))
+    assert(errorConfig.errorPath.equals("file:///tmp/errors"))

     assert(rateConfig.limit == 1024)
     assert(rateConfig.timeout == 1000)
@@ -258,11 +258,14 @@ class ConfigsSuite {
       }
     }
   }
-
-
   @Test
   def configsWithVariableSuite(): Unit = {
-    val args = List("-c", "src/test/resources/application.conf", "-v", "-p", "path0=/app/test1.parquet,path1=/app/test2.csv,path2=/app/test2.json,path3=/app/test3.json")
+    val args = List(
+      "-c",
+      "src/test/resources/application.conf",
+      "-v",
+      "-p",
+      "path0=/app/test1.parquet,path1=/app/test2.csv,path2=/app/test2.json,path3=/app/test3.json")
     val options = Configs.parser(args.toArray, "test")
     val c: Argument = options match {
       case Some(config) => config
@@ -272,9 +275,9 @@ class ConfigsSuite {
     }
     assert(c.variable)

-    val configs = Configs.parse(c.config, c.variable, c.param)
-    val tagsConfig = configs.tagsConfig
-    val edgesConfig = configs.edgesConfig
+    val configs     = Configs.parse(c.config, c.variable, c.param)
+    val tagsConfig  = configs.tagsConfig
+    val edgesConfig = configs.edgesConfig

     for (tagConfig <- tagsConfig) {
       val source = tagConfig.dataSourceConfigEntry
@@ -319,6 +322,7 @@ class ConfigsSuite {
     }
   }

+
   /**
    * correct com.vesoft.exchange.common.config
    */
diff --git a/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala b/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala
index e3299d40..44bcf0b3 100644
--- a/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala
+++ b/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala
@@ -51,7 +51,7 @@ import com.vesoft.exchange.common.processor.ReloadProcessor
 import com.vesoft.exchange.common.utils.SparkValidate
 import com.vesoft.nebula.exchange.processor.{EdgeProcessor, VerticesProcessor}
 import org.apache.log4j.Logger
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, SparkEnv}

 final case class TooManyErrorsException(private val message: String) extends Exception(message)

@@ -230,12 +230,13 @@ object Exchange {
     }

     // reimport for failed tags and edges
-    if (failures > 0 && ErrorHandler.existError(configs.errorConfig.errorPath)) {
-      spark.sparkContext.setJobGroup("Reload", s"Reload: ${configs.errorConfig.errorPath}")
+    val errorPath = s"${configs.errorConfig.errorPath}/${SparkEnv.get.blockManager.conf.getAppId}"
+    if (failures > 0 && ErrorHandler.existError(errorPath)) {
+      spark.sparkContext.setJobGroup("Reload", s"Reload: ${errorPath}")
       val batchSuccess = spark.sparkContext.longAccumulator(s"batchSuccess.reimport")
       val batchFailure = spark.sparkContext.longAccumulator(s"batchFailure.reimport")
-      val data      = spark.read.text(configs.errorConfig.errorPath)
+      val data      = spark.read.text(errorPath)
       val startTime = System.currentTimeMillis()
       val processor = new ReloadProcessor(data, configs, batchSuccess, batchFailure)
       processor.process()
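Note on the Exchange change above: the reload root is now scoped by the Spark application id, so concurrent or repeated runs no longer pick up each other's (or a previous run's) error files. A sketch of the path assembly, assuming a live Spark application — errorRoot stands in for configs.errorConfig.errorPath, and the id shown in the comment is illustrative:

    import org.apache.spark.SparkEnv

    // <error output>/<Spark application id> is both where processors write
    // failed statements and where the reload step reads them back.
    val errorRoot = "file:///tmp/nebula.writer.errors"
    val appId     = SparkEnv.get.blockManager.conf.getAppId // reads spark.app.id, e.g. "application_1672531200000_0001"
    val errorPath = s"$errorRoot/$appId"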
diff --git a/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala b/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala
index 78d880f2..ce57508b 100644
--- a/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala
+++ b/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala
@@ -26,7 +26,7 @@ import com.vesoft.nebula.exchange.TooManyErrorsException
 import com.vesoft.nebula.meta.EdgeItem
 import org.apache.commons.codec.digest.MurmurHash2
 import org.apache.log4j.Logger
-import org.apache.spark.TaskContext
+import org.apache.spark.{SparkEnv, TaskContext}
 import org.apache.spark.sql.streaming.Trigger
 import org.apache.spark.sql.{DataFrame, Encoders, Row, SparkSession}
 import org.apache.spark.util.LongAccumulator
@@ -81,9 +81,10 @@ class EdgeProcessor(spark: SparkSession,
       }
     }
     if (errorBuffer.nonEmpty) {
+      val appId = SparkEnv.get.blockManager.conf.getAppId
       ErrorHandler.save(
         errorBuffer,
-        s"${config.errorConfig.errorPath}/${edgeConfig.name}.${TaskContext.getPartitionId}")
+        s"${config.errorConfig.errorPath}/${appId}/${edgeConfig.name}.${TaskContext.getPartitionId}")
       errorBuffer.clear()
     }
     LOG.info(s"edge ${edgeConfig.name} import in spark partition ${TaskContext
diff --git a/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala b/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala
index a67eb904..7af987fc 100644
--- a/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala
+++ b/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala
@@ -25,7 +25,7 @@ import com.vesoft.nebula.exchange.TooManyErrorsException
 import com.vesoft.nebula.meta.TagItem
 import org.apache.commons.codec.digest.MurmurHash2
 import org.apache.log4j.Logger
-import org.apache.spark.TaskContext
+import org.apache.spark.{SparkEnv, TaskContext}
 import org.apache.spark.sql.{DataFrame, Encoders, Row, SparkSession}
 import org.apache.spark.util.LongAccumulator

@@ -88,9 +88,10 @@ class VerticesProcessor(spark: SparkSession,
       }
     }
     if (errorBuffer.nonEmpty) {
+      val appId = SparkEnv.get.blockManager.conf.getAppId
       ErrorHandler.save(
         errorBuffer,
-        s"${config.errorConfig.errorPath}/${tagConfig.name}.${TaskContext.getPartitionId()}")
+        s"${config.errorConfig.errorPath}/${appId}/${tagConfig.name}.${TaskContext.getPartitionId()}")
       errorBuffer.clear()
     }
     LOG.info(s"tag ${tagConfig.name} import in spark partition ${TaskContext
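Note on the processor changes above (the same edit is applied verbatim to the Spark 2.4 and 3.0 modules below): each task now writes its failed statements under the app-scoped directory, one file per tag/edge name and Spark partition. A sketch of the resulting file name, assuming it runs inside a Spark task — the root and the tag name "player" are illustrative:

    import org.apache.spark.{SparkEnv, TaskContext}

    // <error root>/<app id>/<tag or edge name>.<partition id>
    val appId = SparkEnv.get.blockManager.conf.getAppId
    val file  = s"file:///tmp/nebula.writer.errors/$appId/player.${TaskContext.getPartitionId()}"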
diff --git a/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala b/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala
index 60eedd46..073f8045 100644
--- a/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala
+++ b/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala
@@ -51,7 +51,7 @@ import com.vesoft.exchange.common.processor.ReloadProcessor
 import com.vesoft.exchange.common.utils.SparkValidate
 import com.vesoft.nebula.exchange.processor.{EdgeProcessor, VerticesProcessor}
 import org.apache.log4j.Logger
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, SparkEnv}

 final case class TooManyErrorsException(private val message: String) extends Exception(message)

@@ -229,12 +229,13 @@ object Exchange {
     }

     // reimport for failed tags and edges
-    if (failures > 0 && ErrorHandler.existError(configs.errorConfig.errorPath)) {
-      spark.sparkContext.setJobGroup("Reload", s"Reload: ${configs.errorConfig.errorPath}")
+    val errorPath = s"${configs.errorConfig.errorPath}/${SparkEnv.get.blockManager.conf.getAppId}"
+    if (failures > 0 && ErrorHandler.existError(errorPath)) {
+      spark.sparkContext.setJobGroup("Reload", s"Reload: ${errorPath}")
       val batchSuccess = spark.sparkContext.longAccumulator(s"batchSuccess.reimport")
       val batchFailure = spark.sparkContext.longAccumulator(s"batchFailure.reimport")
-      val data      = spark.read.text(configs.errorConfig.errorPath)
+      val data      = spark.read.text(errorPath)
       val startTime = System.currentTimeMillis()
       val processor = new ReloadProcessor(data, configs, batchSuccess, batchFailure)
       processor.process()
diff --git a/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala b/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala
index 95c1a4e8..54372836 100644
--- a/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala
+++ b/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala
@@ -27,7 +27,7 @@ import com.vesoft.nebula.exchange.TooManyErrorsException
 import com.vesoft.nebula.meta.EdgeItem
 import org.apache.commons.codec.digest.MurmurHash2
 import org.apache.log4j.Logger
-import org.apache.spark.TaskContext
+import org.apache.spark.{SparkEnv, TaskContext}
 import org.apache.spark.sql.streaming.Trigger
 import org.apache.spark.sql.{DataFrame, Encoders, Row, SparkSession}
 import org.apache.spark.util.LongAccumulator
@@ -82,9 +82,10 @@ class EdgeProcessor(spark: SparkSession,
       }
     }
     if (errorBuffer.nonEmpty) {
+      val appId = SparkEnv.get.blockManager.conf.getAppId
       ErrorHandler.save(
         errorBuffer,
-        s"${config.errorConfig.errorPath}/${edgeConfig.name}.${TaskContext.getPartitionId}")
+        s"${config.errorConfig.errorPath}/${appId}/${edgeConfig.name}.${TaskContext.getPartitionId}")
       errorBuffer.clear()
     }
     LOG.info(s"edge ${edgeConfig.name} import in spark partition ${TaskContext
diff --git a/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala b/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala
index af489367..f537aba4 100644
--- a/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala
+++ b/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala
@@ -26,7 +26,7 @@ import com.vesoft.nebula.exchange.TooManyErrorsException
 import com.vesoft.nebula.meta.TagItem
 import org.apache.commons.codec.digest.MurmurHash2
 import org.apache.log4j.Logger
-import org.apache.spark.TaskContext
+import org.apache.spark.{SparkEnv, TaskContext}
 import org.apache.spark.sql.streaming.Trigger
 import org.apache.spark.sql.{DataFrame, Encoders, Row, SparkSession}
 import org.apache.spark.util.LongAccumulator
@@ -90,9 +90,10 @@ class VerticesProcessor(spark: SparkSession,
       }
     }
     if (errorBuffer.nonEmpty) {
+      val appId = SparkEnv.get.blockManager.conf.getAppId
       ErrorHandler.save(
         errorBuffer,
-        s"${config.errorConfig.errorPath}/${tagConfig.name}.${TaskContext.getPartitionId()}")
+        s"${config.errorConfig.errorPath}/${appId}/${tagConfig.name}.${TaskContext.getPartitionId()}")
       errorBuffer.clear()
     }
     LOG.info(s"tag ${tagConfig.name} import in spark partition ${TaskContext
diff --git a/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala b/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala
index 185d3c4b..58143415 100644
--- a/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala
+++ b/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala
@@ -51,7 +51,7 @@ import com.vesoft.exchange.common.processor.ReloadProcessor
 import com.vesoft.exchange.common.utils.SparkValidate
 import com.vesoft.nebula.exchange.processor.{EdgeProcessor, VerticesProcessor}
 import org.apache.log4j.Logger
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, SparkEnv}

 final case class TooManyErrorsException(private val message: String) extends Exception(message)

@@ -229,12 +229,13 @@ object Exchange {
     }

     // reimport for failed tags and edges
-    if (failures > 0 && ErrorHandler.existError(configs.errorConfig.errorPath)) {
-      spark.sparkContext.setJobGroup("Reload", s"Reload: ${configs.errorConfig.errorPath}")
+    val errorPath = s"${configs.errorConfig.errorPath}/${SparkEnv.get.blockManager.conf.getAppId}"
+    if (failures > 0 && ErrorHandler.existError(errorPath)) {
+      spark.sparkContext.setJobGroup("Reload", s"Reload: ${errorPath}")
       val batchSuccess = spark.sparkContext.longAccumulator(s"batchSuccess.reimport")
       val batchFailure = spark.sparkContext.longAccumulator(s"batchFailure.reimport")
-      val data      = spark.read.text(configs.errorConfig.errorPath)
+      val data      = spark.read.text(errorPath)
       val startTime = System.currentTimeMillis()
       val processor = new ReloadProcessor(data, configs, batchSuccess, batchFailure)
       processor.process()
diff --git a/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala b/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala
index ada618f3..be0e3a2d 100644
--- a/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala
+++ b/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala
@@ -27,7 +27,7 @@ import com.vesoft.nebula.exchange.TooManyErrorsException
 import com.vesoft.nebula.meta.EdgeItem
 import org.apache.commons.codec.digest.MurmurHash2
 import org.apache.log4j.Logger
-import org.apache.spark.TaskContext
+import org.apache.spark.{SparkEnv, TaskContext}
 import org.apache.spark.sql.streaming.Trigger
 import org.apache.spark.sql.{DataFrame, Dataset, Encoders, Row, SparkSession}
 import org.apache.spark.util.LongAccumulator
@@ -82,9 +82,10 @@ class EdgeProcessor(spark: SparkSession,
       }
     }
     if (errorBuffer.nonEmpty) {
+      val appId = SparkEnv.get.blockManager.conf.getAppId
       ErrorHandler.save(
         errorBuffer,
-        s"${config.errorConfig.errorPath}/${edgeConfig.name}.${TaskContext.getPartitionId}")
+        s"${config.errorConfig.errorPath}/${appId}/${edgeConfig.name}.${TaskContext.getPartitionId}")
       errorBuffer.clear()
     }
     LOG.info(s"edge ${edgeConfig.name} import in spark partition ${TaskContext
diff --git a/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala b/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala
index c2c3b8bc..35fa7d2c 100644
--- a/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala
+++ b/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala
@@ -26,7 +26,7 @@ import com.vesoft.nebula.exchange.TooManyErrorsException
 import com.vesoft.nebula.meta.TagItem
 import org.apache.commons.codec.digest.MurmurHash2
 import org.apache.log4j.Logger
-import org.apache.spark.TaskContext
+import org.apache.spark.{SparkEnv, TaskContext}
 import org.apache.spark.sql.streaming.Trigger
 import org.apache.spark.sql.{DataFrame, Dataset, Encoders, Row, SparkSession}
 import org.apache.spark.util.LongAccumulator
@@ -90,9 +90,10 @@ class VerticesProcessor(spark: SparkSession,
       }
     }
     if (errorBuffer.nonEmpty) {
+      val appId = SparkEnv.get.blockManager.conf.getAppId
       ErrorHandler.save(
         errorBuffer,
-        s"${config.errorConfig.errorPath}/${tagConfig.name}.${TaskContext.getPartitionId()}")
+        s"${config.errorConfig.errorPath}/${appId}/${tagConfig.name}.${TaskContext.getPartitionId()}")
       errorBuffer.clear()
     }
     LOG.info(s"tag ${tagConfig.name} import in spark partition ${TaskContext
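Taken together, the error output of one Exchange run ends up laid out like this (an illustrative layout under the default root; tag, edge, and partition names are placeholders):

    file:///tmp/nebula.writer.errors/
    └── <spark application id>/
        ├── player.0    -- failed statements for tag "player", partition 0
        ├── follow.2    -- failed statements for edge "follow", partition 2
        └── reload.1    -- statements that failed again during reload, partition 1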