Skip to content
This repository has been archived by the owner on Aug 16, 2023. It is now read-only.

Commit

Permalink
running request timings metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
khodzha committed Jan 11, 2021
1 parent a5aea1c commit 395c4fb
Show file tree
Hide file tree
Showing 6 changed files with 241 additions and 6 deletions.
75 changes: 71 additions & 4 deletions src/app/message_handler.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::future::Future;
use std::pin::Pin;
use std::time::Instant;

use async_std::prelude::*;
use async_std::stream::{self, Stream};
Expand All @@ -22,16 +23,20 @@ use crate::app::{endpoint, API_VERSION};
/// Boxed, `Send + Unpin` stream whose items are boxed publishable messages
/// (`IntoPublishableMessage + Send` trait objects).
pub(crate) type MessageStream =
Box<dyn Stream<Item = Box<dyn IntoPublishableMessage + Send>> + Send + Unpin>;

/// Sending half of the channel used to report handler timings: each item pairs
/// the elapsed handling time with the method label it was measured for.
type TimingChannel = crossbeam_channel::Sender<(std::time::Duration, String)>;

/// Handler of incoming messages, generic over the application's global context.
pub(crate) struct MessageHandler<C: GlobalContext> {
/// Agent handle supplied at construction.
/// NOTE(review): presumably used to publish outgoing messages — confirm.
agent: Agent,
/// Application-wide context supplied at construction.
global_context: C,
/// Channel on which per-message handling timings are reported; it is cloned
/// into every timer created for an incoming message.
tx: TimingChannel,
}

impl<C: GlobalContext + Sync> MessageHandler<C> {
pub(crate) fn new(agent: Agent, global_context: C) -> Self {
pub(crate) fn new(agent: Agent, global_context: C, tx: TimingChannel) -> Self {
Self {
agent,
global_context,
tx,
}
}

Expand Down Expand Up @@ -86,10 +91,38 @@ impl<C: GlobalContext + Sync> MessageHandler<C> {
message: &IncomingMessage<String>,
topic: &str,
) -> Result<(), AppError> {
let mut timer = if msg_context.dynamic_stats().is_some() {
Some(MessageHandlerTiming::new(self.tx.clone()))
} else {
None
};

match message {
IncomingMessage::Request(req) => self.handle_request(msg_context, req, topic).await,
IncomingMessage::Response(resp) => self.handle_response(msg_context, resp, topic).await,
IncomingMessage::Event(event) => self.handle_event(msg_context, event, topic).await,
IncomingMessage::Request(req) => {
if let Some(ref mut timer) = timer {
timer.set_method(req.properties().method().into());
}
self.handle_request(msg_context, req, topic).await
}
IncomingMessage::Response(resp) => {
if let Some(ref mut timer) = timer {
timer.set_method("response".into());
}

self.handle_response(msg_context, resp, topic).await
}
IncomingMessage::Event(event) => {
if let Some(ref mut timer) = timer {
let label = match event.properties().label() {
Some(label) => format!("event-{}", label),
None => "event-none".into(),
};

timer.set_method(label);
}

self.handle_event(msg_context, event, topic).await
}
}
}

Expand Down Expand Up @@ -422,3 +455,37 @@ impl<'async_trait, H: 'async_trait + endpoint::EventHandler> EventEnvelopeHandle
Box::pin(handle_envelope::<H, C>(context, event))
}
}

/// Timer guard for a single handled message: it remembers when it was created
/// and, when dropped, reports the elapsed time together with `method` through
/// `sender`.
struct MessageHandlerTiming {
/// Instant at which the timer was created.
start: Instant,
/// Channel the `(elapsed, method)` pair is sent to on drop.
sender: TimingChannel,
/// Label reported with the timing; `"none"` until `set_method` is called.
method: String,
}

impl MessageHandlerTiming {
    /// Starts a timer at the current instant that will report through `sender`.
    ///
    /// The method label starts out as `"none"` and is expected to be replaced
    /// via [`set_method`](Self::set_method) once the message kind is known.
    fn new(sender: TimingChannel) -> Self {
        let start = Instant::now();
        let method = String::from("none");

        Self {
            start,
            sender,
            method,
        }
    }

