Skip to content

Latest commit

 

History

History
325 lines (249 loc) · 17.7 KB

Streams.md

File metadata and controls

325 lines (249 loc) · 17.7 KB

流(Streams)

流是一个一系列异步值的称呼. 它与Rust的 std::iter::Iterator 异步等效且由 Stream trait表示. 流能在async函数中迭代. 它们也可以使用适配器进行 转换. Tokio在 StreamExt trait上提供了一些通用适配器.

Tokio 在 stream 特性标识下提供对流的支持. 当依赖Tokio时, 包括streamfull的特性能访问此功能.

tokio ={version = "0.3", features = ["stream"]}

我们已经看到了一些类型也实现了Stream. 比如说,mpsc::Receiver 的接收(receive)部分也实现了Stream. AsyncBufReadExt::lines() 方法采用一个被缓存的 I/O reader并返回一个 Stream,其中每个值代表一行数据.

迭代(Iteration)

当前Rust程序语言还不支持异步for循环. 取而代之是的使用while let循环与 StreamExt::next() 配对来完成流的迭代.

use tokio::stream::StreamExt;
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (mut tx, mut rx) = mpsc::channel(10);

    tokio::spawn(async move {
        tx.send(1).await.unwrap();
        tx.send(2).await.unwrap();
        tx.send(3).await.unwrap();
    });

    while let Some(v) = rx.next().await {
        println!("GOT = {:?}", v);
    }
}

像迭代器一样,next()方法返回Option<T> 其中 T 就是流的类型. 接收到 None 表明流迭代被终止了.

Mini-Redis 广播(Mini-Redis broadcast)

让我们来看一个使用Mini-Redis客户端的更加复杂的示例.

完整的代码可以看 这里.

use tokio::stream::StreamExt;
use mini_redis::client;

async fn publish() -> mini_redis::Result<()> {
    let mut client = client::connect("127.0.0.1:6379").await?;

    // Publish some data
    client.publish("numbers", "1".into()).await?;
    client.publish("numbers", "two".into()).await?;
    client.publish("numbers", "3".into()).await?;
    client.publish("numbers", "four".into()).await?;
    client.publish("numbers", "five".into()).await?;
    client.publish("numbers", "6".into()).await?;
    Ok(())
}

async fn subscribe() -> mini_redis::Result<()> {
    let client = client::connect("127.0.0.1:6379").await?;
    let subscriber = client.subscribe(vec!["numbers".to_string()]).await?;
    let messages = subscriber.into_stream();

    tokio::pin!(messages);

    while let Some(msg) = messages.next().await {
        println!("got = {:?}", msg);
    }

    Ok(())
}

#[tokio::main]
async fn main() -> mini_redis::Result<()> {
    tokio::spawn(async {
        publish().await
    });

    subscribe().await?;

    println!("DONE");

    Ok(())
}

产生一个任务来将消息发布到"numbers" 通道上的Mini-Redis服务上. 然后我们在main(主)任务中,订阅"numbers" 通道并显示收到的消息.

订阅之后,在返回的订阅者上调用 into_stream(). 消息者订阅,返回在消息到达时产生消息的流. 在我们开始迭代消息之前,注意到,使用了tokio::pin!将流固定到堆栈. 在流上调用next()需要流被 pinned. into_stream() 函数返回的流不是固定的,我们必须显示的固定住它来进行迭代.

一个Rust的值是"pinned"时,它会被固定且它不能在内存中移动. 固定值的关键是可以将指针用作固定数据,并且调用都可以确信指针一直保持有效.
`async/await`使用此特性来支持跨`.await`点的数据**借用**.

如果我们忘记固定住流,我们会得到像下面这样的错误:

