Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Pushdown dereference expression to Parquet reader #187

Closed
wants to merge 1 commit into from

Conversation

qqibrow
Copy link
Contributor

@qqibrow qqibrow commented Feb 7, 2019

Following design:

  1. PushdownDereferenceExpression
    Dereference expressions will be pushed down to the projection right above tableScan, which saves CPU/Memory/Network cost.

for query:

with t1 as ( select * from (values ROW(CAST(ROW(1, 2.0) AS ROW(x BIGINT, y DOUBLE)))) 
as t (msg) ) 
select b.msg.x from t1 a, t1 b where a.msg.y = b.msg.y

current plan:

Output[x] => [expr_16:bigint]
        Cost: {rows: 1 (8B), cpu: 297.00, memory: 69.00, network: 0.00}
        x := expr_16
    - Project[] => [expr_16:bigint]
            Cost: {rows: 1 (8B), cpu: 297.00, memory: 69.00, network: 0.00}
            expr_16 := "field_7".x
        - InnerJoin[("expr_20" = "expr_21")][$hashvalue, $hashvalue_22] => [field_7:row(x bigint, y double)]
                Cost: {rows: 1 (45B), cpu: 288.90, memory: 69.00, network: 0.00}
            - Project[] => [expr_20:double, $hashvalue:bigint]
                    Cost: {rows: 1 (18B), cpu: 27.00, memory: 0.00, network: 0.00}
                    $hashvalue := "combine_hash"(bigint '0', COALESCE("$operator$hash_code"("expr_20"), 0))
                - Project[] => [expr_20:double]
                        Cost: {rows: 1 (9B), cpu: 9.00, memory: 0.00, network: 0.00}
                        expr_20 := "field".y
                    - Values => [field:row(x bigint, y double)]                       
            - Project[] => [field_7:row(x bigint, y double), expr_21:double, $hashvalue_22:bigint]
                    Cost: {rows: 1 (69B), cpu: 129.00, memory: 0.00, network: 0.00}
                    $hashvalue_22 := "combine_hash"(bigint '0', COALESCE("$operator$hash_code"("expr_21"), 0))
                - Project[] => [field_7:row(x bigint, y double), expr_21:double]
                        Cost: {rows: 1 (60B), cpu: 60.00, memory: 0.00, network: 0.00}
                        expr_21 := "field_7".y
                    - Values => [field_7:row(x bigint, y double)]

enable dereference pushdown:

 Output[x] => [expr_22:bigint]
        Cost: {rows: 1 (8B), cpu: 125.10, memory: 27.00, network: 0.00}
        x := expr_22
    - InnerJoin[("expr_23" = "expr_24")][$hashvalue, $hashvalue_25] => [expr_22:bigint]
            Cost: {rows: 1 (8B), cpu: 125.10, memory: 27.00, network: 0.00}
        - Project[] => [expr_23:double, $hashvalue:bigint]
                Cost: {rows: 1 (18B), cpu: 27.00, memory: 0.00, network: 0.00}
                $hashvalue := "combine_hash"(bigint '0', COALESCE("$operator$hash_code"("expr_23"), 0))
            - Project[] => [expr_23:double]
                    Cost: {rows: 1 (9B), cpu: 9.00, memory: 0.00, network: 0.00}
                    expr_23 := "field".y
                - Values => [field:row(x bigint, y double)]
                      
        - Project[] => [expr_22:bigint, expr_24:double, $hashvalue_25:bigint]
                Cost: {rows: 1 (27B), cpu: 45.00, memory: 0.00, network: 0.00}
                $hashvalue_25 := "combine_hash"(bigint '0', COALESCE("$operator$hash_code"("expr_24"), 0))
            - Project[] => [expr_22:bigint, expr_24:double]
                    Cost: {rows: 1 (18B), cpu: 18.00, memory: 0.00, network: 0.00}
                    expr_22 := "field_7".x
                    expr_24 := "field_7".y
                - Values => [field_7:row(x bigint, y double)]
  1. MergeNestedColumns
    MergeNestedColumns will detect "project above tableScan" pattern and try to push nestedColumn metadata into TableScan. An new Metadata API getNestedColumnHandles is created to support custom nested column pushdown logic.

  2. getNestedColumnHandles in Hive
    getNestedColumnHandles in Hive return every dereference as independent HiveColumnHandle. Added Optional<NestedColumn> in HiveColumnHandle.

