Skip to content

Commit

Permalink
support reading config file from hdfs (#106)
Browse files Browse the repository at this point in the history
* support reading config file from hdfs

* fix test
  • Loading branch information
Nicole00 committed Nov 9, 2022
1 parent b86efc5 commit 5879b25
Show file tree
Hide file tree
Showing 11 changed files with 26 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@

package com.vesoft.exchange.common.config

import java.io.File
import java.io.{File, InputStreamReader}
import java.nio.file.Files

import com.google.common.net.HostAndPort
import com.typesafe.config.{Config, ConfigFactory}
import com.vesoft.exchange.Argument
import com.vesoft.exchange.common.KeyPolicy
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataInputStream, FileSystem, Path}
import org.apache.log4j.Logger

import scala.collection.mutable
Expand Down Expand Up @@ -255,12 +257,21 @@ object Configs {
* @param configPath
* @return
*/
def parse(configPath: File): Configs = {
if (!Files.exists(configPath.toPath)) {
throw new IllegalArgumentException(s"${configPath} not exist")
def parse(configPath: String): Configs = {
var config: Config = null
if (configPath.startsWith("hdfs://")) {
val hadoopConfig: Configuration = new Configuration()
val fs: FileSystem = org.apache.hadoop.fs.FileSystem.get(hadoopConfig)
val file: FSDataInputStream = fs.open(new Path(configPath))
val reader = new InputStreamReader(file)
config = ConfigFactory.parseReader(reader)
} else {
if (!Files.exists(new File(configPath).toPath)) {
throw new IllegalArgumentException(s"${configPath} not exist")
}
config = ConfigFactory.parseFile(new File(configPath))
}

val config = ConfigFactory.parseFile(configPath)
val nebulaConfig = config.getConfig("nebula")
val addresses = nebulaConfig.getStringList("address.graph").asScala.toList
val metaAddresses = nebulaConfig.getStringList("address.meta").asScala.toList
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class ConfigsSuite {
assert(c.hive)
assert(c.directly)

val configs = Configs.parse(new File(c.config))
val configs = Configs.parse(c.config)
val dataBaseConfigEntry = configs.databaseConfig
val userConfig = configs.userConfig
val connectionConfig = configs.connectionConfig
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ object Exchange {
sys.exit(-1)
}

val configs = Configs.parse(new File(c.config))
val configs = Configs.parse(c.config)
LOG.info(s"Config ${configs}")

val session = SparkSession
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import scala.collection.JavaConverters._

class EdgeProcessorSuite {
val config: Configs =
Configs.parse(new File("../exchange-common/src/test/resources/process_application.conf"))
Configs.parse("../exchange-common/src/test/resources/process_application.conf")

var data: DataFrame = null
var edgeConfig: EdgeConfigEntry = config.edgesConfig.head
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class VerticesProcessorSuite {
private[this] lazy val LOG = Logger.getLogger(this.getClass)

val config: Configs =
Configs.parse(new File("../exchange-common/src/test/resources/process_application.conf"))
Configs.parse("../exchange-common/src/test/resources/process_application.conf")

var data: DataFrame = null
var tagConfig: TagConfigEntry = config.tagsConfig.head
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ object Exchange {
sys.exit(-1)
}

val configs = Configs.parse(new File(c.config))
val configs = Configs.parse(c.config)
LOG.info(s"Config ${configs}")

val session = SparkSession
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import scala.collection.JavaConverters._

class EdgeProcessorSuite {
val config: Configs =
Configs.parse(new File("../exchange-common/src/test/resources/process_application.conf"))
Configs.parse("../exchange-common/src/test/resources/process_application.conf")

var data: DataFrame = null
var edgeConfig: EdgeConfigEntry = config.edgesConfig.head
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import scala.collection.JavaConverters._

class VerticesProcessorSuite {
val config: Configs =
Configs.parse(new File("../exchange-common/src/test/resources/process_application.conf"))
Configs.parse("../exchange-common/src/test/resources/process_application.conf")

var data: DataFrame = null
var tagConfig: TagConfigEntry = config.tagsConfig.head
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ object Exchange {
sys.exit(-1)
}

val configs = Configs.parse(new File(c.config))
val configs = Configs.parse(c.config)
LOG.info(s"Config ${configs}")

val session = SparkSession
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import scala.collection.JavaConverters._

class EdgeProcessorSuite {
val config: Configs =
Configs.parse(new File("../exchange-common/src/test/resources/process_application.conf"))
Configs.parse("../exchange-common/src/test/resources/process_application.conf")

var data: DataFrame = null
var edgeConfig: EdgeConfigEntry = config.edgesConfig.head
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import scala.collection.JavaConverters._

class VerticesProcessorSuite {
val config: Configs =
Configs.parse(new File("../exchange-common/src/test/resources/process_application.conf"))
Configs.parse("../exchange-common/src/test/resources/process_application.conf")

var data: DataFrame = null
var tagConfig: TagConfigEntry = config.tagsConfig.head
Expand Down

0 comments on commit 5879b25

Please sign in to comment.