diff --git a/Cargo.toml b/Cargo.toml index 76c34f0c..f3eea4d3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,7 +26,7 @@ ntex-io = { path = "ntex-io" } ntex-http = { path = "ntex-http" } ntex-router = { path = "ntex-router" } ntex-rt = { path = "ntex-rt" } -#ntex-service = { path = "ntex-service" } +ntex-service = { path = "ntex-service" } ntex-tls = { path = "ntex-tls" } ntex-macros = { path = "ntex-macros" } ntex-util = { path = "ntex-util" } @@ -34,3 +34,5 @@ ntex-util = { path = "ntex-util" } ntex-glommio = { path = "ntex-glommio" } ntex-tokio = { path = "ntex-tokio" } ntex-async-std = { path = "ntex-async-std" } + +ntex-h2 = { git = "https://github.com/ntex-rs/ntex-h2.git", branch = "0-3" } diff --git a/ntex-async-std/Cargo.toml b/ntex-async-std/Cargo.toml index 0c9540e5..6a3844b1 100644 --- a/ntex-async-std/Cargo.toml +++ b/ntex-async-std/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-async-std" -version = "0.2.2" +version = "0.3.0" authors = ["ntex contributors "] description = "async-std intergration for ntex framework" keywords = ["network", "framework", "async", "futures"] @@ -17,8 +17,8 @@ path = "src/lib.rs" [dependencies] ntex-bytes = "0.1.19" -ntex-io = "0.2.4" -ntex-util = "0.2.0" +ntex-io = "0.3.0" +ntex-util = "0.3.0" async-oneshot = "0.5.0" log = "0.4" pin-project-lite = "0.2" diff --git a/ntex-bytes/Cargo.toml b/ntex-bytes/Cargo.toml index a31c44a5..09e45878 100644 --- a/ntex-bytes/Cargo.toml +++ b/ntex-bytes/Cargo.toml @@ -27,4 +27,4 @@ simdutf8 = { version = "0.1.4", optional = true } [dev-dependencies] serde_test = "1.0" serde_json = "1.0" -ntex = { version = "0.6.0", features = ["tokio"] } +ntex = { version = "0.7.0", features = ["tokio"] } diff --git a/ntex-connect/Cargo.toml b/ntex-connect/Cargo.toml index 49041cba..0e3af423 100644 --- a/ntex-connect/Cargo.toml +++ b/ntex-connect/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-connect" -version = "0.2.1" +version = "0.3.0" authors = ["ntex contributors "] description = "ntexwork connect utils for ntex framework" keywords = ["network", "framework", "async", "futures"] @@ -34,28 +34,28 @@ glommio = ["ntex-rt/glommio", "ntex-glommio"] async-std = ["ntex-rt/async-std", "ntex-async-std"] [dependencies] -ntex-service = "1.0.0" +ntex-service = "1.2.0" ntex-bytes = "0.1.19" ntex-http = "0.1.8" -ntex-io = "0.2.1" +ntex-io = "0.3.0" ntex-rt = "0.4.7" -ntex-tls = "0.2.1" -ntex-util = "0.2.0" +ntex-tls = "0.3.0" +ntex-util = "0.3.0" log = "0.4" thiserror = "1.0" -ntex-tokio = { version = "0.2.3", optional = true } -ntex-glommio = { version = "0.2.4", optional = true } -ntex-async-std = { version = "0.2.2", optional = true } +ntex-tokio = { version = "0.3.0", optional = true } +ntex-glommio = { version = "0.3.0", optional = true } +ntex-async-std = { version = "0.3.0", optional = true } # openssl tls-openssl = { version="0.10", package = "openssl", optional = true } # rustls -tls-rustls = { version = "0.20", package = "rustls", optional = true } -webpki-roots = { version = "0.22", optional = true } +tls-rustls = { version = "0.21", package = "rustls", optional = true } +webpki-roots = { version = "0.23", optional = true } [dev-dependencies] rand = "0.8" env_logger = "0.10" -ntex = { version = "0.6.0", features = ["tokio"] } +ntex = { version = "0.7.0", features = ["tokio"] } diff --git a/ntex-connect/src/lib.rs b/ntex-connect/src/lib.rs index c018c538..6fd06ce8 100644 --- a/ntex-connect/src/lib.rs +++ b/ntex-connect/src/lib.rs @@ -20,7 +20,6 @@ pub use self::resolve::Resolver; pub use self::service::Connector; use ntex_io::Io; -use ntex_service::Service; /// Resolve and connect to remote host pub async fn connect(message: U) -> Result @@ -28,7 +27,8 @@ where T: Address, Connect: From, { - service::ConnectServiceResponse::new(Resolver::new().call(message.into())).await + service::ConnectServiceResponse::new(Box::pin(Resolver::new().lookup(message.into()))) + .await } #[allow(unused_imports)] diff --git a/ntex-connect/src/openssl.rs b/ntex-connect/src/openssl.rs index e14e6ad0..c3695468 100644 --- a/ntex-connect/src/openssl.rs +++ b/ntex-connect/src/openssl.rs @@ -5,22 +5,22 @@ pub use tls_openssl::ssl::{Error as SslError, HandshakeError, SslConnector, SslM use ntex_bytes::PoolId; use ntex_io::{FilterFactory, Io, Layer}; -use ntex_service::{Service, ServiceFactory}; +use ntex_service::{Container, Ctx, Service, ServiceFactory}; use ntex_tls::openssl::SslConnector as IoSslConnector; use ntex_util::future::{BoxFuture, Ready}; use super::{Address, Connect, ConnectError, Connector as BaseConnector}; pub struct Connector { - connector: BaseConnector, + connector: Container>, openssl: SslConnector, } -impl Connector { +impl Connector { /// Construct new OpensslConnectService factory pub fn new(connector: SslConnector) -> Self { Connector { - connector: BaseConnector::default(), + connector: BaseConnector::default().into(), openssl: connector, } } @@ -30,8 +30,15 @@ impl Connector { /// Use specified memory pool for memory allocations. By default P0 /// memory pool is used. pub fn memory_pool(self, id: PoolId) -> Self { + let connector = self + .connector + .into_service() + .expect("Connector has been cloned") + .memory_pool(id) + .into(); + Self { - connector: self.connector.memory_pool(id), + connector, openssl: self.openssl, } } @@ -100,7 +107,7 @@ impl Service> for Connector { type Future<'f> = BoxFuture<'f, Result>; #[inline] - fn call(&self, req: Connect) -> Self::Future<'_> { + fn call<'a>(&'a self, req: Connect, _: Ctx<'a, Self>) -> Self::Future<'a> { Box::pin(self.connect(req)) } } @@ -108,7 +115,7 @@ impl Service> for Connector { #[cfg(test)] mod tests { use super::*; - use ntex_service::{Service, ServiceFactory}; + use ntex_service::ServiceFactory; #[ntex::test] async fn test_openssl_connect() { @@ -117,9 +124,9 @@ mod tests { }); let ssl = SslConnector::builder(SslMethod::tls()).unwrap(); - let factory = Connector::new(ssl.build()).clone().memory_pool(PoolId::P5); + let factory = Connector::new(ssl.build()).memory_pool(PoolId::P5).clone(); - let srv = factory.create(&()).await.unwrap(); + let srv = factory.container(&()).await.unwrap(); let result = srv .call(Connect::new("").set_addr(Some(server.addr()))) .await; diff --git a/ntex-connect/src/resolve.rs b/ntex-connect/src/resolve.rs index 013fca79..7fcf7244 100644 --- a/ntex-connect/src/resolve.rs +++ b/ntex-connect/src/resolve.rs @@ -1,7 +1,7 @@ use std::{fmt, io, marker, net}; use ntex_rt::spawn_blocking; -use ntex_service::{Service, ServiceFactory}; +use ntex_service::{Ctx, Service, ServiceFactory}; use ntex_util::future::{BoxFuture, Either, Ready}; use crate::{Address, Connect, ConnectError}; @@ -115,7 +115,7 @@ impl Service> for Resolver { type Future<'f> = BoxFuture<'f, Result, Self::Error>>; #[inline] - fn call(&self, req: Connect) -> Self::Future<'_> { + fn call<'a>(&'a self, req: Connect, _: Ctx<'a, Self>) -> Self::Future<'_> { Box::pin(self.lookup(req)) } } @@ -129,7 +129,7 @@ mod tests { async fn resolver() { let resolver = Resolver::default().clone(); assert!(format!("{:?}", resolver).contains("Resolver")); - let srv = resolver.create(()).await.unwrap(); + let srv = resolver.container(()).await.unwrap(); assert!(lazy(|cx| srv.poll_ready(cx)).await.is_ready()); let res = srv.call(Connect::new("www.rust-lang.org")).await; diff --git a/ntex-connect/src/rustls.rs b/ntex-connect/src/rustls.rs index 1674c87f..de271d3b 100644 --- a/ntex-connect/src/rustls.rs +++ b/ntex-connect/src/rustls.rs @@ -5,7 +5,7 @@ pub use tls_rustls::{ClientConfig, ServerName}; use ntex_bytes::PoolId; use ntex_io::{FilterFactory, Io, Layer}; -use ntex_service::{Service, ServiceFactory}; +use ntex_service::{Container, Ctx, Service, ServiceFactory}; use ntex_tls::rustls::TlsConnector; use ntex_util::future::{BoxFuture, Ready}; @@ -13,24 +13,24 @@ use super::{Address, Connect, ConnectError, Connector as BaseConnector}; /// Rustls connector factory pub struct Connector { - connector: BaseConnector, + connector: Container>, inner: TlsConnector, } -impl From> for Connector { +impl From> for Connector { fn from(cfg: std::sync::Arc) -> Self { Connector { inner: TlsConnector::new(cfg), - connector: BaseConnector::default(), + connector: BaseConnector::default().into(), } } } -impl Connector { +impl Connector { pub fn new(config: ClientConfig) -> Self { Connector { inner: TlsConnector::new(std::sync::Arc::new(config)), - connector: BaseConnector::default(), + connector: BaseConnector::default().into(), } } @@ -39,8 +39,14 @@ impl Connector { /// Use specified memory pool for memory allocations. By default P0 /// memory pool is used. pub fn memory_pool(self, id: PoolId) -> Self { + let connector = self + .connector + .into_service() + .unwrap() + .memory_pool(id) + .into(); Self { - connector: self.connector.memory_pool(id), + connector, inner: self.inner, } } @@ -104,7 +110,7 @@ impl Service> for Connector { type Error = ConnectError; type Future<'f> = BoxFuture<'f, Result>; - fn call(&self, req: Connect) -> Self::Future<'_> { + fn call<'a>(&'a self, req: Connect, _: Ctx<'a, Self>) -> Self::Future<'a> { Box::pin(self.connect(req)) } } @@ -115,7 +121,7 @@ mod tests { use tls_rustls::{OwnedTrustAnchor, RootCertStore}; use super::*; - use ntex_service::{Service, ServiceFactory}; + use ntex_service::ServiceFactory; use ntex_util::future::lazy; #[ntex::test] @@ -139,9 +145,11 @@ mod tests { .with_root_certificates(cert_store) .with_no_client_auth(); let _ = Connector::<&'static str>::new(config.clone()).clone(); - let factory = Connector::from(Arc::new(config)).memory_pool(PoolId::P5); + let factory = Connector::from(Arc::new(config)) + .memory_pool(PoolId::P5) + .clone(); - let srv = factory.create(&()).await.unwrap(); + let srv = factory.container(&()).await.unwrap(); // always ready assert!(lazy(|cx| srv.poll_ready(cx)).await.is_ready()); let result = srv diff --git a/ntex-connect/src/service.rs b/ntex-connect/src/service.rs index cf4cfd7b..9e408257 100644 --- a/ntex-connect/src/service.rs +++ b/ntex-connect/src/service.rs @@ -3,7 +3,7 @@ use std::{collections::VecDeque, future::Future, io, net::SocketAddr, pin::Pin}; use ntex_bytes::{PoolId, PoolRef}; use ntex_io::{types, Io}; -use ntex_service::{Service, ServiceFactory}; +use ntex_service::{Ctx, Service, ServiceFactory}; use ntex_util::future::{BoxFuture, Either, Ready}; use crate::{net::tcp_connect_in, Address, Connect, ConnectError, Resolver}; @@ -39,7 +39,7 @@ impl Connector { Connect: From, { ConnectServiceResponse { - state: ConnectState::Resolve(self.resolver.call(message.into())), + state: ConnectState::Resolve(Box::pin(self.resolver.lookup(message.into()))), pool: self.pool, } .await @@ -80,13 +80,13 @@ impl Service> for Connector { type Future<'f> = ConnectServiceResponse<'f, T>; #[inline] - fn call(&self, req: Connect) -> Self::Future<'_> { - ConnectServiceResponse::new(self.resolver.call(req)) + fn call<'a>(&'a self, req: Connect, _: Ctx<'a, Self>) -> Self::Future<'a> { + ConnectServiceResponse::new(Box::pin(self.resolver.lookup(req))) } } enum ConnectState<'f, T: Address> { - Resolve( as Service>>::Future<'f>), + Resolve(BoxFuture<'f, Result, ConnectError>>), Connect(TcpConnectorResponse), } @@ -97,7 +97,7 @@ pub struct ConnectServiceResponse<'f, T: Address> { } impl<'f, T: Address> ConnectServiceResponse<'f, T> { - pub(super) fn new(fut: as Service>>::Future<'f>) -> Self { + pub(super) fn new(fut: BoxFuture<'f, Result, ConnectError>>) -> Self { Self { state: ConnectState::Resolve(fut), pool: PoolId::P0.pool_ref(), diff --git a/ntex-glommio/Cargo.toml b/ntex-glommio/Cargo.toml index 379ce728..334b255d 100644 --- a/ntex-glommio/Cargo.toml +++ b/ntex-glommio/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-glommio" -version = "0.2.4" +version = "0.3.0" authors = ["ntex contributors "] description = "glommio intergration for ntex framework" keywords = ["network", "framework", "async", "futures"] @@ -17,8 +17,8 @@ path = "src/lib.rs" [dependencies] ntex-bytes = "0.1.19" -ntex-io = "0.2.4" -ntex-util = "0.2.0" +ntex-io = "0.3.0" +ntex-util = "0.3.0" async-oneshot = "0.5.0" futures-lite = "1.12" log = "0.4" diff --git a/ntex-http/src/value.rs b/ntex-http/src/value.rs index 58602fa2..93e3cc8c 100644 --- a/ntex-http/src/value.rs +++ b/ntex-http/src/value.rs @@ -1,5 +1,4 @@ #![allow( - clippy::derive_hash_xor_eq, clippy::should_implement_trait, clippy::no_effect, clippy::missing_safety_doc @@ -8,6 +7,7 @@ use std::{cmp, error::Error, fmt, str, str::FromStr}; use ntex_bytes::{ByteString, Bytes}; +#[allow(clippy::derived_hash_with_manual_eq)] /// Represents an HTTP header field value. /// /// In practice, HTTP header field values are usually valid ASCII. However, the @@ -17,7 +17,7 @@ use ntex_bytes::{ByteString, Bytes}; /// To handle this, the `HeaderValue` is useable as a type and can be compared /// with strings and implements `Debug`. A `to_str` fn is provided that returns /// an `Err` if the header value contains non visible ascii characters. -#[derive(Clone, Hash)] +#[derive(Clone, Hash, Eq)] pub struct HeaderValue { inner: Bytes, is_sensitive: bool, @@ -542,8 +542,6 @@ impl PartialEq for HeaderValue { } } -impl Eq for HeaderValue {} - impl PartialOrd for HeaderValue { #[inline] fn partial_cmp(&self, other: &HeaderValue) -> Option { diff --git a/ntex-io/Cargo.toml b/ntex-io/Cargo.toml index 33210424..559d44da 100644 --- a/ntex-io/Cargo.toml +++ b/ntex-io/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-io" -version = "0.2.10" +version = "0.3.0" authors = ["ntex contributors "] description = "Utilities for encoding and decoding frames" keywords = ["network", "framework", "async", "futures"] @@ -18,8 +18,8 @@ path = "src/lib.rs" [dependencies] ntex-codec = "0.6.2" ntex-bytes = "0.1.19" -ntex-util = "0.2.2" -ntex-service = "1.0.2" +ntex-util = "0.3.0" +ntex-service = "1.2.0" bitflags = "1.3" log = "0.4" @@ -29,4 +29,4 @@ pin-project-lite = "0.2" rand = "0.8" env_logger = "0.10" -ntex = { version = "0.6.3", features = ["tokio"] } +ntex = { version = "0.7.0", features = ["tokio"] } diff --git a/ntex-io/src/buf.rs b/ntex-io/src/buf.rs index 55911316..36425f56 100644 --- a/ntex-io/src/buf.rs +++ b/ntex-io/src/buf.rs @@ -376,14 +376,12 @@ impl<'a> ReadBuf<'a> { if let Some(dst) = dst { if dst.is_empty() { self.io.memory_pool().release_read_buf(dst); + } else if let Some(mut buf) = self.curr.0.take() { + buf.extend_from_slice(&dst); + self.curr.0.set(Some(buf)); + self.io.memory_pool().release_read_buf(dst); } else { - if let Some(mut buf) = self.curr.0.take() { - buf.extend_from_slice(&dst); - self.curr.0.set(Some(buf)); - self.io.memory_pool().release_read_buf(dst); - } else { - self.curr.0.set(Some(dst)); - } + self.curr.0.set(Some(dst)); } } } diff --git a/ntex-io/src/dispatcher.rs b/ntex-io/src/dispatcher.rs index 84c433b8..2fa79ce7 100644 --- a/ntex-io/src/dispatcher.rs +++ b/ntex-io/src/dispatcher.rs @@ -3,7 +3,7 @@ use std::{cell::Cell, future, pin::Pin, rc::Rc, task::Context, task::Poll, time} use ntex_bytes::Pool; use ntex_codec::{Decoder, Encoder}; -use ntex_service::{IntoService, Service}; +use ntex_service::{Container, IntoService, Service}; use ntex_util::time::Seconds; use ntex_util::{future::Either, ready, spawn}; @@ -51,7 +51,7 @@ where { io: IoBoxed, codec: U, - service: S, + service: Container, error: Cell::Error>>>, inflight: Cell, } @@ -107,7 +107,7 @@ where codec, error: Cell::new(None), inflight: Cell::new(0), - service: service.into_service(), + service: Container::new(service.into_service()), }); Dispatcher { @@ -340,7 +340,7 @@ where { fn poll_service( &self, - srv: &S, + srv: &Container, cx: &mut Context<'_>, io: &IoBoxed, ) -> Poll> { @@ -426,6 +426,7 @@ mod tests { use ntex_bytes::{Bytes, PoolId, PoolRef}; use ntex_codec::BytesCodec; + use ntex_service::Ctx; use ntex_util::{future::Ready, time::sleep, time::Millis, time::Seconds}; use super::*; @@ -475,7 +476,7 @@ mod tests { io: state.into(), error: Cell::new(None), inflight: Cell::new(0), - service: service.into_service(), + service: Container::new(service.into_service()), }); ( @@ -621,7 +622,11 @@ mod tests { Poll::Ready(Err(())) } - fn call(&self, _: DispatchItem) -> Self::Future<'_> { + fn call<'a>( + &'a self, + _: DispatchItem, + _: Ctx<'a, Self>, + ) -> Self::Future<'a> { Ready::Ok(None) } } diff --git a/ntex-io/src/testing.rs b/ntex-io/src/testing.rs index 273eb842..ecbefc45 100644 --- a/ntex-io/src/testing.rs +++ b/ntex-io/src/testing.rs @@ -80,20 +80,15 @@ impl Default for IoTestFlags { } } -#[derive(Debug)] +#[derive(Debug, Default)] enum IoTestState { + #[default] Ok, Pending, Close, Err(io::Error), } -impl Default for IoTestState { - fn default() -> Self { - IoTestState::Ok - } -} - impl IoTest { /// Create a two interconnected streams pub fn create() -> (IoTest, IoTest) { diff --git a/ntex-io/src/utils.rs b/ntex-io/src/utils.rs index 24f392ac..78d1343c 100644 --- a/ntex-io/src/utils.rs +++ b/ntex-io/src/utils.rs @@ -1,6 +1,6 @@ use std::marker::PhantomData; -use ntex_service::{fn_service, pipeline_factory, Service, ServiceFactory}; +use ntex_service::{fn_service, pipeline_factory, Ctx, Service, ServiceFactory}; use ntex_util::future::Ready; use crate::{Filter, FilterFactory, Io, IoBoxed, Layer}; @@ -75,7 +75,7 @@ where type Future<'f> = T::Future where T: 'f, F: 'f; #[inline] - fn call(&self, req: Io) -> Self::Future<'_> { + fn call<'a>(&'a self, req: Io, _: Ctx<'a, Self>) -> Self::Future<'a> { self.filter.clone().create(req) } } @@ -106,7 +106,7 @@ mod tests { .unwrap(); Ok::<_, ()>(()) })) - .create(()) + .container(()) .await .unwrap(); let _ = svc.call(Io::new(server)).await; @@ -152,7 +152,7 @@ mod tests { let _ = io.recv(&BytesCodec).await; Ok::<_, ()>(()) }))) - .create(()) + .container(()) .await .unwrap(); let _ = svc.call(Io::new(server)).await; diff --git a/ntex-macros/Cargo.toml b/ntex-macros/Cargo.toml index ff29b090..5811cd08 100644 --- a/ntex-macros/Cargo.toml +++ b/ntex-macros/Cargo.toml @@ -16,6 +16,6 @@ syn = { version = "^1", features = ["full", "parsing"] } proc-macro2 = "^1" [dev-dependencies] -ntex = { version = "0.6.0-alpha.0", features = ["tokio"] } +ntex = { version = "0.7.0", features = ["tokio"] } futures = "0.3" env_logger = "0.10" \ No newline at end of file diff --git a/ntex-router/src/quoter.rs b/ntex-router/src/quoter.rs index 5cf3541f..f619ea37 100644 --- a/ntex-router/src/quoter.rs +++ b/ntex-router/src/quoter.rs @@ -50,7 +50,7 @@ pub(super) fn requote(val: &[u8]) -> Option { #[inline] fn from_hex(v: u8) -> Option { - if (b'0'..=b'9').contains(&v) { + if v.is_ascii_digit() { Some(v - 0x30) // ord('0') == 0x30 } else if (b'A'..=b'F').contains(&v) { Some(v - 0x41 + 10) // ord('A') == 0x41 diff --git a/ntex-service/Cargo.toml b/ntex-service/Cargo.toml index 9189a081..9df4ea42 100644 --- a/ntex-service/Cargo.toml +++ b/ntex-service/Cargo.toml @@ -20,5 +20,5 @@ pin-project-lite = "0.2.6" slab = "0.4" [dev-dependencies] -ntex = { version = "0.6.0", features = ["tokio"] } -ntex-util = "0.2.0" +ntex = { version = "0.7.0", features = ["tokio"] } +ntex-util = "0.3.0" diff --git a/ntex-service/src/and_then.rs b/ntex-service/src/and_then.rs index 78709c36..cc286dd1 100644 --- a/ntex-service/src/and_then.rs +++ b/ntex-service/src/and_then.rs @@ -59,10 +59,7 @@ where } #[inline] - fn call<'a>(&'a self, req: Req, ctx: Ctx<'a, Self>) -> Self::Future<'a> - where - Req: 'a, - { + fn call<'a>(&'a self, req: Req, ctx: Ctx<'a, Self>) -> Self::Future<'a> { AndThenServiceResponse { slf: self, state: State::A { @@ -253,10 +250,7 @@ mod tests { Poll::Ready(Ok(())) } - fn call<'a>(&'a self, req: &'static str, _: Ctx<'a, Self>) -> Self::Future<'a> - where - &'static str: 'a, - { + fn call<'a>(&'a self, req: &'static str, _: Ctx<'a, Self>) -> Self::Future<'a> { Ready::Ok(req) } } @@ -274,10 +268,7 @@ mod tests { Poll::Ready(Ok(())) } - fn call<'a>(&'a self, req: &'static str, _: Ctx<'a, Self>) -> Self::Future<'a> - where - &'static str: 'a, - { + fn call<'a>(&'a self, req: &'static str, _: Ctx<'a, Self>) -> Self::Future<'a> { Ready::Ok((req, "srv2")) } } diff --git a/ntex-service/src/apply.rs b/ntex-service/src/apply.rs index cdb02605..479b183f 100644 --- a/ntex-service/src/apply.rs +++ b/ntex-service/src/apply.rs @@ -101,10 +101,7 @@ where crate::forward_poll_shutdown!(service); #[inline] - fn call<'a>(&'a self, req: In, ctx: Ctx<'a, Self>) -> Self::Future<'a> - where - In: 'a, - { + fn call<'a>(&'a self, req: In, ctx: Ctx<'a, Self>) -> Self::Future<'a> { let (index, waiters) = ctx.into_inner(); let svc = ApplyService { index, @@ -235,10 +232,7 @@ mod tests { type Error = (); type Future<'f> = Ready<(), ()>; - fn call<'a>(&'a self, _: (), _: Ctx<'a, Self>) -> Self::Future<'a> - where - (): 'a, - { + fn call<'a>(&'a self, _: (), _: Ctx<'a, Self>) -> Self::Future<'a> { Ready::Ok(()) } } diff --git a/ntex-service/src/boxed.rs b/ntex-service/src/boxed.rs index 809061cf..1bdb0218 100644 --- a/ntex-service/src/boxed.rs +++ b/ntex-service/src/boxed.rs @@ -62,9 +62,7 @@ pub trait ServiceObj { req: Req, idx: usize, waiters: &'a Rc>>>, - ) -> BoxFuture<'a, Self::Response, Self::Error> - where - Req: 'a; + ) -> BoxFuture<'a, Self::Response, Self::Error>; } impl ServiceObj for S @@ -91,10 +89,7 @@ where req: Req, idx: usize, waiters: &'a Rc>>>, - ) -> BoxFuture<'a, Self::Response, Self::Error> - where - Req: 'a, - { + ) -> BoxFuture<'a, Self::Response, Self::Error> { Box::pin(Ctx::<'a, S>::new(idx, waiters).call_nowait(self, req)) } } @@ -194,10 +189,7 @@ where } #[inline] - fn call<'a>(&'a self, req: Req, ctx: Ctx<'a, Self>) -> Self::Future<'a> - where - Req: 'a, - { + fn call<'a>(&'a self, req: Req, ctx: Ctx<'a, Self>) -> Self::Future<'a> { let (index, waiters) = ctx.into_inner(); self.0.call(req, index, waiters) } @@ -230,10 +222,7 @@ where } #[inline] - fn call<'a>(&'a self, req: Req, ctx: Ctx<'a, Self>) -> Self::Future<'a> - where - Req: 'a, - { + fn call<'a>(&'a self, req: Req, ctx: Ctx<'a, Self>) -> Self::Future<'a> { let (index, waiters) = ctx.into_inner(); self.0.call(req, index, waiters) } diff --git a/ntex-service/src/ctx.rs b/ntex-service/src/ctx.rs index 5c6a7ca6..3d107c8c 100644 --- a/ntex-service/src/ctx.rs +++ b/ntex-service/src/ctx.rs @@ -1,18 +1,14 @@ -use std::{cell::RefCell, future::Future, marker, pin::Pin, rc::Rc, task, task::Poll}; +use std::{cell::RefCell, future::Future, marker, ops, pin::Pin, rc::Rc, task, task::Poll}; use crate::{Service, ServiceFactory}; -pub struct Container { +pub struct Container { svc: Rc, index: usize, waiters: Rc>>>, - _t: marker::PhantomData, } -impl Container -where - S: Service, -{ +impl Container { #[inline] pub fn new(svc: S) -> Self { let mut waiters = slab::Slab::new(); @@ -21,13 +17,15 @@ where index, svc: Rc::new(svc), waiters: Rc::new(RefCell::new(waiters)), - _t: marker::PhantomData, } } #[inline] /// Returns `Ready` when the service is able to process requests. - pub fn poll_ready(&self, cx: &mut task::Context<'_>) -> Poll> { + pub fn poll_ready(&self, cx: &mut task::Context<'_>) -> Poll> + where + S: Service, + { let res = self.svc.poll_ready(cx); if res.is_pending() { @@ -38,13 +36,19 @@ where #[inline] /// Shutdown enclosed service. - pub fn poll_shutdown(&self, cx: &mut task::Context<'_>) -> Poll<()> { + pub fn poll_shutdown(&self, cx: &mut task::Context<'_>) -> Poll<()> + where + S: Service, + { self.svc.poll_shutdown(cx) } #[inline] /// Process the request and return the response asynchronously. - pub fn call<'a>(&'a self, req: R) -> ServiceCall<'a, S, R> { + pub fn call<'a, R>(&'a self, req: R) -> ServiceCall<'a, S, R> + where + S: Service, + { let ctx = Ctx::<'a, S> { index: self.index, waiters: &self.waiters, @@ -53,7 +57,7 @@ where ctx.call(self.svc.as_ref(), req) } - pub(crate) fn create, C>( + pub(crate) fn create, R, C>( f: &F, cfg: C, ) -> ContainerFactory<'_, F, R, C> { @@ -62,9 +66,15 @@ where _t: marker::PhantomData, } } + + pub fn into_service(self) -> Option { + let svc = self.svc.clone(); + drop(self); + Rc::try_unwrap(svc).ok() + } } -impl Clone for Container { +impl Clone for Container { fn clone(&self) -> Self { let index = self.waiters.borrow_mut().insert(None); @@ -72,21 +82,26 @@ impl Clone for Container { index, svc: self.svc.clone(), waiters: self.waiters.clone(), - _t: marker::PhantomData, } } } -impl From for Container -where - S: Service, -{ +impl From for Container { fn from(svc: S) -> Self { Container::new(svc) } } -impl Drop for Container { +impl ops::Deref for Container { + type Target = S; + + #[inline] + fn deref(&self) -> &S { + self.svc.as_ref() + } +} + +impl Drop for Container { fn drop(&mut self) { let mut waiters = self.waiters.borrow_mut(); @@ -268,7 +283,7 @@ impl<'f, F, R, C> Future for ContainerFactory<'f, F, R, C> where F: ServiceFactory + 'f, { - type Output = Result, F::InitError>; + type Output = Result, F::InitError>; fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { Poll::Ready(Ok(Container::new(task::ready!(self @@ -297,10 +312,7 @@ mod tests { self.1.poll_ready(cx).map(|_| Ok(())) } - fn call<'a>(&'a self, req: &'static str, _: Ctx<'a, Self>) -> Self::Future<'a> - where - &'static str: 'a, - { + fn call<'a>(&'a self, req: &'static str, _: Ctx<'a, Self>) -> Self::Future<'a> { Ready::Ok(req) } } diff --git a/ntex-service/src/fn_service.rs b/ntex-service/src/fn_service.rs index 607a65b1..dc3bafb3 100644 --- a/ntex-service/src/fn_service.rs +++ b/ntex-service/src/fn_service.rs @@ -128,10 +128,7 @@ where type Future<'f> = Fut where Self: 'f, Req: 'f; #[inline] - fn call<'a>(&'a self, req: Req, _: Ctx<'a, Self>) -> Self::Future<'a> - where - Req: 'a, - { + fn call<'a>(&'a self, req: Req, _: Ctx<'a, Self>) -> Self::Future<'a> { (self.f)(req) } } @@ -193,10 +190,7 @@ where type Future<'f> = Fut where Self: 'f; #[inline] - fn call<'a>(&'a self, req: Req, _: Ctx<'a, Self>) -> Self::Future<'a> - where - Req: 'a, - { + fn call<'a>(&'a self, req: Req, _: Ctx<'a, Self>) -> Self::Future<'a> { (self.f)(req) } } diff --git a/ntex-service/src/fn_shutdown.rs b/ntex-service/src/fn_shutdown.rs index 06043dda..3b549877 100644 --- a/ntex-service/src/fn_shutdown.rs +++ b/ntex-service/src/fn_shutdown.rs @@ -60,10 +60,7 @@ where } #[inline] - fn call<'a>(&'a self, req: Req, _: Ctx<'a, Self>) -> Self::Future<'a> - where - Req: 'a, - { + fn call<'a>(&'a self, req: Req, _: Ctx<'a, Self>) -> Self::Future<'a> { ready(Ok(req)) } } diff --git a/ntex-service/src/lib.rs b/ntex-service/src/lib.rs index addc4cc0..16333b9b 100644 --- a/ntex-service/src/lib.rs +++ b/ntex-service/src/lib.rs @@ -54,7 +54,7 @@ pub use self::pipeline::{pipeline, pipeline_factory, Pipeline, PipelineFactory}; /// simple API surfaces. This leads to simpler design of each service, improves test-ability and /// makes composition easier. /// -/// ```rust,ignore +/// ```rust /// # use std::convert::Infallible; /// # use std::future::Future; /// # use std::pin::Pin; @@ -101,9 +101,7 @@ pub trait Service { /// /// Invoking `call` without first invoking `poll_ready` is permitted. Implementations must be /// resilient to this fact. - fn call<'a>(&'a self, req: Req, ctx: Ctx<'a, Self>) -> Self::Future<'a> - where - Req: 'a; + fn call<'a>(&'a self, req: Req, ctx: Ctx<'a, Self>) -> Self::Future<'a>; #[inline] /// Returns `Ready` when the service is able to process requests. @@ -203,7 +201,7 @@ pub trait ServiceFactory { where Self: Sized, { - Container::::create(self, cfg) + Container::::create(self, cfg) } #[inline] @@ -260,10 +258,7 @@ where } #[inline] - fn call<'s>(&'s self, request: Req, ctx: Ctx<'s, Self>) -> S::Future<'s> - where - Req: 's, - { + fn call<'s>(&'s self, request: Req, ctx: Ctx<'s, Self>) -> S::Future<'s> { ctx.call_nowait(&**self, request) } } @@ -287,10 +282,7 @@ where } #[inline] - fn call<'a>(&'a self, request: Req, ctx: Ctx<'a, Self>) -> S::Future<'a> - where - Req: 'a, - { + fn call<'a>(&'a self, request: Req, ctx: Ctx<'a, Self>) -> S::Future<'a> { ctx.call_nowait(&**self, request) } } diff --git a/ntex-service/src/map.rs b/ntex-service/src/map.rs index 4888b0cf..4133fc4d 100644 --- a/ntex-service/src/map.rs +++ b/ntex-service/src/map.rs @@ -54,10 +54,7 @@ where crate::forward_poll_shutdown!(service); #[inline] - fn call<'a>(&'a self, req: Req, ctx: Ctx<'a, Self>) -> Self::Future<'a> - where - Req: 'a, - { + fn call<'a>(&'a self, req: Req, ctx: Ctx<'a, Self>) -> Self::Future<'a> { MapFuture { fut: ctx.call(&self.service, req), slf: self, @@ -209,10 +206,7 @@ mod tests { Poll::Ready(Ok(())) } - fn call<'a>(&'a self, _: (), _: Ctx<'a, Self>) -> Self::Future<'a> - where - (): 'a, - { + fn call<'a>(&'a self, _: (), _: Ctx<'a, Self>) -> Self::Future<'a> { Ready::Ok(()) } } diff --git a/ntex-service/src/map_err.rs b/ntex-service/src/map_err.rs index 487981d1..f02513a4 100644 --- a/ntex-service/src/map_err.rs +++ b/ntex-service/src/map_err.rs @@ -57,10 +57,7 @@ where } #[inline] - fn call<'a>(&'a self, req: R, ctx: Ctx<'a, Self>) -> Self::Future<'a> - where - R: 'a, - { + fn call<'a>(&'a self, req: R, ctx: Ctx<'a, Self>) -> Self::Future<'a> { MapErrFuture { slf: self, fut: ctx.call(&self.service, req), @@ -217,10 +214,7 @@ mod tests { } } - fn call<'a>(&'a self, _: (), _: Ctx<'a, Self>) -> Self::Future<'a> - where - (): 'a, - { + fn call<'a>(&'a self, _: (), _: Ctx<'a, Self>) -> Self::Future<'a> { Ready::Err(()) } } diff --git a/ntex-service/src/middleware.rs b/ntex-service/src/middleware.rs index 1f19b0d7..5468c278 100644 --- a/ntex-service/src/middleware.rs +++ b/ntex-service/src/middleware.rs @@ -239,10 +239,7 @@ mod tests { self.0.poll_ready(cx) } - fn call<'a>(&'a self, req: R, ctx: Ctx<'a, Self>) -> Self::Future<'a> - where - R: 'a, - { + fn call<'a>(&'a self, req: R, ctx: Ctx<'a, Self>) -> Self::Future<'a> { ctx.call(&self.0, req) } } diff --git a/ntex-service/src/pipeline.rs b/ntex-service/src/pipeline.rs index 2cf90e16..d3fe5a09 100644 --- a/ntex-service/src/pipeline.rs +++ b/ntex-service/src/pipeline.rs @@ -118,7 +118,7 @@ impl> Pipeline { } /// Create service container - pub fn container(self) -> Container { + pub fn container(self) -> Container { Container::new(self.service) } } @@ -144,10 +144,7 @@ impl> Service for Pipeline { crate::forward_poll_shutdown!(service); #[inline] - fn call<'a>(&'a self, req: Req, ctx: Ctx<'a, Self>) -> Self::Future<'a> - where - Req: 'a, - { + fn call<'a>(&'a self, req: Req, ctx: Ctx<'a, Self>) -> Self::Future<'a> { ctx.call(&self.service, req) } } diff --git a/ntex-service/src/then.rs b/ntex-service/src/then.rs index 3e6c26e7..fd0ec8ba 100644 --- a/ntex-service/src/then.rs +++ b/ntex-service/src/then.rs @@ -60,10 +60,7 @@ where } #[inline] - fn call<'a>(&'a self, req: R, ctx: Ctx<'a, Self>) -> Self::Future<'a> - where - R: 'a, - { + fn call<'a>(&'a self, req: R, ctx: Ctx<'a, Self>) -> Self::Future<'a> { ThenServiceResponse { slf: self, state: State::A { @@ -270,10 +267,7 @@ mod tests { &'a self, req: Result<&'static str, &'static str>, _: Ctx<'a, Self>, - ) -> Self::Future<'a> - where - Result<&'static str, &'static str>: 'a, - { + ) -> Self::Future<'a> { match req { Ok(msg) => Ready::Ok(msg), Err(_) => Ready::Err(()), @@ -298,10 +292,7 @@ mod tests { &'a self, req: Result<&'static str, ()>, _: Ctx<'a, Self>, - ) -> Self::Future<'a> - where - Result<&'static str, ()>: 'a, - { + ) -> Self::Future<'a> { match req { Ok(msg) => Ready::Ok((msg, "ok")), Err(()) => Ready::Ok(("srv2", "err")), @@ -329,7 +320,6 @@ mod tests { .container(); let res = srv.call(Ok("srv1")).await; - println!("=========== {:?}", res); assert!(res.is_ok()); assert_eq!(res.unwrap(), ("srv1", "ok")); diff --git a/ntex-tls/Cargo.toml b/ntex-tls/Cargo.toml index 25c2d6e9..8e1ac8eb 100644 --- a/ntex-tls/Cargo.toml +++ b/ntex-tls/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-tls" -version = "0.2.4" +version = "0.3.0" authors = ["ntex contributors "] description = "An implementation of SSL streams for ntex backed by OpenSSL" keywords = ["network", "framework", "async", "futures"] @@ -26,20 +26,20 @@ rustls = ["tls_rust"] [dependencies] ntex-bytes = "0.1.19" -ntex-io = "0.2.7" -ntex-util = "0.2.0" -ntex-service = "1.0.0" +ntex-io = "0.3.0" +ntex-util = "0.3.0" +ntex-service = "1.2.0" log = "0.4" pin-project-lite = "0.2" # openssl -tls_openssl = { version="0.10", package = "openssl", optional = true } +tls_openssl = { version = "0.10", package = "openssl", optional = true } # rustls tls_rust = { version = "0.21", package = "rustls", optional = true } [dev-dependencies] -ntex = { version = "0.6.3", features = ["openssl", "rustls", "tokio"] } +ntex = { version = "0.7.0", features = ["openssl", "rustls", "tokio"] } env_logger = "0.10" rustls-pemfile = { version = "1.0" } -webpki-roots = { version = "0.22" } +webpki-roots = { version = "0.23" } diff --git a/ntex-tls/src/openssl/accept.rs b/ntex-tls/src/openssl/accept.rs index 3e919fa6..582de060 100644 --- a/ntex-tls/src/openssl/accept.rs +++ b/ntex-tls/src/openssl/accept.rs @@ -2,7 +2,7 @@ use std::task::{Context, Poll}; use std::{error::Error, future::Future, marker::PhantomData, pin::Pin}; use ntex_io::{Filter, FilterFactory, Io, Layer}; -use ntex_service::{Service, ServiceFactory}; +use ntex_service::{Ctx, Service, ServiceFactory}; use ntex_util::{future::Ready, time::Millis}; use tls_openssl::ssl::SslAcceptor; @@ -95,7 +95,7 @@ impl Service> for AcceptorService { } #[inline] - fn call(&self, req: Io) -> Self::Future<'_> { + fn call<'a>(&'a self, req: Io, _: Ctx<'a, Self>) -> Self::Future<'a> { AcceptorServiceResponse { _guard: self.conns.get(), fut: self.acceptor.clone().create(req), diff --git a/ntex-tls/src/rustls/accept.rs b/ntex-tls/src/rustls/accept.rs index f1373096..ae30db87 100644 --- a/ntex-tls/src/rustls/accept.rs +++ b/ntex-tls/src/rustls/accept.rs @@ -4,7 +4,7 @@ use std::{future::Future, io, marker::PhantomData, pin::Pin, sync::Arc}; use tls_rust::ServerConfig; use ntex_io::{Filter, FilterFactory, Io, Layer}; -use ntex_service::{Service, ServiceFactory}; +use ntex_service::{Ctx, Service, ServiceFactory}; use ntex_util::{future::Ready, time::Millis}; use super::{TlsAcceptor, TlsFilter}; @@ -93,7 +93,7 @@ impl Service> for AcceptorService { } #[inline] - fn call(&self, req: Io) -> Self::Future<'_> { + fn call<'a>(&'a self, req: Io, _: Ctx<'a, Self>) -> Self::Future<'a> { AcceptorServiceFut { _guard: self.conns.get(), fut: self.acceptor.clone().create(req), diff --git a/ntex-tokio/Cargo.toml b/ntex-tokio/Cargo.toml index dbac8855..539f66b7 100644 --- a/ntex-tokio/Cargo.toml +++ b/ntex-tokio/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-tokio" -version = "0.2.3" +version = "0.3.0" authors = ["ntex contributors "] description = "tokio intergration for ntex framework" keywords = ["network", "framework", "async", "futures"] @@ -17,8 +17,8 @@ path = "src/lib.rs" [dependencies] ntex-bytes = "0.1.19" -ntex-io = "0.2.4" -ntex-util = "0.2.0" +ntex-io = "0.3.0" +ntex-util = "0.3.0" log = "0.4" pin-project-lite = "0.2" tokio = { version = "1", default-features = false, features = ["rt", "net", "sync", "signal"] } diff --git a/ntex-util/CHANGES.md b/ntex-util/CHANGES.md index 1643bb16..88578646 100644 --- a/ntex-util/CHANGES.md +++ b/ntex-util/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [0.3.0] - 2023-06-xx + +* Upgrade to ntex-service 1.2 + ## [0.2.3] - 2023-06-04 * Refactor timer driver diff --git a/ntex-util/Cargo.toml b/ntex-util/Cargo.toml index a9e25da0..bef46d01 100644 --- a/ntex-util/Cargo.toml +++ b/ntex-util/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-util" -version = "0.2.3" +version = "0.3.0" authors = ["ntex contributors "] description = "Utilities for ntex framework" keywords = ["network", "framework", "async", "futures"] @@ -17,7 +17,7 @@ path = "src/lib.rs" [dependencies] ntex-rt = "0.4.7" -ntex-service = "1.0.0" +ntex-service = "1.2.0" bitflags = "1.3" fxhash = "0.2.1" log = "0.4" @@ -28,7 +28,7 @@ futures-sink = { version = "0.3", default-features = false, features = ["alloc"] pin-project-lite = "0.2.9" [dev-dependencies] -ntex = { version = "0.6.0", features = ["tokio"] } +ntex = { version = "0.7.0", features = ["tokio"] } ntex-bytes = "0.1.18" ntex-macros = "0.1.3" futures-util = { version = "0.3", default-features = false, features = ["alloc"] } diff --git a/ntex-util/src/services/buffer.rs b/ntex-util/src/services/buffer.rs index 90e012fa..0d935eab 100644 --- a/ntex-util/src/services/buffer.rs +++ b/ntex-util/src/services/buffer.rs @@ -1,63 +1,59 @@ //! Service that buffers incomming requests. use std::cell::{Cell, RefCell}; -use std::task::{Context, Poll}; -use std::{collections::VecDeque, future::Future, marker::PhantomData, pin::Pin, rc::Rc}; +use std::task::{ready, Context, Poll}; +use std::{collections::VecDeque, future::Future, marker::PhantomData, pin::Pin}; -use ntex_service::{IntoService, Middleware, Service}; +use ntex_service::{Ctx, IntoService, Middleware, Service, ServiceCall}; use crate::{channel::oneshot, future::Either, task::LocalWaker}; /// Buffer - service factory for service that can buffer incoming request. /// /// Default number of buffered requests is 16 -pub struct Buffer { +pub struct Buffer { buf_size: usize, - err: Rc E>, _t: PhantomData, } -impl Buffer { - pub fn new(f: F) -> Self - where - F: Fn() -> E + 'static, - { +impl Default for Buffer { + fn default() -> Self { Self { buf_size: 16, - err: Rc::new(f), _t: PhantomData, } } +} +impl Buffer { pub fn buf_size(mut self, size: usize) -> Self { self.buf_size = size; self } } -impl Clone for Buffer { +impl Clone for Buffer { fn clone(&self) -> Self { Self { buf_size: self.buf_size, - err: self.err.clone(), _t: PhantomData, } } } -impl Middleware for Buffer +impl Middleware for Buffer where - S: Service, + S: Service, { - type Service = BufferService; + type Service = BufferService; fn create(&self, service: S) -> Self::Service { BufferService { service, size: self.buf_size, - err: self.err.clone(), ready: Cell::new(false), waker: LocalWaker::default(), buf: RefCell::new(VecDeque::with_capacity(self.buf_size)), + _t: PhantomData, } } } @@ -65,58 +61,57 @@ where /// Buffer service - service that can buffer incoming requests. /// /// Default number of buffered requests is 16 -pub struct BufferService, E> { +pub struct BufferService> { size: usize, ready: Cell, service: S, waker: LocalWaker, - err: Rc E>, - buf: RefCell, R)>>, + buf: RefCell>>, + _t: PhantomData, } -impl BufferService +impl BufferService where - S: Service, + S: Service, { - pub fn new(size: usize, err: F, service: U) -> Self + pub fn new(size: usize, service: U) -> Self where U: IntoService, - F: Fn() -> E + 'static, { Self { size, - err: Rc::new(err), ready: Cell::new(false), service: service.into_service(), waker: LocalWaker::default(), buf: RefCell::new(VecDeque::with_capacity(size)), + _t: PhantomData, } } } -impl Clone for BufferService +impl Clone for BufferService where - S: Service + Clone, + S: Service + Clone, { fn clone(&self) -> Self { Self { size: self.size, - err: self.err.clone(), ready: Cell::new(false), service: self.service.clone(), waker: LocalWaker::default(), buf: RefCell::new(VecDeque::with_capacity(self.size)), + _t: PhantomData, } } } -impl Service for BufferService +impl Service for BufferService where - S: Service, + S: Service, { type Response = S::Response; type Error = S::Error; - type Future<'f> = Either, BufferServiceResponse<'f, R, S, E>> where Self: 'f, R: 'f; + type Future<'f> = Either, BufferServiceResponse<'f, R, S>> where Self: 'f, R: 'f; #[inline] fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { @@ -132,8 +127,8 @@ where log::trace!("Buffer limit exceeded"); Poll::Pending } - } else if let Some((sender, req)) = buffer.pop_front() { - let _ = sender.send(req); + } else if let Some(sender) = buffer.pop_front() { + let _ = sender.send(()); self.ready.set(false); Poll::Ready(Ok(())) } else { @@ -143,17 +138,18 @@ where } #[inline] - fn call(&self, req: R) -> Self::Future<'_> { + fn call<'a>(&'a self, req: R, ctx: Ctx<'a, Self>) -> Self::Future<'a> { if self.ready.get() { self.ready.set(false); - Either::Left(self.service.call(req)) + Either::Left(ctx.call(&self.service, req)) } else { let (tx, rx) = oneshot::channel(); - self.buf.borrow_mut().push_back((tx, req)); + self.buf.borrow_mut().push_back(tx); Either::Right(BufferServiceResponse { slf: self, - state: State::Tx { rx }, + fut: ctx.call(&self.service, req), + rx: Some(rx), }) } } @@ -163,63 +159,40 @@ where pin_project_lite::pin_project! { #[doc(hidden)] - pub struct BufferServiceResponse<'f, R, S: Service, E> + #[must_use = "futures do nothing unless polled"] + pub struct BufferServiceResponse<'f, R, S: Service> { - slf: &'f BufferService, #[pin] - state: State>, + fut: ServiceCall<'f, S, R>, + slf: &'f BufferService, + rx: Option>, } } -pin_project_lite::pin_project! { - #[project = StateProject] - enum State - where F: Future, - { - Tx { rx: oneshot::Receiver }, - Srv { #[pin] fut: F }, - } -} - -impl<'f, R, S, E> Future for BufferServiceResponse<'f, R, S, E> +impl<'f, R, S> Future for BufferServiceResponse<'f, R, S> where - S: Service, + S: Service, { type Output = Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut this = self.as_mut().project(); - - loop { - match this.state.project() { - StateProject::Tx { rx } => match Pin::new(rx).poll(cx) { - Poll::Ready(Ok(req)) => { - let state = State::Srv { - fut: this.slf.service.call(req), - }; - this = self.as_mut().project(); - this.state.set(state); - } - Poll::Ready(Err(_)) => return Poll::Ready(Err((*this.slf.err)())), - Poll::Pending => return Poll::Pending, - }, - StateProject::Srv { fut } => { - let res = match fut.poll(cx) { - Poll::Ready(res) => res, - Poll::Pending => return Poll::Pending, - }; - this.slf.waker.wake(); - return Poll::Ready(res); - } - } + let this = self.as_mut().project(); + + if let Some(ref rx) = this.rx { + let _ = ready!(rx.poll_recv(cx)); + this.rx.take(); } + + let res = ready!(this.fut.poll(cx)); + this.slf.waker.wake(); + Poll::Ready(res) } } #[cfg(test)] mod tests { - use ntex_service::{apply, fn_factory, Service, ServiceFactory}; - use std::task::{Context, Poll}; + use ntex_service::{apply, fn_factory, Container, Service, ServiceFactory}; + use std::{rc::Rc, task::Context, task::Poll, time::Duration}; use super::*; use crate::future::{lazy, Ready}; @@ -247,7 +220,7 @@ mod tests { } } - fn call(&self, _: ()) -> Self::Future<'_> { + fn call<'a>(&'a self, _: (), _: Ctx<'a, Self>) -> Self::Future<'a> { self.0.ready.set(false); self.0.count.set(self.0.count.get() + 1); Ready::Ok(()) @@ -255,21 +228,29 @@ mod tests { } #[ntex_macros::rt_test2] - async fn test_transform() { + async fn test_service() { let inner = Rc::new(Inner { ready: Cell::new(false), waker: LocalWaker::default(), count: Cell::new(0), }); - let srv = BufferService::new(2, || (), TestService(inner.clone())).clone(); + let srv = Container::new(BufferService::new(2, TestService(inner.clone())).clone()); assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(()))); - let fut1 = srv.call(()); + let srv1 = srv.clone(); + ntex::rt::spawn(async move { + let _ = srv1.call(()).await; + }); + crate::time::sleep(Duration::from_millis(25)).await; assert_eq!(inner.count.get(), 0); assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(()))); - let fut2 = srv.call(()); + let srv1 = srv.clone(); + ntex::rt::spawn(async move { + let _ = srv1.call(()).await; + }); + crate::time::sleep(Duration::from_millis(25)).await; assert_eq!(inner.count.get(), 0); assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Pending); @@ -277,14 +258,14 @@ mod tests { inner.waker.wake(); assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(()))); - let _ = fut1.await; + crate::time::sleep(Duration::from_millis(25)).await; assert_eq!(inner.count.get(), 1); inner.ready.set(true); inner.waker.wake(); assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(()))); - let _ = fut2.await; + crate::time::sleep(Duration::from_millis(25)).await; assert_eq!(inner.count.get(), 2); let inner = Rc::new(Inner { @@ -293,7 +274,7 @@ mod tests { count: Cell::new(0), }); - let srv = BufferService::new(2, || (), TestService(inner.clone())); + let srv = Container::new(BufferService::new(2, TestService(inner.clone()))); assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(()))); let _ = srv.call(()).await; assert_eq!(inner.count.get(), 1); @@ -303,7 +284,7 @@ mod tests { #[ntex_macros::rt_test2] #[allow(clippy::redundant_clone)] - async fn test_newtransform() { + async fn test_middleware() { let inner = Rc::new(Inner { ready: Cell::new(false), waker: LocalWaker::default(), @@ -311,18 +292,26 @@ mod tests { }); let srv = apply( - Buffer::new(|| ()).buf_size(2).clone(), + Buffer::default().buf_size(2).clone(), fn_factory(|| async { Ok::<_, ()>(TestService(inner.clone())) }), ); - let srv = srv.create(&()).await.unwrap(); + let srv = srv.container(&()).await.unwrap(); assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(()))); - let fut1 = srv.call(()); + let srv1 = srv.clone(); + ntex::rt::spawn(async move { + let _ = srv1.call(()).await; + }); + crate::time::sleep(Duration::from_millis(25)).await; assert_eq!(inner.count.get(), 0); assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(()))); - let fut2 = srv.call(()); + let srv1 = srv.clone(); + ntex::rt::spawn(async move { + let _ = srv1.call(()).await; + }); + crate::time::sleep(Duration::from_millis(25)).await; assert_eq!(inner.count.get(), 0); assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Pending); @@ -330,14 +319,14 @@ mod tests { inner.waker.wake(); assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(()))); - let _ = fut1.await; + crate::time::sleep(Duration::from_millis(25)).await; assert_eq!(inner.count.get(), 1); inner.ready.set(true); inner.waker.wake(); assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(()))); - let _ = fut2.await; + crate::time::sleep(Duration::from_millis(25)).await; assert_eq!(inner.count.get(), 2); } } diff --git a/ntex-util/src/services/inflight.rs b/ntex-util/src/services/inflight.rs index 2745c181..01d28506 100644 --- a/ntex-util/src/services/inflight.rs +++ b/ntex-util/src/services/inflight.rs @@ -1,7 +1,7 @@ //! Service that limits number of in-flight async requests. use std::{future::Future, marker::PhantomData, pin::Pin, task::Context, task::Poll}; -use ntex_service::{IntoService, Middleware, Service}; +use ntex_service::{Ctx, IntoService, Middleware, Service, ServiceCall}; use super::counter::{Counter, CounterGuard}; @@ -76,9 +76,9 @@ where } #[inline] - fn call(&self, req: R) -> Self::Future<'_> { + fn call<'a>(&'a self, req: R, ctx: Ctx<'a, Self>) -> Self::Future<'a> { InFlightServiceResponse { - fut: self.service.call(req), + fut: ctx.call(&self.service, req), _guard: self.count.get(), _t: PhantomData, } @@ -93,7 +93,7 @@ pin_project_lite::pin_project! { where T: 'f, R: 'f { #[pin] - fut: T::Future<'f>, + fut: ServiceCall<'f, T, R>, _guard: CounterGuard, _t: PhantomData } @@ -109,39 +109,43 @@ impl<'f, T: Service, R> Future for InFlightServiceResponse<'f, T, R> { #[cfg(test)] mod tests { - use ntex_service::{apply, fn_factory, Service, ServiceFactory}; - use std::{task::Poll, time::Duration}; + use ntex_service::{apply, fn_factory, Container, Ctx, Service, ServiceFactory}; + use std::{cell::RefCell, task::Poll, time::Duration}; use super::*; - use crate::future::{lazy, BoxFuture}; + use crate::{channel::oneshot, future::lazy, future::BoxFuture}; - struct SleepService(Duration); + struct SleepService(oneshot::Receiver<()>); impl Service<()> for SleepService { type Response = (); type Error = (); type Future<'f> = BoxFuture<'f, Result<(), ()>>; - fn call(&self, _: ()) -> Self::Future<'_> { - let fut = crate::time::sleep(self.0); + fn call<'a>(&'a self, _: (), _: Ctx<'a, Self>) -> Self::Future<'a> { Box::pin(async move { - fut.await; + let _ = self.0.recv().await; Ok::<_, ()>(()) }) } } #[ntex_macros::rt_test2] - async fn test_inflight() { - let wait_time = Duration::from_millis(50); + async fn test_service() { + let (tx, rx) = oneshot::channel(); - let srv = InFlightService::new(1, SleepService(wait_time)); + let srv = Container::new(InFlightService::new(1, SleepService(rx))); assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(()))); - let res = srv.call(()); + let srv2 = srv.clone(); + ntex::rt::spawn(async move { + let _ = srv2.call(()).await; + }); + crate::time::sleep(Duration::from_millis(25)).await; assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Pending); - let _ = res.await; + let _ = tx.send(()); + crate::time::sleep(Duration::from_millis(25)).await; assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(()))); assert!(lazy(|cx| srv.poll_shutdown(cx)).await.is_ready()); } @@ -154,19 +158,28 @@ mod tests { "InFlight { max_inflight: 1 }" ); - let wait_time = Duration::from_millis(50); + let (tx, rx) = oneshot::channel(); + let rx = RefCell::new(Some(rx)); let srv = apply( InFlight::new(1), - fn_factory(|| async { Ok::<_, ()>(SleepService(wait_time)) }), + fn_factory(move || { + let rx = rx.borrow_mut().take().unwrap(); + async move { Ok::<_, ()>(SleepService(rx)) } + }), ); - let srv = srv.create(&()).await.unwrap(); + let srv = srv.container(&()).await.unwrap(); assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(()))); - let res = srv.call(()); + let srv2 = srv.clone(); + ntex::rt::spawn(async move { + let _ = srv2.call(()).await; + }); + crate::time::sleep(Duration::from_millis(25)).await; assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Pending); - let _ = res.await; + let _ = tx.send(()); + crate::time::sleep(Duration::from_millis(25)).await; assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(()))); } } diff --git a/ntex-util/src/services/keepalive.rs b/ntex-util/src/services/keepalive.rs index 5880a2b6..f27e3ac0 100644 --- a/ntex-util/src/services/keepalive.rs +++ b/ntex-util/src/services/keepalive.rs @@ -1,7 +1,7 @@ use std::task::{Context, Poll}; use std::{cell::Cell, convert::Infallible, marker, time::Duration, time::Instant}; -use ntex_service::{Service, ServiceFactory}; +use ntex_service::{Ctx, Service, ServiceFactory}; use crate::future::Ready; use crate::time::{now, sleep, Millis, Sleep}; @@ -113,7 +113,7 @@ where } } - fn call(&self, req: R) -> Self::Future<'_> { + fn call<'a>(&'a self, req: R, _: Ctx<'a, Self>) -> Self::Future<'a> { self.expire.set(now()); Ready::Ok(req) } @@ -121,7 +121,7 @@ where #[cfg(test)] mod tests { - use ntex_service::{Service, ServiceFactory}; + use ntex_service::ServiceFactory; use super::*; use crate::future::lazy; @@ -134,7 +134,7 @@ mod tests { let factory = KeepAlive::new(Millis(100), || TestErr); let _ = factory.clone(); - let service = factory.create(&()).await.unwrap(); + let service = factory.container(&()).await.unwrap(); assert_eq!(service.call(1usize).await, Ok(1usize)); assert!(lazy(|cx| service.poll_ready(cx)).await.is_ready()); diff --git a/ntex-util/src/services/mod.rs b/ntex-util/src/services/mod.rs index aa8f0347..28974192 100644 --- a/ntex-util/src/services/mod.rs +++ b/ntex-util/src/services/mod.rs @@ -4,7 +4,6 @@ mod extensions; pub mod inflight; pub mod keepalive; pub mod onerequest; -pub mod shared; pub mod timeout; pub mod variant; diff --git a/ntex-util/src/services/onerequest.rs b/ntex-util/src/services/onerequest.rs index b344de51..0554ba7b 100644 --- a/ntex-util/src/services/onerequest.rs +++ b/ntex-util/src/services/onerequest.rs @@ -1,7 +1,7 @@ //! Service that limits number of in-flight async requests to 1. use std::{cell::Cell, future::Future, pin::Pin, task::Context, task::Poll}; -use ntex_service::{IntoService, Middleware, Service}; +use ntex_service::{Ctx, IntoService, Middleware, Service, ServiceCall}; use crate::task::LocalWaker; @@ -63,11 +63,11 @@ where } #[inline] - fn call(&self, req: R) -> Self::Future<'_> { + fn call<'a>(&'a self, req: R, ctx: Ctx<'a, Self>) -> Self::Future<'a> { self.ready.set(false); OneRequestServiceResponse { - fut: self.service.call(req), + fut: ctx.call(&self.service, req), service: self, } } @@ -81,7 +81,7 @@ pin_project_lite::pin_project! { where T: 'f, R: 'f { #[pin] - fut: T::Future<'f>, + fut: ServiceCall<'f, T, R>, service: &'f OneRequestService, } } @@ -101,23 +101,22 @@ impl<'f, T: Service, R> Future for OneRequestServiceResponse<'f, T, R> { #[cfg(test)] mod tests { - use ntex_service::{apply, fn_factory, Service, ServiceFactory}; - use std::{task::Poll, time::Duration}; + use ntex_service::{apply, fn_factory, Container, Ctx, Service, ServiceFactory}; + use std::{cell::RefCell, task::Poll, time::Duration}; use super::*; - use crate::future::{lazy, BoxFuture}; + use crate::{channel::oneshot, future::lazy, future::BoxFuture}; - struct SleepService(Duration); + struct SleepService(oneshot::Receiver<()>); impl Service<()> for SleepService { type Response = (); type Error = (); type Future<'f> = BoxFuture<'f, Result<(), ()>>; - fn call(&self, _: ()) -> Self::Future<'_> { - let fut = crate::time::sleep(self.0); + fn call<'a>(&'a self, _: (), _: Ctx<'a, Self>) -> Self::Future<'a> { Box::pin(async move { - fut.await; + let _ = self.0.recv().await; Ok::<_, ()>(()) }) } @@ -125,15 +124,20 @@ mod tests { #[ntex_macros::rt_test2] async fn test_oneshot() { - let wait_time = Duration::from_millis(50); + let (tx, rx) = oneshot::channel(); - let srv = OneRequestService::new(SleepService(wait_time)); + let srv = Container::new(OneRequestService::new(SleepService(rx))); assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(()))); - let res = srv.call(()); + let srv2 = srv.clone(); + ntex::rt::spawn(async move { + let _ = srv2.call(()).await; + }); + crate::time::sleep(Duration::from_millis(25)).await; assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Pending); - let _ = res.await; + let _ = tx.send(()); + crate::time::sleep(Duration::from_millis(25)).await; assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(()))); assert!(lazy(|cx| srv.poll_shutdown(cx)).await.is_ready()); } @@ -142,19 +146,28 @@ mod tests { async fn test_middleware() { assert_eq!(format!("{:?}", OneRequest), "OneRequest"); - let wait_time = Duration::from_millis(50); + let (tx, rx) = oneshot::channel(); + let rx = RefCell::new(Some(rx)); let srv = apply( OneRequest, - fn_factory(|| async { Ok::<_, ()>(SleepService(wait_time)) }), + fn_factory(move || { + let rx = rx.borrow_mut().take().unwrap(); + async move { Ok::<_, ()>(SleepService(rx)) } + }), ); - let srv = srv.create(&()).await.unwrap(); + let srv = srv.container(&()).await.unwrap(); assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(()))); - let res = srv.call(()); + let srv2 = srv.clone(); + ntex::rt::spawn(async move { + let _ = srv2.call(()).await; + }); + crate::time::sleep(Duration::from_millis(25)).await; assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Pending); - let _ = res.await; + let _ = tx.send(()); + crate::time::sleep(Duration::from_millis(25)).await; assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(()))); } } diff --git a/ntex-util/src/services/shared.rs b/ntex-util/src/services/shared.rs deleted file mode 100644 index a86ea173..00000000 --- a/ntex-util/src/services/shared.rs +++ /dev/null @@ -1,169 +0,0 @@ -/// A service that can be checked for readiness by multiple tasks -use std::{ - cell::Cell, cell::RefCell, marker::PhantomData, rc::Rc, task::Context, task::Poll, -}; - -use ntex_service::{Middleware, Service}; - -use crate::channel::{condition, oneshot}; -use crate::future::{poll_fn, select, Either}; - -/// A middleware that construct sharable service -pub struct Shared(PhantomData); - -impl Shared { - pub fn new() -> Self { - Self(PhantomData) - } -} - -impl Default for Shared { - fn default() -> Self { - Self::new() - } -} - -impl, R> Middleware for Shared { - type Service = SharedService; - - fn create(&self, service: S) -> Self::Service { - SharedService::new(service) - } -} - -/// A service that can be checked for readiness by multiple tasks -pub struct SharedService, R> { - inner: Rc>, - readiness: condition::Waiter, -} - -struct Inner, R> { - service: S, - ready: condition::Condition, - driver_stop: Cell>>, - driver_running: Cell, - error: RefCell>, -} - -impl, R> SharedService { - pub fn new(service: S) -> Self { - let condition = condition::Condition::default(); - Self { - readiness: condition.wait(), - inner: Rc::new(Inner { - service, - ready: condition, - driver_stop: Cell::default(), - driver_running: Cell::default(), - error: RefCell::default(), - }), - } - } -} - -impl, R> Clone for SharedService { - fn clone(&self) -> Self { - Self { - inner: self.inner.clone(), - readiness: self.readiness.clone(), - } - } -} - -impl, R> Drop for SharedService { - fn drop(&mut self) { - if self.inner.driver_running.get() { - // the only live references to inner are in this SharedService instance and the driver task - if Rc::strong_count(&self.inner) == 2 { - if let Some(stop) = self.inner.driver_stop.take() { - let _ = stop.send(()); - } - } - } - } -} - -impl Service for SharedService -where - S: Service + 'static, - S::Error: Clone, - R: 'static, -{ - type Response = S::Response; - type Error = S::Error; - type Future<'f> = S::Future<'f> where Self: 'f, R: 'f; - - fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { - // if there is an error, it should be returned to all tasks checking readiness - if let Some(error) = self.inner.error.borrow().as_ref() { - return Poll::Ready(Err(error.clone())); - } - - // if the service is being driven to readiness we must register our waker and wait - if self.inner.driver_running.get() { - log::trace!("polled SharedService driver, driver is already running"); - // register waker to be notified, regardless of any previous notification - let _ = self.readiness.poll_ready(cx); - return Poll::Pending; - } - - // driver is not running, check the inner service is ready - let result = self.inner.service.poll_ready(cx); - log::trace!( - "polled SharedService, ready: {}, errored: {}", - result.is_ready(), - matches!(result, Poll::Ready(Err(_))) - ); - - match result { - // pass through service is ready, allow call - Poll::Ready(Ok(())) => Poll::Ready(Ok(())), - // capture error, all future readiness checks will fail - Poll::Ready(Err(e)) => { - *self.inner.error.borrow_mut() = Some(e.clone()); - Poll::Ready(Err(e)) - } - // start driver and elide all poll_ready calls until it is complete - Poll::Pending => { - let inner = self.inner.clone(); - let (tx, rx) = oneshot::channel(); - inner.driver_running.set(true); - inner.driver_stop.set(Some(tx)); - - ntex_rt::spawn(async move { - log::trace!("SharedService driver has started"); - let service_ready = poll_fn(|cx| inner.service.poll_ready(cx)); - let clients_gone = rx; - let result = select(service_ready, clients_gone).await; - if let Either::Left(result) = result { - log::trace!( - "SharedService driver completed, errored: {}", - result.is_err() - ); - if let Err(e) = result { - inner.error.borrow_mut().replace(e); - } - inner.driver_running.set(false); - inner.driver_stop.set(None); - inner.ready.notify(); - } else { - log::trace!("SharedService driver task stopped because all clients are gone"); - } - }); - - // register waker to be notified, regardless of any previous notification - let _ = self.readiness.poll_ready(cx); - - Poll::Pending - } - } - } - - fn poll_shutdown(&self, cx: &mut Context<'_>) -> Poll<()> { - self.inner.service.poll_shutdown(cx) - } - - fn call(&self, req: R) -> Self::Future<'_> { - self.inner.service.call(req) - } -} diff --git a/ntex-util/src/services/timeout.rs b/ntex-util/src/services/timeout.rs index 597a15c5..3e3c6de5 100644 --- a/ntex-util/src/services/timeout.rs +++ b/ntex-util/src/services/timeout.rs @@ -2,11 +2,9 @@ //! //! If the response does not complete within the specified timeout, the response //! will be aborted. -use std::{ - fmt, future::Future, marker, marker::PhantomData, pin::Pin, task::Context, task::Poll, -}; +use std::{fmt, future::Future, marker, pin::Pin, task::Context, task::Poll}; -use ntex_service::{IntoService, Middleware, Service}; +use ntex_service::{Ctx, IntoService, Middleware, Service, ServiceCall}; use crate::future::Either; use crate::time::{sleep, Millis, Sleep}; @@ -127,17 +125,17 @@ where type Error = TimeoutError; type Future<'f> = Either, TimeoutServiceResponse2<'f, S, R>> where Self: 'f, R: 'f; - fn call(&self, request: R) -> Self::Future<'_> { + fn call<'a>(&'a self, request: R, ctx: Ctx<'a, Self>) -> Self::Future<'a> { if self.timeout.is_zero() { Either::Right(TimeoutServiceResponse2 { - fut: self.service.call(request), - _t: PhantomData, + fut: ctx.call(&self.service, request), + _t: marker::PhantomData, }) } else { Either::Left(TimeoutServiceResponse { - fut: self.service.call(request), + fut: ctx.call(&self.service, request), sleep: sleep(self.timeout), - _t: PhantomData, + _t: marker::PhantomData, }) } } @@ -149,14 +147,14 @@ where pin_project_lite::pin_project! { /// `TimeoutService` response future #[doc(hidden)] - #[derive(Debug)] + #[must_use = "futures do nothing unless polled"] pub struct TimeoutServiceResponse<'f, T: Service, R> where T: 'f, R: 'f, { #[pin] - fut: T::Future<'f>, + fut: ServiceCall<'f, T, R>, sleep: Sleep, - _t: PhantomData + _t: marker::PhantomData } } @@ -187,13 +185,13 @@ where pin_project_lite::pin_project! { /// `TimeoutService` response future #[doc(hidden)] - #[derive(Debug)] + #[must_use = "futures do nothing unless polled"] pub struct TimeoutServiceResponse2<'f, T: Service, R> where T: 'f, R: 'f, { #[pin] - fut: T::Future<'f>, - _t: PhantomData, + fut: ServiceCall<'f, T, R>, + _t: marker::PhantomData, } } @@ -216,7 +214,7 @@ where mod tests { use std::{fmt, time::Duration}; - use ntex_service::{apply, fn_factory, Service, ServiceFactory}; + use ntex_service::{apply, fn_factory, Container, Service, ServiceFactory}; use super::*; use crate::future::{lazy, BoxFuture}; @@ -238,7 +236,7 @@ mod tests { type Error = SrvError; type Future<'f> = BoxFuture<'f, Result<(), SrvError>>; - fn call(&self, _: ()) -> Self::Future<'_> { + fn call<'a>(&'a self, _: (), _: Ctx<'a, Self>) -> Self::Future<'a> { let fut = crate::time::sleep(self.0); Box::pin(async move { fut.await; @@ -252,7 +250,9 @@ mod tests { let resolution = Duration::from_millis(100); let wait_time = Duration::from_millis(50); - let timeout = TimeoutService::new(resolution, SleepService(wait_time)).clone(); + let timeout = Container::new( + TimeoutService::new(resolution, SleepService(wait_time)).clone(), + ); assert_eq!(timeout.call(()).await, Ok(())); assert!(lazy(|cx| timeout.poll_ready(cx)).await.is_ready()); assert!(lazy(|cx| timeout.poll_shutdown(cx)).await.is_ready()); @@ -263,7 +263,8 @@ mod tests { let wait_time = Duration::from_millis(50); let resolution = Duration::from_millis(0); - let timeout = TimeoutService::new(resolution, SleepService(wait_time)); + let timeout = + Container::new(TimeoutService::new(resolution, SleepService(wait_time))); assert_eq!(timeout.call(()).await, Ok(())); assert!(lazy(|cx| timeout.poll_ready(cx)).await.is_ready()); } @@ -273,7 +274,8 @@ mod tests { let resolution = Duration::from_millis(100); let wait_time = Duration::from_millis(500); - let timeout = TimeoutService::new(resolution, SleepService(wait_time)); + let timeout = + Container::new(TimeoutService::new(resolution, SleepService(wait_time))); assert_eq!(timeout.call(()).await, Err(TimeoutError::Timeout)); } @@ -287,7 +289,7 @@ mod tests { Timeout::new(resolution).clone(), fn_factory(|| async { Ok::<_, ()>(SleepService(wait_time)) }), ); - let srv = timeout.create(&()).await.unwrap(); + let srv = timeout.container(&()).await.unwrap(); let res = srv.call(()).await.unwrap_err(); assert_eq!(res, TimeoutError::Timeout); diff --git a/ntex-util/src/services/variant.rs b/ntex-util/src/services/variant.rs index 5240cc52..35f9f319 100644 --- a/ntex-util/src/services/variant.rs +++ b/ntex-util/src/services/variant.rs @@ -1,7 +1,7 @@ //! Contains `Variant` service and related types and functions. use std::{future::Future, marker::PhantomData, pin::Pin, task::Context, task::Poll}; -use ntex_service::{IntoServiceFactory, Service, ServiceFactory}; +use ntex_service::{Ctx, IntoServiceFactory, Service, ServiceCall, ServiceFactory}; /// Construct `Variant` service factory. /// @@ -103,7 +103,8 @@ macro_rules! variant_impl ({$mod_name:ident, $enum_type:ident, $srv_type:ident, { type Response = V1::Response; type Error = V1::Error; - type Future<'f> = $mod_name::ServiceResponse, $($T::Future<'f>),+> where Self: 'f, V1: 'f; + type Future<'f> = $mod_name::ServiceResponse< + ServiceCall<'f, V1, V1R>, $(ServiceCall<'f, $T, $R>),+> where Self: 'f, V1: 'f; fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { let mut ready = self.V1.poll_ready(cx)?.is_ready(); @@ -127,10 +128,11 @@ macro_rules! variant_impl ({$mod_name:ident, $enum_type:ident, $srv_type:ident, } } - fn call(&self, req: $enum_type) -> Self::Future<'_> { + fn call<'a>(&'a self, req: $enum_type, ctx: Ctx<'a, Self>) -> Self::Future<'a> + { match req { - $enum_type::V1(req) => $mod_name::ServiceResponse::V1 { fut: self.V1.call(req) }, - $($enum_type::$T(req) => $mod_name::ServiceResponse::$T { fut: self.$T.call(req) },)+ + $enum_type::V1(req) => $mod_name::ServiceResponse::V1 { fut: ctx.call(&self.V1, req) }, + $($enum_type::$T(req) => $mod_name::ServiceResponse::$T { fut: ctx.call(&self.$T, req) },)+ } } } @@ -319,7 +321,7 @@ mod tests { Poll::Ready(()) } - fn call(&self, _: ()) -> Self::Future<'_> { + fn call<'a>(&'a self, _: (), _: Ctx<'a, Self>) -> Self::Future<'a> { Ready::<_, ()>::Ok(1) } } @@ -340,7 +342,7 @@ mod tests { Poll::Ready(()) } - fn call(&self, _: ()) -> Self::Future<'_> { + fn call<'a>(&'a self, _: (), _: Ctx<'a, Self>) -> Self::Future<'a> { Ready::<_, ()>::Ok(2) } } @@ -352,7 +354,7 @@ mod tests { .clone() .v3(fn_factory(|| async { Ok::<_, ()>(Srv2) })) .clone(); - let service = factory.create(&()).await.unwrap().clone(); + let service = factory.container(&()).await.unwrap().clone(); assert!(lazy(|cx| service.poll_ready(cx)).await.is_ready()); assert!(lazy(|cx| service.poll_shutdown(cx)).await.is_ready()); diff --git a/ntex-util/src/time/wheel.rs b/ntex-util/src/time/wheel.rs index 277560dd..b63c4133 100644 --- a/ntex-util/src/time/wheel.rs +++ b/ntex-util/src/time/wheel.rs @@ -711,7 +711,7 @@ impl Future for LowresTimerDriver { flags.remove(Flags::LOWRES_TIMER); self.0.flags.set(flags); } - return Poll::Pending; + Poll::Pending } } diff --git a/ntex/Cargo.toml b/ntex/Cargo.toml index d9e40e7e..8e45598a 100644 --- a/ntex/Cargo.toml +++ b/ntex/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex" -version = "0.6.7" +version = "0.7.0" authors = ["ntex contributors "] description = "Framework for composable network services" readme = "README.md" @@ -49,20 +49,20 @@ async-std = ["ntex-rt/async-std", "ntex-async-std", "ntex-connect/async-std"] [dependencies] ntex-codec = "0.6.2" -ntex-connect = "0.2.1" +ntex-connect = "0.3.0" ntex-http = "0.1.9" ntex-router = "0.5.1" -ntex-service = "1.0.2" +ntex-service = "1.2.0" ntex-macros = "0.1.3" -ntex-util = "0.2.3" +ntex-util = "0.3.0" ntex-bytes = "0.1.19" -ntex-h2 = "0.2.3" +ntex-h2 = "0.3.0" ntex-rt = "0.4.9" -ntex-io = "0.2.10" -ntex-tls = "0.2.4" -ntex-tokio = { version = "0.2.3", optional = true } -ntex-glommio = { version = "0.2.4", optional = true } -ntex-async-std = { version = "0.2.1", optional = true } +ntex-io = "0.3.0" +ntex-tls = "0.3.0" +ntex-tokio = { version = "0.3.0", optional = true } +ntex-glommio = { version = "0.3.0", optional = true } +ntex-async-std = { version = "0.3.0", optional = true } async-oneshot = "0.5.0" async-channel = "1.8.0" @@ -94,8 +94,8 @@ coo-kie = { version = "0.17", package = "cookie", optional = true } tls-openssl = { version="0.10", package = "openssl", optional = true } # rustls -tls-rustls = { version = "0.20", package = "rustls", optional = true } -webpki-roots = { version = "0.22", optional = true } +tls-rustls = { version = "0.21", package = "rustls", optional = true } +webpki-roots = { version = "0.23", optional = true } # compression brotli2 = { version="0.3.2", optional = true } @@ -107,6 +107,6 @@ rand = "0.8" time = "0.3" futures-util = "0.3" tls-openssl = { version="0.10", package = "openssl" } -tls-rustls = { version = "0.20", package="rustls", features = ["dangerous_configuration"] } +tls-rustls = { version = "0.21", package="rustls", features = ["dangerous_configuration"] } rustls-pemfile = { version = "1.0" } -webpki-roots = { version = "0.22" } +webpki-roots = { version = "0.23" } diff --git a/ntex/src/http/body.rs b/ntex/src/http/body.rs index 5988d67c..842759c9 100644 --- a/ntex/src/http/body.rs +++ b/ntex/src/http/body.rs @@ -55,6 +55,7 @@ impl MessageBody for Box { } } +#[derive(Debug)] pub enum ResponseBody { Body(B), Other(Body), diff --git a/ntex/src/http/client/builder.rs b/ntex/src/http/client/builder.rs index 85c0e951..d16429de 100644 --- a/ntex/src/http/client/builder.rs +++ b/ntex/src/http/client/builder.rs @@ -36,7 +36,7 @@ impl ClientBuilder { config: ClientConfig { headers: HeaderMap::new(), timeout: Millis(5_000), - connector: Box::new(ConnectorWrapper(Connector::default().finish())), + connector: Box::new(ConnectorWrapper(Connector::default().finish().into())), }, } } @@ -46,7 +46,7 @@ impl ClientBuilder { where T: Service + 'static, { - self.config.connector = Box::new(ConnectorWrapper(connector)); + self.config.connector = Box::new(ConnectorWrapper(connector.into())); self } diff --git a/ntex/src/http/client/connect.rs b/ntex/src/http/client/connect.rs index 981baa75..451897b7 100644 --- a/ntex/src/http/client/connect.rs +++ b/ntex/src/http/client/connect.rs @@ -1,13 +1,13 @@ use std::net; use crate::http::{body::Body, RequestHeadType}; -use crate::{service::Service, util::BoxFuture}; +use crate::{service::Container, service::Service, util::BoxFuture}; use super::error::{ConnectError, SendRequestError}; use super::response::ClientResponse; use super::{Connect as ClientConnect, Connection}; -pub(super) struct ConnectorWrapper(pub(crate) T); +pub(super) struct ConnectorWrapper(pub(crate) Container); pub(super) trait Connect { fn send_request( diff --git a/ntex/src/http/client/connector.rs b/ntex/src/http/client/connector.rs index 7c9c9998..e8e6d7f2 100644 --- a/ntex/src/http/client/connector.rs +++ b/ntex/src/http/client/connector.rs @@ -3,11 +3,9 @@ use std::{task::Context, task::Poll, time::Duration}; use ntex_h2::{self as h2}; use crate::connect::{Connect as TcpConnect, Connector as TcpConnector}; -use crate::service::{apply_fn, boxed, Service}; +use crate::service::{apply_fn, boxed, Ctx, Service, ServiceCall}; use crate::time::{Millis, Seconds}; -use crate::util::{ - shared::SharedService, timeout::TimeoutError, timeout::TimeoutService, Either, Ready, -}; +use crate::util::{timeout::TimeoutError, timeout::TimeoutService, Either, Ready}; use crate::{http::Uri, io::IoBoxed}; use super::{connection::Connection, error::ConnectError, pool::ConnectionPool, Connect}; @@ -216,7 +214,7 @@ impl Connector { /// its combinator chain. pub fn finish( self, - ) -> impl Service + Clone { + ) -> impl Service { let tcp_service = connector(self.connector, self.timeout, self.disconnect_timeout); let ssl_pool = if let Some(ssl_connector) = self.ssl_connector { @@ -233,7 +231,7 @@ impl Connector { None }; - SharedService::new(InnerConnector { + InnerConnector { tcp_pool: ConnectionPool::new( tcp_service, self.conn_lifetime, @@ -243,7 +241,7 @@ impl Connector { self.h2config.clone(), ), ssl_pool, - }) + } } } @@ -283,7 +281,7 @@ where type Response = as Service>::Response; type Error = ConnectError; type Future<'f> = Either< - as Service>::Future<'f>, + ServiceCall<'f, ConnectionPool, Connect>, Ready, >; @@ -317,16 +315,16 @@ where } } - fn call(&self, req: Connect) -> Self::Future<'_> { + fn call<'a>(&'a self, req: Connect, ctx: Ctx<'a, Self>) -> Self::Future<'_> { match req.uri.scheme_str() { Some("https") | Some("wss") => { if let Some(ref conn) = self.ssl_pool { - Either::Left(conn.call(req)) + Either::Left(ctx.call(conn, req)) } else { Either::Right(Ready::Err(ConnectError::SslIsNotSupported)) } } - _ => Either::Left(self.tcp_pool.call(req)), + _ => Either::Left(ctx.call(&self.tcp_pool, req)), } } } diff --git a/ntex/src/http/client/h2proto.rs b/ntex/src/http/client/h2proto.rs index 76ab0d67..46ac5435 100644 --- a/ntex/src/http/client/h2proto.rs +++ b/ntex/src/http/client/h2proto.rs @@ -7,7 +7,7 @@ use crate::http::header::{self, HeaderMap, HeaderValue}; use crate::http::message::{RequestHeadType, ResponseHead}; use crate::http::{h2::payload, payload::Payload, Method, Version}; use crate::util::{poll_fn, ByteString, Bytes, HashMap, Ready}; -use crate::{channel::oneshot, service::Service}; +use crate::{channel::oneshot, service::Ctx, service::Service}; use super::error::SendRequestError; @@ -226,7 +226,7 @@ impl Service for H2PublishService { type Error = &'static str; type Future<'f> = Ready; - fn call(&self, mut msg: h2::Message) -> Self::Future<'_> { + fn call<'a>(&'a self, mut msg: h2::Message, _: Ctx<'a, Self>) -> Self::Future<'a> { match msg.kind().take() { h2::MessageKind::Headers { pseudo, diff --git a/ntex/src/http/client/mod.rs b/ntex/src/http/client/mod.rs index a7b67f0f..6b007826 100644 --- a/ntex/src/http/client/mod.rs +++ b/ntex/src/http/client/mod.rs @@ -82,7 +82,7 @@ pub(self) struct ClientConfig { impl Default for Client { fn default() -> Self { Client(Rc::new(ClientConfig { - connector: Box::new(ConnectorWrapper(Connector::default().finish())), + connector: Box::new(ConnectorWrapper(Connector::default().finish().into())), headers: HeaderMap::new(), timeout: Millis(5_000), })) diff --git a/ntex/src/http/client/pool.rs b/ntex/src/http/client/pool.rs index 5df46757..dbda2292 100644 --- a/ntex/src/http/client/pool.rs +++ b/ntex/src/http/client/pool.rs @@ -6,9 +6,10 @@ use ntex_h2::{self as h2}; use crate::http::uri::{Authority, Scheme, Uri}; use crate::io::{types::HttpProtocol, IoBoxed}; +use crate::service::{Container, Ctx, Service, ServiceCall}; use crate::time::{now, Millis}; use crate::util::{ready, BoxFuture, ByteString, HashMap, HashSet}; -use crate::{channel::pool, rt::spawn, service::Service, task::LocalWaker}; +use crate::{channel::pool, rt::spawn, task::LocalWaker}; use super::connection::{Connection, ConnectionType}; use super::h2proto::{H2Client, H2PublishService}; @@ -43,7 +44,7 @@ struct AvailableConnection { /// Connections pool pub(super) struct ConnectionPool { - connector: Rc, + connector: Container, inner: Rc>, waiters: Rc>, } @@ -60,7 +61,7 @@ where limit: usize, h2config: h2::Config, ) -> Self { - let connector = Rc::new(connector); + let connector = Container::new(connector); let waiters = Rc::new(RefCell::new(Waiters { waiters: HashMap::default(), pool: pool::new(), @@ -120,10 +121,8 @@ where crate::forward_poll_ready!(connector); crate::forward_poll_shutdown!(connector); - #[inline] - fn call(&self, req: Connect) -> Self::Future<'_> { + fn call<'a>(&'a self, req: Connect, _: Ctx<'a, Self>) -> Self::Future<'_> { trace!("Get connection for {:?}", req.uri); - let connector = self.connector.clone(); let inner = self.inner.clone(); let waiters = self.waiters.clone(); @@ -151,7 +150,7 @@ where trace!("Connecting to {:?}", req.uri); let uri = req.uri.clone(); let (tx, rx) = waiters.borrow_mut().pool.channel(); - OpenConnection::spawn(key, tx, uri, inner, connector, req); + OpenConnection::spawn(key, tx, uri, inner, self.connector.clone(), req); match rx.await { Err(_) => Err(ConnectError::Disconnected(None)), @@ -308,7 +307,7 @@ impl Inner { } struct ConnectionPoolSupport { - connector: Rc, + connector: Container, inner: Rc>, waiters: Rc>, } @@ -391,7 +390,7 @@ pin_project_lite::pin_project! { { key: Key, #[pin] - fut: T::Future<'f>, + fut: ServiceCall<'f, T, Connect>, uri: Uri, tx: Option, guard: Option, @@ -409,11 +408,12 @@ where tx: Waiter, uri: Uri, inner: Rc>, - connector: Rc, + connector: Container, msg: Connect, ) { let disconnect_timeout = inner.borrow().disconnect_timeout; + #[allow(clippy::redundant_async_block)] spawn(async move { OpenConnection:: { fut: connector.call(msg), @@ -629,19 +629,21 @@ mod tests { let store = Rc::new(RefCell::new(Vec::new())); let store2 = store.clone(); - let pool = ConnectionPool::new( - fn_service(move |req| { - let (client, server) = Io::create(); - store2.borrow_mut().push((req, server)); - Box::pin(async move { Ok(IoBoxed::from(nio::Io::new(client))) }) - }), - Duration::from_secs(10), - Duration::from_secs(10), - Millis::ZERO, - 1, - h2::Config::client(), - ) - .clone(); + let pool = Container::new( + ConnectionPool::new( + fn_service(move |req| { + let (client, server) = Io::create(); + store2.borrow_mut().push((req, server)); + Box::pin(async move { Ok(IoBoxed::from(nio::Io::new(client))) }) + }), + Duration::from_secs(10), + Duration::from_secs(10), + Millis::ZERO, + 1, + h2::Config::client(), + ) + .clone(), + ); // uri must contain authority let req = Connect { diff --git a/ntex/src/http/config.rs b/ntex/src/http/config.rs index 0f46516d..ff7aa915 100644 --- a/ntex/src/http/config.rs +++ b/ntex/src/http/config.rs @@ -3,8 +3,9 @@ use std::{cell::Cell, ptr::copy_nonoverlapping, rc::Rc, time, time::Duration}; use ntex_h2::{self as h2}; use crate::http::{Request, Response}; +use crate::service::{boxed::BoxService, Container}; use crate::time::{sleep, Millis, Seconds}; -use crate::{io::IoRef, service::boxed::BoxService, util::BytesMut}; +use crate::{io::IoRef, util::BytesMut}; #[derive(Debug, PartialEq, Eq, Clone, Copy)] /// Server keep-alive setting @@ -101,16 +102,16 @@ impl ServiceConfig { pub(super) type OnRequest = BoxService<(Request, IoRef), Request, Response>; pub(super) struct DispatcherConfig { - pub(super) service: S, - pub(super) expect: X, - pub(super) upgrade: Option, + pub(super) service: Container, + pub(super) expect: Container, + pub(super) upgrade: Option>, pub(super) keep_alive: Duration, pub(super) client_timeout: Duration, pub(super) client_disconnect: Seconds, pub(super) h2config: h2::Config, pub(super) ka_enabled: bool, pub(super) timer: DateService, - pub(super) on_request: Option, + pub(super) on_request: Option>, } impl DispatcherConfig { @@ -122,10 +123,10 @@ impl DispatcherConfig { on_request: Option, ) -> Self { DispatcherConfig { - service, - expect, - upgrade, - on_request, + service: service.into(), + expect: expect.into(), + upgrade: upgrade.map(|v| v.into()), + on_request: on_request.map(|v| v.into()), keep_alive: Duration::from(cfg.0.keep_alive), client_timeout: Duration::from(cfg.0.client_timeout), client_disconnect: cfg.0.client_disconnect, diff --git a/ntex/src/http/h1/dispatcher.rs b/ntex/src/http/h1/dispatcher.rs index fe80c68f..15d47bee 100644 --- a/ntex/src/http/h1/dispatcher.rs +++ b/ntex/src/http/h1/dispatcher.rs @@ -2,8 +2,9 @@ use std::task::{Context, Poll}; use std::{cell::RefCell, error::Error, future::Future, io, marker, mem, pin::Pin, rc::Rc}; -use crate::io::{Filter, Io, IoBoxed, IoStatusUpdate, RecvError}; -use crate::{service::Service, util::ready, util::BoxFuture, util::Bytes}; +use crate::io::{Filter, Io, IoBoxed, IoRef, IoStatusUpdate, RecvError}; +use crate::service::{Container, Service, ServiceCall}; +use crate::util::{ready, Bytes}; use crate::http; use crate::http::body::{BodySize, MessageBody, ResponseBody}; @@ -46,7 +47,7 @@ pin_project_lite::pin_project! { } } -#[derive(thiserror::Error)] +#[derive(Debug, thiserror::Error)] enum State { #[error("State::Call")] Call, @@ -77,10 +78,10 @@ pin_project_lite::pin_project! { where S: 'static, X: 'static { None, - Service { #[pin] fut: S::Future<'static> }, - ServiceUpgrade { #[pin] fut: S::Future<'static> }, - Expect { #[pin] fut: X::Future<'static> }, - Filter { fut: BoxFuture<'static, Result> } + Service { #[pin] fut: ServiceCall<'static, S, Request> }, + ServiceUpgrade { #[pin] fut: ServiceCall<'static, S, Request> }, + Expect { #[pin] fut: ServiceCall<'static, X, Request> }, + Filter { fut: ServiceCall<'static, OnRequest, (Request, IoRef)> } } } @@ -485,7 +486,7 @@ where st } - fn service_filter(&self, req: Request, f: &OnRequest) -> CallState { + fn service_filter(&self, req: Request, f: &Container) -> CallState { // Handle filter fut let fut = f.call((req, self.io.get_ref())); let st = CallState::Filter { diff --git a/ntex/src/http/h1/expect.rs b/ntex/src/http/h1/expect.rs index 3090d0d7..d32b83ab 100644 --- a/ntex/src/http/h1/expect.rs +++ b/ntex/src/http/h1/expect.rs @@ -1,7 +1,7 @@ use std::io; -use crate::http::request::Request; -use crate::{service::Service, service::ServiceFactory, util::Ready}; +use crate::service::{Ctx, Service, ServiceFactory}; +use crate::{http::request::Request, util::Ready}; pub struct ExpectHandler; @@ -24,7 +24,7 @@ impl Service for ExpectHandler { type Future<'f> = Ready; #[inline] - fn call(&self, req: Request) -> Self::Future<'_> { + fn call<'a>(&'a self, req: Request, _: Ctx<'a, Self>) -> Self::Future<'_> { Ready::Ok(req) } } diff --git a/ntex/src/http/h1/service.rs b/ntex/src/http/h1/service.rs index d37956cd..af973f7a 100644 --- a/ntex/src/http/h1/service.rs +++ b/ntex/src/http/h1/service.rs @@ -5,7 +5,7 @@ use crate::http::config::{DispatcherConfig, OnRequest, ServiceConfig}; use crate::http::error::{DispatchError, ResponseError}; use crate::http::{request::Request, response::Response}; use crate::io::{types, Filter, Io}; -use crate::service::{IntoServiceFactory, Service, ServiceFactory}; +use crate::service::{Ctx, IntoServiceFactory, Service, ServiceFactory}; use crate::{time::Millis, util::BoxFuture}; use super::codec::Codec; @@ -331,7 +331,7 @@ where } } - fn call(&self, io: Io) -> Self::Future<'_> { + fn call<'a>(&'a self, io: Io, _: Ctx<'a, Self>) -> Self::Future<'_> { log::trace!( "New http1 connection, peer address {:?}", io.query::().get() diff --git a/ntex/src/http/h1/upgrade.rs b/ntex/src/http/h1/upgrade.rs index fd6eea47..55e899cb 100644 --- a/ntex/src/http/h1/upgrade.rs +++ b/ntex/src/http/h1/upgrade.rs @@ -1,7 +1,7 @@ use std::{io, marker::PhantomData}; use crate::http::{h1::Codec, request::Request}; -use crate::{io::Io, service::Service, service::ServiceFactory, util::Ready}; +use crate::{io::Io, service::Ctx, service::Service, service::ServiceFactory, util::Ready}; pub struct UpgradeHandler(PhantomData); @@ -25,7 +25,11 @@ impl Service<(Request, Io, Codec)> for UpgradeHandler { type Future<'f> = Ready where F: 'f; #[inline] - fn call(&self, _: (Request, Io, Codec)) -> Self::Future<'_> { + fn call<'a>( + &'a self, + _: (Request, Io, Codec), + _: Ctx<'a, Self>, + ) -> Self::Future<'a> { unimplemented!() } } diff --git a/ntex/src/http/h2/service.rs b/ntex/src/http/h2/service.rs index 97170fff..d4f78c6d 100644 --- a/ntex/src/http/h2/service.rs +++ b/ntex/src/http/h2/service.rs @@ -10,7 +10,7 @@ use crate::http::header::{self, HeaderMap, HeaderName, HeaderValue}; use crate::http::message::{CurrentIo, ResponseHead}; use crate::http::{DateService, Method, Request, Response, StatusCode, Uri, Version}; use crate::io::{types, Filter, Io, IoBoxed, IoRef}; -use crate::service::{IntoServiceFactory, Service, ServiceFactory}; +use crate::service::{Ctx, IntoServiceFactory, Service, ServiceFactory}; use crate::util::{poll_fn, BoxFuture, Bytes, BytesMut, Either, HashMap, Ready}; use super::payload::{Payload, PayloadSender}; @@ -181,7 +181,7 @@ where self.config.service.poll_shutdown(cx) } - fn call(&self, io: Io) -> Self::Future<'_> { + fn call<'a>(&'a self, io: Io, _: Ctx<'a, Self>) -> Self::Future<'_> { log::trace!( "New http2 connection, peer address {:?}", io.query::().get() @@ -230,7 +230,11 @@ impl Service> for ControlService { type Error = (); type Future<'f> = Ready; - fn call(&self, msg: h2::ControlMessage) -> Self::Future<'_> { + fn call<'a>( + &'a self, + msg: h2::ControlMessage, + _: Ctx<'a, Self>, + ) -> Self::Future<'a> { log::trace!("Control message: {:?}", msg); Ready::Ok::<_, ()>(msg.ack()) } @@ -276,7 +280,7 @@ where Ready, >; - fn call(&self, mut msg: h2::Message) -> Self::Future<'_> { + fn call<'a>(&'a self, mut msg: h2::Message, _: Ctx<'a, Self>) -> Self::Future<'a> { let (io, pseudo, headers, eof, payload) = match msg.kind().take() { h2::MessageKind::Headers { pseudo, diff --git a/ntex/src/http/payload.rs b/ntex/src/http/payload.rs index 600be3fd..2fd38adb 100644 --- a/ntex/src/http/payload.rs +++ b/ntex/src/http/payload.rs @@ -7,19 +7,15 @@ use crate::util::{poll_fn, Bytes, Stream}; pub type PayloadStream = Pin>>>; /// Type represent streaming payload +#[derive(Default)] pub enum Payload { + #[default] None, H1(h1::Payload), H2(h2::Payload), Stream(PayloadStream), } -impl Default for Payload { - fn default() -> Self { - Payload::None - } -} - impl From for Payload { fn from(v: h1::Payload) -> Self { Payload::H1(v) diff --git a/ntex/src/http/service.rs b/ntex/src/http/service.rs index 79877c49..19896c21 100644 --- a/ntex/src/http/service.rs +++ b/ntex/src/http/service.rs @@ -2,7 +2,7 @@ use std::task::{Context, Poll}; use std::{cell, error, fmt, future, marker, pin::Pin, rc::Rc}; use crate::io::{types, Filter, Io}; -use crate::service::{IntoServiceFactory, Service, ServiceFactory}; +use crate::service::{Ctx, IntoServiceFactory, Service, ServiceFactory}; use crate::time::{Millis, Seconds}; use crate::util::BoxFuture; @@ -368,7 +368,7 @@ where } } - fn call(&self, io: Io) -> Self::Future<'_> { + fn call<'a>(&'a self, io: Io, _: Ctx<'a, Self>) -> Self::Future<'a> { log::trace!( "New http connection, peer address {:?}", io.query::().get() diff --git a/ntex/src/lib.rs b/ntex/src/lib.rs index 8672aff8..99e3d603 100644 --- a/ntex/src/lib.rs +++ b/ntex/src/lib.rs @@ -38,12 +38,11 @@ pub mod web; pub mod ws; pub use self::service::{ - fn_service, into_service, pipeline, pipeline_factory, IntoService, IntoServiceFactory, - Middleware, Service, ServiceFactory, + fn_service, into_service, pipeline, pipeline_factory, Container, Ctx, IntoService, + IntoServiceFactory, Middleware, Service, ServiceCall, ServiceFactory, }; -pub use ntex_util::channel; -pub use ntex_util::task; +pub use ntex_util::{channel, task}; pub mod codec { //! Utilities for encoding and decoding frames. diff --git a/ntex/src/server/config.rs b/ntex/src/server/config.rs index 252e5bc0..f0f0cf90 100644 --- a/ntex/src/server/config.rs +++ b/ntex/src/server/config.rs @@ -1,19 +1,15 @@ -use std::{ - cell::Cell, cell::RefCell, fmt, future::Future, io, marker::PhantomData, mem, net, - rc::Rc, -}; +use std::{cell::Cell, cell::RefCell, fmt, future::Future, io, marker, mem, net, rc::Rc}; use log::error; +use crate::io::Io; use crate::service::{self, boxed, ServiceFactory as NServiceFactory}; -use crate::util::{BoxFuture, HashMap, Ready}; -use crate::{io::Io, util::PoolId}; +use crate::util::{BoxFuture, HashMap, PoolId, Ready}; use super::service::{ BoxedServerService, InternalServiceFactory, ServerMessage, StreamService, }; -use super::Token; -use super::{builder::bind_addr, counter::CounterGuard}; +use super::{builder::bind_addr, counter::CounterGuard, Token}; #[derive(Clone)] pub struct Config(Rc); @@ -66,7 +62,7 @@ impl ServiceConfig { not_configured(); Ready::Ok::<_, &'static str>(()) }, - _t: PhantomData, + _t: marker::PhantomData, })), }))) } @@ -95,7 +91,7 @@ impl ServiceConfig { not_configured(); Ready::Ok::<_, &'static str>(()) }, - _t: PhantomData, + _t: marker::PhantomData, })); } inner.services.push((name.as_ref().to_string(), lst)); @@ -114,7 +110,10 @@ impl ServiceConfig { E: fmt::Display + 'static, { self.0.borrow_mut().applied = true; - self.0.borrow_mut().apply = Some(Box::new(ConfigWrapper { f, _t: PhantomData })); + self.0.borrow_mut().apply = Some(Box::new(ConfigWrapper { + f, + _t: marker::PhantomData, + })); Ok(()) } } @@ -192,7 +191,7 @@ impl InternalServiceFactory for ConfiguredService { let name = names.remove(&token).unwrap().0; res.push(( token, - boxed::rcservice(StreamService::new( + boxed::service(StreamService::new( service::fn_service(move |_: Io| { error!("Service {:?} is not configured", name); Ready::<_, ()>::Ok(()) @@ -215,7 +214,7 @@ pub(super) trait ServiceRuntimeConfiguration { pub(super) struct ConfigWrapper { pub(super) f: F, - pub(super) _t: PhantomData<(R, E)>, + pub(super) _t: marker::PhantomData<(R, E)>, } // SAFETY: we dont store R or E in ConfigWrapper @@ -230,7 +229,7 @@ where fn clone(&self) -> Box { Box::new(ConfigWrapper { f: self.f.clone(), - _t: PhantomData, + _t: marker::PhantomData, }) } @@ -304,7 +303,7 @@ impl ServiceRuntime { let token = *token; inner.services.insert( token, - boxed::rcfactory(ServiceFactory { + boxed::factory(ServiceFactory { pool, inner: service.into_factory(), }), @@ -323,8 +322,13 @@ impl ServiceRuntime { } } -type BoxServiceFactory = - service::boxed::RcServiceFactory<(), (Option, ServerMessage), (), (), ()>; +type BoxServiceFactory = service::boxed::BoxServiceFactory< + (), + (Option, ServerMessage), + (), + (), + (), +>; struct ServiceFactory { inner: T, @@ -349,7 +353,7 @@ where let fut = self.inner.create(()); Box::pin(async move { match fut.await { - Ok(s) => Ok(boxed::rcservice(StreamService::new(s, pool))), + Ok(s) => Ok(boxed::service(StreamService::new(s, pool))), Err(e) => { error!("Cannot construct service: {:?}", e); Err(()) diff --git a/ntex/src/server/service.rs b/ntex/src/server/service.rs index d514148f..501472ad 100644 --- a/ntex/src/server/service.rs +++ b/ntex/src/server/service.rs @@ -2,10 +2,9 @@ use std::{net::SocketAddr, rc::Rc, task::Context, task::Poll}; use log::error; -use crate::io::Io; -use crate::service::{boxed, Service, ServiceFactory}; -use crate::util::{BoxFuture, Pool, PoolId, Ready}; -use crate::{rt::spawn, time::Millis}; +use crate::service::{boxed, Ctx, Service, ServiceFactory}; +use crate::util::{BoxFuture, Pool, PoolId}; +use crate::{io::Io, time::Millis}; use super::{counter::CounterGuard, socket::Stream, Config, Token}; @@ -34,7 +33,7 @@ pub(super) trait InternalServiceFactory: Send { } pub(super) type BoxedServerService = - boxed::RcService<(Option, ServerMessage), (), ()>; + boxed::BoxService<(Option, ServerMessage), (), ()>; #[derive(Clone)] pub(super) struct StreamService { @@ -53,12 +52,11 @@ impl StreamService { impl Service<(Option, ServerMessage)> for StreamService where - T: Service + 'static, - T::Error: 'static, + T: Service, { type Response = (); type Error = (); - type Future<'f> = Ready<(), ()> where T: 'f; + type Future<'f> = BoxFuture<'f, Result<(), ()>> where T: 'f; crate::forward_poll_shutdown!(service); @@ -73,31 +71,31 @@ where } } - fn call( - &self, + fn call<'a>( + &'a self, (guard, req): (Option, ServerMessage), - ) -> Self::Future<'_> { - match req { - ServerMessage::Connect(stream) => { - let stream = stream.try_into().map_err(|e| { - error!("Cannot convert to an async io stream: {}", e); - }); - - if let Ok(stream) = stream { - let stream: Io<_> = stream; - stream.set_memory_pool(self.pool.pool_ref()); - let svc = self.service.clone(); - spawn(async move { - let _ = svc.call(stream).await; - drop(guard); + ctx: Ctx<'a, Self>, + ) -> Self::Future<'a> { + Box::pin(async move { + match req { + ServerMessage::Connect(stream) => { + let stream = stream.try_into().map_err(|e| { + error!("Cannot convert to an async io stream: {}", e); }); - Ready::Ok(()) - } else { - Ready::Err(()) + + if let Ok(stream) = stream { + let stream: Io<_> = stream; + stream.set_memory_pool(self.pool.pool_ref()); + let _ = ctx.call(self.service.as_ref(), stream).await; + drop(guard); + Ok(()) + } else { + Err(()) + } } + _ => Ok(()), } - _ => Ready::Ok(()), - } + }) } } @@ -153,8 +151,7 @@ where Box::pin(async move { match factory.create(()).await { Ok(inner) => { - let service: BoxedServerService = - boxed::rcservice(StreamService::new(inner, pool)); + let service = boxed::service(StreamService::new(inner, pool)); Ok(vec![(token, service)]) } Err(_) => Err(()), diff --git a/ntex/src/server/worker.rs b/ntex/src/server/worker.rs index 80a44740..0d8241d9 100644 --- a/ntex/src/server/worker.rs +++ b/ntex/src/server/worker.rs @@ -5,7 +5,7 @@ use async_channel::{unbounded, Receiver, Sender}; use async_oneshot as oneshot; use crate::rt::{spawn, Arbiter}; -use crate::service::Service; +use crate::service::Container; use crate::time::{sleep, Millis, Sleep}; use crate::util::{ join_all, ready, select, stream_recv, BoxFuture, Either, Stream as FutStream, @@ -138,12 +138,12 @@ pub(super) struct Worker { struct WorkerService { factory: usize, status: WorkerServiceStatus, - service: BoxedServerService, + service: Container, } impl WorkerService { fn created(&mut self, service: BoxedServerService) { - self.service = service; + self.service = Container::new(service); self.status = WorkerServiceStatus::Unavailable; } } @@ -239,7 +239,7 @@ impl Worker { assert_eq!(token.0, wrk.services.len()); wrk.services.push(WorkerService { factory, - service, + service: service.into(), status: WorkerServiceStatus::Unavailable, }); } @@ -490,9 +490,12 @@ impl Future for Worker { self.factories[srv.factory].name(msg.token) ); } - let _ = srv - .service - .call((Some(guard), ServerMessage::Connect(msg.io))); + let srv = srv.service.clone(); + spawn(async move { + let _ = srv + .call((Some(guard), ServerMessage::Connect(msg.io))) + .await; + }); } else { return Poll::Ready(()); } @@ -509,7 +512,7 @@ mod tests { use super::*; use crate::io::Io; use crate::server::service::Factory; - use crate::service::{Service, ServiceFactory}; + use crate::service::{Ctx, Service, ServiceFactory}; use crate::util::{lazy, Ready}; #[derive(Clone, Copy, Debug)] @@ -569,7 +572,7 @@ mod tests { } } - fn call(&self, _: Io) -> Self::Future<'_> { + fn call<'a>(&'a self, _: Io, _: Ctx<'a, Self>) -> Self::Future<'a> { Ready::Ok(()) } } diff --git a/ntex/src/web/app.rs b/ntex/src/web/app.rs index 8b5e3a80..19fb3492 100644 --- a/ntex/src/web/app.rs +++ b/ntex/src/web/app.rs @@ -4,7 +4,7 @@ use crate::http::Request; use crate::router::ResourceDef; use crate::service::boxed::{self, BoxServiceFactory}; use crate::service::{map_config, pipeline_factory, IntoServiceFactory, PipelineFactory}; -use crate::service::{Identity, Middleware, Service, ServiceFactory, Stack}; +use crate::service::{Ctx, Identity, Middleware, Service, ServiceFactory, Stack}; use crate::util::{BoxFuture, Extensions, Ready}; use super::app_service::{AppFactory, AppService}; @@ -581,7 +581,7 @@ impl Service> for Filter { type Future<'f> = Ready, Err::Container>; #[inline] - fn call(&self, req: WebRequest) -> Self::Future<'_> { + fn call<'a>(&'a self, req: WebRequest, _: Ctx<'a, Self>) -> Self::Future<'a> { Ready::Ok(req) } } @@ -591,7 +591,7 @@ mod tests { use super::*; use crate::http::header::{self, HeaderValue}; use crate::http::{Method, StatusCode}; - use crate::service::{fn_service, Service}; + use crate::service::fn_service; use crate::util::{Bytes, Ready}; use crate::web::test::{call_service, init_service, read_body, TestRequest}; use crate::web::{ @@ -604,7 +604,7 @@ mod tests { let srv = App::new() .service(web::resource("/test").to(|| async { HttpResponse::Ok() })) .finish() - .create(()) + .container(()) .await .unwrap(); let req = TestRequest::with_uri("/test").to_request(); @@ -628,7 +628,7 @@ mod tests { Ok(r.into_response(HttpResponse::MethodNotAllowed())) }) .with_config(Default::default()) - .create(()) + .container(()) .await .unwrap(); diff --git a/ntex/src/web/app_service.rs b/ntex/src/web/app_service.rs index dbf35e64..bfc9fefd 100644 --- a/ntex/src/web/app_service.rs +++ b/ntex/src/web/app_service.rs @@ -4,8 +4,10 @@ use std::{cell::RefCell, future::Future, marker::PhantomData, pin::Pin, rc::Rc}; use crate::http::{Request, Response}; use crate::router::{Path, ResourceDef, Router}; use crate::service::boxed::{self, BoxService, BoxServiceFactory}; -use crate::service::{fn_service, Middleware, PipelineFactory, Service, ServiceFactory}; -use crate::util::{BoxFuture, Extensions}; +use crate::service::{ + fn_service, Ctx, Middleware, PipelineFactory, Service, ServiceCall, ServiceFactory, +}; +use crate::util::{BoxFuture, Either, Extensions}; use super::config::AppConfig; use super::error::ErrorRenderer; @@ -21,8 +23,8 @@ type HttpService = BoxService, WebResponse, Err::Container>; type HttpNewService = BoxServiceFactory<(), WebRequest, WebResponse, Err::Container, ()>; -type BoxResponse<'f, Err: ErrorRenderer> = - BoxFuture<'f, Result>; +type BoxResponse<'a, Err: ErrorRenderer> = + ServiceCall<'a, HttpService, WebRequest>; type FnStateFactory = Box BoxFuture<'static, Result>>; /// Service factory to convert `Request` to a `WebRequest`. @@ -198,12 +200,12 @@ where { type Response = WebResponse; type Error = T::Error; - type Future<'f> = T::Future<'f> where T: 'f; + type Future<'f> = ServiceCall<'f, T, WebRequest> where T: 'f; crate::forward_poll_ready!(service); crate::forward_poll_shutdown!(service); - fn call(&self, req: Request) -> Self::Future<'_> { + fn call<'a>(&'a self, req: Request, ctx: Ctx<'a, Self>) -> Self::Future<'a> { let (head, payload) = req.into_parts(); let req = if let Some(mut req) = self.pool.get_request() { @@ -223,7 +225,7 @@ where self.pool, ) }; - self.service.call(WebRequest::new(req)) + ctx.call(&self.service, WebRequest::new(req)) } } @@ -245,9 +247,14 @@ struct AppRouting { impl Service> for AppRouting { type Response = WebResponse; type Error = Err::Container; - type Future<'f> = BoxResponse<'f, Err>; - - fn call(&self, mut req: WebRequest) -> Self::Future<'_> { + type Future<'f> = + Either, BoxFuture<'f, Result>>; + + fn call<'a>( + &'a self, + mut req: WebRequest, + ctx: Ctx<'a, Self>, + ) -> Self::Future<'a> { let res = self.router.recognize_checked(&mut req, |req, guards| { if let Some(guards) = guards { for f in guards { @@ -260,12 +267,14 @@ impl Service> for AppRouting { }); if let Some((srv, _info)) = res { - srv.call(req) + Either::Left(ctx.call(srv, req)) } else if let Some(ref default) = self.default { - default.call(req) + Either::Left(ctx.call(default, req)) } else { let req = req.into_parts().0; - Box::pin(async { Ok(WebResponse::new(Response::NotFound().finish(), req)) }) + Either::Right(Box::pin(async { + Ok(WebResponse::new(Response::NotFound().finish(), req)) + })) } } } @@ -296,23 +305,28 @@ where } } - fn call(&self, req: WebRequest) -> Self::Future<'_> { + fn call<'a>(&'a self, req: WebRequest, ctx: Ctx<'a, Self>) -> Self::Future<'a> { AppServiceResponse { - filter: self.filter.call(req), + filter: ctx.call(&self.filter, req), routing: &self.routing, endpoint: None, + ctx, } } } +type BoxAppServiceResponse<'a, Err: ErrorRenderer> = + ServiceCall<'a, AppRouting, WebRequest>; + pin_project_lite::pin_project! { pub struct AppServiceResponse<'f, F: Service>, Err: ErrorRenderer> where F: 'f { #[pin] - filter: F::Future<'f>, + filter: ServiceCall<'f, F, WebRequest>, routing: &'f AppRouting, - endpoint: Option>, + endpoint: Option>, + ctx: Ctx<'f, AppService>, } } @@ -335,7 +349,7 @@ where } else { return Poll::Pending; }; - *this.endpoint = Some(this.routing.call(res)); + *this.endpoint = Some(this.ctx.call(this.routing, res)); this = self.as_mut().project(); } } @@ -347,7 +361,6 @@ mod tests { use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; - use crate::service::Service; use crate::web::test::{init_service, TestRequest}; use crate::web::{self, App, HttpResponse}; diff --git a/ntex/src/web/config.rs b/ntex/src/web/config.rs index 88ffccd8..67e28e92 100644 --- a/ntex/src/web/config.rs +++ b/ntex/src/web/config.rs @@ -129,9 +129,9 @@ impl ServiceConfig { mod tests { use super::*; use crate::http::{Method, StatusCode}; + use crate::util::Bytes; use crate::web::test::{call_service, init_service, read_body, TestRequest}; use crate::web::{self, App, HttpRequest, HttpResponse}; - use crate::{service::Service, util::Bytes}; #[crate::rt_test] async fn test_configure_state() { diff --git a/ntex/src/web/middleware/compress.rs b/ntex/src/web/middleware/compress.rs index ee6a12c5..9cc2626d 100644 --- a/ntex/src/web/middleware/compress.rs +++ b/ntex/src/web/middleware/compress.rs @@ -4,7 +4,7 @@ use std::{cmp, future::Future, marker, pin::Pin, str::FromStr}; use crate::http::encoding::Encoder; use crate::http::header::{ContentEncoding, ACCEPT_ENCODING}; -use crate::service::{Middleware, Service}; +use crate::service::{Ctx, Middleware, Service, ServiceCall}; use crate::web::{BodyEncoding, ErrorRenderer, WebRequest, WebResponse}; #[derive(Debug, Clone)] @@ -71,7 +71,7 @@ where crate::forward_poll_ready!(service); crate::forward_poll_shutdown!(service); - fn call(&self, req: WebRequest) -> Self::Future<'_> { + fn call<'a>(&'a self, req: WebRequest, ctx: Ctx<'a, Self>) -> Self::Future<'a> { // negotiate content-encoding let encoding = if let Some(val) = req.headers().get(&ACCEPT_ENCODING) { if let Ok(enc) = val.to_str() { @@ -85,7 +85,7 @@ where CompressResponse { encoding, - fut: self.service.call(req), + fut: ctx.call(&self.service, req), _t: marker::PhantomData, } } @@ -97,7 +97,7 @@ pin_project_lite::pin_project! { where S: 'f, E: 'f { #[pin] - fut: S::Future<'f>, + fut: ServiceCall<'f, S, WebRequest>, encoding: ContentEncoding, _t: marker::PhantomData, } diff --git a/ntex/src/web/middleware/defaultheaders.rs b/ntex/src/web/middleware/defaultheaders.rs index 566d0f09..4fdd76d5 100644 --- a/ntex/src/web/middleware/defaultheaders.rs +++ b/ntex/src/web/middleware/defaultheaders.rs @@ -3,7 +3,7 @@ use std::rc::Rc; use crate::http::error::HttpError; use crate::http::header::{HeaderMap, HeaderName, HeaderValue, CONTENT_TYPE}; -use crate::service::{Middleware, Service}; +use crate::service::{Ctx, Middleware, Service}; use crate::util::BoxFuture; use crate::web::{WebRequest, WebResponse}; @@ -115,9 +115,9 @@ where crate::forward_poll_ready!(service); crate::forward_poll_shutdown!(service); - fn call(&self, req: WebRequest) -> Self::Future<'_> { + fn call<'a>(&'a self, req: WebRequest, ctx: Ctx<'a, Self>) -> Self::Future<'a> { Box::pin(async move { - let mut res = self.service.call(req).await?; + let mut res = ctx.call(&self.service, req).await?; // set response headers for (key, value) in self.inner.headers.iter() { @@ -141,7 +141,7 @@ where mod tests { use super::*; use crate::http::header::CONTENT_TYPE; - use crate::service::IntoService; + use crate::service::{Container, IntoService}; use crate::util::lazy; use crate::web::request::WebRequest; use crate::web::test::{ok_service, TestRequest}; @@ -149,9 +149,11 @@ mod tests { #[crate::rt_test] async fn test_default_headers() { - let mw = DefaultHeaders::new() - .header(CONTENT_TYPE, "0001") - .create(ok_service()); + let mw = Container::new( + DefaultHeaders::new() + .header(CONTENT_TYPE, "0001") + .create(ok_service()), + ); assert!(lazy(|cx| mw.poll_ready(cx).is_ready()).await); assert!(lazy(|cx| mw.poll_shutdown(cx).is_ready()).await); @@ -166,9 +168,11 @@ mod tests { req.into_response(HttpResponse::Ok().header(CONTENT_TYPE, "0002").finish()), ) }; - let mw = DefaultHeaders::new() - .header(CONTENT_TYPE, "0001") - .create(srv.into_service()); + let mw = Container::new( + DefaultHeaders::new() + .header(CONTENT_TYPE, "0001") + .create(srv.into_service()), + ); let resp = mw.call(req).await.unwrap(); assert_eq!(resp.headers().get(CONTENT_TYPE).unwrap(), "0002"); } @@ -178,9 +182,11 @@ mod tests { let srv = |req: WebRequest| async move { Ok::<_, Error>(req.into_response(HttpResponse::Ok().finish())) }; - let mw = DefaultHeaders::new() - .content_type() - .create(srv.into_service()); + let mw = Container::new( + DefaultHeaders::new() + .content_type() + .create(srv.into_service()), + ); let req = TestRequest::default().to_srv_request(); let resp = mw.call(req).await.unwrap(); diff --git a/ntex/src/web/middleware/logger.rs b/ntex/src/web/middleware/logger.rs index 7f6e377a..cefdc82a 100644 --- a/ntex/src/web/middleware/logger.rs +++ b/ntex/src/web/middleware/logger.rs @@ -7,7 +7,7 @@ use regex::Regex; use crate::http::body::{Body, BodySize, MessageBody, ResponseBody}; use crate::http::header::HeaderName; -use crate::service::{Middleware, Service}; +use crate::service::{Ctx, Middleware, Service, ServiceCall}; use crate::util::{Bytes, Either, HashSet}; use crate::web::{HttpResponse, WebRequest, WebResponse}; @@ -136,15 +136,15 @@ where { type Response = WebResponse; type Error = S::Error; - type Future<'f> = Either, S::Future<'f>> where S: 'f, E: 'f; + type Future<'f> = Either, ServiceCall<'f, S, WebRequest>> where S: 'f, E: 'f; crate::forward_poll_ready!(service); crate::forward_poll_shutdown!(service); #[inline] - fn call(&self, req: WebRequest) -> Self::Future<'_> { + fn call<'a>(&'a self, req: WebRequest, ctx: Ctx<'a, Self>) -> Self::Future<'a> { if self.inner.exclude.contains(req.path()) { - Either::Right(self.service.call(req)) + Either::Right(ctx.call(&self.service, req)) } else { let time = time::SystemTime::now(); let mut format = self.inner.format.clone(); @@ -155,7 +155,7 @@ where Either::Left(LoggerResponse { time, format: Some(format), - fut: self.service.call(req), + fut: ctx.call(&self.service, req), _t: PhantomData, }) } @@ -168,7 +168,7 @@ pin_project_lite::pin_project! { where S: 'f, E: 'f { #[pin] - fut: S::Future<'f>, + fut: ServiceCall<'f, S, WebRequest>, time: time::SystemTime, format: Option, _t: PhantomData @@ -448,7 +448,7 @@ impl<'a> fmt::Display for FormatDisplay<'a> { mod tests { use super::*; use crate::http::{header, StatusCode}; - use crate::service::{IntoService, Middleware}; + use crate::service::{Container, IntoService, Middleware}; use crate::util::lazy; use crate::web::test::{self, TestRequest}; use crate::web::{DefaultError, Error}; @@ -468,7 +468,7 @@ mod tests { let logger = Logger::new("%% %{User-Agent}i %{X-Test}o %{HOME}e %D %% test") .exclude("/test"); - let srv = Middleware::create(&logger, srv.into_service()); + let srv = Container::new(Middleware::create(&logger, srv.into_service())); assert!(lazy(|cx| srv.poll_ready(cx).is_ready()).await); assert!(lazy(|cx| srv.poll_shutdown(cx).is_ready()).await); diff --git a/ntex/src/web/resource.rs b/ntex/src/web/resource.rs index 365ce03c..eb4248eb 100644 --- a/ntex/src/web/resource.rs +++ b/ntex/src/web/resource.rs @@ -3,9 +3,11 @@ use std::{cell::RefCell, fmt, rc::Rc}; use crate::http::Response; use crate::router::{IntoPattern, ResourceDef}; use crate::service::boxed::{self, BoxService, BoxServiceFactory}; -use crate::service::{dev::AndThen, pipeline, pipeline_factory, Pipeline, PipelineFactory}; use crate::service::{ - Identity, IntoServiceFactory, Middleware, Service, ServiceFactory, Stack, + dev::AndThen, pipeline, pipeline_factory, Ctx, Pipeline, PipelineFactory, +}; +use crate::service::{ + Identity, IntoServiceFactory, Middleware, Service, ServiceCall, ServiceFactory, Stack, }; use crate::util::{BoxFuture, Either, Extensions, Ready}; @@ -22,6 +24,8 @@ type HttpService = type HttpNewService = BoxServiceFactory<(), WebRequest, WebResponse, Err::Container, ()>; type ResourcePipeline = Pipeline, AndThen>>; +type BoxResponse<'a, Err: ErrorRenderer> = + ServiceCall<'a, HttpService, WebRequest>; /// *Resource* is an entry in resources table which corresponds to requested URL. /// @@ -456,6 +460,9 @@ impl ServiceFactory> for ResourceRouterFacto } } +type BoxResourceRouterResponse<'a, Err: ErrorRenderer> = + ServiceCall<'a, RouteService, WebRequest>; + pub struct ResourceRouter { state: Option, routes: Vec>, @@ -466,26 +473,30 @@ impl Service> for ResourceRouter { type Response = WebResponse; type Error = Err::Container; type Future<'f> = Either< - Ready, - BoxFuture<'f, Result>, + BoxResourceRouterResponse<'f, Err>, + Either, BoxResponse<'f, Err>>, >; - fn call(&self, mut req: WebRequest) -> Self::Future<'_> { + fn call<'a>( + &'a self, + mut req: WebRequest, + ctx: Ctx<'a, Self>, + ) -> Self::Future<'a> { for route in self.routes.iter() { if route.check(&mut req) { if let Some(ref state) = self.state { req.set_state_container(state.clone()); } - return Either::Right(route.call(req)); + return Either::Left(ctx.call(route, req)); } } if let Some(ref default) = self.default { - Either::Right(default.call(req)) + Either::Right(Either::Right(ctx.call(default, req))) } else { - Either::Left(Ready::Ok(WebResponse::new( + Either::Right(Either::Left(Ready::Ok(WebResponse::new( Response::MethodNotAllowed().finish(), req.into_parts().0, - ))) + )))) } } } diff --git a/ntex/src/web/responder.rs b/ntex/src/web/responder.rs index adf8da8e..359a0adc 100644 --- a/ntex/src/web/responder.rs +++ b/ntex/src/web/responder.rs @@ -404,7 +404,7 @@ pub(crate) mod tests { use crate::http::header::{HeaderValue, CONTENT_TYPE}; use crate::http::{Response as HttpResponse, StatusCode}; use crate::web::test::{init_service, TestRequest}; - use crate::{service::Service, util::Bytes, util::BytesMut, web}; + use crate::{util::Bytes, util::BytesMut, web}; fn responder>(responder: T) -> impl Responder { responder diff --git a/ntex/src/web/route.rs b/ntex/src/web/route.rs index d15f8cdb..08e8a38a 100644 --- a/ntex/src/web/route.rs +++ b/ntex/src/web/route.rs @@ -1,7 +1,7 @@ use std::{mem, rc::Rc}; use crate::util::{BoxFuture, Ready}; -use crate::{http::Method, service::Service, service::ServiceFactory}; +use crate::{http::Method, service::Ctx, service::Service, service::ServiceFactory}; use super::error::ErrorRenderer; use super::error_default::DefaultError; @@ -90,7 +90,7 @@ impl Service> for RouteService { type Future<'f> = BoxFuture<'f, Result>; #[inline] - fn call(&self, req: WebRequest) -> Self::Future<'_> { + fn call<'a>(&'a self, req: WebRequest, _: Ctx<'a, Self>) -> Self::Future<'a> { self.handler.call(req) } } diff --git a/ntex/src/web/scope.rs b/ntex/src/web/scope.rs index 1daa2753..f2fa3513 100644 --- a/ntex/src/web/scope.rs +++ b/ntex/src/web/scope.rs @@ -6,7 +6,9 @@ use crate::http::Response; use crate::router::{IntoPattern, ResourceDef, Router}; use crate::service::boxed::{self, BoxService, BoxServiceFactory}; use crate::service::{pipeline_factory, IntoServiceFactory, PipelineFactory}; -use crate::service::{Identity, Middleware, Service, ServiceFactory, Stack}; +use crate::service::{ + Ctx, Identity, Middleware, Service, ServiceCall, ServiceFactory, Stack, +}; use crate::util::{BoxFuture, Either, Extensions, Ready}; use super::app::Filter; @@ -27,7 +29,7 @@ type HttpService = type HttpNewService = BoxServiceFactory<(), WebRequest, WebResponse, Err::Container, ()>; type BoxResponse<'a, Err: ErrorRenderer> = - BoxFuture<'a, Result>; + ServiceCall<'a, HttpService, WebRequest>; /// Resources scope. /// @@ -503,11 +505,12 @@ where } } - fn call(&self, req: WebRequest) -> Self::Future<'_> { + fn call<'a>(&'a self, req: WebRequest, ctx: Ctx<'a, Self>) -> Self::Future<'a> { ScopeServiceResponse { - filter: self.filter.call(req), + filter: ctx.call(&self.filter, req), routing: &self.routing, endpoint: None, + ctx, } } } @@ -517,9 +520,10 @@ pin_project_lite::pin_project! { where F: 'f { #[pin] - filter: F::Future<'f>, + filter: ServiceCall<'f, F, WebRequest>, routing: &'f ScopeRouter, - endpoint: Option< as Service>>::Future<'f>>, + ctx: Ctx<'f, ScopeService>, + endpoint: Option, WebRequest>>, } } @@ -536,15 +540,15 @@ where loop { if let Some(fut) = this.endpoint.as_mut() { return Pin::new(fut).poll(cx); - } else { - let res = if let Poll::Ready(res) = this.filter.poll(cx) { - res? - } else { - return Poll::Pending; - }; - *this.endpoint = Some(this.routing.call(res)); - this = self.as_mut().project(); } + + let res = if let Poll::Ready(res) = this.filter.poll(cx) { + res? + } else { + return Poll::Pending; + }; + *this.endpoint = Some(this.ctx.call(this.routing, res)); + this = self.as_mut().project(); } } } @@ -601,7 +605,11 @@ impl Service> for ScopeRouter { type Error = Err::Container; type Future<'f> = Either, Ready>; - fn call(&self, mut req: WebRequest) -> Self::Future<'_> { + fn call<'a>( + &'a self, + mut req: WebRequest, + ctx: Ctx<'a, Self>, + ) -> Self::Future<'a> { let res = self.router.recognize_checked(&mut req, |req, guards| { if let Some(guards) = guards { for f in guards { @@ -617,9 +625,9 @@ impl Service> for ScopeRouter { if let Some(ref state) = self.state { req.set_state_container(state.clone()); } - Either::Left(srv.call(req)) + Either::Left(ctx.call(srv, req)) } else if let Some(ref default) = self.default { - Either::Left(default.call(req)) + Either::Left(ctx.call(default, req)) } else { let req = req.into_parts().0; Either::Right(Ready::Ok(WebResponse::new( @@ -635,7 +643,7 @@ mod tests { use crate::http::body::{Body, ResponseBody}; use crate::http::header::{HeaderValue, CONTENT_TYPE}; use crate::http::{Method, StatusCode}; - use crate::service::{fn_service, Service}; + use crate::service::fn_service; use crate::util::{Bytes, Ready}; use crate::web::middleware::DefaultHeaders; use crate::web::request::WebRequest; diff --git a/ntex/src/web/service.rs b/ntex/src/web/service.rs index 9fe56a04..408687d5 100644 --- a/ntex/src/web/service.rs +++ b/ntex/src/web/service.rs @@ -392,7 +392,6 @@ tuple_web_service!((0,A),(1,B),(2,C),(3,D),(4,E),(5,F),(6,G),(7,H),(8,I),(9,J),( mod tests { use super::*; use crate::http::{Method, StatusCode}; - use crate::service::Service; use crate::web::test::{init_service, TestRequest}; use crate::web::{self, guard, App, DefaultError, HttpResponse}; diff --git a/ntex/src/web/test.rs b/ntex/src/web/test.rs index 1f2622ee..769abb24 100644 --- a/ntex/src/web/test.rs +++ b/ntex/src/web/test.rs @@ -3,8 +3,7 @@ use std::{fmt, net, net::SocketAddr, rc::Rc, sync::mpsc, thread}; #[cfg(feature = "cookie")] use coo_kie::Cookie; -use serde::de::DeserializeOwned; -use serde::Serialize; +use serde::{de::DeserializeOwned, Serialize}; use crate::http::body::MessageBody; use crate::http::client::{Client, ClientRequest, ClientResponse, Connector}; @@ -14,7 +13,7 @@ use crate::http::test::TestRequest as HttpTestRequest; use crate::http::{HttpService, Method, Payload, Request, StatusCode, Uri, Version}; use crate::router::{Path, ResourceDef}; use crate::service::{ - map_config, IntoService, IntoServiceFactory, Service, ServiceFactory, + map_config, Container, IntoService, IntoServiceFactory, Service, ServiceFactory, }; use crate::time::{sleep, Millis, Seconds}; use crate::util::{stream_recv, Bytes, BytesMut, Extensions, Ready, Stream}; @@ -70,14 +69,14 @@ pub fn default_service( /// ``` pub async fn init_service( app: R, -) -> impl Service +) -> Container> where R: IntoServiceFactory, S: ServiceFactory, S::InitError: std::fmt::Debug, { let srv = app.into_factory(); - srv.create(AppConfig::default()).await.unwrap() + srv.container(AppConfig::default()).await.unwrap() } /// Calls service and waits for response future completion. @@ -103,7 +102,7 @@ where /// assert_eq!(resp.status(), StatusCode::OK); /// } /// ``` -pub async fn call_service(app: &S, req: R) -> S::Response +pub async fn call_service(app: &Container, req: R) -> S::Response where S: Service, E: std::fmt::Debug, @@ -136,7 +135,7 @@ where /// assert_eq!(result, Bytes::from_static(b"welcome!")); /// } /// ``` -pub async fn read_response(app: &S, req: Request) -> Bytes +pub async fn read_response(app: &Container, req: Request) -> Bytes where S: Service, { @@ -235,7 +234,7 @@ where /// let result: Person = test::read_response_json(&mut app, req).await; /// } /// ``` -pub async fn read_response_json(app: &S, req: Request) -> T +pub async fn read_response_json(app: &Container, req: Request) -> T where S: Service, T: DeserializeOwned, diff --git a/ntex/src/web/types/payload.rs b/ntex/src/web/types/payload.rs index af58e739..1ec89a1a 100644 --- a/ntex/src/web/types/payload.rs +++ b/ntex/src/web/types/payload.rs @@ -159,8 +159,7 @@ impl FromRequest for Bytes { } let limit = cfg.limit; - let fut = HttpMessageBody::new(req, payload).limit(limit); - Either::Left(Box::pin(async move { fut.await })) + Either::Left(Box::pin(HttpMessageBody::new(req, payload).limit(limit))) } } diff --git a/ntex/src/web/types/state.rs b/ntex/src/web/types/state.rs index 4e62f2ec..04979f04 100644 --- a/ntex/src/web/types/state.rs +++ b/ntex/src/web/types/state.rs @@ -101,7 +101,6 @@ mod tests { use super::*; use crate::http::StatusCode; - use crate::service::Service; use crate::web::test::{self, init_service, TestRequest}; use crate::web::{self, App, HttpResponse}; diff --git a/ntex/src/web/ws.rs b/ntex/src/web/ws.rs index 10c29e9e..7921e310 100644 --- a/ntex/src/web/ws.rs +++ b/ntex/src/web/ws.rs @@ -5,7 +5,7 @@ pub use crate::ws::{CloseCode, CloseReason, Frame, Message, WsSink}; use crate::http::{body::BodySize, h1, StatusCode}; use crate::service::{ - apply_fn, fn_factory_with_config, IntoServiceFactory, Service, ServiceFactory, + apply_fn, fn_factory_with_config, IntoServiceFactory, ServiceFactory, }; use crate::web::{HttpRequest, HttpResponse}; use crate::ws::{self, error::HandshakeError, error::WsError, handshake}; diff --git a/ntex/src/ws/client.rs b/ntex/src/ws/client.rs index c81686f7..a33b6d78 100644 --- a/ntex/src/ws/client.rs +++ b/ntex/src/ws/client.rs @@ -16,7 +16,7 @@ use crate::http::header::{self, HeaderMap, HeaderName, HeaderValue, AUTHORIZATIO use crate::http::{body::BodySize, client::ClientResponse, error::HttpError, h1}; use crate::http::{ConnectionType, RequestHead, RequestHeadType, StatusCode, Uri}; use crate::io::{Base, DispatchItem, Dispatcher, Filter, Io, Layer, Sealed}; -use crate::service::{apply_fn, into_service, IntoService, Service}; +use crate::service::{apply_fn, into_service, Container, IntoService, Service}; use crate::time::{timeout, Millis, Seconds}; use crate::{channel::mpsc, rt, util::Ready, ws}; @@ -25,7 +25,7 @@ use super::transport::WsTransport; /// `WebSocket` client builder pub struct WsClient { - connector: T, + connector: Container, head: Rc, addr: Option, max_size: usize, @@ -630,7 +630,7 @@ where } Ok(WsClient { - connector: inner.connector, + connector: inner.connector.into(), head: Rc::new(inner.head), addr: inner.addr, max_size: inner.max_size, diff --git a/ntex/tests/connect.rs b/ntex/tests/connect.rs index 6caee431..729436ba 100644 --- a/ntex/tests/connect.rs +++ b/ntex/tests/connect.rs @@ -3,7 +3,7 @@ use std::{io, rc::Rc, sync::Arc}; use ntex::codec::BytesCodec; use ntex::connect::Connect; use ntex::io::{types::PeerAddr, Io}; -use ntex::service::{fn_service, pipeline_factory, Service, ServiceFactory}; +use ntex::service::{fn_service, pipeline_factory, Container, ServiceFactory}; use ntex::{server::test_server, time, util::Bytes}; #[cfg(feature = "openssl")] @@ -97,7 +97,7 @@ async fn test_openssl_string() { let mut builder = SslConnector::builder(SslMethod::tls()).unwrap(); builder.set_verify(SslVerifyMode::NONE); - let conn = ntex::connect::openssl::Connector::new(builder.build()); + let conn = Container::new(ntex::connect::openssl::Connector::new(builder.build())); let addr = format!("127.0.0.1:{}", srv.addr().port()); let io = conn.call(addr.into()).await.unwrap(); assert_eq!(io.query::().get().unwrap(), srv.addr().into()); @@ -140,7 +140,7 @@ async fn test_openssl_read_before_error() { let mut builder = SslConnector::builder(SslMethod::tls()).unwrap(); builder.set_verify(SslVerifyMode::NONE); - let conn = ntex::connect::openssl::Connector::new(builder.build()); + let conn = Container::new(ntex::connect::openssl::Connector::new(builder.build())); let addr = format!("127.0.0.1:{}", srv.addr().port()); let io = conn.call(addr.into()).await.unwrap(); let item = io.recv(&Rc::new(BytesCodec)).await.unwrap().unwrap(); @@ -185,7 +185,7 @@ async fn test_rustls_string() { .with_custom_certificate_verifier(Arc::new(danger::NoCertificateVerification {})) .with_no_client_auth(); - let conn = ntex::connect::rustls::Connector::new(config); + let conn = Container::new(ntex::connect::rustls::Connector::new(config)); let addr = format!("localhost:{}", srv.addr().port()); let io = conn.call(addr.into()).await.unwrap(); assert_eq!(io.query::().get().unwrap(), srv.addr().into()); @@ -225,13 +225,13 @@ async fn test_static_str() { }) }); - let conn = ntex::connect::Connector::new(); + let conn = Container::new(ntex::connect::Connector::new()); let io = conn.call(Connect::with("10", srv.addr())).await.unwrap(); assert_eq!(io.query::().get().unwrap(), srv.addr().into()); let connect = Connect::new("127.0.0.1".to_owned()); - let conn = ntex::connect::Connector::new(); + let conn = Container::new(ntex::connect::Connector::new()); let io = conn.call(connect).await; assert!(io.is_err()); } @@ -248,7 +248,7 @@ async fn test_create() { }); let factory = ntex::connect::Connector::new(); - let conn = factory.create(()).await.unwrap(); + let conn = factory.container(()).await.unwrap(); let io = conn.call(Connect::with("10", srv.addr())).await.unwrap(); assert_eq!(io.query::().get().unwrap(), srv.addr().into()); } @@ -265,7 +265,7 @@ async fn test_uri() { }) }); - let conn = ntex::connect::Connector::default(); + let conn = Container::new(ntex::connect::Connector::default()); let addr = ntex::http::Uri::try_from(format!("https://localhost:{}", srv.addr().port())) .unwrap(); @@ -285,7 +285,7 @@ async fn test_rustls_uri() { }) }); - let conn = ntex::connect::Connector::default(); + let conn = Container::new(ntex::connect::Connector::default()); let addr = ntex::http::Uri::try_from(format!("https://localhost:{}", srv.addr().port())) .unwrap(); diff --git a/ntex/tests/http_ws.rs b/ntex/tests/http_ws.rs index 871cbd8d..f51f128f 100644 --- a/ntex/tests/http_ws.rs +++ b/ntex/tests/http_ws.rs @@ -4,7 +4,7 @@ use ntex::codec::BytesCodec; use ntex::http::test::server as test_server; use ntex::http::{body, h1, test, HttpService, Request, Response, StatusCode}; use ntex::io::{DispatchItem, Dispatcher, Io}; -use ntex::service::{fn_factory, Service}; +use ntex::service::{fn_factory, Ctx, Service}; use ntex::util::{BoxFuture, ByteString, Bytes, Ready}; use ntex::ws::{self, handshake, handshake_response}; @@ -40,7 +40,11 @@ impl Service<(Request, Io, h1::Codec)> for WsService { Poll::Ready(Ok(())) } - fn call(&self, (req, io, codec): (Request, Io, h1::Codec)) -> Self::Future<'_> { + fn call<'a>( + &'a self, + (req, io, codec): (Request, Io, h1::Codec), + _: Ctx<'a, Self>, + ) -> Self::Future<'a> { let fut = async move { let res = handshake(req.head()).unwrap().message_body(());