Skip to content

Commit

Permalink
Synchronized buffer (#216)
Browse files Browse the repository at this point in the history
* added a new function to servicecall to allow only advancing to the service call and then returning the service response future
* buffer can maintain order/backpressure by implementing strict readiness and synchronous calling
* buffer can flush in order or cancel pending buffered futures on shutdown
  • Loading branch information
wpbrown committed Jun 26, 2023
1 parent 4380b3a commit eea5b3b
Show file tree
Hide file tree
Showing 7 changed files with 259 additions and 39 deletions.
4 changes: 4 additions & 0 deletions ntex-service/CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [1.2.2] - 2023-06-24

* Added `ServiceCall::advance_to_call`

## [1.2.1] - 2023-06-23

* Make `PipelineCall` static
Expand Down
2 changes: 1 addition & 1 deletion ntex-service/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-service"
version = "1.2.1"
version = "1.2.2"
authors = ["ntex contributors <team@ntex.rs>"]
description = "ntex service"
keywords = ["network", "framework", "async", "futures"]
Expand Down
82 changes: 82 additions & 0 deletions ntex-service/src/ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,26 @@ pin_project_lite::pin_project! {
}
}

impl<'a, S, Req> ServiceCall<'a, S, Req>
where
S: Service<Req>,
S: 'a,
S: ?Sized,
Req: 'a,
{
pub fn advance_to_call(self) -> ServiceCallToCall<'a, S, Req> {
match self.state {
ServiceCallState::Ready { .. } => {}
ServiceCallState::Call { .. } | ServiceCallState::Empty => {
panic!(
"`ServiceCall::advance_to_call` must be called before `ServiceCall::poll`"
)
}
}
ServiceCallToCall { state: self.state }
}
}

pin_project_lite::pin_project! {
#[project = ServiceCallStateProject]
enum ServiceCallState<'a, S, Req>
Expand Down Expand Up @@ -234,6 +254,68 @@ where
}
}

pin_project_lite::pin_project! {
#[must_use = "futures do nothing unless polled"]
pub struct ServiceCallToCall<'a, S, Req>
where
S: Service<Req>,
S: 'a,
S: ?Sized,
Req: 'a,
{
#[pin]
state: ServiceCallState<'a, S, Req>,
}
}

impl<'a, S, Req> Future for ServiceCallToCall<'a, S, Req>
where
S: Service<Req> + ?Sized,
{
type Output = Result<S::Future<'a>, S::Error>;

fn poll(
mut self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
) -> task::Poll<Self::Output> {
let mut this = self.as_mut().project();

match this.state.as_mut().project() {
ServiceCallStateProject::Ready {
req,
svc,
idx,
waiters,
} => match svc.poll_ready(cx)? {
task::Poll::Ready(()) => {
waiters.notify();

let fut = svc.call(
req.take().unwrap(),
ServiceCtx {
idx: *idx,
waiters,
_t: marker::PhantomData,
},
);
this.state.set(ServiceCallState::Empty);
task::Poll::Ready(Ok(fut))
}
task::Poll::Pending => {
waiters.register(*idx, cx);
task::Poll::Pending
}
},
ServiceCallStateProject::Call { .. } => {
unreachable!("`ServiceCallToCall` can only be constructed in `Ready` state")
}
ServiceCallStateProject::Empty => {
panic!("future must not be polled after it returned `Poll::Ready`")
}
}
}
}

#[cfg(test)]
mod tests {
use ntex_util::future::{lazy, poll_fn, Ready};
Expand Down
2 changes: 1 addition & 1 deletion ntex-service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ mod then;

pub use self::apply::{apply_fn, apply_fn_factory};
pub use self::chain::{chain, chain_factory};
pub use self::ctx::{ServiceCall, ServiceCtx};
pub use self::ctx::{ServiceCall, ServiceCallToCall, ServiceCtx};
pub use self::fn_service::{fn_factory, fn_factory_with_config, fn_service};
pub use self::fn_shutdown::fn_shutdown;
pub use self::map_config::{map_config, unit_config};
Expand Down
6 changes: 6 additions & 0 deletions ntex-util/CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changes

## [0.3.1] - 2023-06-24

* Changed `BufferService` to maintain order

* Buffer error type changed to indicate cancellation

## [0.3.0] - 2023-06-22

* Release v0.3.0
Expand Down
4 changes: 2 additions & 2 deletions ntex-util/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-util"
version = "0.3.0"
version = "0.3.1"
authors = ["ntex contributors <team@ntex.rs>"]
description = "Utilities for ntex framework"
keywords = ["network", "framework", "async", "futures"]
Expand All @@ -17,7 +17,7 @@ path = "src/lib.rs"

[dependencies]
ntex-rt = "0.4.7"
ntex-service = "1.2.0"
ntex-service = "1.2.2"
bitflags = "1.3"
fxhash = "0.2.1"
log = "0.4"
Expand Down
Loading

0 comments on commit eea5b3b

Please sign in to comment.