Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

inbound: allow netmasks in LINKERD2_PROXY_INBOUND_IPS #1164

Closed
wants to merge 14 commits into from
5 changes: 5 additions & 0 deletions linkerd/app/core/src/addr_match.rs
Expand Up @@ -114,6 +114,11 @@ impl IpMatch {
_ => false,
})
}

#[inline]
pub fn is_empty(&self) -> bool {
self.0.is_empty()
}
}

impl fmt::Display for IpMatch {
Expand Down
6 changes: 5 additions & 1 deletion linkerd/app/core/src/metrics/tcp_accept_errors.rs
@@ -1,7 +1,7 @@
use crate::{
metrics::{self, Counter, FmtMetrics},
svc,
transport::{labels, OrigDstAddr},
transport::{allow_ips, labels, OrigDstAddr},
};
use linkerd_error::Error;
use linkerd_error_metrics::{FmtLabels, LabelError, RecordError};
Expand Down Expand Up @@ -37,6 +37,7 @@ pub struct LabelAcceptErrors(());
pub enum AcceptErrors {
TlsDetectTimeout,
Io,
InvalidIp,
Other,
}

Expand Down Expand Up @@ -94,6 +95,8 @@ impl LabelError<Error> for LabelAcceptErrors {
} else if err.is::<std::io::Error>() {
// We ignore the error code because we want all labels to be consistent.
return AcceptErrors::Io;
} else if err.is::<allow_ips::InvalidIp>() {
return AcceptErrors::InvalidIp;
}
curr = err.source();
}
Expand All @@ -110,6 +113,7 @@ impl FmtLabels for AcceptErrors {
Self::TlsDetectTimeout => fmt::Display::fmt("error=\"tls_detect_timeout\"", f),
Self::Io => fmt::Display::fmt("error=\"io\"", f),
Self::Other => fmt::Display::fmt("error=\"other\"", f),
Self::InvalidIp => fmt::Display::fmt("error=\"invalid_ip\"", f),
}
}
}
46 changes: 46 additions & 0 deletions linkerd/app/core/src/transport/allow_ips.rs
@@ -0,0 +1,46 @@
use crate::{addr_match::IpMatch, svc, transport::OrigDstAddr};
use ipnet::IpNet;
use std::net::IpAddr;
use thiserror::Error;

#[derive(Clone, Debug, Default)]
pub struct AllowIps {
ips: IpMatch,
}

#[derive(Clone, Debug, Error)]
#[error("inbound connections are not allowed on this IP address ({ip})")]
pub(crate) struct InvalidIp {
ip: IpAddr,
}

impl<T> svc::stack::Predicate<T> for AllowIps
where
T: svc::Param<OrigDstAddr>,
{
type Request = T;

fn check(&mut self, target: T) -> Result<Self::Request, crate::Error> {
// Allowlist not configured.
if self.ips.is_empty() {
return Ok(target);
}

let OrigDstAddr(addr) = target.param();
let ip = addr.ip();
if self.ips.matches(ip) {
return Ok(target);
}

tracing::warn!(%addr, allowed = ?self.ips, "Target IP address not permitted");
Err(InvalidIp { ip }.into())
}
}

impl AllowIps {
pub fn new(nets: impl IntoIterator<Item = IpNet>) -> Self {
Self {
ips: IpMatch::new(nets),
}
}
}
3 changes: 2 additions & 1 deletion linkerd/app/core/src/transport/mod.rs
@@ -1,5 +1,6 @@
pub use linkerd_proxy_transport::*;

pub mod allow_ips;
pub mod labels;

