From decc2282f2478d392262449218b045f307291b76 Mon Sep 17 00:00:00 2001 From: w-sodalite <70255432+w-sodalite@users.noreply.github.com> Date: Sat, 20 Jan 2024 14:35:12 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E5=8F=8D=E5=90=91=E4=BB=A3?= =?UTF-8?q?=E7=90=86=E8=BD=AC=E5=8F=91=E6=80=A7=E8=83=BD=20(#13)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * 优化反向代理转发请求的性能 --- Cargo.lock | 3 +- Cargo.toml | 1 + examples/echo.yaml | 14 + examples/proxy.yaml | 19 ++ examples/satex.yaml | 4 +- satex-core/src/config/mod.rs | 62 ----- satex-layer/Cargo.toml | 2 - satex-layer/src/make/default/mod.rs | 1 - satex-layer/src/make/default/registry.rs | 7 +- .../src/make/default/set_http_client.rs | 43 --- satex-layer/src/make/path_strip/mod.rs | 11 +- satex-serve/src/router/mod.rs | 11 +- satex-service/Cargo.toml | 1 + satex-service/src/make/proxy/make.rs | 251 +++++++++++++++++- satex-service/src/make/proxy/mod.rs | 18 +- 15 files changed, 314 insertions(+), 134 deletions(-) create mode 100644 examples/echo.yaml create mode 100644 examples/proxy.yaml delete mode 100644 satex-layer/src/make/default/set_http_client.rs diff --git a/Cargo.lock b/Cargo.lock index c6b4b67..12eeaf1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -952,8 +952,6 @@ dependencies = [ "futures", "http-body-util", "hyper", - "hyper-rustls", - "hyper-util", "leaky-bucket", "satex-core", "satex-discovery", @@ -1009,6 +1007,7 @@ dependencies = [ "hyper", "hyper-rustls", "hyper-util", + "rustls", "satex-core", "satex-discovery", "serde", diff --git a/Cargo.toml b/Cargo.toml index 9ad7ebf..421f84e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,6 +40,7 @@ tracing = { version = "0.1" } futures = { version = "0.3" } qstring = { version = "0.7" } cookie = { version = "0.18" } +rustls = { version = "0.22.0" } path-tree = { version = "0.7" } serde_yaml = { version = "0.9" } hyper-util = { version = "0.1" } diff --git a/examples/echo.yaml b/examples/echo.yaml new file mode 100644 index 0000000..eba9b76 --- /dev/null +++ b/examples/echo.yaml @@ -0,0 +1,14 @@ +# 服务配置 +server: + # 服务端口 + port: 8003 + +# 路由配置 +router: + # 路由表 + routes: + # 静态文件 + - id: echo + matchers: + - Path=/echo/+ + service: Echo \ No newline at end of file diff --git a/examples/proxy.yaml b/examples/proxy.yaml new file mode 100644 index 0000000..e6ceee4 --- /dev/null +++ b/examples/proxy.yaml @@ -0,0 +1,19 @@ +# 服务配置 +server: + # 服务端口 + port: 8002 + +# 路由配置 +router: + # 路由表 + routes: + # 静态文件 + - id: proxy + matchers: + - Path=/backend/+ + layers: + - PathStrip=1 + service: + kind: Proxy + args: + uri: http://127.0.0.1:3000 \ No newline at end of file diff --git a/examples/satex.yaml b/examples/satex.yaml index 9b7804c..2c5a960 100644 --- a/examples/satex.yaml +++ b/examples/satex.yaml @@ -2,7 +2,7 @@ satex: # 日志配置 tracing: # 日志级别 - max_level: info + max_level: warn # 是否使用ANSI ansi: true # 是否显示级别 @@ -18,3 +18,5 @@ satex: serves: - examples/static.yaml - examples/tls/static.yaml + - examples/proxy.yaml + - examples/echo.yaml diff --git a/satex-core/src/config/mod.rs b/satex-core/src/config/mod.rs index 185c51f..1d188b0 100644 --- a/satex-core/src/config/mod.rs +++ b/satex-core/src/config/mod.rs @@ -134,12 +134,6 @@ pub struct ServeConfig { /// #[serde(default)] discovery: Vec, - - /// - /// Http Client配置 - /// - #[serde(default)] - client: Client, } impl ServeConfig { @@ -155,9 +149,6 @@ impl ServeConfig { pub fn discovery(&self) -> &[Metadata] { &self.discovery } - pub fn client(&self) -> &Client { - &self.client - } pub fn from_yaml>(path: P) -> Result { let display = path.as_ref().display(); let bytes = std::fs::read(path.as_ref()) @@ -384,59 +375,6 @@ impl Route { } } -#[derive(Debug, Clone, Deserialize)] -pub struct Client { - #[serde(default = "Client::default_pool_max_idle_per_host")] - pool_max_idle_per_host: usize, - - #[serde(default = "Client::default_pool_idle_timeout_secs")] - pool_idle_timeout_secs: u64, - - #[serde(default = "Client::default_retry_canceled_requests")] - retry_canceled_requests: bool, - - #[serde(default = "Client::default_set_host")] - set_host: bool, -} - -impl Default for Client { - fn default() -> Self { - Self { - pool_max_idle_per_host: Self::default_pool_max_idle_per_host(), - pool_idle_timeout_secs: Self::default_pool_idle_timeout_secs(), - retry_canceled_requests: Self::default_retry_canceled_requests(), - set_host: Self::default_set_host(), - } - } -} - -impl Client { - fn default_pool_max_idle_per_host() -> usize { - 16 - } - fn default_pool_idle_timeout_secs() -> u64 { - 60 - } - fn default_retry_canceled_requests() -> bool { - true - } - fn default_set_host() -> bool { - true - } - pub fn pool_max_idle_per_host(&self) -> usize { - self.pool_max_idle_per_host - } - pub fn pool_idle_timeout_secs(&self) -> u64 { - self.pool_idle_timeout_secs - } - pub fn retry_canceled_requests(&self) -> bool { - self.retry_canceled_requests - } - pub fn set_host(&self) -> bool { - self.set_host - } -} - #[derive(Debug, Default, Clone, Deserialize)] pub struct Tls { /// diff --git a/satex-layer/Cargo.toml b/satex-layer/Cargo.toml index 5d1ea7b..e189fac 100644 --- a/satex-layer/Cargo.toml +++ b/satex-layer/Cargo.toml @@ -15,13 +15,11 @@ hyper = { workspace = true } bytes = { workspace = true } tracing = { workspace = true } futures = { workspace = true } -hyper-rustls = { workspace = true } leaky-bucket = { workspace = true } aho-corasick = { workspace = true } http-body-util = { workspace = true } tower = { workspace = true, features = ["limit"] } serde = { workspace = true, features = ["derive"] } -hyper-util = { workspace = true, features = ["client"] } tower-http = { workspace = true, features = [ "limit", "set-header", diff --git a/satex-layer/src/make/default/mod.rs b/satex-layer/src/make/default/mod.rs index d06ae07..b0b9d17 100644 --- a/satex-layer/src/make/default/mod.rs +++ b/satex-layer/src/make/default/mod.rs @@ -3,7 +3,6 @@ use satex_core::export_make; mod make; mod registry; mod set_discovery; -mod set_http_client; mod trace; export_make!(MakeDefaultRouteServiceLayer); diff --git a/satex-layer/src/make/default/registry.rs b/satex-layer/src/make/default/registry.rs index 04a8766..a17d099 100644 --- a/satex-layer/src/make/default/registry.rs +++ b/satex-layer/src/make/default/registry.rs @@ -2,7 +2,6 @@ use satex_core::config::ServeConfig; use satex_core::{registry, Error}; use crate::make::default::set_discovery::MakeSetDiscoveryLayer; -use crate::make::default::set_http_client::MakeSetHttpClientLayer; use crate::make::default::trace::MakeTraceLayer; use crate::make::default::{ArcMakeDefaultRouteServiceLayer, MakeDefaultRouteServiceLayer}; use crate::NamedRouteServiceLayer; @@ -10,11 +9,7 @@ use crate::NamedRouteServiceLayer; registry!( MakeDefaultRouteServiceLayerRegistry, ArcMakeDefaultRouteServiceLayer, - [ - MakeSetDiscoveryLayer, - MakeSetHttpClientLayer, - MakeTraceLayer - ] + [MakeSetDiscoveryLayer, MakeTraceLayer] ); impl MakeDefaultRouteServiceLayerRegistry { diff --git a/satex-layer/src/make/default/set_http_client.rs b/satex-layer/src/make/default/set_http_client.rs deleted file mode 100644 index 275c62a..0000000 --- a/satex-layer/src/make/default/set_http_client.rs +++ /dev/null @@ -1,43 +0,0 @@ -use std::time::Duration; - -use hyper_rustls::{HttpsConnector, HttpsConnectorBuilder}; -use hyper_util::client::legacy::connect::HttpConnector; -use hyper_util::client::legacy::Client; -use hyper_util::rt::TokioExecutor; -use tower_http::add_extension::AddExtensionLayer; - -use satex_core::config::ServeConfig; -use satex_core::http::Body; -use satex_core::{satex_error, Error}; - -use crate::make::default::MakeDefaultRouteServiceLayer; - -type HttpClient = Client, Body>; - -#[derive(Default)] -pub struct MakeSetHttpClientLayer; - -impl MakeDefaultRouteServiceLayer for MakeSetHttpClientLayer { - type Layer = AddExtensionLayer; - - fn name(&self) -> &'static str { - "SetHttpClient" - } - - fn make(&self, config: &ServeConfig) -> Result { - let config = config.client(); - let connector = HttpsConnectorBuilder::default() - .with_native_roots() - .map(|builder| builder.https_or_http()) - .map(|builder| builder.enable_all_versions().build()) - .map_err(|e| satex_error!(e))?; - Ok(AddExtensionLayer::new( - Client::builder(TokioExecutor::new()) - .pool_max_idle_per_host(config.pool_max_idle_per_host()) - .pool_idle_timeout(Duration::from_secs(config.pool_idle_timeout_secs())) - .retry_canceled_requests(config.retry_canceled_requests()) - .set_host(config.set_host()) - .build(connector), - )) - } -} diff --git a/satex-layer/src/make/path_strip/mod.rs b/satex-layer/src/make/path_strip/mod.rs index 254270f..28d73d2 100644 --- a/satex-layer/src/make/path_strip/mod.rs +++ b/satex-layer/src/make/path_strip/mod.rs @@ -36,9 +36,8 @@ where fn call(&mut self, mut req: Request) -> Self::Future { let level = self.level; if level > 0 { - let uri = req.uri(); - debug!("path strip source: {}", uri); - let mut path = uri + let source = req.uri(); + let mut path = source .path() .split('/') .filter(|segment| !segment.is_empty()) @@ -52,14 +51,14 @@ where .map(|query| format!("{}?{}", path, query)) .unwrap_or(path); let mut builder = Uri::builder().path_and_query(path_and_query); - if let Some(schema) = uri.scheme_str() { + if let Some(schema) = source.scheme_str() { builder = builder.scheme(schema); } - if let Some(authority) = uri.authority() { + if let Some(authority) = source.authority() { builder = builder.authority(authority.as_str()); } let uri = builder.build().expect("build uri error!"); - debug!("path strip target: {}", uri); + debug!("Strip path: {} => {}", source, uri); *req.uri_mut() = uri; } self.inner.call(req) diff --git a/satex-serve/src/router/mod.rs b/satex-serve/src/router/mod.rs index 0ce8c2c..62aa868 100644 --- a/satex-serve/src/router/mod.rs +++ b/satex-serve/src/router/mod.rs @@ -83,10 +83,13 @@ where match >>::ready_oneshot(route).await { Ok(mut route) => match route.call(request).await { Ok(response) => Ok(response), - Err(e) => Ok(make_response( - format!("{e}"), - StatusCode::INTERNAL_SERVER_ERROR, - )), + Err(e) => { + warn!("[{:?}] call request error: {}", route, e); + Ok(make_response( + format!("{e}"), + StatusCode::INTERNAL_SERVER_ERROR, + )) + } }, Err(e) => Ok(make_response( format!("{e}"), diff --git a/satex-service/Cargo.toml b/satex-service/Cargo.toml index 4bf743c..f262260 100644 --- a/satex-service/Cargo.toml +++ b/satex-service/Cargo.toml @@ -14,6 +14,7 @@ serde = { workspace = true } tower = { workspace = true } hyper = { workspace = true } bytes = { workspace = true } +rustls = { workspace = true } tracing = { workspace = true } futures = { workspace = true } tower-http = { workspace = true, features = ["fs"] } diff --git a/satex-service/src/make/proxy/make.rs b/satex-service/src/make/proxy/make.rs index b20145f..6fb70e8 100644 --- a/satex-service/src/make/proxy/make.rs +++ b/satex-service/src/make/proxy/make.rs @@ -1,14 +1,261 @@ +use std::time::Duration; + +use hyper_rustls::{ConfigBuilderExt, HttpsConnector}; +use hyper_util::client::legacy::connect::HttpConnector; +use hyper_util::client::legacy::Client; +use hyper_util::rt::TokioExecutor; +use rustls::ClientConfig as ClientTlsConfig; +use serde::Deserialize; + use satex_core::config::args::Args; use satex_core::Error; -use crate::make::proxy::ProxyService; +use crate::make::proxy::{HttpClient, ProxyService}; use crate::{MakeRouteService, __make_service}; +/// +/// Keepalive超时时间 +/// +const DEFAULT_KEEPALIVE: Duration = Duration::from_secs(60); + __make_service! { Proxy, uri: String, + #[serde(default)] + client: ClientConfig +} + +#[derive(Default, Debug, Clone, Deserialize)] +pub struct ClientConfig { + #[serde(default)] + pool_max_idle_per_host: Option, + + #[serde(default)] + pool_idle_timeout_secs: Option, + + #[serde(default)] + retry_canceled_requests: Option, + + #[serde(default)] + http09_responses: Option, + + #[serde(default)] + set_host: Option, + + #[serde(default)] + keepalive_secs: Option, + + #[serde(default)] + keepalive_interval_secs: Option, + + #[serde(default)] + nodelay: Option, + + #[serde(default)] + reuse_address: Option, + + #[serde(default)] + connect_timeout_secs: Option, + + #[serde(default)] + keepalive_retries: Option, + + #[serde(default)] + recv_buffer_size: Option, + + #[serde(default)] + send_buffer_size: Option, + + #[serde(default)] + happy_eyeballs_timeout_secs: Option, + + #[serde(default)] + http1_writev: Option, + + #[serde(default)] + http1_max_buf_size: Option, + + #[serde(default)] + http1_read_buf_exact_size: Option, + + #[serde(default)] + http1_preserve_header_case: Option, + + #[serde(default)] + http1_title_case_headers: Option, + + #[serde(default)] + http1_allow_obsolete_multiline_headers_in_responses: Option, + + #[serde(default)] + http1_allow_spaces_after_header_name_in_responses: Option, + + #[serde(default)] + http1_ignore_invalid_headers_in_responses: Option, + + #[serde(default)] + http2_only: Option, + + #[serde(default)] + http2_keep_alive_timeout_secs: Option, + + #[serde(default)] + http2_keep_alive_interval_secs: Option, + + #[serde(default)] + http2_keep_alive_while_idle: Option, + + #[serde(default)] + http2_adaptive_window: Option, + + #[serde(default)] + http2_initial_connection_window_size: Option, + + #[serde(default)] + http2_initial_stream_window_size: Option, + + #[serde(default)] + http2_max_concurrent_reset_streams: Option, + + #[serde(default)] + http2_max_frame_size: Option, + + #[serde(default)] + http2_max_send_buf_size: Option, } fn make(args: Args) -> Result { - Config::try_from(args).map(|config| ProxyService::new(config.uri)) + Config::try_from(args).map(|config| ProxyService::new(config.uri, make_client(config.client))) +} + +fn make_connector(client_config: &ClientConfig) -> HttpsConnector { + let mut connector = HttpConnector::new(); + connector.set_keepalive(Some( + client_config + .keepalive_secs + .map(Duration::from_secs) + .unwrap_or(DEFAULT_KEEPALIVE), + )); + connector.set_keepalive_interval( + client_config + .keepalive_interval_secs + .map(|secs| Duration::from_secs(secs)), + ); + connector.set_keepalive_retries(client_config.keepalive_retries); + connector.set_nodelay(client_config.nodelay.unwrap_or(true)); + connector.set_reuse_address(client_config.reuse_address.unwrap_or(true)); + connector.set_connect_timeout(client_config.connect_timeout_secs.map(Duration::from_secs)); + connector.set_recv_buffer_size(client_config.recv_buffer_size); + connector.set_send_buffer_size(client_config.send_buffer_size); + connector.set_happy_eyeballs_timeout( + client_config + .happy_eyeballs_timeout_secs + .map(Duration::from_secs), + ); + // Client Tls配置 + let tls_config = ClientTlsConfig::builder() + .with_native_roots() + .map(|builder| builder.with_no_client_auth()) + .unwrap(); + HttpsConnector::from((connector, tls_config)) +} + +fn make_client(client_config: ClientConfig) -> HttpClient { + let connector = make_connector(&client_config); + let mut builder = Client::builder(TokioExecutor::new()); + + // Basic + if let Some(set_host) = client_config.set_host { + builder.set_host(set_host); + } + if let Some(http09_responses) = client_config.http09_responses { + builder.http09_responses(http09_responses); + } + if let Some(retry_canceled_requests) = client_config.retry_canceled_requests { + builder.retry_canceled_requests(retry_canceled_requests); + } + if let Some(pool_max_idle_per_host) = client_config.pool_max_idle_per_host { + builder.pool_max_idle_per_host(pool_max_idle_per_host); + } + if let Some(pool_idle_timeout_secs) = client_config.pool_idle_timeout_secs { + builder.pool_idle_timeout(Duration::from_secs(pool_idle_timeout_secs)); + } + + // Http1 + if let Some(http1_writev) = client_config.http1_writev { + builder.http1_writev(http1_writev); + } + if let Some(http1_max_buf_size) = client_config.http1_max_buf_size { + builder.http1_max_buf_size(http1_max_buf_size); + } + if let Some(http1_read_buf_exact_size) = client_config.http1_read_buf_exact_size { + builder.http1_read_buf_exact_size(http1_read_buf_exact_size); + } + if let Some(http1_preserve_header_case) = client_config.http1_preserve_header_case { + builder.http1_preserve_header_case(http1_preserve_header_case); + } + if let Some(http1_title_case_headers) = client_config.http1_title_case_headers { + builder.http1_title_case_headers(http1_title_case_headers); + } + if let Some(http1_allow_obsolete_multiline_headers_in_responses) = + client_config.http1_allow_obsolete_multiline_headers_in_responses + { + builder.http1_allow_obsolete_multiline_headers_in_responses( + http1_allow_obsolete_multiline_headers_in_responses, + ); + } + if let Some(http1_allow_spaces_after_header_name_in_responses) = + client_config.http1_allow_spaces_after_header_name_in_responses + { + builder.http1_allow_spaces_after_header_name_in_responses( + http1_allow_spaces_after_header_name_in_responses, + ); + } + if let Some(http1_ignore_invalid_headers_in_responses) = + client_config.http1_ignore_invalid_headers_in_responses + { + builder + .http1_ignore_invalid_headers_in_responses(http1_ignore_invalid_headers_in_responses); + } + + // Http2 + if let Some(http2_only) = client_config.http2_only { + builder.http2_only(http2_only); + } + builder.http2_keep_alive_timeout( + client_config + .http2_keep_alive_timeout_secs + .map(Duration::from_secs) + .unwrap_or(DEFAULT_KEEPALIVE), + ); + if let Some(http2_keep_alive_interval_secs) = client_config.http2_keep_alive_interval_secs { + builder.http2_keep_alive_interval(Duration::from_secs(http2_keep_alive_interval_secs)); + } + if let Some(http2_keep_alive_while_idle) = client_config.http2_keep_alive_while_idle { + builder.http2_keep_alive_while_idle(http2_keep_alive_while_idle); + } + if let Some(http2_adaptive_window) = client_config.http2_adaptive_window { + builder.http2_adaptive_window(http2_adaptive_window); + } + if let Some(http2_initial_connection_window_size) = + client_config.http2_initial_connection_window_size + { + builder.http2_initial_connection_window_size(http2_initial_connection_window_size); + } + if let Some(http2_initial_stream_window_size) = client_config.http2_initial_stream_window_size { + builder.http2_initial_stream_window_size(http2_initial_stream_window_size); + } + if let Some(http2_max_concurrent_reset_streams) = + client_config.http2_max_concurrent_reset_streams + { + builder.http2_max_concurrent_reset_streams(http2_max_concurrent_reset_streams); + } + if let Some(http2_max_frame_size) = client_config.http2_max_frame_size { + builder.http2_max_frame_size(http2_max_frame_size); + } + if let Some(http2_max_send_buf_size) = client_config.http2_max_send_buf_size { + builder.http2_max_send_buf_size(http2_max_send_buf_size); + } + + builder.build(connector) } diff --git a/satex-service/src/make/proxy/mod.rs b/satex-service/src/make/proxy/mod.rs index 5d2fee7..593ff35 100644 --- a/satex-service/src/make/proxy/mod.rs +++ b/satex-service/src/make/proxy/mod.rs @@ -39,11 +39,15 @@ type HttpClient = Client, Body>; #[derive(Clone)] pub struct ProxyService { uri: String, + client: HttpClient, } impl ProxyService { - pub fn new(uri: impl Into) -> Self { - Self { uri: uri.into() } + pub fn new(uri: impl Into, client: HttpClient) -> Self { + Self { + uri: uri.into(), + client, + } } } @@ -58,11 +62,16 @@ impl Service> for ProxyService { fn call(&mut self, req: Request) -> Self::Future { let uri = self.uri.clone(); - Box::pin(async move { proxy(uri, req).await }) + let client = self.client.clone(); + Box::pin(async move { proxy(client, uri, req).await }) } } -async fn proxy(uri: String, mut request: Request) -> Result, Error> { +async fn proxy( + client: HttpClient, + uri: String, + mut request: Request, +) -> Result, Error> { let prefix = uri.strip_suffix('/').unwrap_or(&uri); let path = request .uri() @@ -115,7 +124,6 @@ async fn proxy(uri: String, mut request: Request) -> Result for name in REMOVE_HEADER_NAMES { headers.remove(name); } - let client = request.extensions_mut().remove::().unwrap(); let response = client.request(request).await.map_err(|e| satex_error!(e))?; Ok(response.map(Body::new)) }