Skip to content

Commit

Permalink
profiles: perform profile resolution for IP addresses (#626)
Browse files Browse the repository at this point in the history
linkerd/linkerd2#3916 added support for resolving service profiles for
IP addresses to the Destination service. This branch updates the proxy's
profiles client to look up profiles for IP addresses, rather than always
rejecting addresses that are IPs rather than DNS names.

Similarly to the Destination service-discovery client, a new
`LINKERD2_PROXY_DESTINATION_PROFILE_NETWORKS` enviroment variable is
used to configure a list of subnets to match which IPs the proxy will
look up profiles for. By default, this is empty. Since the logic for
filtering requests to a service based on IPs and DNS prefixes is now
identical between the profile and destination clients, I factored it out
into a new layer that's used for both.

Fixes linkerd/linkerd2#4877

Signed-off-by: Eliza Weisman <eliza@buoyant.io>
  • Loading branch information
hawkw committed Aug 19, 2020
1 parent 170afe8 commit 6d58ee7
Show file tree
Hide file tree
Showing 12 changed files with 228 additions and 174 deletions.
1 change: 0 additions & 1 deletion Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1448,7 +1448,6 @@ dependencies = [
"http-body",
"indexmap",
"linkerd2-addr",
"linkerd2-dns",
"linkerd2-error",
"linkerd2-proxy-api",
"linkerd2-stack",
Expand Down
6 changes: 6 additions & 0 deletions linkerd/addr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,12 @@ impl From<NameAddr> for Addr {
}
}

impl AsRef<Self> for Addr {
fn as_ref(&self) -> &Self {
self
}
}

// === impl NameAddr ===

impl NameAddr {
Expand Down
6 changes: 6 additions & 0 deletions linkerd/app/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,3 +114,9 @@ impl std::fmt::Display for DiscoveryRejected {
}

impl std::error::Error for DiscoveryRejected {}

impl From<Addr> for DiscoveryRejected {
fn from(_: Addr) -> Self {
Self::new()
}
}
6 changes: 5 additions & 1 deletion linkerd/app/core/src/svc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

pub use crate::proxy::http;
use crate::transport::Connect;
use crate::{cache, Error};
use crate::{cache, request_filter, Error};
pub use linkerd2_buffer as buffer;
use linkerd2_concurrency_limit as concurrency_limit;
pub use linkerd2_stack::{self as stack, layer, NewService};
Expand Down Expand Up @@ -299,6 +299,10 @@ impl<S> Stack<S> {
self.push(stack::FallbackLayer::new(fallback).with_predicate(predicate))
}

pub fn push_request_filter<F: Clone>(self, filter: F) -> Stack<request_filter::Service<F, S>> {
self.push(request_filter::RequestFilterLayer::new(filter))
}

// pub fn box_http_request<B>(self) -> Stack<http::boxed::BoxRequest<S, B>>
// where
// B: hyper::body::HttpBody<Data = http::boxed::Data, Error = Error> + 'static,
Expand Down
6 changes: 6 additions & 0 deletions linkerd/app/outbound/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,12 @@ impl<T: connect::ConnectAddr> connect::ConnectAddr for Target<T> {
}
}

impl<T> AsRef<Addr> for Target<T> {
fn as_ref(&self) -> &Addr {
&self.addr
}
}

// === impl HttpEndpoint ===

impl HttpEndpoint {
Expand Down
33 changes: 23 additions & 10 deletions linkerd/app/src/dst/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
mod permit;
mod resolve;

use http_body::Body as HttpBody;
use indexmap::IndexSet;
use linkerd2_app_core::{
config::{ControlAddr, ControlConfig},
dns, profiles, Error,
dns, profiles, request_filter, svc, Error,
};
use permit::PermitConfiguredDsts;
use std::time::Duration;
use tonic::{
body::{Body, BoxBody},
Expand All @@ -19,6 +21,7 @@ pub struct Config {
pub get_suffixes: IndexSet<dns::Suffix>,
pub get_networks: IndexSet<ipnet::IpNet>,
pub profile_suffixes: IndexSet<dns::Suffix>,
pub profile_networks: IndexSet<ipnet::IpNet>,
pub initial_profile_timeout: Duration,
}

Expand All @@ -27,8 +30,11 @@ pub struct Config {
/// The addr is preserved for logging.
pub struct Dst<S> {
pub addr: ControlAddr,
pub profiles: profiles::Client<S, resolve::BackoffUnlessInvalidArgument>,
pub resolve: resolve::Resolve<S>,
pub profiles: request_filter::Service<
PermitConfiguredDsts<profiles::InvalidProfileAddr>,
profiles::Client<S, resolve::BackoffUnlessInvalidArgument>,
>,
pub resolve: request_filter::Service<PermitConfiguredDsts, resolve::Resolve<S>>,
}

impl Config {
Expand All @@ -42,21 +48,28 @@ impl Config {
<S::ResponseBody as HttpBody>::Error: Into<Error> + Send,
S::Future: Send,
{
let resolve = resolve::new(
let resolve = svc::stack(resolve::new(
svc.clone(),
self.get_suffixes,
self.get_networks,
&self.context,
self.control.connect.backoff,
);
))
.push_request_filter(PermitConfiguredDsts::new(
self.get_suffixes,
self.get_networks,
))
.into_inner();

let profiles = profiles::Client::new(
let profiles = svc::stack(profiles::Client::new(
svc,
resolve::BackoffUnlessInvalidArgument::from(self.control.connect.backoff),
self.initial_profile_timeout,
self.context,
self.profile_suffixes,
);
))
.push_request_filter(
PermitConfiguredDsts::new(self.profile_suffixes, self.profile_networks)
.with_error::<profiles::InvalidProfileAddr>(),
)
.into_inner();

Ok(Dst {
addr: self.control.addr,
Expand Down
78 changes: 78 additions & 0 deletions linkerd/app/src/dst/permit.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
use ipnet::{Contains, IpNet};
use linkerd2_app_core::{dns::Suffix, request_filter, Addr, DiscoveryRejected, Error};
use std::marker::PhantomData;
use std::net::IpAddr;
use std::sync::Arc;

pub struct PermitConfiguredDsts<E = DiscoveryRejected> {
name_suffixes: Arc<Vec<Suffix>>,
networks: Arc<Vec<IpNet>>,
_error: PhantomData<fn(E)>,
}

// === impl PermitConfiguredDsts ===

impl PermitConfiguredDsts {
pub fn new(
name_suffixes: impl IntoIterator<Item = Suffix>,
nets: impl IntoIterator<Item = IpNet>,
) -> Self {
Self {
name_suffixes: Arc::new(name_suffixes.into_iter().collect()),
networks: Arc::new(nets.into_iter().collect()),
_error: PhantomData,
}
}

/// Configures the returned error type when the target is outside of the
/// configured set of destinations.
pub fn with_error<E>(self) -> PermitConfiguredDsts<E>
where
E: Into<Error> + From<Addr>,
{
PermitConfiguredDsts {
name_suffixes: self.name_suffixes,
networks: self.networks,
_error: PhantomData,
}
}
}

impl<E> Clone for PermitConfiguredDsts<E> {
fn clone(&self) -> Self {
Self {
name_suffixes: self.name_suffixes.clone(),
networks: self.networks.clone(),
_error: PhantomData,
}
}
}

impl<T, E> request_filter::RequestFilter<T> for PermitConfiguredDsts<E>
where
E: Into<Error> + From<Addr>,
T: AsRef<Addr>,
{
type Error = E;

fn filter(&self, t: T) -> Result<T, Self::Error> {
let addr = t.as_ref();
let permitted = match addr {
Addr::Name(ref name) => self
.name_suffixes
.iter()
.any(|suffix| suffix.contains(name.name())),
Addr::Socket(sa) => self.networks.iter().any(|net| match (net, sa.ip()) {
(IpNet::V4(net), IpAddr::V4(addr)) => net.contains(&addr),
(IpNet::V6(net), IpAddr::V6(addr)) => net.contains(&addr),
_ => false,
}),
};

if permitted {
Ok(t)
} else {
Err(E::from(addr.clone()))
}
}
}
75 changes: 8 additions & 67 deletions linkerd/app/src/dst/resolve.rs
Original file line number Diff line number Diff line change
@@ -1,35 +1,23 @@
pub use super::permit::PermitConfiguredDsts;
use http_body::Body as HttpBody;
use ipnet::{Contains, IpNet};
use linkerd2_app_core::{
dns::Suffix,
exp_backoff::{ExponentialBackoff, ExponentialBackoffStream},
proxy::{
api_resolve as api,
resolve::{self, recover},
},
request_filter, Addr, DiscoveryRejected, Error, Recover,
DiscoveryRejected, Error, Recover,
};
use linkerd2_app_outbound::Target;
use std::net::IpAddr;
use std::sync::Arc;
use tonic::{
body::{Body, BoxBody},
client::GrpcService,
Code, Status,
};

pub type Resolve<S> = request_filter::Service<
PermitConfiguredDsts,
recover::Resolve<BackoffUnlessInvalidArgument, resolve::make_unpin::Resolve<api::Resolve<S>>>,
>;
pub type Resolve<S> =
recover::Resolve<BackoffUnlessInvalidArgument, resolve::make_unpin::Resolve<api::Resolve<S>>>;

pub fn new<S>(
service: S,
suffixes: impl IntoIterator<Item = Suffix>,
nets: impl IntoIterator<Item = IpNet>,
token: &str,
backoff: ExponentialBackoff,
) -> Resolve<S>
pub fn new<S>(service: S, token: &str, backoff: ExponentialBackoff) -> Resolve<S>
where
S: GrpcService<BoxBody> + Clone + Send + 'static,
S::Error: Into<Error> + Send,
Expand All @@ -38,62 +26,15 @@ where
<S::ResponseBody as HttpBody>::Error: Into<Error> + Send,
S::Future: Send,
{
request_filter::Service::new(
PermitConfiguredDsts::new(suffixes, nets),
recover::Resolve::new(
backoff.into(),
resolve::make_unpin(api::Resolve::new(service).with_context_token(token)),
),
recover::Resolve::new(
backoff.into(),
resolve::make_unpin(api::Resolve::new(service).with_context_token(token)),
)
}

#[derive(Clone, Debug)]
pub struct PermitConfiguredDsts {
name_suffixes: Arc<Vec<Suffix>>,
networks: Arc<Vec<IpNet>>,
}

#[derive(Clone, Debug, Default)]
pub struct BackoffUnlessInvalidArgument(ExponentialBackoff);

// === impl PermitConfiguredDsts ===

impl PermitConfiguredDsts {
fn new(
name_suffixes: impl IntoIterator<Item = Suffix>,
nets: impl IntoIterator<Item = IpNet>,
) -> Self {
Self {
name_suffixes: Arc::new(name_suffixes.into_iter().collect()),
networks: Arc::new(nets.into_iter().collect()),
}
}
}

impl<T> request_filter::RequestFilter<Target<T>> for PermitConfiguredDsts {
type Error = DiscoveryRejected;

fn filter(&self, t: Target<T>) -> Result<Target<T>, Self::Error> {
let permitted = match t.addr {
Addr::Name(ref name) => self
.name_suffixes
.iter()
.any(|suffix| suffix.contains(name.name())),
Addr::Socket(sa) => self.networks.iter().any(|net| match (net, sa.ip()) {
(IpNet::V4(net), IpAddr::V4(addr)) => net.contains(&addr),
(IpNet::V6(net), IpAddr::V6(addr)) => net.contains(&addr),
_ => false,
}),
};

if permitted {
Ok(t)
} else {
Err(DiscoveryRejected::new())
}
}
}

// === impl BackoffUnlessInvalidArgument ===

impl From<ExponentialBackoff> for BackoffUnlessInvalidArgument {
Expand Down
12 changes: 12 additions & 0 deletions linkerd/app/src/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,16 @@ pub const ENV_DESTINATION_GET_NETWORKS: &str = "LINKERD2_PROXY_DESTINATION_GET_N
/// If unspecified, a default value is used.
pub const ENV_DESTINATION_PROFILE_SUFFIXES: &str = "LINKERD2_PROXY_DESTINATION_PROFILE_SUFFIXES";

/// Constrains which destination addresses may be used for profile/route discovery.
///
/// The value is a comma-separated list of networks that may be
/// resolved via the destination service.
///
/// If specified and empty, the destination service is not used for route discovery.
///
/// If unspecified, a default value is used.
pub const ENV_DESTINATION_PROFILE_NETWORKS: &str = "LINKERD2_PROXY_DESTINATION_PROFILE_NETWORKS";

/// Constrains which destination names are permitted.
///
/// If unspecified or empty, no inbound gateway is configured.
Expand Down Expand Up @@ -331,6 +341,7 @@ pub fn parse_config<S: Strings>(strings: &S) -> Result<super::Config, EnvError>
ENV_DESTINATION_PROFILE_SUFFIXES,
parse_dns_suffixes,
);
let dst_profile_networks = parse(strings, ENV_DESTINATION_PROFILE_NETWORKS, parse_networks);

let initial_stream_window_size = parse(strings, ENV_INITIAL_STREAM_WINDOW_SIZE, parse_number);
let initial_connection_window_size =
Expand Down Expand Up @@ -486,6 +497,7 @@ pub fn parse_config<S: Strings>(strings: &S) -> Result<super::Config, EnvError>
get_networks: dst_get_networks?.unwrap_or_default(),
profile_suffixes: dst_profile_suffixes?
.unwrap_or(parse_dns_suffixes(DEFAULT_DESTINATION_PROFILE_SUFFIXES).unwrap()),
profile_networks: dst_profile_networks?.unwrap_or_default(),
initial_profile_timeout: dst_profile_initial_timeout?
.unwrap_or(DEFAULT_DESTINATION_PROFILE_INITIAL_TIMEOUT),
control: ControlConfig {
Expand Down
21 changes: 21 additions & 0 deletions linkerd/request-filter/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ pub trait RequestFilter<T> {
fn filter(&self, request: T) -> Result<T, Self::Error>;
}

#[derive(Clone, Debug)]
pub struct RequestFilterLayer<T> {
filter: T,
}

#[derive(Clone, Debug)]
pub struct Service<I, S> {
filter: I,
Expand All @@ -28,6 +33,22 @@ pub enum ResponseFuture<F> {
Rejected(Option<Error>),
}

// === impl Layer ===

impl<T: Clone> RequestFilterLayer<T> {
pub fn new(filter: T) -> Self {
Self { filter }
}
}

impl<T: Clone, S> tower::Layer<S> for RequestFilterLayer<T> {
type Service = Service<T, S>;

fn layer(&self, inner: S) -> Self::Service {
Service::new(self.filter.clone(), inner)
}
}

// === impl Service ===

impl<I, S> Service<I, S> {
Expand Down
1 change: 0 additions & 1 deletion linkerd/service-profiles/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ http = "0.2"
http-body = "0.3"
indexmap = "1.0"
linkerd2-addr = { path = "../addr" }
linkerd2-dns = { path = "../dns" }
linkerd2-error = { path = "../error" }
linkerd2-proxy-api = { git = "https://github.com/linkerd/linkerd2-proxy-api", tag = "v0.1.13" }
linkerd2-stack = { path = "../stack" }
Expand Down
Loading

0 comments on commit 6d58ee7

Please sign in to comment.