Use cached data to validate policy admissions (#7991)
Currently, the policy-controller's admission server issues a request to
Kubernetes for every admission review request. This is unnecessary, since
we already maintain a cache of the relevant data structure in the
process.

This change updates the admission server to hold a reference to the
process's `Index` data structure. Each time an admission request is
received, the admission server accesses the index to look up servers that
already exist in the cluster.

Note that the index may not be fully up to date, since event delivery can be
delayed. I think it's an acceptable tradeoff for the admission controller to be
best-effort in exchange for reduced resource usage.
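
To illustrate the shape of the change, here is a minimal sketch of an admission
server that holds the shared index; the `Admission` type, its method, and the
crate path are illustrative assumptions, not the actual policy-controller code:

use linkerd_policy_controller_k8s_index::SharedIndex; // crate path assumed

/// Hypothetical admission-server state: a clone of the shared index replaces
/// the per-review Kubernetes API request.
#[derive(Clone)]
pub struct Admission {
    index: SharedIndex, // Arc<parking_lot::RwLock<Index>>, shared with the indexing tasks
}

impl Admission {
    /// True if the reviewed resource's namespace is already indexed. Reviews
    /// only take the read lock; only the watch tasks take the write lock.
    pub fn namespace_is_indexed(&self, ns: &str) -> bool {
        self.index.read().get_ns(ns).is_some()
    }
}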

Signed-off-by: Oliver Gould <ver@buoyant.io>
olix0r committed Mar 3, 2022
1 parent 00f52dd commit 8760c5f
Showing 8 changed files with 92 additions and 88 deletions.
2 changes: 1 addition & 1 deletion policy-controller/k8s/index/src/authz.rs
@@ -14,7 +14,7 @@ use tracing::{debug, instrument, trace, warn};

/// Indexes `ServerAuthorization` resources within a namespace.
#[derive(Debug, Default)]
pub(crate) struct AuthzIndex {
pub struct AuthzIndex {
index: HashMap<String, Authz>,
}

36 changes: 21 additions & 15 deletions policy-controller/k8s/index/src/lib.rs
@@ -45,7 +45,7 @@ use self::{
use futures::prelude::*;
use linkerd_policy_controller_core::{InboundServer, IpNet};
use linkerd_policy_controller_k8s_api as k8s;
use parking_lot::Mutex;
use parking_lot::RwLock;
use std::sync::Arc;
use tokio::{sync::watch, time};

@@ -76,6 +76,8 @@ pub struct ClusterInfo {
pub identity_domain: String,
}

pub type SharedIndex = Arc<RwLock<Index>>;

/// Holds all indexing state. Owned and updated by a single task that processes watch events,
/// publishing results to the shared lookup map for quick lookups in the API server.
pub struct Index {
@@ -100,7 +102,7 @@ impl Index {
cluster_info: ClusterInfo,
default_policy: DefaultPolicy,
detect_timeout: time::Duration,
) -> (lookup::Reader, Self) {
) -> (lookup::Reader, SharedIndex) {
// Create a common set of receivers for all supported default policies.
let default_policy_watches =
DefaultPolicyWatches::new(cluster_info.networks.clone(), detect_timeout);
@@ -116,45 +118,49 @@ impl Index {
cluster_info,
default_policy_watches,
};
(reader, idx)
(reader, Arc::new(RwLock::new(idx)))
}

pub fn get_ns(&self, ns: &str) -> Option<&Namespace> {
self.namespaces.index.get(ns)
}
}

pub async fn index_pods(idx: Arc<Mutex<Index>>, events: impl Stream<Item = k8s::Event<k8s::Pod>>) {
pub async fn index_pods(idx: SharedIndex, events: impl Stream<Item = k8s::Event<k8s::Pod>>) {
tokio::pin!(events);
while let Some(ev) = events.next().await {
match ev {
k8s::Event::Applied(pod) => idx.lock().apply_pod(pod),
k8s::Event::Deleted(pod) => idx.lock().delete_pod(pod),
k8s::Event::Restarted(pods) => idx.lock().reset_pods(pods),
k8s::Event::Applied(pod) => idx.write().apply_pod(pod),
k8s::Event::Deleted(pod) => idx.write().delete_pod(pod),
k8s::Event::Restarted(pods) => idx.write().reset_pods(pods),
}
}
}

pub async fn index_servers(
idx: Arc<Mutex<Index>>,
idx: SharedIndex,
events: impl Stream<Item = k8s::Event<k8s::policy::Server>>,
) {
tokio::pin!(events);
while let Some(ev) = events.next().await {
match ev {
k8s::Event::Applied(srv) => idx.lock().apply_server(srv),
k8s::Event::Deleted(srv) => idx.lock().delete_server(srv),
k8s::Event::Restarted(srvs) => idx.lock().reset_servers(srvs),
k8s::Event::Applied(srv) => idx.write().apply_server(srv),
k8s::Event::Deleted(srv) => idx.write().delete_server(srv),
k8s::Event::Restarted(srvs) => idx.write().reset_servers(srvs),
}
}
}

pub async fn index_serverauthorizations(
idx: Arc<Mutex<Index>>,
idx: SharedIndex,
events: impl Stream<Item = k8s::Event<k8s::policy::ServerAuthorization>>,
) {
tokio::pin!(events);
while let Some(ev) = events.next().await {
match ev {
k8s::Event::Applied(saz) => idx.lock().apply_serverauthorization(saz),
k8s::Event::Deleted(saz) => idx.lock().delete_serverauthorization(saz),
k8s::Event::Restarted(sazs) => idx.lock().reset_serverauthorizations(sazs),
k8s::Event::Applied(saz) => idx.write().apply_serverauthorization(saz),
k8s::Event::Deleted(saz) => idx.write().delete_serverauthorization(saz),
k8s::Event::Restarted(sazs) => idx.write().reset_serverauthorizations(sazs),
}
}
}
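
Because `Index::new` now returns a `SharedIndex` and each watch loop above takes
one, a caller can clone a single `Arc` handle into every indexing task and keep
another clone for the admission server. A minimal wiring sketch; the event
streams, the `spawn_indexing` helper, and the crate path are assumptions, while
the index API itself comes from this diff:

use futures::prelude::*;
use linkerd_policy_controller_k8s_api as k8s;
use linkerd_policy_controller_k8s_index::{
    index_pods, index_servers, index_serverauthorizations, lookup, ClusterInfo, DefaultPolicy,
    Index, SharedIndex,
}; // crate path assumed
use tokio::time;

fn spawn_indexing(
    cluster_info: ClusterInfo,
    default_policy: DefaultPolicy,
    detect_timeout: time::Duration,
    pods: impl Stream<Item = k8s::Event<k8s::Pod>> + Send + 'static,
    servers: impl Stream<Item = k8s::Event<k8s::policy::Server>> + Send + 'static,
    authzs: impl Stream<Item = k8s::Event<k8s::policy::ServerAuthorization>> + Send + 'static,
) -> (lookup::Reader, SharedIndex) {
    let (lookup_rx, index) = Index::new(cluster_info, default_policy, detect_timeout);

    // Each watch loop clones the Arc and takes a short write lock per event.
    tokio::spawn(index_pods(index.clone(), pods));
    tokio::spawn(index_servers(index.clone(), servers));
    tokio::spawn(index_serverauthorizations(index.clone(), authzs));

    // The admission server holds another clone and only ever acquires read locks.
    (lookup_rx, index)
}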
2 changes: 1 addition & 1 deletion policy-controller/k8s/index/src/namespace.rs
@@ -10,7 +10,7 @@ pub(crate) struct NamespaceIndex {
}

#[derive(Debug)]
pub(crate) struct Namespace {
pub struct Namespace {
/// Holds the global default-allow policy, which may be overridden per-workload.
pub default_policy: DefaultPolicy,

2 changes: 1 addition & 1 deletion policy-controller/k8s/index/src/pod.rs
@@ -11,7 +11,7 @@ use tracing::{debug, instrument, trace, warn};

/// Indexes pod state (within a namespace).
#[derive(Debug, Default)]
pub(crate) struct PodIndex {
pub struct PodIndex {
index: HashMap<String, Pod>,
}

27 changes: 22 additions & 5 deletions policy-controller/k8s/index/src/server.rs
@@ -8,13 +8,13 @@ use tracing::{debug, instrument, trace, warn};

/// Holds the state of all `Server`s in a namespace.
#[derive(Debug, Default)]
pub(crate) struct SrvIndex {
pub struct SrvIndex {
index: HashMap<String, Server>,
}

/// The state of a `Server` instance and its authorizations.
#[derive(Debug)]
struct Server {
pub struct Server {
/// Labels a `Server`.
labels: k8s::Labels,

@@ -136,8 +136,17 @@ impl Index {
// === impl SrvIndex ===

impl SrvIndex {
pub fn iter(&self) -> impl Iterator<Item = (&String, &Server)> {
self.index.iter()
}

/// Adds an authorization to servers matching `selector`.
pub fn add_authz(&mut self, name: &str, selector: &ServerSelector, authz: ClientAuthorization) {
pub(crate) fn add_authz(
&mut self,
name: &str,
selector: &ServerSelector,
authz: ClientAuthorization,
) {
for (srv_name, srv) in self.index.iter_mut() {
if selector.selects(srv_name, &srv.labels) {
debug!(server = %srv_name, authz = %name, "Adding authz to server");
@@ -150,14 +159,14 @@ impl SrvIndex {
}

/// Removes an authorization by `name`.
pub fn remove_authz(&mut self, name: &str) {
pub(crate) fn remove_authz(&mut self, name: &str) {
for srv in self.index.values_mut() {
srv.remove_authz(name);
}
}

/// Iterates over servers that select the given `pod_labels`.
pub fn iter_matching_pod(
pub(crate) fn iter_matching_pod(
&self,
pod_labels: k8s::Labels,
) -> impl Iterator<Item = (&str, &policy::server::Port, &ServerRx)> {
@@ -291,6 +300,14 @@ impl ServerSelector {
// === impl Server ===

impl Server {
pub fn port(&self) -> &policy::server::Port {
&self.port
}

pub fn pod_selector(&self) -> &k8s::labels::Selector {
&*self.pod_selector
}

fn insert_authz(&mut self, name: impl Into<String>, authz: ClientAuthorization) {
debug!("Adding authorization to server");
self.authorizations.insert(name.into(), authz);
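
The accessors made public above (`SrvIndex::iter`, `Server::port`,
`Server::pod_selector`) give the webhook what it needs to compare an incoming
`Server` against those already indexed. A hedged sketch follows; the
`overlapping_servers` helper, the `ServerSpec` type and field names, the crate
paths, and the `PartialEq` impls on the selector and port types are assumptions:

use linkerd_policy_controller_k8s_api as k8s;
use linkerd_policy_controller_k8s_index::SrvIndex; // crate path assumed

/// Illustrative only: names of indexed Servers that select the same pods on
/// the same port as the candidate spec, which a validating webhook could reject.
fn overlapping_servers<'i>(
    servers: &'i SrvIndex,
    candidate: &'i k8s::policy::ServerSpec, // type/field names assumed
) -> impl Iterator<Item = &'i String> {
    servers.iter().filter_map(move |(name, srv)| {
        let same_pods = *srv.pod_selector() == candidate.pod_selector;
        let same_port = *srv.port() == candidate.port;
        if same_pods && same_port {
            Some(name)
        } else {
            None
        }
    })
}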
42 changes: 21 additions & 21 deletions policy-controller/k8s/index/src/tests.rs
@@ -21,7 +21,7 @@ async fn incrementally_configure_server() {
};
let pod_net = IpNet::from_str("192.0.2.2/28").unwrap();
let detect_timeout = time::Duration::from_secs(1);
let (lookup_rx, mut idx) = Index::new(
let (lookup_rx, idx) = Index::new(
cluster,
DefaultPolicy::Allow {
authenticated_only: false,
@@ -37,7 +37,7 @@ async fn incrementally_configure_server() {
pod_net.hosts().next().unwrap(),
Some(("container-0", vec![2222, 9999])),
);
idx.apply_pod(pod.clone());
idx.write().apply_pod(pod.clone());

let default = DefaultPolicy::Allow {
authenticated_only: false,
@@ -69,7 +69,7 @@ async fn incrementally_configure_server() {
srv.spec.proxy_protocol = Some(k8s::policy::server::ProxyProtocol::Http1);
srv
};
idx.apply_server(srv.clone());
idx.write().apply_server(srv.clone());

// Check that the watch has been updated to reflect the above change and that this change _only_
// applies to the correct port.
@@ -94,7 +94,7 @@ async fn incrementally_configure_server() {
..Default::default()
},
);
idx.apply_serverauthorization(authz.clone());
idx.write().apply_serverauthorization(authz.clone());

// Check that the watch now has authorized traffic as described above.
let mut rx = port2222.into_stream();
@@ -116,21 +116,21 @@
);

// Delete the authorization and check that the watch has reverted to its prior state.
idx.delete_serverauthorization(authz);
idx.write().delete_serverauthorization(authz);
assert_eq!(
time::timeout(time::Duration::from_secs(1), rx.next()).await,
Ok(Some(basic_config)),
);

// Delete the server and check that the watch has reverted to the default state.
idx.delete_server(srv);
idx.write().delete_server(srv);
assert_eq!(
time::timeout(time::Duration::from_secs(1), rx.next()).await,
Ok(Some(default_config))
);

// Delete the pod and check that the watch has been closed.
idx.delete_pod(pod);
idx.write().delete_pod(pod);
assert_eq!(
time::timeout(time::Duration::from_secs(1), rx.next()).await,
Ok(None)
@@ -151,7 +151,7 @@ fn server_update_deselects_pod() {
authenticated_only: false,
cluster_only: true,
};
let (lookup_rx, mut idx) = Index::new(cluster, default, detect_timeout);
let (lookup_rx, idx) = Index::new(cluster, default, detect_timeout);

let p = mk_pod(
"ns-0",
@@ -160,14 +160,14 @@ fn server_update_deselects_pod() {
pod_net.hosts().next().unwrap(),
Some(("container-0", vec![2222])),
);
idx.apply_pod(p);
idx.write().apply_pod(p);

let srv = {
let mut srv = mk_server("ns-0", "srv-0", Port::Number(2222), None, None);
srv.spec.proxy_protocol = Some(k8s::policy::server::ProxyProtocol::Http2);
srv
};
idx.apply_server(srv.clone());
idx.write().apply_server(srv.clone());

// The default policy applies for all exposed ports.
let port2222 = lookup_rx.lookup("ns-0", "pod-0", 2222).unwrap();
@@ -180,7 +180,7 @@ fn server_update_deselects_pod() {
}
);

idx.apply_server({
idx.write().apply_server({
let mut srv = srv;
srv.spec.pod_selector = Some(("label", "value")).into_iter().collect();
srv
@@ -212,7 +212,7 @@ fn default_policy_global() {
let detect_timeout = time::Duration::from_secs(1);

for default in &DEFAULTS {
let (lookup_rx, mut idx) = Index::new(cluster.clone(), *default, detect_timeout);
let (lookup_rx, idx) = Index::new(cluster.clone(), *default, detect_timeout);

let p = mk_pod(
"ns-0",
@@ -221,7 +221,7 @@ fn default_policy_global() {
pod_net.hosts().next().unwrap(),
Some(("container-0", vec![2222])),
);
idx.reset_pods(vec![p]);
idx.write().reset_pods(vec![p]);

let config = InboundServer {
name: format!("default:{}", default),
@@ -255,7 +255,7 @@ fn default_policy_annotated() {
let detect_timeout = time::Duration::from_secs(1);

for default in &DEFAULTS {
let (lookup_rx, mut idx) = Index::new(
let (lookup_rx, idx) = Index::new(
cluster.clone(),
// Invert default to ensure override applies.
match *default {
@@ -277,7 +277,7 @@ fn default_policy_annotated() {
);
p.annotations_mut()
.insert(DefaultPolicy::ANNOTATION.into(), default.to_string());
idx.reset_pods(vec![p]);
idx.write().reset_pods(vec![p]);

let config = InboundServer {
name: format!("default:{}", default),
@@ -309,7 +309,7 @@ fn default_policy_annotated_invalid() {
authenticated_only: false,
cluster_only: false,
};
let (lookup_rx, mut idx) = Index::new(cluster, default, detect_timeout);
let (lookup_rx, idx) = Index::new(cluster, default, detect_timeout);

let mut p = mk_pod(
"ns-0",
@@ -320,7 +320,7 @@ fn default_policy_annotated_invalid() {
);
p.annotations_mut()
.insert(DefaultPolicy::ANNOTATION.into(), "bogus".into());
idx.reset_pods(vec![p]);
idx.write().reset_pods(vec![p]);

// Lookup port 2222 -> default config.
let port2222 = lookup_rx
@@ -356,7 +356,7 @@ fn opaque_annotated() {
let detect_timeout = time::Duration::from_secs(1);

for default in &DEFAULTS {
let (lookup_rx, mut idx) = Index::new(cluster.clone(), *default, detect_timeout);
let (lookup_rx, idx) = Index::new(cluster.clone(), *default, detect_timeout);

let mut p = mk_pod(
"ns-0",
@@ -367,7 +367,7 @@ fn opaque_annotated() {
);
p.annotations_mut()
.insert("config.linkerd.io/opaque-ports".into(), "2222".into());
idx.reset_pods(vec![p]);
idx.write().reset_pods(vec![p]);

let config = InboundServer {
name: format!("default:{}", default),
Expand All @@ -394,7 +394,7 @@ fn authenticated_annotated() {
let detect_timeout = time::Duration::from_secs(1);

for default in &DEFAULTS {
let (lookup_rx, mut idx) = Index::new(cluster.clone(), *default, detect_timeout);
let (lookup_rx, idx) = Index::new(cluster.clone(), *default, detect_timeout);

let mut p = mk_pod(
"ns-0",
@@ -407,7 +407,7 @@ fn authenticated_annotated() {
"config.linkerd.io/proxy-require-identity-inbound-ports".into(),
"2222".into(),
);
idx.reset_pods(vec![p]);
idx.write().reset_pods(vec![p]);

let config = {
let policy = match *default {