Test connector and document with on dupe key update for columnstore
Summary:
Added 2 tests that cover different workflows that occur with columnstore and on duplicate key update.
Tested that these workflows don't work in 7.1

Updated README with "columnstore" on duplicate key update

**Design doc/spec**:
**Docs impact**: none
**Preliminary Reviewer(s)**:
**Final Reviewer**:

Test Plan: https://webapp.io/memsql/commits?query=repo%3Asinglestore-spark-connector+id%3A137

Reviewers: amakarovych-ua

Reviewed By: amakarovych-ua

Subscribers: engineering-list

JIRA Issues: PLAT-5137

Differential Revision: https://grizzly.internal.memcompute.com/D54541
PolliO committed Feb 4, 2022
1 parent 65c8fd2 commit c8f2b1b
Showing 2 changed files with 109 additions and 2 deletions.
5 changes: 3 additions & 2 deletions README.md
@@ -165,9 +165,10 @@ df.write

## Inserting rows into the table with ON DUPLICATE KEY UPDATE

-When updating a rowstore table it is possible to insert rows with `ON DUPLICATE KEY UPDATE` option.
+When updating a table it is possible to insert rows with `ON DUPLICATE KEY UPDATE` option.
See [sql reference](https://docs.singlestore.com/db/latest/en/reference/sql-reference/data-manipulation-language-dml/insert.html) for more details.

> :warning: This feature doesn't work for columnstore tables with SingleStore 7.1.
> :warning: Updating a unique key column with `ON DUPLICATE KEY UPDATE` is not allowed.
```scala
df.write
.option("onDuplicateKeySQL", "age = age + 1")
106 changes: 106 additions & 0 deletions src/test/scala/com/singlestore/spark/SQLOverwriteTest.scala
@@ -6,6 +6,7 @@ import com.singlestore.spark.SinglestoreOptions.{OVERWRITE_BEHAVIOR, TRUNCATE}
import org.apache.spark.sql.{DataFrame, SaveMode}
import org.apache.spark.sql.types.{IntegerType, StringType}

import scala.List
import scala.util.Try

class SQLOverwriteTest extends IntegrationSuiteBase {
@@ -282,4 +283,109 @@ class SQLOverwriteTest extends IntegrationSuiteBase {
}
}

  describe("on duplicate key update") {

    def createTable(isColumnstore: Boolean): Unit = {
      val createTableQuery = if (isColumnstore) {
        s"CREATE TABLE $dbName.$tableName( srt int, suk int, nuk int, sort key(srt), shard key(suk), key(nuk) using hash, unique key(suk) using hash);"
      } else {
        // NOTE: `isColumnstore` is always false in this branch, so `rowstore` is
        // always empty and the ROWSTORE keyword is never emitted.
        val rowstore = if (version.atLeast("7.5.0") && isColumnstore) {
          "ROWSTORE"
        } else {
          ""
        }
        s"CREATE $rowstore TABLE $dbName.$tableName( srt int, suk int, nuk int, sort key(srt), shard key(suk), key(nuk) using hash, key(suk) using hash);"
      }
      spark.executeSinglestoreQuery(query = createTableQuery)
      spark.executeSinglestoreQuery(query = s"INSERT INTO $dbName.$tableName VALUES (1, 1, 1)")
    }

    def assertOnDuplicateKeyUpdateResult[U, T <: Product](rowData: List[U],
                                                          fields: List[T]): Unit = {
      assertSmallDataFrameEquality(spark.read.format("singlestore").load(s"$dbName.$tableName"),
                                   spark.createDF(rowData, fields))
    }

    def onDuplicateKeyUpdate(isColumnstore: Boolean): Unit = {
      val fields =
        List(("srt", IntegerType, true), ("suk", IntegerType, true), ("nuk", IntegerType, true))
      val df = spark.read.format(DefaultSource.SINGLESTORE_SOURCE_NAME_SHORT).load(s"$tableName")
      df.write
        .format("singlestore")
        .option("onDuplicateKeySQL", "srt = 2")
        .option("insertBatchSize", 300)
        .mode(SaveMode.Append)
        .save(s"$dbName.$tableName")

      if (isColumnstore) {
        assertOnDuplicateKeyUpdateResult(
          List(
            (2, 1, 1),
          ),
          fields
        )
      } else {
        assertOnDuplicateKeyUpdateResult(
          List(
            (1, 1, 1),
            (1, 1, 1),
          ),
          fields
        )
      }

      df.write
        .format("singlestore")
        .option("onDuplicateKeySQL", "nuk = 2")
        .option("insertBatchSize", 300)
        .mode(SaveMode.Append)
        .save(s"$dbName.$tableName")

      if (isColumnstore) {
        assertOnDuplicateKeyUpdateResult(
          List(
            (2, 1, 2),
          ),
          fields
        )
      } else {
        assertOnDuplicateKeyUpdateResult(
          List(
            (1, 1, 1),
            (1, 1, 1),
            (1, 1, 1),
            (1, 1, 1),
          ),
          fields
        )
      }

      if (isColumnstore) {
        val result = Try {
          df.write
            .format("singlestore")
            .option("onDuplicateKeySQL", "suk = 2")
            .option("insertBatchSize", 300)
            .mode(SaveMode.Append)
            .save(s"$dbName.$tableName")
        }
        assert(result.isFailure)
      }
    }

    it("success update with unique key") {
      if (version.atLeast("7.3.0")) {
        createTable(true)
        onDuplicateKeyUpdate(true)
      }
    }

    it("success update without unique key") {
      if (version.atLeast("7.3.0")) {
        createTable(false)
        onDuplicateKeyUpdate(false)
      }
    }
  }
}
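
The row counts asserted in the tests above can be reasoned about with a small in-memory model. The sketch below is not the connector's implementation and does not talk to SingleStore. `OnDuplicateKeyModel`, `Row`, `insert`, and `runTwice` are hypothetical names introduced only for illustration. It assumes that a conflict on the unique `suk` column applies the update expression to the stored row, that a table without a unique key simply appends, and that each `df.write` re-reads the current table contents (which is consistent with the expected results in the tests).

```scala
// Simplified in-memory model of INSERT ... ON DUPLICATE KEY UPDATE.
// All names are hypothetical; this is not part of the connector.
object OnDuplicateKeyModel {
  final case class Row(srt: Int, suk: Int, nuk: Int)

  // Inserts `incoming` into `table`. When `uniqueOnSuk` is set, a row whose `suk`
  // already exists applies `update` to the stored row instead of being appended.
  def insert(table: Vector[Row], incoming: Seq[Row], uniqueOnSuk: Boolean)(
      update: Row => Row): Vector[Row] =
    incoming.foldLeft(table) { (rows, in) =>
      val idx = if (uniqueOnSuk) rows.indexWhere(_.suk == in.suk) else -1
      if (idx >= 0) rows.updated(idx, update(rows(idx))) else rows :+ in
    }

  // Mirrors the two writes in the test: the table's current rows are written
  // back into the table, first with "srt = 2", then with "nuk = 2".
  def runTwice(uniqueOnSuk: Boolean): Vector[Row] = {
    val initial = Vector(Row(1, 1, 1))
    val first   = insert(initial, initial, uniqueOnSuk)(_.copy(srt = 2))
    insert(first, first, uniqueOnSuk)(_.copy(nuk = 2))
  }

  def main(args: Array[String]): Unit = {
    // With a unique key the single row is updated in place.
    assert(runTwice(uniqueOnSuk = true) == Vector(Row(2, 1, 2)))
    // Without a unique key nothing conflicts, so the rows double on each write.
    assert(runTwice(uniqueOnSuk = false) == Vector.fill(4)(Row(1, 1, 1)))
  }
}
```

The model deliberately leaves out the third case from the test, where `suk = 2` tries to update the unique key column itself and the write is expected to fail.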
