Skip to content

Commit

Permalink
refactor metrics query format and CLI metric command
Browse files Browse the repository at this point in the history
the command line now starts with:q
  • Loading branch information
Keksoj committed Aug 24, 2022
1 parent 23ca6eb commit c02b130
Show file tree
Hide file tree
Showing 11 changed files with 232 additions and 250 deletions.
68 changes: 36 additions & 32 deletions bin/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,13 @@ pub enum SubCmd {
about = "gets statistics on the main process and its workers"
)]
Metrics {
#[clap(
short = 'j',
long = "json",
help = "Print the command result in JSON format",
global = true
)]
json: bool,
#[clap(subcommand)]
cmd: MetricsCmd,
},
Expand Down Expand Up @@ -202,6 +209,35 @@ pub enum MetricsCmd {
Disable,
#[clap(name = "clear", about = "Deletes local metrics data")]
Clear,
#[clap(name = "get", about = "get all or filtered metrics")]
Get {
#[clap(short, long, help = "list the available metrics on the proxy level")]
list: bool,
#[clap(short, long, help = "refresh metrics results (in seconds)")]
refresh: Option<u32>,
#[clap(
short = 'n',
long = "names",
help = "Filter by metric names. Coma-separated list.",
use_delimiter = true
)]
names: Vec<String>,
#[clap(
short = 'k',
long = "clusters",
help = "list of cluster ids (= application id)",
use_delimiter = true
)]
clusters: Vec<String>,
#[clap(
short = 'b',
long="backends",
help="coma-separated list of backends, 'one_backend_id, other_backend_id'",
use_delimiter = true
// parse(try_from_str = split_slash)
)]
backends: Vec<String>,
},
}

#[derive(Subcommand, PartialEq, Clone, Debug)]
Expand Down Expand Up @@ -732,38 +768,6 @@ pub enum QueryCmd {
#[clap(short = 'd', long = "domain", help = "domain name")]
domain: Option<String>,
},
#[clap(
name = "metrics",
about = "Query all metrics, or matching a specific filter"
)]
Metrics {
#[clap(short, long, help = "list the available metrics on the proxy level")]
list: bool,
#[clap(short, long, help = "refresh metrics results (in seconds)")]
refresh: Option<u32>,
#[clap(
short = 'n',
long = "names",
help = "Filter by metric names. Coma-separated list.",
use_delimiter = true
)]
names: Vec<String>,
#[clap(
short = 'k',
long = "clusters",
help = "list of cluster ids (= application id)",
use_delimiter = true
)]
clusters: Vec<String>,
#[clap(
short = 'b',
long="backends",
help="coma-separated list of backends, 'one_backend_id, other_backend_id'",
use_delimiter = true
// parse(try_from_str = split_slash)
)]
backends: Vec<String>,
},
}

