Skip to content

Commit

Permalink
Merge pull request #164 from yoshuawuyts/concurrent-stream
Browse files Browse the repository at this point in the history
  • Loading branch information
yoshuawuyts committed Mar 21, 2024
2 parents 79903a6 + 8ef8cd0 commit 398a2f1
Show file tree
Hide file tree
Showing 22 changed files with 1,760 additions and 22 deletions.
14 changes: 8 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,7 @@ readme = "README.md"
edition = "2021"
keywords = []
categories = []
authors = [
"Yoshua Wuyts <yoshuawuyts@gmail.com>"
]
authors = ["Yoshua Wuyts <yoshuawuyts@gmail.com>"]

[profile.bench]
debug = true
Expand All @@ -33,17 +31,21 @@ std = ["alloc"]
alloc = ["bitvec/alloc", "dep:slab", "dep:smallvec"]

[dependencies]
bitvec = { version = "1.0.1", default-features = false }
bitvec = { version = "1.0.1", default-features = false, features = ["alloc"] }
futures-core = { version = "0.3", default-features = false }
futures-lite = "1.12.0"
pin-project = "1.0.8"
slab = { version = "0.4.8", optional = true }
smallvec = { version = "1.11.0", optional = true }

[dev-dependencies]
async-std = { version = "1.12.0", features = ["attributes"] }
criterion = { version = "0.3", features = ["async", "async_futures", "html_reports"] }
criterion = { version = "0.3", features = [
"async",
"async_futures",
"html_reports",
] }
futures = "0.3.25"
futures-lite = "1.12.0"
futures-time = "3.0.0"
lending-stream = "1.0.0"
rand = "0.8.5"
Expand Down
2 changes: 1 addition & 1 deletion examples/happy_eyeballs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,5 +44,5 @@ async fn open_tcp_socket(
}

// Start connecting. If an attempt succeeds, cancel all others attempts.
Ok(futures.race_ok().await?)
futures.race_ok().await
}
2 changes: 2 additions & 0 deletions src/collections/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
#[cfg(feature = "alloc")]
pub mod vec;
67 changes: 67 additions & 0 deletions src/collections/vec.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
//! Parallel iterator types for [vectors][std::vec] (`Vec<T>`)
//!
//! You will rarely need to interact with this module directly unless you need
//! to name one of the iterator types.
//!
//! [std::vec]: https://doc.rust-lang.org/stable/std/vec/use core::future::Ready;

use crate::concurrent_stream::{self, FromStream};
use crate::prelude::*;
use crate::utils::{from_iter, FromIter};
#[cfg(all(feature = "alloc", not(feature = "std")))]
use alloc::vec::Vec;
use core::future::Ready;

pub use crate::future::join::vec::Join;
pub use crate::future::race::vec::Race;
pub use crate::future::race_ok::vec::{AggregateError, RaceOk};
pub use crate::future::try_join::vec::TryJoin;
pub use crate::stream::chain::vec::Chain;
pub use crate::stream::merge::vec::Merge;
pub use crate::stream::zip::vec::Zip;

/// Concurrent async iterator that moves out of a vector.
#[derive(Debug)]
pub struct IntoConcurrentStream<T>(FromStream<FromIter<alloc::vec::IntoIter<T>>>);

impl<T> ConcurrentStream for IntoConcurrentStream<T> {
type Item = T;

type Future = Ready<T>;

async fn drive<C>(self, consumer: C) -> C::Output
where
C: concurrent_stream::Consumer<Self::Item, Self::Future>,
{
self.0.drive(consumer).await
}

fn concurrency_limit(&self) -> Option<core::num::NonZeroUsize> {
self.0.concurrency_limit()
}
}

impl<T> concurrent_stream::IntoConcurrentStream for Vec<T> {
type Item = T;

type IntoConcurrentStream = IntoConcurrentStream<T>;

fn into_co_stream(self) -> Self::IntoConcurrentStream {
let stream = from_iter(self);
let co_stream = stream.co();
IntoConcurrentStream(co_stream)
}
}

