Skip to content

Commit

Permalink
list both proxy metric names and cluster metric names, refactoring, v…
Browse files Browse the repository at this point in the history
…ariable renaming
  • Loading branch information
Keksoj committed Aug 17, 2022
1 parent 0cac89c commit 4962f38
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 83 deletions.
52 changes: 39 additions & 13 deletions bin/src/ctl/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -602,20 +602,46 @@ fn format_tags_to_string(tags: Option<&BTreeMap<String, String>>) -> String {
}

fn print_available_metrics(answers: &BTreeMap<String, QueryAnswer>) -> anyhow::Result<()> {
let metrics: HashSet<String> = answers
.values()
.filter_map(|value| match value {
QueryAnswer::Metrics(QueryAnswerMetrics::List(v)) => Some(v.iter()),
_ => None,
})
.flatten()
.map(|s| s.replace("\t", "."))
.collect();
let mut metrics: Vec<_> = metrics.iter().collect();
metrics.sort();
let mut available_metrics: (HashSet<String>, HashSet<String>) =
(HashSet::new(), HashSet::new());
for query_answer in answers.values() {
match query_answer {
QueryAnswer::Metrics(QueryAnswerMetrics::List((
proxy_metric_keys,
cluster_metric_keys,
))) => {
for key in proxy_metric_keys {
available_metrics
.0
.insert(key.replace("\t", ".").to_owned());
}
for key in cluster_metric_keys {
available_metrics
.1
.insert(key.replace("\t", ".").to_owned());
}
}
_ => bail!("The proxy responded nonsense instead of metric names"),
}
}
let proxy_metrics_names = available_metrics
.0
.iter()
.map(|s| s.to_owned())
.collect::<Vec<String>>();
let cluster_metrics_names = available_metrics
.1
.iter()
.map(|s| s.to_owned())
.collect::<Vec<String>>();

println!("Available metrics on the proxy level:");
for metric in metrics {
println!("\t{}", metric);
for metric_name in proxy_metrics_names {
println!("\t{}", metric_name);
}
println!("Available metrics on the cluster level:");
for metric_name in cluster_metrics_names {
println!("\t{}", metric_name);
}
Ok(())
}
3 changes: 2 additions & 1 deletion command/src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -864,7 +864,8 @@ pub enum QueryAnswerCertificate {

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum QueryAnswerMetrics {
List(Vec<String>),
/// (list of proxy metrics, list of cluster metrics)
List((Vec<String>, Vec<String>)),
/// cluster_id -> (key -> metric)
Cluster(BTreeMap<String, BTreeMap<String, FilteredData>>),
/// cluster_id -> (backend_id -> (key -> metric))
Expand Down
125 changes: 60 additions & 65 deletions lib/src/metrics/local_drain.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#![allow(dead_code)]
use std::{collections::BTreeMap, str, time::Instant};use std::convert::TryInto;
use std::convert::TryInto;
use std::{collections::BTreeMap, str, time::Instant};

use hdrhistogram::Histogram;
use time::{Duration, OffsetDateTime};
Expand Down Expand Up @@ -115,7 +116,9 @@ pub struct LocalDrain {
pub created: Instant,
pub cluster_tree: BTreeMap<String, u64>,
pub backend_tree: BTreeMap<String, u64>,
/// metric_name -> metric value
pub proxy_metrics: BTreeMap<String, AggregatedMetric>,
/// keyTABcluster_id -> (metric meta, metric kind)
cluster_metrics: BTreeMap<String, (MetricMeta, MetricKind)>,
use_tagged_metrics: bool,
origin: String,
Expand Down Expand Up @@ -189,14 +192,17 @@ impl LocalDrain {
}

pub fn query(&mut self, q: &QueryMetricsType) -> QueryAnswerMetrics {
debug!(
"The local drain received this query: {:?}\n, here is the local drain: {:#?}",
q, self
trace!(
"The local drain received this query: {:?}\n, here is the local drain: {:?}",
q,
self
);
match q {
QueryMetricsType::List => {
debug!("Here are the metrics keys: {:?}", self.proxy_metrics.keys());
QueryAnswerMetrics::List(self.proxy_metrics.keys().cloned().collect())
let proxy_metrics_names = self.proxy_metrics.keys().cloned().collect();
let cluster_metrics_names = Vec::new();
QueryAnswerMetrics::List((proxy_metrics_names, cluster_metrics_names))
}
QueryMetricsType::Cluster {
metrics,
Expand All @@ -212,7 +218,7 @@ impl LocalDrain {
}
}

fn query_metric(
fn get_metric_from_tree(
&self,
key: &str,
is_backend: bool,
Expand Down Expand Up @@ -273,13 +279,14 @@ impl LocalDrain {

fn query_clusters(
&mut self,
metrics: &[String],
metric_names: &[String],
cluster_ids: &[String],
date: Option<i64>,
) -> QueryAnswerMetrics {
let mut clusters: BTreeMap<String, BTreeMap<String, FilteredData>> = BTreeMap::new();
debug!("Querying cluster with ids: {:?}", cluster_ids);
let mut response: BTreeMap<String, BTreeMap<String, FilteredData>> = BTreeMap::new();
for cluster_id in cluster_ids.iter() {
clusters.insert(cluster_id.to_string(), BTreeMap::new());
response.insert(cluster_id.to_string(), BTreeMap::new());
}

let timestamp = date.unwrap_or_else(|| {
Expand All @@ -291,45 +298,48 @@ impl LocalDrain {
trace!("current metrics: {:#?}", self.cluster_metrics);

// TODO: check that the cluster ids exist, and if not, reply with error
for prefix_key in metrics.iter() {
for metric_name in metric_names.iter() {
for cluster_id in cluster_ids.iter() {
let key = format!("{}\t{}", prefix_key, cluster_id);
let key = format!("{}\t{}", metric_name, cluster_id);

let res = self.cluster_metrics.get(&key);
if res.is_none() {
//error!("unknown metric key {}", key);
continue;
}
let (meta, kind) = res.unwrap();
let (meta, kind) = match self.cluster_metrics.get(&key) {
Some((m, k)) => (m, k),
None => {
error!("Did not find cluster metrics with the metric key '{}'", key);
continue;
}
};

if *meta == MetricMeta::ClusterBackend {
error!("{} is a backend metric", key);
continue;
}

if let Some(filtered_data) = self.query_metric(&key, false, timestamp, *kind) {
clusters
if let Some(filtered_data) =
self.get_metric_from_tree(&key, false, timestamp, *kind)
{
response
.get_mut(cluster_id)
.unwrap()
.insert(key.to_string(), filtered_data);
}
}
}

trace!("query result: {:#?}", clusters);
QueryAnswerMetrics::Cluster(clusters)
trace!("query result: {:#?}", response);
QueryAnswerMetrics::Cluster(response)
}

fn query_backends(
&mut self,
metrics: &[String],
metric_names: &[String],
backends: &[(String, String)],
date: Option<i64>,
) -> QueryAnswerMetrics {
let mut backend_data: BTreeMap<String, BTreeMap<String, BTreeMap<String, FilteredData>>> =
let mut response: BTreeMap<String, BTreeMap<String, BTreeMap<String, FilteredData>>> =
BTreeMap::new();
for (cluster_id, backend_id) in backends.iter() {
let t = backend_data
let t = response
.entry(cluster_id.to_string())
.or_insert_with(BTreeMap::new);
t.insert(backend_id.to_string(), BTreeMap::new());
Expand All @@ -342,25 +352,27 @@ impl LocalDrain {
});

trace!("current metrics: {:#?}", self.cluster_metrics);
for prefix_key in metrics.iter() {
for metric_name in metric_names.iter() {
// TODO: check that the backend_ids & cluster_ids exist, and if not, reply with error
for (cluster_id, backend_id) in backends.iter() {
let key = format!("{}\t{}\t{}", prefix_key, cluster_id, backend_id);
let key = format!("{}\t{}\t{}", metric_name, cluster_id, backend_id);

let res = self.cluster_metrics.get(&key);
if res.is_none() {
//error!("unknown metric key {}", key);
continue;
}
let (meta, kind) = res.unwrap();
let (meta, kind) = match self.cluster_metrics.get(&key) {
Some((m, k)) => (m, k),
None => {
error!("Did not find backend metrics with the metric key '{}'", key);
continue;
}
};

if *meta == MetricMeta::Cluster {
error!("{} is a cluster metric", key);
continue;
}

if let Some(filtered_data) = self.query_metric(&key, true, timestamp, *kind) {
backend_data
if let Some(filtered_data) = self.get_metric_from_tree(&key, true, timestamp, *kind)
{
response
.get_mut(cluster_id)
.unwrap()
.get_mut(backend_id)
Expand All @@ -370,16 +382,12 @@ impl LocalDrain {
}
}

trace!("query result: {:#?}", backend_data);
QueryAnswerMetrics::Backend(backend_data)
trace!("query result: {:#?}", response);
QueryAnswerMetrics::Backend(response)
}

fn get_last_before(&self, start: &str, end: &str, is_backend: bool) -> Option<u64> {
let tree = self.tree(is_backend);
// let tree = match is_backend {
// true => &self.backend_tree,
// false => &self.cluster_tree,
// };

//if let Some((k, v)) = tree.get_lt(end.as_bytes())? {
if let Some((k, v)) = tree.range(start.to_string()..end.to_string()).rev().next() {
Expand Down Expand Up @@ -561,33 +569,21 @@ impl LocalDrain {
return;
}

self.store_metric(
&format!("{}\t{}", key, cluster_id),
cluster_id,
None,
&metric,
);
self.store_metric(&format!("{}\t{}", key, cluster_id), None, &metric);

// backend metrics are stored twice, in cluster_metrics and backend_metrics
if let Some(bid) = backend_id {
self.store_metric(
&format!("{}\t{}\t{}", key, cluster_id, bid),
cluster_id,
backend_id,
&metric,
);
}
}

fn store_metric(
&mut self,
key_prefix: &str,
_id: &str,
backend_id: Option<&str>,
metric: &MetricData,
) {
debug!("Storing metrics with key prefix: {}", key_prefix);
if !self.cluster_metrics.contains_key(key_prefix) {
fn store_metric(&mut self, key: &str, backend_id: Option<&str>, metric: &MetricData) {
debug!("Storing metrics with key prefix: {}", key);
if !self.cluster_metrics.contains_key(key) {
let kind = match metric {
MetricData::Gauge(_) => MetricKind::Gauge,
MetricData::GaugeAdd(_) => MetricKind::Gauge,
Expand All @@ -599,19 +595,18 @@ impl LocalDrain {
None => MetricMeta::Cluster,
};

self.cluster_metrics
.insert(key_prefix.to_string(), (meta, kind));
self.cluster_metrics.insert(key.to_string(), (meta, kind));
}

match metric {
MetricData::Gauge(i) => {
self.store_gauge(key_prefix, *i, backend_id.is_some());
self.store_gauge(key, *i, backend_id.is_some());
}
MetricData::GaugeAdd(i) => {
self.add_gauge(key_prefix, *i, backend_id.is_some());
self.add_gauge(key, *i, backend_id.is_some());
}
MetricData::Count(i) => {
self.store_count(key_prefix, *i, backend_id.is_some());
self.store_count(key, *i, backend_id.is_some());
}
MetricData::Time(_) => {}
}
Expand Down Expand Up @@ -1140,10 +1135,10 @@ impl Subscriber for LocalDrain {
backend_id: Option<&str>,
metric: MetricData,
) {
println!(
"receiving metric with key {}, cluster_id: {:?}, backend_id: {:?}, metric data: {:?}",
key, cluster_id, backend_id, metric
);
// println!(
// "receiving metric with key {}, cluster_id: {:?}, backend_id: {:?}, metric data: {:?}",
// key, cluster_id, backend_id, metric
// );

// cluster metric
if let Some(id) = cluster_id {
Expand Down
5 changes: 1 addition & 4 deletions lib/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ mod writer;

use std::{
cell::RefCell,
collections::BTreeMap,
io::{self, Write},
net::SocketAddr,
str,
Expand All @@ -13,9 +12,7 @@ use std::{

use mio::net::UdpSocket;

use crate::sozu_command::proxy::{
FilteredData, MetricsConfiguration, QueryAnswerMetrics, QueryMetricsType, WorkerMetrics,
};
use crate::sozu_command::proxy::{MetricsConfiguration, QueryAnswerMetrics, QueryMetricsType};

use self::{local_drain::LocalDrain, network_drain::NetworkDrain};

Expand Down

0 comments on commit 4962f38

Please sign in to comment.