[Data] Add TPCH queries 7,8,9 for benchmarking#60662
bveeramani merged 30 commits into ray-project:master
Conversation
Code Review
This pull request adds TPC-H queries 5, 7, 8, and 9 for benchmarking purposes. The overall structure of the new query files is consistent. However, I've found several correctness issues where the implementations deviate significantly from the TPC-H specifications for queries 7, 8, and 9. These need to be addressed to ensure the benchmarks are valid. Additionally, there are opportunities to improve performance in queries 5 and 9 by optimizing the join logic. The configuration changes in the YAML file are appropriate.
Signed-off-by: daiping8 <dai.ping88@zte.com.cn>
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> Signed-off-by: ZTE Ray <dai.ping88@zte.com.cn>
… for improved clarity and consistency. Signed-off-by: daiping8 <dai.ping88@zte.com.cn>
@owenowenisme Please review the code. Looking forward to any suggestions.

Hi @daiping8, can you help me understand why you are adding these benchmarks?

Hi. This is a task assigned by the Ray Data team: https://docs.google.com/document/d/1OFFp2jMMnrCPiE0Gxdi0ronXGVqtDYDbUoS3fsNc54Q/edit?pli=1&tab=t.0
owenowenisme
left a comment
I think you're missing some tables in common.py? How about opening a PR first to add the name mapping?
FYI
=== region ===
Column names: ['column0', 'column1', 'column2', 'column3']
column0: int64
column1: string
column2: string
column3: string
=== supplier ===
Column names: ['column0', 'column1', 'column2', 'column3', 'column4', 'column5', 'column6', 'column7']
column0: int64
column1: string
column2: string
column3: int64
column4: string
column5: double
column6: string
column7: string
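As a sketch of what such a name mapping in common.py might look like (the actual names and structure there may differ), based on the schemas dumped above, with the trailing all-null column left unmapped:

```python
import pandas as pd

# Hypothetical sketch of the column-name mapping the reviewer is asking for.
# Maps the generic parquet column names to TPC-H column names; the trailing
# all-null column (region.column3, supplier.column7) is intentionally unmapped.
COLUMN_NAME_MAP = {
    "region": {
        "column0": "r_regionkey",
        "column1": "r_name",
        "column2": "r_comment",
    },
    "supplier": {
        "column0": "s_suppkey",
        "column1": "s_name",
        "column2": "s_address",
        "column3": "s_nationkey",
        "column4": "s_phone",
        "column5": "s_acctbal",
        "column6": "s_comment",
    },
}


def rename_tpch_columns(df: pd.DataFrame, table_name: str) -> pd.DataFrame:
    """Rename generic parquet columns to TPC-H names; unmapped columns are kept."""
    return df.rename(columns=COLUMN_NAME_MAP[table_name])
```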
… nation, supplier, customer, orders, part, and partsupp Signed-off-by: daiping8 <dai.ping88@zte.com.cn>
owenowenisme
left a comment
@daiping8 I ran the release tests and there are some failed tests, could you take a look?
Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
Release test passed:
```python
nation_region_pd = nation_region.to_pandas()[["n_nationkey", "n_name"]].copy()


def _join_supplier(batch: pd.DataFrame) -> pd.DataFrame:
    out = batch.merge(
        nation_region_pd,
        left_on="s_nationkey",
        right_on="n_nationkey",
        how="inner",
    )
    return out.rename(
        columns={
            "n_nationkey": "n_nationkey_supp",
            "n_name": "n_name_supp",
        }
    )


supplier_nation = supplier.map_batches(
    _join_supplier,
    batch_format="pandas",
)


def _join_customer(batch: pd.DataFrame) -> pd.DataFrame:
```
Why are we doing this?
This is a broadcast join: after filtering nation_region to ASIA, only about five rows remain, so the table is very small. Converting it to pandas and merging it into each batch via map_batches avoids shuffling the large supplier and customer tables, which reduces shuffle and network-transfer costs.
In earlier tests, using the Ray Data join resulted in an OOM error. I have added comments here and a TODO for future improvements.
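For illustration, the broadcast-join pattern can be sketched in plain pandas, with a list of DataFrames standing in for the batches that map_batches would feed the function (all values below are synthetic):

```python
import pandas as pd

# Small "broadcast" side: nations in the target region (only a few rows
# after filtering), cheap to replicate to every batch.
nation_region_pd = pd.DataFrame(
    {"n_nationkey": [8, 9], "n_name": ["INDIA", "INDONESIA"]}
)


def _join_supplier(batch: pd.DataFrame) -> pd.DataFrame:
    # Merge the tiny nation table into each supplier batch locally,
    # avoiding a distributed shuffle of the large supplier table.
    return batch.merge(
        nation_region_pd,
        left_on="s_nationkey",
        right_on="n_nationkey",
        how="inner",
    )


# Simulate two batches that map_batches would pass to the function.
batches = [
    pd.DataFrame({"s_suppkey": [1, 2], "s_nationkey": [8, 3]}),
    pd.DataFrame({"s_suppkey": [3], "s_nationkey": [9]}),
]
result = pd.concat([_join_supplier(b) for b in batches], ignore_index=True)
# Supplier 2 (nationkey 3) drops out; suppliers 1 and 3 survive the inner join.
```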
Totally understand where you're coming from @daiping8, but the purpose of these tests is to track the way people are expected to use Ray Data: we'd use join as the operation for joining, and then continuously improve it to perform better over time.
Does that make sense?
So what I'm going to ask you to do is the following:
- Please implement it using join (tuning it as necessary to make it work out of the box first).
- Then, if you see opportunities to improve joins themselves, please go ahead and implement them too.
…CH dataset loading. Update tpch_q5.py with comments on future optimizations for column selection and join operations. Signed-off-by: daiping8 <dai.ping88@zte.com.cn>
owenowenisme
left a comment
I see there's a lot of branching in the pipeline; I think the logic will be much more readable if we linearize the join chain. By moving away from a bushy DAG to a single-stream pipeline, we allow Ray to better orchestrate task execution and data flow. This ensures we don't hold multiple large intermediate datasets in memory simultaneously, which reduces the risk of object store spilling.
```python
supplier_nation = supplier.join(
    asia_for_supplier,
    num_partitions=16,
    join_type="inner",
    on=("s_nationkey",),
    right_on=("n_nationkey",),
)
supplier_nation = (
    supplier_nation.rename_columns({"n_name": "n_name_supp"})
    .select_columns(["s_suppkey", "s_nationkey", "n_name_supp"])
)

# customer ⋈ asia_for_customer (Ray join), get customer nations
customer_nation = customer.join(
    asia_for_customer,
    num_partitions=16,
    join_type="inner",
    on=("c_nationkey",),
    right_on=("n_nationkey",),
)
customer_nation = (
    customer_nation.rename_columns({"n_name": "n_name_cust"})
    .select_columns(["c_custkey", "c_nationkey", "n_name_cust"])
)
```
I think we can simplify this. For a 6-table join, we only need 5 join operations. By joining the tables in a linear chain (Region → Nation → Customer → Orders → Lineitem → Supplier), we eliminate the redundant branch where nation is joined twice. This reduces the number of global shuffles, keeps the intermediate dataset smaller by filtering for the target region early, and allows us to implement the c_nationkey = s_nationkey constraint as a local filter rather than an expensive distributed join.
Also, the logic is hard to follow here.
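The linear chain described above can be sketched in plain pandas on tiny synthetic fragments (all table contents below are made up; the real pipeline would use Ray Data's Dataset.join at each step):

```python
import pandas as pd

# Tiny synthetic TPC-H fragments, illustrative only.
region = pd.DataFrame({"r_regionkey": [2, 3], "r_name": ["ASIA", "EUROPE"]})
nation = pd.DataFrame({"n_nationkey": [8, 9, 10], "n_regionkey": [2, 2, 3],
                       "n_name": ["INDIA", "INDONESIA", "IRAN"]})
customer = pd.DataFrame({"c_custkey": [1, 2], "c_nationkey": [8, 10]})
orders = pd.DataFrame({"o_orderkey": [100, 101], "o_custkey": [1, 2]})
lineitem = pd.DataFrame({"l_orderkey": [100, 101], "l_suppkey": [5, 6],
                         "l_extendedprice": [1000.0, 500.0],
                         "l_discount": [0.1, 0.0]})
supplier = pd.DataFrame({"s_suppkey": [5, 6], "s_nationkey": [8, 9]})

# Filter for the target region first to keep intermediates small, then join
# in one linear chain: 5 joins for 6 tables, no branch that reuses nation.
df = (
    region[region["r_name"] == "ASIA"]
    .merge(nation, left_on="r_regionkey", right_on="n_regionkey")
    .merge(customer, left_on="n_nationkey", right_on="c_nationkey")
    .merge(orders, left_on="c_custkey", right_on="o_custkey")
    .merge(lineitem, left_on="o_orderkey", right_on="l_orderkey")
    .merge(supplier, left_on="l_suppkey", right_on="s_suppkey")
)
# c_nationkey = s_nationkey becomes a cheap local filter, not another join.
df = df[df["c_nationkey"] == df["s_nationkey"]]
revenue = (df["l_extendedprice"] * (1 - df["l_discount"])).groupby(df["n_name"]).sum()
```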
Also, it would be easier to understand if we commented the SQL query in the file like #61305; would you mind adding it?
…, and Q9 Signed-off-by: daiping8 <dai.ping88@zte.com.cn>
Signed-off-by: daiping8 <dai.ping88@zte.com.cn>
Signed-off-by: daiping8 <dai.ping88@zte.com.cn>
| "region": { | ||
| "column0": "r_regionkey", | ||
| "column1": "r_name", | ||
| "column2": "r_comment", |
I think there are 4 columns in region?
In fact, all the data in the fourth column of the parquet file is None. The data in the first three columns has actual meaning and matches the definitions.
schema: column0: int64
column1: string
column2: string
column3: string
column0 column1 column2 column3
0 AFRICA lar deposits. blithely final packages cajole. regular waters are final requests. regular accounts are according to None
1 AMERICA hs use ironic, even requests. s None
2 ASIA ges. thinly even pinto beans ca None
3 EUROPE ly final courts cajole furiously final excuse None
4 MIDDLE EAST uickly special accounts cajole carefully blithely close requests. carefully final asymptotes haggle furiousl None
Got it, thanks for clarifying.
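One way to confirm that the trailing column really carries no data, before leaving it out of the mapping, is a quick all-null check. A minimal pandas sketch (the in-memory frame below stands in for the real parquet table, whose path is omitted here):

```python
import pandas as pd

# Stand-in for pd.read_parquet(<region parquet path>): the first three
# columns carry data, the trailing column is entirely null.
df = pd.DataFrame({
    "column0": [0, 1, 2],
    "column1": ["AFRICA", "AMERICA", "ASIA"],
    "column2": ["...", "...", "..."],
    "column3": [None, None, None],
})

# True only if every value in the column is missing.
all_null = df["column3"].isna().all()
```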
…he purpose of each join. Signed-off-by: daiping8 <dai.ping88@zte.com.cn>
owenowenisme
left a comment
Release tests passed for Q7, Q8, and Q9.
## Description

Adding Query Q7, Q8, Q9 for TPCH tests

There are some issues with TPCH Q5. For details, see #61354.

---

Signed-off-by: daiping8 <dai.ping88@zte.com.cn>
Signed-off-by: ZTE Ray <dai.ping88@zte.com.cn>
Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Co-authored-by: You-Cheng Lin <106612301+owenowenisme@users.noreply.github.com>
Co-authored-by: You-Cheng Lin <mses010108@gmail.com>
Signed-off-by: Kamil Kaczmarek <kamil@anyscale.com>