Skip to content

Commit

Permalink
Use kube-rs resource watcher instead of Api::watch (#378)
Browse files Browse the repository at this point in the history
  • Loading branch information
kate-goldenring committed Sep 13, 2021
1 parent 6b477da commit c635430
Show file tree
Hide file tree
Showing 23 changed files with 354 additions and 246 deletions.
28 changes: 14 additions & 14 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion agent/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "agent"
version = "0.6.14"
version = "0.6.15"
authors = ["Kate Goldenring <kate.goldenring@microsoft.com>", "<bfjelds@microsoft.com>"]
edition = "2018"

Expand Down
126 changes: 79 additions & 47 deletions agent/src/util/config_action.rs
Expand Up @@ -14,7 +14,8 @@ use akri_shared::{
k8s::{try_delete_instance, KubeInterface},
};
use futures::{StreamExt, TryStreamExt};
use kube::api::{Api, ListParams, WatchEvent};
use kube::api::{Api, ListParams};
use kube_runtime::watcher::{watcher, Event};
use log::{info, trace};
use std::{collections::HashMap, option::Option, sync::Arc};
use tokio::sync::{broadcast, mpsc, Mutex};
Expand Down Expand Up @@ -95,20 +96,20 @@ async fn watch_for_config_changes(
) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
trace!("watch_for_config_changes - start");
let resource = Api::<Configuration>::all(kube_interface.get_kube_client());
let mut stream = resource
.watch(&ListParams::default(), akri_shared::akri::WATCH_VERSION)
.await?
.boxed();
let watcher = watcher(resource, ListParams::default());
let mut informer = watcher.boxed();
let mut first_event = true;
// Currently, a `None` from the stream is not handled beyond
// breaking out of the `while` loop.
while let Some(event) = stream.try_next().await? {
while let Some(event) = informer.try_next().await? {
let new_discovery_handler_sender = new_discovery_handler_sender.clone();
handle_config(
kube_interface,
event,
config_map.clone(),
discovery_handler_map.clone(),
new_discovery_handler_sender,
&mut first_event,
)
.await?
}
Expand All @@ -119,18 +120,42 @@ async fn watch_for_config_changes(
/// correct function based on the event type.
async fn handle_config(
kube_interface: &impl KubeInterface,
event: WatchEvent<Configuration>,
event: Event<Configuration>,
config_map: ConfigMap,
discovery_handler_map: RegisteredDiscoveryHandlerMap,
new_discovery_handler_sender: broadcast::Sender<String>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
first_event: &mut bool,
) -> anyhow::Result<()> {
trace!("handle_config - something happened to a configuration");
match event {
WatchEvent::Added(config) => {
Event::Applied(config) => {
info!(
"handle_config - added Configuration {:?}",
config.metadata.name
"handle_config - added or modified Configuration {:?}",
config.metadata.name,
);
// Applied events can either be newly added Configurations or modified Configurations.
// If modified, delete all associated instances and device plugins, then recreate them to reflect the updated config.
// TODO: handle modified Configurations more gracefully by determining what changed rather than deleting and re-adding
if config_map
.lock()
.await
.contains_key(config.metadata.name.as_ref().unwrap())
{
let do_recreate = should_recreate_config(&config, config_map.clone()).await?;
if !do_recreate {
trace!(
"handle_config - config {:?} has not changed. ignoring config modified event.",
config.metadata.name,
);
return Ok(());
}
info!(
"handle_config - modified Configuration {:?}",
config.metadata.name,
);
handle_config_delete(kube_interface, &config, config_map.clone()).await?;
}

tokio::spawn(async move {
handle_config_add(
Arc::new(k8s::KubeImpl::new().await.unwrap()),
Expand All @@ -142,50 +167,26 @@ async fn handle_config(
.await
.unwrap();
});
Ok(())
}
WatchEvent::Deleted(config) => {
Event::Deleted(config) => {
info!(
"handle_config - deleted Configuration {:?}",
config.metadata.name,
);
handle_config_delete(kube_interface, &config, config_map).await?;
Ok(())
}
// If a config is updated, delete all associated instances and device plugins and then recreate them to reflect updated config
WatchEvent::Modified(config) => {
let do_recreate = should_recreate_config(&config, config_map.clone()).await?;
if !do_recreate {
trace!(
"handle_config - config {:?} has not changed. ignoring config modified event.",
config.metadata.name,
);
return Ok(());
Event::Restarted(_configs) => {
if *first_event {
info!("handle_config - watcher started");
} else {
return Err(anyhow::anyhow!(
"Configuration watcher restarted - throwing error to restart agent"
));
}
info!(
"handle_config - modified Configuration {:?}",
config.metadata.name,
);
handle_config_delete(kube_interface, &config, config_map.clone()).await?;
tokio::spawn(async move {
handle_config_add(
Arc::new(k8s::KubeImpl::new().await.unwrap()),
&config,
config_map,
discovery_handler_map,
new_discovery_handler_sender,
)
.await
.unwrap();
});
Ok(())
}
WatchEvent::Error(ref e) => {
error!("handle_config - error for Configuration: {}", e);
Ok(())
}
WatchEvent::Bookmark(_) => Ok(()),
}
*first_event = false;
Ok(())
}

/// This handles added Configuration by creating a new ConfigInfo for it and adding it to the ConfigMap.
Expand Down Expand Up @@ -241,7 +242,7 @@ async fn handle_config_delete(
kube_interface: &impl KubeInterface,
config: &Configuration,
config_map: ConfigMap,
) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
) -> anyhow::Result<()> {
let name = config.metadata.name.clone().unwrap();
trace!(
"handle_config_delete - for config {} telling do_periodic_discovery to end",
Expand Down Expand Up @@ -316,7 +317,7 @@ pub async fn delete_all_instances_in_map(
kube_interface: &impl k8s::KubeInterface,
instance_map: InstanceMap,
config: &Configuration,
) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
) -> anyhow::Result<()> {
let mut instance_map_locked = instance_map.lock().await;
let instances_to_delete_map = instance_map_locked.clone();
let namespace = config.metadata.namespace.as_ref().unwrap();
Expand Down Expand Up @@ -350,6 +351,37 @@ mod config_action_tests {
use std::{collections::HashMap, fs, sync::Arc};
use tokio::sync::{broadcast, Mutex};

// Test that handle_config only tolerates a watcher `Restarted` event when it is the
// very first event seen (i.e. initial startup), and returns an error for any later one.
// The flag is deliberately NOT reset by hand between the two calls: the second call
// must fail because handle_config itself cleared `first_event` after the first event.
#[tokio::test]
async fn test_handle_watcher_restart() {
    let _ = env_logger::builder().is_test(true).try_init();
    let config_map = Arc::new(Mutex::new(HashMap::new()));
    let dh_map = Arc::new(std::sync::Mutex::new(HashMap::new()));
    // Keep the receiver bound so the broadcast channel stays open for the whole test.
    let (tx, _rx) = broadcast::channel(1);
    let mut first_event = true;

    // Initial startup: the first Restarted event is expected and must succeed.
    let startup_result = handle_config(
        &MockKubeInterface::new(),
        Event::Restarted(Vec::new()),
        config_map.clone(),
        dh_map.clone(),
        tx.clone(),
        &mut first_event,
    )
    .await;
    assert!(startup_result.is_ok());
    // handle_config is responsible for recording that the first event has been seen.
    assert!(
        !first_event,
        "handle_config should clear first_event after handling the first event"
    );

    // Any Restarted event after startup must surface as an error.
    let restart_result = handle_config(
        &MockKubeInterface::new(),
        Event::Restarted(Vec::new()),
        config_map,
        dh_map,
        tx,
        &mut first_event,
    )
    .await;
    assert!(restart_result.is_err());
}

#[tokio::test]
async fn test_handle_config_delete() {
let _ = env_logger::builder().is_test(true).try_init();
Expand Down
2 changes: 1 addition & 1 deletion controller/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "controller"
version = "0.6.14"
version = "0.6.15"
authors = ["<bfjelds@microsoft.com>"]
edition = "2018"

Expand Down

0 comments on commit c635430

Please sign in to comment.