Skip to content
This repository has been archived by the owner on Aug 16, 2023. It is now read-only.

Commit

Permalink
added StatsRoute for metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
khodzha committed Oct 15, 2020
1 parent fd42476 commit b2f1393
Show file tree
Hide file tree
Showing 16 changed files with 915 additions and 385 deletions.
3 changes: 3 additions & 0 deletions App.toml.sample
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,6 @@ id = "janus-gateway.svc.example.org"
default_timeout = 5
stream_upload_timeout = 600
transaction_watchdog_check_period = 1

[metrics.http]
bind_address = "0.0.0.0:8087"
246 changes: 193 additions & 53 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,6 @@ svc-authz = "0.10"
svc-error = { version = "0.1", features = ["diesel", "r2d2", "svc-agent", "svc-authn", "svc-authz", "sentry-extension"] }
uuid = { version = "0.6", features = ["v4", "serde"] }
webrtc-sdp = "0.1"
tide = "0.13.0"
# Pinned to an exact version so that tide, and surf inside svc-authz, cannot update this dependency.
pin-project-lite = "=0.1.7"
29 changes: 10 additions & 19 deletions src/app/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use svc_authz::cache::ConnectionPool as RedisConnectionPool;
use svc_authz::ClientMap as Authz;

use crate::app::error::{Error as AppError, ErrorExt, ErrorKind as AppErrorKind};
use crate::app::metrics::{DbPoolStatsCollector, DynamicStatsCollector};
use crate::app::metrics::{DynamicStatsCollector, Metric};
use crate::backend::janus::Client as JanusClient;
use crate::config::Config;
use crate::db::ConnectionPool as Db;
Expand All @@ -27,8 +27,8 @@ pub(crate) trait GlobalContext: Sync {
fn janus_topics(&self) -> &JanusTopics;
fn queue_counter(&self) -> &Option<QueueCounterHandle>;
fn redis_pool(&self) -> &Option<RedisConnectionPool>;
fn db_pool_stats(&self) -> &Option<DbPoolStatsCollector>;
fn dynamic_stats(&self) -> Option<&DynamicStatsCollector>;
fn get_metrics(&self, duration: u64) -> anyhow::Result<Vec<Metric>>;

fn get_conn(&self) -> Result<PooledConnection<ConnectionManager<PgConnection>>, AppError> {
self.db()
Expand Down Expand Up @@ -59,7 +59,6 @@ pub(crate) struct AppContext {
janus_topics: JanusTopics,
queue_counter: Option<QueueCounterHandle>,
redis_pool: Option<RedisConnectionPool>,
db_pool_stats: Option<DbPoolStatsCollector>,
dynamic_stats: Option<Arc<DynamicStatsCollector>>,
}

Expand All @@ -82,7 +81,6 @@ impl AppContext {
janus_topics,
queue_counter: None,
redis_pool: None,
db_pool_stats: None,
dynamic_stats: Some(Arc::new(DynamicStatsCollector::start())),
}
}
Expand All @@ -100,13 +98,6 @@ impl AppContext {
..self
}
}

/// Builder-style setter: consumes the context and returns it with
/// `db_pool_stats` set to `Some(stats)`; every other field is carried over as is.
pub(crate) fn db_pool_stats(self, stats: DbPoolStatsCollector) -> Self {
Self {
db_pool_stats: Some(stats),
..self
}
}
}

impl GlobalContext for AppContext {
Expand Down Expand Up @@ -142,13 +133,13 @@ impl GlobalContext for AppContext {
&self.redis_pool
}

/// Returns a reference to the optional DB pool stats collector stored in this context.
fn db_pool_stats(&self) -> &Option<DbPoolStatsCollector> {
&self.db_pool_stats
}

/// Borrows the dynamic stats collector held by this context, or returns `None`
/// when the context has none.
fn dynamic_stats(&self) -> Option<&DynamicStatsCollector> {
self.dynamic_stats.as_deref()
}

/// Collects metrics by building a `crate::app::metrics::Collector` from this
/// context and `duration`, then returning the result of its `get()` call.
///
/// `duration` is passed to the collector unchanged.
fn get_metrics(&self, duration: u64) -> anyhow::Result<Vec<Metric>> {
crate::app::metrics::Collector::new(self, duration).get()
}
}

///////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -202,13 +193,13 @@ impl<'a, C: GlobalContext> GlobalContext for AppMessageContext<'a, C> {
self.global_context.redis_pool()
}

