Skip to content

Commit

Permalink
Add with_priority_channel and priority_channel
Browse files Browse the repository at this point in the history
  • Loading branch information
try-box committed Oct 3, 2023
1 parent d333213 commit f2f4080
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 4 deletions.
8 changes: 5 additions & 3 deletions mpsc/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "mpsc"
version = "0.2.0"
version = "0.2.1"
authors = ["try <trywen@qq.com>"]
edition = "2018"
rust-version = "1.47"
Expand All @@ -15,18 +15,20 @@ categories = []
default = ["segqueue"]
segqueue = ["crossbeam-queue"]
vecdeque = []
priority = ["collections"]

[dependencies]
std-ext = { version = "0.1", path = "../std-ext" }
std-ext = { version = "0.2", path = "../std-ext" }
queue-ext = { version = "0.4", path = "../queue-ext" }
futures = "0.3"
rand = "0.8"
dashmap = "5.4"
dashmap = "5.5"
ahash = "0.8"
log = "0.4"

crossbeam-queue = { version = "0.3", optional = true }
indexmap = { version = "1.9", features = ["std"], optional = true }
collections = { version = "0.1", path = "../collections", features = ["std", "priority-queue"], optional = true }



43 changes: 42 additions & 1 deletion mpsc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,46 @@ use std::task::{Context, Poll};

pub use queue_ext::SendError;

///BinaryHeap based channel
#[cfg(feature = "priority")]
#[allow(clippy::type_complexity)]
pub fn with_priority_channel<P: Ord + 'static, T: 'static>(
queue: std::sync::Arc<std_ext::RwLock<collections::PriorityQueue<P, T>>>,
bound: usize,
) -> (Sender<(P, T), SendError<(P, T)>>, Receiver<(P, T)>) {
let (tx, rx) = queue.queue_channel::<_, _, _, _>(
move |s, act| match act {
Action::Send((p, val)) => {
s.write().push(p, val);
Reply::Send(())
}
Action::IsFull => Reply::IsFull(s.read().len() >= bound),
Action::IsEmpty => Reply::IsEmpty(s.read().is_empty()),
Action::Len => Reply::Len(s.read().len()),
},
|s, _| {
let mut s = s.write();
match s.pop() {
Some(m) => Poll::Ready(Some(m)),
None => Poll::Pending,
}
},
);
(Sender::new(tx), Receiver::new(rx))
}

///BinaryHeap based channel
#[cfg(feature = "priority")]
#[allow(clippy::type_complexity)]
pub fn priority_channel<P: 'static + Ord, T: 'static>(
bound: usize,
) -> (Sender<(P, T), SendError<(P, T)>>, Receiver<(P, T)>) {
use collections::PriorityQueue;
use std_ext::{ArcExt, RwLockExt};
let queue = PriorityQueue::default().rwlock().arc();
with_priority_channel(queue, bound)
}

///SegQueue based channel
#[cfg(feature = "segqueue")]
pub fn with_segqueue_channel<T: 'static>(
Expand All @@ -31,7 +71,6 @@ pub fn with_segqueue_channel<T: 'static>(
}

///SegQueue based channel

#[cfg(feature = "segqueue")]
pub fn segqueue_channel<T: 'static>(bound: usize) -> (Sender<T, SendError<T>>, Receiver<T>) {
use crossbeam_queue::SegQueue;
Expand Down Expand Up @@ -77,6 +116,7 @@ pub fn vecdeque_channel<T: 'static>(bound: usize) -> (Sender<T, SendError<T>>, R

///Indexmap based channel, remove entry if it already exists
#[cfg(feature = "indexmap")]
#[allow(clippy::type_complexity)]
pub fn with_indexmap_channel<K, T>(
indexmap: std::sync::Arc<std_ext::RwLock<indexmap::IndexMap<K, T>>>,
bound: usize,
Expand Down Expand Up @@ -110,6 +150,7 @@ where

///Indexmap based channel, remove entry if it already exists
#[cfg(feature = "indexmap")]
#[allow(clippy::type_complexity)]
pub fn indexmap_channel<K, T>(bound: usize) -> (Sender<(K, T), SendError<(K, T)>>, Receiver<(K, T)>)
where
K: Eq + std::hash::Hash + 'static,
Expand Down

0 comments on commit f2f4080

Please sign in to comment.