This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Added example showing parallel writes to parquet (x num_cores) #436

Merged
merged 1 commit into main from paral_write on Sep 23, 2021

Conversation

jorgecarleitao
Owner

Encoding + compression is embarrassingly parallel across columns, and thus results in a speedup factor equal to the number of available cores, up to the number of columns to be written.

@jorgecarleitao added the documentation label (Improvements or additions to documentation) on Sep 21, 2021
@jorgecarleitao
Owner Author

jorgecarleitao commented Sep 21, 2021

cc @ritchie46 @sundy-li @houqp

I tried to summarize in this example how we can go about writing parquet files in parallel, by moving the CPU-bounded work away from the Write and thereby maximizing IO.

IMO we should not offer a Writer for this because we need to commit to a thread pool, which I would rather delegate to dependencies like Polars, Databend and DataFusion.

For an iterator of batches, row_groups is the iterator we want to generalize. For a stream of batches, modify row_groups to be a stream and pass it to write_stream instead (likely with a oneshot channel to still leverage CPU-bounded tasks). For the full async experience (AsyncWrite), use write_stream_stream.

Regardless, the principle is the same: split IO-bounded from CPU-bounded and leverage all kinds of parallelism tricks to saturate IO.

When the order of the batches does not matter, there is a further optimization in which we parallelize over both columns and records (in which case the records are written as they are processed).
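A minimal sketch of this split, under stated assumptions: `encode_and_compress` is a hypothetical stand-in for the per-column encode + compress step (Array -> Vec<u8>), and rayon plays the role of the thread pool a downstream dependency would commit to. This is not arrow2's actual API, only the shape of the pattern.

```rust
// Hedged sketch of the pattern, not arrow2's actual API:
// `encode_and_compress` is a hypothetical stand-in for the per-column
// CPU-bounded step (encoding + page compression, Array -> Vec<u8>).
use rayon::prelude::*;
use std::io::Write;

fn encode_and_compress(column: &[i64]) -> Vec<u8> {
    // Placeholder for real encoding + compression of one column chunk.
    column.iter().flat_map(|v| v.to_le_bytes()).collect()
}

fn write_batch<W: Write>(writer: &mut W, columns: &[Vec<i64>]) -> std::io::Result<()> {
    // CPU-bounded: fan the columns out across the thread pool. Rayon's
    // ordered `collect` buffers the results, preserving column order.
    let encoded: Vec<Vec<u8>> = columns.par_iter().map(|c| encode_and_compress(c)).collect();

    // IO-bounded: write sequentially, in the column order parquet requires.
    for chunk in encoded {
        writer.write_all(&chunk)?;
    }
    Ok(())
}
```

The ordered `collect` is exactly the buffering trade-off discussed further down: column order is preserved at the cost of holding every encoded column in memory.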

@codecov

codecov bot commented Sep 21, 2021

Codecov Report

Merging #436 (d0c6bf5) into main (24f6194) will increase coverage by 0.00%.
The diff coverage is n/a.


@@           Coverage Diff           @@
##             main     #436   +/-   ##
=======================================
  Coverage   80.78%   80.79%           
=======================================
  Files         372      372           
  Lines       22645    22645           
=======================================
+ Hits        18294    18295    +1     
+ Misses       4351     4350    -1     
Impacted Files | Coverage Δ
src/io/json_integration/write.rs | 6.25% <0.00%> (+6.25%) ⬆️

Continue to review full report at Codecov.

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 24f6194...d0c6bf5. Read the comment docs.

@Dandandan
Collaborator

Thanks for this great example and discussion starter.

I am also interested in what kind of overhead we might expect when parallelizing across columns.

Like in this classic paper:
https://www.usenix.org/system/files/conference/hotos15/hotos15-paper-mcsherry.pdf

The "traditional" approach of e.g. Spark is to map one thread per (input or output) file. Great when the number of target files is bigger than the number of threads - but not ideal when having one file as output, where having more parallelism in e.g. a parquet writer could be beneficial.

Also, at greater concurrency (lots of users using a system on shared resources), resource efficiency and scalability are another important angle.

@houqp
Collaborator

houqp commented Sep 22, 2021

This is really cool, @jorgecarleitao. The abstraction looks solid 👍

> Spark is to map one thread per (input or output) file. Great when the number of target files is bigger than the number of threads

I feel like even if we have more files than threads, this is still not optimal, right? For example, encoding+compression for a single file might not incur enough compute to saturate a single core?

Perhaps this is where @andygrove's idea of introducing a scheduler to a compute engine would help. The scheduler can first decide on the unit of parallelism (columns or records) based on requirements, then schedule as many parallel tasks as it can until we saturate either CPU or IO.

@houqp
Collaborator

houqp commented Sep 22, 2021

@alamb might be interested in doing this for iox too :)

@ritchie46
Collaborator

The "traditional" approach of e.g. Spark is to map one thread per (input or output) file. Great when the number of target files is bigger than the number of threads - but not ideal when having one file as output, where having more parallelism in e.g. a parquet writer could be beneficial.

For this reason I agree with @jorgecarleitao that this behavior should be delegated to the implementations. For DataFusion and Spark it might make sense to write multiple parquet files at once. For Polars I don't expect this to happen much.

Anyhow.. very nice! 🚀

@alamb
Collaborator

alamb commented Sep 22, 2021

Looks neat 👍 Do I read the code correctly that with this approach the resulting parquet pages must all be buffered in memory before writing out the file? Such buffering is probably fine for most use cases (including IOx), though for larger data the ability to do streaming writes is likely desirable.

Very cool

@Dandandan
Collaborator

>> Spark is to map one thread per (input or output) file. Great when the number of target files is bigger than the number of threads
>
> I feel like even if we have more files than threads, this is still not optimal, right? For example, encoding+compression for a single file might not incur enough compute to saturate a single core?
>
> Perhaps this is where @andygrove's idea of introducing a scheduler to a compute engine would help. The scheduler can first decide on the unit of parallelism (columns or records) based on requirements, then schedule as many parallel tasks as it can until we saturate either CPU or IO.

Yes - but usually output bandwidth (e.g. to S3) is higher than the writing/compression speed, so writing usually has very high CPU utilization, depending on the size of the partitions/files of course. Encoding + compression is actually very expensive, and it is hard to reach the full 10-25 Gbps (or more) per node. Even if it were possible, there is no use in trying to be faster than that!
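A back-of-envelope illustration of that claim (the ~100 MB/s per-core encode + compress throughput is an assumed figure for the sake of the arithmetic, not a measurement):

```rust
// Back-of-envelope only: the per-core throughput below is an assumed
// figure, not a benchmark result.
fn main() {
    let link_bytes_per_s = 10.0e9 / 8.0; // 10 Gbit/s network link ≈ 1.25 GB/s
    let per_core_bytes_per_s = 100.0e6; // assumed encode + compress throughput per core
    let cores_needed = (link_bytes_per_s / per_core_bytes_per_s).ceil();
    println!("cores needed to saturate the link: {}", cores_needed); // ≈ 13
}
```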

A nice strategy by Databricks/Spark that you might have heard of is optimized writes: https://docs.databricks.com/delta/optimizations/auto-optimize.html#when-to-opt-in-to-optimized-writes .
This does an extra shuffle/exchange and combines partitions to avoid writing lots of small files, optimizing for throughput and read performance.

I personally think there are more directions/techniques to try beyond what Spark has to offer, so I am happy about this example and looking forward to more experiments in this area.

@Dandandan
Collaborator

> Looks neat 👍 Do I read the code correctly that with this approach the resulting parquet pages must all be buffered in memory before writing out the file? Such buffering is probably fine for most use cases (including IOx), though for larger data the ability to do streaming writes is likely desirable.
>
> Very cool

To me it looks like there is a task per batch and column, so that should work in a streaming manner too?

The 10M batch size in this example is pretty large, I would say (>1000x the DataFusion default).

@Dandandan
Collaborator

The "traditional" approach of e.g. Spark is to map one thread per (input or output) file. Great when the number of target files is bigger than the number of threads - but not ideal when having one file as output, where having more parallelism in e.g. a parquet writer could be beneficial.

For this reason I agree with @jorgecarleitao that this behavior should be delegated to the implemenations. For DataFusion, Spark it might make sens to write mutliple parquet files at once. For Polars I don't expect this to happen much.

Anyhow.. very nice! 🚀

@ritchie46 Yes agreed.

@jorgecarleitao
Owner Author

> Do I read the code correctly that with this approach the resulting parquet pages must all be buffered in memory before writing out the file?

That is a great observation. Note that this is for a single RecordBatch; each batch is handled independently, as @Dandandan wrote.

I believe that we can't have both "parallelism" and "order" without buffering: once we introduce two workers, either we write the results as they arrive, or we potentially need to buffer up to all pages to keep them in order. The encode + compress step (what I call CPU-bound) is basically Array -> Vec<u8>; for a RecordBatch, it is Vec<Array> -> Vec<Vec<u8>>. In parquet, columns must be written to the file in a specific order. Thus, we must collect them all for a given RecordBatch (there is an optimization where we keep only the minimum buffer needed to preserve order).
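A sketch of that optimization, with illustrative names rather than arrow2's API: workers send (column_index, bytes) pairs as soon as they finish, and the writer buffers only the results that arrived out of order, flushing each time the next expected index becomes available.

```rust
// Illustrative sketch (hypothetical names, not arrow2's API) of keeping
// the minimum buffer needed to preserve column order.
use std::collections::BTreeMap;
use std::io::Write;
use std::sync::mpsc::Receiver;

fn write_in_order<W: Write>(
    writer: &mut W,
    rx: Receiver<(usize, Vec<u8>)>,
    num_columns: usize,
) -> std::io::Result<()> {
    // Holds only encoded columns that arrived ahead of their turn.
    let mut pending: BTreeMap<usize, Vec<u8>> = BTreeMap::new();
    let mut next = 0;
    while next < num_columns {
        let (idx, bytes) = rx.recv().expect("a worker disconnected");
        pending.insert(idx, bytes);
        // Flush the longest in-order run we have; `pending` stays minimal.
        while let Some(bytes) = pending.remove(&next) {
            writer.write_all(&bytes)?;
            next += 1;
        }
    }
    Ok(())
}
```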

Alternatively, we could parallelize over the other dimension (Vec<RecordBatch>), but without buffering we must relinquish the order in which they are written to parquet (which is usually not nice, especially if there is an "ORDER BY" at the end ^_^).

Finally, we may not parallelize at all, but then we likely do not saturate IO and thus do not maximize our capacity (at least not without writing to other files, à la Spark, which has other consequences, like the default 200 partitions).

A reason why this op is usually CPU-bounded is that high compression requires long pages (so that all values belong to the same "compression unit"), which requires going through a lot of data; the higher the number of row groups, the less compressed the file is.

cc'ing @xhochy here because I learnt most of this from him, and he will likely correct me here (see https://github.com/elastacloud/parquet-dotnet/issues/392 for some context).

@jorgecarleitao merged commit 688e979 into main on Sep 23, 2021
@jorgecarleitao deleted the paral_write branch on September 23, 2021 05:14