Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Route requests to original dsts when no endpoints exist in the lb #240

Closed
wants to merge 45 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
aeb967f
Replace linkerd2-stack with tower-layer
seanmonstar Apr 22, 2019
5576d78
Add "no endpoints" state to resolve::Discover
hawkw Apr 24, 2019
d20b5c2
Hack destination_set to send NoEndpoints
hawkw Apr 24, 2019
933581d
wire up endpoint status *without* breaking stuff
hawkw Apr 24, 2019
3d358d0
don't get discover out of stack
hawkw Apr 24, 2019
f731f56
add no-endpoints error
hawkw Apr 25, 2019
2a24588
wip
hawkw Apr 26, 2019
facf313
start adding tests
hawkw Apr 29, 2019
ef838d6
made the tests compile but the proxy still doesn't
hawkw Apr 29, 2019
0c152a4
Merge branch 'master' into liza/balance-rt-good
hawkw Apr 29, 2019
dad8ad9
thanks @seanmonstar
hawkw Apr 30, 2019
11f8578
fix payload err types
hawkw Apr 30, 2019
6c14ac1
make everything compile-y
hawkw Apr 30, 2019
c87b088
fix fallback svc being ready when the LB isn't
hawkw Apr 30, 2019
7069c1d
fix warnings in tests
hawkw Apr 30, 2019
f7abd4e
clean up balancer construction a bit
hawkw Apr 30, 2019
741debd
Merge branch 'master' into liza/balance-rt-good
hawkw Apr 30, 2019
4f1abf2
turn warnings back on
hawkw Apr 30, 2019
9768683
rm vestigial where bounds
hawkw May 1, 2019
02628b3
simplify fallback router's `recognize` closure
hawkw May 1, 2019
6a34871
create balancer and/or router as needed
hawkw May 1, 2019
646728d
keep balancer alive always to drive discovery
hawkw May 1, 2019
d00858f
Merge branch 'master' into liza/balance-rt-good
hawkw May 1, 2019
767b8c6
rm buffers
hawkw May 2, 2019
6ac0715
use single-slot buffer for the router
hawkw May 2, 2019
ae22014
rm unused service impl
hawkw May 2, 2019
68e1814
replace MakeCurried , convey intent better
hawkw May 2, 2019
00d0c47
Revert "use single-slot buffer for the router"
hawkw May 2, 2019
19cda74
poll router when it exists
hawkw May 2, 2019
5736498
Merge branch 'master' into liza/balance-rt-good
hawkw May 2, 2019
b34cd1d
also fix syntax
hawkw May 2, 2019
5d44601
always send NoEndpoints
hawkw May 2, 2019
3d500f1
fix bad merge
hawkw May 2, 2019
378f9d7
fix bad merge
hawkw May 2, 2019
e24f5fe
add logging to fallback
hawkw May 3, 2019
4c75002
fix fallback router never being ready
hawkw May 3, 2019
3c2fb23
don't assume no endpoints
hawkw May 3, 2019
8130e8d
Merge branch 'master' into liza/balance-rt-good
hawkw May 3, 2019
fb99e9f
Remove test asserting no eps times out
hawkw May 3, 2019
ca64d98
Replace test with one expecting the right behavior
hawkw May 3, 2019
8cc4d78
Merge branch 'master' into liza/balance-rt-good
hawkw May 3, 2019
ea1b74a
remove target type parameter from `pending::layer`
hawkw May 7, 2019
433505b
put fallback module in its own file
hawkw May 7, 2019
599201a
only update fallback state in `poll_ready`
hawkw May 8, 2019
b565ae6
compose fallback using ServiceBuilder
hawkw May 8, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 26 additions & 7 deletions src/app/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ use never::Never;
use proxy::{
self, accept, buffer,
http::{
client, insert_target, metrics as http_metrics, normalize_uri, profiles, router, settings,
strip_header,
client, insert_target, metrics as http_metrics, normalize_uri, profiles,
router::{self, rt::Make},
settings, strip_header,
},
pending, reconnect,
};
Expand Down Expand Up @@ -404,6 +405,7 @@ where
//add_remote_ip_on_rsp, add_server_id_on_rsp,
discovery::Resolve,
orig_proto_upgrade,
Endpoint,
};
use proxy::{
http::{balance, canonicalize, header_from_target, metrics, retry},
Expand Down Expand Up @@ -481,6 +483,27 @@ where
.layer(metrics::layer::<_, classify::Response>(retry_http_metrics))
.layer(insert_target::layer());

let balancer = svc::builder()
.layer(
balance::layer(
EWMA_DEFAULT_RTT,
EWMA_DECAY,
resolve::layer(Resolve::new(resolver)),
)
.with_fallback(svc::builder()
.layer(router::layer(|req: &http::Request<_>| {
let ep = Endpoint::from_orig_dst(req);
debug!("outbound ep={:?}", ep);
ep
}))
.layer(buffer::layer(max_in_flight))
)
)
.layer(pending::layer())
.layer(balance::weight::layer())
.service(endpoint_stack)
.make(&router::Config::new("out ep", capacity, max_idle_age));

// A per-`DstAddr` stack that does the following:
//
// 1. Adds the `CANONICAL_DST_HEADER` from the `DstAddr`.
Expand All @@ -497,11 +520,7 @@ where
))
.layer(buffer::layer(max_in_flight))
.layer(pending::layer())
.layer(balance::layer(EWMA_DEFAULT_RTT, EWMA_DECAY))
.layer(resolve::layer(Resolve::new(resolver)))
.layer(pending::layer())
.layer(balance::weight::layer())
.service(endpoint_stack);
.service(balancer);

