Skip to content

Commit

Permalink
Re-enable BufferService
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 committed May 28, 2024
1 parent b04bdf4 commit f81e45f
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 60 deletions.
4 changes: 4 additions & 0 deletions ntex-util/CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [2.0.1] - 2024-05-28

* Re-enable BufferService

## [2.0.0] - 2024-05-28

* Use async fn for Service::ready() and Service::shutdown()
Expand Down
6 changes: 3 additions & 3 deletions ntex-util/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-util"
version = "2.0.0"
version = "2.0.1"
authors = ["ntex contributors <team@ntex.rs>"]
description = "Utilities for ntex framework"
keywords = ["network", "framework", "async", "futures"]
Expand All @@ -18,8 +18,8 @@ path = "src/lib.rs"
[dependencies]
ntex-service = "3.0"
ntex-rt = "0.4"
bitflags = "2.4"
fxhash = "0.2.1"
bitflags = "2"
fxhash = "0.2"
log = "0.4"
slab = "0.4"
futures-timer = "3.0"
Expand Down
121 changes: 65 additions & 56 deletions ntex-util/src/services/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::cell::{Cell, RefCell};
use std::task::{ready, Poll};
use std::{collections::VecDeque, fmt, future::poll_fn, marker::PhantomData};

use ntex_service::{Middleware, Service, ServiceCtx};
use ntex_service::{Middleware, Pipeline, PipelineBinding, Service, ServiceCtx};

use crate::channel::oneshot;