#[derive(Subcommand, PartialEq, Clone, Debug)]
Expand Down
52 changes: 18 additions & 34 deletions bin/src/command/orders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ use sozu_command_lib::{
parser::parse_several_commands,
proxy::{
AggregatedMetricsData, MetricsConfiguration, ProxyRequest, ProxyRequestData,
ProxyResponseData, ProxyResponseStatus, Query, QueryAnswer, QueryAnswerMetrics,
QueryClusterType, QueryMetricsType, Route, TcpFrontend,
ProxyResponseData, ProxyResponseStatus, Query, QueryAnswer, QueryClusterType, Route,
TcpFrontend,
},
scm_socket::Listeners,
state::get_cluster_ids_by_domain,
Expand Down Expand Up @@ -73,7 +73,9 @@ impl CommandServer {
self.upgrade_worker(request_identifier, worker_id).await
}
CommandRequestData::Proxy(proxy_request) => match proxy_request {
ProxyRequestData::Metrics(config) => self.metrics(request_identifier, config).await,
ProxyRequestData::ConfigureMetrics(config) => {
self.configure_metrics(request_identifier, config).await
}
ProxyRequestData::Query(query) => self.query(request_identifier, query).await,
ProxyRequestData::Logging(logging_filter) => self.set_logging_level(logging_filter),
// we should have something like
Expand Down Expand Up @@ -857,7 +859,7 @@ impl CommandServer {

// This handles the CLI's "metrics enable", "metrics disable", "metrics clear"
// To get the proxy's metrics, the cli command is "query metrics", handled by the query() function
pub async fn metrics(
pub async fn configure_metrics(
&mut self,
request_identifier: RequestIdentifier,
config: MetricsConfiguration,
Expand All @@ -871,7 +873,10 @@ impl CommandServer {
{
let req_id = format!("{}-metrics-{}", request_identifier.client, worker.id);
worker
.send(req_id.clone(), ProxyRequestData::Metrics(config.clone()))
.send(
req_id.clone(),
ProxyRequestData::ConfigureMetrics(config.clone()),
)
.await;
count += 1;
self.in_flight.insert(req_id, (metrics_tx.clone(), 1));
Expand Down Expand Up @@ -1039,37 +1044,16 @@ impl CommandServer {
);
Success::Query(CommandResponseData::Query(proxy_responses_map))
}
Query::Metrics(query_metrics_type) => {
Query::Metrics(options) => {
debug!("metrics query answer received: {:?}", proxy_responses_map);

match query_metrics_type {
QueryMetricsType::List => {
Success::Query(CommandResponseData::Query(proxy_responses_map))
}
_ => {
let worker_metrics_map = proxy_responses_map
.iter()
.map(|(worker_id, query_answer)| {
if let QueryAnswer::Metrics(query_answer_metrics) = query_answer
{
(worker_id.to_owned(), query_answer_metrics.to_owned())
} else {
(
worker_id.to_owned(),
QueryAnswerMetrics::Error(
"This worker answered with the wrong type of answer"
.to_owned(),
),
)
}
})
.collect::<BTreeMap<String, QueryAnswerMetrics>>();

Success::Query(CommandResponseData::Metrics(AggregatedMetricsData {
main: main_metrics,
workers: worker_metrics_map,
}))
}
if options.list {
Success::Query(CommandResponseData::Query(proxy_responses_map))
} else {
Success::Query(CommandResponseData::Metrics(AggregatedMetricsData {
main: main_metrics,
workers: proxy_responses_map,
}))
}
}
};
Expand Down
155 changes: 75 additions & 80 deletions bin/src/ctl/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use sozu_command_lib::{
},
proxy::{
MetricsConfiguration, ProxyRequestData, Query, QueryCertificateType, QueryClusterDomain,
QueryClusterType, QueryMetricsType,
QueryClusterType, QueryMetricsOptions,
},
};

Expand Down Expand Up @@ -533,19 +533,20 @@ impl CommandManager {
Ok(())
}

pub fn metrics(&mut self, cmd: MetricsCmd) -> Result<(), anyhow::Error> {
pub fn configure_metrics(&mut self, cmd: MetricsCmd) -> Result<(), anyhow::Error> {
let id = generate_id();
//println!("will send message for metrics with id {}", id);

let configuration = match cmd {
MetricsCmd::Enable => MetricsConfiguration::Enabled(true),
MetricsCmd::Disable => MetricsConfiguration::Enabled(false),
MetricsCmd::Clear => MetricsConfiguration::Clear,
_ => bail!("The command passed to the configure_metrics function is wrong."),
};

self.channel.write_message(&CommandRequest::new(
id.clone(),
CommandRequestData::Proxy(ProxyRequestData::Metrics(configuration)),
CommandRequestData::Proxy(ProxyRequestData::ConfigureMetrics(configuration)),
None,
));

Expand All @@ -567,6 +568,77 @@ impl CommandManager {
Ok(())
}

pub fn get_metrics(
&mut self,
json: bool,
list: bool,
refresh: Option<u32>,
metric_names: Vec<String>,
cluster_ids: Vec<String>,
backend_ids: Vec<String>,
) -> Result<(), anyhow::Error> {
let command = CommandRequestData::Proxy(ProxyRequestData::Query(Query::Metrics(
QueryMetricsOptions {
list,
cluster_ids,
backend_ids,
metric_names,
},
)));

// a loop to reperform the query every refresh time
loop {
let id = generate_id();
self.channel
.write_message(&CommandRequest::new(id.clone(), command.clone(), None));
print!("{}", termion::cursor::Save);

// this functions may bail and escape the loop, should we avoid that?
let message = self.read_channel_message_with_timeout()?;

//println!("received message: {:?}", message);
if id != message.id {
bail!("received message with invalid id: {:?}", message);
}
match message.status {
CommandStatus::Processing => {
// do nothing here
// for other messages, we would loop over read_message
// until an error or ok message was sent
}
CommandStatus::Error => {
if json {
return print_json_response(&message.message);
} else {
bail!("could not query proxy state: {}", message.message);
}
}
CommandStatus::Ok => match message.data {
Some(CommandResponseData::Metrics(aggregated_metrics_data)) => {
print_metrics(aggregated_metrics_data, json)?
}
Some(CommandResponseData::Query(lists_of_metrics)) => {
print_available_metrics(&lists_of_metrics)?;
}
_ => println!("Wrong kind of response here"),
},
}

match refresh {
None => break,
Some(seconds) => std::thread::sleep(std::time::Duration::from_secs(seconds as u64)),
}

print!(
"{}{}",
termion::cursor::Restore,
termion::clear::BeforeCursor
);
}

Ok(())
}

pub fn reload_configuration(
&mut self,
path: Option<String>,
Expand Down Expand Up @@ -772,83 +844,6 @@ impl CommandManager {
Ok(())
}

pub fn query_metrics(
&mut self,
json: bool,
list: bool,
refresh: Option<u32>,
metric_names: Vec<String>,
cluster_ids: Vec<String>,
backend_ids: Vec<String>,
) -> Result<(), anyhow::Error> {
let query = match (list, cluster_ids.is_empty(), backend_ids.is_empty()) {
(true, _, _) => QueryMetricsType::List,
(false, true, true) => QueryMetricsType::All { metric_names },
(false, false, _) => QueryMetricsType::Cluster {
cluster_ids,
metric_names,
},
(false, true, false) => QueryMetricsType::Backend {
backend_ids,
metric_names,
},
};

let command = CommandRequestData::Proxy(ProxyRequestData::Query(Query::Metrics(query)));

// a loop to reperform the query every refresh time
loop {
let id = generate_id();
self.channel
.write_message(&CommandRequest::new(id.clone(), command.clone(), None));
print!("{}", termion::cursor::Save);

// this functions may bail and escape the loop, should we avoid that?
let message = self.read_channel_message_with_timeout()?;

//println!("received message: {:?}", message);
if id != message.id {
bail!("received message with invalid id: {:?}", message);
}
match message.status {
CommandStatus::Processing => {
// do nothing here
// for other messages, we would loop over read_message
// until an error or ok message was sent
}
CommandStatus::Error => {
if json {
return print_json_response(&message.message);
} else {
bail!("could not query proxy state: {}", message.message);
}
}
CommandStatus::Ok => match message.data {
Some(CommandResponseData::Metrics(aggregated_metrics_data)) => {
print_metrics(aggregated_metrics_data, json)?
}
Some(CommandResponseData::Query(lists_of_metrics)) => {
print_available_metrics(&lists_of_metrics)?;
}
_ => println!("Wrong kind of response here"),
},
}

match refresh {
None => break,
Some(seconds) => std::thread::sleep(std::time::Duration::from_secs(seconds as u64)),
}

print!(
"{}{}",
termion::cursor::Restore,
termion::clear::BeforeCursor
);
}

Ok(())
}

pub fn events(&mut self) -> Result<(), anyhow::Error> {
let id = generate_id();
self.channel.write_message(&CommandRequest::new(
Expand Down
8 changes: 4 additions & 4 deletions bin/src/ctl/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,13 +97,13 @@ pub fn print_metrics(
Ok(())
}

fn print_worker_metrics(query_answer_metrics: &QueryAnswerMetrics) -> anyhow::Result<()> {
match query_answer_metrics {
QueryAnswerMetrics::All(WorkerMetrics { proxy, clusters }) => {
fn print_worker_metrics(query_answer: &QueryAnswer) -> anyhow::Result<()> {
match query_answer {
QueryAnswer::Metrics(QueryAnswerMetrics::All(WorkerMetrics { proxy, clusters })) => {
print_proxy_metrics(proxy);
print_cluster_metrics(clusters);
}
QueryAnswerMetrics::Error(error) => {
QueryAnswer::Metrics(QueryAnswerMetrics::Error(error)) => {
println!("Error: {}\nMaybe check your command.", error)
}
_ => bail!("The query answer is wrong."),
Expand Down

0 comments on commit c02b130

Please sign in to comment.