Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add bridge from Iterator to ParallelIterator #550

Merged
merged 12 commits into from Jun 6, 2018

Conversation

QuietMisdreavus
Copy link
Contributor

@QuietMisdreavus QuietMisdreavus commented Mar 7, 2018

Half of #46

This started getting reviewed in QuietMisdreavus/polyester#6, but i decided to move my work to Rayon proper.

This PR adds a new trait, AsParallel, an implementation on Iterator + Send, and an iterator adapter IterParallel that implements ParallelIterator with a similar "cache items as you go" methodology as Polyester. I introduced a new trait because ParallelIterator was implemented on Range, which is itself an Iterator.

The basic idea is that you would start with a quick sequential Iterator, call .as_parallel() on it, and be able to use ParallelIterator adapters after that point, to do more expensive processing in multiple threads.

The design of IterParallel is like this:

  • IterParallel defers background work to IterParallelProducer, which implements UnindexedProducer.
  • IterParallelProducer will split as many times as there are threads in the current pool. (I've been told that Add ThreadPool::broadcast #492 is a better way to organize this, but until that's in, this is how i wrote it. >_>)
  • When folding items, IterParallelProducer keeps a Stealer from crossbeam-deque (added as a dependency, but using the same version as rayon-core) to access a deque of items that have already been loaded from the iterator.
  • If the Stealer is empty, a worker will attempt to lock the Mutex to access the source Iterator and the Deque.
    • If the Mutex is already locked, it will call yield_now. The implementation in polyester used a synchronoise::SignalEvent but i've been told that worker threads should not block. In lieu of pre-RFC: rayon::yield_now #548, a regular spin-loop was chosen instead.
    • If the Mutex is available, the worker will load a number of items from the iterator (currently (number of threads * number of threads * 2)) before closing the Mutex and continuing.
    • (If the Mutex is poisoned, the worker will just... stop. Is there a recommended approach here? >_>)

This design is effectively a first brush, has the same caveats as polyester, probably needs some extra features in rayon-core, and needs some higher-level docs before i'm willing to let it go. However, i'm putting it here because it was not in the right place when i talked to @cuviper about it last time.

Copy link
Member

@cuviper cuviper left a comment

This is exciting! Do you have any performance results using it?

match self.iter.try_lock() {
Ok(mut guard) => {
let count = current_num_threads();
let count = (count * count) * 2;
Copy link
Member

@cuviper cuviper Mar 8, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the description, you said twice the number of threads, but you're also squaring?

Copy link
Contributor Author

@QuietMisdreavus QuietMisdreavus Mar 8, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops, that's my bad. The code is what i meant - that's the number i used in polyester so i carried it over here.


let (ref mut iter, ref deque) = *guard;

for _ in 0..count {
Copy link
Member

@cuviper cuviper Mar 8, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of reading exactly count items, how about while deque.len() < count? This way, I'm imagining that one thread could become the de facto reader thread, just pushing items as fast as the others can pop them out. But if the other threads are too busy, this thread will fill the deque up to count and then break out to resume processing items itself. This will be dynamic too, so when other threads get the time they'll start cooperating again.

Copy link
Contributor Author

@QuietMisdreavus QuietMisdreavus Mar 8, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like that idea! It saves on some locking costs compared to this version, too.

}
Err(TryLockError::WouldBlock) => {
// someone else has the mutex, just sit tight until it's ready
yield_now(); //TODO: use a thread=pool-aware yield? (#548)
Copy link
Member

@cuviper cuviper Mar 8, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do think we'll want the integrated yield. One question will be sleeping though, which I don't think we can avoid. Even if we decided that rayon::yield_now doesn't directly sleep, the idea is that it would steal other jobs to work on, and then we have to allow that those could sleep.

If some of these worker threads could go to sleep, then we also need to wake them up, which is done with rayon-core's internal tickle. Another thing to expose somehow. We wouldn't necessarily need to tickle for every deque.push, but perhaps just when we're about to release the lock. "Hey, I filled the deque faster than anyone could drain it -- if you're still sleeping, come help!"

Copy link
Member

@cuviper cuviper Mar 8, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, if it goes to sleep on stolen work, then waking up for activity on this deque isn't so helpful. We have work, but you're stuck in a nested call elsewhere... hmm.

Maybe this is a problem to have a deque separate from the normal job deques. It feels neat that we could do everything without really changing rayon-core, but then it's not really integrated either.

It's a hard problem. :)

type Item = Iter::Item;

fn split(self) -> (Self, Option<Self>) {
let mut count = self.split_count.load(Ordering::SeqCst);
Copy link
Member

@cuviper cuviper Mar 8, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do think that broadcast will be more useful than these manually counted splits. We're not creating discrete tasks here, but really workers that make no sense to ever stack on the same thread.

yield_now(); //TODO: use a thread=pool-aware yield? (#548)
}
Err(TryLockError::Poisoned(_)) => {
// TODO: how to handle poison?
Copy link
Member

@cuviper cuviper Mar 8, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's fine to just return. The rayon-core internals should have caught whatever panic poisoned this lock, and will re-throw it as the tasks are re-joined.

Copy link
Contributor Author

@QuietMisdreavus QuietMisdreavus Mar 8, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aha, right, that's good to know. I'll leave this as-is (and remove the TODO notice).

@QuietMisdreavus
Copy link
Contributor Author

QuietMisdreavus commented Mar 8, 2018

I don't have any specific performance numbers for this on hand, at least not any that don't just count the thread-spawn/mutex-lock costs on a job that would be handled much better sequentially. Maybe i should borrow a task from one of the people that requested this, so i have a proper task to measure. :P

@nikomatsakis
Copy link
Member

nikomatsakis commented Mar 12, 2018

I'd love to review this too! I'm adding it to my calendar for later this week, so I can give it some time.

@nikomatsakis
Copy link
Member

nikomatsakis commented Mar 27, 2018

OK -- I reviewed this with @QuietMisdreavus. Looks pretty good! I think we should change the name, though I don't know what name I want. Maybe parallelize? adapt_into_par_iter? Something else? Maybe an RFC?! :)

Also, we added some benchmarks. We found that for nbody is was approx. the same as the traditional par iter, but for game of life is was radically slower. D'oh! But that seems ok. We can add a caveat and tinker with it later. =)

I'd like to compare it to a version that uses scope/spawn too, but I think that for that to work well we might want to land the "unstable" extensions to rayon-core.

@nikomatsakis
Copy link
Member

nikomatsakis commented Apr 6, 2018

@QuietMisdreavus @cuviper I'm sorry i'm super slow, but I'd like to see this land -- I think we had settle on something involving the word "bridge" for the name, right?

I was thinking that most of the iterator combinators (e.g., map) take fn(self) but are not named into. Perhaps we should follow suite here, and name this bridge_to_par_iter?

@cuviper
Copy link
Member

cuviper commented Apr 9, 2018

On gitter, the "bridge" ideas were into_par_bridge and into_par_bridge_iter.

@QuietMisdreavus
Copy link
Contributor Author

QuietMisdreavus commented Apr 9, 2018

I kinda like the idea of just par_bridge? There's a parallel with par_iter, but it's distinct enough to highlight that it's separate from other parallel iterators.

@QuietMisdreavus QuietMisdreavus changed the title [WIP] add bridge from Iterator to ParallelIterator add bridge from Iterator to ParallelIterator Apr 25, 2018
@QuietMisdreavus
Copy link
Contributor Author

QuietMisdreavus commented Apr 25, 2018

I've pushed a commit that renames the trait from AsParallel to ParallelBridge (and the method name from as_parallel to par_bridge). What do you think?

///
/// This needs to be distinct from `IntoParallelIterator` because that trait is already implemented
/// on a few `Iterator`s, like `std::ops::Range`.
pub trait ParallelBridge {
Copy link
Member

@nikomatsakis nikomatsakis May 30, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The name is great, but we definitely need better rust-docs. We should include something like this:


That this "bridges" from a standard sequential iterator to a parallel one. This has the advantage of letting you parallelize just about anything, but it can be distinctly less efficient than the "native" parallel iterators produced by par_iter.


We also need an example or two I think — maybe one showing some combinators that don't work in parallel land?

type Iter: ParallelIterator<Item = Self::Item>;

/// What is the `Item` of the output `ParallelIterator`?
type Item: Send;
Copy link
Member

@cuviper cuviper May 30, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these associated types actually useful? We could just have:

fn par_bridge(self) -> IterParallel<Self>;

and be done with it. That also offers some amount of control over who can impl this trait, as only we can construct that type.

For comparison, the ParallelSlice and ParallelString extensions just return their types directly.

Copy link
Contributor Author

@QuietMisdreavus QuietMisdreavus May 30, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think i wrote those in when i wasn't sure whether this trait would be used on other items, so i just made it a clone of ParallelIterator. I didn't realize that about ParallelSlice/ParallelString, so i can go in and change this up.

///
/// [`ParallelBridge`]: trait.ParallelBridge.html
#[derive(Debug)]
pub struct IterParallel<Iter> {
Copy link
Member

@cuviper cuviper May 30, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we "bridge" this name too? Maybe IterBridge, or even just Bridge?

Copy link
Contributor Author

@QuietMisdreavus QuietMisdreavus May 30, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would make more sense - this is still the same name it had from when the trait was named AsParallel. Between those two, I'm more of a fan of IterBridge.

@cuviper
Copy link
Member

cuviper commented Jun 6, 2018

Time to cross this bridge... thanks!

bors r+

bors bot added a commit that referenced this pull request Jun 6, 2018
550: add bridge from Iterator to ParallelIterator r=cuviper a=QuietMisdreavus

Half of #46

This started getting reviewed in QuietMisdreavus/polyester#6, but i decided to move my work to Rayon proper.

This PR adds a new trait, `AsParallel`, an implementation on `Iterator + Send`, and an iterator adapter `IterParallel` that implements `ParallelIterator` with a similar "cache items as you go" methodology as Polyester. I introduced a new trait because `ParallelIterator` was implemented on `Range`, which is itself an `Iterator`.

The basic idea is that you would start with a quick sequential `Iterator`, call `.as_parallel()` on it, and be able to use `ParallelIterator` adapters after that point, to do more expensive processing in multiple threads.

The design of `IterParallel` is like this:

* `IterParallel` defers background work to `IterParallelProducer`, which implements `UnindexedProducer`.
* `IterParallelProducer` will split as many times as there are threads in the current pool. (I've been told that #492 is a better way to organize this, but until that's in, this is how i wrote it. `>_>`)
* When folding items, `IterParallelProducer` keeps a `Stealer` from `crossbeam-deque` (added as a dependency, but using the same version as `rayon-core`) to access a deque of items that have already been loaded from the iterator.
* If the `Stealer` is empty, a worker will attempt to lock the Mutex to access the source `Iterator` and the `Deque`.
  * If the Mutex is already locked, it will call `yield_now`. The implementation in polyester used a `synchronoise::SignalEvent` but i've been told that worker threads should not block. In lieu of #548, a regular spin-loop was chosen instead.
  * If the Mutex is available, the worker will load a number of items from the iterator (currently (number of threads * number of threads * 2)) before closing the Mutex and continuing.
  * (If the Mutex is poisoned, the worker will just... stop. Is there a recommended approach here? `>_>`)

This design is effectively a first brush, has [the same caveats as polyester](https://docs.rs/polyester/0.1.0/polyester/trait.Polyester.html#implementation-note), probably needs some extra features in rayon-core, and needs some higher-level docs before i'm willing to let it go. However, i'm putting it here because it was not in the right place when i talked to @cuviper about it last time.

Co-authored-by: QuietMisdreavus <grey@quietmisdreavus.net>
Co-authored-by: Niko Matsakis <niko@alum.mit.edu>
@bors
Copy link
Contributor

bors bot commented Jun 6, 2018

@bors bors bot merged commit d488733 into rayon-rs:master Jun 6, 2018
2 checks passed
@QuietMisdreavus QuietMisdreavus deleted the as-parallel branch Jun 8, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants