diff --git a/README.md b/README.md
index f9fc9166..01f79bb7 100644
--- a/README.md
+++ b/README.md
@@ -165,9 +165,10 @@ df.write
 
 ## Inserting rows into the table with ON DUPLICATE KEY UPDATE
 
-When updating a rowstore table it is possible to insert rows with `ON DUPLICATE KEY UPDATE` option.
+When writing to a table, it is possible to insert rows with the `ON DUPLICATE KEY UPDATE` option.
 See [sql reference](https://docs.singlestore.com/db/latest/en/reference/sql-reference/data-manipulation-language-dml/insert.html) for more details.
-
+> :warning: This feature doesn't work for columnstore tables in SingleStore 7.1.
+> :warning: Updating a column that is part of a unique key is not allowed.
 ```scala
 df.write
 .option("onDuplicateKeySQL", "age = age + 1")
diff --git a/src/test/scala/com/singlestore/spark/SQLOverwriteTest.scala b/src/test/scala/com/singlestore/spark/SQLOverwriteTest.scala
index ecbaaa80..23213065 100644
--- a/src/test/scala/com/singlestore/spark/SQLOverwriteTest.scala
+++ b/src/test/scala/com/singlestore/spark/SQLOverwriteTest.scala
@@ -282,4 +282,88 @@ class SQLOverwriteTest extends IntegrationSuiteBase {
     }
   }
 
+  describe("on duplicate key update") {
+
+    def createTable(withUniqueKey: Boolean): Unit = {
+      // Both variants are columnstore tables; they differ only in whether `suk`
+      // has a unique hash key (columnstore unique keys need SingleStore 7.3+,
+      // which is why the tests below gate on version 7.3.0).
+      val sukKey =
+        if (withUniqueKey) "unique key(suk) using hash" else "key(suk) using hash"
+      val createTableQuery =
+        s"CREATE TABLE $dbName.$tableName(srt int, suk int, nuk int, sort key(srt), shard key(suk), key(nuk) using hash, $sukKey);"
+      spark.executeSinglestoreQuery(query = createTableQuery)
+      spark.executeSinglestoreQuery(query = s"INSERT INTO $dbName.$tableName VALUES (1, 1, 1)")
+    }
+
+    def assertOnDuplicateKeyUpdateResult[U, T <: Product](rowData: List[U],
+                                                          fields: List[T]): Unit = {
+      assertSmallDataFrameEquality(spark.read.format("singlestore").load(s"$dbName.$tableName"),
+                                   spark.createDF(rowData, fields))
+    }
+
+    def onDuplicateKeyUpdate(withUniqueKey: Boolean): Unit = {
+      val fields =
+        List(("srt", IntegerType, true), ("suk", IntegerType, true), ("nuk", IntegerType, true))
+      val df = spark.read.format(DefaultSource.SINGLESTORE_SOURCE_NAME_SHORT).load(s"$tableName")
+
+      // With a unique key on `suk`, re-inserting (1, 1, 1) hits a duplicate and
+      // runs the update clause; without one, the row is simply inserted again.
+      df.write
+        .format("singlestore")
+        .option("onDuplicateKeySQL", "srt = 2")
+        .option("insertBatchSize", 300)
+        .mode(SaveMode.Append)
+        .save(s"$dbName.$tableName")
+
+      if (withUniqueKey) {
+        assertOnDuplicateKeyUpdateResult(List((2, 1, 1)), fields)
+      } else {
+        assertOnDuplicateKeyUpdateResult(List((1, 1, 1), (1, 1, 1)), fields)
+      }
+
+      // `df` is evaluated lazily, so this second write re-reads the table as it
+      // stands after the first write.
+      df.write
+        .format("singlestore")
+        .option("onDuplicateKeySQL", "nuk = 2")
+        .option("insertBatchSize", 300)
+        .mode(SaveMode.Append)
+        .save(s"$dbName.$tableName")
+
+      if (withUniqueKey) {
+        assertOnDuplicateKeyUpdateResult(List((2, 1, 2)), fields)
+      } else {
+        assertOnDuplicateKeyUpdateResult(List((1, 1, 1), (1, 1, 1), (1, 1, 1), (1, 1, 1)),
+                                         fields)
+      }
+
+      if (withUniqueKey) {
+        // Updating the unique key column itself is not allowed and must fail.
+        val result = Try {
+          df.write
+            .format("singlestore")
+            .option("onDuplicateKeySQL", "suk = 2")
+            .option("insertBatchSize", 300)
+            .mode(SaveMode.Append)
+            .save(s"$dbName.$tableName")
+        }
+        assert(result.isFailure)
+      }
+    }
+
+    it("updates existing rows when the table has a unique key") {
+      if (version.atLeast("7.3.0")) {
+        createTable(true)
+        onDuplicateKeyUpdate(true)
+      }
+    }
+
+    it("inserts duplicate rows when the table has no unique key") {
+      if (version.atLeast("7.3.0")) {
+        createTable(false)
+        onDuplicateKeyUpdate(false)
+      }
+    }
+  }
 }
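
For reference, a minimal self-contained sketch of the behavior this change documents and tests. It assumes a SparkSession already configured for the connector (e.g. `spark.datasource.singlestore.ddlEndpoint`, `user`, `password`) and an existing table with a unique key; the `testdb.people` table and its columns are illustrative only:

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

object OnDuplicateKeyExample extends App {
  // Assumes connector endpoint/credentials are provided via
  // spark.datasource.singlestore.* settings on the session.
  val spark = SparkSession.builder().appName("odku-example").getOrCreate()

  // Hypothetical target: testdb.people(id INT, name TEXT, age INT)
  // with a unique hash key on `id`.
  val df = spark.createDataFrame(Seq((1, "Alice", 30))).toDF("id", "name", "age")

  df.write
    .format("singlestore")
    // If the insert collides on the unique key, run this SET clause instead.
    // Per the warning added above, it must not modify the unique key column.
    .option("onDuplicateKeySQL", "age = age + 1")
    .option("insertBatchSize", 300)
    .mode(SaveMode.Append)
    .save("testdb.people")
}
```

Note that setting `onDuplicateKeySQL` makes the connector write with multi-row `INSERT` statements rather than `LOAD DATA`, with `insertBatchSize` bounding the rows per statement.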