From 4d365a24010594f83f7f0631556ab602f5ca7291 Mon Sep 17 00:00:00 2001
From: Adalbert Makarovych
Date: Fri, 11 Feb 2022 12:17:10 +0200
Subject: [PATCH] Added `createRowstoreTable` option

Summary: Before 7.5, the default table type in SingleStore was rowstore. The connector always tried to create a columnstore table, and only fell back to the default table type (rowstore) when the user specified additional keys. Starting with 7.5 the default table type is columnstore, so a user could no longer get a rowstore table by adding keys. This change introduces a new `createRowstoreTable` option that makes it possible again.

**Design doc/spec**:

**Docs impact**: none

Test Plan: https://webapp.io/memsql/commits?query=repo%3Asinglestore-spark-connector+id%3A145

Reviewers: carl, pmishchenko-ua, posadcha-ua

Reviewed By: pmishchenko-ua

Subscribers: amishra, engineering-list

JIRA Issues: PLAT-5782

Differential Revision: https://grizzly.internal.memcompute.com/D54799
---
 README.md                                    |  3 +-
 .../com/singlestore/spark/JdbcHelpers.scala  | 30 ++++++++++++++-----
 .../spark/SinglestoreOptions.scala           | 16 ++++++----
 .../singlestore/spark/ExternalHostTest.scala | 11 +++----
 .../com/singlestore/spark/SanityTest.scala   | 15 ++++++++++
 5 files changed, 56 insertions(+), 19 deletions(-)

diff --git a/README.md b/README.md
index 01f79bb7..a5223fe4 100644
--- a/README.md
+++ b/README.md
@@ -62,6 +62,7 @@ global options have the prefix `spark.datasource.singlestore.`.
 | `onDuplicateKeySQL` | If this option is specified, and a row is to be inserted that would result in a duplicate value in a PRIMARY KEY or UNIQUE index, SingleStore will instead perform an UPDATE of the old row. See examples below
 | `insertBatchSize` | Size of the batch for row insertion (default: `10000`)
 | `maxErrors` | The maximum number of errors in a single `LOAD DATA` request. When this limit is reached, the load fails. If this property equals to `0`, no error limit exists (Default: `0`)
+| `createRowstoreTable` | If enabled, the connector creates a rowstore table (default: `false`)
 
 ## Examples
 
@@ -129,7 +130,7 @@ with `tableKey`.
 
 These options must be formatted in a specific way in order to correctly specify the keys.
 
 > :warning: The default table type is SingleStore Columnstore. If you want a RowStore table,
-> you will need to specify a Primary Key using the tableKey option.
+> you will need to enable the `createRowstoreTable` option.
 
 To explain we will refer to the following example:
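For illustration, here is a minimal sketch of how a user would enable the new option on a write (not part of the patch; the short format name `"singlestore"` and the target table name are assumptions, and connection settings such as the endpoint and credentials are assumed to be configured elsewhere):

```scala
import org.apache.spark.sql.{DataFrame, SaveMode}

// Writes `df` to a new SingleStore table, asking the connector to create it
// as a rowstore table instead of the (7.5+) default columnstore table.
def writeAsRowstore(df: DataFrame): Unit =
  df.write
    .format("singlestore")                 // assumed short source name
    .option("createRowstoreTable", "true") // the option added by this change
    .mode(SaveMode.Overwrite)
    .save("testdb.rowstore_example")       // hypothetical database.table target
```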
diff --git a/src/main/scala/com/singlestore/spark/JdbcHelpers.scala b/src/main/scala/com/singlestore/spark/JdbcHelpers.scala
index 66ed0338..338feee2 100644
--- a/src/main/scala/com/singlestore/spark/JdbcHelpers.scala
+++ b/src/main/scala/com/singlestore/spark/JdbcHelpers.scala
@@ -4,7 +4,7 @@ import java.sql.{Connection, PreparedStatement, SQLException, Statement}
 import java.util.UUID.randomUUID
 
 import com.singlestore.spark.SinglestoreOptions.{TableKey, TableKeyType}
-import com.singlestore.spark.SQLGen.{StringVar, VariableList}
+import com.singlestore.spark.SQLGen.{SinglestoreVersion, StringVar, VariableList}
 import org.apache.spark.sql.{Row, SaveMode}
 import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
 import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
@@ -262,7 +262,9 @@ object JdbcHelpers extends LazyLogging {
     }
   }
 
-  def schemaToString(schema: StructType, tableKeys: List[TableKey]): String = {
+  def schemaToString(schema: StructType,
+                     tableKeys: List[TableKey],
+                     createRowstoreTable: Boolean): String = {
     // spark should never call any of our code if the schema is empty
     assert(schema.length > 0)
 
@@ -285,7 +287,7 @@ object JdbcHelpers extends LazyLogging {
     // specify a sort key so we just pick the first column arbitrarily for now
     var finalTableKeys = tableKeys
     // if all the keys are shard keys it means there are no other keys so we can default
-    if (tableKeys.forall(_.keyType == TableKeyType.Shard)) {
+    if (!createRowstoreTable && tableKeys.forall(_.keyType == TableKeyType.Shard)) {
       finalTableKeys = TableKey(TableKeyType.Columnstore, columns = schema.head.name) :: tableKeys
     }
 
@@ -340,8 +342,11 @@
   def createTable(conn: Connection,
                   table: TableIdentifier,
                   schema: StructType,
-                  tableKeys: List[TableKey]): Unit = {
-    val sql = s"CREATE TABLE ${table.quotedString} ${schemaToString(schema, tableKeys)}"
+                  tableKeys: List[TableKey],
+                  createRowstoreTable: Boolean,
+                  version: SinglestoreVersion): Unit = {
+    val sql =
+      s"CREATE ${if (createRowstoreTable && version.atLeast("7.3.0")) "ROWSTORE" else ""} TABLE ${table.quotedString} ${schemaToString(schema, tableKeys, createRowstoreTable)}"
     log.trace(s"Executing SQL:\n$sql")
     conn.withStatement(stmt => stmt.executeUpdate(sql))
   }
@@ -480,6 +485,7 @@ object JdbcHelpers extends LazyLogging {
                   table: TableIdentifier,
                   mode: SaveMode,
                   schema: StructType): Unit = {
+    val version = SinglestoreVersion(JdbcHelpers.getSinglestoreVersion(conf))
     val jdbcOpts = JdbcHelpers.getDDLJDBCOptions(conf)
     val conn = JdbcUtils.createConnectionFactory(jdbcOpts)()
     try {
@@ -491,7 +497,12 @@ object JdbcHelpers extends LazyLogging {
           JdbcHelpers.truncateTable(conn, table)
         case DropAndCreate =>
           JdbcHelpers.dropTable(conn, table)
-          JdbcHelpers.createTable(conn, table, schema, conf.tableKeys)
+          JdbcHelpers.createTable(conn,
+                                  table,
+                                  schema,
+                                  conf.tableKeys,
+                                  conf.createRowstoreTable,
+                                  version)
         case Merge =>
         // nothing to do
       }
@@ -504,7 +515,12 @@ object JdbcHelpers extends LazyLogging {
         case SaveMode.Append => // continue
       }
     } else {
-      JdbcHelpers.createTable(conn, table, schema, conf.tableKeys)
+      JdbcHelpers.createTable(conn,
+                              table,
+                              schema,
+                              conf.tableKeys,
+                              conf.createRowstoreTable,
+                              version)
     }
   } finally {
     conn.close()
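To make the effect of the option and the version check concrete, here is a rough sketch of the DDL shape the updated `createTable` builds (illustrative only; column rendering and key clauses come from `schemaToString` in the real code, and the assumption is that clusters older than 7.3.0 do not accept the `ROWSTORE` keyword but, per the summary, still default to rowstore tables, so nothing extra is needed there):

```scala
import com.singlestore.spark.SQLGen.SinglestoreVersion

// Simplified sketch of the statement assembled in createTable above.
def createTableSql(quotedTable: String,
                   columnsSql: String,
                   createRowstoreTable: Boolean,
                   version: SinglestoreVersion): String = {
  // Emit ROWSTORE only when the user asked for it and the cluster is new
  // enough to understand the keyword.
  val rowstore =
    if (createRowstoreTable && version.atLeast("7.3.0")) "ROWSTORE " else ""
  s"CREATE ${rowstore}TABLE $quotedTable $columnsSql"
}
```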
diff --git a/src/main/scala/com/singlestore/spark/SinglestoreOptions.scala b/src/main/scala/com/singlestore/spark/SinglestoreOptions.scala
index bd39210e..fd335880 100644
--- a/src/main/scala/com/singlestore/spark/SinglestoreOptions.scala
+++ b/src/main/scala/com/singlestore/spark/SinglestoreOptions.scala
@@ -13,8 +13,14 @@ case class SinglestoreOptions(
     database: Option[String],
     jdbcExtraOptions: Map[String, String],
     enableAsserts: Boolean,
+    // read options
     disablePushdown: Boolean,
     enableParallelRead: ParallelReadEnablement,
+    parallelReadFeatures: List[ParallelReadType],
+    parallelReadTableCreationTimeoutMS: Long,
+    parallelReadMaterializedTableCreationTimeoutMS: Long,
+    parallelReadRepartition: Boolean,
+    parallelReadRepartitionColumns: Set[String],
     // write options
     overwriteBehavior: OverwriteBehavior,
     loadDataCompression: SinglestoreOptions.CompressionType.Value,
@@ -23,11 +29,7 @@ case class SinglestoreOptions(
     onDuplicateKeySQL: Option[String],
     maxErrors: Int,
     insertBatchSize: Int,
-    parallelReadFeatures: List[ParallelReadType],
-    parallelReadTableCreationTimeoutMS: Long,
-    parallelReadMaterializedTableCreationTimeoutMS: Long,
-    parallelReadRepartition: Boolean,
-    parallelReadRepartitionColumns: Set[String]
+    createRowstoreTable: Boolean
 ) extends LazyLogging {
 
   def assert(condition: Boolean, message: String) = {
@@ -109,6 +111,7 @@ object SinglestoreOptions extends LazyLogging {
   final val ON_DUPLICATE_KEY_SQL = newOption("onDuplicateKeySQL")
   final val INSERT_BATCH_SIZE = newOption("insertBatchSize")
   final val MAX_ERRORS = newOption("maxErrors")
+  final val CREATE_ROWSTORE_TABLE = newOption("createRowstoreTable")
 
   final val ENABLE_ASSERTS = newOption("enableAsserts")
   final val DISABLE_PUSHDOWN = newOption("disablePushdown")
@@ -307,7 +310,8 @@ object SinglestoreOptions extends LazyLogging {
       parallelReadRepartitionColumns =
         splitEscapedColumns(options.get(PARALLEL_READ_REPARTITION_COLUMNS).getOrElse(""))
           .map(column => trimAndUnescapeColumn(column))
-          .toSet
+          .toSet,
+      createRowstoreTable = options.getOrElse(CREATE_ROWSTORE_TABLE, "false").toBoolean
     )
   }
 }
diff --git a/src/test/scala/com/singlestore/spark/ExternalHostTest.scala b/src/test/scala/com/singlestore/spark/ExternalHostTest.scala
index 3374b238..91a97cc7 100644
--- a/src/test/scala/com/singlestore/spark/ExternalHostTest.scala
+++ b/src/test/scala/com/singlestore/spark/ExternalHostTest.scala
@@ -162,6 +162,11 @@ class ExternalHostTest
         false,
         false,
         Automatic,
+        List(ReadFromLeaves),
+        0,
+        0,
+        true,
+        Set.empty,
         Truncate,
         SinglestoreOptions.CompressionType.GZip,
         SinglestoreOptions.LoadDataFormat.CSV,
@@ -169,11 +174,7 @@ class ExternalHostTest
         None,
         10,
         10,
-        List(ReadFromLeaves),
-        0,
-        0,
-        true,
-        Set.empty
+        false
       )
 
       val conn = JdbcUtils.createConnectionFactory(getDDLJDBCOptions(conf))()
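Since the new option is read from the regular options map with a `"false"` default, it can also be turned on session-wide with the `spark.datasource.singlestore.` prefix mentioned in the README excerpt above, in the same way the sanity tests toggle `disablePushdown`. A minimal sketch, assuming an existing `SparkSession` named `spark`:

```scala
// Enable rowstore table creation for subsequent SingleStore writes in this session.
spark.sqlContext.setConf("spark.datasource.singlestore.createRowstoreTable", "true")
```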
.option("createRowstoreTable", "true") + .save("testdb.rowstore") + + val rows = spark.executeSinglestoreQueryDB( + "testdb", + "select storage_type from information_schema.tables where table_schema='testdb' and table_name='rowstore';") + assert(rows.size == 1, "Only one row should be selected") + rows.foreach(row => + assert(row.getString(0).equals("INMEMORY_ROWSTORE"), "Should create rowstore table")) + } }
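One caveat that follows from the `JdbcHelpers` changes above: the option only has an effect when the connector actually issues `CREATE TABLE`, i.e. on the first write to a table or on an overwrite that drops and recreates it; appending to an existing table leaves its storage type unchanged. A sketch of recreating an existing table as rowstore (assuming the `overwriteBehavior` option accepts the value `dropAndCreate`, matching the `DropAndCreate` case above, and the short format name `"singlestore"`):

```scala
import org.apache.spark.sql.{DataFrame, SaveMode}

// Drop and recreate `table` from `df`, asking for a rowstore table this time.
def recreateAsRowstore(df: DataFrame, table: String): Unit =
  df.write
    .format("singlestore")
    .option("overwriteBehavior", "dropAndCreate") // assumed option value
    .option("createRowstoreTable", "true")
    .mode(SaveMode.Overwrite)
    .save(table)
```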