error[E0277]: `std::future::from_generator::GenFuture<[static generator@mini_redis::client::Subscriber::into_stream::{{closure}}#0 0:mini_redis::client::Subscriber, 1:async_stream::yielder::Sender<std::result::Result<mini_redis::client::Message, std::boxed::Box<(dyn std::error::Error + std::marker::Send + std::marker::Sync + 'static)>>> for<'r, 's, 't0, 't1, 't2, 't3, 't4, 't5, 't6> {std::future::ResumeTy, &'r mut mini_redis::client::Subscriber, mini_redis::client::Subscriber, impl std::future::Future, (), std::result::Result<std::option::Option<mini_redis::client::Message>, std::boxed::Box<(dyn std::error::Error + std::marker::Send + std::marker::Sync + 't0)>>, std::boxed::Box<(dyn std::error::Error + std::marker::Send + std::marker::Sync + 't1)>, &'t2 mut async_stream::yielder::Sender<std::result::Result<mini_redis::client::Message, std::boxed::Box<(dyn std::error::Error + std::marker::Send + std::marker::Sync + 't3)>>>, async_stream::yielder::Sender<std::result::Result<mini_redis::client::Message, std::boxed::Box<(dyn std::error::Error + std::marker::Send + std::marker::Sync + 't4)>>>, std::result::Result<mini_redis::client::Message, std::boxed::Box<(dyn std::error::Error + std::marker::Send + std::marker::Sync + 't5)>>, impl std::future::Future, std::option::Option<mini_redis::client::Message>, mini_redis::client::Message}]>` cannot be unpinned
   --> streams/src/main.rs:22:36
    |
