Optimize queries with similar subplans (CTE) #19744

Open · jaystarshot opened this issue May 26, 2023 · 35 comments
@jaystarshot (Member)

Ref - trinodb/trino#5878

@jaystarshot (Member Author)

Will start looking

@ClarenceThreepwood (Contributor) commented May 26, 2023

This is also something we have been exploring. To do this right, we need a mechanism to materialize intermediate results. Short of that, this paper describes a holistic framework for identifying and reusing CTEs with streaming/staged in-memory execution:
https://www.amazon.science/publications/computation-reuse-via-fusion-in-amazon-athena
@jaystarshot - If you folks are interested, we can collaborate on this.
cc: @bmckennaah, @aaneja
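
To make the reuse opportunity concrete, here is a minimal, hypothetical example (TPC-H lineitem; the CTE name and join condition are made up) where today the engine inlines the WITH subquery at each reference and so plans and executes the same aggregation twice:

```
-- Hypothetical illustration: "big_agg" is referenced twice, so without reuse
-- the aggregation over lineitem is computed twice.
WITH big_agg AS (
    SELECT orderkey, sum(quantity) AS total_qty
    FROM lineitem
    GROUP BY orderkey
)
SELECT a.orderkey, a.total_qty AS qty_a, b.total_qty AS qty_b
FROM big_agg a
JOIN big_agg b
    ON a.orderkey = b.orderkey + 1;
```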

@jaystarshot (Member Author)

For part 1, yes, I think we need to store the results in a temp table and create a new exchange that will stream them from the temp table to the repeated parts of the plan. Thanks for the reference, I will go through it.
Yes, we are very interested and would love to collaborate!

@kaikalur (Contributor)

CC: @feilong-liu

@jaystarshot (Member Author) commented Jun 6, 2023

[image: proposed architecture]
This is the architecture I was thinking about: create a table definition during planning, and then read from and write to that table while managing state during execution.

Original plan:
[image: original query plan]

In planning, there will be a new CTE optimizer, after the exchange optimizer, which will detect and choose CTEs by hashing all subplans beginning from exchanges, possibly using the CBO. This optimizer will transform the plan to use new bridge writer and reader operators, which will be based on TableWriter and TableScanOperator respectively.

New plan:
[image: rewritten query plan]

Table deletion will need to be done from the co-ordinator after the query lifecycle.

Parts which are challenging:

  • Since insertion will be handled in execution and we are only creating the table metadata during optimization, the part where we assign which drivers read the respective splits (maybe in the scheduler) is unclear to me.
    ... more to be updated

@bmckennaah commented Jun 6, 2023 via email

@jaystarshot (Member Author)

@bmckennaah the physical plan hashing will be tailored to canonicalize the subtrees so that CTEs are detected accurately, hence dt3 and dt4 would be detected. The decision has to be cost based, of course. Initially, however, I am focusing on a complete end-to-end flow and deciding without the CBO.

@bmckennaah commented Jun 6, 2023 via email

@jaystarshot (Member Author)

So we would need to store b1, c1, and d1 in the common temporary table. I think the hashing can be tailored later to process column-wise and then apply a project on the common CTE while reading, i.e. if we focus on the identical-subtree case first, it can be extended for this use case.
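
A sketch of that column-superset idea, using a hypothetical table t and the column names from the discussion (illustrative pseudo-SQL; the actual mechanism would be an internal temporary table, not user-visible DDL):

```
-- Two repeated subplans differ only in the columns they project from the same
-- hypothetical table t:
--   consumer 1 needs: SELECT b1, c1 FROM t
--   consumer 2 needs: SELECT c1, d1 FROM t
-- Materialize the column superset once as the common CTE ...
--   cte_0 := SELECT b1, c1, d1 FROM t
-- ... and each consumer becomes a narrow projection while reading:
SELECT b1, c1 FROM cte_0;   -- replaces consumer 1
SELECT c1, d1 FROM cte_0;   -- replaces consumer 2
```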

@rschlussel (Contributor)

Thanks for looking at this. This is a feature I've wanted for a while but haven't had a chance to prioritize.
http://www.vldb.org/pvldb/vol8/p1704-elhelw.pdf is another related paper that @mlyublena shared with me. Also, for the temporary table, you can reuse the materialized exchange concept.

@bmckennaah commented Jun 6, 2023 via email

@kaikalur (Contributor) commented Jun 7, 2023

Look at ExchangeMaterializationStrategy: when it is set to ALL, I think we already materialize exchanges to Hive. Maybe you can piggyback on that framework for this work.
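
For reference, the materialized-exchange path can be exercised through session properties roughly like the following (property names are from the existing Presto materialized-exchange support as I recall them; verify the exact names and values for your version):

```
-- Assumed session properties; verify against your Presto version.
SET SESSION exchange_materialization_strategy = 'ALL';  -- materialize all exchanges
SET SESSION partitioning_provider_catalog = 'hive';     -- catalog that backs the temporary tables
SET SESSION hash_partition_count = 100;                 -- partition count for the materialized exchanges
```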

@jaystarshot (Member Author)

Wow thanks! I will take a look

@jaystarshot (Member Author) commented Jun 10, 2023

Thanks everyone for all the pointers! I reused the materialized exchange framework and was able to create a prototype for detection and replacement for a simple union query,
SELECT column FROM table UNION SELECT column FROM table, on a Hive table.
Without CTE:
[image: plan without CTE reuse]
With CTE:
[image: plan with CTE reuse]

I used naive detection logic based on the hash of a plan (did not use CBO) in a new optimizer: basically, I added a new exchange type and used the BasePlanFragmenter to create and store temporary tables. Similar to here.

This uses metadata.createTemporaryTable, which creates session-lived temporary tables. That functionality is currently experimental, so it would be our dependency for making this production ready.

There are a lot of sanity tests and a lot of work needed to make this production ready, but the prototype shows that it can be done.
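
To make the transformation concrete, here is roughly what the prototype's rewrite corresponds to at the SQL level, with TPC-H names standing in for the placeholder column/table above and a made-up temp table name (illustrative only; the real change rewrites plan fragments and uses internal, session-lived temporary tables rather than user-visible DDL):

```
-- Original query: both UNION branches contain the same subplan.
SELECT orderkey FROM lineitem
UNION
SELECT orderkey FROM lineitem;

-- Conceptual rewrite: the common subplan is written once to a temporary table
-- (created internally via metadata.createTemporaryTable), and both branches
-- then scan that table before the final distinct.
--   temp_cte_0 := SELECT orderkey FROM lineitem
SELECT orderkey FROM temp_cte_0
UNION
SELECT orderkey FROM temp_cte_0;
```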

@kaikalur (Contributor) commented Jun 10, 2023

This is cool! Once this framework is set up, it could be interesting to make CTE melding more first-class, e.g. merging filters/projections based on the filter cardinalities, matching output partitioning, etc.
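
For illustration, the sort of melding meant here (hypothetical predicates over TPC-H lineitem; not something the prototype does yet): two near-identical subplans that differ only in their filter can share one materialized CTE carrying the weaker predicate, with the stricter filter re-applied at the consumer; whether that pays off depends on the filter cardinalities.

```
-- Subplan A: SELECT orderkey, quantity FROM lineitem WHERE shipdate >= DATE '2023-01-01'
-- Subplan B: SELECT orderkey, quantity FROM lineitem WHERE shipdate >= DATE '2023-06-01'
-- Melded CTE with the weaker predicate (computed and materialized once):
--   cte_0 := SELECT orderkey, quantity, shipdate
--            FROM lineitem
--            WHERE shipdate >= DATE '2023-01-01'
-- Consumers read the shared CTE; B re-applies its stricter filter:
SELECT orderkey, quantity FROM cte_0;                                      -- subplan A
SELECT orderkey, quantity FROM cte_0 WHERE shipdate >= DATE '2023-06-01';  -- subplan B
```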

@bmckennaah commented Jun 10, 2023 via email

@jaystarshot (Member Author) commented Jun 12, 2023

Thank you! I will begin work on breaking down the components, creating PRs for them, and making this production ready, along with a more detailed analysis of the performance of temporary tables, any risks, etc.

@jaystarshot (Member Author)

And a design doc to start now

@bmckennaah commented Jun 12, 2023 via email

@kaikalur (Contributor)

Maybe I should open a separate issue but a Tee operator could be quite useful in Presto in general (also in this specific case). That could help quite a bit in things like window functions as well as this CTE/subplan reuse case.

@jaystarshot (Member Author)

I've put together a proposal inspired by and based on the paper http://www.vldb.org/pvldb/vol8/p1704-elhelw.pdf, which was recommended by @rschlussel. I'm hoping we can discuss it and improve the idea further.

https://docs.google.com/document/d/10J9_j08imqe6xEH9iepteG0DA5B-IpmYv4lSRvCgdJA/edit.

@jaystarshot (Member Author) commented Jun 20, 2023

@kaikalur @rschlussel @ClarenceThreepwood @bmckennaah what do you think about this approach? - link

@rschlussel (Contributor)

@jaystarshot wanted to check if there was any update here

@jaystarshot (Member Author)

@rschlussel Unfortunately no; we got sidetracked by other tasks given the estimated work and ROI for this one, but we plan to dedicate some resources this half. We are interested in reprioritizing if we can get more resources from the community.

@jaystarshot (Member Author)

We are starting POC development based on the design and will have something this quarter

@jaystarshot (Member Author) commented Oct 10, 2023

I've prepared the POC - accessible here: link.
Currently only one persistent CTE is supported; I am working on supporting multiple and dependent CTEs.

Initially, we executed TPC-DS query 4 and observed a 30% CPU reduction at sf100. At sf1000, Q4, which previously failed, successfully finished within 5 minutes on our test cluster using the PAGEFILE storage format for the temporary table.

Additionally, we mimicked production traffic by arbitrarily converting one common table expression (CTE) to be persistent. Unfortunately, 20-40% of the queries encountered an identical internal error: 'Malformed PageFile format, footer length is missing.'

However, these errors were not reproducible on subsequent runs. The remaining 60-80% of the queries executed successfully.
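
For reference, the temporary-table storage format mentioned above is controlled per session by Hive connector properties along these lines (property names as I recall them from presto-hive; treat the exact names and supported values as something to verify for your build):

```
-- Assumed Hive connector session properties for temporary tables; verify names/values.
SET SESSION hive.temporary_table_storage_format = 'PAGEFILE';   -- e.g. PAGEFILE, ORC, PARQUET
SET SESSION hive.temporary_table_compression_codec = 'SNAPPY';
```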

@jaystarshot (Member Author) commented Oct 10, 2023

We don't see these errors with the Parquet format for temp tables, though it is around 5x slower. About 10% of queries failed with a Hive bad-data error:

..__presto_temporary_table_PARQUET_20231010_035817_00498_9ghcq_e13e9b33_cdbd_49f5_a6ff_fcf613f2f4b6/000000_0_20231010_035817_00498_9ghcq is not a valid Parquet File",
"cause": {
"type": "com.facebook.presto.parquet.ParquetCorruptionException",

@viczhang861 (Contributor)

'Malformed PageFile format, footer length is missing.'

This error message suggests it is not a valid PAGEFILE file. Is it possible that the file is empty, does not exist, or is not in PAGEFILE format (less likely; if so, the error would be thrown later)?

@jaystarshot (Member Author) commented Oct 15, 2023

I debugged with deletion off; the PageFile file was non-empty. It happens with Parquet too:

..__presto_temporary_table_PARQUET_20231010_035817_00498_9ghcq_e13e9b33_cdbd_49f5_a6ff_fcf613f2f4b6/000000_0_20231010_035817_00498_9ghcq is not a valid Parquet File",
"cause": {
"type": "com.facebook.presto.parquet.ParquetCorruptionException",

I inspected the Parquet file on HDFS and it was a valid Parquet file! Moreover, the read was happening just 200 ms after the write.

@jaystarshot (Member Author) commented Oct 15, 2023

So my suspicion was now on two things:

  1. HDFS inconsistency
  2. Some grouped execution changes not being picked up in my patch

I introduced a 1-second pause after the child sections finish and before the parent section begins, and voilà! The corruption errors disappeared!
Wondering where and what the proper fix would be.

@jaystarshot (Member Author)

Finally found the issue: we had a legacy configuration where reads from the observer NameNode were enabled. I think the observer NameNode needs some time to sync with the active NameNode; turning that off works.
Proceeding to fix some other issues and productionize the PR.

@sutodi commented Dec 27, 2023

Where are we on this? We are also waiting for this PR to get merged.

@jaystarshot (Member Author) commented Jan 3, 2024 via email

@avorozhtsov

Thank you @jaystarshot!
BTW, there is the open-source YQL query language, and this feature is implemented there. It is different from Presto and sits on top of YDB, but maybe it is possible to get some insights from their repository.
https://ydb.tech/docs/en/yql/reference/
https://github.com/ydb-platform/ydb/tree/main/ydb/library/yql

@jaystarshot (Member Author)

Did you mean to reference something specific? These links just point to the general pages.

facebook-github-bot pushed a commit to facebookincubator/velox that referenced this issue May 26, 2024
Summary:
The Velox HiveConnector supports writing bucketed files only when they are partitioned as well. This presents a feature gap wrt Presto.

Presto behavior (for bucketed but not partitioned):

- Supports CTAS into bucketed (but not partitioned) tables
- Cannot append/overwrite to existing bucketed tables (though can append to TEMPORARY ones).

The CTAS into bucketed tables has become important because such tables are used for CTE (WITH clause).
Note: This PR only handles CTAS situations. There will be a separate PR for TEMPORARY tables. prestodb/presto#19744 prestodb/presto#22630

### Background
#### TableWriter and TableFinish

Presto uses TableWriter PlanNodes to do the writing operations. The TableWriter nodes run on the workers. These nodes write the input rows into data files (on a staging directory before moving them to a target directory). The TableWriter node works in conjunction with a TableCommit node on the co-ordinator. The TableCommit node (TableFinishOperator) does the final renaming of target directory and commit to the meta-store.

It is important to note that plans with Bucketed tables involve a LocalExchange that brings all the data to a single driver for TableWriter so that it can bucket and write the data appropriately.

```
EXPLAIN CREATE TABLE lineitem_bucketed2(orderkey, partkey, suppkey, linenumber, quantity, ds) WITH (bucket_count = 10, bucketed_by = ARRAY['orderkey'], sorted_by = ARRAY['orderkey']) AS SELECT orderkey, partkey, suppkey, linenumber, quantity, '2021-12-20' FROM tpch.tiny.lineitem;
```

Plan with TableWriter and TableCommit mode. Note the LocalExchange moving all data to a single driver.
```
- Output[PlanNodeId 7]
     - TableCommit[PlanNodeId 5][Optional[hive.tpch_bucketed.lineitem_bucketed2]] => [rows_23:bigint]
         - RemoteStreamingExchange[PlanNodeId 299][GATHER] => [rows:bigint, fragments:varbinary, commitcontext:varbinary]
             - TableWriter[PlanNodeId 6] => [rows:bigint, fragments:varbinary, commitcontext:varbinary]
                     orderkey := orderkey (1:194)  partkey := partkey (1:204) suppkey := suppkey (1:213) linenumber := linenumber (1:222) quantity := quantity (1:234) ds := expr (1:244)
                 - LocalExchange[PlanNodeId 330][SINGLE] () => [orderkey:bigint, partkey:bigint, suppkey:bigint, linenumber:integer, quantity:double, expr:varchar(10)] >
                         - RemoteStreamingExchange[PlanNodeId 298][REPARTITION] => [orderkey:bigint, partkey:bigint, suppkey:bigint, linenumber:integer, quantity:double, expr:varcha>
                              - ScanProject[PlanNodeId 0,187][table = TableHandle {connectorId='tpch', connectorHandle='lineitem:sf0.01', layout='Optional[lineitem:sf0.01]'}, project>
                                 expr := VARCHAR'2021-12-20' suppkey := tpch:suppkey (1:262) partkey := tpch:partkey (1:262) linenumber := tpch:linenumber (1:262) orderkey := tpch:orderkey (1:262) quantity := tpch:quantity (1:262)
```

The above command creates 10 files (one per bucket), as follows:

```
Aditis-MacBook-Pro:lineitem_bucketed aditipandit$ pwd
${DATA_DIR}/hive_data/tpch/lineitem_bucketed

Aditis-MacBook-Pro:lineitem_bucketed2 aditipandit$ ls
000000_0_20240507_221727_00018_73r2r
000003_0_20240507_221727_00018_73r2r
000006_0_20240507_221727_00018_73r2r
000009_0_20240507_221727_00018_73r2r
000001_0_20240507_221727_00018_73r2r
000004_0_20240507_221727_00018_73r2r
000007_0_20240507_221727_00018_73r2r
000002_0_20240507_221727_00018_73r2r
000005_0_20240507_221727_00018_73r2r
000008_0_20240507_221727_00018_73r2r
```

#### TableWriter output
The TableWriter output contains three columns per fragment (one for each individual target file).  This format is being presented for completeness.
**There are no special changes for bucketed tables here. The only important difference is that the writePath/targetPath would not contain the partition directory.**

| TableWriter output row |
|--------|
| ROW<rows:BIGINT,fragments:VARBINARY,commitcontext:VARBINARY> |

| Rows |  | Fragments |  | CommitContext |
|--------|--------|--------|--------|--------|
| N (numPartitionUpdates) |  | NULL |  | TaskCommitContext |
| NULL | | PartitionUpdate0 |  |  |
| NULL |  | PartitionUpdate1 |  |  |
| NULL |  | ... |  |  |
| NULL |  | PartitionUpdateN |  |  |

The fragments column contains JSON strings of PartitionUpdate in the following format:
```
{
"Name": "ds=2022-08-06/partition=events_pcp_product_finder_product_similartiy__groupby__999999998000212604",
"updateMode": "NEW",
"writePath": "",
"targetPath": "",
"fileWriteInfos": [
   { "writeFileName": "", "targetFileName": "", "fileSize": 3517346970 },
   { "writeFileName": "", "targetFileName": "", "fileSize": 4314798687 }, ]
"rowCount": 3950431150,
"inMemoryDataSizeInBytes": 4992001194927,
"onDiskDataSizeInBytes": 1374893372141,
"containsNumberedFileNames": false
}
```

The commitcontext column is a constant vector of TaskCommitContext as a JSON string:
```
{
"lifespan": "TaskWide",
"taskId": "20220822_190126_00000_78c2f.1.0.0",
"pageSinkCommitStrategy": "TASK_COMMIT",
"lastPage": false
}
```

#### Empty buckets
The TableWriter generates PartitionUpdate messages only for the files it has written, so if a bucket is empty there is no PartitionUpdate message for it.

If a bucket has no PartitionUpdate output message, the TableFinish operator fixes up the HiveMetaStore with an empty file for that bucket. https://github.com/prestodb/presto/blob/master/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java#L1794

### Design

As outlined above all table writing happens in the TableWriter operator.

The TableWriter forwards the write to the HiveDataSink which is registered by the HiveConnector for it.

The HiveDataSink already supported bucketed (and partitioned) tables, so all the logic for wiring bucket metadata and bucket computation already existed. The only missing piece was handling file names for bucketed-but-not-partitioned tables in the writerIds, and mapping the proper writerId to input rows when appending to the HiveDataSink. This PR fixes that.

********************************************
Note: The Prestissimo changes are in prestodb/presto#22737

Pull Request resolved: #9740

Reviewed By: kewang1024

Differential Revision: D57748876

Pulled By: xiaoxmeng

fbshipit-source-id: 33bb77c6fce4d2519f3214e2fb93891f1f910716
Joe-Abraham pushed a commit to Joe-Abraham/velox that referenced this issue Jun 7, 2024 (facebookincubator#9740)