After all query plan will looks like:

explain select msg.workflow.uuid, msg.action.uuid from foo.bar where msg.workflow.uuid = 'abc' and msg.action.name = 'send_sms' limit 10;

before:

- Output[uuid, uuid] => [expr_7:varchar, expr_8:varchar]
        uuid := expr_7
        uuid := expr_8
    - Limit[10] => [expr_7:varchar, expr_8:varchar]
        - LocalExchange[SINGLE] () => expr_7:varchar, expr_8:varchar
            - RemoteExchange[GATHER] => expr_7:varchar, expr_8:varchar
                - LimitPartial[10] => [expr_7:varchar, expr_8:varchar]
                    - ScanFilterProject[table = hive:foo.bar, filterPredicate = ((""msg"".workflow.uuid = CAST('abc' AS varchar)) AND (""msg"".action.name = CAST('send_sms' AS varchar)))] => [expr_7:varchar, expr_8:varchar]
                            expr_7 := ""msg"".workflow.uuid
                            expr_8 := ""msg"".action.uuid
                            LAYOUT: foo.bar
                            msg := msg:struct<......>:12:REGULAR
                            datestr:string:-1:PARTITION_KEY
                                :: [[2000-05-23, 2050-10-26]]

after:

- Output[uuid, uuid] => [msg.workflow.uuid:varchar, msg.action.uuid:varchar]
        uuid := msg.workflow.uuid
        uuid := msg.action.uuid
    - Limit[10] => [msg.workflow.uuid:varchar, msg.action.uuid:varchar]
        - LocalExchange[SINGLE] () => msg.workflow.uuid:varchar, msg.action.uuid:varchar              
            - RemoteExchange[GATHER] => msg.workflow.uuid:varchar, msg.action.uuid:varchar                
                - LimitPartial[10] => [msg.workflow.uuid:varchar, msg.action.uuid:varchar]                       
                    - ScanFilterProject[table = hive:foo.bar, filterPredicate = ((""msg.action.name"" = CAST('send_sms' AS varchar)) AND (""msg.workflow.uuid"" = CAST('abc' AS varchar)))] => [msg.workflow.uuid:varchar, msg.action.uuid:varchar]
                            LAYOUT: foo.bar
                            msg.action.uuid := msg.action.uuid:string:12:REGULAR
                            msg.workflow.uuid := msg.workflow.uuid:string:12:REGULAR
                            msg.action.name := msg.action.name:string:12:REGULAR
                            datestr:string:-1:PARTITION_KEY
                                :: [[2000-05-23, 2050-10-26]]
  1. Code change in ParquetReader that only read selected columns defined in HiveColumnHandle (..., Optional)

Add PushDownDereferenceExpression to pushdown dereferences right above TableScan
Add MergeNestedColumn to convert valid dereference into ColumnHandle in
TableScan
Add NestedColumn into HiveColumnHandle
Change in ParquetReader to use NestedColumn in file reading
@martint
Copy link
Member

martint commented Feb 8, 2019

Thanks for submitting this @qqibrow! I've started looking at the PR a and have a couple of initial high-level comments:

  • Can you split the change into a 1) commit that introduces support for pushing down these expressions to the query engine and 2) a commit that extends the Hive connector to leverage the new functionality? That will make it easier to review and discuss the query engine abstractions.
  • Please make sure to implement all plans transformations using Rules, not the visitor-based PlanOptimizer. The latter is legacy and we're trying to move away from it.

