Skip to content

Commit

Permalink
Merge #726
Browse files Browse the repository at this point in the history
726: add step by parallel iterator. r=cuviper a=FlyingCanoe

this is a pull request for the #494. It add a step_by iterator for `IndexedParallelIterator` producing a iterator that is also indexed parallel iterator. It is Implemented in pull mode.

Co-authored-by: flyingcanoe <flyingcanoe@protonmail.com>
  • Loading branch information
bors[bot] and FlyingCanoe committed Mar 5, 2020
2 parents 87dfd36 + 075088a commit 81f7e16
Show file tree
Hide file tree
Showing 10 changed files with 405 additions and 176 deletions.
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,6 @@ doc-comment = "0.3"
[dev-dependencies.serde]
version = "1.0.85"
features = ["derive"]

[build-dependencies]
autocfg = "1"
6 changes: 6 additions & 0 deletions build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
fn main() {
let ac = autocfg::new();
if ac.probe_expression("(0..10).step_by(2).rev()") {
autocfg::emit("step_by");
}
}
362 changes: 186 additions & 176 deletions ci/compat-Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions src/compile_fail/must_use.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ macro_rules! must_use {
}

must_use! {
step_by /** v.par_iter().step_by(2); */
chain /** v.par_iter().chain(&v); */
chunks /** v.par_iter().chunks(2); */
cloned /** v.par_iter().cloned(); */
Expand Down
27 changes: 27 additions & 0 deletions src/iter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,9 @@ mod intersperse;
pub use self::intersperse::Intersperse;
mod update;
pub use self::update::Update;
mod step_by;
#[cfg(step_by)]
pub use self::step_by::StepBy;

mod noop;
mod rev;
Expand Down Expand Up @@ -2426,6 +2429,30 @@ pub trait IndexedParallelIterator: ParallelIterator {
Enumerate::new(self)
}

/// Creates an iterator that steps by the given amount
///
/// # Examples
///
/// ```
///use rayon::prelude::*;
///
/// let range = (3..10);
/// let result: Vec<i32> = range
/// .into_par_iter()
/// .step_by(3)
/// .collect();
///
/// assert_eq!(result, [3, 6, 9])
/// ```
///
/// # Compatibility
///
/// This method is only available on Rust 1.38 or greater.
#[cfg(step_by)]
fn step_by(self, step: usize) -> StepBy<Self> {
StepBy::new(self, step)
}

/// Creates an iterator that skips the first `n` elements.
///
/// # Examples
Expand Down
144 changes: 144 additions & 0 deletions src/iter/step_by.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
#![cfg(step_by)]
use std::cmp::min;

use super::plumbing::*;
use super::*;
use crate::math::div_round_up;
use std::iter;
use std::usize;

/// `StepBy` is an iterator that skips `n` elements between each yield, where `n` is the given step.
/// This struct is created by the [`step_by()`] method on [`IndexedParallelIterator`]
///
/// [`step_by()`]: trait.IndexedParallelIterator.html#method.step_by
/// [`IndexedParallelIterator`]: trait.IndexedParallelIterator.html
#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
#[derive(Debug, Clone)]
pub struct StepBy<I: IndexedParallelIterator> {
base: I,
step: usize,
}

impl<I> StepBy<I>
where
I: IndexedParallelIterator,
{
/// Create a new `StepBy` iterator.
pub(super) fn new(base: I, step: usize) -> Self {
StepBy { base, step }
}
}

impl<I> ParallelIterator for StepBy<I>
where
I: IndexedParallelIterator,
{
type Item = I::Item;

fn drive_unindexed<C>(self, consumer: C) -> C::Result
where
C: UnindexedConsumer<Self::Item>,
{
bridge(self, consumer)
}

fn opt_len(&self) -> Option<usize> {
Some(self.len())
}
}

impl<I> IndexedParallelIterator for StepBy<I>
where
I: IndexedParallelIterator,
{
fn drive<C: Consumer<Self::Item>>(self, consumer: C) -> C::Result {
bridge(self, consumer)
}

fn len(&self) -> usize {
div_round_up(self.base.len(), self.step)
}

fn with_producer<CB>(self, callback: CB) -> CB::Output
where
CB: ProducerCallback<Self::Item>,
{
let len = self.base.len();
return self.base.with_producer(Callback {
callback,
step: self.step,
len,
});

struct Callback<CB> {
callback: CB,
step: usize,
len: usize,
}

impl<T, CB> ProducerCallback<T> for Callback<CB>
where
CB: ProducerCallback<T>,
{
type Output = CB::Output;
fn callback<P>(self, base: P) -> CB::Output
where
P: Producer<Item = T>,
{
let producer = StepByProducer {
base,
step: self.step,
len: self.len,
};
self.callback.callback(producer)
}
}
}
}

/// ////////////////////////////////////////////////////////////////////////
/// Producer implementation

struct StepByProducer<P> {
base: P,
step: usize,
len: usize,
}

impl<P> Producer for StepByProducer<P>
where
P: Producer,
{
type Item = P::Item;
type IntoIter = iter::StepBy<P::IntoIter>;

fn into_iter(self) -> Self::IntoIter {
self.base.into_iter().step_by(self.step)
}

fn split_at(self, index: usize) -> (Self, Self) {
let elem_index = min(index * self.step, self.len);

let (left, right) = self.base.split_at(elem_index);
(
StepByProducer {
base: left,
step: self.step,
len: elem_index,
},
StepByProducer {
base: right,
step: self.step,
len: self.len - elem_index,
},
)
}

fn min_len(&self) -> usize {
div_round_up(self.base.min_len(), self.step)
}

fn max_len(&self) -> usize {
self.base.max_len() / self.step
}
}
24 changes: 24 additions & 0 deletions src/iter/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,30 @@ fn fold_is_full() {
assert!(counter.load(Ordering::SeqCst) < 2048); // should not have visited every single one
}

#[test]
fn check_step_by() {
let a: Vec<i32> = (0..1024).step_by(2).collect();
let b: Vec<i32> = (0..1024).into_par_iter().step_by(2).collect();

assert_eq!(a, b);
}

#[test]
fn check_step_by_unaligned() {
let a: Vec<i32> = (0..1029).step_by(10).collect();
let b: Vec<i32> = (0..1029).into_par_iter().step_by(10).collect();

assert_eq!(a, b)
}

#[test]
fn check_step_by_rev() {
let a: Vec<i32> = (0..1024).step_by(2).rev().collect();
let b: Vec<i32> = (0..1024).into_par_iter().step_by(2).rev().collect();

assert_eq!(a, b);
}

#[test]
fn check_enumerate() {
let a: Vec<usize> = (0..1024).rev().collect();
Expand Down
1 change: 1 addition & 0 deletions tests/clones.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ fn clone_adaptors() {
check(v.par_iter().with_min_len(1));
check(v.par_iter().zip(&v));
check(v.par_iter().zip_eq(&v));
check(v.par_iter().step_by(2));
}

#[test]
Expand Down
1 change: 1 addition & 0 deletions tests/debug.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ fn debug_adaptors() {
check(v.par_iter().with_min_len(1));
check(v.par_iter().zip(&v));
check(v.par_iter().zip_eq(&v));
check(v.par_iter().step_by(2));
}

#[test]
Expand Down
12 changes: 12 additions & 0 deletions tests/producer_split_at.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,18 @@ fn enumerate() {
check(&v, || (0..10).into_par_iter().enumerate());
}

#[test]
fn step_by() {
let v: Vec<_> = (0..10).step_by(2).collect();
check(&v, || (0..10).into_par_iter().step_by(2))
}

#[test]
fn step_by_unaligned() {
let v: Vec<_> = (0..10).step_by(3).collect();
check(&v, || (0..10).into_par_iter().step_by(3))
}

#[test]
fn inspect() {
let v: Vec<_> = (0..10).collect();
Expand Down

0 comments on commit 81f7e16

Please sign in to comment.