Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better write back-pressure handling #359

Merged
merged 1 commit into from
May 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions ntex-io/CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changes

## [1.2.0] - 2024-05-12

* Better write back-pressure handling

* Dispatcher optimization for handling first item

## [1.1.0] - 2024-05-01

* Add IoRef::notify_timeout() helper method
Expand Down
2 changes: 1 addition & 1 deletion ntex-io/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-io"
version = "1.1.0"
version = "1.2.0"
authors = ["ntex contributors <team@ntex.rs>"]
description = "Utilities for encoding and decoding frames"
keywords = ["network", "framework", "async", "futures"]
Expand Down
93 changes: 59 additions & 34 deletions ntex-io/src/dispatcher.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
//! Framed transport dispatcher
#![allow(clippy::let_underscore_future)]
use std::{cell::Cell, future, pin::Pin, rc::Rc, task::Context, task::Poll};
use std::{cell::Cell, future::Future, pin::Pin, rc::Rc, task::Context, task::Poll};

use ntex_bytes::Pool;
use ntex_codec::{Decoder, Encoder};
use ntex_service::{IntoService, Pipeline, Service};
use ntex_service::{IntoService, Pipeline, PipelineCall, Service};
use ntex_util::{future::Either, ready, spawn, time::Seconds};

use crate::{Decoded, DispatchItem, IoBoxed, IoStatusUpdate, RecvError};
Expand Down Expand Up @@ -117,6 +117,7 @@
S: Service<DispatchItem<U>, Response = Option<Response<U>>>,
U: Encoder,
U: Decoder,
U: 'static,
{
inner: DispatcherInner<S, U>,
}
Expand All @@ -136,12 +137,13 @@
struct DispatcherInner<S, U>
where
S: Service<DispatchItem<U>, Response = Option<Response<U>>>,
U: Encoder + Decoder,
U: Encoder + Decoder + 'static,
{
st: DispatcherState,
error: Option<S::Error>,
flags: Flags,
shared: Rc<DispatcherShared<S, U>>,
response: Option<PipelineCall<S, DispatchItem<U>>>,
pool: Pool,
cfg: DispatcherConfig,
read_remains: u32,
Expand Down Expand Up @@ -193,7 +195,7 @@
impl<S, U> Dispatcher<S, U>
where
S: Service<DispatchItem<U>, Response = Option<Response<U>>>,
U: Decoder + Encoder,
U: Decoder + Encoder + 'static,
{
/// Construct new `Dispatcher` instance.
pub fn new<Io, F>(
Expand Down Expand Up @@ -230,6 +232,7 @@
shared,
flags,
cfg: cfg.clone(),
response: None,
error: None,
read_remains: 0,
read_remains_prev: 0,
Expand All @@ -245,8 +248,7 @@
S: Service<DispatchItem<U>, Response = Option<Response<U>>>,
U: Encoder + Decoder,
{
fn handle_result(&self, item: Result<S::Response, S::Error>, io: &IoBoxed) {
self.inflight.set(self.inflight.get() - 1);
fn handle_result(&self, item: Result<S::Response, S::Error>, io: &IoBoxed, wake: bool) {
match item {
Ok(Some(val)) => {
if let Err(err) = io.encode(val, &self.codec) {
Expand All @@ -256,11 +258,14 @@
Err(err) => self.error.set(Some(DispatcherError::Service(err))),
Ok(None) => (),
}
io.wake();
self.inflight.set(self.inflight.get() - 1);
if wake {
io.wake();
}
}
}

impl<S, U> future::Future for Dispatcher<S, U>
impl<S, U> Future for Dispatcher<S, U>
where
S: Service<DispatchItem<U>, Response = Option<Response<U>>> + 'static,
U: Decoder + Encoder + 'static,
Expand All @@ -271,6 +276,14 @@
let mut this = self.as_mut().project();
let slf = &mut this.inner;

// handle service response future
if let Some(fut) = slf.response.as_mut() {
if let Poll::Ready(item) = Pin::new(fut).poll(cx) {
slf.shared.handle_result(item, &slf.shared.io, false);
slf.response = None;
}
}

// handle memory pool pressure
if slf.pool.poll_ready(cx).is_pending() {
slf.flags.remove(Flags::KA_TIMEOUT | Flags::READ_TIMEOUT);
Expand Down Expand Up @@ -339,37 +352,25 @@
PollService::Continue => continue,
};

// call service
let shared = slf.shared.clone();
shared.inflight.set(shared.inflight.get() + 1);
let _ = spawn(async move {
let result = shared.service.call(item).await;
shared.handle_result(result, &shared.io);
});
slf.call_service(cx, item);
}
// handle write back-pressure
DispatcherState::Backpressure => {
let item = match ready!(slf.poll_service(cx)) {
PollService::Ready => {
if let Err(err) = ready!(slf.shared.io.poll_flush(cx, false)) {
slf.st = DispatcherState::Stop;
DispatchItem::Disconnect(Some(err))
} else {
slf.st = DispatcherState::Processing;
DispatchItem::WBackPressureDisabled
}
}
PollService::Item(item) => item,
match ready!(slf.poll_service(cx)) {
PollService::Ready => (),
PollService::Item(item) => slf.call_service(cx, item),

Check warning on line 361 in ntex-io/src/dispatcher.rs

View check run for this annotation

Codecov / codecov/patch

ntex-io/src/dispatcher.rs#L361

Added line #L361 was not covered by tests
PollService::Continue => continue,
};

// call service
let shared = slf.shared.clone();
shared.inflight.set(shared.inflight.get() + 1);
let _ = spawn(async move {
let result = shared.service.call(item).await;
shared.handle_result(result, &shared.io);
});
let item = if let Err(err) = ready!(slf.shared.io.poll_flush(cx, false))
{
slf.st = DispatcherState::Stop;
DispatchItem::Disconnect(Some(err))

Check warning on line 368 in ntex-io/src/dispatcher.rs

View check run for this annotation

Codecov / codecov/patch

ntex-io/src/dispatcher.rs#L367-L368

Added lines #L367 - L368 were not covered by tests
} else {
slf.st = DispatcherState::Processing;
DispatchItem::WBackPressureDisabled
};
slf.call_service(cx, item);
}
// drain service responses and shutdown io
DispatcherState::Stop => {
Expand Down Expand Up @@ -432,6 +433,26 @@
S: Service<DispatchItem<U>, Response = Option<Response<U>>> + 'static,
U: Decoder + Encoder + 'static,
{
fn call_service(&mut self, cx: &mut Context<'_>, item: DispatchItem<U>) {
let mut fut = self.shared.service.call_static(item);
self.shared.inflight.set(self.shared.inflight.get() + 1);

// optimize first call
if self.response.is_none() {
if let Poll::Ready(result) = Pin::new(&mut fut).poll(cx) {
self.shared.handle_result(result, &self.shared.io, false);
} else {
self.response = Some(fut);
}
} else {
let shared = self.shared.clone();
let _ = spawn(async move {
let result = fut.await;
shared.handle_result(result, &shared.io, true);
});
}
}

fn poll_service(&mut self, cx: &mut Context<'_>) -> Poll<PollService<U>> {
match self.shared.service.poll_ready(cx) {
Poll::Ready(Ok(_)) => {
Expand Down Expand Up @@ -493,7 +514,10 @@
self.st = DispatcherState::Stop;
Poll::Ready(PollService::Item(DispatchItem::Disconnect(err)))
}
IoStatusUpdate::WriteBackpressure => Poll::Pending,
IoStatusUpdate::WriteBackpressure => {
self.st = DispatcherState::Backpressure;
Poll::Ready(PollService::Item(DispatchItem::WBackPressureEnabled))

Check warning on line 519 in ntex-io/src/dispatcher.rs

View check run for this annotation

Codecov / codecov/patch

ntex-io/src/dispatcher.rs#L518-L519

Added lines #L518 - L519 were not covered by tests
}
}
}
// handle service readiness error
Expand Down Expand Up @@ -703,6 +727,7 @@
inner: DispatcherInner {
error: None,
st: DispatcherState::Processing,
response: None,
read_remains: 0,
read_remains_prev: 0,
read_max_timeout: Seconds::ZERO,
Expand Down
3 changes: 3 additions & 0 deletions ntex-io/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ pub use self::tasks::{ReadContext, WriteContext};
pub use self::timer::TimerHandle;
pub use self::utils::{seal, Decoded};

#[doc(hidden)]
pub use self::io::Flags;

/// Status for read task
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub enum ReadStatus {
Expand Down
2 changes: 1 addition & 1 deletion ntex/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ ntex-bytes = "0.1.25"
ntex-server = "1.0.5"
ntex-h2 = "0.5.4"
ntex-rt = "0.4.12"
ntex-io = "1.1.0"
ntex-io = "1.2.0"
ntex-net = "1.0.1"
ntex-tls = "1.1.0"

Expand Down
Loading