Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unify control plane client construction #638

Merged
merged 4 commits into from
Aug 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 0 additions & 8 deletions linkerd/app/core/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
pub use super::control::ControlAddr;
pub use crate::exp_backoff::ExponentialBackoff;
pub use crate::proxy::http::h2;
pub use crate::transport::{Bind, DefaultOrigDstAddr, NoOrigDstAddr, OrigDstAddr};
Expand Down Expand Up @@ -32,13 +31,6 @@ pub struct ProxyConfig {
pub detect_protocol_timeout: Duration,
}

#[derive(Clone, Debug)]
pub struct ControlConfig {
pub addr: ControlAddr,
pub connect: ConnectConfig,
pub buffer_capacity: usize,
}

// === impl ServerConfig ===

impl<A: OrigDstAddr> ServerConfig<A> {
Expand Down
66 changes: 60 additions & 6 deletions linkerd/app/core/src/control.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,24 @@
use linkerd2_addr::Addr;
use crate::{
classify, config, control, dns,
proxy::http,
reconnect,
svc::{self, NewService},
transport::tls,
Addr, ControlHttpMetrics, Error,
};
use std::fmt;

#[derive(Clone, Debug)]
pub struct Config {
pub addr: ControlAddr,
pub connect: config::ConnectConfig,
pub buffer_capacity: usize,
}

#[derive(Clone, Debug)]
pub struct ControlAddr {
pub addr: Addr,
pub identity: crate::transport::tls::PeerIdentity,
pub identity: tls::PeerIdentity,
}

impl Into<Addr> for ControlAddr {
Expand All @@ -19,8 +33,48 @@ impl fmt::Display for ControlAddr {
}
}

type BalanceBody =
http::balance::PendingUntilFirstDataBody<tower::load::peak_ewma::Handle, http::glue::Body>;

type RspBody = linkerd2_http_metrics::requests::ResponseBody<BalanceBody, classify::Eos>;

pub type Client<B> = linkerd2_buffer::Buffer<http::Request<B>, http::Response<RspBody>>;

impl Config {
pub fn build<B, I>(
self,
dns: dns::Resolver,
metrics: ControlHttpMetrics,
identity: tls::Conditional<I>,
) -> Client<B>
where
B: http::HttpBody + Send + 'static,
B::Data: Send,
B::Error: Into<Error> + Send + Sync,
I: Clone + tls::client::HasConfig + Send + 'static,
{
svc::connect(self.connect.keepalive)
.push(tls::ConnectLayer::new(identity))
.push_timeout(self.connect.timeout)
.push(self::client::layer())
.push(reconnect::layer({
let backoff = self.connect.backoff;
move |_| Ok(backoff.stream())
}))
.push_spawn_ready()
.push(self::resolve::layer(dns))
.push_on_response(self::control::balance::layer())
.push(metrics.into_layer::<classify::Response>())
.push(self::add_origin::Layer::new())
.into_new_service()
.check_new_service()
.push_on_response(svc::layers().push_spawn_buffer(self.buffer_capacity))
.new_service(self.addr)
Comment on lines +56 to +72
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we maybe stick an instrument in this stack somewhere, or does that happen elsewhere?

}
}

/// Sets the request's URI from `Config`.
pub mod add_origin {
mod add_origin {
use super::ControlAddr;
use futures::{ready, TryFuture};
use linkerd2_error::Error;
Expand Down Expand Up @@ -151,7 +205,7 @@ pub mod add_origin {
}
}

pub mod resolve {
mod resolve {
use super::client::Target;
use crate::{
dns,
Expand Down Expand Up @@ -187,7 +241,7 @@ pub mod resolve {
}
}

pub mod balance {
mod balance {
use crate::proxy::http;
use std::time::Duration;

Expand All @@ -200,7 +254,7 @@ pub mod balance {
}

/// Creates a client suitable for gRPC.
pub mod client {
mod client {
use crate::transport::{connect, tls};
use crate::{proxy::http, svc};
use linkerd2_proxy_http::h2::Settings as H2Settings;
Expand Down
45 changes: 0 additions & 45 deletions linkerd/app/core/src/svc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,42 +49,6 @@ impl<T> NewService<T> for IdentityProxy {
}
}

#[derive(Clone, Debug)]
pub struct WithTarget<S, T> {
inner: S,
target: T,
}

impl<S, T> Service<()> for WithTarget<S, T>
where
S: Service<T>,
T: Clone,
{
type Response = S::Response;
type Future = S::Future;
type Error = S::Error;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}

fn call(&mut self, _: ()) -> Self::Future {
self.inner.call(self.target.clone())
}
}

impl<S, T> NewService<()> for WithTarget<S, T>
where
S: NewService<T>,
T: Clone,
{
type Service = S::Service;

fn new_service(&self, _: ()) -> Self::Service {
self.inner.new_service(self.target.clone())
}
}

#[allow(dead_code)]
impl<L> Layers<L> {
pub fn push<O>(self, outer: O) -> Layers<Pair<L, O>> {
Expand Down Expand Up @@ -403,15 +367,6 @@ impl<S> Stack<S> {
pub fn into_inner(self) -> S {
self.0
}

/// Transforms a `Service<T>` or `NewService<T>` into a `Service<()>` or
/// `NewService<()>` by calling it with a fixed instance of `T`.
pub fn with_fixed_target<T: Clone>(self, target: T) -> Stack<WithTarget<S, T>> {
Stack(WithTarget {
target,
inner: self.0,
})
}
}

impl<T, N> NewService<T> for Stack<N>
Expand Down
60 changes: 26 additions & 34 deletions linkerd/app/src/dst/mod.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,18 @@
mod permit;
mod resolve;

use http_body::Body as HttpBody;
use indexmap::IndexSet;
use linkerd2_app_core::{
config::{ControlAddr, ControlConfig},
dns, profiles, request_filter, svc, Error,
control, dns, profiles, proxy::identity, request_filter, svc, transport::tls,
ControlHttpMetrics, Error,
};
use permit::PermitConfiguredDsts;
use std::time::Duration;
use tonic::{
body::{Body, BoxBody},
client::GrpcService,
};
use tonic::body::BoxBody;

#[derive(Clone, Debug)]
pub struct Config {
pub control: ControlConfig,
pub control: control::Config,
pub context: String,
pub get_suffixes: IndexSet<dns::Suffix>,
pub get_networks: IndexSet<ipnet::IpNet>,
Expand All @@ -28,40 +24,36 @@ pub struct Config {
/// Handles to destination service clients.
///
/// The addr is preserved for logging.
pub struct Dst<S> {
pub addr: ControlAddr,
pub struct Dst {
pub addr: control::ControlAddr,
pub profiles: request_filter::Service<
PermitConfiguredDsts<profiles::InvalidProfileAddr>,
profiles::Client<S, resolve::BackoffUnlessInvalidArgument>,
profiles::Client<control::Client<BoxBody>, resolve::BackoffUnlessInvalidArgument>,
>,
pub resolve: request_filter::Service<PermitConfiguredDsts, resolve::Resolve<S>>,
pub resolve:
request_filter::Service<PermitConfiguredDsts, resolve::Resolve<control::Client<BoxBody>>>,
}

impl Config {
// XXX This is unfortunate -- the service should be built here, but it's annoying to name.
pub fn build<S>(self, svc: S) -> Result<Dst<S>, Error>
where
S: GrpcService<BoxBody> + Clone + Send + 'static,
S::Error: Into<Error> + Send,
S::ResponseBody: Send,
<S::ResponseBody as Body>::Data: Send,
<S::ResponseBody as HttpBody>::Error: Into<Error> + Send,
S::Future: Send,
{
let resolve = svc::stack(resolve::new(
svc.clone(),
&self.context,
self.control.connect.backoff,
))
.push_request_filter(PermitConfiguredDsts::new(
self.get_suffixes,
self.get_networks,
))
.into_inner();
pub fn build(
self,
dns: dns::Resolver,
metrics: ControlHttpMetrics,
identity: tls::Conditional<identity::Local>,
) -> Result<Dst, Error> {
let addr = self.control.addr.clone();
let backoff = self.control.connect.backoff.clone();
let svc = self.control.build(dns, metrics, identity);
let resolve = svc::stack(resolve::new(svc.clone(), &self.context, backoff))
.push_request_filter(PermitConfiguredDsts::new(
self.get_suffixes,
self.get_networks,
))
.into_inner();

let profiles = svc::stack(profiles::Client::new(
svc,
resolve::BackoffUnlessInvalidArgument::from(self.control.connect.backoff),
resolve::BackoffUnlessInvalidArgument::from(backoff),
self.initial_profile_timeout,
self.context,
))
Expand All @@ -72,7 +64,7 @@ impl Config {
.into_inner();

Ok(Dst {
addr: self.control.addr,
addr,
resolve,
profiles,
})
Expand Down
1 change: 1 addition & 0 deletions linkerd/app/src/env.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::core::{
addr,
config::*,
control::{Config as ControlConfig, ControlAddr},
proxy::http::h2,
transport::{listen, tls},
Addr,
Expand Down
28 changes: 8 additions & 20 deletions linkerd/app/src/identity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,8 @@ pub use linkerd2_app_core::proxy::identity::{
certify, Crt, CrtKey, Csr, InvalidName, Key, Local, Name, TokenSource, TrustAnchors,
};
use linkerd2_app_core::{
classify,
config::{ControlAddr, ControlConfig},
control, dns,
exp_backoff::{ExponentialBackoff, ExponentialBackoffStream},
reconnect,
svc::{self, NewService},
transport::tls,
ControlHttpMetrics as Metrics, Error,
};
Expand All @@ -19,15 +15,15 @@ use tracing::debug;
pub enum Config {
Disabled,
Enabled {
control: ControlConfig,
control: control::Config,
certify: certify::Config,
},
}

pub enum Identity {
Disabled,
Enabled {
addr: ControlAddr,
addr: control::ControlAddr,
local: Local,
task: Task,
},
Expand All @@ -47,20 +43,12 @@ impl Config {
Config::Enabled { control, certify } => {
let (local, crt_store) = Local::new(&certify);

let addr = control.addr;
let svc = svc::connect(control.connect.keepalive)
.push(tls::ConnectLayer::new(tls::Conditional::Some(
certify.trust_anchors.clone(),
)))
.push_timeout(control.connect.timeout)
.push(control::client::layer())
.push(control::resolve::layer(dns))
.push_on_response(control::balance::layer())
.push(reconnect::layer(Recover(control.connect.backoff)))
.push(metrics.into_layer::<classify::Response>())
.push(control::add_origin::Layer::new())
.into_new_service()
.new_service(addr.clone());
let addr = control.addr.clone();
let svc = control.build(
dns,
metrics,
tls::Conditional::Some(certify.trust_anchors.clone()),
);

// Save to be spawned on an auxiliary runtime.
let task = {
Expand Down
Loading