Implement Dereference pushdown for the Delta Lake connector #17085

Merged
4 commits merged on May 8, 2023

Contributor

@krvikash krvikash commented Apr 18, 2023

Description

This PR implements dereference pushdown for the Delta Lake connector (similar to #8129).

This significantly improves performance for queries that access nested fields inside struct/row columns. Dereference expressions are pushed down into the connector, so query execution prunes structural data eagerly and reads only the necessary fields.

For example:

Consider a table with a nested column col. When selecting col.a, the Input and Physical input values in the query plan differ depending on whether dereference pushdown is enabled.

Table schema:

trino:dereference_pushdown_test> SHOW COLUMNS FROM ntest;
  Column  |          Type           | Extra | Comment
----------+-------------------------+-------+---------
 orderkey | bigint                  |       |
 col      | row(a bigint, b bigint) |       |
(2 rows)

Query Plan without Dereference pushdown:

trino:dereference_pushdown_test> SET SESSION delta_lake.projection_pushdown_enabled=false;
SET SESSION
trino:dereference_pushdown_test> EXPLAIN ANALYZE SELECT col.a FROM ntest;
                                                                                           Query Plan
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Trino version: testversion
 Queued: 453.79us, Analysis: 306.94ms, Planning: 91.70ms, Execution: 490.10ms
 Fragment 1 [SOURCE]
     CPU: 60.38ms, Scheduled: 216.02ms, Blocked 0.00ns (Input: 0.00ns, Output: 0.00ns), Input: 1500000 rows (32.90MB); per task: avg.: 1500000.00 std.dev.: 0.00, Output: 1500000 rows (12.87MB)
     Output layout: [expr]
     Output partitioning: SINGLE []
     ScanProject[table = delta_lake:dereference_pushdown_test.ntest]
         Layout: [expr:bigint]
         Estimates: {rows: 1500000 (12.87MB), cpu: 78.68M, memory: 0B, network: 0B}/{rows: 1500000 (12.87MB), cpu: 12.87M, memory: 0B, network: 0B}
         CPU: 60.00ms (100.00%), Scheduled: 216.00ms (100.00%), Blocked: 0.00ns (?%), Output: 1500000 rows (12.87MB)
         Input avg.: 1500000.00 rows, Input std.dev.: 0.00%
         expr := "col"[1]
         col := col:row(a bigint, b bigint):REGULAR
         Input: 1500000 rows (32.90MB), Filtered: 0.00%, Physical input: 425.08kB, Physical input time: 167700000.00ns

Query Plan with Dereference pushdown:

trino:dereference_pushdown_test> SET SESSION delta_lake.projection_pushdown_enabled=true;
SET SESSION
trino:dereference_pushdown_test> EXPLAIN ANALYZE SELECT col.a FROM ntest;
                                                                                           Query Plan
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Trino version: testversion
 Queued: 431.92us, Analysis: 365.17ms, Planning: 105.14ms, Execution: 501.89ms
 Fragment 1 [SOURCE]
     CPU: 47.47ms, Scheduled: 208.86ms, Blocked 0.00ns (Input: 0.00ns, Output: 0.00ns), Input: 1500000 rows (12.87MB); per task: avg.: 1500000.00 std.dev.: 0.00, Output: 1500000 rows (12.87MB)
     Output layout: [col.a]
     Output partitioning: SINGLE []
     TableScan[table = delta_lake:dereference_pushdown_test.ntest]
         Layout: [col.a:bigint]
         Estimates: {rows: 1500000 (12.87MB), cpu: 12.87M, memory: 0B, network: 0B}
         CPU: 48.00ms (100.00%), Scheduled: 208.00ms (100.00%), Blocked: 0.00ns (?%), Output: 1500000 rows (12.87MB)
         Input avg.: 1500000.00 rows, Input std.dev.: 0.00%
         col.a := col.a:bigint:REGULAR
         Input: 1500000 rows (12.87MB), Physical input: 236.54kB, Physical input time: 175040000.00ns
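
As a rough check of the two plans above, the relative savings can be computed directly from the reported figures. The sketch below is illustrative only: the class and method names (PushdownSavings, reduction) are hypothetical, and the numbers are copied from the EXPLAIN ANALYZE output rather than measured anew.

```java
// Back-of-the-envelope comparison of the two EXPLAIN ANALYZE runs above.
// All figures are copied from the plans; the class itself is hypothetical.
final class PushdownSavings
{
    private PushdownSavings() {}

    // Fractional reduction when going from `before` to `after` (same unit for both).
    static double reduction(double before, double after)
    {
        return (before - after) / before;
    }

    public static void main(String[] args)
    {
        double inputWithout = 32.90;     // MB, "Input" without pushdown
        double inputWith = 12.87;        // MB, "Input" with pushdown
        double physicalWithout = 425.08; // kB, "Physical input" without pushdown
        double physicalWith = 236.54;    // kB, "Physical input" with pushdown

        System.out.printf("Input reduction: %.1f%%%n", 100 * reduction(inputWithout, inputWith));
        System.out.printf("Physical input reduction: %.1f%%%n", 100 * reduction(physicalWithout, physicalWith));
    }
}
```

With these figures, Input shrinks by about 61% and Physical input by about 44%, while the number of rows read (1500000) stays the same.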

More details about dereference pushdown: https://trino.io/blog/2020/08/14/dereference-pushdown.html

Additional context and related issues

The feature is enabled by default.

The feature can be disabled by setting the delta.projection-pushdown-enabled configuration property or the projection_pushdown_enabled catalog session property (delta_lake.projection_pushdown_enabled in the example above) to false.
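
A minimal sketch of what the configuration property looks like in a catalog properties file, and of the enabled-by-default behaviour described above. The helper class is hypothetical and uses plain java.util.Properties; it is not how Trino itself reads configuration, and the connector.name line is an assumption about a typical catalog file.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

final class ProjectionPushdownConfigSketch
{
    // Property name taken from the PR description.
    static final String PROPERTY = "delta.projection-pushdown-enabled";

    private ProjectionPushdownConfigSketch() {}

    // Hypothetical helper: returns the effective setting, defaulting to true
    // because the description says the feature is enabled by default.
    static boolean isProjectionPushdownEnabled(String catalogProperties)
    {
        Properties properties = new Properties();
        try {
            properties.load(new StringReader(catalogProperties));
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return Boolean.parseBoolean(properties.getProperty(PROPERTY, "true"));
    }

    public static void main(String[] args)
    {
        // Assumed catalog file contents, one property per line.
        String disabled = "connector.name=delta_lake\n" + PROPERTY + "=false\n";
        String unset = "connector.name=delta_lake\n";

        System.out.println(isProjectionPushdownEnabled(disabled)); // false
        System.out.println(isProjectionPushdownEnabled(unset));    // true
    }
}
```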

Release notes

(X) Release notes are required, with the following suggested text:

# DeltaLake
* Add Dereference pushdown for the Delta Lake connector. ({issue}`17085`)

@cla-bot cla-bot bot added the cla-signed label Apr 18, 2023
@krvikash krvikash self-assigned this Apr 18, 2023
@github-actions github-actions bot added delta-lake Delta Lake connector hive Hive connector tests:hive labels Apr 18, 2023
@krvikash krvikash force-pushed the delta-dereference-pushdown branch 2 times, most recently from ef81ebc to b0b04f0 Compare April 18, 2023 18:32
import static java.lang.String.format;
import static org.assertj.core.api.Assertions.assertThat;

public class TestDeltaLakeProjectionPushdownPlans

@github-actions github-actions bot added the iceberg Iceberg connector label Apr 18, 2023
@findinpath
Contributor

Please add a product test reading from Delta Lake tables (created by either Databricks or Delta OSS) with column mapping modes name and id, to make sure that dereferencing actually works for such tables as well.

@krvikash
Contributor Author

Added some more tests.

@krvikash krvikash force-pushed the delta-dereference-pushdown branch 4 times, most recently from b1c173b to 3e3167b Compare April 19, 2023 15:35
@github-actions github-actions bot added the docs label Apr 19, 2023
@krvikash krvikash removed the iceberg Iceberg connector label Apr 19, 2023
@Test
public void testHighlyNestedData()
{
// TODO consider moving this in BaseConnectorTest

Member

You could put these tests in a base class in io.trino.testing and then run them for the relevant connectors and file formats by adding derived classes (e.g. BaseOrcWithBloomFiltersTest).
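
The base-class idea above can be sketched roughly as follows. This is only an illustration of the pattern: every class name here is hypothetical, no test framework is used, and the SQL strings are assumptions rather than statements taken from the PR. In a real test suite the shared method would be a framework test method and would execute the statements instead of returning them.

```java
import java.util.List;

// Hypothetical base class: the shared scenario is written once here.
abstract class BaseNestedDataScenarioSketch
{
    // Each derived class supplies only the connector- or format-specific DDL.
    protected abstract String createTableSql(String tableName);

    // Statements every variant runs, in order.
    final List<String> highlyNestedDataStatements(String tableName)
    {
        return List.of(
                createTableSql(tableName),
                "SELECT col.a FROM " + tableName);
    }

    public static void main(String[] args)
    {
        System.out.println(new DefaultFormatScenarioSketch().highlyNestedDataStatements("ntest"));
        System.out.println(new OrcFormatScenarioSketch().highlyNestedDataStatements("ntest"));
    }
}

// Hypothetical variant that relies on default table properties.
final class DefaultFormatScenarioSketch
        extends BaseNestedDataScenarioSketch
{
    @Override
    protected String createTableSql(String tableName)
    {
        return "CREATE TABLE " + tableName + " (orderkey bigint, col row(a bigint, b bigint))";
    }
}

// Hypothetical variant that pins a file format through a table property.
final class OrcFormatScenarioSketch
        extends BaseNestedDataScenarioSketch
{
    @Override
    protected String createTableSql(String tableName)
    {
        return "CREATE TABLE " + tableName + " (orderkey bigint, col row(a bigint, b bigint)) WITH (format = 'ORC')";
    }
}
```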

Contributor Author

I am considering moving tests in follow-up PR.

#17085 (comment)

Contributor

Great finding Raunaq.

@krvikash krvikash force-pushed the delta-dereference-pushdown branch 2 times, most recently from 41659e2 to 7e7acaa Compare April 19, 2023 21:18
@krvikash
Contributor Author

krvikash commented May 5, 2023

Addressed most of the comments. Some of the comments are yet to be addressed.

@krvikash krvikash force-pushed the delta-dereference-pushdown branch from 4eee057 to dd60e5c Compare May 5, 2023 09:57
@krvikash
Contributor Author

krvikash commented May 5, 2023

Rebased onto master and resolved conflicts.

@krvikash krvikash force-pushed the delta-dereference-pushdown branch from dd60e5c to b6be925 Compare May 5, 2023 12:28
@krvikash
Contributor Author

krvikash commented May 5, 2023

Addressed the rest of the comments.

I will raise a follow-up PR for the comments below:
#17085 (comment)
#17085 (comment)

@@ -102,7 +103,7 @@ public TableStatistics getTableStatistics(ConnectorSession session, DeltaLakeTab
 }

 Set<String> predicatedColumnNames = tableHandle.getNonPartitionConstraint().getDomains().orElseThrow().keySet().stream()
-.map(DeltaLakeColumnHandle::getName)
+.map(DeltaLakeColumnHandle::getBaseColumnName)

Contributor

Should we use the projected column name here if it is a dereferenced field?

I see this as part of #17164

return value.flatMap(o -> deserializeStatisticsValue(columnHandle, String.valueOf(o)));
}

private Optional<Object> deserializeStatisticsValue(DeltaLakeColumnHandle columnHandle, String statValue)
{
if (!columnHandle.isBaseColumn()) {

Contributor

nit: FYI This seems to be done as well in deserializeColumnValue one line below.
No change needed.

Contributor Author

deserializeColumnValue throws a verification exception if the column is not a base column:

verify(column.isBaseColumn(), "Unexpected dereference: %s", column);
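
The difference between the two checks discussed here can be sketched as follows. This is a simplified, self-contained illustration, not the connector code: ColumnHandle is a hypothetical stand-in for DeltaLakeColumnHandle, the local verify helper only mimics the style of the call quoted above (it throws IllegalStateException), and returning Optional.empty() for a dereferenced column is an assumption about the part of deserializeStatisticsValue not shown in the diff.

```java
import java.util.Optional;

final class BaseColumnGuardSketch
{
    private BaseColumnGuardSketch() {}

    // Hypothetical stand-in for DeltaLakeColumnHandle.
    record ColumnHandle(String baseColumnName, Optional<String> dereferencePath)
    {
        boolean isBaseColumn()
        {
            return dereferencePath.isEmpty();
        }
    }

    // Local helper mimicking a verify(condition, template, arg) call.
    static void verify(boolean condition, String template, Object arg)
    {
        if (!condition) {
            throw new IllegalStateException(String.format(template, arg));
        }
    }

    // Strict path: a dereferenced column is treated as a programming error.
    static Object deserializeColumnValue(ColumnHandle column, String value)
    {
        verify(column.isBaseColumn(), "Unexpected dereference: %s", column);
        return Long.parseLong(value);
    }

    // Lenient path: statistics are skipped for dereferenced columns (assumed),
    // so the strict check below is never reached for them.
    static Optional<Object> deserializeStatisticsValue(ColumnHandle column, String statValue)
    {
        if (!column.isBaseColumn()) {
            return Optional.empty();
        }
        return Optional.of(deserializeColumnValue(column, statValue));
    }

    public static void main(String[] args)
    {
        ColumnHandle base = new ColumnHandle("col", Optional.empty());
        ColumnHandle nested = new ColumnHandle("col", Optional.of("a"));

        System.out.println(deserializeStatisticsValue(base, "42"));
        System.out.println(deserializeStatisticsValue(nested, "42"));
    }
}
```

Without the early return, the second call would fail inside deserializeColumnValue instead of yielding an empty result, which is why the guard is not redundant.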

@@ -55,20 +55,20 @@ public void testColumnMappingModeNone()

 onDelta().executeQuery("" +
 "CREATE TABLE default." + tableName +
-" (a_number INT)" +
+" (a_number INT, nested STRUCT<field1: STRING, field2: STRING>)" +

Contributor

We should have tests for tables that underwent insert/update/delete/merge with onDelta + dereference pushdown (for all column mapping modes).

@findepi what is the rationale for this request in the context of this read-oriented feature? Is this more about being future-proof?

@findinpath
Contributor

@findepi can you pls run this PR with secrets?

@findepi
Member

findepi commented May 8, 2023

/test-with-secrets sha=b6be925a0b09044923a17b37133fd58809e00540

Member

@findepi findepi left a comment

please squash (after the test with secrets run)

-return getColumns(table.getMetadataEntry()).stream()
+return table.getProjectedColumns()
+.map(projectedColumns -> (List<DeltaLakeColumnHandle>) projectedColumns.stream()
+.map(DeltaLakeColumnHandle.class::cast) // TODO DeltaLakeTableHandle.projectedColumns should be a collection of DeltaLakeColumnHandle

Member

I will try to remember to solve this in #17365

@krvikash
Contributor Author

krvikash commented May 8, 2023

test-with-secrets run is 🟢 https://github.com/trinodb/trino/actions/runs/4912972173

@krvikash krvikash force-pushed the delta-dereference-pushdown branch from b6be925 to 1b9b988 Compare May 8, 2023 12:11
@krvikash
Contributor Author

krvikash commented May 8, 2023

Addressed comments

@findepi findepi merged commit 43e90f4 into trinodb:master May 8, 2023
7 of 14 checks passed
@findepi
Member

findepi commented May 8, 2023

@krvikash thank you so much!

@findepi findepi mentioned this pull request May 8, 2023
@krvikash
Contributor Author

krvikash commented May 8, 2023

Thank you so much, @findepi | @findinpath | @raunaqmorarka | @alexjo2144 for the reviews and constant support.

Labels
cla-signed delta-lake Delta Lake connector docs hive Hive connector

7 participants