Health status messages on startup and shutdown of tedge daemons #1650
The API of `mqtt_channel::Config::with_last_will_message()` must be fixed.
The test checking the last will message must be fixed too.
This will be perfect with a `tedge_api::health::get_last_will_message` function.
It would be good to fix the test. Or at least, to have some evidence that rumqttd is not supporting last will messages. The failing test is not clear enough to be sure of that.
crates/core/tedge_api/src/health.rs
Outdated
@@ -27,3 +27,16 @@ pub async fn send_health_status(responses: &mut impl PubChannel, daemon_name: &s
    let health_message = Message::new(&response_topic_health, health_status);
    let _ = responses.send(health_message).await;
}

pub fn get_last_will_message(daemon_name: String) -> Message {
Suggested change:
- pub fn get_last_will_message(daemon_name: String) -> Message {
+ pub fn health_status_down_message(daemon_name: String) -> Message {
That's what this message is. We're just using it as a last will message.
updated
Minor: I prefer `health_status_down_message` as proposed by @albinsuresh rather than `get_health_status_down_message` as currently proposed.
@@ -140,6 +155,16 @@ impl Config {

mqtt_options.set_max_packet_size(self.max_packet_size, self.max_packet_size);

if let Some(lwp) = self.last_will_message.clone() {
Suggested change:
- if let Some(lwp) = self.last_will_message.clone() {
+ if let Some(lwp) = self.last_will_message {
Since `last_will_message` is behind a shared reference, we have to clone it here. The alternative is to borrow it as `&self.last_will_message`, but then the struct members (topic, message) still have to be cloned when they are initialized, because they are behind the shared reference too.
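The trade-off described in this reply can be sketched as follows. This is a minimal illustration, not the actual `mqtt_channel` code: `Message`, `LastWill`, `Config`, and both method names are simplified, hypothetical stand-ins.

```rust
// Hypothetical, simplified stand-ins for the real mqtt_channel types.
#[derive(Clone, Debug, PartialEq)]
struct Message {
    topic: String,
    payload: String,
}

// Hypothetical stand-in for the owned value the MQTT options would take.
#[derive(Debug, PartialEq)]
struct LastWill {
    topic: String,
    payload: String,
}

struct Config {
    last_will_message: Option<Message>,
}

impl Config {
    // Option 1: clone the whole Option<Message>, then move its fields out.
    // Writing `if let Some(lwp) = self.last_will_message` without `.clone()`
    // does not compile: the value cannot be moved out from behind `&self`.
    fn last_will_by_clone(&self) -> Option<LastWill> {
        if let Some(lwp) = self.last_will_message.clone() {
            return Some(LastWill {
                topic: lwp.topic,
                payload: lwp.payload,
            });
        }
        None
    }

    // Option 2: borrow the Option, but each field still needs its own clone.
    fn last_will_by_ref(&self) -> Option<LastWill> {
        if let Some(lwp) = &self.last_will_message {
            return Some(LastWill {
                topic: lwp.topic.clone(),
                payload: lwp.payload.clone(),
            });
        }
        None
    }
}

fn main() {
    let config = Config {
        last_will_message: Some(Message {
            topic: "tedge/health/demo".to_string(),
            payload: "down".to_string(),
        }),
    };
    // Both approaches produce the same owned value.
    assert_eq!(config.last_will_by_clone(), config.last_will_by_ref());
    println!("{:?}", config.last_will_by_ref());
}
```

Both variants copy the topic and payload exactly once, so the choice is mostly a matter of style.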
88415bc to 85c40a3
let payload = match std::str::from_utf8(bytes.as_ref()) {
let notification = match rx.recv().unwrap() {
    Some(v) => v,
    None => continue,
Is it a good idea to continue on `None`? A `None` is typically returned when the broker closes the channel from the other end, right? Then what's the point in continuing? If it's still valid, better add a comment mentioning why.
From the rumqttd source code comments, I understand that `None` is returned when there are no messages in the queue at that moment, and that a newer message can still arrive later. So I feel it's better to continue.
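The question in this thread is whether an empty result means "nothing yet" or "channel closed". As an analogy only, using the standard library's `mpsc` channel rather than the rumqttd API (whose exact semantics are not confirmed here), the two cases can be told apart explicitly. The `drain` helper is hypothetical.

```rust
use std::sync::mpsc::{channel, Receiver, TryRecvError};

// Hypothetical helper: collect what is currently queued and report
// whether more messages may still arrive later.
fn drain(rx: &Receiver<String>) -> (Vec<String>, bool) {
    let mut received = Vec::new();
    loop {
        match rx.try_recv() {
            Ok(msg) => received.push(msg),
            // The queue is empty for now, but a sender is still alive:
            // polling again later (the `continue` case) makes sense.
            Err(TryRecvError::Empty) => return (received, true),
            // Every sender is gone: nothing will ever arrive again,
            // so a polling loop should stop here.
            Err(TryRecvError::Disconnected) => return (received, false),
        }
    }
}

fn main() {
    let (tx, rx) = channel();
    tx.send("hello 1".to_string()).unwrap();

    let (msgs, alive) = drain(&rx);
    assert_eq!(msgs, vec!["hello 1".to_string()]);
    assert!(alive);

    drop(tx);
    let (msgs, alive) = drain(&rx);
    assert!(msgs.is_empty());
    assert!(!alive);
}
```

If the broker's receive call folds both cases into a single `None`, a short comment next to the `continue` stating which case is expected would address the reviewer's concern.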
);
}

v => {
What are these `Notification` variants and is it worth printing those? If we're going to keep it, just add a comment mentioning what these `v`s are.
Sure, I will add the comment there. These are notifications from the broker, such as `Forward`, acks, etc.
Not very useful though: e.g. DeviceAck(SubAck(SubAck { pkid: 0, return_codes: [QoS0] }))
---- tests::tests::ensure_that_last_will_message_is_delivered stdout ----
MQTT-TEST INFO: start test MQTT broker (port = 55555)
DeviceAck(SubAck(SubAck { pkid: 0, return_codes: [QoS0] }))
MQTT-TEST MSG: topic = b"test/lwp", payload = "hello 1"
MQTT-TEST MSG: topic = b"test/lwp", payload = "hello 2"
MQTT-TEST MSG: topic = b"test/lwp", payload = "hello 3"
What would be great is to trace new connections as well as disconnections. I would ignore all other `rumqttd` internal messages.
Sure, I will not print any other notifications.
throttle_delay_ms: 0,
max_payload_size: 268435455,
max_inflight_count: 200,
max_inflight_size: 1024,
login_credentials: None,
max_inflight_size: 10204,
Intentionally changed that max size?
typo, fixed
};

let server_config = ServerSettings {
    listen: SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), port)),
    cert: None,
    next_connection_delay_ms: 1,
    next_connection_delay_ms: 0,
Why was it zero earlier and why change that to 1 now? What's the impact?
reverted back
Tedge daemons will send 'up' message when starts and a lastwill message when it stops
Signed-off-by: Pradeep Kumar K J <pradeepkumar.kj@softwareag.com>
ef71650 to a4d1fca
Approved
thiserror = "1.0"
tokio = { version = "1.12", features = ["rt", "time"] }

[dev-dependencies]
anyhow = "1.0"
maplit = "1.0"
I don't see where this is used.
fixed
@@ -200,8 +199,7 @@ impl Connection {
}
}

Ok(Event::Incoming(Incoming::Disconnect))
| Ok(Event::Outgoing(Outgoing::Disconnect)) => {
Ok(Event::Incoming(Incoming::Disconnect)) => {
Could you explain this change? Why is `Outgoing::Disconnect` ignored?
Exiting on `Event::Outgoing(Outgoing::Disconnect)` prevents a `client.disconnect()` from being effective, since the disconnect is then not forwarded by the `rumqttc` state machine. The impact was mostly on the tests related to last will messages: these are the only places with explicit `client.disconnect()` calls.
crates/core/tedge_agent/src/agent.rs
Outdated
@@ -179,7 +180,8 @@ impl SmAgentConfig {
let mqtt_config = mqtt_channel::Config::default()
    .with_host(tedge_config.query(MqttBindAddressSetting)?.to_string())
    .with_port(tedge_config.query(MqttPortSetting)?.into())
    .with_max_packet_size(10 * 1024 * 1024);
    .with_max_packet_size(10 * 1024 * 1024)
    .with_last_will_message(get_health_status_down_message("tedge-agent"));
Minor: The string literal "tedge-agent" is reused here and in the `send_health_status` function. Better to move it to a const.
Good point. It's also strange that no `session_name` is set. The same const should be used for both.
fixed
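A minimal sketch of the suggestion in this thread, assuming a simplified builder: one constant feeds both the session name and the last will topic. The `TEDGE_AGENT` constant, the `Config` struct, `with_last_will_topic`, `health_topic`, and `agent_config` are hypothetical names for illustration and are not the real `mqtt_channel` API.

```rust
// Hypothetical constant: the single source of truth for the daemon name.
const TEDGE_AGENT: &str = "tedge-agent";

// Hypothetical, simplified stand-in for mqtt_channel::Config.
#[derive(Debug, Default, PartialEq)]
struct Config {
    session_name: Option<String>,
    last_will_topic: Option<String>,
}

impl Config {
    fn with_session_name(self, name: &str) -> Self {
        Self {
            session_name: Some(name.to_string()),
            ..self
        }
    }

    fn with_last_will_topic(self, topic: String) -> Self {
        Self {
            last_will_topic: Some(topic),
            ..self
        }
    }
}

// Health topic layout as shown in the PR description: tedge/health/<daemon-name>.
fn health_topic(daemon_name: &str) -> String {
    format!("tedge/health/{}", daemon_name)
}

// The same constant is used for the session name and for the last will topic.
fn agent_config() -> Config {
    Config::default()
        .with_session_name(TEDGE_AGENT)
        .with_last_will_topic(health_topic(TEDGE_AGENT))
}

fn main() {
    println!("{:?}", agent_config());
}
```

With a single constant, renaming the daemon cannot leave the session name and the health topic out of sync.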
@@ -121,13 +123,17 @@ async fn monitor_tedge_service(
) -> Result<(), WatchdogError> {
    let client_id: &str = &format!("{}_{}", name, nanoid!());
    let mqtt_config = get_mqtt_config(tedge_config_location, client_id)?
        .with_subscriptions(res_topic.try_into()?);
        .with_subscriptions(res_topic.try_into()?)
        .with_last_will_message(get_health_status_down_message("tedge-watchdog"));
Minor: Same comment as above: move to a const.
@@ -132,6 +133,10 @@ impl ConfigManager {

pub async fn run(&mut self) -> Result<(), anyhow::Error> {
    self.get_pending_operations_from_cloud().await?;

    // Now the configuration plugin is done with the initialization and ready for processing the messages
    send_health_status(&mut self.mqtt_client.published, "c8y-configuration-plugin").await;
Minor: Same comment as above: move to a const.
Good point.
fixed
plugins/c8y_log_plugin/src/main.rs
Outdated
@@ -87,7 +88,8 @@ async fn create_mqtt_client(
let mqtt_config = mqtt_channel::Config::default()
    .with_session_name("c8y-log-plugin")
    .with_port(mqtt_port)
    .with_subscriptions(topics);
    .with_subscriptions(topics)
    .with_last_will_message(get_health_status_down_message("c8y-log-plugin"));
Minor: Same comment as above: move to a const.
fixed
Proposed changes
The changes here make each `tedge` daemon send its own health status. When the daemon is ready, it sends:
[tedge/health/<tedge-daemon-name>] {"pid":249234,"status":"up","time":1670535886}
When the daemon stops, it sends:
[tedge/health/<tedge-daemon-name>] {"pid":249510,"status":"down"}
This has been implemented for all the `tedge` daemons, including tedge-mapper, tedge-agent, tedge-watchdog, and c8y-log/configuration-plugin.
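As a rough sketch of the two payloads shown above, the following builds them with the standard library only. The function names are hypothetical and the PR's real implementation may construct the JSON differently (for example with a JSON library).

```rust
use std::time::{SystemTime, UNIX_EPOCH};

// Hypothetical helper: payload published once the daemon is ready.
fn health_status_up_payload(pid: u32, time: u64) -> String {
    format!(r#"{{"pid":{},"status":"up","time":{}}}"#, pid, time)
}

// Hypothetical helper: payload registered as the last will message,
// delivered by the broker when the daemon goes away.
fn health_status_down_payload(pid: u32) -> String {
    format!(r#"{{"pid":{},"status":"down"}}"#, pid)
}

fn main() {
    let pid = std::process::id();
    let now = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock is set before the Unix epoch")
        .as_secs();

    // "demo" is a placeholder daemon name.
    println!("[tedge/health/demo] {}", health_status_up_payload(pid, now));
    println!("[tedge/health/demo] {}", health_status_down_payload(pid));
}
```

The "down" payload carries no timestamp because it has to be fixed when the connection is configured, before the time of the shutdown is known.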
Types of changes
Paste Link to the issue
#1503
Checklist
cargo fmt as mentioned in CODING_GUIDELINES
cargo clippy as mentioned in CODING_GUIDELINES
Further comments