Skip to content
This repository has been archived by the owner on Aug 16, 2023. It is now read-only.

Commit

Permalink
added janus timeouts metric
Browse files Browse the repository at this point in the history
  • Loading branch information
khodzha committed Dec 15, 2020
1 parent da24b7d commit e610a03
Show file tree
Hide file tree
Showing 8 changed files with 108 additions and 17 deletions.
5 changes: 3 additions & 2 deletions src/app/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ impl AppContext {
db: Db,
janus_client: JanusClient,
janus_topics: JanusTopics,
stats_collector: Arc<DynamicStatsCollector>,
) -> Self {
let agent_id = AgentId::new(&config.agent_label, config.id.to_owned());

Expand All @@ -83,7 +84,7 @@ impl AppContext {
janus_topics,
queue_counter: None,
redis_pool: None,
dynamic_stats: Some(Arc::new(DynamicStatsCollector::start())),
dynamic_stats: Some(stats_collector),
running_requests: None,
}
}
Expand Down Expand Up @@ -148,7 +149,7 @@ impl GlobalContext for AppContext {
}

fn get_metrics(&self) -> anyhow::Result<Vec<Metric>> {
crate::app::metrics::Collector::new(self).get()
crate::app::metrics::Aggregator::new(self).get()
}

fn running_requests(&self) -> Option<Arc<AtomicI64>> {
Expand Down
25 changes: 22 additions & 3 deletions src/app/metrics/collector.rs → src/app/metrics/aggregator.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
use std::str::FromStr;
use std::sync::atomic::Ordering;

use chrono::{DateTime, Utc};
use svc_agent::AgentId;

use crate::app::context::GlobalContext;
use crate::app::metrics::{Metric, MetricKey, Tags};

pub(crate) struct Collector<'a, C: GlobalContext> {
pub(crate) struct Aggregator<'a, C: GlobalContext> {
context: &'a C,
}

impl<'a, C: GlobalContext> Collector<'a, C> {
impl<'a, C: GlobalContext> Aggregator<'a, C> {
pub(crate) fn new(context: &'a C) -> Self {
Self { context }
}
Expand Down Expand Up @@ -170,6 +172,22 @@ fn append_dynamic_stats(
Tags::Empty,
));
}

for (agent_id, value) in dynamic_stats.get_janus_timeouts()? {
let tags = Tags::build_janus_tags(
crate::APP_VERSION,
context.agent_id(),
&AgentId::from_str(&agent_id)
.expect("We only write agent ids into DynamicStatsCollector"),
);

metrics.push(Metric::new(
MetricKey::JanusTimeoutsTotal,
value,
now,
tags.clone(),
));
}
}

