Skip to content

Commit

Permalink
metrics query in command line
Browse files Browse the repository at this point in the history
this will allow running quick queries like `sozu query metrics
--names=bytes_in,bytes_out --cluster=MyApp` or `sozu query metrics
--names=requests --backends=MyApp/MyApp-0`
  • Loading branch information
Geal authored and FlorentinDUBOIS committed Jul 13, 2022
1 parent 67b98d0 commit 086fdca
Show file tree
Hide file tree
Showing 8 changed files with 265 additions and 9 deletions.
21 changes: 21 additions & 0 deletions bin/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -462,9 +462,30 @@ pub enum QueryCmd {
fingerprint: Option<String>,
#[structopt(short = "d", long="domain", help="domain name")]
domain: Option<String>
},

#[structopt(name = "metrics", about = "Query metrics matching a specific filter")]
Metrics {
//#[structopt(short = "n", long="names", help="metric names", parse(from_str = split_comma))]
#[structopt(short = "n", long="names", help="metric names", use_delimiter = true)]
names: Vec<String>,
#[structopt(short = "c", long="clusters", help="list of cluster ids", use_delimiter = true)]
clusters: Vec<String>,
#[structopt(short = "b", long="backends", help="list of backend ids", use_delimiter = true, parse(try_from_str = split_slash))]
backends: Vec<(String, String)>,
}
}

/// Parses a `cluster_id/backend_id` pair as passed on the command line
/// (e.g. `--backends=MyApp/MyApp-0`). Both sides are trimmed of surrounding
/// whitespace.
///
/// Returns an error if there is no slash, if there is more than one slash
/// (extra segments were previously discarded silently), or if either side
/// is empty.
fn split_slash(input: &str) -> Result<(String, String), String> {
    let mut it = input.split('/').map(|s| s.trim().to_string());

    // exactly two non-empty segments: (cluster, backend, no trailing segment)
    match (it.next(), it.next(), it.next()) {
        (Some(cluster), Some(backend), None) if !cluster.is_empty() && !backend.is_empty() => {
            Ok((cluster, backend))
        }
        _ => Err(format!("could not split cluster id and backend id in {}", input)),
    }
}

#[derive(StructOpt, PartialEq, Debug)]
pub enum ConfigCmd {
#[structopt(name = "check", about = "check configuration file syntax and exit")]
Expand Down
12 changes: 12 additions & 0 deletions bin/src/command/orders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -934,6 +934,7 @@ impl CommandServer {
}));
}
&Query::Certificates(_) => {}
&Query::Metrics(_) => {}
};

let mut client_tx = self.clients.get_mut(&client_id).unwrap().clone();
Expand Down Expand Up @@ -1018,6 +1019,17 @@ impl CommandServer {
error!("could not send message to client {:?}: {:?}", client_id, e);
}
}
&Query::Metrics(_) => {
debug!("metrics query received: {:?}", data);
let res = client_tx
.send(CommandResponse::new(
request_id.clone(),
CommandStatus::Ok,
"".to_string(),
Some(CommandResponseData::Query(data)),
))
.await;
}
};
})
.detach();
Expand Down
78 changes: 73 additions & 5 deletions bin/src/ctl/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use sozu_command::proxy::{Cluster, ProxyRequestData, Backend, HttpFrontend,
RemoveCertificate, ReplaceCertificate, LoadBalancingParams, RemoveBackend,
TcpListener, ListenerType, TlsVersion, QueryCertificateType,
QueryAnswerCertificate, RemoveListener, ActivateListener, DeactivateListener,
LoadBalancingAlgorithms, PathRule, RulePosition, Route};
LoadBalancingAlgorithms, PathRule, RulePosition, Route, QueryMetricsType};

use serde_json;
use std::collections::{HashMap,HashSet,BTreeMap};
Expand Down Expand Up @@ -645,7 +645,7 @@ pub fn metrics(mut channel: Channel<CommandRequest,CommandResponse>, json: bool)
}

}
break;
break Ok(());
}
}
}
Expand Down Expand Up @@ -765,7 +765,6 @@ fn print_metrics(table_name: &str, data: &BTreeMap<String, BTreeMap<String, Filt

timing_table.printstd();
}
Ok(())
}

pub fn reload_configuration(mut channel: Channel<CommandRequest,CommandResponse>, path: Option<String>, json: bool)
Expand Down Expand Up @@ -1473,8 +1472,77 @@ pub fn query_certificate(mut channel: Channel<CommandRequest,CommandResponse>, j
}
}

