Skip to content

Commit

Permalink
Add Sender, Receiver, LocalSender and LocalReceiver
Browse files Browse the repository at this point in the history
  • Loading branch information
try-box committed Jul 10, 2023
1 parent 4cd7bc7 commit 4494ef8
Showing 1 changed file with 297 additions and 41 deletions.
338 changes: 297 additions & 41 deletions mpsc/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,45 +1,50 @@
use std::task::Poll;
use futures::{Sink, Stream};
use futures::{Sink, SinkExt, Stream, StreamExt};
use queue_ext::{Action, QueueExt, Reply};
use std::ops::{Deref, DerefMut};
use std::pin::Pin;
use std::task::{Context, Poll};

pub use queue_ext::SendError;

///SegQueue based channel
#[cfg(feature = "segqueue")]
pub fn with_segqueue_channel<T>(queue: std::sync::Arc<crossbeam_queue::SegQueue<T>>, bound: usize)
-> (impl Sink<T, Error = SendError<T>> + Clone, impl Stream<Item=T>) {
pub fn with_segqueue_channel<T: 'static>(
queue: std::sync::Arc<crossbeam_queue::SegQueue<T>>,
bound: usize,
) -> (Sender<T, SendError<T>>, Receiver<T>) {
let (tx, rx) = queue.queue_channel::<T, _, _, _>(
move |s, act| match act {
Action::Send(val) => {
s.push(val);
Reply::Send(())
}
Action::IsFull => Reply::IsFull(s.len() >= bound),
Action::IsEmpty => Reply::IsEmpty(s.is_empty()),
Action::Len => Reply::Len(s.len()),
},
|s, _| {
match s.pop() {
Some(m) => Poll::Ready(Some(m)),
None => Poll::Pending,
}
},
);
(tx, rx)
move |s, act| match act {
Action::Send(val) => {
s.push(val);
Reply::Send(())
}
Action::IsFull => Reply::IsFull(s.len() >= bound),
Action::IsEmpty => Reply::IsEmpty(s.is_empty()),
Action::Len => Reply::Len(s.len()),
},
|s, _| match s.pop() {
Some(m) => Poll::Ready(Some(m)),
None => Poll::Pending,
},
);
(Sender::new(tx), Receiver::new(rx))
}

///SegQueue based channel

#[cfg(feature = "segqueue")]
pub fn segqueue_channel<T>(bound: usize) -> (impl Sink<T, Error = SendError<T>> + Clone, impl Stream<Item=T>) {
pub fn segqueue_channel<T: 'static>(bound: usize) -> (Sender<T, SendError<T>>, Receiver<T>) {
use crossbeam_queue::SegQueue;
use std_ext::ArcExt;
with_segqueue_channel(SegQueue::default().arc(), bound)
}

///VecDeque based channel
#[cfg(feature = "vecdeque")]
pub fn with_vecdeque_channel<T>(queue: std::sync::Arc<std_ext::RwLock<std::collections::VecDeque<T>>>, bound: usize)
-> (impl Sink<T, Error = SendError<T>> + Clone, impl Stream<Item=T>) {
pub fn with_vecdeque_channel<T: 'static>(
queue: std::sync::Arc<std_ext::RwLock<std::collections::VecDeque<T>>>,
bound: usize,
) -> (Sender<T, SendError<T>>, Receiver<T>) {
let (tx, rx) = queue.queue_channel::<T, _, _, _>(
move |s, act| match act {
Action::Send(val) => {
Expand All @@ -58,26 +63,27 @@ pub fn with_vecdeque_channel<T>(queue: std::sync::Arc<std_ext::RwLock<std::colle
}
},
);
(tx, rx)
}
(Sender::new(tx), Receiver::new(rx))
}

///VecDeque based channel
#[cfg(feature = "vecdeque")]
pub fn vecdeque_channel<T>(bound: usize) -> (impl Sink<T, Error = SendError<T>> + Clone, impl Stream<Item=T>) {
pub fn vecdeque_channel<T: 'static>(bound: usize) -> (Sender<T, SendError<T>>, Receiver<T>) {
use std::collections::VecDeque;
use std_ext::{ArcExt, RwLockExt};
let queue = VecDeque::default()
.rwlock()
.arc();
let queue = VecDeque::default().rwlock().arc();
with_vecdeque_channel(queue, bound)
}

