Tokio v0.2.12
Polish, small additions, and fixes. The biggest additions in this release are StreamMap and Notify.
StreamMap
Similar to StreamExt::merge, StreamMap supports merging multiple source streams into a single stream, producing items as they become available in the source streams. Unlike merge, however, StreamMap also supports inserting and removing streams at runtime. This is useful for cases where a consumer wishes to subscribe to messages from multiple sources and dynamically manage those subscriptions.
As the name implies, StreamMap maps keys to streams. Streams are inserted or removed as needed, and the StreamMap is then used like any other stream. Items are returned with their keys, enabling the caller to identify which source stream each item originated from.
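To make the key-to-source idea concrete, here is a minimal synchronous sketch that uses plain iterators and only the standard library. It is a conceptual model, not Tokio's implementation: the `KeyedMerge` type, its `next_item` method, and the round-robin visiting order are all assumptions made for illustration, and the real StreamMap is asynchronous and may poll its sources in a different order.

```rust
use std::collections::VecDeque;

/// Hypothetical synchronous model of a keyed merge; NOT tokio's StreamMap.
struct KeyedMerge<K, I> {
    // Each entry pairs a key with its source iterator.
    entries: VecDeque<(K, I)>,
}

impl<K: Clone + PartialEq, I: Iterator> KeyedMerge<K, I> {
    fn new() -> Self {
        KeyedMerge { entries: VecDeque::new() }
    }

    /// Register a source under `key`, replacing any source already using it.
    fn insert(&mut self, key: K, source: I) {
        self.remove(&key);
        self.entries.push_back((key, source));
    }

    /// Drop the source registered under `key`, if any, and hand it back.
    fn remove(&mut self, key: &K) -> Option<I> {
        let pos = self.entries.iter().position(|(k, _)| k == key)?;
        self.entries.remove(pos).map(|(_, source)| source)
    }

    /// Yield the next item, tagged with the key of the source it came from.
    /// Sources are visited round-robin; exhausted sources are dropped.
    fn next_item(&mut self) -> Option<(K, I::Item)> {
        while let Some((key, mut source)) = self.entries.pop_front() {
            if let Some(item) = source.next() {
                // Re-queue the source at the back so others get a turn.
                self.entries.push_back((key.clone(), source));
                return Some((key, item));
            }
            // The source is exhausted, so it is not re-queued.
        }
        None
    }
}

fn main() {
    let mut map = KeyedMerge::new();
    map.insert("one", vec![1, 2].into_iter());
    map.insert("two", vec![10, 20].into_iter());

    let (key, val) = map.next_item().unwrap();
    println!("got {} from {}", val, key);

    // Unsubscribe from the source that just produced a value.
    map.remove(&key);

    // Only the remaining source contributes from here on.
    while let Some((key, val)) = map.next_item() {
        println!("got {} from {}", val, key);
    }
}
```

The point of the sketch is the shape of the interface: sources come and go by key, and every item arrives paired with the key it was registered under.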
Example
use tokio::stream::{StreamExt, StreamMap};
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
    let (mut tx1, rx1) = mpsc::channel(10);
    let (mut tx2, rx2) = mpsc::channel(10);
    // Use the `Sender` handles: queue one value per channel
    // so the reads below have something to receive
    tx1.send(1).await.unwrap();
    tx2.send(2).await.unwrap();
    let mut map = StreamMap::new();
    // Insert both streams
    map.insert("one", rx1);
    map.insert("two", rx2);
    // Read twice
    for _ in 0..2 {
        let (key, val) = map.next().await.unwrap();
        println!("got {} from {}", val, key);
        // Remove the stream to prevent reading the next value
        map.remove(key);
    }
}
Notify
Notify is the next step in providing async / await based synchronization primitives. It is similar to how thread::park() / unpark() work, but for asynchronous tasks. Consumers await notifications and producers notify consumers. Notify is intended to be used as a building block for higher level synchronization primitives, such as channels.
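For comparison, the thread-based pattern referred to above can be sketched with only the standard library. The helper name `park_until_notified` and the `AtomicBool` flag are illustrative assumptions; the flag is there because `thread::park()` may wake spuriously, so the consumer re-checks it in a loop.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

/// Hypothetical helper: a consumer thread parks until the producer signals it.
fn park_until_notified() -> &'static str {
    let ready = Arc::new(AtomicBool::new(false));
    let ready2 = ready.clone();

    let consumer = thread::spawn(move || {
        // `park` can return spuriously, so loop until the flag is set.
        while !ready2.load(Ordering::Acquire) {
            thread::park();
        }
        "received notification"
    });

    // Producer: publish the flag first, then wake the consumer.
    ready.store(true, Ordering::Release);
    consumer.thread().unpark();

    // Wait for the consumer and return what it produced.
    consumer.join().unwrap()
}

fn main() {
    println!("sending notification");
    println!("{}", park_until_notified());
}
```

Notify plays the same role for tasks: awaiting `notified()` corresponds to parking, and `notify()` corresponds to unparking, without blocking an OS thread.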
Examples
Basic usage.
use tokio::sync::Notify;
use std::sync::Arc;
#[tokio::main]
async fn main() {
    let notify = Arc::new(Notify::new());
    let notify2 = notify.clone();
    let handle = tokio::spawn(async move {
        notify2.notified().await;
        println!("received notification");
    });
    println!("sending notification");
    notify.notify();
    // Wait for the spawned task so it can print before `main` returns
    handle.await.unwrap();
}
Here is how Notify can be used as a building block for an unbounded channel.
use tokio::sync::Notify;
use std::collections::VecDeque;
use std::sync::Mutex;
struct Channel<T> {
    values: Mutex<VecDeque<T>>,
    notify: Notify,
}
impl<T> Channel<T> {
    pub fn send(&self, value: T) {
        self.values.lock().unwrap()
            .push_back(value);
        // Notify the consumer a value is available
        self.notify.notify();
    }
    pub async fn recv(&self) -> T {
        loop {
            // Drain values
            if let Some(value) = self.values.lock().unwrap().pop_front() {
                return value;
            }
            // Wait for values to be available
            self.notify.notified().await;
        }
    }
}
Changes
Fixes
- net: UnixStream::poll_shutdown should call shutdown(Write) (#2245).
- process: Wake up read and write on EPOLLERR (#2218).
- rt: potential deadlock when using block_in_place and shutting down the runtime (#2119).
- rt: only detect number of CPUs if core_threads not specified (#2238).
- sync: reduce watch::Receiver struct size (#2191).
- time: succeed when setting delay of $MAX-1 (#2184).
- time: avoid having to poll DelayQueue after inserting new delay (#2217).
Added
- macros: pin! variant that assigns to identifier and pins (#2274).
- net: impl Stream for Listener types (#2275).
- rt: Runtime::shutdown_timeout waits for runtime to shutdown for specified duration (#2186).
- stream: StreamMap merges streams and can insert / remove streams at runtime (#2185).
- stream: StreamExt::skip() skips a fixed number of items (#2204).
- stream: StreamExt::skip_while() skips items based on a predicate (#2205).
- sync: Notify provides basic async / await task notification (#2210).
- sync: Mutex::into_inner retrieves guarded data (#2250).
- sync: mpsc::Sender::send_timeout sends, waiting for up to specified duration for channel capacity (#2227).
- time: impl Ord and Hash for Instant (#2239).