This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Added data_pagesize_limit to write parquet pages #1303

Merged
merged 10 commits into from Nov 25, 2022


sundy-li
Collaborator

@sundy-li sundy-li commented Nov 23, 2022

arrow2 writes a single page by default, which hurts read performance a lot (up to 4x slower than reading the original file).

This PR introduces an option named data_pagesize_limit to split a large Array into smaller pages.

fixes #1291
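
A minimal sketch of the idea, assuming pages are cut by estimating the average bytes per row and deriving a rows-per-page count from the byte limit. The function `page_ranges` and its parameters are hypothetical names for illustration; this is not the code added by the PR.

```rust
/// Hypothetical helper (not the arrow2 API): split `num_rows` rows into
/// `(offset, length)` page ranges so that each page holds roughly
/// `page_size_limit` bytes, based on the average row size.
fn page_ranges(num_rows: usize, total_bytes: usize, page_size_limit: usize) -> Vec<(usize, usize)> {
    if num_rows == 0 {
        return Vec::new();
    }
    // Average bytes per row, rounded up so pages err on the small side.
    let bytes_per_row = (total_bytes + num_rows - 1) / num_rows;
    // Always keep at least one row per page, even if one row exceeds the limit.
    let rows_per_page = (page_size_limit / bytes_per_row.max(1)).max(1);
    (0..num_rows)
        .step_by(rows_per_page)
        .map(|offset| (offset, rows_per_page.min(num_rows - offset)))
        .collect()
}
```

For example, 10 rows of 8 bytes each with a 24-byte limit would be cut into three pages of 3 rows and a final page of 1 row, while a limit larger than the whole array leaves a single page.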

@sundy-li
Collaborator Author

sundy-li commented Nov 23, 2022

Read write benchmark gists: https://gist.github.com/sundy-li/4984ec7cfeade556d60306a3a218ec8a

We use TPC-H's lineitem table. Its original file was generated by Dremio and is well paged, e.g. column l_quantity:

column ["l_quantity"]: num_of_pages 460, num_of_rows 5192615, size 10454182, size(uncompressed) 41555178

We use parquet_write.rs to generate the output file from the input file, and parquet_read.rs to test parallel read performance.

Before (a single large page per row group):

/tmp/input.parquet, parallel 1 read took: 21 ms
/tmp/input.parquet, parallel 2 read took: 21 ms
/tmp/input.parquet, parallel 4 read took: 23 ms
/tmp/input.parquet, parallel 8 read took: 37 ms
/tmp/input.parquet, parallel 16 read took: 53 ms
/tmp/output.parquet, parallel 1 read took: 37 ms
/tmp/output.parquet, parallel 2 read took: 42 ms
/tmp/output.parquet, parallel 4 read took: 65 ms
/tmp/output.parquet, parallel 8 read took: 124 ms
/tmp/output.parquet, parallel 16 read took: 284 ms

As you can see, reading the output file is about 5x slower at parallelism 16 (284 ms vs. 53 ms).

After this PR:

/tmp/input.parquet, parallel 1 read took: 21 ms
/tmp/input.parquet, parallel 2 read took: 22 ms
/tmp/input.parquet, parallel 4 read took: 24 ms
/tmp/input.parquet, parallel 8 read took: 29 ms
/tmp/input.parquet, parallel 16 read took: 52 ms
/tmp/output.parquet, parallel 1 read took: 23 ms
/tmp/output.parquet, parallel 2 read took: 25 ms
/tmp/output.parquet, parallel 4 read took: 27 ms
/tmp/output.parquet, parallel 8 read took: 31 ms
/tmp/output.parquet, parallel 16 read took: 48 ms
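
To make the comparison explicit, the timings above can be turned into slowdown ratios (output read time divided by input read time). This is only a small arithmetic sketch over the numbers copied from the two listings; `slowdown` and `report` are illustrative names.

```rust
/// Ratio of the output-file read time to the input-file read time.
fn slowdown(output_ms: u32, input_ms: u32) -> f64 {
    f64::from(output_ms) / f64::from(input_ms)
}

/// (parallelism, input_ms, output_ms), copied from the "Before" listing.
const BEFORE: [(u32, u32, u32); 5] =
    [(1, 21, 37), (2, 21, 42), (4, 23, 65), (8, 37, 124), (16, 53, 284)];

/// (parallelism, input_ms, output_ms), copied from the "After" listing.
const AFTER: [(u32, u32, u32); 5] =
    [(1, 21, 23), (2, 22, 25), (4, 24, 27), (8, 29, 31), (16, 52, 48)];

/// One formatted line per parallelism level.
fn report(label: &str, rows: &[(u32, u32, u32)]) -> Vec<String> {
    rows.iter()
        .map(|&(parallel, input_ms, output_ms)| {
            format!("{}: parallel {}: {:.2}x", label, parallel, slowdown(output_ms, input_ms))
        })
        .collect()
}
```

At parallelism 16 the ratio drops from about 5.4 before the change to slightly below 1 after it.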

@jorgecarleitao
Owner

Out of curiosity, what is the difference in file size? Usually smaller pages => larger files.

@sundy-li sundy-li marked this pull request as ready for review November 23, 2022 06:06
@sundy-li
Collaborator Author

> Out of curiosity, what is the difference in file size? Usually smaller pages => larger files.

It's a large file: 255 MB, Snappy-compressed.

column ["l_orderkey"]: num_of_pages 460, num_of_rows 5192615, size 8113390, size(uncompressed) 41555178
column ["l_partkey"]: num_of_pages 460, num_of_rows 5192615, size 25860456, size(uncompressed) 41555178
column ["l_suppkey"]: num_of_pages 460, num_of_rows 5192615, size 23360484, size(uncompressed) 41555178
column ["l_linenumber"]: num_of_pages 460, num_of_rows 5192615, size 4690406, size(uncompressed) 41555177
column ["l_quantity"]: num_of_pages 460, num_of_rows 5192615, size 10454182, size(uncompressed) 41555178
column ["l_extendedprice"]: num_of_pages 460, num_of_rows 5192615, size 29932431, size(uncompressed) 41555178
column ["l_discount"]: num_of_pages 460, num_of_rows 5192615, size 10101375, size(uncompressed) 41555178
column ["l_tax"]: num_of_pages 460, num_of_rows 5192615, size 9653229, size(uncompressed) 41555178
column ["l_returnflag"]: num_of_pages 289, num_of_rows 5192615, size 4907634, size(uncompressed) 25972033
column ["l_linestatus"]: num_of_pages 289, num_of_rows 5192615, size 3020518, size(uncompressed) 25972033
column ["l_shipdate"]: num_of_pages 231, num_of_rows 5192615, size 15957395, size(uncompressed) 20777621
column ["l_commitdate"]: num_of_pages 231, num_of_rows 5192615, size 15813887, size(uncompressed) 20777621
column ["l_receiptdate"]: num_of_pages 231, num_of_rows 5192615, size 16016467, size(uncompressed) 20777621
column ["l_shipinstruct"]: num_of_pages 914, num_of_rows 5192615, size 11190998, size(uncompressed) 83103193
column ["l_shipmode"]: num_of_pages 477, num_of_rows 5192615, size 11659513, size(uncompressed) 43039249
column ["l_comment"]: num_of_pages 1731, num_of_rows 5192615, size 67467279, size(uncompressed) 158417914
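
As a rough cross-check of the listing above, dividing each column's uncompressed size by its page count gives the average page size, and dividing uncompressed by compressed size gives the compression ratio. The sketch below does this for three of the listed columns; the `ColumnStats` type is a hypothetical helper, not something from arrow2.

```rust
/// Hypothetical record for one line of the listing above.
struct ColumnStats {
    name: &'static str,
    num_pages: u64,
    compressed: u64,
    uncompressed: u64,
}

impl ColumnStats {
    /// Average uncompressed bytes per page (integer division).
    fn avg_page_bytes(&self) -> u64 {
        self.uncompressed / self.num_pages
    }

    /// Uncompressed size divided by compressed size.
    fn compression_ratio(&self) -> f64 {
        self.uncompressed as f64 / self.compressed as f64
    }

    /// Human-readable one-line summary.
    fn summary(&self) -> String {
        format!(
            "{}: ~{} bytes/page, {:.2}x compression",
            self.name,
            self.avg_page_bytes(),
            self.compression_ratio()
        )
    }
}

/// Three rows copied from the listing above.
const SAMPLE: [ColumnStats; 3] = [
    ColumnStats { name: "l_quantity", num_pages: 460, compressed: 10454182, uncompressed: 41555178 },
    ColumnStats { name: "l_shipdate", num_pages: 231, compressed: 15957395, uncompressed: 20777621 },
    ColumnStats { name: "l_comment", num_pages: 1731, compressed: 67467279, uncompressed: 158417914 },
];
```

All three sampled columns come out at roughly 90 KB of uncompressed data per page, regardless of how many pages the column has.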

@codecov

codecov bot commented Nov 23, 2022

Codecov Report

Base: 83.12% // Head: 83.14% // Increases project coverage by +0.02% 🎉

Coverage data is based on head (795b095) compared to base (0ba4f8e).
Patch coverage: 95.00% of modified lines in pull request are covered.

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #1303      +/-   ##
==========================================
+ Coverage   83.12%   83.14%   +0.02%     
==========================================
  Files         369      369              
  Lines       40180    40234      +54     
==========================================
+ Hits        33399    33452      +53     
- Misses       6781     6782       +1     
Impacted Files Coverage Δ
src/io/parquet/write/sink.rs 72.41% <ø> (ø)
src/io/parquet/write/mod.rs 86.72% <95.00%> (+0.37%) ⬆️
src/array/binary/mod.rs 91.55% <0.00%> (-0.98%) ⬇️
src/ffi/schema.rs 90.08% <0.00%> (-0.30%) ⬇️
src/io/ipc/read/file.rs 97.32% <0.00%> (+0.44%) ⬆️
src/io/parquet/read/deserialize/utils.rs 82.23% <0.00%> (+0.89%) ⬆️
src/array/utf8/mod.rs 85.67% <0.00%> (+0.91%) ⬆️
src/bitmap/utils/slice_iterator.rs 98.78% <0.00%> (+1.21%) ⬆️
src/io/parquet/read/deserialize/binary/utils.rs 66.99% <0.00%> (+2.77%) ⬆️
...arquet/read/deserialize/fixed_size_binary/utils.rs 71.87% <0.00%> (+2.90%) ⬆️
... and 1 more


@alamb
Collaborator

alamb commented Nov 23, 2022

There is a similar limit in arrow-rs:

https://docs.rs/parquet/27.0.0/parquet/file/properties/struct.WriterProperties.html#method.data_pagesize_limit

Depending on your use case, you may also find limiting the row count useful, as some data compresses so well that it fits on quite small pages: https://docs.rs/parquet/27.0.0/parquet/file/properties/struct.WriterProperties.html#method.data_page_row_count_limit
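
One way to picture how a byte limit and a row count limit could interact is to take whichever limit yields fewer rows per page. This is only a sketch under that assumption; `rows_per_page` and its parameters are hypothetical and do not reproduce the arrow-rs or arrow2 logic.

```rust
/// Hypothetical helper: rows per page under a byte limit and an optional
/// row count limit; the stricter of the two wins.
fn rows_per_page(bytes_per_row: usize, byte_limit: usize, row_limit: Option<usize>) -> usize {
    // Rows that fit in the byte budget, never fewer than one.
    let by_bytes = (byte_limit / bytes_per_row.max(1)).max(1);
    match row_limit {
        // A row limit of zero is treated as one row per page.
        Some(limit) => by_bytes.min(limit.max(1)),
        None => by_bytes,
    }
}
```

With tiny rows (say 1 byte each) and a 1 MiB byte limit, the byte limit alone would allow about a million rows per page; a row limit of 20000 caps it there instead.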

@sundy-li
Collaborator Author

sundy-li commented Nov 23, 2022

> There is a similar limit in arrow-rs

Thank you @alamb. I couldn't come up with a better name, so I kept it compatible with arrow-rs. The row count limit is used less often than the byte limit; I'll try to add it in the next PR.

I found this best-practice guide for optimizing parquet files: https://docs.dremio.com/software/data-formats/parquet-files/

The default page size of ~100 KB seems really reasonable. (I tested sizes from 8192 bytes to 1 MB today and found that this default gives the best performance.)

Some users tested dremio/datafusion/databend on TPC-H queries and reported to me that databend is 2-3x slower when reading the parquet files written during loading. So I spent a couple of days looking into it and found this bottleneck.

With this PR, databend now performs similarly to duckdb/datafusion on TPC-H Q1.

databend (used to take 2.2 sec to complete Q1):
MySQL [(none)]> select
    ->     l_returnflag,
    ->     l_linestatus,
    ->     sum(l_quantity) as sum_qty,
    ->     to_int64(sum(l_extendedprice)) as sum_base_price,
    ->     truncate(sum(l_extendedprice * (1 - l_discount)),2) as sum_disc_price,
    ->     truncate(sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)),2) as sum_charge,
    ->     truncate(avg(l_quantity),4) as avg_qty,
    ->     truncate(avg(l_extendedprice),4) as avg_price,
    ->     truncate(avg(l_discount),4) as avg_disc,
    ->     count(*) as count_order
    -> from
    ->     lineitem
    -> where
    ->         l_shipdate <= add_days(to_date('1998-12-01'), 90)
    -> group by
    ->     l_returnflag,
    ->     l_linestatus
    -> order by
    ->     l_returnflag,
    ->     l_linestatus;
+--------------+--------------+-------------+----------------+---------------------+---------------------+---------+------------+----------+-------------+
| l_returnflag | l_linestatus | sum_qty     | sum_base_price | sum_disc_price      | sum_charge          | avg_qty | avg_price  | avg_disc | count_order |
+--------------+--------------+-------------+----------------+---------------------+---------------------+---------+------------+----------+-------------+
| A            | F            | 377518399.0 |   566065727797 |  5.3775910427806e11 |  5.5927667089211e11 | 25.5009 |  38237.151 |     0.05 |    14804077 |
| N            | F            |   9851614.0 |    14767438399 |   1.402880579221e10 |   1.459049099836e10 | 25.5224 | 38257.8106 |   0.0499 |      385998 |
| N            | O            | 764635193.0 |  1146548935600 | 1.08921587320188e12 | 1.13279676143136e12 | 25.4982 | 38233.8539 |     0.05 |    29987794 |
| R            | F            | 377732830.0 |   566431054975 |  5.3811092266476e11 |  5.5963478088508e11 | 25.5083 | 38251.2192 |   0.0499 |    14808183 |
+--------------+--------------+-------------+----------------+---------------------+---------------------+---------+------------+----------+-------------+
4 rows in set (0.611 sec)


duckdb:

explain analyze  SELECT     l_returnflag,     l_linestatus,     sum(l_quantity) AS sum_qty,     sum(l_extendedprice) AS sum_base_price,     sum(l_extendedprice * (1 - l_discount)) AS sum_disc_price,     sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS sum_charge,     avg(l_quantity) AS avg_qty,     avg(l_extendedprice) AS avg_price,     avg(l_discount) AS avg_disc,     count(*) AS count_order FROM     lineitem WHERE     l_shipdate <= CAST('1998-09-02' AS date) GROUP BY     l_returnflag,     l_linestatus ORDER BY     l_returnflag,     l_linestatus;
┌─────────────────────────────────────┐
│┌───────────────────────────────────┐│
││         Total Time: 0.723s        ││
│└───────────────────────────────────┘│
└─────────────────────────────────────┘


datafusion:
❯ SELECT     l_returnflag,     l_linestatus,     sum(l_quantity) AS sum_qty,     sum(l_extendedprice) AS sum_base_price,     sum(l_extendedprice * (1 - l_discount)) AS sum_disc_price,     sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS sum_charge,     avg(l_quantity) AS avg_qty,     avg(l_extendedprice) AS avg_price,     avg(l_discount) AS avg_disc,     count(*) AS count_order FROM     lineitem WHERE     l_shipdate <= CAST('1998-09-02' AS date) GROUP BY     l_returnflag,     l_linestatus ORDER BY     l_returnflag,     l_linestatus;
+--------------+--------------+-----------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------------+-------------+
| l_returnflag | l_linestatus | sum_qty   | sum_base_price     | sum_disc_price     | sum_charge         | avg_qty            | avg_price          | avg_disc             | count_order |
+--------------+--------------+-----------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------------+-------------+
| A            | F            | 377518399 | 566065727797.25    | 537759104278.0657  | 559276670892.117   | 25.500975103007097 | 38237.15100895854  | 0.05000657454024323  | 14804077    |
| N            | F            | 9851614   | 14767438399.170002 | 14028805792.211401 | 14590490998.366735 | 25.522448302840946 | 38257.810660081144 | 0.04997336773765667  | 385998      |
| N            | O            | 743124873 | 1114302286901.8804 | 1058580922144.9639 | 1100937000170.5918 | 25.498075870689316 | 38233.90292348182  | 0.050000811821131634 | 29144351    |
| R            | F            | 377732830 | 566431054976       | 538110922664.7675  | 559634780885.0863  | 25.50838478968014  | 38251.219273559764 | 0.049996792314087435 | 14808183    |
+--------------+--------------+-----------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------------+-------------+
4 rows in set. Query took 1.134 seconds.


@jorgecarleitao jorgecarleitao merged commit 368aacc into jorgecarleitao:main Nov 25, 2022
@jorgecarleitao jorgecarleitao changed the title feat(parquet): introduce data_pagesize_limit to write parquet pages Added data_pagesize_limit to write parquet pages Nov 25, 2022
@jorgecarleitao
Owner

Thanks a lot @sundy-li for the PR, and everyone else for the ideas, suggestions and reviews 🙇

Successfully merging this pull request may close these issues.

Add page_size opt to control array_to_pages