From 0641da84f55d19780d171f466f342578d0fd6859 Mon Sep 17 00:00:00 2001
From: Nicole00
Date: Thu, 8 Sep 2022 22:55:22 +0800
Subject: [PATCH] support general jdbc datasource

---
 .../exchange/common/config/Configs.scala      | 41 +++++++++++++++++
 .../common/config/SourceConfigs.scala         | 32 +++++++++++++
 .../com/vesoft/nebula/exchange/Exchange.scala | 13 ++++--
 .../exchange/reader/ServerBaseReader.scala    | 45 +++++++++++++++++++
 .../com/vesoft/nebula/exchange/Exchange.scala | 13 ++++--
 .../exchange/reader/ServerBaseReader.scala    | 45 +++++++++++++++++++
 .../com/vesoft/nebula/exchange/Exchange.scala | 10 ++++-
 .../exchange/reader/ServerBaseReader.scala    | 45 +++++++++++++++++++
 8 files changed, 237 insertions(+), 7 deletions(-)

diff --git a/exchange-common/src/main/scala/com/vesoft/exchange/common/config/Configs.scala b/exchange-common/src/main/scala/com/vesoft/exchange/common/config/Configs.scala
index e04cfa97..7ebfff54 100644
--- a/exchange-common/src/main/scala/com/vesoft/exchange/common/config/Configs.scala
+++ b/exchange-common/src/main/scala/com/vesoft/exchange/common/config/Configs.scala
@@ -573,6 +573,7 @@ object Configs {
       case "CLICKHOUSE" => SourceCategory.CLICKHOUSE
       case "POSTGRESQL" => SourceCategory.POSTGRESQL
       case "ORACLE" => SourceCategory.ORACLE
+      case "JDBC" => SourceCategory.JDBC
       case _ => throw new IllegalArgumentException(s"${category} not support")
     }
   }
@@ -682,6 +683,46 @@
           config.getString("table"),
           getOrElse(config, "sentence", null)
         )
+      case SourceCategory.JDBC =>
+        val partitionColumn =
+          if (config.hasPath("partitionColumn"))
+            Some(config.getString("partitionColumn"))
+          else None
+
+        val lowerBound =
+          if (config.hasPath("lowerBound"))
+            Some(config.getLong("lowerBound"))
+          else None
+
+        val upperBound =
+          if (config.hasPath("upperBound"))
+            Some(config.getLong("upperBound"))
+          else None
+
+        val numPartitions =
+          if (config.hasPath("numPartitions"))
+            Some(config.getLong("numPartitions"))
+          else None
+
+        val fetchSize =
+          if (config.hasPath("fetchSize"))
+            Some(config.getLong("fetchSize"))
+          else None
+
+        JdbcConfigEntry(
+          SourceCategory.JDBC,
+          config.getString("url"),
+          config.getString("driver"),
+          config.getString("user"),
+          config.getString("password"),
+          config.getString("table"),
+          partitionColumn,
+          lowerBound,
+          upperBound,
+          numPartitions,
+          fetchSize,
+          getOrElse(config, "sentence", null)
+        )
       case SourceCategory.KAFKA =>
         val intervalSeconds =
           if (config.hasPath("interval.seconds")) config.getInt("interval.seconds")
diff --git a/exchange-common/src/main/scala/com/vesoft/exchange/common/config/SourceConfigs.scala b/exchange-common/src/main/scala/com/vesoft/exchange/common/config/SourceConfigs.scala
index 32fa1e32..e541341d 100644
--- a/exchange-common/src/main/scala/com/vesoft/exchange/common/config/SourceConfigs.scala
+++ b/exchange-common/src/main/scala/com/vesoft/exchange/common/config/SourceConfigs.scala
@@ -28,6 +28,7 @@ object SourceCategory extends Enumeration {
   val CLICKHOUSE = Value("CLICKHOUSE")
   val POSTGRESQL = Value("POSTGRESQL")
   val ORACLE = Value("ORACLE")
+  val JDBC = Value("JDBC")
 
   val SOCKET = Value("SOCKET")
   val KAFKA = Value("KAFKA")
@@ -312,3 +313,34 @@ case class OracleConfigEntry(override val category: SourceCategory.Value,
     s"Oracle source {url:$url, driver:$driver, user:$user, passwd:$passwd, table:$table, sentence:$sentence}"
   }
 }
+
+/**
+  * JdbcConfigEntry
+  *
+  * @param url JDBC database url of the form `jdbc:subprotocol:subname`.
+  * @param table Name of the table in the external database.
+  * @param partitionColumn the name of a column of integral type that will be used for partitioning.
+  * @param lowerBound the minimum value of `partitionColumn` used to decide partition stride.
+  * @param upperBound the maximum value of `partitionColumn` used to decide partition stride.
+  * @param numPartitions the number of partitions. This, along with `lowerBound` (inclusive)
+  *                      and `upperBound` (exclusive), forms partition strides for generated
+  *                      WHERE clause expressions used to split the column `partitionColumn`
+  *                      evenly. When the input is less than 1, the number is set to 1.
+  */
+case class JdbcConfigEntry(override val category: SourceCategory.Value,
+                           url: String,
+                           driver: String,
+                           user: String,
+                           passwd: String,
+                           table: String,
+                           partitionColumn: Option[String] = None,
+                           lowerBound: Option[Long] = None,
+                           upperBound: Option[Long] = None,
+                           numPartitions: Option[Long] = None,
+                           fetchSize: Option[Long] = None,
+                           override val sentence: String)
+    extends ServerDataSourceConfigEntry {
+  override def toString: String = {
+    s"Jdbc source {url:$url, driver:$driver, user:$user, passwd:$passwd, table:$table, sentence:$sentence}"
+  }
+}
diff --git a/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala b/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala
index 2ea1af0d..5eb3bb53 100644
--- a/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala
+++ b/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala
@@ -6,8 +6,8 @@
 package com.vesoft.nebula.exchange
 
 import org.apache.spark.sql.{DataFrame, SparkSession}
-
 import java.io.File
+
 import com.vesoft.exchange.Argument
 import com.vesoft.exchange.common.{CheckPointHandler, ErrorHandler}
 import com.vesoft.exchange.common.config.{
@@ -18,6 +18,7 @@ import com.vesoft.exchange.common.config.{
   HBaseSourceConfigEntry,
   HiveSourceConfigEntry,
   JanusGraphSourceConfigEntry,
+  JdbcConfigEntry,
   KafkaSourceConfigEntry,
   MaxComputeConfigEntry,
   MySQLSourceConfigEntry,
@@ -35,14 +36,15 @@ import com.vesoft.nebula.exchange.reader.{
   HiveReader,
   JSONReader,
   JanusGraphReader,
+  JdbcReader,
   KafkaReader,
   MaxcomputeReader,
   MySQLReader,
   Neo4JReader,
-  OracleReader,
   ORCReader,
-  PostgreSQLReader,
+  OracleReader,
   ParquetReader,
+  PostgreSQLReader,
   PulsarReader
 }
 import com.vesoft.exchange.common.processor.ReloadProcessor
@@ -321,6 +323,11 @@ object Exchange {
         val reader = new OracleReader(session, oracleConfig)
         Some(reader.read())
       }
+      case SourceCategory.JDBC => {
+        val jdbcConfig = config.asInstanceOf[JdbcConfigEntry]
+        val reader = new JdbcReader(session, jdbcConfig)
+        Some(reader.read())
+      }
       case _ => {
         LOG.error(s"Data source ${config.category} not supported")
         None
diff --git a/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/reader/ServerBaseReader.scala b/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/reader/ServerBaseReader.scala
index ae59b8f9..eaba95c0 100644
--- a/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/reader/ServerBaseReader.scala
+++ b/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/reader/ServerBaseReader.scala
@@ -11,6 +11,7 @@ import com.vesoft.exchange.common.config.{
   HBaseSourceConfigEntry,
   HiveSourceConfigEntry,
   JanusGraphSourceConfigEntry,
+  JdbcConfigEntry,
   MaxComputeConfigEntry,
   MySQLSourceConfigEntry,
   Neo4JSourceConfigEntry,
@@ -317,3 +318,47 @@ class OracleReader(override val session: SparkSession, oracleConfig: OracleConfi
     df
   }
 }
+
+/**
+  * Jdbc reader
+  */
+class JdbcReader(override val session: SparkSession, jdbcConfig: JdbcConfigEntry)
+    extends ServerBaseReader(session, jdbcConfig.sentence) {
+  Class.forName(jdbcConfig.driver)
+  override def read(): DataFrame = {
+    val dfReader = session.read
+      .format("jdbc")
+      .option("url", jdbcConfig.url)
+      .option("dbtable", jdbcConfig.table)
+      .option("user", jdbcConfig.user)
+      .option("password", jdbcConfig.passwd)
+      .option("driver", jdbcConfig.driver)
+
+    if (jdbcConfig.partitionColumn.isDefined) {
+      dfReader.option("partitionColumn", jdbcConfig.partitionColumn.get)
+    }
+    if (jdbcConfig.numPartitions.isDefined) {
+      dfReader.option("numPartitions", jdbcConfig.numPartitions.get)
+    }
+    if (jdbcConfig.lowerBound.isDefined) {
+      dfReader.option("lowerBound", jdbcConfig.lowerBound.get)
+    }
+    if (jdbcConfig.upperBound.isDefined) {
+      dfReader.option("upperBound", jdbcConfig.upperBound.get)
+    }
+    if (jdbcConfig.fetchSize.isDefined) {
+      dfReader.option("fetchsize", jdbcConfig.fetchSize.get)
+    }
+
+    var df = dfReader.load()
+
+    if (jdbcConfig.sentence != null) {
+      val tableName = if (jdbcConfig.table.contains(".")) {
+        jdbcConfig.table.split("\\.")(1)
+      } else jdbcConfig.table
+      df.createOrReplaceTempView(tableName)
+      df = session.sql(sentence)
+    }
+    df
+  }
+}
diff --git a/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala b/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala
index 2ea1af0d..5eb3bb53 100644
--- a/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala
+++ b/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala
@@ -6,8 +6,8 @@
 package com.vesoft.nebula.exchange
 
 import org.apache.spark.sql.{DataFrame, SparkSession}
-
 import java.io.File
+
 import com.vesoft.exchange.Argument
 import com.vesoft.exchange.common.{CheckPointHandler, ErrorHandler}
 import com.vesoft.exchange.common.config.{
@@ -18,6 +18,7 @@ import com.vesoft.exchange.common.config.{
   HBaseSourceConfigEntry,
   HiveSourceConfigEntry,
   JanusGraphSourceConfigEntry,
+  JdbcConfigEntry,
   KafkaSourceConfigEntry,
   MaxComputeConfigEntry,
   MySQLSourceConfigEntry,
@@ -35,14 +36,15 @@ import com.vesoft.nebula.exchange.reader.{
   HiveReader,
   JSONReader,
   JanusGraphReader,
+  JdbcReader,
   KafkaReader,
   MaxcomputeReader,
   MySQLReader,
   Neo4JReader,
-  OracleReader,
   ORCReader,
-  PostgreSQLReader,
+  OracleReader,
   ParquetReader,
+  PostgreSQLReader,
   PulsarReader
 }
 import com.vesoft.exchange.common.processor.ReloadProcessor
@@ -321,6 +323,11 @@ object Exchange {
         val reader = new OracleReader(session, oracleConfig)
         Some(reader.read())
       }
+      case SourceCategory.JDBC => {
+        val jdbcConfig = config.asInstanceOf[JdbcConfigEntry]
+        val reader = new JdbcReader(session, jdbcConfig)
+        Some(reader.read())
+      }
       case _ => {
         LOG.error(s"Data source ${config.category} not supported")
         None
diff --git a/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/reader/ServerBaseReader.scala b/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/reader/ServerBaseReader.scala
index 1ddc9eb9..d26ddfe8 100644
--- a/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/reader/ServerBaseReader.scala
+++ b/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/reader/ServerBaseReader.scala
@@ -11,6 +11,7 @@ import com.vesoft.exchange.common.config.{
   HBaseSourceConfigEntry,
   HiveSourceConfigEntry,
   JanusGraphSourceConfigEntry,
+  JdbcConfigEntry,
   MaxComputeConfigEntry,
   MySQLSourceConfigEntry,
   Neo4JSourceConfigEntry,
@@ -375,3 +376,47 @@ class OracleReader(override val session: SparkSession, oracleConfig: OracleConfi
     df
   }
 }
+
+/**
+  * Jdbc reader
+  */
+class JdbcReader(override val session: SparkSession, jdbcConfig: JdbcConfigEntry)
+    extends ServerBaseReader(session, jdbcConfig.sentence) {
+  Class.forName(jdbcConfig.driver)
+  override def read(): DataFrame = {
+    val dfReader = session.read
+      .format("jdbc")
+      .option("url", jdbcConfig.url)
+      .option("dbtable", jdbcConfig.table)
+      .option("user", jdbcConfig.user)
+      .option("password", jdbcConfig.passwd)
+      .option("driver", jdbcConfig.driver)
+
+    if (jdbcConfig.partitionColumn.isDefined) {
+      dfReader.option("partitionColumn", jdbcConfig.partitionColumn.get)
+    }
+    if (jdbcConfig.numPartitions.isDefined) {
+      dfReader.option("numPartitions", jdbcConfig.numPartitions.get)
+    }
+    if (jdbcConfig.lowerBound.isDefined) {
+      dfReader.option("lowerBound", jdbcConfig.lowerBound.get)
+    }
+    if (jdbcConfig.upperBound.isDefined) {
+      dfReader.option("upperBound", jdbcConfig.upperBound.get)
+    }
+    if (jdbcConfig.fetchSize.isDefined) {
+      dfReader.option("fetchsize", jdbcConfig.fetchSize.get)
+    }
+
+    var df = dfReader.load()
+
+    if (jdbcConfig.sentence != null) {
+      val tableName = if (jdbcConfig.table.contains(".")) {
+        jdbcConfig.table.split("\\.")(1)
+      } else jdbcConfig.table
+      df.createOrReplaceTempView(tableName)
+      df = session.sql(sentence)
+    }
+    df
+  }
+}
diff --git a/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala b/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala
index f09ab0d2..0efd4cab 100644
--- a/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala
+++ b/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala
@@ -7,6 +7,7 @@ package com.vesoft.nebula.exchange
 
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import java.io.File
+
 import com.vesoft.exchange.Argument
 import com.vesoft.exchange.common.{CheckPointHandler, ErrorHandler}
 import com.vesoft.exchange.common.config.{
@@ -17,6 +18,7 @@ import com.vesoft.exchange.common.config.{
   HBaseSourceConfigEntry,
   HiveSourceConfigEntry,
   JanusGraphSourceConfigEntry,
+  JdbcConfigEntry,
   KafkaSourceConfigEntry,
   MaxComputeConfigEntry,
   MySQLSourceConfigEntry,
@@ -34,12 +36,13 @@ import com.vesoft.nebula.exchange.reader.{
   HiveReader,
   JSONReader,
   JanusGraphReader,
+  JdbcReader,
   KafkaReader,
   MaxcomputeReader,
   MySQLReader,
   Neo4JReader,
-  OracleReader,
   ORCReader,
+  OracleReader,
   ParquetReader,
   PostgreSQLReader,
   PulsarReader
@@ -320,6 +323,11 @@ object Exchange {
         val reader = new OracleReader(session, oracleConfig)
         Some(reader.read())
       }
+      case SourceCategory.JDBC => {
+        val jdbcConfig = config.asInstanceOf[JdbcConfigEntry]
+        val reader = new JdbcReader(session, jdbcConfig)
+        Some(reader.read())
+      }
       case _ => {
         LOG.error(s"Data source ${config.category} not supported")
         None
diff --git a/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/reader/ServerBaseReader.scala b/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/reader/ServerBaseReader.scala
index 7f5519c0..8799b276 100644
--- a/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/reader/ServerBaseReader.scala
+++ b/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/reader/ServerBaseReader.scala
@@ -10,6 +10,7 @@ import com.vesoft.exchange.common.config.{
   HBaseSourceConfigEntry,
   HiveSourceConfigEntry,
   JanusGraphSourceConfigEntry,
+  JdbcConfigEntry,
   MaxComputeConfigEntry,
   MySQLSourceConfigEntry,
   Neo4JSourceConfigEntry,
@@ -264,3 +265,47 @@ class OracleReader(override val session: SparkSession, oracleConfig: OracleConfi
     df
   }
 }
+
+/**
+  * Jdbc reader
+  */
+class JdbcReader(override val session: SparkSession, jdbcConfig: JdbcConfigEntry)
+    extends ServerBaseReader(session, jdbcConfig.sentence) {
+  Class.forName(jdbcConfig.driver)
+  override def read(): DataFrame = {
+    val dfReader = session.read
+      .format("jdbc")
+      .option("url", jdbcConfig.url)
+      .option("dbtable", jdbcConfig.table)
+      .option("user", jdbcConfig.user)
+      .option("password", jdbcConfig.passwd)
+      .option("driver", jdbcConfig.driver)
+
+    if (jdbcConfig.partitionColumn.isDefined) {
+      dfReader.option("partitionColumn", jdbcConfig.partitionColumn.get)
+    }
+    if (jdbcConfig.numPartitions.isDefined) {
+      dfReader.option("numPartitions", jdbcConfig.numPartitions.get)
+    }
+    if (jdbcConfig.lowerBound.isDefined) {
+      dfReader.option("lowerBound", jdbcConfig.lowerBound.get)
+    }
+    if (jdbcConfig.upperBound.isDefined) {
+      dfReader.option("upperBound", jdbcConfig.upperBound.get)
+    }
+    if (jdbcConfig.fetchSize.isDefined) {
+      dfReader.option("fetchsize", jdbcConfig.fetchSize.get)
+    }
+
+    var df = dfReader.load()
+
+    if (jdbcConfig.sentence != null) {
+      val tableName = if (jdbcConfig.table.contains(".")) {
+        jdbcConfig.table.split("\\.")(1)
+      } else jdbcConfig.table
+      df.createOrReplaceTempView(tableName)
+      df = session.sql(sentence)
+    }
+    df
+  }
+}
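
Note (reviewer sketch, not part of the patch): the snippet below shows how the
two new pieces fit together when driven by hand rather than through Exchange's
config file, e.g. as a quick local smoke test. Every literal value (url,
driver, credentials, table, bounds, query) is hypothetical; only
JdbcConfigEntry, SourceCategory.JDBC and JdbcReader come from this patch. Also
note that Spark only parallelizes the scan when partitionColumn, lowerBound,
upperBound and numPartitions are all supplied together.

    import org.apache.spark.sql.SparkSession
    import com.vesoft.exchange.common.config.{JdbcConfigEntry, SourceCategory}
    import com.vesoft.nebula.exchange.reader.JdbcReader

    object JdbcReaderSmokeTest {
      def main(args: Array[String]): Unit = {
        val session = SparkSession.builder().master("local[*]").getOrCreate()
        // Hypothetical MySQL source; any database whose JDBC driver is on the
        // classpath works, which is the point of the generic JDBC source.
        val jdbcConfig = JdbcConfigEntry(
          SourceCategory.JDBC,
          url = "jdbc:mysql://127.0.0.1:3306/basketball",
          driver = "com.mysql.cj.jdbc.Driver",
          user = "root",
          passwd = "nebula",
          table = "basketball.player",
          partitionColumn = Some("age"), // integral column the scan is split on
          lowerBound = Some(0L),
          upperBound = Some(100L),
          numPartitions = Some(8L),
          fetchSize = Some(1000L),
          // Runs against the temp view "player" (table name minus the db prefix).
          sentence = "select name, age from player"
        )
        new JdbcReader(session, jdbcConfig).read().show()
      }
    }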