From 6a4dbc053a37d2995c698ab599e12ed41fff825c Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Wed, 8 May 2019 13:45:33 -0700 Subject: [PATCH 01/16] Add a generic HTTP fallback layer Signed-off-by: Eliza Weisman --- src/proxy/http/fallback.rs | 291 +++++++++++++++++++++++++++++++++++++ src/proxy/http/mod.rs | 1 + 2 files changed, 292 insertions(+) create mode 100644 src/proxy/http/fallback.rs diff --git a/src/proxy/http/fallback.rs b/src/proxy/http/fallback.rs new file mode 100644 index 0000000000..729d37f9be --- /dev/null +++ b/src/proxy/http/fallback.rs @@ -0,0 +1,291 @@ +use bytes::Buf; +use futures::{Async, Future, Poll}; +use http; +use hyper::body::Payload; +use proxy; +use svc; + +use std::{marker::PhantomData, mem}; + +#[derive(Debug)] +pub enum Error { + Fallback(http::Request), + Error(proxy::Error), +} + +pub type Layer = MakeFallback, svc::ServiceBuilder, A>; + +#[derive(Clone, Debug)] +pub struct MakeFallback { + primary: P, + fallback: F, + _p: PhantomData, +} + +pub struct Service { + primary: P, + fallback: F, + _p: PhantomData, +} + +pub struct MakeFuture +where + P: Future, + F: Future, +{ + primary: Making

