Get an arbitrary `Err` from a parallel iterator over `Result<()>` #113

Closed
emk opened this Issue Oct 19, 2016 · 8 comments


emk commented Oct 19, 2016

I just converted a `for` loop in cage to a parallel iteration. Because the original loop body used `try!`, the parallel version maps each item to a `Result`: `.map(|pod| -> Result<()> { ... })`.

In a perfect world, I might like to collect all the errors in some order, but that would require reducing a "vector of errors" type, which is going to be allocation-intensive. We can do almost as well by choosing just one of the errors and showing that one to the user.

@cuviper helped me figure out the following solution:

            // If more than one parallel branch fails, just return one error.
            .reduce_with(|result1, result2| result1.and(result2).and(Ok(())))
            .unwrap_or(Ok(()))

Having this as a standard, built-in reduction would make it much easier to parallelize loops that use `try!`.
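For context, here is a minimal runnable sketch of where that reduction sits. It assumes a hypothetical `Pod` type and `check_pod` function standing in for cage's real loop body, and it uses std's sequential `Iterator::reduce`, which has the same shape as rayon's `reduce_with` (both return `None` for an empty input), so it runs without the rayon crate. Since every item here is already `Result<(), String>`, the closure omits the trailing `.and(Ok(()))`.

```rust
// Hypothetical stand-ins for cage's pods and the per-pod work that used `try!`.
struct Pod {
    name: &'static str,
}

fn check_pod(pod: &Pod) -> Result<(), String> {
    if pod.name.is_empty() {
        Err("pod has an empty name".to_string())
    } else {
        Ok(())
    }
}

fn check_all(pods: &[Pod]) -> Result<(), String> {
    pods.iter() // with rayon: pods.par_iter()
        .map(check_pod)
        // `a.and(b)` keeps `a` if it is an `Err`, otherwise yields `b`,
        // so at most one error survives the reduction.
        .reduce(|result1, result2| result1.and(result2)) // with rayon: .reduce_with(...)
        // No pods means nothing failed.
        .unwrap_or(Ok(()))
}
```

Note that this version still runs every item even after an error has been seen; it only decides which error to report.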

nikomatsakis commented Oct 19, 2016 · Member

I've been wanting for some time to do something standard like that, but also to let you opt in to a non-deterministic result in exchange for speed. The idea would be to use an `AtomicBool` that is set to `true` when an error occurs; then, before we process each item, we check whether an error has occurred and bail out if so.
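A minimal sketch of that early-exit idea, written with plain `std` scoped threads rather than rayon's internals so it runs on its own. The function `try_for_each_early_exit` and its even chunking of the input across `num_threads` threads are assumptions for illustration, not rayon's implementation. Each worker checks a shared `AtomicBool` before every item and stops once any worker has failed; which error comes back depends on thread timing, which is the non-determinism being traded for speed.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;
use std::thread;

/// Hypothetical helper: run `f` on every item across `num_threads` threads,
/// returning `Ok(())` or one (non-deterministically chosen) error.
fn try_for_each_early_exit<T, E, F>(items: &[T], num_threads: usize, f: F) -> Result<(), E>
where
    T: Sync,
    E: Send,
    F: Fn(&T) -> Result<(), E> + Sync,
{
    let num_threads = num_threads.max(1);
    let chunk_size = ((items.len() + num_threads - 1) / num_threads).max(1);
    let failed = AtomicBool::new(false);
    let first_error: Mutex<Option<E>> = Mutex::new(None);

    thread::scope(|s| {
        for chunk in items.chunks(chunk_size) {
            // Borrow the shared state so each `move` closure gets references.
            let (failed, first_error, f) = (&failed, &first_error, &f);
            s.spawn(move || {
                for item in chunk {
                    // Bail out as soon as any worker has reported an error.
                    if failed.load(Ordering::Relaxed) {
                        return;
                    }
                    if let Err(e) = f(item) {
                        failed.store(true, Ordering::Relaxed);
                        // Keep only the first error stored; drop later ones.
                        let mut slot = first_error.lock().unwrap();
                        if slot.is_none() {
                            *slot = Some(e);
                        }
                        return;
                    }
                }
            });
        }
    });

    match first_error.into_inner().unwrap() {
        Some(e) => Err(e),
        None => Ok(()),
    }
}
```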

cuviper commented Oct 19, 2016 · Member

See also the discussion in #51. This sort of early exit can probably share common code with that.

cuviper commented Oct 19, 2016 · Member

BTW, if your types are already `Result<(), E>`, then that final `.and(Ok(()))` is redundant. It was just a quick way to map any arbitrary `Ok(T)` to `Ok(())`.
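A small sketch of the two cases, using std's `Iterator::reduce` as a sequential stand-in for rayon's `reduce_with` (the function names are hypothetical). Because the reduction closure must return the same type as the items, turning `Ok(T)` into `Ok(())` for a non-unit `T` has to happen in a `map` before the reduction; inside the closure, `.and(Ok(()))` only type-checks when the items are already `Result<(), E>`.

```rust
/// Items are already `Result<(), E>`: `and` alone keeps one error.
fn first_error_unit(results: Vec<Result<(), String>>) -> Result<(), String> {
    results
        .into_iter()
        .reduce(|r1, r2| r1.and(r2))
        .unwrap_or(Ok(()))
}

/// Items are `Result<T, E>` for some other `T`: map `Ok(T)` to `Ok(())` first.
fn first_error_any<T>(results: Vec<Result<T, String>>) -> Result<(), String> {
    results
        .into_iter()
        .map(|r| r.and(Ok(())))
        .reduce(|r1, r2| r1.and(r2))
        .unwrap_or(Ok(()))
}
```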

emk commented Oct 20, 2016

I ran a bit with the idea of converting existing `for ... in` loops and `map(...).collect()` iterations to parallel iterations using rayon, and I came up with two badly named methods: `reduce_automatically`, which reduces any type that has an identity element and an associative operator, and `reduce_results_to_vec`, which takes a parallel iterator over `Result<T>` and returns a `Result<Vec<T>>` whose elements are in some arbitrary order.

I have no idea how generally useful these might be, but I submit them as data points for your design process. Using these two reduction functions, I think I can trivially convert most of the loops and iterations in cage to use rayon.

Thank you for a great library!

//! Extensions for `rayon`'s parallel iterators.

use rayon::prelude::*;

/// Wikipedia says: "In abstract algebra, a branch of mathematics, a monoid
/// is an algebraic structure with a single associative binary operation
/// and an identity element."
///
/// This trait represents values that have sensible "auto-reduce" behavior
/// when returned from a parallel computation.  We allow associative but
/// non-commutative reduce operators because it's often convenient to
/// collect results in a `Vec`, even if the order is arbitrary.
trait MonoidReduce {
    /// The identity element: for example, this is what you want to return
    /// from a parallel computation with no inputs.
    fn default() -> Self;

    /// Given two values, reduce them to a single value.
    fn reduce(v1: Self, v2: Self) -> Self;
}

/// Reduce `()` to itself.
impl MonoidReduce for () {
    fn default() -> Self {
        ()
    }

    fn reduce(_: Self, _: Self) -> Self {
        ()
    }
}

/// Reduce `Vec` by combining two vectors into one.
impl<T> MonoidReduce for Vec<T> {
    fn default() -> Self {
        vec![]
    }

    fn reduce(mut v1: Self, mut v2: Self) -> Self {
        v1.append(&mut v2);
        v1
    }
}

/// Reduce `Result` by propagating failures.  If all results are
/// successful, do pair-wise reductions of the `Ok` values.
///
/// When https://github.com/nikomatsakis/rayon/issues/113 is fixed, there
/// should be a built-in API for reducing `Result` that aborts the
/// computation early.
impl<T: MonoidReduce, E> MonoidReduce for Result<T, E> {
    fn default() -> Self {
        Ok(T::default())
    }

    fn reduce(v1: Self, v2: Self) -> Self {
        match (v1, v2) {
            (Ok(v1), Ok(v2)) => Ok(T::reduce(v1, v2)),
            (err @ Err(_), _) |
            (_, err @ Err(_)) => err,
        }
    }
}

/// Extra methods that we can call on a `ParallelIterator` over a type
/// that supports `MonoidReduce`.
trait ParallelIteratorReduceAutomatically<T>: ParallelIterator<Item = T>
    where T: MonoidReduce + Send
{
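    /// Reduce all items pairwise with `MonoidReduce::reduce`, or return
    /// `MonoidReduce::default()` if there are no items. For
    /// `Result<(), E>`, this returns `Ok(())` if no result is an `Err`, or
    /// an arbitrarily selected error if there are one or more error
    /// results. This is appropriate when we don't want to spam the user
    /// with multiple errors from the same run.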
    fn reduce_automatically(self) -> T;
}

impl<T, I> ParallelIteratorReduceAutomatically<T> for I
    where T: MonoidReduce + Send,
          I: ParallelIterator<Item = T>
{
    fn reduce_automatically(self) -> T {
        self.reduce_with(T::reduce)
            .unwrap_or_else(T::default)
    }
}

#[test]
#[cfg_attr(feature="clippy", allow(let_unit_value))]
fn reduce_automatically_uses_monoid_reduce() {
    let _: () = vec![(), ()].par_iter().cloned().reduce_automatically();
    assert_eq!(vec![vec!(1), vec!(1)].par_iter().cloned().reduce_automatically(),
               vec![1, 1]);
    let ok: Result<(), &'static str> = Ok(());
    assert_eq!(vec![ok, ok].par_iter().cloned().reduce_automatically(),
               Ok(()));
    assert_eq!(vec![Err("!"), Ok(())].par_iter().cloned().reduce_automatically(),
               Err("!"));
}

/// Extra methods that we can call on a `ParallelIterator`.
trait ParallelIteratorReduceToResultsToVec<T, E>
    : ParallelIterator<Item = Result<T, E>>
    where T: Send,
          E: Send
{
    /// Collect the results of a parallel iteration as a `Vec` in an
    /// arbitrary order if all results succeed.
    fn reduce_results_to_vec(self) -> Result<Vec<T>, E>;
}

impl<T, E, I> ParallelIteratorReduceToResultsToVec<T, E> for I
    where T: Send,
          E: Send,
          I: ParallelIterator<Item = Result<T, E>>
{
    fn reduce_results_to_vec(self) -> Result<Vec<T>, E> {
        self.map(|result| result.map(|value| vec![value]))
            .reduce_automatically()
    }
}

#[test]
fn reduce_results_to_vec_returns_ok_on_success() {
    let result: Result<Vec<u8>, &'static str> = vec![1, 1, 1]
        .par_iter()
        .map(|v| Ok(v.to_owned()))
        .reduce_results_to_vec();
    assert_eq!(result.unwrap(), vec![1, 1, 1]);
}

#[test]
fn reduce_results_to_vec_returns_err_on_any_failure() {
    let result: Result<Vec<u8>, &'static str> = vec![1, 1, 1]
        .par_iter()
        .map(|_| Err("!"))
        .reduce_results_to_vec();
    assert!(result.is_err());
}

Note that if you use a wrapper type implementing `MonoidReduce`, you can reduce just about anything easily. And `reduce_results_to_vec` could easily be generalized to many more collection types with an extra trait.
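As a rough illustration of the wrapper idea, here are two hypothetical wrappers, `Sum` and `Max`, with the `MonoidReduce` trait redeclared so the block stands alone. `reduce_all` is a hypothetical sequential stand-in for `reduce_automatically`: it has the same `reduce(...).unwrap_or_else(T::default)` shape but uses std's `Iterator::reduce` in place of rayon's `reduce_with`, so no rayon dependency is needed.

```rust
/// Same shape as the `MonoidReduce` trait above.
trait MonoidReduce {
    fn default() -> Self;
    fn reduce(v1: Self, v2: Self) -> Self;
}

/// Hypothetical wrapper: sum `u64`s, with 0 as the identity.
#[derive(Debug, PartialEq)]
struct Sum(u64);

impl MonoidReduce for Sum {
    fn default() -> Self {
        Sum(0)
    }
    fn reduce(v1: Self, v2: Self) -> Self {
        Sum(v1.0 + v2.0)
    }
}

/// Hypothetical wrapper: keep the largest value seen; `None` is the identity
/// because `None` orders below every `Some(_)`.
#[derive(Debug, PartialEq)]
struct Max(Option<u64>);

impl MonoidReduce for Max {
    fn default() -> Self {
        Max(None)
    }
    fn reduce(v1: Self, v2: Self) -> Self {
        Max(v1.0.max(v2.0))
    }
}

/// Sequential stand-in for `reduce_automatically`.
fn reduce_all<T: MonoidReduce>(items: impl IntoIterator<Item = T>) -> T {
    items.into_iter().reduce(T::reduce).unwrap_or_else(T::default)
}
```

Because both operations are associative, a parallel reduction could group the items differently and still reach the same result.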

nikomatsakis commented Nov 19, 2016 · Member

Hmm, so I guess we can't quite use `find_any()` for this; it's more like a `collect()` wrapper. But we're close. =)

cuviper commented Oct 17, 2017 · Member

The standard library implements `FromIterator` for `Option<C>` and `Result<C, E>` where `C: FromIterator` too, and these short-circuit on the first `None` or `Err` seen. Since #315 we've had the equivalent behavior with `FromParallelIterator`.

It's not a fully-general monoid reduction, but does this meet your original need?

As an aside, it might be useful to allow collecting `()` from iterators of `Item = ()`, both in std and rayon. This would essentially act like `for_each`, except you could combine it with the above `Option` and `Result` behavior too. I think I'll try it in std first...
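A minimal sequential sketch of the short-circuiting `collect()` described here, using only std; the `halve` function is hypothetical. `check_all` collects into `Result<(), String>`, which relies on std's `impl FromIterator<()> for ()`. With rayon, the same `collect()` calls should presumably work after swapping `iter()` for `par_iter()` via `FromParallelIterator`, although in parallel, which error is returned when several items fail is likely not guaranteed to be the earliest in input order.

```rust
/// Hypothetical per-item work: halve even numbers, reject odd ones.
fn halve(n: u32) -> Result<u32, String> {
    if n % 2 == 0 {
        Ok(n / 2)
    } else {
        Err(format!("{} is odd", n))
    }
}

/// Collect every `Ok` value, or stop at the first `Err`.
fn halve_all(input: &[u32]) -> Result<Vec<u32>, String> {
    input.iter().map(|&n| halve(n)).collect()
}

/// Same short-circuiting, but collecting into `()` for side-effect-only loops.
fn check_all(input: &[u32]) -> Result<(), String> {
    input.iter().map(|&n| halve(n).map(|_| ())).collect()
}
```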

emk commented Oct 18, 2017

> It's not a fully-general monoid reduction, but does this meet your original need?

Yes, my specific use case is short-circuiting on errors, and just returning one. I don't need fully-general monoid reductions.

(I do admit a slight fondness for basic abstract algebraic traits like "monoid" or "group". They're well-established mathematical abstractions, and they cover hundreds of use cases precisely. But of course, in Rust, you'd probably want to rename them to something a bit friendlier. And here, we really prefer the short-circuiting behavior anyway.)

cuviper commented Oct 18, 2017 · Member

OK, thanks!

cuviper closed this Oct 18, 2017
