Skip to content

Commit

Permalink
fix(agent): correct agent's behavior on Instance deletion (#654)
Browse files Browse the repository at this point in the history
* fix(agent): correct agent's behavior on Instance deletion

Make the agent remove itself from the instance nodes list instead of
deleting the instance if it's not the last node with access to the
device.

Refactored the way we get the AGENT_NODE_NAME to only fetch it once at
startup.

This fixes #650.

Signed-off-by: Nicolas Belouin <nicolas.belouin@suse.com>

* Update patch version

Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>

---------

Signed-off-by: Nicolas Belouin <nicolas.belouin@suse.com>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
diconico07 and github-actions[bot] committed Sep 11, 2023
1 parent 209d7df commit 1c6d383
Show file tree
Hide file tree
Showing 21 changed files with 321 additions and 98 deletions.
28 changes: 14 additions & 14 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion agent/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "agent"
version = "0.12.7"
version = "0.12.8"
license = "Apache-2.0"
authors = ["Kate Goldenring <kate.goldenring@microsoft.com>", "<bfjelds@microsoft.com>"]
edition = "2018"
Expand Down
12 changes: 9 additions & 3 deletions agent/src/main.rs
Expand Up @@ -12,6 +12,7 @@ use log::{info, trace};
use prometheus::{HistogramVec, IntGaugeVec};
use std::{
collections::HashMap,
env,
sync::{Arc, Mutex},
time::Duration,
};
Expand Down Expand Up @@ -52,6 +53,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>
);

let mut tasks = Vec::new();
let node_name = env::var("AGENT_NODE_NAME")?;

// Start server for Prometheus metrics
tasks.push(tokio::spawn(async move {
Expand Down Expand Up @@ -83,9 +85,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>
}));

tasks.push(tokio::spawn(async move {
config_action::do_config_watch(discovery_handler_map, new_discovery_handler_sender_clone)
.await
.unwrap()
config_action::do_config_watch(
discovery_handler_map,
new_discovery_handler_sender_clone,
node_name,
)
.await
.unwrap()
}));

