This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Added support to read parquet row groups in chunks #789

Merged
merged 23 commits into from
Feb 4, 2022

Conversation

jorgecarleitao
Owner

@jorgecarleitao jorgecarleitao commented Jan 25, 2022

This PR allows reading of parquet columns in chunks, thereby allowing decompressing and deserializing pages on demand to reduce the memory footprint.

This is a draft as there is still some work to do, which I will continue

  • fix known issues in tracking our position within a given page (the offset is incorrect at the moment)
  • add tests for a non-None chunk size
  • add back reading dictionary
  • add back support to read structs
  • add back delta length encoding for binary
  • add back support for lists
  • triple-check usage of with_capacity and reserve to avoid unneeded reallocations
  • bench (performance should improve when the chunk size is None; there should be minimal difference with chunking)
  • DRY copy-pasted code between primitive, binary, fixed binary and boolean
  • remove old code

Design

This PR follows the design of this crate:

  • decompression/deserialization cannot cross column chunk boundaries, so that columns can be read (IO-bound) and deserialized (CPU-bound) independently, and the two operations are independent of each other
  • unsafe-free
  • generics to inline hot operations

The overall design of the changes enables pages to be deserialized to arrays of a different length, based on a new parameter chunk_size: usize:

parquet
[                             column chunk                       ]
[page][page][page][page][page][page][page][page][page][page][page]

arrow
[      array      ][      array      ][      array      ][ array ]
 <- chunk size  ->
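The mapping in the diagram can be sketched as follows. This is an illustrative helper, not the crate's API: it computes the lengths of the arrays produced from a column chunk of num_rows rows, assuming chunk_size > 0.

```rust
// Hypothetical helper mirroring the diagram above: split `num_rows` rows of a
// column chunk into array lengths of at most `chunk_size` rows each.
// Assumes `chunk_size > 0`.
fn array_lengths(num_rows: usize, chunk_size: usize) -> Vec<usize> {
    let mut lengths = Vec::with_capacity(num_rows / chunk_size + 1);
    let mut remaining = num_rows;
    while remaining > 0 {
        let len = remaining.min(chunk_size);
        lengths.push(len);
        remaining -= len;
    }
    lengths
}

fn main() {
    // a 100-row column chunk with chunk_size = 30 yields three full arrays
    // and one trailing shorter array, as in the diagram
    assert_eq!(array_lengths(100, 30), vec![30, 30, 30, 10]);
    println!("{:?}", array_lengths(100, 30));
}
```

Note that only the last array may be shorter than chunk_size; page boundaries do not influence the array lengths.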

Broadly, this PR does the following (for a single row group):

  • read N column chunks to memory (Vec<u8>)
  • for each column, compose an iterator by chaining iterator adapters:
    • Vec<u8> -> Iterator<CompressedDataPage>
    • Iterator<CompressedDataPage> -> Iterator<&DataPage> (decompression)
    • Iterator<&DataPage> -> Iterator<Arc<dyn Array>> (deserialization)
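The adapter chain above can be sketched with stand-in types (not arrow2's real types; decompression and deserialization are replaced by identity operations on bytes):

```rust
// Stand-in types for the sketch: a compressed page and a decompressed page
// are both just bytes here.
struct CompressedDataPage(Vec<u8>);
struct DataPage(Vec<u8>);

fn main() {
    // one column chunk read into memory with a single read_exact
    let column_chunk: Vec<u8> = (0u8..9).collect();

    // Vec<u8> -> Iterator<CompressedDataPage>: split at (fake) page boundaries
    let compressed = column_chunk
        .chunks(3)
        .map(|page| CompressedDataPage(page.to_vec()));

    // Iterator<CompressedDataPage> -> Iterator<DataPage>: "decompression"
    // (identity here; the real adapter decompresses the bytes)
    let pages = compressed.map(|CompressedDataPage(bytes)| DataPage(bytes));

    // Iterator<DataPage> -> Iterator<array>: "deserialization"
    // (a Vec<u8> stands in for Arc<dyn Array>)
    let arrays: Vec<Vec<u8>> = pages.map(|DataPage(bytes)| bytes).collect();

    assert_eq!(arrays.len(), 3);
    assert_eq!(arrays[0], vec![0, 1, 2]);
}
```

Because each stage is a lazy iterator adapter, no page is decompressed or deserialized until the consumer pulls it, which is what keeps the memory footprint bounded.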

All these iterators are CPU-bound. In the last step, we track our position within the DataPage (through PageState, see below) and keep a temporary mutable array, and either:

  • consume the page into the mutable array, spilling the remainder into a ring of mutables, when the chunk size is so small that one page spans multiple arrays, or
  • iterate over more pages until the temporary mutable is filled up to chunk_size, at which point we freeze the mutable and return it
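The fill-and-freeze loop can be sketched as follows. This is a simplification under stated assumptions: pages are plain Vec<i32> instead of decoded DataPages, and a single loop replaces the PageState/ring machinery.

```rust
// Sketch: consume decoded pages into a mutable buffer, freezing an
// "array" every `chunk_size` values. `pages` stands in for
// Iterator<&DataPage>; `Vec<i32>` stands in for a decoded page.
fn chunked_arrays(
    pages: impl Iterator<Item = Vec<i32>>,
    chunk_size: usize,
) -> Vec<Vec<i32>> {
    let mut arrays = vec![];
    let mut mutable = Vec::with_capacity(chunk_size);
    for page in pages {
        for value in page {
            mutable.push(value);
            if mutable.len() == chunk_size {
                // freeze the mutable into an immutable array and start a new one
                arrays.push(std::mem::take(&mut mutable));
                mutable.reserve(chunk_size);
            }
        }
    }
    if !mutable.is_empty() {
        // last array may be shorter than chunk_size
        arrays.push(mutable);
    }
    arrays
}

fn main() {
    let pages = vec![vec![1, 2, 3], vec![4, 5]].into_iter();
    assert_eq!(chunked_arrays(pages, 2), vec![vec![1, 2], vec![3, 4], vec![5]]);
}
```

Note how the second array, [3, 4], crosses a page boundary: this is exactly the "suspend mid-page and resume" behavior that PageState enables in the real implementation.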

In more detail:

  • column chunks are now read in a single read_exact, instead of page by page. This was a mistake in the previous design, since we should not mix reading column chunks with deserializing pages
  • a &'a DataPage is mapped to a PageState<'a> based on its parquet physical type and encoding. This is the input state of a page and allows us to "suspend" an iteration over a page, thereby allowing a page to "extend" an array without being completely consumed
  • a new concept, Decoder, knows how to initialize mutable arrays and how to extend them from a &mut PageState<'a>, advancing the page state accordingly. This is mostly a DRY trait
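A minimal sketch of these two concepts, with hypothetical shapes (not arrow2's exact definitions): a PageState that records how far into a page we have consumed, and a Decoder that extends a mutable array from it.

```rust
// Hypothetical shapes illustrating suspendable page iteration.
struct PageState<'a> {
    values: &'a [i32], // the page's decoded values (i32 for simplicity)
    offset: usize,     // how many values have been consumed so far
}

trait Decoder {
    type Mutable;
    /// initialize a mutable array with a given capacity
    fn init(&self, capacity: usize) -> Self::Mutable;
    /// move up to `additional` values from the page state into the mutable
    /// array, advancing the page state so iteration can resume later
    fn extend(&self, state: &mut PageState, mutable: &mut Self::Mutable, additional: usize);
}

struct I32Decoder;
impl Decoder for I32Decoder {
    type Mutable = Vec<i32>;
    fn init(&self, capacity: usize) -> Vec<i32> {
        Vec::with_capacity(capacity)
    }
    fn extend(&self, state: &mut PageState, mutable: &mut Vec<i32>, additional: usize) {
        let end = (state.offset + additional).min(state.values.len());
        mutable.extend_from_slice(&state.values[state.offset..end]);
        state.offset = end; // "suspend" here; a later call resumes from `end`
    }
}

fn main() {
    let values = [1, 2, 3, 4, 5];
    let mut state = PageState { values: &values, offset: 0 };
    let decoder = I32Decoder;
    let mut mutable = decoder.init(4);
    decoder.extend(&mut state, &mut mutable, 2); // partial consumption
    assert_eq!(mutable, vec![1, 2]);
    decoder.extend(&mut state, &mut mutable, 10); // resume and drain the page
    assert_eq!(mutable, vec![1, 2, 3, 4, 5]);
    assert_eq!(state.offset, 5);
}
```

Dispatching on physical type and encoding would pick a different PageState variant and Decoder impl per column; the trait keeps the extend logic shared (the DRY point above).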

Closes #768

@jorgecarleitao
Owner Author

jorgecarleitao commented Jan 29, 2022

This now reproduces all the existing functionality apart from:

  • read structs
  • read nested list (i.e. list of list)
  • delta length encoding

I found a bug in parquet2 in writing nested data while working on this. Thus, I plan to first release parquet2 with the fix and a simpler API to write, and then move back to this one again.

@codecov

codecov bot commented Jan 30, 2022

Codecov Report

Merging #789 (a84539b) into main (e577c9f) will decrease coverage by 0.09%.
The diff coverage is 69.67%.


@@            Coverage Diff             @@
##             main     #789      +/-   ##
==========================================
- Coverage   71.29%   71.19%   -0.10%     
==========================================
  Files         321      326       +5     
  Lines       16834    17471     +637     
==========================================
+ Hits        12001    12438     +437     
- Misses       4833     5033     +200     
Impacted Files Coverage Δ
src/io/parquet/read/null.rs 0.00% <0.00%> (ø)
src/io/parquet/read/record_batch.rs 0.00% <0.00%> (-79.75%) ⬇️
src/io/parquet/read/boolean/nested.rs 50.00% <50.90%> (-38.24%) ⬇️
src/io/parquet/read/binary/nested.rs 53.84% <54.90%> (-0.70%) ⬇️
src/io/parquet/read/primitive/dictionary.rs 60.00% <60.00%> (+4.73%) ⬆️
src/io/parquet/read/mod.rs 53.46% <61.22%> (+14.55%) ⬆️
src/io/parquet/read/primitive/nested.rs 62.26% <62.26%> (+4.36%) ⬆️
src/io/parquet/read/binary/dictionary.rs 63.38% <63.38%> (-5.72%) ⬇️
src/io/parquet/read/dictionary.rs 63.41% <63.41%> (ø)
...rc/io/parquet/read/fixed_size_binary/dictionary.rs 63.63% <63.63%> (ø)
... and 29 more

Legend
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Last update e577c9f...a84539b.

@jorgecarleitao jorgecarleitao force-pushed the parquet_pages branch 2 times, most recently from fb6a54a to aef6394 on January 30, 2022 21:26
@jorgecarleitao jorgecarleitao marked this pull request as ready for review January 30, 2022 22:38
@jorgecarleitao
Owner Author

@houqp, this is ready for a spin.

There is a regression (20-40%) when reading a whole binary/utf8 column chunk at once (i.e. no chunk size). This is related to some tricks in pre-computed capacities of binary/utf8 that benefit from reading the whole column (we can recover this behavior).

I deactivated structs of structs and lists of lists for now, as I need to dig a bit into the Dremel encoding. The failing tests are just examples that I need to update. ^^

@jorgecarleitao jorgecarleitao force-pushed the parquet_pages branch 3 times, most recently from 2aaf8dd to 97fcf3b on February 2, 2022 16:49
@jorgecarleitao jorgecarleitao merged commit f35e02a into main Feb 4, 2022
@jorgecarleitao jorgecarleitao deleted the parquet_pages branch February 4, 2022 15:58
@jorgecarleitao jorgecarleitao changed the title Read parquet row groups in chunks Added support to read parquet row groups in chunks Mar 6, 2022
Labels
feature A new feature
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Allow parquet column chunks to be deserialized to multiple arrays (e.g. iterator)
1 participant