pub fn logging_filter(channel: Channel<CommandRequest,CommandResponse>, timeout: u64, filter: &str)
-> Result<(), anyhow::Error> {
pub fn query_metrics(mut channel: Channel<CommandRequest,CommandResponse>, json: bool,
names: Vec<String>, clusters: Vec<String>, mut backends: Vec<(String, String)>) -> Result<(), anyhow::Error> {

let query = if !clusters.is_empty() && !backends.is_empty() {
bail!("Error: Either request a list of clusters or a list of backends");
} else {
if !clusters.is_empty(){
QueryMetricsType::Cluster { metrics: names, clusters }
} else {
QueryMetricsType::Backend { metrics: names, backends }
}
};

let command = CommandRequestData::Proxy(ProxyRequestData::Query(Query::Metrics(query)));

let id = generate_id();
channel.write_message(&CommandRequest::new(
id.clone(),
command,
None,
));

match channel.read_message() {
None => {
bail!("the proxy didn't answer");
},
Some(message) => {
println!("received message: {:?}", message);
if id != message.id {
bail!("received message with invalid id: {:?}", message);
}
match message.status {
CommandStatus::Processing => {
// do nothing here
// for other messages, we would loop over read_message
// until an error or ok message was sent
},
CommandStatus::Error => {
if json {
print_json_response(&message.message);
exit(1);
} else {
bail!("could not query proxy state: {}", message.message);
}
},
CommandStatus::Ok => {
if let Some(CommandResponseData::Query(data)) = message.data {
if json {
print_json_response(&data);
return Ok(());
}

println!("got data: {:#?}", data);
let data = data.iter().filter_map(|(key, value)| {
match value {
QueryAnswer::Metrics(d) => Some((key.clone(), d.clone())),
_ => None,
}
}).collect::<BTreeMap<_,_>>();
print_metrics("Result", &data);
}
}
}
}
}

Ok(())
}


/// Sends a `Logging` order to the proxy to change its log filter at runtime,
/// waiting up to `timeout` for the answer. `filter` is passed through verbatim
/// as the new filter string.
pub fn logging_filter(channel: Channel<CommandRequest,CommandResponse>, timeout: u64, filter: &str) -> Result<(), anyhow::Error> {
    order_command(channel, timeout, ProxyRequestData::Logging(String::from(filter)))
}

Expand Down
3 changes: 2 additions & 1 deletion bin/src/ctl/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use self::command::{add_application,remove_application,dump_state,load_state,
remove_tcp_frontend, add_tcp_frontend, add_certificate, remove_certificate,
replace_certificate, query_application, logging_filter, upgrade_worker,
events, query_certificate, add_tcp_listener, add_http_listener, add_https_listener,
remove_listener, activate_listener, deactivate_listener, reload_configuration};
remove_listener, activate_listener, deactivate_listener, reload_configuration, query_metrics};

