Skip to content
This repository has been archived by the owner on Aug 16, 2023. It is now read-only.

Commit

Permalink
Add label to message.broadcast and metrics (#146)
Browse files Browse the repository at this point in the history
  • Loading branch information
feymartynov committed Sep 8, 2020
1 parent 0f6fb9a commit 9218acc
Show file tree
Hide file tree
Showing 10 changed files with 173 additions and 17 deletions.
1 change: 1 addition & 0 deletions docs/src/api/message/broadcast.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ Name | Type | Default | Description
----------------- | ---------- | ---------- | ------------------
room_id | Uuid | _required_ | A destination room identifier. The room must be opened.
data | JsonObject | _required_ | JSON object.
label | String | _optional_ | A label to group messages by in metrics.



Expand Down
17 changes: 12 additions & 5 deletions src/app/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use svc_agent::{queue_counter::QueueCounterHandle, AgentId};
use svc_authz::cache::ConnectionPool as RedisConnectionPool;
use svc_authz::ClientMap as Authz;

use crate::app::metrics::StatsCollector;
use crate::app::metrics::{DbPoolStatsCollector, DynamicStatsCollector};
use crate::config::Config;
use crate::db::ConnectionPool as Db;

Expand All @@ -19,7 +19,8 @@ pub(crate) struct AppContext {
janus_topics: JanusTopics,
queue_counter: Option<QueueCounterHandle>,
redis_pool: Option<RedisConnectionPool>,
db_pool_stats: Option<StatsCollector>,
db_pool_stats: Option<DbPoolStatsCollector>,
dynamic_stats: Option<Arc<DynamicStatsCollector>>,
}

impl AppContext {
Expand All @@ -35,6 +36,7 @@ impl AppContext {
queue_counter: None,
redis_pool: None,
db_pool_stats: None,
dynamic_stats: Some(Arc::new(DynamicStatsCollector::start())),
}
}

Expand All @@ -52,7 +54,7 @@ impl AppContext {
}
}