///Indexmap based channel, remove entry if it already exists
#[cfg(feature = "indexmap")]
pub fn with_indexmap_channel<K, T>(indexmap: std::sync::Arc<std_ext::RwLock<indexmap::IndexMap<K, T>>>, bound: usize)
-> (impl Sink<(K, T), Error = SendError<(K, T)>> + Clone, impl Stream<Item=(K, T)>)
where
K: Eq + std::hash::Hash,
pub fn with_indexmap_channel<K, T>(
indexmap: std::sync::Arc<std_ext::RwLock<indexmap::IndexMap<K, T>>>,
bound: usize,
) -> (Sender<(K, T), SendError<(K, T)>>, Receiver<(K, T)>)
where
K: Eq + std::hash::Hash + 'static,
T: 'static,
{
let (tx, rx) = indexmap.queue_channel::<(K, T), _, _, _>(
move |s, act| match act {
Expand All @@ -99,20 +105,270 @@ pub fn with_indexmap_channel<K, T>(indexmap: std::sync::Arc<std_ext::RwLock<inde
}
},
);
(tx, rx)
(Sender::new(tx), Receiver::new(rx))
}


///Indexmap based channel, remove entry if it already exists
#[cfg(feature = "indexmap")]
pub fn indexmap_channel<K, T>(bound: usize) -> (impl Sink<(K, T), Error = SendError<(K, T)>> + Clone, impl Stream<Item=(K, T)>)
pub fn indexmap_channel<K, T>(bound: usize) -> (Sender<(K, T), SendError<(K, T)>>, Receiver<(K, T)>)
where
K: Eq + std::hash::Hash,
K: Eq + std::hash::Hash + 'static,
T: 'static,
{
use indexmap::IndexMap;
use std_ext::{ArcExt, RwLockExt};
let map = IndexMap::new()
.rwlock()
.arc();
let map = IndexMap::new().rwlock().arc();
with_indexmap_channel(map, bound)
}

pub trait SenderSink<M, E>: futures::Sink<M, Error = E> + Unpin + Send + Sync {
fn box_clone(&self) -> Box<dyn SenderSink<M, E>>;
}

impl<T, M, E> SenderSink<M, E> for T
where
T: futures::Sink<M, Error = E> + Unpin + Send + Sync + 'static,
T: Clone,
{
#[inline]
fn box_clone(&self) -> Box<dyn SenderSink<M, E>> {
Box::new(self.clone())
}
}

pub struct Sender<M, E> {
tx: Box<dyn SenderSink<M, E>>,
}

impl<M, E> Sender<M, E> {
#[inline]
pub fn new<T>(tx: T) -> Self
where
T: Sink<M, Error = E> + Sync + Send + Unpin + 'static,
T: Clone,
{
Sender { tx: Box::new(tx) }
}

#[inline]
pub async fn send(&mut self, t: M) -> std::result::Result<(), E> {
self.tx.send(t).await
}
}

impl<M, E> Clone for Sender<M, E> {
#[inline]
fn clone(&self) -> Self {
Sender {
tx: self.tx.box_clone(),
}
}
}

impl<M, E> Deref for Sender<M, E> {
type Target = Box<dyn SenderSink<M, E>>;
#[inline]
fn deref(&self) -> &Self::Target {
&self.tx
}
}

impl<M, E> DerefMut for Sender<M, E> {
#[inline]
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.tx
}
}

impl<M, E> futures::Sink<M> for Sender<M, E> {
type Error = E;

fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Pin::new(&mut self.tx).poll_ready(cx)
}

fn start_send(mut self: Pin<&mut Self>, msg: M) -> Result<(), Self::Error> {
Pin::new(&mut self.tx).start_send(msg)
}

fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Pin::new(&mut self.tx).poll_flush(cx)
}

fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Pin::new(&mut self.tx).poll_close(cx)
}
}

pub trait ReceiverStream<M>: futures::Stream<Item = M> + Send + Unpin {}

