metrics collection can now be disabled at runtime
This reuses the old metrics command, adding subcommands to configure metrics
collection instead of just receiving all metrics.
Geal authored and FlorentinDUBOIS committed Jul 13, 2022
1 parent 6a853ce commit 5309acb
Showing 8 changed files with 75 additions and 117 deletions.
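
With this change, local metrics collection can be toggled from the command line. A hypothetical session (the subcommand names come from bin/src/cli.rs below; the binary name is assumed):

    sozu metrics enable     # resume local metrics collection
    sozu metrics disable    # stop recording new metrics
    sozu metrics clear      # delete metrics stored so far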
14 changes: 12 additions & 2 deletions bin/src/cli.rs
@@ -63,8 +63,8 @@ pub enum SubCmd {
},
#[structopt(name = "metrics", about = "gets statistics on the main process and its workers")]
Metrics {
#[structopt(short = "j", long = "json", help = "Print the command result in JSON format")]
json: bool
#[structopt(subcommand)]
cmd: MetricsCmd,
},
#[structopt(name = "logging", about = "change logging level")]
Logging {
@@ -124,6 +124,16 @@ pub enum SubCmd {
Events
}

#[derive(StructOpt, PartialEq, Debug)]
pub enum MetricsCmd {
#[structopt(name = "enable", about = "Enables local metrics collection")]
Enable,
#[structopt(name = "disable", about = "Disables local metrics collection")]
Disable,
#[structopt(name = "clear", about = "Deletes local metrics data")]
Clear,
}

#[derive(StructOpt, PartialEq, Debug)]
pub enum StateCmd {
#[structopt(name = "save", about = "Save state to that file")]
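As a minimal sketch of how the new subcommand parses (a hypothetical test, assuming structopt 0.3's StructOpt::from_iter and the variants defined above):

    use structopt::StructOpt;

    // "metrics enable" should select SubCmd::Metrics with MetricsCmd::Enable;
    // the first element stands in for the binary name
    let parsed = SubCmd::from_iter(&["sozu", "metrics", "enable"]);
    assert_eq!(parsed, SubCmd::Metrics { cmd: MetricsCmd::Enable });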
40 changes: 10 additions & 30 deletions bin/src/command/orders.rs
@@ -17,12 +17,11 @@ use sozu_command::command::{
CommandRequest, CommandRequestData, CommandResponse, CommandResponseData, CommandStatus,
RunState, WorkerInfo,
};
use sozu_command::proxy::Route;
use sozu_command::logging;
use sozu_command::proxy::{
AggregatedMetricsData, HttpFrontend, MetricsData, ProxyRequestData, ProxyResponseData,
ProxyResponseStatus, Query, QueryAnswer, QueryApplicationType, TcpFrontend,
ProxyRequest,
ProxyResponseStatus, Query, QueryAnswer, QueryApplicationType, Route, TcpFrontend,
ProxyRequest, MetricsConfiguration,
};
use sozu_command::scm_socket::Listeners;
use sozu_command::state::get_application_ids_by_domain;
@@ -62,7 +61,7 @@ impl CommandServer {
}

CommandRequestData::Proxy(proxy_request) => match proxy_request {
ProxyRequestData::Metrics => self.metrics(client_id, request.id).await,
ProxyRequestData::Metrics(config) => self.metrics(client_id, request.id, config).await,
ProxyRequestData::Query(query) => self.query(client_id, request.id, query).await,
order => {
self.worker_order(client_id, request.id, order, request.worker_id)
@@ -828,7 +827,7 @@ impl CommandServer {
self.config = new_config;
}

pub async fn metrics(&mut self, client_id: String, request_id: String) {
pub async fn metrics(&mut self, client_id: String, request_id: String, config: MetricsConfiguration) {
let (tx, mut rx) = futures::channel::mpsc::channel(self.workers.len() * 2);
let mut count = 0usize;
for ref mut worker in self
@@ -837,13 +836,11 @@
.filter(|worker| worker.run_state != RunState::Stopped)
{
let req_id = format!("{}-metrics-{}", request_id, worker.id);
worker.send(req_id.clone(), ProxyRequestData::Metrics).await;
worker.send(req_id.clone(), ProxyRequestData::Metrics(config.clone())).await;
count += 1;
self.in_flight.insert(req_id, (tx.clone(), 1));
}

let main_metrics = METRICS.with(|metrics| (*metrics.borrow_mut()).dump_process_data());

let mut client_tx = self.clients.get_mut(&client_id).unwrap().clone();
let prefix = format!("{}-metrics-", request_id);
smol::spawn(async move {
@@ -856,7 +853,7 @@
v.push((tag, proxy_response));
}
ProxyResponseStatus::Processing => {
info!("metrics processing");
//info!("metrics processing");
continue;
}
ProxyResponseStatus::Error(_) => {
@@ -871,31 +868,14 @@
}
}

let data: BTreeMap<String, MetricsData> = v
.into_iter()
.filter_map(|(tag, metrics)| {
if let Some(ProxyResponseData::Metrics(d)) = metrics.data {
Some((tag, d))
} else {
None
}
})
.collect();

let aggregated_data = AggregatedMetricsData {
main: main_metrics,
workers: data,
};

if let Err(e) = client_tx
client_tx
.send(CommandResponse::new(
request_id.clone(),
CommandStatus::Ok,
"".to_string(),
Some(CommandResponseData::Metrics(aggregated_data)),
)).await{
error!("could not send message to client {:?}: {:?}", client_id, e);
}
None,
))
.await;
})
.detach();
}
90 changes: 14 additions & 76 deletions bin/src/ctl/command.rs
@@ -8,7 +8,9 @@ use sozu_command::proxy::{Cluster, ProxyRequestData, Backend, HttpFrontend,
RemoveCertificate, ReplaceCertificate, LoadBalancingParams, RemoveBackend,
TcpListener, ListenerType, TlsVersion, QueryCertificateType,
QueryAnswerCertificate, RemoveListener, ActivateListener, DeactivateListener,
LoadBalancingAlgorithms, PathRule, RulePosition, Route, QueryMetricsType, QueryAnswerMetrics};
LoadBalancingAlgorithms, PathRule, RulePosition, Route, QueryMetricsType, QueryAnswerMetrics,
MetricsConfiguration};
use crate::cli::MetricsCmd;

use serde_json;
use std::collections::{HashMap,HashSet,BTreeMap};
@@ -549,16 +551,22 @@ pub fn status(mut channel: Channel<CommandRequest,CommandResponse>, json: bool)
}
}

pub fn metrics(mut channel: Channel<CommandRequest,CommandResponse>, json: bool)
-> Result<(), anyhow::Error> {
pub fn metrics(mut channel: Channel<CommandRequest,CommandResponse>,
cmd: MetricsCmd) -> Result<(), anyhow::Error> {
let id = generate_id();
//println!("will send message for metrics with id {}", id);

let configuration = match cmd {
MetricsCmd::Enable => MetricsConfiguration::Enabled(true),
MetricsCmd::Disable => MetricsConfiguration::Enabled(false),
MetricsCmd::Clear => MetricsConfiguration::Clear,
};

channel.write_message(&CommandRequest::new(
id.clone(),
CommandRequestData::Proxy(ProxyRequestData::Metrics),
CommandRequestData::Proxy(ProxyRequestData::Metrics(configuration)),
None,
));
//println!("message sent");

// we should add a timeout somehow, otherwise it hangs
loop {
@@ -572,81 +580,11 @@ pub fn metrics(mut channel: Channel<CommandRequest,CommandResponse>, json: bool)
println!("Proxy is processing: {}", message.message);
},
CommandStatus::Error => {
if json {
print_json_response(&message.message)?;
}
bail!("could not stop the proxy: {}", message.message);
},
CommandStatus::Ok => {
if &id == &message.id {
//println!("Sozu metrics:\n{}\n{:#?}", message.message, message.data);

if let Some(CommandResponseData::Metrics(data)) = message.data {
if json {
print_json_response(&data)?;
return Ok(());
}

let mut main_metrics = BTreeMap::new();
main_metrics.insert("".to_string(), data.main.clone());
print_metrics("Main", &main_metrics);

let worker_metrics = data.workers.iter().map(|(k,v)| (k.to_string(), v.proxy.clone()))
.collect::<BTreeMap<_,_>>();
print_metrics("Workers", &worker_metrics);

let mut cluster_ids = HashSet::new();
for worker in data.workers.values() {
for key in worker.clusters.keys() {
cluster_ids.insert(key);
}
}
let mut cluster_ids: Vec<_> = cluster_ids.drain().collect();
cluster_ids.sort();

println!("\ncluster metrics:\n");
for cluster_id in cluster_ids.iter() {
println!("looking for data for cluster: {}", cluster_id);
let cluster_metrics = data.workers.iter()
.map(|(worker, worker_data)| {
//println!("worker data: {:?}", worker_data.clusters.get(cluster_id.as_str()));
(worker.clone(),
worker_data.clusters.get(cluster_id.as_str())
.map(|cluster_data| {
cluster_data.data.clone()
})
.unwrap_or_default())
}).collect::<BTreeMap<_, _>>();

println!("generated app metrics");
print_metrics(cluster_id, &cluster_metrics);

let mut backend_ids: HashSet<_> = data.workers.values()
.filter_map(|w| w.clusters.get(cluster_id.as_str()))
.flat_map(|app_metrics| {
app_metrics.backends.keys()
}).collect();
let mut backend_ids: Vec<_> = backend_ids.drain().collect();
backend_ids.sort();

for backend_id in backend_ids.iter() {
let backend_metrics = data.workers.iter()
.map(|(worker, worker_data)| {
(worker.clone(),
worker_data.clusters.get(cluster_id.as_str())
.and_then(|cluster_data| {
cluster_data.backends.get(backend_id.as_str())
.map(|b| b.clone())
})
.unwrap_or_default())
}).collect::<BTreeMap<_, _>>();

print_metrics(&format!("{} {}", cluster_id, backend_id), &backend_metrics);
}

}
break Ok(());
}
break Ok(());
}
}
}
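Note that metrics() now only sends a configuration order and waits for a plain acknowledgement; the per-cluster and per-backend printing removed here presumably moves to the separate query path (the QueryMetricsType and QueryAnswerMetrics imports remain in use).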
2 changes: 1 addition & 1 deletion bin/src/ctl/mod.rs
@@ -45,7 +45,7 @@ pub fn ctl(matches: Sozu) -> Result<(), anyhow::Error>{
Ok(())
},
SubCmd::Status{ json } => status(channel, json),
SubCmd::Metrics{ json } => metrics(channel, json),
SubCmd::Metrics{ cmd } => metrics(channel, cmd),
SubCmd::Logging{ level } => logging_filter(channel, timeout, &level),
SubCmd::State{ cmd } => {
match cmd {
10 changes: 8 additions & 2 deletions command/src/proxy.rs
@@ -176,7 +176,7 @@ pub enum ProxyRequestData {
HardStop,

Status,
Metrics,
Metrics(MetricsConfiguration),
Logging(String),

ReturnListenSockets,
@@ -725,6 +725,12 @@ pub struct TcpListener {
pub connect_timeout: u32,
}

#[derive(Debug,Clone,PartialEq,Eq,Hash, Serialize, Deserialize)]
pub enum MetricsConfiguration {
Enabled(bool),
Clear,
}

#[derive(Debug,Clone,PartialEq,Eq,Hash, Serialize, Deserialize)]
#[serde(tag = "type", content = "data", rename_all = "SCREAMING_SNAKE_CASE")]
pub enum Query {
@@ -840,7 +846,7 @@ impl ProxyRequestData {
ProxyRequestData::SoftStop => [Topic::HttpProxyConfig, Topic::HttpsProxyConfig, Topic::TcpProxyConfig].iter().cloned().collect(),
ProxyRequestData::HardStop => [Topic::HttpProxyConfig, Topic::HttpsProxyConfig, Topic::TcpProxyConfig].iter().cloned().collect(),
ProxyRequestData::Status => [Topic::HttpProxyConfig, Topic::HttpsProxyConfig, Topic::TcpProxyConfig].iter().cloned().collect(),
ProxyRequestData::Metrics => HashSet::new(),
ProxyRequestData::Metrics(_) => HashSet::new(),
ProxyRequestData::Logging(_) => [Topic::HttpsProxyConfig, Topic::HttpProxyConfig, Topic::TcpProxyConfig].iter().cloned().collect(),
ProxyRequestData::ReturnListenSockets => HashSet::new(),
}
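A minimal sketch of the wire format for the new enum, assuming serde_json and serde's default externally tagged representation (no #[serde] attributes are added to MetricsConfiguration in this diff):

    // newtype variant: {"Enabled":true}; unit variant: "Clear"
    let on = MetricsConfiguration::Enabled(true);
    assert_eq!(serde_json::to_string(&on).unwrap(), r#"{"Enabled":true}"#);

    let clear = MetricsConfiguration::Clear;
    assert_eq!(serde_json::to_string(&clear).unwrap(), r#""Clear""#);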
21 changes: 20 additions & 1 deletion lib/src/metrics/local_drain.rs
@@ -5,7 +5,8 @@ use time::{Duration, OffsetDateTime};
use std::convert::TryInto;
use std::collections::BTreeMap;
use hdrhistogram::Histogram;
use sozu_command::proxy::{FilteredData,MetricsData,Percentiles,AppMetricsData,QueryMetricsType,QueryAnswerMetrics};
use sozu_command::proxy::{FilteredData,MetricsData,Percentiles,AppMetricsData,
QueryMetricsType,QueryAnswerMetrics,MetricsConfiguration};

use super::{MetricData,Subscriber};

@@ -113,6 +114,7 @@ pub struct LocalDrain {
metrics: BTreeMap<String, (MetricMeta, MetricKind)>,
use_tagged_metrics: bool,
origin: String,
enabled: bool,
}

impl LocalDrain {
@@ -135,6 +137,7 @@
data: BTreeMap::new(),
use_tagged_metrics: false,
origin: String::from("x"),
enabled: true,
}
}

Expand All @@ -146,6 +149,18 @@ impl LocalDrain {
}
}

pub fn configure(&mut self, config: &MetricsConfiguration) {
match config {
MetricsConfiguration::Enabled(enabled) => {
self.enabled = *enabled;
},
MetricsConfiguration::Clear => {
self.backend_tree.clear();
self.cluster_tree.clear();
}
}
}

pub fn dump_metrics_data(&mut self) -> MetricsData {
MetricsData {
proxy: self.dump_process_data(),
@@ -466,6 +481,10 @@ impl LocalDrain {
}

fn receive_cluster_metric(&mut self, key: &str, cluster_id: &str, backend_id: Option<&str>, metric: MetricData) {
if !self.enabled {
return;
}

info!("metric: {} {} {:?} {:?}", key, cluster_id, backend_id, metric);

if let MetricData::Time(t) = metric {
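A sketch of the new runtime switch, with hypothetical setup (the constructor call is assumed; configure() and the tree names come from this diff):

    // assuming some constructor producing a LocalDrain with enabled: true
    let mut drain = LocalDrain::new();
    drain.configure(&MetricsConfiguration::Enabled(false));
    // receive_cluster_metric() now returns early: nothing is recorded
    drain.configure(&MetricsConfiguration::Clear);
    // backend_tree and cluster_tree are emptied, dropping stored data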
7 changes: 6 additions & 1 deletion lib/src/metrics/mod.rs
@@ -5,7 +5,8 @@ use std::collections::BTreeMap;
use std::net::SocketAddr;
use mio::net::UdpSocket;
use std::io::{self,Write};
use sozu_command::proxy::{FilteredData,MetricsData,QueryMetricsType,QueryAnswerMetrics};
use sozu_command::proxy::{FilteredData,MetricsData,QueryMetricsType,
QueryAnswerMetrics,MetricsConfiguration};

mod network_drain;
mod local_drain;
@@ -191,6 +192,10 @@ impl Aggregator {
pub fn clear_local(&mut self, now: time::OffsetDateTime) {
self.local.clear(now);
}

pub fn configure(&mut self, config: &MetricsConfiguration) {
self.local.configure(config);
}
}

impl Subscriber for Aggregator {
8 changes: 4 additions & 4 deletions lib/src/server.rs
@@ -587,15 +587,15 @@ impl Server {
}

fn notify(&mut self, message: ProxyRequest) {
if let ProxyRequestData::Metrics = message.order {
if let ProxyRequestData::Metrics(ref configuration) = message.order {
//let id = message.id.clone();
METRICS.with(|metrics| {
(*metrics.borrow_mut()).configure(configuration);

push_queue(ProxyResponse {
id: message.id.clone(),
status: ProxyResponseStatus::Ok,
data: Some(ProxyResponseData::Metrics(
(*metrics.borrow_mut()).dump_metrics_data()
))
data: None,
});
});
return;
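The worker side thus acknowledges a Metrics order immediately with an empty payload after reconfiguring its local drain, rather than dumping all collected metrics in the response.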
