Skip to content

Commit

Permalink
error management in metrics recording and retrieving
Browse files Browse the repository at this point in the history
  • Loading branch information
Keksoj committed Aug 24, 2022
1 parent 9eb2c01 commit e774cff
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 67 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion bin/src/ctl/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ fn print_worker_metrics(query_answer: &QueryAnswer) -> anyhow::Result<()> {
print_cluster_metrics(clusters);
}
QueryAnswer::Metrics(QueryAnswerMetrics::Error(error)) => {
println!("Proxy responded with error: {}\nMaybe check your command.", error)
println!("Error: {}\nMaybe check your command.", error)
}
_ => bail!("The query answer is wrong."),
}
Expand Down
1 change: 1 addition & 0 deletions lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ include = [
]

[dependencies]
anyhow = "^1.0.62"
cookie-factory = "^0.3.2"
foreign-types-shared = "^0.1.1"
hdrhistogram = "^7.5.0"
Expand Down
152 changes: 86 additions & 66 deletions lib/src/metrics/local_drain.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#![allow(dead_code)]
use std::{collections::BTreeMap, str, time::Instant};

use anyhow::Context;
use hdrhistogram::Histogram;

use crate::sozu_command::proxy::{
Expand All @@ -19,20 +20,22 @@ pub enum AggregatedMetric {
}

impl AggregatedMetric {
fn new(metric: MetricData) -> AggregatedMetric {
fn new(metric: MetricData) -> anyhow::Result<AggregatedMetric> {
match metric {
MetricData::Gauge(value) => AggregatedMetric::Gauge(value),
MetricData::GaugeAdd(value) => AggregatedMetric::Gauge(value as usize),
MetricData::Count(value) => AggregatedMetric::Count(value),
MetricData::Gauge(value) => Ok(AggregatedMetric::Gauge(value)),
MetricData::GaugeAdd(value) => Ok(AggregatedMetric::Gauge(value as usize)),
MetricData::Count(value) => Ok(AggregatedMetric::Count(value)),
MetricData::Time(value) => {
//FIXME: do not expect here. We'll need error management all the way up.
// this will compile but may panic. DO NOT SHIP THIS INTO PRODUCTION
let mut histogram = ::hdrhistogram::Histogram::new(3)
.expect("could not create histogram for a time metric");
if let Err(e) = histogram.record(value as u64) {
error!("could not record time metric: {:?}", e);
}
AggregatedMetric::Time(histogram)
let mut histogram = ::hdrhistogram::Histogram::new(3).context(format!(
"Could not create histogram for time metric {:?}",
metric
))?;

histogram
.record(value as u64)
.context(format!("could not record time metric: {:?}", metric))?;

Ok(AggregatedMetric::Time(histogram))
}
}
}
Expand All @@ -50,7 +53,7 @@ impl AggregatedMetric {
}
(&mut AggregatedMetric::Time(ref mut v1), MetricData::Time(v2)) => {
if let Err(e) = (*v1).record(v2 as u64) {
error!("could not record time metric: {:?}", e);
error!("could not record time metric: {:?}", e.to_string());
}
}
(s, m) => panic!(
Expand Down Expand Up @@ -198,7 +201,7 @@ impl LocalDrain {

match worker_metrics {
Ok(worker_metrics) => QueryAnswerMetrics::All(worker_metrics),
Err(e) => QueryAnswerMetrics::Error(e),
Err(e) => QueryAnswerMetrics::Error(e.to_string()),
}
}

Expand All @@ -218,7 +221,7 @@ impl LocalDrain {
pub fn dump_all_metrics(
&mut self,
metric_names: &Vec<String>,
) -> Result<WorkerMetrics, String> {
) -> anyhow::Result<WorkerMetrics> {
Ok(WorkerMetrics {
proxy: Some(self.dump_proxy_metrics(metric_names)),
clusters: Some(self.dump_cluster_metrics(metric_names)?),
Expand All @@ -245,14 +248,17 @@ impl LocalDrain {
pub fn dump_cluster_metrics(
&mut self,
metric_names: &Vec<String>,
) -> Result<BTreeMap<String, ClusterMetricsData>, String> {
) -> anyhow::Result<BTreeMap<String, ClusterMetricsData>> {
let mut cluster_data = BTreeMap::new();

for cluster_id in self.get_cluster_ids() {
cluster_data.insert(
cluster_id.to_owned(),
self.metrics_of_one_cluster(&cluster_id, metric_names)?,
);
let cluster_metrics = self
.metrics_of_one_cluster(&cluster_id, metric_names)
.context(format!(
"Could not retrieve metrics of cluster {}",
cluster_id
))?;
cluster_data.insert(cluster_id.to_owned(), cluster_metrics);
}

Ok(cluster_data)
Expand All @@ -262,16 +268,11 @@ impl LocalDrain {
&self,
cluster_id: &str,
metric_names: &Vec<String>,
) -> Result<ClusterMetricsData, String> {
let raw_metrics = match self.cluster_metrics.get(cluster_id) {
Some(raw_metrics) => raw_metrics,
None => {
return Err(format!(
"No metrics found for cluster with id {}",
cluster_id
))
}
};
) -> anyhow::Result<ClusterMetricsData> {
let raw_metrics = self.cluster_metrics.get(cluster_id).context(format!(
"No metrics found for cluster with id {}",
cluster_id
))?;

let cluster: BTreeMap<String, FilteredData> = raw_metrics
.iter()
Expand All @@ -287,10 +288,13 @@ impl LocalDrain {

let mut backends = BTreeMap::new();
for backend_id in self.get_backend_ids(&cluster_id) {
backends.insert(
backend_id.to_owned(),
self.metrics_of_one_backend(&backend_id, metric_names)?,
);
let backend_metrics = self
.metrics_of_one_backend(&backend_id, metric_names)
.context(format!(
"Could not retrieve metrics for backend {} of cluster {}",
backend_id, cluster_id
))?;
backends.insert(backend_id.to_owned(), backend_metrics);
}
Ok(ClusterMetricsData {
cluster: Some(cluster),
Expand All @@ -302,31 +306,32 @@ impl LocalDrain {
&self,
backend_id: &str,
metric_names: &Vec<String>,
) -> Result<BTreeMap<String, FilteredData>, String> {
match self.cluster_metrics.get(backend_id) {
Some(backend_metrics) => Ok(backend_metrics
.iter()
.filter(|entry| {
if metric_names.is_empty() {
true
} else {
metric_names.contains(entry.0)
}
})
.map(|entry| (entry.0.to_owned(), entry.1.to_filtered()))
.collect::<BTreeMap<String, FilteredData>>()),
None => Err(format!(
"No metrics found for backend with id {}",
backend_id
)),
}
) -> anyhow::Result<BTreeMap<String, FilteredData>> {
let backend_metrics = self.cluster_metrics.get(backend_id).context(format!(
"No metrics found for backend with id {}",
backend_id
))?;

let filtered_backend_metrics = backend_metrics
.iter()
.filter(|entry| {
if metric_names.is_empty() {
true
} else {
metric_names.contains(entry.0)
}
})
.map(|entry| (entry.0.to_owned(), entry.1.to_filtered()))
.collect::<BTreeMap<String, FilteredData>>();

Ok(filtered_backend_metrics)
}

fn query_clusters(
&mut self,
cluster_ids: &Vec<String>,
metric_names: &Vec<String>,
) -> Result<WorkerMetrics, String> {
) -> anyhow::Result<WorkerMetrics> {
debug!("Querying cluster with ids: {:?}", cluster_ids);
let mut clusters: BTreeMap<String, ClusterMetricsData> = BTreeMap::new();

Expand All @@ -348,19 +353,18 @@ impl LocalDrain {
&mut self,
backend_ids: &Vec<String>,
metric_names: &Vec<String>,
) -> Result<WorkerMetrics, String> {
) -> anyhow::Result<WorkerMetrics> {
let mut clusters: BTreeMap<String, ClusterMetricsData> = BTreeMap::new();

for backend_id in backend_ids {
let cluster_id = match self.backend_to_cluster.get(backend_id) {
Some(id) => id.to_owned(),
None => {
return Err(format!(
"No metrics found for backend with id {}",
backend_id
))
}
};
let cluster_id = self
.backend_to_cluster
.get(backend_id)
.context(format!(
"No metrics found for backend with id {}",
backend_id
))?
.to_owned();

let mut backend_map: BTreeMap<String, BTreeMap<String, FilteredData>> = BTreeMap::new();
backend_map.insert(
Expand Down Expand Up @@ -397,7 +401,10 @@ impl LocalDrain {

trace!(
"cluster metric: {} {} {:?} {:?}",
metric_name, cluster_id, backend_id, metric_value
metric_name,
cluster_id,
backend_id,
metric_value
);

let cluster_or_backend_id = match backend_id {
Expand All @@ -417,7 +424,13 @@ impl LocalDrain {
match submap.get_mut(metric_name) {
Some(existing_metric) => existing_metric.update(metric_name, metric_value),
None => {
submap.insert(metric_name.to_owned(), AggregatedMetric::new(metric_value));
let aggregated_metric = match AggregatedMetric::new(metric_value) {
Ok(m) => m,
Err(e) => {
return error!("Could not aggregate metric: {}", e.to_string());
}
};
submap.insert(metric_name.to_owned(), aggregated_metric);
}
}
}
Expand Down Expand Up @@ -446,8 +459,15 @@ impl Subscriber for LocalDrain {
match self.proxy_metrics.get_mut(key) {
Some(stored_metric) => stored_metric.update(key, metric),
None => {
let aggregated_metric = match AggregatedMetric::new(metric) {
Ok(m) => m,
Err(e) => {
return error!("Could not aggregate metric: {}", e.to_string());
}
};

self.proxy_metrics
.insert(String::from(key), AggregatedMetric::new(metric));
.insert(String::from(key), aggregated_metric);
}
}
}
Expand Down

0 comments on commit e774cff

Please sign in to comment.