, + fallback: Making, + _p: PhantomData, +} + +pub struct ResponseFuture +where + P: Future>, + F: svc::Service>, +{ + fallback: F, + state: State, +} + +pub enum Body { + A(A), + B(B), +} + +enum State { + Primary(P), + Fallback(F), +} + +enum Making { + NotReady(T), + Ready(T::Item), + Done, +} + +pub fn layer( + primary: svc::ServiceBuilder

, + fallback: svc::ServiceBuilder, +) -> Layer { + Layer { + primary, + fallback, + _p: PhantomData, + } +} + +impl svc::Layer for Layer +where + P: svc::Layer + Clone, + F: svc::Layer + Clone, + M: Clone, +{ + type Service = MakeFallback; + + fn layer(&self, inner: M) -> Self::Service { + MakeFallback { + primary: self.primary.clone().service(inner.clone()), + fallback: self.fallback.clone().service(inner), + _p: PhantomData, + } + } +} + +impl svc::Service for MakeFallback +where + P: svc::Service, + P::Error: Into, + F: svc::Service, + F::Error: Into, + T: Clone, +{ + type Response = Service; + type Error = proxy::Error; + type Future = MakeFuture; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + let primary_ready = self.primary.poll_ready().map_err(Into::into); + let fallback_ready = self.fallback.poll_ready().map_err(Into::into); + try_ready!(primary_ready); + try_ready!(fallback_ready); + Ok(Async::Ready(())) + } + + fn call(&mut self, target: T) -> Self::Future { + MakeFuture { + primary: Making::NotReady(self.primary.call(target.clone())), + fallback: Making::NotReady(self.fallback.call(target.clone())), + _p: PhantomData, + } + } +} + +impl Future for MakeFuture +where + P: Future, + P::Error: Into, + F: Future, + F::Error: Into, +{ + type Item = Service; + type Error = proxy::Error; + fn poll(&mut self) -> Poll { + let done = + self.primary.poll().map_err(Into::into)? && self.fallback.poll().map_err(Into::into)?; + if !done { + return Ok(Async::NotReady); + } + + Ok(Async::Ready(Service { + primary: self.primary.take(), + fallback: self.fallback.take(), + _p: PhantomData, + })) + } +} + +impl Making { + fn poll(&mut self) -> Result { + let res = match *self { + Making::NotReady(ref mut fut) => fut.poll()?, + Making::Ready(_) => return Ok(true), + Making::Done => panic!("polled after ready"), + }; + match res { + Async::Ready(res) => { + *self = Making::Ready(res); + Ok(true) + } + Async::NotReady => Ok(false), + } + } + + fn take(&mut self) -> T::Item { + match mem::replace(self, Making::Done) { + Making::Ready(a) => a, + _ => panic!(), + } + } +} + +impl svc::Service> for Service +where + P: svc::Service, Response = http::Response, Error = Error>, + F: svc::Service, Response = http::Response> + Clone, + F::Error: Into, +{ + type Response = http::Response>; + type Error = proxy::Error; + type Future = ResponseFuture; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + let primary_ready = self.primary.poll_ready().map_err(|e| match e { + Error::Fallback(_) => { + panic!("service must not return a fallback request in poll_ready") + } + Error::Error(e) => e, + }); + let fallback_ready = self.fallback.poll_ready().map_err(Into::into); + try_ready!(primary_ready); + try_ready!(fallback_ready); + Ok(Async::Ready(())) + } + + fn call(&mut self, req: http::Request) -> Self::Future { + ResponseFuture { + fallback: self.fallback.clone(), + state: State::Primary(self.primary.call(req)), + } + } +} + +impl Future for ResponseFuture +where + P: Future, Error = Error>, + F: svc::Service, Response = http::Response>, + F::Error: Into, +{ + type Item = http::Response>; + type Error = proxy::Error; + fn poll(&mut self) -> Poll { + loop { + self.state = match self.state { + State::Primary(ref mut f) => match f.poll() { + Ok(Async::NotReady) => return Ok(Async::NotReady), + Ok(Async::Ready(rsp)) => return Ok(Async::Ready(rsp.map(Body::A))), + Err(Error::Error(e)) => return Err(e), + Err(Error::Fallback(req)) => State::Fallback(self.fallback.call(req)), + }, + State::Fallback(ref mut f) => { + return f + .poll() + .map(|p| p.map(|rsp| rsp.map(Body::B))) + .map_err(Into::into) + } + } + } + } +} + +impl Payload for Body +where + A: Payload, + B: Payload, +{ + type Data = Body; + type Error = A::Error; + + fn poll_data(&mut self) -> Poll, Self::Error> { + match self { + Body::A(ref mut body) => body.poll_data().map(|r| r.map(|o| o.map(Body::A))), + Body::B(ref mut body) => body.poll_data().map(|r| r.map(|o| o.map(Body::B))), + } + } + + fn poll_trailers(&mut self) -> Poll, Self::Error> { + match self { + Body::A(ref mut body) => body.poll_trailers(), + Body::B(ref mut body) => body.poll_trailers(), + } + } + + fn is_end_stream(&self) -> bool { + match self { + Body::A(ref body) => body.is_end_stream(), + Body::B(ref body) => body.is_end_stream(), + } + } +} + +impl Default for Body { + fn default() -> Self { + Body::B(Default::default()) + } +} + +impl Buf for Body +where + A: Buf, + B: Buf, +{ + fn remaining(&self) -> usize { + match self { + Body::A(ref buf) => buf.remaining(), + Body::B(ref buf) => buf.remaining(), + } + } + + fn bytes(&self) -> &[u8] { + match self { + Body::A(ref buf) => buf.bytes(), + Body::B(ref buf) => buf.bytes(), + } + } + + fn advance(&mut self, cnt: usize) { + match self { + Body::A(ref mut buf) => buf.advance(cnt), + Body::B(ref mut buf) => buf.advance(cnt), + } + } +} diff --git a/src/proxy/http/mod.rs b/src/proxy/http/mod.rs index 74c5cf62ca..41b8937c71 100644 --- a/src/proxy/http/mod.rs +++ b/src/proxy/http/mod.rs @@ -2,6 +2,7 @@ pub mod add_header; pub mod balance; pub mod canonicalize; pub mod client; +pub mod fallback; pub(super) mod glue; pub mod h1; pub mod h2; From bbb72aa2884fb0c2916bcb86e86ed15385df5636 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Wed, 24 Apr 2019 11:13:35 -0700 Subject: [PATCH 02/16] Add "no endpoints" state to resolve::Discover Signed-off-by: Eliza Weisman --- src/proxy/resolve.rs | 59 +++++++++++++++++++++++++++++++++----------- 1 file changed, 44 insertions(+), 15 deletions(-) diff --git a/src/proxy/resolve.rs b/src/proxy/resolve.rs index 3c0d366478..0976a304b0 100644 --- a/src/proxy/resolve.rs +++ b/src/proxy/resolve.rs @@ -29,6 +29,7 @@ pub trait Resolution { pub enum Update { Add(SocketAddr, T), Remove(SocketAddr), + NoEndpoints, } #[derive(Clone, Debug)] @@ -48,6 +49,7 @@ pub struct MakeSvc { pub struct Discover { resolution: R, make: M, + is_empty: bool, } // === impl Layer === @@ -92,14 +94,33 @@ where fn call(&mut self, target: T) -> Self::Future { let resolution = self.resolve.resolve(&target); - futures::future::ok(Discover { - resolution, - make: self.inner.clone(), - }) + futures::future::ok(Discover::new(resolution, self.inner.clone())) } } // === impl Discover === +impl Discover +where + R: Resolution, + R::Endpoint: fmt::Debug, + R::Error: Into, + M: rt::Make, +{ + /// Returns `true` if there are currently endpoints for this resolution. + pub fn is_empty(&self) -> bool { + self.is_empty + } + + fn new(resolution: R, make: &M) -> Self { + Discover { + resolution, + make: &make.clone(), + is_empty: true, + } + } +} + + impl tower_discover::Discover for Discover where @@ -113,18 +134,26 @@ where type Error = Error; fn poll(&mut self) -> Poll, Self::Error> { - let up = try_ready!(self.resolution.poll().map_err(Into::into)); - trace!("watch: {:?}", up); - match up { - Update::Add(addr, target) => { - // We expect the load balancer to handle duplicate inserts - // by replacing the old endpoint with the new one, so - // insertions of new endpoints and metadata changes for - // existing ones can be handled in the same way. - let svc = self.make.make(&target); - Ok(Async::Ready(Change::Insert(addr, svc))) + loop { + let up = try_ready!(self.resolution.poll().map_err(Into::into)); + trace!("watch: {:?}", up); + match up { + Update::Add(addr, target) => { + // We expect the load balancer to handle duplicate inserts + // by replacing the old endpoint with the new one, so + // insertions of new endpoints and metadata changes for + // existing ones can be handled in the same way. + let svc = self.make.make(&target); + return Ok(Async::Ready(Change::Insert(addr, svc))) + } + Update::Remove(addr) => return Ok(Async::Ready(Change::Remove(addr))), + Update::NoEndpoints => { + self.is_empty = true; + // Keep polling as we should now start to see removals. + continue; + } } - Update::Remove(addr) => Ok(Async::Ready(Change::Remove(addr))), } } + } From 65f4f6454950696336366862c54b74e7b7e1620e Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Wed, 24 Apr 2019 11:18:49 -0700 Subject: [PATCH 03/16] Hack destination_set to send NoEndpoints Signed-off-by: Eliza Weisman --- src/control/destination/background/destination_set.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/control/destination/background/destination_set.rs b/src/control/destination/background/destination_set.rs index 07123a5669..292d29e790 100644 --- a/src/control/destination/background/destination_set.rs +++ b/src/control/destination/background/destination_set.rs @@ -224,6 +224,10 @@ where ); match self.addrs.take() { Exists::Yes(mut cache) => { + self.responders.retain(|r| { + let sent = r.update_tx.unbounded_send(Update::NoEndpoints); + sent.is_ok() + }); cache.clear(&mut |change| { Self::on_change(&mut self.responders, authority_for_logging, change) }); From 668df8d22061ba9668be178e87096e141709558d Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Wed, 24 Apr 2019 11:56:37 -0700 Subject: [PATCH 04/16] wire up endpoint status *without* breaking stuff Signed-off-by: Eliza Weisman --- src/app/outbound.rs | 1 + src/proxy/http/balance.rs | 44 +++++++++++++++++++++++++----- src/proxy/resolve.rs | 56 ++++++++++++++++++++++----------------- 3 files changed, 71 insertions(+), 30 deletions(-) diff --git a/src/app/outbound.rs b/src/app/outbound.rs index e666e09941..6c1d97749c 100644 --- a/src/app/outbound.rs +++ b/src/app/outbound.rs @@ -207,6 +207,7 @@ pub mod discovery { fn poll(&mut self) -> Poll, Self::Error> { match self.resolving { Resolving::Name(ref name, ref mut res) => match try_ready!(res.poll()) { + resolve::Update::NoEndpoints => Ok(Async::Ready(resolve::Update::NoEndpoints)), resolve::Update::Remove(addr) => { debug!("removing {}", addr); Ok(Async::Ready(resolve::Update::Remove(addr))) diff --git a/src/proxy/http/balance.rs b/src/proxy/http/balance.rs index 730faf7c02..d345738131 100644 --- a/src/proxy/http/balance.rs +++ b/src/proxy/http/balance.rs @@ -5,7 +5,7 @@ extern crate tower_discover; use std::marker::PhantomData; use std::time::Duration; -use futures::{Future, Poll}; +use futures::{Async, Future, Poll}; use hyper::body::Payload; use self::tower_discover::Discover; @@ -15,6 +15,7 @@ pub use self::tower_balance::{ choose::PowerOfTwoChoices, load::WithPeakEwma, Balance, HasWeight, Weight, WithWeighted, }; +use proxy::resolve::{HasEndpointStatus, EndpointStatus}; use http; use svc; @@ -36,6 +37,12 @@ pub struct MakeSvc { _marker: PhantomData B>, } +#[derive(Debug)] +pub struct Service { + balance: S, + status: EndpointStatus, +} + // === impl Layer === pub fn layer(default_rtt: Duration, decay: Duration) -> Layer { @@ -89,13 +96,13 @@ impl Clone for MakeSvc { impl svc::Service for MakeSvc where M: svc::Service, - M::Response: Discover, + M::Response: Discover + HasEndpointStatus, ::Service: svc::Service, Response = http::Response>, A: Payload, B: Payload, { - type Response = Balance, PowerOfTwoChoices>; + type Response = Service, PowerOfTwoChoices>>; type Error = M::Error; type Future = MakeSvc; @@ -118,19 +125,44 @@ where impl Future for MakeSvc where F: Future, - F::Item: Discover, + F::Item: Discover + HasEndpointStatus, ::Service: svc::Service, Response = http::Response>, A: Payload, B: Payload, { - type Item = Balance, PowerOfTwoChoices>; + type Item = Service, PowerOfTwoChoices>>; type Error = F::Error; fn poll(&mut self) -> Poll { let discover = try_ready!(self.inner.poll()); + let status = discover.endpoint_status(); let instrument = PendingUntilFirstData::default(); let loaded = WithPeakEwma::new(discover, self.default_rtt, self.decay, instrument); - Ok(Balance::p2c(loaded).into()) + let balance = Balance::p2c(loaded); + Ok(Async::Ready(Service { + balance, + status + })) + } +} + +impl svc::Service> for Service +where + S: svc::Service, Response = http::Response>, +{ + type Response = http::Response; + type Error = S::Error; + type Future = S::Future; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + self.balance.poll_ready() + } + + fn call(&mut self, req: http::Request) -> Self::Future { + if self.status.is_empty() { + unimplemented!("no endpoints"); + } + self.balance.call(req) } } diff --git a/src/proxy/resolve.rs b/src/proxy/resolve.rs index 0976a304b0..60f67a1218 100644 --- a/src/proxy/resolve.rs +++ b/src/proxy/resolve.rs @@ -2,8 +2,14 @@ extern crate linkerd2_router as rt; extern crate tower_discover; use futures::{Async, Poll}; -use std::fmt; -use std::net::SocketAddr; +use std::{ + fmt, + net::SocketAddr, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, +}; pub use self::tower_discover::Change; use proxy::Error; @@ -25,6 +31,13 @@ pub trait Resolution { fn poll(&mut self) -> Poll, Self::Error>; } +pub trait HasEndpointStatus { + fn endpoint_status(&self) -> EndpointStatus; +} + +#[derive(Clone, Debug)] +pub struct EndpointStatus(Arc); + #[derive(Clone, Debug)] pub enum Update { Add(SocketAddr, T), @@ -49,7 +62,7 @@ pub struct MakeSvc { pub struct Discover { resolution: R, make: M, - is_empty: bool, + is_empty: Arc, } // === impl Layer === @@ -94,34 +107,23 @@ where fn call(&mut self, target: T) -> Self::Future { let resolution = self.resolve.resolve(&target); - futures::future::ok(Discover::new(resolution, self.inner.clone())) + futures::future::ok(Discover { + resolution, + make: self.inner.clone(), + is_empty: Arc::new(AtomicBool::new(true)), + }) } } -// === impl Discover === -impl Discover +impl HasEndpointStatus for Discover where R: Resolution, - R::Endpoint: fmt::Debug, - R::Error: Into, - M: rt::Make, { - /// Returns `true` if there are currently endpoints for this resolution. - pub fn is_empty(&self) -> bool { - self.is_empty - } - - fn new(resolution: R, make: &M) -> Self { - Discover { - resolution, - make: &make.clone(), - is_empty: true, - } + fn endpoint_status(&self) -> EndpointStatus { + EndpointStatus(self.is_empty.clone()) } } - - impl tower_discover::Discover for Discover where R: Resolution, @@ -144,16 +146,22 @@ where // insertions of new endpoints and metadata changes for // existing ones can be handled in the same way. let svc = self.make.make(&target); - return Ok(Async::Ready(Change::Insert(addr, svc))) + self.is_empty.store(false, Ordering::Release); + return Ok(Async::Ready(Change::Insert(addr, svc))); } Update::Remove(addr) => return Ok(Async::Ready(Change::Remove(addr))), Update::NoEndpoints => { - self.is_empty = true; + self.is_empty.store(true, Ordering::Release); // Keep polling as we should now start to see removals. continue; } } } } +} +impl EndpointStatus { + pub fn is_empty(&self) -> bool { + self.0.load(Ordering::Acquire) + } } From 40ffba8b4937542218fb36aaaf139866f55b212a Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Wed, 8 May 2019 14:42:05 -0700 Subject: [PATCH 05/16] make balancer fallback-aware Signed-off-by: Eliza Weisman --- src/proxy/http/balance.rs | 32 +++++++++++++++++++------------- 1 file changed, 19 insertions(+), 13 deletions(-) diff --git a/src/proxy/http/balance.rs b/src/proxy/http/balance.rs index d345738131..a6e97b6a47 100644 --- a/src/proxy/http/balance.rs +++ b/src/proxy/http/balance.rs @@ -5,7 +5,7 @@ extern crate tower_discover; use std::marker::PhantomData; use std::time::Duration; -use futures::{Async, Future, Poll}; +use futures::{future, Async, Future, Poll}; use hyper::body::Payload; use self::tower_discover::Discover; @@ -15,8 +15,12 @@ pub use self::tower_balance::{ choose::PowerOfTwoChoices, load::WithPeakEwma, Balance, HasWeight, Weight, WithWeighted, }; -use proxy::resolve::{HasEndpointStatus, EndpointStatus}; use http; +use proxy::{ + self, + http::fallback, + resolve::{EndpointStatus, HasEndpointStatus}, +}; use svc; /// Configures a stack to resolve `T` typed targets to balance requests over @@ -102,7 +106,8 @@ where A: Payload, B: Payload, { - type Response = Service, PowerOfTwoChoices>>; + type Response = + Service, PowerOfTwoChoices>>; type Error = M::Error; type Future = MakeSvc; @@ -139,30 +144,31 @@ where let instrument = PendingUntilFirstData::default(); let loaded = WithPeakEwma::new(discover, self.default_rtt, self.decay, instrument); let balance = Balance::p2c(loaded); - Ok(Async::Ready(Service { - balance, - status - })) + Ok(Async::Ready(Service { balance, status })) } } impl svc::Service> for Service where - S: svc::Service, Response = http::Response>, + S: svc::Service, Response = http::Response, Error = proxy::Error>, { type Response = http::Response; - type Error = S::Error; - type Future = S::Future; + type Error = fallback::Error; + type Future = future::Either< + future::MapErr Self::Error>, + future::FutureResult, fallback::Error>, + >; fn poll_ready(&mut self) -> Poll<(), Self::Error> { - self.balance.poll_ready() + self.balance.poll_ready().map_err(fallback::Error::Error) } fn call(&mut self, req: http::Request) -> Self::Future { if self.status.is_empty() { - unimplemented!("no endpoints"); + future::Either::B(future::err(fallback::Error::Fallback(req))) + } else { + future::Either::A(self.balance.call(req).map_err(fallback::Error::Error)) } - self.balance.call(req) } } From 70fde83ecd68e60138b0a6b62eb36ed8d0342f64 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Wed, 8 May 2019 14:52:33 -0700 Subject: [PATCH 06/16] fix Clone impls requiring req bodies to be Clone Signed-off-by: Eliza Weisman --- src/proxy/http/fallback.rs | 18 ++++++++++++++++-- src/proxy/http/router.rs | 25 +++++++++++++++++++++++-- 2 files changed, 39 insertions(+), 4 deletions(-) diff --git a/src/proxy/http/fallback.rs b/src/proxy/http/fallback.rs index 729d37f9be..1833f66b31 100644 --- a/src/proxy/http/fallback.rs +++ b/src/proxy/http/fallback.rs @@ -15,7 +15,7 @@ pub enum Error { pub type Layer = MakeFallback, svc::ServiceBuilder, A>; -#[derive(Clone, Debug)] +#[derive(Debug)] pub struct MakeFallback { primary: P, fallback: F, @@ -120,6 +120,20 @@ where } } +impl Clone for MakeFallback +where + P: Clone, + F: Clone, +{ + fn clone(&self) -> Self { + Self { + primary: self.primary.clone(), + fallback: self.fallback.clone(), + _p: PhantomData, + } + } +} + impl Future for MakeFuture where P: Future, @@ -181,7 +195,7 @@ where fn poll_ready(&mut self) -> Poll<(), Self::Error> { let primary_ready = self.primary.poll_ready().map_err(|e| match e { Error::Fallback(_) => { - panic!("service must not return a fallback request in poll_ready") + panic!("service must not return a fallback request in poll_ready"); } Error::Error(e) => e, }); diff --git a/src/proxy/http/router.rs b/src/proxy/http/router.rs index 7a72e85093..1c3f77a9a5 100644 --- a/src/proxy/http/router.rs +++ b/src/proxy/http/router.rs @@ -25,14 +25,14 @@ pub struct Config { /// A `Rec`-typed `Recognize` instance is used to produce a target for each /// `Req`-typed request. If the router doesn't already have a `Service` for this /// target, it uses a `Mk`-typed `Service` stack. -#[derive(Clone, Debug)] +#[derive(Debug)] pub struct Layer> { config: Config, recognize: Rec, _p: PhantomData Req>, } -#[derive(Clone, Debug)] +#[derive(Debug)] pub struct Stack, Mk> { config: Config, recognize: Rec, @@ -101,6 +101,14 @@ where } } +impl Clone for Layer +where + Rec: Recognize + Clone + Send + Sync + 'static, +{ + fn clone(&self) -> Self { + layer(self.config.clone(), self.recognize.clone()) + } +} // === impl Stack === impl Stack @@ -143,6 +151,19 @@ where } } +impl Clone for Stack +where + Rec: Recognize + Clone, + Mk: Clone, +{ + fn clone(&self) -> Self { + Self { + recognize: self.recognize.clone(), + inner: self.inner.clone(), + _p: PhantomData, + } + } +} // === impl Service === impl svc::Service for Service From 6e812aad4b031ddb39febf6237020058b39b49f5 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Wed, 8 May 2019 14:53:08 -0700 Subject: [PATCH 07/16] actually wire up new fallback stuff Signed-off-by: Eliza Weisman --- src/app/main.rs | 25 ++++++++++++++++++++----- src/app/outbound.rs | 26 ++++++++++++++++++++++++-- src/proxy/http/fallback.rs | 7 ++----- src/proxy/http/router.rs | 1 + 4 files changed, 47 insertions(+), 12 deletions(-) diff --git a/src/app/main.rs b/src/app/main.rs index 94c775396d..f9b88f88d8 100644 --- a/src/app/main.rs +++ b/src/app/main.rs @@ -20,7 +20,7 @@ use logging; use metrics::FmtMetrics; use never::Never; use proxy::{ - self, accept, + self, accept, buffer, http::{ client, insert, metrics as http_metrics, normalize_uri, profiles, router, settings, strip_header, @@ -419,12 +419,13 @@ where let outbound = { use super::outbound::{ - //add_remote_ip_on_rsp, add_server_id_on_rsp, + self, discovery::Resolve, orig_proto_upgrade, + //add_remote_ip_on_rsp, add_server_id_on_rsp, }; use proxy::{ - http::{balance, canonicalize, header_from_target, metrics, retry}, + http::{balance, canonicalize, fallback, header_from_target, metrics, retry}, resolve, }; @@ -499,9 +500,23 @@ where .layer(metrics::layer::<_, classify::Response>(retry_http_metrics)) .layer(insert::target::layer()); - let balancer_stack = svc::builder() + let balancer = svc::builder() .layer(balance::layer(EWMA_DEFAULT_RTT, EWMA_DECAY)) - .layer(resolve::layer(Resolve::new(resolver))) + .layer(resolve::layer(Resolve::new(resolver))); + + let fallback = svc::builder() + .layer(router::layer( + router::Config::new("out ep", capacity, max_idle_age), + |req: &http::Request<_>| { + let ep = outbound::Endpoint::from_orig_dst(req); + debug!("outbound ep={:?}", ep); + ep + }, + )) + .layer(buffer::layer(max_in_flight, DispatchDeadline::extract)); + + let balancer_stack = svc::builder() + .layer(fallback::layer(balancer, fallback)) .layer(pending::layer()) .layer(balance::weight::layer()) .service(endpoint_stack); diff --git a/src/app/outbound.rs b/src/app/outbound.rs index 6c1d97749c..7574cea5dd 100644 --- a/src/app/outbound.rs +++ b/src/app/outbound.rs @@ -5,8 +5,13 @@ use std::{fmt, hash}; use super::identity; use control::destination::{Metadata, ProtocolHint}; -use proxy::http::balance::{HasWeight, Weight}; -use proxy::http::settings; +use proxy::{ + self, + http::{ + balance::{HasWeight, Weight}, + settings, + }, +}; use tap; use transport::{connect, tls}; use {Conditional, NameAddr}; @@ -44,6 +49,23 @@ impl Endpoint { } } } + + pub fn from_orig_dst(req: &http::Request) -> Option { + let addr = req + .extensions() + .get::()? + .orig_dst_if_not_local()?; + let http_settings = settings::Settings::from_request(req); + Some(Self { + addr, + dst_name: None, + identity: Conditional::None( + tls::ReasonForNoPeerName::NotProvidedByServiceDiscovery.into(), + ), + metadata: Metadata::empty(), + http_settings, + }) + } } impl From for Endpoint { diff --git a/src/proxy/http/fallback.rs b/src/proxy/http/fallback.rs index 1833f66b31..b35b5569a6 100644 --- a/src/proxy/http/fallback.rs +++ b/src/proxy/http/fallback.rs @@ -13,7 +13,7 @@ pub enum Error { Error(proxy::Error), } -pub type Layer = MakeFallback, svc::ServiceBuilder, A>; +pub type Layer = MakeFallback, svc::Builder, A>; #[derive(Debug)] pub struct MakeFallback { @@ -63,10 +63,7 @@ enum Making { Done, } -pub fn layer( - primary: svc::ServiceBuilder

, - fallback: svc::ServiceBuilder, -) -> Layer { +pub fn layer(primary: svc::Builder

, fallback: svc::Builder) -> Layer { Layer { primary, fallback, diff --git a/src/proxy/http/router.rs b/src/proxy/http/router.rs index 1c3f77a9a5..62294a4762 100644 --- a/src/proxy/http/router.rs +++ b/src/proxy/http/router.rs @@ -158,6 +158,7 @@ where { fn clone(&self) -> Self { Self { + config: self.config.clone(), recognize: self.recognize.clone(), inner: self.inner.clone(), _p: PhantomData, From 210143831e2adde32ebe27c77efe3532f2f39ce6 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Wed, 8 May 2019 16:43:30 -0700 Subject: [PATCH 08/16] make errors compose a little nicer Signed-off-by: Eliza Weisman --- src/proxy/http/balance.rs | 22 +++++++++++---- src/proxy/http/fallback.rs | 57 ++++++++++++++++++++++++++++++-------- 2 files changed, 62 insertions(+), 17 deletions(-) diff --git a/src/proxy/http/balance.rs b/src/proxy/http/balance.rs index a6e97b6a47..2125a5f7ea 100644 --- a/src/proxy/http/balance.rs +++ b/src/proxy/http/balance.rs @@ -2,8 +2,7 @@ extern crate hyper_balance; extern crate tower_balance; extern crate tower_discover; -use std::marker::PhantomData; -use std::time::Duration; +use std::{error::Error, fmt, marker::PhantomData, time::Duration}; use futures::{future, Async, Future, Poll}; use hyper::body::Payload; @@ -47,6 +46,9 @@ pub struct Service { status: EndpointStatus, } +#[derive(Debug)] +pub struct NoEndpoints; + // === impl Layer === pub fn layer(default_rtt: Duration, decay: Duration) -> Layer { @@ -160,14 +162,14 @@ where >; fn poll_ready(&mut self) -> Poll<(), Self::Error> { - self.balance.poll_ready().map_err(fallback::Error::Error) + self.balance.poll_ready().map_err(From::from) } fn call(&mut self, req: http::Request) -> Self::Future { if self.status.is_empty() { - future::Either::B(future::err(fallback::Error::Fallback(req))) + future::Either::B(future::err(fallback::Error::fallback(req, NoEndpoints))) } else { - future::Either::A(self.balance.call(req).map_err(fallback::Error::Error)) + future::Either::A(self.balance.call(req).map_err(From::from)) } } } @@ -226,3 +228,13 @@ pub mod weight { } } } + +// === impl NoEndpoints === + +impl fmt::Display for NoEndpoints { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + fmt::Display::fmt("load balancer has no endpoints", f) + } +} + +impl Error for NoEndpoints {} diff --git a/src/proxy/http/fallback.rs b/src/proxy/http/fallback.rs index b35b5569a6..00ff9bdb3c 100644 --- a/src/proxy/http/fallback.rs +++ b/src/proxy/http/fallback.rs @@ -5,12 +5,12 @@ use hyper::body::Payload; use proxy; use svc; -use std::{marker::PhantomData, mem}; +use std::{fmt, marker::PhantomData, mem}; #[derive(Debug)] -pub enum Error { - Fallback(http::Request), - Error(proxy::Error), +pub struct Error { + error: proxy::Error, + fallback: Option>, } pub type Layer = MakeFallback, svc::Builder, A>; @@ -190,12 +190,7 @@ where type Future = ResponseFuture; fn poll_ready(&mut self) -> Poll<(), Self::Error> { - let primary_ready = self.primary.poll_ready().map_err(|e| match e { - Error::Fallback(_) => { - panic!("service must not return a fallback request in poll_ready"); - } - Error::Error(e) => e, - }); + let primary_ready = self.primary.poll_ready().map_err(|e| e.error); let fallback_ready = self.fallback.poll_ready().map_err(Into::into); try_ready!(primary_ready); try_ready!(fallback_ready); @@ -224,8 +219,14 @@ where State::Primary(ref mut f) => match f.poll() { Ok(Async::NotReady) => return Ok(Async::NotReady), Ok(Async::Ready(rsp)) => return Ok(Async::Ready(rsp.map(Body::A))), - Err(Error::Error(e)) => return Err(e), - Err(Error::Fallback(req)) => State::Fallback(self.fallback.call(req)), + Err(Error { + fallback: Some(req), + error, + }) => { + trace!("{}; trying to fall back", error); + State::Fallback(self.fallback.call(req)) + } + Err(e) => return Err(e.into()), }, State::Fallback(ref mut f) => { return f @@ -300,3 +301,35 @@ where } } } + +// === impl Error === + +impl fmt::Display for Error { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + fmt::Display::fmt(&self.error, f) + } +} + +impl Into for Error { + fn into(self) -> proxy::Error { + self.error + } +} + +impl From for Error { + fn from(error: proxy::Error) -> Self { + Error { + error, + fallback: None, + } + } +} + +impl Error { + pub fn fallback(req: http::Request, error: impl Into) -> Self { + Error { + fallback: Some(req), + error: error.into(), + } + } +} From 6290cbe9e5ab2950a26e38784743b5208a49bf99 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Fri, 10 May 2019 12:45:23 -0700 Subject: [PATCH 09/16] give travis a longer timeout Signed-off-by: Eliza Weisman --- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 689f372223..62ffcf1300 100644 --- a/.travis.yml +++ b/.travis.yml @@ -18,7 +18,7 @@ jobs: - export RUSTFLAGS="-C debuginfo=0" RUST_TEST_THREADS=1 RUST_TEST_PATIENCE_MS=200 script: - make check-fmt - - travis_wait make test + - travis_wait 40 make test # If you're debugging disk utilization/caching... This finds the largest files in `target`: #- du -sh target && find target -type f |xargs -n 1000 du -s |sort -rn |head |awk '{print $2}' |xargs du -sh From 83192f8663bd5fd677874a5a7ba601297086591e Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Mon, 13 May 2019 16:18:55 -0700 Subject: [PATCH 10/16] simplify potentially confusing `poll_ready` impls Signed-off-by: Eliza Weisman --- src/proxy/http/fallback.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/proxy/http/fallback.rs b/src/proxy/http/fallback.rs index 00ff9bdb3c..9319fa6f07 100644 --- a/src/proxy/http/fallback.rs +++ b/src/proxy/http/fallback.rs @@ -103,9 +103,8 @@ where fn poll_ready(&mut self) -> Poll<(), Self::Error> { let primary_ready = self.primary.poll_ready().map_err(Into::into); let fallback_ready = self.fallback.poll_ready().map_err(Into::into); - try_ready!(primary_ready); try_ready!(fallback_ready); - Ok(Async::Ready(())) + primary_ready } fn call(&mut self, target: T) -> Self::Future { @@ -190,11 +189,13 @@ where type Future = ResponseFuture; fn poll_ready(&mut self) -> Poll<(), Self::Error> { + // We are ready iff both the primary *and* the fallback are ready. + // However, we always want to poll both, in case they have internal + // state that's driven in `poll_ready`, before returning the result. let primary_ready = self.primary.poll_ready().map_err(|e| e.error); let fallback_ready = self.fallback.poll_ready().map_err(Into::into); - try_ready!(primary_ready); try_ready!(fallback_ready); - Ok(Async::Ready(())) + primary_ready } fn call(&mut self, req: http::Request) -> Self::Future { From 44ceee9efe512d510124d2c3f974b28b2e0786b6 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Mon, 13 May 2019 16:55:17 -0700 Subject: [PATCH 11/16] style Signed-off-by: Eliza Weisman --- src/proxy/http/fallback.rs | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/src/proxy/http/fallback.rs b/src/proxy/http/fallback.rs index 9319fa6f07..9d0ca557cf 100644 --- a/src/proxy/http/fallback.rs +++ b/src/proxy/http/fallback.rs @@ -44,7 +44,7 @@ where F: svc::Service>, { fallback: F, - state: State, + state: ResponseState, } pub enum Body { @@ -52,7 +52,7 @@ pub enum Body { B(B), } -enum State { +enum ResponseState { Primary(P), Fallback(F), } @@ -201,7 +201,7 @@ where fn call(&mut self, req: http::Request) -> Self::Future { ResponseFuture { fallback: self.fallback.clone(), - state: State::Primary(self.primary.call(req)), + state: ResponseState::Primary(self.primary.call(req)), } } } @@ -214,10 +214,11 @@ where { type Item = http::Response>; type Error = proxy::Error; + fn poll(&mut self) -> Poll { loop { self.state = match self.state { - State::Primary(ref mut f) => match f.poll() { + ResponseState::Primary(ref mut f) => match f.poll() { Ok(Async::NotReady) => return Ok(Async::NotReady), Ok(Async::Ready(rsp)) => return Ok(Async::Ready(rsp.map(Body::A))), Err(Error { @@ -225,11 +226,11 @@ where error, }) => { trace!("{}; trying to fall back", error); - State::Fallback(self.fallback.call(req)) + ResponseState::Fallback(self.fallback.call(req)) } Err(e) => return Err(e.into()), }, - State::Fallback(ref mut f) => { + ResponseState::Fallback(ref mut f) => { return f .poll() .map(|p| p.map(|rsp| rsp.map(Body::B))) @@ -240,6 +241,8 @@ where } } +// === impl Body === + impl Payload for Body where A: Payload, From c7ff05b6f4091bb320efab72cccecc38b264ab6e Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Mon, 13 May 2019 16:55:42 -0700 Subject: [PATCH 12/16] make Making less confusing Signed-off-by: Eliza Weisman --- src/proxy/http/fallback.rs | 27 +++++++++++---------------- 1 file changed, 11 insertions(+), 16 deletions(-) diff --git a/src/proxy/http/fallback.rs b/src/proxy/http/fallback.rs index 9d0ca557cf..8661c4d7f3 100644 --- a/src/proxy/http/fallback.rs +++ b/src/proxy/http/fallback.rs @@ -140,11 +140,9 @@ where type Item = Service; type Error = proxy::Error; fn poll(&mut self) -> Poll { - let done = - self.primary.poll().map_err(Into::into)? && self.fallback.poll().map_err(Into::into)?; - if !done { - return Ok(Async::NotReady); - } + // Are both the primary and fallback futures finished? + try_ready!(self.primary.poll().map_err(Into::into)); + try_ready!(self.fallback.poll().map_err(Into::into)); Ok(Async::Ready(Service { primary: self.primary.take(), @@ -155,19 +153,16 @@ where } impl Making { - fn poll(&mut self) -> Result { - let res = match *self { - Making::NotReady(ref mut fut) => fut.poll()?, - Making::Ready(_) => return Ok(true), + fn poll(&mut self) -> Poll<(), T::Error> { + *self = match self { + Making::NotReady(ref mut fut) => { + let res = try_ready!(fut.poll()); + Making::Ready(res) + } + Making::Ready(_) => return Ok(Async::Ready(())), Making::Done => panic!("polled after ready"), }; - match res { - Async::Ready(res) => { - *self = Making::Ready(res); - Ok(true) - } - Async::NotReady => Ok(false), - } + Ok(Async::Ready(())) } fn take(&mut self) -> T::Item { From 46f9beae86a773e12e0ce39ce935499dd06d6fa2 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Tue, 14 May 2019 14:19:30 -0700 Subject: [PATCH 13/16] cleanup Signed-off-by: Eliza Weisman --- src/app/main.rs | 6 ++++-- src/proxy/http/fallback.rs | 37 ++++++++++++++++++++++++++++++------- 2 files changed, 34 insertions(+), 9 deletions(-) diff --git a/src/app/main.rs b/src/app/main.rs index f9b88f88d8..80d026cf1e 100644 --- a/src/app/main.rs +++ b/src/app/main.rs @@ -504,7 +504,9 @@ where .layer(balance::layer(EWMA_DEFAULT_RTT, EWMA_DECAY)) .layer(resolve::layer(Resolve::new(resolver))); - let fallback = svc::builder() + // Routes requests to their original destination endpoints. Used as + // a fallback when service discovery has no endpoints for a destination. + let orig_dst_router = svc::builder() .layer(router::layer( router::Config::new("out ep", capacity, max_idle_age), |req: &http::Request<_>| { @@ -516,7 +518,7 @@ where .layer(buffer::layer(max_in_flight, DispatchDeadline::extract)); let balancer_stack = svc::builder() - .layer(fallback::layer(balancer, fallback)) + .layer(fallback::layer(balancer, orig_dst_router)) .layer(pending::layer()) .layer(balance::weight::layer()) .service(endpoint_stack); diff --git a/src/proxy/http/fallback.rs b/src/proxy/http/fallback.rs index 8661c4d7f3..0159834e5e 100644 --- a/src/proxy/http/fallback.rs +++ b/src/proxy/http/fallback.rs @@ -13,10 +13,15 @@ pub struct Error { fallback: Option>, } -pub type Layer = MakeFallback, svc::Builder, A>; +#[derive(Debug)] +pub struct Layer { + primary: svc::Builder

, + fallback: svc::Builder, + _p: PhantomData, +} #[derive(Debug)] -pub struct MakeFallback { +pub struct MakeSvc { primary: P, fallback: F, _p: PhantomData, @@ -71,16 +76,18 @@ pub fn layer(primary: svc::Builder

, fallback: svc::Builder) -> La } } +// === impl Layer === + impl svc::Layer for Layer where P: svc::Layer + Clone, F: svc::Layer + Clone, M: Clone, { - type Service = MakeFallback; + type Service = MakeSvc; fn layer(&self, inner: M) -> Self::Service { - MakeFallback { + MakeSvc { primary: self.primary.clone().service(inner.clone()), fallback: self.fallback.clone().service(inner), _p: PhantomData, @@ -88,7 +95,19 @@ where } } -impl svc::Service for MakeFallback +impl Clone for Layer +where + P: Clone, + F: Clone, +{ + fn clone(&self) -> Self { + layer(self.primary.clone(), self.fallback.clone()) + } +} + +// === impl MakeSvc === + +impl svc::Service for MakeSvc where P: svc::Service, P::Error: Into, @@ -116,7 +135,7 @@ where } } -impl Clone for MakeFallback +impl Clone for MakeSvc where P: Clone, F: Clone, @@ -130,6 +149,8 @@ where } } +// === impl MakeFuture === + impl Future for MakeFuture where P: Future, @@ -168,11 +189,13 @@ impl Making { fn take(&mut self) -> T::Item { match mem::replace(self, Making::Done) { Making::Ready(a) => a, - _ => panic!(), + _ => panic!("tried to take service twice"), } } } +// === impl Service === + impl svc::Service> for Service where P: svc::Service, Response = http::Response, Error = Error>, From a0258261c4d1d507a7bbc988adab3f786e9176b5 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Wed, 15 May 2019 11:16:37 -0700 Subject: [PATCH 14/16] Handle fallback service backpressure in response future Signed-off-by: Eliza Weisman --- src/proxy/http/fallback.rs | 31 +++++++++++++++++++------------ 1 file changed, 19 insertions(+), 12 deletions(-) diff --git a/src/proxy/http/fallback.rs b/src/proxy/http/fallback.rs index 0159834e5e..64f00293f9 100644 --- a/src/proxy/http/fallback.rs +++ b/src/proxy/http/fallback.rs @@ -49,7 +49,7 @@ where F: svc::Service>, { fallback: F, - state: ResponseState, + state: ResponseState, } pub enum Body { @@ -57,8 +57,12 @@ pub enum Body { B(B), } -enum ResponseState { +enum ResponseState { + /// Waiting for the primary service's future to complete. Primary(P), + /// Request buffered, waiting for the fallback service to become ready. + Waiting(Option>), + /// Waiting for the fallback service's future to complete. Fallback(F), } @@ -207,13 +211,10 @@ where type Future = ResponseFuture; fn poll_ready(&mut self) -> Poll<(), Self::Error> { - // We are ready iff both the primary *and* the fallback are ready. - // However, we always want to poll both, in case they have internal - // state that's driven in `poll_ready`, before returning the result. - let primary_ready = self.primary.poll_ready().map_err(|e| e.error); - let fallback_ready = self.fallback.poll_ready().map_err(Into::into); - try_ready!(fallback_ready); - primary_ready + // We're ready as long as the primary service is ready. If we have to + // call the fallback service, the response future will buffer the + // request until the fallback service is ready. + self.primary.poll_ready().map_err(|e| e.error) } fn call(&mut self, req: http::Request) -> Self::Future { @@ -234,9 +235,10 @@ where type Error = proxy::Error; fn poll(&mut self) -> Poll { + use self::ResponseState::*; loop { self.state = match self.state { - ResponseState::Primary(ref mut f) => match f.poll() { + Primary(ref mut f) => match f.poll() { Ok(Async::NotReady) => return Ok(Async::NotReady), Ok(Async::Ready(rsp)) => return Ok(Async::Ready(rsp.map(Body::A))), Err(Error { @@ -244,11 +246,16 @@ where error, }) => { trace!("{}; trying to fall back", error); - ResponseState::Fallback(self.fallback.call(req)) + Waiting(Some(req)) } Err(e) => return Err(e.into()), }, - ResponseState::Fallback(ref mut f) => { + Waiting(ref mut req) => { + try_ready!(self.fallback.poll_ready().map_err(Into::into)); + let req = req.take().expect("request should only be taken once"); + Fallback(self.fallback.call(req)) + } + Fallback(ref mut f) => { return f .poll() .map(|p| p.map(|rsp| rsp.map(Body::B))) From bb2bf7e51ed181bc3e7b2be5671f795d3cd49962 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Wed, 15 May 2019 11:22:09 -0700 Subject: [PATCH 15/16] clean up, add comments Signed-off-by: Eliza Weisman --- src/proxy/http/balance.rs | 2 ++ src/proxy/http/fallback.rs | 9 +++++++-- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/src/proxy/http/balance.rs b/src/proxy/http/balance.rs index 2125a5f7ea..4933d10bcc 100644 --- a/src/proxy/http/balance.rs +++ b/src/proxy/http/balance.rs @@ -166,6 +166,8 @@ where } fn call(&mut self, req: http::Request) -> Self::Future { + // The endpoint status is updated by the Discover instance, which is + // driven by calling `poll_ready` on the balancer. if self.status.is_empty() { future::Either::B(future::err(fallback::Error::fallback(req, NoEndpoints))) } else { diff --git a/src/proxy/http/fallback.rs b/src/proxy/http/fallback.rs index 64f00293f9..2fe127f086 100644 --- a/src/proxy/http/fallback.rs +++ b/src/proxy/http/fallback.rs @@ -238,9 +238,10 @@ where use self::ResponseState::*; loop { self.state = match self.state { + // We've called the primary service and are waiting for its + // future to complete. Primary(ref mut f) => match f.poll() { - Ok(Async::NotReady) => return Ok(Async::NotReady), - Ok(Async::Ready(rsp)) => return Ok(Async::Ready(rsp.map(Body::A))), + Ok(r) => return Ok(r), Err(Error { fallback: Some(req), error, @@ -250,11 +251,15 @@ where } Err(e) => return Err(e.into()), }, + // The primary service has returned a fallback error, so we are + // waiting for the fallback service to be ready. Waiting(ref mut req) => { try_ready!(self.fallback.poll_ready().map_err(Into::into)); let req = req.take().expect("request should only be taken once"); Fallback(self.fallback.call(req)) } + // We've called the fallback service and are waiting for its + // future to complete. Fallback(ref mut f) => { return f .poll() From c5d9c11994f646ed094e2787568c6d806842bd02 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Wed, 15 May 2019 11:27:34 -0700 Subject: [PATCH 16/16] actually these do need to be separate cases... Signed-off-by: Eliza Weisman --- src/proxy/http/fallback.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/proxy/http/fallback.rs b/src/proxy/http/fallback.rs index 2fe127f086..3ba444427f 100644 --- a/src/proxy/http/fallback.rs +++ b/src/proxy/http/fallback.rs @@ -241,7 +241,8 @@ where // We've called the primary service and are waiting for its // future to complete. Primary(ref mut f) => match f.poll() { - Ok(r) => return Ok(r), + Ok(Async::NotReady) => return Ok(Async::NotReady), + Ok(Async::Ready(rsp)) => return Ok(Async::Ready(rsp.map(Body::A))), Err(Error { fallback: Some(req), error,