Ok(())
Expand All @@ -182,11 +200,12 @@ fn append_janus_stats(
) -> anyhow::Result<()> {
use crate::db::agent;
use anyhow::Context;

match context.get_conn() {
Err(e) => {
error!(
crate::LOG,
"Collector failed to acquire connection, reason = {:?}", e
"Aggregator failed to acquire connection, reason = {:?}", e
);
Ok(())
}
Expand Down
66 changes: 60 additions & 6 deletions src/app/metrics/dynamic_stats_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::collections::BTreeMap;
use std::thread;

use anyhow::{Context, Result};
use svc_agent::AgentId;

enum Message {
Register {
Expand All @@ -11,32 +12,44 @@ enum Message {
Flush {
tx: crossbeam_channel::Sender<Vec<(String, usize)>>,
},
JanusTimeout(AgentId),
Stop,
GetJanusTimeouts {
tx: crossbeam_channel::Sender<Vec<(String, u64)>>,
},
}

/// Handle for talking to the dynamic stats collector.
///
/// Holds the sending side of a crossbeam channel of `Message` values; methods
/// such as `record_janus_timeout` and `get_janus_timeouts` communicate
/// exclusively by sending messages through it.
pub(crate) struct DynamicStatsCollector {
tx: crossbeam_channel::Sender<Message>,
}

/// Mutable state kept by the thread spawned in `DynamicStatsCollector::start`.
struct State {
/// Accumulated values per key: `Message::Register` adds to an entry, and
/// `Message::Flush` reports the whole map and then replaces it with an empty one.
data: BTreeMap<String, usize>,
/// Timeout counters keyed by the stringified agent id: `Message::JanusTimeout`
/// increments an entry, and `Message::GetJanusTimeouts` reports a copy
/// without clearing the counters.
janus_timeouts: BTreeMap<String, u64>,
}

impl DynamicStatsCollector {
pub(crate) fn start() -> Self {
let (tx, rx) = crossbeam_channel::unbounded();

thread::spawn(move || {
let mut data: BTreeMap<String, usize> = BTreeMap::new();
let mut state = State {
data: BTreeMap::new(),
janus_timeouts: BTreeMap::new(),
};

for message in rx {
match message {
Message::Register { key, value } => {
let current_value = data.get_mut(&key).map(|v| *v);
let current_value = state.data.get_mut(&key).map(|v| *v);

match current_value {
Some(current_value) => data.insert(key, current_value + value),
None => data.insert(key, value),
Some(current_value) => state.data.insert(key, current_value + value),
None => state.data.insert(key, value),
};
}
Message::Flush { tx } => {
let report = data.into_iter().collect::<Vec<(String, usize)>>();
let report = state.data.into_iter().collect();

if let Err(err) = tx.send(report) {
warn!(
Expand All @@ -45,9 +58,30 @@ impl DynamicStatsCollector {
);
}

data = BTreeMap::new();
state.data = BTreeMap::new();
}
Message::Stop => break,
Message::JanusTimeout(agent_id) => {
let entry = state
.janus_timeouts
.entry(agent_id.to_string())
.or_insert(0);
*entry += 1;
}
Message::GetJanusTimeouts { tx } => {
let report = state
.janus_timeouts
.iter()
.map(|(aid, c)| (aid.clone(), *c))
.collect();

if let Err(err) = tx.send(report) {
warn!(
crate::LOG,
"Failed to send dynamic stats collector report: {}", err,
);
}
}
}
}
});
Expand Down Expand Up @@ -79,6 +113,26 @@ impl DynamicStatsCollector {
rx.recv()
.context("Failed to receive dynamic stats collector report")
}

/// Reports one timed-out request for the given janus agent.
///
/// Sends a `Message::JanusTimeout` over the collector channel. This is
/// best-effort: a failed send is logged as a warning and otherwise ignored.
pub(crate) fn record_janus_timeout(&self, janus: AgentId) {
    self.tx
        .send(Message::JanusTimeout(janus))
        .unwrap_or_else(|err| {
            warn!(
                crate::LOG,
                "Failed to register dynamic stats collector value: {}", err
            );
        });
}

/// Fetches the current per-agent janus timeout counters.
///
/// Sends a `Message::GetJanusTimeouts` carrying a one-slot reply channel and
/// blocks until the report arrives on it.
///
/// # Errors
///
/// Returns an error if the request cannot be sent or the reply cannot be
/// received.
pub(crate) fn get_janus_timeouts(&self) -> Result<Vec<(String, u64)>> {
    // Capacity 1 is enough: exactly one report is expected per request.
    let (reply_tx, reply_rx) = crossbeam_channel::bounded(1);
    let request = Message::GetJanusTimeouts { tx: reply_tx };

    self.tx
        .send(request)
        .context("Failed to send GetJanusTimeouts message to the dynamic stats collector")?;

    let report = reply_rx
        .recv()
        .context("Failed to receive dynamic stats collector report")?;

    Ok(report)
}
}

impl Drop for DynamicStatsCollector {
Expand Down
6 changes: 6 additions & 0 deletions src/app/metrics/metric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ pub(crate) enum MetricKey {
Dynamic(String),
#[serde(rename(serialize = "apps.conference.running_requests_total"))]
RunningRequests,
#[serde(rename(serialize = "apps.conference.janus_timeouts_total"))]
JanusTimeoutsTotal,
}

#[derive(Serialize, Clone, Debug)]
Expand Down Expand Up @@ -186,6 +188,8 @@ pub(crate) enum MetricKey2 {
Dynamic(String),
#[serde(rename(serialize = "running_requests_total"))]
RunningRequests,
#[serde(rename(serialize = "janus_timeouts_total"))]
JanusTimeoutsTotal,
}

impl From<MetricKey> for MetricKey2 {
Expand All @@ -208,6 +212,7 @@ impl From<MetricKey> for MetricKey2 {
MetricKey::JanusBackendReserveLoad => MetricKey2::JanusBackendReserveLoad,
MetricKey::JanusBackendAgentLoad => MetricKey2::JanusBackendAgentLoad,
MetricKey::RunningRequests => MetricKey2::RunningRequests,
MetricKey::JanusTimeoutsTotal => MetricKey2::JanusTimeoutsTotal,
}
}
}
Expand All @@ -232,6 +237,7 @@ impl std::fmt::Display for MetricKey2 {
MetricKey2::JanusBackendAgentLoad => write!(f, "janus_backend_agent_load_total"),
MetricKey2::Dynamic(key) => write!(f, "{}_total", key),
MetricKey2::RunningRequests => write!(f, "running_requests_total"),
// Must match this variant's serde name ("janus_timeouts_total"); the previous
// text duplicated the JanusBackendAgentLoad metric name.
MetricKey2::JanusTimeoutsTotal => write!(f, "janus_timeouts_total"),
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/app/metrics/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
pub(crate) use collector::Collector;
pub(crate) use aggregator::Aggregator;
pub(crate) use dynamic_stats_collector::DynamicStatsCollector;
pub(crate) use metric::{Metric, Metric2, MetricKey, Tags};
pub(crate) use stats_route::StatsRoute;

mod collector;
mod aggregator;
mod dynamic_stats_collector;
mod metric;
mod stats_route;
6 changes: 4 additions & 2 deletions src/app/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use svc_authz::cache::{Cache as AuthzCache, ConnectionPool as RedisConnectionPoo

use crate::app::context::GlobalContext;
use crate::app::error::{Error as AppError, ErrorKind as AppErrorKind};
use crate::app::metrics::StatsRoute;
use crate::app::metrics::{DynamicStatsCollector, StatsRoute};
use crate::backend::janus::Client as JanusClient;
use crate::config::{self, Config, KruonisConfig};
use crate::db::ConnectionPool;
Expand Down Expand Up @@ -86,14 +86,16 @@ pub(crate) async fn run(
let janus_topics = subscribe(&mut agent, &agent_id, &config)?;

let running_requests = Arc::new(AtomicI64::new(0));
let stats_collector = Arc::new(DynamicStatsCollector::start());

// Context
let context = AppContext::new(
config.clone(),
authz,
db.clone(),
JanusClient::start(&config.backend, agent_id)?,
JanusClient::start(&config.backend, agent_id, Some(stats_collector.clone()))?,
janus_topics,
stats_collector,
)
.add_queue_counter(agent.get_queue_counter())
.add_running_requests_counter(running_requests.clone());
Expand Down
11 changes: 10 additions & 1 deletion src/backend/janus/client.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::thread;
use std::time::Duration as StdDuration;

Expand All @@ -11,6 +12,7 @@ use svc_agent::{
};

use crate::app::error::{Error as AppError, ErrorKind as AppErrorKind};
use crate::app::metrics::DynamicStatsCollector;
use crate::config::BackendConfig;

use super::{JANUS_API_VERSION, STREAM_UPLOAD_METHOD};
Expand Down Expand Up @@ -39,7 +41,11 @@ pub(crate) struct Client {
}

impl Client {
pub(crate) fn start(config: &BackendConfig, me: AgentId) -> Result<Self> {
pub(crate) fn start(
config: &BackendConfig,
me: AgentId,
timeouts_writer: Option<Arc<DynamicStatsCollector>>,
) -> Result<Self> {
let period = StdDuration::from_secs(config.transaction_watchdog_check_period);
let (tx, rx) = crossbeam_channel::unbounded();

Expand Down Expand Up @@ -67,6 +73,9 @@ impl Client {
anyhow!("Janus request timed out ({}): {:?}", corr_data, info);

error!(crate::LOG, "{}", err);
if let Some(ref writer) = timeouts_writer {
writer.record_janus_timeout(info.to.clone());
}

AppError::new(AppErrorKind::BackendRequestTimedOut, err)
.notify_sentry(&crate::LOG);
Expand Down
2 changes: 1 addition & 1 deletion src/test_helpers/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ impl TestContext {
let config = build_config();
let agent_id = AgentId::new(&config.agent_label, config.id.clone());

let janus_client = JanusClient::start(&config.backend, agent_id.clone())
let janus_client = JanusClient::start(&config.backend, agent_id.clone(), None)
.expect("Failed to start janus client");

Self {
Expand Down

0 comments on commit e610a03

Please sign in to comment.