Skip to content

Commit

Permalink
rename MetricData to MetricValue
Browse files Browse the repository at this point in the history
rename NetworkDrain::send_data() to send_metrics()
  • Loading branch information
Keksoj committed Jun 16, 2023
1 parent 737072f commit 0ff7b31
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 69 deletions.
26 changes: 13 additions & 13 deletions lib/src/metrics/local_drain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use sozu_command::proto::command::{
ResponseContent, WorkerMetrics,
};

use super::{MetricData, Subscriber};
use super::{MetricValue, Subscriber};

/// This is how the metrics are stored in the local drain
#[derive(Debug, Clone)]
Expand All @@ -20,12 +20,12 @@ pub enum AggregatedMetric {
}

impl AggregatedMetric {
fn new(metric: MetricData) -> anyhow::Result<AggregatedMetric> {
fn new(metric: MetricValue) -> anyhow::Result<AggregatedMetric> {
match metric {
MetricData::Gauge(value) => Ok(AggregatedMetric::Gauge(value)),
MetricData::GaugeAdd(value) => Ok(AggregatedMetric::Gauge(value as usize)),
MetricData::Count(value) => Ok(AggregatedMetric::Count(value)),
MetricData::Time(value) => {
MetricValue::Gauge(value) => Ok(AggregatedMetric::Gauge(value)),
MetricValue::GaugeAdd(value) => Ok(AggregatedMetric::Gauge(value as usize)),
MetricValue::Count(value) => Ok(AggregatedMetric::Count(value)),
MetricValue::Time(value) => {
let mut histogram = ::hdrhistogram::Histogram::new(3).context(format!(
"Could not create histogram for time metric {metric:?}"
))?;
Expand All @@ -39,18 +39,18 @@ impl AggregatedMetric {
}
}

fn update(&mut self, key: &str, m: MetricData) {
fn update(&mut self, key: &str, m: MetricValue) {
match (self, m) {
(&mut AggregatedMetric::Gauge(ref mut v1), MetricData::Gauge(v2)) => {
(&mut AggregatedMetric::Gauge(ref mut v1), MetricValue::Gauge(v2)) => {
*v1 = v2;
}
(&mut AggregatedMetric::Gauge(ref mut v1), MetricData::GaugeAdd(v2)) => {
(&mut AggregatedMetric::Gauge(ref mut v1), MetricValue::GaugeAdd(v2)) => {
*v1 = (*v1 as i64 + v2) as usize;
}
(&mut AggregatedMetric::Count(ref mut v1), MetricData::Count(v2)) => {
(&mut AggregatedMetric::Count(ref mut v1), MetricValue::Count(v2)) => {
*v1 += v2;
}
(&mut AggregatedMetric::Time(ref mut v1), MetricData::Time(v2)) => {
(&mut AggregatedMetric::Time(ref mut v1), MetricValue::Time(v2)) => {
if let Err(e) = (*v1).record(v2 as u64) {
error!("could not record time metric: {:?}", e.to_string());
}
Expand Down Expand Up @@ -383,7 +383,7 @@ impl LocalDrain {
metric_name: &str,
cluster_id: &str,
backend_id: Option<&str>,
metric_value: MetricData,
metric_value: MetricValue,
) {
if !self.enabled {
return;
Expand Down Expand Up @@ -432,7 +432,7 @@ impl Subscriber for LocalDrain {
key: &'static str,
cluster_id: Option<&str>,
backend_id: Option<&str>,
metric: MetricData,
metric: MetricValue,
) {
trace!(
"receiving metric with key {}, cluster_id: {:?}, backend_id: {:?}, metric data: {:?}",
Expand Down
71 changes: 36 additions & 35 deletions lib/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,28 +23,27 @@ thread_local! {
pub static METRICS: RefCell<Aggregator> = RefCell::new(Aggregator::new(String::from("sozu")));
}

/// We should rename this to MetricValue
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MetricData {
pub enum MetricValue {
Gauge(usize),
GaugeAdd(i64),
Count(i64),
Time(usize),
}

impl MetricData {
impl MetricValue {
fn is_time(&self) -> bool {
matches!(self, &MetricData::Time(_))
matches!(self, &MetricValue::Time(_))
}

fn update(&mut self, key: &'static str, m: MetricData) -> bool {
fn update(&mut self, key: &'static str, m: MetricValue) -> bool {
match (self, m) {
(&mut MetricData::Gauge(ref mut v1), MetricData::Gauge(v2)) => {
(&mut MetricValue::Gauge(ref mut v1), MetricValue::Gauge(v2)) => {
let changed = *v1 != v2;
*v1 = v2;
changed
}
(&mut MetricData::Gauge(ref mut v1), MetricData::GaugeAdd(v2)) => {
(&mut MetricValue::Gauge(ref mut v1), MetricValue::GaugeAdd(v2)) => {
debug_assert!(
*v1 as i64 + v2 >= 0,
"metric {key} underflow: previous value: {v1}, adding: {v2}"
Expand All @@ -63,7 +62,7 @@ impl MetricData {

changed
}
(&mut MetricData::Count(ref mut v1), MetricData::Count(v2)) => {
(&mut MetricValue::Count(ref mut v1), MetricValue::Count(v2)) => {
let changed = v2 != 0;
*v1 += v2;
changed
Expand All @@ -76,30 +75,30 @@ impl MetricData {
}

#[derive(Debug, Clone)]
pub struct StoredMetricData {
pub struct StoredMetricValue {
last_sent: Instant,
updated: bool,
data: MetricData,
data: MetricValue,
}

impl StoredMetricData {
pub fn new(last_sent: Instant, data: MetricData) -> StoredMetricData {
StoredMetricData {
impl StoredMetricValue {
pub fn new(last_sent: Instant, data: MetricValue) -> StoredMetricValue {
StoredMetricValue {
last_sent,
updated: true,
data: if let MetricData::GaugeAdd(v) = data {
data: if let MetricValue::GaugeAdd(v) = data {
if v >= 0 {
MetricData::Gauge(v as usize)
MetricValue::Gauge(v as usize)
} else {
MetricData::Gauge(0)
MetricValue::Gauge(0)
}
} else {
data
},
}
}

pub fn update(&mut self, key: &'static str, m: MetricData) {
pub fn update(&mut self, key: &'static str, m: MetricValue) {
let updated = self.data.update(key, m);
if !self.updated {
self.updated = updated;
Expand Down Expand Up @@ -137,7 +136,7 @@ pub trait Subscriber {
label: &'static str,
cluster_id: Option<&str>,
backend_id: Option<&str>,
metric: MetricData,
metric: MetricValue,
);
}

Expand Down Expand Up @@ -190,15 +189,15 @@ impl Aggregator {
}

pub fn count_add(&mut self, key: &'static str, count_value: i64) {
self.receive_metric(key, None, None, MetricData::Count(count_value));
self.receive_metric(key, None, None, MetricValue::Count(count_value));
}

pub fn set_gauge(&mut self, key: &'static str, gauge_value: usize) {
self.receive_metric(key, None, None, MetricData::Gauge(gauge_value));
self.receive_metric(key, None, None, MetricValue::Gauge(gauge_value));
}

pub fn gauge_add(&mut self, key: &'static str, gauge_value: i64) {
self.receive_metric(key, None, None, MetricData::GaugeAdd(gauge_value));
self.receive_metric(key, None, None, MetricValue::GaugeAdd(gauge_value));
}

pub fn writable(&mut self) {
Expand All @@ -209,7 +208,7 @@ impl Aggregator {

pub fn send_data(&mut self) {
if let Some(ref mut net) = self.network.as_mut() {
net.send_data();
net.send_metrics();
}
}

Expand All @@ -236,7 +235,7 @@ impl Subscriber for Aggregator {
label: &'static str,
cluster_id: Option<&str>,
backend_id: Option<&str>,
metric: MetricData,
metric: MetricValue,
) {
if let Some(ref mut net) = self.network.as_mut() {
net.receive_metric(label, cluster_id, backend_id, metric.to_owned());
Expand Down Expand Up @@ -277,6 +276,7 @@ pub fn udp_bind() -> anyhow::Result<UdpSocket> {
UdpSocket::bind(address).with_context(|| "Could not bind to 0.0.0.0:0 udp socket")
}

/// adds a value to a counter
#[macro_export]
macro_rules! count (
($key:expr, $value: expr) => ({
Expand All @@ -287,14 +287,15 @@ macro_rules! count (
})
);

/// adds 1 to a counter
#[macro_export]
macro_rules! incr (
($key:expr) => (count!($key, 1));
($key:expr, $cluster_id:expr, $backend_id:expr) => {
use $crate::metrics::Subscriber;

$crate::metrics::METRICS.with(|metrics| {
(*metrics.borrow_mut()).receive_metric($key, $cluster_id, $backend_id, $crate::metrics::MetricData::Count(1));
(*metrics.borrow_mut()).receive_metric($key, $cluster_id, $backend_id, $crate::metrics::MetricValue::Count(1));
});
}
);
Expand Down Expand Up @@ -327,51 +328,51 @@ macro_rules! gauge_add (
let v = $value;

$crate::metrics::METRICS.with(|metrics| {
(*metrics.borrow_mut()).receive_metric($key, $cluster_id, $backend_id, $crate::metrics::MetricData::GaugeAdd(v));
(*metrics.borrow_mut()).receive_metric($key, $cluster_id, $backend_id, $crate::metrics::MetricValue::GaugeAdd(v));
});
}
);

#[macro_export]
macro_rules! time (
($key:expr, $value: expr) => ({
use $crate::metrics::{MetricData,Subscriber};
use $crate::metrics::{MetricValue,Subscriber};
let v = $value;
$crate::metrics::METRICS.with(|metrics| {
let m = &mut *metrics.borrow_mut();

m.receive_metric($key, None, None, MetricData::Time(v as usize));
m.receive_metric($key, None, None, MetricValue::Time(v as usize));
});
});
($key:expr, $cluster_id:expr, $value: expr) => ({
use $crate::metrics::{MetricData,Subscriber};
use $crate::metrics::{MetricValue,Subscriber};
let v = $value;
$crate::metrics::METRICS.with(|metrics| {
let m = &mut *metrics.borrow_mut();
let cluster: &str = $cluster_id;

m.receive_metric($key, Some(cluster), None, MetricData::Time(v as usize));
m.receive_metric($key, Some(cluster), None, MetricValue::Time(v as usize));
});
})
);

#[macro_export]
macro_rules! record_backend_metrics (
($cluster_id:expr, $backend_id:expr, $response_time: expr, $backend_connection_time: expr, $bin: expr, $bout: expr) => {
use $crate::metrics::{MetricData,Subscriber};
use $crate::metrics::{MetricValue,Subscriber};
$crate::metrics::METRICS.with(|metrics| {
let m = &mut *metrics.borrow_mut();
let cluster_id: &str = $cluster_id;
let backend_id: &str = $backend_id;

m.receive_metric("bytes_in", Some(cluster_id), Some(backend_id), MetricData::Count($bin as i64));
m.receive_metric("bytes_out", Some(cluster_id), Some(backend_id), MetricData::Count($bout as i64));
m.receive_metric("backend_response_time", Some(cluster_id), Some(backend_id), MetricData::Time($response_time as usize));
m.receive_metric("bytes_in", Some(cluster_id), Some(backend_id), MetricValue::Count($bin as i64));
m.receive_metric("bytes_out", Some(cluster_id), Some(backend_id), MetricValue::Count($bout as i64));
m.receive_metric("backend_response_time", Some(cluster_id), Some(backend_id), MetricValue::Time($response_time as usize));
if let Some(t) = $backend_connection_time {
m.receive_metric("backend_connection_time", Some(cluster_id), Some(backend_id), MetricData::Time(t.whole_milliseconds() as usize));
m.receive_metric("backend_connection_time", Some(cluster_id), Some(backend_id), MetricValue::Time(t.whole_milliseconds() as usize));
}

m.receive_metric("requests", Some(cluster_id), Some(backend_id), MetricData::Count(1));
m.receive_metric("requests", Some(cluster_id), Some(backend_id), MetricValue::Count(1));
});
}
);

0 comments on commit 0ff7b31

Please sign in to comment.