Updated external host feature
blinov-ivan committed May 13, 2021
1 parent d38aa23 commit e071428
Showing 4 changed files with 55 additions and 42 deletions.
8 changes: 3 additions & 5 deletions README.md
@@ -49,7 +49,6 @@ global options have the prefix `spark.datasource.singlestore.`.
| `database` | If set, all connections will default to using this database (default: empty)
| `disablePushdown` | Disable SQL Pushdown when running queries (default: false)
| `enableParallelRead` | Enable reading data in parallel for some query shapes (default: false)
- | `useExternalHost` | Enable using external host and port from `information_schema.mv_nodes` table for direct connection to leaves (default: false)
| `overwriteBehavior` | Specify the behavior during Overwrite; one of `dropAndCreate`, `truncate`, `merge` (default: `dropAndCreate`)
| `truncate` | :warning: **Deprecated option, please use `overwriteBehavior` instead** Truncate instead of drop an existing table during Overwrite (default: false)
| `loadDataCompression` | Compress data on load; one of (`GZip`, `LZ4`, `Skip`) (default: GZip)
@@ -338,10 +337,9 @@ In order to use parallel reads, the username and password provided to the
In addition, the hostnames and ports listed by `SHOW LEAVES` must be directly
connectible from Spark.

- `useExternalHost` option allows you to use external hosts and ports from `information_schema.mv_nodes` table
- for direct connection to leaves.
-
- **:warning: `useExternalHost` feature works only with a SingleStore version `7.1.0` or above
+ **:warning: When parallel read is enabled, the connector will try to use the external hosts and ports from the
+ `information_schema.mv_nodes` table for direct connections to leaves; if those entries don't exist, it falls back
+ to the internal ones. This feature works only with SingleStore version `7.1.0` or above.**
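For context, here is a minimal sketch of what a reader of the updated README would write to opt into this behavior (illustrative only, not part of this diff; it assumes an existing `SparkSession` named `spark`, the connector on the classpath, and `singlestore` as the registered short format name):

```scala
// Sketch: `testDb.testCollection` is a placeholder table.
// After this commit there is no `useExternalHost` switch to set; enabling
// parallel read is enough, and the external-host lookup happens automatically.
val df = spark.read
  .format("singlestore")
  .option("enableParallelRead", "true")
  .load("testDb.testCollection")
```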

## Running SQL queries
The methods `executeSinglestoreQuery(query: String, variables: Any*)` and `executeSinglestoreQueryDB(db: String, query: String, variables: Any*)`
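The paragraph above (truncated by the diff view) introduces the SQL helper methods. A hedged usage sketch follows, using only the signatures shown; the import path and the `SparkSession` receiver are assumptions, so check `SQLHelper` in this repository before relying on them:

```scala
// Assumed import path -- not confirmed by this diff.
import com.singlestore.spark.SQLHelper._

// Per the signatures above, the trailing `variables` fill query placeholders.
val rows   = spark.executeSinglestoreQuery("SELECT * FROM users WHERE id > ?", 10)
val counts = spark.executeSinglestoreQueryDB("testDb", "SELECT COUNT(*) FROM users")
```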
3 changes: 0 additions & 3 deletions src/main/scala/com/singlestore/spark/SinglestoreOptions.scala
@@ -15,7 +15,6 @@ case class SinglestoreOptions(
enableAsserts: Boolean,
disablePushdown: Boolean,
enableParallelRead: Boolean,
- useExternalHost: Boolean,
// write options
overwriteBehavior: OverwriteBehavior,
loadDataCompression: SinglestoreOptions.CompressionType.Value,
@@ -101,7 +100,6 @@ object SinglestoreOptions extends LazyLogging {
final val ENABLE_ASSERTS = newOption("enableAsserts")
final val DISABLE_PUSHDOWN = newOption("disablePushdown")
final val ENABLE_PARALLEL_READ = newOption("enableParallelRead")
- final val USE_EXTERNAL_HOST = newOption("useExternalHost")

def getTable(options: CaseInsensitiveMap[String]): Option[TableIdentifier] =
options
@@ -182,7 +180,6 @@
enableAsserts = options.get(ENABLE_ASSERTS).getOrElse("false").toBoolean,
disablePushdown = options.get(DISABLE_PUSHDOWN).getOrElse("false").toBoolean,
enableParallelRead = options.get(ENABLE_PARALLEL_READ).getOrElse("false").toBoolean,
- useExternalHost = options.get(USE_EXTERNAL_HOST).getOrElse("false").toBoolean,
overwriteBehavior = {
val truncateOption = options.get(TRUNCATE)
val overwriteBehaviorOption = options.get(OVERWRITE_BEHAVIOR)
19 changes: 10 additions & 9 deletions src/main/scala/com/singlestore/spark/SinglestoreQueryHelpers.scala
@@ -40,26 +40,27 @@ object SinglestoreQueryHelpers extends LazyLogging {
val minimalExternalHostVersion = "7.1.0"
val explainJSON = JdbcHelpers.explainJSONQuery(options, query, variables).parseJson
val partitions = JdbcHelpers.partitionHostPorts(options, options.database.head)
- val partitionHostPorts = if (options.useExternalHost) {
+ val partitionHostPorts = {
val singlestoreVersion = SinglestoreVersion(JdbcHelpers.getSinglestoreVersion(options))
if (singlestoreVersion.atLeast(minimalExternalHostVersion)) {
val externalHostMap = JdbcHelpers.externalHostPorts(options)
- partitions.map(p => {
+ var isValid = true
+ val externalPartitions = partitions.flatMap(p => {
val externalHost = externalHostMap.get(p.hostport)
if (externalHost.isDefined) {
- SinglestorePartitionInfo(p.ordinal, p.name, externalHost.get)
+ Some(SinglestorePartitionInfo(p.ordinal, p.name, externalHost.get))
} else {
- throw new IllegalArgumentException(
-   s"No external host/port provided for the host ${p.hostport}")
+ isValid = false
+ None
+ // throw new IllegalArgumentException(
+ //   s"No external host/port provided for the host ${p.hostport}")
}
})
+ if (isValid) externalPartitions
+ else partitions
} else {
log.warn(
s"To use `External host` feature, your SingleStore version should be $minimalExternalHostVersion or above, your current version is $singlestoreVersion")
partitions
}
- } else {
- partitions
}
try {
partitionsFromExplainJSON(options, options.database.head, partitionHostPorts, explainJSON)
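In short, the change above replaces a hard failure with a fallback: if any leaf lacks an external host/port mapping, the connector now keeps the internal host list instead of throwing. A distilled sketch of that pattern with simplified, hypothetical types (`String` in place of `SinglestorePartitionInfo`):

```scala
// Distilled from the diff above, not the connector's actual API.
def resolveHosts(internal: Seq[String], external: Map[String, String]): Seq[String] = {
  // Keep each partition that has an external mapping; drop the rest.
  val resolved = internal.flatMap(external.get)
  // If every partition resolved, use the external addresses; otherwise fall
  // back to the internal ones rather than failing the read.
  if (resolved.length == internal.length) resolved else internal
}

resolveHosts(Seq("h1:3307", "h2:3307"), Map("h1:3307" -> "ext1:3307"))
// => Seq("h1:3307", "h2:3307") -- h2 has no external mapping, so fall back
```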
67 changes: 42 additions & 25 deletions src/test/scala/com/singlestore/spark/ExternalHostTest.scala
@@ -95,6 +95,48 @@ class ExternalHostTest
}
}

it("empty external host map") {

withObjectMocked[JdbcHelpers.type] {

setupMockJdbcHelper()
when(JdbcHelpers.getSinglestoreVersion(any[SinglestoreOptions])).thenReturn("7.1.0")
when(JdbcHelpers.externalHostPorts(any[SinglestoreOptions]))
.thenReturn(Map.empty[String, String])

val actualDf = spark.read
.format(DefaultSource.SINGLESTORE_SOURCE_NAME_SHORT)
.option("useExternalHost", "true")
.load(s"$testDb.$testCollection")

assertSmallDataFrameEquality(df, actualDf)
}
}

it("wrong external host map") {

withObjectMocked[JdbcHelpers.type] {

setupMockJdbcHelper()
when(JdbcHelpers.getSinglestoreVersion(any[SinglestoreOptions])).thenReturn("7.1.0")

val externalHostMap = Map(
"172.17.0.3:3307" -> "172.17.0.100:3307",
"172.17.0.4:3307" -> "172.17.0.200:3307"
)

when(JdbcHelpers.externalHostPorts(any[SinglestoreOptions]))
.thenReturn(externalHostMap)

val actualDf = spark.read
.format(DefaultSource.SINGLESTORE_SOURCE_NAME_SHORT)
.option("useExternalHost", "true")
.load(s"$testDb.$testCollection")

assertSmallDataFrameEquality(df, actualDf)
}
}

it("valid external host function") {

val mvNodesDf = spark.createDF(
@@ -119,7 +161,6 @@
false,
false,
true,
- true,
Truncate,
SinglestoreOptions.CompressionType.GZip,
SinglestoreOptions.LoadDataFormat.CSV,
@@ -161,30 +202,6 @@
}

describe("failed tests") {
it("empty external host map") {

withObjectMocked[JdbcHelpers.type] {

setupMockJdbcHelper()
when(JdbcHelpers.getSinglestoreVersion(any[SinglestoreOptions])).thenReturn("7.1.0")
when(JdbcHelpers.externalHostPorts(any[SinglestoreOptions]))
.thenReturn(Map.empty[String, String])

try {
spark.read
.format(DefaultSource.SINGLESTORE_SOURCE_NAME_SHORT)
.option("useExternalHost", "true")
.load(s"$testDb.$testCollection")
.collect()
fail("Exception expected")
} catch {
case ex: IllegalArgumentException =>
assert(
ex.getMessage equals "No external host/port provided for the host 172.17.0.2:3307")
case _ => fail("IllegalArgumentException expected")
}
}
}

it("wrong external host") {

