Skip to content

Commit

Permalink
stream: add Stream wrappers in tokio-stream (#3343)
Browse files Browse the repository at this point in the history
  • Loading branch information
Darksonn committed Jan 4, 2021
1 parent 3b6bee8 commit 7f17822
Show file tree
Hide file tree
Showing 20 changed files with 553 additions and 8 deletions.
3 changes: 3 additions & 0 deletions tokio-stream/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ categories = ["asynchronous"]
[features]
default = ["time"]
time = ["tokio/time"]
net = ["tokio/net"]
io-util = ["tokio/io-util"]
fs = ["tokio/fs"]

[dependencies]
futures-core = { version = "0.3.0" }
Expand Down
2 changes: 2 additions & 0 deletions tokio-stream/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@
#[macro_use]
mod macros;

pub mod wrappers;

mod stream_ext;
pub use stream_ext::{collect::FromStream, StreamExt};

Expand Down
30 changes: 30 additions & 0 deletions tokio-stream/src/macros.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,33 @@
macro_rules! cfg_fs {
($($item:item)*) => {
$(
#[cfg(feature = "fs")]
#[cfg_attr(docsrs, doc(cfg(feature = "fs")))]
$item
)*
}
}

macro_rules! cfg_io_util {
($($item:item)*) => {
$(
#[cfg(feature = "io-util")]
#[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
$item
)*
}
}

macro_rules! cfg_net {
($($item:item)*) => {
$(
#[cfg(feature = "net")]
#[cfg_attr(docsrs, doc(cfg(feature = "net")))]
$item
)*
}
}

macro_rules! cfg_time {
($($item:item)*) => {
$(
Expand Down
35 changes: 35 additions & 0 deletions tokio-stream/src/wrappers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
//! Wrappers for Tokio types that implement `Stream`.

mod mpsc_bounded;
pub use mpsc_bounded::ReceiverStream;

mod mpsc_unbounded;
pub use mpsc_unbounded::UnboundedReceiverStream;

cfg_time! {
mod interval;
pub use interval::IntervalStream;
}

cfg_net! {
mod tcp_listener;
pub use tcp_listener::TcpListenerStream;

#[cfg(unix)]
mod unix_listener;
#[cfg(unix)]
pub use unix_listener::UnixListenerStream;
}

cfg_io_util! {
mod split;
pub use split::SplitStream;

mod lines;
pub use lines::LinesStream;
}

cfg_fs! {
mod read_dir;
pub use read_dir::ReadDirStream;
}
50 changes: 50 additions & 0 deletions tokio-stream/src/wrappers/interval.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
use crate::Stream;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::time::{Instant, Interval};

/// A wrapper around [`Interval`] that implements [`Stream`].
///
/// [`Interval`]: struct@tokio::time::Interval
/// [`Stream`]: trait@crate::Stream
#[derive(Debug)]
#[cfg_attr(docsrs, doc(cfg(feature = "time")))]
pub struct IntervalStream {
inner: Interval,
}

impl IntervalStream {
/// Create a new `IntervalStream`.
pub fn new(interval: Interval) -> Self {
Self { inner: interval }
}

/// Get back the inner `Interval`.
pub fn into_inner(self) -> Interval {
self.inner
}
}

impl Stream for IntervalStream {
type Item = Instant;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Instant>> {
self.inner.poll_tick(cx).map(Some)
}

fn size_hint(&self) -> (usize, Option<usize>) {
(std::usize::MAX, None)
}
}

impl AsRef<Interval> for IntervalStream {
fn as_ref(&self) -> &Interval {
&self.inner
}
}

impl AsMut<Interval> for IntervalStream {
fn as_mut(&mut self) -> &mut Interval {
&mut self.inner
}
}
59 changes: 59 additions & 0 deletions tokio-stream/src/wrappers/lines.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
use crate::Stream;
use pin_project_lite::pin_project;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::{AsyncBufRead, Lines};

pin_project! {
/// A wrapper around [`tokio::io::Lines`] that implements [`Stream`].
///
/// [`tokio::io::Lines`]: struct@tokio::io::Lines
/// [`Stream`]: trait@crate::Stream
#[derive(Debug)]
#[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
pub struct LinesStream<R> {
#[pin]
inner: Lines<R>,
}
}

impl<R> LinesStream<R> {
/// Create a new `LinesStream`.
pub fn new(lines: Lines<R>) -> Self {
Self { inner: lines }
}

/// Get back the inner `Lines`.
pub fn into_inner(self) -> Lines<R> {
self.inner
}

/// Obtain a pinned reference to the inner `Lines<R>`.
pub fn as_pin_mut(self: Pin<&mut Self>) -> Pin<&mut Lines<R>> {
self.project().inner
}
}

impl<R: AsyncBufRead> Stream for LinesStream<R> {
type Item = io::Result<String>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.project()
.inner
.poll_next_line(cx)
.map(Result::transpose)
}
}

impl<R> AsRef<Lines<R>> for LinesStream<R> {
fn as_ref(&self) -> &Lines<R> {
&self.inner
}
}

impl<R> AsMut<Lines<R>> for LinesStream<R> {
fn as_mut(&mut self) -> &mut Lines<R> {
&mut self.inner
}
}
59 changes: 59 additions & 0 deletions tokio-stream/src/wrappers/mpsc_bounded.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
use crate::Stream;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::sync::mpsc::Receiver;

/// A wrapper around [`tokio::sync::mpsc::Receiver`] that implements [`Stream`].
///
/// [`tokio::sync::mpsc::Receiver`]: struct@tokio::sync::mpsc::Receiver
/// [`Stream`]: trait@crate::Stream
#[derive(Debug)]
pub struct ReceiverStream<T> {
inner: Receiver<T>,
}

impl<T> ReceiverStream<T> {
/// Create a new `ReceiverStream`.
pub fn new(recv: Receiver<T>) -> Self {
Self { inner: recv }
}

/// Get back the inner `Receiver`.
pub fn into_inner(self) -> Receiver<T> {
self.inner
}

/// Closes the receiving half of a channel without dropping it.
///
/// This prevents any further messages from being sent on the channel while
/// still enabling the receiver to drain messages that are buffered. Any
/// outstanding [`Permit`] values will still be able to send messages.
///
/// To guarantee no messages are dropped, after calling `close()`, you must
/// receive all items from the stream until `None` is returned.
///
/// [`Permit`]: struct@tokio::sync::mpsc::Permit
pub fn close(&mut self) {
self.inner.close()
}
}

impl<T> Stream for ReceiverStream<T> {
type Item = T;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.inner.poll_recv(cx)
}
}

impl<T> AsRef<Receiver<T>> for ReceiverStream<T> {
fn as_ref(&self) -> &Receiver<T> {
&self.inner
}
}

impl<T> AsMut<Receiver<T>> for ReceiverStream<T> {
fn as_mut(&mut self) -> &mut Receiver<T> {
&mut self.inner
}
}
53 changes: 53 additions & 0 deletions tokio-stream/src/wrappers/mpsc_unbounded.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
use crate::Stream;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::sync::mpsc::UnboundedReceiver;

/// A wrapper around [`tokio::sync::mpsc::UnboundedReceiver`] that implements [`Stream`].
///
/// [`tokio::sync::mpsc::UnboundedReceiver`]: struct@tokio::sync::mpsc::UnboundedReceiver
/// [`Stream`]: trait@crate::Stream
#[derive(Debug)]
pub struct UnboundedReceiverStream<T> {
inner: UnboundedReceiver<T>,
}

impl<T> UnboundedReceiverStream<T> {
/// Create a new `UnboundedReceiverStream`.
pub fn new(recv: UnboundedReceiver<T>) -> Self {
Self { inner: recv }
}

/// Get back the inner `UnboundedReceiver`.
pub fn into_inner(self) -> UnboundedReceiver<T> {
self.inner
}

/// Closes the receiving half of a channel without dropping it.
///
/// This prevents any further messages from being sent on the channel while
/// still enabling the receiver to drain messages that are buffered.
pub fn close(&mut self) {
self.inner.close()
}
}

impl<T> Stream for UnboundedReceiverStream<T> {
type Item = T;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.inner.poll_recv(cx)
}
}

impl<T> AsRef<UnboundedReceiver<T>> for UnboundedReceiverStream<T> {
fn as_ref(&self) -> &UnboundedReceiver<T> {
&self.inner
}
}

impl<T> AsMut<UnboundedReceiver<T>> for UnboundedReceiverStream<T> {
fn as_mut(&mut self) -> &mut UnboundedReceiver<T> {
&mut self.inner
}
}
47 changes: 47 additions & 0 deletions tokio-stream/src/wrappers/read_dir.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
use crate::Stream;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::fs::{DirEntry, ReadDir};

/// A wrapper around [`tokio::fs::ReadDir`] that implements [`Stream`].
///
/// [`tokio::fs::ReadDir`]: struct@tokio::fs::ReadDir
/// [`Stream`]: trait@crate::Stream
#[derive(Debug)]
#[cfg_attr(docsrs, doc(cfg(feature = "fs")))]
pub struct ReadDirStream {
inner: ReadDir,
}

impl ReadDirStream {
/// Create a new `ReadDirStream`.
pub fn new(read_dir: ReadDir) -> Self {
Self { inner: read_dir }
}

/// Get back the inner `ReadDir`.
pub fn into_inner(self) -> ReadDir {
self.inner
}
}

impl Stream for ReadDirStream {
type Item = io::Result<DirEntry>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.inner.poll_next_entry(cx).map(Result::transpose)
}
}

impl AsRef<ReadDir> for ReadDirStream {
fn as_ref(&self) -> &ReadDir {
&self.inner
}
}

impl AsMut<ReadDir> for ReadDirStream {
fn as_mut(&mut self) -> &mut ReadDir {
&mut self.inner
}
}
Loading

0 comments on commit 7f17822

Please sign in to comment.