Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

proxy: Exclude private ip errors from recorded metrics #7389

Merged
merged 3 commits into from
Apr 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
27 changes: 13 additions & 14 deletions proxy/src/bin/pg_sni_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,13 @@ use futures::future::Either;
use itertools::Itertools;
use proxy::config::TlsServerEndPoint;
use proxy::context::RequestMonitoring;
use proxy::proxy::run_until_cancelled;
use proxy::{BranchId, EndpointId, ProjectId};
use proxy::proxy::{copy_bidirectional_client_compute, run_until_cancelled};
use rustls::pki_types::PrivateKeyDer;
use tokio::net::TcpListener;

use anyhow::{anyhow, bail, ensure, Context};
use clap::Arg;
use futures::TryFutureExt;
use proxy::console::messages::MetricsAuxInfo;
use proxy::stream::{PqStream, Stream};

use tokio::io::{AsyncRead, AsyncWrite};
Expand Down Expand Up @@ -204,6 +202,7 @@ async fn task_main(
const ERR_INSECURE_CONNECTION: &str = "connection is insecure (try using `sslmode=require`)";

async fn ssl_handshake<S: AsyncRead + AsyncWrite + Unpin>(
ctx: &mut RequestMonitoring,
raw_stream: S,
tls_config: Arc<rustls::ServerConfig>,
tls_server_end_point: TlsServerEndPoint,
Expand Down Expand Up @@ -233,7 +232,10 @@ async fn ssl_handshake<S: AsyncRead + AsyncWrite + Unpin>(
}

Ok(Stream::Tls {
tls: Box::new(raw.upgrade(tls_config).await?),
tls: Box::new(
raw.upgrade(tls_config, !ctx.has_private_peer_addr())
.await?,
),
tls_server_end_point,
})
}
Expand All @@ -256,7 +258,7 @@ async fn handle_client(
tls_server_end_point: TlsServerEndPoint,
stream: impl AsyncRead + AsyncWrite + Unpin,
) -> anyhow::Result<()> {
let tls_stream = ssl_handshake(stream, tls_config, tls_server_end_point).await?;
let mut tls_stream = ssl_handshake(&mut ctx, stream, tls_config, tls_server_end_point).await?;

// Cut off first part of the SNI domain
// We receive required destination details in the format of
Expand All @@ -273,18 +275,15 @@ async fn handle_client(

info!("destination: {}", destination);

let client = tokio::net::TcpStream::connect(destination).await?;

let metrics_aux: MetricsAuxInfo = MetricsAuxInfo {
endpoint_id: (&EndpointId::from("")).into(),
project_id: (&ProjectId::from("")).into(),
branch_id: (&BranchId::from("")).into(),
cold_start_info: proxy::console::messages::ColdStartInfo::Unknown,
};
let mut client = tokio::net::TcpStream::connect(destination).await?;

// doesn't yet matter as pg-sni-router doesn't report analytics logs
ctx.set_success();
ctx.log();

proxy::proxy::passthrough::proxy_pass(tls_stream, client, metrics_aux).await
// Starting from here we only proxy the client's traffic.
info!("performing the proxy pass...");
let _ = copy_bidirectional_client_compute(&mut tls_stream, &mut client).await?;

Ok(())
}
12 changes: 11 additions & 1 deletion proxy/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,18 @@ impl RequestMonitoring {
self.auth_method = Some(auth_method);
}

/// Reports whether this request's peer address is a private IPv4 address.
///
/// Only the `IpAddr::V4` case is inspected, using `Ipv4Addr::is_private`;
/// every other address family (i.e. IPv6) yields `false`.
pub fn has_private_peer_addr(&self) -> bool {
    matches!(self.peer_addr, IpAddr::V4(v4) if v4.is_private())
}

pub fn set_error_kind(&mut self, kind: ErrorKind) {
Metrics::get().proxy.errors_total.inc(kind);
// Do not count errors from peers with a private address in the metrics.
if !self.has_private_peer_addr() {
Metrics::get().proxy.errors_total.inc(kind);
}
if let Some(ep) = &self.endpoint_id {
let metric = &Metrics::get().proxy.endpoints_affected_by_errors;
let label = metric.with_labels(kind);
Expand Down
4 changes: 3 additions & 1 deletion proxy/src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ pub mod handshake;
pub mod passthrough;
pub mod retry;
pub mod wake_compute;
pub use copy_bidirectional::copy_bidirectional_client_compute;

use crate::{
auth,
Expand Down Expand Up @@ -256,8 +257,9 @@ pub async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(

let tls = config.tls_config.as_ref();

let record_handshake_error = !ctx.has_private_peer_addr();
let pause = ctx.latency_timer.pause(crate::metrics::Waiting::Client);
let do_handshake = handshake(stream, mode.handshake_tls(tls));
let do_handshake = handshake(stream, mode.handshake_tls(tls), record_handshake_error);
let (mut stream, params) =
match tokio::time::timeout(config.handshake_timeout, do_handshake).await?? {
HandshakeData::Startup(stream, params) => (stream, params),
Expand Down
2 changes: 1 addition & 1 deletion proxy/src/proxy/copy_bidirectional.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ where
}

#[tracing::instrument(skip_all)]
pub(super) async fn copy_bidirectional_client_compute<Client, Compute>(
pub async fn copy_bidirectional_client_compute<Client, Compute>(
client: &mut Client,
compute: &mut Compute,
) -> Result<(u64, u64), std::io::Error>
Expand Down
5 changes: 4 additions & 1 deletion proxy/src/proxy/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ pub enum HandshakeData<S> {
pub async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
stream: S,
mut tls: Option<&TlsConfig>,
record_handshake_error: bool,
) -> Result<HandshakeData<S>, HandshakeError> {
// Client may try upgrading to each protocol only once
let (mut tried_ssl, mut tried_gss) = (false, false);
Expand Down Expand Up @@ -95,7 +96,9 @@ pub async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
if !read_buf.is_empty() {
return Err(HandshakeError::EarlyData);
}
let tls_stream = raw.upgrade(tls.to_server_config()).await?;
let tls_stream = raw
.upgrade(tls.to_server_config(), record_handshake_error)
.await?;

let (_, tls_server_end_point) = tls
.cert_resolver
Expand Down
2 changes: 1 addition & 1 deletion proxy/src/proxy/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ async fn dummy_proxy(
auth: impl TestAuth + Send,
) -> anyhow::Result<()> {
let client = WithClientIp::new(client);
let mut stream = match handshake(client, tls.as_ref()).await? {
let mut stream = match handshake(client, tls.as_ref(), false).await? {
HandshakeData::Startup(stream, _) => stream,
HandshakeData::Cancel(_) => bail!("cancellation not supported"),
};
Expand Down
5 changes: 4 additions & 1 deletion proxy/src/proxy/tests/mitm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@ async fn proxy_mitm(
tokio::spawn(async move {
// begin handshake with end_server
let end_server = connect_tls(server2, client_config2.make_tls_connect().unwrap()).await;
let (end_client, startup) = match handshake(client1, Some(&server_config1)).await.unwrap() {
let (end_client, startup) = match handshake(client1, Some(&server_config1), false)
.await
.unwrap()
{
HandshakeData::Startup(stream, params) => (stream, params),
HandshakeData::Cancel(_) => panic!("cancellation not supported"),
};
Expand Down
12 changes: 10 additions & 2 deletions proxy/src/serverless.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,10 @@ async fn connection_handler(
};

let peer_addr = peer.unwrap_or(peer_addr).ip();
let has_private_peer_addr = match peer_addr {
IpAddr::V4(ip) => ip.is_private(),
_ => false,
};
info!(?session_id, %peer_addr, "accepted new TCP connection");

// try upgrade to TLS, but with a timeout.
Expand All @@ -182,13 +186,17 @@ async fn connection_handler(
}
// The handshake failed
Ok(Err(e)) => {
Metrics::get().proxy.tls_handshake_failures.inc();
if !has_private_peer_addr {
Metrics::get().proxy.tls_handshake_failures.inc();
}
warn!(?session_id, %peer_addr, "failed to accept TLS connection: {e:?}");
return;
}
// The handshake timed out
Err(e) => {
Metrics::get().proxy.tls_handshake_failures.inc();
if !has_private_peer_addr {
Metrics::get().proxy.tls_handshake_failures.inc();
}
warn!(?session_id, %peer_addr, "failed to accept TLS connection: {e:?}");
return;
}
Expand Down
12 changes: 10 additions & 2 deletions proxy/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,12 +223,20 @@ pub enum StreamUpgradeError {

impl<S: AsyncRead + AsyncWrite + Unpin> Stream<S> {
/// If possible, upgrade raw stream into a secure TLS-based stream.
pub async fn upgrade(self, cfg: Arc<ServerConfig>) -> Result<TlsStream<S>, StreamUpgradeError> {
pub async fn upgrade(
self,
cfg: Arc<ServerConfig>,
record_handshake_error: bool,
) -> Result<TlsStream<S>, StreamUpgradeError> {
match self {
Stream::Raw { raw } => Ok(tokio_rustls::TlsAcceptor::from(cfg)
.accept(raw)
.await
.inspect_err(|_| Metrics::get().proxy.tls_handshake_failures.inc())?),
.inspect_err(|_| {
if record_handshake_error {
Metrics::get().proxy.tls_handshake_failures.inc()
}
})?),
Stream::Tls { .. } => Err(StreamUpgradeError::AlreadyTls),
}
}
Expand Down