
Improve scaling speed and some cleanup #18005

Merged

10 commits merged into trinodb:master from improve_scaling_speed_new on Jul 7, 2023

Conversation

@gaurav8297 (Member) commented Jun 22, 2023

Description

Benchmarks

For the unpartitioned case:

Before:

trino:insert_demo> insert into lineitem_test_unpart select * from lineitem_shipdate;
INSERT: 6733485888 rows

Query 20230620_050123_00012_e44xz, FINISHED, 7 nodes
Splits: 7,486 total, 7,486 done (100.00%)
7:24 [6.73B rows, 182GB] [15.2M rows/s, 421MB/s]

After:

trino:insert_demo> insert into lineitem_test_unpart select * from lineitem_shipdate;
INSERT: 6733485888 rows

Query 20230620_050758_00006_8c6sa, FINISHED, 7 nodes
Splits: 7,656 total, 7,656 done (100.00%)
4:31 [6.73B rows, 182GB] [24.8M rows/s, 689MB/s]
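
Net effect: throughput improves from 421 MB/s to 689 MB/s, roughly a 1.64x speedup, cutting wall-clock time from 7:24 to 4:31.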

For the partitioned case:

Before:

trino:insert_demo> insert into lineitem_part_target_shipmode select * from lineitem_part_3_skewed_partitions;
INSERT: 3085776220 rows

Query 20230620_052319_00017_e44xz, FINISHED, 7 nodes
Splits: 2,243 total, 2,243 done (100.00%)
3:43 [3.09B rows, 83.3GB] [13.8M rows/s, 382MB/s]

After:

trino:insert_demo> insert into lineitem_part_target_shipmode select * from lineitem_part_3_skewed_partitions;
INSERT: 3085776220 rows

Query 20230624_232648_00006_z65ur, FINISHED, 7 nodes
Splits: 2,242 total, 2,242 done (100.00%)
2:16 [3.09B rows, 83.3GB] [22.8M rows/s, 629MB/s]
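
Net effect: throughput improves from 382 MB/s to 629 MB/s, again roughly a 1.64x speedup (3:43 down to 2:16).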

Release notes

( ) This is not user-visible or docs only and no release notes are required.
( ) Release notes are required, please propose a release note for me.
( ) Release notes are required, with the following suggested text:

# Section
* Fix some things. ({issue}`issuenumber`)

@cla-bot added the cla-signed label Jun 22, 2023
@gaurav8297 force-pushed the improve_scaling_speed_new branch 4 times, most recently from b6afb98 to c4544f3 on June 23, 2023 18:05
@gaurav8297 marked this pull request as ready for review June 23, 2023 18:11
@gaurav8297 requested a review from sopel39 June 23, 2023 19:22
@gaurav8297 force-pushed the improve_scaling_speed_new branch 2 times, most recently from 07dfa35 to ca576ed on June 30, 2023 03:25
@github-actions bot added the tests:hive and hive (Hive connector) labels Jun 30, 2023
@gaurav8297 force-pushed the improve_scaling_speed_new branch 3 times, most recently from b8efb88 to d21b698 on July 3, 2023 01:10
@sopel39 (Member) left a comment:

lgtm % comments

Three review threads on core/trino-main/src/main/java/io/trino/FeaturesConfig.java (outdated, resolved)
@@ -246,6 +247,11 @@ public void setFinishedFuture(ListenableFuture<Void> finishedFuture)
checkState(this.finishedFuture.getAndSet(requireNonNull(finishedFuture, "finishedFuture is null")) == null, "finishedFuture already set");
}

public void recordWriterInputDataSize(long sizeInBytes)
Member:

Do we need extra methods for this? I think this value is just the same as inputDataSize for TableWriter.

@gaurav8297 (Member, Author):

We can do this. However, we would then have to sum inputDataSize for only the TableWriter operator type in DriverContext to get writerInputDataSize.
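
For illustration, a minimal sketch of that alternative: summing input bytes across per-operator stats filtered down to the table-writer operator type. The OperatorStats record here is a simplified stand-in, not Trino's actual class, and the operator-type string is an assumption.

import java.util.List;

// Simplified stand-in for Trino's per-operator statistics; the real
// DriverContext exposes much richer OperatorStats objects.
record OperatorStats(String operatorType, long inputDataSizeBytes) {}

final class WriterInputSize
{
    // Derive writerInputDataSize by summing inputDataSize over
    // TableWriter operators only, instead of recording it separately.
    static long writerInputDataSize(List<OperatorStats> operators)
    {
        return operators.stream()
                .filter(op -> "TableWriterOperator".equals(op.operatorType()))
                .mapToLong(OperatorStats::inputDataSizeBytes)
                .sum();
    }
}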

@gaurav8297 force-pushed the improve_scaling_speed_new branch 2 times, most recently from 767f85e to 2a910c0 on July 4, 2023 06:11
@github-actions bot added the docs label Jul 4, 2023
@gaurav8297 requested a review from sopel39 July 4, 2023 06:22
@@ -1471,11 +1471,13 @@ default Optional<CatalogSchemaTableName> redirectTable(ConnectorSession session,
return Optional.empty();
}

@Deprecated
@sopel39 (Member), Jul 4, 2023:

maybe add a comment that it's now used only in test code and is not needed for any engine logic anymore cc @findepi

Member:

Unused methods should be removed from here.

For test-only functionality, this should be handled purely on the test side

Member:

We still need to give it one release iteration, IMO. cc @kokosing

Member:

I don't see a reason to keep an unused method in ConnectorMetadata.
If someone wants to support two versions at a time, they can still compile their ConnectorMetadata impl (just without the @Override annotation).

Anyway, I don't feel strongly about keeping it for one version or not; just please do follow up. You should be able to create the commit/PR without waiting for anything.

Member:

@findepi I think we have a one-version-backward compatibility policy.

Member:

@gaurav8297 can you put up a PR that removes these deprecated methods, to be merged after 422 goes out?

Member:

Looks like this is part of #18561.

@gaurav8297 (Member, Author):

Created another PR: #18617.

@gaurav8297 force-pushed the improve_scaling_speed_new branch 3 times, most recently from 12224f8 to 4bd4609 on July 6, 2023 00:16
Commit messages from this branch:

Since writers are both CPU and IO bound, we should increase them to the same value as the number of worker threads.

Benchmarks:

Before:
Input Size: 6B rows
Time: 7:24 mins

After:
Input Size: 6B rows
Time: 4:31 mins

This is a preparatory commit that removes writer scaling's dependency on physicalWrittenBytes. Instead, we will use the uncompressed data processed by the writer.

This adds a hidden property which is required to test end-to-end scaling of partitioned writers in different scenarios.

Since writers are both CPU and IO bound, we should increase them to the same value as the number of worker threads.

Benchmarks:

Before:
Input Size: 3B rows with 3 skewed partitions
Time: 3:43 mins

After:
Input Size: 3B rows with 3 skewed partitions
Time: 2:16 mins

Now writer scaling no longer depends on physicalWrittenBytes in either the partitioned or the unpartitioned case. Thus, we no longer depend on the supportsReportingWrittenBytes flag coming from the connector, and any connector can utilize writer scaling.
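
To make the mechanics concrete, here is a rough sketch of a scale-up check driven by uncompressed writer input rather than physical written bytes. The class, method, and threshold are illustrative assumptions, not Trino's actual implementation, which is more involved (for example, it also considers buffer utilization).

// Illustrative sketch only: scale out writers once the average uncompressed
// input per writer crosses a threshold, up to a maximum writer count.
final class WriterScalingSketch
{
    // Hypothetical threshold, not a real Trino config value.
    private static final long MIN_INPUT_PER_WRITER = 128L * 1024 * 1024; // 128 MB

    // Assumes currentWriterCount >= 1.
    static boolean shouldAddWriter(long totalWriterInputBytes, int currentWriterCount, int maxWriterCount)
    {
        return currentWriterCount < maxWriterCount
                && totalWriterInputBytes / currentWriterCount >= MIN_INPUT_PER_WRITER;
    }
}
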
@sopel39 merged commit 6ba5d61 into trinodb:master Jul 7, 2023
104 checks passed
@sopel39 mentioned this pull request Jul 7, 2023
@github-actions bot added this to the 422 milestone Jul 7, 2023
martint added a commit to martint/trino that referenced this pull request Jul 10, 2023
The test attempts to ensure that queries will fail if one of the nodes holding a partition of the data goes offline. However, it's fundamentally broken, as it doesn't do anything to ensure that the node that's being shut down actually contains any data for the table in question.

With the recent changes in trinodb#18005 that enable writer scaling for any connector, it is very likely that the table will land in a subset of the nodes and cause the test to fail if the "wrong" node is shut down.
martint added a commit that referenced this pull request Jul 10, 2023
@piotrolchawa-f3 commented Jul 19, 2023

Hey @gaurav8297! Nice one! I was wondering about the benchmarks that you've conducted.

  1. What is the Trino setup? What catalog do you use?
  2. What does the table look like?
  3. What size of servers do you use?
  4. What connector do you use?

I'm asking because these results seem great to me even before the change. We're trying to insert with Benthos, and Trino chokes on 1,000 rows/sec (even with the blackhole connector). Of course, our coordinator is kind of small (16 vCPUs, 32 GB RAM), but I'd expect it to be able to ingest more.

@sopel39 (Member) commented Jul 19, 2023

We're trying to insert with Benthos, and Trino chokes on 1,000 rows/sec (even with the blackhole connector).

Hard to tell what your bottleneck is (it might not be the insert itself). Maybe if you shared the query JSON it would be possible to tell more. What is your target connector?

@piotrolchawa-f3 commented Jul 19, 2023

Hey @sopel39 , thanks for the quick response!

Our Setup

  1. Benthos ingests events from a message queue and INSERTs them into Trino in micro-batches of 1,000.
  2. Trino (419) is using the blackhole connector (to eliminate storage performance issues; the target connector is Iceberg, on which we see exactly the same performance).
  3. The coordinator is running on 16 vCPUs and 32 GB RAM, and there are two workers (we tried increasing the number of workers; it changes nothing).
  4. The query for insertion is: INSERT INTO iceberg.test.my_table(id, date_0, tenant_id, comment_1, comment_2, is_t, code, is_s, date_1, round, date_2) VALUES (X, Y, Z....), (X1, Y1, Z1, ....) We have a single INSERT query with 1,000 rows.
  5. Example row insertion: INSERT INTO iceberg.test.my_table (id, date_0, tenant_id, comment_1, comment_2, is_t, code, is_s, date_1, round, date_2) VALUES ('dm4X', from_iso8601_timestamp('2023-06-27'), CAST ('fa51f2df-6645-47fd-9209-198a045fbe9a' AS UUID), '11o', '15072', CAST (false AS BOOLEAN), 'EyGuVi9n', CAST (true AS BOOLEAN), from_iso8601_timestamp('2023-06-27'), TRY_CAST(NULL AS INT), from_iso8601_timestamp('2023-06-27'));
  6. Removing the CASTs sped things up a little, but not in any significant way.

Our Investigation

In our investigation, we discovered that the PLANNING phase takes a long time:

  "queryStats" : {
    "createTime" : "2023-07-19T09:56:34.280Z",
    "executionStartTime" : "2023-07-19T09:56:34.388Z",
    "lastHeartbeat" : "2023-07-19T09:56:38.810Z",
    "endTime" : "2023-07-19T09:56:38.809Z",
    "elapsedTime" : "4.53s",
    "queuedTime" : "183.44us",
    "resourceWaitingTime" : "107.82ms",
    "dispatchingTime" : "49.48us",
    "executionTime" : "4.42s",
    "analysisTime" : "107.77ms",
    "planningTime" : "3.77s",
    "planningCpuTime" : "3.72s",
    "finishingTime" : "356.31us",
    "totalTasks" : 4,
    "runningTasks" : 0,
    "completedTasks" : 4,
    "failedTasks" : 0,

And if we zoom in on the PLANNER:

    "optimizerRulesSummaries" : [ {
      "rule" : "io.trino.sql.planner.iterative.rule.ExpressionRewriteRuleSet.ValuesExpressionRewrite",
      "invocations" : 104,
      "applied" : 2,
      "totalTime" : 3692627565,
      "failures" : 0
    }

which is 3.69 seconds (totalTime is reported in nanoseconds).

Sidenote

On the other hand, here are the results of the kind of query you ran for this PR:

trino:test> insert into table2 select * from table1;
INSERT: 22721776 rows

Query 20230719_093535_00048_afzcf, FINISHED, 3 nodes
Splits: 34 total, 34 done (100.00%)
18.04 [22.7M rows, 600MB] [1.26M rows/s, 33.3MB/s]

That looks fine, so our problem probably has a different cause. Do you want me to create a separate issue for this?

@sopel39 (Member) commented Aug 9, 2023

That looks fine, so our problem probably has a different cause. Do you want me to create a separate issue for this?

Please create an issue for this
