Skip to content

Commit

Permalink
Keep track of logs for terminated connections (#1187)
Browse files Browse the repository at this point in the history
Fixes #1155

This ensures when we drain a process we emit metrics
  • Loading branch information
howardjohn committed Jun 28, 2024
1 parent caa4e76 commit 3114f81
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 10 deletions.
2 changes: 2 additions & 0 deletions src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,8 @@ pub enum Error {

#[error("connection failed to drain within the timeout")]
DrainTimeOut,
#[error("connection closed due to connection drain")]
ClosedFromDrain,

#[error("dns: {0}")]
Dns(#[from] ProtoError),
Expand Down
6 changes: 2 additions & 4 deletions src/proxy/inbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ impl Inbound {
};
let ds =
proxy::guess_inbound_service(&rbac_ctx.conn, &for_host, upstream_service, &upstream);
let result_tracker = Arc::new(metrics::ConnectionResult::new(
let result_tracker = Box::new(metrics::ConnectionResult::new(
rbac_ctx.conn.src,
rbac_ctx.conn.dst,
Some(hbone_addr),
Expand All @@ -264,8 +264,7 @@ impl Inbound {
{
Ok(cg) => cg,
Err(e) => {
Arc::into_inner(result_tracker)
.expect("arc is not shared yet")
result_tracker
.record_with_flag(Err(e), metrics::ResponseFlags::AuthorizationPolicyDenied);
return req.send_error(build_response(StatusCode::UNAUTHORIZED));
}
Expand All @@ -287,7 +286,6 @@ impl Inbound {
let h2_stream = req.send_response(build_response(StatusCode::OK)).await?;

let send = async {
let result_tracker = result_tracker.clone();
if inbound_protocol == AppProtocol::PROXY {
let Connection {
src, src_identity, ..
Expand Down
6 changes: 2 additions & 4 deletions src/proxy/inbound_passthrough.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ impl InboundPassthrough {
..Default::default()
};
let ds = proxy::guess_inbound_service(&rbac_ctx.conn, &None, upstream_service, &upstream);
let result_tracker = Arc::new(metrics::ConnectionResult::new(
let result_tracker = Box::new(metrics::ConnectionResult::new(
source_addr,
dest_addr,
None,
Expand All @@ -228,8 +228,7 @@ impl InboundPassthrough {
{
Ok(cg) => cg,
Err(e) => {
Arc::into_inner(result_tracker)
.expect("arc is not shared yet")
result_tracker
.record_with_flag(Err(e), metrics::ResponseFlags::AuthorizationPolicyDenied);
return;
}
Expand All @@ -242,7 +241,6 @@ impl InboundPassthrough {
};

let send = async {
let result_tracker = result_tracker.clone();
trace!(%source_addr, %dest_addr, component="inbound plaintext", "connecting...");

let outbound = super::freebind_connect(orig_src, dest_addr, pi.socket_factory.as_ref())
Expand Down
26 changes: 24 additions & 2 deletions src/proxy/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use tracing_core::field::Value;

use crate::identity::Identity;
use crate::metrics::DefaultedUnknown;
use crate::proxy;

use crate::state::service::ServiceDescription;
use crate::state::workload::Workload;
Expand Down Expand Up @@ -313,6 +314,8 @@ pub struct ConnectionResult {
recv: AtomicU64,
// recv_metric records the number of bytes received on this connection to the aggregated metric counter
recv_metric: Counter,
// Have we recorded yet?
recorded: bool,
}

// log_early_deny allows logging a connection is denied before we have enough information to emit proper
Expand Down Expand Up @@ -439,6 +442,7 @@ impl ConnectionResult {
sent_metric,
recv,
recv_metric,
recorded: false,
}
}

Expand All @@ -452,6 +456,7 @@ impl ConnectionResult {
self.recv_metric.inc_by(res);
}

// Record our final result, with more details as a response flag.
pub fn record_with_flag<E: std::error::Error>(
mut self,
res: Result<(), E>,
Expand All @@ -462,8 +467,17 @@ impl ConnectionResult {
}

// Record our final result.
// Ideally, we would save and report from the increment_ functions instead of requiring a report here.
pub fn record<E: std::error::Error>(&self, res: Result<(), E>) {
pub fn record<E: std::error::Error>(mut self, res: Result<(), E>) {
self.record_internal(res)
}

// Internal-only function that takes `&mut` to facilitate Drop. Public consumers must use consuming functions.
fn record_internal<E: std::error::Error>(&mut self, res: Result<(), E>) {
debug_assert!(!self.recorded, "record called multiple times");
if self.recorded {
return;
}
self.recorded = true;
let tl = &self.tl;

// Unconditionally record the connection was closed
Expand Down Expand Up @@ -508,6 +522,14 @@ impl ConnectionResult {
}
}

impl Drop for ConnectionResult {
fn drop(&mut self) {
if !self.recorded {
self.record_internal(Err(proxy::Error::ClosedFromDrain))
}
}
}

fn to_value_owned<T: ToString>(t: T) -> impl Value {
t.to_string()
}
Expand Down

0 comments on commit 3114f81

Please sign in to comment.