Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Apply tapping logic only when taps are active #142

Merged
merged 14 commits into from Nov 30, 2018
2 changes: 0 additions & 2 deletions lib/stack/src/stack_per_request.rs
@@ -1,5 +1,3 @@
#![allow(dead_code)]

use futures::Poll;
use std::fmt;

Expand Down
45 changes: 29 additions & 16 deletions src/app/inbound.rs
@@ -1,4 +1,5 @@
use http;
use indexmap::IndexMap;
use std::fmt;
use std::net::SocketAddr;

Expand Down Expand Up @@ -33,10 +34,6 @@ impl classify::CanClassify for Endpoint {
}

impl Endpoint {
pub fn dst_name(&self) -> Option<&NameAddr> {
self.dst_name.as_ref()
}

fn target(&self) -> connect::Target {
let tls = Conditional::None(tls::ReasonForNoTls::InternalTraffic);
connect::Target::new(self.addr, tls)
Expand All @@ -49,13 +46,32 @@ impl settings::router::HasConnect for Endpoint {
}
}

impl From<Endpoint> for tap::Endpoint {
fn from(ep: Endpoint) -> Self {
tap::Endpoint {
direction: tap::Direction::In,
target: ep.target(),
labels: Default::default(),
}
impl tap::Inspect for Endpoint {
fn src_addr<B>(&self, req: &http::Request<B>) -> Option<SocketAddr> {
req.extensions().get::<Source>().map(|s| s.remote)
}

fn src_tls<B>(&self, req: &http::Request<B>) -> tls::Status {
req.extensions()
.get::<Source>()
.map(|s| s.tls_status)
.unwrap_or_else(|| Conditional::None(tls::ReasonForNoTls::Disabled))
}

fn dst_addr<B>(&self, _: &http::Request<B>) -> Option<SocketAddr> {
Some(self.addr)
}

fn dst_labels<B>(&self, _: &http::Request<B>) -> Option<&IndexMap<String, String>> {
None
}

fn dst_tls<B>(&self, _: &http::Request<B>) -> tls::Status {
Conditional::None(tls::ReasonForNoTls::InternalTraffic)
}

fn is_outbound<B>(&self, _: &http::Request<B>) -> bool {
false
}
}

Expand Down Expand Up @@ -103,10 +119,10 @@ impl<A> router::Recognize<http::Request<A>> for RecognizeEndpoint {
}

pub mod orig_proto_downgrade {
use std::marker::PhantomData;
use http;
use proxy::http::orig_proto;
use proxy::server::Source;
use std::marker::PhantomData;
use svc;

#[derive(Debug)]
Expand Down Expand Up @@ -168,10 +184,7 @@ pub mod orig_proto_downgrade {

fn make(&self, target: &Source) -> Result<Self::Value, Self::Error> {
debug!("downgrading requests; source={:?}", target);
self
.inner
.make(&target)
.map(orig_proto::Downgrade::new)
self.inner.make(&target).map(orig_proto::Downgrade::new)
}
}
}
Expand Down
18 changes: 9 additions & 9 deletions src/app/main.rs
Expand Up @@ -193,8 +193,7 @@ where
panic!("invalid DNS configuration: {:?}", e);
});

let tap_next_id = tap::NextId::default();
let (taps, observe) = control::Observe::new(100);
let (tap_layer, tap_grpc, tap_daemon) = tap::new();

let (ctl_http_metrics, ctl_http_report) = {
let (m, r) = http_metrics::new::<ControlLabels, Class>(config.metrics_retain_idle);
Expand Down Expand Up @@ -329,7 +328,7 @@ where
.push(buffer::layer())
.push(settings::router::layer::<Endpoint, _>())
.push(orig_proto_upgrade::layer())
.push(tap::layer(tap_next_id.clone(), taps.clone()))
.push(tap_layer.clone())
.push(metrics::layer::<_, classify::Response>(
endpoint_http_metrics,
))
Expand Down Expand Up @@ -478,7 +477,8 @@ where
let endpoint_router = client_stack
.push(buffer::layer())
.push(settings::router::layer::<Endpoint, _>())
.push(tap::layer(tap_next_id, taps))
.push(phantom_data::layer())
.push(tap_layer)
.push(http_metrics::layer::<_, classify::Response>(
endpoint_http_metrics,
))
Expand Down Expand Up @@ -593,19 +593,19 @@ where
let mut rt =
current_thread::Runtime::new().expect("initialize admin thread runtime");

let tap = serve_tap(control_listener, TapServer::new(observe));

let metrics = control::serve_http(
"metrics",
metrics_listener,
metrics::Serve::new(report),
);

// tap is already wrapped in a logging Future.
rt.spawn(tap);
// metrics_server is already wrapped in a logging Future.
rt.spawn(tap_daemon.map_err(|_| ()));
rt.spawn(serve_tap(control_listener, TapServer::new(tap_grpc)));

rt.spawn(metrics);

rt.spawn(::logging::admin().bg("dns-resolver").future(dns_bg));

rt.spawn(
::logging::admin()
.bg("resolver")
Expand Down
40 changes: 30 additions & 10 deletions src/app/outbound.rs
@@ -1,11 +1,12 @@
use std::fmt;
use indexmap::IndexMap;
use std::{fmt, net};

use control::destination::{Metadata, ProtocolHint};
use proxy::http::settings;
use svc;
use tap;
use transport::{connect, tls};
use NameAddr;
use {Conditional, NameAddr};

#[derive(Clone, Debug)]
pub struct Endpoint {
Expand Down Expand Up @@ -52,17 +53,36 @@ impl svc::watch::WithUpdate<tls::ConditionalClientConfig> for Endpoint {
}
}

impl From<Endpoint> for tap::Endpoint {
fn from(ep: Endpoint) -> Self {
// TODO add route labels...
tap::Endpoint {
direction: tap::Direction::Out,
labels: ep.metadata.labels().clone(),
target: ep.connect.clone(),
}
impl tap::Inspect for Endpoint {

fn src_addr<B>(&self, req: &http::Request<B>) -> Option<net::SocketAddr> {
use proxy::server::Source;

req.extensions().get::<Source>().map(|s| s.remote)
}

fn src_tls<B>(&self, _: &http::Request<B>) -> tls::Status {
Conditional::None(tls::ReasonForNoTls::InternalTraffic)
}

fn dst_addr<B>(&self, _: &http::Request<B>) -> Option<net::SocketAddr> {
Some(self.connect.addr)
}

fn dst_labels<B>(&self, _: &http::Request<B>) -> Option<&IndexMap<String, String>> {
Some(self.metadata.labels())
}

fn dst_tls<B>(&self, _: &http::Request<B>) -> tls::Status {
self.metadata.tls_status()
}

fn is_outbound<B>(&self, _: &http::Request<B>) -> bool {
true
}
}


pub mod discovery {
use futures::{Async, Poll};
use std::net::SocketAddr;
Expand Down
4 changes: 4 additions & 0 deletions src/control/destination/mod.rs
Expand Up @@ -222,4 +222,8 @@ impl Metadata {
pub fn tls_identity(&self) -> Conditional<&tls::Identity, tls::ReasonForNoIdentity> {
self.tls_identity.as_ref()
}

pub fn tls_status(&self) -> tls::Status {
self.tls_identity().map(|_| ())
}
}
3 changes: 0 additions & 3 deletions src/control/mod.rs
@@ -1,9 +1,6 @@
mod cache;
pub mod destination;
mod observe;
pub mod pb;
mod remote_stream;
mod serve_http;

pub use self::observe::Observe;
pub use self::serve_http::serve_http;
178 changes: 0 additions & 178 deletions src/control/observe.rs

This file was deleted.