Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(agent): correct agent's behavior on Instance deletion #654

Merged
merged 2 commits into from Sep 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
28 changes: 14 additions & 14 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion agent/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "agent"
version = "0.12.7"
version = "0.12.8"
license = "Apache-2.0"
authors = ["Kate Goldenring <kate.goldenring@microsoft.com>", "<bfjelds@microsoft.com>"]
edition = "2018"
Expand Down
12 changes: 9 additions & 3 deletions agent/src/main.rs
Expand Up @@ -12,6 +12,7 @@ use log::{info, trace};
use prometheus::{HistogramVec, IntGaugeVec};
use std::{
collections::HashMap,
env,
sync::{Arc, Mutex},
time::Duration,
};
Expand Down Expand Up @@ -52,6 +53,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>
);

let mut tasks = Vec::new();
let node_name = env::var("AGENT_NODE_NAME")?;

// Start server for Prometheus metrics
tasks.push(tokio::spawn(async move {
Expand Down Expand Up @@ -83,9 +85,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>
}));

tasks.push(tokio::spawn(async move {
config_action::do_config_watch(discovery_handler_map, new_discovery_handler_sender_clone)
.await
.unwrap()
config_action::do_config_watch(
discovery_handler_map,
new_discovery_handler_sender_clone,
node_name,
)
.await
.unwrap()
}));

