diff --git a/Cargo.lock b/Cargo.lock index 4cc840b729..475d587707 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -389,6 +389,17 @@ version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "00f5fb52a06bdcadeb54e8d3671f8888a39697dcb0b81b23b55174030427f4eb" +[[package]] +name = "futures-macro" +version = "0.3.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bdfb8ce053d86b91919aad980c220b1fb8401a9394410e1c289ed7e66b61835d" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "futures-sink" version = "0.3.25" @@ -410,6 +421,7 @@ dependencies = [ "futures-channel", "futures-core", "futures-io", + "futures-macro", "futures-sink", "futures-task", "memchr", @@ -813,7 +825,6 @@ dependencies = [ "linkerd-opencensus", "linkerd-proxy-api-resolve", "linkerd-proxy-core", - "linkerd-proxy-discover", "linkerd-proxy-dns-resolve", "linkerd-proxy-http", "linkerd-proxy-identity-client", @@ -1322,24 +1333,17 @@ dependencies = [ ] [[package]] -name = "linkerd-proxy-core" -version = "0.1.0" -dependencies = [ - "futures", - "linkerd-error", - "tower", -] - -[[package]] -name = "linkerd-proxy-discover" +name = "linkerd-proxy-balance" version = "0.1.0" dependencies = [ "futures", + "futures-util", "indexmap", "linkerd-error", "linkerd-proxy-core", "linkerd-stack", "pin-project", + "rand", "tokio", "tokio-stream", "tokio-util", @@ -1347,6 +1351,15 @@ dependencies = [ "tracing", ] +[[package]] +name = "linkerd-proxy-core" +version = "0.1.0" +dependencies = [ + "futures", + "linkerd-error", + "tower", +] + [[package]] name = "linkerd-proxy-dns-resolve" version = "0.1.0" @@ -1382,6 +1395,7 @@ dependencies = [ "linkerd-error", "linkerd-http-box", "linkerd-io", + "linkerd-proxy-balance", "linkerd-stack", "linkerd-tracing", "pin-project", @@ -1461,6 +1475,7 @@ dependencies = [ "futures", "linkerd-duplex", "linkerd-error", + "linkerd-proxy-balance", "linkerd-stack", "pin-project", "rand", diff --git a/Cargo.toml b/Cargo.toml index 315407349c..c3a121e939 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,9 +39,9 @@ members = [ "linkerd/metrics", "linkerd/opencensus", "linkerd/proxy/api-resolve", - "linkerd/proxy/dns-resolve", + "linkerd/proxy/balance", "linkerd/proxy/core", - "linkerd/proxy/discover", + "linkerd/proxy/dns-resolve", "linkerd/proxy/http", "linkerd/proxy/identity-client", "linkerd/proxy/resolve", diff --git a/linkerd/app/core/Cargo.toml b/linkerd/app/core/Cargo.toml index 0adc5a364c..b7323085d9 100644 --- a/linkerd/app/core/Cargo.toml +++ b/linkerd/app/core/Cargo.toml @@ -39,7 +39,6 @@ linkerd-metrics = { path = "../../metrics", features = ["linkerd-stack"] } linkerd-opencensus = { path = "../../opencensus" } linkerd-proxy-core = { path = "../../proxy/core" } linkerd-proxy-api-resolve = { path = "../../proxy/api-resolve" } -linkerd-proxy-discover = { path = "../../proxy/discover" } linkerd-proxy-identity-client = { path = "../../proxy/identity-client" } linkerd-proxy-http = { path = "../../proxy/http" } linkerd-proxy-resolve = { path = "../../proxy/resolve" } diff --git a/linkerd/app/core/src/control.rs b/linkerd/app/core/src/control.rs index 138935df90..6ca4b187fb 100644 --- a/linkerd/app/core/src/control.rs +++ b/linkerd/app/core/src/control.rs @@ -1,12 +1,12 @@ use crate::{ - classify, config, control, dns, identity, metrics, proxy::http, svc, tls, - transport::ConnectTcp, Addr, Error, + classify, config, dns, identity, metrics, proxy::http, svc, tls, transport::ConnectTcp, Addr, + Error, }; use futures::future::Either; use std::fmt; use tokio::time; use tokio_stream::{wrappers::IntervalStream, StreamExt}; -use tracing::warn; +use tracing::{info_span, warn}; #[derive(Clone, Debug)] pub struct Config { @@ -27,16 +27,25 @@ impl svc::Param for ControlAddr { } } +impl svc::Param for ControlAddr { + fn param(&self) -> http::balance::EwmaConfig { + EWMA_CONFIG + } +} + impl fmt::Display for ControlAddr { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fmt::Display::fmt(&self.addr, f) } } -type BalanceBody = - http::balance::PendingUntilFirstDataBody; +pub type RspBody = + linkerd_http_metrics::requests::ResponseBody, classify::Eos>; -pub type RspBody = linkerd_http_metrics::requests::ResponseBody; +const EWMA_CONFIG: http::balance::EwmaConfig = http::balance::EwmaConfig { + default_rtt: time::Duration::from_millis(30), + decay: time::Duration::from_secs(10), +}; impl Config { pub fn build( @@ -82,21 +91,24 @@ impl Config { .push(self::client::layer()) .push_on_service(svc::MapErr::layer(Into::into)) .into_new_service() - // Ensure that connection is driven independently of the load balancer; but don't drive - // reconnection independently of the balancer. This ensures that new connections are - // only initiated when the balancer tries to move pending endpoints to ready (i.e. after - // checking for discovery updates); but we don't want to continually reconnect without - // checking for discovery updates. + // Ensure that connection is driven independently of the load + // balancer; but don't drive reconnection independently of the + // balancer. This ensures that new connections are only initiated + // when the balancer tries to move pending endpoints to ready (i.e. + // after checking for discovery updates); but we don't want to + // continually reconnect without checking for discovery updates. .push_on_service(svc::layer::mk(svc::SpawnReady::new)) .push_new_reconnect(self.connect.backoff) - .instrument(|t: &self::client::Target| tracing::info_span!("endpoint", addr = %t.addr)) - .push(self::resolve::layer(dns, resolve_backoff)) - .push_on_service(self::control::balance::layer()) - .into_new_service() + .instrument(|t: &self::client::Target| info_span!("endpoint", addr = %t.addr)) + .push_new_clone() + .push(self::balance::layer(dns, resolve_backoff)) .push(metrics.to_layer::()) .push(self::add_origin::layer()) + // This buffer allows a resolver client to be shared across stacks. + // No load shed is applied here, however, so backpressure may leak + // into the caller task. .push_buffer_on_service("Controller client", &self.buffer) - .instrument(|c: &ControlAddr| tracing::info_span!("controller", addr = %c.addr)) + .instrument(|c: &ControlAddr| info_span!("controller", addr = %c.addr)) .push_map_target(move |()| addr.clone()) .push(svc::ArcNewService::layer()) .into_inner() @@ -162,68 +174,61 @@ mod add_origin { } } -mod resolve { - use super::client::Target; +mod balance { + use super::{client::Target, ControlAddr}; use crate::{ dns, - proxy::{ - discover, - dns_resolve::DnsResolve, - resolve::{map_endpoint, recover}, - }, - svc, + proxy::{dns_resolve::DnsResolve, http, resolve::recover}, + svc, tls, }; - use linkerd_error::Recover; use std::net::SocketAddr; - pub fn layer( + pub fn layer( dns: dns::Resolver, recover: R, - ) -> impl svc::Layer> - where - R: Recover + Clone, - R::Backoff: Unpin, - { - svc::layer::mk(move |endpoint| { - discover::resolve( - endpoint, - map_endpoint::Resolve::new( - IntoTarget(()), - recover::Resolve::new(recover.clone(), DnsResolve::new(dns.clone())), - ), - ) + ) -> impl svc::Layer< + N, + Service = http::NewBalancePeakEwma, NewIntoTarget>, + > { + let resolve = recover::Resolve::new(recover, DnsResolve::new(dns)); + svc::layer::mk(move |inner| { + http::NewBalancePeakEwma::new(NewIntoTarget { inner }, resolve.clone()) }) } - type Discover = discover::MakeEndpoint< - discover::FromResolve< - map_endpoint::Resolve>, - Target, - >, - M, - >; + #[derive(Clone, Debug)] + pub struct NewIntoTarget { + inner: N, + } + + #[derive(Clone, Debug)] + pub struct IntoTarget { + inner: N, + server_id: tls::ConditionalClientTls, + } - #[derive(Copy, Clone, Debug)] - pub struct IntoTarget(()); + // === impl NewIntoTarget === - impl map_endpoint::MapEndpoint for IntoTarget { - type Out = Target; + impl> svc::NewService for NewIntoTarget { + type Service = IntoTarget; - fn map_endpoint(&self, control: &super::ControlAddr, addr: SocketAddr, _: ()) -> Self::Out { - Target::new(addr, control.identity.clone()) + fn new_service(&self, control: ControlAddr) -> Self::Service { + IntoTarget { + server_id: control.identity.clone(), + inner: self.inner.new_service(control), + } } } -} -mod balance { - use crate::proxy::http; - use std::time::Duration; + // === impl IntoTarget === - const EWMA_DEFAULT_RTT: Duration = Duration::from_millis(30); - const EWMA_DECAY: Duration = Duration::from_secs(10); + impl> svc::NewService<(SocketAddr, ())> for IntoTarget { + type Service = N::Service; - pub fn layer() -> http::balance::Layer { - http::balance::layer(EWMA_DEFAULT_RTT, EWMA_DECAY) + fn new_service(&self, (addr, ()): (SocketAddr, ())) -> Self::Service { + self.inner + .new_service(Target::new(addr, self.server_id.clone())) + } } } diff --git a/linkerd/app/core/src/proxy.rs b/linkerd/app/core/src/proxy.rs index 2fda2f6f4b..808b69fbeb 100644 --- a/linkerd/app/core/src/proxy.rs +++ b/linkerd/app/core/src/proxy.rs @@ -2,7 +2,6 @@ pub use linkerd_proxy_api_resolve as api_resolve; pub use linkerd_proxy_core as core; -pub use linkerd_proxy_discover as discover; pub use linkerd_proxy_dns_resolve as dns_resolve; pub use linkerd_proxy_http as http; pub use linkerd_proxy_resolve as resolve; diff --git a/linkerd/app/core/src/svc.rs b/linkerd/app/core/src/svc.rs index bfad621159..b828d47d1c 100644 --- a/linkerd/app/core/src/svc.rs +++ b/linkerd/app/core/src/svc.rs @@ -277,6 +277,15 @@ impl Stack { self } + pub fn check_new_new_service(self) -> Self + where + S: NewService, + S::Service: NewService, + >::Service: Service, + { + self + } + pub fn check_new_clone(self) -> Self where S: NewService, diff --git a/linkerd/app/gateway/src/lib.rs b/linkerd/app/gateway/src/lib.rs index 22ec9e9c53..506cb7a619 100644 --- a/linkerd/app/gateway/src/lib.rs +++ b/linkerd/app/gateway/src/lib.rs @@ -9,11 +9,7 @@ use self::gateway::NewGateway; use linkerd_app_core::{ identity, io, metrics, profiles::{self, DiscoveryRejected}, - proxy::{ - api_resolve::{ConcreteAddr, Metadata}, - core::Resolve, - http, - }, + proxy::{api_resolve::Metadata, core::Resolve, http}, svc::{self, Param}, tls, transport::{ClientAddr, Local, OrigDstAddr, Remote}, @@ -76,9 +72,12 @@ where P::Future: Send + 'static, P::Error: Send, R: Clone + Send + Sync + Unpin + 'static, - R: Resolve, - R::Resolution: Send, - R::Future: Send + Unpin, + R: Resolve, + >::Resolution: Send, + >::Future: Send + Unpin, + R: Resolve, + >::Resolution: Send, + >::Future: Send + Unpin, { let inbound_config = inbound.config().clone(); let local_id = identity::LocalId(inbound.identity().name().clone()); diff --git a/linkerd/app/outbound/src/endpoint.rs b/linkerd/app/outbound/src/endpoint.rs index 62db54fdfe..5a460bd68f 100644 --- a/linkerd/app/outbound/src/endpoint.rs +++ b/linkerd/app/outbound/src/endpoint.rs @@ -2,7 +2,7 @@ use crate::{http, logical::Concrete, stack_labels, tcp, Outbound}; use linkerd_app_core::{ io, metrics, profiles::LogicalAddr, - proxy::{api_resolve::Metadata, resolve::map_endpoint::MapEndpoint}, + proxy::api_resolve::Metadata, svc, tls, transport::{self, addrs::*}, transport_header, Conditional, @@ -25,9 +25,17 @@ pub struct Endpoint

{ pub opaque_protocol: bool, } -#[derive(Clone)] -pub struct FromMetadata { - pub inbound_ips: Arc>, +#[derive(Clone, Debug)] +pub struct NewFromMetadata { + inbound_ips: Arc>, + inner: N, +} + +#[derive(Clone, Debug)] +pub struct FromMetadata { + inbound_ips: Arc>, + concrete: Concrete

, + inner: N, } // === impl Endpoint === @@ -61,7 +69,7 @@ impl Endpoint<()> { tracing::debug!(%addr, ?metadata, ?addr, ?inbound_ips, "Target is local"); tls::ConditionalClientTls::None(tls::NoClientTls::Loopback) } else { - FromMetadata::client_tls(&metadata, reason) + client_tls(&metadata, reason) }; Self { @@ -141,61 +149,85 @@ impl

svc::Param for Endpoint

{ } } -// === EndpointFromMetadata === -impl FromMetadata { - fn client_tls(metadata: &Metadata, reason: tls::NoClientTls) -> tls::ConditionalClientTls { - // If we're transporting an opaque protocol OR we're communicating with - // a gateway, then set an ALPN value indicating support for a transport - // header. - let use_transport_header = - metadata.opaque_transport_port().is_some() || metadata.authority_override().is_some(); - - metadata - .identity() - .cloned() - .map(move |server_id| { - Conditional::Some(tls::ClientTls { - server_id, - alpn: if use_transport_header { - Some(tls::client::AlpnProtocols(vec![ - transport_header::PROTOCOL.into() - ])) - } else { - None - }, - }) +// === NewFromMetadata === + +fn client_tls(metadata: &Metadata, reason: tls::NoClientTls) -> tls::ConditionalClientTls { + // If we're transporting an opaque protocol OR we're communicating with + // a gateway, then set an ALPN value indicating support for a transport + // header. + let use_transport_header = + metadata.opaque_transport_port().is_some() || metadata.authority_override().is_some(); + + metadata + .identity() + .cloned() + .map(move |server_id| { + Conditional::Some(tls::ClientTls { + server_id, + alpn: if use_transport_header { + Some(tls::client::AlpnProtocols(vec![ + transport_header::PROTOCOL.into() + ])) + } else { + None + }, }) - .unwrap_or(Conditional::None(reason)) + }) + .unwrap_or(Conditional::None(reason)) +} + +impl NewFromMetadata { + pub fn new(inbound_ips: Arc>, inner: N) -> Self { + Self { inbound_ips, inner } + } + + pub fn layer(inbound_ips: Arc>) -> impl svc::Layer + Clone { + svc::layer::mk(move |inner| Self::new(inbound_ips.clone(), inner)) } } -impl MapEndpoint, Metadata> for FromMetadata { - type Out = Endpoint

; +impl svc::NewService> for NewFromMetadata +where + P: Copy + std::fmt::Debug, + N: svc::NewService>, +{ + type Service = FromMetadata; - fn map_endpoint( - &self, - concrete: &Concrete

, - addr: SocketAddr, - mut metadata: Metadata, - ) -> Self::Out { - tracing::trace!(%addr, ?metadata, ?concrete, "Resolved endpoint"); + fn new_service(&self, concrete: Concrete

) -> Self::Service { + FromMetadata { + inner: self.inner.new_service(concrete.clone()), + concrete, + inbound_ips: self.inbound_ips.clone(), + } + } +} + +impl svc::NewService<(SocketAddr, Metadata)> for FromMetadata +where + P: Copy + std::fmt::Debug, + N: svc::NewService>, +{ + type Service = N::Service; + + fn new_service(&self, (addr, mut metadata): (SocketAddr, Metadata)) -> Self::Service { + tracing::trace!(%addr, ?metadata, concrete = ?self.concrete, "Resolved endpoint"); let tls = if self.inbound_ips.contains(&addr.ip()) { metadata.clear_upgrade(); tracing::debug!(%addr, ?metadata, ?addr, ?self.inbound_ips, "Target is local"); tls::ConditionalClientTls::None(tls::NoClientTls::Loopback) } else { - Self::client_tls(&metadata, tls::NoClientTls::NotProvidedByServiceDiscovery) + client_tls(&metadata, tls::NoClientTls::NotProvidedByServiceDiscovery) }; - Endpoint { + self.inner.new_service(Endpoint { addr: Remote(ServerAddr(addr)), tls, metadata, - logical_addr: Some(concrete.logical.logical_addr.clone()), - protocol: concrete.logical.protocol, + logical_addr: Some(self.concrete.logical.logical_addr.clone()), + protocol: self.concrete.logical.protocol, // XXX We never do protocol detection after resolving a concrete address to endpoints. // We should differentiate these target types statically. opaque_protocol: false, - } + }) } } diff --git a/linkerd/app/outbound/src/http/concrete.rs b/linkerd/app/outbound/src/http/concrete.rs index 2ad2ae1423..e541f7ae36 100644 --- a/linkerd/app/outbound/src/http/concrete.rs +++ b/linkerd/app/outbound/src/http/concrete.rs @@ -1,14 +1,10 @@ use super::{Concrete, Endpoint}; -use crate::{endpoint, resolve, stack_labels, Outbound}; +use crate::{endpoint, stack_labels, Outbound}; use linkerd_app_core::{ - proxy::{ - api_resolve::{ConcreteAddr, Metadata}, - core::Resolve, - http, - resolve::map_endpoint, - }, - svc, Error, Infallible, + proxy::{api_resolve::Metadata, core::Resolve, http}, + svc, Error, }; +use std::time; use tracing::info_span; impl Outbound { @@ -42,25 +38,12 @@ impl Outbound { + 'static, NSvc::Error: Into, NSvc::Future: Send, - R: Resolve, R: Clone + Send + Sync + 'static, + R: Resolve, R::Resolution: Send, R::Future: Send + Unpin, { self.map_stack(|config, rt, endpoint| { - let resolve = svc::stack(resolve.into_service()) - .push_request_filter(|c: Concrete| Ok::<_, Infallible>(c.resolve)) - .push(svc::layer::mk(move |inner| { - map_endpoint::Resolve::new( - endpoint::FromMetadata { - inbound_ips: config.inbound_ips.clone(), - }, - inner, - ) - })) - .check_service::() - .into_inner(); - endpoint .push_on_service( rt.metrics @@ -69,24 +52,10 @@ impl Outbound { .layer(stack_labels("http", "endpoint")), ) .instrument(|e: &Endpoint| info_span!("endpoint", addr = %e.addr)) - // Resolve the service to its endpoints and balance requests over them. - // - // We *don't* ensure that the endpoint is driven to readiness here, because this - // might cause us to continually attempt to reestablish connections without - // consulting discovery to see whether the endpoint has been removed. Instead, the - // endpoint stack spawns each _connection_ attempt on a background task, but the - // decision to attempt the connection must be driven by the balancer. - // - // TODO(ver) remove the watchdog timeout. - .push(resolve::layer(resolve, config.discovery_idle_timeout * 2)) - .push_on_service(http::balance::layer( - crate::EWMA_DEFAULT_RTT, - crate::EWMA_DECAY, - )) - .check_make_service::>() - .push(svc::MapErr::layer(Into::into)) + .push_new_clone() + .push(endpoint::NewFromMetadata::layer(config.inbound_ips.clone())) + .push(http::NewBalancePeakEwma::layer(resolve)) // Drives the initial resolution via the service's readiness. - .into_new_service() .push_on_service( svc::layers() .push(http::BoxResponse::layer()) @@ -103,3 +72,12 @@ impl Outbound { }) } } + +impl svc::Param for Concrete { + fn param(&self) -> http::balance::EwmaConfig { + http::balance::EwmaConfig { + default_rtt: time::Duration::from_millis(30), + decay: time::Duration::from_secs(10), + } + } +} diff --git a/linkerd/app/outbound/src/ingress.rs b/linkerd/app/outbound/src/ingress.rs index 755b2fb06b..7a265edfa4 100644 --- a/linkerd/app/outbound/src/ingress.rs +++ b/linkerd/app/outbound/src/ingress.rs @@ -2,10 +2,7 @@ use crate::{http, stack_labels, tcp, trace_labels, Config, Outbound}; use linkerd_app_core::{ config::{ProxyConfig, ServerConfig}, detect, http_tracing, io, profiles, - proxy::{ - api_resolve::{ConcreteAddr, Metadata}, - core::Resolve, - }, + proxy::{api_resolve::Metadata, core::Resolve}, svc::{self, stack::Param}, tls, transport::{OrigDstAddr, Remote, ServerAddr}, @@ -68,7 +65,7 @@ impl Outbound> { P::Error: Send, P::Future: Send, R: Clone + Send + Sync + 'static, - R: Resolve, + R: Resolve, R::Resolution: Send, R::Future: Send + Unpin, F: svc::NewService + Clone + Send + Sync + 'static, diff --git a/linkerd/app/outbound/src/lib.rs b/linkerd/app/outbound/src/lib.rs index c47e6a946c..72e513ce69 100644 --- a/linkerd/app/outbound/src/lib.rs +++ b/linkerd/app/outbound/src/lib.rs @@ -11,7 +11,6 @@ pub mod http; mod ingress; pub mod logical; mod metrics; -mod resolve; mod switch_logical; pub mod tcp; #[cfg(test)] @@ -24,11 +23,7 @@ use linkerd_app_core::{ drain, http_tracing::OpenCensusSink, identity, io, profiles, - proxy::{ - api_resolve::{ConcreteAddr, Metadata}, - core::Resolve, - tap, - }, + proxy::{api_resolve::Metadata, core::Resolve, tap}, serve, svc::{self, stack::Param}, tls, @@ -44,9 +39,6 @@ use std::{ }; use tracing::{info, info_span}; -const EWMA_DEFAULT_RTT: Duration = Duration::from_millis(30); -const EWMA_DECAY: Duration = Duration::from_secs(10); - #[derive(Clone, Debug)] pub struct Config { pub allow_discovery: AddrMatch, @@ -177,9 +169,12 @@ impl Outbound<()> { I: io::AsyncRead + io::AsyncWrite + io::Peek + io::PeerAddr, I: Debug + Unpin + Send + Sync + 'static, R: Clone + Send + Sync + Unpin + 'static, - R: Resolve, - R::Resolution: Send, - R::Future: Send + Unpin, + R: Resolve, + >::Resolution: Send, + >::Future: Send + Unpin, + R: Resolve, + >::Resolution: Send, + >::Future: Send + Unpin, P: profiles::GetProfile + Clone + Send + Sync + Unpin + 'static, P::Future: Send, P::Error: Send, @@ -201,10 +196,13 @@ impl Outbound<()> { T: Param + Clone + Send + Sync + 'static, I: io::AsyncRead + io::AsyncWrite + io::Peek + io::PeerAddr, I: Debug + Unpin + Send + Sync + 'static, - R: Resolve, R: Clone + Send + Sync + Unpin + 'static, - R::Resolution: Send, - R::Future: Send + Unpin, + R: Resolve, + >::Resolution: Send, + >::Future: Send + Unpin, + R: Resolve, + >::Resolution: Send, + >::Future: Send + Unpin, P: profiles::GetProfile + Clone + Send + Sync + Unpin + 'static, P::Future: Send, P::Error: Send, @@ -229,9 +227,12 @@ impl Outbound<()> { I: io::AsyncRead + io::AsyncWrite + io::Peek + io::PeerAddr, I: Debug + Unpin + Send + Sync + 'static, R: Clone + Send + Sync + Unpin + 'static, - R: Resolve, - R::Resolution: Send, - R::Future: Send + Unpin, + R: Resolve, + >::Resolution: Send, + >::Future: Send + Unpin, + R: Resolve, + >::Resolution: Send, + >::Future: Send + Unpin, P: profiles::GetProfile + Clone + Send + Sync + Unpin + 'static, P::Future: Send, P::Error: Send, diff --git a/linkerd/app/outbound/src/logical.rs b/linkerd/app/outbound/src/logical.rs index c4fe08833d..511fac3066 100644 --- a/linkerd/app/outbound/src/logical.rs +++ b/linkerd/app/outbound/src/logical.rs @@ -128,14 +128,13 @@ impl Outbound { C: svc::MakeConnection, Error = io::Error>, C::Connection: Send + Unpin, C::Future: Send + Unpin, - R: Clone + Send + 'static, - R: Resolve - + Clone - + Send - + Sync - + 'static, - R::Resolution: Send, - R::Future: Send + Unpin, + R: Clone + Send + Sync + Unpin + 'static, + R: Resolve, + >::Resolution: Send, + >::Future: Send + Unpin, + R: Resolve, + >::Resolution: Send, + >::Future: Send + Unpin, I: io::AsyncRead + io::AsyncWrite + io::PeerAddr, I: fmt::Debug + Send + Sync + Unpin + 'static, { diff --git a/linkerd/app/outbound/src/resolve.rs b/linkerd/app/outbound/src/resolve.rs deleted file mode 100644 index 2f5688c51b..0000000000 --- a/linkerd/app/outbound/src/resolve.rs +++ /dev/null @@ -1,30 +0,0 @@ -use linkerd_app_core::{ - proxy::{ - core::Resolve, - discover::{self, Buffer}, - }, - svc::{layer, NewService}, -}; -use std::time::Duration; - -pub fn layer( - resolve: R, - watchdog: Duration, -) -> impl layer::Layer>> + Clone -where - T: Clone + Send + std::fmt::Debug, - R: Resolve + Clone, - R::Resolution: Send, - R::Future: Send, - N: NewService, -{ - const ENDPOINT_BUFFER_CAPACITY: usize = 1_000; - - layer::mk(move |new_endpoint| { - Buffer::new( - ENDPOINT_BUFFER_CAPACITY, - watchdog, - discover::resolve(new_endpoint, resolve.clone()), - ) - }) -} diff --git a/linkerd/app/outbound/src/tcp/concrete.rs b/linkerd/app/outbound/src/tcp/concrete.rs index fab20b6d01..193af1333a 100644 --- a/linkerd/app/outbound/src/tcp/concrete.rs +++ b/linkerd/app/outbound/src/tcp/concrete.rs @@ -1,15 +1,11 @@ use super::{Concrete, Endpoint}; -use crate::{endpoint, resolve, stack_labels, Outbound}; +use crate::{endpoint, stack_labels, Outbound}; use linkerd_app_core::{ drain, io, - proxy::{ - api_resolve::{ConcreteAddr, Metadata}, - core::Resolve, - resolve::map_endpoint, - tcp, - }, - svc, Error, Infallible, + proxy::{api_resolve::Metadata, core::Resolve, tcp}, + svc, Error, }; +use std::time; use tracing::info_span; // === impl Outbound === @@ -40,31 +36,17 @@ impl Outbound { C::Future: Send, C: Send + Sync + 'static, I: io::AsyncRead + io::AsyncWrite + std::fmt::Debug + Send + Unpin + 'static, - R: Clone + Send + 'static, - R: Resolve + Sync, + R: Clone + Send + Sync + 'static, + R: Resolve + Sync, R::Resolution: Send, R::Future: Send + Unpin, { self.map_stack(|config, rt, connect| { let crate::Config { - discovery_idle_timeout, tcp_connection_buffer, .. } = config; - let resolve = svc::stack(resolve.into_service()) - .push_request_filter(|c: Concrete| Ok::<_, Infallible>(c.resolve)) - .push(svc::layer::mk(move |inner| { - map_endpoint::Resolve::new( - endpoint::FromMetadata { - inbound_ips: config.inbound_ips.clone(), - }, - inner, - ) - })) - .check_service::() - .into_inner(); - connect .push(svc::stack::WithoutConnectionMetadata::layer()) .push_make_thunk() @@ -75,19 +57,13 @@ impl Outbound { .layer(stack_labels("tcp", "endpoint")), ) .instrument(|e: &Endpoint| info_span!("endpoint", addr = %e.addr)) - .push(resolve::layer(resolve, *discovery_idle_timeout * 2)) + .push_new_clone() + .push(endpoint::NewFromMetadata::layer(config.inbound_ips.clone())) + .push(tcp::NewBalancePeakEwma::layer(resolve)) .push_on_service( svc::layers() - .push(tcp::balance::layer( - crate::EWMA_DEFAULT_RTT, - crate::EWMA_DECAY, - )) .push(tcp::Forward::layer()) - .push(drain::Retain::layer(rt.drain.clone())), - ) - .into_new_service() - .push_on_service( - svc::layers() + .push(drain::Retain::layer(rt.drain.clone())) .push( rt.metrics .proxy @@ -96,8 +72,17 @@ impl Outbound { ) .push_buffer("Opaque Concrete", tcp_connection_buffer), ) - .instrument(|c: &Concrete| tracing::info_span!("concrete", addr = %c.resolve)) + .instrument(|c: &Concrete| info_span!("concrete", addr = %c.resolve)) .push(svc::ArcNewService::layer()) }) } } + +impl svc::Param for Concrete { + fn param(&self) -> tcp::balance::EwmaConfig { + tcp::balance::EwmaConfig { + default_rtt: time::Duration::from_millis(30), + decay: time::Duration::from_secs(10), + } + } +} diff --git a/linkerd/proxy/balance/Cargo.toml b/linkerd/proxy/balance/Cargo.toml new file mode 100644 index 0000000000..2b8ffa73a7 --- /dev/null +++ b/linkerd/proxy/balance/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "linkerd-proxy-balance" +version = "0.1.0" +edition = "2021" +license = "Apache-2.0" +publish = false + +[dependencies] +futures = { version = "0.3", default-features = false } +futures-util = "0.3" +indexmap = "1" +linkerd-error = { path = "../../error" } +linkerd-proxy-core = { path = "../../proxy/core" } +linkerd-stack = { path = "../../stack" } +pin-project = "1" +rand = "0.8" +tokio = { version = "1", features = ["rt", "sync", "time"] } +tokio-stream = { version = "0.1", features = ["sync"] } +tokio-util = "0.7" +tracing = "0.1" + +[dependencies.tower] +version = "0.4.13" +default-features = false +features = ["balance", "discover", "load"] diff --git a/linkerd/proxy/balance/src/discover.rs b/linkerd/proxy/balance/src/discover.rs new file mode 100644 index 0000000000..ba91bdeb31 --- /dev/null +++ b/linkerd/proxy/balance/src/discover.rs @@ -0,0 +1,34 @@ +use futures::prelude::*; +use linkerd_proxy_core::Resolve; +use linkerd_stack::NewService; +use std::{fmt::Debug, net::SocketAddr}; + +pub mod buffer; +pub mod from_resolve; +pub mod new; + +pub use self::{from_resolve::FromResolve, new::DiscoverNew}; + +pub type Buffer = buffer::Buffer; + +pub(crate) fn spawn_new_from_resolve( + capacity: usize, + resolve: R, + new_service: M, + target: T, +) -> Buffer +where + T: Clone, + R: Resolve + Clone, + R::Endpoint: Clone + Debug + Eq + Send + 'static, + R::Resolution: Send, + R::Future: Send + 'static, + M: NewService, + N: NewService<(SocketAddr, R::Endpoint)> + Send + 'static, + N::Service: Send, +{ + let new_endpoint = new_service.new_service(target.clone()); + let resolution = resolve.resolve(target).try_flatten_stream(); + let disco = DiscoverNew::new(FromResolve::new(resolution), new_endpoint); + buffer::spawn(capacity, disco) +} diff --git a/linkerd/proxy/balance/src/discover/buffer.rs b/linkerd/proxy/balance/src/discover/buffer.rs new file mode 100644 index 0000000000..c17b477a4d --- /dev/null +++ b/linkerd/proxy/balance/src/discover/buffer.rs @@ -0,0 +1,64 @@ +use futures_util::future::poll_fn; +use linkerd_error::Error; +use tokio::sync::mpsc; +use tower::discover; +use tracing::{debug, instrument::Instrument, trace}; + +pub type Result = std::result::Result, Error>; +pub type Buffer = tokio_stream::wrappers::ReceiverStream>; + +pub fn spawn(capacity: usize, inner: D) -> Buffer +where + D: discover::Discover + Send + 'static, + D::Key: Send, + D::Service: Send, + D::Error: Into + Send, +{ + let (tx, rx) = mpsc::channel(capacity); + + debug!(%capacity, "Spawning discovery buffer"); + tokio::spawn( + async move { + tokio::pin!(inner); + + loop { + let res = tokio::select! { + _ = tx.closed() => break, + res = poll_fn(|cx| inner.as_mut().poll_discover(cx)) => res, + }; + + let change = match res { + Some(Ok(change)) => { + trace!("Changed"); + change + } + Some(Err(e)) => { + let error = e.into(); + debug!(%error); + let _ = tx.send(Err(error)).await; + return; + } + None => { + debug!("Discovery stream closed"); + return; + } + }; + + tokio::select! { + _ = tx.closed() => break, + res = tx.send(Ok(change)) => { + if res.is_err() { + break; + } + trace!("Change sent"); + } + } + } + + debug!("Discovery receiver dropped"); + } + .in_current_span(), + ); + + Buffer::new(rx) +} diff --git a/linkerd/proxy/discover/src/from_resolve.rs b/linkerd/proxy/balance/src/discover/from_resolve.rs similarity index 79% rename from linkerd/proxy/discover/src/from_resolve.rs rename to linkerd/proxy/balance/src/discover/from_resolve.rs index 54f74d707f..ea5f5c4a57 100644 --- a/linkerd/proxy/discover/src/from_resolve.rs +++ b/linkerd/proxy/balance/src/discover/from_resolve.rs @@ -1,10 +1,9 @@ use futures::{prelude::*, ready}; use indexmap::{map::Entry, IndexMap}; -use linkerd_proxy_core::resolve::{Resolve, Update}; +use linkerd_proxy_core::resolve::Update; use pin_project::pin_project; use std::{ collections::VecDeque, - future::Future, net::SocketAddr, pin::Pin, task::{Context, Poll}, @@ -12,89 +11,26 @@ use std::{ use tower::discover::Change; use tracing::{debug, trace}; -#[derive(Clone, Debug)] -pub struct FromResolve { - resolve: R, - _marker: std::marker::PhantomData, -} - -#[pin_project] -#[derive(Debug)] -pub struct DiscoverFuture { - #[pin] - future: F, - _marker: std::marker::PhantomData, -} - /// Observes an `R`-typed resolution stream, using an `M`-typed endpoint stack to /// build a service for each endpoint. #[pin_project] -pub struct Discover { +pub struct FromResolve { #[pin] resolution: R, /// Changes that have been received but not yet emitted. - pending: VecDeque>, + pending: VecDeque>, /// The current state of resolved endpoints that have been observed. This is /// an `IndexMap` so that the order of observed addresses is preserved /// (mostly for tests). - active: IndexMap, + active: IndexMap, } // === impl FromResolve === -impl FromResolve { - pub fn new(resolve: R) -> Self { - Self { - resolve, - _marker: std::marker::PhantomData, - } - } -} - -impl tower::Service for FromResolve -where - R: Resolve + Clone, -{ - type Response = Discover; - type Error = R::Error; - type Future = DiscoverFuture; - - #[inline] - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - self.resolve.poll_ready(cx) - } - - #[inline] - fn call(&mut self, target: T) -> Self::Future { - Self::Future { - future: self.resolve.resolve(target), - _marker: std::marker::PhantomData, - } - } -} - -// === impl DiscoverFuture === - -impl Future for DiscoverFuture -where - F: TryFuture, - F::Ok: TryStream, -{ - type Output = Result, F::Error>; - - #[inline] - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let resolution = ready!(self.project().future.try_poll(cx))?; - Poll::Ready(Ok(Discover::new(resolution))) - } -} - -// === impl Discover === - -impl Discover { - pub fn new(resolution: R) -> Self { +impl FromResolve { + pub(super) fn new(resolution: R) -> Self { Self { resolution, active: IndexMap::default(), @@ -103,12 +39,12 @@ impl Discover { } } -impl Stream for Discover +impl Stream for FromResolve where - R: TryStream>, - E: Clone + Eq + std::fmt::Debug, + T: Clone + Eq + std::fmt::Debug, + R: TryStream>, { - type Item = Result, R::Error>; + type Item = Result, R::Error>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { loop { @@ -189,7 +125,7 @@ where #[cfg(test)] mod tests { - use super::Discover; + use super::FromResolve; use futures::prelude::*; use linkerd_error::Infallible; use linkerd_proxy_core::resolve::Update; @@ -205,7 +141,7 @@ mod tests { #[tokio::test(flavor = "current_thread")] async fn reset() { let (tx, rx) = tokio::sync::mpsc::channel(1); - let mut disco = Discover::new(ReceiverStream::new(rx)); + let mut disco = FromResolve::new(ReceiverStream::new(rx)); // Use reset to set a new state with 3 addresses. tx.try_send(Ok::<_, Infallible>(Update::Reset( @@ -251,7 +187,7 @@ mod tests { #[tokio::test(flavor = "current_thread")] async fn deduplicate_redundant() { let (tx, rx) = tokio::sync::mpsc::channel(1); - let mut disco = Discover::new(ReceiverStream::new(rx)); + let mut disco = FromResolve::new(ReceiverStream::new(rx)); // The initial update is observed. tx.try_send(Ok::<_, Infallible>(Update::Add(vec![(addr(1), "a")]))) diff --git a/linkerd/proxy/balance/src/discover/new.rs b/linkerd/proxy/balance/src/discover/new.rs new file mode 100644 index 0000000000..45b31afd5c --- /dev/null +++ b/linkerd/proxy/balance/src/discover/new.rs @@ -0,0 +1,54 @@ +use futures::{ready, Stream}; +use linkerd_error::Error; +use linkerd_stack::NewService; +use pin_project::pin_project; +use std::{ + pin::Pin, + task::{Context, Poll}, +}; +use tower::discover::{self, Change}; + +/// Observes an `R`-typed resolution stream, using an `M`-typed endpoint stack to +/// build a service for each endpoint. +#[pin_project] +pub struct DiscoverNew { + #[pin] + discover: D, + new: N, +} + +// === impl DiscoverNew === + +impl DiscoverNew { + pub(super) fn new(discover: D, new: N) -> Self { + Self { discover, new } + } +} + +impl Stream for DiscoverNew +where + D: discover::Discover, + D::Key: Clone, + D::Error: Into, + N: NewService<(D::Key, D::Service)>, +{ + type Item = Result, Error>; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll, Error>>> { + let this = self.as_mut().project(); + match ready!(this.discover.poll_discover(cx)) { + Some(change) => Poll::Ready(Some(Ok(match change.map_err(Into::into)? { + Change::Insert(key, target) => { + let svc = this.new.new_service((key.clone(), target)); + Change::Insert(key, svc) + } + Change::Remove(key) => Change::Remove(key), + }))), + + None => Poll::Ready(None), + } + } +} diff --git a/linkerd/proxy/balance/src/lib.rs b/linkerd/proxy/balance/src/lib.rs new file mode 100644 index 0000000000..24c9a77b4c --- /dev/null +++ b/linkerd/proxy/balance/src/lib.rs @@ -0,0 +1,186 @@ +mod discover; + +use linkerd_error::Error; +use linkerd_proxy_core::Resolve; +use linkerd_stack::{layer, NewService, Param, Service}; +use rand::thread_rng; +use std::{fmt::Debug, marker::PhantomData, net::SocketAddr, time::Duration}; +use tower::{ + balance::p2c, + load::{self, PeakEwma}, +}; + +pub use tower::load::peak_ewma::Handle; + +#[derive(Copy, Clone, Debug, Default)] +pub struct EwmaConfig { + pub default_rtt: Duration, + pub decay: Duration, +} + +/// Configures a stack to resolve targets to balance requests over `N`-typed +/// endpoint stacks. +#[derive(Debug)] +pub struct NewBalancePeakEwma { + update_queue_capacity: usize, + resolve: R, + inner: N, + _marker: PhantomData C>, +} + +type Buffer = discover::Buffer>; +pub type Balance = p2c::Balance, Req>; + +/// Wraps the inner stack in [`NewPeakEwma`] to produce [`PeakEwma`] services. +#[derive(Debug)] +pub struct NewNewPeakEwma { + inner: N, + _marker: PhantomData C>, +} + +/// Wraps the inner services in [`PeakEwma`] services so their load is tracked +/// for the p2c balancer. +#[derive(Debug)] +pub struct NewPeakEwma { + config: EwmaConfig, + inner: N, + _marker: PhantomData C>, +} + +// === impl NewBalancePeakEwma === + +impl NewBalancePeakEwma { + /// Limits the number of endpoint updates that can be buffered by a + /// discovery stream (i.e., for a specific service resolution). + /// + /// The buffering task ensures that discovery updates are processed (i.e., + /// from the controller client) even when the balancer is not processing new + /// requests. If the buffer fills up, we'll stop polling the discovery + /// stream. When the stream represents an gRPC streaming response, the + /// server may become unable to write further updates when the buffer is + /// full. + /// + /// 1K updates should be more than enough for most load balancers. + const UPDATE_QUEUE_CAPACITY: usize = 1_000; + + pub fn new(inner: N, resolve: R) -> Self { + Self { + update_queue_capacity: Self::UPDATE_QUEUE_CAPACITY, + resolve, + inner, + _marker: PhantomData, + } + } + + pub fn layer(resolve: R) -> impl layer::Layer + Clone + where + R: Clone, + { + layer::mk(move |inner| Self::new(inner, resolve.clone())) + } +} + +impl NewService for NewBalancePeakEwma +where + T: Param + Clone + Send, + R: Resolve, + R::Endpoint: Clone + Debug + Eq + Send + 'static, + R::Resolution: Send, + R::Future: Send + 'static, + M: NewService + Clone, + N: NewService<(SocketAddr, R::Endpoint), Service = S> + Send + 'static, + S: Service + Send, + S::Error: Into, + C: load::TrackCompletion + Default + Send + 'static, + Req: 'static, + Balance: Service, +{ + type Service = Balance; + + fn new_service(&self, target: T) -> Self::Service { + let new = NewNewPeakEwma { + inner: self.inner.clone(), + _marker: PhantomData, + }; + let disco = discover::spawn_new_from_resolve( + self.update_queue_capacity, + self.resolve.clone(), + new, + target, + ); + + Balance::from_rng(disco, &mut thread_rng()).expect("RNG must be valid") + } +} + +impl Clone for NewBalancePeakEwma { + fn clone(&self) -> Self { + Self { + update_queue_capacity: self.update_queue_capacity, + resolve: self.resolve.clone(), + inner: self.inner.clone(), + _marker: self._marker, + } + } +} + +// === impl NewNewPeakEwma === + +impl NewService for NewNewPeakEwma +where + T: Param, + N: NewService, +{ + type Service = NewPeakEwma; + + fn new_service(&self, target: T) -> Self::Service { + let config = target.param(); + let inner = self.inner.new_service(target); + NewPeakEwma { + config, + inner, + _marker: PhantomData, + } + } +} + +impl Clone for NewNewPeakEwma { + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + _marker: self._marker, + } + } +} + +// === impl NewPeakEwma === + +impl NewService for NewPeakEwma +where + C: load::TrackCompletion + Default, + N: NewService, + S: Service, +{ + type Service = PeakEwma; + + fn new_service(&self, target: T) -> Self::Service { + // Converts durations to nanos in f64. + // + // Due to a lossy transformation, the maximum value that can be + // represented is ~585 years, which, I hope, is more than enough to + // represent request latencies. + fn nanos(d: Duration) -> f64 { + const NANOS_PER_SEC: u64 = 1_000_000_000; + let n = f64::from(d.subsec_nanos()); + let s = d.as_secs().saturating_mul(NANOS_PER_SEC) as f64; + n + s + } + + PeakEwma::new( + self.inner.new_service(target), + self.config.default_rtt, + nanos(self.config.decay), + C::default(), + ) + } +} diff --git a/linkerd/proxy/core/src/resolve.rs b/linkerd/proxy/core/src/resolve.rs index d5af4c5889..864b6c376a 100644 --- a/linkerd/proxy/core/src/resolve.rs +++ b/linkerd/proxy/core/src/resolve.rs @@ -1,19 +1,20 @@ use futures::prelude::*; use linkerd_error::Error; -use std::future::Future; -use std::net::SocketAddr; -use std::task::{Context, Poll}; +use std::{ + future::Future, + net::SocketAddr, + task::{Context, Poll}, +}; +use tower::util::Oneshot; /// Resolves `T`-typed names/addresses as an infinite stream of `Update`. -pub trait Resolve { +pub trait Resolve: Clone { type Endpoint; type Error: Into; type Resolution: Stream, Self::Error>>; type Future: Future>; - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll>; - - fn resolve(&mut self, target: T) -> Self::Future; + fn resolve(&self, target: T) -> Self::Future; fn into_service(self) -> ResolveService where @@ -38,23 +39,18 @@ pub enum Update { impl Resolve for S where - S: tower::Service, + S: tower::Service + Clone, S::Error: Into, R: Stream, S::Error>>, { type Endpoint = E; type Error = S::Error; type Resolution = S::Response; - type Future = S::Future; - - #[inline] - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - tower::Service::poll_ready(self, cx) - } + type Future = Oneshot; #[inline] - fn resolve(&mut self, target: T) -> Self::Future { - tower::Service::call(self, target) + fn resolve(&self, target: T) -> Self::Future { + Oneshot::new(self.clone(), target) } } @@ -70,8 +66,8 @@ where type Future = R::Future; #[inline] - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - self.0.poll_ready(cx) + fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) } #[inline] diff --git a/linkerd/proxy/discover/Cargo.toml b/linkerd/proxy/discover/Cargo.toml deleted file mode 100644 index 821c732c43..0000000000 --- a/linkerd/proxy/discover/Cargo.toml +++ /dev/null @@ -1,28 +0,0 @@ -[package] -name = "linkerd-proxy-discover" -version = "0.1.0" -authors = ["Linkerd Developers "] -license = "Apache-2.0" -edition = "2021" -publish = false -description = """ -Utilities to implement a Discover with the core Resolve type -""" - - -[dependencies] -futures = { version = "0.3", default-features = false } -indexmap = "1" -linkerd-error = { path = "../../error" } -linkerd-proxy-core = { path = "../core" } -linkerd-stack = { path = "../../stack" } -tokio = { version = "1", features = ["rt", "sync", "time"] } -tokio-util = "0.7" -tower = { version = "0.4", features = ["discover"] } -tracing = "0.1" -pin-project = "1" - -[dev-dependencies] -tokio = { version = "1", features = ["macros", "test-util"] } -tokio-stream = { version = "0.1", features = ["sync"] } -tower = { version = "0.4", default-features = false, features = ["util"] } diff --git a/linkerd/proxy/discover/src/buffer.rs b/linkerd/proxy/discover/src/buffer.rs deleted file mode 100644 index 5078315713..0000000000 --- a/linkerd/proxy/discover/src/buffer.rs +++ /dev/null @@ -1,199 +0,0 @@ -use futures::{ready, Stream, TryFuture}; -use linkerd_error::{Error, Infallible}; -use pin_project::pin_project; -use std::future::Future; -use std::pin::Pin; -use std::task::{Context, Poll}; -use std::time::Duration; -use tokio::sync::{mpsc, oneshot}; -use tokio::time::{self, Sleep}; -use tokio_util::sync::PollSender; -use tower::discover; -use tracing::instrument::Instrument; -use tracing::warn; - -#[derive(Clone, Debug)] -pub struct Buffer { - capacity: usize, - watchdog_timeout: Duration, - inner: M, -} - -#[pin_project] -#[derive(Debug)] -pub struct Discover { - #[pin] - rx: mpsc::Receiver>, - _disconnect_tx: oneshot::Sender, -} - -#[pin_project] -pub struct DiscoverFuture { - #[pin] - future: F, - capacity: usize, - watchdog_timeout: Duration, - _marker: std::marker::PhantomData D>, -} - -#[pin_project] -pub struct Daemon { - #[pin] - discover: D, - #[pin] - disconnect_rx: oneshot::Receiver, - tx: PollSender>, - #[pin] - watchdog: Option, - watchdog_timeout: Duration, -} - -impl Buffer { - pub fn new(capacity: usize, watchdog_timeout: Duration, inner: M) -> Self { - Self { - capacity, - watchdog_timeout, - inner, - } - } -} - -impl tower::Service for Buffer -where - M: tower::Service, - D: discover::Discover + Send + 'static, - D::Error: Into, - D::Key: Send, - D::Service: Send, -{ - type Response = Discover; - type Error = M::Error; - type Future = DiscoverFuture; - - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - self.inner.poll_ready(cx) - } - - fn call(&mut self, req: T) -> Self::Future { - let future = self.inner.call(req); - Self::Future { - future, - capacity: self.capacity, - watchdog_timeout: self.watchdog_timeout, - _marker: std::marker::PhantomData, - } - } -} - -impl Future for DiscoverFuture -where - F: TryFuture, - D: discover::Discover + Send + 'static, - D::Error: Into, - D::Key: Send, - D::Service: Send, -{ - type Output = Result, F::Error>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); - let discover = ready!(this.future.try_poll(cx))?; - - let (tx, rx) = mpsc::channel(*this.capacity); - let tx = PollSender::new(tx); - let (_disconnect_tx, disconnect_rx) = oneshot::channel(); - let fut = Daemon { - discover, - disconnect_rx, - tx, - watchdog_timeout: *this.watchdog_timeout, - watchdog: None, - }; - tokio::spawn(fut.in_current_span()); - - Poll::Ready(Ok(Discover { rx, _disconnect_tx })) - } -} - -impl Future for Daemon -where - D: discover::Discover, - D::Error: Into, - D::Service: Send + 'static, - D::Key: Send + 'static, -{ - type Output = (); - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - loop { - let mut this = self.as_mut().project(); - match this.disconnect_rx.poll(cx) { - Poll::Pending => {} - Poll::Ready(Err(_lost)) => return Poll::Ready(()), - Poll::Ready(Ok(n)) => match n {}, - } - - // The watchdog bounds the amount of time that the send buffer stays - // full. This is designed to release the `discover` resources, i.e. - // if we expect that the receiver has leaked. - match this.tx.poll_reserve(cx) { - Poll::Ready(Ok(())) => { - this.watchdog.as_mut().set(None); - } - Poll::Ready(Err(_)) => { - tracing::trace!("lost sender"); - return Poll::Ready(()); - } - Poll::Pending => { - if this.watchdog.as_mut().as_pin_mut().is_none() { - this.watchdog - .as_mut() - .set(Some(time::sleep(*this.watchdog_timeout))); - } - - if this - .watchdog - .as_pin_mut() - .expect("should have been set if none") - .poll(cx) - .is_ready() - { - warn!( - timeout = ?this.watchdog_timeout, - "Dropping resolution due to watchdog", - ); - return Poll::Ready(()); - } - return Poll::Pending; - } - } - - let up = match ready!(this.discover.poll_discover(cx)) { - Some(Ok(up)) => up, - Some(Err(e)) => { - let error: Error = e.into(); - warn!(error, "Discovery task failed"); - return Poll::Ready(()); - } - None => { - warn!("Discovery stream ended!"); - return Poll::Ready(()); - } - }; - - this.tx.send_item(up).ok().expect("sender must be ready"); - } - } -} - -impl Stream for Discover { - type Item = Result, Error>; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match self.project().rx.poll_recv(cx) { - Poll::Pending => Poll::Pending, - Poll::Ready(Some(change)) => Poll::Ready(Some(Ok(change))), - Poll::Ready(None) => Poll::Ready(None), - } - } -} diff --git a/linkerd/proxy/discover/src/lib.rs b/linkerd/proxy/discover/src/lib.rs deleted file mode 100644 index 085a16d691..0000000000 --- a/linkerd/proxy/discover/src/lib.rs +++ /dev/null @@ -1,21 +0,0 @@ -#![deny(rust_2018_idioms, clippy::disallowed_methods, clippy::disallowed_types)] -#![forbid(unsafe_code)] - -use linkerd_proxy_core::Resolve; - -pub mod buffer; -pub mod from_resolve; -pub mod make_endpoint; - -pub use self::buffer::Buffer; -pub use self::from_resolve::FromResolve; -pub use self::make_endpoint::MakeEndpoint; - -pub type Stack = MakeEndpoint, N>; - -pub fn resolve(endpoint: N, resolve: R) -> Stack -where - R: Resolve, -{ - MakeEndpoint::new(endpoint, FromResolve::new(resolve)) -} diff --git a/linkerd/proxy/discover/src/make_endpoint.rs b/linkerd/proxy/discover/src/make_endpoint.rs deleted file mode 100644 index 64c281df85..0000000000 --- a/linkerd/proxy/discover/src/make_endpoint.rs +++ /dev/null @@ -1,136 +0,0 @@ -use futures::{ready, Stream, TryFuture}; - -use linkerd_error::Error; -use linkerd_stack::NewService; -use pin_project::pin_project; -use std::future::Future; -use std::hash::Hash; -use std::pin::Pin; -use std::task::{Context, Poll}; -use tower::discover::{self, Change}; - -#[derive(Clone, Debug)] -pub struct MakeEndpoint { - make_discover: D, - new_endpoint: E, -} - -#[pin_project] -#[derive(Debug)] -pub struct DiscoverFuture { - #[pin] - future: F, - new_endpoint: Option, -} - -/// Observes an `R`-typed resolution stream, using an `M`-typed endpoint stack to -/// build a service for each endpoint. -#[pin_project] -pub struct Discover> { - #[pin] - discover: D, - new_endpoint: E, -} - -// === impl MakeEndpoint === - -impl MakeEndpoint { - pub fn new(new_endpoint: E, make_discover: D) -> Self { - Self { - make_discover, - new_endpoint, - } - } -} - -impl tower::Service for MakeEndpoint -where - D: tower::Service, - InnerDiscover: discover::Discover, - InnerDiscover::Key: Hash + Clone, - InnerDiscover::Error: Into, - E: NewService + Clone, -{ - type Response = Discover; - type Error = D::Error; - type Future = DiscoverFuture; - - #[inline] - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - self.make_discover.poll_ready(cx) - } - - #[inline] - fn call(&mut self, target: T) -> Self::Future { - let future = self.make_discover.call(target); - DiscoverFuture { - future, - new_endpoint: Some(self.new_endpoint.clone()), - } - } -} - -// === impl DiscoverFuture === - -impl Future for DiscoverFuture -where - F: TryFuture, - D: discover::Discover, - D::Key: Hash + Clone, - D::Error: Into, - E: NewService, -{ - type Output = Result, F::Error>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); - let resolution = ready!(this.future.try_poll(cx))?; - let new_endpoint = this.new_endpoint.take().expect("polled after ready"); - Poll::Ready(Ok(Discover::new(resolution, new_endpoint))) - } -} - -// === impl Discover === - -impl Discover -where - D: discover::Discover, - D::Key: Hash + Clone, - D::Error: Into, - E: NewService, -{ - pub fn new(discover: D, new_endpoint: E) -> Self { - Self { - discover, - new_endpoint, - } - } -} - -impl Stream for Discover -where - D: discover::Discover, - D::Key: Hash + Clone, - D::Error: Into, - E: NewService, -{ - type Item = Result, Error>; - - fn poll_next( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll, Error>>> { - let this = self.as_mut().project(); - match ready!(this.discover.poll_discover(cx)) { - Some(change) => Poll::Ready(Some(Ok(match change.map_err(Into::into)? { - Change::Insert(key, target) => { - let endpoint = this.new_endpoint.new_service(target); - Change::Insert(key, endpoint) - } - Change::Remove(key) => Change::Remove(key), - }))), - - None => Poll::Ready(None), - } - } -} diff --git a/linkerd/proxy/http/Cargo.toml b/linkerd/proxy/http/Cargo.toml index 2a1e8094de..2838f3828e 100644 --- a/linkerd/proxy/http/Cargo.toml +++ b/linkerd/proxy/http/Cargo.toml @@ -20,18 +20,26 @@ h2 = "0.3" http = "0.2" http-body = "0.4" httparse = "1" -hyper = { version = "0.14", features = ["client", "http1", "http2", "server", "stream", "runtime"] } +hyper = { version = "0.14", features = [ + "client", + "http1", + "http2", + "server", + "stream", + "runtime", +] } hyper-balance = { path = "../../../hyper-balance" } linkerd-detect = { path = "../../detect" } linkerd-duplex = { path = "../../duplex" } linkerd-error = { path = "../../error" } linkerd-http-box = { path = "../../http-box" } linkerd-io = { path = "../../io" } +linkerd-proxy-balance = { path = "../../proxy/balance" } linkerd-stack = { path = "../../stack" } rand = "0.8" thiserror = "1" tokio = { version = "1", features = ["time", "rt"] } -tower = { version = "0.4.13", default-features = false, features = ["balance", "load", "discover"] } +tower = { version = "0.4.13", default-features = false } tracing = "0.1" try-lock = "0.2" pin-project = "1" diff --git a/linkerd/proxy/http/src/balance.rs b/linkerd/proxy/http/src/balance.rs index 59b286c5b4..8075b64670 100644 --- a/linkerd/proxy/http/src/balance.rs +++ b/linkerd/proxy/http/src/balance.rs @@ -1,59 +1,9 @@ -use crate::Error; -use hyper::body::HttpBody; -pub use hyper_balance::{PendingUntilFirstData, PendingUntilFirstDataBody}; -use rand::thread_rng; -use std::{hash::Hash, marker::PhantomData, time::Duration}; -use tower::discover::Discover; -pub use tower::{ - balance::p2c::Balance, - load::{Load, PeakEwmaDiscover}, -}; +use hyper_balance::{PendingUntilFirstData, PendingUntilFirstDataBody}; +use linkerd_proxy_balance as balance; -/// Configures a stack to resolve `T` typed targets to balance requests over -/// `M`-typed endpoint stacks. -#[derive(Debug)] -pub struct Layer { - decay: Duration, - default_rtt: Duration, - _marker: PhantomData B>, -} +pub type Body = PendingUntilFirstDataBody; -// === impl Layer === +pub type EwmaConfig = balance::EwmaConfig; -pub fn layer(default_rtt: Duration, decay: Duration) -> Layer { - Layer { - decay, - default_rtt, - _marker: PhantomData, - } -} - -impl Clone for Layer { - fn clone(&self) -> Self { - Self { - decay: self.decay, - default_rtt: self.default_rtt, - _marker: PhantomData, - } - } -} - -impl tower::layer::Layer for Layer -where - A: HttpBody, - B: HttpBody, - D: Discover, - D::Key: Hash, - S: tower::Service, Response = http::Response>, - S::Error: Into, - Balance, http::Request>: - tower::Service>, -{ - type Service = Balance, http::Request>; - - fn layer(&self, discover: D) -> Self::Service { - let instrument = PendingUntilFirstData::default(); - let loaded = PeakEwmaDiscover::new(discover, self.default_rtt, self.decay, instrument); - Balance::from_rng(loaded, &mut thread_rng()).expect("RNG must be valid") - } -} +pub type NewBalancePeakEwma = + balance::NewBalancePeakEwma, R, N>; diff --git a/linkerd/proxy/http/src/lib.rs b/linkerd/proxy/http/src/lib.rs index 39b44942c2..a8cc229854 100644 --- a/linkerd/proxy/http/src/lib.rs +++ b/linkerd/proxy/http/src/lib.rs @@ -25,6 +25,7 @@ pub mod upgrade; mod version; pub use self::{ + balance::NewBalancePeakEwma, client_handle::{ClientHandle, SetClientHandle}, detect::DetectHttp, glue::{HyperServerSvc, UpgradeBody}, diff --git a/linkerd/proxy/resolve/src/lib.rs b/linkerd/proxy/resolve/src/lib.rs index 5b54eac823..393efc67a7 100644 --- a/linkerd/proxy/resolve/src/lib.rs +++ b/linkerd/proxy/resolve/src/lib.rs @@ -1,5 +1,4 @@ #![deny(rust_2018_idioms, clippy::disallowed_methods, clippy::disallowed_types)] #![forbid(unsafe_code)] -pub mod map_endpoint; pub mod recover; diff --git a/linkerd/proxy/resolve/src/map_endpoint.rs b/linkerd/proxy/resolve/src/map_endpoint.rs deleted file mode 100644 index b5abc52abe..0000000000 --- a/linkerd/proxy/resolve/src/map_endpoint.rs +++ /dev/null @@ -1,160 +0,0 @@ -//! A middleware that wraps `Resolutions`, modifying their endpoint type. - -use futures::stream::Stream; -use futures::stream::TryStream; -use futures::{ready, TryFuture}; -use linkerd_error::Error; -use linkerd_proxy_core::resolve; -use pin_project::pin_project; -use std::future::Future; -use std::net::SocketAddr; -use std::pin::Pin; -use std::task::{Context, Poll}; - -pub trait MapEndpoint { - type Out; - fn map_endpoint(&self, target: &Target, addr: SocketAddr, in_ep: In) -> Self::Out; -} - -#[derive(Clone, Debug)] -pub struct Resolve { - resolve: R, - map: M, -} - -#[pin_project] -#[derive(Debug)] -pub struct ResolveFuture { - #[pin] - future: F, - target: Option, - map: Option, -} - -#[pin_project] -#[derive(Clone, Debug)] -pub struct Resolution { - #[pin] - resolution: R, - target: T, - map: M, - _marker: std::marker::PhantomData, -} - -// === impl Resolve === - -impl Resolve { - pub fn new(map: M, resolve: R) -> Self { - Self { resolve, map } - } -} - -impl tower::Service for Resolve -where - T: Clone, - R: resolve::Resolve, - M: MapEndpoint + Clone, -{ - type Response = Resolution; - type Error = R::Error; - type Future = ResolveFuture; - - #[inline] - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - self.resolve.poll_ready(cx) - } - - #[inline] - fn call(&mut self, target: T) -> Self::Future { - let future = self.resolve.resolve(target.clone()); - Self::Future { - future, - target: Some(target), - map: Some(self.map.clone()), - } - } -} - -// === impl ResolveFuture === - -impl Future for ResolveFuture -where - F: TryFuture, - F::Ok: TryStream>, - ::Error: Into, - M: MapEndpoint, -{ - type Output = Result, F::Error>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); - let resolution = ready!(this.future.try_poll(cx))?; - let target = this.target.take().expect("polled after ready"); - let map = this.map.take().expect("polled after ready"); - Poll::Ready(Ok(Resolution { - resolution, - target, - map, - _marker: std::marker::PhantomData, - })) - } -} - -// === impl Resolution === - -impl Stream for Resolution -where - R: TryStream>, - R::Error: Into, - M: MapEndpoint, - M::Out: std::fmt::Debug, -{ - type Item = Result, R::Error>; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = self.project(); - let update = match ready!(this.resolution.try_poll_next(cx)) { - Some(result) => match result? { - resolve::Update::Add(eps) => { - let mut update = Vec::with_capacity(eps.len()); - for (a, ep) in eps.into_iter() { - let ep = this.map.map_endpoint(this.target, a, ep); - update.push((a, ep)); - } - resolve::Update::Add(update) - } - resolve::Update::Reset(eps) => { - let mut update = Vec::with_capacity(eps.len()); - for (a, ep) in eps.into_iter() { - let ep = this.map.map_endpoint(this.target, a, ep); - update.push((a, ep)); - } - resolve::Update::Reset(update) - } - resolve::Update::Remove(addrs) => resolve::Update::Remove(addrs), - resolve::Update::DoesNotExist => resolve::Update::DoesNotExist, - }, - None => return Poll::Ready(None), - }; - tracing::trace!(?update); - Poll::Ready(Some(Ok(update))) - } -} - -// === impl MapEndpoint === - -impl MapEndpoint for () { - type Out = N; - - fn map_endpoint(&self, _: &T, _: SocketAddr, ep: N) -> Self::Out { - ep - } -} - -impl Out> MapEndpoint for F { - type Out = Out; - - fn map_endpoint(&self, target: &T, addr: SocketAddr, ep: In) -> Self::Out { - (self)(target, addr, ep) - } -} diff --git a/linkerd/proxy/resolve/src/recover.rs b/linkerd/proxy/resolve/src/recover.rs index daa8aa41eb..a8d6b0c797 100644 --- a/linkerd/proxy/resolve/src/recover.rs +++ b/linkerd/proxy/resolve/src/recover.rs @@ -84,8 +84,8 @@ where type Future = ResolveFuture; #[inline] - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - self.resolve.poll_ready(cx).map_err(Into::into) + fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) } #[inline] @@ -220,7 +220,6 @@ where // backoff in case this connection attempt fails. State::Disconnected { ref mut backoff } => { tracing::trace!("connecting"); - ready!(self.resolve.poll_ready(cx).map_err(Into::into))?; let future = self.resolve.resolve(self.target.clone()); let backoff = backoff.take(); State::Connecting { future, backoff } diff --git a/linkerd/proxy/tcp/Cargo.toml b/linkerd/proxy/tcp/Cargo.toml index 8ace638442..3d7107eee8 100644 --- a/linkerd/proxy/tcp/Cargo.toml +++ b/linkerd/proxy/tcp/Cargo.toml @@ -11,8 +11,9 @@ publish = false futures = { version = "0.3", default-features = false } linkerd-duplex = { path = "../../duplex" } linkerd-error = { path = "../../error" } +linkerd-proxy-balance = { path = "../../proxy/balance" } linkerd-stack = { path = "../../stack" } rand = "0.8" tokio = { version = "1" } -tower = { version = "0.4.13", default-features = false, features = ["balance", "load", "discover"] } +tower = { version = "0.4.13", default-features = false } pin-project = "1" diff --git a/linkerd/proxy/tcp/src/balance.rs b/linkerd/proxy/tcp/src/balance.rs index 5e3b1130fc..272d69b8d3 100644 --- a/linkerd/proxy/tcp/src/balance.rs +++ b/linkerd/proxy/tcp/src/balance.rs @@ -1,28 +1,6 @@ -use linkerd_error::Error; -use linkerd_stack::layer; -use rand::thread_rng; -use std::{hash::Hash, time::Duration}; -pub use tower::{ - balance::p2c::Balance, - load::{Load, PeakEwmaDiscover}, -}; -use tower::{discover::Discover, load::CompleteOnResponse}; +use linkerd_proxy_balance as balance; +use tower::load::CompleteOnResponse; -/// Produces a PeakEWMA balancer that uses connect latency (and pending -/// connections) as its load metric. -pub fn layer( - default_rtt: Duration, - decay: Duration, -) -> impl tower::layer::Layer, T>> + Clone -where - D: Discover, - D::Key: Hash, - D::Service: tower::Service, - >::Error: Into, -{ - layer::mk(move |discover| { - let loaded = - PeakEwmaDiscover::new(discover, default_rtt, decay, CompleteOnResponse::default()); - Balance::from_rng(loaded, &mut thread_rng()).expect("RNG must be valid") - }) -} +pub type EwmaConfig = balance::EwmaConfig; + +pub type NewBalancePeakEwma = balance::NewBalancePeakEwma; diff --git a/linkerd/proxy/tcp/src/lib.rs b/linkerd/proxy/tcp/src/lib.rs index 73fcbbf1c7..6bce36b14f 100644 --- a/linkerd/proxy/tcp/src/lib.rs +++ b/linkerd/proxy/tcp/src/lib.rs @@ -4,4 +4,4 @@ pub mod balance; pub mod forward; -pub use self::forward::Forward; +pub use self::{balance::NewBalancePeakEwma, forward::Forward};