diff --git a/orion-error/src/lib.rs b/orion-error/src/lib.rs index 04edbf99..3541aef0 100644 --- a/orion-error/src/lib.rs +++ b/orion-error/src/lib.rs @@ -192,6 +192,10 @@ impl Error { self.0 } + pub fn inner(&self) -> &(impl ErrorTrait + Send + Sync + 'static) { + &self.0 + } + pub fn get_context_data(&self) -> Option<&T> { if let ErrorImpl::Context(ErrorInfo { message: _, any: Some(val) }, _) = &self.0 { val.downcast_ref::() @@ -274,8 +278,7 @@ enum ErrorImpl { impl ErrorTrait for ErrorImpl { fn source(&self) -> Option<&(dyn ErrorTrait + 'static)> { match self { - Self::Error(err) => err.source(), - Self::Context(_, err) => Some(err.as_ref()), + Self::Error(err) | Self::Context(_, err) => Some(err.as_ref()), } } } diff --git a/orion-format/benches/benchmark.rs b/orion-format/benches/benchmark.rs index 92fe2511..ea75e884 100644 --- a/orion-format/benches/benchmark.rs +++ b/orion-format/benches/benchmark.rs @@ -95,6 +95,9 @@ fn benchmark_rust_format(c: &mut Criterion) { bytes_received: 128, bytes_sent: 256, response_flags: ResponseFlags::empty(), + upstream_failure: None, + response_code_details: None, + connection_termination_details: None, }; let default_header_value = HeaderValue::from_static(""); @@ -174,6 +177,9 @@ fn benchmark_log_formatter(c: &mut Criterion) { bytes_received: 128, bytes_sent: 256, response_flags: ResponseFlags::empty(), + upstream_failure: None, + response_code_details: None, + connection_termination_details: None, }; let fmt = LogFormatter::try_new(DEFAULT_ACCESS_LOG_FORMAT, false).stealth_unwrap(); @@ -183,7 +189,7 @@ fn benchmark_log_formatter(c: &mut Criterion) { b.iter(|| { let mut fmt = fmt.local_clone(); black_box(eval_format( - &DownstreamContext { request: &request, trace_id: None, request_head_size: 0 }, + &DownstreamContext { request: &request, trace_id: None, request_head_size: 0, server_name: None }, &DownstreamResponse { response: &response, response_head_size: 0 }, &start, &end, @@ -196,7 +202,7 @@ fn benchmark_log_formatter(c: &mut Criterion) { b.iter(|| { let mut fmt = fmt.local_clone(); black_box(eval_format( - &DownstreamContext { request: &request, trace_id: None, request_head_size: 0 }, + &DownstreamContext { request: &request, trace_id: None, request_head_size: 0, server_name: None }, &DownstreamResponse { response: &response, response_head_size: 0 }, &start, &end, @@ -214,7 +220,7 @@ fn benchmark_log_formatter(c: &mut Criterion) { let mut formatted = fmt.local_clone(); eval_format( - &DownstreamContext { request: &request, trace_id: None, request_head_size: 0 }, + &DownstreamContext { request: &request, trace_id: None, request_head_size: 0, server_name: None }, &DownstreamResponse { response: &response, response_head_size: 0 }, &start, &end, @@ -244,6 +250,9 @@ fn benchmark_request_parts(c: &mut Criterion) { bytes_received: 128, bytes_sent: 256, response_flags: ResponseFlags::empty(), + upstream_failure: None, + response_code_details: None, + connection_termination_details: None, }; let fmt = LogFormatter::try_new("%START_TIME%", false).stealth_unwrap(); @@ -251,7 +260,7 @@ fn benchmark_request_parts(c: &mut Criterion) { b.iter(|| { let mut fmt = fmt.local_clone(); black_box(eval_format( - &DownstreamContext { request: &request, trace_id: None, request_head_size: 0 }, + &DownstreamContext { request: &request, trace_id: None, request_head_size: 0, server_name: None }, &DownstreamResponse { response: &response, response_head_size: 0 }, &start, &end, @@ -265,7 +274,7 @@ fn benchmark_request_parts(c: &mut Criterion) { b.iter(|| { let mut fmt = fmt.local_clone(); black_box(eval_format( - &DownstreamContext { request: &request, trace_id: None, request_head_size: 0 }, + &DownstreamContext { request: &request, trace_id: None, request_head_size: 0, server_name: None }, &DownstreamResponse { response: &response, response_head_size: 0 }, &start, &end, @@ -279,7 +288,7 @@ fn benchmark_request_parts(c: &mut Criterion) { b.iter(|| { let mut fmt = fmt.local_clone(); black_box(eval_format( - &DownstreamContext { request: &request, trace_id: None, request_head_size: 0 }, + &DownstreamContext { request: &request, trace_id: None, request_head_size: 0, server_name: None }, &DownstreamResponse { response: &response, response_head_size: 0 }, &start, &end, @@ -293,7 +302,7 @@ fn benchmark_request_parts(c: &mut Criterion) { b.iter(|| { let mut fmt = fmt.local_clone(); black_box(eval_format( - &DownstreamContext { request: &request, trace_id: None, request_head_size: 0 }, + &DownstreamContext { request: &request, trace_id: None, request_head_size: 0, server_name: None }, &DownstreamResponse { response: &response, response_head_size: 0 }, &start, &end, @@ -357,6 +366,9 @@ fn benchmark_log_headers(c: &mut Criterion) { bytes_received: 128, bytes_sent: 256, response_flags: ResponseFlags::empty(), + upstream_failure: None, + response_code_details: None, + connection_termination_details: None, }; let fmt = LogFormatter::try_new(ENVOY_FORMAT, false).stealth_unwrap(); @@ -365,7 +377,7 @@ fn benchmark_log_headers(c: &mut Criterion) { b.iter(|| { let mut fmt = fmt.local_clone(); black_box(eval_format( - &DownstreamContext { request: &request, trace_id: None, request_head_size: 0 }, + &DownstreamContext { request: &request, trace_id: None, request_head_size: 0, server_name: None }, &DownstreamResponse { response: &response, response_head_size: 0 }, &start, &end, diff --git a/orion-format/src/bin/format-test.rs b/orion-format/src/bin/format-test.rs index 4ed74b14..f06450f6 100644 --- a/orion-format/src/bin/format-test.rs +++ b/orion-format/src/bin/format-test.rs @@ -59,6 +59,9 @@ fn main() -> Result<(), BoxError> { bytes_received: 128, bytes_sent: 256, response_flags: ResponseFlags::empty(), + upstream_failure: None, + response_code_details: None, + connection_termination_details: None, }; let fmt = LogFormatter::try_new(DEF_FMT, false)?; @@ -71,7 +74,7 @@ fn main() -> Result<(), BoxError> { for _ in 0..TOTAL { let mut fmt = black_box(fmt.local_clone()); black_box(eval_format( - &DownstreamContext { request: &request, request_head_size: 0, trace_id: None }, + &DownstreamContext { request: &request, request_head_size: 0, trace_id: None, server_name: None }, &DownstreamResponse { response: &response, response_head_size: 0 }, &start, &end, diff --git a/orion-format/src/context.rs b/orion-format/src/context.rs index 7bd33f4a..50781a64 100644 --- a/orion-format/src/context.rs +++ b/orion-format/src/context.rs @@ -52,56 +52,44 @@ impl Context for TcpContext<'_> { } fn eval_part(&self, op: &Operator) -> StringType { match op { - Operator::UpstreamHost | Operator::UpstreamRemoteAddress => match self.upstream_peer_addr { - Some(addr) => StringType::Smol(addr.to_smolstr()), - None => StringType::None, + Operator::UpstreamHost | Operator::UpstreamRemoteAddress => { + self.upstream_peer_addr.map_or(StringType::None, |addr| StringType::Smol(addr.to_smolstr())) }, - Operator::UpstreamRemoteAddressWithoutPort => match self.upstream_peer_addr { - None => StringType::None, - Some(addr) => StringType::Smol(addr.ip().to_smolstr()), + Operator::UpstreamRemoteAddressWithoutPort => { + self.upstream_peer_addr.map_or(StringType::None, |addr| StringType::Smol(addr.ip().to_smolstr())) }, - Operator::UpstreamRemotePort => match self.upstream_peer_addr { - None => StringType::None, - Some(addr) => StringType::Smol(addr.port().to_smolstr()), + Operator::UpstreamRemotePort => { + self.upstream_peer_addr.map_or(StringType::None, |addr| StringType::Smol(addr.port().to_smolstr())) }, - Operator::UpstreamLocalAddress => match self.upstream_local_addr { - Some(addr) => StringType::Smol(addr.to_smolstr()), - None => StringType::None, + Operator::UpstreamLocalAddress => { + self.upstream_local_addr.map_or(StringType::None, |addr| StringType::Smol(addr.to_smolstr())) }, - Operator::UpstreamLocalAddressWithoutPort => match self.upstream_local_addr { - Some(addr) => StringType::Smol(addr.ip().to_smolstr()), - None => StringType::None, + Operator::UpstreamLocalAddressWithoutPort => { + self.upstream_local_addr.map_or(StringType::None, |addr| StringType::Smol(addr.ip().to_smolstr())) }, - Operator::UpstreamLocalPort => match self.upstream_local_addr { - Some(addr) => StringType::Smol(addr.port().to_smolstr()), - None => StringType::None, + Operator::UpstreamLocalPort => { + self.upstream_local_addr.map_or(StringType::None, |addr| StringType::Smol(addr.port().to_smolstr())) }, - Operator::DownstreamLocalAddress => match self.downstream_local_addr { - Some(addr) => StringType::Smol(addr.to_smolstr()), - None => StringType::None, + Operator::DownstreamLocalAddress => { + self.downstream_local_addr.map_or(StringType::None, |addr| StringType::Smol(addr.to_smolstr())) }, - Operator::DownstreamLocalAddressWithoutPort => match self.downstream_local_addr { - Some(addr) => StringType::Smol(addr.ip().to_smolstr()), - None => StringType::None, + Operator::DownstreamLocalAddressWithoutPort => { + self.downstream_local_addr.map_or(StringType::None, |addr| StringType::Smol(addr.ip().to_smolstr())) }, - Operator::DownstreamLocalPort => match self.downstream_local_addr { - Some(addr) => StringType::Smol(addr.port().to_smolstr()), - None => StringType::None, + Operator::DownstreamLocalPort => { + self.downstream_local_addr.map_or(StringType::None, |addr| StringType::Smol(addr.port().to_smolstr())) }, - Operator::DownstreamRemoteAddress => match self.downstream_peer_addr { - Some(addr) => StringType::Smol(addr.to_smolstr()), - None => StringType::None, + Operator::DownstreamRemoteAddress => { + self.downstream_peer_addr.map_or(StringType::None, |addr| StringType::Smol(addr.to_smolstr())) }, - Operator::DownstreamRemoteAddressWithoutPort => match self.downstream_peer_addr { - Some(addr) => StringType::Smol(addr.ip().to_smolstr()), - None => StringType::None, + Operator::DownstreamRemoteAddressWithoutPort => { + self.downstream_peer_addr.map_or(StringType::None, |addr| StringType::Smol(addr.ip().to_smolstr())) }, - Operator::DownstreamRemotePort => match self.downstream_peer_addr { - Some(addr) => StringType::Smol(addr.port().to_smolstr()), - None => StringType::None, + Operator::DownstreamRemotePort => { + self.downstream_peer_addr.map_or(StringType::None, |addr| StringType::Smol(addr.port().to_smolstr())) }, Operator::UpstreamCluster | Operator::UpstreamClusterRaw => { @@ -142,8 +130,9 @@ fn hash_connection(local: Option<&SocketAddr>, peer: Option<&SocketAddr>, protoc #[derive(Clone, Debug)] pub struct UpstreamContext<'a> { - pub authority: &'a Authority, - pub cluster_name: &'a str, + pub authority: Option<&'a Authority>, + pub cluster_name: Option<&'a str>, + pub route_name: &'a str, } impl Context for UpstreamContext<'_> { @@ -152,10 +141,13 @@ impl Context for UpstreamContext<'_> { } fn eval_part(&self, op: &Operator) -> StringType { match op { - Operator::UpstreamHost => StringType::Smol(SmolStr::new(self.authority.as_str())), + Operator::UpstreamHost => { + self.authority.map_or(StringType::None, |name| StringType::Smol(SmolStr::new(name))) + }, Operator::UpstreamCluster | Operator::UpstreamClusterRaw => { - StringType::Smol(SmolStr::new(self.cluster_name)) + self.cluster_name.map_or(StringType::None, |cluster_name| StringType::Smol(SmolStr::new(cluster_name))) }, + Operator::RouteName => StringType::Smol(SmolStr::new(self.route_name)), _ => StringType::None, } } @@ -184,6 +176,7 @@ pub struct InitHttpContext<'a, T> { pub downstream_request: &'a Request, pub request_head_size: usize, pub trace_id: Option, + pub server_name: Option<&'a str>, } impl Context for InitHttpContext<'_, T> { @@ -197,6 +190,7 @@ impl Context for InitHttpContext<'_, T> { request: self.downstream_request, trace_id: self.trace_id, request_head_size: self.request_head_size, + server_name: self.server_name, } .eval_part(op), } @@ -259,6 +253,9 @@ pub struct FinishContext { pub bytes_received: u64, pub bytes_sent: u64, pub response_flags: ResponseFlags, + pub upstream_failure: Option<&'static str>, + pub response_code_details: Option<&'static str>, + pub connection_termination_details: Option<&'static str>, } impl Context for FinishContext { @@ -281,6 +278,15 @@ impl Context for FinishContext { let mut buffer = itoa::Buffer::new(); StringType::Smol(SmolStr::new(buffer.format(self.bytes_sent))) }, + Operator::UpstreamTransportFailureReason => { + self.upstream_failure.map_or(StringType::None, |msg| StringType::Smol(SmolStr::new_static(msg))) + }, + Operator::ResponseCodeDetails => { + self.response_code_details.map_or(StringType::None, |msg| StringType::Smol(SmolStr::new_static(msg))) + }, + Operator::ConnectionTerminationDetails => self + .connection_termination_details + .map_or(StringType::None, |msg| StringType::Smol(SmolStr::new_static(msg))), _ => StringType::None, } } @@ -290,6 +296,7 @@ pub struct DownstreamContext<'a, T> { pub request: &'a Request, pub request_head_size: usize, pub trace_id: Option, + pub server_name: Option<&'a str>, } pub struct DownstreamResponse<'a, T> { @@ -336,21 +343,18 @@ impl Context for DownstreamContext<'_, T> { StringType::None } }, - Operator::Request(h) => { - let hv = self.request.headers().get(h.0.as_str()); - match hv { - Some(hv) => StringType::Bytes(hv.as_bytes().into()), - None => StringType::None, - } - }, - Operator::TraceId => { - if let Some(trace_id) = self.trace_id { - StringType::Smol(format_smolstr!("{:032x}", trace_id)) - } else { - StringType::None - } - }, + Operator::Request(h) => self + .request + .headers() + .get(h.0.as_str()) + .map_or(StringType::None, |hv| StringType::Bytes(hv.as_bytes().into())), + Operator::TraceId => self + .trace_id + .map_or(StringType::None, |trace_id| StringType::Smol(format_smolstr!("{:032x}", trace_id))), Operator::Protocol => StringType::Smol(SmolStr::new_static(self.request.version().to_static_str())), + Operator::RequestedServerName => { + self.server_name.map_or(StringType::None, |sni| StringType::Smol(SmolStr::new(sni))) + }, _ => StringType::None, } } @@ -394,13 +398,11 @@ impl Context for DownstreamResponse<'_, T> { Operator::ResponseStatus | Operator::ResponseCode => { StringType::Smol(SmolStr::new_inline(self.response.status().as_str())) }, - Operator::Response(header_name) => { - let hv = self.response.headers().get(header_name.0.as_str()); - match hv { - Some(hv) => StringType::Bytes(hv.as_bytes().into()), - None => StringType::None, - } - }, + Operator::Response(header_name) => self + .response + .headers() + .get(header_name.0.as_str()) + .map_or(StringType::None, |hv| StringType::Bytes(hv.as_bytes().into())), _ => StringType::None, } } diff --git a/orion-format/src/grammar.rs b/orion-format/src/grammar.rs index 2d590ded..882fab99 100644 --- a/orion-format/src/grammar.rs +++ b/orion-format/src/grammar.rs @@ -73,8 +73,13 @@ static ENVOY_PATTERNS: LazyLock> = L trie_mapstr!(trie, "PROTOCOL", Operator::Protocol, Category::DOWNSTREAM_REQUEST); trie_mapstr!(trie, "UPSTREAM_PROTOCOL", Operator::UpstreamProtocol, Category::UPSTREAM_REQUEST); trie_mapstr!(trie, "RESPONSE_CODE", Operator::ResponseCode, Category::DOWNSTREAM_RESPONSE); - trie_mapstr!(trie, "RESPONSE_CODE_DETAILS", Operator::ResponseCodeDetails, Category::UNSUPPORTED); - trie_mapstr!(trie, "CONNECTION_TERMINATION_DETAILS", Operator::ConnectionTerminationDetails, Category::UNSUPPORTED); + trie_mapstr!(trie, "RESPONSE_CODE_DETAILS", Operator::ResponseCodeDetails, Category::FINISH_CONTEXT); + trie_mapstr!( + trie, + "CONNECTION_TERMINATION_DETAILS", + Operator::ConnectionTerminationDetails, + Category::FINISH_CONTEXT + ); trie_mapstr!(trie, "BYTES_SENT", Operator::BytesSent, Category::FINISH_CONTEXT); trie_mapstr!(trie, "UPSTREAM_WIRE_BYTES_SENT", Operator::UpstreamWireBytesSent, Category::UNSUPPORTED); trie_mapstr!(trie, "UPSTREAM_HEADER_BYTES_SENT", Operator::UpstreamHeaderBytesSent, Category::UNSUPPORTED); @@ -164,8 +169,8 @@ static ENVOY_PATTERNS: LazyLock> = L trie_mapstr!(trie, "CONNECTION_ID", Operator::ConnectionId, Category::DOWNSTREAM_CONTEXT); trie_mapstr!(trie, "REQUEST_HEADERS_BYTES", Operator::RequestHeadersBytes, Category::DOWNSTREAM_REQUEST); trie_mapstr!(trie, "RESPONSE_HEADERS_BYTES", Operator::ResponseHeadersBytes, Category::DOWNSTREAM_RESPONSE); - trie_mapstr!(trie, "REQUESTED_SERVER_NAME", Operator::RequestedServerName, Category::UNSUPPORTED); - trie_mapstr!(trie, "ROUTE_NAME", Operator::RouteName, Category::UNSUPPORTED); + trie_mapstr!(trie, "REQUESTED_SERVER_NAME", Operator::RequestedServerName, Category::DOWNSTREAM_REQUEST); + trie_mapstr!(trie, "ROUTE_NAME", Operator::RouteName, Category::UPSTREAM_CONTEXT); trie_mapstr!(trie, "UPSTREAM_PEER_URI_SAN", Operator::UpstreamPeerUriSan, Category::UNSUPPORTED); trie_mapstr!(trie, "UPSTREAM_PEER_DNS_SAN", Operator::UpstreamPeerDnsSan, Category::UNSUPPORTED); trie_mapstr!(trie, "UPSTREAM_PEER_IP_SAN", Operator::UpstreamPeerIpSan, Category::UNSUPPORTED); @@ -220,7 +225,7 @@ static ENVOY_PATTERNS: LazyLock> = L trie, "UPSTREAM_TRANSPORT_FAILURE_REASON", Operator::UpstreamTransportFailureReason, - Category::UNSUPPORTED + Category::FINISH_CONTEXT ); trie_mapstr!(trie, "HOSTNAME", Operator::Hostname, Category::UNSUPPORTED); trie_mapstr!(trie, "FILTER_CHAIN_NAME", Operator::FilterChainName, Category::UNSUPPORTED); diff --git a/orion-format/src/lib.rs b/orion-format/src/lib.rs index c7ea4647..6dfcbb73 100644 --- a/orion-format/src/lib.rs +++ b/orion-format/src/lib.rs @@ -285,7 +285,12 @@ mod tests { let mut formatter = source.local_clone(); let expected = "/"; - formatter.with_context(&DownstreamContext { request: &req, request_head_size: 0, trace_id: None }); + formatter.with_context(&DownstreamContext { + request: &req, + request_head_size: 0, + trace_id: None, + server_name: None, + }); let actual = format!("{}", &formatter.into_message()); assert_eq!(actual, expected); } @@ -299,7 +304,12 @@ mod tests { let mut formatter = source.local_clone(); let expected = "/original"; - formatter.with_context(&DownstreamContext { request: &req, request_head_size: 0, trace_id: None }); + formatter.with_context(&DownstreamContext { + request: &req, + request_head_size: 0, + trace_id: None, + server_name: None, + }); let actual = format!("{}", &formatter.into_message()); assert_eq!(actual, expected); } @@ -311,7 +321,12 @@ mod tests { let mut formatter = source.local_clone(); println!("FORMATTER: {formatter:?}"); let expected = "GET"; - formatter.with_context(&DownstreamContext { request: &req, request_head_size: 0, trace_id: None }); + formatter.with_context(&DownstreamContext { + request: &req, + request_head_size: 0, + trace_id: None, + server_name: None, + }); let actual = format!("{}", &formatter.into_message()); assert_eq!(actual, expected); } @@ -323,7 +338,12 @@ mod tests { let mut formatter = source.local_clone(); println!("FORMATTER: {formatter:?}"); let expected = "HTTP/1.1"; - formatter.with_context(&DownstreamContext { request: &req, request_head_size: 0, trace_id: None }); + formatter.with_context(&DownstreamContext { + request: &req, + request_head_size: 0, + trace_id: None, + server_name: None, + }); let actual = format!("{}", &formatter.into_message()); assert_eq!(actual, expected); } @@ -346,7 +366,12 @@ mod tests { let source = LogFormatter::try_new("%REQ(:SCHEME)%", false).unwrap(); let mut formatter = source.local_clone(); let expected = "https"; - formatter.with_context(&DownstreamContext { request: &req, request_head_size: 0, trace_id: None }); + formatter.with_context(&DownstreamContext { + request: &req, + request_head_size: 0, + trace_id: None, + server_name: None, + }); let actual = format!("{}", &formatter.into_message()); assert_eq!(actual, expected); } @@ -357,7 +382,12 @@ mod tests { let source = LogFormatter::try_new("%REQ(:AUTHORITY)%", false).unwrap(); let mut formatter = source.local_clone(); let expected = "www.rust-lang.org"; - formatter.with_context(&DownstreamContext { request: &req, request_head_size: 0, trace_id: None }); + formatter.with_context(&DownstreamContext { + request: &req, + request_head_size: 0, + trace_id: None, + server_name: None, + }); let actual = format!("{}", &formatter.into_message()); assert_eq!(actual, expected); } @@ -368,7 +398,12 @@ mod tests { let source = LogFormatter::try_new("%REQ(USER-AGENT)%", false).unwrap(); let mut formatter = source.local_clone(); let expected = "awesome/1.0"; - formatter.with_context(&DownstreamContext { request: &req, request_head_size: 0, trace_id: None }); + formatter.with_context(&DownstreamContext { + request: &req, + request_head_size: 0, + trace_id: None, + server_name: None, + }); let actual = format!("{}", &formatter.into_message()); assert_eq!(actual, expected); } @@ -388,15 +423,26 @@ mod tests { let source = LogFormatter::try_new(DEFAULT_ACCESS_LOG_FORMAT, false).unwrap(); let mut formatter = source.local_clone(); formatter.with_context(&InitContext { start_time: std::time::SystemTime::now() }); - formatter.with_context(&DownstreamContext { request: &req, request_head_size: 0, trace_id: None }); - formatter - .with_context(&UpstreamContext { authority: req.uri().authority().unwrap(), cluster_name: "test_cluster" }); + formatter.with_context(&DownstreamContext { + request: &req, + request_head_size: 0, + trace_id: None, + server_name: None, + }); + formatter.with_context(&UpstreamContext { + authority: Some(req.uri().authority().unwrap()), + cluster_name: Some("test_cluster"), + route_name: "test_route", + }); formatter.with_context(&DownstreamResponse { response: &resp, response_head_size: 0 }); formatter.with_context(&FinishContext { duration: Duration::from_millis(100), bytes_received: 128, bytes_sent: 256, response_flags: ResponseFlags::NO_HEALTHY_UPSTREAM, + upstream_failure: None, + response_code_details: None, + connection_termination_details: None, }); println!("{}", &formatter.into_message()); } @@ -408,15 +454,26 @@ mod tests { let source = LogFormatter::try_new(DEFAULT_ISTIO_ACCESS_LOG_FORMAT, false).unwrap(); let mut formatter = source.local_clone(); formatter.with_context(&InitContext { start_time: std::time::SystemTime::now() }); - formatter.with_context(&DownstreamContext { request: &req, request_head_size: 0, trace_id: None }); - formatter - .with_context(&UpstreamContext { authority: req.uri().authority().unwrap(), cluster_name: "test_cluster" }); + formatter.with_context(&DownstreamContext { + request: &req, + request_head_size: 0, + trace_id: None, + server_name: None, + }); + formatter.with_context(&UpstreamContext { + authority: Some(req.uri().authority().unwrap()), + cluster_name: Some("test_cluster"), + route_name: "test_route", + }); formatter.with_context(&DownstreamResponse { response: &resp, response_head_size: 0 }); formatter.with_context(&FinishContext { duration: Duration::from_millis(100), bytes_received: 128, bytes_sent: 256, response_flags: ResponseFlags::NO_HEALTHY_UPSTREAM, + upstream_failure: None, + response_code_details: None, + connection_termination_details: None, }); println!("{}", &formatter.into_message()); } diff --git a/orion-lib/src/access_log.rs b/orion-lib/src/access_log.rs index b48a25cb..d29d30bb 100644 --- a/orion-lib/src/access_log.rs +++ b/orion-lib/src/access_log.rs @@ -239,14 +239,21 @@ mod tests { let mut fmt = formatter.local_clone(); fmt.with_context(&InitContext { start_time: std::time::SystemTime::now() }); - fmt.with_context(&DownstreamContext { request: &req, trace_id: None, request_head_size: 0 }); - fmt.with_context(&UpstreamContext { authority: req.uri().authority().unwrap(), cluster_name: "test_cluster" }); + fmt.with_context(&DownstreamContext { request: &req, trace_id: None, request_head_size: 0, server_name: None }); + fmt.with_context(&UpstreamContext { + authority: Some(req.uri().authority().unwrap()), + cluster_name: Some("test_cluster"), + route_name: "test_route", + }); fmt.with_context(&DownstreamResponse { response: &resp, response_head_size: 0 }); fmt.with_context(&FinishContext { duration: Duration::from_millis(100), bytes_received: 128, bytes_sent: 256, response_flags: ResponseFlags::NO_HEALTHY_UPSTREAM, + upstream_failure: None, + response_code_details: None, + connection_termination_details: None, }); let message = fmt.into_message(); diff --git a/orion-lib/src/body/body_with_metrics.rs b/orion-lib/src/body/body_with_metrics.rs index bc9a7ca3..5762d6f8 100644 --- a/orion-lib/src/body/body_with_metrics.rs +++ b/orion-lib/src/body/body_with_metrics.rs @@ -28,12 +28,16 @@ use std::{ task::{Context, Poll}, }; -use crate::body::response_flags::{BodyKind, ResponseFlags}; +use crate::event_error::TryInferFrom; +use crate::{ + body::response_flags::{BodyKind, ResponseFlags}, + event_error::EventError, +}; -type MetricsClosure = Box; +type MetricsClosure = Box, ResponseFlags) + Send + 'static>; pub struct MetricsState { - kind: BodyKind, + body_kind: BodyKind, bytes_counter: AtomicU64, on_complete: Mutex>, } @@ -47,15 +51,15 @@ pub struct DropGuard { impl Drop for DropGuard { fn drop(&mut self) { - trigger_on_complete(&self.state, ResponseFlags::default()); + trigger_on_complete(&self.state, None, ResponseFlags::default()); } } -fn trigger_on_complete(state: &Arc, flags: ResponseFlags) { +fn trigger_on_complete(state: &Arc, event_error: Option, flags: ResponseFlags) { let mut guard = state.on_complete.lock(); if let Some(closure) = guard.take() { let bytes = state.bytes_counter.load(Ordering::Relaxed); - closure(bytes, flags); + closure(bytes, event_error, flags); } } @@ -70,10 +74,10 @@ pub struct BodyWithMetrics { impl BodyWithMetrics { pub fn new(kind: BodyKind, inner: B, on_complete: F) -> Self where - F: FnOnce(u64, ResponseFlags) + Send + 'static, + F: FnOnce(u64, Option, ResponseFlags) + Send + 'static, { let state = Arc::new(MetricsState { - kind, + body_kind: kind, bytes_counter: AtomicU64::new(0), on_complete: Mutex::new(Some(Box::new(on_complete))), }); @@ -92,6 +96,7 @@ impl BodyWithMetrics { impl Body for BodyWithMetrics where B: Body, + ::Error: std::error::Error + Send + Sync + 'static, ResponseFlags: for<'a> From<(&'a ::Error, BodyKind)>, { type Data = B::Data; @@ -108,11 +113,12 @@ where } }, Poll::Ready(None) => { - trigger_on_complete(this.state, ResponseFlags::default()); + trigger_on_complete(this.state, None, ResponseFlags::default()); }, Poll::Ready(Some(Err(err))) => { - let flags = ResponseFlags::from((err, this.state.kind)); - trigger_on_complete(this.state, flags); + let event_error = EventError::try_infer_from(err); + let flags = ResponseFlags::from((err, this.state.body_kind)); + trigger_on_complete(this.state, event_error, flags); }, Poll::Pending => {}, } diff --git a/orion-lib/src/clusters/health/checkers/http/mod.rs b/orion-lib/src/clusters/health/checkers/http/mod.rs index 99b86231..120d53e5 100644 --- a/orion-lib/src/clusters/health/checkers/http/mod.rs +++ b/orion-lib/src/clusters/health/checkers/http/mod.rs @@ -183,5 +183,5 @@ fn create_request( let req = req.header("User-Agent", "orion/health-checks"); let empty = Empty::::default().into(); - Ok(RequestExt::new(req.body(BodyWithMetrics::new(BodyKind::Request, empty, |_, _| {}))?)) + Ok(RequestExt::new(req.body(BodyWithMetrics::new(BodyKind::Request, empty, |_, _, _| {}))?)) } diff --git a/orion-lib/src/clusters/retry_policy.rs b/orion-lib/src/clusters/retry_policy.rs index a76669ce..60739330 100644 --- a/orion-lib/src/clusters/retry_policy.rs +++ b/orion-lib/src/clusters/retry_policy.rs @@ -18,164 +18,17 @@ use http::Response; use http_body::Body; +use crate::event_error::EventError; use orion_configuration::config::network_filters::http_connection_manager::{RetryOn, RetryPolicy}; -use orion_format::types::ResponseFlags as FmtResponseFlags; - -use tokio::time::error::Elapsed; - -use crate::{body::response_flags::ResponseFlags, Error as BoxError}; -use std::{error::Error, io}; use orion_http_header::{X_ENVOY_RATELIMITED, X_ORION_RATELIMITED}; -#[derive(Debug, thiserror::Error)] -pub enum EventError { - #[error("ConnectFailure: {0:?}")] - ConnectFailure(#[from] std::io::Error), - #[error("ConnectTimeout")] - ConnectTimeout(#[from] Elapsed), - #[error("PerTryTimeout)")] - PerTryTimeout, - #[error("RouteTimeout")] - RouteTimeout, - #[error("Reset")] - Reset, - #[error("RefusedStream")] - RefusedStream, - #[allow(unused)] - #[error("Http3PostConnectFailure")] - Http3PostConnectFailure, -} - -// DISCLAIMER: This is a workaround for the fact that `EventError` can't implement `Clone`. -// Cloning is not possible because `Elapsed` and `io::Error` do not implement `Clone`. -// Their presence in `EventError` is required by the `hyper_util` crate, as it needs -// to traverse the `EventError` to extract either the underlying `io::Error` or `Elapsed` -// in order to produce a more specific error message. -// In this case, we create a new `EventError` by reconstructing the `io::Error` -// with the same kind and message as the original. It's a kind of "shallow clone" of the error, -// which is not perfect, but sufficient for our use case. - -impl Clone for EventError { - fn clone(&self) -> Self { - match self { - EventError::ConnectFailure(io_err) => { - let new_io_err = io::Error::new(io_err.kind(), io_err.to_string()); - EventError::ConnectFailure(new_io_err) - }, - EventError::ConnectTimeout(_) => EventError::ConnectTimeout(elapsed()), - EventError::PerTryTimeout => EventError::PerTryTimeout, - EventError::RouteTimeout => EventError::RouteTimeout, - EventError::Reset => EventError::Reset, - EventError::RefusedStream => EventError::RefusedStream, - EventError::Http3PostConnectFailure => EventError::Http3PostConnectFailure, - } - } -} -impl From for ResponseFlags { - fn from(err: EventError) -> Self { - match err { - EventError::ConnectFailure(_) | EventError::ConnectTimeout(_) => { - ResponseFlags(FmtResponseFlags::UPSTREAM_CONNECTION_FAILURE) - }, - EventError::PerTryTimeout => ResponseFlags(FmtResponseFlags::UPSTREAM_REQUEST_TIMEOUT), - EventError::RouteTimeout => ResponseFlags(FmtResponseFlags::empty()), - EventError::Reset | EventError::RefusedStream | EventError::Http3PostConnectFailure => { - ResponseFlags(FmtResponseFlags::UPSTREAM_REMOTE_RESET) - }, - } - } -} - -pub trait TryInferFrom: Sized { - fn try_infer_from(source: F) -> Option; -} - #[derive(Debug)] pub enum RetryCondition<'a, B> { Error(EventError), Response(&'a Response), } -impl<'a, B> TryInferFrom<&'a Result, BoxError>> for RetryCondition<'a, B> { - fn try_infer_from(source: &'a Result, BoxError>) -> Option { - match source { - Ok(ref resp) => { - // NOTE: exclude a priory the evaluation of the retry policy for 1xx, and 2xx. - if resp.status().is_informational() || resp.status().is_success() { - return None; - } - Some(RetryCondition::Response(resp)) - }, - Err(err) => { - let ev = EventError::try_infer_from(err.as_ref())?; - Some(RetryCondition::Error(ev)) - }, - } - } -} - -impl<'a> TryInferFrom<&'a (dyn std::error::Error + 'static)> for EventError { - fn try_infer_from(err: &'a (dyn std::error::Error + 'static)) -> Option { - if let Some(h_err) = err.downcast_ref::() { - if let Some(source) = h_err.source() { - return Self::try_infer_from(source); - } - } - - if let Some(h_err) = err.downcast_ref::() { - if let Some(source) = h_err.source() { - return Self::try_infer_from(source); - } - } - - if err.downcast_ref::().is_some() { - // Note: This should never happen, as the user should remap the Tokio timeout - // to a suitable EventError (e.g., timeout(dur, fut).await.map_err(|_| EventError::ConnectTimeout)). - // Just in case, the PerTryTimeout error is the closest one we can choose. - return Some(EventError::PerTryTimeout); - } - - if let Some(failure) = err.downcast_ref::() { - return Some(failure.clone()); - } - - if let Some(h2_reason) = err.downcast_ref::().and_then(h2::Error::reason) { - match h2_reason { - h2::Reason::REFUSED_STREAM => return Some(EventError::RefusedStream), - h2::Reason::CONNECT_ERROR => { - return Some(EventError::ConnectFailure(io::Error::new( - std::io::ErrorKind::ConnectionRefused, - "H2 connection refused", - ))); - }, - _ => return Some(EventError::Reset), - } - } - - if let Some(io_err) = err.downcast_ref::() { - match io_err.kind() { - std::io::ErrorKind::ConnectionRefused => { - return Some(EventError::ConnectFailure(io::Error::new( - std::io::ErrorKind::ConnectionRefused, - "Connection refused", - ))); - }, - std::io::ErrorKind::NotConnected => { - return Some(EventError::ConnectFailure(io::Error::new( - std::io::ErrorKind::NotConnected, - "Not connected", - ))); - }, - _ => return Some(EventError::Reset), - } - } - - // the rest of the errors are remapped to Reset - Some(EventError::Reset) - } -} - impl RetryCondition<'_, B> { pub fn inner_response(&self) -> Option<&Response> { if let RetryCondition::Response(resp) = self { @@ -257,10 +110,7 @@ impl RetryCondition<'_, B> { } }, RetryOn::ConnectFailure => { - if matches!( - self, - RetryCondition::Error(EventError::ConnectFailure(_) | EventError::ConnectTimeout(_)) - ) { + if matches!(self, RetryCondition::Error(EventError::IoError(_) | EventError::ConnectTimeout(_))) { return true; } }, @@ -280,11 +130,6 @@ impl RetryCondition<'_, B> { } } -#[inline] -pub fn elapsed() -> Elapsed { - unsafe { std::mem::transmute(()) } -} - #[cfg(test)] mod tests { diff --git a/orion-lib/src/event_error.rs b/orion-lib/src/event_error.rs new file mode 100644 index 00000000..3fe8e3e0 --- /dev/null +++ b/orion-lib/src/event_error.rs @@ -0,0 +1,277 @@ +// Copyright 2025 The kmesh Authors +// +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// + +use http::Response; +use orion_format::types::ResponseFlags as FmtResponseFlags; +use std::error::Error as ErrorTrait; +use std::io; +use tokio::time::error::Elapsed; + +use crate::Error as BoxError; +use crate::{body::response_flags::ResponseFlags, clusters::retry_policy::RetryCondition}; + +#[derive(Debug, thiserror::Error)] +pub enum EventError { + #[error("I/O Error: {0:?}")] + IoError(#[from] io::Error), + #[error("ConnectTimeout")] + ConnectTimeout(#[from] Elapsed), + #[error("PerTryTimeout)")] + PerTryTimeout, + #[error("RouteTimeout")] + RouteTimeout, + #[error("Reset")] + Reset, + #[error("RefusedStream")] + RefusedStream, + #[allow(unused)] + #[error("Http3PostConnectFailure")] + Http3PostConnectFailure, +} + +#[derive(Debug, Clone)] +pub enum EventKind { + Error(EventError), + AdminFilterResponse, + ClusterNotFound, + DirectResponse, + FilterChainNotFound, + InternalRedirect, + NoHealthyUpstream, + RouteNotFound, + UpgradeFailed, + RbacAccessDenied, + RateLimited, + ViaUpstream, +} + +impl EventKind { + pub fn code_details(&self) -> Option { + match self { + EventKind::Error(err) => match err { + EventError::IoError(err) => Some(ResponseCodeDetails::from(err)), + EventError::ConnectTimeout(_) => Some(ResponseCodeDetails("connect_timeout")), + EventError::PerTryTimeout => Some(ResponseCodeDetails("upstream_per_try_timeout")), + EventError::RouteTimeout => Some(ResponseCodeDetails("upstream_response_timeout")), + EventError::Reset => Some(ResponseCodeDetails("upstream_reset_after_response_started{TCP_RESET}")), + EventError::RefusedStream => Some(ResponseCodeDetails("http2.remote_refuse")), + EventError::Http3PostConnectFailure => Some(ResponseCodeDetails("http3.remote_reset")), + }, + EventKind::AdminFilterResponse => Some(ResponseCodeDetails("admin_filter_response")), + EventKind::ClusterNotFound => Some(ResponseCodeDetails("cluster_not_found")), + EventKind::DirectResponse => Some(ResponseCodeDetails("direct_response")), + EventKind::FilterChainNotFound => Some(ResponseCodeDetails("filter_chain_not_found")), + EventKind::InternalRedirect => Some(ResponseCodeDetails("internal_redirect")), + EventKind::NoHealthyUpstream => Some(ResponseCodeDetails("no_healthy_upstream")), + EventKind::RouteNotFound => Some(ResponseCodeDetails("route_not_found")), + EventKind::UpgradeFailed => Some(ResponseCodeDetails("upgrade_failed")), + EventKind::RbacAccessDenied => Some(ResponseCodeDetails("rbac_access_denied")), + EventKind::RateLimited => Some(ResponseCodeDetails("rate_limited")), + EventKind::ViaUpstream => Some(ResponseCodeDetails("via_upstream")), + } + } + + pub fn termination_details(&self) -> Option { + match self { + #[allow(clippy::collapsible_match)] + EventKind::Error(err) => match err { + EventError::IoError(err) => Some(ConnectionTerminationDetails::from(err)), + EventError::RouteTimeout => Some(ConnectionTerminationDetails("route timeout was reached")), + _ => None, + }, + EventKind::RbacAccessDenied => Some(ConnectionTerminationDetails("rbac_access_denied_matched_policy")), + _ => None, + } + } +} + +pub struct UpstreamTransportEventError(pub &'static str); +pub struct ResponseCodeDetails(pub &'static str); +pub struct ConnectionTerminationDetails(pub &'static str); + +pub fn find_error_in_chain<'a, E: ErrorTrait + 'static>(mut err: &'a (dyn ErrorTrait + 'static)) -> Option<&'a E> { + loop { + if let Some(found) = err.downcast_ref::() { + return Some(found); + } + match err.source() { + Some(next) => err = next, + None => return None, + } + } +} + +impl From<&io::Error> for UpstreamTransportEventError { + fn from(err: &io::Error) -> Self { + UpstreamTransportEventError(match err.kind() { + io::ErrorKind::ConnectionRefused => "connection_refused", + io::ErrorKind::NotConnected => "not_connected", + io::ErrorKind::AddrInUse => "addr_in_use", + io::ErrorKind::AddrNotAvailable => "addr_not_available", + io::ErrorKind::NetworkUnreachable => "network_unreachable", + io::ErrorKind::PermissionDenied => "permission_denied", + io::ErrorKind::ConnectionAborted => "connection_aborted", + io::ErrorKind::ConnectionReset => "connection_reset", + io::ErrorKind::TimedOut => "connection_timed_out", + _ => "connect_failure", + }) + } +} + +impl From<&io::Error> for ResponseCodeDetails { + fn from(err: &io::Error) -> Self { + ResponseCodeDetails(match err.kind() { + io::ErrorKind::ConnectionRefused => "upstream_reset_before_response_started{CONNECTION_REFUSED}", + io::ErrorKind::NotConnected => "upstream_reset_after_response_started{NOT_CONNECTED}", + io::ErrorKind::AddrInUse => "upstream_reset_before_response_started{ADDR_IN_USE}", + io::ErrorKind::AddrNotAvailable => "upstream_reset_before_response_started{ADDR_NOT_AVAILABLE}", + io::ErrorKind::NetworkUnreachable => "upstream_reset_before_response_started{NETWORK_UNREACHABLE}", + io::ErrorKind::PermissionDenied => "upstream_reset_before_response_started{PERMISSION_DENIED}", + io::ErrorKind::ConnectionAborted => "upstream_reset_after_response_started{CONNECTION_ABORTED}", + io::ErrorKind::ConnectionReset => "upstream_reset_after_response_started{TCP_RESET}", + io::ErrorKind::TimedOut => "streaming_timeout", + _ => "connection_reset", + }) + } +} + +impl From<&io::Error> for ConnectionTerminationDetails { + fn from(err: &io::Error) -> Self { + ConnectionTerminationDetails(match err.kind() { + io::ErrorKind::TimedOut => "transport socket timeout was reached", + _ => "I/O error", + }) + } +} + +impl TryFrom<&EventError> for UpstreamTransportEventError { + type Error = (); + fn try_from(value: &EventError) -> Result { + match value { + EventError::IoError(io_err) => Ok(UpstreamTransportEventError::from(io_err)), + EventError::ConnectTimeout(_) => Ok(UpstreamTransportEventError("connect_timeout")), + EventError::Reset => Ok(UpstreamTransportEventError("upstream_reset")), + _ => Err(()), // other errors are not transport errors + } + } +} + +// DISCLAIMER: This is a workaround for the fact that `EventError` can't implement `Clone`. +// Cloning is not possible because `Elapsed` and `io::Error` do not implement `Clone`. +// Their presence in `EventError` is required by the `hyper_util` crate, as it needs +// to traverse the `EventError` to extract either the underlying `io::Error` or `Elapsed` +// in order to produce a more specific error message. +// In this case, we create a new `EventError` by reconstructing the `io::Error` +// with the same kind and message as the original. It's a kind of "shallow clone" of the error, +// which is not perfect, but sufficient for our use case. + +impl Clone for EventError { + fn clone(&self) -> Self { + match self { + EventError::IoError(io_err) => { + let new_io_err = io::Error::new(io_err.kind(), io_err.to_string()); + EventError::IoError(new_io_err) + }, + EventError::ConnectTimeout(_) => EventError::ConnectTimeout(elapsed()), + EventError::PerTryTimeout => EventError::PerTryTimeout, + EventError::RouteTimeout => EventError::RouteTimeout, + EventError::Reset => EventError::Reset, + EventError::RefusedStream => EventError::RefusedStream, + EventError::Http3PostConnectFailure => EventError::Http3PostConnectFailure, + } + } +} + +impl From for ResponseFlags { + fn from(err: EventError) -> Self { + match err { + EventError::IoError(_) | EventError::ConnectTimeout(_) => { + ResponseFlags(FmtResponseFlags::UPSTREAM_CONNECTION_FAILURE) + }, + EventError::PerTryTimeout => ResponseFlags(FmtResponseFlags::UPSTREAM_REQUEST_TIMEOUT), + EventError::RouteTimeout => ResponseFlags(FmtResponseFlags::empty()), + EventError::Reset | EventError::RefusedStream | EventError::Http3PostConnectFailure => { + ResponseFlags(FmtResponseFlags::UPSTREAM_REMOTE_RESET) + }, + } + } +} + +pub fn elapsed() -> Elapsed { + unsafe { std::mem::transmute(()) } +} + +pub trait TryInferFrom: Sized { + fn try_infer_from(source: F) -> Option; +} + +impl<'a, B> TryInferFrom<&'a Result, BoxError>> for RetryCondition<'a, B> { + fn try_infer_from(source: &'a Result, BoxError>) -> Option { + match source { + Ok(ref resp) => { + // NOTE: exclude a priory the evaluation of the retry policy for 1xx, and 2xx. + if resp.status().is_informational() || resp.status().is_success() { + return None; + } + Some(RetryCondition::Response(resp)) + }, + Err(err) => { + let ev = EventError::try_infer_from(err.as_ref())?; + Some(RetryCondition::Error(ev)) + }, + } + } +} + +impl<'a> TryInferFrom<&'a (dyn std::error::Error + 'static)> for EventError { + fn try_infer_from(err: &'a (dyn std::error::Error + 'static)) -> Option { + if err.downcast_ref::().is_some() { + // Note: This should never happen, as the user should remap the Tokio timeout + // to a suitable EventError (e.g., timeout(dur, fut).await.map_err(|_| EventError::ConnectTimeout)). + // Just in case, the PerTryTimeout error is the closest one we can choose. + return Some(EventError::PerTryTimeout); + } + + if let Some(failure) = err.downcast_ref::() { + return Some(failure.clone()); + } + + if let Some(h2_reason) = err.downcast_ref::().and_then(h2::Error::reason) { + match h2_reason { + h2::Reason::REFUSED_STREAM => return Some(EventError::RefusedStream), + h2::Reason::CONNECT_ERROR => { + return Some(EventError::IoError(io::Error::new( + io::ErrorKind::ConnectionRefused, + "H2 connection refused", + ))); + }, + _ => return Some(EventError::Reset), + } + } + + if let Some(io_err) = err.downcast_ref::() { + return Some(EventError::IoError(io::Error::new(io_err.kind(), io_err.to_string()))); + } + + if let Some(source_err) = err.source() { + return Self::try_infer_from(source_err); + } + + // the rest of the errors are remapped to Reset + Some(EventError::Reset) + } +} diff --git a/orion-lib/src/lib.rs b/orion-lib/src/lib.rs index 48fa4458..21707b4a 100644 --- a/orion-lib/src/lib.rs +++ b/orion-lib/src/lib.rs @@ -18,6 +18,7 @@ #![recursion_limit = "128"] pub mod configuration; +pub mod event_error; pub mod access_log; mod body; diff --git a/orion-lib/src/listeners/filter_state.rs b/orion-lib/src/listeners/filter_state.rs index b8fa8be5..7fe2d442 100644 --- a/orion-lib/src/listeners/filter_state.rs +++ b/orion-lib/src/listeners/filter_state.rs @@ -15,6 +15,7 @@ // // +use compact_str::CompactString; use orion_configuration::config::common::TlvType; use std::{collections::HashMap, net::SocketAddr}; @@ -56,3 +57,18 @@ impl DownstreamConnectionMetadata { } } } + +#[derive(Debug, Clone)] +pub struct DownstreamMetadata { + pub connection: DownstreamConnectionMetadata, + pub server_name: Option, +} + +impl DownstreamMetadata { + pub fn new(connection: DownstreamConnectionMetadata, server_name: Option) -> Self + where + S: Into, + { + Self { connection, server_name: server_name.map(Into::into) } + } +} diff --git a/orion-lib/src/listeners/filterchain.rs b/orion-lib/src/listeners/filterchain.rs index d33c78bb..ce13aa63 100644 --- a/orion-lib/src/listeners/filterchain.rs +++ b/orion-lib/src/listeners/filterchain.rs @@ -20,7 +20,10 @@ use super::{ tcp_proxy::{TcpProxy, TcpProxyBuilder}, }; use crate::{ - listeners::{filter_state::DownstreamConnectionMetadata, http_connection_manager::ExtendedRequest}, + listeners::{ + filter_state::{DownstreamConnectionMetadata, DownstreamMetadata}, + http_connection_manager::ExtendedRequest, + }, secrets::{TlsConfigurator, WantsToBuildServer}, transport::AsyncReadWrite, AsyncStream, ConversionContext, Error, Result, @@ -168,7 +171,7 @@ impl FilterchainType { pub async fn start_filterchain( &self, stream: AsyncStream, - downstream_metadata: Arc, + downstream_metadata: Arc, shard_id: ThreadId, listener_name: &'static str, start_instant: std::time::Instant, diff --git a/orion-lib/src/listeners/http_connection_manager.rs b/orion-lib/src/listeners/http_connection_manager.rs index bd6933ed..9cf25d2b 100644 --- a/orion-lib/src/listeners/http_connection_manager.rs +++ b/orion-lib/src/listeners/http_connection_manager.rs @@ -39,8 +39,10 @@ use opentelemetry::global::BoxedSpan; use opentelemetry::trace::{Span, Status}; use opentelemetry::KeyValue; use orion_configuration::config::GenericError; +use orion_format::types::ResponseFlags as FmtResponseFlags; use orion_tracing::span_state::SpanState; use orion_tracing::{attributes::HTTP_RESPONSE_STATUS_CODE, with_client_span, with_server_span}; +use std::sync::atomic::AtomicUsize; use orion_configuration::config::network_filters::http_connection_manager::http_filters::{ FilterConfigOverride, FilterOverride, @@ -67,7 +69,6 @@ use parking_lot::Mutex; use route::MatchedRequest; use scopeguard::defer; use std::collections::{HashMap, HashSet}; -use std::sync::atomic::{AtomicU64, AtomicU8, Ordering}; use std::thread::ThreadId; use std::time::Instant; use std::{fmt, future::Future, result::Result as StdResult, sync::Arc}; @@ -76,6 +77,7 @@ use tokio::sync::watch; use tracing::debug; use upgrades as upgrade_utils; +use crate::event_error::{EventKind, UpstreamTransportEventError}; use crate::{ access_log::{is_access_log_enabled, log_access, log_access_reserve_balanced, AccessLogMessage, Target}, body::{ @@ -87,7 +89,7 @@ use crate::{ use crate::{ body::body_with_timeout::BodyWithTimeout, listeners::{ - access_log::AccessLogContext, filter_state::DownstreamConnectionMetadata, rate_limiter::LocalRateLimit, + access_log::AccessLogContext, filter_state::DownstreamMetadata, rate_limiter::LocalRateLimit, synthetic_http_response::SyntheticHttpResponse, }, utils::http::{request_head_size, response_head_size}, @@ -406,20 +408,24 @@ pub(crate) struct HttpRequestHandler { pub struct ExtendedRequest { pub request: Request, - pub downstream_metadata: Arc, + pub downstream_metadata: Arc, } #[derive(Debug)] pub struct AccessLoggersContext { - access_loggers: Mutex>, - bytes: AtomicU64, // either the request or response body size, depending which one has completed first + loggers: Vec, + bytes: u64, // either the request or response body size, depending which one has completed first + flags: ResponseFlags, + event: Option, } impl AccessLoggersContext { pub fn new(access_log: &[AccessLog]) -> Self { AccessLoggersContext { - access_loggers: Mutex::new(access_log.iter().map(|al| al.logger.local_clone()).collect::>()), - bytes: AtomicU64::new(0), + loggers: access_log.iter().map(|al| al.logger.local_clone()).collect::>(), + bytes: 0, + flags: ResponseFlags::default(), + event: None, } } } @@ -427,14 +433,30 @@ impl AccessLoggersContext { #[derive(Debug)] pub struct TransactionHandler { start_instant: std::time::Instant, - access_log_ctx: Option, + access_log_ctx: Option>, trace_ctx: Option, request_id: RequestId, - completed_phases: AtomicU8, span_state: Option>, thread_id: ThreadId, + trans_state: TransactionPhases, +} + +#[derive(Debug)] +struct TransactionPhases { + phase: AtomicUsize, +} + +impl TransactionPhases { + fn new() -> Self { + TransactionPhases { phase: AtomicUsize::new(0) } + } + fn message_complete(&self) -> TransactionComplete { + TransactionComplete(self.phase.fetch_add(1, std::sync::atomic::Ordering::SeqCst) > 0) + } } +struct TransactionComplete(bool); + impl Default for TransactionHandler { fn default() -> Self { TransactionHandler { @@ -442,13 +464,21 @@ impl Default for TransactionHandler { access_log_ctx: None, trace_ctx: None, request_id: RequestId::Internal(HeaderValue::from_static("")), - completed_phases: AtomicU8::new(0), span_state: None, thread_id: std::thread::current().id(), + trans_state: TransactionPhases::new(), } } } +#[allow(dead_code)] +#[derive(Debug)] +struct EventInfo { + body_kind: BodyKind, + event_kind: Option, + response_flags: ResponseFlags, +} + impl TransactionHandler { pub fn new( access_log: &[AccessLog], @@ -459,12 +489,12 @@ impl TransactionHandler { ) -> Self { TransactionHandler { start_instant: std::time::Instant::now(), - access_log_ctx: is_access_log_enabled().then(|| AccessLoggersContext::new(access_log)), + access_log_ctx: is_access_log_enabled().then(|| Mutex::new(AccessLoggersContext::new(access_log))), trace_ctx, request_id, - completed_phases: AtomicU8::new(0), span_state: server_span.map(|span| Arc::new(SpanState::new(Some(span)))), thread_id, + trans_state: TransactionPhases::new(), } } @@ -479,13 +509,13 @@ impl TransactionHandler { manager: Arc, permit: Arc>>>, mut request: Request>>, - downstream_metadata: Arc, + downstream_metadata: Arc, ) -> Result>> where RC: RequestHandler<( Request>>, Arc, - Arc, + Arc, )> + Clone, { let listener_name = manager.listener_name; @@ -493,7 +523,7 @@ impl TransactionHandler { // apply the request header modifiers http_modifiers::apply_prerouting_functions( &mut request, - downstream_metadata.peer_address(), + downstream_metadata.connection.peer_address(), manager.xff_settings, ); @@ -506,18 +536,17 @@ impl TransactionHandler { manager.request_id_handler.apply_to(&mut response, self.request_id.propagate_ref()); let initial_flags = response.extensions().get::().cloned().unwrap_or_default(); + let initial_event = response.extensions().get::>().cloned().unwrap_or_default(); if let Some(ctx) = self.access_log_ctx.as_ref() { let response_head_size = response_head_size(&response); - ctx.access_loggers.lock().with_context(&DownstreamResponse { response: &response, response_head_size }) + ctx.lock().loggers.with_context(&DownstreamResponse { response: &response, response_head_size }) } let resp_head_size = response_head_size(&response); response.map(move |body| { - BodyWithMetrics::new(BodyKind::Response, body, move |nbytes, flags| { - let is_transaction_complete = self.completed_phases.fetch_add(1, Ordering::Relaxed) > 0; - + BodyWithMetrics::new(BodyKind::Response, body, move |nbytes, body_error, body_flags| { with_metric!( http::DOWNSTREAM_CX_TX_BYTES_TOTAL, add, @@ -526,28 +555,41 @@ impl TransactionHandler { &[KeyValue::new("listener", listener_name)] ); - if let Some(ctx) = self.access_log_ctx.as_ref() { - let mut access_loggers = ctx.access_loggers.lock(); + let is_transaction_complete = if let Some(ctx) = self.access_log_ctx.as_ref() { + let mut log_ctx = ctx.lock(); let duration = first_byte_instant.saturating_duration_since(self.start_instant); let tx_duration = Instant::now().saturating_duration_since(first_byte_instant); - access_loggers.with_context(&HttpResponseDuration { duration, tx_duration }); + log_ctx.loggers.with_context(&HttpResponseDuration { duration, tx_duration }); - if is_transaction_complete { + let is_transaction_complete = self.trans_state.message_complete(); + if is_transaction_complete.0 { + let ctx_bytes = log_ctx.bytes; + let ctx_flags = log_ctx.flags.clone(); + let ctx_event = log_ctx.event.clone(); eval_http_finish_context( - access_loggers.as_mut(), + log_ctx.loggers.as_mut(), self.start_instant, - ctx.bytes.load(Ordering::Relaxed), // bytes received - nbytes, // bytes sent + ctx_bytes, // bytes received + nbytes, // bytes sent listener_name, - initial_flags | flags, + EventInfo { + body_kind: BodyKind::Response, + event_kind: ctx_event.or(initial_event).or(body_error.map(EventKind::Error)), + response_flags: ctx_flags | initial_flags | body_flags, + }, permit, ); } else { - ctx.bytes.store(nbytes, Ordering::Relaxed); + log_ctx.bytes = nbytes; + log_ctx.flags = initial_flags | body_flags; + log_ctx.event = initial_event.or(body_error.map(EventKind::Error)); } - } + is_transaction_complete + } else { + self.trans_state.message_complete() + }; - if is_transaction_complete { + if is_transaction_complete.0 { if let Some(span) = self.span_state.as_ref() { span.end(); } @@ -680,7 +722,7 @@ impl RequestHandler<( Request>>, Arc, - Arc, + Arc, )> for Arc { async fn to_response( @@ -689,7 +731,7 @@ impl (request, connection_manager, downstream_metadata): ( Request>>, Arc, - Arc, + Arc, ), ) -> Result> { let mut processed_routes: HashSet = HashSet::new(); @@ -732,7 +774,11 @@ impl break; } } else { - return Ok(SyntheticHttpResponse::not_found().into_response(request.version())); + return Ok(SyntheticHttpResponse::not_found( + EventKind::RouteNotFound, + ResponseFlags(FmtResponseFlags::NO_ROUTE_FOUND), + ) + .into_response(request.version())); } } @@ -741,8 +787,10 @@ impl upgrade_utils::is_websocket_enabled_by_hcm(&connection_manager.enabled_upgrades); let mut response = match &chosen_route.route.action { - Action::DirectResponse(dr) => dr.to_response(trans_handler, request).await, - Action::Redirect(rd) => rd.to_response(trans_handler, (request, chosen_route.route_match)).await, + Action::DirectResponse(dr) => dr.to_response(trans_handler, (request, &chosen_route.route.name)).await, + Action::Redirect(rd) => { + rd.to_response(trans_handler, (request, chosen_route.route_match, &chosen_route.route.name)).await + }, Action::Route(route) => { route .to_response( @@ -750,9 +798,10 @@ impl ( MatchedRequest { request, + route_name: &chosen_route.route.name, retry_policy: chosen_route.vh.retry_policy.as_ref(), route_match: chosen_route.route_match, - remote_address: downstream_metadata.peer_address(), + remote_address: downstream_metadata.connection.peer_address(), websocket_enabled_by_default, }, &connection_manager, @@ -791,7 +840,11 @@ impl Ok(response) } else { // We should not be here - Ok(SyntheticHttpResponse::not_found().into_response(request.version())) + Ok(SyntheticHttpResponse::not_found( + EventKind::RouteNotFound, + ResponseFlags(FmtResponseFlags::NO_ROUTE_FOUND), + ) + .into_response(request.version())) } } } @@ -801,6 +854,7 @@ impl Service> for HttpRequestHandler { type Error = crate::Error; type Future = BoxFuture<'static, StdResult>; + #[allow(clippy::too_many_lines)] fn call(&self, req: ExtendedRequest) -> Self::Future { // 0. destructure the ExtendedRequest to get the request and addresses let ExtendedRequest { request, downstream_metadata } = req; @@ -884,20 +938,20 @@ impl Service> for HttpRequestHandler { // // 1. evaluate InitHttpContext, if logging is enabled - eval_http_init_context(&request, &trans_handler); + eval_http_init_context(&request, &trans_handler, downstream_metadata.server_name.as_deref()); // // 2. create the MetricsBody, which will track the size of the request body let permit_clone = Arc::clone(&permit); - let init_flags = request.extensions().get::().cloned().unwrap_or_default(); + + let initial_flags = request.extensions().get::().cloned().unwrap_or_default(); + let initial_event = request.extensions().get::>().cloned().unwrap_or_default(); let req_head_size = request_head_size(&request); let request = request.map(|body| { let trans_handler = Arc::clone(&trans_handler); - BodyWithMetrics::new(BodyKind::Request, body, move |nbytes, flags| { - let is_transaction_complete = trans_handler.completed_phases.fetch_add(1, Ordering::Relaxed) > 0; - + BodyWithMetrics::new(BodyKind::Request, body, move |nbytes, body_error, body_flags| { with_metric!( http::DOWNSTREAM_CX_RX_BYTES_TOTAL, add, @@ -906,29 +960,44 @@ impl Service> for HttpRequestHandler { &[KeyValue::new("listener", listener_name)] ); - // emit the access log, if the request is completed.. - if let Some(ctx) = trans_handler.access_log_ctx.as_ref() { - let mut access_loggers = ctx.access_loggers.lock(); + // emit the access log, if the transaction is completed.. + let is_transaction_complete = if let Some(ctx) = trans_handler.access_log_ctx.as_ref() { + let mut log_ctx = ctx.lock(); let duration = trans_handler.start_instant.elapsed(); - access_loggers.with_context(&HttpRequestDuration { duration, tx_duration: duration }); + log_ctx.loggers.with_context(&HttpRequestDuration { duration, tx_duration: duration }); + + let is_transaction_complete = trans_handler.trans_state.message_complete(); + if is_transaction_complete.0 { + let ctx_bytes = log_ctx.bytes; + let ctx_flags = log_ctx.flags.clone(); + let ctx_event = log_ctx.event.clone(); - if is_transaction_complete { // if this happens is because the stream of body response finished before the request one! eval_http_finish_context( - access_loggers.as_mut(), + log_ctx.loggers.as_mut(), trans_handler.start_instant, - nbytes, // bytes received - ctx.bytes.load(Ordering::Relaxed), // bytes sent + nbytes, // bytes received + ctx_bytes, // bytes sent listener_name, - init_flags | flags, + EventInfo { + body_kind: BodyKind::Request, + event_kind: ctx_event.or(initial_event).or(body_error.map(EventKind::Error)), + response_flags: ctx_flags | initial_flags | body_flags, + }, permit_clone, ); } else { - ctx.bytes.store(nbytes, Ordering::Relaxed); + log_ctx.bytes = nbytes; + log_ctx.flags = initial_flags | body_flags; + log_ctx.event = initial_event.or(body_error.map(EventKind::Error)); } - } - if is_transaction_complete { + is_transaction_complete + } else { + trans_handler.trans_state.message_complete() + }; + + if is_transaction_complete.0 { if let Some(span) = trans_handler.span_state.as_ref() { span.end(); } @@ -938,7 +1007,11 @@ impl Service> for HttpRequestHandler { let Some(route_conf) = route_conf else { // immediately return a SyntheticHttpResponse, and calcuate the first byte instant - let resp = SyntheticHttpResponse::not_found().into_response(request.version()); + let resp = SyntheticHttpResponse::not_found( + EventKind::RouteNotFound, + ResponseFlags(FmtResponseFlags::NO_ROUTE_FOUND), + ) + .into_response(request.version()); let first_byte_instant = Instant::now(); with_metric!( @@ -955,20 +1028,17 @@ impl Service> for HttpRequestHandler { } } - if let Some(ctx) = trans_handler.access_log_ctx.as_ref() { + if let Some(log_ctx) = trans_handler.access_log_ctx.as_ref() { let response_head_size = response_head_size(&resp); - ctx.access_loggers.lock().with_context(&DownstreamResponse { response: &resp, response_head_size }) + log_ctx.lock().loggers.with_context(&DownstreamResponse { response: &resp, response_head_size }) } - let init_flags = resp.extensions().get::().cloned().unwrap_or_default(); - + let initial_flags = resp.extensions().get::().cloned().unwrap_or_default(); + let initial_event = resp.extensions().get::>().cloned().unwrap_or_default(); let resp_head_size = response_head_size(&resp); let response = resp.map(|body| { - BodyWithMetrics::new(BodyKind::Response, body, move |nbytes, flags| { - let is_transaction_complete = - trans_handler.completed_phases.fetch_add(1, Ordering::Relaxed) > 0; - + BodyWithMetrics::new(BodyKind::Response, body, move |nbytes, body_error, body_flags| { with_metric!( http::DOWNSTREAM_CX_TX_BYTES_TOTAL, add, @@ -977,28 +1047,43 @@ impl Service> for HttpRequestHandler { &[KeyValue::new("listener", listener_name)] ); - if let Some(ctx) = trans_handler.access_log_ctx.as_ref() { - let mut access_loggers = ctx.access_loggers.lock(); + let is_transaction_complete = if let Some(ctx) = trans_handler.access_log_ctx.as_ref() { + let mut log_ctx = ctx.lock(); let duration = first_byte_instant.saturating_duration_since(trans_handler.start_instant); let tx_duration = Instant::now().saturating_duration_since(first_byte_instant); - access_loggers.with_context(&HttpResponseDuration { duration, tx_duration }); + log_ctx.loggers.with_context(&HttpResponseDuration { duration, tx_duration }); + + let is_transaction_complete = trans_handler.trans_state.message_complete(); + if is_transaction_complete.0 { + let ctx_bytes = log_ctx.bytes; + let ctx_flags = log_ctx.flags.clone(); + let ctx_event = log_ctx.event.clone(); - if is_transaction_complete { eval_http_finish_context( - access_loggers.as_mut(), + log_ctx.loggers.as_mut(), trans_handler.start_instant, - ctx.bytes.load(Ordering::Relaxed), // bytes received - nbytes, // bytes sent + ctx_bytes, // bytes received + nbytes, // bytes sent listener_name, - init_flags | flags, + EventInfo { + body_kind: BodyKind::Response, + event_kind: ctx_event.or(initial_event).or(body_error.map(EventKind::Error)), + response_flags: ctx_flags | initial_flags | body_flags, + }, permit, ); } else { - ctx.bytes.store(nbytes, Ordering::Relaxed); + log_ctx.bytes = nbytes; + log_ctx.flags = initial_flags | body_flags; + log_ctx.event = initial_event.or(body_error.map(EventKind::Error)); } - } - if is_transaction_complete { + is_transaction_complete + } else { + trans_handler.trans_state.message_complete() + }; + + if is_transaction_complete.0 { if let Some(span) = trans_handler.span_state.as_ref() { span.end(); } @@ -1018,34 +1103,48 @@ impl Service> for HttpRequestHandler { } } -fn eval_http_init_context(request: &Request, trans_handler: &TransactionHandler) { +fn eval_http_init_context(request: &Request, trans_handler: &TransactionHandler, server_name: Option<&str>) { if let Some(ctx) = trans_handler.access_log_ctx.as_ref() { let trace_id = trans_handler.trace_ctx.as_ref().and_then(|t| t.map_child(orion_tracing::trace_info::TraceInfo::trace_id)); let request_head_size = request_head_size(request); - ctx.access_loggers.lock().with_context_fn(|| InitHttpContext { + ctx.lock().loggers.with_context_fn(|| InitHttpContext { start_time: std::time::SystemTime::now(), downstream_request: request, request_head_size, trace_id, + server_name, }) } } +#[allow(clippy::too_many_arguments)] fn eval_http_finish_context( access_loggers: &mut Vec, trans_start_time: Instant, bytes_received: u64, bytes_sent: u64, listener_name: &'static str, - flags: ResponseFlags, + event: EventInfo, permit: Arc>>>, ) { access_loggers.with_context(&FinishContext { duration: trans_start_time.elapsed(), bytes_received, bytes_sent, - response_flags: flags.0, + response_flags: event.response_flags.0, + upstream_failure: event.event_kind.as_ref().and_then(|ev| { + let EventKind::Error(err) = ev else { + return None; + }; + UpstreamTransportEventError::try_from(err).ok().map(|e| e.0) + }), + response_code_details: event + .event_kind + .as_ref() + .map_or(EventKind::ViaUpstream.code_details(), EventKind::code_details) + .map(|d| d.0), + connection_termination_details: event.event_kind.as_ref().and_then(EventKind::termination_details).map(|d| d.0), }); let loggers: Vec = std::mem::take(access_loggers); @@ -1059,7 +1158,8 @@ fn apply_authorization_rules(rbac: &HttpRbac, req: &Request) -> FilterDeci FilterDecision::Continue } else { FilterDecision::DirectResponse( - SyntheticHttpResponse::forbidden("RBAC: access denied").into_response(req.version()), + SyntheticHttpResponse::forbidden(EventKind::RbacAccessDenied, "RBAC: access denied") + .into_response(req.version()), ) } } diff --git a/orion-lib/src/listeners/http_connection_manager/direct_response.rs b/orion-lib/src/listeners/http_connection_manager/direct_response.rs index 6998ca64..2ab15f07 100644 --- a/orion-lib/src/listeners/http_connection_manager/direct_response.rs +++ b/orion-lib/src/listeners/http_connection_manager/direct_response.rs @@ -23,12 +23,19 @@ use http_body_util::Full; use hyper::{body::Incoming, Request, Response}; use orion_configuration::config::network_filters::http_connection_manager::route::DirectResponseAction; -impl RequestHandler>>> for &DirectResponseAction { +use orion_format::context::UpstreamContext; + +use crate::listeners::access_log::AccessLogContext; + +impl<'a> RequestHandler<(Request>>, &'a str)> for &DirectResponseAction { async fn to_response( self, - _ctx: &TransactionHandler, - request: Request>>, + trans_handler: &TransactionHandler, + (request, route_name): (Request>>, &'a str), ) -> Result> { + if let Some(ctx) = trans_handler.access_log_ctx.as_ref() { + ctx.lock().loggers.with_context(&UpstreamContext { authority: None, cluster_name: None, route_name }) + } let body = Full::new(self.body.as_ref().map(|b| bytes::Bytes::copy_from_slice(b.data())).unwrap_or_default()); let mut resp = Response::new(body.into()); *resp.status_mut() = self.status; diff --git a/orion-lib/src/listeners/http_connection_manager/http_modifiers.rs b/orion-lib/src/listeners/http_connection_manager/http_modifiers.rs index cc9b40f6..06d4282f 100644 --- a/orion-lib/src/listeners/http_connection_manager/http_modifiers.rs +++ b/orion-lib/src/listeners/http_connection_manager/http_modifiers.rs @@ -16,7 +16,7 @@ // use super::upgrade_utils; -use crate::{listeners::synthetic_http_response::SyntheticHttpResponse, PolyBody}; +use crate::{event_error::EventKind, listeners::synthetic_http_response::SyntheticHttpResponse, PolyBody}; use http::{header, HeaderMap, HeaderName, HeaderValue, Method, Request, Response}; use orion_configuration::config::network_filters::http_connection_manager::XffSettings; use orion_http_header::{X_ENVOY_EXTERNAL_ADDRESS, X_ENVOY_INTERNAL, X_FORWARDED_FOR}; @@ -45,11 +45,17 @@ pub fn apply_preflight_functions(request: &mut Request) -> Option(request: &Request) -> Option> { if request.method() == Method::CONNECT { - return Some(SyntheticHttpResponse::forbidden("CONNECT not permitted").into_response(request.version())); + return Some( + SyntheticHttpResponse::forbidden(EventKind::UpgradeFailed, "CONNECT not permitted") + .into_response(request.version()), + ); } if let Some(connection_header) = request.headers().get(header::CONNECTION) { if upgrade_utils::is_upgrade_connection(connection_header.to_str().ok()?) { - return Some(SyntheticHttpResponse::forbidden("upgrade not permitted").into_response(request.version())); + return Some( + SyntheticHttpResponse::forbidden(EventKind::UpgradeFailed, "upgrade not permitted") + .into_response(request.version()), + ); } } None diff --git a/orion-lib/src/listeners/http_connection_manager/redirect.rs b/orion-lib/src/listeners/http_connection_manager/redirect.rs index cbb07961..ed9db3c7 100644 --- a/orion-lib/src/listeners/http_connection_manager/redirect.rs +++ b/orion-lib/src/listeners/http_connection_manager/redirect.rs @@ -19,6 +19,7 @@ use super::{RequestHandler, TransactionHandler}; use crate::{ body::{body_with_metrics::BodyWithMetrics, body_with_timeout::BodyWithTimeout}, + listeners::access_log::AccessLogContext, Error, PolyBody, Result, }; use http::{ @@ -31,14 +32,24 @@ use orion_configuration::config::network_filters::http_connection_manager::route AuthorityRedirect, RedirectAction, RouteMatchResult, }; use orion_error::Context; +use orion_format::context::UpstreamContext; use std::str::FromStr; -impl RequestHandler<(Request>>, RouteMatchResult)> for &RedirectAction { +impl<'a> RequestHandler<(Request>>, RouteMatchResult, &'a str)> + for &RedirectAction +{ async fn to_response( self, - _trans_handler: &TransactionHandler, - (request, route_match_result): (Request>>, RouteMatchResult), + trans_handler: &TransactionHandler, + (request, route_match_result, route_name): ( + Request>>, + RouteMatchResult, + &'a str, + ), ) -> Result> { + if let Some(ctx) = trans_handler.access_log_ctx.as_ref() { + ctx.lock().loggers.with_context(&UpstreamContext { authority: None, cluster_name: None, route_name }) + } let (parts, _) = request.into_parts(); let mut rsp = Response::builder().status(StatusCode::from(self.response_code)).version(parts.version); diff --git a/orion-lib/src/listeners/http_connection_manager/route.rs b/orion-lib/src/listeners/http_connection_manager/route.rs index 7e82c620..01a22ca5 100644 --- a/orion-lib/src/listeners/http_connection_manager/route.rs +++ b/orion-lib/src/listeners/http_connection_manager/route.rs @@ -15,12 +15,12 @@ // // use super::{http_modifiers, upgrades as upgrade_utils, RequestHandler, TransactionHandler}; +use crate::event_error::{EventError, EventKind, TryInferFrom}; use crate::{ body::{body_with_metrics::BodyWithMetrics, body_with_timeout::BodyWithTimeout, response_flags::ResponseFlags}, clusters::{ balancers::hash_policy::HashState, clusters_manager::{self, RoutingContext}, - retry_policy::{EventError, TryInferFrom}, }, listeners::{ access_log::AccessLogContext, http_connection_manager::HttpConnectionManager, @@ -29,6 +29,7 @@ use crate::{ transport::policy::{RequestContext, RequestExt}, PolyBody, Result, }; + use http::{uri::Parts as UriParts, Uri}; use hyper::{body::Incoming, Request, Response}; use opentelemetry::trace::Span; @@ -51,6 +52,7 @@ use tracing::debug; pub struct MatchedRequest<'a> { pub request: Request>>, pub retry_policy: Option<&'a RetryPolicy>, + pub route_name: &'a str, pub remote_address: SocketAddr, pub route_match: RouteMatchResult, pub websocket_enabled_by_default: bool, @@ -65,6 +67,7 @@ impl<'a> RequestHandler<(MatchedRequest<'a>, &HttpConnectionManager)> for &Route ) -> Result> { let MatchedRequest { request: downstream_request, + route_name, retry_policy, remote_address, route_match, @@ -80,9 +83,10 @@ impl<'a> RequestHandler<(MatchedRequest<'a>, &HttpConnectionManager)> for &Route match maybe_channel { Ok(svc_channel) => { if let Some(ctx) = trans_handler.access_log_ctx.as_ref() { - ctx.access_loggers.lock().with_context(&UpstreamContext { - authority: &svc_channel.upstream_authority, - cluster_name: svc_channel.cluster_name, + ctx.lock().loggers.with_context(&UpstreamContext { + authority: Some(&svc_channel.upstream_authority), + cluster_name: Some(svc_channel.cluster_name), + route_name, }) } @@ -134,7 +138,7 @@ impl<'a> RequestHandler<(MatchedRequest<'a>, &HttpConnectionManager)> for &Route } if let Some(ctx) = trans_handler.access_log_ctx.as_ref() { - ctx.access_loggers.lock().with_context(&UpstreamRequest(&upstream_request)); + ctx.lock().loggers.with_context(&UpstreamRequest(&upstream_request)); } let websocket_enabled = if let Some(upgrade_config) = self.upgrade_config { @@ -147,7 +151,7 @@ impl<'a> RequestHandler<(MatchedRequest<'a>, &HttpConnectionManager)> for &Route Ok(maybe_upgrade) => maybe_upgrade, Err(upgrade_error) => { debug!("Failed to upgrade to websockets {upgrade_error}"); - return Ok(SyntheticHttpResponse::bad_request().into_response(ver)); + return Ok(SyntheticHttpResponse::bad_request(EventKind::UpgradeFailed).into_response(ver)); }, } } else { @@ -174,28 +178,33 @@ impl<'a> RequestHandler<(MatchedRequest<'a>, &HttpConnectionManager)> for &Route match resp { Err(err) => { let err = err.into_inner(); - let flags = EventError::try_infer_from(&err).map(ResponseFlags::from).unwrap_or_default(); + let event_error = EventError::try_infer_from(&err); + let flags = event_error.clone().map(ResponseFlags::from).unwrap_or_default(); + let event_kind = event_error.map_or(EventKind::ViaUpstream, |e| EventKind::Error(e)); debug!( "HttpConnectionManager Error processing response {:?}: {}({})", err, ResponseFlagsLong(&flags.0).to_smolstr(), ResponseFlagsShort(&flags.0).to_smolstr() ); - Ok(SyntheticHttpResponse::bad_gateway(flags).into_response(ver)) + Ok(SyntheticHttpResponse::bad_gateway(event_kind, flags).into_response(ver)) }, Ok(resp) => Ok(resp), } }, + // http connection not avaiable from cluster... Err(err) => { let err = err.into_inner(); - let flags = EventError::try_infer_from(&err).map(ResponseFlags::from).unwrap_or_default(); + let event_error = EventError::try_infer_from(&err); + let flags = event_error.clone().map(ResponseFlags::from).unwrap_or_default(); + let event_kind = event_error.map_or(EventKind::ViaUpstream, |e| EventKind::Error(e)); debug!( "Failed to get an HTTP connection: {:?}: {}({})", err, ResponseFlagsLong(&flags.0).to_smolstr(), ResponseFlagsShort(&flags.0).to_smolstr() ); - Ok(SyntheticHttpResponse::internal_error(flags).into_response(downstream_request.version())) + Ok(SyntheticHttpResponse::internal_error(event_kind, flags).into_response(downstream_request.version())) }, } } diff --git a/orion-lib/src/listeners/http_connection_manager/upgrades.rs b/orion-lib/src/listeners/http_connection_manager/upgrades.rs index 831467cf..a1d62185 100644 --- a/orion-lib/src/listeners/http_connection_manager/upgrades.rs +++ b/orion-lib/src/listeners/http_connection_manager/upgrades.rs @@ -18,6 +18,7 @@ use super::{RequestHandler, TransactionHandler}; use crate::{ body::{body_with_metrics::BodyWithMetrics, response_flags::ResponseFlags}, + event_error::EventKind, listeners::synthetic_http_response::SyntheticHttpResponse, transport::{policy::RequestExt, HttpChannel}, PolyBody, Result, @@ -109,17 +110,23 @@ pub async fn handle_websocket_upgrade( "Upgrade attempt failure, upstream did not accept websocket upgrade, returned status code {:?}", response.status() ); - Ok(SyntheticHttpResponse::not_allowed(ResponseFlags(FmtResponseFlags::UPSTREAM_CONNECTION_FAILURE)) - .into_response(version)) + Ok(SyntheticHttpResponse::not_allowed( + EventKind::UpgradeFailed, + ResponseFlags(FmtResponseFlags::UPSTREAM_CONNECTION_FAILURE), + ) + .into_response(version)) }, Err(err) => { error!("Upgrade failed in attempting to establish upstream websocket {:?}", err); - Ok(SyntheticHttpResponse::bad_gateway(ResponseFlags(FmtResponseFlags::UPSTREAM_CONNECTION_FAILURE)) - .into_response(version)) + Ok(SyntheticHttpResponse::bad_gateway( + EventKind::UpgradeFailed, + ResponseFlags(FmtResponseFlags::UPSTREAM_CONNECTION_FAILURE), + ) + .into_response(version)) }, } }, - _ => Ok(SyntheticHttpResponse::bad_request().into_response(version)), + _ => Ok(SyntheticHttpResponse::bad_request(EventKind::UpgradeFailed).into_response(version)), } } diff --git a/orion-lib/src/listeners/listener.rs b/orion-lib/src/listeners/listener.rs index eae3c553..be296b87 100644 --- a/orion-lib/src/listeners/listener.rs +++ b/orion-lib/src/listeners/listener.rs @@ -20,7 +20,7 @@ use super::{ listeners_manager::TlsContextChange, }; use crate::{ - listeners::filter_state::DownstreamConnectionMetadata, + listeners::filter_state::{DownstreamConnectionMetadata, DownstreamMetadata}, secrets::{TlsConfigurator, WantsToBuildServer}, transport::{bind_device::BindDevice, tls_inspector, AsyncStream, ProxyProtocolReader, TlvListenerFilter}, ConversionContext, Error, Result, RouteConfigurationChange, @@ -317,7 +317,7 @@ impl Listener { match_subitem( FilterChainMatch::matches_server_name, - server_name.unwrap_or_default(), + server_name.unwrap_or(""), filter_chains.keys(), &mut scratchpad, &mut possible_filters, @@ -352,6 +352,8 @@ impl Listener { } } + #[allow(clippy::too_many_lines)] + #[allow(clippy::too_many_arguments)] async fn process_listener_update( listener_name: &'static str, filter_chains: Arc>, @@ -377,26 +379,6 @@ impl Listener { with_histogram!(listeners::DOWNSTREAM_CX_LENGTH_MS, record, ms, &[KeyValue::new("listener", listener_name)]); } - let downstream_metadata = if let Some(config) = proxy_protocol_config.as_ref() { - let reader = ProxyProtocolReader::new(Arc::clone(config)); - let (metadata, new_stream) = reader.try_read_proxy_header(stream, local_address, peer_addr).await?; - stream = new_stream; - metadata - } else { - DownstreamConnectionMetadata::FromSocket { peer_address: peer_addr, local_address } - }; - - let downstream_metadata = if with_tlv_listener_filter { - let mut tlv_filter = TlvListenerFilter::default(); - let (new_stream, tlv_metadata) = tlv_filter.process_stream(stream, local_address, peer_addr).await?; - stream = new_stream; - tlv_metadata - } else { - downstream_metadata - }; - - let downstream_metadata = Arc::new(downstream_metadata); - let server_name = if with_tls_inspector { let (tls_result, rewound_stream) = tls_inspector::inspect_client_hello(stream).await; stream = rewound_stream; @@ -448,8 +430,27 @@ impl Listener { None }; + let downstream_metadata = if let Some(config) = proxy_protocol_config.as_ref() { + let reader = ProxyProtocolReader::new(Arc::clone(config)); + let (metadata, new_stream) = reader.try_read_proxy_header(stream, local_address, peer_addr).await?; + stream = new_stream; + metadata + } else { + DownstreamConnectionMetadata::FromSocket { peer_address: peer_addr, local_address } + }; + + let downstream_metadata = if with_tlv_listener_filter { + let mut tlv_filter = TlvListenerFilter::default(); + let (new_stream, tlv_metadata) = tlv_filter.process_stream(stream, local_address, peer_addr).await?; + stream = new_stream; + tlv_metadata + } else { + downstream_metadata + }; + let selected_filterchain = Self::select_filterchain(&filter_chains, &downstream_metadata, server_name.as_deref())?; + if let Some(filterchain) = selected_filterchain { debug!( "{listener_name} : mapping connection from {peer_addr} to filter chain {}", @@ -457,7 +458,13 @@ impl Listener { ); if let Some(stream) = filterchain.apply_rbac(stream, &downstream_metadata, server_name.as_deref()) { return filterchain - .start_filterchain(stream, downstream_metadata, shard_id, listener_name, start_instant) + .start_filterchain( + stream, + Arc::new(DownstreamMetadata { connection: downstream_metadata, server_name }), + shard_id, + listener_name, + start_instant, + ) .await; } debug!("{listener_name} : dropped connection from {peer_addr} due to rbac"); @@ -701,8 +708,8 @@ filter_chains: peer_address: (Ipv4Addr::LOCALHOST, 3300).into(), local_address: (Ipv4Addr::LOCALHOST, 443).into(), }; - let good_host = Some("host.test"); - assert!(matches!(Listener::select_filterchain(&m, &metadata, good_host), Ok(Some(())))); + + assert!(matches!(Listener::select_filterchain(&m, &metadata, Some("host.test")), Ok(Some(())))); assert!(matches!(Listener::select_filterchain(&m, &metadata, Some("a.wildcard")), Ok(Some(())))); assert!(matches!(Listener::select_filterchain(&m, &metadata, None), Ok(None))); } @@ -757,7 +764,6 @@ filter_chains: Listener::select_filterchain(&m, &metadata, Some("this.is.less.specific")).unwrap().copied(), Some(2) ); - assert_eq!(Listener::select_filterchain(&m, &metadata, Some("hello.world")).unwrap().copied(), Some(3)); } } diff --git a/orion-lib/src/listeners/rate_limiter/mod.rs b/orion-lib/src/listeners/rate_limiter/mod.rs index eda80090..edab1b1d 100644 --- a/orion-lib/src/listeners/rate_limiter/mod.rs +++ b/orion-lib/src/listeners/rate_limiter/mod.rs @@ -23,7 +23,7 @@ use token_bucket::TokenBucket; use orion_configuration::config::network_filters::http_connection_manager::http_filters::local_rate_limit::LocalRateLimit as LocalRateLimitConfig; -use crate::body::response_flags::ResponseFlags; +use crate::{body::response_flags::ResponseFlags, event_error::EventKind}; use orion_format::types::ResponseFlags as FmtResponseFlags; use crate::{ @@ -43,8 +43,12 @@ impl LocalRateLimit { if !token_bucket.consume(1) { let status = self.status; return FilterDecision::DirectResponse( - SyntheticHttpResponse::custom_error(status, ResponseFlags(FmtResponseFlags::RATE_LIMITED)) - .into_response(req.version()), + SyntheticHttpResponse::custom_error( + status, + EventKind::RateLimited, + ResponseFlags(FmtResponseFlags::RATE_LIMITED), + ) + .into_response(req.version()), ); } } diff --git a/orion-lib/src/listeners/synthetic_http_response.rs b/orion-lib/src/listeners/synthetic_http_response.rs index 78e9d8ad..e08a01ea 100644 --- a/orion-lib/src/listeners/synthetic_http_response.rs +++ b/orion-lib/src/listeners/synthetic_http_response.rs @@ -19,11 +19,12 @@ use bytes::Bytes; use http::{HeaderValue, Response, StatusCode, Version as HttpVersion}; use http_body_util::Full; -use crate::{body::response_flags::ResponseFlags, PolyBody}; +use crate::{body::response_flags::ResponseFlags, event_error::EventKind, PolyBody}; #[derive(Clone, Debug)] pub struct SyntheticHttpResponse { http_status: StatusCode, + event_kind: EventKind, response_flags: ResponseFlags, body: Bytes, close_connection: bool, @@ -32,23 +33,30 @@ pub struct SyntheticHttpResponse { // === impl SyntheticHttpResponse === impl SyntheticHttpResponse { - pub fn internal_error(response_flags: ResponseFlags) -> Self { + pub fn internal_error(event_kind: EventKind, response_flags: ResponseFlags) -> Self { Self { http_status: StatusCode::INTERNAL_SERVER_ERROR, + event_kind, response_flags, body: Bytes::default(), close_connection: true, } } - pub fn bad_gateway(response_flags: ResponseFlags) -> Self { - Self { http_status: StatusCode::BAD_GATEWAY, response_flags, body: Bytes::default(), close_connection: true } + pub fn bad_gateway(event_kind: EventKind, response_flags: ResponseFlags) -> Self { + Self { + http_status: StatusCode::BAD_GATEWAY, + event_kind, + response_flags, + body: Bytes::default(), + close_connection: true, + } } - #[allow(dead_code)] - pub fn forbidden(msg: &str) -> Self { + pub fn forbidden(event_kind: EventKind, msg: &str) -> Self { Self { http_status: StatusCode::FORBIDDEN, + event_kind, response_flags: ResponseFlags::default(), body: Bytes::copy_from_slice(msg.as_bytes()), //should this close actually? the connection seems to stay open since it's only triggered for a single http @@ -57,56 +65,61 @@ impl SyntheticHttpResponse { } #[allow(dead_code)] - pub fn service_unavailable(response_flags: ResponseFlags) -> Self { + pub fn service_unavailable(event_kind: EventKind, response_flags: ResponseFlags) -> Self { Self { http_status: StatusCode::SERVICE_UNAVAILABLE, + event_kind, response_flags, body: Bytes::default(), close_connection: true, } } - pub fn gateway_timeout(response_flags: ResponseFlags) -> Self { + pub fn gateway_timeout(event_kind: EventKind, response_flags: ResponseFlags) -> Self { Self { http_status: StatusCode::GATEWAY_TIMEOUT, + event_kind, response_flags, body: Bytes::default(), close_connection: true, } } - pub fn not_found() -> Self { + pub fn not_found(event_kind: EventKind, response_flags: ResponseFlags) -> Self { Self { http_status: StatusCode::NOT_FOUND, - response_flags: ResponseFlags::default(), + event_kind, + response_flags, body: Bytes::default(), close_connection: false, } } #[allow(dead_code)] - pub fn upgrade_required() -> Self { + pub fn upgrade_required(event_kind: EventKind) -> Self { Self { http_status: StatusCode::UPGRADE_REQUIRED, + event_kind, response_flags: ResponseFlags::default(), body: Bytes::default(), close_connection: true, } } - #[allow(dead_code)] - pub fn bad_request() -> Self { + pub fn bad_request(event_kind: EventKind) -> Self { Self { http_status: StatusCode::BAD_REQUEST, + event_kind, response_flags: ResponseFlags::default(), body: Bytes::default(), close_connection: true, } } - pub fn not_allowed(response_flags: ResponseFlags) -> Self { + pub fn not_allowed(event_kind: EventKind, response_flags: ResponseFlags) -> Self { Self { http_status: StatusCode::METHOD_NOT_ALLOWED, + event_kind, response_flags, body: Bytes::default(), close_connection: true, @@ -114,27 +127,17 @@ impl SyntheticHttpResponse { } #[allow(dead_code)] - pub fn custom_error(http_status: StatusCode, response_flags: ResponseFlags) -> Self { - Self { http_status, response_flags, body: Bytes::default(), close_connection: false } + pub fn custom_error(http_status: StatusCode, event_kind: EventKind, response_flags: ResponseFlags) -> Self { + Self { http_status, event_kind, response_flags, body: Bytes::default(), close_connection: false } } - // #[inline] - // fn header_error_message(&self) -> Option { - // match self.header_error_message { - // Some(Cow::Borrowed(msg)) => Some(HeaderValue::from_static(msg)), - // Some(Cow::Owned(ref msg)) => { - // Some(HeaderValue::from_str(msg).unwrap_or_else(|_| HeaderValue::from_static("unexpected error"))) - // }, - // None => None, - // } - // } - #[inline] pub fn into_response(self, version: http::Version) -> Response { let mut rsp = Response::new(Full::from(self.body).into()); *rsp.status_mut() = self.http_status; *rsp.version_mut() = version; rsp.extensions_mut().insert(self.response_flags); + rsp.extensions_mut().insert(Some(self.event_kind)); if self.close_connection && (version == HttpVersion::HTTP_10 || version == HttpVersion::HTTP_11) { // Notify the (proxy or non-proxy) client that the connection will be closed. rsp.headers_mut().insert(http::header::CONNECTION, HeaderValue::from_static("close")); diff --git a/orion-lib/src/listeners/tcp_proxy.rs b/orion-lib/src/listeners/tcp_proxy.rs index 28b60c4c..aa1b8e36 100644 --- a/orion-lib/src/listeners/tcp_proxy.rs +++ b/orion-lib/src/listeners/tcp_proxy.rs @@ -18,7 +18,10 @@ use crate::{ access_log::{log_access, log_access_reserve_balanced, Target}, clusters::clusters_manager::{self, RoutingContext}, - listeners::{access_log::AccessLogContext, filter_state::DownstreamConnectionMetadata}, + event_error::{ + find_error_in_chain, ConnectionTerminationDetails, ResponseCodeDetails, UpstreamTransportEventError, + }, + listeners::{access_log::AccessLogContext, filter_state::DownstreamMetadata}, transport::connector::TcpErrorContext, AsyncStream, Result, }; @@ -72,10 +75,11 @@ impl fmt::Display for TcpProxy { } impl TcpProxy { + #[allow(clippy::too_many_lines)] pub async fn serve_connection( &self, mut stream: AsyncStream, - downstream_metadata: Arc, + downstream_metadata: Arc, ) -> Result<()> { let start_instant = Instant::now(); let mut access_loggers = self.access_log.iter().map(|al| al.logger.local_clone()).collect::>(); @@ -90,6 +94,9 @@ impl TcpProxy { let mut bytes_received = 0; let mut bytes_sent = 0; let mut response_flags = ResponseFlags::empty(); + let mut maybe_upstream_transport_error: Option = None; + let mut maybe_response_code_details: Option = None; + let mut maybe_connection_termination_details: Option = None; let cluster_name: &str; let maybe_upstream_local_addr: Option; @@ -97,28 +104,30 @@ impl TcpProxy { let res = match maybe_connector { Ok(connector) => { - let channel_result = connector.connect(Some(&downstream_metadata)).await; + let channel_result = connector.connect(Some(&downstream_metadata.connection)).await; match channel_result { Ok(mut channel) => { maybe_upstream_local_addr = channel.upstream_local_addr; maybe_upstream_peer_addr = channel.upstream_peer_addr; let res = tokio::io::copy_bidirectional(&mut stream, &mut channel.stream).await; - - match &res { + match res { Ok((received, sent)) => { - bytes_received = *received; - bytes_sent = *sent; + bytes_received = received; + bytes_sent = sent; }, - Err(e) => { - response_flags.insert(ResponseFlags::UPSTREAM_CONNECTION_FAILURE); + Err(ref e) => { debug!("Error with TCP stream: {}", e); + maybe_upstream_transport_error = Some(e.into()); + maybe_response_code_details = Some(ResponseCodeDetails::from(e)); + maybe_connection_termination_details = Some(ConnectionTerminationDetails::from(e)); + response_flags.insert(ResponseFlags::UPSTREAM_CONNECTION_FAILURE); }, } access_loggers.with_context(&TcpContext { - downstream_local_addr: Some(downstream_metadata.local_address()), - downstream_peer_addr: Some(downstream_metadata.peer_address()), + downstream_local_addr: Some(downstream_metadata.connection.local_address()), + downstream_peer_addr: Some(downstream_metadata.connection.peer_address()), upstream_local_addr: maybe_upstream_local_addr, upstream_peer_addr: maybe_upstream_peer_addr, cluster_name: channel.cluster_name, @@ -139,9 +148,14 @@ impl TcpProxy { cluster_name = "impossible"; } + let io_err = find_error_in_chain::(e.inner()); + maybe_upstream_transport_error = io_err.map(UpstreamTransportEventError::from); + maybe_response_code_details = io_err.map(ResponseCodeDetails::from); + maybe_connection_termination_details = io_err.map(ConnectionTerminationDetails::from); + access_loggers.with_context(&TcpContext { - downstream_local_addr: Some(downstream_metadata.local_address()), - downstream_peer_addr: Some(downstream_metadata.peer_address()), + downstream_local_addr: Some(downstream_metadata.connection.local_address()), + downstream_peer_addr: Some(downstream_metadata.connection.peer_address()), upstream_local_addr: None, upstream_peer_addr: maybe_upstream_peer_addr, cluster_name, @@ -155,9 +169,14 @@ impl TcpProxy { error!("Failed to get TCP connection for cluster {:?}: {}", cluster_selector, e); response_flags.insert(ResponseFlags::NO_ROUTE_FOUND); + let io_err = find_error_in_chain::(e.inner()); + maybe_upstream_transport_error = io_err.map(UpstreamTransportEventError::from); + maybe_response_code_details = io_err.map(ResponseCodeDetails::from); + maybe_connection_termination_details = io_err.map(ConnectionTerminationDetails::from); + access_loggers.with_context(&TcpContext { - downstream_local_addr: Some(downstream_metadata.local_address()), - downstream_peer_addr: Some(downstream_metadata.peer_address()), + downstream_local_addr: Some(downstream_metadata.connection.local_address()), + downstream_peer_addr: Some(downstream_metadata.connection.peer_address()), upstream_local_addr: None, upstream_peer_addr: None, cluster_name: &cluster_selector.name(), @@ -172,7 +191,11 @@ impl TcpProxy { bytes_received, bytes_sent, response_flags, + upstream_failure: maybe_upstream_transport_error.map(|x| x.0), + response_code_details: maybe_response_code_details.map(|x| x.0), + connection_termination_details: maybe_connection_termination_details.map(|x| x.0), }); + let permit = log_access_reserve_balanced().await; let messages = access_loggers.into_iter().map(LogFormatterLocal::into_message).collect::>(); log_access(permit, Target::Listener(self.listener_name.to_compact_string()), messages); diff --git a/orion-lib/src/trace/tracer.rs b/orion-lib/src/trace/tracer.rs index b848c201..35dbfec4 100644 --- a/orion-lib/src/trace/tracer.rs +++ b/orion-lib/src/trace/tracer.rs @@ -146,7 +146,6 @@ impl Tracer { // Trigger: x_request_id... if let Some(trace_id) = request_id.as_ref().and_then(|val| ::from(val.as_ref()).ok()) { - println!("here!"); if forced || (Self::should_sample(self.tracing.random_sampling) && Self::should_sample(self.tracing.overall_sampling)) diff --git a/orion-lib/src/transport/connector.rs b/orion-lib/src/transport/connector.rs index ad0ea7b2..94bb09ca 100644 --- a/orion-lib/src/transport/connector.rs +++ b/orion-lib/src/transport/connector.rs @@ -34,7 +34,7 @@ use tokio::net::{TcpSocket, TcpStream}; use tower::Service; use tracing::debug; -use crate::clusters::retry_policy::{elapsed, EventError}; +use crate::event_error::{elapsed, EventError}; use super::{bind_device::BindDevice, resolve}; @@ -61,6 +61,7 @@ pub struct LocalConnectorWithDNSResolver { } impl LocalConnectorWithDNSResolver { + #[allow(clippy::too_many_lines)] pub fn connect( &self, ) -> impl Future>> + 'static { @@ -149,7 +150,7 @@ impl LocalConnectorWithDNSResolver { }) .map_into() })? // Result - .map_err(|orig| EventError::ConnectFailure(io::Error::new(orig.kind(), orig.to_string()))) + .map_err(|orig| EventError::IoError(io::Error::new(orig.kind(), orig.to_string()))) .map_err(|e| { WithContext::new(e) .with_context_data(TcpErrorContext { @@ -162,7 +163,7 @@ impl LocalConnectorWithDNSResolver { } else { sock.connect(addr) .await - .map_err(|orig| EventError::ConnectFailure(io::Error::new(orig.kind(), orig.to_string()))) + .map_err(|orig| EventError::IoError(io::Error::new(orig.kind(), orig.to_string()))) .map_err(|e| { WithContext::new(e) .with_context_data(TcpErrorContext { diff --git a/orion-lib/src/transport/grpc_channel.rs b/orion-lib/src/transport/grpc_channel.rs index b024700f..a959130c 100644 --- a/orion-lib/src/transport/grpc_channel.rs +++ b/orion-lib/src/transport/grpc_channel.rs @@ -63,7 +63,7 @@ impl GrpcService { let http_req = Request::from_parts( parts, - BodyWithMetrics::new(BodyKind::Request, grpc_body.into(), |_bytes, _flags| { + BodyWithMetrics::new(BodyKind::Request, grpc_body.into(), |_bytes, _event_error, _flags| { println!("gRPC request body finalized") }), ); diff --git a/orion-lib/src/transport/http_channel.rs b/orion-lib/src/transport/http_channel.rs index 935964eb..f44f18a7 100644 --- a/orion-lib/src/transport/http_channel.rs +++ b/orion-lib/src/transport/http_channel.rs @@ -22,7 +22,8 @@ use super::{ }; use crate::{ body::{body_with_metrics::BodyWithMetrics, body_with_timeout::BodyWithTimeout, response_flags::ResponseFlags}, - clusters::retry_policy::{EventError, RetryCondition, TryInferFrom}, + clusters::retry_policy::RetryCondition, + event_error::{EventError, EventKind, TryInferFrom}, listeners::{ http_connection_manager::{RequestHandler, TransactionHandler}, synthetic_http_response::SyntheticHttpResponse, @@ -369,34 +370,6 @@ impl<'a> RequestHandler>>> for } } -// fn update_upstream_stats( -// client_stats: &ClientEndpointStats, -// cluster_name: &'static str, -// shard_id: (ThreadId, Authority), -// is_tls: bool, -// ) { -// let total = client_stats.total_cx.load(std::sync::atomic::Ordering::Relaxed) as u64; -// let destroy = client_stats.destroy_cx.load(std::sync::atomic::Ordering::Relaxed) as u64; -// let active = total.saturating_sub(destroy); -// -// with_metric!( -// clusters::UPSTREAM_CX_TOTAL, -// store, -// total, -// shard_id.clone(), -// &[KeyValue::new("clusters", cluster_name)] -// ); -// with_metric!( -// clusters::UPSTREAM_CX_DESTROY, -// store, -// destroy, -// shard_id.clone(), -// &[KeyValue::new("clusters", cluster_name)] -// ); -// -// with_metric!(clusters::UPSTREAM_CX_ACTIVE, store, active, shard_id, &[KeyValue::new("clusters", cluster_name)]); -// } - impl HttpChannel { /// Send the request and return the Result, either the Response or an Error, /// along with the time spent for possible retransmissions. Note: the returned @@ -553,15 +526,19 @@ impl HttpChannel { ResponseFlagsLong(&response_flags.0).to_smolstr(), ResponseFlagsShort(&response_flags.0).to_smolstr() ); + match event_error { - EventError::RefusedStream | EventError::ConnectFailure(_) | EventError::ConnectTimeout(_) => { - Ok(SyntheticHttpResponse::service_unavailable(response_flags).into_response(version)) - }, + EventError::RefusedStream | EventError::IoError(_) | EventError::ConnectTimeout(_) => Ok( + SyntheticHttpResponse::service_unavailable(EventKind::Error(event_error), response_flags) + .into_response(version), + ), EventError::PerTryTimeout | EventError::RouteTimeout => { - Ok(SyntheticHttpResponse::gateway_timeout(response_flags).into_response(version)) + Ok(SyntheticHttpResponse::gateway_timeout(EventKind::Error(event_error), response_flags) + .into_response(version)) }, EventError::Reset | EventError::Http3PostConnectFailure => { - Ok(SyntheticHttpResponse::bad_gateway(response_flags).into_response(version)) + Ok(SyntheticHttpResponse::bad_gateway(EventKind::Error(event_error), response_flags) + .into_response(version)) }, } } else { diff --git a/orion-lib/src/transport/proxy_protocol.rs b/orion-lib/src/transport/proxy_protocol.rs index fe1f1eaa..9406438a 100644 --- a/orion-lib/src/transport/proxy_protocol.rs +++ b/orion-lib/src/transport/proxy_protocol.rs @@ -578,11 +578,9 @@ mod tests { assert_eq!(tlv_data.get(&TlvType::Custom(0x10)), Some(&b"added_config_tlv".to_vec())); assert_eq!(tlv_data.get(&TlvType::Custom(0x11)), Some(&b"another_added_tlv".to_vec())); - assert_eq!(tlv_data.get(&TlvType::NoOp), Some(&b"noop_data".to_vec())); assert_eq!(tlv_data.get(&TlvType::Custom(0x01)), Some(&b"custom_type_1".to_vec())); assert_eq!(tlv_data.get(&TlvType::Custom(0x02)), Some(&b"custom_type_2".to_vec())); - assert_eq!(tlv_data.get(&TlvType::Custom(0x03)), None); }, _ => unreachable!("Expected FromProxyProtocol metadata"), diff --git a/orion-lib/src/transport/tls_inspector.rs b/orion-lib/src/transport/tls_inspector.rs index f51354fa..2b2dbeb1 100644 --- a/orion-lib/src/transport/tls_inspector.rs +++ b/orion-lib/src/transport/tls_inspector.rs @@ -18,13 +18,14 @@ use super::AsyncReadWrite; use crate::utils::rewindable_stream::RewindableHeadAsyncStream; +use compact_str::{CompactString, ToCompactString}; use rustls::server::Acceptor; use std::io; #[derive(Debug)] pub enum InspectorResult { /// Handshake with valid TLS and SNI. - Success(String), + Success(CompactString), /// Handshake with valid TLS, without server name indication. SuccessNoSni, /// Failed TLS Handshake (e.g. not TLS, or I/O, etc.). @@ -36,7 +37,7 @@ pub async fn inspect_client_hello(stream: Box) -> (Inspector let acceptor = tokio_rustls::LazyConfigAcceptor::new(Acceptor::default(), &mut inspector); let result = match acceptor.await { Ok(handshake) => match handshake.client_hello().server_name() { - Some(server_name) => InspectorResult::Success(server_name.to_owned()), + Some(server_name) => InspectorResult::Success(server_name.to_compact_string()), None => InspectorResult::SuccessNoSni, }, Err(e) => InspectorResult::TlsError(e),