/// Delegates to the wrapped global context's `db_pool_stats`.
fn db_pool_stats(&self) -> &Option<DbPoolStatsCollector> {
self.global_context.db_pool_stats()
}

/// Delegates to the wrapped global context's `dynamic_stats`.
fn dynamic_stats(&self) -> Option<&DynamicStatsCollector> {
self.global_context.dynamic_stats()
}

/// Delegates to the wrapped global context's `get_metrics`, passing `duration` through unchanged.
fn get_metrics(&self, duration: u64) -> anyhow::Result<Vec<Metric>> {
self.global_context.get_metrics(duration)
}
}

impl<'a, C: GlobalContext> MessageContext for AppMessageContext<'a, C> {
Expand Down
171 changes: 3 additions & 168 deletions src/app/endpoint/metric.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,14 @@
use async_std::stream;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use diesel::pg::PgConnection;
use serde_derive::Deserialize;
use svc_agent::mqtt::{
IncomingEventProperties, IntoPublishableMessage, OutgoingEvent, ShortTermTimingProperties,
};

use crate::app::context::Context;
use crate::app::endpoint::prelude::*;
use crate::app::metrics::{Metric, Metric2, MetricValue};
use crate::app::metrics::Metric2;
use crate::config::TelemetryConfig;
use crate::db;

#[derive(Debug, Deserialize)]
pub(crate) struct PullPayload {
Expand All @@ -38,74 +35,10 @@ impl EventHandler for PullHandler {
TelemetryConfig {
id: Some(ref account_id),
} => {
let now = Utc::now();

let mut metrics = if let Some(qc) = context.queue_counter() {
let stats = qc
.get_stats(payload.duration)
.map_err(|err| anyhow!("Failed to get stats: {}", err))
.error(AppErrorKind::StatsCollectionFailed)?;

vec![
Metric::IncomingQueueRequests(MetricValue::new(
stats.incoming_requests,
now,
)),
Metric::IncomingQueueResponses(MetricValue::new(
stats.incoming_responses,
now,
)),
Metric::IncomingQueueEvents(MetricValue::new(stats.incoming_events, now)),
Metric::OutgoingQueueRequests(MetricValue::new(
stats.outgoing_requests,
now,
)),
Metric::OutgoingQueueResponses(MetricValue::new(
stats.outgoing_responses,
now,
)),
Metric::OutgoingQueueEvents(MetricValue::new(stats.outgoing_events, now)),
]
} else {
vec![]
};

let db_state = context.db().state();
metrics.push(Metric::DbConnections(MetricValue::new(
db_state.connections as u64,
now,
)));

metrics.push(Metric::IdleDbConnections(MetricValue::new(
db_state.idle_connections as u64,
now,
)));

if let Some(pool) = context.redis_pool() {
let pool_state = pool.state();
metrics.push(Metric::RedisConnections(MetricValue::new(
pool_state.connections as u64,
now,
)));

metrics.push(Metric::IdleRedisConnections(MetricValue::new(
pool_state.idle_connections as u64,
now,
)));
}

append_db_pool_stats(&mut metrics, context, now);

append_dynamic_stats(&mut metrics, context, now)
let metrics = context
.get_metrics(payload.duration)
.error(AppErrorKind::StatsCollectionFailed)?;

{
let conn = context.get_conn()?;

append_janus_stats(&mut metrics, &conn, now)
.error(AppErrorKind::StatsCollectionFailed)?;
}

let metrics2 = metrics
.clone()
.into_iter()
Expand All @@ -132,101 +65,3 @@ impl EventHandler for PullHandler {
}
}
}

/// Appends the DB pool timing metrics to `metrics`.
///
/// Does nothing when the context exposes no DB pool stats collector. Otherwise a
/// single stats snapshot is taken and the average and maximum checkin, checkout,
/// timeout and release values are appended, each stamped with `now`.
fn append_db_pool_stats<C: Context>(metrics: &mut Vec<Metric>, context: &C, now: DateTime<Utc>) {
    // Guard clause: without a collector there is nothing to report.
    let collector = match context.db_pool_stats() {
        Some(collector) => collector,
        None => return,
    };

    let snapshot = collector.get_stats();

    metrics.extend_from_slice(&[
        Metric::DbPoolCheckinAverage(MetricValue::new(snapshot.avg_checkin, now)),
        Metric::MaxDbPoolCheckin(MetricValue::new(snapshot.max_checkin, now)),
        Metric::DbPoolCheckoutAverage(MetricValue::new(snapshot.avg_checkout, now)),
        Metric::MaxDbPoolCheckout(MetricValue::new(snapshot.max_checkout, now)),
        Metric::DbPoolTimeoutAverage(MetricValue::new(snapshot.avg_timeout, now)),
        Metric::MaxDbPoolTimeout(MetricValue::new(snapshot.max_timeout, now)),
        Metric::DbPoolReleaseAverage(MetricValue::new(snapshot.avg_release, now)),
        Metric::MaxDbPoolRelease(MetricValue::new(snapshot.max_release, now)),
    ]);
}

/// Appends one `Metric::Dynamic` per `(key, value)` pair returned by the dynamic
/// stats collector's `flush()`, each stamped with `now`.
///
/// Returns `Ok(())` without touching `metrics` when the context has no dynamic
/// stats collector.
///
/// # Errors
///
/// Propagates the error returned by `flush()`.
fn append_dynamic_stats<C: Context>(
    metrics: &mut Vec<Metric>,
    context: &C,
    now: DateTime<Utc>,
) -> anyhow::Result<()> {
    let collector = match context.dynamic_stats() {
        Some(collector) => collector,
        None => return Ok(()),
    };

    let flushed = collector.flush()?;

    metrics.extend(flushed.into_iter().map(|(key, value)| Metric::Dynamic {
        key,
        value: MetricValue::new(value as u64, now),
    }));

    Ok(())
}

/// Appends janus-related metrics, read from the database, to `metrics`.
///
/// Three values are queried through `conn` and pushed in this order, each stamped
/// with `now`: the online janus backends count, the total capacity of janus
/// backends, and the count of agents with `Connected` status.
///
/// # Errors
///
/// Returns the first failing query's error, wrapped with a context message naming
/// the query. Metrics pushed before the failure stay in `metrics`.
fn append_janus_stats(
    metrics: &mut Vec<Metric>,
    conn: &PgConnection,
    now: DateTime<Utc>,
) -> anyhow::Result<()> {
    // Scoped import: brings `.context(..)` into scope for the query results below.
    use anyhow::Context;

    // `conn` is already a reference, so it is passed through directly instead of
    // being re-borrowed as `&&PgConnection`.

    // The number of online janus backends.
    let online_backends_count =
        db::janus_backend::count(conn).context("Failed to get janus backends count")?;

    // NOTE(review): the `as u64` casts below would wrap a negative value; presumably
    // these queries never return one — confirm against the query return types.
    let value = MetricValue::new(online_backends_count as u64, now);
    metrics.push(Metric::OnlineJanusBackendsCount(value));

    // Total capacity of online janus backends.
    let total_capacity = db::janus_backend::total_capacity(conn)
        .context("Failed to get janus backends total capacity")?;

    let value = MetricValue::new(total_capacity as u64, now);
    metrics.push(Metric::JanusBackendTotalCapacity(value));

    // The number of agents connected to an RTC.
    let connected_agents_count = db::agent::CountQuery::new()
        .status(db::agent::Status::Connected)
        .execute(conn)
        .context("Failed to get connected agents count")?;

    let value = MetricValue::new(connected_agents_count as u64, now);
    metrics.push(Metric::ConnectedAgentsCount(value));

    Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Shape the serialized dynamic metric is parsed back into for the assertions.
    #[derive(Deserialize)]
    struct DynamicMetric {
        metric: String,
        value: u64,
        timestamp: DateTime<Utc>,
    }

    /// A `Metric::Dynamic` with key `example` must serialize with the metric name
    /// `apps.conference.example_total` and keep its value and timestamp.
    #[test]
    fn serialize_dynamic_metric() {
        let stamped_at = Utc::now();

        let dynamic = Metric::Dynamic {
            key: String::from("example"),
            value: MetricValue::new(123, stamped_at),
        };

        // Round-trip through the textual JSON form.
        let encoded = serde_json::json!(dynamic).to_string();
        let decoded: DynamicMetric =
            serde_json::from_str(&encoded).expect("Failed to parse json");

        assert_eq!(&decoded.metric, "apps.conference.example_total");
        assert_eq!(decoded.value, 123);
        assert_eq!(decoded.timestamp, stamped_at);
    }
}
Loading

0 comments on commit b2f1393

Please sign in to comment.