#[cfg(test)]
mod test {
use crate::prelude::*;

#[test]
fn collect() {
futures_lite::future::block_on(async {
let v: Vec<_> = vec![1, 2, 3, 4, 5].into_co_stream().collect().await;
assert_eq!(v, &[1, 2, 3, 4, 5]);
});
}
}
154 changes: 154 additions & 0 deletions src/concurrent_stream/enumerate.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
use pin_project::pin_project;

use super::{ConcurrentStream, Consumer};
use core::future::Future;
use core::num::NonZeroUsize;
use core::pin::Pin;
use core::task::{ready, Context, Poll};

/// A concurrent iterator that yields the current count and the element during iteration.
///
/// This `struct` is created by the [`enumerate`] method on [`ConcurrentStream`]. See its
/// documentation for more.
///
/// [`enumerate`]: ConcurrentStream::enumerate
/// [`ConcurrentStream`]: trait.ConcurrentStream.html
#[derive(Debug)]
pub struct Enumerate<CS: ConcurrentStream> {
inner: CS,
}

impl<CS: ConcurrentStream> Enumerate<CS> {
pub(crate) fn new(inner: CS) -> Self {
Self { inner }
}
}

impl<CS: ConcurrentStream> ConcurrentStream for Enumerate<CS> {
type Item = (usize, CS::Item);
type Future = EnumerateFuture<CS::Future, CS::Item>;

async fn drive<C>(self, consumer: C) -> C::Output
where
C: Consumer<Self::Item, Self::Future>,
{
self.inner
.drive(EnumerateConsumer {
inner: consumer,
count: 0,
})
.await
}

fn concurrency_limit(&self) -> Option<NonZeroUsize> {
self.inner.concurrency_limit()
}

fn size_hint(&self) -> (usize, Option<usize>) {
self.inner.size_hint()
}
}

#[pin_project]
struct EnumerateConsumer<C> {
#[pin]
inner: C,
count: usize,
}
impl<C, Item, Fut> Consumer<Item, Fut> for EnumerateConsumer<C>
where
Fut: Future<Output = Item>,
C: Consumer<(usize, Item), EnumerateFuture<Fut, Item>>,
{
type Output = C::Output;

async fn send(self: Pin<&mut Self>, future: Fut) -> super::ConsumerState {
let this = self.project();
let count = *this.count;
*this.count += 1;
this.inner.send(EnumerateFuture::new(future, count)).await
}

async fn progress(self: Pin<&mut Self>) -> super::ConsumerState {
let this = self.project();
this.inner.progress().await
}

async fn flush(self: Pin<&mut Self>) -> Self::Output {
let this = self.project();
this.inner.flush().await
}
}

/// Takes a future and maps it to another future via a closure
#[derive(Debug)]
#[pin_project::pin_project]
pub struct EnumerateFuture<FutT, T>
where
FutT: Future<Output = T>,
{
done: bool,
#[pin]
fut_t: FutT,
count: usize,
}

impl<FutT, T> EnumerateFuture<FutT, T>
where
FutT: Future<Output = T>,
{
fn new(fut_t: FutT, count: usize) -> Self {
Self {
done: false,
fut_t,
count,
}
}
}

impl<FutT, T> Future for EnumerateFuture<FutT, T>
where
FutT: Future<Output = T>,
{
type Output = (usize, T);

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
if *this.done {
panic!("future has already been polled to completion once");
}

let item = ready!(this.fut_t.poll(cx));
*this.done = true;
Poll::Ready((*this.count, item))
}
}

#[cfg(test)]
mod test {
// use crate::concurrent_stream::{ConcurrentStream, IntoConcurrentStream};
use crate::prelude::*;
use futures_lite::stream;
use futures_lite::StreamExt;
use std::num::NonZeroUsize;

#[test]
fn enumerate() {
futures_lite::future::block_on(async {
let mut n = 0;
stream::iter(std::iter::from_fn(|| {
let v = n;
n += 1;
Some(v)
}))
.take(5)
.co()
.limit(NonZeroUsize::new(1))
.enumerate()
.for_each(|(index, n)| async move {
assert_eq!(index, n);
})
.await;
});
}
}

0 comments on commit 398a2f1

Please sign in to comment.