Skip to content

Commit

Permalink
Load balancers fall back to ORIG_DST when no endpoints exist (#248)
Browse files Browse the repository at this point in the history
Currently, when no endpoints exist in the load balancer for a
destination, we fail the request. This is because we expect endpoints to
be discovered by both destination service queries _and_ DNS lookups, so
if there are no endpoints for a destination, it is assumed to not exist.

In linkerd/linkerd2#2661, we intend to remove the DNS lookup from the
proxy and instead fall back to routing requests for which no endpoints
exist in the destination service to their SO_ORIGINAL_DST IP address.
This means that the current approach of failing requests when the load
balancer has no endpoints will no longer work.

This branch introduces a generic `fallback` layer, which composes a
primary and secondary service builder into a new layer. The primary 
service can fail requests with an error type that propagates the original
request, allowing the fallback middleware to call the fallback service
with the same request. Other errors returned by the primary service are
still propagated upstream.

In contrast to the approach used in #240, this fallback middleware is 
generic and not tied directly to a load balancer or a router, and can
be used for other purposes in the future. It relies on the router cache
eviction added in #247 to drain the router when it is not being used, 
rather than proactively destroying the router when endpoints become
available for the lb, and re-creating it when they no longer exist.

A new trait, `HasEndpointStatus`, is added in order to allow the
discovery lookup to communicate the "no endpoints" state to the
balancer. In addition, we add a new `Update::NoEndpoints` variant to
`proxy::resolve::Update`, so that when the control plane sends a no
endpoints update, we switch from the balancer to the no endpoints state
_immediately_, rather than waiting for all the endpoints to be
individually removed. When the balancer has no endpoints, it fails all
requests with a fallback error, so that the fallback middleware can call
the fallback service with the same request.

A subsequent PR (#248) will remove the DNS lookups from the discovery 
module.

Closes #240.

Signed-off-by: Eliza Weisman <eliza@buoyant.io>
  • Loading branch information
hawkw committed May 15, 2019
1 parent 16441c2 commit b70c68d
Show file tree
Hide file tree
Showing 9 changed files with 558 additions and 32 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ jobs:
- export RUSTFLAGS="-C debuginfo=0" RUST_TEST_THREADS=1 RUST_TEST_PATIENCE_MS=200
script:
- make check-fmt
- travis_wait make test
- travis_wait 40 make test
# If you're debugging disk utilization/caching... This finds the largest files in `target`:
#- du -sh target && find target -type f |xargs -n 1000 du -s |sort -rn |head |awk '{print $2}' |xargs du -sh

Expand Down
27 changes: 22 additions & 5 deletions src/app/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use logging;
use metrics::FmtMetrics;
use never::Never;
use proxy::{
self, accept,
self, accept, buffer,
http::{
client, insert, metrics as http_metrics, normalize_uri, profiles, router, settings,
strip_header,
Expand Down Expand Up @@ -419,12 +419,13 @@ where

let outbound = {
use super::outbound::{
//add_remote_ip_on_rsp, add_server_id_on_rsp,
self,
discovery::Resolve,
orig_proto_upgrade,
//add_remote_ip_on_rsp, add_server_id_on_rsp,
};
use proxy::{
http::{balance, canonicalize, header_from_target, metrics, retry},
http::{balance, canonicalize, fallback, header_from_target, metrics, retry},
resolve,
};

Expand Down Expand Up @@ -499,9 +500,25 @@ where
.layer(metrics::layer::<_, classify::Response>(retry_http_metrics))
.layer(insert::target::layer());

let balancer_stack = svc::builder()
let balancer = svc::builder()
.layer(balance::layer(EWMA_DEFAULT_RTT, EWMA_DECAY))
.layer(resolve::layer(Resolve::new(resolver)))
.layer(resolve::layer(Resolve::new(resolver)));

// Routes requests to their original destination endpoints. Used as
// a fallback when service discovery has no endpoints for a destination.
let orig_dst_router = svc::builder()
.layer(router::layer(
router::Config::new("out ep", capacity, max_idle_age),
|req: &http::Request<_>| {
let ep = outbound::Endpoint::from_orig_dst(req);
debug!("outbound ep={:?}", ep);
ep
},
))
.layer(buffer::layer(max_in_flight, DispatchDeadline::extract));

let balancer_stack = svc::builder()
.layer(fallback::layer(balancer, orig_dst_router))
.layer(pending::layer())
.layer(balance::weight::layer())
.service(endpoint_stack);
Expand Down
27 changes: 25 additions & 2 deletions src/app/outbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,13 @@ use std::{fmt, hash};

use super::identity;
use control::destination::{Metadata, ProtocolHint};
use proxy::http::balance::{HasWeight, Weight};
use proxy::http::settings;
use proxy::{
self,
http::{
balance::{HasWeight, Weight},
settings,
},
};
use tap;
use transport::{connect, tls};
use {Conditional, NameAddr};
Expand Down Expand Up @@ -44,6 +49,23 @@ impl Endpoint {
}
}
}

/// Builds an `Endpoint` from the original destination address recorded on
/// the request's `proxy::Source` extension.
///
/// Returns `None` when the request carries no `proxy::Source` extension, or
/// when `orig_dst_if_not_local()` yields no address for that source.
///
/// The resulting endpoint has no destination name, no peer identity (the
/// reason given is `NotProvidedByServiceDiscovery`), and empty metadata.
pub fn from_orig_dst<B>(req: &http::Request<B>) -> Option<Self> {
    // Bail out early if either the source or its original destination is missing.
    let source = req.extensions().get::<proxy::Source>()?;
    let addr = source.orig_dst_if_not_local()?;

    // HTTP settings are derived from the request itself.
    let http_settings = settings::Settings::from_request(req);

    let endpoint = Self {
        addr,
        dst_name: None,
        identity: Conditional::None(
            tls::ReasonForNoPeerName::NotProvidedByServiceDiscovery.into(),
        ),
        metadata: Metadata::empty(),
        http_settings,
    };
    Some(endpoint)
}
}

impl From<SocketAddr> for Endpoint {
Expand Down Expand Up @@ -207,6 +229,7 @@ pub mod discovery {
fn poll(&mut self) -> Poll<resolve::Update<Self::Endpoint>, Self::Error> {
match self.resolving {
Resolving::Name(ref name, ref mut res) => match try_ready!(res.poll()) {
resolve::Update::NoEndpoints => Ok(Async::Ready(resolve::Update::NoEndpoints)),
resolve::Update::Remove(addr) => {
debug!("removing {}", addr);
Ok(Async::Ready(resolve::Update::Remove(addr)))
Expand Down
4 changes: 4 additions & 0 deletions src/control/destination/background/destination_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,10 @@ where
);
match self.addrs.take() {
Exists::Yes(mut cache) => {
self.responders.retain(|r| {
let sent = r.update_tx.unbounded_send(Update::NoEndpoints);
sent.is_ok()
});
cache.clear(&mut |change| {
Self::on_change(&mut self.responders, authority_for_logging, change)
});
Expand Down
68 changes: 60 additions & 8 deletions src/proxy/http/balance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@ extern crate hyper_balance;
extern crate tower_balance;
extern crate tower_discover;

use std::marker::PhantomData;
use std::time::Duration;
use std::{error::Error, fmt, marker::PhantomData, time::Duration};

use futures::{Future, Poll};
use futures::{future, Async, Future, Poll};
use hyper::body::Payload;

use self::tower_discover::Discover;
Expand All @@ -16,6 +15,11 @@ pub use self::tower_balance::{
};

use http;
use proxy::{
self,
http::fallback,
resolve::{EndpointStatus, HasEndpointStatus},
};
use svc;

/// Configures a stack to resolve `T` typed targets to balance requests over
Expand All @@ -36,6 +40,15 @@ pub struct MakeSvc<M, A, B> {
_marker: PhantomData<fn(A) -> B>,
}

/// Pairs a balancer service with the endpoint status of its discovery.
///
/// The `svc::Service` impl for this type consults `status` before each
/// request and fails the request with a fallback error when it is empty.
#[derive(Debug)]
pub struct Service<S> {
/// The wrapped service; readiness polling and dispatched requests go to it.
balance: S,
/// Endpoint status, checked with `is_empty()` when a request is called.
status: EndpointStatus,
}

/// Error value used when a request is made while the load balancer has no
/// endpoints; it is passed to `fallback::Error::fallback` along with the request.
#[derive(Debug)]
pub struct NoEndpoints;

// === impl Layer ===

pub fn layer<A, B>(default_rtt: Duration, decay: Duration) -> Layer<A, B> {
Expand Down Expand Up @@ -89,13 +102,14 @@ impl<M: Clone, A, B> Clone for MakeSvc<M, A, B> {
impl<T, M, A, B> svc::Service<T> for MakeSvc<M, A, B>
where
M: svc::Service<T>,
M::Response: Discover,
M::Response: Discover + HasEndpointStatus,
<M::Response as Discover>::Service:
svc::Service<http::Request<A>, Response = http::Response<B>>,
A: Payload,
B: Payload,
{
type Response = Balance<WithPeakEwma<M::Response, PendingUntilFirstData>, PowerOfTwoChoices>;
type Response =
Service<Balance<WithPeakEwma<M::Response, PendingUntilFirstData>, PowerOfTwoChoices>>;
type Error = M::Error;
type Future = MakeSvc<M::Future, A, B>;

Expand All @@ -118,19 +132,47 @@ where
impl<F, A, B> Future for MakeSvc<F, A, B>
where
F: Future,
F::Item: Discover,
F::Item: Discover + HasEndpointStatus,
<F::Item as Discover>::Service: svc::Service<http::Request<A>, Response = http::Response<B>>,
A: Payload,
B: Payload,
{
type Item = Balance<WithPeakEwma<F::Item, PendingUntilFirstData>, PowerOfTwoChoices>;
type Item = Service<Balance<WithPeakEwma<F::Item, PendingUntilFirstData>, PowerOfTwoChoices>>;
type Error = F::Error;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let discover = try_ready!(self.inner.poll());
let status = discover.endpoint_status();
let instrument = PendingUntilFirstData::default();
let loaded = WithPeakEwma::new(discover, self.default_rtt, self.decay, instrument);
Ok(Balance::p2c(loaded).into())
let balance = Balance::p2c(loaded);
Ok(Async::Ready(Service { balance, status }))
}
}

impl<S, A, B> svc::Service<http::Request<A>> for Service<S>
where
S: svc::Service<http::Request<A>, Response = http::Response<B>, Error = proxy::Error>,
{
type Response = http::Response<B>;
type Error = fallback::Error<A>;
type Future = future::Either<
future::MapErr<S::Future, fn(proxy::Error) -> Self::Error>,
future::FutureResult<http::Response<B>, fallback::Error<A>>,
>;

fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.balance.poll_ready().map_err(From::from)
}

fn call(&mut self, req: http::Request<A>) -> Self::Future {
// The endpoint status is updated by the Discover instance, which is
// driven by calling `poll_ready` on the balancer.
if self.status.is_empty() {
future::Either::B(future::err(fallback::Error::fallback(req, NoEndpoints)))
} else {
future::Either::A(self.balance.call(req).map_err(From::from))
}
}
}

Expand Down Expand Up @@ -188,3 +230,13 @@ pub mod weight {
}
}
}

// === impl NoEndpoints ===

/// Renders the fixed, human-readable description of the error.
impl fmt::Display for NoEndpoints {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // `pad` is what `Display` for `str` uses, so width, fill and precision
        // flags on the formatter are still honoured.
        f.pad("load balancer has no endpoints")
    }
}

// Marker impl: `NoEndpoints` relies entirely on the default `Error` methods.
impl Error for NoEndpoints {}
Loading

0 comments on commit b70c68d

Please sign in to comment.