Parallel read in jdbc-based connectors #389

Open
dain opened this Issue Mar 5, 2019 · 14 comments

dain commented Mar 5, 2019

Currently, reads of JDBC-based tables use a single connection, which can be slow. Other data engines, however, are able to read a table in parallel. See below.

In Sqoop - https://sqoop.apache.org/docs/1.4.7/SqoopUserGuide.html

7.2.4. Controlling Parallelism
When performing parallel imports, Sqoop needs a criterion by which it can split the workload. Sqoop uses a splitting column to split the workload. By default, Sqoop will identify the primary key column (if present) in a table and use it as the splitting column. The low and high values for the splitting column are retrieved from the database, and the map tasks operate on evenly-sized components of the total range. For example, if you had a table with a primary key column of id whose minimum value was 0 and maximum value was 1000, and Sqoop was directed to use 4 tasks, Sqoop would run four processes which each execute SQL statements of the form SELECT * FROM sometable WHERE id >= lo AND id < hi, with (lo, hi) set to (0, 250), (250, 500), (500, 750), and (750, 1001) in the different tasks.

If the actual values for the primary key are not uniformly distributed across its range, then this can result in unbalanced tasks. You should explicitly choose a different column with the --split-by argument. For example, --split-by employee_id. Sqoop cannot currently split on multi-column indices. If your table has no index column, or has a multi-column key, then you must also manually choose a splitting column.
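To make the range arithmetic above concrete, here is a minimal Java sketch (not Sqoop's actual code; class and method names are made up for illustration) that splits an integer key range [min, max] into evenly sized, non-overlapping ranges and renders them as WHERE clauses:

```java
import java.util.ArrayList;
import java.util.List;

public class SplitRanges
{
    // Split [min, max] into numSplits non-overlapping ranges and render each
    // as a WHERE clause of the form "col >= lo AND col < hi".
    public static List<String> whereClauses(String column, long min, long max, int numSplits)
    {
        List<String> clauses = new ArrayList<>();
        long stride = Math.max(1, (max - min + 1) / numSplits);
        for (int i = 0; i < numSplits; i++) {
            long lo = min + i * stride;
            if (lo > max) {
                break;
            }
            // the last range absorbs any remainder so the full key range is covered
            long hi = (i == numSplits - 1) ? max + 1 : lo + stride;
            clauses.add(column + " >= " + lo + " AND " + column + " < " + hi);
        }
        return clauses;
    }

    public static void main(String[] args)
    {
        // For id in [0, 1000] and 4 splits this prints the same ranges as the
        // Sqoop example: (0, 250), (250, 500), (500, 750), (750, 1001).
        whereClauses("id", 0, 1000, 4).forEach(System.out::println);
    }
}
```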

In Spark - http://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases

partitionColumn, lowerBound, upperBound These options must all be specified if any of them is specified. In addition, numPartitions must be specified. They describe how to partition the table when reading in parallel from multiple workers. partitionColumn must be a numeric column from the table in question. Notice that lowerBound and upperBound are just used to decide the partition stride, not for filtering the rows in table. So all rows in the table will be partitioned and returned. This option applies only to reading.

numPartitions The maximum number of partitions that can be used for parallelism in table reading and writing. This also determines the maximum number of concurrent JDBC connections. If the number of partitions to write exceeds this limit, we decrease it to this limit by calling coalesce(numPartitions) before writing.

fetchsize The JDBC fetch size, which determines how many rows to fetch per round trip. This can help performance on JDBC drivers which default to low fetch size (eg. Oracle with 10 rows). This option applies only to reading.

batchsize The JDBC batch size, which determines how many rows to insert per round trip. This can help performance on JDBC drivers. This option applies only to writing. It defaults to 1000.
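For reference, the fetchsize option quoted above corresponds to the standard JDBC Statement.setFetchSize hint. A minimal sketch follows; the connection URL, credentials, and table name are placeholders, and how (or whether) the hint is honored depends on the driver:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class FetchSizeExample
{
    public static void main(String[] args)
            throws Exception
    {
        try (Connection connection = DriverManager.getConnection("jdbc:mysql://db.example.com:3306/test", "user", "password");
                Statement statement = connection.createStatement()) {
            // Hint to the driver to fetch rows in larger batches per round trip
            // instead of its default (some drivers default to very small values).
            statement.setFetchSize(10_000);
            try (ResultSet resultSet = statement.executeQuery("SELECT * FROM sometable")) {
                while (resultSet.next()) {
                    // process the row
                }
            }
        }
    }
}
```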

Related: https://www.percona.com/blog/2016/08/17/apache-spark-makes-slow-mysql-queries-10x-faster/

The idea is simple: Spark can read MySQL data via JDBC and can also execute SQL queries, so we can connect it directly to MySQL and run the queries. Why is this faster? For long running (i.e., reporting or BI) queries, it can be much faster as Spark is a massively parallel system. MySQL can only use one CPU core per query, whereas Spark can use all cores on all cluster nodes. In my examples below, MySQL queries are executed inside Spark and run 5-10 times faster (on top of the same MySQL data).

In addition, Spark can add “cluster” level parallelism. In the case of MySQL replication or Percona XtraDB Cluster, Spark can split the query into a set of smaller queries (in the case of a partitioned table it will run one query per partition, for example) and run those in parallel across multiple slave servers or multiple Percona XtraDB Cluster nodes. Finally, it will use map/reduce-type processing to aggregate the results.

@kokosing:

I like the approach that Spark uses. Following it, we could have configuration (e.g. mysql.properties) that tells which column should be used for partitioning, the number of partitions, and the low and high values of the partitioning column. Reads from that table could then be easily parallelized.
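A purely hypothetical sketch of what such a catalog file might look like: the connector.name and connection-* properties mirror the existing MySQL connector, while the parallel-read.* properties do not exist and are invented here only to illustrate the idea.

```properties
# etc/catalog/mysql.properties (the partitioning properties below are hypothetical)
connector.name=mysql
connection-url=jdbc:mysql://db.example.com:3306
connection-user=presto
connection-password=secret

# hypothetical per-table parallel read settings
parallel-read.table=sales.orders
parallel-read.partition-column=order_id
parallel-read.partition-count=8
parallel-read.lower-bound=0
parallel-read.upper-bound=1000000
```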

@electrum:

The Spark way looks good. You can see the code it uses to build the queries here: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala

Notes:

  1. No lower/upper bound for the first/last query (the bounds don't filter the table, as the docs say)
  2. One of the queries needs to include the null values

Ref: prestodb/presto#10832
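Following those two notes, a rough Java sketch of the predicate generation (loosely modeled on Spark's JDBCRelation, not copied from it; class and method names are made up): the first split is unbounded below and also owns the NULL values, the last split is unbounded above, and the middle splits cover adjacent half-open ranges.

```java
import java.util.ArrayList;
import java.util.List;

public class SplitPredicates
{
    public static List<String> predicates(String column, long lowerBound, long upperBound, int numPartitions)
    {
        if (numPartitions <= 1) {
            // a single split reads the whole table
            return List.of("TRUE");
        }
        List<String> predicates = new ArrayList<>();
        long stride = Math.max(1, (upperBound - lowerBound) / numPartitions);
        long boundary = lowerBound + stride;
        for (int i = 0; i < numPartitions; i++) {
            if (i == 0) {
                // first split: no lower bound, and it also picks up the NULL values
                predicates.add(column + " < " + boundary + " OR " + column + " IS NULL");
            }
            else if (i == numPartitions - 1) {
                // last split: no upper bound
                predicates.add(column + " >= " + boundary);
            }
            else {
                predicates.add(column + " >= " + boundary + " AND " + column + " < " + (boundary + stride));
                boundary += stride;
            }
        }
        return predicates;
    }
}
```

As the Spark documentation quoted in the description says, lowerBound and upperBound only determine the stride; no rows are filtered out, since the first and last predicates are left unbounded.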

davcamer commented Mar 12, 2019

Following it, we could have configuration (e.g. mysql.properties) that tells which column should be used for partitioning, the number of partitions, and the low and high values of the partitioning column.

Would this require a configuration for every table in the connected MySQL database?

zeeshanabid94 commented Mar 12, 2019

Any ideas on how to proceed with this? We are working with ClickHouse, which has a JDBC driver, and it is pretty slow for huge queries. It would be great if there was a design for how to move ahead with this feature.

findepi commented Mar 12, 2019

@zeeshanabid94 does ClickHouse have a concept of data partitioning?

zeeshanabid94 commented Mar 12, 2019

@findepi ClickHouse doesn't have an explicit concept of data partitioning. However, there is a concept of table engines that defines indices, partitions, and storage of the underlying data.
https://clickhouse.yandex/docs/en/operations/table_engines/

But to answer your question: yes, it does support data partitioning.

zeeshanabid94 commented Mar 12, 2019

I was thinking of some way to provide a partition_column, min_value, and max_value for each query, and then fetch the data in partitions based on the number of partitions created. Each partition would then have size (max_value - min_value) / number of partitions. I just don't understand how we can infer which column is the partition column for each table.

findepi commented Mar 12, 2019

@zeeshanabid94 sorry, I asked too fast. I think it's better to delay this discussion until you implement a non-parallel version of the connector.
This has two benefits:

  • your PRs will be easier to review -- a connector is a lot of code, so the simpler the first version the better; adding parallel reads to a JDBC-based connector shouldn't require any major redesign
  • once the non-parallel version has landed, it will be easier for others to participate and offer suggestions

zeeshanabid94 commented Mar 12, 2019

@findepi I have already done that. ClickHouse is SQL-compliant; its SELECT clause supports the statements of Presto's ANSI SQL. So I already have the plugin working, and it is similar to MySQL and PostgreSQL: just the two basic Client and Plugin classes, with a third ClientModule class injecting the relevant client. I had to replace the original QueryBuilder in base JDBC with a custom one that does not add the schema to table names in queries. This works great with ClickHouse, and all SELECT queries work well.
But we just ran a query where Presto had to fetch 559M rows from our database. The query took around 17 minutes to fetch the data and under 10 seconds to process all of it.

I already have the plugin ready:
https://github.com/zeeshanabid94/presto/tree/presto-clickhouse/presto-clickhouse

electrum commented Mar 13, 2019

The main challenge of parallel JDBC is determining the split ranges and counts, as explained in the issue description. This is difficult for traditional databases like MySQL or PostgreSQL, since they don't (to my knowledge) have a generic way to return this information for a table (e.g., the row count and distribution of the primary key).

Some ideas on how to do this:

  • static configuration for the connector
  • special table in the target database that holds the configuration
  • a heuristic such as running a min/max/count query and assuming an even distribution (sketched below)

We could also treat this like traditional database statistics, where Presto runs one or more queries in the target database to compute these statistics and records them in a special table. This computation could be triggered manually with a procedure call, or automatically based on some freshness policy. When executing a split, we could record a comparison of the expected vs. actual row count and use that to trigger a refresh.
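A minimal sketch of the min/max/count heuristic (a hypothetical helper, not existing Presto code); the table and column names are assumed to be already validated and quoted:

```java
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class ColumnStats
{
    public final long min;
    public final long max;
    public final long rowCount;

    public ColumnStats(long min, long max, long rowCount)
    {
        this.min = min;
        this.max = max;
        this.rowCount = rowCount;
    }

    // Run one query in the target database; split planning can then assume the
    // values are evenly distributed between min and max.
    public static ColumnStats collect(Connection connection, String table, String column)
            throws SQLException
    {
        String sql = "SELECT min(" + column + "), max(" + column + "), count(*) FROM " + table;
        try (Statement statement = connection.createStatement();
                ResultSet resultSet = statement.executeQuery(sql)) {
            resultSet.next();
            return new ColumnStats(resultSet.getLong(1), resultSet.getLong(2), resultSet.getLong(3));
        }
    }
}
```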

electrum commented Mar 13, 2019

At a JDBC connector infrastructure level, I think we have everything needed for parallelism. A connector should only need to override the JdbcClient.getSplits() method and create each JdbcSplit with a non-overlapping additionalPredicate.
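A very rough sketch of that idea for a hypothetical ClickhouseClient. The getSplits signature and the JdbcSplit constructor are paraphrased and will not match the real presto-base-jdbc SPI line for line; the point is only that each split carries a non-overlapping additionalPredicate (SplitPredicates is the helper sketched earlier in this thread):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Presto SPI imports omitted; method and constructor signatures below are
// approximations of the base-jdbc interfaces, not exact copies.
public class ClickhouseClient
        extends BaseJdbcClient
{
    @Override
    public ConnectorSplitSource getSplits(JdbcTableHandle table)
    {
        List<ConnectorSplit> splits = new ArrayList<>();
        // partition column and bounds are hard-coded placeholders here; in practice
        // they would come from configuration or collected statistics
        for (String predicate : SplitPredicates.predicates("id", 0, 1_000_000, 8)) {
            splits.add(new JdbcSplit(
                    table.getCatalogName(),
                    table.getSchemaName(),
                    table.getTableName(),
                    Optional.of(predicate))); // non-overlapping additionalPredicate
        }
        return new FixedSplitSource(splits);
    }
}
```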

@zeeshanabid94 If you can submit a PR for the connector, I'm happy to work with you on getting it merged and making it parallel. I haven't had a chance to try out ClickHouse, so this will be a fun project.

electrum added the roadmap label Mar 13, 2019

zeeshanabid94 commented Mar 13, 2019

@electrum There is already a PR for the ClickHouse connector. I am working with spaghettifunk to get it merged. The PR is at this link:
prestodb/presto#12456 (comment)

As far as parallelizing all JDBC connectors goes, it doesn't seem very straightforward. All the approaches you've mentioned require implementing separate classes and maintaining internal data; it is not as straightforward as implementing the connector was. I will think more about this. Meanwhile, if anyone has ideas about this feature, it would be nice to hear them and flesh out the design further.
I do not have much experience with JDBC, but this does seem like a challenging, fun project.

electrum commented Mar 14, 2019

@zeeshanabid94 Can you submit the PR to this repository? You’ll need to make a few minor changes to the POM and change the Java package names.

Implementing parallelism should only require overriding that method in your ClickhouseClient class. The hard part is figuring out the partition columns and table stats, which will require investigating what metadata is available in ClickHouse and how to access it from JDBC. I can help with that.

electrum commented Mar 14, 2019

Sorry, I just realized you are not the author of the other PR. @spaghettifunk can you submit the ClickHouse connector PR to this repository? I’m happy to review it and work with you on getting it merged.

spaghettifunk commented Mar 14, 2019

Hi @electrum, what do you mean by

submit the ClickHouse connector PR to this repository

I already have a PR open (prestodb/presto#12456) for the ClickHouse JDBC connector. Unfortunately, I am on a work trip and can't work on this until the weekend. Besides that, I'd like to get some feedback on the PR I have already opened; in particular, I'd like some direction on testing the connector and on what is still missing. It's my first time implementing a JDBC connector, and I used PostgreSQL as inspiration.

findepi commented Mar 14, 2019

@zeeshanabid94 @electrum Let's move this conversation somewhere else, as it's really no longer a general "Parallel read in jdbc-based connectors" discussion.
I suggest that we all connect on Slack (look for david and findepi there). I have done a lot of work around JDBC connectors and will try to be helpful as well.
