
FR: Parallel to sequential iterator #210

Open
Storyyeller opened this issue Jan 15, 2017 · 31 comments

@Storyyeller commented Jan 15, 2017

It would be nice if there were a way to convert a parallel iterator into a sequential one. Fold/reduce can only be used when the reduction is associative, which rules out e.g. performing I/O on every element in order, since I/O is global state. One possibility is to collect() into a vec, but then you have to wait until every element is computed before you can process the first result.

@cuviper (Member) commented Jan 16, 2017

Hmm. It seems to me that you'd still need a buffer big enough for the whole iteration, as if you did a normal collect() into a vector. But if I understand, you want to start iterating that collected result in order, with as little latency as possible. I'm not sure how to accomplish that though.

I wonder if futures (#193) can help with this? I'm thinking roughly that each time we split, it could be represented as some kind of left_future.map(your_fn).and_then(right_future).map(your_fn). So the future polling outside the threadpool might naturally come out serialized. I haven't really used futures though, so imagine I'm saying this with very wild hand-waving. :)

@Storyyeller (Author) commented Jan 16, 2017

In the worst case, it would devolve into just collecting into a vec. But usually, the first item will be finished early and so on. But you do need some sort of buffer. I'm not sure whether it makes sense to have one big buffer, or a linked list of buffers (one per split consumer). It depends on how much you care about being able to drop parts of the buffer early. In the worst case, you still need storage equal to the entire list of results, so it probably won't make much difference either way.

@nikomatsakis (Member) commented Jan 17, 2017

Hmm, I think the right answer is perhaps a futures stream. That is, you could convert a par iter into a stream. Streams are the next thing that I want to figure out after #212 lands.

@nikomatsakis (Member) commented Jan 17, 2017

But it'd be worth drilling a bit more into what you want. In particular, do you want this parallel computation to run asynchronously -- that is, disconnected from any particular stack frame? Or is it part of some function? Reading your original request, it sounds like you want something like foreach, but which executes in order? I've thought about adding that, and I suspect it would be much easier than converting into an iterator.

@Storyyeller (Author) commented Jan 17, 2017

Yes, foreach is also ok.

@jswrenn commented Mar 15, 2017

This would be very useful for me, too. My use-case: programs that read lines from a file (sequentially), perform an expensive computation independently on each line (parallelizable), and then must print out the results in exactly the same order.

@0x7CFE commented Apr 9, 2017

and then must print out the results in exactly the same order.

Why not sort the output later then? Assign an atomic number to each line being read, perform the computation, and write that number to the output. Later, once the data is processed, all you need to do is sort the lines by that number. Much like map-reduce.

Of course, that may not be a convenient solution for you, so just in case.

@Storyyeller (Author) commented Apr 9, 2017

I'm not sure about jswrenn's case, but in my case, I want to access the first result as soon as it is ready, rather than having to wait until the entire vector is processed.

@jswrenn commented Apr 9, 2017

Why not sort the output later then?

In my case, I'm piping stdout to another process which reads input line-by-line.

@amandasaurus commented May 26, 2017

I would be interested in this too. I'd like to be able to get the results out, but order isn't important to me. I am using rayon to map in parallel, and I know I have a 1:1 correspondence, so I'd like to be able to print out some form of feedback to the user (Done X of Y items...).

@rocallahan (Contributor) commented Feb 26, 2018

Would it work to implement this as a Consumer that buffers incoming Items until all left-ward copies of the same Consumer have finished processing, and then processes the buffered Items? That strategy assumes that if you use with_max_len(1) then Rayon will start processing items roughly in-order from the start of the iterator, but it's not clear if such ordering is guaranteed.

@cuviper (Member) commented Feb 26, 2018

@rocallahan The current design requires a parallel Producer as well -- I'm not sure how that would fit with what you're suggesting. The general issue here is how to split an arbitrary iterator in parallel, and once you have that, then all the normal consumers should Just Work.

@rocallahan (Contributor) commented Feb 26, 2018

FWIW I tried implementing parallel ordered output using a Rayon for_each which writes to an "ordered output sink" --- an object which buffers output so that we don't output item N until all items 0..N-1 have been output. I also used with_max_len(1). It didn't work because Rayon's work stealing meant threads picked up items from all over the input range and we had to buffer massive amounts of data. So maybe Rayon just isn't compatible with my needs. I solved my problem using a simple thread pool where each thread always takes the lowest-numbered input item to work on.
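The "each thread always takes the lowest-numbered input item" pool described above can be sketched without Rayon using a shared atomic index; this is a minimal illustration under that assumption, not the actual code used:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

fn main() {
    let items: Vec<u64> = (0..32).collect();
    let next = AtomicUsize::new(0);
    thread::scope(|s| {
        for _ in 0..4 {
            s.spawn(|| loop {
                // Each idle worker claims the lowest unclaimed index, so
                // in-flight items stay clustered near the front of the input,
                // bounding how much ordered output must be buffered.
                let i = next.fetch_add(1, Ordering::Relaxed);
                if i >= items.len() {
                    break;
                }
                let _result = items[i] * 2; // stand-in for the real work
            });
        }
    });
    // Every index was claimed exactly once.
    assert!(next.load(Ordering::Relaxed) >= items.len());
}
```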

@dlukes commented Mar 27, 2018

I think my use case would be very similar to that of @jswrenn, so just to put it in terms of code -- I have this:

stdin_guard
    .lines()
    .filter_map(|result| match result {
        Ok(line) => Some(expensive_cpu_bound_operation(&line)),
        Err(_) => panic!("error reading line"),
    })
    .for_each(|line| {
        writeln!(buffered_stdout, "{}", line).expect("error writing line");
    });

And I would like to be able to do something like this:

stdin_guard
    .lines()
    .par_iter()  // switch to parallel iteration
    .filter_map(|result| match result {
        Ok(line) => Some(expensive_cpu_bound_operation(&line)),
        Err(_) => panic!("error reading line"),
    })
    .seq_iter()  // switch back
    .for_each(|line| {
        writeln!(buffered_stdout, "{}", line).expect("error writing line");
    });

Which would make the operation as easy and straightforward as the first example in Rayon's README, which is very compelling :)

Collecting + sorting is not really an option, as the input stream is potentially infinite (well, very long, in practice).

@gbutler69 commented Jun 15, 2018

This seems like it could be implemented by "seq_iter" simply creating a growable, circular queue that is then processed as the result of "seq_iter" by the next function ("for_each" in this case). Ideally, "seq_iter" could take an initial size and/or a maximum size. If it grows to the maximum size, it would apply back-pressure on the parallel processing until it was drained sufficiently (a high-water-mark/low-water-mark sort of thing). Of course, this would require that "seq_iter" have some mechanism, provided by Rayon, for applying back-pressure. This back-pressure would cause the worker threads to suspend until given the OK to proceed. Would something like that work for you?

@dlukes commented Jun 18, 2018

Yes, that sounds promising :) The only thing is, there's nothing in there guaranteeing the ordering of the items after seq_iter(), is there?

Though now that I think of it, it may not be necessary in some applications, so carting around the ordering information and reconstructing it should probably be opt-in...

@gbutler69 commented Jun 18, 2018

The only useful way to still have ordering after the parallel processing is to:

  1. Process and re-sort the output (which requires enough space for the entire result set)
  2. Have multiple "queues" (one per parallel thread) and repeatedly take the "next" item in order by examining the head of each queue

Either approach requires much more space and is much less likely to see an overall benefit from the parallel processing. The single circular, growable queue has the advantage of maximizing the concurrency of the parallel part while allowing the processing rate of the sequential part to slow the parallel processing down (if needed) to prevent excessive memory usage.

Above, option 1 is equivalent to just collecting into a sorted map and then processing those results, so if that is your requirement, just do that. Option 2 could be useful in some circumstances, but the likelihood of it using significant amounts of memory while adding significant processing overhead makes it unlikely to pay off.

@dlukes commented Jun 18, 2018

Collecting all the results is unfortunately not an option for me :(

I thought maybe it would be possible to wrap all of the items in Futures and pass these through in the same order while waiting for rayon's thread pool to do the work. In the par_iter() phase, computations for many items could run in parallel; in the seq_iter() phase, later Futures would have to wait until earlier Futures have been resolved before being handled.

But my mental model of how futures work is still hazy at best, maybe they simply don't allow you to do this, or the overhead would dwarf the gains of parallelization, or maybe this whole idea stinks of deadlock...

@gbutler69 commented Jun 18, 2018

Rayon works like this:

  • You have a bunch of identical tasks that need to be completed
  • You have a budget to hire people to do the work (think: number of CPU threads available)
  • You decide to make an in-box for each worker you hire
  • Each worker takes the next task from their in-box, performs it, and puts the result in a collective out-box
    • If a particular worker finishes all the work in their in-box, they steal something from one of the other workers' in-boxes
  • Rayon takes tasks from the out-boxes and puts them in the in-boxes of the next processing stage, but the same workers need to handle those in-boxes as well (just different work to do), in the same fashion
  • If you wanted to serialize, you'd have only 1 in-box for the next stage and only 1 worker dedicated to that in-box (sequential)

Given that, do you see how keeping things sorted just wouldn't work without significant overhead that would likely ruin the gains from parallelism?

@dlukes commented Jun 18, 2018

@gbutler69 Thanks for taking the time to spell this out!

@TheSilvus commented Jun 1, 2019

Is there a way to do this (non-parallel folding in my case) if I do not require the order of items to stay the same?

@cuviper (Member) commented Jun 1, 2019

Unordered, you could use a channel. Spawn a parallel for_each writing to the Sender, then iterate on the Receiver.

@dansanduleac commented Jun 8, 2019

Noting that if you try to do that naively, you'll run into the fact that Sender isn't Sync, whereas ParallelIterator's for_each requires the function to be Send + Sync. However, luckily for_each_with is meant to help with exactly that.

@nwtgck commented Jul 5, 2019

Hi, everyone! I'm new to Rayon and also Rust.

I made a piece of code which may provide this feature.

Implementation

Here is the implementation. Everything is in lib.rs below.
https://github.com/nwtgck/rayon-seq-iter/blob/42c4cba29aa53fadf2a0f3b84bffed6c2148f926/src/lib.rs#L1-L66

Usage

The API looks like the following (it doubles the numbers 10–19 and prints them in the original order):

// Create a par iter
let par_iter = (10..20).into_par_iter().map(|x| x * 2);
// Convert to iter
let iter = par_iter.into_seq_iter(num_cpus::get());
// Iterate by normal for
for x in iter {
    println!("item: {}", x);
}

testing: https://github.com/nwtgck/rayon-seq-iter/blob/42c4cba29aa53fadf2a0f3b84bffed6c2148f926/src/lib.rs#L74-L80

Output

Here is the output.

item: 20
item: 22
item: 24
item: 26
item: 28
item: 30
item: 32
item: 34
item: 36
item: 38

CI status

CI has passed: CircleCI.

If it's useful, I'd like Rayon to have this feature officially.

@cuviper (Member) commented Jul 5, 2019

OK, so as you commented, that work is based on this post:
https://users.rust-lang.org/t/parallel-work-collected-sequentially/13504/2

That idea is to feed a channel of enumerated values into a binary min-heap, and wait for the appropriate next index to come through.

The biggest thing I fear with making this "official" is that it won't compose well within the pool. That is, if the sequentializing receiver is on a thread in the pool, it's a bad idea to let that block on the channel. Maybe we can make that use try_iter/try_recv instead, and participate in the thread pool when that's empty, but I also fear dependency inversion from work stealing, like #592.

Not to say this is insurmountable, but we need to be careful.
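For reference, the reordering idea from that post can be sketched with std only: send enumerated (index, value) pairs through a channel, park them in a min-heap, and release a value only when its index is the next one expected. The two sender threads below merely simulate out-of-order completion by a pool:

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;
use std::sync::mpsc::channel;
use std::thread;

fn main() {
    let (tx, rx) = channel::<(usize, u64)>();

    // Two threads send enumerated results in reverse order, simulating
    // out-of-order completion by a parallel pool.
    for range in [0..4usize, 4..8] {
        let tx = tx.clone();
        thread::spawn(move || {
            for i in range.rev() {
                tx.send((i, i as u64 * 2)).unwrap();
            }
        });
    }
    drop(tx); // the receiver loop ends once all senders are gone

    // Reorder: hold results in a min-heap keyed by index, releasing one
    // only when the next expected index is at the top.
    let mut heap = BinaryHeap::new();
    let mut next = 0usize;
    let mut ordered = Vec::new();
    for pair in rx {
        heap.push(Reverse(pair));
        while heap.peek().map_or(false, |r| (r.0).0 == next) {
            let Reverse((_, v)) = heap.pop().unwrap();
            ordered.push(v);
            next += 1;
        }
    }
    assert_eq!(ordered, vec![0, 2, 4, 6, 8, 10, 12, 14]);
}
```

As cuviper warns above, the worst case still buffers everything in the heap, and blocking a pool thread on the receiving side would be problematic.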

@kevincox commented Nov 6, 2019

I think there are two concerns here, depending on whether you want order to be preserved or not. Both are valid use cases. For the non-preserving case I want to get items as they finish. I think it is a common use case to do something serially with the results of your computation. Basically I want a better, blessed version of:

fn into_iter<T: Send + 'static>(
    iter: impl rayon::iter::ParallelIterator<Item = T> + 'static,
) -> impl Iterator<Item = T> {
  // Bounded so producers block rather than buffering everything.
  let (send, recv) = std::sync::mpsc::sync_channel(rayon::current_num_threads());
  std::thread::spawn(move || iter.for_each(|el| { let _ = send.send(el); }));
  recv.into_iter()
}

I think the above is pretty good, but it could be improved by avoiding the extra thread and by canceling the rest of the parent iterator rather than computing the remaining elements and then throwing them away.

@nickbabcock commented Nov 7, 2019

In case there are others who would like to see an example of printing the parallel iterator to a buffered stdout (unordered output) via a for loop:

use std::io::{self, BufWriter, Write};

let json_lines = unimplemented!(); // a parallel iterator of strings

let (send, recv) = std::sync::mpsc::sync_channel(rayon::current_num_threads());

// Spawn a thread that is dedicated to printing to a buffered stdout
// (`move` is needed so the thread takes ownership of `recv`)
let thrd = std::thread::spawn(move || {
    let stdout = io::stdout();
    let lock = stdout.lock();
    let mut writer = BufWriter::new(lock);
    for json in recv.into_iter() {
        let _ = writeln!(writer, "{}", json);
    }
});

let send = json_lines.try_for_each_with(send, |s, x| s.send(x));
if let Err(e) = send {
    eprintln!("Unable to send internal data: {:?}", e);
}

if let Err(e) = thrd.join() {
    eprintln!("Unable to join internal thread: {:?}", e);
}

@benkay86 commented Feb 20, 2021

Recently encountered this issue myself and discussed some workarounds that may be helpful to others encountering this limitation of Rayon. You can use ParallelSlice::par_windows() to perform sequentially-dependent non-associative operations, but will need the entire slice of data in memory at least once for this to work. In memory-constrained situations you can use Itertools::tuple_windows() but will have to sacrifice some degree of parallelism or accept some amount of redundant computations.

I agree this feature would be very useful (e.g. for computing derivatives or convolving with kernels), and I also agree with the comments above that we would want to implement it carefully, in a way that is performant and composes well within the threadpool.

@phiresky commented May 16, 2021

In my case I had an iterator that is processed with a heavy operation in parallel, and then needs to be read in chunks in a sequential loop. So I basically wanted a collect() from a parallel iterator into a sequential iterator of Vec&lt;T&gt; chunks of a specific size. Here's an example:

    let par_iter = walkdir::WalkDir::new("...pathname")
        .into_iter()
        .par_bridge()
        .map(heavy_operation); // heavy operation in parallel
    for e in CollChunks::new(par_iter, 1000, 1000) {
        // collected into sequential chunks
        println!("got chunk with {} items", e.len());
    }

and here's the implementation:

use rayon::iter::{IntoParallelIterator, ParallelIterator};

struct CollChunks<T: Send> {
    chunk_size: usize,
    cur_chunk: Vec<T>,
    receiver: std::sync::mpsc::IntoIter<T>
}
impl<T: Send + 'static> CollChunks<T> {
    fn new<I>(par_iter: I, chunk_size: usize, queue_len: usize) -> Self
    where
        I: IntoParallelIterator<Item = T>,
        <I as rayon::iter::IntoParallelIterator>::Iter: 'static,
    {
        let par_iter = par_iter.into_par_iter();
        let (sender, receiver) = std::sync::mpsc::sync_channel(queue_len);
        // send all the items in parallel
        std::thread::spawn(move || {
            par_iter.for_each(|ele| sender.send(ele).expect("receiver gone?"))
        });
        let coll = CollChunks {
            receiver: receiver.into_iter(),
            chunk_size,
            cur_chunk: Vec::with_capacity(chunk_size),
        };
        coll
    }
}
impl<T: Send> Iterator for CollChunks<T> {
    type Item = Vec<T>;
    fn next(&mut self) -> Option<Self::Item> {
        // receive items until we have a full buffer or sender is done
        while self.cur_chunk.len() < self.chunk_size {
            match self.receiver.next() {
                Some(e) => self.cur_chunk.push(e),
                None => {
                    if !self.cur_chunk.is_empty() {
                        break; // last chunk
                    } else {
                        return None; // done
                    }
                }
            }
        }
        let mut new_vec = Vec::with_capacity(self.chunk_size);
        std::mem::swap(&mut self.cur_chunk, &mut new_vec);
        return Some(new_vec);
    }
}

No guarantees of course, but it seems to work.

@dpc commented Jan 8, 2022

In case someone is looking here in need of parallel processing over iterator items preserving order (and without storing the data in the collection first), I've been working on a library that does it. Unlike rayon it's not work-stealing based, which comes with different tradeoffs: https://github.com/dpc/pariter

@venth commented Feb 20, 2022

I'd also like to convert parallel iterator results into a sequential iterator. The implementation is similar to @phiresky's, but I managed to avoid the 'static lifetime. Unfortunately, I don't have an idea how to handle errors that can happen while processing and while sending results to crossbeam::channel::unbounded.

Additionally, production from the parallel iterator starts only when the sequential iterator is pulled.

Still, I have issues with this code. For example, I don't know how to check how it will behave when a send or receive fails.

use core::option::Option;
use std::iter;

use crossbeam::channel;
use crossbeam::channel::{Receiver, Sender};
use rayon::iter::ParallelIterator;

pub trait IntoSequentialIteratorEx<'a, T: Sized>: Sized {
    fn into_seq_iter(self) -> Box<dyn 'a + Iterator<Item=T>>;
}

impl<'a, T, PI> IntoSequentialIteratorEx<'a, T> for PI
    where
        T: 'a + Send,
        PI: 'a + ParallelIterator<Item=T>,
{
    fn into_seq_iter(self) -> Box<dyn 'a + Iterator<Item=T>> {
        let (sender, receiver) = channel::unbounded();

        Box::new(deferred_first_element(self, sender, receiver.clone())
            .chain(deferred_remaining_elements(receiver)))
    }
}

fn deferred_first_element<'a, T: 'a + Send, PI: 'a + ParallelIterator<Item=T>>(
    par_iter: PI,
    sender: Sender<T>,
    receiver: Receiver<T>) -> Box<dyn 'a + Iterator<Item=T>>
{
    let deferred = iter::once(Box::new(move || {
        crossbeam::scope(|s| {
            s.spawn(|_| {
                par_iter.for_each(|element| {
                    sender.send(element).unwrap();
                });

                drop(sender);
            });
        }).unwrap();

        receiver.recv().ok()
    }) as Box<dyn FnOnce() -> Option<T>>);

    Box::new(deferred
        .map(|f| {
            f()
        })
        .filter(Option::is_some)
        .map(Option::unwrap))
}

fn deferred_remaining_elements<'a, T: 'a + Send>(receiver: Receiver<T>) -> Box<dyn 'a + Iterator<Item=T>> {
    Box::new(
        iter::repeat_with(move || {
            receiver.recv().ok()
        })
            .take_while(Option::is_some)
            .map(Option::unwrap))
}

#[cfg(test)]
mod tests {
    use itertools::Itertools;
    use rayon::iter::{IntoParallelIterator, ParallelBridge};

    use super::*;

    #[test]
    fn does_nothing_for_an_empty_iterator() {
        // when
        let result = Vec::<i32>::new().into_par_iter().into_seq_iter().collect_vec();

        // then
        assert_eq!(Vec::<i32>::new(), result);
    }

    #[test]
    fn iterates_over_whole_iterator_range() {
        // given
        let elements = 120;

        // when
        let result = iter::repeat(12)
            .take(elements)
            .par_bridge()
            .into_seq_iter()
            .count();

        // then
        assert_eq!(elements, result);
    }
}
