Skip to content

Commit

Permalink
Add ParallelIterator::opt_len to specialize vec collect
Browse files Browse the repository at this point in the history
As suggested by @nikomatsakis, we can use `opt_len` to report the true
length of any `ExactParallelIterator`, while returning `None` otherwise.
Then `CollectConsumer` can masquerade as `UnindexedConsumer`, and it all
works out because exact iterators will only drive it indexed.
  • Loading branch information
cuviper committed Nov 22, 2016
1 parent 66f3d64 commit 870bf5b
Show file tree
Hide file tree
Showing 16 changed files with 270 additions and 23 deletions.
1 change: 1 addition & 0 deletions rayon-demo/src/main.rs
Expand Up @@ -16,6 +16,7 @@ mod tsp;
// these are not "full-fledged" benchmarks yet,
// they only run with cargo bench
#[cfg(test)] mod map_collect;
#[cfg(test)] mod vec_collect;
#[cfg(test)] mod factorial;
#[cfg(test)] mod pythagoras;
#[cfg(test)] mod fibonacci;
Expand Down
151 changes: 151 additions & 0 deletions rayon-demo/src/vec_collect.rs
@@ -0,0 +1,151 @@
//! Some benchmarks stress-testing various ways to build a standard `Vec`.

/// Strategies for building a `Vec` from a `ParallelIterator`, compared by the
/// benchmarks below. Each takes the iterator by value and returns the
/// collected vector.
mod util {
    use rayon::prelude::*;
    use std::collections::LinkedList;

    /// Do whatever `collect` does by default.
    pub fn collect<T, PI>(pi: PI) -> Vec<T>
        where T: Send,
              PI: ParallelIterator<Item = T> + Send
    {
        pi.collect()
    }

    /// Use a linked list of vectors intermediary.
    ///
    /// Each parallel job folds its items into a private `Vec`; the vectors are
    /// gathered into a `LinkedList` (cheap to concatenate), then appended
    /// sequentially into one final `Vec`.
    pub fn linked_list_vec<T, PI>(pi: PI) -> Vec<T>
        where T: Send,
              PI: ParallelIterator<Item = T> + Send
    {
        let list: LinkedList<Vec<_>> = pi
            .fold(Vec::new,
                  |mut vec, elem| { vec.push(elem); vec })
            .collect();
        list.into_iter()
            .fold(Vec::new(),
                  |mut vec, mut sub| { vec.append(&mut sub); vec })
    }

    /// Use a linked list of vectors intermediary, with a size hint.
    ///
    /// Like `linked_list_vec`, but sums the sub-vector lengths first so the
    /// final vector is allocated once and the appends never reallocate.
    pub fn linked_list_vec_sized<T, PI>(pi: PI) -> Vec<T>
        where T: Send,
              PI: ParallelIterator<Item = T> + Send
    {
        let list: LinkedList<Vec<_>> = pi
            .fold(Vec::new,
                  |mut vec, elem| { vec.push(elem); vec })
            .collect();

        let len = list.iter().map(Vec::len).sum();
        list.into_iter()
            .fold(Vec::with_capacity(len),
                  |mut vec, mut sub| { vec.append(&mut sub); vec })
    }

    /// Fold into vectors and then reduce them together.
    pub fn fold<T, PI>(pi: PI) -> Vec<T>
        where T: Send,
              PI: ParallelIterator<Item = T> + Send
    {
        pi.fold(Vec::new,
                |mut vec, x| {
                    vec.push(x);
                    vec
                })
          .reduce(Vec::new,
                  |mut vec1, mut vec2| { vec1.append(&mut vec2); vec1 })
    }
}


