Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Create Scan parallel iterator #1036

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions rayon-demo/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ once_cell = "1.17.1"
rand = "0.8"
rand_xorshift = "0.3"
regex = "1"
ndarray = "0.15.6"

[dependencies.serde]
version = "1.0.85"
Expand Down
2 changes: 2 additions & 0 deletions rayon-demo/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ mod sort;
mod str_split;
#[cfg(test)]
mod vec_collect;
#[cfg(test)]
mod scan;

#[cfg(test)]
extern crate test;
Expand Down
124 changes: 124 additions & 0 deletions rayon-demo/src/scan/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
use ndarray::{Array, Dim};
use rayon::iter::*;
use std::time::{Duration, Instant};
use std::num::Wrapping;

const SIZE: usize = 10000;

enum Procs {
Sequential,
Parallel,
}

fn scan_sequential<T, P, I>(init: I, id: T, scan_op: P) -> Vec<T>
where
T: Clone,
I: Fn() -> T,
P: FnMut(&mut T, &T) -> Option<T>,
{
let v = vec![init(); SIZE];
let scan = v.iter().scan(id, scan_op);
scan.collect()
}

fn scan_parallel<T, P, I>(init: I, id: T, scan_op: P) -> Vec<T>
where
T: Clone + Send + Sync,
I: Fn() -> T,
P: Fn(&T, &T) -> T + Sync,
{
let v = vec![init(); SIZE];
let scan = v.into_par_iter().with_min_len(SIZE / 100).scan(&scan_op, id);
scan.collect()
}

/******* Addition with artificial delay *******/

const DELAY: Duration = Duration::from_nanos(10);
fn wait() -> i32 {
let time = Instant::now();

let mut sum = 0;
while time.elapsed() < DELAY {
sum += 1;
}
sum
}

fn scan_add(procs: Procs) -> Vec<i32> {
let init = || 2;
let id = 0;

match procs {
Procs::Sequential => {
let f = |state: &mut i32, x: &i32| {
test::black_box(wait());
*state += x;
Some(*state)
};
scan_sequential(init, id, f)
}
Procs::Parallel => {
let f = |x: &i32, y: &i32| {
test::black_box(wait());
*x + *y
};
scan_parallel(init, id, f)
}
}
}

#[bench]
fn scan_add_sequential(b: &mut test::Bencher) {
b.iter(|| scan_add(Procs::Sequential));
}

#[bench]
fn scan_add_parallel(b: &mut test::Bencher) {
b.iter(|| scan_add(Procs::Parallel));
}

#[test]
fn test_scan_add() {
assert_eq!(scan_add(Procs::Sequential), scan_add(Procs::Parallel));
}

/******** Matrix multiplication with wrapping arithmetic *******/

type Matrix = Array<Wrapping<i32>, Dim<[usize; 2]>>;
fn scan_matmul(procs: Procs) -> Vec<Matrix> {
const MAT_SIZE: usize = 50;
let init = || {
Array::from_iter((0..((MAT_SIZE * MAT_SIZE) as i32)).map(|x| Wrapping(x)))
.into_shape((MAT_SIZE, MAT_SIZE))
.unwrap()
};
let id = Array::eye(MAT_SIZE);

match procs {
Procs::Sequential => {
let f = |state: &mut Matrix, x: &Matrix| {
*state = state.dot(x);
Some(state.clone())
};

scan_sequential(init, id, f)
}
Procs::Parallel => scan_parallel(init, id, |x, y| x.dot(y)),
}
}

#[bench]
fn scan_matmul_sequential(b: &mut test::Bencher) {
b.iter(|| scan_matmul(Procs::Sequential));
}

#[bench]
fn scan_matmul_parallel(b: &mut test::Bencher) {
b.iter(|| scan_matmul(Procs::Parallel));
}

#[test]
fn test_scan_matmul() {
assert_eq!(scan_matmul(Procs::Sequential), scan_matmul(Procs::Parallel));
}
11 changes: 11 additions & 0 deletions src/iter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@

use self::plumbing::*;
use self::private::Try;
use self::scan::Scan;
pub use either::Either;
use std::cmp::{self, Ordering};
use std::iter::{Product, Sum};
Expand Down Expand Up @@ -158,6 +159,8 @@ mod while_some;
mod zip;
mod zip_eq;

mod scan;

pub use self::{
chain::Chain,
chunks::Chunks,
Expand Down Expand Up @@ -1384,6 +1387,14 @@ pub trait ParallelIterator: Sized + Send {
sum::sum(self)
}

fn scan<F>(self, scan_op: F, id: Self::Item) -> Scan<Self::Item, F>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's start here, documenting at the API level, especially for a user who is looking for an equivalent to Iterator::scan. How would you describe what this does, and importantly how is it different than the sequential version? The signature of F is quite different, looking more like some flavor of reduce.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question, I wasn't exactly sure what the signature should be, and it could change. But it's similar to the way reduce compares to fold. This should have an identical result to sequential scan when the operation doesn't have internal state, and is associative. If not, it won't make sense to run it in parallel.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @cuviper, could you take another look at this?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't feel you addressed my first comment? The signature is part of it, but please make an attempt at adding documentation describing what this does. And frankly, I don't think many people even use Iterator::scan, so it's not sufficient to just reference that. Suppose this existed independently - describe what it does, and what points are important for the user to think about.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I misunderstood you last time. I've added documentation.

I was hoping to get some feedback on the signature and general approach. These are the differences from the other ParallelIterator functions, and my rationale for them:

  1. The scan operation currently has type Fn(&Item, &Item) -> Item, since when we call it iteratively, we need to keep each intermediate result and not consume it.
  2. Since scan_op takes in references, we don't need multiple copies of identity. So, I have identity as type Item, rather than Fn() -> Item, since it's simpler. The other functions use Fn() -> Item, though, and it would be easy to change.

Do you have any preferences on those?

where
F: Fn(&Self::Item, &Self::Item) -> Self::Item + Sync + Send,
<Self as ParallelIterator>::Item: Send + Sync,
{
scan::scan(self, scan_op, id)
}

/// Multiplies all the items in the iterator.
///
/// Note that the order in items will be reduced is not specified,
Expand Down