Skip to content
This repository has been archived by the owner on Aug 16, 2023. It is now read-only.

Commit

Permalink
Add unix signals handling (#121)
Browse files Browse the repository at this point in the history
  • Loading branch information
feymartynov committed Jul 28, 2020
1 parent b4958f4 commit 61a020b
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 22 deletions.
39 changes: 28 additions & 11 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ rand = "0.7"
serde = "1.0"
serde_derive = "1.0"
serde_json = "1.0"
signal-hook = "0.1"
svc-agent = { version = "0.14", features = ["diesel", "queue-counter"] }
svc-authn = { version = "0.6", features = ["jose", "diesel"] }
svc-authz = "0.10"
Expand Down
37 changes: 26 additions & 11 deletions src/app/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

use anyhow::{Context as AnyhowContext, Result};
use async_std::task;
Expand Down Expand Up @@ -86,17 +88,22 @@ pub(crate) async fn run(db: &ConnectionPool, authz_cache: Option<AuthzCache>) ->
let message_handler = Arc::new(MessageHandler::new(agent.clone(), context));

// Message loop
while let Some(message) = mq_rx.next().await {
let message_handler = message_handler.clone();
let term_check_period = Duration::from_secs(1);
let term = Arc::new(AtomicBool::new(false));
signal_hook::flag::register(signal_hook::SIGTERM, Arc::clone(&term))?;
signal_hook::flag::register(signal_hook::SIGINT, Arc::clone(&term))?;

task::spawn(async move {
match message {
while !term.load(Ordering::Relaxed) {
let fut = async_std::future::timeout(term_check_period, mq_rx.next());

if let Ok(Some(message)) = fut.await {
let message_handler = message_handler.clone();

task::spawn_blocking(move || match message {
AgentNotification::Message(message, metadata) => {
message_handler.handle(&message, &metadata.topic).await
}
AgentNotification::Disconnection => {
error!("Disconnected from broker");
async_std::task::block_on(message_handler.handle(&message, &metadata.topic));
}
AgentNotification::Disconnection => error!("Disconnected from broker"),
AgentNotification::Reconnection => {
error!("Reconnected to broker");

Expand All @@ -106,9 +113,17 @@ pub(crate) async fn run(db: &ConnectionPool, authz_cache: Option<AuthzCache>) ->
message_handler.context().config(),
);
}
_ => error!("Unsupported notification type = '{:?}'", message),
}
});
AgentNotification::Puback(_) => (),
AgentNotification::Pubrec(_) => (),
AgentNotification::Pubcomp(_) => (),
AgentNotification::Suback(_) => (),
AgentNotification::Unsuback(_) => (),
AgentNotification::Abort(err) => {
error!("MQTT client aborted: {}", err);
return;
}
});
}
}

Ok(())
Expand Down

0 comments on commit 61a020b

Please sign in to comment.