Skip to content

Commit

Permalink
1. When the queue is full, put pending tasks.
Browse files Browse the repository at this point in the history
2. spawn(...).quickly(). add the task to the queue immediately.
  • Loading branch information
try-box committed Oct 3, 2023
1 parent cb9c382 commit d3376b7
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 21 deletions.
2 changes: 1 addition & 1 deletion task-exec-queue/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "task-exec-queue"
version = "0.6.0"
version = "0.7.0"
authors = ["try <trywen@qq.com>"]
edition = "2018"
description = "A asynchronous task execution queue"
Expand Down
36 changes: 27 additions & 9 deletions task-exec-queue/src/local_spawner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use super::{assert_future, Error, ErrorType, LocalTaskExecQueue};
pub struct LocalGroupSpawner<'a, Item, Tx, G> {
inner: LocalSpawner<'a, Item, Tx, G, ()>,
name: Option<G>,
is_pending: bool,
}

impl<Item, Tx, G> Unpin for LocalGroupSpawner<'_, Item, Tx, G> {}
Expand All @@ -32,10 +31,15 @@ where
Self {
inner,
name: Some(name),
is_pending: false,
}
}

#[inline]
pub fn quickly(mut self) -> Self {
self.inner.quickly = true;
self
}

#[inline]
pub async fn result(mut self) -> Result<Item::Output, Error<Item>>
where
Expand All @@ -46,7 +50,7 @@ where
return Err(Error::SendError(ErrorType::Closed(self.inner.item.take())));
}

if self.inner.sink.is_full() {
if !self.inner.quickly && self.inner.sink.is_full() {
let w = Rc::new(AtomicWaker::new());
self.inner.sink.waiting_wakers.push(w.clone());
LocalPendingOnce::new(w).await;
Expand Down Expand Up @@ -111,17 +115,17 @@ where

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
if this.inner.sink.is_closed() && !this.is_pending {
if this.inner.sink.is_closed() && !this.inner.is_pending {
return Poll::Ready(Err(Error::SendError(ErrorType::Closed(
this.inner.item.take(),
))));
}

if this.inner.sink.is_full() {
if !this.inner.quickly && this.inner.sink.is_full() {
let w = Rc::new(AtomicWaker::new());
w.register(cx.waker());
this.inner.sink.waiting_wakers.push(w);
this.is_pending = true;
this.inner.is_pending = true;
return Poll::Pending;
}

Expand Down Expand Up @@ -187,7 +191,6 @@ where
inner: LocalGroupSpawner {
inner,
name: Some(name),
is_pending: false,
},
}
}
Expand Down Expand Up @@ -233,6 +236,7 @@ pub struct LocalSpawner<'a, Item, Tx, G, D> {
sink: &'a LocalTaskExecQueue<Tx, G, D>,
item: Option<Item>,
d: Option<D>,
quickly: bool,
is_pending: bool,
}

Expand Down Expand Up @@ -265,10 +269,17 @@ where
sink,
item: Some(item),
d: Some(d),
quickly: false,
is_pending: false,
}
}

#[inline]
pub fn quickly(mut self) -> Self {
self.quickly = true;
self
}

#[inline]
pub async fn result(mut self) -> Result<Item::Output, Error<Item>>
where
Expand All @@ -279,7 +290,7 @@ where
return Err(Error::SendError(ErrorType::Closed(self.item.take())));
}

if self.sink.is_full() {
if !self.quickly && self.sink.is_full() {
let w = Rc::new(AtomicWaker::new());
self.sink.waiting_wakers.push(w.clone());
LocalPendingOnce::new(w).await;
Expand Down Expand Up @@ -343,7 +354,7 @@ where
return Poll::Ready(Err(Error::SendError(ErrorType::Closed(this.item.take()))));
}

if this.sink.is_full() {
if !this.quickly && this.sink.is_full() {
let w = Rc::new(AtomicWaker::new());
w.register(cx.waker());
this.sink.waiting_wakers.push(w);
Expand Down Expand Up @@ -425,11 +436,18 @@ where
sink,
item: Some(item),
d: Some(d),
quickly: false,
is_pending: false,
},
}
}

#[inline]
pub fn quickly(mut self) -> Self {
self.inner.quickly = true;
self
}