pub fn ctl(matches: Sozu) -> Result<(), anyhow::Error>{

Expand Down Expand Up @@ -143,6 +143,7 @@ pub fn ctl(matches: Sozu) -> Result<(), anyhow::Error>{
match cmd {
QueryCmd::Applications{ id, domain } => query_application(channel, json, id, domain),
QueryCmd::Certificates{ fingerprint, domain } => query_certificate(channel, json, fingerprint, domain),
QueryCmd::Metrics{ names, clusters, backends } => query_metrics(channel, json, names, clusters, backends),
}
},
SubCmd::Config{ cmd: _ } => Ok(()), // noop, handled at the beginning of the method
Expand Down
10 changes: 10 additions & 0 deletions command/src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -730,6 +730,7 @@ pub struct TcpListener {
/// A state query sent to the proxy workers.
pub enum Query {
    /// query applications, filtered by id or domain
    Applications(QueryApplicationType),
    /// query certificates, filtered by domain or fingerprint
    Certificates(QueryCertificateType),
    /// query metrics, filtered by metric name and cluster or backend ids
    Metrics(QueryMetricsType),
    /// query the hashes of all application configurations
    ApplicationsHashes,
}

Expand All @@ -754,13 +755,22 @@ pub enum QueryCertificateType {
Fingerprint(Vec<u8>),
}

/// Filter for a metrics query: which metric names, and at which level
/// (per cluster or per backend) they should be collected.
#[derive(Debug,Clone,PartialEq,Eq,Hash, Serialize, Deserialize)]
#[serde(tag = "type", content = "data", rename_all = "SCREAMING_SNAKE_CASE")]
pub enum QueryMetricsType {
    /// cluster level metrics, restricted to the given metric names and cluster ids
    Cluster { metrics: Vec<String>, clusters: Vec<String> },
    /// backend level metrics; each backend is a `(cluster_id, backend_id)` tuple
    Backend { metrics: Vec<String>, backends: Vec<(String, String)> },
}

/// Answer payload for a [`Query`], one variant per query kind.
#[derive(Debug,Clone,PartialEq,Eq, Serialize, Deserialize)]
#[serde(tag = "type", content = "data", rename_all = "SCREAMING_SNAKE_CASE")]
pub enum QueryAnswer {
    Applications(Vec<QueryAnswerApplication>),
    /// application id, hash of application information
    ApplicationsHashes(BTreeMap<String, u64>),
    Certificates(QueryAnswerCertificate),
    /// metric values keyed by `cluster_id.metric` or `cluster_id.backend_id.metric`
    Metrics(BTreeMap<String, FilteredData>),
}

#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
Expand Down
133 changes: 131 additions & 2 deletions lib/src/metrics/local_drain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use time::OffsetDateTime;
use std::convert::TryInto;
use std::collections::BTreeMap;
use hdrhistogram::Histogram;
use sozu_command::proxy::{FilteredData,MetricsData,Percentiles,AppMetricsData};
use sozu_command::proxy::{FilteredData,MetricsData,Percentiles,AppMetricsData,QueryMetricsType};

use super::{MetricData,Subscriber};

Expand Down Expand Up @@ -134,7 +134,7 @@ impl LocalDrain {

pub fn dump_metrics_data(&mut self) -> MetricsData {
MetricsData {
proxy: self.dump_process_data(),
proxy: self.dump_process_data(),
clusters: self.dump_cluster_data(),
}
}
Expand All @@ -147,6 +147,135 @@ impl LocalDrain {
data
}

/// Runs a metrics query against the local store and returns the matching
/// filtered metric values, dispatching to the cluster level or backend level
/// lookup depending on the query variant.
pub fn query(&mut self, q: &QueryMetricsType) -> BTreeMap<String, FilteredData> {
    info!("GOT QUERY: {:?}", q);

    match q {
        QueryMetricsType::Cluster { metrics, clusters } => self.query_cluster(metrics, clusters),
        QueryMetricsType::Backend { metrics, backends } => self.query_backend(metrics, backends),
    }
}

/// Collects cluster level metric values for every `(metric, cluster_id)`
/// combination, returning them keyed as `"{cluster_id}.{metric}"`.
///
/// Keys in the store are prefixed `"{metric}\t{cluster_id}\x1f"`; a range scan
/// from that prefix up to `"{prefix}\x7F"` gathers all stored data points.
// NOTE(review): db entries and metadata lookups are unwrap()ed throughout —
// a corrupted or concurrently-modified store would panic here; confirm the
// writer side guarantees the key/value encoding assumed below.
fn query_cluster(&mut self, metrics: &Vec<String>, clusters: &Vec<String>) -> BTreeMap<String, FilteredData> {
    let mut apps: BTreeMap<String, FilteredData> = BTreeMap::new();

    info!("current metrics: {:#?}", self.metrics);
    for prefix_key in metrics.iter() {
        for cluster_id in clusters.iter() {
            // prefix of all data points for this metric and cluster
            let key = format!("{}\t{}\x1f", prefix_key, cluster_id);

            // the metrics table holds (meta, kind) describing how the key is stored
            let res = self.metrics.get(&key);
            if res.is_none() {
                error!("unknown metric key {}", key);
                continue
            }
            let (meta, kind) = res.unwrap();

            // \x7F sorts after any printable suffix, bounding the range scan
            let end = format!("{}\x7F", key);
            for res in self.db.range(key.as_bytes()..end.as_bytes()) {
                let (k, v) = res.unwrap();
                info!("looking at key: {:?}", std::str::from_utf8(&k));
                match meta {
                    MetricMeta::Cluster => {
                        // stored key layout: "{metric}\t{cluster_id}\x1f\t{timestamp}"
                        let mut it1 = k.split(|c: &u8| *c == b'\x1F');
                        let k2 = it1.next().unwrap();
                        let mut it2 = k2.split(|c: &u8| *c == b'\t');
                        let key = std::str::from_utf8(it2.next().unwrap()).unwrap();
                        let cluster_id = std::str::from_utf8(it2.next().unwrap()).unwrap();
                        let timestamp_with_prefix = it1.next().unwrap();
                        // remove the leading \t
                        let timestamp:i64 = std::str::from_utf8(&timestamp_with_prefix[1..]).unwrap().parse().unwrap();

                        info!("looking at key = {}, id = {}, ts = {}",
                            key, cluster_id, timestamp);

                        let output_key = format!("{}.{}", cluster_id, key);
                        // values are stored as little-endian bytes of the metric's native type
                        match kind {
                            MetricKind::Gauge => {
                                apps.insert(output_key, FilteredData::Gauge(usize::from_le_bytes((*v).try_into().unwrap())));
                            },
                            MetricKind::Count => {
                                apps.insert(output_key, FilteredData::Count(i64::from_le_bytes((*v).try_into().unwrap())));
                            },
                            MetricKind::Time => {
                                //unimplemented for now
                            }
                        }
                    },
                    MetricMeta::ClusterBackend => {
                        // this key belongs to query_backend, not to a cluster level query
                        error!("metric key {} is for backend level metrics", key);
                    }
                }
            }
        }
    }

    info!("WILL RETURN: {:#?}", apps);
    apps
}

/// Collects backend level metric values for every `(metric, (cluster_id,
/// backend_id))` combination, returning them keyed as
/// `"{cluster_id}.{backend_id}.{metric}"`.
// NOTE(review): the prefix here ends with "\t{backend_id} " (trailing space)
// while query_cluster uses a "\x1f" terminator — confirm against the writer
// side that this matches the stored backend key encoding and is not a typo.
// Same caveat as query_cluster on the pervasive unwrap()s.
fn query_backend(&mut self, metrics: &Vec<String>, backends: &Vec<(String,String)>) -> BTreeMap<String, FilteredData> {
    let mut backend_data: BTreeMap<String, FilteredData> = BTreeMap::new();

    info!("current metrics: {:#?}", self.metrics);
    for prefix_key in metrics.iter() {
        for (cluster_id, backend_id) in backends.iter() {
            // prefix of all data points for this metric, cluster and backend
            let key = format!("{}\t{}\t{} ", prefix_key, cluster_id, backend_id);

            // the metrics table holds (meta, kind) describing how the key is stored
            let res = self.metrics.get(&key);
            if res.is_none() {
                error!("unknown metric key {}", key);
                continue
            }
            let (meta, kind) = res.unwrap();

            // \x7F sorts after any printable suffix, bounding the range scan
            let end = format!("{}\x7F", key);
            for res in self.db.range(key.as_bytes()..end.as_bytes()) {
                let (k, v) = res.unwrap();
                info!("looking at key: {:?}", std::str::from_utf8(&k));
                match meta {
                    MetricMeta::Cluster => {
                        // this key belongs to query_cluster, not to a backend level query
                        error!("metric key {} is for cluster level metrics", key);
                    },
                    MetricMeta::ClusterBackend => {
                        // stored key layout assumed: "{metric}\t{cluster_id}\t{backend_id}\t{timestamp}"
                        let mut it = k.split(|c: &u8| *c == b'\t');
                        let key = std::str::from_utf8(it.next().unwrap()).unwrap();
                        let app_id = std::str::from_utf8(it.next().unwrap()).unwrap();
                        let backend_id = std::str::from_utf8(it.next().unwrap()).unwrap();
                        let timestamp:i64 = std::str::from_utf8(it.next().unwrap()).unwrap().parse().unwrap();

                        info!("looking at key = {}, cluster id = {}, bid: {}, ts = {}",
                            key, app_id, backend_id, timestamp);

                        let output_key = format!("{}.{}.{}", cluster_id, backend_id, key);
                        // values are stored as little-endian bytes of the metric's native type
                        match kind {
                            MetricKind::Gauge => {
                                backend_data.insert(output_key, FilteredData::Gauge(usize::from_le_bytes((*v).try_into().unwrap())));
                            },
                            MetricKind::Count => {
                                backend_data.insert(output_key, FilteredData::Count(i64::from_le_bytes((*v).try_into().unwrap())));
                            },
                            MetricKind::Time => {
                                //unimplemented for now
                            }
                        }
                    }
                }
            }
        }
    }

    info!("WILL RETURN: {:#?}", backend_data);
    backend_data
}


pub fn dump_cluster_data(&mut self) -> BTreeMap<String,AppMetricsData> {
let mut apps = BTreeMap::new();

Expand Down
4 changes: 4 additions & 0 deletions lib/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,10 @@ impl Aggregator {
self.local.dump_process_data()
}

/// Forwards a metrics query to the local drain and returns its filtered data.
pub fn query(&mut self, q: &sozu_command_lib::proxy::QueryMetricsType) -> BTreeMap<String, FilteredData> {
    self.local.query(q)
}

/// Clears the local drain's accumulated metrics, using `now` as the reference
/// time for the reset.
pub fn clear_local(&mut self, now: time::OffsetDateTime) {
    self.local.clear(now);
}
Expand Down
13 changes: 12 additions & 1 deletion lib/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -650,7 +650,18 @@ impl Server {
return
},
}
}
},
&Query::Metrics(ref q) => {
METRICS.with(|metrics| {
push_queue(ProxyResponse {
id: message.id.clone(),
status: ProxyResponseStatus::Ok,
data: Some(ProxyResponseData::Query(
QueryAnswer::Metrics((*metrics.borrow_mut()).query(q))))
});
});
return
},
}
}

Expand Down

0 comments on commit 086fdca

Please sign in to comment.