    /// Replaces the label that will accompany the reported timing.
    fn set_method(&mut self, method: String) {
        self.method = method;
    }
}

impl Drop for MessageHandlerTiming {
    /// Reports the time elapsed since the timer was created, tagged with the
    /// current method label.
    ///
    /// The report is best-effort: a failed send is only logged as a warning.
    fn drop(&mut self) {
        let elapsed = self.start.elapsed();
        // The value is being destroyed, so move the label into the message
        // instead of allocating a copy of it for every handled message.
        let method = std::mem::take(&mut self.method);

        // `try_send` is used so that dropping the timer never waits on the channel.
        if let Err(e) = self.sender.try_send((elapsed, method)) {
            warn!(
                crate::LOG,
                "Failed to send msg handler future timing, reason = {:?}", e
            );
        }
    }
}
28 changes: 27 additions & 1 deletion src/app/metrics/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use chrono::{DateTime, Utc};
use svc_agent::AgentId;

use crate::app::context::GlobalContext;
use crate::app::metrics::{Metric, MetricKey, Tags};
use crate::app::metrics::{Metric, MetricKey, PercentileReport, Tags};

pub(crate) struct Aggregator<'a, C: GlobalContext> {
context: &'a C,
Expand Down Expand Up @@ -188,6 +188,32 @@ fn append_dynamic_stats(
tags.clone(),
));
}

for (method, PercentileReport { p95, p99, max }) in dynamic_stats.get_handler_timings()? {
let tags =
Tags::build_running_futures_tags(crate::APP_VERSION, context.agent_id(), method);

metrics.push(Metric::new(
MetricKey::RunningRequestDurationP95,
p95,
now,
tags.clone(),
));

metrics.push(Metric::new(
MetricKey::RunningRequestDurationP99,
p99,
now,
tags.clone(),
));

metrics.push(Metric::new(
MetricKey::RunningRequestDurationMax,
max,
now,
tags.clone(),
));
}
}

Ok(())
Expand Down
85 changes: 85 additions & 0 deletions src/app/metrics/dynamic_stats_collector.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::collections::BTreeMap;
use std::convert::TryFrom;
use std::thread;
use std::time::Duration;

use anyhow::{Context, Result};
use svc_agent::AgentId;
Expand All @@ -17,15 +19,30 @@ enum Message {
GetJanusTimeouts {
tx: crossbeam_channel::Sender<Vec<(String, u64)>>,
},
HandlerTiming {
duration: Duration,
method: String,
},
GetHandlerTimings {
tx: crossbeam_channel::Sender<Vec<(String, PercentileReport)>>,
},
}

/// Cloneable handle to the stats collector; it only holds the sending side of
/// the channel over which `Message`s are delivered to the collector loop.
#[derive(Clone)]
pub(crate) struct DynamicStatsCollector {
/// Sender used both for recording values and for requesting reports.
tx: crossbeam_channel::Sender<Message>,
}

/// Summary of the handler durations recorded for one method label.
///
/// All values are in microseconds.
///
/// NOTE: as currently computed by the collector, `p95` and `p99` are the
/// midpoint between the sample at the corresponding rank and `max` (or `max`
/// itself when that rank is the last sample), not the raw sample at that rank.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) struct PercentileReport {
    /// 95th percentile estimate.
    pub p95: u64,
    /// 99th percentile estimate.
    pub p99: u64,
    /// Largest recorded duration.
    pub max: u64,
}

/// Mutable state kept by the collector loop while it processes messages.
struct State {
/// Counters keyed by string.
/// NOTE(review): the code that updates this map is not visible here — confirm semantics.
data: BTreeMap<String, usize>,
/// NOTE(review): presumably timeout counts keyed by Janus backend — confirm against the handler.
janus_timeouts: BTreeMap<String, u64>,
/// Handler durations in microseconds grouped by method label; emptied after
/// each handler-timings report is produced.
futures_timings: BTreeMap<String, Vec<u64>>,
}

