Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add traits ParallelDrainRange and ParallelDrainFull #787

Merged
merged 2 commits into from
Sep 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
62 changes: 62 additions & 0 deletions src/collections/binary_heap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,65 @@ delegate_indexed_iterator! {
}

// `BinaryHeap` doesn't have a mutable `Iterator`

/// Draining parallel iterator that moves out of a binary heap,
/// but keeps the total capacity.
#[derive(Debug)]
pub struct Drain<'a, T: Ord + Send> {
heap: &'a mut BinaryHeap<T>,
}

impl<'a, T: Ord + Send> ParallelDrainFull for &'a mut BinaryHeap<T> {
type Iter = Drain<'a, T>;
type Item = T;

fn par_drain(self) -> Self::Iter {
Drain { heap: self }
}
}

impl<'a, T: Ord + Send> ParallelIterator for Drain<'a, T> {
type Item = T;

fn drive_unindexed<C>(self, consumer: C) -> C::Result
where
C: UnindexedConsumer<Self::Item>,
{
bridge(self, consumer)
}

fn opt_len(&self) -> Option<usize> {
Some(self.len())
}
}

impl<'a, T: Ord + Send> IndexedParallelIterator for Drain<'a, T> {
fn drive<C>(self, consumer: C) -> C::Result
where
C: Consumer<Self::Item>,
{
bridge(self, consumer)
}

fn len(&self) -> usize {
self.heap.len()
}

fn with_producer<CB>(self, callback: CB) -> CB::Output
where
CB: ProducerCallback<Self::Item>,
{
super::DrainGuard::new(self.heap)
.par_drain(..)
.with_producer(callback)
}
}

impl<'a, T: Ord + Send> Drop for Drain<'a, T> {
fn drop(&mut self) {
if !self.heap.is_empty() {
// We must not have produced, so just call a normal drain to remove the items.
self.heap.drain();
}
}
}
29 changes: 29 additions & 0 deletions src/collections/hash_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use std::collections::HashMap;
use std::hash::{BuildHasher, Hash};
use std::marker::PhantomData;

use crate::iter::plumbing::*;
use crate::iter::*;
Expand Down Expand Up @@ -65,3 +66,31 @@ delegate_iterator! {
IterMut<'a, K, V> => (&'a K, &'a mut V),
impl<'a, K: Hash + Eq + Sync + 'a, V: Send + 'a>
}

/// Draining parallel iterator that moves out of a hash map,
/// but keeps the total capacity.
#[derive(Debug)]
pub struct Drain<'a, K: Hash + Eq + Send, V: Send> {
inner: vec::IntoIter<(K, V)>,
marker: PhantomData<&'a mut HashMap<K, V>>,
}

impl<'a, K: Hash + Eq + Send, V: Send, S: BuildHasher> ParallelDrainFull
for &'a mut HashMap<K, V, S>
{
type Iter = Drain<'a, K, V>;
type Item = (K, V);

fn par_drain(self) -> Self::Iter {
let vec: Vec<_> = self.drain().collect();
Drain {
inner: vec.into_par_iter(),
marker: PhantomData,
}
}
}

delegate_iterator! {
Drain<'_, K, V> => (K, V),
impl<K: Hash + Eq + Send, V: Send>
}
27 changes: 27 additions & 0 deletions src/collections/hash_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use std::collections::HashSet;
use std::hash::{BuildHasher, Hash};
use std::marker::PhantomData;

use crate::iter::plumbing::*;
use crate::iter::*;
Expand Down Expand Up @@ -51,3 +52,29 @@ delegate_iterator! {
}

// `HashSet` doesn't have a mutable `Iterator`

/// Draining parallel iterator that moves out of a hash set,
/// but keeps the total capacity.
#[derive(Debug)]
pub struct Drain<'a, T: Hash + Eq + Send> {
inner: vec::IntoIter<T>,
marker: PhantomData<&'a mut HashSet<T>>,
}

impl<'a, T: Hash + Eq + Send, S: BuildHasher> ParallelDrainFull for &'a mut HashSet<T, S> {
type Iter = Drain<'a, T>;
type Item = T;

fn par_drain(self) -> Self::Iter {
let vec: Vec<_> = self.drain().collect();
Drain {
inner: vec.into_par_iter(),
marker: PhantomData,
}
}
}

delegate_iterator! {
Drain<'_, T> => T,
impl<T: Hash + Eq + Send>
}
54 changes: 54 additions & 0 deletions src/collections/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,57 @@ pub mod hash_map;
pub mod hash_set;
pub mod linked_list;
pub mod vec_deque;

use self::drain_guard::DrainGuard;

