Skip to content
2 changes: 1 addition & 1 deletion linkerd/app/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ impl From<Addr> for DiscoveryRejected {
}
}

#[derive(Clone, Debug)]
#[derive(Clone, Debug, Default)]
pub struct SkipByPort(std::sync::Arc<indexmap::IndexSet<u16>>);

impl From<indexmap::IndexSet<u16>> for SkipByPort {
Expand Down
2 changes: 1 addition & 1 deletion linkerd/app/inbound/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ impl Config {
// Forwards TCP streams that cannot be decoded as HTTP.
let tcp_forward = svc::stack(tcp_connect)
.push_make_thunk()
.push(svc::layer::mk(tcp::Forward::new));
.push_on_response(svc::layer::mk(tcp::Forward::new));

let http = DetectHttp::new(
h2_settings,
Expand Down
208 changes: 102 additions & 106 deletions linkerd/app/outbound/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
#![deny(warnings, rust_2018_idioms)]

pub use self::endpoint::{HttpConcrete, HttpEndpoint, HttpLogical, LogicalPerRequest, TcpEndpoint};
use ::http::header::HOST;
use futures::{future, prelude::*};
use linkerd2_app_core::{
admit, classify,
Expand All @@ -21,8 +20,8 @@ use linkerd2_app_core::{
spans::SpanConverter,
svc::{self, NewService},
transport::{self, listen, tls},
Addr, Conditional, DiscoveryRejected, Error, ProxyMetrics, StackMetrics, TraceContextLayer,
CANONICAL_DST_HEADER, DST_OVERRIDE_HEADER, L5D_REQUIRE_ID,
Addr, Conditional, DiscoveryRejected, Error, Never, ProxyMetrics, StackMetrics,
TraceContextLayer, CANONICAL_DST_HEADER, DST_OVERRIDE_HEADER, L5D_REQUIRE_ID,
};
use std::{
collections::HashMap,
Expand Down Expand Up @@ -53,6 +52,7 @@ pub struct Config {
impl Config {
pub fn build_tcp_connect(
&self,
prevent_loop: impl Into<PreventLoop>,
local_identity: tls::Conditional<identity::Local>,
metrics: &ProxyMetrics,
) -> impl tower::Service<
Expand All @@ -76,9 +76,75 @@ impl Config {
// Limits the time we wait for a connection to be established.
.push_timeout(self.proxy.connect.timeout)
.push(metrics.transport.layer_connect(TransportLabels))
.push(admit::AdmitLayer::new(prevent_loop.into()))
.into_inner()
}

/// Constructs a TCP load balancer.
pub fn build_tcp_balance<C, E, I>(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there any actual change here, or did this just move around?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just moved to be next to tcp_connect

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks like some of the parameters were also removed?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, fallback was extracted from tcp_balance c18b0ae

&self,
connect: C,
resolve: E,
) -> impl tower::Service<
SocketAddr,
Error = impl Into<Error>,
Future = impl Unpin + Send + 'static,
Response = impl tower::Service<
I,
Response = (),
Future = impl Unpin + Send + 'static,
Error = impl Into<Error>,
> + Unpin
+ Clone
+ Send
+ 'static,
> + Unpin
+ Clone
+ Send
+ 'static
where
C: tower::Service<TcpEndpoint, Error = Error> + Unpin + Clone + Send + Sync + 'static,
C::Response: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static,
C::Future: Unpin + Send,
E: Resolve<Addr, Endpoint = proxy::api_resolve::Metadata> + Unpin + Clone + Send + 'static,
E::Future: Unpin + Send,
E::Resolution: Unpin + Send,
I: tokio::io::AsyncRead + tokio::io::AsyncWrite + std::fmt::Debug + Unpin + Send + 'static,
{
let ProxyConfig {
dispatch_timeout,
cache_max_idle_age,
buffer_capacity,
..
} = self.proxy;

svc::stack(connect)
.push_make_thunk()
.instrument(|t: &TcpEndpoint| info_span!("endpoint", peer.addr = %t.addr, peer.id = ?t.identity))
.check_make_service::<TcpEndpoint, ()>()
.push(discover::resolve(map_endpoint::Resolve::new(
endpoint::FromMetadata,
resolve,
)))
.push(discover::buffer(1_000, cache_max_idle_age))
.push_map_target(Addr::from)
.push_on_response(tcp::balance::layer(EWMA_DEFAULT_RTT, EWMA_DECAY))
.push_on_response(svc::layer::mk(tcp::Forward::new))
.into_new_service()
.check_new_service::<SocketAddr, I>()
.cache(
svc::layers().push_on_response(
svc::layers()
.push_failfast(dispatch_timeout)
.push_spawn_buffer_with_idle_timeout(buffer_capacity, cache_max_idle_age)
),
)
.spawn_buffer(buffer_capacity)
.push_make_ready()
.instrument(|_: &_| info_span!("tcp"))
.check_make_service::<SocketAddr, I>()
}

pub fn build_dns_refine(
&self,
dns_resolver: dns::Resolver,
Expand Down Expand Up @@ -119,14 +185,13 @@ impl Config {

pub fn build_http_endpoint<B, C>(
&self,
prevent_loop: impl Into<PreventLoop>,
tcp_connect: C,
tap_layer: tap::Layer,
metrics: ProxyMetrics,
span_sink: Option<mpsc::Sender<oc::Span>>,
) -> impl tower::Service<
HttpEndpoint,
Error = Error,
Error = Never,
Future = impl Unpin + Send,
Response = impl tower::Service<
http::Request<B>,
Expand Down Expand Up @@ -169,18 +234,24 @@ impl Config {
// Re-establishes a connection when the client fails.
.push(reconnect::layer({
let backoff = self.proxy.connect.backoff.clone();
move |_| Ok(backoff.stream())
move |e: Error| {
if is_loop(&*e) {
Err(e)
} else {
Ok(backoff.stream())
}
}
}))
.push(admit::AdmitLayer::new(prevent_loop.into()))
.push(observability.clone())
.push(identity_headers.clone())
.push(http::override_authority::Layer::new(vec![
HOST.as_str(),
::http::header::HOST.as_str(),
CANONICAL_DST_HEADER,
]))
.push_on_response(svc::layers().box_http_response())
.check_service::<HttpEndpoint>()
.instrument(|e: &HttpEndpoint| info_span!("endpoint", peer.addr = %e.addr))
.into_inner()
}

pub fn build_http_router<B, E, S, R, P>(
Expand All @@ -205,12 +276,8 @@ impl Config {
where
B: http::HttpBody<Error = Error> + std::fmt::Debug + Default + Send + 'static,
B::Data: Send + 'static,
E: tower::Service<HttpEndpoint, Error = Error, Response = S>
+ Unpin
+ Clone
+ Send
+ Sync
+ 'static,
E: tower::Service<HttpEndpoint, Response = S> + Unpin + Clone + Send + Sync + 'static,
E::Error: Into<Error>,
E::Future: Unpin + Send,
S: tower::Service<
http::Request<http::boxed::Payload>,
Expand Down Expand Up @@ -319,6 +386,7 @@ impl Config {
),
)
.spawn_buffer(buffer_capacity)
.push_make_ready()
.check_make_service::<HttpLogical, http::Request<_>>();

// Caches clients that bypass discovery/balancing.
Expand All @@ -343,7 +411,6 @@ impl Config {
// `forward` stack is used instead, bypassing load balancing, etc.
logical
.push_on_response(svc::layers().box_http_response())
.push_make_ready()
.push_fallback_with_predicate(
forward
.push_map_target(HttpEndpoint::from)
Expand All @@ -359,82 +426,6 @@ impl Config {
.into_inner()
}

/// Constructs a TCP load balancer.
pub fn build_tcp_balance<C, E, I>(
&self,
tcp_connect: &C,
resolve: E,
prevent_loop: PreventLoop,
metrics: &ProxyMetrics,
) -> impl tower::Service<
SocketAddr,
Error = Error,
Future = impl Unpin + Send + 'static,
Response = impl tower::Service<
I,
Response = (),
Future = impl Unpin + Send + 'static,
Error = Error,
> + Unpin
+ Clone
+ Send
+ 'static,
> + Unpin
+ Clone
+ Send
+ Sync
+ 'static
where
C: tower::Service<TcpEndpoint, Error = Error> + Unpin + Clone + Send + Sync + 'static,
C::Response: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static,
C::Future: Unpin + Send,
E: Resolve<Addr, Endpoint = proxy::api_resolve::Metadata> + Unpin + Clone + Send + 'static,
E::Future: Unpin + Send,
E::Resolution: Unpin + Send,
I: tokio::io::AsyncRead + tokio::io::AsyncWrite + std::fmt::Debug + Unpin + Send + 'static,
{
let ProxyConfig {
dispatch_timeout,
cache_max_idle_age,
buffer_capacity,
..
} = self.proxy;
svc::stack(tcp_connect.clone())
.push_make_thunk()
.instrument(|t: &TcpEndpoint| info_span!("endpoint", peer.addr = %t.addr, peer.id = ?t.identity))
.push(admit::AdmitLayer::new(prevent_loop))
.check_make_service::<TcpEndpoint, ()>()
.push(discover::resolve(map_endpoint::Resolve::new(
endpoint::FromMetadata,
resolve,
)))
.push(discover::buffer(1_000, cache_max_idle_age))
.push_map_target(Addr::from)
.push_on_response(tcp::balance::layer(EWMA_DEFAULT_RTT, EWMA_DECAY))
.push_fallback_with_predicate(
svc::stack(tcp_connect.clone())
.push_make_thunk()
.push(admit::AdmitLayer::new(prevent_loop))
.push_map_target(TcpEndpoint::from)
.instrument(|_: &SocketAddr| debug_span!("forward")),
is_discovery_rejected,
)
.into_new_service()
.check_new_service::<SocketAddr, ()>()
.cache(
svc::layers().push_on_response(
svc::layers()
.push_failfast(dispatch_timeout)
.push_spawn_buffer_with_idle_timeout(buffer_capacity, cache_max_idle_age)
.push(metrics.stack.layer(stack_labels("tcp"))),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what happened to the TCP metrics layer? did that get misplaced, or is it added somewhere else that i'm missing?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mentioned in the PR description

The metrics have been removed from the TCP balancer stack (for now).

These are just stack metrics (for our own debugging, really). Will probably reintroduce this after the caching stops moving around.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, yup, i can't read. carry on!

),
)
.spawn_buffer(buffer_capacity)
.check_make_service::<SocketAddr, ()>()
.push(svc::layer::mk(tcp::Forward::new))
.instrument(|_: &SocketAddr| info_span!("tcp"))
}

pub async fn build_server<E, R, C, H, S>(
self,
listen_addr: std::net::SocketAddr,
Expand Down Expand Up @@ -483,13 +474,6 @@ impl Config {
..
} = self.proxy;
let canonicalize_timeout = self.canonicalize_timeout;
let prevent_loop = PreventLoop::from(listen_addr.port());

// Load balances TCP streams that cannot be decoded as HTTP.
let tcp_balance =
svc::stack(self.build_tcp_balance(&tcp_connect, resolve, prevent_loop, &metrics))
.push_map_target(|a: listen::Addrs| a.target_addr())
.into_inner();

let http_admit_request = svc::layers()
// Limits the number of in-flight requests.
Expand All @@ -510,7 +494,6 @@ impl Config {
// its canonical FQDN to use for routing.
.push(http::canonicalize::Layer::new(refine, canonicalize_timeout))
.check_make_service::<HttpLogical, http::Request<_>>()
.push_make_ready()
.push_timeout(dispatch_timeout)
.push(router::Layer::new(LogicalPerRequest::from))
.check_new_service::<listen::Addrs, http::Request<_>>()
Expand All @@ -529,6 +512,21 @@ impl Config {
.into_inner()
.into_make_service();

let tcp_forward = svc::stack(tcp_connect.clone())
.push_make_thunk()
.push_on_response(svc::layer::mk(tcp::Forward::new))
.instrument(|_: &TcpEndpoint| info_span!("forward"))
.check_service::<TcpEndpoint>();

// Load balances TCP streams that cannot be decoded as HTTP.
let tcp_balance = svc::stack(self.build_tcp_balance(tcp_connect, resolve))
.push_fallback_with_predicate(
svc::stack(tcp_forward.clone()).push_map_target(TcpEndpoint::from),
is_discovery_rejected,
)
.push_map_target(|a: listen::Addrs| a.target_addr())
.into_inner();

let http = http::DetectHttp::new(
h2_settings,
detect_protocol_timeout,
Expand All @@ -537,16 +535,10 @@ impl Config {
drain.clone(),
);

let tcp_forward = svc::stack(tcp_connect)
.push_make_thunk()
.push(svc::layer::mk(tcp::Forward::new))
.push(admit::AdmitLayer::new(prevent_loop))
.push_map_target(TcpEndpoint::from);

let accept = svc::stack(svc::stack::MakeSwitch::new(
skip_detect.clone(),
http,
tcp_forward,
tcp_forward.push_map_target(TcpEndpoint::from),
))
.push(metrics.transport.layer_accept(TransportLabels));

Expand Down Expand Up @@ -604,3 +596,7 @@ fn is_discovery_rejected(err: &Error) -> bool {
tracing::debug!(rejected, %err);
rejected
}

fn is_loop(err: &(dyn std::error::Error + 'static)) -> bool {
err.is::<prevent_loop::LoopPrevented>() || err.source().map(is_loop).unwrap_or(false)
}
Comment on lines +600 to +602
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit/tioli: is_discovery_rejected has a debug! event in it, should this as well?

Loading