From c2f2ff1c458e645ec416a719feec590c49047084 Mon Sep 17 00:00:00 2001
From: Nicolas Belouin
Date: Fri, 1 Sep 2023 15:27:53 +0200
Subject: [PATCH] fix(agent): correct agent's behavior on Instance deletion

Make the agent remove itself from the Instance's nodes list, instead of
deleting the Instance, when it is not the last node with access to the
device. Also refactor the way we get AGENT_NODE_NAME so it is fetched
only once at startup.

This fixes #650.

Signed-off-by: Nicolas Belouin
---
 agent/src/main.rs                       |  12 +-
 agent/src/util/config_action.rs         |  15 +
 agent/src/util/device_plugin_builder.rs |  19 +-
 agent/src/util/discovery_operator.rs    | 399 ++++++++++++++++++++----
 4 files changed, 378 insertions(+), 67 deletions(-)

diff --git a/agent/src/main.rs b/agent/src/main.rs
index ea7568b87..c5dc99210 100644
--- a/agent/src/main.rs
+++ b/agent/src/main.rs
@@ -12,6 +12,7 @@ use log::{info, trace};
 use prometheus::{HistogramVec, IntGaugeVec};
 use std::{
     collections::HashMap,
+    env,
     sync::{Arc, Mutex},
     time::Duration,
 };
@@ -52,6 +53,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static
     );
 
     let mut tasks = Vec::new();
+    let node_name = env::var("AGENT_NODE_NAME")?;
 
     // Start server for Prometheus metrics
     tasks.push(tokio::spawn(async move {
@@ -83,9 +85,13 @@
     }));
 
     tasks.push(tokio::spawn(async move {
-        config_action::do_config_watch(discovery_handler_map, new_discovery_handler_sender_clone)
-            .await
-            .unwrap()
+        config_action::do_config_watch(
+            discovery_handler_map,
+            new_discovery_handler_sender_clone,
+            node_name,
+        )
+        .await
+        .unwrap()
     }));
 
     futures::future::try_join_all(tasks).await?;
diff --git a/agent/src/util/config_action.rs b/agent/src/util/config_action.rs
index 4ac2bc6d8..f97d542b0 100644
--- a/agent/src/util/config_action.rs
+++ b/agent/src/util/config_action.rs
@@ -50,6 +50,7 @@ pub struct ConfigInfo {
 pub async fn do_config_watch(
     discovery_handler_map: RegisteredDiscoveryHandlerMap,
     new_discovery_handler_sender: broadcast::Sender<String>,
+    node_name: String,
 ) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
     info!("do_config_watch - enter");
     let config_map: ConfigMap = Arc::new(RwLock::new(HashMap::new()));
@@ -63,6 +64,7 @@ pub async fn do_config_watch(
         let discovery_handler_map = discovery_handler_map.clone();
         let new_discovery_handler_sender = new_discovery_handler_sender.clone();
         let new_kube_interface = kube_interface.clone();
+        let new_node_name = node_name.clone();
         tasks.push(tokio::spawn(async move {
             handle_config_add(
                 new_kube_interface,
@@ -70,6 +72,7 @@ pub async fn do_config_watch(
                 config_map,
                 discovery_handler_map,
                 new_discovery_handler_sender,
+                new_node_name,
             )
             .await
             .unwrap();
@@ -83,6 +86,7 @@ pub async fn do_config_watch(
         config_map,
         discovery_handler_map,
         new_discovery_handler_sender,
+        node_name,
     )
     .await
     .unwrap();
@@ -99,6 +103,7 @@ async fn watch_for_config_changes(
     config_map: ConfigMap,
     discovery_handler_map: RegisteredDiscoveryHandlerMap,
     new_discovery_handler_sender: broadcast::Sender<String>,
+    node_name: String,
 ) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
     trace!("watch_for_config_changes - start");
     let resource = Api::<Configuration>::all(kube_interface.get_kube_client());
@@ -121,6 +126,7 @@ async fn watch_for_config_changes(
                     config_map.clone(),
                     discovery_handler_map.clone(),
                     new_discovery_handler_sender,
+                    node_name.clone(),
                 )
                 .await?
             }
@@ -135,6 +141,7 @@ async fn handle_config(
     config_map: ConfigMap,
     discovery_handler_map: RegisteredDiscoveryHandlerMap,
     new_discovery_handler_sender: broadcast::Sender<String>,
+    node_name: String,
 ) -> anyhow::Result<()> {
     trace!("handle_config - something happened to a configuration");
     match event {
@@ -149,6 +156,7 @@ async fn handle_config(
                 config_map,
                 discovery_handler_map,
                 new_discovery_handler_sender,
+                node_name,
             )
             .await?;
         }
@@ -186,6 +194,7 @@ async fn handle_config(
                 config_map.clone(),
                 discovery_handler_map.clone(),
                 new_discovery_handler_sender.clone(),
+                node_name.clone(),
             )
             .await?;
         }
@@ -200,6 +209,7 @@ async fn handle_config_apply(
     config_map: ConfigMap,
     discovery_handler_map: RegisteredDiscoveryHandlerMap,
     new_discovery_handler_sender: broadcast::Sender<String>,
+    node_name: String,
 ) -> anyhow::Result<()> {
     // Applied events can either be newly added Configurations or modified Configurations.
     // If modified delete all associated instances and device plugins and then recreate them to reflect updated config
@@ -231,6 +241,7 @@ async fn handle_config_apply(
             config_map,
             discovery_handler_map,
             new_discovery_handler_sender,
+            node_name,
         )
         .await
         .unwrap();
@@ -246,6 +257,7 @@ async fn handle_config_add(
     config_map: ConfigMap,
     discovery_handler_map: RegisteredDiscoveryHandlerMap,
     new_discovery_handler_sender: broadcast::Sender<String>,
+    node_name: String,
 ) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
     let config_id: ConfigId = (
         config.metadata.namespace.clone().unwrap(),
@@ -276,6 +288,7 @@ async fn handle_config_add(
         stop_discovery_sender,
         &mut finished_discovery_sender,
         kube_interface,
+        node_name,
     )
     .await
     .unwrap();
@@ -446,6 +459,7 @@ mod config_action_tests {
             config_map.clone(),
             dh_map.clone(),
             tx.clone(),
+            "node-a".to_string(),
         )
         .await
         .is_ok());
@@ -461,6 +475,7 @@ mod config_action_tests {
             config_map.clone(),
             dh_map,
             tx,
+            "node-a".to_string(),
         )
         .await
         .is_ok());
diff --git a/agent/src/util/device_plugin_builder.rs b/agent/src/util/device_plugin_builder.rs
index 3c4eccb74..c9dd41b88 100644
--- a/agent/src/util/device_plugin_builder.rs
+++ b/agent/src/util/device_plugin_builder.rs
@@ -4,8 +4,8 @@ use super::{
         KUBELET_SOCKET, LIST_AND_WATCH_MESSAGE_CHANNEL_CAPACITY,
     },
     device_plugin_service::{
-        ConfigurationDevicePlugin, DevicePluginBehavior, DevicePluginContext, DevicePluginService,
-        InstanceDevicePlugin, ListAndWatchMessageKind,
+        get_device_instance_name, ConfigurationDevicePlugin, DevicePluginBehavior,
+        DevicePluginContext, DevicePluginService, InstanceDevicePlugin, ListAndWatchMessageKind,
     },
     v1beta1,
     v1beta1::{
@@ -24,7 +24,7 @@ use log::{info, trace};
 #[cfg(test)]
 use mockall::{automock, predicate::*};
 use std::sync::Arc;
-use std::{convert::TryFrom, env, path::Path, time::SystemTime};
+use std::{convert::TryFrom, path::Path, time::SystemTime};
 use tokio::{
     net::UnixListener,
     net::UnixStream,
@@ -39,12 +39,12 @@ pub trait DevicePluginBuilderInterface: Send + Sync {
     async fn build_device_plugin(
         &self,
-        instance_name: String,
         instance_id: String,
         config: &Configuration,
         shared: bool,
         device_plugin_context: Arc<RwLock<DevicePluginContext>>,
         device: Device,
+        node_name: String,
     ) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>>;
 
     async fn build_configuration_device_plugin(
@@ -52,6 +52,7 @@ pub trait DevicePluginBuilderInterface: Send + Sync {
         device_plugin_name: String,
         config: &Configuration,
         device_plugin_context: Arc<RwLock<DevicePluginContext>>,
+        node_name: String,
     ) -> Result<
         broadcast::Sender<ListAndWatchMessageKind>,
         Box<dyn std::error::Error + Send + Sync + 'static>,
     >;
 }
@@ -66,13 +67,15 @@ impl DevicePluginBuilderInterface for DevicePluginBuilder {
     /// This creates a new DevicePluginService for an instance and registers it with the kubelet
     async fn build_device_plugin(
         &self,
-        instance_name: String,
         instance_id: String,
         config: &Configuration,
         shared: bool,
         device_plugin_context: Arc<RwLock<DevicePluginContext>>,
         device: Device,
+        node_name: String,
     ) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
+        let instance_name =
+            get_device_instance_name(&instance_id, config.metadata.name.as_ref().unwrap());
         info!("build_device_plugin - entered for device {}", instance_name);
         let device_plugin_behavior = DevicePluginBehavior::Instance(InstanceDevicePlugin {
             instance_id: instance_id.clone(),
@@ -87,6 +90,7 @@ impl DevicePluginBuilderInterface for DevicePluginBuilder {
             device_plugin_context,
             device_plugin_behavior,
             list_and_watch_message_sender,
+            node_name,
         )
         .await
     }
@@ -97,6 +101,7 @@ impl DevicePluginBuilderInterface for DevicePluginBuilder {
         device_plugin_name: String,
         config: &Configuration,
         device_plugin_context: Arc<RwLock<DevicePluginContext>>,
+        node_name: String,
     ) -> Result<
         broadcast::Sender<ListAndWatchMessageKind>,
         Box<dyn std::error::Error + Send + Sync + 'static>,
@@ -115,6 +120,7 @@ impl DevicePluginBuilderInterface for DevicePluginBuilder {
             device_plugin_context,
             device_plugin_behavior,
             list_and_watch_message_sender.clone(),
+            node_name,
         )
         .await?;
         Ok(list_and_watch_message_sender)
@@ -129,6 +135,7 @@ impl DevicePluginBuilder {
         device_plugin_context: Arc<RwLock<DevicePluginContext>>,
         device_plugin_behavior: DevicePluginBehavior,
         list_and_watch_message_sender: broadcast::Sender<ListAndWatchMessageKind>,
+        node_name: String,
     ) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
         let capability_id: String = format!("{}/{}", AKRI_PREFIX, device_plugin_name);
         let unique_time = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH)?;
@@ -147,7 +154,7 @@ impl DevicePluginBuilder {
             config_name: config.metadata.name.clone().unwrap(),
             config_uid: config.metadata.uid.as_ref().unwrap().clone(),
             config_namespace: config.metadata.namespace.as_ref().unwrap().clone(),
-            node_name: env::var("AGENT_NODE_NAME")?,
+            node_name,
             device_plugin_context,
             list_and_watch_message_sender,
             server_ender_sender: server_ender_sender.clone(),
diff --git a/agent/src/util/discovery_operator.rs b/agent/src/util/discovery_operator.rs
index e06fc5112..b1326629d 100644
--- a/agent/src/util/discovery_operator.rs
+++ b/agent/src/util/discovery_operator.rs
@@ -17,11 +17,13 @@ use akri_discovery_utils::discovery::v0::{
     DiscoverResponse,
 };
 use akri_shared::{
-    akri::configuration::{
-        Configuration, DiscoveryProperty, DiscoveryPropertyKeySelector, DiscoveryPropertySource,
+    akri::{
+        configuration::{
+            Configuration, DiscoveryProperty, DiscoveryPropertyKeySelector, DiscoveryPropertySource,
+        },
+        retry::MAX_INSTANCE_UPDATE_TRIES,
     },
     k8s,
-    os::env_var::{ActualEnvVarQuery, EnvVarQuery},
 };
 use blake2::{
     digest::{Update, VariableOutput},
@@ -240,6 +242,7 @@ impl DiscoveryOperator {
         kube_interface: Arc<dyn k8s::KubeInterface>,
         dh_details: &'a DiscoveryDetails,
         stream: &'a mut dyn StreamingExt,
+        node_name: String,
     ) -> anyhow::Result<()> {
         // clone objects for thread
         let discovery_operator = Arc::new(self.clone());
@@ -273,6 +276,7 @@ impl DiscoveryOperator {
                         response.devices,
                         dh_details.shared,
                         Box::new(DevicePluginBuilder{}),
+                        node_name.clone(),
                     )
                     .await?;
                 }
@@ -288,6 +292,7 @@ impl DiscoveryOperator {
     pub async fn delete_offline_instances(
         &self,
         kube_interface: Arc<dyn k8s::KubeInterface>,
+        node_name: String,
     ) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
         trace!(
             "delete_offline_instances - entered for configuration {:?}",
@@ -309,10 +314,11 @@ impl DiscoveryOperator {
                 )
                 .await
                 .unwrap();
-                k8s::try_delete_instance(
+                try_delete_instance(
                     kube_interface_clone.as_ref(),
                     &instance,
                     self.config.metadata.namespace.as_ref().unwrap(),
+                    node_name.clone(),
                 )
                 .await?;
             }
@@ -331,6 +337,7 @@ impl DiscoveryOperator {
         discovery_results: Vec<Device>,
         shared: bool,
         device_plugin_builder: Box<dyn DevicePluginBuilderInterface>,
+        node_name: String,
     ) -> anyhow::Result<()> {
         let config_name = self.config.metadata.name.clone().unwrap();
         trace!(
@@ -341,7 +348,7 @@ impl DiscoveryOperator {
         let currently_visible_instances: HashMap<String, Device> = discovery_results
             .iter()
             .map(|discovery_result| {
-                let id = generate_instance_digest(&discovery_result.id, shared);
+                let id = generate_instance_digest(&discovery_result.id, shared, &node_name);
                 let instance_name = get_device_instance_name(&id, &config_name);
                 (instance_name, discovery_result.clone())
             })
@@ -361,13 +368,14 @@ impl DiscoveryOperator {
                 kube_interface,
                 currently_visible_instances,
                 shared,
+                node_name.clone(),
             )
             .await?;
 
         // If there are newly visible instances associated with a Config, make a device plugin and Instance CR for them
         if !new_discovery_results.is_empty() {
             for discovery_result in new_discovery_results {
-                let id = generate_instance_digest(&discovery_result.id, shared);
+                let id = generate_instance_digest(&discovery_result.id, shared, &node_name);
                 let instance_name = get_device_instance_name(&id, &config_name);
                 trace!(
                     "handle_discovery_results - new instance {} came online",
@@ -376,12 +384,12 @@ impl DiscoveryOperator {
                 let device_plugin_context = self.device_plugin_context.clone();
                 if let Err(e) = device_plugin_builder
                     .build_device_plugin(
-                        instance_name,
                         id,
                         &self.config,
                         shared,
                         device_plugin_context,
                         discovery_result.clone(),
+                        node_name.clone(),
                     )
                     .await
                 {
@@ -403,6 +411,7 @@ impl DiscoveryOperator {
         kube_interface: Arc<dyn k8s::KubeInterface>,
         currently_visible_instances: HashMap<String, Device>,
         shared: bool,
+        node_name: String,
     ) -> anyhow::Result<()> {
         let instance_map = self.device_plugin_context.read().await.clone().instances;
         for (instance, instance_info) in instance_map {
@@ -495,10 +504,11 @@ impl DiscoveryOperator {
                     )
                     .await
                     .unwrap();
-                    k8s::try_delete_instance(
+                    try_delete_instance(
                         kube_interface.as_ref(),
                         &instance,
                         self.config.metadata.namespace.as_ref().unwrap(),
+                        node_name.clone(),
                     )
                     .await
                     .unwrap();
@@ -582,6 +592,66 @@ impl DiscoveryOperator {
     }
 }
 
+async fn try_delete_instance(
+    kube_interface: &dyn k8s::KubeInterface,
+    instance_name: &str,
+    instance_namespace: &str,
+    node_name: String,
+) -> Result<(), anyhow::Error> {
+    for x in 0..MAX_INSTANCE_UPDATE_TRIES {
+        // First check if instance still exists
+        if let Ok(mut instance) = kube_interface
+            .find_instance(instance_name, instance_namespace)
+            .await
+        {
+            if instance.spec.nodes.contains(&node_name) {
+                instance.spec.nodes.retain(|node| node != &node_name);
+            }
+            if instance.spec.nodes.is_empty() {
+                match k8s::try_delete_instance(kube_interface, instance_name, instance_namespace)
+                    .await
+                {
+                    Ok(()) => {
+                        trace!("try_delete_instance - deleted Instance {}", instance_name);
+                        break;
+                    }
+                    Err(e) => {
+                        trace!("try_delete_instance - call to delete_instance returned with error {} on try # {} of {}", e, x, MAX_INSTANCE_UPDATE_TRIES);
+                        if x == (MAX_INSTANCE_UPDATE_TRIES - 1) {
+                            return Err(e);
+                        }
+                    }
+                }
+            } else {
+                match kube_interface
+                    .update_instance(
+                        &instance.spec,
+                        &instance.metadata.name.unwrap(),
+                        instance_namespace,
+                    )
+                    .await
+                {
+                    Ok(()) => {
+                        trace!(
+                            "try_delete_instance - updated Instance {} to remove {}",
+                            instance_name,
+                            node_name
+                        );
+                        break;
+                    }
+                    Err(e) => {
+                        trace!("try_delete_instance - call to update_instance returned with error {} on try # {} of {}", e, x, MAX_INSTANCE_UPDATE_TRIES);
+                        if x == (MAX_INSTANCE_UPDATE_TRIES - 1) {
+                            return Err(e);
+                        }
+                    }
+                };
+            }
+        }
+    }
+    Ok(())
+}
+
 /// This provides a mockable way to query configMap and secret
 #[cfg_attr(test, automock)]
 #[tonic::async_trait]
@@ -750,6 +820,7 @@ pub mod start_discovery {
         stop_all_discovery_sender: broadcast::Sender<()>,
         finished_all_discovery_sender: &mut mpsc::Sender<()>,
         kube_interface: Arc<dyn k8s::KubeInterface>,
+        node_name: String,
     ) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
         internal_start_discovery(
             discovery_operator,
@@ -758,6 +829,7 @@ pub mod start_discovery {
             finished_all_discovery_sender,
             kube_interface,
             Box::new(DevicePluginBuilder {}),
+            node_name,
         )
         .await
     }
@@ -769,6 +841,7 @@ pub mod start_discovery {
         finished_all_discovery_sender: &mut mpsc::Sender<()>,
         kube_interface: Arc<dyn k8s::KubeInterface>,
         device_plugin_builder: Box<dyn DevicePluginBuilderInterface>,
+        node_name: String,
     ) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
         let config = discovery_operator.get_config();
         info!(
@@ -791,6 +864,7 @@ pub mod start_discovery {
                 config_dp_name,
                 &config,
                 device_plugin_context.clone(),
+                node_name.clone(),
             )
             .await
         {
@@ -811,21 +885,28 @@ pub mod start_discovery {
         // Call discover on already registered Discovery Handlers requested by this Configuration's
         let known_dh_discovery_operator = discovery_operator.clone();
         let known_dh_kube_interface = kube_interface.clone();
+        let known_node_name = node_name.clone();
         tasks.push(tokio::spawn(async move {
-            do_discover(known_dh_discovery_operator, known_dh_kube_interface)
-                .await
-                .unwrap();
+            do_discover(
+                known_dh_discovery_operator,
+                known_dh_kube_interface,
+                known_node_name,
+            )
+            .await
+            .unwrap();
         }));
 
         // Listen for new discovery handlers to call discover on
         let mut stop_all_discovery_receiver = stop_all_discovery_sender.subscribe();
        let mut new_discovery_handler_receiver = new_discovery_handler_sender.subscribe();
         let new_dh_discovery_operator = discovery_operator.clone();
+        let new_node_name = node_name.clone();
         tasks.push(tokio::spawn(async move {
             listen_for_new_discovery_handlers(
                 new_dh_discovery_operator,
                 &mut new_discovery_handler_receiver,
                 &mut stop_all_discovery_receiver,
+                new_node_name,
             )
             .await
             .unwrap();
@@ -836,10 +917,11 @@ pub mod start_discovery {
         let mut stop_all_discovery_receiver = stop_all_discovery_sender.subscribe();
         let offline_dh_discovery_operator = discovery_operator.clone();
         let offline_dh_kube_interface = kube_interface.clone();
+        let offline_node_name = node_name.clone();
         tasks.push(tokio::spawn(async move {
             loop {
                 offline_dh_discovery_operator
-                    .delete_offline_instances(offline_dh_kube_interface.clone())
+                    .delete_offline_instances(offline_dh_kube_interface.clone(), offline_node_name.clone())
                     .await
                     .unwrap();
                 if tokio::time::timeout(
@@ -864,6 +946,7 @@ pub mod start_discovery {
         discovery_operator: Arc<DiscoveryOperator>,
         new_discovery_handler_receiver: &mut broadcast::Receiver<String>,
         stop_all_discovery_receiver: &mut broadcast::Receiver<()>,
+        node_name: String,
     ) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
         let mut discovery_tasks = Vec::new();
         loop {
@@ -879,8 +962,9 @@ pub mod start_discovery {
                     if discovery_handler_name == discovery_operator.get_config().spec.discovery_handler.name {
                         trace!("listen_for_new_discovery_handlers - received new registered discovery handler for configuration {:?}", discovery_operator.get_config().metadata.name);
                         let new_discovery_operator = discovery_operator.clone();
+                        let node_name = node_name.clone();
                         discovery_tasks.push(tokio::spawn(async move {
-                            do_discover(new_discovery_operator, Arc::new(k8s::KubeImpl::new().await.unwrap())).await.unwrap();
+                            do_discover(new_discovery_operator, Arc::new(k8s::KubeImpl::new().await.unwrap()), node_name.clone()).await.unwrap();
                         }));
                     }
@@ -899,6 +983,7 @@ pub mod start_discovery {
     pub async fn do_discover(
         discovery_operator: Arc<DiscoveryOperator>,
         kube_interface: Arc<dyn k8s::KubeInterface>,
+        node_name: String,
     ) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
         let mut discovery_tasks = Vec::new();
         let config = discovery_operator.get_config();
@@ -927,12 +1012,14 @@ pub mod start_discovery {
             );
             let discovery_operator = discovery_operator.clone();
             let kube_interface = kube_interface.clone();
+            let node_name = node_name.clone();
             discovery_tasks.push(tokio::spawn(async move {
                 do_discover_on_discovery_handler(
                     discovery_operator.clone(),
                     kube_interface.clone(),
                     &endpoint,
                     &dh_details,
+                    node_name.clone(),
                 )
                 .await
                 .unwrap();
@@ -949,6 +1036,7 @@ pub mod start_discovery {
         kube_interface: Arc<dyn k8s::KubeInterface>,
         endpoint: &'a DiscoveryHandlerEndpoint,
         dh_details: &'a DiscoveryDetails,
+        node_name: String,
     ) -> anyhow::Result<()> {
         loop {
             if let Some(stream_type) = discovery_operator
@@ -958,7 +1046,12 @@ pub mod start_discovery {
                 match stream_type {
                     StreamType::External(mut stream) => {
                         match discovery_operator
-                            .internal_do_discover(kube_interface.clone(), dh_details, &mut stream)
+                            .internal_do_discover(
+                                kube_interface.clone(),
+                                dh_details,
+                                &mut stream,
+                                node_name.clone(),
+                            )
                             .await
                         {
                             Ok(_) => {
@@ -974,6 +1067,7 @@ pub mod start_discovery {
                                         kube_interface.clone(),
                                         std::collections::HashMap::new(),
                                         dh_details.shared,
+                                        node_name.clone(),
                                     )
                                     .await?;
                                 } else {
@@ -984,6 +1078,7 @@ pub mod start_discovery {
                                         kube_interface.clone(),
                                         std::collections::HashMap::new(),
                                         dh_details.shared,
+                                        node_name.clone(),
                                     )
                                     .await?;
                                 }
@@ -996,7 +1091,12 @@ pub mod start_discovery {
                     #[cfg(any(test, feature = "agent-full"))]
                     StreamType::Embedded(mut stream) => {
                         discovery_operator
-                            .internal_do_discover(kube_interface.clone(), dh_details, &mut stream)
+                            .internal_do_discover(
+                                kube_interface.clone(),
+                                dh_details,
+                                &mut stream,
+                                node_name.clone(),
+                            )
                             .await?;
                         // Embedded discovery should only return okay if signaled to stop. Otherwise, bubble up error.
                         break;
@@ -1041,24 +1141,11 @@ pub mod start_discovery {
 /// to the same instance name (which is suffixed with this digest).
 /// However, local devices' Instances should have unique hashes even if they have the same id.
 /// To ensure this, the node's name is added to the id before it is hashed.
-pub fn generate_instance_digest(id_to_digest: &str, shared: bool) -> String {
-    let env_var_query = ActualEnvVarQuery {};
-    inner_generate_instance_digest(id_to_digest, shared, &env_var_query)
-}
-
-pub fn inner_generate_instance_digest(
-    id_to_digest: &str,
-    shared: bool,
-    query: &impl EnvVarQuery,
-) -> String {
+pub fn generate_instance_digest(id_to_digest: &str, shared: bool, node_name: &str) -> String {
     let mut id_to_digest = id_to_digest.to_string();
     // For local devices, include node hostname in id_to_digest so instances have unique names
     if !shared {
-        id_to_digest = format!(
-            "{}{}",
-            &id_to_digest,
-            query.get_env_var("AGENT_NODE_NAME").unwrap()
-        );
+        id_to_digest = format!("{}{}", &id_to_digest, node_name);
     }
     let mut digest = String::new();
     let mut hasher = VarBlake2b::new(3).unwrap();
@@ -1251,25 +1338,18 @@ pub mod tests {
 
     #[test]
     fn test_generate_instance_digest() {
-        let mut mock_env_var_a = MockEnvVarQuery::new();
-        mock_env_var_a
-            .expect_get_env_var()
-            .returning(|_| Ok("node-a".to_string()));
         let id = "video1";
-        let first_unshared_video_digest =
-            inner_generate_instance_digest(id, false, &mock_env_var_a);
-        let first_shared_video_digest = inner_generate_instance_digest(id, true, &mock_env_var_a);
-        let mut mock_env_var_b = MockEnvVarQuery::new();
-        mock_env_var_b
-            .expect_get_env_var()
-            .returning(|_| Ok("node-b".to_string()));
-        let second_unshared_video_digest =
-            inner_generate_instance_digest(id, false, &mock_env_var_b);
-        let second_shared_video_digest = inner_generate_instance_digest(id, true, &mock_env_var_b);
+        let first_unshared_video_digest = generate_instance_digest(id, false, "node-a");
+        let first_shared_video_digest = generate_instance_digest(id, true, "node-a");
+
+        let second_unshared_video_digest = generate_instance_digest(id, false, "node-b");
+        let second_shared_video_digest = generate_instance_digest(id, true, "node-b");
         // unshared instances visible to different nodes should NOT have the same digest
         assert_ne!(first_unshared_video_digest, second_unshared_video_digest);
         // shared instances visible to different nodes should have the same digest
         assert_eq!(first_shared_video_digest, second_shared_video_digest);
+        // shared and unshared instance for same node should NOT have the same digest
+        assert_ne!(first_unshared_video_digest, first_shared_video_digest);
     }
 
     #[tokio::test]
@@ -1307,7 +1387,12 @@ pub mod tests {
         let local_do1 = discovery_operator1.clone();
         let discover1 = tokio::spawn(async move {
             discovery_operator1
-                .internal_do_discover(mock_kube_interface1, &dh_details1, &mut rx1)
+                .internal_do_discover(
+                    mock_kube_interface1,
+                    &dh_details1,
+                    &mut rx1,
+                    "node-a".to_string(),
+                )
                 .await
                 .unwrap()
         });
@@ -1325,7 +1410,12 @@ pub mod tests {
         ));
         let discover2 = tokio::spawn(async move {
             discovery_operator2
-                .internal_do_discover(mock_kube_interface2, &dh_details2, &mut rx2)
+                .internal_do_discover(
+                    mock_kube_interface2,
+                    &dh_details2,
+                    &mut rx2,
+                    "node-a".to_string(),
+                )
                 .await
                 .unwrap()
         });
@@ -1461,7 +1551,7 @@ pub mod tests {
         mock_discovery_operator
             .expect_delete_offline_instances()
             .times(1)
-            .returning(move |_| Ok(()));
+            .returning(move |_, _| Ok(()));
         if terminate {
             let stop_dh_discovery_sender = discovery_handler_map
                 .lock()
@@ -1494,7 +1584,7 @@ pub mod tests {
         mock_device_plugin_builder
             .expect_build_configuration_device_plugin()
             .times(1)
-            .returning(move |_, _, _| {
+            .returning(move |_, _, _, _| {
                 let (sender, _) = broadcast::channel(2);
                 Ok(sender)
             });
@@ -1507,6 +1597,7 @@ pub mod tests {
             &mut finished_discovery_sender,
             mock_kube_interface,
             mock_device_plugin_builder,
+            "node-a".to_string(),
         )
         .await
         .unwrap();
@@ -1544,18 +1635,20 @@ pub mod tests {
         mock_discovery_operator
             .expect_internal_do_discover()
             .times(1)
-            .returning(|_, _, _| Ok(()));
+            .returning(|_, _, _, _| Ok(()));
         let mock_kube_interface: Arc<dyn k8s::KubeInterface> = Arc::new(MockKubeInterface::new());
-        start_discovery::do_discover(Arc::new(mock_discovery_operator), mock_kube_interface)
-            .await
-            .unwrap();
+        start_discovery::do_discover(
+            Arc::new(mock_discovery_operator),
+            mock_kube_interface,
+            "node-a".to_string(),
+        )
+        .await
+        .unwrap();
     }
 
     #[tokio::test]
     async fn test_handle_discovery_results() {
         let _ = env_logger::builder().is_test(true).try_init();
-        // Set node name for generating instance id
-        std::env::set_var("AGENT_NODE_NAME", "node-a");
         let mock_kube_interface: Arc<dyn k8s::KubeInterface> = Arc::new(MockKubeInterface::new());
         let discovery_handler_map: RegisteredDiscoveryHandlerMap =
             Arc::new(std::sync::Mutex::new(HashMap::new()));
@@ -1595,10 +1688,10 @@ pub mod tests {
             discovery_results,
             true,
             Box::new(mock_device_plugin_builder),
+            "node-a".to_string(),
         )
         .await
         .unwrap();
-
         assert_eq!(
             INSTANCE_COUNT_METRIC
                 .with_label_values(&[&config_name, "true"])
@@ -1677,7 +1770,7 @@ pub mod tests {
             device_plugin_context,
         ));
         discovery_operator
-            .delete_offline_instances(Arc::new(mock))
+            .delete_offline_instances(Arc::new(mock), "node-a".to_string())
             .await
             .unwrap();
 
@@ -1698,7 +1791,7 @@ pub mod tests {
             device_plugin_context,
         ));
         discovery_operator
-            .delete_offline_instances(Arc::new(mock))
+            .delete_offline_instances(Arc::new(mock), "node-a".to_string())
             .await
             .unwrap();
 
@@ -1713,6 +1806,29 @@ pub mod tests {
         )
         .await;
         let mut mock = MockKubeInterface::new();
+        let instance: akri_shared::akri::instance::Instance = serde_json::from_str(
+            r#"
+            {
+                "apiVersion": "akri.sh/v0",
+                "kind": "Instance",
+                "metadata": {
+                    "name": "foo",
+                    "namespace": "bar",
+                    "uid": "abcdegfh-ijkl-mnop-qrst-uvwxyz012345"
+                },
+                "spec": {
+                    "configurationName": "",
+                    "nodes": [],
+                    "shared": true,
+                    "deviceUsage": {}
+                }
+            }
+            "#,
+        )
+        .unwrap();
+        mock.expect_find_instance()
+            .times(2)
+            .returning(move |_, _| Ok(instance.clone()));
         mock.expect_delete_instance()
             .times(2)
             .returning(move |_, _| Ok(()));
@@ -1722,9 +1838,10 @@ pub mod tests {
             device_plugin_context.clone(),
         ));
         discovery_operator
-            .delete_offline_instances(Arc::new(mock))
+            .delete_offline_instances(Arc::new(mock), "node-a".to_string())
             .await
             .unwrap();
+
         // Make sure all instances are deleted from map. Note, first 3 arguments are ignored.
         check_status_or_empty_loop(
             InstanceConnectivityStatus::Online,
@@ -1843,6 +1960,29 @@ pub mod tests {
         )
         .await;
         let mut mock = MockKubeInterface::new();
+        let instance: akri_shared::akri::instance::Instance = serde_json::from_str(
+            r#"
+            {
+                "apiVersion": "akri.sh/v0",
+                "kind": "Instance",
+                "metadata": {
+                    "name": "foo",
+                    "namespace": "bar",
+                    "uid": "abcdegfh-ijkl-mnop-qrst-uvwxyz012345"
+                },
+                "spec": {
+                    "configurationName": "",
+                    "nodes": [],
+                    "shared": true,
+                    "deviceUsage": {}
+                }
+            }
+            "#,
+        )
+        .unwrap();
+        mock.expect_find_instance()
+            .times(2)
+            .returning(move |_, _| Ok(instance.clone()));
         mock.expect_delete_instance()
             .times(2)
             .returning(move |_, _| Ok(()));
@@ -1869,6 +2009,29 @@ pub mod tests {
         // 4: Assert that local devices that go offline are removed from the instance map
         //
         let mut mock = MockKubeInterface::new();
+        let instance: akri_shared::akri::instance::Instance = serde_json::from_str(
+            r#"
+            {
+                "apiVersion": "akri.sh/v0",
+                "kind": "Instance",
+                "metadata": {
+                    "name": "foo",
+                    "namespace": "bar",
+                    "uid": "abcdegfh-ijkl-mnop-qrst-uvwxyz012345"
+                },
+                "spec": {
+                    "configurationName": "",
+                    "nodes": [],
+                    "shared": true,
+                    "deviceUsage": {}
+                }
+            }
+            "#,
+        )
+        .unwrap();
+        mock.expect_find_instance()
+            .times(2)
+            .returning(move |_, _| Ok(instance.clone()));
         mock.expect_delete_instance()
             .times(2)
             .returning(move |_, _| Ok(()));
@@ -1918,6 +2081,7 @@ pub mod tests {
                 Arc::new(mock),
                 currently_visible_instances,
                 shared,
+                "node-a".to_string(),
             )
             .await
             .unwrap();
@@ -2725,4 +2889,123 @@ pub mod tests {
             get_discovery_property_value_from_config_map(&mock_kube_client, &selector).await;
         assert_eq!(result.unwrap().unwrap(), expected_result);
     }
+
+    #[tokio::test]
+    async fn test_try_delete_instance() {
+        let _ = env_logger::builder().is_test(true).try_init();
+        // should do nothing for non existing instance
+        let mut kube_interface = MockKubeInterface::new();
+        kube_interface
+            .expect_find_instance()
+            .with(eq("foo"), eq("bar"))
+            .returning(move |_, _| Err(anyhow::format_err!("Not Found")));
+        try_delete_instance(&kube_interface, "foo", "bar", "node-a".to_string())
+            .await
+            .unwrap();
+
+        // should delete instance with already empty node list
+        let mut kube_interface = MockKubeInterface::new();
+        let instance: akri_shared::akri::instance::Instance = serde_json::from_str(
+            r#"
+            {
+                "apiVersion": "akri.sh/v0",
+                "kind": "Instance",
+                "metadata": {
+                    "name": "foo",
+                    "namespace": "bar",
+                    "uid": "abcdegfh-ijkl-mnop-qrst-uvwxyz012345"
+                },
+                "spec": {
+                    "configurationName": "",
+                    "nodes": [],
+                    "shared": true,
+                    "deviceUsage": {}
+                }
+            }
+            "#,
+        )
+        .unwrap();
+        kube_interface
+            .expect_find_instance()
+            .with(eq("foo"), eq("bar"))
+            .returning(move |_, _| Ok(instance.clone()));
+        kube_interface
+            .expect_delete_instance()
+            .with(eq("foo"), eq("bar"))
+            .returning(move |_, _| Ok(()));
+        try_delete_instance(&kube_interface, "foo", "bar", "node-a".to_string())
+            .await
+            .unwrap();
+
+        // should delete instance with then empty node list
+        let mut kube_interface = MockKubeInterface::new();
+        let instance: akri_shared::akri::instance::Instance = serde_json::from_str(
+            r#"
+            {
+                "apiVersion": "akri.sh/v0",
+                "kind": "Instance",
+                "metadata": {
+                    "name": "foo",
+                    "namespace": "bar",
+                    "uid": "abcdegfh-ijkl-mnop-qrst-uvwxyz012345"
+                },
+                "spec": {
+                    "configurationName": "",
+                    "nodes": ["node-a"],
+                    "shared": true,
+                    "deviceUsage": {}
+                }
+            }
+            "#,
+        )
+        .unwrap();
+        kube_interface
+            .expect_find_instance()
+            .with(eq("foo"), eq("bar"))
+            .returning(move |_, _| Ok(instance.clone()));
+        kube_interface
+            .expect_delete_instance()
+            .with(eq("foo"), eq("bar"))
+            .returning(move |_, _| Ok(()));
+        try_delete_instance(&kube_interface, "foo", "bar", "node-a".to_string())
+            .await
+            .unwrap();
+
+        // should update instance with non empty node list
+        let mut kube_interface = MockKubeInterface::new();
+        let instance: akri_shared::akri::instance::Instance = serde_json::from_str(
+            r#"
+            {
+                "apiVersion": "akri.sh/v0",
+                "kind": "Instance",
+                "metadata": {
+                    "name": "foo",
+                    "namespace": "bar",
+                    "uid": "abcdegfh-ijkl-mnop-qrst-uvwxyz012345"
+                },
+                "spec": {
+                    "configurationName": "",
+                    "nodes": ["node-b", "node-a"],
+                    "shared": true,
+                    "deviceUsage": {}
+                }
+            }
+            "#,
+        )
+        .unwrap();
+        kube_interface
+            .expect_find_instance()
+            .with(eq("foo"), eq("bar"))
+            .returning(move |_, _| Ok(instance.clone()));
+        kube_interface
+            .expect_update_instance()
+            .times(1)
+            .withf(move |instance, name, namespace| {
+                name == "foo" && namespace == "bar" && instance.nodes == vec!["node-b"]
+            })
+            .returning(move |_, _, _| Ok(()));
+        try_delete_instance(&kube_interface, "foo", "bar", "node-a".to_string())
+            .await
+            .unwrap();
+    }
 }
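
Note for reviewers (not part of the patch): the core decision the new try_delete_instance makes is
"remove this agent's node from spec.nodes, then delete the Instance only when no node remains".
The standalone sketch below restates that decision in isolation so it can be compiled and run
outside the agent; the NodeAction enum and plan_node_removal helper are illustrative names
invented for this note, not identifiers from the Akri codebase.

/// Illustrative only: mirrors the delete-vs-update decision made by
/// try_delete_instance in agent/src/util/discovery_operator.rs.
#[derive(Debug, PartialEq)]
enum NodeAction {
    /// No node references the device anymore: delete the Instance CR.
    Delete,
    /// Other nodes still have access: only update spec.nodes.
    Update(Vec<String>),
}

fn plan_node_removal(mut nodes: Vec<String>, this_node: &str) -> NodeAction {
    // Drop every occurrence of this node, matching the retain() in the patch.
    nodes.retain(|n| n != this_node);
    if nodes.is_empty() {
        NodeAction::Delete
    } else {
        NodeAction::Update(nodes)
    }
}

fn main() {
    // Last node with access: the Instance is deleted, as before the patch.
    assert_eq!(
        plan_node_removal(vec!["node-a".to_string()], "node-a"),
        NodeAction::Delete
    );
    // Another node still uses the device: the agent now only removes itself,
    // which is the behavior change that fixes #650.
    assert_eq!(
        plan_node_removal(vec!["node-b".to_string(), "node-a".to_string()], "node-a"),
        NodeAction::Update(vec!["node-b".to_string()])
    );
    println!("node-removal decision logic behaves as expected");
}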