Skip to content

Commit

Permalink
Optimize the codes
Browse files Browse the repository at this point in the history
  • Loading branch information
try-box committed Oct 5, 2023
1 parent 375dd54 commit ad11175
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 76 deletions.
33 changes: 17 additions & 16 deletions queue-ext/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,26 @@ pub trait Waker {
fn rx_wake(&self);
fn tx_park(&self, w: std::task::Waker);
fn close_channel(&self);
fn is_closed(&self) -> bool;
}

impl<T: ?Sized> QueueExt for T {}

pub trait QueueExt {
#[inline]
fn queue_stream<Item, F>(self, f: F) -> QueueStream<Self, Item, F>
where
Self: Sized + Unpin,
F: Fn(Pin<&mut Self>, &mut Context<'_>) -> Poll<Option<Item>>,
where
Self: Sized + Unpin,
F: Fn(Pin<&mut Self>, &mut Context<'_>) -> Poll<Option<Item>>,
{
assert_stream::<Item, _>(QueueStream::new(self, f))
}

#[inline]
fn queue_sender<Item, F, R>(self, f: F) -> QueueSender<Self, Item, F, R>
where
Self: Sized + Waker,
F: Fn(&mut Self, Action<Item>) -> Reply<R>,
where
Self: Sized + Waker,
F: Fn(&mut Self, Action<Item>) -> Reply<R>,
{
QueueSender::new(self, f)
}
Expand All @@ -50,10 +51,10 @@ pub trait QueueExt {
QueueSender<QueueStream<Self, Item, F2>, Item, F1, R>,
QueueStream<Self, Item, F2>,
)
where
Self: Sized + Unpin + Clone,
F1: Fn(&mut QueueStream<Self, Item, F2>, Action<Item>) -> Reply<R>,
F2: Fn(Pin<&mut Self>, &mut Context<'_>) -> Poll<Option<Item>> + Clone + Unpin,
where
Self: Sized + Unpin + Clone,
F1: Fn(&mut QueueStream<Self, Item, F2>, Action<Item>) -> Reply<R>,
F2: Fn(Pin<&mut Self>, &mut Context<'_>) -> Poll<Option<Item>> + Clone + Unpin,
{
queue_channel(self, f1, f2)
}
Expand All @@ -69,10 +70,10 @@ pub fn queue_channel<Q, Item, F1, R, F2>(
QueueSender<QueueStream<Q, Item, F2>, Item, F1, R>,
QueueStream<Q, Item, F2>,
)
where
Q: Sized + Unpin + Clone,
F1: Fn(&mut QueueStream<Q, Item, F2>, Action<Item>) -> Reply<R>,
F2: Fn(Pin<&mut Q>, &mut Context<'_>) -> Poll<Option<Item>> + Clone + Unpin,
where
Q: Sized + Unpin + Clone,
F1: Fn(&mut QueueStream<Q, Item, F2>, Action<Item>) -> Reply<R>,
F2: Fn(Pin<&mut Q>, &mut Context<'_>) -> Poll<Option<Item>> + Clone + Unpin,
{
let rx = QueueStream::new(q, f2);
let tx = QueueSender::new(rx.clone(), f1);
Expand Down Expand Up @@ -170,8 +171,8 @@ impl<T> SendError<T> {
// right implementations.
#[inline]
pub(crate) fn assert_stream<T, S>(stream: S) -> S
where
S: Stream<Item=T>,
where
S: Stream<Item = T>,
{
stream
}
80 changes: 33 additions & 47 deletions queue-ext/src/queue_sender.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
use std::fmt;
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::task::{Context, Poll};

use futures::Sink;
use pin_project::{pin_project, pinned_drop};


use super::{Action, Reply, SendError, TrySendError, Waker};

#[pin_project(PinnedDrop)]
Expand All @@ -17,7 +16,6 @@ pub struct QueueSender<S: Waker, Item, F, R> {
s: S,
#[pin]
f: F,
closed: AtomicBool,
num_senders: Arc<AtomicUsize>,
_item: PhantomData<Item>,
_r: PhantomData<R>,
Expand All @@ -28,19 +26,18 @@ unsafe impl<S: Waker, Item, F, R> Sync for QueueSender<S, Item, F, R> {}
unsafe impl<S: Waker, Item, F, R> Send for QueueSender<S, Item, F, R> {}

impl<S, Item, F, R> Clone for QueueSender<S, Item, F, R>
where
S: Clone + Waker,
F: Clone,
where
S: Clone + Waker,
F: Clone,
{
#[inline]
fn clone(&self) -> Self {
if !self.closed.load(Ordering::SeqCst) {
self.num_senders.fetch_add(1, Ordering::SeqCst);
}
//if !self.s.is_closed() {
self.num_senders.fetch_add(1, Ordering::SeqCst);
//}
Self {
s: self.s.clone(),
f: self.f.clone(),
closed: AtomicBool::new(false),
num_senders: self.num_senders.clone(),
_item: PhantomData,
_r: PhantomData,
Expand All @@ -49,18 +46,15 @@ impl<S, Item, F, R> Clone for QueueSender<S, Item, F, R>
}

#[pinned_drop]
impl<S: Waker, Item, F, R> PinnedDrop for QueueSender<S, Item, F, R>
{
impl<S: Waker, Item, F, R> PinnedDrop for QueueSender<S, Item, F, R> {
fn drop(self: Pin<&mut Self>) {
if !self.is_closed() {
self.set_closed();
}
self.set_closed();
}
}

impl<S, Item, F, R> fmt::Debug for QueueSender<S, Item, F, R>
where
S: fmt::Debug + Waker,
where
S: fmt::Debug + Waker,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("QueueSender")
Expand All @@ -69,19 +63,12 @@ impl<S, Item, F, R> fmt::Debug for QueueSender<S, Item, F, R>
}
}


impl<S, Item, F, R> QueueSender<S, Item, F, R>
where
S: Waker,
where
S: Waker,
{
#[inline]
fn is_closed(&self) -> bool {
self.closed.load(Ordering::SeqCst)
}

#[inline]
fn set_closed(&self) -> usize {
self.closed.store(true, Ordering::SeqCst);
let prev = self.num_senders.fetch_sub(1, Ordering::SeqCst);
if prev == 1 {
self.s.close_channel();
Expand All @@ -91,16 +78,15 @@ impl<S, Item, F, R> QueueSender<S, Item, F, R>
}

impl<S, Item, F, R> QueueSender<S, Item, F, R>
where
S: Waker,
F: Fn(&mut S, Action<Item>) -> Reply<R>,
where
S: Waker,
F: Fn(&mut S, Action<Item>) -> Reply<R>,
{
#[inline]
pub(super) fn new(s: S, f: F) -> Self {
Self {
s,
f,
closed: AtomicBool::new(false),
num_senders: Arc::new(AtomicUsize::new(1)),
_item: PhantomData,
_r: PhantomData,
Expand All @@ -109,11 +95,11 @@ impl<S, Item, F, R> QueueSender<S, Item, F, R>

#[inline]
pub fn try_send(&mut self, item: Item) -> Result<R, TrySendError<Item>> {
if self.is_closed() {
return Err(SendError::disconnected(Some(item)))
if self.s.is_closed() {
return Err(SendError::disconnected(Some(item)));
}
if self.is_full() {
return Err(TrySendError::full(item))
return Err(TrySendError::full(item));
}
let res = (self.f)(&mut self.s, Action::Send(item));
self.s.rx_wake();
Expand Down Expand Up @@ -147,19 +133,18 @@ impl<S, Item, F, R> QueueSender<S, Item, F, R>
_ => unreachable!(),
}
}

}

impl<S, Item, F, R> Sink<Item> for QueueSender<S, Item, F, R>
where
S: Waker + Unpin,
F: Fn(&mut S, Action<Item>) -> Reply<R>,
where
S: Waker + Unpin,
F: Fn(&mut S, Action<Item>) -> Reply<R>,
{
type Error = SendError<Item>;

fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
if self.is_closed() {
return Poll::Ready(Err(SendError::disconnected(None)))
if self.s.is_closed() {
return Poll::Ready(Err(SendError::disconnected(None)));
}
let mut this = self.project();
match (this.f)(&mut this.s, Action::IsFull) {
Expand All @@ -173,7 +158,7 @@ impl<S, Item, F, R> Sink<Item> for QueueSender<S, Item, F, R>
}

fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
if self.is_closed() {
if self.s.is_closed() {
return Err(SendError::disconnected(Some(item)));
}
let mut this = self.project();
Expand All @@ -183,22 +168,23 @@ impl<S, Item, F, R> Sink<Item> for QueueSender<S, Item, F, R>
}

fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
if self.s.is_closed() {
return Poll::Ready(Err(SendError::disconnected(None)));
}
Poll::Ready(Ok(()))
}

fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
if self.is_closed() {
return Poll::Ready(Err(SendError::disconnected(None)))
if self.s.is_closed() {
return Poll::Ready(Err(SendError::disconnected(None)));
}
if self.set_closed() > 1 {
return Poll::Ready(Ok(()))
return Poll::Ready(Ok(()));
}

let mut this = self.project();
match (this.f)(&mut this.s, Action::IsEmpty) {
Reply::IsEmpty(true) => {
Poll::Ready(Ok(()))
},
Reply::IsEmpty(true) => Poll::Ready(Ok(())),
Reply::IsEmpty(false) => {
this.s.tx_park(cx.waker().clone());
Poll::Pending
Expand Down
36 changes: 23 additions & 13 deletions queue-ext/src/queue_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ use std::marker::PhantomData;
use std::marker::Unpin;
use std::ops::{Deref, DerefMut};
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::sync::Mutex;
use std::task::{Context, Poll};
use std::sync::atomic::{AtomicBool, Ordering};

use futures::Stream;
use futures::task::AtomicWaker;
use futures::Stream;
use pin_project_lite::pin_project;

use super::Waker;
Expand All @@ -34,9 +34,9 @@ unsafe impl<Q, Item, F> Sync for QueueStream<Q, Item, F> {}
unsafe impl<Q, Item, F> Send for QueueStream<Q, Item, F> {}

impl<Q, Item, F> Clone for QueueStream<Q, Item, F>
where
Q: Clone,
F: Clone,
where
Q: Clone,
F: Clone,
{
#[inline]
fn clone(&self) -> Self {
Expand All @@ -52,8 +52,8 @@ impl<Q, Item, F> Clone for QueueStream<Q, Item, F>
}

impl<Q, Item, F> fmt::Debug for QueueStream<Q, Item, F>
where
Q: fmt::Debug,
where
Q: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("QueueStream")
Expand Down Expand Up @@ -94,15 +94,25 @@ impl<Q, Item, F> Waker for QueueStream<Q, Item, F> {

#[inline]
fn close_channel(&self) {
self.closed.store(true, Ordering::SeqCst);
self.rx_wake();
if !self.closed.load(Ordering::SeqCst) {
self.closed.store(true, Ordering::SeqCst);
self.rx_wake();
if let Some(w) = self.parked_queue.lock().unwrap().pop_front() {
w.wake();
}
}
}

#[inline]
fn is_closed(&self) -> bool {
self.closed.load(Ordering::SeqCst)
}
}

impl<Q, Item, F> Stream for QueueStream<Q, Item, F>
where
Q: Unpin,
F: Fn(Pin<&mut Q>, &mut Context<'_>) -> Poll<Option<Item>>,
where
Q: Unpin,
F: Fn(Pin<&mut Q>, &mut Context<'_>) -> Poll<Option<Item>>,
{
type Item = Item;

Expand All @@ -119,7 +129,7 @@ impl<Q, Item, F> Stream for QueueStream<Q, Item, F>
Poll::Pending => {
if this.closed.load(Ordering::SeqCst) {
Poll::Ready(None)
}else {
} else {
this.recv_task.register(ctx.waker());
f(this.q.as_mut(), ctx)
}
Expand Down

0 comments on commit ad11175

Please sign in to comment.