// Routes request using the `DstAddr` extension.
//
Expand Down
27 changes: 25 additions & 2 deletions src/app/outbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,13 @@ use std::{fmt, hash};

use super::identity;
use control::destination::{Metadata, ProtocolHint};
use proxy::http::balance::{HasWeight, Weight};
use proxy::http::settings;
use proxy::{
self,
http::{
balance::{HasWeight, Weight},
settings,
},
};
use tap;
use transport::{connect, tls};
use {Conditional, NameAddr};
Expand Down Expand Up @@ -44,6 +49,23 @@ impl Endpoint {
}
}
}

pub fn from_orig_dst<B>(req: &http::Request<B>) -> Option<Self> {
let addr = req
.extensions()
.get::<proxy::Source>()?
.orig_dst_if_not_local()?;
let http_settings = settings::Settings::from_request(req);
Some(Self {
addr,
dst_name: None,
identity: Conditional::None(
tls::ReasonForNoPeerName::NotProvidedByServiceDiscovery.into(),
),
metadata: Metadata::empty(),
http_settings,
})
}
}

impl From<SocketAddr> for Endpoint {
Expand Down Expand Up @@ -207,6 +229,7 @@ pub mod discovery {
fn poll(&mut self) -> Poll<resolve::Update<Self::Endpoint>, Self::Error> {
match self.resolving {
Resolving::Name(ref name, ref mut res) => match try_ready!(res.poll()) {
resolve::Update::NoEndpoints => Ok(Async::Ready(resolve::Update::NoEndpoints)),
resolve::Update::Remove(addr) => {
debug!("removing {}", addr);
Ok(Async::Ready(resolve::Update::Remove(addr)))
Expand Down
16 changes: 10 additions & 6 deletions src/control/destination/background/destination_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,13 @@ where
.filter_map(|addr| pb_to_sock_addr(addr.clone())),
);
}
Some(PbUpdate2::NoEndpoints(ref no_endpoints)) if no_endpoints.exists => {
exists = Exists::Yes(());
Some(PbUpdate2::NoEndpoints(ref no_endpoints)) => {
self.no_endpoints(auth, no_endpoints.exists);
}
Some(PbUpdate2::NoEndpoints(no_endpoints)) => {
debug_assert!(!no_endpoints.exists);
exists = Exists::No;
if no_endpoints.exists {
exists = Exists::Yes(());
} else {
exists = Exists::No;
}
}
None => (),
},
Expand Down Expand Up @@ -224,6 +224,10 @@ where
);
match self.addrs.take() {
Exists::Yes(mut cache) => {
self.responders.retain(|r| {
let sent = r.update_tx.unbounded_send(Update::NoEndpoints);
kleimkuhler marked this conversation as resolved.
Show resolved Hide resolved
sent.is_ok()
});
cache.clear(&mut |change| {
Self::on_change(&mut self.responders, authority_for_logging, change)
});
Expand Down
Loading