/// Generates one `#[bench]` function per collection strategy in `util`.
///
/// `$generate` names a zero-argument function producing the parallel iterator
/// under test; `$check` names a function asserting that the collected `Vec`
/// has the expected contents. The result is kept outside `b.iter` so it can
/// be validated once after the timed runs.
macro_rules! make_bench {
    ($generate:ident, $check:ident) => {
        #[bench]
        fn with_collect(b: &mut ::test::Bencher) {
            use vec_collect::util;
            // Hold the last collected vector so it can be checked after timing.
            let mut vec = None;
            b.iter(|| vec = Some(util::collect($generate())));
            $check(&vec.unwrap());
        }

        #[bench]
        fn with_linked_list_vec(b: &mut ::test::Bencher) {
            use vec_collect::util;
            let mut vec = None;
            b.iter(|| vec = Some(util::linked_list_vec($generate())));
            $check(&vec.unwrap());
        }

        #[bench]
        fn with_linked_list_vec_sized(b: &mut ::test::Bencher) {
            use vec_collect::util;
            let mut vec = None;
            b.iter(|| vec = Some(util::linked_list_vec_sized($generate())));
            $check(&vec.unwrap());
        }

        #[bench]
        fn with_fold(b: &mut ::test::Bencher) {
            use vec_collect::util;
            let mut vec = None;
            b.iter(|| vec = Some(util::fold($generate())));
            $check(&vec.unwrap());
        }
    }
}

/// Tests a big vector of i, for all i in 0..N.
mod vec_i {
    use rayon::prelude::*;

    const N: u32 = 4 * 1024 * 1024;

    /// An exact-length parallel iterator over `0..N`.
    fn generate() -> impl ExactParallelIterator<Item=u32> {
        (0_u32..N)
            .into_par_iter()
    }

    /// Asserts that the collected vector is exactly `0..N`, in order.
    // Takes a slice rather than `&Vec<u32>`; callers coerce automatically.
    fn check(v: &[u32]) {
        assert!(v.iter().cloned().eq(0..N));
    }

    #[bench]
    fn with_collect_into(b: &mut ::test::Bencher) {
        let mut vec = None;
        b.iter(|| {
            // Collect into a fresh vector each iteration, paying for a new
            // allocation every time.
            let mut v = vec![];
            generate().collect_into(&mut v);
            vec = Some(v);
        });
        check(&vec.unwrap());
    }

    #[bench]
    fn with_collect_into_reused(b: &mut ::test::Bencher) {
        // Reuse one vector so only the first iteration pays for allocation.
        let mut vec = vec![];
        b.iter(|| generate().collect_into(&mut vec));
        check(&vec);
    }

    make_bench!(generate, check);
}

/// Tests a big vector of i, for all i in 0..N, with a no-op
/// filter just to make sure it's not an exact iterator.
mod vec_i_filtered {
    use rayon::prelude::*;

    const N: u32 = 4 * 1024 * 1024;

    /// A parallel iterator over `0..N` whose length is not known up front,
    /// because `filter` discards the exact-length guarantee.
    fn generate() -> impl ParallelIterator<Item=u32> {
        (0_u32..N)
            .into_par_iter()
            .filter(|_| true)
    }

    /// Asserts that the collected vector is exactly `0..N`, in order.
    // Takes a slice rather than `&Vec<u32>`; callers coerce automatically.
    fn check(v: &[u32]) {
        assert!(v.iter().cloned().eq(0..N));
    }

    make_bench!(generate, check);
}
7 changes: 7 additions & 0 deletions src/par_iter/chain.rs
Expand Up @@ -33,6 +33,13 @@ impl<A, B> ParallelIterator for ChainIter<A, B>
let b = self.b.drive_unindexed(consumer.split_off());
consumer.to_reducer().reduce(a, b)
}

/// The chain's length is known only when both halves know theirs — and
/// even then the sum must not overflow `usize`, hence `checked_add`.
fn opt_len(&mut self) -> Option<usize> {
    let a_len = self.a.opt_len();
    let b_len = self.b.opt_len();
    a_len.and_then(|a| b_len.and_then(|b| a.checked_add(b)))
}
}

impl<A, B> BoundedParallelIterator for ChainIter<A, B>
Expand Down
11 changes: 11 additions & 0 deletions src/par_iter/collect/consumer.rs
Expand Up @@ -110,3 +110,14 @@ impl<'c, ITEM: Send> Folder<ITEM> for CollectFolder<'c, ITEM> {
self.consumer.writes.fetch_add(self.consumer.len, Ordering::SeqCst);
}
}