futures::future::try_join_all(tasks).await?;
Expand Down
15 changes: 15 additions & 0 deletions agent/src/util/config_action.rs
Expand Up @@ -50,6 +50,7 @@ pub struct ConfigInfo {
pub async fn do_config_watch(
discovery_handler_map: RegisteredDiscoveryHandlerMap,
new_discovery_handler_sender: broadcast::Sender<String>,
node_name: String,
) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
info!("do_config_watch - enter");
let config_map: ConfigMap = Arc::new(RwLock::new(HashMap::new()));
Expand All @@ -63,13 +64,15 @@ pub async fn do_config_watch(
let discovery_handler_map = discovery_handler_map.clone();
let new_discovery_handler_sender = new_discovery_handler_sender.clone();
let new_kube_interface = kube_interface.clone();
let new_node_name = node_name.clone();
tasks.push(tokio::spawn(async move {
handle_config_add(
new_kube_interface,
&config,
config_map,
discovery_handler_map,
new_discovery_handler_sender,
new_node_name,
)
.await
.unwrap();
Expand All @@ -83,6 +86,7 @@ pub async fn do_config_watch(
config_map,
discovery_handler_map,
new_discovery_handler_sender,
node_name,
)
.await
.unwrap();
Expand All @@ -99,6 +103,7 @@ async fn watch_for_config_changes(
config_map: ConfigMap,
discovery_handler_map: RegisteredDiscoveryHandlerMap,
new_discovery_handler_sender: broadcast::Sender<String>,
node_name: String,
) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
trace!("watch_for_config_changes - start");
let resource = Api::<Configuration>::all(kube_interface.get_kube_client());
Expand All @@ -121,6 +126,7 @@ async fn watch_for_config_changes(
config_map.clone(),
discovery_handler_map.clone(),
new_discovery_handler_sender,
node_name.clone(),
)
.await?
}
Expand All @@ -135,6 +141,7 @@ async fn handle_config(
config_map: ConfigMap,
discovery_handler_map: RegisteredDiscoveryHandlerMap,
new_discovery_handler_sender: broadcast::Sender<String>,
node_name: String,
) -> anyhow::Result<()> {
trace!("handle_config - something happened to a configuration");
match event {
Expand All @@ -149,6 +156,7 @@ async fn handle_config(
config_map,
discovery_handler_map,
new_discovery_handler_sender,
node_name,
)
.await?;
}
Expand Down Expand Up @@ -186,6 +194,7 @@ async fn handle_config(
config_map.clone(),
discovery_handler_map.clone(),
new_discovery_handler_sender.clone(),
node_name.clone(),
)
.await?;
}
Expand All @@ -200,6 +209,7 @@ async fn handle_config_apply(
config_map: ConfigMap,
discovery_handler_map: RegisteredDiscoveryHandlerMap,
new_discovery_handler_sender: broadcast::Sender<String>,
node_name: String,
) -> anyhow::Result<()> {
// Applied events can either be newly added Configurations or modified Configurations.
// If modified delete all associated instances and device plugins and then recreate them to reflect updated config
Expand Down Expand Up @@ -231,6 +241,7 @@ async fn handle_config_apply(
config_map,
discovery_handler_map,
new_discovery_handler_sender,
node_name,
)
.await
.unwrap();
Expand All @@ -246,6 +257,7 @@ async fn handle_config_add(
config_map: ConfigMap,
discovery_handler_map: RegisteredDiscoveryHandlerMap,
new_discovery_handler_sender: broadcast::Sender<String>,
node_name: String,
) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
let config_id: ConfigId = (
config.metadata.namespace.clone().unwrap(),
Expand Down Expand Up @@ -276,6 +288,7 @@ async fn handle_config_add(
stop_discovery_sender,
&mut finished_discovery_sender,
kube_interface,
node_name,
)
.await
.unwrap();
Expand Down Expand Up @@ -446,6 +459,7 @@ mod config_action_tests {
config_map.clone(),
dh_map.clone(),
tx.clone(),
"node-a".to_string(),
)
.await
.is_ok());
Expand All @@ -461,6 +475,7 @@ mod config_action_tests {
config_map.clone(),
dh_map,
tx,
"node-a".to_string(),
)
.await
.is_ok());
Expand Down
19 changes: 13 additions & 6 deletions agent/src/util/device_plugin_builder.rs
Expand Up @@ -4,8 +4,8 @@ use super::{
KUBELET_SOCKET, LIST_AND_WATCH_MESSAGE_CHANNEL_CAPACITY,
},
device_plugin_service::{
ConfigurationDevicePlugin, DevicePluginBehavior, DevicePluginContext, DevicePluginService,
InstanceDevicePlugin, ListAndWatchMessageKind,
get_device_instance_name, ConfigurationDevicePlugin, DevicePluginBehavior,
DevicePluginContext, DevicePluginService, InstanceDevicePlugin, ListAndWatchMessageKind,
},
v1beta1,
v1beta1::{
Expand All @@ -24,7 +24,7 @@ use log::{info, trace};
#[cfg(test)]
use mockall::{automock, predicate::*};
use std::sync::Arc;
use std::{convert::TryFrom, env, path::Path, time::SystemTime};
use std::{convert::TryFrom, path::Path, time::SystemTime};
use tokio::{
net::UnixListener,
net::UnixStream,
Expand All @@ -39,19 +39,20 @@ use tower::service_fn;
pub trait DevicePluginBuilderInterface: Send + Sync {
async fn build_device_plugin(
&self,
instance_name: String,
instance_id: String,
config: &Configuration,
shared: bool,
device_plugin_context: Arc<RwLock<DevicePluginContext>>,
device: Device,
node_name: String,
) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>>;

async fn build_configuration_device_plugin(
&self,
device_plugin_name: String,
config: &Configuration,
device_plugin_context: Arc<RwLock<DevicePluginContext>>,
node_name: String,
) -> Result<
broadcast::Sender<ListAndWatchMessageKind>,
Box<dyn std::error::Error + Send + Sync + 'static>,
Expand All @@ -66,13 +67,15 @@ impl DevicePluginBuilderInterface for DevicePluginBuilder {
/// This creates a new DevicePluginService for an instance and registers it with the kubelet
async fn build_device_plugin(
&self,
instance_name: String,
instance_id: String,
config: &Configuration,
shared: bool,
device_plugin_context: Arc<RwLock<DevicePluginContext>>,
device: Device,
node_name: String,
) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
let instance_name =
get_device_instance_name(&instance_id, config.metadata.name.as_ref().unwrap());
info!("build_device_plugin - entered for device {}", instance_name);
let device_plugin_behavior = DevicePluginBehavior::Instance(InstanceDevicePlugin {
instance_id: instance_id.clone(),
Expand All @@ -87,6 +90,7 @@ impl DevicePluginBuilderInterface for DevicePluginBuilder {
device_plugin_context,
device_plugin_behavior,
list_and_watch_message_sender,
node_name,
)
.await
}
Expand All @@ -97,6 +101,7 @@ impl DevicePluginBuilderInterface for DevicePluginBuilder {
device_plugin_name: String,
config: &Configuration,
device_plugin_context: Arc<RwLock<DevicePluginContext>>,
node_name: String,
) -> Result<
broadcast::Sender<ListAndWatchMessageKind>,
Box<dyn std::error::Error + Send + Sync + 'static>,
Expand All @@ -115,6 +120,7 @@ impl DevicePluginBuilderInterface for DevicePluginBuilder {
device_plugin_context,
device_plugin_behavior,
list_and_watch_message_sender.clone(),
node_name,
)
.await?;
Ok(list_and_watch_message_sender)
Expand All @@ -129,6 +135,7 @@ impl DevicePluginBuilder {
device_plugin_context: Arc<RwLock<DevicePluginContext>>,
device_plugin_behavior: DevicePluginBehavior,
list_and_watch_message_sender: broadcast::Sender<ListAndWatchMessageKind>,
node_name: String,
) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
let capability_id: String = format!("{}/{}", AKRI_PREFIX, device_plugin_name);
let unique_time = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH)?;
Expand All @@ -147,7 +154,7 @@ impl DevicePluginBuilder {
config_name: config.metadata.name.clone().unwrap(),
config_uid: config.metadata.uid.as_ref().unwrap().clone(),
config_namespace: config.metadata.namespace.as_ref().unwrap().clone(),
node_name: env::var("AGENT_NODE_NAME")?,
node_name,
device_plugin_context,
list_and_watch_message_sender,
server_ender_sender: server_ender_sender.clone(),
Expand Down

0 comments on commit 1c6d383

Please sign in to comment.