From 55dcad5f272ed17e73ab543b6a6af6691ecfb595 Mon Sep 17 00:00:00 2001
From: Nikolay Kim
Date: Sun, 12 May 2024 18:39:16 +0500
Subject: [PATCH] Better write back-pressure handling (#359)

---
 ntex-io/CHANGES.md        |  6 +++
 ntex-io/Cargo.toml        |  2 +-
 ntex-io/src/dispatcher.rs | 93 +++++++++++++++++++++++++--------------
 ntex-io/src/lib.rs        |  3 ++
 ntex/Cargo.toml           |  2 +-
 5 files changed, 70 insertions(+), 36 deletions(-)

diff --git a/ntex-io/CHANGES.md b/ntex-io/CHANGES.md
index 34b537ad..a51c852c 100644
--- a/ntex-io/CHANGES.md
+++ b/ntex-io/CHANGES.md
@@ -1,5 +1,11 @@
 # Changes
 
+## [1.2.0] - 2024-05-12
+
+* Better write back-pressure handling
+
+* Dispatcher optimization for handling first item
+
 ## [1.1.0] - 2024-05-01
 
 * Add IoRef::notify_timeout() helper method
diff --git a/ntex-io/Cargo.toml b/ntex-io/Cargo.toml
index b0e51e78..90adc74e 100644
--- a/ntex-io/Cargo.toml
+++ b/ntex-io/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "ntex-io"
-version = "1.1.0"
+version = "1.2.0"
 authors = ["ntex contributors "]
 description = "Utilities for encoding and decoding frames"
 keywords = ["network", "framework", "async", "futures"]
diff --git a/ntex-io/src/dispatcher.rs b/ntex-io/src/dispatcher.rs
index ede2af34..8470fe6a 100644
--- a/ntex-io/src/dispatcher.rs
+++ b/ntex-io/src/dispatcher.rs
@@ -1,10 +1,10 @@
 //! Framed transport dispatcher
 #![allow(clippy::let_underscore_future)]
-use std::{cell::Cell, future, pin::Pin, rc::Rc, task::Context, task::Poll};
+use std::{cell::Cell, future::Future, pin::Pin, rc::Rc, task::Context, task::Poll};
 
 use ntex_bytes::Pool;
 use ntex_codec::{Decoder, Encoder};
-use ntex_service::{IntoService, Pipeline, Service};
+use ntex_service::{IntoService, Pipeline, PipelineCall, Service};
 use ntex_util::{future::Either, ready, spawn, time::Seconds};
 
 use crate::{Decoded, DispatchItem, IoBoxed, IoStatusUpdate, RecvError};
@@ -117,6 +117,7 @@ pin_project_lite::pin_project! {
         S: Service<DispatchItem<U>, Response = Option<Response<U>>>,
         U: Encoder,
         U: Decoder,
+        U: 'static,
     {
         inner: DispatcherInner<S, U>,
     }
@@ -136,12 +137,13 @@ bitflags::bitflags! {
 struct DispatcherInner<S, U>
 where
     S: Service<DispatchItem<U>, Response = Option<Response<U>>>,
-    U: Encoder + Decoder,
+    U: Encoder + Decoder + 'static,
 {
     st: DispatcherState,
     error: Option<S::Error>,
     flags: Flags,
     shared: Rc<DispatcherShared<S, U>>,
+    response: Option<PipelineCall<S, DispatchItem<U>>>,
     pool: Pool,
     cfg: DispatcherConfig,
     read_remains: u32,
@@ -193,7 +195,7 @@ impl<S, U> From<Either<S, U>> for DispatcherError<S, U> {
 impl<S, U> Dispatcher<S, U>
 where
     S: Service<DispatchItem<U>, Response = Option<Response<U>>>,
-    U: Decoder + Encoder,
+    U: Decoder + Encoder + 'static,
 {
     /// Construct new `Dispatcher` instance.
     pub fn new(
@@ -230,6 +232,7 @@ where
                 shared,
                 flags,
                 cfg: cfg.clone(),
+                response: None,
                 error: None,
                 read_remains: 0,
                 read_remains_prev: 0,
@@ -245,8 +248,7 @@ where
     S: Service<DispatchItem<U>, Response = Option<Response<U>>>,
     U: Encoder + Decoder,
 {
-    fn handle_result(&self, item: Result<S::Response, S::Error>, io: &IoBoxed) {
-        self.inflight.set(self.inflight.get() - 1);
+    fn handle_result(&self, item: Result<S::Response, S::Error>, io: &IoBoxed, wake: bool) {
         match item {
             Ok(Some(val)) => {
                 if let Err(err) = io.encode(val, &self.codec) {
@@ -256,11 +258,14 @@ where
             Err(err) => self.error.set(Some(DispatcherError::Service(err))),
             Ok(None) => (),
         }
-        io.wake();
+        self.inflight.set(self.inflight.get() - 1);
+        if wake {
+            io.wake();
+        }
     }
 }
 
-impl<S, U> future::Future for Dispatcher<S, U>
+impl<S, U> Future for Dispatcher<S, U>
 where
     S: Service<DispatchItem<U>, Response = Option<Response<U>>> + 'static,
     U: Decoder + Encoder + 'static,
@@ -271,6 +276,14 @@ where
         let mut this = self.as_mut().project();
         let slf = &mut this.inner;
 
+        // handle service response future
+        if let Some(fut) = slf.response.as_mut() {
+            if let Poll::Ready(item) = Pin::new(fut).poll(cx) {
+                slf.shared.handle_result(item, &slf.shared.io, false);
+                slf.response = None;
+            }
+        }
+
         // handle memory pool pressure
         if slf.pool.poll_ready(cx).is_pending() {
            slf.flags.remove(Flags::KA_TIMEOUT | Flags::READ_TIMEOUT);
@@ -339,37 +352,25 @@ where
                         PollService::Continue => continue,
                     };
 
-                    // call service
-                    let shared = slf.shared.clone();
-                    shared.inflight.set(shared.inflight.get() + 1);
-                    let _ = spawn(async move {
-                        let result = shared.service.call(item).await;
-                        shared.handle_result(result, &shared.io);
-                    });
+                    slf.call_service(cx, item);
                 }
                 // handle write back-pressure
                 DispatcherState::Backpressure => {
-                    let item = match ready!(slf.poll_service(cx)) {
-                        PollService::Ready => {
-                            if let Err(err) = ready!(slf.shared.io.poll_flush(cx, false)) {
-                                slf.st = DispatcherState::Stop;
-                                DispatchItem::Disconnect(Some(err))
-                            } else {
-                                slf.st = DispatcherState::Processing;
-                                DispatchItem::WBackPressureDisabled
-                            }
-                        }
-                        PollService::Item(item) => item,
+                    match ready!(slf.poll_service(cx)) {
+                        PollService::Ready => (),
+                        PollService::Item(item) => slf.call_service(cx, item),
                         PollService::Continue => continue,
                     };
 
-                    // call service
-                    let shared = slf.shared.clone();
-                    shared.inflight.set(shared.inflight.get() + 1);
-                    let _ = spawn(async move {
-                        let result = shared.service.call(item).await;
-                        shared.handle_result(result, &shared.io);
-                    });
+                    let item = if let Err(err) = ready!(slf.shared.io.poll_flush(cx, false))
+                    {
+                        slf.st = DispatcherState::Stop;
+                        DispatchItem::Disconnect(Some(err))
+                    } else {
+                        slf.st = DispatcherState::Processing;
+                        DispatchItem::WBackPressureDisabled
+                    };
+                    slf.call_service(cx, item);
                 }
                 // drain service responses and shutdown io
                 DispatcherState::Stop => {
@@ -432,6 +433,26 @@ where
     S: Service<DispatchItem<U>, Response = Option<Response<U>>> + 'static,
     U: Decoder + Encoder + 'static,
 {
+    fn call_service(&mut self, cx: &mut Context<'_>, item: DispatchItem<U>) {
+        let mut fut = self.shared.service.call_static(item);
+        self.shared.inflight.set(self.shared.inflight.get() + 1);
+
+        // optimize first call
+        if self.response.is_none() {
+            if let Poll::Ready(result) = Pin::new(&mut fut).poll(cx) {
+                self.shared.handle_result(result, &self.shared.io, false);
+            } else {
+                self.response = Some(fut);
+            }
+        } else {
+            let shared = self.shared.clone();
+            let _ = spawn(async move {
+                let result = fut.await;
+                shared.handle_result(result, &shared.io, true);
+            });
+        }
+    }
+
     fn poll_service(&mut self, cx: &mut Context<'_>) -> Poll<PollService<U>> {
         match self.shared.service.poll_ready(cx) {
             Poll::Ready(Ok(_)) => {
@@ -493,7 +514,10 @@ where
                         self.st = DispatcherState::Stop;
                         Poll::Ready(PollService::Item(DispatchItem::Disconnect(err)))
                     }
-                    IoStatusUpdate::WriteBackpressure => Poll::Pending,
+                    IoStatusUpdate::WriteBackpressure => {
+                        self.st = DispatcherState::Backpressure;
+                        Poll::Ready(PollService::Item(DispatchItem::WBackPressureEnabled))
+                    }
                 }
             }
             // handle service readiness error
@@ -703,6 +727,7 @@ mod tests {
             inner: DispatcherInner {
                 error: None,
                 st: DispatcherState::Processing,
+                response: None,
                 read_remains: 0,
                 read_remains_prev: 0,
                 read_max_timeout: Seconds::ZERO,
diff --git a/ntex-io/src/lib.rs b/ntex-io/src/lib.rs
index 74c14b88..c613a240 100644
--- a/ntex-io/src/lib.rs
+++ b/ntex-io/src/lib.rs
@@ -32,6 +32,9 @@ pub use self::tasks::{ReadContext, WriteContext};
 pub use self::timer::TimerHandle;
 pub use self::utils::{seal, Decoded};
 
+#[doc(hidden)]
+pub use self::io::Flags;
+
 /// Status for read task
 #[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
 pub enum ReadStatus {
diff --git a/ntex/Cargo.toml b/ntex/Cargo.toml
index b9b1503c..9d504323 100644
--- a/ntex/Cargo.toml
+++ b/ntex/Cargo.toml
@@ -67,7 +67,7 @@ ntex-bytes = "0.1.25"
 ntex-server = "1.0.5"
 ntex-h2 = "0.5.4"
 ntex-rt = "0.4.12"
-ntex-io = "1.1.0"
+ntex-io = "1.2.0"
 ntex-net = "1.0.1"
 ntex-tls = "1.1.0"
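
The core of the patch is that write back-pressure is now surfaced to the service: instead of silently returning Poll::Pending, the dispatcher switches to DispatcherState::Backpressure and hands the service a DispatchItem::WBackPressureEnabled item, then emits WBackPressureDisabled once the write buffer is flushed. The snippet below is a minimal, self-contained model of those state transitions only; State, Event, Item and on_event are simplified stand-ins invented for illustration, not ntex-io types, and all real io polling and service dispatch is elided.

// Sketch of the back-pressure state machine the dispatcher moves to.
// NOT the ntex-io implementation; names are illustrative stand-ins for
// DispatcherState, IoStatusUpdate and DispatchItem.

#[derive(Debug, PartialEq)]
enum State {
    Processing,
    Backpressure,
    Stop,
}

#[derive(Debug, PartialEq)]
enum Item {
    WBackPressureEnabled,
    WBackPressureDisabled,
    Disconnect,
}

enum Event {
    WriteBackpressure, // io layer reports the write buffer is full
    Flushed,           // write buffer drained successfully
    FlushFailed,       // flushing the write buffer returned an error
}

/// Advance the state machine and return the item the service should see.
fn on_event(st: &mut State, ev: Event) -> Option<Item> {
    match (&*st, ev) {
        // back-pressure is reported to the service as soon as it appears
        (State::Processing, Event::WriteBackpressure) => {
            *st = State::Backpressure;
            Some(Item::WBackPressureEnabled)
        }
        // once the buffer drains, resume normal processing
        (State::Backpressure, Event::Flushed) => {
            *st = State::Processing;
            Some(Item::WBackPressureDisabled)
        }
        // a flush error stops the dispatcher
        (_, Event::FlushFailed) => {
            *st = State::Stop;
            Some(Item::Disconnect)
        }
        _ => None,
    }
}

fn main() {
    let mut st = State::Processing;
    assert_eq!(
        on_event(&mut st, Event::WriteBackpressure),
        Some(Item::WBackPressureEnabled)
    );
    assert_eq!(st, State::Backpressure);
    assert_eq!(
        on_event(&mut st, Event::Flushed),
        Some(Item::WBackPressureDisabled)
    );
    assert_eq!(st, State::Processing);
}

The other half of the patch, the new call_service helper and the response slot in DispatcherInner, keeps the first pending service call inline in the dispatcher and polls it directly, only spawning a task when another response is already in flight, which avoids a per-frame task spawn in the common sequential request/response case.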