Skip to content

Commit

Permalink
Proxy: Map unqualified/partially-qualified names to FQDN
Browse files Browse the repository at this point in the history
Previously we required the service to fully qualify all service names
for outbound traffic. Many services are written assuming that
Kubernetes will complete names using its DNS search path, and those
services weren't working with Conduit.

Now add an option, used by default, to fully-qualify the domain names.
Currently only Kubernetes-like name completion for services is
supported, but the configuration syntax is open-ended to allow for
alternatives in the future. Also, the auto-completion can be disabled
for applications that prefer to ensure they're always using unambiguous
names. Once routing is implemented then it is likely that (default)
routing rules will replace these hard-coded rules.

Unit tests for the name completion logic are included.

Part of the solution for #9. The changes to `conduit inject` to
actually use this facility will be in another PR.
  • Loading branch information
briansmith committed Dec 16, 2017
1 parent 4fa4891 commit ff92775
Show file tree
Hide file tree
Showing 6 changed files with 314 additions and 16 deletions.
41 changes: 41 additions & 0 deletions proxy/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,17 @@ pub struct Config {

pub pod_name: Option<String>,
pub pod_namespace: Option<String>,
pub pod_zone: Option<String>,
pub node_name: Option<String>,

/// Should we use `pod_namespace` and/or `pod_zone` to map unqualified/partially-qualified
/// to fully-qualified names using the given platform's conventions?
destinations_autocomplete_fqdn: Option<Environment>,
}

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum Environment {
Kubernetes,
}

