Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use async fn for Service::ready() and Service::shutdown() #364

Merged
merged 1 commit into from
May 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ Starting ntex v0.5 async runtime must be selected as a feature. Available option

```toml
[dependencies]
ntex = { version = "1.0", features = ["tokio"] }
ntex = { version = "2", features = ["tokio"] }
```

## Documentation & community resources
Expand Down
2 changes: 1 addition & 1 deletion ntex-bytes/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,5 @@ simdutf8 = { version = "0.1.4", optional = true }
[dev-dependencies]
serde_test = "1"
serde_json = "1"
ntex = { version = "1", features = ["tokio"] }
ntex = { version = "2", features = ["tokio"] }
ntex-bytes = { path = ".", features = ["mpool"] }
2 changes: 1 addition & 1 deletion ntex-io/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,4 @@ pin-project-lite = "0.2"
rand = "0.8"
env_logger = "0.11"

ntex = { version = "1", features = ["tokio"] }
ntex = { version = "2", features = ["tokio"] }
2 changes: 1 addition & 1 deletion ntex-macros/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,6 @@ syn = { version = "^1", features = ["full", "parsing"] }
proc-macro2 = "^1"

[dev-dependencies]
ntex = { version = "1", features = ["tokio"] }
ntex = { version = "2", features = ["tokio"] }
futures = "0.3"
env_logger = "0.11"
2 changes: 1 addition & 1 deletion ntex-net/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,4 @@ thiserror = "1.0"

[dev-dependencies]
env_logger = "0.11"
ntex = { version = "1", features = ["tokio"] }
ntex = { version = "2", features = ["tokio"] }
2 changes: 1 addition & 1 deletion ntex-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ socket2 = "0.5"
oneshot = { version = "0.1", default-features = false, features = ["async"] }

[dev-dependencies]
ntex = { version = "1", features = ["tokio"] }
ntex = { version = "2", features = ["tokio"] }
ntex-macros = "0.1.3"

[target.'cfg(target_family = "unix")'.dependencies]
Expand Down
4 changes: 2 additions & 2 deletions ntex-service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,5 @@ path = "src/lib.rs"
slab = "0.4"

[dev-dependencies]
ntex = { version = "1", features = ["tokio"] }
ntex-util = "1"
ntex = { version = "2", features = ["tokio"] }
ntex-util = "2"
2 changes: 1 addition & 1 deletion ntex-tls/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ tls_openssl = { version = "0.10", package = "openssl", optional = true }
tls_rust = { version = "0.23", package = "rustls", optional = true }

