
ser_bridge: Opposite of par_bridge #858

Open
hniksic opened this issue May 14, 2021 · 22 comments

@hniksic
Contributor

hniksic commented May 14, 2021

In summary, I would like to propose adding ser_bridge(), a bridge that converts ParallelIterator into an ordinary Iterator, sort of the opposite of par_bridge(). In this issue I present a toy implementation and would like to inquire about the feasibility of providing a production-ready one.

Rayon already provides par_bridge(), an extension to Iterator that converts any Iterator into a ParallelIterator. In most cases you can then proceed to transform the parallel iterator, ending with a consuming operation like fold(), sum() or for_each(). But in some cases you want to funnel the values produced by a parallel iterator back into an ordinary iterator, e.g. in order to feed them to a single-threaded resource. This need comes up in this StackOverflow question (which also proposes the name ser_bridge()), as well as in my own usage of Rayon.

The goal would be to enable writing code like this:

let mut sink = ...;
(0..1_000_000)           // start with sequential operation
    .into_par_iter()     // parallelize it
    .map(|n| (n as f64) * (n as f64))   // do expensive transformation in parallel
    .ser_bridge()        // and serialize it again
    .try_for_each(|n| sink.write_all(n.to_string().as_bytes()))?;

Note that sink.write_all() requires mutable, and therefore single-threaded, access. Although one could imagine this particular case being solved by putting the sink in an Arc<Mutex> or a channel (the latter being the resolution of the SO question), in many cases it is much more convenient to obtain an iterator. If nothing else, it allows us to pass the result of the processing to a function that expects an iterator, such as those from the itertools crate.
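For comparison, here is roughly what the manual channel-based workaround looks like (a minimal sketch along the lines of the SO resolution; the use of crossbeam_channel and the zero-capacity channel are just illustrative choices):

use std::io::Write;
use rayon::prelude::*;

fn main() -> std::io::Result<()> {
    let mut sink = std::io::stdout();
    let (tx, rx) = crossbeam_channel::bounded(0);
    std::thread::spawn(move || {
        (0..1_000_000u64)
            .into_par_iter()
            .map(|n| (n as f64) * (n as f64))
            .for_each(|n| { let _ = tx.send(n); });
    });
    // The receiving end already behaves like an ordinary Iterator,
    // but all of the plumbing is written out by hand at every use site.
    rx.into_iter()
        .try_for_each(|n| sink.write_all(n.to_string().as_bytes()))
}

It works, but every call site has to repeat the channel and thread setup; ser_bridge() would package exactly this plumbing behind an iterator adapter.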

If this is considered useful, I would like to submit this toy implementation for review:

use crossbeam_channel::Receiver;
use rayon::prelude::*;

pub trait SerBridge<T>
where
    T: Send + 'static,
    Self: ParallelIterator<Item = T> + 'static,
{
    fn ser_bridge(self) -> SerBridgeImpl<T> {
        SerBridgeImpl::new(self)
    }
}

impl<PI, T> SerBridge<T> for PI
where
    T: Send + 'static,
    PI: ParallelIterator<Item = T> + 'static,
{
}

pub struct SerBridgeImpl<T> {
    rx: Receiver<T>,
}

impl<T: Send + 'static> SerBridgeImpl<T> {
    pub fn new<PI>(par_iterable: impl IntoParallelIterator<Item = T, Iter = PI>) -> Self
    where
        PI: ParallelIterator<Item = T> + 'static,
    {
        let par_iter = par_iterable.into_par_iter();
        // Rendezvous channel (capacity 0): producers block until the serial
        // consumer is ready, which provides backpressure.
        let (tx, rx) = crossbeam_channel::bounded(0);
        std::thread::spawn(move || {
            // Terminates with a send error as soon as the receiver is dropped.
            let _ = par_iter.try_for_each(|item| tx.send(item));
        });
        SerBridgeImpl { rx }
    }
}

impl<T> Iterator for SerBridgeImpl<T> {
    type Item = T;

    fn next(&mut self) -> Option<T> {
        self.rx.recv().ok()
    }
}

This implementation has the following nice properties:

  • simple;
  • uses rendezvous channel for backpressure and to avoid values getting stuck in an intermediate buffer;
  • correctly handles the returned iterator not getting exhausted to the end - dropping SerBridgeImpl will also drop the receiver and cause ParallelIterator::try_for_each to return an error, terminating the parallel iteration.

The downside, and the reason I consider it a toy implementation, is that it creates a whole new thread for the bridging.
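For illustration, a quick usage sketch of the toy bridge above (it assumes the SerBridge trait is in scope); dropping the serial iterator early, here implicitly via take(), disconnects the channel and ends the parallel side:

use rayon::prelude::*;

fn main() {
    let first_five: Vec<f64> = (0..1_000_000)
        .into_par_iter()
        .map(|n| (n as f64).sqrt())
        .ser_bridge() // from the SerBridge trait above
        .take(5)      // dropping the rest terminates the parallel iteration
        .collect();
    println!("{:?}", first_five); // note: order is not preserved
}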

My questions are:

  • Do the Rayon maintainers consider this an area worth pursuing?
  • If so, what would be the correct way to submit the pulling of the values to the Rayon thread pool (without causing deadlocks)? Is it possible to do this without a separate task?
  • Is it acceptable to depend on crossbeam_channel?
@nikomatsakis
Member

I think I would prefer a name like into_serial_iter() or something, but I could see the concept being useful. Your implementation looks like what I would expect.

@hniksic
Contributor Author

hniksic commented May 15, 2021

@nikomatsakis Thanks for the review. I am of course open to a different name. I would propose into_ser_iter for brevity and analogy with into_par_iter. Also, we could even implement IntoIterator for ParallelIterator, though that might make it too easy to introduce serialization by accident (e.g. end a par_iter chain with .into_iter().sum() instead of just sum() etc.)

Your implementation looks like what I would expect.

Could the implementation use the Rayon thread pool? I'm really uncomfortable with each into_ser_iter() unconditionally creating a new thread, but I'm not sure it's safe to use a thread from the Rayon thread pool either, due to possibility of deadlocks. E.g. if all threads in the pool are waiting in Sender::send(), will there be anyone left to make progress? I don't know enough about Rayon's architecture to predict whether this is a problem in practice, but I wouldn't want to introduce a deadlock that can occur through natural usage of the API.

@nikomatsakis
Member

Hmm, I admit I hadn't read the code that closely. I'm thinking a bit more about this now. I had expected to use a channel for this, yes, but I'm wondering if there isn't a kind of hazard where using this on a Rayon worker thread would compose poorly and create deadlock hazards.

(As far as naming, I would definitely not want to implement IntoIterator, too footgun-prone.)

@wagnerf42
Contributor

It would be cleaner to have block-handling algorithms in Rayon, but that needs some discussion.

@cuviper
Member

cuviper commented May 17, 2021

See also #210. I would like to solve this issue, but I'm really wary of how this will compose. It's not too hard to write something that will work with specific assumptions, but we need to be generally robust with what we add to Rayon's API. Even par_bridge already struggles there, e.g. people have gotten stuck with a "serial" iterator that does some parallel processing internally, and then work-stealing inverts the dependency.

@hniksic
Contributor Author

hniksic commented May 18, 2021

@cuviper I agree that we should be very careful not to introduce deadlocks. This is why the proof-of-concept implementation unconditionally spawns a new thread, which guarantees deadlock-free operation at the cost of, well, unconditionally spawning a new thread. I would hope that a better implementation is possible without significant changes to Rayon's internals, but it's far from obvious (to me) how to write one. But I'm willing to invest time into it, so if you have some pointers on where to look, they'd be appreciated!

Thanks for the pointer to #210, I missed that issue previously. The discussion there is mostly about whether the implementation should preserve order. I think it shouldn't because an order-preserving parallel-to-serial bridge is both harder to implement and requires performance tradeoffs (if it can even be made to avoid both unbounded allocation and possibility of deadlock). I think the proof-of-concept implementation in this issue is better than the brief one given in a comment there because this one terminates iteration when the sequential iterator is dropped.

This issue is technically a duplicate of #210. If you want, we can keep this one open and make it specifically about the implementation strategy for the non-order-preserving sequential iterator, keeping the semantics pegged to that of the proof-of-concept (and fixing the name). On the other hand, if you'd prefer to keep the design possibilities open, then this issue should be closed as duplicate of #210.

@hniksic hniksic changed the title par_bridge: Opposite of ser_bridge ser_bridge: Opposite of par_bridge May 18, 2021
@cuviper
Member

cuviper commented May 19, 2021

This is why the proof-of-concept implementation unconditionally spawns a new thread, which guarantees deadlock-free operation at the cost of, well, unconditionally spawning a new thread.

Well...

  • uses rendezvous channel for backpressure and to avoid values getting stuck in an intermediate buffer;

This leaves a deadlock hazard that the serial side must not call into rayon in any other way. If the entire threadpool is blocked on the channel, waiting for the Iterator to consume something, then other rayon calls will get queued up and blocked.

  • correctly handles the returned iterator not getting exhausted to the end - dropping SerBridgeImpl will also drop the receiver and cause ParallelIterator::try_for_each to return an error, terminating the parallel iteration.

Rust can't generally rely on drop for safety, which is why mem::forget is a safe function. As long as that iterator lives at all, leaked or even just neglected, the pool will be stuck. I guess here it's not really a safety concern per se, "just" a deadlock, but still.

I would hope that a better implementation is possible without significant changes to Rayon's internals, but it's far from obvious (to me) how to write one. But I'm willing to invest time into it, so if you have some pointers where to look at, they'd be appreciated!

It's not obvious to me either, or I probably would have attempted it already. 😉

@hniksic
Contributor Author

hniksic commented May 20, 2021

This leaves a deadlock hazard that the serial side must not call into rayon in any other way[...]

Good point, I didn't think of that. Perhaps we could avoid it by replacing the blocking send() with a loop that calls try_send():

        let par_iter = par_iterable.into_par_iter();
        let (tx, rx) = crossbeam_channel::bounded(0);
        std::thread::spawn(move || {
            let _ = par_iter.try_for_each(|mut item| {
                loop {
                    match tx.try_send(item) {
                        Err(TrySendError::Full(returned_item)) => item = returned_item,
                        other => return other,  // success or disconnect
                    }
                    rayon::yield();  // do other Rayon business here
                }
            });
        });

Something like rayon::yield() doesn't seem to exist in the public API, but might exist internally?

  • correctly handles the returned iterator not getting exhausted to the end - dropping SerBridgeImpl will also drop the receiver and cause ParallelIterator::try_for_each to return an error, terminating the parallel iteration.

Rust can't generally rely on drop for safety,

That's clear, though the linked article is mostly about memory safety. Here by "correctly" I meant the code handles an explicit drop by actually noticing when the channel is dropped and terminating the iteration by returning an error from try_for_each(). (Some toy implementations of into_ser_iter() don't even do that much).

If the serial iterator is neither dropped nor exhausted, our options are somewhat limited. It would be nice to be able to apply backpressure in a clean way, but barring that, the option of calling rayon::yield() to avoid deadlock seems workable.

Do you think the above approach could work? In that case it might actually be ok to offload the work using rayon::spawn() instead of thread::spawn(), which would resolve the other concern I had with the proof-of-concept impl.

@cuviper
Member

cuviper commented May 21, 2021

Something like rayon::yield() doesn't seem to exist in the public API, but might exist internally?

It doesn't, but there's a note in par_bridge() that we might want it, #548 -- and oh hmm, I even had a working branch there. I'm going to have to think about why I never pursued that further...

It's not a total panacea though. If there's no other work available, then it becomes a busy-loop burning CPU. Or else we could try some sleep heuristic, since we don't have a precise way to wake up on both external (crossbeam) events and internal work-available events.

I also fear that work-stealing might end up quickly stealing jobs from this same iterator, getting them all recursively stacked on one thread instead of spread out across all threads. That's also a hazard if we yielded in par_bridge().

In that case it might actually be ok to offload the work using rayon::spawn() instead of thread::spawn(),

Oh, both of those unfortunately require 'static too; I glossed over that before. Maybe there's a way to restructure this with in_place_scope to spawn and then callback to some FnMut(&mut SerBridgeImpl), but that's getting messy.
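For concreteness, a rough sketch of what such a scope-based, callback-oriented variant might look like (the name with_ser_iter is hypothetical and this is untested; it keeps the channel idea from the toy implementation but drops the 'static bound by keeping borrows alive inside the scope):

use crossbeam_channel::IntoIter;
use rayon::prelude::*;

pub fn with_ser_iter<PI, T, F, R>(par_iter: PI, consume: F) -> R
where
    PI: ParallelIterator<Item = T>,
    T: Send,
    F: FnOnce(IntoIter<T>) -> R,
{
    let (tx, rx) = crossbeam_channel::bounded(0);
    rayon::in_place_scope(|scope| {
        scope.spawn(move |_| {
            // Ends early (with a send error) once the consumer drops the receiver.
            let _ = par_iter.try_for_each(|item| tx.send(item));
        });
        // Runs on the current thread; the scope keeps the parallel side's borrows alive.
        consume(rx.into_iter())
    })
}

The same composition hazards discussed above still apply if the current thread is itself a Rayon worker.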


Compromise upon compromise -- but I'm glad to have you exploring this!

@hniksic
Contributor Author

hniksic commented May 21, 2021

It's not a total panacea though. If there's not other work available, then it becomes a busy-loop burning CPU.

True, and such an issue might be lurking behind #795 in par_bridge(), which I reported previously. Busy-looping is certainly better than deadlock, but to support this properly there should be built-in support for backpressure, probably both in par_bridge() and here.

I can't tell how big an issue recursive stealing is.

Oh, both of those unfortunately require 'static too

It never occurred to me that it might not be acceptable to require 'static. I'm not sure if that can be avoided while keeping the current interface. As specified, the adapter completely moves the processing to the background, leaving the current thread to interact with the serial iterator. Since the current thread must remain active, into_ser_iter() itself cannot wait for the parallel processing to finish, as I understand the scope-based interfaces imply.

@cuviper
Member

cuviper commented May 21, 2021

Requiring 'static rules out a lot of borrowed iterators, especially any par_iter() or par_iter_mut() -- unless those started with 'static data, which would be unusual. And yes, that does have real implications for the interface. Maybe there could be one "easy" conversion for 'static iterators, and another callback-oriented mode for the borrowed case, hopefully sharing implementation details underneath.

@hniksic
Contributor Author

hniksic commented May 21, 2021

Maybe there could be one "easy" conversion for 'static iterators, and another callback-oriented mode for the borrowed case

That makes sense. We already have a callback-oriented interface of sorts, ParallelIterator::for_each() - but it requires an Fn callback. The conversion to a full iterator allows great freedom in using the iterator, but requires 'static on the original parallel iterator. There could also be a middle ground, a callback-based interface like for_each that doesn't require a 'static bound on the parallel iterator, but still allows an FnMut non-Send closure (because it executes the callback in the current thread). There are already crates that define such interfaces, sometimes called internal iterators.

Maybe this option should be offered separately, after issues with the ordinary iterator (requiring 'static) are worked out.

@nikomatsakis
Member

My inclination right now is not to support this -- I suspect that gathering into a Vec will typically be faster.

@hniksic
Contributor Author

hniksic commented May 24, 2021

[...] I suspect that gathering into a Vec will typically be faster.

@nikomatsakis The trouble is that some iterators produce a huge number of items that don't fit into available memory, so you have to process them as they are being generated. Of course, batching results into intermediate Vecs of fixed size works, but it is not as convenient to express.

@cuviper
Member

cuviper commented May 24, 2021

Can you fold them at all into partial results?

@hniksic
Contributor Author

hniksic commented May 24, 2021

Can you fold them at all into partial results?

I don't think so. In one case we for_each() them on the parallel iterator, passing it a closure with side effects (still Fn because it uses fine-grained locks as described here). In other cases the items get filtered, turned into fixed-size records containing just numbers, and dumped to a file. The disk-writing part is an example where something like ser_bridge() would come in useful; currently we just use channels manually.

kmeisthax added a commit to kmeisthax/ruffle that referenced this issue Sep 8, 2021
…tor.

This relies on a proposed serial bridge as per rayon-rs/rayon#858. As that project is licensed the same as Ruffle itself and the submitter intended it to be included in Rayon, I believe it's legal to copy this code.
@kmeisthax

I can concur with @hniksic's assertions here. I'm currently using Rayon in a PR to Ruffle's scanner binary (which you may know about because GitHub decided my commit message should get tagged in here three times). The proposed serial bridge impl (which I copied into my PR, assuming that's OK) makes it very easy to report partial results in case, say, the scan is cancelled halfway through, or something irrecoverably panics the host process.

I eventually implemented multiprocess scanning to deal with unrecoverable panics, so that alone isn't as much of a concern, but being able to terminate the scan and still get partial data out of it is still useful. Batching everything up into a Vec wouldn't work for this use case. Keep in mind that this is a process intended to run on ~170,000 Flash files, to check for bugs in Ruffle, and it takes 5 hours to complete even with 16 cores.

My untested opinion about performance/thread scalability, at least in my use case, is that the internal channel lock this uses will probably be less contended than, say, an Arc<Mutex> around the CSV writer I'm using, shared across all threads. If I had done it the latter way, each processing thread would have to wait for file I/O to finish, which would be a bottleneck at high core counts. The internal channel in SerBridge can presumably buffer some number of results for the main thread to write at a later time.

However, if this use case is considered too niche, it might make more sense to package this code up as a library crate.

Herschel pushed a commit to ruffle-rs/ruffle that referenced this issue Sep 11, 2021
…tor.

This relies on a proposed serial bridge as per rayon-rs/rayon#858. As that project is licensed the same as Ruffle itself and the submitter intended it to be included in Rayon, I believe it's legal to copy this code.
@Congyuwang

Congyuwang commented Oct 25, 2021

Similar use case. I currently need to process 400-500 GB of blockchain data. All the file reading and decoding can be done in parallel, but certain steps of computation must follow chronological order.

So, if I can do something like:

(0..end)
    .into_par_iter()
    .map(|h| read_block_number(h))    // <- parallel
    .into_sync_ser_iter()
    .map(|block| do_something(block)) // <- sequential

Or even something like (chained):

(0..end)
    .into_par_iter()
    .map(|h| read_block_number(h))    // <- parallel
    .into_sync_ser_iter()
    .map(|block| do_something(block)) // <- sequential
    .into_par_iter()
    .map(|h| read_block_number(h))    // <- parallel
    .into_sync_ser_iter()
    .map(|block| do_something(block)) // <- sequential
    .into_par_iter()
    .map(|h| read_block_number(h))    // <- parallel
    .into_sync_ser_iter()
    .map(|block| do_something(block)) // <- sequential

I somehow implemented a tiny crate for my own usage: https://github.com/Congyuwang/Synced-Parallel-Iterator.

I assign each worker a synced channel to buffer its output, and a registry channel to record the order in which worker threads poll new tasks.

So, when I consume this iterator, I consult the registry channel to know which worker buffer to read from next.

I cannot seem to get rid of a Mutex guarding the upstream iterator, which is needed to ensure the registry records the correct task polling order; it seems a bit expensive though:

worker -> lock upstream iterator -> call `next()` -> `send` its own thread ID to `registry` -> release lock (of upstream iterator) -> process its task -> store result in its own buffer
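A minimal sketch of that registry idea, using only std threads and channels (the function name and parameters are illustrative, not taken from the crate): each worker locks the upstream iterator just long enough to pull one item and record its ID in the registry, so the consumer can read results back in upstream order.

use std::sync::{mpsc, Arc, Mutex};
use std::thread;

fn ordered_parallel_map<I, T, U, F>(input: I, workers: usize, f: F) -> impl Iterator<Item = U>
where
    I: Iterator<Item = T> + Send + 'static,
    T: Send + 'static,
    U: Send + 'static,
    F: Fn(T) -> U + Send + Sync + 'static,
{
    let input = Arc::new(Mutex::new(input));
    let f = Arc::new(f);
    // The registry records which worker pulled each upstream item, in order.
    let (reg_tx, reg_rx) = mpsc::sync_channel::<usize>(workers);
    let mut buffers = Vec::with_capacity(workers);
    for id in 0..workers {
        // Each worker gets its own small output buffer.
        let (out_tx, out_rx) = mpsc::sync_channel::<U>(1);
        buffers.push(out_rx);
        let (input, f, reg_tx) = (Arc::clone(&input), Arc::clone(&f), reg_tx.clone());
        thread::spawn(move || loop {
            // Lock the upstream, pull one item, record our id in the registry,
            // then release the lock before doing the expensive work.
            let item = {
                let mut upstream = input.lock().unwrap();
                match upstream.next() {
                    Some(item) => {
                        if reg_tx.send(id).is_err() {
                            return; // consumer is gone
                        }
                        item
                    }
                    None => return, // upstream exhausted
                }
            };
            if out_tx.send(f(item)).is_err() {
                return; // consumer is gone
            }
        });
    }
    drop(reg_tx);
    // Consume in registry order: the next result always comes from the worker
    // that pulled the next upstream item.
    reg_rx.into_iter().map(move |id| buffers[id].recv().unwrap())
}

The registry plus per-worker buffers preserve upstream order without a lock around the output, at the cost of the Mutex around the upstream iterator described above.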

#893

@safinaskar

I described various solutions to this problem here: #1070

@safinaskar

Here is my solution: #1071

@hniksic
Contributor Author

hniksic commented Jul 6, 2023

@safinaskar FWIW I don't think #1071 resolves this. This issue is about funneling a parallel iterator into a serial one, the inverse of par_bridge(). While the example given uses .into_par_iter() on a sequence followed by a .map() and .ser_bridge(), that's just an example. The idea of ser_bridge() is that it works on any ParallelIterator, so one can use iterator combinators such as fold() and flat_map() before it. Your solution operates outside the parallel combinator ecosystem and only supports .map().