impl DynamicStatsCollector {
Expand All @@ -36,6 +53,7 @@ impl DynamicStatsCollector {
let mut state = State {
data: BTreeMap::new(),
janus_timeouts: BTreeMap::new(),
futures_timings: BTreeMap::new(),
};

for message in rx {
Expand Down Expand Up @@ -82,6 +100,53 @@ impl DynamicStatsCollector {
);
}
}
Message::HandlerTiming { duration, method } => {
let vec = state.futures_timings.entry(method).or_default();
let micros = match u64::try_from(duration.as_micros()) {
Ok(micros) => micros,
Err(_) => u64::MAX,
};

vec.push(micros);
}
Message::GetHandlerTimings { tx } => {
let vec = state
.futures_timings
.into_iter()
.map(|(method, mut values)| {
values.sort_unstable();

let count = values.len();
let p95_idx = (count as f32 * 0.95) as usize;
let p99_idx = (count as f32 * 0.99) as usize;
let max_idx = count - 1;
let max = values[max_idx];

let p95 = if p95_idx < max_idx {
(values[p95_idx] + max) / 2
} else {
max
};

let p99 = if p99_idx < max_idx {
(values[p99_idx] + max) / 2
} else {
max
};

(method, PercentileReport { p95, p99, max })
})
.collect::<Vec<_>>();

if let Err(err) = tx.send(vec) {
warn!(
crate::LOG,
"Failed to send dynamic stats collector report: {}", err,
);
}

state.futures_timings = BTreeMap::new();
}
}
}
});
Expand Down Expand Up @@ -133,6 +198,26 @@ impl DynamicStatsCollector {
rx.recv()
.context("Failed to receive dynamic stats collector report")
}

/// Hands one handler timing sample (`duration` for `method`) to the collector.
///
/// A failure to deliver the sample is logged as a warning and otherwise ignored.
pub(crate) fn record_future_time(&self, duration: Duration, method: String) {
    let sample = Message::HandlerTiming { duration, method };

    match self.tx.send(sample) {
        Ok(()) => {}
        Err(err) => {
            warn!(
                crate::LOG,
                "Failed to register dynamic stats collector value: {}", err
            );
        }
    }
}

/// Requests the per-method timing report from the collector and waits for the reply.
///
/// # Errors
///
/// Fails when the request cannot be sent to the collector or when no reply
/// is received on the one-shot reply channel.
pub(crate) fn get_handler_timings(&self) -> Result<Vec<(String, PercentileReport)>> {
    // Single-slot channel on which the collector sends its answer back.
    let (reply_tx, reply_rx) = crossbeam_channel::bounded(1);
    let request = Message::GetHandlerTimings { tx: reply_tx };

    self.tx
        .send(request)
        .context("Failed to send GetHandlerTimings message to the dynamic stats collector")?;

    let report = reply_rx
        .recv()
        .context("Failed to receive dynamic stats collector report")?;

    Ok(report)
}
}

impl Drop for DynamicStatsCollector {
Expand Down
41 changes: 41 additions & 0 deletions src/app/metrics/metric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,13 @@ pub enum Tags {
account_audience: String,
backend_label: String,
},
RunningFuture {
version: String,
agent_label: String,
account_label: String,
account_audience: String,
method: String,
},
Empty,
}

Expand Down Expand Up @@ -108,6 +115,16 @@ impl Tags {
backend_label: janus_id.label().to_owned(),
}
}

pub fn build_running_futures_tags(version: &str, agent_id: &AgentId, method: String) -> Self {
Tags::RunningFuture {
version: version.to_owned(),
agent_label: agent_id.label().to_owned(),
account_label: agent_id.as_account_id().label().to_owned(),
account_audience: agent_id.as_account_id().audience().to_owned(),
method,
}
}
}

