Skip to content

Commit

Permalink
put QueryAnswer variants in ProxyResponse, remove QueryAnswer
Browse files Browse the repository at this point in the history
  • Loading branch information
Keksoj committed Mar 22, 2023
1 parent 0e62f8d commit 4df5ba9
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 77 deletions.
32 changes: 16 additions & 16 deletions bin/src/command/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ use sozu_command_lib::{
parser::parse_several_commands,
request::{FrontendFilters, MetricsConfiguration, QueryClusterType, Request, WorkerRequest},
response::{
AggregatedMetricsData, ListedFrontends, ListenersList, QueryAnswer, Response,
ResponseContent, ResponseStatus, RunState, WorkerInfo,
AggregatedMetricsData, ListedFrontends, ListenersList, Response, ResponseContent,
ResponseStatus, RunState, WorkerInfo,
},
scm_socket::Listeners,
state::get_cluster_ids_by_domain,
Expand Down Expand Up @@ -1044,13 +1044,13 @@ impl CommandServer {
)
.await;

let mut main_query_answer = None;
let mut main_response_content = None;
match &request {
Request::QueryClustersHashes => {
main_query_answer = Some(QueryAnswer::ClustersHashes(self.state.hash_state()));
main_response_content = Some(ResponseContent::ClustersHashes(self.state.hash_state()));
}
Request::QueryClusters(query_type) => {
main_query_answer = Some(QueryAnswer::Clusters(match query_type {
main_response_content = Some(ResponseContent::Clusters(match query_type {
QueryClusterType::ClusterId(cluster_id) => {
vec![self.state.cluster_state(cluster_id)]
}
Expand Down Expand Up @@ -1103,11 +1103,11 @@ impl CommandServer {
}
}

let mut query_answers: BTreeMap<String, QueryAnswer> = responses
let mut worker_responses: BTreeMap<String, ResponseContent> = responses
.into_iter()
.filter_map(|(worker_id, proxy_response)| {
if let Some(ResponseContent::Query(d)) = proxy_response.content {
Some((worker_id.to_string(), d))
if let Some(response_content) = proxy_response.content {
Some((worker_id.to_string(), response_content))
} else {
None
}
Expand All @@ -1116,23 +1116,23 @@ impl CommandServer {

let success = match &request {
&Request::QueryClustersHashes | &Request::QueryClusters(_) => {
let main = main_query_answer.unwrap(); // we should refactor to avoid this unwrap()
query_answers.insert(String::from("main"), main);
Success::Query(ResponseContent::WorkerResponses(query_answers))
let main = main_response_content.unwrap(); // we should refactor to avoid this unwrap()
worker_responses.insert(String::from("main"), main);
Success::Query(ResponseContent::WorkerResponses(worker_responses))
}
&Request::QueryCertificates(_) => {
info!("certificates query answer received: {:?}", query_answers);
Success::Query(ResponseContent::WorkerResponses(query_answers))
info!("certificates query answer received: {:?}", worker_responses);
Success::Query(ResponseContent::WorkerResponses(worker_responses))
}
Request::QueryMetrics(options) => {
debug!("metrics query answer received: {:?}", query_answers);
debug!("metrics query answer received: {:?}", worker_responses);

if options.list {
Success::Query(ResponseContent::WorkerResponses(query_answers))
Success::Query(ResponseContent::WorkerResponses(worker_responses))
} else {
Success::Query(ResponseContent::Metrics(AggregatedMetricsData {
main: main_metrics,
workers: query_answers,
workers: worker_responses,
}))
}
}
Expand Down
9 changes: 6 additions & 3 deletions bin/src/ctl/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,10 @@ impl CommandManager {
| ResponseContent::Metrics(_)
| ResponseContent::WorkerResponses(_)
| ResponseContent::WorkerMetrics(_)
| ResponseContent::Query(_)
| ResponseContent::ClustersHashes(_)
| ResponseContent::Clusters(_)
| ResponseContent::Certificates(_)
| ResponseContent::QueriedMetrics(_)
| ResponseContent::Event(_) => {}
ResponseContent::State(state) => match json {
true => print_json_response(&state)?,
Expand Down Expand Up @@ -294,8 +297,8 @@ impl CommandManager {
Some(ResponseContent::Metrics(aggregated_metrics_data)) => {
print_metrics(aggregated_metrics_data, json)?
}
Some(ResponseContent::WorkerResponses(lists_of_metrics)) => {
print_available_metrics(&lists_of_metrics)?;
Some(ResponseContent::WorkerResponses(worker_responses)) => {
print_available_metrics(&worker_responses)?;
}
_ => println!("Wrong kind of response here"),
}
Expand Down
43 changes: 24 additions & 19 deletions bin/src/ctl/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@ use prettytable::{Row, Table};

use sozu_command_lib::response::{
AggregatedMetricsData, ClusterMetricsData, FilteredData, ListedFrontends, ListenersList,
QueryAnswer, QueryAnswerCertificate, QueryAnswerMetrics, ResponseContent, Route, WorkerInfo,
WorkerMetrics,
QueryAnswerCertificate, QueryAnswerMetrics, ResponseContent, Route, WorkerInfo, WorkerMetrics,
};

pub fn print_listeners(listeners_list: ListenersList) {
Expand Down Expand Up @@ -208,20 +207,23 @@ pub fn print_metrics(
print_proxy_metrics(&Some(aggregated_metrics.main));

// workers
for (worker_id, query_answer_metrics) in aggregated_metrics.workers.iter() {
for (worker_id, worker_metrics) in aggregated_metrics.workers.iter() {
println!("\nWorker {worker_id}\n=========");
print_worker_metrics(query_answer_metrics)?;
print_worker_metrics(worker_metrics)?;
}
Ok(())
}

fn print_worker_metrics(query_answer: &QueryAnswer) -> anyhow::Result<()> {
match query_answer {
QueryAnswer::Metrics(QueryAnswerMetrics::All(WorkerMetrics { proxy, clusters })) => {
fn print_worker_metrics(worker_metrics: &ResponseContent) -> anyhow::Result<()> {
match worker_metrics {
ResponseContent::QueriedMetrics(QueryAnswerMetrics::All(WorkerMetrics {
proxy,
clusters,
})) => {
print_proxy_metrics(proxy);
print_cluster_metrics(clusters);
}
QueryAnswer::Metrics(QueryAnswerMetrics::Error(error)) => {
ResponseContent::QueriedMetrics(QueryAnswerMetrics::Error(error)) => {
println!("Error: {error}\nMaybe check your command.")
}
_ => bail!("The query answer is wrong."),
Expand Down Expand Up @@ -376,7 +378,7 @@ pub fn print_json_response<T: ::serde::Serialize>(input: &T) -> Result<(), anyho

pub fn create_queried_cluster_table(
headers: Vec<&str>,
data: &BTreeMap<String, QueryAnswer>,
data: &BTreeMap<String, ResponseContent>,
) -> Table {
let mut table = Table::new();
table.set_format(*prettytable::format::consts::FORMAT_BOX_CHARS);
Expand Down Expand Up @@ -425,7 +427,7 @@ pub fn print_query_response_data(

for (key, metrics) in data.iter() {
//let m: u8 = metrics;
if let QueryAnswer::Clusters(clusters) = metrics {
if let ResponseContent::Clusters(clusters) = metrics {
for cluster in clusters.iter() {
let entry = cluster_data.entry(cluster).or_insert(Vec::new());
entry.push(key.to_owned());
Expand Down Expand Up @@ -591,7 +593,7 @@ pub fn print_query_response_data(

for metrics in data.values() {
//let m: u8 = metrics;
if let QueryAnswer::ClustersHashes(clusters) = metrics {
if let ResponseContent::ClustersHashes(clusters) = metrics {
for (key, value) in clusters.iter() {
query_data.entry(key).or_insert(Vec::new()).push(value);
}
Expand Down Expand Up @@ -619,15 +621,18 @@ pub fn print_query_response_data(
Ok(())
}

pub fn print_certificates(data: BTreeMap<String, QueryAnswer>, json: bool) -> anyhow::Result<()> {
pub fn print_certificates(
response_contents: BTreeMap<String, ResponseContent>,
json: bool,
) -> anyhow::Result<()> {
if json {
print_json_response(&data)?;
print_json_response(&response_contents)?;
return Ok(());
}

//println!("received: {:?}", data);
let it = data.iter().map(|(k, v)| match v {
QueryAnswer::Certificates(c) => (k, c),
let it = response_contents.iter().map(|(k, v)| match v {
ResponseContent::Certificates(c) => (k, c),
v => {
eprintln!("unexpected certificates query answer: {v:?}");
exit(1);
Expand Down Expand Up @@ -684,12 +689,12 @@ fn format_tags_to_string(tags: Option<&BTreeMap<String, String>>) -> String {
.unwrap_or_default()
}

pub fn print_available_metrics(answers: &BTreeMap<String, QueryAnswer>) -> anyhow::Result<()> {
pub fn print_available_metrics(answers: &BTreeMap<String, ResponseContent>) -> anyhow::Result<()> {
let mut available_metrics: (HashSet<String>, HashSet<String>) =
(HashSet::new(), HashSet::new());
for query_answer in answers.values() {
match query_answer {
QueryAnswer::Metrics(QueryAnswerMetrics::List((
for content in answers.values() {
match content {
ResponseContent::QueriedMetrics(QueryAnswerMetrics::List((
proxy_metric_keys,
cluster_metric_keys,
))) => {
Expand Down
2 changes: 1 addition & 1 deletion command/assets/answer_metrics.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
},
"workers": {
"0": {
"type": "METRICS",
"type": "QUERIED_METRICS",
"data": {
"All": {
"proxy": {
Expand Down
18 changes: 5 additions & 13 deletions command/src/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,8 @@ pub enum ResponseContent {
Workers(Vec<WorkerInfo>),
/// aggregated metrics of main process and workers
Metrics(AggregatedMetricsData),
/// worker responses to a same query: worker_id -> query_answer
// TODO: repace with WorkerResponses(BtreeMap<String, ResponseContent>)
WorkerResponses(BTreeMap<String, QueryAnswer>),
/// worker responses to a same query: worker_id -> response_content
WorkerResponses(BTreeMap<String, ResponseContent>),
/// the state of Sōzu: frontends, backends, listeners, etc.
State(Box<ConfigState>),
/// a proxy event
Expand All @@ -74,18 +73,11 @@ pub enum ResponseContent {

/// contains proxy & cluster metrics
WorkerMetrics(WorkerMetrics),
Query(QueryAnswer),
}

/// details of an query answer, sent by a worker
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "type", content = "data", rename_all = "SCREAMING_SNAKE_CASE")]
pub enum QueryAnswer {
Clusters(Vec<QueryAnswerCluster>),
/// cluster id -> hash of cluster information
ClustersHashes(BTreeMap<String, u64>),
Certificates(QueryAnswerCertificate),
Metrics(QueryAnswerMetrics),
QueriedMetrics(QueryAnswerMetrics),
}

#[derive(Default, Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
Expand Down Expand Up @@ -519,7 +511,7 @@ pub enum ResponseContent {
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct AggregatedMetricsData {
pub main: BTreeMap<String, FilteredData>,
pub workers: BTreeMap<String, QueryAnswer>,
pub workers: BTreeMap<String, ResponseContent>,
}

/// All metrics of a worker: proxy and clusters
Expand Down Expand Up @@ -668,7 +660,7 @@ mod tests {
.collect(),
workers: [(
String::from("0"),
QueryAnswer::Metrics(QueryAnswerMetrics::All(WorkerMetrics {
ResponseContent::QueriedMetrics(QueryAnswerMetrics::All(WorkerMetrics {
proxy: Some(
[
(String::from("sozu.gauge"), FilteredData::Gauge(1)),
Expand Down
12 changes: 6 additions & 6 deletions lib/src/https.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ use crate::{
ReplaceCertificate, Request, WorkerRequest,
},
response::{
HttpFrontend, HttpsListenerConfig, QueryAnswer, QueryAnswerCertificate,
ResponseContent, Route, WorkerResponse,
HttpFrontend, HttpsListenerConfig, QueryAnswerCertificate, ResponseContent, Route,
WorkerResponse,
},
scm_socket::ScmSocket,
state::ClusterId,
Expand Down Expand Up @@ -960,9 +960,9 @@ impl HttpsProxy {
certificates
);

Ok(Some(ResponseContent::Query(QueryAnswer::Certificates(
Ok(Some(ResponseContent::Certificates(
QueryAnswerCertificate::All(certificates),
))))
)))
}

pub fn query_certificate_for_domain(
Expand All @@ -989,9 +989,9 @@ impl HttpsProxy {
domain, certificates
);

Ok(Some(ResponseContent::Query(QueryAnswer::Certificates(
Ok(Some(ResponseContent::Certificates(
QueryAnswerCertificate::Domain(certificates),
))))
)))
}

pub fn activate_listener(
Expand Down
29 changes: 10 additions & 19 deletions lib/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@ use sozu_command::{
QueryCertificateType, QueryClusterType, RemoveBackend, Request, WorkerRequest,
},
response::{
Event, HttpListenerConfig, HttpsListenerConfig, MessageId, QueryAnswer,
QueryAnswerCertificate, ResponseContent, ResponseStatus,
TcpListenerConfig as CommandTcpListener, WorkerResponse,
Event, HttpListenerConfig, HttpsListenerConfig, MessageId, QueryAnswerCertificate,
ResponseContent, ResponseStatus, TcpListenerConfig as CommandTcpListener, WorkerResponse,
},
scm_socket::{Listeners, ScmSocket},
state::{get_certificate, get_cluster_ids_by_domain, ConfigState},
Expand Down Expand Up @@ -882,16 +881,14 @@ impl Server {
Request::QueryClustersHashes => {
push_queue(WorkerResponse::ok_with_content(
message.id.clone(),
ResponseContent::Query(QueryAnswer::ClustersHashes(
self.config_state.hash_state(),
)),
ResponseContent::ClustersHashes(self.config_state.hash_state()),
));
return;
}
Request::QueryClusters(query_type) => {
let query_answer = match query_type {
let content = match query_type {
QueryClusterType::ClusterId(cluster_id) => {
QueryAnswer::Clusters(vec![self.config_state.cluster_state(cluster_id)])
ResponseContent::Clusters(vec![self.config_state.cluster_state(cluster_id)])
}
QueryClusterType::Domain(domain) => {
let cluster_ids = get_cluster_ids_by_domain(
Expand All @@ -904,13 +901,10 @@ impl Server {
.map(|cluster_id| self.config_state.cluster_state(cluster_id))
.collect();

QueryAnswer::Clusters(answer)
ResponseContent::Clusters(answer)
}
};
push_queue(WorkerResponse::ok_with_content(
message.id.clone(),
ResponseContent::Query(query_answer),
));
push_queue(WorkerResponse::ok_with_content(message.id.clone(), content));
return;
}
Request::QueryCertificates(q) => {
Expand All @@ -922,11 +916,8 @@ impl Server {
QueryCertificateType::Fingerprint(f) => {
push_queue(WorkerResponse::ok_with_content(
message.id.clone(),
ResponseContent::Query(QueryAnswer::Certificates(
QueryAnswerCertificate::Fingerprint(get_certificate(
&self.config_state,
f,
)),
ResponseContent::Certificates(QueryAnswerCertificate::Fingerprint(
get_certificate(&self.config_state, f),
)),
));
return;
Expand All @@ -939,7 +930,7 @@ impl Server {

push_queue(WorkerResponse::ok_with_content(
message.id.clone(),
ResponseContent::Query(QueryAnswer::Metrics(data)),
ResponseContent::QueriedMetrics(data),
));
});
return;
Expand Down

0 comments on commit 4df5ba9

Please sign in to comment.