/// Pretend to be unindexed for `special_collect_into`,
/// but we should never actually get used that way...
impl<'c, ITEM: Send> UnindexedConsumer<ITEM> for CollectConsumer<'c, ITEM> {
    // Only exact iterators are driven into this consumer, and they drive it
    // indexed — an unindexed split would mean items land at unknown positions,
    // so fail loudly instead of corrupting the target vector.
    fn split_off(&self) -> Self {
        unreachable!("CollectConsumer must be indexed!")
    }
    fn to_reducer(&self) -> Self::Reducer {
        NoopReducer
    }
}
25 changes: 22 additions & 3 deletions src/par_iter/collect/mod.rs
@@ -1,16 +1,35 @@
use super::ExactParallelIterator;
use super::{ParallelIterator, ExactParallelIterator};
use std::isize;
use std::sync::atomic::{AtomicUsize, Ordering};

mod consumer;
use self::consumer::CollectConsumer;

/// Collects the results of the exact iterator into the specified vector.
pub fn collect_into<PAR_ITER, T>(mut pi: PAR_ITER, v: &mut Vec<T>)
where PAR_ITER: ExactParallelIterator<Item = T>,
PAR_ITER: ExactParallelIterator,
T: Send
{
let len = pi.len();
special_collect_into(pi, len, v);
}

/// Collects the results of the iterator into the specified vector.
///
/// Technically, this only works for `ExactParallelIterator`, but we're faking a
/// bit of specialization here until Rust can do that natively. Callers are
/// using `opt_len` to find the length before calling this, and only exact
/// iterators will return anything but `None` there.
///
/// Since the type system doesn't understand that contract, we have to allow
/// *any* `ParallelIterator` here, and `CollectConsumer` has to also implement
/// `UnindexedConsumer`. That implementation panics `unreachable!` in case
/// there's a bug where we actually do try to use this unindexed.
#[doc(hidden)]
pub fn special_collect_into<PAR_ITER, T>(pi: PAR_ITER, len: usize, v: &mut Vec<T>)
where PAR_ITER: ParallelIterator<Item = T>,
T: Send
{
assert!(len < isize::MAX as usize);

v.truncate(0); // clear any old data
Expand All @@ -21,7 +40,7 @@ pub fn collect_into<PAR_ITER, T>(mut pi: PAR_ITER, v: &mut Vec<T>)
// assert that target..target+len is unique and writable
CollectConsumer::new(&writes, target, len)
};
pi.drive(consumer);
pi.drive_unindexed(consumer);

unsafe {
// Here, we assert that `v` is fully initialized. This is
Expand Down
4 changes: 4 additions & 0 deletions src/par_iter/enumerate.rs
Expand Up @@ -23,6 +23,10 @@ impl<M> ParallelIterator for Enumerate<M>
{
bridge(self, consumer)
}

fn opt_len(&mut self) -> Option<usize> {
    // Enumerate is an exact iterator, so report its true length.
    Some(self.len())
}
}

impl<M> BoundedParallelIterator for Enumerate<M>
Expand Down
43 changes: 28 additions & 15 deletions src/par_iter/from_par_iter.rs
Expand Up @@ -45,21 +45,34 @@ impl<T> FromParallelIterator<T> for Vec<T>
fn from_par_iter<PAR_ITER>(par_iter: PAR_ITER) -> Self
where PAR_ITER: IntoParallelIterator<Item = T>
{
// When Rust gets specialization, we can use `collect_into` for exact iterators.
// This works like `combine`, but `Vec::append` is more efficient than `extend`.
let list = par_iter.into_par_iter()
.fold(Vec::new, |mut vec, elem| {
vec.push(elem);
// See the vec_collect benchmarks in rayon-demo for different strategies.
let mut par_iter = par_iter.into_par_iter();
match par_iter.opt_len() {
Some(len) => {
// When Rust gets specialization, call `par_iter.collect_into()`
// for exact iterators. Until then, `special_collect_into()` fakes
// the same thing on the promise that `opt_len()` is accurate.
let mut vec = vec![];
super::collect::special_collect_into(par_iter, len, &mut vec);
vec
})
.collect();

let start = Vec::with_capacity(combined_len(&list));
list.into_iter()
.fold(start, |mut vec, mut sub| {
vec.append(&mut sub);
vec
})
}
None => {
// This works like `combine`, but `Vec::append` is more efficient than `extend`.
let list = par_iter.fold(Vec::new, |mut vec, elem| {
vec.push(elem);
vec
})
.collect();

let len = combined_len(&list);
let start = Vec::with_capacity(len);
list.into_iter()
.fold(start, |mut vec, mut sub| {
vec.append(&mut sub);
vec
})
}
}
}
}

