Skip to content
This repository has been archived by the owner on Aug 16, 2023. It is now read-only.

Commit

Permalink
Metric2 with different serialization tag
Browse files Browse the repository at this point in the history
  • Loading branch information
khodzha committed Sep 17, 2020
1 parent a699daf commit b24105a
Show file tree
Hide file tree
Showing 3 changed files with 252 additions and 159 deletions.
213 changes: 54 additions & 159 deletions src/app/endpoint/metric.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
use async_std::stream;
use async_trait::async_trait;
use chrono::{serde::ts_seconds, DateTime, Utc};
use serde_derive::{Deserialize, Serialize};
use chrono::{DateTime, Utc};
use serde_derive::Deserialize;
use svc_agent::mqtt::{
IncomingEventProperties, IntoPublishableMessage, OutgoingEvent, ResponseStatus,
ShortTermTimingProperties,
};

use crate::app::context::Context;
use crate::app::endpoint::prelude::*;
use crate::app::metrics::{Metric, Metric2, MetricValue};
use crate::config::TelemetryConfig;
use crate::db;

Expand All @@ -22,70 +23,6 @@ fn default_duration() -> u64 {
5
}

#[derive(Serialize, Debug, Copy, Clone)]
pub(crate) struct MetricValue<T: serde::Serialize> {
value: T,
#[serde(with = "ts_seconds")]
timestamp: DateTime<Utc>,
}

impl<T: serde::Serialize> MetricValue<T> {
fn new(value: T, timestamp: DateTime<Utc>) -> Self {
Self { value, timestamp }
}
}
#[derive(Serialize, Debug, Clone)]
#[serde(tag = "metric")]
pub(crate) enum Metric {
#[serde(rename(serialize = "apps.conference.incoming_requests_total"))]
IncomingQueueRequests(MetricValue<u64>),
#[serde(rename(serialize = "apps.conference.incoming_responses_total"))]
IncomingQueueResponses(MetricValue<u64>),
#[serde(rename(serialize = "apps.conference.incoming_events_total"))]
IncomingQueueEvents(MetricValue<u64>),
#[serde(rename(serialize = "apps.conference.outgoing_requests_total"))]
OutgoingQueueRequests(MetricValue<u64>),
#[serde(rename(serialize = "apps.conference.outgoing_responses_total"))]
OutgoingQueueResponses(MetricValue<u64>),
#[serde(rename(serialize = "apps.conference.outgoing_events_total"))]
OutgoingQueueEvents(MetricValue<u64>),
#[serde(rename(serialize = "apps.conference.db_connections_total"))]
DbConnections(MetricValue<u64>),
#[serde(rename(serialize = "apps.conference.idle_db_connections_total"))]
IdleDbConnections(MetricValue<u64>),
#[serde(rename(serialize = "apps.conference.redis_connections_total"))]
RedisConnections(MetricValue<u64>),
#[serde(rename(serialize = "apps.conference.idle_redis_connections_total"))]
IdleRedisConnections(MetricValue<u64>),
#[serde(rename(serialize = "apps.conference.db_pool_checkin_average_total"))]
DbPoolCheckinAverage(MetricValue<f64>),
#[serde(rename(serialize = "apps.conference.max_db_pool_checkin_total"))]
MaxDbPoolCheckin(MetricValue<u128>),
#[serde(rename(serialize = "apps.conference.db_pool_checkout_average_total"))]
DbPoolCheckoutAverage(MetricValue<f64>),
#[serde(rename(serialize = "apps.conference.max_db_pool_checkout_total"))]
MaxDbPoolCheckout(MetricValue<u128>),
#[serde(rename(serialize = "apps.conference.db_pool_release_average_total"))]
DbPoolReleaseAverage(MetricValue<f64>),
#[serde(rename(serialize = "apps.conference.max_db_pool_release_total"))]
MaxDbPoolRelease(MetricValue<u128>),
#[serde(rename(serialize = "apps.conference.db_pool_timeout_average_total"))]
DbPoolTimeoutAverage(MetricValue<f64>),
#[serde(rename(serialize = "apps.conference.max_db_pool_timeout_total"))]
MaxDbPoolTimeout(MetricValue<u128>),
#[serde(rename(serialize = "apps.conference.online_janus_backends_total"))]
OnlineJanusBackendsCount(MetricValue<u64>),
#[serde(rename(serialize = "apps.conference.janus_backends_capacity_total"))]
JanusBackendTotalCapacity(MetricValue<u64>),
#[serde(rename(serialize = "apps.conference.connected_agents_total"))]
ConnectedAgentsCount(MetricValue<u64>),
#[serde(serialize_with = "serialize_dynamic_metric")]
Dynamic {
key: String,
value: MetricValue<u64>,
},
}

pub(crate) struct PullHandler;