[dev-dependencies]
ntex = { version = "1", features = ["openssl", "rustls", "tokio"] }
ntex = { version = "2", features = ["openssl", "rustls", "tokio"] }
env_logger = "0.11"
rustls-pemfile = "2"
webpki-roots = "0.26"
2 changes: 1 addition & 1 deletion ntex-tls/examples/rustls-client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ async fn main() -> io::Result<()> {
.with_no_client_auth();

// rustls connector
let connector = connect::rustls::Connector::new(config.clone());
let connector = connect::rustls::TlsConnector::new(config.clone());

//let io = connector.connect("www.rust-lang.org:443").await.unwrap();
let io = connector.connect("127.0.0.1:8443").await.unwrap();
Expand Down
2 changes: 1 addition & 1 deletion ntex-tls/examples/simple-client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ async fn main() -> io::Result<()> {
builder.set_verify(SslVerifyMode::NONE);

// openssl connector
let connector = connect::openssl::Connector::new(builder.build());
let connector = connect::openssl::SslConnector::new(builder.build());

let io = connector.connect("127.0.0.1:8443").await.unwrap();
println!("Connected to ssl server");
Expand Down
1 change: 1 addition & 0 deletions ntex-tls/src/counter.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#![allow(dead_code)]
use std::{cell::Cell, future::poll_fn, rc::Rc, task, task::Poll};

use ntex_util::task::LocalWaker;
Expand Down
2 changes: 1 addition & 1 deletion ntex-util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,6 @@ futures-sink = { version = "0.3", default-features = false, features = ["alloc"]
pin-project-lite = "0.2"

[dev-dependencies]
ntex = { version = "1", features = ["tokio"] }
ntex = { version = "2", features = ["tokio"] }
ntex-macros = "0.1.3"
futures-util = { version = "0.3", default-features = false, features = ["alloc"] }
4 changes: 4 additions & 0 deletions ntex/CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [2.0.0] - 2024-05-28

* Use "async fn" for Service::ready() and Service::shutdown()

## [1.2.1] - 2024-03-28

* Feature gate websocket support #320
Expand Down
16 changes: 8 additions & 8 deletions ntex/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex"
version = "1.2.1"
version = "2.0.0"
authors = ["ntex contributors <team@ntex.rs>"]
description = "Framework for composable network services"
readme = "README.md"
Expand Down Expand Up @@ -60,16 +60,16 @@ brotli = ["dep:brotli2"]
ntex-codec = "0.6.2"
ntex-http = "0.1.12"
ntex-router = "0.5.3"
ntex-service = "2.0.1"
ntex-service = "3.0"
ntex-macros = "0.1.3"
ntex-util = "1.0.1"
ntex-util = "2.0"
ntex-bytes = "0.1.25"
ntex-server = "1.0.5"
ntex-h2 = "0.5.4"
ntex-server = "2.0"
ntex-h2 = "1.0"
ntex-rt = "0.4.12"
ntex-io = "1.2.0"
ntex-net = "1.0.1"
ntex-tls = "1.1.0"
ntex-io = "2.0"
ntex-net = "2.0"
ntex-tls = "2.0"

base64 = "0.22"
bitflags = "2"
Expand Down
39 changes: 14 additions & 25 deletions ntex/src/http/client/connector.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::{fmt, task::Context, task::Poll, time::Duration};
use std::{fmt, time::Duration};

use ntex_h2::{self as h2};

use crate::connect::{Connect as TcpConnect, Connector as TcpConnector};
use crate::service::{apply_fn, boxed, Service, ServiceCtx};
use crate::time::{Millis, Seconds};
use crate::util::{timeout::TimeoutError, timeout::TimeoutService};
use crate::util::{join, timeout::TimeoutError, timeout::TimeoutService};
use crate::{http::Uri, io::IoBoxed};

use super::{connection::Connection, error::ConnectError, pool::ConnectionPool, Connect};
Expand Down Expand Up @@ -273,31 +273,20 @@
type Response = <ConnectionPool<T> as Service<Connect>>::Response;
type Error = ConnectError;

fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let ready = self.tcp_pool.poll_ready(cx)?.is_ready();
let ready = if let Some(ref ssl_pool) = self.ssl_pool {
ssl_pool.poll_ready(cx)?.is_ready() && ready
async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
if let Some(ref ssl_pool) = self.ssl_pool {
let (r1, r2) = join(ctx.ready(&self.tcp_pool), ctx.ready(ssl_pool)).await;
r1?;
r2
} else {
ready
};
if ready {
Poll::Ready(Ok(()))
} else {
Poll::Pending
ctx.ready(&self.tcp_pool).await

Check warning on line 282 in ntex/src/http/client/connector.rs

View check run for this annotation

Codecov / codecov/patch

ntex/src/http/client/connector.rs#L282

Added line #L282 was not covered by tests
}
}

fn poll_shutdown(&self, cx: &mut Context<'_>) -> Poll<()> {
let tcp_ready = self.tcp_pool.poll_shutdown(cx).is_ready();
let ssl_ready = self
.ssl_pool
.as_ref()
.map(|pool| pool.poll_shutdown(cx).is_ready())
.unwrap_or(true);
if tcp_ready && ssl_ready {
Poll::Ready(())
} else {
Poll::Pending
async fn shutdown(&self) {
self.tcp_pool.shutdown().await;
if let Some(ref ssl_pool) = self.ssl_pool {
ssl_pool.shutdown().await;
}
}

Expand All @@ -322,11 +311,11 @@
#[cfg(test)]
mod tests {
use super::*;
use crate::util::lazy;
use crate::{service::Pipeline, util::lazy};

#[crate::rt_test]
async fn test_readiness() {
let conn = Connector::default().finish();
let conn = Pipeline::new(Connector::default().finish()).bind();
assert!(lazy(|cx| conn.poll_ready(cx).is_ready()).await);
assert!(lazy(|cx| conn.poll_shutdown(cx).is_ready()).await);
}
Expand Down
4 changes: 2 additions & 2 deletions ntex/src/http/client/h2proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ where
if !eof {
// sending body is async process, we can handle upload and download
// at the same time
crate::rt::spawn(async move {
let _ = crate::rt::spawn(async move {
if let Err(e) = send_body(body, &snd_stream).await {
log::error!("Cannot send body: {:?}", e);
snd_stream.reset(frame::Reason::INTERNAL_ERROR);
Expand Down Expand Up @@ -125,7 +125,7 @@ async fn get_response(
log::debug!("Creating local payload stream for {:?}", stream.id());
let (mut pl, payload) =
payload::Payload::create(stream.empty_capacity());
crate::rt::spawn(async move {
let _ = crate::rt::spawn(async move {
loop {
let h2::Message { stream, kind } =
match rcv_stream.recv().await {
Expand Down
20 changes: 13 additions & 7 deletions ntex/src/http/client/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ where
}));

// start pool support future
crate::rt::spawn(ConnectionPoolSupport {
let _ = crate::rt::spawn(ConnectionPoolSupport {
connector: connector.clone(),
inner: inner.clone(),
waiters: waiters.clone(),
Expand Down Expand Up @@ -117,8 +117,13 @@ where
type Response = Connection;
type Error = ConnectError;

crate::forward_poll_ready!(connector);
crate::forward_poll_shutdown!(connector);
async fn ready(&self, _: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
self.connector.ready().await
}

async fn shutdown(&self) {
self.connector.shutdown().await
}

async fn call(
&self,
Expand Down Expand Up @@ -252,7 +257,7 @@ impl Inner {
|| (now - conn.created) > self.conn_lifetime
{
if let ConnectionType::H1(io) = conn.io {
spawn(async move {
let _ = spawn(async move {
let _ = io.shutdown().await;
});
}
Expand Down Expand Up @@ -419,7 +424,7 @@ where
let disconnect_timeout = inner.borrow().disconnect_timeout;

#[allow(clippy::redundant_async_block)]
spawn(async move {
let _ = spawn(async move {
OpenConnection::<T> {
tx: Some(tx),
key: key.clone(),
Expand Down Expand Up @@ -576,7 +581,7 @@ impl Acquired {
);
match io {
ConnectionType::H1(io) => {
spawn(async move {
let _ = spawn(async move {
let _ = io.shutdown().await;
});
}
Expand Down Expand Up @@ -634,7 +639,8 @@ mod tests {
h2::Config::client(),
)
.clone(),
);
)
.bind();

// uri must contain authority
let req = Connect {
Expand Down
2 changes: 1 addition & 1 deletion ntex/src/http/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ impl DateService {

// periodic date update
let s = self.clone();
crate::rt::spawn(async move {
let _ = crate::rt::spawn(async move {
sleep(Millis(500)).await;
s.0.current.set(false);
});
Expand Down
2 changes: 1 addition & 1 deletion ntex/src/http/h1/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ impl<F: Filter> Upgrade<F> {
H: FnOnce(Request, Io<F>, Codec) -> R + 'static,
R: Future<Output = O>,
{
crate::rt::spawn(async move {
let _ = crate::rt::spawn(async move {
let _ = f(self.req, self.io, self.codec).await;
});
ControlAck {
Expand Down
49 changes: 15 additions & 34 deletions ntex/src/http/h1/service.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use std::{error::Error, fmt, marker, rc::Rc, task::Context, task::Poll};
use std::{error::Error, fmt, marker, rc::Rc};

use crate::http::body::MessageBody;
use crate::http::config::{DispatcherConfig, ServiceConfig};
use crate::http::error::{DispatchError, ResponseError};
use crate::http::{request::Request, response::Response};
use crate::io::{types, Filter, Io};
use crate::service::{IntoServiceFactory, Service, ServiceCtx, ServiceFactory};
use crate::util::join;

use super::control::{Control, ControlAck};
use super::default::DefaultControlService;
Expand Down Expand Up @@ -208,43 +209,23 @@
type Response = ();
type Error = DispatchError;

fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
async fn ready(&self, _: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
let cfg = self.config.as_ref();

let ready1 = cfg
.control
.poll_ready(cx)
.map_err(|e| {
log::error!("Http control service readiness error: {:?}", e);
DispatchError::Control(Box::new(e))
})?
.is_ready();

let ready2 = cfg
.service
.poll_ready(cx)
.map_err(|e| {
log::error!("Http service readiness error: {:?}", e);
DispatchError::Service(Box::new(e))
})?
.is_ready();

if ready1 && ready2 {
Poll::Ready(Ok(()))
} else {
Poll::Pending
}
let (ready1, ready2) = join(cfg.control.ready(), cfg.service.ready()).await;
ready1.map_err(|e| {
log::error!("Http control service readiness error: {:?}", e);
DispatchError::Control(Box::new(e))

Check warning on line 218 in ntex/src/http/h1/service.rs

View check run for this annotation

Codecov / codecov/patch

ntex/src/http/h1/service.rs#L217-L218

Added lines #L217 - L218 were not covered by tests
})?;
ready2.map_err(|e| {
log::error!("Http service readiness error: {:?}", e);
DispatchError::Service(Box::new(e))

Check warning on line 222 in ntex/src/http/h1/service.rs

View check run for this annotation

Codecov / codecov/patch

ntex/src/http/h1/service.rs#L221-L222

Added lines #L221 - L222 were not covered by tests
})
}

fn poll_shutdown(&self, cx: &mut Context<'_>) -> Poll<()> {
let ready1 = self.config.control.poll_shutdown(cx).is_ready();
let ready2 = self.config.service.poll_shutdown(cx).is_ready();

if ready1 && ready2 {
Poll::Ready(())
} else {
Poll::Pending
}
async fn shutdown(&self) {
self.config.control.shutdown().await;
self.config.service.shutdown().await;

Check warning on line 228 in ntex/src/http/h1/service.rs

View check run for this annotation

Codecov / codecov/patch

ntex/src/http/h1/service.rs#L226-L228

Added lines #L226 - L228 were not covered by tests
}

async fn call(&self, io: Io<F>, _: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
Expand Down
Loading
Loading