diff --git a/agent/src/main.rs b/agent/src/main.rs index ea7568b87..c5dc99210 100644 --- a/agent/src/main.rs +++ b/agent/src/main.rs @@ -12,6 +12,7 @@ use log::{info, trace}; use prometheus::{HistogramVec, IntGaugeVec}; use std::{ collections::HashMap, + env, sync::{Arc, Mutex}, time::Duration, }; @@ -52,6 +53,7 @@ async fn main() -> Result<(), Box ); let mut tasks = Vec::new(); + let node_name = env::var("AGENT_NODE_NAME")?; // Start server for Prometheus metrics tasks.push(tokio::spawn(async move { @@ -83,9 +85,13 @@ async fn main() -> Result<(), Box })); tasks.push(tokio::spawn(async move { - config_action::do_config_watch(discovery_handler_map, new_discovery_handler_sender_clone) - .await - .unwrap() + config_action::do_config_watch( + discovery_handler_map, + new_discovery_handler_sender_clone, + node_name, + ) + .await + .unwrap() })); futures::future::try_join_all(tasks).await?; diff --git a/agent/src/util/config_action.rs b/agent/src/util/config_action.rs index 4ac2bc6d8..f97d542b0 100644 --- a/agent/src/util/config_action.rs +++ b/agent/src/util/config_action.rs @@ -50,6 +50,7 @@ pub struct ConfigInfo { pub async fn do_config_watch( discovery_handler_map: RegisteredDiscoveryHandlerMap, new_discovery_handler_sender: broadcast::Sender, + node_name: String, ) -> Result<(), Box> { info!("do_config_watch - enter"); let config_map: ConfigMap = Arc::new(RwLock::new(HashMap::new())); @@ -63,6 +64,7 @@ pub async fn do_config_watch( let discovery_handler_map = discovery_handler_map.clone(); let new_discovery_handler_sender = new_discovery_handler_sender.clone(); let new_kube_interface = kube_interface.clone(); + let new_node_name = node_name.clone(); tasks.push(tokio::spawn(async move { handle_config_add( new_kube_interface, @@ -70,6 +72,7 @@ pub async fn do_config_watch( config_map, discovery_handler_map, new_discovery_handler_sender, + new_node_name, ) .await .unwrap(); @@ -83,6 +86,7 @@ pub async fn do_config_watch( config_map, discovery_handler_map, new_discovery_handler_sender, + node_name, ) .await .unwrap(); @@ -99,6 +103,7 @@ async fn watch_for_config_changes( config_map: ConfigMap, discovery_handler_map: RegisteredDiscoveryHandlerMap, new_discovery_handler_sender: broadcast::Sender, + node_name: String, ) -> Result<(), Box> { trace!("watch_for_config_changes - start"); let resource = Api::::all(kube_interface.get_kube_client()); @@ -121,6 +126,7 @@ async fn watch_for_config_changes( config_map.clone(), discovery_handler_map.clone(), new_discovery_handler_sender, + node_name.clone(), ) .await? } @@ -135,6 +141,7 @@ async fn handle_config( config_map: ConfigMap, discovery_handler_map: RegisteredDiscoveryHandlerMap, new_discovery_handler_sender: broadcast::Sender, + node_name: String, ) -> anyhow::Result<()> { trace!("handle_config - something happened to a configuration"); match event { @@ -149,6 +156,7 @@ async fn handle_config( config_map, discovery_handler_map, new_discovery_handler_sender, + node_name, ) .await?; } @@ -186,6 +194,7 @@ async fn handle_config( config_map.clone(), discovery_handler_map.clone(), new_discovery_handler_sender.clone(), + node_name.clone(), ) .await?; } @@ -200,6 +209,7 @@ async fn handle_config_apply( config_map: ConfigMap, discovery_handler_map: RegisteredDiscoveryHandlerMap, new_discovery_handler_sender: broadcast::Sender, + node_name: String, ) -> anyhow::Result<()> { // Applied events can either be newly added Configurations or modified Configurations. // If modified delete all associated instances and device plugins and then recreate them to reflect updated config @@ -231,6 +241,7 @@ async fn handle_config_apply( config_map, discovery_handler_map, new_discovery_handler_sender, + node_name, ) .await .unwrap(); @@ -246,6 +257,7 @@ async fn handle_config_add( config_map: ConfigMap, discovery_handler_map: RegisteredDiscoveryHandlerMap, new_discovery_handler_sender: broadcast::Sender, + node_name: String, ) -> Result<(), Box> { let config_id: ConfigId = ( config.metadata.namespace.clone().unwrap(), @@ -276,6 +288,7 @@ async fn handle_config_add( stop_discovery_sender, &mut finished_discovery_sender, kube_interface, + node_name, ) .await .unwrap(); @@ -446,6 +459,7 @@ mod config_action_tests { config_map.clone(), dh_map.clone(), tx.clone(), + "node-a".to_string(), ) .await .is_ok()); @@ -461,6 +475,7 @@ mod config_action_tests { config_map.clone(), dh_map, tx, + "node-a".to_string(), ) .await .is_ok()); diff --git a/agent/src/util/device_plugin_builder.rs b/agent/src/util/device_plugin_builder.rs index 3c4eccb74..c9dd41b88 100644 --- a/agent/src/util/device_plugin_builder.rs +++ b/agent/src/util/device_plugin_builder.rs @@ -4,8 +4,8 @@ use super::{ KUBELET_SOCKET, LIST_AND_WATCH_MESSAGE_CHANNEL_CAPACITY, }, device_plugin_service::{ - ConfigurationDevicePlugin, DevicePluginBehavior, DevicePluginContext, DevicePluginService, - InstanceDevicePlugin, ListAndWatchMessageKind, + get_device_instance_name, ConfigurationDevicePlugin, DevicePluginBehavior, + DevicePluginContext, DevicePluginService, InstanceDevicePlugin, ListAndWatchMessageKind, }, v1beta1, v1beta1::{ @@ -24,7 +24,7 @@ use log::{info, trace}; #[cfg(test)] use mockall::{automock, predicate::*}; use std::sync::Arc; -use std::{convert::TryFrom, env, path::Path, time::SystemTime}; +use std::{convert::TryFrom, path::Path, time::SystemTime}; use tokio::{ net::UnixListener, net::UnixStream, @@ -39,12 +39,12 @@ use tower::service_fn; pub trait DevicePluginBuilderInterface: Send + Sync { async fn build_device_plugin( &self, - instance_name: String, instance_id: String, config: &Configuration, shared: bool, device_plugin_context: Arc>, device: Device, + node_name: String, ) -> Result<(), Box>; async fn build_configuration_device_plugin( @@ -52,6 +52,7 @@ pub trait DevicePluginBuilderInterface: Send + Sync { device_plugin_name: String, config: &Configuration, device_plugin_context: Arc>, + node_name: String, ) -> Result< broadcast::Sender, Box, @@ -66,13 +67,15 @@ impl DevicePluginBuilderInterface for DevicePluginBuilder { /// This creates a new DevicePluginService for an instance and registers it with the kubelet async fn build_device_plugin( &self, - instance_name: String, instance_id: String, config: &Configuration, shared: bool, device_plugin_context: Arc>, device: Device, + node_name: String, ) -> Result<(), Box> { + let instance_name = + get_device_instance_name(&instance_id, config.metadata.name.as_ref().unwrap()); info!("build_device_plugin - entered for device {}", instance_name); let device_plugin_behavior = DevicePluginBehavior::Instance(InstanceDevicePlugin { instance_id: instance_id.clone(), @@ -87,6 +90,7 @@ impl DevicePluginBuilderInterface for DevicePluginBuilder { device_plugin_context, device_plugin_behavior, list_and_watch_message_sender, + node_name, ) .await } @@ -97,6 +101,7 @@ impl DevicePluginBuilderInterface for DevicePluginBuilder { device_plugin_name: String, config: &Configuration, device_plugin_context: Arc>, + node_name: String, ) -> Result< broadcast::Sender, Box, @@ -115,6 +120,7 @@ impl DevicePluginBuilderInterface for DevicePluginBuilder { device_plugin_context, device_plugin_behavior, list_and_watch_message_sender.clone(), + node_name, ) .await?; Ok(list_and_watch_message_sender) @@ -129,6 +135,7 @@ impl DevicePluginBuilder { device_plugin_context: Arc>, device_plugin_behavior: DevicePluginBehavior, list_and_watch_message_sender: broadcast::Sender, + node_name: String, ) -> Result<(), Box> { let capability_id: String = format!("{}/{}", AKRI_PREFIX, device_plugin_name); let unique_time = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH)?; @@ -147,7 +154,7 @@ impl DevicePluginBuilder { config_name: config.metadata.name.clone().unwrap(), config_uid: config.metadata.uid.as_ref().unwrap().clone(), config_namespace: config.metadata.namespace.as_ref().unwrap().clone(), - node_name: env::var("AGENT_NODE_NAME")?, + node_name, device_plugin_context, list_and_watch_message_sender, server_ender_sender: server_ender_sender.clone(), diff --git a/agent/src/util/discovery_operator.rs b/agent/src/util/discovery_operator.rs index e06fc5112..b1326629d 100644 --- a/agent/src/util/discovery_operator.rs +++ b/agent/src/util/discovery_operator.rs @@ -17,11 +17,13 @@ use akri_discovery_utils::discovery::v0::{ DiscoverResponse, }; use akri_shared::{ - akri::configuration::{ - Configuration, DiscoveryProperty, DiscoveryPropertyKeySelector, DiscoveryPropertySource, + akri::{ + configuration::{ + Configuration, DiscoveryProperty, DiscoveryPropertyKeySelector, DiscoveryPropertySource, + }, + retry::MAX_INSTANCE_UPDATE_TRIES, }, k8s, - os::env_var::{ActualEnvVarQuery, EnvVarQuery}, }; use blake2::{ digest::{Update, VariableOutput}, @@ -240,6 +242,7 @@ impl DiscoveryOperator { kube_interface: Arc, dh_details: &'a DiscoveryDetails, stream: &'a mut dyn StreamingExt, + node_name: String, ) -> anyhow::Result<()> { // clone objects for thread let discovery_operator = Arc::new(self.clone()); @@ -273,6 +276,7 @@ impl DiscoveryOperator { response.devices, dh_details.shared, Box::new(DevicePluginBuilder{}), + node_name.clone(), ) .await?; } @@ -288,6 +292,7 @@ impl DiscoveryOperator { pub async fn delete_offline_instances( &self, kube_interface: Arc, + node_name: String, ) -> Result<(), Box> { trace!( "delete_offline_instances - entered for configuration {:?}", @@ -309,10 +314,11 @@ impl DiscoveryOperator { ) .await .unwrap(); - k8s::try_delete_instance( + try_delete_instance( kube_interface_clone.as_ref(), &instance, self.config.metadata.namespace.as_ref().unwrap(), + node_name.clone(), ) .await?; } @@ -331,6 +337,7 @@ impl DiscoveryOperator { discovery_results: Vec, shared: bool, device_plugin_builder: Box, + node_name: String, ) -> anyhow::Result<()> { let config_name = self.config.metadata.name.clone().unwrap(); trace!( @@ -341,7 +348,7 @@ impl DiscoveryOperator { let currently_visible_instances: HashMap = discovery_results .iter() .map(|discovery_result| { - let id = generate_instance_digest(&discovery_result.id, shared); + let id = generate_instance_digest(&discovery_result.id, shared, &node_name); let instance_name = get_device_instance_name(&id, &config_name); (instance_name, discovery_result.clone()) }) @@ -361,13 +368,14 @@ impl DiscoveryOperator { kube_interface, currently_visible_instances, shared, + node_name.clone(), ) .await?; // If there are newly visible instances associated with a Config, make a device plugin and Instance CR for them if !new_discovery_results.is_empty() { for discovery_result in new_discovery_results { - let id = generate_instance_digest(&discovery_result.id, shared); + let id = generate_instance_digest(&discovery_result.id, shared, &node_name); let instance_name = get_device_instance_name(&id, &config_name); trace!( "handle_discovery_results - new instance {} came online", @@ -376,12 +384,12 @@ impl DiscoveryOperator { let device_plugin_context = self.device_plugin_context.clone(); if let Err(e) = device_plugin_builder .build_device_plugin( - instance_name, id, &self.config, shared, device_plugin_context, discovery_result.clone(), + node_name.clone(), ) .await { @@ -403,6 +411,7 @@ impl DiscoveryOperator { kube_interface: Arc, currently_visible_instances: HashMap, shared: bool, + node_name: String, ) -> anyhow::Result<()> { let instance_map = self.device_plugin_context.read().await.clone().instances; for (instance, instance_info) in instance_map { @@ -495,10 +504,11 @@ impl DiscoveryOperator { ) .await .unwrap(); - k8s::try_delete_instance( + try_delete_instance( kube_interface.as_ref(), &instance, self.config.metadata.namespace.as_ref().unwrap(), + node_name.clone(), ) .await .unwrap(); @@ -582,6 +592,66 @@ impl DiscoveryOperator { } } +async fn try_delete_instance( + kube_interface: &dyn k8s::KubeInterface, + instance_name: &str, + instance_namespace: &str, + node_name: String, +) -> Result<(), anyhow::Error> { + for x in 0..MAX_INSTANCE_UPDATE_TRIES { + // First check if instance still exists + if let Ok(mut instance) = kube_interface + .find_instance(instance_name, instance_namespace) + .await + { + if instance.spec.nodes.contains(&node_name) { + instance.spec.nodes.retain(|node| node != &node_name); + } + if instance.spec.nodes.is_empty() { + match k8s::try_delete_instance(kube_interface, instance_name, instance_namespace) + .await + { + Ok(()) => { + trace!("try_delete_instance - deleted Instance {}", instance_name); + break; + } + Err(e) => { + trace!("try_delete_instance - call to delete_instance returned with error {} on try # {} of {}", e, x, MAX_INSTANCE_UPDATE_TRIES); + if x == (MAX_INSTANCE_UPDATE_TRIES - 1) { + return Err(e); + } + } + } + } else { + match kube_interface + .update_instance( + &instance.spec, + &instance.metadata.name.unwrap(), + instance_namespace, + ) + .await + { + Ok(()) => { + trace!( + "try_delete_instance - updated Instance {} to remove {}", + instance_name, + node_name + ); + break; + } + Err(e) => { + trace!("try_delete_instance - call to update_instance returned with error {} on try # {} of {}", e, x, MAX_INSTANCE_UPDATE_TRIES); + if x == (MAX_INSTANCE_UPDATE_TRIES - 1) { + return Err(e); + } + } + }; + } + } + } + Ok(()) +} + /// This provides a mockable way to query configMap and secret #[cfg_attr(test, automock)] #[tonic::async_trait] @@ -750,6 +820,7 @@ pub mod start_discovery { stop_all_discovery_sender: broadcast::Sender<()>, finished_all_discovery_sender: &mut mpsc::Sender<()>, kube_interface: Arc, + node_name: String, ) -> Result<(), Box> { internal_start_discovery( discovery_operator, @@ -758,6 +829,7 @@ pub mod start_discovery { finished_all_discovery_sender, kube_interface, Box::new(DevicePluginBuilder {}), + node_name, ) .await } @@ -769,6 +841,7 @@ pub mod start_discovery { finished_all_discovery_sender: &mut mpsc::Sender<()>, kube_interface: Arc, device_plugin_builder: Box, + node_name: String, ) -> Result<(), Box> { let config = discovery_operator.get_config(); info!( @@ -791,6 +864,7 @@ pub mod start_discovery { config_dp_name, &config, device_plugin_context.clone(), + node_name.clone(), ) .await { @@ -811,21 +885,28 @@ pub mod start_discovery { // Call discover on already registered Discovery Handlers requested by this Configuration's let known_dh_discovery_operator = discovery_operator.clone(); let known_dh_kube_interface = kube_interface.clone(); + let known_node_name = node_name.clone(); tasks.push(tokio::spawn(async move { - do_discover(known_dh_discovery_operator, known_dh_kube_interface) - .await - .unwrap(); + do_discover( + known_dh_discovery_operator, + known_dh_kube_interface, + known_node_name, + ) + .await + .unwrap(); })); // Listen for new discovery handlers to call discover on let mut stop_all_discovery_receiver = stop_all_discovery_sender.subscribe(); let mut new_discovery_handler_receiver = new_discovery_handler_sender.subscribe(); let new_dh_discovery_operator = discovery_operator.clone(); + let new_node_name = node_name.clone(); tasks.push(tokio::spawn(async move { listen_for_new_discovery_handlers( new_dh_discovery_operator, &mut new_discovery_handler_receiver, &mut stop_all_discovery_receiver, + new_node_name, ) .await .unwrap(); @@ -836,10 +917,11 @@ pub mod start_discovery { let mut stop_all_discovery_receiver = stop_all_discovery_sender.subscribe(); let offline_dh_discovery_operator = discovery_operator.clone(); let offline_dh_kube_interface = kube_interface.clone(); + let offline_node_name = node_name.clone(); tasks.push(tokio::spawn(async move { loop { offline_dh_discovery_operator - .delete_offline_instances(offline_dh_kube_interface.clone()) + .delete_offline_instances(offline_dh_kube_interface.clone(), offline_node_name.clone()) .await .unwrap(); if tokio::time::timeout( @@ -864,6 +946,7 @@ pub mod start_discovery { discovery_operator: Arc, new_discovery_handler_receiver: &mut broadcast::Receiver, stop_all_discovery_receiver: &mut broadcast::Receiver<()>, + node_name: String, ) -> Result<(), Box> { let mut discovery_tasks = Vec::new(); loop { @@ -879,8 +962,9 @@ pub mod start_discovery { if discovery_handler_name == discovery_operator.get_config().spec.discovery_handler.name { trace!("listen_for_new_discovery_handlers - received new registered discovery handler for configuration {:?}", discovery_operator.get_config().metadata.name); let new_discovery_operator = discovery_operator.clone(); + let node_name = node_name.clone(); discovery_tasks.push(tokio::spawn(async move { - do_discover(new_discovery_operator, Arc::new(k8s::KubeImpl::new().await.unwrap())).await.unwrap(); + do_discover(new_discovery_operator, Arc::new(k8s::KubeImpl::new().await.unwrap()), node_name.clone()).await.unwrap(); })); } } @@ -899,6 +983,7 @@ pub mod start_discovery { pub async fn do_discover( discovery_operator: Arc, kube_interface: Arc, + node_name: String, ) -> Result<(), Box> { let mut discovery_tasks = Vec::new(); let config = discovery_operator.get_config(); @@ -927,12 +1012,14 @@ pub mod start_discovery { ); let discovery_operator = discovery_operator.clone(); let kube_interface = kube_interface.clone(); + let node_name = node_name.clone(); discovery_tasks.push(tokio::spawn(async move { do_discover_on_discovery_handler( discovery_operator.clone(), kube_interface.clone(), &endpoint, &dh_details, + node_name.clone(), ) .await .unwrap(); @@ -949,6 +1036,7 @@ pub mod start_discovery { kube_interface: Arc, endpoint: &'a DiscoveryHandlerEndpoint, dh_details: &'a DiscoveryDetails, + node_name: String, ) -> anyhow::Result<()> { loop { if let Some(stream_type) = discovery_operator @@ -958,7 +1046,12 @@ pub mod start_discovery { match stream_type { StreamType::External(mut stream) => { match discovery_operator - .internal_do_discover(kube_interface.clone(), dh_details, &mut stream) + .internal_do_discover( + kube_interface.clone(), + dh_details, + &mut stream, + node_name.clone(), + ) .await { Ok(_) => { @@ -974,6 +1067,7 @@ pub mod start_discovery { kube_interface.clone(), std::collections::HashMap::new(), dh_details.shared, + node_name.clone(), ) .await?; } else { @@ -984,6 +1078,7 @@ pub mod start_discovery { kube_interface.clone(), std::collections::HashMap::new(), dh_details.shared, + node_name.clone(), ) .await?; } @@ -996,7 +1091,12 @@ pub mod start_discovery { #[cfg(any(test, feature = "agent-full"))] StreamType::Embedded(mut stream) => { discovery_operator - .internal_do_discover(kube_interface.clone(), dh_details, &mut stream) + .internal_do_discover( + kube_interface.clone(), + dh_details, + &mut stream, + node_name.clone(), + ) .await?; // Embedded discovery should only return okay if signaled to stop. Otherwise, bubble up error. break; @@ -1041,24 +1141,11 @@ pub mod start_discovery { /// to the same instance name (which is suffixed with this digest). /// However, local devices' Instances should have unique hashes even if they have the same id. /// To ensure this, the node's name is added to the id before it is hashed. -pub fn generate_instance_digest(id_to_digest: &str, shared: bool) -> String { - let env_var_query = ActualEnvVarQuery {}; - inner_generate_instance_digest(id_to_digest, shared, &env_var_query) -} - -pub fn inner_generate_instance_digest( - id_to_digest: &str, - shared: bool, - query: &impl EnvVarQuery, -) -> String { +pub fn generate_instance_digest(id_to_digest: &str, shared: bool, node_name: &str) -> String { let mut id_to_digest = id_to_digest.to_string(); // For local devices, include node hostname in id_to_digest so instances have unique names if !shared { - id_to_digest = format!( - "{}{}", - &id_to_digest, - query.get_env_var("AGENT_NODE_NAME").unwrap() - ); + id_to_digest = format!("{}{}", &id_to_digest, node_name,); } let mut digest = String::new(); let mut hasher = VarBlake2b::new(3).unwrap(); @@ -1251,25 +1338,18 @@ pub mod tests { #[test] fn test_generate_instance_digest() { - let mut mock_env_var_a = MockEnvVarQuery::new(); - mock_env_var_a - .expect_get_env_var() - .returning(|_| Ok("node-a".to_string())); let id = "video1"; - let first_unshared_video_digest = - inner_generate_instance_digest(id, false, &mock_env_var_a); - let first_shared_video_digest = inner_generate_instance_digest(id, true, &mock_env_var_a); - let mut mock_env_var_b = MockEnvVarQuery::new(); - mock_env_var_b - .expect_get_env_var() - .returning(|_| Ok("node-b".to_string())); - let second_unshared_video_digest = - inner_generate_instance_digest(id, false, &mock_env_var_b); - let second_shared_video_digest = inner_generate_instance_digest(id, true, &mock_env_var_b); + let first_unshared_video_digest = generate_instance_digest(id, false, "node-a"); + let first_shared_video_digest = generate_instance_digest(id, true, "node-a"); + + let second_unshared_video_digest = generate_instance_digest(id, false, "node-b"); + let second_shared_video_digest = generate_instance_digest(id, true, "node-b"); // unshared instances visible to different nodes should NOT have the same digest assert_ne!(first_unshared_video_digest, second_unshared_video_digest); // shared instances visible to different nodes should have the same digest assert_eq!(first_shared_video_digest, second_shared_video_digest); + // shared and unshared instance for same node should NOT have the same digest + assert_ne!(first_unshared_video_digest, first_shared_video_digest); } #[tokio::test] @@ -1307,7 +1387,12 @@ pub mod tests { let local_do1 = discovery_operator1.clone(); let discover1 = tokio::spawn(async move { discovery_operator1 - .internal_do_discover(mock_kube_interface1, &dh_details1, &mut rx1) + .internal_do_discover( + mock_kube_interface1, + &dh_details1, + &mut rx1, + "node-a".to_string(), + ) .await .unwrap() }); @@ -1325,7 +1410,12 @@ pub mod tests { )); let discover2 = tokio::spawn(async move { discovery_operator2 - .internal_do_discover(mock_kube_interface2, &dh_details2, &mut rx2) + .internal_do_discover( + mock_kube_interface2, + &dh_details2, + &mut rx2, + "node-a".to_string(), + ) .await .unwrap() }); @@ -1461,7 +1551,7 @@ pub mod tests { mock_discovery_operator .expect_delete_offline_instances() .times(1) - .returning(move |_| Ok(())); + .returning(move |_, _| Ok(())); if terminate { let stop_dh_discovery_sender = discovery_handler_map .lock() @@ -1494,7 +1584,7 @@ pub mod tests { mock_device_plugin_builder .expect_build_configuration_device_plugin() .times(1) - .returning(move |_, _, _| { + .returning(move |_, _, _, _| { let (sender, _) = broadcast::channel(2); Ok(sender) }); @@ -1507,6 +1597,7 @@ pub mod tests { &mut finished_discovery_sender, mock_kube_interface, mock_device_plugin_builder, + "node-a".to_string(), ) .await .unwrap(); @@ -1544,18 +1635,20 @@ pub mod tests { mock_discovery_operator .expect_internal_do_discover() .times(1) - .returning(|_, _, _| Ok(())); + .returning(|_, _, _, _| Ok(())); let mock_kube_interface: Arc = Arc::new(MockKubeInterface::new()); - start_discovery::do_discover(Arc::new(mock_discovery_operator), mock_kube_interface) - .await - .unwrap(); + start_discovery::do_discover( + Arc::new(mock_discovery_operator), + mock_kube_interface, + "node-a".to_string(), + ) + .await + .unwrap(); } #[tokio::test] async fn test_handle_discovery_results() { let _ = env_logger::builder().is_test(true).try_init(); - // Set node name for generating instance id - std::env::set_var("AGENT_NODE_NAME", "node-a"); let mock_kube_interface: Arc = Arc::new(MockKubeInterface::new()); let discovery_handler_map: RegisteredDiscoveryHandlerMap = Arc::new(std::sync::Mutex::new(HashMap::new())); @@ -1595,10 +1688,10 @@ pub mod tests { discovery_results, true, Box::new(mock_device_plugin_builder), + "node-a".to_string(), ) .await .unwrap(); - assert_eq!( INSTANCE_COUNT_METRIC .with_label_values(&[&config_name, "true"]) @@ -1677,7 +1770,7 @@ pub mod tests { device_plugin_context, )); discovery_operator - .delete_offline_instances(Arc::new(mock)) + .delete_offline_instances(Arc::new(mock), "node-a".to_string()) .await .unwrap(); @@ -1698,7 +1791,7 @@ pub mod tests { device_plugin_context, )); discovery_operator - .delete_offline_instances(Arc::new(mock)) + .delete_offline_instances(Arc::new(mock), "node-a".to_string()) .await .unwrap(); @@ -1713,6 +1806,29 @@ pub mod tests { ) .await; let mut mock = MockKubeInterface::new(); + let instance: akri_shared::akri::instance::Instance = serde_json::from_str( + r#" + { + "apiVersion": "akri.sh/v0", + "kind": "Instance", + "metadata": { + "name": "foo", + "namespace": "bar", + "uid": "abcdegfh-ijkl-mnop-qrst-uvwxyz012345" + }, + "spec": { + "configurationName": "", + "nodes": [], + "shared": true, + "deviceUsage": {} + } + } + "#, + ) + .unwrap(); + mock.expect_find_instance() + .times(2) + .returning(move |_, _| Ok(instance.clone())); mock.expect_delete_instance() .times(2) .returning(move |_, _| Ok(())); @@ -1722,9 +1838,10 @@ pub mod tests { device_plugin_context.clone(), )); discovery_operator - .delete_offline_instances(Arc::new(mock)) + .delete_offline_instances(Arc::new(mock), "node-a".to_string()) .await .unwrap(); + // Make sure all instances are deleted from map. Note, first 3 arguments are ignored. check_status_or_empty_loop( InstanceConnectivityStatus::Online, @@ -1843,6 +1960,29 @@ pub mod tests { ) .await; let mut mock = MockKubeInterface::new(); + let instance: akri_shared::akri::instance::Instance = serde_json::from_str( + r#" + { + "apiVersion": "akri.sh/v0", + "kind": "Instance", + "metadata": { + "name": "foo", + "namespace": "bar", + "uid": "abcdegfh-ijkl-mnop-qrst-uvwxyz012345" + }, + "spec": { + "configurationName": "", + "nodes": [], + "shared": true, + "deviceUsage": {} + } + } + "#, + ) + .unwrap(); + mock.expect_find_instance() + .times(2) + .returning(move |_, _| Ok(instance.clone())); mock.expect_delete_instance() .times(2) .returning(move |_, _| Ok(())); @@ -1869,6 +2009,29 @@ pub mod tests { // 4: Assert that local devices that go offline are removed from the instance map // let mut mock = MockKubeInterface::new(); + let instance: akri_shared::akri::instance::Instance = serde_json::from_str( + r#" + { + "apiVersion": "akri.sh/v0", + "kind": "Instance", + "metadata": { + "name": "foo", + "namespace": "bar", + "uid": "abcdegfh-ijkl-mnop-qrst-uvwxyz012345" + }, + "spec": { + "configurationName": "", + "nodes": [], + "shared": true, + "deviceUsage": {} + } + } + "#, + ) + .unwrap(); + mock.expect_find_instance() + .times(2) + .returning(move |_, _| Ok(instance.clone())); mock.expect_delete_instance() .times(2) .returning(move |_, _| Ok(())); @@ -1918,6 +2081,7 @@ pub mod tests { Arc::new(mock), currently_visible_instances, shared, + "node-a".to_string(), ) .await .unwrap(); @@ -2725,4 +2889,123 @@ pub mod tests { get_discovery_property_value_from_config_map(&mock_kube_client, &selector).await; assert_eq!(result.unwrap().unwrap(), expected_result); } + + #[tokio::test] + async fn test_try_delete_instance() { + let _ = env_logger::builder().is_test(true).try_init(); + // should do nothing for non existing instance + let mut kube_interface = MockKubeInterface::new(); + kube_interface + .expect_find_instance() + .with(eq("foo"), eq("bar")) + .returning(move |_, _| Err(anyhow::format_err!("Not Found"))); + try_delete_instance(&kube_interface, "foo", "bar", "node-a".to_string()) + .await + .unwrap(); + + // should delete instance with already empty node list + let mut kube_interface = MockKubeInterface::new(); + let instance: akri_shared::akri::instance::Instance = serde_json::from_str( + r#" + { + "apiVersion": "akri.sh/v0", + "kind": "Instance", + "metadata": { + "name": "foo", + "namespace": "bar", + "uid": "abcdegfh-ijkl-mnop-qrst-uvwxyz012345" + }, + "spec": { + "configurationName": "", + "nodes": [], + "shared": true, + "deviceUsage": {} + } + } + "#, + ) + .unwrap(); + kube_interface + .expect_find_instance() + .with(eq("foo"), eq("bar")) + .returning(move |_, _| Ok(instance.clone())); + kube_interface + .expect_delete_instance() + .with(eq("foo"), eq("bar")) + .returning(move |_, _| Ok(())); + try_delete_instance(&kube_interface, "foo", "bar", "node-a".to_string()) + .await + .unwrap(); + + // should delete instance with then empty node list + let mut kube_interface = MockKubeInterface::new(); + let instance: akri_shared::akri::instance::Instance = serde_json::from_str( + r#" + { + "apiVersion": "akri.sh/v0", + "kind": "Instance", + "metadata": { + "name": "foo", + "namespace": "bar", + "uid": "abcdegfh-ijkl-mnop-qrst-uvwxyz012345" + }, + "spec": { + "configurationName": "", + "nodes": ["node-a"], + "shared": true, + "deviceUsage": {} + } + } + "#, + ) + .unwrap(); + kube_interface + .expect_find_instance() + .with(eq("foo"), eq("bar")) + .returning(move |_, _| Ok(instance.clone())); + kube_interface + .expect_delete_instance() + .with(eq("foo"), eq("bar")) + .returning(move |_, _| Ok(())); + try_delete_instance(&kube_interface, "foo", "bar", "node-a".to_string()) + .await + .unwrap(); + + // should update instance with non empty node list + let mut kube_interface = MockKubeInterface::new(); + let instance: akri_shared::akri::instance::Instance = serde_json::from_str( + r#" + { + "apiVersion": "akri.sh/v0", + "kind": "Instance", + "metadata": { + "name": "foo", + "namespace": "bar", + "uid": "abcdegfh-ijkl-mnop-qrst-uvwxyz012345" + }, + "spec": { + "configurationName": "", + "nodes": ["node-b", "node-a"], + "shared": true, + "deviceUsage": {} + } + } + "#, + ) + .unwrap(); + kube_interface + .expect_find_instance() + .with(eq("foo"), eq("bar")) + .returning(move |_, _| Ok(instance.clone())); + kube_interface + .expect_update_instance() + .times(1) + .withf(move |instance, name, namespace| { + name == "foo" && namespace == "bar" && instance.nodes == vec!["node-b"] + }) + .returning(move |_, _, _| Ok(())); + try_delete_instance(&kube_interface, "foo", "bar", "node-a".to_string()) + .await + .unwrap(); + } }