Added connection pooling
Summary:
Added the Apache Commons DBCP2 library and used it to implement connection pooling.
**Design doc/spec**: https://docs.google.com/document/d/1WzbbJgJHK49zAFLpoCBmkkuWBEq1p8q8QaV4DPp40Jo/edit?usp=sharing
**Docs impact**: none

Test Plan: https://webapp.io/memsql/commits?query=repo%3Asinglestore-spark-connector+id%3A128

Reviewers: carl, pmishchenko-ua, hniemchenko-ua

Reviewed By: pmishchenko-ua

Subscribers: engineering-list

JIRA Issues: PLAT-4435, PLAT-4861

Differential Revision: https://grizzly.internal.memcompute.com/D54433
AdalbertMemSQL committed Mar 10, 2022
1 parent 82cdd8d commit fd98ff4
Showing 17 changed files with 450 additions and 122 deletions.
77 changes: 52 additions & 25 deletions README.md
@@ -38,31 +38,58 @@ The `singlestore-spark-connector` is configurable globally via Spark options and
locally when constructing a DataFrame. The options are named the same, however
global options have the prefix `spark.datasource.singlestore.`.

| Option | Description
| - | -
| `ddlEndpoint` (required) | Hostname or IP address of the SingleStore Master Aggregator in the format `host[:port]` (port is optional). Ex. `master-agg.foo.internal:3308` or `master-agg.foo.internal`
| `dmlEndpoints` | Hostname or IP address of SingleStore Aggregator nodes to run queries against in the format `host[:port],host[:port],...` (port is optional, multiple hosts separated by comma). Ex. `child-agg:3308,child-agg2` (default: `ddlEndpoint`)
| `user` | SingleStore username (default: `root`)
| `password` | SingleStore password (default: no password)
| `query` | The query to run (mutually exclusive with dbtable)
| `dbtable` | The table to query (mutually exclusive with query)
| `database` | If set, all connections will default to using this database (default: empty)
| `disablePushdown` | Disable SQL Pushdown when running queries (default: false)
| `enableParallelRead` | Enable reading data in parallel for some query shapes; one of (`disabled`, `automaticLite`, `automatic`, `forced`) (default: `automaticLite`)
| `parallelRead.Features` | Specify comma separated list of parallel read features that will be tried. The order in which features are listed determines their priority. Supported features: `ReadFromLeaves`, `ReadFromAggregators`, `ReadFromAggregatorsMaterialized`. Ex. `ReadFromLeaves,ReadFromAggregators` (default: `ReadFromAggregators`).
| `parallelRead.tableCreationTimeoutMS` | Number of milliseconds reader will wait for the result table creation when the `ReadFromAggregators` feature is used; 0 means no timeout (default: `0`)
| `parallelRead.tableCreationTimeoutMaterializedMS` | Number of milliseconds reader will wait for the result table creation when the `ReadFromAggregatorsMaterialized` feature is used; 0 means no timeout (default: `0`)
| `parallelRead.repartition` | Repartition data before reading it (default: `false`)
| `parallelRead.repartition.columns` | Comma separated list of column names that are used for repartitioning, if `parallelRead.repartition` is enabled. By default, repartitioning is done using an additional column with `RAND()` value.
| `overwriteBehavior` | Specify the behavior during Overwrite; one of `dropAndCreate`, `truncate`, `merge` (default: `dropAndCreate`)
| `truncate` | :warning: **Deprecated option, please use `overwriteBehavior` instead** Truncate instead of drop an existing table during Overwrite (default: false)
| `loadDataCompression` | Compress data on load; one of (`GZip`, `LZ4`, `Skip`) (default: GZip)
| `loadDataFormat` | Serialize data on load; one of (`Avro`, `CSV`) (default: CSV)
| `tableKey` | Specify additional keys to add to tables created by the connector (See below for more details)
| `onDuplicateKeySQL` | If this option is specified, and a row is to be inserted that would result in a duplicate value in a PRIMARY KEY or UNIQUE index, SingleStore will instead perform an UPDATE of the old row. See examples below
| `insertBatchSize` | Size of the batch for row insertion (default: `10000`)
| `maxErrors` | The maximum number of errors in a single `LOAD DATA` request. When this limit is reached, the load fails. If this property equals to `0`, no error limit exists (Default: `0`)
| `createRowstoreTable` | If enabled, the connector creates a rowstore table (default: `false`).
#### Basic options
| Option | Description
| - | -
| `ddlEndpoint` (required) | Hostname or IP address of the SingleStore Master Aggregator in the format `host[:port]` (port is optional). Ex. `master-agg.foo.internal:3308` or `master-agg.foo.internal`
| `dmlEndpoints` | Hostname or IP address of SingleStore Aggregator nodes to run queries against in the format `host[:port],host[:port],...` (port is optional, multiple hosts separated by comma). Ex. `child-agg:3308,child-agg2` (default: `ddlEndpoint`)
| `user` | SingleStore username (default: `root`)
| `password` | SingleStore password (default: no password)
| `query` | The query to run (mutually exclusive with dbtable)
| `dbtable` | The table to query (mutually exclusive with query)
| `database` | If set, all connections will default to using this database (default: empty)
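
For illustration, here is a minimal sketch combining a global option with per-DataFrame options; the `singlestore` format name, endpoint, credentials, database, and table are placeholder assumptions, not taken from this commit:

```scala
import org.apache.spark.sql.SparkSession

// Global options carry the `spark.datasource.singlestore.` prefix and apply to
// every SingleStore DataFrame created from this session.
val spark = SparkSession.builder()
  .appName("singlestore-basic-options-sketch")
  .config("spark.datasource.singlestore.ddlEndpoint", "master-agg.foo.internal:3308")
  .config("spark.datasource.singlestore.user", "root")
  .getOrCreate()

// Local options are set while constructing an individual DataFrame.
val df = spark.read
  .format("singlestore")
  .option("database", "foo")
  .option("dbtable", "bar")
  .load()
```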

#### Read options
| Option | Description
| - | -
| `disablePushdown` | Disable SQL Pushdown when running queries (default: false)
| `enableParallelRead` | Enable reading data in parallel for some query shapes; one of (`disabled`, `automaticLite`, `automatic`, `forced`) (default: `automaticLite`)
| `parallelRead.Features` | Specify comma separated list of parallel read features that will be tried. The order in which features are listed determines their priority. Supported features: `ReadFromLeaves`, `ReadFromAggregators`, `ReadFromAggregatorsMaterialized`. Ex. `ReadFromLeaves,ReadFromAggregators` (default: `ReadFromAggregators`).
| `parallelRead.tableCreationTimeoutMS` | Number of milliseconds reader will wait for the result table creation when the `ReadFromAggregators` feature is used; 0 means no timeout (default: `0`)
| `parallelRead.tableCreationTimeoutMaterializedMS` | Number of milliseconds reader will wait for the result table creation when the `ReadFromAggregatorsMaterialized` feature is used; 0 means no timeout (default: `0`)
| `parallelRead.repartition` | Repartition data before reading it (default: `false`)
| `parallelRead.repartition.columns` | Comma separated list of column names that are used for repartitioning, if `parallelRead.repartition` is enabled. By default, repartitioning is done using an additional column with `RAND()` value.
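
As a sketch of how the parallel read options fit together (reusing the `spark` session from the sketch above; the query and repartition column are placeholders):

```scala
// Try materialized aggregator reads first, then fall back to plain aggregator reads,
// and repartition the result on the `id` column before handing it to Spark.
val parallelDf = spark.read
  .format("singlestore")
  .option("enableParallelRead", "automatic")
  .option("parallelRead.Features", "ReadFromAggregatorsMaterialized,ReadFromAggregators")
  .option("parallelRead.repartition", "true")
  .option("parallelRead.repartition.columns", "id")
  .option("query", "SELECT id, COUNT(*) AS cnt FROM foo.bar GROUP BY id")
  .load()
```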

#### Write options
| Option | Description
| - | -
| `overwriteBehavior` | Specify the behavior during Overwrite; one of `dropAndCreate`, `truncate`, `merge` (default: `dropAndCreate`)
| `truncate` | :warning: **Deprecated option, please use `overwriteBehavior` instead.** Truncate the existing table instead of dropping it during Overwrite (default: `false`)
| `loadDataCompression` | Compress data on load; one of (`GZip`, `LZ4`, `Skip`) (default: GZip)
| `loadDataFormat` | Serialize data on load; one of (`Avro`, `CSV`) (default: CSV)
| `tableKey` | Specify additional keys to add to tables created by the connector (See below for more details)
| `onDuplicateKeySQL` | If this option is specified, and a row is to be inserted that would result in a duplicate value in a PRIMARY KEY or UNIQUE index, SingleStore will instead perform an UPDATE of the old row. See examples below
| `insertBatchSize` | Size of the batch for row insertion (default: `10000`)
| `maxErrors` | The maximum number of errors allowed in a single `LOAD DATA` request. When this limit is reached, the load fails. If this property is set to `0`, there is no error limit (default: `0`)
| `createRowstoreTable` | If enabled, the connector creates a rowstore table (default: `false`).
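
A hedged write sketch using a few of these options (again reusing the `spark` session from above; the data and target table are placeholders):

```scala
import org.apache.spark.sql.SaveMode

val toWrite = spark.createDataFrame(Seq((1, "a"), (2, "b"))).toDF("id", "name")

toWrite.write
  .format("singlestore")
  .option("overwriteBehavior", "merge")   // merge into the existing table instead of dropping it
  .option("loadDataCompression", "LZ4")
  .option("maxErrors", "10")              // abort the LOAD DATA request after 10 errors
  .mode(SaveMode.Overwrite)
  .save("foo.bar")
```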

#### Connection pool options
| Option | Description
| - | -
| `driverConnectionPool.Enabled` | Enable connection pooling on the driver. (default: `true`)
| `driverConnectionPool.MaxOpenConns` | The maximum number of active connections with the same options that can be allocated from the driver pool at the same time, or negative for no limit. (default: `-1`)
| `driverConnectionPool.MaxIdleConns` | The maximum number of connections with the same options that can remain idle in the driver pool, without extra ones being released, or negative for no limit. (default: `8`)
| `driverConnectionPool.MinEvictableIdleTimeMs` | The minimum amount of time an object may sit idle in the driver pool before it is eligible for eviction by the idle object evictor (if any). (default: `30000` - 30 sec)
| `driverConnectionPool.TimeBetweenEvictionRunsMS` | The number of milliseconds to sleep between runs of the idle object evictor thread on the driver. When non-positive, no idle object evictor thread will be run. (default: `1000` - 1 sec)
| `driverConnectionPool.MaxWaitMS` | The maximum number of milliseconds that the driver pool will wait (when there are no available connections) for a connection to be returned before throwing an exception, or `-1` to wait indefinitely. (default: `-1`)
| `driverConnectionPool.MaxConnLifetimeMS` | The maximum lifetime in milliseconds of a connection. After this time is exceeded the connection will fail the next activation, passivation, or validation test and won’t be returned by the driver pool. A value of zero or less means the connection has an infinite lifetime. (default: `-1`)
| `executorConnectionPool.Enabled` | Enable connection pooling on executors. (default: `true`)
| `executorConnectionPool.MaxOpenConns` | The maximum number of active connections with the same options that can be allocated from the executor pool at the same time, or negative for no limit. (default: `-1`)
| `executorConnectionPool.MaxIdleConns` | The maximum number of connections with the same options that can remain idle in the executor pool, without extra ones being released, or negative for no limit. (default: `8`)
| `executorConnectionPool.MinEvictableIdleTimeMs` | The minimum amount of time an object may sit idle in the executor pool before it is eligible for eviction by the idle object evictor (if any). (default: `2000` - 2 sec)
| `executorConnectionPool.TimeBetweenEvictionRunsMS` | The number of milliseconds to sleep between runs of the idle object evictor thread on the executor. When non-positive, no idle object evictor thread will be run. (default: `1000` - 1 sec)
| `executorConnectionPool.MaxWaitMS` | The maximum number of milliseconds that the executor pool will wait (when there are no available connections) for a connection to be returned before throwing an exception, or `-1` to wait indefinitely. (default: `-1`)
| `executorConnectionPool.MaxConnLifetimeMS` | The maximum lifetime in milliseconds of a connection. After this time is exceeded the connection will fail the next activation, passivation, or validation test and won’t be returned by the executor pool. A value of zero or less means the connection has an infinite lifetime. (default: `-1`)
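
As a sketch, the pool options can be set per DataFrame like any other connector option (the values here are illustrative, not recommendations):

```scala
val pooledDf = spark.read
  .format("singlestore")
  // Cap the driver-side pool and fail fast if no connection frees up within 10 seconds.
  .option("driverConnectionPool.MaxOpenConns", "16")
  .option("driverConnectionPool.MaxWaitMS", "10000")
  // Keep fewer idle executor-side connections and evict them sooner.
  .option("executorConnectionPool.MaxIdleConns", "4")
  .option("executorConnectionPool.MinEvictableIdleTimeMs", "1000")
  .option("dbtable", "bar")
  .load()
```

These options can also be set globally with the `spark.datasource.singlestore.` prefix, like the options in the sections above.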

## Examples

1 change: 1 addition & 0 deletions build.sbt
@@ -35,6 +35,7 @@ lazy val root = project
"com.singlestore" % "singlestore-jdbc-client" % "1.0.1",
"io.spray" %% "spray-json" % "1.3.5",
"io.netty" % "netty-buffer" % "4.1.70.Final",
"org.apache.commons" % "commons-dbcp2" % "2.9.0",
// test dependencies
"org.mariadb.jdbc" % "mariadb-java-client" % "2.+" % Test,
"org.scalatest" %% "scalatest" % "3.1.0" % Test,
@@ -1,7 +1,9 @@
package com.singlestore.spark

import java.sql.Connection
import java.util.Properties

import com.singlestore.spark.JdbcHelpers.getDDLConnProperties
import com.singlestore.spark.SQLGen.VariableList
import org.apache.spark.SparkContext
import org.apache.spark.scheduler.{
@@ -10,7 +12,6 @@ import org.apache.spark.scheduler.{
SparkListenerStageCompleted,
SparkListenerStageSubmitted
}
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
import org.apache.spark.sql.types.StructType

import scala.collection.mutable
@@ -28,7 +29,7 @@ class AggregatorParallelReadListener(applicationId: String) extends SparkListene
private case class SingleStoreRDDInfo(query: String,
variables: VariableList,
schema: StructType,
connectionOptions: JDBCOptions,
connectionProperties: Properties,
materialized: Boolean,
needsRepartition: Boolean,
repartitionColumns: Seq[String])
@@ -39,7 +40,7 @@ class AggregatorParallelReadListener(applicationId: String) extends SparkListene
rdd.query,
rdd.variables,
rdd.schema,
JdbcHelpers.getDDLJDBCOptions(rdd.options),
getDDLConnProperties(rdd.options, isOnExecutor = false),
rdd.parallelReadType.contains(ReadFromAggregatorsMaterialized),
rdd.options.parallelReadRepartition,
rdd.parallelReadRepartitionColumns,
@@ -75,7 +76,8 @@ class AggregatorParallelReadListener(applicationId: String) extends SparkListene
val tableName = JdbcHelpers.getResultTableName(applicationId, stageId, rddInfo.id)

// Create connection and save it in the map
val conn = JdbcUtils.createConnectionFactory(singleStoreRDDInfo.connectionOptions)()
val conn =
SinglestoreConnectionPool.getConnection(singleStoreRDDInfo.connectionProperties)
connectionsMap.synchronized(
connectionsMap += (tableName -> conn)
)

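For readers unfamiliar with commons-dbcp2, here is a rough, self-contained sketch of how a `getConnection(Properties)` helper like the one used above could be backed by a DBCP2 `BasicDataSource`. This is NOT the connector's actual `SinglestoreConnectionPool` code; the object name, the `"url"` property key, and the chosen defaults are assumptions for illustration only:

```scala
import java.sql.Connection
import java.util.Properties

import scala.collection.concurrent.TrieMap

import org.apache.commons.dbcp2.BasicDataSource

object PooledConnectionsSketch {
  // One pool per distinct set of connection properties.
  private val pools = new TrieMap[Properties, BasicDataSource]()

  def getConnection(props: Properties): Connection = {
    val pool = pools.getOrElseUpdate(props, {
      val ds = new BasicDataSource()
      ds.setUrl(props.getProperty("url")) // hypothetical key carrying the JDBC URL
      ds.setUsername(props.getProperty("user"))
      ds.setPassword(props.getProperty("password"))
      ds.setMaxTotal(-1)                        // no limit on active connections
      ds.setMaxIdle(8)                          // keep at most 8 idle connections
      ds.setMinEvictableIdleTimeMillis(30000)   // idle connections become evictable after 30 s
      ds.setTimeBetweenEvictionRunsMillis(1000) // evictor thread wakes up every second
      ds
    })
    // Returns a pooled connection; calling close() on it returns it to the pool
    // instead of closing the underlying socket.
    pool.getConnection()
  }
}
```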