Commit
db pool metrics
khodzha committed Aug 31, 2020
1 parent 084d03f commit 91ad494
Showing 11 changed files with 302 additions and 24 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -11,6 +11,7 @@ async-trait = "0.1"
base64 = "0.10"
chrono = { version = "0.4", features = ["serde"] }
config = "0.9"
crossbeam-channel = "0.4"
diesel = { version = "1.4", features = ["postgres", "uuid", "chrono", "serde_json", "r2d2"] }
diesel-derive-enum = { version = "0.4", features = ["postgres"] }
env_logger = "0.6"
15 changes: 15 additions & 0 deletions src/app/context.rs
@@ -4,6 +4,7 @@ use svc_agent::{queue_counter::QueueCounterHandle, AgentId};
use svc_authz::cache::ConnectionPool as RedisConnectionPool;
use svc_authz::ClientMap as Authz;

use crate::app::metrics::StatsCollector;
use crate::config::Config;
use crate::db::ConnectionPool as Db;

@@ -18,6 +19,7 @@ pub(crate) struct AppContext {
janus_topics: JanusTopics,
queue_counter: Option<QueueCounterHandle>,
redis_pool: Option<RedisConnectionPool>,
db_pool_stats: Option<StatsCollector>,
}

impl AppContext {
@@ -32,6 +34,7 @@ impl AppContext {
janus_topics,
queue_counter: None,
redis_pool: None,
db_pool_stats: None,
}
}

@@ -48,6 +51,13 @@ impl AppContext {
..self
}
}

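/// Attaches a db pool `StatsCollector` to the context so request handlers can read pool timings.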
pub(crate) fn db_pool_stats(self, stats: StatsCollector) -> Self {
Self {
db_pool_stats: Some(stats),
..self
}
}
}

pub(crate) trait Context: Sync {
@@ -58,6 +68,7 @@ pub(crate) trait Context: Sync {
fn janus_topics(&self) -> &JanusTopics;
fn queue_counter(&self) -> &Option<QueueCounterHandle>;
fn redis_pool(&self) -> &Option<RedisConnectionPool>;
fn db_pool_stats(&self) -> &Option<StatsCollector>;
}

impl Context for AppContext {
@@ -88,6 +99,10 @@ impl Context for AppContext {
fn redis_pool(&self) -> &Option<RedisConnectionPool> {
&self.redis_pool
}

fn db_pool_stats(&self) -> &Option<StatsCollector> {
&self.db_pool_stats
}
}

///////////////////////////////////////////////////////////////////////////////
70 changes: 56 additions & 14 deletions src/app/endpoint/metric.rs
@@ -21,36 +21,57 @@ fn default_duration() -> u64 {
5
}

#[derive(Serialize, Debug)]
pub(crate) struct MetricValue {
value: u64,
#[derive(Serialize, Debug, Copy, Clone)]
pub(crate) struct MetricValue<T: serde::Serialize> {
value: T,
#[serde(with = "ts_seconds")]
timestamp: DateTime<Utc>,
}

#[derive(Serialize, Debug)]
impl<T: serde::Serialize> MetricValue<T> {
fn new(value: T, timestamp: DateTime<Utc>) -> Self {
Self { value, timestamp }
}
}
#[derive(Serialize, Debug, Copy, Clone)]
#[serde(tag = "metric")]
pub(crate) enum Metric {
#[serde(rename(serialize = "apps.conference.incoming_requests_total"))]
IncomingQueueRequests(MetricValue),
IncomingQueueRequests(MetricValue<u64>),
#[serde(rename(serialize = "apps.conference.incoming_responses_total"))]
IncomingQueueResponses(MetricValue),
IncomingQueueResponses(MetricValue<u64>),
#[serde(rename(serialize = "apps.conference.incoming_events_total"))]
IncomingQueueEvents(MetricValue),
IncomingQueueEvents(MetricValue<u64>),
#[serde(rename(serialize = "apps.conference.outgoing_requests_total"))]
OutgoingQueueRequests(MetricValue),
OutgoingQueueRequests(MetricValue<u64>),
#[serde(rename(serialize = "apps.conference.outgoing_responses_total"))]
OutgoingQueueResponses(MetricValue),
OutgoingQueueResponses(MetricValue<u64>),
#[serde(rename(serialize = "apps.conference.outgoing_events_total"))]
OutgoingQueueEvents(MetricValue),
OutgoingQueueEvents(MetricValue<u64>),
#[serde(rename(serialize = "apps.conference.db_connections_total"))]
DbConnections(MetricValue),
DbConnections(MetricValue<u64>),
#[serde(rename(serialize = "apps.conference.idle_db_connections_total"))]
IdleDbConnections(MetricValue),
IdleDbConnections(MetricValue<u64>),
#[serde(rename(serialize = "apps.conference.redis_connections_total"))]
RedisConnections(MetricValue),
RedisConnections(MetricValue<u64>),
#[serde(rename(serialize = "apps.conference.idle_redis_connections_total"))]
IdleRedisConnections(MetricValue),
IdleRedisConnections(MetricValue<u64>),
#[serde(rename(serialize = "apps.event.db_pool_checkin_average_total"))]
DbPoolCheckinAverage(MetricValue<f64>),
#[serde(rename(serialize = "apps.event.max_db_pool_checkin_total"))]
MaxDbPoolCheckin(MetricValue<u128>),
#[serde(rename(serialize = "apps.event.db_pool_checkout_average_total"))]
DbPoolCheckoutAverage(MetricValue<f64>),
#[serde(rename(serialize = "apps.event.max_db_pool_checkout_total"))]
MaxDbPoolCheckout(MetricValue<u128>),
#[serde(rename(serialize = "apps.event.db_pool_release_average_total"))]
DbPoolReleaseAverage(MetricValue<f64>),
#[serde(rename(serialize = "apps.event.max_db_pool_release_total"))]
MaxDbPoolRelease(MetricValue<u128>),
#[serde(rename(serialize = "apps.event.db_pool_timeout_average_total"))]
DbPoolTimeoutAverage(MetricValue<f64>),
#[serde(rename(serialize = "apps.event.max_db_pool_timeout_total"))]
MaxDbPoolTimeout(MetricValue<u128>),
}

pub(crate) struct PullHandler;
@@ -130,6 +151,8 @@ impl EventHandler for PullHandler {
}));
}

append_db_pool_stats(&mut metrics, context, now);

let short_term_timing = ShortTermTimingProperties::until_now(start_timestamp);
let props = evp.to_event("metric.create", short_term_timing);
let outgoing_event = OutgoingEvent::multicast(metrics, props, account_id);
@@ -142,3 +165,22 @@
}
}
}

fn append_db_pool_stats(metrics: &mut Vec<Metric>, context: &dyn Context, now: DateTime<Utc>) {
if let Some(db_pool_stats) = context.db_pool_stats() {
let stats = db_pool_stats.get_stats();

let m = [
Metric::DbPoolCheckinAverage(MetricValue::new(stats.avg_checkin, now)),
Metric::MaxDbPoolCheckin(MetricValue::new(stats.max_checkin, now)),
Metric::DbPoolCheckoutAverage(MetricValue::new(stats.avg_checkout, now)),
Metric::MaxDbPoolCheckout(MetricValue::new(stats.max_checkout, now)),
Metric::DbPoolTimeoutAverage(MetricValue::new(stats.avg_timeout, now)),
Metric::MaxDbPoolTimeout(MetricValue::new(stats.max_timeout, now)),
Metric::DbPoolReleaseAverage(MetricValue::new(stats.avg_release, now)),
Metric::MaxDbPoolRelease(MetricValue::new(stats.max_release, now)),
];

metrics.extend_from_slice(&m);
}
}
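
The new variants rely on the same internally tagged serde representation as the existing queue metrics: the renamed variant name becomes the "metric" field and the inner MetricValue fields are flattened next to it. A minimal sketch of the resulting payload shape (not part of this commit; it assumes serde_json is available as a dev-dependency and uses an arbitrary timestamp):

#[cfg(test)]
mod tests {
    use super::*;
    use chrono::{TimeZone, Utc};

    #[test]
    fn db_pool_metric_serializes_as_flat_object() {
        // Hypothetical values, chosen only to illustrate the serialized shape.
        let ts = Utc.timestamp(1_598_918_400, 0);
        let metric = Metric::DbPoolCheckoutAverage(MetricValue::new(152.4, ts));

        let json = serde_json::to_string(&metric).expect("metric should serialize");
        let value: serde_json::Value = serde_json::from_str(&json).unwrap();

        // `#[serde(tag = "metric")]` injects the renamed variant name,
        // and `ts_seconds` turns the timestamp into unix seconds.
        assert_eq!(value["metric"], "apps.event.db_pool_checkout_average_total");
        assert_eq!(value["value"], 152.4);
        assert_eq!(value["timestamp"], 1_598_918_400);
    }
}
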
3 changes: 3 additions & 0 deletions src/app/metrics/mod.rs
@@ -0,0 +1,3 @@
pub(crate) use stats_collector::StatsCollector;

mod stats_collector;
194 changes: 194 additions & 0 deletions src/app/metrics/stats_collector.rs
@@ -0,0 +1,194 @@
use std::sync::Arc;
use std::sync::Mutex;

use diesel::r2d2::event::*;
use log::error;

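/// Running aggregate for one pool event kind: event count plus sum and maximum of the observed durations, in microseconds.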
#[derive(Debug, Default)]
struct Stat {
total: u128,
micros_sum: u128,
max_micros: u128,
}

impl Stat {
fn reset(&mut self) {
self.total = 0;
self.micros_sum = 0;
self.max_micros = 0;
}

fn update(&mut self, micros: u128) {
self.total += 1;
self.micros_sum += micros;
self.max_micros = std::cmp::max(self.max_micros, micros);
}
}

#[derive(Debug, Default)]
struct Inner {
checkin: Stat,
checkout: Stat,
release: Stat,
timeout: Stat,
}

enum Update {
Checkin(u128),
Checkout(u128),
Release(u128),
Timeout(u128),
}

impl Inner {
fn get_stats(&mut self) -> Stats {
let stats = Stats {
avg_checkin: (self.checkin.micros_sum as f64) / (self.checkin.total as f64),
avg_checkout: (self.checkout.micros_sum as f64) / (self.checkout.total as f64),
avg_release: (self.release.micros_sum as f64) / (self.release.total as f64),
avg_timeout: (self.timeout.micros_sum as f64) / (self.timeout.total as f64),
max_checkin: self.checkin.max_micros,
max_checkout: self.checkout.max_micros,
max_release: self.release.max_micros,
max_timeout: self.timeout.max_micros,
};

self.reset();

stats
}

fn update(&mut self, update: Update) {
match update {
Update::Checkin(micros) => {
self.checkin.update(micros);
}
Update::Checkout(micros) => {
self.checkout.update(micros);
}
Update::Timeout(micros) => {
self.timeout.update(micros);
}
Update::Release(micros) => {
self.release.update(micros);
}
}
}

fn reset(&mut self) {
self.checkin.reset();
self.checkout.reset();
self.release.reset();
self.timeout.reset();
}
}

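/// Snapshot handed to the metrics endpoint: averages and maxima in microseconds, covering the period since the previous snapshot.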
#[derive(Debug, Clone)]
pub struct Stats {
pub avg_checkin: f64,
pub avg_checkout: f64,
pub avg_release: f64,
pub avg_timeout: f64,
pub max_checkin: u128,
pub max_checkout: u128,
pub max_release: u128,
pub max_timeout: u128,
}

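/// Read half: `get_stats` takes a snapshot of the aggregated values and resets the counters.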
#[derive(Debug, Clone)]
pub struct StatsCollector {
inner: Arc<Mutex<Inner>>,
}

impl StatsCollector {
pub fn new() -> (Self, StatsTransmitter) {
let inner: Arc<Mutex<Inner>> = Default::default();
let collector = Self {
inner: inner.clone(),
};

let (tx, rx) = crossbeam_channel::bounded(100);
let transmitter = StatsTransmitter {
inner: Arc::new(Mutex::new(StatsTransmitterInner(tx))),
};

std::thread::Builder::new()
.name("dbpool-stats-collector".to_string())
.spawn(move || {
for update in rx {
inner
.lock()
.expect("Lock failed, poisoned mutex?")
.update(update);
}
})
.expect("Failed to start dbpool-stats-collector thread");

(collector, transmitter)
}

pub fn get_stats(&self) -> Stats {
let mut inner = self.inner.lock().expect("Lock failed, poisoned mutex?");
inner.get_stats()
}
}

#[derive(Debug)]
struct StatsTransmitterInner(crossbeam_channel::Sender<Update>);

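/// Write half: implements `diesel::r2d2::HandleEvent`, so it can be registered on the pool
/// builder and forward per-event durations over the channel to the aggregating thread.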
#[derive(Debug)]
pub struct StatsTransmitter {
inner: Arc<Mutex<StatsTransmitterInner>>,
}

impl diesel::r2d2::HandleEvent for StatsTransmitter {
fn handle_checkin(&self, event: CheckinEvent) {
let micros = event.duration().as_micros();

let inner = self.inner.lock().expect("Lock failed, poisoned mutex?");
if let Err(e) = inner.0.send(Update::Checkin(micros)) {
error!(
"Failed to send checkin micros in StatsTransmiiter, reason = {}",
e
);
}
}

fn handle_checkout(&self, event: CheckoutEvent) {
let micros = event.duration().as_micros();

let inner = self.inner.lock().expect("Lock failed, poisoned mutex?");
if let Err(e) = inner.0.send(Update::Checkout(micros)) {
error!(
"Failed to send checkout micros in StatsTransmiiter, reason = {}",
e
);
}
}

fn handle_release(&self, event: ReleaseEvent) {
let micros = event.age().as_micros();

let inner = self.inner.lock().expect("Lock failed, poisoned mutex?");
if let Err(e) = inner.0.send(Update::Release(micros)) {
error!(
"Failed to send checkout micros in StatsTransmiiter, reason = {}",
e
);
}
}

fn handle_timeout(&self, event: TimeoutEvent) {
let micros = event.timeout().as_micros();

let inner = self.inner.lock().expect("Lock failed, poisoned mutex?");
if let Err(e) = inner.0.send(Update::Timeout(micros)) {
error!(
"Failed to send checkout micros in StatsTransmiiter, reason = {}",
e
);
}
}

fn handle_acquire(&self, _event: AcquireEvent) {}
}