/// Configuration settings for binding a listener.
Expand All @@ -76,6 +86,7 @@ pub enum Error {

#[derive(Clone, Debug)]
pub enum ParseError {
EnviromentUnsupported,
NotANumber,
HostIsNotAnIpAddress,
NotUnicode,
Expand Down Expand Up @@ -132,6 +143,8 @@ const ENV_PUBLIC_CONNECT_TIMEOUT: &str = "CONDUIT_PROXY_PUBLIC_CONNECT_TIMEOUT";
const ENV_NODE_NAME: &str = "CONDUIT_PROXY_NODE_NAME";
const ENV_POD_NAME: &str = "CONDUIT_PROXY_POD_NAME";
const ENV_POD_NAMESPACE: &str = "CONDUIT_PROXY_POD_NAMESPACE";
const ENV_POD_ZONE: &str = "CONDUIT_PROXY_POD_ZONE";
const ENV_DESTINATIONS_AUTOCOMPLETE_FQDN: &str = "CONDUIT_PROXY_DESTINATIONS_AUTOCOMPLETE_FQDN";

pub const ENV_CONTROL_URL: &str = "CONDUIT_PROXY_CONTROL_URL";
const ENV_RESOLV_CONF: &str = "CONDUIT_RESOLV_CONF";
Expand Down Expand Up @@ -170,7 +183,10 @@ impl<'a> TryFrom<&'a Strings> for Config {
let report_timeout = parse(strings, ENV_REPORT_TIMEOUT_SECS, parse_number);
let pod_name = strings.get(ENV_POD_NAME);
let pod_namespace = strings.get(ENV_POD_NAMESPACE);
let pod_zone = strings.get(ENV_POD_ZONE);
let node_name = strings.get(ENV_NODE_NAME);
let destinations_autocomplete_fqdn =
parse(strings, ENV_DESTINATIONS_AUTOCOMPLETE_FQDN, parse_environment);

Ok(Config {
private_listener: Listener {
Expand Down Expand Up @@ -204,11 +220,29 @@ impl<'a> TryFrom<&'a Strings> for Config {
Duration::from_secs(report_timeout?.unwrap_or(DEFAULT_REPORT_TIMEOUT_SECS)),
pod_name: pod_name?,
pod_namespace: pod_namespace?,
pod_zone: pod_zone?,
node_name: node_name?,
destinations_autocomplete_fqdn: destinations_autocomplete_fqdn?,
})
}
}

impl Config {
pub fn default_destination_namespace(&self) -> Option<&String> {
match self.destinations_autocomplete_fqdn {
Some(Environment::Kubernetes) => self.pod_namespace.as_ref(),
None => None,
}
}

pub fn default_destination_zone(&self) -> Option<&String> {
match self.destinations_autocomplete_fqdn {
Some(Environment::Kubernetes) => self.pod_zone.as_ref(),
None => None,
}
}
}

// ===== impl Addr =====

impl FromStr for Addr {
Expand Down Expand Up @@ -275,6 +309,13 @@ impl Strings for TestEnv {

// ===== Parsing =====

fn parse_environment(s: &str) -> Result<Environment, ParseError> {
match s {
"Kubernetes" => Ok(Environment::Kubernetes),
_ => Err(ParseError::EnviromentUnsupported),
}
}

fn parse_number<T>(s: &str) -> Result<T, ParseError> where T: FromStr {
s.parse().map_err(|_| ParseError::NotANumber)
}
Expand Down
21 changes: 11 additions & 10 deletions proxy/src/control/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ use std::net::SocketAddr;

use futures::{Async, Future, Poll, Stream};
use futures::sync::mpsc;
use http::uri::Authority;
use tower::Service;
use tower_discover::{Change, Discover};
use tower_grpc;

use fully_qualified_authority::FullyQualifiedAuthority;

use super::codec::Protobuf;
use super::pb::common::{Destination, TcpAddress};
use super::pb::proxy::destination::Update as PbUpdate;
Expand All @@ -24,7 +25,7 @@ pub type ClientBody = ::tower_grpc::client::codec::EncodingBody<
/// A handle to start watching a destination for address changes.
#[derive(Clone, Debug)]
pub struct Discovery {
tx: mpsc::UnboundedSender<(Authority, mpsc::UnboundedSender<Update>)>,
tx: mpsc::UnboundedSender<(FullyQualifiedAuthority, mpsc::UnboundedSender<Update>)>,
}

/// A `tower_discover::Discover`, given to a `tower_balance::Balance`.
Expand All @@ -37,7 +38,7 @@ pub struct Watch<B> {
/// A background handle to eventually bind on the controller thread.
#[derive(Debug)]
pub struct Background {
rx: mpsc::UnboundedReceiver<(Authority, mpsc::UnboundedSender<Update>)>,
rx: mpsc::UnboundedReceiver<(FullyQualifiedAuthority, mpsc::UnboundedSender<Update>)>,
}

type DiscoveryWatch<F> = DestinationSet<
Expand All @@ -51,14 +52,14 @@ type DiscoveryWatch<F> = DestinationSet<
/// the controller destination API.
#[derive(Debug)]
pub struct DiscoveryWork<F> {
destinations: HashMap<Authority, DiscoveryWatch<F>>,
destinations: HashMap<FullyQualifiedAuthority, DiscoveryWatch<F>>,
/// A queue of authorities that need to be reconnected.
reconnects: VecDeque<Authority>,
reconnects: VecDeque<FullyQualifiedAuthority>,
/// The Destination.Get RPC client service.
/// Each poll, records whether the rpc service was till ready.
rpc_ready: bool,
/// A receiver of new watch requests.
rx: mpsc::UnboundedReceiver<(Authority, mpsc::UnboundedSender<Update>)>,
rx: mpsc::UnboundedReceiver<(FullyQualifiedAuthority, mpsc::UnboundedSender<Update>)>,
}

#[derive(Debug)]
Expand Down Expand Up @@ -115,7 +116,7 @@ pub fn new() -> (Discovery, Background) {

impl Discovery {
/// Start watching for address changes for a certain authority.
pub fn resolve<B>(&self, authority: &Authority, bind: B) -> Watch<B> {
pub fn resolve<B>(&self, authority: &FullyQualifiedAuthority, bind: B) -> Watch<B> {
trace!("resolve; authority={:?}", authority);
let (tx, rx) = mpsc::unbounded();
self.tx
Expand Down Expand Up @@ -251,7 +252,7 @@ where
Entry::Vacant(vac) => {
let req = Destination {
scheme: "k8s".into(),
path: vac.key().as_str().into(),
path: vac.key().without_trailing_dot().into(),
};
let stream = DestinationSvc::new(&mut rpc).get(req);
vac.insert(DestinationSet {
Expand Down Expand Up @@ -292,7 +293,7 @@ where
trace!("Destination.Get reconnect {:?}", auth);
let req = Destination {
scheme: "k8s".into(),
path: auth.as_str().into(),
path: auth.without_trailing_dot().into(),
};
set.rx = DestinationSvc::new(&mut rpc).get(req);
set.needs_reconnect = false;
Expand Down Expand Up @@ -346,7 +347,7 @@ where
};
if needs_reconnect {
set.needs_reconnect = true;
self.reconnects.push_back(Authority::clone(auth));
self.reconnects.push_back(FullyQualifiedAuthority::clone(auth));
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion proxy/src/control/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use tower_reconnect::Reconnect;
use url::HostAndPort;

use dns;
use fully_qualified_authority::FullyQualifiedAuthority;
use transport::LookupAddressAndConnect;
use timeout::Timeout;

Expand Down Expand Up @@ -57,7 +58,7 @@ pub fn new() -> (Control, Background) {
// ===== impl Control =====

impl Control {
pub fn resolve<B>(&self, auth: &http::uri::Authority, bind: B) -> Watch<B> {
pub fn resolve<B>(&self, auth: &FullyQualifiedAuthority, bind: B) -> Watch<B> {
self.disco.resolve(auth, bind)
}
}
Expand Down
Loading

0 comments on commit ff92775

Please sign in to comment.