Skip to content

Commit

Permalink
Better write back-pressure handling
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 committed May 12, 2024
1 parent 1b3a530 commit 5eb84b9
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 36 deletions.
6 changes: 6 additions & 0 deletions ntex-io/CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changes

## [1.2.0] - 2024-05-12

* Better write back-pressure handling

* Dispatcher optimization for handling first item

## [1.1.0] - 2024-05-01

* Add IoRef::notify_timeout() helper method
Expand Down
2 changes: 1 addition & 1 deletion ntex-io/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-io"
version = "1.1.0"
version = "1.2.0"
authors = ["ntex contributors <team@ntex.rs>"]
description = "Utilities for encoding and decoding frames"
keywords = ["network", "framework", "async", "futures"]
Expand Down
93 changes: 59 additions & 34 deletions ntex-io/src/dispatcher.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
//! Framed transport dispatcher
#![allow(clippy::let_underscore_future)]
use std::{cell::Cell, future, pin::Pin, rc::Rc, task::Context, task::Poll};
use std::{cell::Cell, future::Future, pin::Pin, rc::Rc, task::Context, task::Poll};

use ntex_bytes::Pool;
use ntex_codec::{Decoder, Encoder};
use ntex_service::{IntoService, Pipeline, Service};
use ntex_service::{IntoService, Pipeline, PipelineCall, Service};
use ntex_util::{future::Either, ready, spawn, time::Seconds};