#[async_trait]
Expand All @@ -110,57 +47,51 @@ impl EventHandler for PullHandler {
.status(ResponseStatus::BAD_REQUEST)?;

vec![
Metric::IncomingQueueRequests(MetricValue {
value: stats.incoming_requests,
timestamp: now,
}),
Metric::IncomingQueueResponses(MetricValue {
value: stats.incoming_responses,
timestamp: now,
}),
Metric::IncomingQueueEvents(MetricValue {
value: stats.incoming_events,
timestamp: now,
}),
Metric::OutgoingQueueRequests(MetricValue {
value: stats.outgoing_requests,
timestamp: now,
}),
Metric::OutgoingQueueResponses(MetricValue {
value: stats.outgoing_responses,
timestamp: now,
}),
Metric::OutgoingQueueEvents(MetricValue {
value: stats.outgoing_events,
timestamp: now,
}),
Metric::IncomingQueueRequests(MetricValue::new(
stats.incoming_requests,
now,
)),
Metric::IncomingQueueResponses(MetricValue::new(
stats.incoming_responses,
now,
)),
Metric::IncomingQueueEvents(MetricValue::new(stats.incoming_events, now)),
Metric::OutgoingQueueRequests(MetricValue::new(
stats.outgoing_requests,
now,
)),
Metric::OutgoingQueueResponses(MetricValue::new(
stats.outgoing_responses,
now,
)),
Metric::OutgoingQueueEvents(MetricValue::new(stats.outgoing_events, now)),
]
} else {
vec![]
};

let db_state = context.db().state();
metrics.push(Metric::DbConnections(MetricValue {
value: db_state.connections as u64,
timestamp: now,
}));
metrics.push(Metric::DbConnections(MetricValue::new(
db_state.connections as u64,
now,
)));

metrics.push(Metric::IdleDbConnections(MetricValue {
value: db_state.idle_connections as u64,
timestamp: now,
}));
metrics.push(Metric::IdleDbConnections(MetricValue::new(
db_state.idle_connections as u64,
now,
)));

if let Some(pool) = context.redis_pool() {
let pool_state = pool.state();
metrics.push(Metric::RedisConnections(MetricValue {
value: pool_state.connections as u64,
timestamp: now,
}));

metrics.push(Metric::IdleRedisConnections(MetricValue {
value: pool_state.idle_connections as u64,
timestamp: now,
}));
metrics.push(Metric::RedisConnections(MetricValue::new(
pool_state.connections as u64,
now,
)));

metrics.push(Metric::IdleRedisConnections(MetricValue::new(
pool_state.idle_connections as u64,
now,
)));
}

append_db_pool_stats(&mut metrics, context, now);
Expand All @@ -173,12 +104,24 @@ impl EventHandler for PullHandler {
.map_err(|err| err.to_string())
.status(ResponseStatus::UNPROCESSABLE_ENTITY)?;

let metrics2 = metrics
.clone()
.into_iter()
.map(|m| m.into())
.collect::<Vec<Metric2>>();

let short_term_timing = ShortTermTimingProperties::until_now(start_timestamp);
let props = evp.to_event("metric.create", short_term_timing);
let props = evp.to_event("metric.create", short_term_timing.clone());
let props2 = evp.to_event("metric.create", short_term_timing);

let outgoing_event = OutgoingEvent::multicast(metrics, props, account_id);
let boxed_event =
Box::new(outgoing_event) as Box<dyn IntoPublishableMessage + Send>;
Ok(Box::new(stream::once(boxed_event)))
let outgoing_event2 = OutgoingEvent::multicast(metrics2, props2, account_id);

let boxed_events = vec![
Box::new(outgoing_event) as Box<dyn IntoPublishableMessage + Send>,
Box::new(outgoing_event2) as Box<dyn IntoPublishableMessage + Send>,
];
Ok(Box::new(stream::from_iter(boxed_events)))
}

_ => Ok(Box::new(stream::empty())),
Expand Down Expand Up @@ -222,25 +165,6 @@ fn append_dynamic_stats(
Ok(())
}

fn serialize_dynamic_metric<K, V, S>(
key: K,
metric_value: &MetricValue<V>,
serializer: S,
) -> std::result::Result<S::Ok, S::Error>
where
K: std::fmt::Display,
V: serde::Serialize,
S: serde::ser::Serializer,
{
use serde::ser::SerializeMap;

let mut map = serializer.serialize_map(Some(3))?;
map.serialize_entry("metric", &format!("apps.conference.{}_total", key))?;
map.serialize_entry("value", &metric_value.value)?;
map.serialize_entry("timestamp", &metric_value.timestamp)?;
map.end()
}

fn append_janus_stats(
metrics: &mut Vec<Metric>,
context: &dyn Context,
Expand Down Expand Up @@ -275,32 +199,3 @@ fn append_janus_stats(

Ok(())
}

#[cfg(test)]
mod tests {
use super::*;

#[derive(Deserialize)]
struct DynamicMetric {
metric: String,
value: u64,
timestamp: DateTime<Utc>,
}

#[test]
fn serialize_dynamic_metric() {
let now = Utc::now();

let json = serde_json::json!(Metric::Dynamic {
key: String::from("example"),
value: MetricValue::new(123, now),
});

let parsed: DynamicMetric = serde_json::from_str(&json.to_string())
.expect("Failed to parse json");

assert_eq!(&parsed.metric, "apps.conference.example_total");
assert_eq!(parsed.value, 123);
assert_eq!(parsed.timestamp, now);
}
}
Loading

0 comments on commit b24105a

Please sign in to comment.