Expand Down Expand Up @@ -62,18 +62,19 @@ impl<R> Clone for Buffer<R> {

impl<R, S> Middleware<S> for Buffer<R>
where
S: Service<R>,
S: Service<R> + 'static,
R: 'static,
{
type Service = BufferService<R, S>;

fn create(&self, service: S) -> Self::Service {
BufferService {
service,
service: Pipeline::new(service).bind(),
size: self.buf_size,
cancel_on_shutdown: self.cancel_on_shutdown,
ready: Cell::new(false),
buf: RefCell::new(VecDeque::with_capacity(self.buf_size)),
next_call: RefCell::default(),
cancel_on_shutdown: self.cancel_on_shutdown,
_t: PhantomData,
}
}
Expand Down Expand Up @@ -109,26 +110,27 @@ impl<E: std::fmt::Display + std::fmt::Debug> std::error::Error for BufferService
/// Default number of buffered requests is 16
pub struct BufferService<R, S: Service<R>> {
size: usize,
cancel_on_shutdown: bool,
ready: Cell<bool>,
service: S,
service: PipelineBinding<S, R>,
buf: RefCell<VecDeque<oneshot::Sender<oneshot::Sender<()>>>>,
next_call: RefCell<Option<oneshot::Receiver<()>>>,
cancel_on_shutdown: bool,
_t: PhantomData<R>,
}

impl<R, S> BufferService<R, S>
where
S: Service<R>,
S: Service<R> + 'static,
R: 'static,
{
pub fn new(size: usize, service: S) -> Self {
Self {
size,
service,
cancel_on_shutdown: false,
service: Pipeline::new(service).bind(),
ready: Cell::new(false),
buf: RefCell::new(VecDeque::with_capacity(size)),
next_call: RefCell::default(),
cancel_on_shutdown: false,
_t: PhantomData,
}
}
Expand All @@ -148,11 +150,11 @@ where
fn clone(&self) -> Self {
Self {
size: self.size,
cancel_on_shutdown: self.cancel_on_shutdown,
ready: Cell::new(false),
service: self.service.clone(),
buf: RefCell::new(VecDeque::with_capacity(self.size)),
next_call: RefCell::default(),
cancel_on_shutdown: self.cancel_on_shutdown,
_t: PhantomData,
}
}
Expand All @@ -176,45 +178,50 @@ where

impl<R, S> Service<R> for BufferService<R, S>
where
S: Service<R>,
S: Service<R> + 'static,
R: 'static,
{
type Response = S::Response;
type Error = BufferServiceError<S::Error>;

#[inline]
async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
let mut buffer = self.buf.borrow_mut();
let mut next_call = self.next_call.borrow_mut();
if let Some(next_call) = &*next_call {
// hold advancement until the last released task either makes a call or is dropped
let _ = ready!(next_call.poll_recv(cx));
}
next_call.take();
async fn ready(&self, _: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
poll_fn(|cx| {
let mut buffer = self.buf.borrow_mut();
let mut next_call = self.next_call.borrow_mut();
if let Some(next_call) = &*next_call {
// hold advancement until the last released task either makes a call or is dropped
let _ = ready!(next_call.poll_recv(cx));
}
next_call.take();

if self.service.poll_ready(cx)?.is_pending() {
if buffer.len() < self.size {
// buffer next request
self.ready.set(false);
return Poll::Ready(Ok(()));
} else {
log::trace!("Buffer limit exceeded");
return Poll::Pending;
}
}

if self.service.poll_ready(cx)?.is_pending() {
if buffer.len() < self.size {
// buffer next request
while let Some(sender) = buffer.pop_front() {
let (next_call_tx, next_call_rx) = oneshot::channel();
if sender.send(next_call_tx).is_err()
|| next_call_rx.poll_recv(cx).is_ready()
{
// the task is gone
continue;
}
next_call.replace(next_call_rx);
self.ready.set(false);
return Poll::Ready(Ok(()));
} else {
log::trace!("Buffer limit exceeded");
return Poll::Pending;
}
}

while let Some(sender) = buffer.pop_front() {
let (next_call_tx, next_call_rx) = oneshot::channel();
if sender.send(next_call_tx).is_err() || next_call_rx.poll_recv(cx).is_ready() {
// the task is gone
continue;
}
next_call.replace(next_call_rx);
self.ready.set(false);
return Poll::Ready(Ok(()));
}

self.ready.set(true);
Poll::Ready(Ok(()))
self.ready.set(true);
Poll::Ready(Ok(()))
})
.await
}

async fn shutdown(&self) {
Expand Down Expand Up @@ -252,6 +259,7 @@ where
return Poll::Pending;
}
}
Poll::Ready(())
})
.await;

Expand All @@ -261,11 +269,11 @@ where
async fn call(
&self,
req: R,
ctx: ServiceCtx<'_, Self>,
_: ServiceCtx<'_, Self>,
) -> Result<Self::Response, Self::Error> {
if self.ready.get() {
self.ready.set(false);
Ok(ctx.call_nowait(&self.service, req).await?)
Ok(self.service.call_nowait(req).await?)
} else {
let (tx, rx) = oneshot::channel();
self.buf.borrow_mut().push_back(tx);
Expand All @@ -276,11 +284,8 @@ where
BufferServiceError::RequestCanceled
})?;

// check service readiness
ctx.ready(&self.service).await?;

// call service
Ok(ctx.call_nowait(&self.service, req).await?)
Ok(self.service.call(req).await?)
}
}
}
Expand All @@ -307,13 +312,16 @@ mod tests {
type Response = ();
type Error = ();

fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.0.waker.register(cx.waker());
if self.0.ready.get() {
Poll::Ready(Ok(()))
} else {
Poll::Pending
}
async fn ready(&self, _: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
poll_fn(|cx| {
self.0.waker.register(cx.waker());
if self.0.ready.get() {
Poll::Ready(Ok(()))
} else {
Poll::Pending
}
})
.await
}

async fn call(&self, _: (), _: ServiceCtx<'_, Self>) -> Result<(), ()> {
Expand All @@ -331,7 +339,8 @@ mod tests {
count: Cell::new(0),
});

let srv = Pipeline::new(BufferService::new(2, TestService(inner.clone())).clone());
let srv =
Pipeline::new(BufferService::new(2, TestService(inner.clone())).clone()).bind();
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));

let srv1 = srv.clone();
Expand Down Expand Up @@ -370,7 +379,7 @@ mod tests {
count: Cell::new(0),
});

let srv = Pipeline::new(BufferService::new(2, TestService(inner.clone())));
let srv = Pipeline::new(BufferService::new(2, TestService(inner.clone()))).bind();
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
let _ = srv.call(()).await;
assert_eq!(inner.count.get(), 1);
Expand All @@ -392,7 +401,7 @@ mod tests {
fn_factory(|| async { Ok::<_, ()>(TestService(inner.clone())) }),
);

let srv = srv.pipeline(&()).await.unwrap();
let srv = srv.pipeline(&()).await.unwrap().bind();
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));

let srv1 = srv.clone();
Expand Down
2 changes: 1 addition & 1 deletion ntex-util/src/services/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// pub mod buffer;
pub mod buffer;
pub mod counter;
mod extensions;
pub mod inflight;
Expand Down

0 comments on commit f81e45f

Please sign in to comment.