This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Allow parquet column chunks to be deserialized to multiple arrays (e.g. iterator) #768

Closed
jorgecarleitao opened this issue Jan 14, 2022 · 4 comments · Fixed by #789

@jorgecarleitao
Owner

As described by @tustvold here, there are important cases where it is more advantageous to deserialize a column chunk into more than one array.

Specifically, the main issue is that a decompressed parquet column chunk is still much smaller than its corresponding arrow array. Therefore, on large column chunks, the user may prefer to deserialize only part of the decompressed column chunk into arrow while still holding the chunk in memory.

The equation is something along the lines of

size(decompressed parquet column chunk) + size(partial arrow array) <  size(complete chunk as arrow array)

To allow users to tradeoff memory usage and data fragmentation (of each array), we should offer something like

fn column_chunk_to_array(chunk, column_meta, length: Option<usize>) -> impl Iterator<Item = Arc<dyn Array>>

such that when length: None we recover our single array, and when length: Some(n) we iterate over the chunk and output n items at a time.
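
To make the intent concrete, here is a minimal sketch of such a function, assuming hypothetical `CompressedChunk`, `ColumnMeta` and `decode_n` placeholders (none of these are arrow2 APIs); only the `Option<usize>` batching semantics matter:

```rust
use std::sync::Arc;

// stand-ins for the real column chunk, metadata and array types
struct CompressedChunk(Vec<u8>);
struct ColumnMeta { num_values: usize }
trait Array {}

// placeholder: decode `len` items starting at `offset` into an arrow array
fn decode_n(_chunk: &CompressedChunk, _meta: &ColumnMeta, _offset: usize, _len: usize) -> Arc<dyn Array> {
    struct Dummy;
    impl Array for Dummy {}
    Arc::new(Dummy)
}

/// `length: None` yields a single array with every value of the chunk;
/// `length: Some(n)` yields arrays of at most `n` values each.
fn column_chunk_to_array(
    chunk: CompressedChunk,
    meta: ColumnMeta,
    length: Option<usize>,
) -> impl Iterator<Item = Arc<dyn Array>> {
    let total = meta.num_values;
    let step = length.unwrap_or(total).max(1);
    (0..total).step_by(step).map(move |offset| {
        let len = step.min(total - offset);
        decode_n(&chunk, &meta, offset, len)
    })
}
```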

Thanks again to @tustvold for this important insight.

@tustvold
Contributor

It isn't so much decompressed column chunks being smaller than a corresponding array, as a single decompressed page within a column chunk being smaller than an array of the entire column chunk.

Ideally you would stream pages from the column chunk, decompressing them as needed, and only keeping them around until all the values have been yielded from that page.
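
A rough sketch of that idea, with hypothetical types rather than arrow2's actual page API: pages are decompressed lazily and each decompressed buffer is dropped as soon as its values have been yielded, so at most one decompressed page is alive at a time.

```rust
// stand-in for a compressed page as read from the column chunk
struct CompressedPage(Vec<u8>);

// placeholder for real decompression + decoding into values
fn decompress_and_decode(page: &CompressedPage) -> Vec<i64> {
    page.0.iter().map(|&b| b as i64).collect()
}

// stream values page by page; the decompressed buffer of each page
// only lives for the duration of that page
fn stream_values(pages: Vec<CompressedPage>) -> impl Iterator<Item = i64> {
    pages
        .into_iter()
        .flat_map(|page| decompress_and_decode(&page).into_iter())
}
```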

@jorgecarleitao
Owner Author

jorgecarleitao commented Jan 18, 2022

I have been thinking about this. I haven't found this analysis anywhere else before, so I will just leave it here for posterity.

data is split in row groups to allow for group pruning at the expense of lower compression: larger groups allow higher compression but fewer possible prunes, while smaller groups allow for more pruning but can't be compressed as much.

We could separate this analysis between dict-encoded and non-dict-encoded columns, but the overall conclusion is the same: the more values we feed to a compressor, the higher the maximum possible compression gains. For dict-encoded columns this is because dictionaries aren't shared between row groups.

The potential benefit of using more pages would be to allow page pruning. However, parquet itself deprecated writing page statistics in 2018. Thus, I do not really see how more pages per column chunk help us.

From this perspective, imo we should assume the optimal storage conditions: reading a column chunk with a single page (non-dict-encoded) or a column chunk with 2-3 pages (dict page + indices page + fallback page for groups with more than i32::MAX items).

Strategy 1

One strategy is to load a parquet row group in "mini-batches" of size n < N, where N is the length of a row group (a code sketch of both strategies follows the Strategy 2 list below). For this to happen, we need to:

  1. read M compressed parquet columns to memory (for a total of M x N compressed items)
  2. decompress the (usually single) pages on it
  3. deserialize n < N items
  4. repeat 3. until we read all items

Strategy 2

An alternative load strategy is to load column by column (see the combined sketch after this list):

  1. read one compressed column to memory
  2. decompress the (usually single) pages on it
  3. deserialize N items
  4. repeat 1-3 until we read all columns
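
To make the two access patterns concrete, here is a schematic sketch of both strategies; `read_column`, `decompress` and `deserialize_slice` are placeholders for I/O, page decompression and parquet-to-arrow deserialization (none of them are arrow2 functions):

```rust
// placeholder helpers: real code would do I/O, page decompression and
// parquet-to-arrow deserialization here
fn read_column(_id: usize) -> Vec<u8> { Vec::new() }
fn decompress(bytes: &[u8]) -> Vec<u8> { bytes.to_vec() }
fn deserialize_slice(_column: &[u8], _offset: usize, _len: usize) -> Vec<i64> { Vec::new() }

// Strategy 1: decompress all M projected columns up front,
// then deserialize n < N items at a time across all of them
fn strategy_1(column_ids: &[usize], n: usize, num_rows: usize) {
    let decompressed: Vec<Vec<u8>> = column_ids
        .iter()
        .map(|&id| decompress(&read_column(id)))
        .collect();
    let mut offset = 0;
    while offset < num_rows {
        let len = n.max(1).min(num_rows - offset);
        for column in &decompressed {
            let _batch = deserialize_slice(column, offset, len);
        }
        offset += len;
    }
}

// Strategy 2: handle one column at a time, deserializing all N items of it
// before moving to the next; only one decompressed column is alive at once
fn strategy_2(column_ids: &[usize], num_rows: usize) {
    for &id in column_ids {
        let column = decompress(&read_column(id));
        let _array = deserialize_slice(&column, 0, num_rows);
        // `column` is dropped here, but the deserialized arrays accumulate
    }
}
```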

Consequences of each

Let's now see how this pans out in two different data carrier patterns: "stream of chunks", Stream<Vec<Array>> (like datafusion), and "vector of streams", Vec<Stream<Array>> (like the chunked array in pyarrow).

Strategy 1 on stream of chunks

  • After step 1, we peak at M compressed columns
  • After step 2, we peak at M decompressed columns
  • During step 3-4, we peak at M decompressed columns + n * M deserialized items

Strategy 1 on vector of streams

  • After step 1, we peak at 1 compressed column
  • After step 2, we peak at 1 decompressed column
  • During step 3-4, we peak at 1 decompressed column + n deserialized items

Strategy 2 on stream of chunks

  • After step 1, we peak at 1 compressed column
  • After step 2, we peak at 1 decompressed column
  • During step 3-4, we peak at 1 decompressed column + N * M deserialized items

Strategy 2 on vector of streams

  • After step 1, we peak at 1 compressed column
  • After step 2, we peak at 1 decompressed column
  • During step 3-4, we peak at 1 decompressed column + N deserialized items
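
As a sanity check on these bullets, the same bookkeeping written as plain arithmetic; `encoded_item` and `arrow_item` play the roles of the per-item sizes s_e and s_d defined in the analysis below (nothing arrow2-specific here):

```rust
// peak memory, in abstract size units, for each strategy/carrier combination
// m: number of projected columns (M), n_rows: row group length (N),
// n_batch: mini-batch size (n)
fn peak_memory(m: f64, n_rows: f64, n_batch: f64, encoded_item: f64, arrow_item: f64) -> [f64; 4] {
    [
        m * n_rows * encoded_item + m * n_batch * arrow_item, // Strategy 1, stream of chunks
        n_rows * encoded_item + n_batch * arrow_item,         // Strategy 1, vector of streams
        n_rows * encoded_item + m * n_rows * arrow_item,      // Strategy 2, stream of chunks
        n_rows * encoded_item + n_rows * arrow_item,          // Strategy 2, vector of streams
    ]
}
```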

Analysis

From this, my understanding is that the core difference here is between

  • Strategy 1: M decompressed columns + n * M deserialized items
  • Strategy 2: 1 decompressed column + N * M deserialized items

Let's do some math and see the difference. Defining

  • b = n/N to be our batch size n relative to the row group size N.
  • s_e the average size of an individual encoded item in parquet
  • s_d the average size of an individual deserialized item in arrow
  • r as the ratio s_d / s_e (the average ratio between the deserialized arrow size and the encoded parquet size). For PLAIN, s_e / s_d roughly equals 1 - ratio of nulls (since null values do not take space in parquet), i.e. r is roughly 1 / (1 - ratio of nulls).

Defining S1 and S2 as the peak memory usage of Strategy 1 and Strategy 2 respectively, we can write the above as:

S1 = M * N * s_e + M * n * s_d
S2 = N * s_e + M * N * s_d

The ratio S1/S2 can be written as (dividing numerator and denominator by M*N*s_e):

S1/S2 = (1 + b * r) / (1/M + r)

which is the important formula here.
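
A tiny numeric check of this formula, reproducing the example values worked out below (plain arithmetic, nothing parquet-specific):

```rust
// S1/S2 as a function of relative batch size b = n/N, size ratio r = s_d/s_e
// and number of columns M
fn ratio(b: f64, r: f64, m: f64) -> f64 {
    (1.0 + b * r) / (1.0 / m + r)
}

fn main() {
    // M = 1 (a single column)
    println!("{:.2}", ratio(0.1, 4.0, 1.0)); // 0.28
    println!("{:.2}", ratio(0.1, 0.9, 1.0)); // 0.57
    // M -> infinity (many columns)
    println!("{:.2}", ratio(0.1, 4.0, f64::INFINITY)); // 0.35
    println!("{:.2}", ratio(0.1, 0.9, f64::INFINITY)); // 1.21
}
```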

We can now take two limits: M = 1 (load one column) and M = infinity (load many columns).

Low number of columns

For M = 1, we have

S1/S2 = (1 + b * r) / (1 + r)

For highly-encoded columns, say r = 4, and a batch size of 0.1 (10% of the row group), the above yields

S1/S2 = (1 + 0.4) / (1 + 4) = 0.28

For low-encoded columns, say r = 0.9, and a batch size of 0.1 (10% of the row group), the above yields

S1/S2 = (1 + 0.09) / (1 + 0.9) = 0.57

showing that S1 requires less memory in both cases (roughly 40-70% less)

Loading high number of columns

For M = inf, we have

S1/S2 = (1 + b * r) / r

For highly-encoded columns, say r = 4, and a batch size of 0.1 (10% of the row group), the above yields

S1/S2 = (1 + 0.4) / 4 = 0.35

I.e. S1 requires ~65% less memory

For low-encoded columns, say r = 0.9, and a batch size of 0.1 (10% of the row group), the above yields

S1/S2 = (1 + 0.09) / 0.9 = 1.2

I.e. S1 requires ~20% more memory

Conclusions

My conclusion atm is that Strategy 1 dominates in most cases and should be the default (i.e. the goal of this PR). IMO this is interesting because it shows that it is better to read the whole (after projection pushdown) row group into memory, decompress it, and then iterate over its individual items.

@jorgecarleitao jorgecarleitao changed the title Allow column chunks to be deserialized to multiple arrays (e.g. iterator) Allow parquet column chunks to be deserialized to multiple arrays (e.g. iterator) Jan 18, 2022
@tustvold
Contributor

> data is split in row groups to allow for group pruning at the expense of lower compression

This is only partly true; the other motivator is that the row group is typically the unit of IO and distributed parallelism. Separate row groups are not all that dissimilar from separate files, with all the costs and benefits thereof.

> The potential benefit of using more pages would be to allow page pruning.

Pages are significantly smaller than column chunks: row groups may be up to a GB compressed, whereas pages (other than dictionary pages) are typically closer to 8KB compressed. See the end of this document. So another benefit is reducing the amount of uncompressed data that must be buffered in the read path at any point in time.

> However, parquet itself deprecated writing page statistics in 2018.

That is because it now writes them into a different data structure called PageIndex that is slightly less useless. That being said, I'm not aware of many systems that make use of this...

> Thus, I do not really see how more pages per column chunk help us.

Regardless of the utility of multiple pages per column chunk, if interoperability is important, other tooling is likely to assume multiple small pages per column chunk.

FWIW I agree that page pruning based on statistics is not particularly effective for most use-cases. I have a proposal for selection masks in parquet that I hope to start work on in the latter half of this week: apache/arrow-rs#1191.

@jorgecarleitao
Owner Author

I now have a fix for this locally; I am just cleaning it up before opening a PR. It reads the pages as they are needed.

@jorgecarleitao jorgecarleitao self-assigned this Jan 24, 2022
@jorgecarleitao jorgecarleitao added the no-changelog label Mar 6, 2022