use crate::{Decoded, DispatchItem, IoBoxed, IoStatusUpdate, RecvError};
Expand Down Expand Up @@ -117,6 +117,7 @@ pin_project_lite::pin_project! {
S: Service<DispatchItem<U>, Response = Option<Response<U>>>,
U: Encoder,
U: Decoder,
U: 'static,
{
inner: DispatcherInner<S, U>,
}
Expand All @@ -136,12 +137,13 @@ bitflags::bitflags! {
struct DispatcherInner<S, U>
where
S: Service<DispatchItem<U>, Response = Option<Response<U>>>,
U: Encoder + Decoder,
U: Encoder + Decoder + 'static,
{
st: DispatcherState,
error: Option<S::Error>,
flags: Flags,
shared: Rc<DispatcherShared<S, U>>,
response: Option<PipelineCall<S, DispatchItem<U>>>,
pool: Pool,
cfg: DispatcherConfig,
read_remains: u32,
Expand Down Expand Up @@ -193,7 +195,7 @@ impl<S, U> From<Either<S, U>> for DispatcherError<S, U> {
impl<S, U> Dispatcher<S, U>
where
S: Service<DispatchItem<U>, Response = Option<Response<U>>>,
U: Decoder + Encoder,
U: Decoder + Encoder + 'static,
{
/// Construct new `Dispatcher` instance.
pub fn new<Io, F>(
Expand Down Expand Up @@ -230,6 +232,7 @@ where
shared,
flags,
cfg: cfg.clone(),
response: None,
error: None,
read_remains: 0,
read_remains_prev: 0,
Expand All @@ -245,8 +248,7 @@ where
S: Service<DispatchItem<U>, Response = Option<Response<U>>>,
U: Encoder + Decoder,
{
fn handle_result(&self, item: Result<S::Response, S::Error>, io: &IoBoxed) {
self.inflight.set(self.inflight.get() - 1);
fn handle_result(&self, item: Result<S::Response, S::Error>, io: &IoBoxed, wake: bool) {
match item {
Ok(Some(val)) => {
if let Err(err) = io.encode(val, &self.codec) {
Expand All @@ -256,11 +258,14 @@ where
Err(err) => self.error.set(Some(DispatcherError::Service(err))),
Ok(None) => (),
}
io.wake();
self.inflight.set(self.inflight.get() - 1);
if wake {
io.wake();
}
}
}

impl<S, U> future::Future for Dispatcher<S, U>
impl<S, U> Future for Dispatcher<S, U>
where
S: Service<DispatchItem<U>, Response = Option<Response<U>>> + 'static,
U: Decoder + Encoder + 'static,
Expand All @@ -271,6 +276,14 @@ where
let mut this = self.as_mut().project();
let slf = &mut this.inner;

// handle service response future
if let Some(fut) = slf.response.as_mut() {
if let Poll::Ready(item) = Pin::new(fut).poll(cx) {
slf.shared.handle_result(item, &slf.shared.io, false);
slf.response = None;
}
}

// handle memory pool pressure
if slf.pool.poll_ready(cx).is_pending() {
slf.flags.remove(Flags::KA_TIMEOUT | Flags::READ_TIMEOUT);
Expand Down Expand Up @@ -339,37 +352,25 @@ where
PollService::Continue => continue,
};

// call service
let shared = slf.shared.clone();
shared.inflight.set(shared.inflight.get() + 1);
let _ = spawn(async move {
let result = shared.service.call(item).await;
shared.handle_result(result, &shared.io);
});
slf.call_service(cx, item);
}
// handle write back-pressure
DispatcherState::Backpressure => {
let item = match ready!(slf.poll_service(cx)) {
PollService::Ready => {
if let Err(err) = ready!(slf.shared.io.poll_flush(cx, false)) {
slf.st = DispatcherState::Stop;
DispatchItem::Disconnect(Some(err))
} else {
slf.st = DispatcherState::Processing;
DispatchItem::WBackPressureDisabled
}
}
PollService::Item(item) => item,
match ready!(slf.poll_service(cx)) {
PollService::Ready => (),
PollService::Item(item) => slf.call_service(cx, item),

Check warning on line 361 in ntex-io/src/dispatcher.rs

View check run for this annotation

Codecov / codecov/patch

ntex-io/src/dispatcher.rs#L361

Added line #L361 was not covered by tests
PollService::Continue => continue,
};

// call service
let shared = slf.shared.clone();
shared.inflight.set(shared.inflight.get() + 1);
let _ = spawn(async move {
let result = shared.service.call(item).await;
shared.handle_result(result, &shared.io);
});
let item = if let Err(err) = ready!(slf.shared.io.poll_flush(cx, false))
{
slf.st = DispatcherState::Stop;
DispatchItem::Disconnect(Some(err))

Check warning on line 368 in ntex-io/src/dispatcher.rs

View check run for this annotation

Codecov / codecov/patch

ntex-io/src/dispatcher.rs#L367-L368

Added lines #L367 - L368 were not covered by tests
} else {
slf.st = DispatcherState::Processing;
DispatchItem::WBackPressureDisabled
};
slf.call_service(cx, item);
}
// drain service responses and shutdown io
DispatcherState::Stop => {
Expand Down Expand Up @@ -432,6 +433,26 @@ where
S: Service<DispatchItem<U>, Response = Option<Response<U>>> + 'static,
U: Decoder + Encoder + 'static,
{
fn call_service(&mut self, cx: &mut Context<'_>, item: DispatchItem<U>) {
let mut fut = self.shared.service.call_static(item);
self.shared.inflight.set(self.shared.inflight.get() + 1);

// optimize first call
if self.response.is_none() {
if let Poll::Ready(result) = Pin::new(&mut fut).poll(cx) {
self.shared.handle_result(result, &self.shared.io, false);
} else {
self.response = Some(fut);
}
} else {
let shared = self.shared.clone();
let _ = spawn(async move {
let result = fut.await;
shared.handle_result(result, &shared.io, true);
});
}
}

fn poll_service(&mut self, cx: &mut Context<'_>) -> Poll<PollService<U>> {
match self.shared.service.poll_ready(cx) {
Poll::Ready(Ok(_)) => {
Expand Down Expand Up @@ -493,7 +514,10 @@ where
self.st = DispatcherState::Stop;
Poll::Ready(PollService::Item(DispatchItem::Disconnect(err)))
}
IoStatusUpdate::WriteBackpressure => Poll::Pending,
IoStatusUpdate::WriteBackpressure => {
self.st = DispatcherState::Backpressure;
Poll::Ready(PollService::Item(DispatchItem::WBackPressureEnabled))

Check warning on line 519 in ntex-io/src/dispatcher.rs

View check run for this annotation

Codecov / codecov/patch

ntex-io/src/dispatcher.rs#L518-L519

Added lines #L518 - L519 were not covered by tests
}
}
}
// handle service readiness error
Expand Down Expand Up @@ -703,6 +727,7 @@ mod tests {
inner: DispatcherInner {
error: None,
st: DispatcherState::Processing,
response: None,
read_remains: 0,
read_remains_prev: 0,
read_max_timeout: Seconds::ZERO,
Expand Down
3 changes: 3 additions & 0 deletions ntex-io/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ pub use self::tasks::{ReadContext, WriteContext};
pub use self::timer::TimerHandle;
pub use self::utils::{seal, Decoded};

#[doc(hidden)]
pub use self::io::Flags;

/// Status for read task
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub enum ReadStatus {
Expand Down
2 changes: 1 addition & 1 deletion ntex/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ ntex-bytes = "0.1.25"
ntex-server = "1.0.5"
ntex-h2 = "0.5.4"
ntex-rt = "0.4.12"
ntex-io = "1.1.0"
ntex-io = "1.2.0"
ntex-net = "1.0.1"
ntex-tls = "1.1.0"

Expand Down

0 comments on commit 5eb84b9

Please sign in to comment.