Skip to content

Commit

Permalink
feat: emit Db pool metrics periodically (#605)
Browse files Browse the repository at this point in the history
kill the mislabeled pool get metric for now (fixes it repeatedly
emitting)

Closes #555, #406

(cherry picked from commit c3d6946)
  • Loading branch information
pjenvey committed May 5, 2020
1 parent f1cceda commit 1761f7c
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 8 deletions.
4 changes: 4 additions & 0 deletions src/db/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ impl DbPool for MockDbPool {
Box::pin(future::ok(Box::new(MockDb::new()) as Box<dyn Db>))
}

fn state(&self) -> results::PoolState {
results::PoolState::default()
}

fn box_clone(&self) -> Box<dyn DbPool> {
Box::new(self.clone())
}
Expand Down
32 changes: 31 additions & 1 deletion src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ pub mod spanner;
mod tests;
pub mod util;

use std::fmt::Debug;
use std::{fmt::Debug, time::Duration};

use cadence::{Gauged, StatsdClient};
use futures::future::{self, LocalBoxFuture, TryFutureExt};
use lazy_static::lazy_static;
use serde::Deserialize;
Expand Down Expand Up @@ -61,6 +62,9 @@ type DbFuture<T> = LocalBoxFuture<'static, Result<T, ApiError>>;

pub trait DbPool: Sync + Send + Debug {
fn get(&self) -> DbFuture<Box<dyn Db>>;

fn state(&self) -> results::PoolState;

fn box_clone(&self) -> Box<dyn DbPool>;
}

Expand Down Expand Up @@ -259,3 +263,29 @@ pub fn pool_from_settings(
_ => Err(DbErrorKind::InvalidUrl(settings.database_url.to_owned()))?,
})
}

/// Emit DbPool metrics periodically
pub fn spawn_pool_periodic_reporter(
interval: Duration,
metrics: StatsdClient,
pool: Box<dyn DbPool>,
) {
actix_rt::spawn(async move {
loop {
let results::PoolState {
connections,
idle_connections,
} = pool.state();
metrics
.gauge(
"storage.pool.connections.active",
(connections - idle_connections) as u64,
)
.ok();
metrics
.gauge("storage.pool.connections.idle", idle_connections as u64)
.ok();
actix_rt::time::delay_for(interval).await;
}
});
}
6 changes: 5 additions & 1 deletion src/db/mysql/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use diesel::{
use super::models::{MysqlDb, Result};
#[cfg(test)]
use super::test::TestTransactionCustomizer;
use crate::db::{error::DbError, Db, DbFuture, DbPool, STD_COLLS};
use crate::db::{error::DbError, results, Db, DbFuture, DbPool, STD_COLLS};
use crate::server::metrics::Metrics;
use crate::settings::Settings;

Expand Down Expand Up @@ -92,6 +92,10 @@ impl DbPool for MysqlDbPool {
)
}

fn state(&self) -> results::PoolState {
self.pool.state().into()
}

fn box_clone(&self) -> Box<dyn DbPool> {
Box::new(self.clone())
}
Expand Down
16 changes: 16 additions & 0 deletions src/db/results.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,22 @@ pub struct PostBsos {
pub failed: HashMap<String, String>,
}

#[derive(Debug, Default)]
/// A mockable r2d2::State
pub struct PoolState {
pub connections: u32,
pub idle_connections: u32,
}

impl From<diesel::r2d2::State> for PoolState {
fn from(state: diesel::r2d2::State) -> PoolState {
PoolState {
connections: state.connections,
idle_connections: state.idle_connections,
}
}
}

#[cfg(test)]
pub type GetCollectionId = i32;

Expand Down
10 changes: 6 additions & 4 deletions src/db/spanner/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use scheduled_thread_pool::ScheduledThreadPool;
use super::models::Result;
#[cfg(test)]
use super::test_util::SpannerTestTransactionCustomizer;
use crate::db::{error::DbError, Db, DbFuture, DbPool, STD_COLLS};
use crate::db::{error::DbError, results, Db, DbFuture, DbPool, STD_COLLS};
use crate::server::metrics::Metrics;
use crate::settings::Settings;

Expand Down Expand Up @@ -63,8 +63,6 @@ impl SpannerDbPool {
let builder = r2d2::Pool::builder()
.max_size(max_size)
.thread_pool(Arc::new(ScheduledThreadPool::new(r2d2_thread_pool_size)));
let mut metrics = metrics.clone();
metrics.start_timer("storage.spanner.pool.get", None);

#[cfg(test)]
let builder = if settings.database_use_test_transactions {
Expand All @@ -76,7 +74,7 @@ impl SpannerDbPool {
Ok(Self {
pool: builder.build(manager)?,
coll_cache: Default::default(),
metrics,
metrics: metrics.clone(),
})
}

Expand All @@ -102,6 +100,10 @@ impl DbPool for SpannerDbPool {
)
}

fn state(&self) -> results::PoolState {
self.pool.state().into()
}

fn box_clone(&self) -> Box<dyn DbPool> {
Box::new(self.clone())
}
Expand Down
6 changes: 4 additions & 2 deletions src/server/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
//! Main application server

use std::sync::Arc;
use std::{sync::Arc, time::Duration};

use crate::db::{pool_from_settings, DbPool};
use crate::db::{pool_from_settings, spawn_pool_periodic_reporter, DbPool};
use crate::error::ApiError;
use crate::server::metrics::Metrics;
use crate::settings::{Secrets, ServerLimits, Settings};
Expand Down Expand Up @@ -158,6 +158,8 @@ impl Server {
let secrets = Arc::new(settings.master_secret);
let port = settings.port;

spawn_pool_periodic_reporter(Duration::from_secs(10), metrics.clone(), db_pool.clone());

let server = HttpServer::new(move || {
// Setup the server state
let state = ServerState {
Expand Down

0 comments on commit 1761f7c

Please sign in to comment.