Introduce IndexedParallelIterator::mapfold_collect_into_vec #652

Closed
wants to merge 3 commits into from


@nox nox commented Apr 4, 2019

Nit: do bikeshed the name, I hate it anyway.

This method introduces a neat, allocation-efficient way to collect a parallel iterator into a vec while still being able to do some operations on the data being collected. It uses a new trait, MapFolder<Item>, which is a generalised closure for the classic mapfold operation.

Given the very raison d'être of parallel iterators is to be able to do work by batches, the results of the various MapFolder::complete calls are passed to a Reducer and the reduced value is returned from mapfold_collect_into_vec.
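To make the batch semantics described above concrete, here is a rough std-only model. It is a sketch, not rayon's implementation: the function name, the explicit chunk_size parameter, and the `U: Default` bound (used to pre-fill the destination, where rayon would write into uninitialised memory) are all assumptions made for illustration.

```rust
use std::thread;

// Hypothetical std-only model of the proposed operation; NOT rayon's code.
// Assumes `U: Default` so the destination can be pre-filled before writing.
fn mapfold_collect_into_vec_model<T, U, S, F, R>(
    input: &[T],
    output: &mut Vec<U>,
    init: S,
    mapfold: F,
    reduce: R,
    chunk_size: usize,
) -> S
where
    T: Sync,
    U: Send + Default,
    S: Send + Clone,
    F: Fn(S, &T) -> (S, U) + Sync,
    R: Fn(S, S) -> S,
{
    assert!(chunk_size > 0, "chunk_size must be non-zero");
    // Size the destination once, up front: no intermediate collections.
    output.clear();
    output.resize_with(input.len(), U::default);
    let mapfold = &mapfold;
    let init_ref = &init;

    let partials: Vec<S> = thread::scope(|scope| {
        let handles: Vec<_> = input
            .chunks(chunk_size)
            .zip(output.chunks_mut(chunk_size))
            .map(|(src, dst)| {
                // Each batch starts from its own clone of the initial state.
                let mut state = init_ref.clone();
                scope.spawn(move || {
                    for (item, slot) in src.iter().zip(dst.iter_mut()) {
                        let (next, mapped) = mapfold(state, item);
                        state = next;
                        *slot = mapped; // mapped value goes straight to its slot
                    }
                    state // plays the role of MapFolder::complete
                })
            })
            .collect();
        handles.into_iter().map(|h| h.join().unwrap()).collect()
    });

    // Combine the per-batch results in order, like the Reducer would.
    partials.into_iter().fold(init, reduce)
}
```

Calling it with `Vec::new()` as the initial state, a closure that pushes some items into the state while returning the mapped value, and an appending reducer mirrors the BoxMapFolder and BoxReducer pair in the example further down.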

Why

Because, as usual, I need this as part of some Servo-related stuff. :) In Victor, we build in parallel a vector of fragments from a vector of boxes:

https://github.com/SimonSapin/victor/blob/6ddce7345030ae4d25f846ca757d6b50f3f8aeac/victor/src/layout/mod.rs#L56-L59

Some CSS features (among which absolute positioning, in case you were curious) require us to make some of those fragments go up the fragment tree to where they should actually belong. To do this, we would like to be able to map the boxes into fragments as usual, but also collect the absolutely positioned ones into a separate vec that we can then append to the parent's own list of absolutely positioned fragments, until we reach the final one. There are way fewer absolutely positioned boxes than normal ones so it's not a problem to concat them as we traverse the tree. This method allows us to achieve that this way:

#[derive(Clone)]
struct BoxMapFolder(Vec<AbsposFragment>);

impl<'a> MapFolder<&'a BlockLevelBox> for BoxMapFolder {
    type Output = Fragment;
    type Result = Vec<AbsposFragment>;

    fn consume(mut self, block_level_box: &'a BlockLevelBox) -> (Self, Self::Output) {
        let (fragment, mut absolutely_positioned_fragments) = block_level_box.layout();
        self.0.append(&mut absolutely_positioned_fragments);
        (self, fragment)
    }

    fn complete(self) -> Self::Result {
        self.0
    }
}

#[derive(Clone)]
struct BoxReducer;

impl Reducer<Vec<AbsposFragment>> for BoxReducer {
    fn reduce(
        self,
        mut left: Vec<AbsposFragment>,
        mut right: Vec<AbsposFragment>,
    ) -> Vec<AbsposFragment> {
        left.append(&mut right);
        left
    }
}

let mut child_fragments = vec![];
let absolutely_positioned_fragments = child_boxes
    .par_iter()
    .mapfold_collect_into_vec(
        &mut child_fragments,
        BoxMapFolder(vec![]),
        BoxReducer,
    );

Unresolved questions

Should this method really be relying on plumbing?

Probably not, I guess we should have a variety of different methods like map_init, map_with, fold_with etc, but as a proof of concept I didn't want to lose too much time on how the functionality should be exposed before I make the PR. Those methods will also require names, and I can't find any which doesn't annoy me.

Should it be named that way?

Probably not.

Is there a way to avoid the need for a new trait MapFolder?

I don't think so but I am certainly not sure of that.

@nox
Author

nox commented Apr 4, 2019

Cc @nikomatsakis

nox added 3 commits April 4, 2019 22:33
map_collect_fold_reduce_into_vec is now
map_collect_fold_reduce_into_vec_with, and it takes an
initial value, a mapfold function and a reduce function.
@nox
Author

nox commented Apr 4, 2019

I changed a bunch of things: the method is now map_collect_fold_reduce_into_vec_with (yeah, I know…), and the plumbing is now completely hidden from the user. Adapting the example from the PR description:

let absolutely_positioned_fragments = child_boxes
    .par_iter()
    .map_collect_fold_reduce_into_vec_with(
        &mut child_fragments,
        vec![],
        |mut abspos_fragments, block_level_box| {
            let (fragment, mut abspos) = block_level_box.layout();
            abspos_fragments.append(&mut abspos);
            (abspos_fragments, fragment)
        },
        |mut left, mut right| {
            left.append(&mut right);
            left
        }
    );

@cuviper
Member

cuviper commented Apr 4, 2019

I feel like there's probably a generic piece we can extract from this, something like:

trait ParallelIterator {
    fn collect_first<A, B, C>(self, collection: &mut C) -> CollectFirst<Self, C>
    where
        Self: ParallelIterator<Item = (A, B)>,
        A: Send,
        B: Send,
        C: ParallelExtend<A>,
    {
        // extend the collection with A, while producing a new iterator over B
        // (tricky, but see how unzip works -- might even share a lot of that)
        unimplemented!()
    }
}

trait IndexedParallelIterator {
    fn collect_first_into_vec<A, B>(self, vec: &mut Vec<A>) -> CollectFirstIntoVec<Self>
    where
        Self: ParallelIterator<Item = (A, B)>,
        A: Send,
        B: Send,
    {
        // extend the vec with A, while producing a new iterator over B
        // (not sure we need this separately -- opt_len() trickery might
        //  be enough for indexed collect)
        unimplemented!()
    }
}

So your fragment code would look something like:

let absolutely_positioned_fragments = child_boxes
    .par_iter()
    .map(|block_level_box| block_level_box.layout())
    .collect_first_into_vec(&mut child_fragments)
    .reduce(Vec::new, |mut left, mut right| {
        left.append(&mut right);
        left
    });

@nox
Author

nox commented Apr 4, 2019

I thought about it and I think it would be way nicer than what I suggested, but I'm not sure we can do that: what happens if a later parallel iterator adapter doesn't actually consume everything? Wouldn't the collected-to vector be incomplete then? collect_first more or less exposes the CollectConsumer machinery to the whims of user land code and I'm not sure that's a good idea. Your call though, I don't know much about the innards of rayon.

@cuviper
Member

cuviper commented Apr 4, 2019

> collect_first more or less exposes the CollectConsumer machinery to the whims of user land code and I'm not sure that's a good idea.

It's already exposed, because it wouldn't require any internal access to implement collect_first similar to unzip. Someone could already write this adapter in a third party crate today. In fact, any custom ParallelIterator + IndexedParallelIterator has the potential to misbehave in how it feeds the consumer, which is why CollectConsumer has defensive checks.

@nox
Author

nox commented Apr 4, 2019

I'll try to do that then!

@cuviper
Member

cuviper commented Apr 4, 2019

> what happens if a later parallel iterator adapter doesn't actually consume everything?

I think we can put in specific checks for this scenario though.

When driven as a pure consumer, it's a push model -- the consumer can return full() -> true, but we don't have to heed that. Again using unzip as a reference, see how it waits until both sides are full to stop.

If we implement IndexedParallelIterator for CollectFirst, then we also have to allow the possibility that one of the producer splits could be dropped, as Take does, but I think a Drop handler could still feed the remains into the collect side.

@cuviper
Member

cuviper commented Apr 4, 2019

Bike-shedding collect_first -- maybe something like extend_first_into is a better name.

@nox
Author

nox commented Apr 4, 2019

Yeah I'm not sure what's up with the current set of APIs, like collect_into_vec instead of extend_into_vec, etc. I'm ok with extend_first_into.

@nox
Author

nox commented Apr 4, 2019

Wait, so I'm confused: does ParallelIterator::unzip actually end up using the optimised path with the CollectConsumer machinery when the halves are Vec<T> and the input iterator is indexed? My very first idea before I did this PR was to use unzip, but I somehow convinced myself that couldn't be optimised correctly. Did I mislead myself?

@cuviper
Member

cuviper commented Apr 4, 2019

It should use the optimized path, yes. That's the effect of opt_len(), our home-grown specialization. 🙂 It might be worth double-checking this with some instrumentation though.

So yeah, your fragment code could be rewritten to unzip_into_vecs into Vec<_> and Vec<Vec<_>> where both are filled by the optimized indexed path. But the extend_first_into would allow you to do more interesting work in parallel on the latter part. Unzip also wouldn't work for your doc-example that's acting like a filter.

@nox
Author

nox commented Apr 4, 2019

Yeah sorry I meant something like .unzip::<_, _, Vec<Fragment>, FlatVec<AbsposFragment>>() where FlatVec<T> is a Vec<T> wrapper which implements ParallelExtend<FlatVec<T>>.

If I understand things correctly, unzip_into_vecs can work on multiple chunks in parallel but for each chunk it just fills all the first values in the first mutable slice, and then all the second values in the second mutable slice, am I understanding this correctly? Is it something that doesn't matter in practice? I'm asking because a potential extend_first_into would follow the same behaviour AFAIK, whereas the method in the current PR loops only once over each chunk.
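For readers unfamiliar with the wrapper trick, here is a sequential analogue of the FlatVec<T> idea using std's Iterator::unzip and Extend rather than rayon's unzip and ParallelExtend. It is only a sketch: the `layout` stand-in, the use of plain i32 values, and the rule that negative inputs count as "absolutely positioned" are all invented for illustration.

```rust
// Hypothetical wrapper modelled on the FlatVec<T> idea, using std's
// sequential `Extend` in place of rayon's `ParallelExtend`.
#[derive(Debug, PartialEq)]
struct FlatVec<T>(Vec<T>);

// `Iterator::unzip` builds both halves from `Default`, so the wrapper
// cannot carry any runtime configuration.
impl<T> Default for FlatVec<T> {
    fn default() -> Self {
        FlatVec(Vec::new())
    }
}

// Accepts whole vectors as items and flattens them into one vector.
impl<T> Extend<Vec<T>> for FlatVec<T> {
    fn extend<I: IntoIterator<Item = Vec<T>>>(&mut self, iter: I) {
        for mut items in iter {
            self.0.append(&mut items);
        }
    }
}

// Stand-in for BlockLevelBox::layout (assumption): returns a "fragment"
// plus the "absolutely positioned" items, here just the negative inputs.
fn layout(block: i32) -> (i32, Vec<i32>) {
    let abspos = if block < 0 { vec![block] } else { Vec::new() };
    (block * 2, abspos)
}

// The first half lands in a plain Vec, the second half is flattened.
fn layout_all(boxes: &[i32]) -> (Vec<i32>, FlatVec<i32>) {
    boxes.iter().map(|&b| layout(b)).unzip()
}
```

With inputs 1, -2, 3, -4 this yields the fragments 2, -4, 6, -8 and a flattened list holding -2 and -4, while still allocating a small vector per item on the second half.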

@nox
Author

nox commented Apr 4, 2019

Maybe another interesting way to look at the problem would be to consider some sort of iterator adaptor zipping the input iterator with the uninitialised destinations, and then provide inherent methods on that iterator adaptor such as apply or apply_fold.

@cuviper
Member

cuviper commented Apr 4, 2019

> but for each chunk it just fills all the first values in the first mutable slice, and then all the second values in the second mutable slice, am I understanding this correctly?

No, it fills them in an interleaved fashion, otherwise we would have to buffer all those second items somehow. These tuples could be the result of a map or something, so we don't have random access to them. And FWIW Iterator::unzip is similar, feeding items one at a time.

The indexing optimization is that we can write directly to the destination buffer, rather than collecting intermediates of unknown length.
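As a sequential illustration of the two points above (interleaved filling, and a known length letting us size the destinations up front), consider this sketch. The function name is made up, and std's ExactSizeIterator stands in for rayon's indexed iterators; it says nothing about how rayon actually writes into its buffers.

```rust
// Hypothetical sequential model: the length is known up front, so both
// destinations are allocated once and filled one pair at a time.
fn unzip_indexed_model<A, B, I>(iter: I) -> (Vec<A>, Vec<B>)
where
    I: ExactSizeIterator<Item = (A, B)>,
{
    let len = iter.len();
    let mut firsts = Vec::with_capacity(len);
    let mut seconds = Vec::with_capacity(len);
    for (a, b) in iter {
        // Interleaved: each pair is split as soon as it is produced, so the
        // second items never need to be buffered anywhere else.
        firsts.push(a);
        seconds.push(b);
    }
    (firsts, seconds)
}
```

The pairs can come from a map, as in the discussion, because nothing here needs random access to them.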

@nox
Author

nox commented Apr 4, 2019

So given ParallelExtend<T> for Vec<T> checks opt_len to go through special_extend, and UnzipA and UnzipB can return Some(_) from their opt_len method, would .unzip::<_, _, Vec<Fragment>, FlatVec<AbsposFragment>>() directly write to the destination buffer for the Vec<Fragment>? What would collect_first_into bring then?

@cuviper
Member

cuviper commented Apr 4, 2019

> would .unzip::<_, _, Vec<Fragment>, FlatVec<AbsposFragment>>() directly write to the destination buffer for the Vec<Fragment>?

Yes.

> What would collect_first_into bring then?

The ability to do other parallel things with the second piece - but I'm not sure how useful this really is.

Now that I think of it, your doc example with filtered negatives could use a newtype ParallelExtend:

struct OnlyNegative<C>(C);

impl<C: ParallelExtend<i32>> ParallelExtend<i32> for OnlyNegative<C> {
    fn par_extend<I>(&mut self, par_iter: I)
    where
        I: IntoParallelIterator<Item = i32>,
    {
        let iter = par_iter.into_par_iter().filter(|&i| i < 0);
        self.0.par_extend(iter);
    }
}

With extend_first_into, you could just follow with .filter and .collect().

@cuviper
Member

cuviper commented Apr 4, 2019

There are all kinds of crazy compositions possible -- it's hard to judge what's worth adding sugar for.

@nox
Author

nox commented Apr 4, 2019

But the other things on the second part can always be expressed by iterator shenanigans from the ParallelExtend<T> implementation of a new type for the second half of the unzip call right? Then I guess we can just close this PR hah.

@cuviper
Member

cuviper commented Apr 5, 2019

You can hack it with unzip and ParallelExtend as long as you don't need any runtime state, since it starts with Default. I guess we could add unzip_into to solve that -- see also rust-lang/rust#45840.
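The Default limitation can be illustrated sequentially with std alone. To my knowledge, since Rust 1.56 a pair of collections implements Extend<(A, B)>, which lets the sinks be constructed with runtime state before being filled. The `OnlyBelow` type and `unzip_into_model` function below are hypothetical names, and this is only a sequential sketch of what an unzip_into could offer, not a rayon API.

```rust
// Hypothetical sink carrying runtime state (`limit`), which a
// Default-constructed collection could not have.
struct OnlyBelow {
    limit: i32,
    items: Vec<i32>,
}

impl Extend<i32> for OnlyBelow {
    fn extend<I: IntoIterator<Item = i32>>(&mut self, iter: I) {
        let limit = self.limit;
        self.items.extend(iter.into_iter().filter(|&i| i < limit));
    }
}

// Sequential model of an `unzip_into`: both sinks already exist, so they
// can hold earlier contents and configuration before being extended.
fn unzip_into_model(
    values: &[i32],
    doubled: Vec<i32>,
    below: OnlyBelow,
) -> (Vec<i32>, OnlyBelow) {
    let mut sinks = (doubled, below);
    // Relies on std's `Extend<(A, B)>` impl for pairs of `Extend` types.
    sinks.extend(values.iter().map(|&v| (v * 2, v)));
    sinks
}
```

Here the first sink keeps whatever it already contained and the second filters against a limit chosen at runtime, which is exactly what a Default-constructed half cannot express.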

nox added a commit to SimonSapin/victor that referenced this pull request Apr 5, 2019
@nox
Author

nox commented Apr 5, 2019

Thanks for the info! I'll open a new PR with something like what we discussed whenever I need it. For now unzip is ok.

@nox nox closed this Apr 5, 2019
@nox
Author

nox commented Apr 6, 2019

let absolutely_positioned_fragments = child_boxes
    .par_iter()
    .map_collect_fold_reduce_into_vec_with(
        &mut child_fragments,
        vec![],
        |mut abspos_fragments, block_level_box| {
            let (fragment, mut abspos) = block_level_box.layout();
            abspos_fragments.append(&mut abspos);
            (abspos_fragments, fragment)
        },
        |mut left, mut right| {
            left.append(&mut right);
            left
        }
    );

let absolutely_positioned_fragments = child_boxes
    .par_iter()
    .map(|block_level_box| block_level_box.layout())
    .collect_first_into_vec(&mut child_fragments)
    .reduce(Vec::new, |mut left, mut right| {
        left.append(&mut right);
        left
    });

So I was thinking about this again, and our two samples are not equivalent, but my example doesn't show it properly:

What I actually want is:

let absolutely_positioned_fragments = child_boxes
    .par_iter()
    .map_collect_fold_reduce_into_vec_with(
        &mut child_fragments,
        vec![],
        |mut abspos_fragments, block_level_box| {
            let fragment = block_level_box.layout(&mut abspos_fragments);
            (abspos_fragments, fragment)
        },
        |mut left, mut right| {
            left.append(&mut right);
            left
        }
    );

Where an absolutely positioned box would push itself to abspos_fragments in BlockLevelBox::layout, thus not creating single-item vectors for the individual elements being collected.

@nox
Author

nox commented Apr 6, 2019

I'll tell you what I want, what I really really want (so tell me what you want, what you really really want): I want a combination of par_extend and map_init.

Given this:

pub trait ParallelIterator: Sized + Send {
    type Item: Send;

    fn map_with<F, T, R>(self, init: T, map_op: F) -> MapWith<Self, T, F> 
    where
        F: Fn(&mut T, Self::Item) -> R + Sync + Send,
        T: Send + Clone,
        R: Send;
}

pub trait ParallelExtend<T> 
where
    T: Send, 
{
    fn par_extend<I>(&mut self, par_iter: I)
    where
        I: IntoParallelIterator<Item = T>;
}

I want:

pub trait ParallelIterator: Sized + Send {
    type Item: Send;

    fn extend_mapfold_into<T, U, F, C>(
        self,
        init: T,
        mapfold_op: F,
        target: &mut C,
    ) -> ExtendMapfoldInto<Self, T, F, C>
    where
        T: Send + Clone,
        F: Fn(&mut T, Self::Item) -> U + Send + Sync,
        C: ParallelExtend<U>;
}

impl<I, T, F, C> ParallelIterator for ExtendMapfoldInto<I, T, F, C> {
    type Item = T;
}

Does that make sense to you, @cuviper? AFAICT this cannot be emulated with your extend_first_into proposal.
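To pin down the intended semantics, here is a purely sequential model of the wished-for method, written against std's Extend. It is a sketch under assumptions: the name is hypothetical, and where the proposed parallel version would hand back an iterator over one state per split, this model has a single split and so returns a single state.

```rust
// Hypothetical sequential model of the wished-for `extend_mapfold_into`.
// Mapped values go into `target`; side data accumulates in the state.
fn extend_mapfold_into_model<I, T, U, F, C>(
    iter: I,
    init: T,
    mut mapfold_op: F,
    target: &mut C,
) -> T
where
    I: IntoIterator,
    F: FnMut(&mut T, I::Item) -> U,
    C: Extend<U>,
{
    let mut state = init;
    // The closure may push into `state` directly, so no single-item
    // vectors are created for individual elements.
    target.extend(iter.into_iter().map(|item| mapfold_op(&mut state, item)));
    state
}
```

For example, mapping each number to its double while pushing the negative ones into the state fills the target with the doubles and returns the negatives, with no per-item allocation.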

SimonSapin pushed a commit to SimonSapin/victor that referenced this pull request Apr 9, 2019