Skip to content
This repository has been archived by the owner on Aug 16, 2023. It is now read-only.

Commit

Permalink
Resubscribe on reconnect (#107)
Browse files Browse the repository at this point in the history
  • Loading branch information
feymartynov committed May 19, 2020
1 parent 5749b70 commit e6ab003
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 46 deletions.
8 changes: 8 additions & 0 deletions src/app/message_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@ impl<C: Context + Sync> MessageHandler<C> {
Self { agent, context }
}

pub(crate) fn agent(&self) -> &Agent {
&self.agent
}

pub(crate) fn context(&self) -> &C {
&self.context
}

pub(crate) async fn handle(&self, message_bytes: &[u8], topic: &str) {
info!(
"Incoming message = '{}'",
Expand Down
136 changes: 90 additions & 46 deletions src/app/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::thread;
use async_std::task;
use chrono::Utc;
use futures::StreamExt;
use log::{error, info};
use log::{error, info, warn};
use serde_json::json;
use svc_agent::{
mqtt::{
Expand All @@ -15,8 +15,10 @@ use svc_agent::{
};
use svc_authn::token::jws_compact;
use svc_authz::cache::Cache as AuthzCache;
use svc_error::{extension::sentry, Error as SvcError};

use crate::config::{self, KruonisConfig};
use crate::app::context::Context;
use crate::config::{self, Config, KruonisConfig};
use crate::db::ConnectionPool;
use context::{AppContext, JanusTopics};
use message_handler::MessageHandler;
Expand Down Expand Up @@ -72,9 +74,54 @@ pub(crate) async fn run(
svc_error::extension::sentry::init(sentry_config);
}

// Subscribe to multicast requests
// Subscribe to topics
let janus_topics = subscribe(&mut agent, &agent_id, &config)?;

// Context
let context = AppContext::new(config.clone(), authz, db.clone(), janus_topics);

// Message handler
let message_handler = Arc::new(MessageHandler::new(agent.clone(), context));

// Message loop
while let Some(message) = mq_rx.next().await {
let message_handler = message_handler.clone();

task::spawn(async move {
match message {
svc_agent::mqtt::Notification::Publish(message) => {
message_handler
.handle(&message.payload, &message.topic_name)
.await
}
svc_agent::mqtt::Notification::Disconnection => {
error!("Disconnected from broker");
}
svc_agent::mqtt::Notification::Reconnection => {
error!("Reconnected to broker");

resubscribe(
&mut message_handler.agent().to_owned(),
message_handler.context().agent_id(),
message_handler.context().config(),
);
}
_ => error!("Unsupported notification type = '{:?}'", message),
}
});
}

Ok(())
}

fn subscribe(
agent: &mut Agent,
agent_id: &AgentId,
config: &Config,
) -> Result<JanusTopics, String> {
let group = SharedGroup::new("loadbalancer", agent_id.as_account_id().clone());

// Multicast requests
agent
.subscribe(
&Subscription::multicast_requests(Some(API_VERSION)),
Expand All @@ -83,90 +130,71 @@ pub(crate) async fn run(
)
.map_err(|err| format!("Error subscribing to multicast requests: {}", err))?;

// Subscribe to Janus status
// Unicast requests
agent
.subscribe(
&Subscription::unicast_requests(),
QoS::AtMostOnce,
Some(&group),
)
.map_err(|err| format!("Error subscribing to unicast requests: {}", err))?;

// Janus status events
let subscription = Subscription::broadcast_events(&config.backend_id, API_VERSION, "status");

agent
.subscribe(&subscription, QoS::AtLeastOnce, Some(&group))
.map_err(|err| format!("Error subscribing to backend events topic: {}", err))?;

let janus_status_events_topic = subscription
.subscription_topic(&agent_id, API_VERSION)
.subscription_topic(agent_id, API_VERSION)
.map_err(|err| format!("Error building janus events subscription topic: {}", err))?;

// Subscribe to Janus events
// Janus events
let subscription = Subscription::broadcast_events(&config.backend_id, API_VERSION, "events");

agent
.subscribe(&subscription, QoS::AtLeastOnce, Some(&group))
.map_err(|err| format!("Error subscribing to backend events topic: {}", err))?;

let janus_events_topic = subscription
.subscription_topic(&agent_id, API_VERSION)
.subscription_topic(agent_id, API_VERSION)
.map_err(|err| format!("Error building janus events subscription topic: {}", err))?;

// Subscribe to Janus responses
// Janus responses
let subscription = Subscription::unicast_responses_from(&config.backend_id);

agent
.subscribe(&subscription, QoS::AtLeastOnce, Some(&group))
.map_err(|err| format!("Error subscribing to backend responses topic: {}", err))?;

agent
.subscribe(
&Subscription::unicast_requests(),
QoS::AtMostOnce,
Some(&group),
)
.map_err(|err| format!("Error subscribing to unicast requests: {}", err))?;
let janus_responses_topic = subscription
.subscription_topic(agent_id, API_VERSION)
.map_err(|err| format!("Error building janus responses subscription topic: {}", err))?;

// Kruonis
if let KruonisConfig {
id: Some(ref kruonis_id),
} = config.kruonis
{
subscribe_to_kruonis(kruonis_id, &mut agent)?;
subscribe_to_kruonis(kruonis_id, agent)?;
}

let janus_responses_topic = subscription
.subscription_topic(&agent_id, API_VERSION)
.map_err(|err| format!("Error building janus responses subscription topic: {}", err))?;

// Context
let janus_topics = JanusTopics::new(
// Return Janus subscription topics
Ok(JanusTopics::new(
&janus_status_events_topic,
&janus_events_topic,
&janus_responses_topic,
);

let context = AppContext::new(config, authz, db.clone(), janus_topics);

// Message handler
let message_handler = Arc::new(MessageHandler::new(agent, context));

// Message loop
while let Some(message) = mq_rx.next().await {
let message_handler = message_handler.clone();

task::spawn(async move {
match message {
svc_agent::mqtt::Notification::Publish(message) => {
message_handler
.handle(&message.payload, &message.topic_name)
.await
}
_ => error!("Unsupported notification type = '{:?}'", message),
}
});
}

Ok(())
))
}

fn subscribe_to_kruonis(kruonis_id: &AccountId, agent: &mut Agent) -> Result<(), String> {
let timing = ShortTermTimingProperties::new(Utc::now());

let topic = Subscription::unicast_requests_from(kruonis_id)
.subscription_topic(agent.id(), API_VERSION)
.map_err(|err| format!("Failed to build subscription topic: {:?}", err))?;

let props = OutgoingRequestProperties::new("kruonis.subscribe", &topic, "", timing);
let event = OutgoingRequest::multicast(json!({}), props, kruonis_id);
let message = Box::new(event) as Box<dyn IntoPublishableDump + Send>;
Expand All @@ -184,9 +212,25 @@ fn subscribe_to_kruonis(kruonis_id: &AccountId, agent: &mut Agent) -> Result<(),
agent
.publish_dump(dump)
.map_err(|err| format!("Failed to publish message: {}", err))?;

Ok(())
}

fn resubscribe(agent: &mut Agent, agent_id: &AgentId, config: &Config) {
if let Err(err) = subscribe(agent, agent_id, config) {
let err = format!("Failed to resubscribe after reconnection: {}", err);
error!("{}", err);

let svc_error = SvcError::builder()
.kind("resubscription_error", "Resubscription error")
.detail(&err)
.build();

sentry::send(svc_error)
.unwrap_or_else(|err| warn!("Error sending error to Sentry: {}", err));
}
}

pub(crate) mod context;
pub(crate) mod endpoint;
pub(crate) mod handle_id;
Expand Down

0 comments on commit e6ab003

Please sign in to comment.