futures::future::try_join_all(tasks).await?;
Expand Down
15 changes: 15 additions & 0 deletions agent/src/util/config_action.rs
Expand Up @@ -50,6 +50,7 @@ pub struct ConfigInfo {
pub async fn do_config_watch(
discovery_handler_map: RegisteredDiscoveryHandlerMap,
new_discovery_handler_sender: broadcast::Sender<String>,
node_name: String,
) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
info!("do_config_watch - enter");
let config_map: ConfigMap = Arc::new(RwLock::new(HashMap::new()));
Expand All @@ -63,13 +64,15 @@ pub async fn do_config_watch(
let discovery_handler_map = discovery_handler_map.clone();
let new_discovery_handler_sender = new_discovery_handler_sender.clone();
let new_kube_interface = kube_interface.clone();
let new_node_name = node_name.clone();
tasks.push(tokio::spawn(async move {
handle_config_add(
new_kube_interface,
&config,
config_map,
discovery_handler_map,
new_discovery_handler_sender,
new_node_name,
)
.await
.unwrap();
Expand All @@ -83,6 +86,7 @@ pub async fn do_config_watch(
config_map,
discovery_handler_map,
new_discovery_handler_sender,
node_name,
)
.await
.unwrap();
Expand All @@ -99,6 +103,7 @@ async fn watch_for_config_changes(
config_map: ConfigMap,
discovery_handler_map: RegisteredDiscoveryHandlerMap,
new_discovery_handler_sender: broadcast::Sender<String>,
node_name: String,
) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
trace!("watch_for_config_changes - start");
let resource = Api::<Configuration>::all(kube_interface.get_kube_client());
Expand All @@ -121,6 +126,7 @@ async fn watch_for_config_changes(
config_map.clone(),
discovery_handler_map.clone(),
new_discovery_handler_sender,
node_name.clone(),
)
.await?
}
Expand All @@ -135,6 +141,7 @@ async fn handle_config(
config_map: ConfigMap,
discovery_handler_map: RegisteredDiscoveryHandlerMap,
new_discovery_handler_sender: broadcast::Sender<String>,
node_name: String,
) -> anyhow::Result<()> {
trace!("handle_config - something happened to a configuration");
match event {
Expand All @@ -149,6 +156,7 @@ async fn handle_config(
config_map,
discovery_handler_map,
new_discovery_handler_sender,
node_name,
)
.await?;
}
Expand Down Expand Up @@ -186,6 +194,7 @@ async fn handle_config(
config_map.clone(),
discovery_handler_map.clone(),
new_discovery_handler_sender.clone(),
node_name.clone(),
)
.await?;
}
Expand All @@ -200,6 +209,7 @@ async fn handle_config_apply(
config_map: ConfigMap,
discovery_handler_map: RegisteredDiscoveryHandlerMap,
new_discovery_handler_sender: broadcast::Sender<String>,
node_name: String,
) -> anyhow::Result<()> {
// Applied events can either be newly added Configurations or modified Configurations.
// If modified delete all associated instances and device plugins and then recreate them to reflect updated config
Expand Down Expand Up @@ -231,6 +241,7 @@ async fn handle_config_apply(
config_map,
discovery_handler_map,
new_discovery_handler_sender,
node_name,
)
.await
.unwrap();
Expand All @@ -246,6 +257,7 @@ async fn handle_config_add(
config_map: ConfigMap,
discovery_handler_map: RegisteredDiscoveryHandlerMap,
new_discovery_handler_sender: broadcast::Sender<String>,
node_name: String,
) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
let config_id: ConfigId = (
config.metadata.namespace.clone().unwrap(),
Expand Down Expand Up @@ -276,6 +288,7 @@ async fn handle_config_add(
stop_discovery_sender,
&mut finished_discovery_sender,
kube_interface,
node_name,
)
.await
.unwrap();
Expand Down Expand Up @@ -446,6 +459,7 @@ mod config_action_tests {
config_map.clone(),
dh_map.clone(),
tx.clone(),
"node-a".to_string(),
)
.await
.is_ok());
Expand All @@ -461,6 +475,7 @@ mod config_action_tests {
config_map.clone(),
dh_map,
tx,
"node-a".to_string(),
)
.await
.is_ok());
Expand Down
19 changes: 13 additions & 6 deletions agent/src/util/device_plugin_builder.rs
Expand Up @@ -4,8 +4,8 @@ use super::{
KUBELET_SOCKET, LIST_AND_WATCH_MESSAGE_CHANNEL_CAPACITY,
},
device_plugin_service::{
ConfigurationDevicePlugin, DevicePluginBehavior, DevicePluginContext, DevicePluginService,
InstanceDevicePlugin, ListAndWatchMessageKind,
get_device_instance_name, ConfigurationDevicePlugin, DevicePluginBehavior,
DevicePluginContext, DevicePluginService, InstanceDevicePlugin, ListAndWatchMessageKind,
},
v1beta1,
v1beta1::{
Expand All @@ -24,7 +24,7 @@ use log::{info, trace};
#[cfg(test)]
use mockall::{automock, predicate::*};
use std::sync::Arc;
use std::{convert::TryFrom, env, path::Path, time::SystemTime};
use std::{convert::TryFrom, path::Path, time::SystemTime};
use tokio::{
net::UnixListener,
net::UnixStream,
Expand All @@ -39,19 +39,20 @@ use tower::service_fn;
pub trait DevicePluginBuilderInterface: Send + Sync {
async fn build_device_plugin(
&self,
instance_name: String,
instance_id: String,
config: &Configuration,
shared: bool,
device_plugin_context: Arc<RwLock<DevicePluginContext>>,
device: Device,
node_name: String,
) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>>;

async fn build_configuration_device_plugin(
&self,
device_plugin_name: String,
config: &Configuration,
device_plugin_context: Arc<RwLock<DevicePluginContext>>,
node_name: String,
) -> Result<
broadcast::Sender<ListAndWatchMessageKind>,
Box<dyn std::error::Error + Send + Sync + 'static>,
Expand All @@ -66,13 +67,15 @@ impl DevicePluginBuilderInterface for DevicePluginBuilder {
/// This creates a new DevicePluginService for an instance and registers it with the kubelet
async fn build_device_plugin(
&self,
instance_name: String,
instance_id: String,
config: &Configuration,
shared: bool,
device_plugin_context: Arc<RwLock<DevicePluginContext>>,
device: Device,
node_name: String,
) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
let instance_name =
get_device_instance_name(&instance_id, config.metadata.name.as_ref().unwrap());
info!("build_device_plugin - entered for device {}", instance_name);
let device_plugin_behavior = DevicePluginBehavior::Instance(InstanceDevicePlugin {
instance_id: instance_id.clone(),
Expand All @@ -87,6 +90,7 @@ impl DevicePluginBuilderInterface for DevicePluginBuilder {
device_plugin_context,
device_plugin_behavior,
list_and_watch_message_sender,
node_name,
)
.await
}
Expand All @@ -97,6 +101,7 @@ impl DevicePluginBuilderInterface for DevicePluginBuilder {
device_plugin_name: String,
config: &Configuration,
device_plugin_context: Arc<RwLock<DevicePluginContext>>,
node_name: String,
) -> Result<
broadcast::Sender<ListAndWatchMessageKind>,
Box<dyn std::error::Error + Send + Sync + 'static>,
Expand All @@ -115,6 +120,7 @@ impl DevicePluginBuilderInterface for DevicePluginBuilder {
device_plugin_context,
device_plugin_behavior,
list_and_watch_message_sender.clone(),
node_name,
)
.await?;
Ok(list_and_watch_message_sender)
Expand All @@ -129,6 +135,7 @@ impl DevicePluginBuilder {
device_plugin_context: Arc<RwLock<DevicePluginContext>>,
device_plugin_behavior: DevicePluginBehavior,
list_and_watch_message_sender: broadcast::Sender<ListAndWatchMessageKind>,
node_name: String,
) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
let capability_id: String = format!("{}/{}", AKRI_PREFIX, device_plugin_name);
let unique_time = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH)?;
Expand All @@ -147,7 +154,7 @@ impl DevicePluginBuilder {
config_name: config.metadata.name.clone().unwrap(),
config_uid: config.metadata.uid.as_ref().unwrap().clone(),
config_namespace: config.metadata.namespace.as_ref().unwrap().clone(),
node_name: env::var("AGENT_NODE_NAME")?,
node_name,
device_plugin_context,
list_and_watch_message_sender,
server_ender_sender: server_ender_sender.clone(),
Expand Down