impl<T, M> ReceiverStream<M> for T where T: futures::Stream<Item = M> + Send + Unpin + 'static {}

pub struct Receiver<M> {
rx: Box<dyn ReceiverStream<M>>,
}

impl<M> Receiver<M> {
#[inline]
pub fn new<T>(tx: T) -> Self
where
T: futures::Stream<Item = M> + Send + Unpin + 'static,
{
Receiver { rx: Box::new(tx) }
}

#[inline]
pub async fn recv(&mut self) -> Option<M> {
self.rx.next().await
}
}

impl<M> Deref for Receiver<M> {
type Target = Box<dyn ReceiverStream<M>>;
#[inline]
fn deref(&self) -> &Self::Target {
&self.rx
}
}

impl<M> DerefMut for Receiver<M> {
#[inline]
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.rx
}
}

impl<M> Stream for Receiver<M> {
type Item = M;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.rx).poll_next(cx)
}
}

pub trait LocalSenderSink<M, E>: futures::Sink<M, Error = E> + Unpin {
fn box_clone(&self) -> Box<dyn LocalSenderSink<M, E>>;
}

impl<T, M, E> LocalSenderSink<M, E> for T
where
T: futures::Sink<M, Error = E> + Unpin + 'static,
T: Clone,
{
#[inline]
fn box_clone(&self) -> Box<dyn LocalSenderSink<M, E>> {
Box::new(self.clone())
}
}

pub struct LocalSender<M, E> {
tx: Box<dyn LocalSenderSink<M, E>>,
}

//unsafe impl<M, E> Sync for LocalSender<M, E> {}

impl<M, E> LocalSender<M, E> {
#[inline]
pub fn new<T>(tx: T) -> Self
where
T: Sink<M, Error = E> + Unpin + 'static,
T: Clone,
{
LocalSender { tx: Box::new(tx) }
}

#[inline]
pub async fn send(&mut self, t: M) -> std::result::Result<(), E> {
self.tx.send(t).await
}
}

impl<M, E> Clone for LocalSender<M, E> {
#[inline]
fn clone(&self) -> Self {
LocalSender {
tx: self.tx.box_clone(),
}
}
}

impl<M, E> Deref for LocalSender<M, E> {
type Target = Box<dyn LocalSenderSink<M, E>>;
#[inline]
fn deref(&self) -> &Self::Target {
&self.tx
}
}

impl<M, E> DerefMut for LocalSender<M, E> {
#[inline]
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.tx
}
}

impl<M, E> futures::Sink<M> for LocalSender<M, E> {
type Error = E;

fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Pin::new(&mut self.tx).poll_ready(cx)
}

fn start_send(mut self: Pin<&mut Self>, msg: M) -> Result<(), Self::Error> {
Pin::new(&mut self.tx).start_send(msg)
}

fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Pin::new(&mut self.tx).poll_flush(cx)
}

fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Pin::new(&mut self.tx).poll_close(cx)
}
}

pub trait LocalReceiverStream<M>: futures::Stream<Item = M> + Unpin {}

impl<T, M> LocalReceiverStream<M> for T where T: futures::Stream<Item = M> + Unpin + 'static {}

pub struct LocalReceiver<M> {
rx: Box<dyn LocalReceiverStream<M>>,
}

impl<M> LocalReceiver<M> {
#[inline]
pub fn new<T>(tx: T) -> Self
where
T: futures::Stream<Item = M> + Unpin + 'static,
{
LocalReceiver { rx: Box::new(tx) }
}

#[inline]
pub async fn recv(&mut self) -> Option<M> {
self.rx.next().await
}
}

impl<M> Deref for LocalReceiver<M> {
type Target = Box<dyn LocalReceiverStream<M>>;
#[inline]
fn deref(&self) -> &Self::Target {
&self.rx
}
}

impl<M> DerefMut for LocalReceiver<M> {
#[inline]
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.rx
}
}

impl<M> Stream for LocalReceiver<M> {
type Item = M;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.rx).poll_next(cx)
}
}

0 comments on commit 4494ef8

Please sign in to comment.