Expand Down Expand Up @@ -104,7 +117,7 @@ impl<T> FromParallelIterator<T> for LinkedList<T>
list1.append(&mut list2);
list1
})
.unwrap_or_else(|| LinkedList::new())
.unwrap_or_else(LinkedList::new)
}
}

Expand Down
4 changes: 4 additions & 0 deletions src/par_iter/map.rs
Expand Up @@ -71,6 +71,10 @@ impl<M, MAP_OP> ParallelIterator for Map<M, MAP_OP>
let consumer1 = MapConsumer::new(consumer, &self.map_op);
self.base.drive_unindexed(consumer1)
}

fn opt_len(&mut self) -> Option<usize> {
    // Mapping is one-to-one, so defer to the base iterator's length (if any).
    self.base.opt_len()
}
}

impl<M, MAP_OP> BoundedParallelIterator for Map<M, MAP_OP>
Expand Down
11 changes: 6 additions & 5 deletions src/par_iter/mod.rs
Expand Up @@ -608,6 +608,12 @@ pub trait ParallelIterator: Sized {
#[doc(hidden)]
fn drive_unindexed<C>(self, consumer: C) -> C::Result where C: UnindexedConsumer<Self::Item>;

/// Internal method used to fake specialization for indexed collect.
///
/// Returns the exact number of items this iterator will produce when that
/// is known up front, or `None` otherwise. The default is `None`; exact
/// iterators override this to report their true length.
#[doc(hidden)]
fn opt_len(&mut self) -> Option<usize> {
    None
}

/// Create a fresh collection containing all the element produced
/// by this parallel iterator.
///
Expand Down Expand Up @@ -650,11 +656,6 @@ pub trait BoundedParallelIterator: ParallelIterator {
pub trait ExactParallelIterator: BoundedParallelIterator {
/// Produces an exact count of how many items this iterator will
/// produce, presuming no panic occurs.
///
/// # Safety note
///
/// Returning an incorrect value here could lead to **undefined
/// behavior**.
fn len(&mut self) -> usize;

/// Collects the results of the iterator into the specified
Expand Down
4 changes: 4 additions & 0 deletions src/par_iter/option.rs
Expand Up @@ -70,6 +70,10 @@ impl<T: Send> ParallelIterator for OptionIter<T> {
{
bridge(self, consumer)
}

fn opt_len(&mut self) -> Option<usize> {
    // An Option's item count is always known exactly up front.
    Some(self.len())
}
}

impl<T: Send> BoundedParallelIterator for OptionIter<T> {
Expand Down
4 changes: 4 additions & 0 deletions src/par_iter/range.rs
Expand Up @@ -38,6 +38,10 @@ macro_rules! indexed_range_impl {
{
bridge(self, consumer)
}

fn opt_len(&mut self) -> Option<usize> {
    // Indexed integer ranges know their exact length.
    Some(self.len())
}
}

impl BoundedParallelIterator for RangeIter<$t> {
Expand Down
8 changes: 8 additions & 0 deletions src/par_iter/slice.rs
Expand Up @@ -43,6 +43,10 @@ impl<'data, T: Sync + 'data> ParallelIterator for SliceIter<'data, T> {
{
bridge(self, consumer)
}

fn opt_len(&mut self) -> Option<usize> {
    // A slice's length is always known exactly.
    Some(self.len())
}
}

impl<'data, T: Sync + 'data> BoundedParallelIterator for SliceIter<'data, T> {
Expand Down Expand Up @@ -84,6 +88,10 @@ impl<'data, T: Sync + 'data> ParallelIterator for ChunksIter<'data, T> {
{
bridge(self, consumer)
}

fn opt_len(&mut self) -> Option<usize> {
    // The chunk count is derived from the slice length, so it is exact.
    Some(self.len())
}
}

impl<'data, T: Sync + 'data> BoundedParallelIterator for ChunksIter<'data, T> {
Expand Down

0 comments on commit 870bf5b

Please sign in to comment.