pub(crate) fn db_pool_stats(self, stats: StatsCollector) -> Self {
pub(crate) fn db_pool_stats(self, stats: DbPoolStatsCollector) -> Self {
Self {
db_pool_stats: Some(stats),
..self
Expand All @@ -68,7 +70,8 @@ pub(crate) trait Context: Sync {
fn janus_topics(&self) -> &JanusTopics;
fn queue_counter(&self) -> &Option<QueueCounterHandle>;
fn redis_pool(&self) -> &Option<RedisConnectionPool>;
fn db_pool_stats(&self) -> &Option<StatsCollector>;
fn db_pool_stats(&self) -> &Option<DbPoolStatsCollector>;
fn dynamic_stats(&self) -> Option<&DynamicStatsCollector>;
}

impl Context for AppContext {
Expand Down Expand Up @@ -100,9 +103,13 @@ impl Context for AppContext {
&self.redis_pool
}

fn db_pool_stats(&self) -> &Option<StatsCollector> {
fn db_pool_stats(&self) -> &Option<DbPoolStatsCollector> {
&self.db_pool_stats
}

fn dynamic_stats(&self) -> Option<&DynamicStatsCollector> {
self.dynamic_stats.as_deref()
}
}

///////////////////////////////////////////////////////////////////////////////
Expand Down
10 changes: 10 additions & 0 deletions src/app/endpoint/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ impl RequestHandler for UnicastHandler {
pub(crate) struct BroadcastRequest {
room_id: Uuid,
data: JsonValue,
label: Option<String>,
}

pub(crate) struct BroadcastHandler;
Expand All @@ -107,6 +108,12 @@ impl RequestHandler for BroadcastHandler {
room
};

if let Some(stats) = context.dynamic_stats() {
if let Some(label) = payload.label {
stats.collect(&format!("message_broadcast_{}", label), 1);
}
}

// Respond and broadcast to the room topic.
let response =
shared::build_response(ResponseStatus::OK, json!({}), reqp, start_timestamp, None);
Expand Down Expand Up @@ -400,6 +407,7 @@ mod test {
let payload = BroadcastRequest {
room_id: room.id(),
data: json!({ "key": "value" }),
label: None,
};

let messages = handle_request::<BroadcastHandler>(&context, &sender, payload)
Expand Down Expand Up @@ -435,6 +443,7 @@ mod test {
let payload = BroadcastRequest {
room_id: Uuid::new_v4(),
data: json!({ "key": "value" }),
label: None,
};

let err = handle_request::<BroadcastHandler>(&context, &sender, payload)
Expand Down Expand Up @@ -464,6 +473,7 @@ mod test {
let payload = BroadcastRequest {
room_id: room.id(),
data: json!({ "key": "value" }),
label: None,
};

let err = handle_request::<BroadcastHandler>(&context, &sender, payload)
Expand Down
46 changes: 45 additions & 1 deletion src/app/endpoint/metric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ impl<T: serde::Serialize> MetricValue<T> {
Self { value, timestamp }
}
}
#[derive(Serialize, Debug, Copy, Clone)]
#[derive(Serialize, Debug, Clone)]
#[serde(tag = "metric")]
pub(crate) enum Metric {
#[serde(rename(serialize = "apps.conference.incoming_requests_total"))]
Expand Down Expand Up @@ -72,6 +72,11 @@ pub(crate) enum Metric {
DbPoolTimeoutAverage(MetricValue<f64>),
#[serde(rename(serialize = "apps.conference.max_db_pool_timeout_total"))]
MaxDbPoolTimeout(MetricValue<u128>),
#[serde(serialize_with = "serialize_dynamic_metric")]
Dynamic {
key: String,
value: MetricValue<u64>,
},
}

pub(crate) struct PullHandler;
Expand Down Expand Up @@ -153,6 +158,10 @@ impl EventHandler for PullHandler {

append_db_pool_stats(&mut metrics, context, now);

append_dynamic_stats(&mut metrics, context, now)
.map_err(|err| err.to_string())
.status(ResponseStatus::UNPROCESSABLE_ENTITY)?;

let short_term_timing = ShortTermTimingProperties::until_now(start_timestamp);
let props = evp.to_event("metric.create", short_term_timing);
let outgoing_event = OutgoingEvent::multicast(metrics, props, account_id);
Expand Down Expand Up @@ -184,3 +193,38 @@ fn append_db_pool_stats(metrics: &mut Vec<Metric>, context: &dyn Context, now: D
metrics.extend_from_slice(&m);
}
}

fn append_dynamic_stats(
metrics: &mut Vec<Metric>,
context: &dyn Context,
now: DateTime<Utc>,
) -> anyhow::Result<()> {
if let Some(dynamic_stats) = context.dynamic_stats() {
for (key, value) in dynamic_stats.flush()? {
metrics.push(Metric::Dynamic {
key,
value: MetricValue::new(value as u64, now),
});
}
}

Ok(())
}

pub(crate) fn serialize_dynamic_metric<K, V, S>(
key: K,
value: V,
serializer: S,
) -> std::result::Result<S::Ok, S::Error>
where
K: std::fmt::Display,
V: serde::Serialize,
S: serde::ser::Serializer,
{
use serde::ser::SerializeMap;

let mut map = serializer.serialize_map(Some(2))?;
map.serialize_entry("metric", &format!("app.conference.{}_total", key))?;
map.serialize_entry("value", &value)?;
map.end()
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,11 @@ pub struct Stats {
}

#[derive(Debug, Clone)]
pub struct StatsCollector {
pub struct DbPoolStatsCollector {
inner: Arc<Mutex<Inner>>,
}

impl StatsCollector {
impl DbPoolStatsCollector {
pub fn new() -> (Self, StatsTransmitter) {
let inner: Arc<Mutex<Inner>> = Default::default();
let collector = Self {
Expand Down
88 changes: 88 additions & 0 deletions src/app/metrics/dynamic_stats_collector.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
use std::collections::BTreeMap;
use std::thread;

use anyhow::{Context, Result};
use log::warn;

enum Message {
Register {
key: String,
value: usize,
},
Flush {
tx: crossbeam_channel::Sender<Vec<(String, usize)>>,
},
Stop,
}

pub(crate) struct DynamicStatsCollector {
tx: crossbeam_channel::Sender<Message>,
}

impl DynamicStatsCollector {
pub(crate) fn start() -> Self {
let (tx, rx) = crossbeam_channel::unbounded();

thread::spawn(move || {
let mut data: BTreeMap<String, usize> = BTreeMap::new();

for message in rx {
match message {
Message::Register { key, value } => {
let current_value = data.get_mut(&key).map(|v| *v);

match current_value {
Some(current_value) => data.insert(key, current_value + value),
None => data.insert(key, value),
};
}
Message::Flush { tx } => {
let report = data.into_iter().collect::<Vec<(String, usize)>>();

if let Err(err) = tx.send(report) {
warn!("Failed to send dynamic stats collector report: {:?}", err);
}

data = BTreeMap::new();
}
Message::Stop => break,
}
}
});

Self { tx }
}

pub(crate) fn collect(&self, key: impl Into<String>, value: usize) {
let message = Message::Register {
key: key.into(),
value,
};

if let Err(err) = self.tx.send(message) {
warn!(
"Failed to register dynamic stats collector value: {:?}",
err
);
}
}

pub(crate) fn flush(&self) -> Result<Vec<(String, usize)>> {
let (tx, rx) = crossbeam_channel::bounded(1);

self.tx
.send(Message::Flush { tx })
.context("Failed to send flush message to the dynamic stats collector")?;

rx.recv()
.context("Failed to receive dynamic stats collector report")
}
}

impl Drop for DynamicStatsCollector {
fn drop(&mut self) {
if let Err(err) = self.tx.send(Message::Stop) {
warn!("Failed to stop dynamic stats collector: {:?}", err);
}
}
}
6 changes: 4 additions & 2 deletions src/app/metrics/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
pub(crate) use stats_collector::StatsCollector;
pub(crate) use db_pool_stats_collector::DbPoolStatsCollector;
pub(crate) use dynamic_stats_collector::DynamicStatsCollector;

mod stats_collector;
mod db_pool_stats_collector;
mod dynamic_stats_collector;
4 changes: 2 additions & 2 deletions src/app/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use svc_authz::cache::{Cache as AuthzCache, ConnectionPool as RedisConnectionPoo
use svc_error::{extension::sentry, Error as SvcError};

use crate::app::context::Context;
use crate::app::metrics::StatsCollector;
use crate::app::metrics::DbPoolStatsCollector;
use crate::config::{self, Config, KruonisConfig};
use crate::db::ConnectionPool;
use context::{AppContext, JanusTopics};
Expand All @@ -35,7 +35,7 @@ pub(crate) async fn run(
db: &ConnectionPool,
redis_pool: Option<RedisConnectionPool>,
authz_cache: Option<AuthzCache>,
db_pool_stats: StatsCollector,
db_pool_stats: DbPoolStatsCollector,
) -> Result<()> {
// Config
let config = config::load().expect("Failed to load config");
Expand Down
6 changes: 3 additions & 3 deletions src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use diesel::r2d2::{ConnectionManager, Pool};
use std::sync::Arc;
use std::time::Duration;

use crate::app::metrics::StatsCollector;
use crate::app::metrics::DbPoolStatsCollector;

pub(crate) type ConnectionPool = Arc<Pool<ConnectionManager<PgConnection>>>;

Expand All @@ -14,9 +14,9 @@ pub(crate) fn create_pool(
idle_size: Option<u32>,
timeout: u64,
enable_stats: bool,
) -> (ConnectionPool, StatsCollector) {
) -> (ConnectionPool, DbPoolStatsCollector) {
let manager = ConnectionManager::<PgConnection>::new(url);
let (collector, transmitter) = StatsCollector::new();
let (collector, transmitter) = DbPoolStatsCollector::new();

let builder = Pool::builder()
.max_size(size)
Expand Down
8 changes: 6 additions & 2 deletions src/test_helpers/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use svc_authz::cache::ConnectionPool as RedisConnectionPool;
use svc_authz::ClientMap as Authz;

use crate::app::context::{Context, JanusTopics};
use crate::app::metrics::StatsCollector;
use crate::app::metrics::{DbPoolStatsCollector, DynamicStatsCollector};
use crate::config::Config;
use crate::db::ConnectionPool as Db;

Expand Down Expand Up @@ -93,7 +93,11 @@ impl Context for TestContext {
&None
}

fn db_pool_stats(&self) -> &Option<StatsCollector> {
fn db_pool_stats(&self) -> &Option<DbPoolStatsCollector> {
&None
}

fn dynamic_stats(&self) -> Option<&DynamicStatsCollector> {
None
}
}

0 comments on commit 9218acc

Please sign in to comment.