Skip to content
This repository has been archived by the owner on Aug 16, 2023. It is now read-only.

Commit

Permalink
Add janus & connected agents metrics (#147)
Browse files Browse the repository at this point in the history
  • Loading branch information
feymartynov committed Sep 9, 2020
1 parent f890119 commit dee2d34
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 1 deletion.
48 changes: 47 additions & 1 deletion src/app/endpoint/metric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use svc_agent::mqtt::{
use crate::app::context::Context;
use crate::app::endpoint::prelude::*;
use crate::config::TelemetryConfig;
use crate::db;

#[derive(Debug, Deserialize)]
pub(crate) struct PullPayload {
Expand Down Expand Up @@ -72,6 +73,12 @@ pub(crate) enum Metric {
DbPoolTimeoutAverage(MetricValue<f64>),
#[serde(rename(serialize = "apps.conference.max_db_pool_timeout_total"))]
MaxDbPoolTimeout(MetricValue<u128>),
#[serde(rename(serialize = "apps.conference.online_janus_backends_total"))]
OnlineJanusBackendsCount(MetricValue<u64>),
#[serde(rename(serialize = "apps.conference.janus_backends_capacity_total"))]
JanusBackendTotalCapacity(MetricValue<u64>),
#[serde(rename(serialize = "apps.conference.connected_agents_total"))]
ConnectedAgentsCount(MetricValue<u64>),
#[serde(serialize_with = "serialize_dynamic_metric")]
Dynamic {
key: String,
Expand Down Expand Up @@ -162,6 +169,10 @@ impl EventHandler for PullHandler {
.map_err(|err| err.to_string())
.status(ResponseStatus::UNPROCESSABLE_ENTITY)?;

append_janus_stats(&mut metrics, context, now)
.map_err(|err| err.to_string())
.status(ResponseStatus::UNPROCESSABLE_ENTITY)?;

let short_term_timing = ShortTermTimingProperties::until_now(start_timestamp);
let props = evp.to_event("metric.create", short_term_timing);
let outgoing_event = OutgoingEvent::multicast(metrics, props, account_id);
Expand Down Expand Up @@ -211,7 +222,7 @@ fn append_dynamic_stats(
Ok(())
}

pub(crate) fn serialize_dynamic_metric<K, V, S>(
fn serialize_dynamic_metric<K, V, S>(
key: K,
value: V,
serializer: S,
Expand All @@ -228,3 +239,38 @@ where
map.serialize_entry("value", &value)?;
map.end()
}

fn append_janus_stats(
metrics: &mut Vec<Metric>,
context: &dyn Context,
now: DateTime<Utc>,
) -> anyhow::Result<()> {
use anyhow::Context;

let conn = context.db().get()?;

// The number of online janus backends.
let online_backends_count =
db::janus_backend::count(&conn).context("Failed to get janus backends count")?;

let value = MetricValue::new(online_backends_count as u64, now);
metrics.push(Metric::OnlineJanusBackendsCount(value));

// Total capacity of online janus backends.
let total_capacity = db::janus_backend::total_capacity(&conn)
.context("Failed to get janus backends total capacity")?;

let value = MetricValue::new(total_capacity as u64, now);
metrics.push(Metric::JanusBackendTotalCapacity(value));

// The number of agents connect to an RTC.
let connected_agents_count = db::agent::CountQuery::new()
.status(db::agent::Status::Connected)
.execute(&conn)
.context("Failed to get connected agents count")?;

let value = MetricValue::new(connected_agents_count as u64, now);
metrics.push(Metric::ConnectedAgentsCount(value));

Ok(())
}
31 changes: 31 additions & 0 deletions src/db/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,37 @@ impl<'a> ListQuery<'a> {
}
}

///////////////////////////////////////////////////////////////////////////////

pub(crate) struct CountQuery {
status: Option<Status>,
}

impl CountQuery {
pub(crate) fn new() -> Self {
Self { status: None }
}

pub(crate) fn status(self, status: Status) -> Self {
Self {
status: Some(status),
}
}

pub(crate) fn execute(&self, conn: &PgConnection) -> Result<i64, Error> {
use diesel::dsl::count;
use diesel::prelude::*;

let mut query = agent::table.select(count(agent::id)).into_boxed();

if let Some(status) = self.status {
query = query.filter(agent::status.eq(status));
}

query.get_result(conn)
}
}

////////////////////////////////////////////////////////////////////////////////

#[derive(Debug, Insertable)]
Expand Down
21 changes: 21 additions & 0 deletions src/db/janus_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,3 +319,24 @@ pub(crate) fn free_capacity(rtc_id: Uuid, conn: &PgConnection) -> Result<i32, Er
.get_result::<FreeCapacityQueryRow>(conn)
.map(|row| row.free_capacity)
}

////////////////////////////////////////////////////////////////////////////////

pub(crate) fn count(conn: &PgConnection) -> Result<i64, Error> {
use diesel::dsl::sum;
use diesel::prelude::*;

janus_backend::table
.select(sum(janus_backend::capacity))
.get_result::<Option<i64>>(conn)
.map(|v| v.unwrap_or(0))
}

pub(crate) fn total_capacity(conn: &PgConnection) -> Result<i64, Error> {
use diesel::dsl::count;
use diesel::prelude::*;

janus_backend::table
.select(count(janus_backend::id))
.get_result(conn)
}

0 comments on commit dee2d34

Please sign in to comment.