Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add mapped_futures #2751

Open
wants to merge 18 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions futures-util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ pin-project-lite = "0.2.6"
futures = { path = "../futures", features = ["async-await", "thread-pool"] }
futures-test = { path = "../futures-test" }
tokio = "0.1.11"
futures-timer = "3.0.3"

[package.metadata.docs.rs]
all-features = true
Expand Down
1 change: 0 additions & 1 deletion futures-util/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
)
))]
#![warn(missing_docs, unsafe_op_in_unsafe_fn)]
#![cfg_attr(feature = "write-all-vectored", feature(io_slice_advance))]
#![cfg_attr(docsrs, feature(doc_cfg))]
#![allow(clippy::needless_borrow)] // https://github.com/rust-lang/futures-rs/pull/2558#issuecomment-1030745203
#![allow(clippy::arc_with_non_send_sync)] // false positive https://github.com/rust-lang/rust-clippy/issues/11076
Expand Down
96 changes: 12 additions & 84 deletions futures-util/src/stream/futures_unordered/iter.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,12 @@
use super::task::Task;
use super::FuturesUnordered;
use core::marker::PhantomData;
use crate::stream::futures_unordered_internal;
use core::pin::Pin;
use core::ptr;
use core::sync::atomic::Ordering::Relaxed;

use super::DummyStruct;

/// Mutable iterator over all futures in the unordered set.
#[derive(Debug)]
pub struct IterPinMut<'a, Fut> {
pub(super) task: *const Task<Fut>,
pub(super) len: usize,
pub(super) _marker: PhantomData<&'a mut FuturesUnordered<Fut>>,
pub(super) inner: futures_unordered_internal::IterPinMut<'a, (), Fut, DummyStruct>,
}

/// Mutable iterator over all futures in the unordered set.
Expand All @@ -20,10 +16,7 @@ pub struct IterMut<'a, Fut: Unpin>(pub(super) IterPinMut<'a, Fut>);
/// Immutable iterator over all futures in the unordered set.
#[derive(Debug)]
pub struct IterPinRef<'a, Fut> {
pub(super) task: *const Task<Fut>,
pub(super) len: usize,
pub(super) pending_next_all: *mut Task<Fut>,
pub(super) _marker: PhantomData<&'a FuturesUnordered<Fut>>,
pub(super) inner: futures_unordered_internal::IterPinRef<'a, (), Fut, DummyStruct>,
}

/// Immutable iterator over all the futures in the unordered set.
Expand All @@ -33,42 +26,18 @@ pub struct Iter<'a, Fut: Unpin>(pub(super) IterPinRef<'a, Fut>);
/// Owned iterator over all futures in the unordered set.
#[derive(Debug)]
pub struct IntoIter<Fut: Unpin> {
pub(super) len: usize,
pub(super) inner: FuturesUnordered<Fut>,
pub(super) inner: futures_unordered_internal::IntoIter<(), Fut, DummyStruct>,
}

impl<Fut: Unpin> Iterator for IntoIter<Fut> {
type Item = Fut;

fn next(&mut self) -> Option<Self::Item> {
// `head_all` can be accessed directly and we don't need to spin on
// `Task::next_all` since we have exclusive access to the set.
let task = self.inner.head_all.get_mut();

if (*task).is_null() {
return None;
}

unsafe {
// Moving out of the future is safe because it is `Unpin`
let future = (*(**task).future.get()).take().unwrap();

// Mutable access to a previously shared `FuturesUnordered` implies
// that the other threads already released the object before the
// current thread acquired it, so relaxed ordering can be used and
// valid `next_all` checks can be skipped.
let next = (**task).next_all.load(Relaxed);
*task = next;
if !task.is_null() {
*(**task).prev_all.get() = ptr::null_mut();
}
self.len -= 1;
Some(future)
}
self.inner.next().map(|opt| opt.1)
}

fn size_hint(&self) -> (usize, Option<usize>) {
(self.len, Some(self.len))
(self.inner.len, Some(self.inner.len))
}
}

Expand All @@ -78,26 +47,11 @@ impl<'a, Fut> Iterator for IterPinMut<'a, Fut> {
type Item = Pin<&'a mut Fut>;

fn next(&mut self) -> Option<Self::Item> {
if self.task.is_null() {
return None;
}

unsafe {
let future = (*(*self.task).future.get()).as_mut().unwrap();

// Mutable access to a previously shared `FuturesUnordered` implies
// that the other threads already released the object before the
// current thread acquired it, so relaxed ordering can be used and
// valid `next_all` checks can be skipped.
let next = (*self.task).next_all.load(Relaxed);
self.task = next;
self.len -= 1;
Some(Pin::new_unchecked(future))
}
self.inner.next().map(|opt| opt.1)
}

fn size_hint(&self) -> (usize, Option<usize>) {
(self.len, Some(self.len))
(self.inner.len, Some(self.inner.len))
}
}

Expand All @@ -121,26 +75,11 @@ impl<'a, Fut> Iterator for IterPinRef<'a, Fut> {
type Item = Pin<&'a Fut>;

fn next(&mut self) -> Option<Self::Item> {
if self.task.is_null() {
return None;
}

unsafe {
let future = (*(*self.task).future.get()).as_ref().unwrap();

// Relaxed ordering can be used since acquire ordering when
// `head_all` was initially read for this iterator implies acquire
// ordering for all previously inserted nodes (and we don't need to
// read `len_all` again for any other nodes).
let next = (*self.task).spin_next_all(self.pending_next_all, Relaxed);
self.task = next;
self.len -= 1;
Some(Pin::new_unchecked(future))
}
self.inner.next().map(|opt| opt.1)
}

fn size_hint(&self) -> (usize, Option<usize>) {
(self.len, Some(self.len))
(self.inner.len, Some(self.inner.len))
}
}

Expand All @@ -159,14 +98,3 @@ impl<'a, Fut: Unpin> Iterator for Iter<'a, Fut> {
}

impl<Fut: Unpin> ExactSizeIterator for Iter<'_, Fut> {}

// SAFETY: we do nothing thread-local and there is no interior mutability,
// so the usual structural `Send`/`Sync` apply.
unsafe impl<Fut: Send> Send for IterPinRef<'_, Fut> {}
unsafe impl<Fut: Sync> Sync for IterPinRef<'_, Fut> {}

unsafe impl<Fut: Send> Send for IterPinMut<'_, Fut> {}
unsafe impl<Fut: Sync> Sync for IterPinMut<'_, Fut> {}

unsafe impl<Fut: Send + Unpin> Send for IntoIter<Fut> {}
unsafe impl<Fut: Sync + Unpin> Sync for IntoIter<Fut> {}
Loading
Loading