Added createRowstoreTable option
Summary:
Before 7.5, the default table type in SingleStore was rowstore.
The connector tried to create a columnstore table, and fell back to the default table type (rowstore) only when the user specified certain additional keys.
Starting with 7.5, the default table type is columnstore. As a result, a user could no longer create a rowstore table by adding keys.
This change introduces a new `createRowstoreTable` option that allows it.
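
A minimal usage sketch (assuming an existing DataFrame `df`; the target table `testdb.t` is hypothetical, and `singlestore` is the connector's short-form source name):

```scala
// Ask the connector to create a rowstore table instead of the
// columnstore default used by SingleStore 7.5+.
df.write
  .format("singlestore")
  .option("createRowstoreTable", "true")
  .mode("overwrite") // with the default DropAndCreate overwrite behavior the table is re-created
  .save("testdb.t")
```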
**Design doc/spec**:
**Docs impact**: none

Test Plan: https://webapp.io/memsql/commits?query=repo%3Asinglestore-spark-connector+id%3A145

Reviewers: carl, pmishchenko-ua, posadcha-ua

Reviewed By: pmishchenko-ua

Subscribers: amishra, engineering-list

JIRA Issues: PLAT-5782

Differential Revision: https://grizzly.internal.memcompute.com/D54799
AdalbertMemSQL committed Feb 11, 2022
1 parent c8f2b1b commit 4d365a2
Showing 5 changed files with 56 additions and 19 deletions.
3 changes: 2 additions & 1 deletion README.md
@@ -62,6 +62,7 @@ global options have the prefix `spark.datasource.singlestore.`.
| `onDuplicateKeySQL` | If this option is specified, and a row is to be inserted that would result in a duplicate value in a PRIMARY KEY or UNIQUE index, SingleStore will instead perform an UPDATE of the old row. See examples below
| `insertBatchSize` | Size of the batch for row insertion (default: `10000`)
| `maxErrors` | The maximum number of errors in a single `LOAD DATA` request. When this limit is reached, the load fails. If this property equals to `0`, no error limit exists (Default: `0`)
+| `createRowstoreTable` | If enabled, the connector creates a rowstore table (default: `false`)

## Examples

@@ -129,7 +130,7 @@ with `tableKey`. These options must be formatted in a specific way in order to
correctly specify the keys.

> :warning: The default table type is SingleStore Columnstore. If you want a RowStore table,
-> you will need to specify a Primary Key using the tableKey option.
+> you will need to enable the `createRowstoreTable` option.
To explain, we will refer to the following example:

30 changes: 23 additions & 7 deletions src/main/scala/com/singlestore/spark/JdbcHelpers.scala
@@ -4,7 +4,7 @@ import java.sql.{Connection, PreparedStatement, SQLException, Statement}
import java.util.UUID.randomUUID

import com.singlestore.spark.SinglestoreOptions.{TableKey, TableKeyType}
-import com.singlestore.spark.SQLGen.{StringVar, VariableList}
+import com.singlestore.spark.SQLGen.{SinglestoreVersion, StringVar, VariableList}
import org.apache.spark.sql.{Row, SaveMode}
import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
@@ -262,7 +262,9 @@ object JdbcHelpers extends LazyLogging {
}
}

-def schemaToString(schema: StructType, tableKeys: List[TableKey]): String = {
+def schemaToString(schema: StructType,
+                   tableKeys: List[TableKey],
+                   createRowstoreTable: Boolean): String = {
// spark should never call any of our code if the schema is empty
assert(schema.length > 0)

@@ -285,7 +287,7 @@
// specify a sort key so we just pick the first column arbitrarily for now
var finalTableKeys = tableKeys
// if all the keys are shard keys it means there are no other keys so we can default
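// (skipped when createRowstoreTable is set, since a rowstore table takes no columnstore sort key)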
-if (tableKeys.forall(_.keyType == TableKeyType.Shard)) {
+if (!createRowstoreTable && tableKeys.forall(_.keyType == TableKeyType.Shard)) {
finalTableKeys = TableKey(TableKeyType.Columnstore, columns = schema.head.name) :: tableKeys
}

@@ -340,8 +342,11 @@
def createTable(conn: Connection,
table: TableIdentifier,
schema: StructType,
-               tableKeys: List[TableKey]): Unit = {
-    val sql = s"CREATE TABLE ${table.quotedString} ${schemaToString(schema, tableKeys)}"
+               tableKeys: List[TableKey],
+               createRowstoreTable: Boolean,
+               version: SinglestoreVersion): Unit = {
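+    // the ROWSTORE keyword is only recognized by SingleStore 7.3+; on older
+    // versions plain CREATE TABLE already produced a rowstore table by default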
+    val sql =
+      s"CREATE ${if (createRowstoreTable && version.atLeast("7.3.0")) "ROWSTORE" else ""} TABLE ${table.quotedString} ${schemaToString(schema, tableKeys, createRowstoreTable)}"
log.trace(s"Executing SQL:\n$sql")
conn.withStatement(stmt => stmt.executeUpdate(sql))
}
@@ -480,6 +485,7 @@
table: TableIdentifier,
mode: SaveMode,
schema: StructType): Unit = {
+    val version = SinglestoreVersion(JdbcHelpers.getSinglestoreVersion(conf))
val jdbcOpts = JdbcHelpers.getDDLJDBCOptions(conf)
val conn = JdbcUtils.createConnectionFactory(jdbcOpts)()
try {
@@ -491,7 +497,12 @@
JdbcHelpers.truncateTable(conn, table)
case DropAndCreate =>
JdbcHelpers.dropTable(conn, table)
-        JdbcHelpers.createTable(conn, table, schema, conf.tableKeys)
+        JdbcHelpers.createTable(conn,
+                                table,
+                                schema,
+                                conf.tableKeys,
+                                conf.createRowstoreTable,
+                                version)
case Merge =>
// nothing to do
}
@@ -504,7 +515,12 @@
case SaveMode.Append => // continue
}
} else {
-      JdbcHelpers.createTable(conn, table, schema, conf.tableKeys)
+      JdbcHelpers.createTable(conn,
+                              table,
+                              schema,
+                              conf.tableKeys,
+                              conf.createRowstoreTable,
+                              version)
}
} finally {
conn.close()
16 changes: 10 additions & 6 deletions src/main/scala/com/singlestore/spark/SinglestoreOptions.scala
@@ -13,8 +13,14 @@ case class SinglestoreOptions(
database: Option[String],
jdbcExtraOptions: Map[String, String],
enableAsserts: Boolean,
+    // read options
disablePushdown: Boolean,
enableParallelRead: ParallelReadEnablement,
+    parallelReadFeatures: List[ParallelReadType],
+    parallelReadTableCreationTimeoutMS: Long,
+    parallelReadMaterializedTableCreationTimeoutMS: Long,
+    parallelReadRepartition: Boolean,
+    parallelReadRepartitionColumns: Set[String],
+    // write options
overwriteBehavior: OverwriteBehavior,
loadDataCompression: SinglestoreOptions.CompressionType.Value,
@@ -23,11 +29,7 @@
onDuplicateKeySQL: Option[String],
maxErrors: Int,
insertBatchSize: Int,
-    parallelReadFeatures: List[ParallelReadType],
-    parallelReadTableCreationTimeoutMS: Long,
-    parallelReadMaterializedTableCreationTimeoutMS: Long,
-    parallelReadRepartition: Boolean,
-    parallelReadRepartitionColumns: Set[String]
+    createRowstoreTable: Boolean
) extends LazyLogging {

def assert(condition: Boolean, message: String) = {
@@ -109,6 +111,7 @@
final val ON_DUPLICATE_KEY_SQL = newOption("onDuplicateKeySQL")
final val INSERT_BATCH_SIZE = newOption("insertBatchSize")
final val MAX_ERRORS = newOption("maxErrors")
+  final val CREATE_ROWSTORE_TABLE = newOption("createRowstoreTable")

final val ENABLE_ASSERTS = newOption("enableAsserts")
final val DISABLE_PUSHDOWN = newOption("disablePushdown")
@@ -307,7 +310,8 @@
parallelReadRepartitionColumns =
splitEscapedColumns(options.get(PARALLEL_READ_REPARTITION_COLUMNS).getOrElse(""))
.map(column => trimAndUnescapeColumn(column))
-          .toSet
+          .toSet,
+      createRowstoreTable = options.getOrElse(CREATE_ROWSTORE_TABLE, "false").toBoolean
)
}
}
11 changes: 6 additions & 5 deletions src/test/scala/com/singlestore/spark/ExternalHostTest.scala
@@ -162,18 +162,19 @@ class ExternalHostTest
false,
false,
Automatic,
+      List(ReadFromLeaves),
+      0,
+      0,
+      true,
+      Set.empty,
Truncate,
SinglestoreOptions.CompressionType.GZip,
SinglestoreOptions.LoadDataFormat.CSV,
List.empty[SinglestoreOptions.TableKey],
None,
10,
10,
-      List(ReadFromLeaves),
-      0,
-      0,
-      true,
-      Set.empty
+      false
)

val conn = JdbcUtils.createConnectionFactory(getDDLJDBCOptions(conf))()
15 changes: 15 additions & 0 deletions src/test/scala/com/singlestore/spark/SanityTest.scala
Expand Up @@ -8,6 +8,7 @@ import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{IntegerType, StringType}
import org.apache.spark.sql.{DataFrame, SaveMode}
import org.scalatest.BeforeAndAfterEach
+import com.singlestore.spark.SQLHelper._

class SanityTest extends IntegrationSuiteBase with BeforeAndAfterEach {
var df: DataFrame = _
@@ -207,4 +208,18 @@ class SanityTest extends IntegrationSuiteBase with BeforeAndAfterEach {
repartitionColumnsTest()
spark.sqlContext.setConf("spark.datasource.singlestore.disablePushdown", "false")
}

it("creates rowstore table") {
df.write
.format(DefaultSource.SINGLESTORE_SOURCE_NAME_SHORT)
.option("createRowstoreTable", "true")
.save("testdb.rowstore")

val rows = spark.executeSinglestoreQueryDB(
"testdb",
"select storage_type from information_schema.tables where table_schema='testdb' and table_name='rowstore';")
assert(rows.size == 1, "Only one row should be selected")
rows.foreach(row =>
assert(row.getString(0).equals("INMEMORY_ROWSTORE"), "Should create rowstore table"))
}
}
