forked from project-akri/akri
-
Notifications
You must be signed in to change notification settings - Fork 0
/
discovery_handler.rs
203 lines (192 loc) · 8.5 KB
/
discovery_handler.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
use akri_discovery_utils::discovery::{
discovery_handler::{deserialize_discovery_details, DISCOVERED_DEVICES_CHANNEL_CAPACITY},
v0::{discovery_handler_server::DiscoveryHandler, Device, DiscoverRequest, DiscoverResponse},
DiscoverStream,
};
use async_trait::async_trait;
use log::{error, info, trace};
use std::time::Duration;
use std::{collections::HashMap, fs};
use tokio::sync::mpsc;
use tokio::time::delay_for;
use tonic::{Response, Status};
// TODO: make this configurable
pub const DISCOVERY_INTERVAL_SECS: u64 = 10;
/// File acting as an environment variable for testing discovery.
/// To mimic an instance going offline, kubectl exec into the pod running this discovery handler
/// and echo "OFFLINE" > /tmp/debug-echo-availability.txt.
/// To mimic a device coming back online, remove the word "OFFLINE" from the file
/// ie: echo "" > /tmp/debug-echo-availability.txt.
pub const DEBUG_ECHO_AVAILABILITY_CHECK_PATH: &str = "/tmp/debug-echo-availability.txt";
/// String to write into DEBUG_ECHO_AVAILABILITY_CHECK_PATH to make Other devices undiscoverable
pub const OFFLINE: &str = "OFFLINE";
/// DebugEchoDiscoveryDetails describes the necessary information needed to discover and filter debug echo devices.
/// Specifically, it contains a list (`descriptions`) of fake devices to be discovered.
/// This information is expected to be serialized in the discovery details map sent during Discover requests.
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct DebugEchoDiscoveryDetails {
pub descriptions: Vec<String>,
}
/// The DiscoveryHandlerImpl discovers a list of devices, named in its `descriptions`.
/// It mocks discovering the devices by inspecting the contents of the file at `DEBUG_ECHO_AVAILABILITY_CHECK_PATH`.
/// If the file contains "OFFLINE", it won't discover any of the devices, else it discovers them all.
pub struct DiscoveryHandlerImpl {
register_sender: Option<mpsc::Sender<()>>,
}
impl DiscoveryHandlerImpl {
pub fn new(register_sender: Option<mpsc::Sender<()>>) -> Self {
DiscoveryHandlerImpl { register_sender }
}
}
#[async_trait]
impl DiscoveryHandler for DiscoveryHandlerImpl {
type DiscoverStream = DiscoverStream;
async fn discover(
&self,
request: tonic::Request<DiscoverRequest>,
) -> Result<Response<Self::DiscoverStream>, Status> {
info!("discover - called for debug echo protocol");
let register_sender = self.register_sender.clone();
let discover_request = request.get_ref();
let (mut discovered_devices_sender, discovered_devices_receiver) =
mpsc::channel(DISCOVERED_DEVICES_CHANNEL_CAPACITY);
let discovery_handler_config: DebugEchoDiscoveryDetails =
deserialize_discovery_details(&discover_request.discovery_details)
.map_err(|e| tonic::Status::new(tonic::Code::InvalidArgument, format!("{}", e)))?;
let descriptions = discovery_handler_config.descriptions;
let mut offline = fs::read_to_string(DEBUG_ECHO_AVAILABILITY_CHECK_PATH)
.unwrap_or_default()
.contains(OFFLINE);
let mut first_loop = true;
tokio::spawn(async move {
loop {
let availability =
fs::read_to_string(DEBUG_ECHO_AVAILABILITY_CHECK_PATH).unwrap_or_default();
trace!(
"discover -- debugEcho devices are online? {}",
!availability.contains(OFFLINE)
);
if (availability.contains(OFFLINE) && !offline) || offline && first_loop {
if first_loop {
first_loop = false;
}
// If the device is now offline, return an empty list of instance info
offline = true;
if let Err(e) = discovered_devices_sender
.send(Ok(DiscoverResponse {
devices: Vec::new(),
}))
.await
{
error!("discover - for debugEcho failed to send discovery response with error {}", e);
if let Some(mut sender) = register_sender {
sender.send(()).await.unwrap();
}
break;
}
} else if (!availability.contains(OFFLINE) && offline) || !offline && first_loop {
if first_loop {
first_loop = false;
}
offline = false;
let devices = descriptions
.iter()
.map(|description| {
let mut properties = HashMap::new();
properties.insert(
super::DEBUG_ECHO_DESCRIPTION_LABEL.to_string(),
description.clone(),
);
Device {
id: description.clone(),
properties,
mounts: Vec::default(),
device_specs: Vec::default(),
}
})
.collect::<Vec<Device>>();
if let Err(e) = discovered_devices_sender
.send(Ok(DiscoverResponse { devices }))
.await
{
// TODO: consider re-registering here
error!("discover - for debugEcho failed to send discovery response with error {}", e);
if let Some(mut sender) = register_sender {
sender.send(()).await.unwrap();
}
break;
}
}
delay_for(Duration::from_secs(DISCOVERY_INTERVAL_SECS)).await;
}
});
trace!("outside of thread");
Ok(Response::new(discovered_devices_receiver))
}
}
#[cfg(test)]
mod tests {
use super::*;
use akri_discovery_utils::discovery::v0::DiscoverRequest;
use akri_shared::akri::configuration::DiscoveryHandlerInfo;
#[test]
fn test_deserialize_discovery_details_empty() {
let dh_config: Result<DebugEchoDiscoveryDetails, anyhow::Error> =
deserialize_discovery_details("");
assert!(dh_config.is_err());
let dh_config: DebugEchoDiscoveryDetails =
deserialize_discovery_details("descriptions: []").unwrap();
assert!(dh_config.descriptions.is_empty());
let serialized = serde_json::to_string(&dh_config).unwrap();
let expected_deserialized = r#"{"descriptions":[]}"#;
assert_eq!(expected_deserialized, serialized);
}
#[test]
fn test_deserialize_discovery_details_detailed() {
let yaml = r#"
descriptions:
- "foo1"
"#;
let dh_config: DebugEchoDiscoveryDetails = deserialize_discovery_details(yaml).unwrap();
assert_eq!(dh_config.descriptions.len(), 1);
assert_eq!(&dh_config.descriptions[0], "foo1");
}
#[tokio::test]
async fn test_discover_online_devices() {
// Make devices "online"
fs::write(DEBUG_ECHO_AVAILABILITY_CHECK_PATH, "").unwrap();
let debug_echo_yaml = r#"
name: debugEcho
discoveryDetails: |+
descriptions:
- "foo1"
"#;
let deserialized: DiscoveryHandlerInfo = serde_yaml::from_str(&debug_echo_yaml).unwrap();
let discovery_handler = DiscoveryHandlerImpl::new(None);
let properties: HashMap<String, String> = [(
super::super::DEBUG_ECHO_DESCRIPTION_LABEL.to_string(),
"foo1".to_string(),
)]
.iter()
.cloned()
.collect();
let device = akri_discovery_utils::discovery::v0::Device {
id: "foo1".to_string(),
properties,
mounts: Vec::default(),
device_specs: Vec::default(),
};
let discover_request = tonic::Request::new(DiscoverRequest {
discovery_details: deserialized.discovery_details.clone(),
});
let mut stream = discovery_handler
.discover(discover_request)
.await
.unwrap()
.into_inner();
let devices = stream.recv().await.unwrap().unwrap().devices;
assert_eq!(1, devices.len());
assert_eq!(devices[0], device);
}
}