Skip to content

Commit

Permalink
impl ParallelExtend for tuple pairs
Browse files Browse the repository at this point in the history
`ParallelExtend<(A, B)>` and `ParallelExtend<Either<L, R>>` for tuples
behave like `unzip` and `partition_map` respectively.  These allow the
possibility of nested `unzip` and `partition_map` operations, filling
into more than just two collections.  For instance, `(A, (B, C))` items
can be unzipped into `(Vec<A>, (Vec<B>, Vec<C>))`.
  • Loading branch information
cuviper committed Oct 9, 2018
1 parent df86443 commit e46efd2
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 20 deletions.
49 changes: 42 additions & 7 deletions src/iter/mod.rs
Expand Up @@ -1519,6 +1519,20 @@ pub trait ParallelIterator: Sized + Send {
/// assert_eq!(left, [0, 1, 2, 3]);
/// assert_eq!(right, [1, 2, 3, 4]);
/// ```
///
/// Nested pairs can be unzipped too.
///
/// ```
/// use rayon::prelude::*;
///
/// let (values, (squares, cubes)): (Vec<_>, (Vec<_>, Vec<_>)) = (0..4).into_par_iter()
/// .map(|i| (i, (i * i, i * i * i)))
/// .unzip();
///
/// assert_eq!(values, [0, 1, 2, 3]);
/// assert_eq!(squares, [0, 1, 4, 9]);
/// assert_eq!(cubes, [0, 1, 8, 27]);
/// ```
fn unzip<A, B, FromA, FromB>(self) -> (FromA, FromB)
where Self: ParallelIterator<Item = (A, B)>,
FromA: Default + Send + ParallelExtend<A>,
Expand Down Expand Up @@ -1567,17 +1581,38 @@ pub trait ParallelIterator: Sized + Send {
/// use rayon::iter::Either;
///
/// let (left, right): (Vec<_>, Vec<_>) = (0..8).into_par_iter()
/// .partition_map(|x| {
/// if x % 2 == 0 {
/// Either::Left(x * 4)
/// } else {
/// Either::Right(x * 3)
/// }
/// });
/// .partition_map(|x| {
/// if x % 2 == 0 {
/// Either::Left(x * 4)
/// } else {
/// Either::Right(x * 3)
/// }
/// });
///
/// assert_eq!(left, [0, 8, 16, 24]);
/// assert_eq!(right, [3, 9, 15, 21]);
/// ```
///
/// Nested `Either` enums can be split as well.
///
/// ```
/// use rayon::prelude::*;
/// use rayon::iter::Either::*;
///
/// let ((fizzbuzz, fizz), (buzz, other)): ((Vec<_>, Vec<_>), (Vec<_>, Vec<_>)) = (1..20)
/// .into_par_iter()
/// .partition_map(|x| match (x % 3, x % 5) {
/// (0, 0) => Left(Left(x)),
/// (0, _) => Left(Right(x)),
/// (_, 0) => Right(Left(x)),
/// (_, _) => Right(Right(x)),
/// });
///
/// assert_eq!(fizzbuzz, [15]);
/// assert_eq!(fizz, [3, 6, 9, 12, 18]);
/// assert_eq!(buzz, [5, 10]);
/// assert_eq!(other, [1, 2, 4, 7, 8, 11, 13, 14, 16, 17, 19]);
/// ```
fn partition_map<A, B, P, L, R>(self, predicate: P) -> (A, B)
where A: Default + Send + ParallelExtend<L>,
B: Default + Send + ParallelExtend<R>,
Expand Down
89 changes: 76 additions & 13 deletions src/iter/unzip.rs
Expand Up @@ -24,7 +24,7 @@ trait UnzipOp<T>: Sync + Send {
}
}

/// Run an unzip-like operation into `ParallelExtend` collections.
/// Run an unzip-like operation into default `ParallelExtend` collections.
fn execute<I, OP, FromA, FromB>(pi: I, op: OP) -> (FromA, FromB)
where I: ParallelIterator,
OP: UnzipOp<I::Item>,
Expand All @@ -33,21 +33,30 @@ fn execute<I, OP, FromA, FromB>(pi: I, op: OP) -> (FromA, FromB)
{
let mut a = FromA::default();
let mut b = FromB::default();
{
// We have no idea what the consumers will look like for these
// collections' `par_extend`, but we can intercept them in our own
// `drive_unindexed`. Start with the left side, type `A`:
let iter = UnzipA {
base: pi,
op: op,
b: &mut b,
};
a.par_extend(iter);
}
execute_into(&mut a, &mut b, pi, op);
(a, b)
}


/// Run an unzip-like operation into `ParallelExtend` collections.
fn execute_into<I, OP, FromA, FromB>(a: &mut FromA, b: &mut FromB, pi: I, op: OP)
where I: ParallelIterator,
OP: UnzipOp<I::Item>,
FromA: Send + ParallelExtend<OP::Left>,
FromB: Send + ParallelExtend<OP::Right>
{
// We have no idea what the consumers will look like for these
// collections' `par_extend`, but we can intercept them in our own
// `drive_unindexed`. Start with the left side, type `A`:
let iter = UnzipA {
base: pi,
op: op,
b: b,
};
a.par_extend(iter);
}


/// Unzips the items of a parallel iterator into a pair of arbitrary
/// `ParallelExtend` containers.
///
Expand Down Expand Up @@ -188,7 +197,7 @@ struct UnzipA<'b, I, OP, FromB: 'b> {
impl<'b, I, OP, FromB> ParallelIterator for UnzipA<'b, I, OP, FromB>
where I: ParallelIterator,
OP: UnzipOp<I::Item>,
FromB: Default + Send + ParallelExtend<OP::Right>
FromB: Send + ParallelExtend<OP::Right>
{
type Item = OP::Left;

Expand Down Expand Up @@ -386,3 +395,57 @@ impl<A, B, RA, RB> Reducer<(A, B)> for UnzipReducer<RA, RB>
(self.left.reduce(left.0, right.0), self.right.reduce(left.1, right.1))
}
}


impl<A, B, FromA, FromB> ParallelExtend<(A, B)> for (FromA, FromB)
where
A: Send,
B: Send,
FromA: Send + ParallelExtend<A>,
FromB: Send + ParallelExtend<B>,
{
fn par_extend<I>(&mut self, pi: I)
where
I: IntoParallelIterator<Item = (A, B)>,
{
execute_into(&mut self.0, &mut self.1, pi.into_par_iter(), Unzip);
}
}

impl<L, R, A, B> ParallelExtend<Either<L, R>> for (A, B)
where
L: Send,
R: Send,
A: Send + ParallelExtend<L>,
B: Send + ParallelExtend<R>,
{
fn par_extend<I>(&mut self, pi: I)
where
I: IntoParallelIterator<Item = Either<L, R>>,
{
execute_into(&mut self.0, &mut self.1, pi.into_par_iter(), UnEither);
}
}

/// An `UnzipOp` that routes items depending on their `Either` variant.
struct UnEither;

impl<L, R> UnzipOp<Either<L, R>> for UnEither
where
L: Send,
R: Send,
{
type Left = L;
type Right = R;

fn consume<FL, FR>(&self, item: Either<L, R>, left: FL, right: FR) -> (FL, FR)
where
FL: Folder<L>,
FR: Folder<R>,
{
match item {
Either::Left(item) => (left.consume(item), right),
Either::Right(item) => (left, right.consume(item)),
}
}
}

0 comments on commit e46efd2

Please sign in to comment.