Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

How to process very big file in parallel and write results in correct order? (I want FuturesOrdered!) #1070

Closed
safinaskar opened this issue Jul 3, 2023 · 4 comments

Comments

@safinaskar
Copy link

safinaskar commented Jul 3, 2023

Here is my tasks:

  • I want to read very big file. It possibly doesn't fit in memory. In my particular case file size is 10 GiB and my memory size is 32 GiB. It fits in memory in this case, but it is still possible that in some point of future file will be too big. So, I want to read the file, split it into reasonably-sized chunks, say, 4 MiB, process them in parallel (this processing is CPU-bounded, not I/O-bounded) and then write results to some (small) destination file in correct order
  • I want to read small file, split it into very small chunks, process them in parallel and write the results to very big file in correct order. I. e. the same as previous point, but now small-to-big, not big-to-small

(My OS is Linux.)

(In fact my actual problem is this: borgbackup/borg#7674 (comment) . See there sequential Rust solution.)

It seems rayon doesn't support these cases directly. This is how I managed to do the first (big-to-small) task:

struct Chunks {
  input: std::fs::File,
  ...
}

// My own hand-rolled sequential iterator for chunks. It reads chunks from file. Remember: it is potentially not possible to store all chunks in the same time in memory
impl Iterator for Chunks {
    type Item = Vec<u8>;
    fn next(&mut self) -> Option<Vec<u8>> {
        ...
    }
}

fn main() {
        use rayon::prelude::*;

        // We rely on this: https://github.com/rayon-rs/rayon/issues/1068
        let mut hash_vec = Chunks { input: std::fs::File::open("/dev/stdin").unwrap(), .... }.enumerate().par_bridge().map(|(chunk_n, chunk)| {
            ...
            return (chunk_n, ...);
        }).collect::<Vec<_>>();
        hash_vec.sort_by_key(|(n, _)|*n);
        for (_, chunk) in hash_vec {
            index_file.write_all(&chunk).unwrap();
        }
}

Unfortunately, I see multiple problems here:

Okay, so I decided to stick with this solution. If you know better, please, let me know.

Now to small-to-big task. I spend a lot of time reading rayon documentation and still it seems that this is not possible to solve this problem using rayon! But it seems the problem can be solved using tokio and futures::stream::FuturesOrdered ( https://docs.rs/futures/0.3.28/futures/stream/struct.FuturesOrdered.html ). FuturesOrdered by itself runs everything in single thread, so it seems I should also use tokio::task::spawn_blocking ( https://docs.rs/tokio/1.29.1/tokio/task/fn.spawn_blocking.html ). I think the code should look like so (not tested):

let input_chunks: Vec<_> = ..;
let mut futures = FuturesOrdered::new();
for chunk in input_chunks {
  futures.push_back(move async {
    spawn_blocking(...).await
  });
}
while let Some(output_chunk) = futures.next().await {
  /* write output_chunk to file */
}

But my problem is CPU-bounded, not I/O bounded! And everyone says that we should use threads and rayon for CPU-bounded tasks and async for I/O-bounded tasks. So, it seems everyone lies. It seems I should use async, despite my problem is CPU-bounded.

How we got here? How it is became possible that rayon, which designed for CPU-bounded problems, perform on them worse than async programming, which has whole book on how bad it is ( https://rust-lang.github.io/wg-async/vision/submitted_stories/status_quo.html )?

It seems that FuturesOrdered will go for big-to-small task, too.

Another possible solution for small-to-big task is so: mmap output file to memory, then write to it using rayon and my own collect_into_vec-like function ( https://docs.rs/rayon/1.7.0/rayon/iter/trait.IndexedParallelIterator.html#method.collect_into_vec ). (I cannot use collect_into_vec directly, because I want to collect parallel iterator of Vec<u8> into single &mut [u8], i. e. I don't need merely collect_into_vec, I need flatten_and_collect_into_vec, or, more precisely, flatten_and_collect_into_mut_slice.)

Such solution will be idiomatic rayon code. Unfortunately, it will move all hard work to OS. It will generate too much pressure for page cache. And I know that this is bad, and that for performance we should not put too much information to page cache (see borgbackup/borg#7674 (comment) ). So I think FuturesOrdered solution will be better.

Conclusions/questions:

  • How should I solve my problem? Are my proposed solutions best?
  • I think some solutions to my problems (i. e. something similar to FuturesOrdered) should be added to rayon. Or at least rayon docs should tell user what he should do in such situation, i. e. to tell him that if his data doesn't fit to memory, then he probably should use some solution X
  • Is well-known principle "Use rayon and threads for CPU-bounded tasks and async for I/O bounded tasks" still true? If not, then please write to rayon docs what user should use. For example, write something like this: "If your data doesn't fit to memory, then use X even if you task is CPU-bounded"
@safinaskar
Copy link
Author

It seems I know how to solve this with rayon. I will possibly open pull request today or tomorrow

@adamreichold
Copy link
Collaborator

I think it would be much easier if you used positioned I/O, i.e. just iterate over 0..num_chunks using par_iter().for_each(..) and compute the correct offset to read from based on the chunk number to then call read_exact_at. This should also parallelize your I/O requests somewhat giving the kernel's I/O scheduler more parallelism to work with. The number of chunks and their offsets do depend only on the length of the file after all.

@adamreichold
Copy link
Collaborator

(Actually, if the file is really large, you might even consider using mmap together with par_chunks.

@safinaskar
Copy link
Author

Okay, so I wrote my solution here #1071 and I will publish it to crates.io in some point of future. I'm closing this issue

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants