Question: Concatenate list of indexed parallel iterators? #888

Closed
wants to merge 1 commit into from

Conversation

@adamreichold (Collaborator) commented Oct 8, 2021

This is admittedly more of a question than a pull request but I thought that it might be easiest to answer this with the code at hand.

I have a situation in which I basically want to concatenate a

Vec<I> where I: IndexedParallelIterator

into a single indexed parallel iterator with the same semantics as if I would .into_iter().flatten() the vector.
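
For concreteness, a minimal sketch of the semantics I am after, using Vec<Vec<T>> as a stand-in for the inner iterators:

// Sequentially, the desired concatenation is just `flatten`; the question is how
// to get an `IndexedParallelIterator` with the same ordering and a known total
// length, so that indexed-only adapters like `zip` keep working.
fn concat_sequential<T>(parts: Vec<Vec<T>>) -> Vec<T> {
    parts.into_iter().flatten().collect()
}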

I tried to implement this by looking at the existing Chain adapter, which however handles a fixed number of inner iterators with possibly heterogeneous types instead of an unknown number of iterators with homogeneous types.

I am currently stuck trying to collect the producers created by the inner iterators into a vector. This does not seem to work since each invocation of

<I as IndexedParallelIterator>::with_producer

may call the given callback with a different producer type.

Leaving aside the overhead, boxing does not seem possible due to Producer not being object safe. I did consider defining a simpler DynProducer trait that would be, but I think this would end with

trait IntoIter<T>: Iterator<Item = T> + DoubleEndedIterator + ExactSizeIterator {}
type IntoIter = Box<dyn IntoIter<Self::Item>>;

which feels prohibitively expensive.
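
For reference, roughly the shape I had in mind for such an object-safe trait, reusing the IntoIter helper trait above (all names hypothetical, nothing here is Rayon API):

// Blanket impl so any suitable concrete iterator can be boxed behind the helper trait.
impl<T, I> IntoIter<T> for I where I: Iterator<Item = T> + DoubleEndedIterator + ExactSizeIterator {}

// Hypothetical object-safe counterpart of `Producer`: splitting and iteration go
// through boxed trait objects, so heterogeneous producers could all be stored in
// one `Vec<Box<dyn DynProducer<Item = T>>>`.
trait DynProducer: Send {
    type Item;

    fn dyn_split_at(
        self: Box<Self>,
        index: usize,
    ) -> (
        Box<dyn DynProducer<Item = Self::Item>>,
        Box<dyn DynProducer<Item = Self::Item>>,
    );

    // The boxed double-ended, exact-size iterator is where the cost comes from.
    fn dyn_into_iter(self: Box<Self>) -> Box<dyn IntoIter<Self::Item>>;
}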

I also considered building a producer out of a list of iterators, recording a list of split positions for each one, and only turning these into producers and splitting them when e.g. Producer::into_iter is actually called. But that seems to imply at least sharing (and hence synchronizing) the original list between tasks if a split happens to straddle the boundary between two of the original iterators. Also, I am not sure whether this would eventually avoid the issue of different producer types.

Does anybody know whether this is possible at all and if so how?

adamreichold changed the title from "Question: Concatenate list of indexed parallel iterators" to "Question: Concatenate list of indexed parallel iterators?" on Oct 8, 2021
@cuviper (Member) commented Oct 8, 2021

Unindexed can just use .into_par_iter().flatten(), but indexed is a real challenge. I don't know how to handle producers in this case, because the Chain approach of statically-nested callbacks definitely won't work.

@adamreichold (Collaborator, Author):

Unindexed can just use .into_par_iter().flatten()

Yes, this is what I am currently doing and performance is fine (the resulting splitting between archetypes seems a sensible choice for an ECS), but I am losing a lot of API surface defined only for IndexedParallelIterator, like zip or with_min_len, even though I technically do know the exact item counts. This is what I would like to avoid.
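
To illustrate the gap (a minimal sketch, not actual code from my project):

use rayon::prelude::*;

// `flatten` only yields a plain `ParallelIterator`, so indexed-only adapters are
// unavailable even though the total length is knowable up front.
fn flattened_sum(parts: Vec<Vec<u32>>, weights: Vec<u32>) -> u32 {
    let flat = parts.into_par_iter().flatten();
    // This would not compile, because `zip` requires `IndexedParallelIterator`
    // and `Flatten` does not implement it:
    // let zipped = flat.zip(weights.into_par_iter());
    let _ = weights;
    flat.sum()
}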

@cuviper (Member) commented Oct 8, 2021

Part of the challenge is the generic I: IndexedParallelIterator. But since you are using types: &[_] and mapping to a known ArchetypeIter, maybe you can instead use that specific knowledge for a lazy approach, like you said:

I also considered building a producer out of a list of iterators each recording a list of split positions and only turn this into producers and split those when e.g. Producer::into_iter is actually called.

... but not with producers, rather using the low-level type-specific details to make the iterator.


BTW, I would not implement both Iterator and ParallelIterator on the same type, as that creates caller ambiguity between their many common methods. It's not a big deal if this is an internal-only type though.

@adamreichold (Collaborator, Author) commented Oct 8, 2021

Part of the challenge is the generic I: IndexedParallelIterator.

Yes, it seems a generic adaptor is not possible, since after all some insidious parallel iterator could do something like

fn with_producer<CB>(self, callback: CB) -> CB::Output
where
    CB: ProducerCallback<Self::Item>,
{
    if thread_rng().gen_bool(0.9) {
        callback.callback(MyUsualProducer::new(self))
    } else {
        callback.callback(JustToMessAroundABit::new(self))
    }
}

I will try to implement this directly on the "set of matching archetypes" I start from, especially since I can clone/copy a shared reference to an archetype and do not need to synchronize anything. Thank you for taking the time to look into this!

BTW, I would not implement both Iterator and ParallelIterator on the same type, as that creates caller ambiguity between their many common methods. It's not a big deal if this is an internal-only type though.

Good point! I was admittedly lazy there.

@cuviper (Member) commented Oct 8, 2021

If you're curious about why the Producer is opaque, check out:
https://github.com/rayon-rs/rayon/tree/master/src/iter/plumbing#what-on-earth-is-producercallback

There's a supposition that associated type constructors (aka generic associated types = GATs) might let us handle Producer lifetimes in a more direct way, so we could use plain closures instead of ProducerCallback. I'm not sure that would solve your problem though, because any callback style with Vec<I> would still have len() recursion levels, which is a bad idea unless you know that's relatively small. I guess in theory you could use stacker to go ahead and recurse.
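
For reference, that stacker escape hatch looks roughly like this around each recursion level (the 32 KiB red zone and 1 MiB segment size here are arbitrary; this only helps with runtime stack depth, not with the nesting of callback types):

// Grow the stack on demand before recursing; `maybe_grow` only allocates a new
// stack segment when less than the given red zone remains.
fn nested(depth: usize) -> usize {
    stacker::maybe_grow(32 * 1024, 1024 * 1024, || {
        if depth == 0 {
            0
        } else {
            1 + nested(depth - 1)
        }
    })
}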

@adamreichold (Collaborator, Author) commented Oct 8, 2021

because any callback style with Vec<I> would still have len() recursion levels,

As presented here, I think there should be no recursion. I am willing to pay the cost of collecting producers into newly allocated vectors, though. However, I do not think GATs alone would help with this approach, as the lifetime associated with the producer is currently generative, i.e. it is created inside the closure and cannot be named outside of it, so one cannot e.g. store the producer outside of the scope created by the callback, as I am trying to do here.

I think this would be possible if the signature were something like

fn with_producer<'iter, CB, R>(&'iter mut self, callback: CB) -> R where CB: FnOnce(Self::Producer<'iter>) -> R;

to connect the producer lifetime with the lifetime of the reference to the inner iterator. Making it a mutable reference would allow moving things out of the iterator, at the price of having to perform the option dance or resort to MaybeUninit.

@cuviper (Member) commented Oct 8, 2021

You would still have to nest all I::with_producer before you could return R, no?
(Your inner callbacks can't return the producer as an R, because that's independent of 'iter.)

@adamreichold (Collaborator, Author):

(Your inner callbacks can't return the producer as an R, because that's independent of 'iter.)

I think I can choose ProducerCallback::Output as I am the implementer of the inner callback? So in https://github.com/rayon-rs/rayon/pull/888/files#diff-f60d08059f674f5bfaf42bfba2d2cad67ef1e2274fe0179ff4c6ecb67d54e52aR60 I just chose () and do not have to produce a value at all. The producer itself is stashed away via self which would basically be &'a mut Vec<I::Producer<'a>> in my case.
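
To spell that out, a self-contained sketch with a stand-in trait carrying the hypothetical signature from above (none of this is Rayon's actual API):

// Hypothetical trait mirroring the proposed `&'iter mut self` signature.
trait WithGatProducer {
    type Item;
    type Producer<'iter>
    where
        Self: 'iter;

    fn with_producer<'iter, CB, R>(&'iter mut self, callback: CB) -> R
    where
        CB: FnOnce(Self::Producer<'iter>) -> R;
}

// Each callback just stashes its producer and returns `()`, so a whole slice of
// inner iterators can be turned into a plain `Vec` of producers without any
// nesting of callbacks.
fn collect_producers<'iter, I>(iters: &'iter mut [I]) -> Vec<I::Producer<'iter>>
where
    I: WithGatProducer,
{
    let mut producers = Vec::with_capacity(iters.len());
    for iter in iters.iter_mut() {
        iter.with_producer(|p| producers.push(p));
    }
    producers
}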

@cuviper (Member) commented Oct 8, 2021

Well I'm hoping for a normal closure instead, but yes I suppose you could push to a captured &mut Vec.

I'm going to experiment with this -- first trying to make it a GAT closure, then perhaps &mut self. This is definitely a breaking change in any case, so we're talking about a possible Rayon 2.0 here, but it might be nice!

@adamreichold (Collaborator, Author) commented Oct 8, 2021

I do wonder whether any "insidious" implementations as considered above exist today, i.e. ones which pass completely unrelated producer types to the callback, e.g. as an optimization. 🤔

@cuviper (Member) commented Oct 8, 2021

Oh, yes, Either does that:

rayon/src/par_either.rs, lines 51 to 54 at 2991c04:

match self {
    Left(iter) => iter.with_producer(callback),
    Right(iter) => iter.with_producer(callback),
}

But I guess it could use Either<L::Producer, R::Producer> and implement dispatching Producer for that.
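
Roughly along these lines, perhaps (a sketch only; it would have to live inside rayon itself because of the orphan rule, and a real impl would also want to forward min_len/max_len/fold_with):

use either::Either::{self, Left, Right};
use rayon::iter::plumbing::Producer;

// Dispatching `Producer` for `Either`, so that par_either could hand a single
// concrete producer type to the callback instead of two unrelated ones.
impl<L, R> Producer for Either<L, R>
where
    L: Producer,
    R: Producer<Item = L::Item>,
{
    type Item = L::Item;
    // `either` already forwards Iterator/DoubleEndedIterator/ExactSizeIterator
    // when both sides implement them, which is exactly what `IntoIter` needs.
    type IntoIter = Either<L::IntoIter, R::IntoIter>;

    fn into_iter(self) -> Self::IntoIter {
        match self {
            Left(l) => Left(l.into_iter()),
            Right(r) => Right(r.into_iter()),
        }
    }

    fn split_at(self, index: usize) -> (Self, Self) {
        match self {
            Left(l) => {
                let (a, b) = l.split_at(index);
                (Left(a), Left(b))
            }
            Right(r) => {
                let (a, b) = r.split_at(index);
                (Right(a), Right(b))
            }
        }
    }
}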

@adamreichold (Collaborator, Author):

But I guess it could use Either<L::Producer, R::Producer> and implement dispatching Producer for that.

I think this would also require using trivial callbacks to stash away the inner producer, as is done here? Is the callback structure really necessary if the lifetime were bound to the reference to the inner iterator instead of being generative? Wouldn't

fn as_producer<'iter>(&'iter mut self) -> Self::Producer<'iter>;

work as well in that case?

@cuviper (Member) commented Oct 8, 2021

Maybe we don't need a callback in that case, since it could trivially dissolve into a captured producer anyway.

Another thing I realized either way is that an external 'iter parameter forces us to not have any local state to be borrowed in the producer. You'd have to stash that in the iterator itself, which also constrains the lifetime of that state, e.g. nothing self-referential.

@cuviper (Member) commented Oct 8, 2021

It's also weird that a &mut self method implies it can be called multiple times.

@adamreichold (Collaborator, Author):

It's also weird that a &mut self method implies it can be called multiple times.

I think this is the same issue as for Drop, i.e. rust-lang/rfcs#998

@adamreichold (Collaborator, Author):

which also constrains the lifetime of that state, e.g. nothing self-referential.

It would not be possible to move the producer around otherwise, so that appears consistent to me.

@adamreichold (Collaborator, Author):

I am sorry if this is getting out of hand, but since we are here: Is the DoubleEndedIterator requirement on Producer::IntoIter essential? At least in the case I am looking at, this has considerable complexity and likely performance impacts.

@cuviper (Member) commented Oct 8, 2021

It's also weird that a &mut self method implies it can be called multiple times.

I think this is the same issue as for Drop, i.e. rust-lang/rfcs#998

Yeah, sort of, but the compiler forbids explicit calls to Drop::drop altogether, so multiple calls are out of the question.

which also constrains the lifetime of that state, e.g. nothing self-referential.

It would not be possible to move the producer around otherwise, so that appears consistent to me.

I meant with that state in the iterator type, not the producer, but still. A concrete example is:

fn with_producer<CB>(self, callback: CB) -> CB::Output
where
    CB: ProducerCallback<Self::Item>,
{
    super::DrainGuard::new(self.heap)
        .par_drain(..)
        .with_producer(callback)
}

Is the DoubleEndedIterator requirement on Producer::IntoIter essential?

That's for rev() -- see #150 and #224. I think the indirect nature of producers would make conditional constraints fall afoul of rust-lang/rust#20671, even if they weren't used opaquely.
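
For context, a tiny example of what that bound buys (rev() reverses the leaf iterators of the underlying producers, which needs double-ended iteration):

use rayon::prelude::*;

fn reversed(v: Vec<u32>) -> Vec<u32> {
    // `rev()` is defined on IndexedParallelIterator and relies on the
    // DoubleEndedIterator bound of Producer::IntoIter.
    v.into_par_iter().rev().collect()
}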

@schell commented Apr 14, 2022

I am also trying to solve this problem. Conceptually it seems like it should be possible to construct an IndexedParallelIterator from a vector of IndexedParallelIterators. Has anyone managed it in another crate? Or is there another way around the problem?

In my specific use case I am trying to zip the elements of range maps (where keys are usize and values are stored in contiguous ranges), while providing None for the elements that are outside a range. I can construct a ParallelIterator using Flatten, but then I can't MultiZip all of those up.

@schell commented Apr 15, 2022

I have a quite naive (to me) implementation that uses Vec::pop and rayon::iter::chain and compiles:

pub struct Concat<I>(Vec<I>);

impl<I: IndexedParallelIterator> ParallelIterator for Concat<I> {
    type Item = I::Item;

    fn drive_unindexed<C>(self, consumer: C) -> C::Result
    where
        C: rayon::iter::plumbing::UnindexedConsumer<Self::Item> {
        rayon::iter::plumbing::bridge(self, consumer)
    }
}

impl<I: IndexedParallelIterator> IndexedParallelIterator for Concat<I> {
    fn len(&self) -> usize {
        self.0.iter().map(IndexedParallelIterator::len).sum()
    }

    fn drive<C: rayon::iter::plumbing::Consumer<Self::Item>>(mut self, consumer: C) -> C::Result {
        use rayon::iter::plumbing::Reducer;

        let len = self.len();
        if let Some(tail) = self.0.pop() {
            let (left, right, reducer) = consumer.split_at(len - tail.len());
            let (a, b) = rayon::join(|| self.drive(left), || tail.drive(right));
            reducer.reduce(a, b)
        } else {
            rayon::iter::empty().drive(consumer)
        }
    }

    fn with_producer<CB: ProducerCallback<Self::Item>>(mut self, callback: CB) -> CB::Output {
        if let Some(tail) = self.0.pop() {
            self.chain(tail).with_producer(callback)
        } else {
            rayon::iter::empty().with_producer(callback)
        }
    }
}

It compiles, but you can't actually use it: it hits the recursion limit while instantiating the big nest of Chains. Any thoughts?

@schell commented Apr 15, 2022

Here's another implementation which compiles and runs, but my tests show that it's dropping some elements (summing a 1000x1000 vector of 1's comes out to 736605 instead of 1_000_000), so I think my splitting is off by one in at least one place. But maybe this is a workable solution?

pub struct Concat<I>(Vec<I>);

impl<I: IndexedParallelIterator> ParallelIterator for Concat<I> {
    type Item = I::Item;

    fn drive_unindexed<C>(self, consumer: C) -> C::Result
    where
        C: rayon::iter::plumbing::UnindexedConsumer<Self::Item>,
    {
        rayon::iter::plumbing::bridge(self, consumer)
    }
}

impl<I: IndexedParallelIterator> IndexedParallelIterator for Concat<I> {
    fn len(&self) -> usize {
        self.0.iter().map(IndexedParallelIterator::len).sum()
    }

    fn drive<C: rayon::iter::plumbing::Consumer<Self::Item>>(mut self, consumer: C) -> C::Result {
        use rayon::iter::plumbing::Reducer;

        let len = self.len();
        if let Some(tail) = self.0.pop() {
            let (left, right, reducer) = consumer.split_at(len - tail.len());
            let (a, b) = rayon::join(|| self.drive(left), || tail.drive(right));
            reducer.reduce(a, b)
        } else {
            rayon::iter::empty().drive(consumer)
        }
    }

    fn with_producer<CB: ProducerCallback<Self::Item>>(mut self, callback: CB) -> CB::Output {
        return callback.callback(ConcatProducer(vec![], self.0, vec![]));

        struct ConcatProducer<T: IndexedParallelIterator>(Vec<T::Item>, Vec<T>, Vec<T::Item>);

        impl<T: IndexedParallelIterator> Default for ConcatProducer<T> {
            fn default() -> Self {
                Self(vec![], vec![], vec![])
            }
        }

        impl<T: IndexedParallelIterator> rayon::iter::plumbing::Producer for ConcatProducer<T> {
            type Item = T::Item;

            type IntoIter = std::vec::IntoIter<Self::Item>;

            fn into_iter(self) -> Self::IntoIter {
                let mut vs: Vec<Self::Item> = self.0;
                vs.extend(self.1.into_par_iter().flatten().collect::<Vec<_>>());
                vs.extend(self.2);
                vs.into_iter()
            }

            fn split_at(self, index: usize) -> (Self, Self) {
                let (_, left, right) = self.1.into_iter().fold(
                    (0, ConcatProducer::default(), ConcatProducer::default()),
                    |(mut n, mut left, mut right), vs| {
                        let iter_len = vs.len();
                        if n + iter_len <= index {
                            // we have not yet accumulated enough for the split,
                            // everything goes in left
                            left.1.push(vs);
                        } else if n >= index {
                            // we already meet our split quota, everything goes in right
                            right.1.push(vs);
                        } else {
                            // we have to split this vec
                            let mut vs = vs.into_par_iter().collect::<Vec<_>>();
                            let vs_left = vs.splice(0..=(index - n), std::iter::empty());
                            left.2.extend(vs_left);
                            right.0.extend(vs);
                        }
                        n += iter_len;
                        (n, left, right)
                    },
                );
                (left, right)
            }
        }
    }
}

@schell commented Apr 15, 2022

I realize now that the impl above is pretty broken; I didn't take the lefts and rights into account. I have another impl which passes tests, but I'm fairly certain it's pretty whack. I'm struggling a bit to understand the relationship between with_producer and drive:

impl<I: IndexedParallelIterator> IndexedParallelIterator for Concat<I> {
    fn len(&self) -> usize {
        self.0.iter().map(IndexedParallelIterator::len).sum()
    }

    fn drive<C: rayon::iter::plumbing::Consumer<Self::Item>>(mut self, consumer: C) -> C::Result {
        use rayon::iter::plumbing::Reducer;
        println!("drive self.len(): {}", self.len());
        let len = self.len();
        if let Some(tail) = self.0.pop() {
            println!("  tail.len: {}", tail.len());
            println!("  len - tail.len: {}", len - tail.len());
            let (left, right, reducer) = consumer.split_at(len - tail.len());
            let (a, b) = rayon::join(|| self.drive(left), || tail.drive(right));
            reducer.reduce(a, b)
        } else {
            rayon::iter::empty().drive(consumer)
        }
    }

    fn with_producer<CB: ProducerCallback<Self::Item>>(mut self, callback: CB) -> CB::Output {
        return callback.callback(ConcatProducer(vec![], self.0, vec![]));

        struct ConcatProducer<T: IndexedParallelIterator>(Vec<T::Item>, Vec<T>, Vec<T::Item>);

        impl<T: IndexedParallelIterator> Default for ConcatProducer<T> {
            fn default() -> Self {
                Self(vec![], vec![], vec![])
            }
        }

        impl<T: IndexedParallelIterator> ConcatProducer<T> {
            fn total_len(&self) -> usize {
                self.0.len()
                    + self
                        .1
                        .iter()
                        .map(IndexedParallelIterator::len)
                        .sum::<usize>()
                    + self.2.len()
            }
        }

        impl<T: IndexedParallelIterator> rayon::iter::plumbing::Producer for ConcatProducer<T> {
            type Item = T::Item;

            type IntoIter = std::vec::IntoIter<Self::Item>;

            fn into_iter(self) -> Self::IntoIter {
                let mut vs: Vec<Self::Item> = self.0;
                vs.extend(self.1.into_par_iter().flatten().collect::<Vec<_>>());
                vs.extend(self.2);
                vs.into_iter()
            }

            fn split_at(mut self, index: usize) -> (Self, Self) {
                let mut lines = vec![];
                lines.push(format!(
                    "split_at: index:{} left:{}  center:{},{}  right:{}",
                    index,
                    self.0.len(),
                    self.1.len(),
                    self.1
                        .iter()
                        .map(IndexedParallelIterator::len)
                        .sum::<usize>(),
                    self.2.len()
                ));
                let total_len = self.total_len();
                let mut left = ConcatProducer::<T>::default();
                let taken_from_left = self
                    .0
                    .splice(0..index.min(self.0.len()), std::iter::empty())
                    .collect::<Vec<_>>();
                left.0 = taken_from_left;
                if !left.0.is_empty() {
                    lines.push(format!("  took {} from left", left.0.len()));
                }

                let (mut left, mut right) = std::mem::take(&mut self.1).into_iter().fold(
                    (left, self),
                    |(mut left, mut right), vs| {
                        let left_len = left.total_len();
                        let iter_len = vs.len();
                        if left_len + iter_len < index {
                            lines.push(format!("    {} + {} < {} - going left", left_len, iter_len, index));
                            // we have not yet accumulated enough for the split,
                            // everything goes in left
                            left.1.push(vs);
                        } else if left_len >= index {
                            lines.push(format!("    {} >= {} - going right", left_len, index));
                            // we already meet our split quota, everything goes in right
                            right.1.push(vs);
                        } else {
                            lines.push(format!(
                                "    index:{} splitting with splice 0..{}",
                                index,
                                index - left_len
                            ));
                            // we have to split this vec
                            let mut vs = vs.into_par_iter().collect::<Vec<_>>();
                            let vs_left = vs.splice(0..(index - left_len), std::iter::empty());
                            left.2.extend(vs_left);
                            right.0.extend(vs);
                        }
                        (left, right)
                    },
                );

                let left_len = left.total_len();
                if left_len < index {
                    left.2.extend(right.2.splice(0..(index - left_len), std::iter::empty()));
                }

                std::thread::sleep_ms(10);
                println!("{}", lines.join("\n"));
                assert_eq!(left.total_len(), index, "{} /= {}", left.total_len(), index);
                assert_eq!(
                    left.total_len() + right.total_len(),
                    total_len,
                    "{} + {} /= {}",
                    left.total_len(),
                    right.total_len(),
                    total_len
                );
                (left, right)
            }
        }
    }
}

@schell commented Apr 15, 2022

Doing this would be a lot easier with #513, because we could easily split the iterator that's on the index.
