From 7c81fa24ed4d27870ccc3115b9396f6ada21386e Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Thu, 17 Sep 2020 20:02:00 +0000 Subject: [PATCH 01/11] Remove unneeded make layer for `Forward` --- linkerd/app/inbound/src/lib.rs | 2 +- linkerd/app/outbound/src/lib.rs | 9 +++---- linkerd/proxy/tcp/src/forward.rs | 45 ++++++++------------------------ 3 files changed, 16 insertions(+), 40 deletions(-) diff --git a/linkerd/app/inbound/src/lib.rs b/linkerd/app/inbound/src/lib.rs index 3bb64bbf48..066ca81002 100644 --- a/linkerd/app/inbound/src/lib.rs +++ b/linkerd/app/inbound/src/lib.rs @@ -370,7 +370,7 @@ impl Config { // Forwards TCP streams that cannot be decoded as HTTP. let tcp_forward = svc::stack(tcp_connect) .push_make_thunk() - .push(svc::layer::mk(tcp::Forward::new)) + .push_on_response(svc::layer::mk(tcp::Forward::new)) .push(admit::AdmitLayer::new(prevent_loop.into())); let http = DetectHttp::new( diff --git a/linkerd/app/outbound/src/lib.rs b/linkerd/app/outbound/src/lib.rs index db8674c82c..070d79eb5a 100644 --- a/linkerd/app/outbound/src/lib.rs +++ b/linkerd/app/outbound/src/lib.rs @@ -368,13 +368,13 @@ impl Config { metrics: &ProxyMetrics, ) -> impl tower::Service< SocketAddr, - Error = Error, + Error = impl Into, Future = impl Unpin + Send + 'static, Response = impl tower::Service< I, Response = (), Future = impl Unpin + Send + 'static, - Error = Error, + Error = impl Into, > + Unpin + Clone + Send @@ -382,7 +382,6 @@ impl Config { > + Unpin + Clone + Send - + Sync + 'static where C: tower::Service + Unpin + Clone + Send + Sync + 'static, @@ -431,7 +430,7 @@ impl Config { ) .spawn_buffer(buffer_capacity) .check_make_service::() - .push(svc::layer::mk(tcp::Forward::new)) + .push_on_response(svc::layer::mk(tcp::Forward::new)) .instrument(|a: &SocketAddr| info_span!("tcp", dst = %a)) } @@ -540,7 +539,7 @@ impl Config { let tcp_forward = svc::stack(tcp_connect) .push_make_thunk() - .push(svc::layer::mk(tcp::Forward::new)) + .push_on_response(svc::layer::mk(tcp::Forward::new)) .push(admit::AdmitLayer::new(prevent_loop)) .push_map_target(TcpEndpoint::from); diff --git a/linkerd/proxy/tcp/src/forward.rs b/linkerd/proxy/tcp/src/forward.rs index ddebf41b73..d2ecb2a089 100644 --- a/linkerd/proxy/tcp/src/forward.rs +++ b/linkerd/proxy/tcp/src/forward.rs @@ -1,4 +1,4 @@ -use futures::{future, prelude::*}; +use futures::prelude::*; use linkerd2_duplex::Duplex; use linkerd2_error::Error; use std::{ @@ -10,41 +10,17 @@ use tokio::io::{AsyncRead, AsyncWrite}; use tower::Service; #[derive(Clone, Debug)] -pub struct Forward { - make_connect: M, -} - -#[derive(Clone, Debug)] -pub struct Accept { +pub struct Forward { connect: C, } impl Forward { - pub fn new(make_connect: M) -> Self { - Self { make_connect } - } -} - -impl Service for Forward -where - M: Service, -{ - type Response = Accept; - type Error = M::Error; - type Future = future::MapOk Accept>; - - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - self.make_connect.poll_ready(cx) - } - - fn call(&mut self, target: T) -> Self::Future { - self.make_connect - .call(target) - .map_ok(|connect| Accept { connect }) + pub fn new(connect: M) -> Self { + Self { connect } } } -impl Service for Accept +impl Service for Forward where I: AsyncRead + AsyncWrite + Send + Unpin + 'static, C: tower::Service<()> + Send + 'static, @@ -62,10 +38,11 @@ where } fn call(&mut self, src_io: I) -> Self::Future { - let connect = self.connect.call(()).err_into::(); - Box::pin(async move { - let dst_io = connect.await?; - Duplex::new(src_io, dst_io).err_into::().await - }) + Box::pin( + self.connect + .call(()) + .err_into::() + .and_then(|dst_io| Duplex::new(src_io, dst_io).err_into::()), + ) } } From 27a17d6853377ec084dcfaba4c86f9a0f40c92de Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Thu, 17 Sep 2020 21:34:37 +0000 Subject: [PATCH 02/11] outbound: Do all loop detection in the connect stack --- linkerd/app/outbound/src/lib.rs | 46 ++++++++++++++++----------------- linkerd/app/src/lib.rs | 8 +++--- 2 files changed, 28 insertions(+), 26 deletions(-) diff --git a/linkerd/app/outbound/src/lib.rs b/linkerd/app/outbound/src/lib.rs index 070d79eb5a..19baf691ef 100644 --- a/linkerd/app/outbound/src/lib.rs +++ b/linkerd/app/outbound/src/lib.rs @@ -6,7 +6,6 @@ #![deny(warnings, rust_2018_idioms)] pub use self::endpoint::{HttpConcrete, HttpEndpoint, HttpLogical, LogicalPerRequest, TcpEndpoint}; -use ::http::header::HOST; use futures::{future, prelude::*}; use linkerd2_app_core::{ admit, classify, @@ -21,8 +20,8 @@ use linkerd2_app_core::{ spans::SpanConverter, svc::{self, NewService}, transport::{self, listen, tls}, - Addr, Conditional, DiscoveryRejected, Error, ProxyMetrics, StackMetrics, TraceContextLayer, - CANONICAL_DST_HEADER, DST_OVERRIDE_HEADER, L5D_REQUIRE_ID, + Addr, Conditional, DiscoveryRejected, Error, Never, ProxyMetrics, StackMetrics, + TraceContextLayer, CANONICAL_DST_HEADER, DST_OVERRIDE_HEADER, L5D_REQUIRE_ID, }; use std::{ collections::HashMap, @@ -53,6 +52,7 @@ pub struct Config { impl Config { pub fn build_tcp_connect( &self, + prevent_loop: impl Into, local_identity: tls::Conditional, metrics: &ProxyMetrics, ) -> impl tower::Service< @@ -76,6 +76,7 @@ impl Config { // Limits the time we wait for a connection to be established. .push_timeout(self.proxy.connect.timeout) .push(metrics.transport.layer_connect(TransportLabels)) + .push(admit::AdmitLayer::new(prevent_loop.into())) .into_inner() } @@ -119,14 +120,13 @@ impl Config { pub fn build_http_endpoint( &self, - prevent_loop: impl Into, tcp_connect: C, tap_layer: tap::Layer, metrics: ProxyMetrics, span_sink: Option>, ) -> impl tower::Service< HttpEndpoint, - Error = Error, + Error = Never, Future = impl Unpin + Send, Response = impl tower::Service< http::Request, @@ -169,18 +169,24 @@ impl Config { // Re-establishes a connection when the client fails. .push(reconnect::layer({ let backoff = self.proxy.connect.backoff.clone(); - move |_| Ok(backoff.stream()) + move |e: Error| { + if is_loop(&*e) { + Err(e) + } else { + Ok(backoff.stream()) + } + } })) - .push(admit::AdmitLayer::new(prevent_loop.into())) .push(observability.clone()) .push(identity_headers.clone()) .push(http::override_authority::Layer::new(vec![ - HOST.as_str(), + ::http::header::HOST.as_str(), CANONICAL_DST_HEADER, ])) .push_on_response(svc::layers().box_http_response()) .check_service::() .instrument(|e: &HttpEndpoint| info_span!("endpoint", peer.addr = %e.addr)) + .into_inner() } pub fn build_http_router( @@ -205,12 +211,8 @@ impl Config { where B: http::HttpBody + std::fmt::Debug + Default + Send + 'static, B::Data: Send + 'static, - E: tower::Service - + Unpin - + Clone - + Send - + Sync - + 'static, + E: tower::Service + Unpin + Clone + Send + Sync + 'static, + E::Error: Into, E::Future: Unpin + Send, S: tower::Service< http::Request, @@ -364,7 +366,6 @@ impl Config { &self, tcp_connect: &C, resolve: E, - prevent_loop: PreventLoop, metrics: &ProxyMetrics, ) -> impl tower::Service< SocketAddr, @@ -401,7 +402,6 @@ impl Config { svc::stack(tcp_connect.clone()) .push_make_thunk() .instrument(|t: &TcpEndpoint| info_span!("endpoint", peer.addr = %t.addr, peer.id = ?t.identity)) - .push(admit::AdmitLayer::new(prevent_loop)) .check_make_service::() .push(discover::resolve(map_endpoint::Resolve::new( endpoint::FromMetadata, @@ -413,7 +413,6 @@ impl Config { .push_fallback_with_predicate( svc::stack(tcp_connect.clone()) .push_make_thunk() - .push(admit::AdmitLayer::new(prevent_loop)) .push_map_target(TcpEndpoint::from) .instrument(|_: &SocketAddr| info_span!("forward")), is_discovery_rejected, @@ -482,13 +481,11 @@ impl Config { .. } = self.proxy; let canonicalize_timeout = self.canonicalize_timeout; - let prevent_loop = PreventLoop::from(listen_addr.port()); // Load balances TCP streams that cannot be decoded as HTTP. - let tcp_balance = - svc::stack(self.build_tcp_balance(&tcp_connect, resolve, prevent_loop, &metrics)) - .push_map_target(|a: listen::Addrs| a.target_addr()) - .into_inner(); + let tcp_balance = svc::stack(self.build_tcp_balance(&tcp_connect, resolve, &metrics)) + .push_map_target(|a: listen::Addrs| a.target_addr()) + .into_inner(); let http_admit_request = svc::layers() // Limits the number of in-flight requests. @@ -540,7 +537,6 @@ impl Config { let tcp_forward = svc::stack(tcp_connect) .push_make_thunk() .push_on_response(svc::layer::mk(tcp::Forward::new)) - .push(admit::AdmitLayer::new(prevent_loop)) .push_map_target(TcpEndpoint::from); let accept = svc::stack(svc::stack::MakeSwitch::new( @@ -604,3 +600,7 @@ fn is_discovery_rejected(err: &Error) -> bool { tracing::debug!(rejected, %err); rejected } + +fn is_loop(err: &(dyn std::error::Error + 'static)) -> bool { + err.is::() || err.source().map(is_loop).unwrap_or(false) +} diff --git a/linkerd/app/src/lib.rs b/linkerd/app/src/lib.rs index 4f571174ca..45a27cbe01 100644 --- a/linkerd/app/src/lib.rs +++ b/linkerd/app/src/lib.rs @@ -127,13 +127,15 @@ impl Config { let oc_span_sink = oc_collector.span_sink(); let start_proxy = Box::pin(async move { - let outbound_connect = - outbound.build_tcp_connect(local_identity.clone(), &outbound_metrics); + let outbound_connect = outbound.build_tcp_connect( + outbound_addr.port(), + local_identity.clone(), + &outbound_metrics, + ); let refine = outbound.build_dns_refine(resolver, &outbound_metrics.stack); let outbound_http_endpoint = outbound.build_http_endpoint( - outbound_addr.port(), outbound_connect.clone(), tap_layer.clone(), outbound_metrics.clone(), From e67f1819120750d7f86421c501c89c8b518dacab Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Thu, 17 Sep 2020 21:42:48 +0000 Subject: [PATCH 03/11] Move tcp_balance near tcp_connect --- linkerd/app/outbound/src/lib.rs | 144 ++++++++++++++++---------------- 1 file changed, 72 insertions(+), 72 deletions(-) diff --git a/linkerd/app/outbound/src/lib.rs b/linkerd/app/outbound/src/lib.rs index 19baf691ef..482ca6b29d 100644 --- a/linkerd/app/outbound/src/lib.rs +++ b/linkerd/app/outbound/src/lib.rs @@ -80,6 +80,78 @@ impl Config { .into_inner() } + /// Constructs a TCP load balancer. + pub fn build_tcp_balance( + &self, + tcp_connect: &C, + resolve: E, + metrics: &ProxyMetrics, + ) -> impl tower::Service< + SocketAddr, + Error = impl Into, + Future = impl Unpin + Send + 'static, + Response = impl tower::Service< + I, + Response = (), + Future = impl Unpin + Send + 'static, + Error = impl Into, + > + Unpin + + Clone + + Send + + 'static, + > + Unpin + + Clone + + Send + + 'static + where + C: tower::Service + Unpin + Clone + Send + Sync + 'static, + C::Response: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static, + C::Future: Unpin + Send, + E: Resolve + Unpin + Clone + Send + 'static, + E::Future: Unpin + Send, + E::Resolution: Unpin + Send, + I: tokio::io::AsyncRead + tokio::io::AsyncWrite + std::fmt::Debug + Unpin + Send + 'static, + { + let ProxyConfig { + dispatch_timeout, + cache_max_idle_age, + buffer_capacity, + .. + } = self.proxy; + svc::stack(tcp_connect.clone()) + .push_make_thunk() + .instrument(|t: &TcpEndpoint| info_span!("endpoint", peer.addr = %t.addr, peer.id = ?t.identity)) + .check_make_service::() + .push(discover::resolve(map_endpoint::Resolve::new( + endpoint::FromMetadata, + resolve, + ))) + .push(discover::buffer(1_000, cache_max_idle_age)) + .push_map_target(Addr::from) + .push_on_response(tcp::balance::layer(EWMA_DEFAULT_RTT, EWMA_DECAY)) + .push_fallback_with_predicate( + svc::stack(tcp_connect.clone()) + .push_make_thunk() + .push_map_target(TcpEndpoint::from) + .instrument(|_: &SocketAddr| info_span!("forward")), + is_discovery_rejected, + ) + .into_new_service() + .check_new_service::() + .cache( + svc::layers().push_on_response( + svc::layers() + .push_failfast(dispatch_timeout) + .push_spawn_buffer_with_idle_timeout(buffer_capacity, cache_max_idle_age) + .push(metrics.stack.layer(stack_labels("tcp"))), + ), + ) + .spawn_buffer(buffer_capacity) + .check_make_service::() + .push_on_response(svc::layer::mk(tcp::Forward::new)) + .instrument(|a: &SocketAddr| info_span!("tcp", dst = %a)) + } + pub fn build_dns_refine( &self, dns_resolver: dns::Resolver, @@ -361,78 +433,6 @@ impl Config { .into_inner() } - /// Constructs a TCP load balancer. - pub fn build_tcp_balance( - &self, - tcp_connect: &C, - resolve: E, - metrics: &ProxyMetrics, - ) -> impl tower::Service< - SocketAddr, - Error = impl Into, - Future = impl Unpin + Send + 'static, - Response = impl tower::Service< - I, - Response = (), - Future = impl Unpin + Send + 'static, - Error = impl Into, - > + Unpin - + Clone - + Send - + 'static, - > + Unpin - + Clone - + Send - + 'static - where - C: tower::Service + Unpin + Clone + Send + Sync + 'static, - C::Response: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static, - C::Future: Unpin + Send, - E: Resolve + Unpin + Clone + Send + 'static, - E::Future: Unpin + Send, - E::Resolution: Unpin + Send, - I: tokio::io::AsyncRead + tokio::io::AsyncWrite + std::fmt::Debug + Unpin + Send + 'static, - { - let ProxyConfig { - dispatch_timeout, - cache_max_idle_age, - buffer_capacity, - .. - } = self.proxy; - svc::stack(tcp_connect.clone()) - .push_make_thunk() - .instrument(|t: &TcpEndpoint| info_span!("endpoint", peer.addr = %t.addr, peer.id = ?t.identity)) - .check_make_service::() - .push(discover::resolve(map_endpoint::Resolve::new( - endpoint::FromMetadata, - resolve, - ))) - .push(discover::buffer(1_000, cache_max_idle_age)) - .push_map_target(Addr::from) - .push_on_response(tcp::balance::layer(EWMA_DEFAULT_RTT, EWMA_DECAY)) - .push_fallback_with_predicate( - svc::stack(tcp_connect.clone()) - .push_make_thunk() - .push_map_target(TcpEndpoint::from) - .instrument(|_: &SocketAddr| info_span!("forward")), - is_discovery_rejected, - ) - .into_new_service() - .check_new_service::() - .cache( - svc::layers().push_on_response( - svc::layers() - .push_failfast(dispatch_timeout) - .push_spawn_buffer_with_idle_timeout(buffer_capacity, cache_max_idle_age) - .push(metrics.stack.layer(stack_labels("tcp"))), - ), - ) - .spawn_buffer(buffer_capacity) - .check_make_service::() - .push_on_response(svc::layer::mk(tcp::Forward::new)) - .instrument(|a: &SocketAddr| info_span!("tcp", dst = %a)) - } - pub async fn build_server( self, listen_addr: std::net::SocketAddr, From f53f25977861ef3938e1f99d056d5d7a6d7f12c9 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Thu, 17 Sep 2020 21:46:29 +0000 Subject: [PATCH 04/11] Move things around for brain organizationals --- linkerd/app/outbound/src/lib.rs | 33 +++++++++++++++++++-------------- 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/linkerd/app/outbound/src/lib.rs b/linkerd/app/outbound/src/lib.rs index 482ca6b29d..0db5d94dda 100644 --- a/linkerd/app/outbound/src/lib.rs +++ b/linkerd/app/outbound/src/lib.rs @@ -83,7 +83,7 @@ impl Config { /// Constructs a TCP load balancer. pub fn build_tcp_balance( &self, - tcp_connect: &C, + connect: C, resolve: E, metrics: &ProxyMetrics, ) -> impl tower::Service< @@ -118,7 +118,14 @@ impl Config { buffer_capacity, .. } = self.proxy; - svc::stack(tcp_connect.clone()) + + let forward = svc::stack(connect.clone()) + .push_make_thunk() + .push_on_response(svc::layer::mk(tcp::Forward::new)) + .push_map_target(TcpEndpoint::from) + .instrument(|_: &SocketAddr| info_span!("forward")); + + svc::stack(connect) .push_make_thunk() .instrument(|t: &TcpEndpoint| info_span!("endpoint", peer.addr = %t.addr, peer.id = ?t.identity)) .check_make_service::() @@ -129,15 +136,13 @@ impl Config { .push(discover::buffer(1_000, cache_max_idle_age)) .push_map_target(Addr::from) .push_on_response(tcp::balance::layer(EWMA_DEFAULT_RTT, EWMA_DECAY)) + .push_on_response(svc::layer::mk(tcp::Forward::new)) .push_fallback_with_predicate( - svc::stack(tcp_connect.clone()) - .push_make_thunk() - .push_map_target(TcpEndpoint::from) - .instrument(|_: &SocketAddr| info_span!("forward")), + forward, is_discovery_rejected, ) .into_new_service() - .check_new_service::() + .check_new_service::() .cache( svc::layers().push_on_response( svc::layers() @@ -147,8 +152,7 @@ impl Config { ), ) .spawn_buffer(buffer_capacity) - .check_make_service::() - .push_on_response(svc::layer::mk(tcp::Forward::new)) + .check_make_service::() .instrument(|a: &SocketAddr| info_span!("tcp", dst = %a)) } @@ -482,11 +486,6 @@ impl Config { } = self.proxy; let canonicalize_timeout = self.canonicalize_timeout; - // Load balances TCP streams that cannot be decoded as HTTP. - let tcp_balance = svc::stack(self.build_tcp_balance(&tcp_connect, resolve, &metrics)) - .push_map_target(|a: listen::Addrs| a.target_addr()) - .into_inner(); - let http_admit_request = svc::layers() // Limits the number of in-flight requests. .push_concurrency_limit(max_in_flight_requests) @@ -526,6 +525,12 @@ impl Config { .into_inner() .into_make_service(); + // Load balances TCP streams that cannot be decoded as HTTP. + let tcp_balance = + svc::stack(self.build_tcp_balance(tcp_connect.clone(), resolve, &metrics)) + .push_map_target(|a: listen::Addrs| a.target_addr()) + .into_inner(); + let http = http::DetectHttp::new( h2_settings, detect_protocol_timeout, From c18b0aed9c92844a105ebe96e1059892b6bd1518 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Thu, 17 Sep 2020 22:30:27 +0000 Subject: [PATCH 05/11] Take fallback forwarding out of the tcp balancer so forward logic is only defined once --- linkerd/app/outbound/src/lib.rs | 37 +++++++++++++------------------ linkerd/app/outbound/src/tests.rs | 20 ++++++++--------- 2 files changed, 25 insertions(+), 32 deletions(-) diff --git a/linkerd/app/outbound/src/lib.rs b/linkerd/app/outbound/src/lib.rs index 0db5d94dda..055cda7d1b 100644 --- a/linkerd/app/outbound/src/lib.rs +++ b/linkerd/app/outbound/src/lib.rs @@ -85,7 +85,6 @@ impl Config { &self, connect: C, resolve: E, - metrics: &ProxyMetrics, ) -> impl tower::Service< SocketAddr, Error = impl Into, @@ -119,12 +118,6 @@ impl Config { .. } = self.proxy; - let forward = svc::stack(connect.clone()) - .push_make_thunk() - .push_on_response(svc::layer::mk(tcp::Forward::new)) - .push_map_target(TcpEndpoint::from) - .instrument(|_: &SocketAddr| info_span!("forward")); - svc::stack(connect) .push_make_thunk() .instrument(|t: &TcpEndpoint| info_span!("endpoint", peer.addr = %t.addr, peer.id = ?t.identity)) @@ -137,10 +130,6 @@ impl Config { .push_map_target(Addr::from) .push_on_response(tcp::balance::layer(EWMA_DEFAULT_RTT, EWMA_DECAY)) .push_on_response(svc::layer::mk(tcp::Forward::new)) - .push_fallback_with_predicate( - forward, - is_discovery_rejected, - ) .into_new_service() .check_new_service::() .cache( @@ -148,7 +137,6 @@ impl Config { svc::layers() .push_failfast(dispatch_timeout) .push_spawn_buffer_with_idle_timeout(buffer_capacity, cache_max_idle_age) - .push(metrics.stack.layer(stack_labels("tcp"))), ), ) .spawn_buffer(buffer_capacity) @@ -525,11 +513,21 @@ impl Config { .into_inner() .into_make_service(); + let tcp_forward = svc::stack(tcp_connect.clone()) + .push_make_thunk() + .push_on_response(svc::layer::mk(tcp::Forward::new)) + .instrument(|_: &TcpEndpoint| info_span!("forward")) + .check_service::(); + // Load balances TCP streams that cannot be decoded as HTTP. - let tcp_balance = - svc::stack(self.build_tcp_balance(tcp_connect.clone(), resolve, &metrics)) - .push_map_target(|a: listen::Addrs| a.target_addr()) - .into_inner(); + let tcp_balance = svc::stack(self.build_tcp_balance(tcp_connect, resolve)) + .push_make_ready() + .push_fallback_with_predicate( + svc::stack(tcp_forward.clone()).push_map_target(TcpEndpoint::from), + is_discovery_rejected, + ) + .push_map_target(|a: listen::Addrs| a.target_addr()) + .into_inner(); let http = http::DetectHttp::new( h2_settings, @@ -539,15 +537,10 @@ impl Config { drain.clone(), ); - let tcp_forward = svc::stack(tcp_connect) - .push_make_thunk() - .push_on_response(svc::layer::mk(tcp::Forward::new)) - .push_map_target(TcpEndpoint::from); - let accept = svc::stack(svc::stack::MakeSwitch::new( skip_detect.clone(), http, - tcp_forward, + tcp_forward.push_map_target(TcpEndpoint::from), )) .push(metrics.transport.layer_accept(TransportLabels)); diff --git a/linkerd/app/outbound/src/tests.rs b/linkerd/app/outbound/src/tests.rs index b6c5f268f9..e0f7c18df4 100644 --- a/linkerd/app/outbound/src/tests.rs +++ b/linkerd/app/outbound/src/tests.rs @@ -1,6 +1,7 @@ use crate::Config; +use futures::prelude::*; use indexmap::indexset; -use linkerd2_app_core::{self as app_core, metrics::Metrics, Addr}; +use linkerd2_app_core::{self as app_core, Addr, Error}; use linkerd2_app_test as test_support; use std::{net::SocketAddr, time::Duration}; use tower::ServiceExt; @@ -61,14 +62,10 @@ async fn plaintext_tcp() { // bind any of these addresses. Therefore, we don't need to use ephemeral // ports or anything. These will just be used so that the proxy has a socket // address to resolve, etc. - let target_addr = SocketAddr::new(LOCALHOST.into(), 666); - let local_addr = SocketAddr::new(LOCALHOST.into(), LISTEN_PORT); + let target_addr = SocketAddr::new([0, 0, 0, 0].into(), 0); let cfg = default_config(target_addr); - let (metrics, _) = Metrics::new(std::time::Duration::from_secs(10)); - let prevent_loop = super::PreventLoop::from(local_addr.port()); - // Configure mock IO for the upstream "server". It will read "hello" and // then write "world". let mut srv_io = test_support::io(); @@ -88,10 +85,13 @@ async fn plaintext_tcp() { ); // Build the outbound TCP balancer stack. - let outbound_tcp = cfg.build_tcp_balance(&connect, resolver, prevent_loop, &metrics.outbound); - let svc = outbound_tcp + cfg.build_tcp_balance(connect, resolver) .oneshot(target_addr) + .err_into::() + .await + .expect("make service should succeed") + .oneshot(client_io) + .err_into::() .await - .expect("make service should succeed"); - svc.oneshot(client_io).await.expect("conn should succeed"); + .expect("conn should succeed"); } From c841e9d9b3b7a4aeb1127b8b33ae31ec7376f61c Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Thu, 17 Sep 2020 22:38:23 +0000 Subject: [PATCH 06/11] Pair into_new_service/cache with push_make_ready --- linkerd/app/outbound/src/lib.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/linkerd/app/outbound/src/lib.rs b/linkerd/app/outbound/src/lib.rs index 055cda7d1b..c86e7915c0 100644 --- a/linkerd/app/outbound/src/lib.rs +++ b/linkerd/app/outbound/src/lib.rs @@ -140,8 +140,9 @@ impl Config { ), ) .spawn_buffer(buffer_capacity) - .check_make_service::() + .push_make_ready() .instrument(|a: &SocketAddr| info_span!("tcp", dst = %a)) + .check_make_service::() } pub fn build_dns_refine( @@ -385,6 +386,7 @@ impl Config { ), ) .spawn_buffer(buffer_capacity) + .push_make_ready() .check_make_service::>(); // Caches clients that bypass discovery/balancing. @@ -401,6 +403,7 @@ impl Config { ), ) .spawn_buffer(buffer_capacity) + .push_make_ready() .instrument(|t: &HttpEndpoint| info_span!("forward", peer.addr = %t.addr, peer.id = ?t.identity)) .check_make_service::>(); @@ -409,7 +412,6 @@ impl Config { // `forward` stack is used instead, bypassing load balancing, etc. logical .push_on_response(svc::layers().box_http_response()) - .push_make_ready() .push_fallback_with_predicate( forward .push_map_target(HttpEndpoint::from) @@ -493,7 +495,6 @@ impl Config { // its canonical FQDN to use for routing. .push(http::canonicalize::Layer::new(refine, canonicalize_timeout)) .check_make_service::>() - .push_make_ready() .push_timeout(dispatch_timeout) .push(router::Layer::new(LogicalPerRequest::from)) .check_new_service::>() @@ -521,7 +522,6 @@ impl Config { // Load balances TCP streams that cannot be decoded as HTTP. let tcp_balance = svc::stack(self.build_tcp_balance(tcp_connect, resolve)) - .push_make_ready() .push_fallback_with_predicate( svc::stack(tcp_forward.clone()).push_map_target(TcpEndpoint::from), is_discovery_rejected, From f2f72b0618843dc8ab5b6c231473183bc9975a14 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Thu, 17 Sep 2020 23:29:19 +0000 Subject: [PATCH 07/11] detect debugging --- linkerd/proxy/http/src/detect.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/linkerd/proxy/http/src/detect.rs b/linkerd/proxy/http/src/detect.rs index bea77cf05b..5ee805bde1 100644 --- a/linkerd/proxy/http/src/detect.rs +++ b/linkerd/proxy/http/src/detect.rs @@ -17,7 +17,7 @@ use std::{ time::Duration, }; use tower::{util::ServiceExt, Service}; -use tracing::{info_span, trace}; +use tracing::{debug, info_span, trace}; use tracing_futures::Instrument; type Server = hyper::server::conn::Http; @@ -155,6 +155,7 @@ where let timeout = tokio::time::delay_for(self.timeout); Box::pin(async move { + trace!("Detecting"); let (version, io) = tokio::select! { res = HttpVersion::detect(io) => { res? } () = timeout => { @@ -164,7 +165,7 @@ where match version { Some(HttpVersion::Http1) => { - trace!("Handling as HTTP"); + debug!("Handling as HTTP"); // Enable support for HTTP upgrades (CONNECT and websockets). let http = upgrade::Service::new(http, drain.clone()); let conn = server @@ -179,7 +180,7 @@ where } Some(HttpVersion::H2) => { - trace!("Handling as H2"); + debug!("Handling as H2"); let conn = server .http2_only(true) .serve_connection(io, HyperServerSvc::new(http)); @@ -191,7 +192,7 @@ where } None => { - trace!("Forwarding TCP"); + debug!("Forwarding TCP"); let release = drain.ignore_signal(); tcp.oneshot(io).err_into::().await?; drop(release); From df11e6c3b127a8649bf652bfe28819c0bd5ab51b Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Fri, 18 Sep 2020 01:20:16 +0000 Subject: [PATCH 08/11] add target on accept context --- linkerd/app/core/src/serve.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/linkerd/app/core/src/serve.rs b/linkerd/app/core/src/serve.rs index 5196aaff4e..5b35eef2f2 100644 --- a/linkerd/app/core/src/serve.rs +++ b/linkerd/app/core/src/serve.rs @@ -36,6 +36,7 @@ where let span = info_span!( "accept", peer.addr = %addrs.peer(), + target.addr = %addrs.target_addr(), ); // Ready the service before dispatching the request to it. From ad4d7228212200c7fe37d1acd32bd58aef6e405e Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Fri, 18 Sep 2020 03:48:57 +0000 Subject: [PATCH 09/11] change moved into #661 --- linkerd/app/core/src/serve.rs | 1 - linkerd/proxy/http/src/detect.rs | 9 ++++----- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/linkerd/app/core/src/serve.rs b/linkerd/app/core/src/serve.rs index 5b35eef2f2..5196aaff4e 100644 --- a/linkerd/app/core/src/serve.rs +++ b/linkerd/app/core/src/serve.rs @@ -36,7 +36,6 @@ where let span = info_span!( "accept", peer.addr = %addrs.peer(), - target.addr = %addrs.target_addr(), ); // Ready the service before dispatching the request to it. diff --git a/linkerd/proxy/http/src/detect.rs b/linkerd/proxy/http/src/detect.rs index 5ee805bde1..bea77cf05b 100644 --- a/linkerd/proxy/http/src/detect.rs +++ b/linkerd/proxy/http/src/detect.rs @@ -17,7 +17,7 @@ use std::{ time::Duration, }; use tower::{util::ServiceExt, Service}; -use tracing::{debug, info_span, trace}; +use tracing::{info_span, trace}; use tracing_futures::Instrument; type Server = hyper::server::conn::Http; @@ -155,7 +155,6 @@ where let timeout = tokio::time::delay_for(self.timeout); Box::pin(async move { - trace!("Detecting"); let (version, io) = tokio::select! { res = HttpVersion::detect(io) => { res? } () = timeout => { @@ -165,7 +164,7 @@ where match version { Some(HttpVersion::Http1) => { - debug!("Handling as HTTP"); + trace!("Handling as HTTP"); // Enable support for HTTP upgrades (CONNECT and websockets). let http = upgrade::Service::new(http, drain.clone()); let conn = server @@ -180,7 +179,7 @@ where } Some(HttpVersion::H2) => { - debug!("Handling as H2"); + trace!("Handling as H2"); let conn = server .http2_only(true) .serve_connection(io, HyperServerSvc::new(http)); @@ -192,7 +191,7 @@ where } None => { - debug!("Forwarding TCP"); + trace!("Forwarding TCP"); let release = drain.ignore_signal(); tcp.oneshot(io).err_into::().await?; drop(release); From 2c0c1d52b7686364bc29949dbd01d44a746342fe Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Fri, 18 Sep 2020 03:53:57 +0000 Subject: [PATCH 10/11] revert kinda --- linkerd/app/outbound/src/tests.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/linkerd/app/outbound/src/tests.rs b/linkerd/app/outbound/src/tests.rs index e0f7c18df4..8a316f7314 100644 --- a/linkerd/app/outbound/src/tests.rs +++ b/linkerd/app/outbound/src/tests.rs @@ -85,12 +85,14 @@ async fn plaintext_tcp() { ); // Build the outbound TCP balancer stack. - cfg.build_tcp_balance(connect, resolver) + let make = cfg + .build_tcp_balance(connect, resolver) .oneshot(target_addr) .err_into::() .await - .expect("make service should succeed") - .oneshot(client_io) + .expect("make service should succeed"); + + make.oneshot(client_io) .err_into::() .await .expect("conn should succeed"); From ebd65cb368c1edcf25c1566040d9bbef2d8e6ce7 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Fri, 18 Sep 2020 14:55:21 +0000 Subject: [PATCH 11/11] simplify test config --- linkerd/app/core/src/lib.rs | 2 +- linkerd/app/outbound/src/tests.rs | 37 +++++++++---------------------- 2 files changed, 11 insertions(+), 28 deletions(-) diff --git a/linkerd/app/core/src/lib.rs b/linkerd/app/core/src/lib.rs index eab7dac760..ce64d69c62 100644 --- a/linkerd/app/core/src/lib.rs +++ b/linkerd/app/core/src/lib.rs @@ -122,7 +122,7 @@ impl From for DiscoveryRejected { } } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Default)] pub struct SkipByPort(std::sync::Arc>); impl From> for SkipByPort { diff --git a/linkerd/app/outbound/src/tests.rs b/linkerd/app/outbound/src/tests.rs index 8a316f7314..13191ed038 100644 --- a/linkerd/app/outbound/src/tests.rs +++ b/linkerd/app/outbound/src/tests.rs @@ -1,52 +1,35 @@ use crate::Config; use futures::prelude::*; -use indexmap::indexset; -use linkerd2_app_core::{self as app_core, Addr, Error}; +use linkerd2_app_core::{config, exp_backoff, proxy::http::h2, transport::listen, Addr, Error}; use linkerd2_app_test as test_support; use std::{net::SocketAddr, time::Duration}; use tower::ServiceExt; const LOCALHOST: [u8; 4] = [127, 0, 0, 1]; -const LISTEN_PORT: u16 = 4140; fn default_config(orig_dst: SocketAddr) -> Config { - use app_core::{ - config::{ConnectConfig, ProxyConfig, ServerConfig}, - exp_backoff::ExponentialBackoff, - proxy::http::h2, - transport::listen, - }; - let h2_settings = h2::Settings { - initial_stream_window_size: Some(65_535), // Protocol default - initial_connection_window_size: Some(1_048_576), // 1MB ~ 16 streams at capacity - }; Config { canonicalize_timeout: Duration::from_millis(100), - proxy: ProxyConfig { - server: ServerConfig { - bind: listen::Bind::new(SocketAddr::new(LOCALHOST.into(), LISTEN_PORT), None) + proxy: config::ProxyConfig { + server: config::ServerConfig { + bind: listen::Bind::new(SocketAddr::new(LOCALHOST.into(), 0), None) .with_orig_dst_addr(orig_dst.into()), - h2_settings, + h2_settings: h2::Settings::default(), }, - connect: ConnectConfig { + connect: config::ConnectConfig { keepalive: None, timeout: Duration::from_secs(1), - backoff: ExponentialBackoff::new( + backoff: exp_backoff::ExponentialBackoff::new( Duration::from_millis(100), Duration::from_millis(500), 0.1, ) .unwrap(), - h2_settings, + h2_settings: h2::Settings::default(), }, buffer_capacity: 10_000, cache_max_idle_age: Duration::from_secs(60), - disable_protocol_detection_for_ports: indexset![ - 25, // SMTP - 587, // SMTP - 3306, // MySQL - ] - .into(), + disable_protocol_detection_for_ports: Default::default(), dispatch_timeout: Duration::from_secs(3), max_in_flight_requests: 10_000, detect_protocol_timeout: Duration::from_secs(3), @@ -62,7 +45,7 @@ async fn plaintext_tcp() { // bind any of these addresses. Therefore, we don't need to use ephemeral // ports or anything. These will just be used so that the proxy has a socket // address to resolve, etc. - let target_addr = SocketAddr::new([0, 0, 0, 0].into(), 0); + let target_addr = SocketAddr::new([0, 0, 0, 0].into(), 666); let cfg = default_config(target_addr);