mod drain_guard {
use crate::iter::ParallelDrainRange;
use std::mem;
use std::ops::RangeBounds;

/// A proxy for draining a collection by converting to a `Vec` and back.
///
/// This is used for draining `BinaryHeap` and `VecDeque`, which both have
/// zero-allocation conversions to/from `Vec`, though not zero-cost:
/// - `BinaryHeap` will heapify from `Vec`, but at least that will be empty.
/// - `VecDeque` has to shift items to offset 0 when converting to `Vec`.
#[allow(missing_debug_implementations)]
pub(super) struct DrainGuard<'a, T, C: From<Vec<T>>> {
collection: &'a mut C,
vec: Vec<T>,
}

impl<'a, T, C> DrainGuard<'a, T, C>
where
C: Default + From<Vec<T>>,
Vec<T>: From<C>,
{
pub(super) fn new(collection: &'a mut C) -> Self {
Self {
// Temporarily steal the inner `Vec` so we can drain in place.
vec: Vec::from(mem::replace(collection, C::default())),
collection,
}
}
}

impl<'a, T, C: From<Vec<T>>> Drop for DrainGuard<'a, T, C> {
fn drop(&mut self) {
// Restore the collection from the `Vec` with its original capacity.
*self.collection = C::from(mem::replace(&mut self.vec, Vec::new()));
}
}

impl<'a, T, C> ParallelDrainRange<usize> for &'a mut DrainGuard<'_, T, C>
where
T: Send,
C: From<Vec<T>>,
{
type Iter = crate::vec::Drain<'a, T>;
type Item = T;

fn par_drain<R: RangeBounds<usize>>(self, range: R) -> Self::Iter {
self.vec.par_drain(range)
}
}
}
84 changes: 81 additions & 3 deletions src/collections/vec_deque.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
//! unless you have need to name one of the iterator types.

use std::collections::VecDeque;
use std::ops::{Range, RangeBounds};

use crate::iter::plumbing::*;
use crate::iter::*;
use crate::math::simplify_range;

use crate::slice;
use crate::vec;
Expand All @@ -16,9 +18,15 @@ pub struct IntoIter<T: Send> {
inner: vec::IntoIter<T>,
}

into_par_vec! {
VecDeque<T> => IntoIter<T>,
impl<T: Send>
impl<T: Send> IntoParallelIterator for VecDeque<T> {
type Item = T;
type Iter = IntoIter<T>;

fn into_par_iter(self) -> Self::Iter {
// NOTE: requires data movement if the deque doesn't start at offset 0.
let inner = Vec::from(self).into_par_iter();
IntoIter { inner }
}
}

delegate_indexed_iterator! {
Expand Down Expand Up @@ -79,3 +87,73 @@ delegate_indexed_iterator! {
IterMut<'a, T> => &'a mut T,
impl<'a, T: Send + 'a>
}

/// Draining parallel iterator that moves a range out of a double-ended queue,
/// but keeps the total capacity.
#[derive(Debug)]
pub struct Drain<'a, T: Send> {
deque: &'a mut VecDeque<T>,
range: Range<usize>,
orig_len: usize,
}

impl<'a, T: Send> ParallelDrainRange<usize> for &'a mut VecDeque<T> {
type Iter = Drain<'a, T>;
type Item = T;

fn par_drain<R: RangeBounds<usize>>(self, range: R) -> Self::Iter {
Drain {
orig_len: self.len(),
range: simplify_range(range, self.len()),
deque: self,
}
}
}

impl<'a, T: Send> ParallelIterator for Drain<'a, T> {
type Item = T;

fn drive_unindexed<C>(self, consumer: C) -> C::Result
where
C: UnindexedConsumer<Self::Item>,
{
bridge(self, consumer)
}

fn opt_len(&self) -> Option<usize> {
Some(self.len())
}
}

impl<'a, T: Send> IndexedParallelIterator for Drain<'a, T> {
fn drive<C>(self, consumer: C) -> C::Result
where
C: Consumer<Self::Item>,
{
bridge(self, consumer)
}

fn len(&self) -> usize {
self.range.len()
}

fn with_producer<CB>(self, callback: CB) -> CB::Output
where
CB: ProducerCallback<Self::Item>,
{
// NOTE: requires data movement if the deque doesn't start at offset 0.
super::DrainGuard::new(self.deque)
.par_drain(self.range.clone())
.with_producer(callback)
}
}

impl<'a, T: Send> Drop for Drain<'a, T> {
fn drop(&mut self) {
if self.deque.len() != self.orig_len - self.range.len() {
// We must not have produced, so just call a normal drain to remove the items.
assert_eq!(self.deque.len(), self.orig_len);
self.deque.drain(self.range.clone());
}
}
}