Added connection attributes
Summary: Added the connector name, connector version, and Spark version at the moment of connection creation.

Test Plan: https://webapp.io/memsql/commits?query=repo%3Asinglestore-spark-connector+id%3A414

Reviewers: pmishchenko-ua, vtkachuk-ua

Reviewed By: pmishchenko-ua

Subscribers: engineering-list

JIRA Issues: PLAT-6645

Differential Revision: https://grizzly.internal.memcompute.com/D63626
AdalbertMemSQL committed Jul 10, 2023
1 parent 6c65288 commit bf45ce9
Showing 11 changed files with 91 additions and 12 deletions.
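In outline: the build now generates a BuildInfo object via sbt-buildinfo, SinglestoreOptions carries the Spark version taken from the SparkContext, and JdbcHelpers reports both, together with the connector name, through the SingleStore JDBC driver's connectionAttributes property. A minimal sketch of the value being assembled (it mirrors the one-liner in the JdbcHelpers diff below; not a verbatim excerpt):

    // Sketch only: BuildInfo.version is generated by sbt-buildinfo at compile
    // time, and conf.sparkVersion is captured from SparkContext.version when
    // the SinglestoreOptions instance is created.
    val connectionAttributes = Seq(
      "_connector_name:SingleStore Spark Connector",
      s"_connector_version:${BuildInfo.version}",
      s"_product_version:${conf.sparkVersion}"
    ).mkString(",")
    properties.setProperty("connectionAttributes", connectionAttributes)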
5 changes: 4 additions & 1 deletion build.sbt
@@ -11,6 +11,7 @@ val scalaVersionPrefix = scalaVersionStr.substring(0, 4)
lazy val root = project
.withId("singlestore-spark-connector")
.in(file("."))
.enablePlugins(BuildInfoPlugin)
.settings(
name := "singlestore-spark-connector",
organization := "com.singlestore",
@@ -47,7 +48,9 @@ lazy val root = project
"com.github.mrpowers" %% "spark-daria" % "0.38.2" % Test
),
Test / testOptions += Tests.Argument("-oF"),
Test / fork := true
Test / fork := true,
buildInfoKeys := Seq[BuildInfoKey](version),
buildInfoPackage := "com.singlestore.spark"
)

credentials += Credentials(
5 changes: 3 additions & 2 deletions project/plugins.sbt
@@ -1,2 +1,3 @@
addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "3.9.6")
addSbtPlugin("com.jsuereth" % "sbt-pgp" % "2.0.1")
addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "3.9.6")
addSbtPlugin("com.jsuereth" % "sbt-pgp" % "2.0.1")
addSbtPlugin("com.eed3si9n" % "sbt-buildinfo" % "0.11.0")
4 changes: 2 additions & 2 deletions src/main/scala/com/singlestore/spark/DefaultSource.scala
@@ -44,7 +44,7 @@ class DefaultSource
override def createRelation(sqlContext: SQLContext,
parameters: Map[String, String]): BaseRelation = {
val params = CaseInsensitiveMap(includeGlobalParams(sqlContext, parameters))
val options = SinglestoreOptions(params)
val options = SinglestoreOptions(params, sqlContext.sparkSession.sparkContext)
if (options.disablePushdown) {
SQLPushdownRule.ensureRemoved(sqlContext.sparkSession)
SinglestoreReaderNoPushdown(SinglestoreOptions.getQuery(params), options, sqlContext)
@@ -63,7 +63,7 @@ class DefaultSource
parameters: Map[String, String],
data: DataFrame): BaseRelation = {
val opts = CaseInsensitiveMap(includeGlobalParams(sqlContext, parameters))
val conf = SinglestoreOptions(opts)
val conf = SinglestoreOptions(opts, sqlContext.sparkSession.sparkContext)

val table = SinglestoreOptions
.getTable(opts)
2 changes: 2 additions & 0 deletions src/main/scala/com/singlestore/spark/JdbcHelpers.scala
@@ -19,6 +19,7 @@ import org.apache.spark.sql.jdbc.JdbcDialects
import org.apache.spark.sql.types.{StringType, StructType}

import scala.util.{Failure, Success, Try}
import org.apache.spark.SparkContext

case class SinglestorePartitionInfo(ordinal: Int, name: String, hostport: String)

@@ -60,6 +61,7 @@ object JdbcHelpers extends LazyLogging {
properties.setProperty("driverClassName", "com.singlestore.jdbc.Driver")
properties.setProperty("username", conf.user)
properties.setProperty("password", conf.password)
properties.setProperty("connectionAttributes", s"_connector_name:SingleStore Spark Connector,_connector_version:${BuildInfo.version},_product_version:${conf.sparkVersion}")
properties.setProperty(
"connectionProperties",
(Map(
2 changes: 1 addition & 1 deletion src/main/scala/com/singlestore/spark/SQLHelper.scala
@@ -27,7 +27,7 @@ object SQLHelper extends LazyLogging {
}
}

val conf = SinglestoreOptions(CaseInsensitiveMap(opts))
val conf = SinglestoreOptions(CaseInsensitiveMap(opts), spark.sparkContext)
val conn =
SinglestoreConnectionPool.getConnection(getDDLConnProperties(conf, isOnExecutor = false))
try {
src/main/scala/com/singlestore/spark/SinglestoreConnectionPool.scala
@@ -27,6 +27,8 @@ object SinglestoreConnectionPool {
properties, {
deleteEmptyDataSources()
val newDataSource = BasicDataSourceFactory.createDataSource(properties)
newDataSource.addConnectionProperty("connectionAttributes",
properties.getProperty("connectionAttributes"))
dataSources += (properties -> newDataSource)
newDataSource
}
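The explicit addConnectionProperty call above is presumably needed because Commons DBCP2 consumes pool-level settings (username, password, driverClassName, ...) itself and only hands registered connection properties down to the JDBC driver. A sketch of the idea, assuming the DBCP2 BasicDataSource API and a placeholder endpoint:

    import org.apache.commons.dbcp2.BasicDataSource

    // Pool-level settings configure DBCP2; driver-level options such as
    // connectionAttributes must be registered as connection properties so
    // they reach com.singlestore.jdbc.Driver.
    val ds = new BasicDataSource()
    ds.setDriverClassName("com.singlestore.jdbc.Driver")
    ds.setUrl("jdbc:singlestore://host:3306") // placeholder endpoint
    ds.addConnectionProperty(
      "connectionAttributes",
      "_connector_name:SingleStore Spark Connector")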
7 changes: 5 additions & 2 deletions src/main/scala/com/singlestore/spark/SinglestoreOptions.scala
@@ -4,6 +4,7 @@ import com.singlestore.spark.SinglestoreOptions.TableKey
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.SparkContext

case class SinglestoreOptions(
ddlEndpoint: String,
@@ -32,7 +33,8 @@ case class SinglestoreOptions(
insertBatchSize: Int,
createRowstoreTable: Boolean,
driverConnectionPoolOptions: SinglestoreConnectionPoolOptions,
executorConnectionPoolOptions: SinglestoreConnectionPoolOptions
executorConnectionPoolOptions: SinglestoreConnectionPoolOptions,
sparkVersion: String
) extends LazyLogging {

def assert(condition: Boolean, message: String) = {
@@ -231,7 +233,7 @@ object SinglestoreOptions extends LazyLogging {
}
}

def apply(options: CaseInsensitiveMap[String]): SinglestoreOptions = {
def apply(options: CaseInsensitiveMap[String], sc: SparkContext): SinglestoreOptions = {
val table = getTable(options)

require(
@@ -372,6 +374,7 @@ options.getOrElse(DRIVER_CONNECTION_POOL_MAX_WAIT_MS, "-1").toLong,
options.getOrElse(DRIVER_CONNECTION_POOL_MAX_WAIT_MS, "-1").toLong,
options.getOrElse(DRIVER_CONNECTION_POOL_MAX_CONN_LIFETIME_MS, "-1").toLong,
),
sparkVersion = sc.version
)
}
}
3 changes: 2 additions & 1 deletion src/test/scala/com/singlestore/spark/ExternalHostTest.scala
@@ -181,7 +181,8 @@ class ExternalHostTest
10,
false,
SinglestoreConnectionPoolOptions(enabled = true, -1, 8, 30000, 1000, -1, -1),
SinglestoreConnectionPoolOptions(enabled = true, -1, 8, 2000, 1000, -1, -1)
SinglestoreConnectionPoolOptions(enabled = true, -1, 8, 2000, 1000, -1, -1),
"3.4.0"
)

val conn =
64 changes: 64 additions & 0 deletions src/test/scala/com/singlestore/spark/SanityTest.scala
@@ -307,4 +307,68 @@ class SanityTest extends IntegrationSuiteBase with BeforeAndAfterEach {
.load()
assertSmallDataFrameEquality(jwtDF, df, orderedComparison = false)
}

it("connection attributes") {
assume(version.atLeast("8.1.0"))

val conn = SinglestoreConnectionPool.getConnection(
JdbcHelpers.getDDLConnProperties(
new SinglestoreOptions(
s"$masterHost:$masterPort",
List.empty[String],
"root",
masterPassword,
None,
Map.empty[String, String],
false,
false,
Automatic,
List(ReadFromLeaves),
0,
0,
0,
true,
Set.empty,
Truncate,
SinglestoreOptions.CompressionType.GZip,
SinglestoreOptions.LoadDataFormat.CSV,
List.empty[SinglestoreOptions.TableKey],
None,
10,
10,
false,
SinglestoreConnectionPoolOptions(enabled = true, -1, 8, 30000, 1000, -1, -1),
SinglestoreConnectionPoolOptions(enabled = true, -1, 8, 2000, 1000, -1, -1),
"3.4.0"
),
false)
)

val expectedAttributes = Map[String, String](
"_connector_name" -> "SingleStore Spark Connector",
"_connector_version" -> BuildInfo.version,
"_product_version" -> spark.sparkContext.version
)

var actualAttributes = Map[String, String]()
try {
val stmt = conn.createStatement()
try {
val rs = stmt.executeQuery("select * from information_schema.mv_connection_attributes")
try {
while (rs.next()) {
actualAttributes = actualAttributes + (rs.getString(3) -> rs.getString(4))
}
} finally {
rs.close()
}
} finally {
stmt.close()
}

assert(expectedAttributes.toSet.subsetOf(actualAttributes.toSet))
} finally {
conn.close()
}
}
}
src/test/scala/com/singlestore/spark/SinglestoreConnectionPoolTest.scala
@@ -15,7 +15,8 @@ class SinglestoreConnectionPoolTest extends IntegrationSuiteBase {
properties = JdbcHelpers.getConnProperties(
SinglestoreOptions(
CaseInsensitiveMap(
Map("ddlEndpoint" -> s"$masterHost:$masterPort", "password" -> masterPassword))),
Map("ddlEndpoint" -> s"$masterHost:$masterPort", "password" -> masterPassword)),
spark.sparkContext),
isOnExecutor = false,
s"$masterHost:$masterPort"
)
src/test/scala/com/singlestore/spark/SinglestoreOptionsTest.scala
@@ -10,10 +10,12 @@ class SinglestoreOptionsTest extends IntegrationSuiteBase {
assert(
SinglestoreOptions(
CaseInsensitiveMap(
requiredOptions ++ Map("dmlEndpoints" -> "host1:3302,host2:3302,host1:3342"))) ==
requiredOptions ++ Map("dmlEndpoints" -> "host1:3302,host2:3302,host1:3342")),
spark.sparkContext) ==
SinglestoreOptions(
CaseInsensitiveMap(
requiredOptions ++ Map("dmlEndpoints" -> "host2:3302,host1:3302,host1:3342"))),
requiredOptions ++ Map("dmlEndpoints" -> "host2:3302,host1:3302,host1:3342")),
spark.sparkContext),
"Should sort dmlEndpoints"
)
}
