Skip to content
This repository has been archived by the owner on Aug 16, 2023. It is now read-only.

Commit

Permalink
added kruonis subscription, db_connections_total metric report
Browse files Browse the repository at this point in the history
  • Loading branch information
khodzha committed May 15, 2020
1 parent a2b6a33 commit 32a9eb0
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 3 deletions.
64 changes: 64 additions & 0 deletions src/app/endpoint/metric.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
use async_std::stream;
use async_trait::async_trait;
use chrono::{serde::ts_seconds, DateTime, Utc};
use serde_derive::{Deserialize, Serialize};
use svc_agent::mqtt::{
IncomingEventProperties, IntoPublishableDump, OutgoingEvent, ShortTermTimingProperties,
};

use crate::app::context::Context;
use crate::app::endpoint::prelude::*;
use crate::config::TelemetryConfig;

#[derive(Debug, Deserialize)]
pub(crate) struct PullPayload {}

#[derive(Serialize, Debug)]
pub(crate) struct MetricValue {
value: u64,
#[serde(with = "ts_seconds")]
timestamp: DateTime<Utc>,
}

#[derive(Serialize, Debug)]
#[serde(tag = "metric")]
pub(crate) enum Metric {
//IncomingQueue(MetricValue),
//OutgoingQueue(MetricValue),
#[serde(rename(serialize = "apps.conference.db_connections_total"))]
DbConnections(MetricValue),
}

pub(crate) struct PullHandler;

#[async_trait]
impl EventHandler for PullHandler {
type Payload = PullPayload;

async fn handle<C: Context>(
context: &C,
_payload: Self::Payload,
evp: &IncomingEventProperties,
start_timestamp: DateTime<Utc>,
) -> Result {
match context.config().telemetry {
TelemetryConfig {
id: Some(ref account_id),
} => {
let outgoing_event_payload = vec![Metric::DbConnections(MetricValue {
value: context.db().state().connections as u64,
timestamp: Utc::now(),
})];

let short_term_timing = ShortTermTimingProperties::until_now(start_timestamp);
let props = evp.to_event("metric.create", short_term_timing);
let outgoing_event =
OutgoingEvent::multicast(outgoing_event_payload, props, account_id);
let boxed_event = Box::new(outgoing_event) as Box<dyn IntoPublishableDump + Send>;
Ok(Box::new(stream::once(boxed_event)))
}

_ => Ok(Box::new(stream::empty())),
}
}
}
2 changes: 2 additions & 0 deletions src/app/endpoint/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ macro_rules! event_routes {

// Event routes configuration: label => EventHandler
event_routes!(
"metric.pull" => metric::PullHandler,
"subscription.delete" => subscription::DeleteHandler,
"subscription.create" => subscription::CreateHandler
);
Expand All @@ -162,6 +163,7 @@ event_routes!(

mod agent;
mod message;
mod metric;
mod room;
pub(crate) mod rtc;
pub(crate) mod rtc_signal;
Expand Down
51 changes: 48 additions & 3 deletions src/app/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,21 @@ use std::sync::Arc;
use std::thread;

use async_std::task;
use chrono::Utc;
use futures::StreamExt;
use log::{error, info};
use serde_json::json;
use svc_agent::{
mqtt::{AgentBuilder, ConnectionMode, Notification, QoS, SubscriptionTopic},
AgentId, Authenticable, SharedGroup, Subscription,
mqtt::{
Agent, AgentBuilder, ConnectionMode, IntoPublishableDump, Notification, OutgoingRequest,
OutgoingRequestProperties, QoS, ShortTermTimingProperties, SubscriptionTopic,
},
AccountId, AgentId, Authenticable, SharedGroup, Subscription,
};
use svc_authn::token::jws_compact;
use svc_authz::cache::Cache as AuthzCache;

use crate::config;
use crate::config::{self, KruonisConfig};
use crate::db::ConnectionPool;
use context::{AppContext, JanusTopics};
use message_handler::MessageHandler;
Expand Down Expand Up @@ -107,6 +112,21 @@ pub(crate) async fn run(
.subscribe(&subscription, QoS::AtLeastOnce, Some(&group))
.map_err(|err| format!("Error subscribing to backend responses topic: {}", err))?;

agent
.subscribe(
&Subscription::unicast_requests(),
QoS::AtMostOnce,
Some(&group),
)
.map_err(|err| format!("Error subscribing to unicast requests: {}", err))?;

if let KruonisConfig {
id: Some(ref kruonis_id),
} = config.kruonis
{
subscribe_to_kruonis(kruonis_id, &mut agent)?;
}

let janus_responses_topic = subscription
.subscription_topic(&agent_id, API_VERSION)
.map_err(|err| format!("Error building janus responses subscription topic: {}", err))?;
Expand Down Expand Up @@ -142,6 +162,31 @@ pub(crate) async fn run(
Ok(())
}

fn subscribe_to_kruonis(kruonis_id: &AccountId, agent: &mut Agent) -> Result<(), String> {
let timing = ShortTermTimingProperties::new(Utc::now());
let topic = Subscription::unicast_requests_from(kruonis_id)
.subscription_topic(agent.id(), API_VERSION)
.map_err(|err| format!("Failed to build subscription topic: {:?}", err))?;
let props = OutgoingRequestProperties::new("kruonis.subscribe", &topic, "", timing);
let event = OutgoingRequest::multicast(json!({}), props, kruonis_id);
let message = Box::new(event) as Box<dyn IntoPublishableDump + Send>;

let dump = message
.into_dump(agent.address())
.map_err(|err| format!("Failed to dump message: {}", err))?;

info!(
"Outgoing message = '{}' sending to the topic = '{}'",
dump.payload(),
dump.topic(),
);

agent
.publish_dump(dump)
.map_err(|err| format!("Failed to publish message: {}", err))?;
Ok(())
}

pub(crate) mod context;
pub(crate) mod endpoint;
pub(crate) mod handle_id;
Expand Down
14 changes: 14 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ pub(crate) struct Config {
pub(crate) authz: Authz,
pub(crate) mqtt: AgentConfig,
pub(crate) sentry: Option<SentryConfig>,
#[serde(default)]
pub(crate) telemetry: TelemetryConfig,
#[serde(default)]
pub(crate) kruonis: KruonisConfig,
}

#[derive(Clone, Debug, Deserialize)]
Expand All @@ -31,3 +35,13 @@ pub(crate) fn load() -> Result<Config, config::ConfigError> {
parser.merge(config::Environment::with_prefix("APP").separator("__"))?;
parser.try_into::<Config>()
}

#[derive(Clone, Debug, Deserialize, Default)]
pub(crate) struct TelemetryConfig {
pub(crate) id: Option<AccountId>,
}

#[derive(Clone, Debug, Deserialize, Default)]
pub(crate) struct KruonisConfig {
pub(crate) id: Option<AccountId>,
}

0 comments on commit 32a9eb0

Please sign in to comment.