22  |     while let Some(msg) = messages.next().await {
    |                                    ^^^^ within `impl futures_core::stream::Stream`, the trait `std::marker::Unpin` is not implemented for `std::future::from_generator::GenFuture<[static generator@mini_redis::client::Subscriber::into_stream::{{closure}}#0 0:mini_redis::client::Subscriber, 1:async_stream::yielder::Sender<std::result::Result<mini_redis::client::Message, std::boxed::Box<(dyn std::error::Error + std::marker::Send + std::marker::Sync + 'static)>>> for<'r, 's, 't0, 't1, 't2, 't3, 't4, 't5, 't6> {std::future::ResumeTy, &'r mut mini_redis::client::Subscriber, mini_redis::client::Subscriber, impl std::future::Future, (), std::result::Result<std::option::Option<mini_redis::client::Message>, std::boxed::Box<(dyn std::error::Error + std::marker::Send + std::marker::Sync + 't0)>>, std::boxed::Box<(dyn std::error::Error + std::marker::Send + std::marker::Sync + 't1)>, &'t2 mut async_stream::yielder::Sender<std::result::Result<mini_redis::client::Message, std::boxed::Box<(dyn std::error::Error + std::marker::Send + std::marker::Sync + 't3)>>>, async_stream::yielder::Sender<std::result::Result<mini_redis::client::Message, std::boxed::Box<(dyn std::error::Error + std::marker::Send + std::marker::Sync + 't4)>>>, std::result::Result<mini_redis::client::Message, std::boxed::Box<(dyn std::error::Error + std::marker::Send + std::marker::Sync + 't5)>>, impl std::future::Future, std::option::Option<mini_redis::client::Message>, mini_redis::client::Message}]>`
    | 
   ::: /home/carllerche/.cargo/registry/src/github.com-1ecc6299db9ec823/mini-redis-0.2.0/src/client.rs:398:37
    |
398 |     pub fn into_stream(mut self) -> impl Stream<Item = crate::Result<Message>> {
    |                                     ------------------------------------------ within this `impl futures_core::stream::Stream`
    |
    = note: required because it appears within the type `impl std::future::Future`
    = note: required because it appears within the type `async_stream::async_stream::AsyncStream<std::result::Result<mini_redis::client::Message, std::boxed::Box<(dyn std::error::Error + std::marker::Send + std::marker::Sync + 'static)>>, impl std::future::Future>`
    = note: required because it appears within the type `impl futures_core::stream::Stream`

error[E0277]: `std::future::from_generator::GenFuture<[static generator@mini_redis::client::Subscriber::into_stream::{{closure}}#0 0:mini_redis::client::Subscriber, 1:async_stream::yielder::Sender<std::result::Result<mini_redis::client::Message, std::boxed::Box<(dyn std::error::Error + std::marker::Send + std::marker::Sync + 'static)>>> for<'r, 's, 't0, 't1, 't2, 't3, 't4, 't5, 't6> {std::future::ResumeTy, &'r mut mini_redis::client::Subscriber, mini_redis::client::Subscriber, impl std::future::Future, (), std::result::Result<std::option::Option<mini_redis::client::Message>, std::boxed::Box<(dyn std::error::Error + std::marker::Send + std::marker::Sync + 't0)>>, std::boxed::Box<(dyn std::error::Error + std::marker::Send + std::marker::Sync + 't1)>, &'t2 mut async_stream::yielder::Sender<std::result::Result<mini_redis::client::Message, std::boxed::Box<(dyn std::error::Error + std::marker::Send + std::marker::Sync + 't3)>>>, async_stream::yielder::Sender<std::result::Result<mini_redis::client::Message, std::boxed::Box<(dyn std::error::Error + std::marker::Send + std::marker::Sync + 't4)>>>, std::result::Result<mini_redis::client::Message, std::boxed::Box<(dyn std::error::Error + std::marker::Send + std::marker::Sync + 't5)>>, impl std::future::Future, std::option::Option<mini_redis::client::Message>, mini_redis::client::Message}]>` cannot be unpinned
   --> streams/src/main.rs:22:27
    |
22  |     while let Some(msg) = messages.next().await {
    |                           ^^^^^^^^^^^^^^^^^^^^^ within `impl futures_core::stream::Stream`, the trait `std::marker::Unpin` is not implemented for `std::future::from_generator::GenFuture<[static generator@mini_redis::client::Subscriber::into_stream::{{closure}}#0 0:mini_redis::client::Subscriber, 1:async_stream::yielder::Sender<std::result::Result<mini_redis::client::Message, std::boxed::Box<(dyn std::error::Error + std::marker::Send + std::marker::Sync + 'static)>>> for<'r, 's, 't0, 't1, 't2, 't3, 't4, 't5, 't6> {std::future::ResumeTy, &'r mut mini_redis::client::Subscriber, mini_redis::client::Subscriber, impl std::future::Future, (), std::result::Result<std::option::Option<mini_redis::client::Message>, std::boxed::Box<(dyn std::error::Error + std::marker::Send + std::marker::Sync + 't0)>>, std::boxed::Box<(dyn std::error::Error + std::marker::Send + std::marker::Sync + 't1)>, &'t2 mut async_stream::yielder::Sender<std::result::Result<mini_redis::client::Message, std::boxed::Box<(dyn std::error::Error + std::marker::Send + std::marker::Sync + 't3)>>>, async_stream::yielder::Sender<std::result::Result<mini_redis::client::Message, std::boxed::Box<(dyn std::error::Error + std::marker::Send + std::marker::Sync + 't4)>>>, std::result::Result<mini_redis::client::Message, std::boxed::Box<(dyn std::error::Error + std::marker::Send + std::marker::Sync + 't5)>>, impl std::future::Future, std::option::Option<mini_redis::client::Message>, mini_redis::client::Message}]>`
    | 
   ::: /home/carllerche/.cargo/registry/src/github.com-1ecc6299db9ec823/mini-redis-0.2.0/src/client.rs:398:37
    |
398 |     pub fn into_stream(mut self) -> impl Stream<Item = crate::Result<Message>> {
    |                                     ------------------------------------------ within this `impl futures_core::stream::Stream`
    |
    = note: required because it appears within the type `impl std::future::Future`
    = note: required because it appears within the type `async_stream::async_stream::AsyncStream<std::result::Result<mini_redis::client::Message, std::boxed::Box<(dyn std::error::Error + std::marker::Send + std::marker::Sync + 'static)>>, impl std::future::Future>`
    = note: required because it appears within the type `impl futures_core::stream::Stream`
    = note: required because of the requirements on the impl of `std::future::Future` for `tokio::stream::next::Next<'_, impl futures_core::stream::Stream>`

error: aborting due to 2 previous errors

For more information about this error, try `rustc --explain E0277`.
error: could not compile `streams`.

To learn more, run the command again with --verbose.

如果你遇到一个像这样的错误,可以尝试将流固定!

在运行之前,先启动Mini-Redis 服务:

$ mini-redis-server

然后尝试运行代码,我们将看到消息在标准输出流中打印.

got = Ok(Message { channel: "numbers", content: b"1" })
got = Ok(Message { channel: "numbers", content: b"two" })
got = Ok(Message { channel: "numbers", content: b"3" })
got = Ok(Message { channel: "numbers", content: b"four" })
got = Ok(Message { channel: "numbers", content: b"five" })
got = Ok(Message { channel: "numbers", content: b"6" })

由于订阅与发布之间存在竞争,某些早期的消息可能会被丢弃. 该程序永远不会退出. 只要服务器处于活动状态,对Mini-Redis通道的订阅将保持活动状态.

让我们来看看如何使用流来扩展此程序.

适配器(Adapters)

接收一个Stream并返回一个Stream的函数通常被叫做"流适配器"(Stream adapters),因为他们是适配器模式中的一种形式. 公共流适配器包括 maptake, 和filter.

让我们更新一个Mini-Redis来让它可以退出. 在接收到三个消息之后停止迭代消息. 这可以用take. 此适配器限制流最多产生 n 个消息.

let message = subscriber.into_stream().take(3);

再次运行程序,我们可以看到:

got = Ok(Message { channel: "numbers", content: b"1" })
got = Ok(Message { channel: "numbers", content: b"two" })
got = Ok(Message { channel: "numbers", content: b"3" })

这一次程序可以终止.

现在让我们将流限制为一个数字. 我们将通过消息的长度来检查. 我们使用filter适配器来删除与条件(译者注: predicate,这里译为条件好点)不匹配的消息.

let messages = subscriber
    .into_stream()
    .filter(|msg| match msg {
        Ok(msg) if msg.content.len() == 1 => true,
        _ => false,
    })
    .take(3);

再一次执行程序,我们看到:

got = Ok(Message { channel: "numbers", content: b"1" })
got = Ok(Message { channel: "numbers", content: b"3" })
got = Ok(Message { channel: "numbers", content: b"6" })

请注意适配器的应用很重要. 首先调用filter然后再是take与调用takefilter是不同的.

最后我们通过剥离 Ok(Message { ... } 的输出部分来整理输出. 这是使用map来完成. 因为它应用在filter之后,我们知道消息是 Ok,所以 我们可以使用unwrap().

let messages = subscriber
    .into_stream()
    .filter(|msg| match msg {
        Ok(msg) if msg.content.len() == 1 => true,
        _ => false,
    })
    .map(|msg| msg.unwrap().content)
    .take(3);

现在我们得到输出:

got = b"1"
got = b"3"
got = b"6"

另外可选的是,组合filtermap的操作步可以使用 filter_map.

这里有更多可用的适配器,清单请查看这里.

Stream的实现(Implementing Stream)

Stream trait与 Future trait非常类似.

use std::pin::Pin;
use std::task::{Context, Poll};

pub trait Stream {
    type Item;

    fn poll_next(
        self: Pin<&mut Self>, 
        cx: &mut Context<'_>
    ) -> Poll<Option<Self::Item>>;

    fn size_hint(&self) -> (usize, Option<usize>) {
        (0, None)
    }
}

Stream::poll_next() 函数与Future::poll非常类似,不同之处在于,它可以被重复调来从流中接收多个值. 与我们在深入异步 中看到的一样,当流不是就绪状态时将返回Poll::Pending. 任务注册waker程序. 一旦应该再次轮询流时,就会通知waker.

size_hint() 方法使用的方式与iterators相同.

通常当手动实现一个Stream时,它是通过组合future和其它流来完成的. 例如,让我们以在深入异步中实现的Delay future为基础. 我们将它转换成10ms为间隔产生三次()的流.

use tokio::stream::Stream;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;

struct Interval {
    rem: usize,
    delay: Delay,
}

impl Stream for Interval {
    type Item = ();

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
        -> Poll<Option<()>>
    {
        if self.rem == 0 {
            // No more delays
            return Poll::Ready(None);
        }

        match Pin::new(&mut self.delay).poll(cx) {
            Poll::Ready(_) => {
                let when = self.delay.when + Duration::from_millis(10);
                self.delay = Delay { when };
                self.rem -= 1;
                Poll::Ready(Some(()))
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

async-stream

使用Stream trait手动来实现流可能很繁琐. 不幸的是,Rust语言目前还不支持在流上使用async/await语法. 这还在进行中,但现在还没准备好.(译者注: 指在流上的async/await语法)

async-stream 包是一个临时可用的解决方案. 这个包提供了一个async_stream!的宏,可以将输入转换成一个流》 使用这个包,可以像这样实现上面的间隔需求:

use async_stream::stream;
use std::time::{Duration, Instant};

stream! {
    let mut when = Instant::now();
    for _ in 0..3 {
        let delay = Delay { when };
        delay.await;
        yield ();
        when += Duration::from_millis(10);
    }
}

Select

词汇表