pub type Metrics = metrics::Registry<labels::Key>;
pub use allow_ips::AllowIps;
8 changes: 6 additions & 2 deletions linkerd/app/inbound/src/direct.rs
Expand Up @@ -4,7 +4,7 @@ use linkerd_app_core::{
proxy::identity::LocalCrtKey,
svc::{self, Param},
tls,
transport::{self, metrics::SensorIo, ClientAddr, OrigDstAddr, Remote},
transport::{self, metrics::SensorIo, ClientAddr, OrigDstAddr, Remote, ServerAddr},
transport_header::{self, NewTransportHeaderServer, SessionProtocol, TransportHeader},
Conditional, Error, Infallible, NameAddr,
};
Expand Down Expand Up @@ -101,7 +101,11 @@ impl<N> Inbound<N> {
port,
name: None,
protocol: None,
} => Ok(svc::Either::A(TcpEndpoint { port })),
} => {
let target_addr =
Remote(ServerAddr(SocketAddr::from(([127, 0, 0, 1], port))));
Ok(svc::Either::A(TcpEndpoint { target_addr }))
}
TransportHeader {
port,
name: Some(name),
Expand Down
6 changes: 2 additions & 4 deletions linkerd/app/inbound/src/http/mod.rs
Expand Up @@ -249,7 +249,7 @@ pub mod fuzz_logic {
use libfuzzer_sys::arbitrary::Arbitrary;
use linkerd_app_core::{
io, proxy,
svc::{self, NewService, Param},
svc::{self, NewService},
tls,
transport::{ClientAddr, Remote, ServerAddr},
Conditional, NameAddr, ProxyRuntime,
Expand Down Expand Up @@ -356,9 +356,7 @@ pub mod fuzz_logic {
I: io::AsyncRead + io::AsyncWrite + io::PeerAddr + Send + Unpin + 'static,
{
let connect = svc::stack(connect)
.push_map_target(|t: TcpEndpoint| {
Remote(ServerAddr(([127, 0, 0, 1], t.param()).into()))
})
.push_map_target(|t: TcpEndpoint| t.target_addr)
.into_inner();
Inbound::new(cfg, rt)
.with_stack(connect)
Expand Down
4 changes: 2 additions & 2 deletions linkerd/app/inbound/src/http/tests.rs
Expand Up @@ -10,7 +10,7 @@ use hyper::{client::conn::Builder as ClientBuilder, Body, Request, Response};
use linkerd_app_core::{
errors::L5D_PROXY_ERROR,
io, proxy,
svc::{self, NewService, Param},
svc::{self, NewService},
tls,
transport::{ClientAddr, Remote, ServerAddr},
Conditional, Error, NameAddr, ProxyRuntime,
Expand All @@ -30,7 +30,7 @@ where
{
// Mocks to_tcp_connect.
let connect = svc::stack(connect)
.push_map_target(|t: TcpEndpoint| Remote(ServerAddr(([127, 0, 0, 1], t.param()).into())))
.push_map_target(|ep: TcpEndpoint| ep.target_addr)
.push_connect_timeout(cfg.proxy.connect.timeout)
.into_inner();

Expand Down
12 changes: 8 additions & 4 deletions linkerd/app/inbound/src/lib.rs
Expand Up @@ -27,7 +27,9 @@ use linkerd_app_core::{
detect, drain, io, metrics, profiles,
proxy::tcp,
serve, svc, tls,
transport::{self, listen::Bind, ClientAddr, Local, OrigDstAddr, Remote, ServerAddr},
transport::{
self, allow_ips::AllowIps, listen::Bind, ClientAddr, Local, OrigDstAddr, Remote, ServerAddr,
},
Error, Infallible, NameMatch, ProxyRuntime,
};
use std::{convert::TryFrom, fmt::Debug, future::Future, time::Duration};
Expand All @@ -40,6 +42,7 @@ pub struct Config {
pub require_identity_for_inbound_ports: RequireIdentityForPorts,
pub disable_protocol_detection_for_ports: PortSet,
pub profile_idle_timeout: Duration,
pub allowed_ips: AllowIps,
}

#[derive(Clone)]
Expand Down Expand Up @@ -108,7 +111,7 @@ impl Inbound<()> {
> + Clone,
>
where
T: svc::Param<u16> + 'static,
T: svc::Param<Remote<ServerAddr>> + 'static,
{
self.map_stack(|config, _, _| {
// Establishes connections to remote peers (for both TCP
Expand All @@ -120,7 +123,7 @@ impl Inbound<()> {
} = config.proxy.connect;

svc::stack(transport::ConnectTcp::new(*keepalive))
.push_map_target(|t: T| Remote(ServerAddr(([127, 0, 0, 1], t.param()).into())))
.push_map_target(|t: T| t.param())
// Limits the time we wait for a connection to be established.
.push_connect_timeout(*timeout)
.push(svc::stack::BoxFuture::layer())
Expand Down Expand Up @@ -298,7 +301,7 @@ where
.push_on_response(svc::BoxService::layer())
.push(svc::BoxNewService::layer())
})
.map_stack(|_, rt, accept| {
.map_stack(|cfg, rt, accept| {
accept
.push_switch(
PreventLoop::from(server_port).to_switch(),
Expand All @@ -308,6 +311,7 @@ where
let OrigDstAddr(target_addr) = a.param();
info_span!("server", port = target_addr.port())
})
.push_request_filter(cfg.allowed_ips.clone())
.push(rt.metrics.tcp_accept_errors.layer())
.push_on_response(svc::BoxService::layer())
.push(svc::BoxNewService::layer())
Expand Down
6 changes: 4 additions & 2 deletions linkerd/app/inbound/src/prevent_loop.rs
Expand Up @@ -4,6 +4,7 @@ use linkerd_app_core::{
transport::addrs::OrigDstAddr,
Error,
};
use std::net::SocketAddr;
use thiserror::Error;

/// A connection policy that drops
Expand Down Expand Up @@ -32,8 +33,9 @@ impl Predicate<TcpEndpoint> for PreventLoop {
type Request = TcpEndpoint;

fn check(&mut self, t: TcpEndpoint) -> Result<TcpEndpoint, Error> {
if t.port == self.port {
Err(LoopPrevented { port: t.port }.into())
let port = SocketAddr::from(t.target_addr).port();
if port == self.port {
Err(LoopPrevented { port }.into())
} else {
Ok(t)
}
Expand Down
26 changes: 14 additions & 12 deletions linkerd/app/inbound/src/target.rs
Expand Up @@ -41,14 +41,14 @@ pub struct Logical {

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct HttpEndpoint {
pub port: u16,
pub target_addr: Remote<ServerAddr>,
pub settings: http::client::Settings,
pub tls: tls::ConditionalServerTls,
}

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct TcpEndpoint {
pub port: u16,
pub target_addr: Remote<ServerAddr>,
}

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -157,7 +157,7 @@ impl Param<http::client::Settings> for HttpEndpoint {
impl From<Target> for HttpEndpoint {
fn from(target: Target) -> Self {
Self {
port: target.target_addr.port(),
target_addr: Remote(ServerAddr(target.target_addr)),
settings: target.http_version.into(),
tls: target.tls,
}
Expand All @@ -169,26 +169,28 @@ impl From<Target> for HttpEndpoint {
impl From<TcpAccept> for TcpEndpoint {
fn from(tcp: TcpAccept) -> Self {
Self {
port: tcp.target_addr.port(),
target_addr: Remote(ServerAddr(tcp.target_addr)),
}
}
}

impl From<(TransportHeader, TcpAccept)> for TcpEndpoint {
fn from((header, _): (TransportHeader, TcpAccept)) -> Self {
Self { port: header.port }
fn from((header, accept): (TransportHeader, TcpAccept)) -> Self {
let ip = accept.target_addr.ip();
let target_addr = Remote(ServerAddr(SocketAddr::from((ip, header.port))));
Self { target_addr }
}
}

impl From<HttpEndpoint> for TcpEndpoint {
fn from(h: HttpEndpoint) -> Self {
Self { port: h.port }
fn from(HttpEndpoint { target_addr, .. }: HttpEndpoint) -> Self {
Self { target_addr }
}
}

impl Param<u16> for TcpEndpoint {
fn param(&self) -> u16 {
self.port
impl Param<Remote<ServerAddr>> for TcpEndpoint {
fn param(&self) -> Remote<ServerAddr> {
self.target_addr
}
}

Expand All @@ -202,7 +204,7 @@ impl Param<transport::labels::Key> for TcpEndpoint {
#[cfg(test)]
impl From<TcpEndpoint> for SocketAddr {
fn from(ep: TcpEndpoint) -> SocketAddr {
SocketAddr::from(([127, 0, 0, 1], ep.port))
ep.target_addr.into()
}
}

Expand Down
1 change: 1 addition & 0 deletions linkerd/app/inbound/src/test_util.rs
Expand Up @@ -50,6 +50,7 @@ pub fn default_config() -> Config {
require_identity_for_inbound_ports: RequireIdentityForPorts::from(None),
disable_protocol_detection_for_ports: Default::default(),
profile_idle_timeout: Duration::from_millis(500),
allowed_ips: Default::default(),
}
}

Expand Down
13 changes: 0 additions & 13 deletions linkerd/app/integration/src/proxy.rs
Expand Up @@ -151,19 +151,6 @@ impl Proxy {
}
}

/// Adjust the server's 'addr'. This won't actually re-bind the server,
/// it will just affect what the proxy think is the so_original_dst.
///
/// This address is bogus, but the proxy should properly ignored the IP
/// and only use the port combined with 127.0.0.1 to still connect to
/// the server.
pub fn inbound_fuzz_addr(self, mut s: server::Listening) -> Self {
let old_addr = s.addr;
let new_addr = ([10, 1, 2, 3], old_addr.port()).into();
s.addr = new_addr;
self.inbound(s)
}

pub fn outbound(mut self, s: server::Listening) -> Self {
self.outbound = MockOrigDst::Addr(s.addr);
self.outbound_server = Some(s);
Expand Down