Skip to content

Commit

Permalink
lts/feat : Added prom metrics for drainer lag
Browse files Browse the repository at this point in the history
  • Loading branch information
khuzema786 committed Oct 13, 2023
1 parent 82f9a5b commit a9dc89d
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 10 deletions.
40 changes: 30 additions & 10 deletions crates/location_tracking_service/src/drainer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,17 @@ use crate::{
use fred::types::{GeoPosition, GeoValue};
use rustc_hash::FxHashMap;
use shared::redis::types::RedisConnectionPool;
use shared::utils::{logger::*, prometheus};
use shared::{
queue_drainer_latency,
utils::{
logger::*,
prometheus::{NEW_RIDE_QUEUE_COUNTER, QUEUE_COUNTER, QUEUE_DRAINER_LATENCY},
},
};
use std::sync::atomic::{AtomicBool, Ordering};
use std::{sync::Arc, time::Duration};
use tokio::sync::mpsc;
use tokio::time::interval;
use tokio::{sync::mpsc, time::Instant};

async fn drain_driver_locations(
driver_locations: &FxHashMap<String, Vec<GeoValue>>,
Expand Down Expand Up @@ -48,9 +54,11 @@ pub async fn run_drainer(
) {
let mut driver_locations: FxHashMap<String, Vec<GeoValue>> = FxHashMap::default();
let mut timer = interval(Duration::from_secs(drainer_delay));
let mut start_time = Instant::now();

let mut new_ride_driver_locations: FxHashMap<String, Vec<GeoValue>> = FxHashMap::default();
let mut new_ride_timer = interval(Duration::from_secs(new_ride_drainer_delay));
let mut new_ride_start_time = Instant::now();

let mut drainer_size = 0;
let mut new_ride_drainer_size = 0;
Expand All @@ -64,7 +72,9 @@ pub async fn run_drainer(
info!(tag = "[Force Draining Queue]", length = %drainer_size);
drain_driver_locations(&driver_locations, bucket_expiry, non_persistent_redis)
.await;
prometheus::QUEUE_COUNTER.reset();
// Cleanup
queue_drainer_latency!("OFF_RIDE", start_time);
QUEUE_COUNTER.reset();
driver_locations.clear();
}
if new_ride_drainer_size > 0 {
Expand All @@ -75,7 +85,9 @@ pub async fn run_drainer(
non_persistent_redis,
)
.await;
prometheus::NEW_RIDE_QUEUE_COUNTER.reset();
// Cleanup
queue_drainer_latency!("NEW_RIDE", new_ride_start_time);
NEW_RIDE_QUEUE_COUNTER.reset();
new_ride_driver_locations.clear();
}
break;
Expand All @@ -98,7 +110,7 @@ pub async fn run_drainer(
member: driver_id.into(),
});
new_ride_drainer_size += 1;
prometheus::NEW_RIDE_QUEUE_COUNTER.inc();
NEW_RIDE_QUEUE_COUNTER.inc();
} else {
driver_locations
.entry(driver_loc_bucket_key(&merchant_id, &city, &vehicle_type, &bucket))
Expand All @@ -111,21 +123,25 @@ pub async fn run_drainer(
member: driver_id.into(),
});
drainer_size += 1;
prometheus::QUEUE_COUNTER.inc();
QUEUE_COUNTER.inc();
}
if drainer_size >= drainer_capacity {
info!(tag = "[Force Draining Queue]", length = %drainer_size);
drain_driver_locations(&driver_locations, bucket_expiry, non_persistent_redis).await;
// Cleanup
prometheus::QUEUE_COUNTER.reset();
queue_drainer_latency!("OFF_RIDE", start_time);
start_time = Instant::now();
QUEUE_COUNTER.reset();
drainer_size = 0;
driver_locations.clear();
}
if new_ride_drainer_size >= drainer_capacity {
info!(tag = "[Force Draining Queue - New Ride]", length = %new_ride_drainer_size);
drain_driver_locations(&new_ride_driver_locations, bucket_expiry, non_persistent_redis).await;
// Cleanup
prometheus::NEW_RIDE_QUEUE_COUNTER.reset();
queue_drainer_latency!("NEW_RIDE", new_ride_start_time);
new_ride_start_time = Instant::now();
NEW_RIDE_QUEUE_COUNTER.reset();
new_ride_drainer_size = 0;
new_ride_driver_locations.clear();
}
Expand All @@ -138,7 +154,9 @@ pub async fn run_drainer(
info!(tag = "[Draining Queue]", length = %drainer_size);
drain_driver_locations(&driver_locations, bucket_expiry, non_persistent_redis).await;
// Cleanup
prometheus::QUEUE_COUNTER.reset();
queue_drainer_latency!("OFF_RIDE", start_time);
start_time = Instant::now();
QUEUE_COUNTER.reset();
drainer_size = 0;
driver_locations.clear();
}
Expand All @@ -148,7 +166,9 @@ pub async fn run_drainer(
info!(tag = "[Draining Queue - New Ride]", length = %new_ride_drainer_size);
drain_driver_locations(&new_ride_driver_locations, bucket_expiry, non_persistent_redis).await;
// Cleanup
prometheus::NEW_RIDE_QUEUE_COUNTER.reset();
queue_drainer_latency!("NEW_RIDE", new_ride_start_time);
new_ride_start_time = Instant::now();
NEW_RIDE_QUEUE_COUNTER.reset();
new_ride_drainer_size = 0;
new_ride_driver_locations.clear();
}
Expand Down
24 changes: 24 additions & 0 deletions crates/shared/src/utils/prometheus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,15 @@ pub static NEW_RIDE_QUEUE_COUNTER: once_cell::sync::Lazy<IntCounter> =
.expect("Failed to register new ride queue counter metrics")
});

pub static QUEUE_DRAINER_LATENCY: once_cell::sync::Lazy<HistogramVec> =
once_cell::sync::Lazy::new(|| {
register_histogram_vec!(
opts!("queue_drainer_latency", "Queue Drainer Montitoring").into(),
&["type"]
)
.expect("Failed to register queue drainer latency metrics")
});

#[macro_export]
macro_rules! incoming_api {
($method:expr, $endpoint:expr, $status:expr, $code:expr, $start:expr) => {
Expand All @@ -53,6 +62,16 @@ macro_rules! call_external_api {
};
}

#[macro_export]
macro_rules! queue_drainer_latency {
($type:expr, $start:expr) => {
let duration = $start.elapsed().as_secs_f64();
QUEUE_DRAINER_LATENCY
.with_label_values(&["type"])
.observe(duration);
};
}

pub fn prometheus_metrics() -> PrometheusMetrics {
let prometheus = PrometheusMetricsBuilder::new("api")
.endpoint("/metrics")
Expand All @@ -79,5 +98,10 @@ pub fn prometheus_metrics() -> PrometheusMetrics {
.register(Box::new(NEW_RIDE_QUEUE_COUNTER.to_owned()))
.expect("Failed to register queue counter metrics");

prometheus
.registry
.register(Box::new(QUEUE_DRAINER_LATENCY.to_owned()))
.expect("Failed to register queue drainer latency metrics");

prometheus
}

0 comments on commit a9dc89d

Please sign in to comment.