#[inline]
pub async fn result(mut self) -> Result<Item::Output, Error<Item>>
where
Expand Down
39 changes: 28 additions & 11 deletions task-exec-queue/src/spawner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use super::{assert_future, Error, ErrorType, TaskExecQueue};
pub struct GroupSpawner<'a, Item, Tx, G> {
inner: Spawner<'a, Item, Tx, G, ()>,
name: Option<G>,
is_pending: bool,
}

impl<Item, Tx, G> Unpin for GroupSpawner<'_, Item, Tx, G> {}
Expand All @@ -32,10 +31,15 @@ where
Self {
inner,
name: Some(name),
is_pending: false,
}
}

#[inline]
pub fn quickly(mut self) -> Self {
self.inner.quickly = true;
self
}

#[inline]
pub async fn result(mut self) -> Result<Item::Output, Error<Item>>
where
Expand All @@ -46,7 +50,7 @@ where
return Err(Error::SendError(ErrorType::Closed(self.inner.item.take())));
}

if self.inner.sink.is_full() {
if !self.inner.quickly && self.inner.sink.is_full() {
let w = Arc::new(AtomicWaker::new());
self.inner.sink.waiting_wakers.push(w.clone());
PendingOnce::new(w).await;
Expand Down Expand Up @@ -111,17 +115,17 @@ where

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
if this.inner.sink.is_closed() && !this.is_pending {
if this.inner.sink.is_closed() && !this.inner.is_pending {
return Poll::Ready(Err(Error::SendError(ErrorType::Closed(
this.inner.item.take(),
))));
}

if this.inner.sink.is_full() {
if !this.inner.quickly && this.inner.sink.is_full() {
let w = Arc::new(AtomicWaker::new());
w.register(cx.waker());
this.inner.sink.waiting_wakers.push(w);
this.is_pending = true;
this.inner.is_pending = true;
return Poll::Pending;
}

Expand Down Expand Up @@ -184,7 +188,6 @@ where
inner: GroupSpawner {
inner,
name: Some(name),
is_pending: false,
},
}
}
Expand Down Expand Up @@ -230,6 +233,7 @@ pub struct Spawner<'a, Item, Tx, G, D> {
sink: &'a TaskExecQueue<Tx, G, D>,
item: Option<Item>,
d: Option<D>,
quickly: bool,
is_pending: bool,
}

Expand Down Expand Up @@ -262,10 +266,17 @@ where
sink,
item: Some(item),
d: Some(d),
quickly: false,
is_pending: false,
}
}

#[inline]
pub fn quickly(mut self) -> Self {
self.quickly = true;
self
}

#[inline]
pub async fn result(mut self) -> Result<Item::Output, Error<Item>>
where
Expand All @@ -276,7 +287,7 @@ where
return Err(Error::SendError(ErrorType::Closed(self.item.take())));
}

if self.sink.is_full() {
if !self.quickly && self.sink.is_full() {
let w = Arc::new(AtomicWaker::new());
self.sink.waiting_wakers.push(w.clone());
PendingOnce::new(w).await;
Expand Down Expand Up @@ -340,7 +351,7 @@ where
return Poll::Ready(Err(Error::SendError(ErrorType::Closed(this.item.take()))));
}

if this.sink.is_full() {
if !this.quickly && this.sink.is_full() {
let w = Arc::new(AtomicWaker::new());
w.register(cx.waker());
this.sink.waiting_wakers.push(w);
Expand All @@ -366,8 +377,7 @@ where

let mut tx = this.sink.tx.clone();
let mut sink = Pin::new(&mut tx);
//futures::ready!(sink.as_mut().poll_ready(cx))
// .map_err(|_| Error::SendError(ErrorType::Closed(None)))?;

let waiting_count = this.sink.waiting_count.clone();
let waiting_wakers = this.sink.waiting_wakers.clone();
let task = async move {
Expand Down Expand Up @@ -422,11 +432,18 @@ where
sink,
item: Some(item),
d: Some(d),
quickly: false,
is_pending: false,
},
}
}

#[inline]
pub fn quickly(mut self) -> Self {
self.inner.quickly = true;
self
}

#[inline]
pub async fn result(mut self) -> Result<Item::Output, Error<Item>>
where
Expand Down

0 comments on commit d3376b7

Please sign in to comment.