I'll post later with some other comments about the abstraction and some thoughts I have in relation to #18

@qqibrow
Copy link
Contributor Author

qqibrow commented Feb 8, 2019

@martint thanks for replying!

  • Please make sure to implement all plans transformations using Rules, not the visitor-based PlanOptimizer. The latter is legacy and we're trying to move away from it.

Current code rely on the recursion of PlanOptimizer. https://github.com/prestosql/presto/pull/187/files#diff-bceb73c4a557ca9212fb58de061b4076R155
For example, some dereferences are pushed down to tablescan using recursion, visitTablescan check whether those dereferences are valid. If valid, all plannode above tablescan wil change. Any suggestion how to implement similar thing using Rule?

@martint
Copy link
Member

martint commented Feb 9, 2019

With Rules, you don't have to worry about recursion. That's taken care of by the IterativeOptimizer and how it matches and applies rules. All you have to do is match a Project on top of a TableScan and go from there.

BTW, I started capturing more thoughts about how this and other forms of pushdown could work in the short term: https://github.com/prestosql/presto/wiki/Pushdown-of-complex-operations

@Yaliang
Copy link
Member

Yaliang commented Feb 12, 2019

@qqibrow Thanks a lot of reposting this PR! In case you need some examples of how to use Rules to enforce dereference expression. This was my approach: prestodb/presto@3773a21#diff-f17c5e8c28d5cd670607a75e09f8cbbb

In the ProjectOffPushDownFieldRule, you could define the pattern for your rules as:
project -> target.
Here the project contains the dereference expression and you want to pushdown those expression into target.
Then you can define how each of the target use those dereference expression to prune the columns.

The IterativeOptimizer will iteratively spread the pruned columns until none of the rule produce new result. You can image IterativeOptimizer is using BFS to update all the nodes vs PlanOptimizer is using DFS one to do the same.

Side note: You may find out there are some project node with extractly same input and same output after the optimization. You can use RemoveRedundantIdentityProjections to prune those nodes.

@qqibrow
Copy link
Contributor Author

qqibrow commented Feb 14, 2019

@Yaliang thanks for the help! I will take a look. will ping you in slack if more questions :)

@martint martint changed the title Pushdown dereference expression to paruqet reader [WIP] Pushdown dereference expression to paruqet reader Feb 21, 2019
@phd3
Copy link
Member

phd3 commented Apr 17, 2019

@qqibrow Are you still working on this?

@findepi findepi changed the title [WIP] Pushdown dereference expression to paruqet reader [WIP] Pushdown dereference expression to Parquet reader Apr 17, 2019
@qqibrow
Copy link
Contributor Author

qqibrow commented Apr 19, 2019

@phd1994 Yes. This is a working version, but needs refactoring. As discussed, we will break this into 3 parts:

  1. Push dereferences down above table scan in query plan. zhenxiao is primarily working on refactoring from PlanOptimizer to Rules.
  2. Push dereferences down to connector. That depends on the new projection pushdown API.
  3. changes in parquet reader. depends on 2.

@kokosing kokosing added WIP and removed WIP labels May 8, 2019
@JamesRTaylor JamesRTaylor mentioned this pull request May 23, 2019
@JamesRTaylor
Copy link

@qqibrow - wanted to see if I could help in any way with this effort. I noticed zhenxiao had a PR over in prestodb here that looks to be (1) above. Not sure if it helps, but I ported that over to prestosql here. Looks like your patch has the necessary changes to the Hive connector.

Is the remaining work this item from the plan?
Add pushXXXIntoConnector + Rule pairs
Projection

Just trying to gauge how close we are and what I can do to help.

@tooptoop4
Copy link
Contributor

@qqibrow Are you still working on this?

@martint
Copy link
Member

martint commented Aug 4, 2020

This has already been merged as part of another PR

@martint martint closed this Aug 4, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Development

Successfully merging this pull request may close these issues.

None yet

7 participants