diff --git a/src/proxy.rs b/src/proxy.rs index c5f38edf5..27e73f225 100644 --- a/src/proxy.rs +++ b/src/proxy.rs @@ -386,6 +386,8 @@ pub enum Error { #[error("connection failed to drain within the timeout")] DrainTimeOut, + #[error("connection closed due to connection drain")] + ClosedFromDrain, #[error("dns: {0}")] Dns(#[from] ProtoError), diff --git a/src/proxy/inbound.rs b/src/proxy/inbound.rs index b87c8031f..32d9993e5 100644 --- a/src/proxy/inbound.rs +++ b/src/proxy/inbound.rs @@ -241,7 +241,7 @@ impl Inbound { }; let ds = proxy::guess_inbound_service(&rbac_ctx.conn, &for_host, upstream_service, &upstream); - let result_tracker = Arc::new(metrics::ConnectionResult::new( + let result_tracker = Box::new(metrics::ConnectionResult::new( rbac_ctx.conn.src, rbac_ctx.conn.dst, Some(hbone_addr), @@ -264,8 +264,7 @@ impl Inbound { { Ok(cg) => cg, Err(e) => { - Arc::into_inner(result_tracker) - .expect("arc is not shared yet") + result_tracker .record_with_flag(Err(e), metrics::ResponseFlags::AuthorizationPolicyDenied); return req.send_error(build_response(StatusCode::UNAUTHORIZED)); } @@ -287,7 +286,6 @@ impl Inbound { let h2_stream = req.send_response(build_response(StatusCode::OK)).await?; let send = async { - let result_tracker = result_tracker.clone(); if inbound_protocol == AppProtocol::PROXY { let Connection { src, src_identity, .. diff --git a/src/proxy/inbound_passthrough.rs b/src/proxy/inbound_passthrough.rs index 7b5228c45..2f9307569 100644 --- a/src/proxy/inbound_passthrough.rs +++ b/src/proxy/inbound_passthrough.rs @@ -205,7 +205,7 @@ impl InboundPassthrough { ..Default::default() }; let ds = proxy::guess_inbound_service(&rbac_ctx.conn, &None, upstream_service, &upstream); - let result_tracker = Arc::new(metrics::ConnectionResult::new( + let result_tracker = Box::new(metrics::ConnectionResult::new( source_addr, dest_addr, None, @@ -228,8 +228,7 @@ impl InboundPassthrough { { Ok(cg) => cg, Err(e) => { - Arc::into_inner(result_tracker) - .expect("arc is not shared yet") + result_tracker .record_with_flag(Err(e), metrics::ResponseFlags::AuthorizationPolicyDenied); return; } @@ -242,7 +241,6 @@ impl InboundPassthrough { }; let send = async { - let result_tracker = result_tracker.clone(); trace!(%source_addr, %dest_addr, component="inbound plaintext", "connecting..."); let outbound = super::freebind_connect(orig_src, dest_addr, pi.socket_factory.as_ref()) diff --git a/src/proxy/metrics.rs b/src/proxy/metrics.rs index 07a4f4cda..82f0cdb91 100644 --- a/src/proxy/metrics.rs +++ b/src/proxy/metrics.rs @@ -28,6 +28,7 @@ use tracing_core::field::Value; use crate::identity::Identity; use crate::metrics::DefaultedUnknown; +use crate::proxy; use crate::state::service::ServiceDescription; use crate::state::workload::Workload; @@ -313,6 +314,8 @@ pub struct ConnectionResult { recv: AtomicU64, // recv_metric records the number of bytes received on this connection to the aggregated metric counter recv_metric: Counter, + // Have we recorded yet? + recorded: bool, } // log_early_deny allows logging a connection is denied before we have enough information to emit proper @@ -439,6 +442,7 @@ impl ConnectionResult { sent_metric, recv, recv_metric, + recorded: false, } } @@ -452,6 +456,7 @@ impl ConnectionResult { self.recv_metric.inc_by(res); } + // Record our final result, with more details as a response flag. pub fn record_with_flag( mut self, res: Result<(), E>, @@ -462,8 +467,17 @@ impl ConnectionResult { } // Record our final result. - // Ideally, we would save and report from the increment_ functions instead of requiring a report here. - pub fn record(&self, res: Result<(), E>) { + pub fn record(mut self, res: Result<(), E>) { + self.record_internal(res) + } + + // Internal-only function that takes `&mut` to facilitate Drop. Public consumers must use consuming functions. + fn record_internal(&mut self, res: Result<(), E>) { + debug_assert!(!self.recorded, "record called multiple times"); + if self.recorded { + return; + } + self.recorded = true; let tl = &self.tl; // Unconditionally record the connection was closed @@ -508,6 +522,14 @@ impl ConnectionResult { } } +impl Drop for ConnectionResult { + fn drop(&mut self) { + if !self.recorded { + self.record_internal(Err(proxy::Error::ClosedFromDrain)) + } + } +} + fn to_value_owned(t: T) -> impl Value { t.to_string() }