Skip to content

Commit

Permalink
stream: add From<Receiver<T>> impl for receiver streams (#4080)
Browse files Browse the repository at this point in the history
  • Loading branch information
FrancisMurillo committed Aug 29, 2021
1 parent b67d464 commit 909d3ec
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 1 deletion.
6 changes: 6 additions & 0 deletions tokio-stream/src/wrappers/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,9 @@ impl<T> fmt::Debug for BroadcastStream<T> {
f.debug_struct("BroadcastStream").finish()
}
}

impl<T: 'static + Clone + Send> From<Receiver<T>> for BroadcastStream<T> {
fn from(recv: Receiver<T>) -> Self {
Self::new(recv)
}
}
6 changes: 6 additions & 0 deletions tokio-stream/src/wrappers/mpsc_bounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,9 @@ impl<T> AsMut<Receiver<T>> for ReceiverStream<T> {
&mut self.inner
}
}

impl<T> From<Receiver<T>> for ReceiverStream<T> {
fn from(recv: Receiver<T>) -> Self {
Self::new(recv)
}
}
6 changes: 6 additions & 0 deletions tokio-stream/src/wrappers/mpsc_unbounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,9 @@ impl<T> AsMut<UnboundedReceiver<T>> for UnboundedReceiverStream<T> {
&mut self.inner
}
}

impl<T> From<UnboundedReceiver<T>> for UnboundedReceiverStream<T> {
fn from(recv: UnboundedReceiver<T>) -> Self {
Self::new(recv)
}
}
8 changes: 7 additions & 1 deletion tokio-stream/src/wrappers/watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ async fn make_future<T: Clone + Send + Sync>(
(result, rx)
}

impl<T: 'static + Clone + Unpin + Send + Sync> WatchStream<T> {
impl<T: 'static + Clone + Send + Sync> WatchStream<T> {
/// Create a new `WatchStream`.
pub fn new(rx: Receiver<T>) -> Self {
Self {
Expand Down Expand Up @@ -94,3 +94,9 @@ impl<T> fmt::Debug for WatchStream<T> {
f.debug_struct("WatchStream").finish()
}
}

impl<T: 'static + Clone + Send + Sync> From<Receiver<T>> for WatchStream<T> {
fn from(recv: Receiver<T>) -> Self {
Self::new(recv)
}
}

0 comments on commit 909d3ec

Please sign in to comment.