Skip to content

Commit

Permalink
Proxy reports Protocol field in telemetry
Browse files Browse the repository at this point in the history
Signed-off-by: Eliza Weisman <eliza@buoyant.io>
  • Loading branch information
hawkw committed Jan 11, 2018
1 parent 901cbfd commit c03c40a
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 8 deletions.
8 changes: 6 additions & 2 deletions proxy/src/bind.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,13 +134,17 @@ where
{
pub fn bind_service(&self, addr: &SocketAddr) -> Service<B> {
trace!("bind_service {}", addr);
let client_ctx = ctx::transport::Client::new(&self.ctx, addr);
let client_ctx = ctx::transport::Client::new(
&self.ctx,
addr,
&control::pb::proxy::common::Protocol::Http,
);

// Map a socket address to an HTTP/2.0 connection.
let connect = {
let c = Timeout::new(
transport::Connect::new(*addr, &self.executor),
self.connect_timeout,
self.connect_timeout,
&self.executor,
);

Expand Down
9 changes: 8 additions & 1 deletion proxy/src/control/pb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#![cfg_attr(feature = "cargo-clippy", allow(clippy))]

use std::error::Error;
use std::fmt;
use std::{fmt, hash};
use std::sync::Arc;

use http;
Expand Down Expand Up @@ -396,6 +396,13 @@ impl<'a> From<&'a ::std::net::SocketAddr> for common::TcpAddress {
}
}

impl hash::Hash for common::Protocol {
// it's necessary to implement Hash for Protocol as it's a field on
// ctx::Transport, which derives Hash.
fn hash<H: hash::Hasher>(&self, state: &mut H) {
(*self as i32).hash(state)
}
}

fn pb_duration(d: &::std::time::Duration) -> ::prost_types::Duration {
let seconds = if d.as_secs() > ::std::i64::MAX as u64 {
Expand Down
13 changes: 12 additions & 1 deletion proxy/src/ctx/transport.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::net::SocketAddr;
use std::sync::Arc;

use control::pb::common::Protocol;

use ctx;

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
Expand All @@ -16,13 +18,15 @@ pub struct Server {
pub remote: SocketAddr,
pub local: SocketAddr,
pub orig_dst: Option<SocketAddr>,
pub protocol: Protocol,
}

/// Identifies a connection from the proxy to another process.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct Client {
pub proxy: Arc<ctx::Proxy>,
pub remote: SocketAddr,
pub protocol: Protocol,
}

impl Ctx {
Expand All @@ -40,23 +44,30 @@ impl Server {
local: &SocketAddr,
remote: &SocketAddr,
orig_dst: &Option<SocketAddr>,
protocol: &Protocol,
) -> Arc<Server> {
let s = Server {
proxy: Arc::clone(proxy),
local: *local,
remote: *remote,
orig_dst: *orig_dst,
protocol: *protocol,
};

Arc::new(s)
}
}

impl Client {
pub fn new(proxy: &Arc<ctx::Proxy>, remote: &SocketAddr) -> Arc<Client> {
pub fn new(
proxy: &Arc<ctx::Proxy>,
remote: &SocketAddr,
protocol: &Protocol,
) -> Arc<Client> {
let c = Client {
proxy: Arc::clone(proxy),
remote: *remote,
protocol: *protocol,
};

Arc::new(c)
Expand Down
10 changes: 9 additions & 1 deletion proxy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,8 +317,16 @@ where
let opened_at = Instant::now();
let orig_dst = connection.original_dst_addr();
let local_addr = connection.local_addr().unwrap_or(listen_addr);
// TODO: detect protocol.
let protocol = control::pb::common::Protocol::Http;
let srv_ctx =
ctx::transport::Server::new(&proxy_ctx, &local_addr, &remote_addr, &orig_dst);
ctx::transport::Server::new(
&proxy_ctx,
&local_addr,
&remote_addr,
&orig_dst,
&protocol,
);

let io = sensors.accept(connection, opened_at, &srv_ctx);

Expand Down
22 changes: 19 additions & 3 deletions proxy/src/telemetry/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::time::Duration;
use http;
use ordermap::OrderMap;

use control::pb::common::{HttpMethod, TcpAddress};
use control::pb::common::{HttpMethod, TcpAddress, Protocol};
use control::pb::proxy::telemetry::{
eos_ctx,
ClientTransport,
Expand Down Expand Up @@ -81,6 +81,9 @@ enum End {

#[derive(Debug, Default)]
struct TransportStats {
// TODO: we may want to have more distinction between transports, if
// we want to collect different metrics based on transport protocol?
protocol: Protocol,
connects: u32,
disconnects: Vec<TransportSummary>,
}
Expand Down Expand Up @@ -199,11 +202,11 @@ impl Metrics {
let source = s.remote.ip();
self.sources
.entry(source)
.or_insert_with(TransportStats::default)
.or_insert_with(|| { TransportStats::from(s.protocol) })
}
ctx::transport::Ctx::Client(ref c) => self.destinations
.entry(c.remote)
.or_insert_with(TransportStats::default),
.or_insert_with(|| { TransportStats::from(c.protocol) }),
}
}

Expand All @@ -216,6 +219,7 @@ impl Metrics {
source_ip: Some(ip.into()),
connects: stats.connects,
disconnects: stats.disconnects,
protocol: stats.protocol as i32,
})
}

Expand All @@ -227,6 +231,7 @@ impl Metrics {
}),
connects: stats.connects,
disconnects: stats.disconnects,
protocol: stats.protocol as i32,
});
}

Expand Down Expand Up @@ -371,6 +376,17 @@ fn dur_to_ms(dur: Duration) -> u64 {
})
}

// ===== impl TransportStats =====

impl From<Protocol> for TransportStats {
fn from(protocol: Protocol) -> TransportStats {
TransportStats {
protocol,
..Default::default()
}
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down

0 comments on commit c03c40a

Please sign in to comment.