#[derive(Serialize, Clone, Debug)]
Expand Down Expand Up @@ -149,6 +166,12 @@ pub(crate) enum MetricKey {
RunningRequests,
#[serde(rename(serialize = "apps.conference.janus_timeouts_total"))]
JanusTimeoutsTotal,
#[serde(rename(serialize = "apps.conference.running_request_p95_microseconds"))]
RunningRequestDurationP95,
#[serde(rename(serialize = "apps.conference.running_request_p99_microseconds"))]
RunningRequestDurationP99,
#[serde(rename(serialize = "apps.conference.running_request_max_microseconds"))]
RunningRequestDurationMax,
}

#[derive(Serialize, Clone, Debug)]
Expand Down Expand Up @@ -190,6 +213,12 @@ pub(crate) enum MetricKey2 {
RunningRequests,
#[serde(rename(serialize = "janus_timeouts_total"))]
JanusTimeoutsTotal,
#[serde(rename(serialize = "running_request_p95_microseconds"))]
RunningRequestDurationP95,
#[serde(rename(serialize = "running_request_p99_microseconds"))]
RunningRequestDurationP99,
#[serde(rename(serialize = "running_request_max_microseconds"))]
RunningRequestDurationMax,
}

impl From<MetricKey> for MetricKey2 {
Expand All @@ -213,6 +242,9 @@ impl From<MetricKey> for MetricKey2 {
MetricKey::JanusBackendAgentLoad => MetricKey2::JanusBackendAgentLoad,
MetricKey::RunningRequests => MetricKey2::RunningRequests,
MetricKey::JanusTimeoutsTotal => MetricKey2::JanusTimeoutsTotal,
MetricKey::RunningRequestDurationP95 => MetricKey2::RunningRequestDurationP95,
MetricKey::RunningRequestDurationP99 => MetricKey2::RunningRequestDurationP99,
MetricKey::RunningRequestDurationMax => MetricKey2::RunningRequestDurationMax,
}
}
}
Expand All @@ -238,6 +270,15 @@ impl std::fmt::Display for MetricKey2 {
MetricKey2::Dynamic(key) => write!(f, "{}_total", key),
MetricKey2::RunningRequests => write!(f, "running_requests_total"),
MetricKey2::JanusTimeoutsTotal => write!(f, "janus_backend_agent_load_total"),
MetricKey2::RunningRequestDurationP95 => {
write!(f, "running_request_duration_p95_microseconds")
}
MetricKey2::RunningRequestDurationP99 => {
write!(f, "running_request_duration_p99_microseconds")
}
MetricKey2::RunningRequestDurationMax => {
write!(f, "running_request_duration_max_microseconds")
}
}
}
}
Expand Down
1 change: 1 addition & 0 deletions src/app/metrics/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub(crate) use aggregator::Aggregator;
pub(crate) use dynamic_stats_collector::DynamicStatsCollector;
pub(crate) use dynamic_stats_collector::PercentileReport;
pub(crate) use metric::{Metric, Metric2, MetricKey, Tags};
pub(crate) use stats_route::StatsRoute;

Expand Down
17 changes: 16 additions & 1 deletion src/app/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,17 @@ pub(crate) async fn run(

let running_requests = Arc::new(AtomicI64::new(0));
let stats_collector = Arc::new(DynamicStatsCollector::start());
let stats_collector_ = stats_collector.clone();

let (handler_timer_tx, handler_timer_rx) = crossbeam_channel::bounded(500);
std::thread::Builder::new()
.name("msg-handler-timings".into())
.spawn(move || {
for (dur, method) in handler_timer_rx {
stats_collector_.record_future_time(dur, method);
}
})
.expect("Failed to start msg-handler-timings thread");

// Context
let context = AppContext::new(
Expand All @@ -106,7 +117,11 @@ pub(crate) async fn run(
};

// Message handler
let message_handler = Arc::new(MessageHandler::new(agent.clone(), context));
let message_handler = Arc::new(MessageHandler::new(
agent.clone(),
context,
handler_timer_tx,
));
StatsRoute::start(config, message_handler.clone());

// Message loop
Expand Down

0 comments on commit 395c4fb

Please sign in to comment.