Skip to content

Releases: yoshuawuyts/futures-concurrency

v7.6.0

12 Apr 09:49
dec5ce0
Compare
Choose a tag to compare

✨ Portable Concurrent Async Iteration ✨

This release introduces the ConcurrentStream API: a concurrent async/.await adaptation of Rayon's ParallelStream API. We've been trying to implement this API for the past six or so years, and we've happy to announce we finally have a working implementation!

The main feature that sets this implementation apart from other, similar attempts is that it works for any existing Stream impl. All you need to do is call the .co() method to obtain a ConcurrentStream, and from that point all Stream combinators should just work as expected:

use futures_concurrency::prelude::*;

let v: Vec<_> = stream::repeat("chashu")
    .co()   // ← call this to convert any `Stream` into a `ConcurrentStream`
    .take(2)
    .map(|msg| async move { format!("hello {msg}") })
    .collect()
    .await;

assert_eq!(v, &["hello chashu", "hello chashu"]);

See the call to collect at the end there? That's right: that's concurrent async iteration, collecting back into a single structure. This makes writing fan-out/fan-in pipelines trivial to author. But that's not all: in addition to converting into collections, we can also directly convert collections into ConcurrentStream implementations too:

use futures_concurrency::prelude::*;

let v: Vec<_> = vec!["chashu", "nori"]
    .into_co_stream()  // ← call this to convert collections into concurrent async iterators
    .map(|msg| async move { format!("hello {msg}") })
    .collect()
    .await;

The amount of concurrency by default is unbounded, but can be bounded by calling the limit method. This will apply backpressure should the stream produce items faster than the concurrent iterator can process them.

This API also resolves the buffered streams problem. ConcurrentStream removes the need for the dreaded combination of mapping to futures and then calling the futures-rs buffered method. Instead it ensures that the processing of items in a loop always happens in concert with the execution of the concurrent futures 1.

Notably this API will work with any async runtime or async framework, because it makes no assumptions about the underlying runtime. The only assumption is makes is that an allocator is available. This means that at this time, unlike most other APIs in futures-concurrency this will not work on #[no_std] environments. This, however, is not an inherent restriction but merely an artifact of the implementation. In the future we may explore porting this to a #[no_std] compatible version - this will require some minor API changes, but should, as a system, likely work.

In order to make this system work with parallel execution it should be possible to write a custom adapter. We encourage async runtimes to wrap the ConcurrentStream trait exposed in this crate to create their own ParallelStream system. This can depend on the runtime, and ensure that all execution not only happens concurrently, but can also be scheduled on multiple cores.

We're excited for people to give this a try. We certainly hope this lowers the bar for correctly applying structured, asynchronous, concurrent streaming processing in Rust!

What's Changed

Full Changelog: v7.5.0...v7.6.0


  1. There is a more elaborate version of this problem we don't have a user story for yet. Rust's futures model couples "liveness" to "backpressure". In theory we might fail to send keepalive messages if we apply backpressure for too long; this is a direct artifact of queueing theory, and would be a reason to add some form of async Future::pending / Future::poll_pending method for. This is a more subtle / niche issue than "barbara battles buffered streams". But it's worth explicitly calling out the limits of our solutions. ↩

v7.5.0

11 Mar 13:38
ac90582
Compare
Choose a tag to compare

This release adds support for no_std and alloc environments to futures-concurrency. Starting this release (v7.5.0) you can pass feature flags to your Cargo.toml to select the right feature set for your environment:

[dependencies]
# std
futures-concurrency = "7.5.0"

# alloc
futures-concurrency = { version = "7.5.0", default-features = false, features = ["alloc"] }

# no_std
futures-concurrency = { version = "7.5.0", default-features = false }

What's Changed

New Contributors

Full Changelog: v7.4.3...v7.5.0

v7.4.3

22 Sep 15:59
112b40e
Compare
Choose a tag to compare

This release fixes a bug introduced in v7.3.0 related to the try_join family of operations. Please consider upgrading to this release if you're running v7.3.0 or later.

What's Changed

Full Changelog: v7.4.2...v7.4.3

v7.4.2

25 Aug 14:29
Compare
Choose a tag to compare

What's Changed

Full Changelog: v7.4.1...v7.4.2

v7.4.1

14 Aug 11:35
Compare
Choose a tag to compare

What's Changed

Full Changelog: v7.4.0...v7.4.1

Example

use futures_concurrency::stream::StreamGroup;
use lending_stream::prelude::*;
use futures_lite::stream;

// Setup the stream group and populate it with one stream
let mut group = StreamGroup::new();
group.insert(stream::once(4));

let mut index = 3;
let mut out = 0;
let mut group = group.lend_mut();
while let Some((group, num)) = group.next().await {
    if index != 0 {
        // Update the group while iterating over the group's contents
        group.insert(stream::once(index));
        index -= 1;
    }
    out += num;
}

assert_eq!(out, 10);

v7.4.0

13 Aug 11:28
Compare
Choose a tag to compare

What's Changed

Full Changelog: v7.3.0...v7.4.0

v7.3.0

23 Jun 13:36
Compare
Choose a tag to compare

This release makes progress towards resolving #137 for the join and try_join operations. Rather than dropping futures when the enclosing future is dropped, futures are instead now dropped as soon as they're done yielding data. This has the immediate benefit that tracing the execution duration of the contained futures will generally more accurate. But in the longer term this also sets us up to support concurrent asynchronous destructor behavior, where async destructors can begin executing as soon as individual futures are ready, rather then once the future is dropped.

This change is considered semver minor since we've intentionally never guaranteed drop ordering of futures, knowing we wanted to enable concurrent execution of asynchronous destructors once those become available in the language.

What's Changed

Full Changelog: v7.2.1...v7.3.0

v7.2.1

22 May 18:34
Compare
Choose a tag to compare

What's Changed

Full Changelog: v7.2.0...v7.2.1

v7.2.0

05 Apr 13:16
Compare
Choose a tag to compare

What's Changed

Full Changelog: v7.1.1...v7.2.0

v7.1.1

05 Apr 11:25
Compare
Choose a tag to compare

What's Changed

  • shuffle vec/array/tuple futures/streams for benchmark by @wishawa in #117
  • updated the documentation examples for crates.io by @yoshuawuyts

New Contributors

Full Changelog: v7.1.0...v7.1.1