Skip to content

Commit

Permalink
write ResponseContent in protobuf
Browse files Browse the repository at this point in the history
  • Loading branch information
Keksoj committed Apr 28, 2023
1 parent 537e0e9 commit dbab4b4
Show file tree
Hide file tree
Showing 13 changed files with 460 additions and 406 deletions.
14 changes: 10 additions & 4 deletions bin/src/command/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,11 @@ use serde::{Deserialize, Serialize};
use sozu_command_lib::{
config::Config,
proto::command::{
request::RequestType, MetricsConfiguration, Request, ResponseStatus, RunState, Status,
request::RequestType, response_content::ContentType, MetricsConfiguration, Request,
ResponseContent, ResponseStatus, RunState, Status,
},
request::WorkerRequest,
response::{Response, ResponseContent, WorkerResponse},
response::{Response, WorkerResponse},
scm_socket::{Listeners, ScmSocket},
state::ConfigState,
};
Expand Down Expand Up @@ -713,14 +714,19 @@ impl CommandServer {
response: WorkerResponse,
) -> anyhow::Result<Success> {
// Notify the client with Processing in case of a proxy event
if let Some(ResponseContent::Event(event)) = response.content {
if let Some(ResponseContent {
content_type: Some(ContentType::Event(event)),
}) = response.content
{
for client_id in self.event_subscribers.iter() {
if let Some(client_tx) = self.clients.get_mut(client_id) {
let event = Response::new(
// response.id.to_string(),
ResponseStatus::Processing,
format!("{worker_id}"),
Some(ResponseContent::Event(event.clone())),
Some(ResponseContent {
content_type: Some(ContentType::Event(event.clone())),
}),
);
client_tx
.send(event)
Expand Down
102 changes: 58 additions & 44 deletions bin/src/command/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ use sozu_command_lib::{
logging,
parser::parse_several_commands,
proto::command::{
request::RequestType, AggregatedMetrics, AvailableMetrics, ClusterHashes,
ClusterInformations, FrontendFilters, ListedFrontends, ListenersList, MetricsConfiguration,
Request, ResponseStatus, ReturnListenSockets, RunState, SoftStop, Status, WorkerInfo,
WorkerInfos,
request::RequestType, response_content::ContentType, AggregatedMetrics, AvailableMetrics,
ClusterHashes, ClusterInformations, FrontendFilters, ListedFrontends, ListenersList,
MetricsConfiguration, Request, ResponseContent, ResponseStatus, ReturnListenSockets,
RunState, SoftStop, Status, WorkerInfo, WorkerInfos, WorkerResponses,
},
request::WorkerRequest,
response::{Response, ResponseContent},
response::Response,
scm_socket::Listeners,
state::get_cluster_ids_by_domain,
};
Expand Down Expand Up @@ -395,19 +395,19 @@ impl CommandServer {
}
}

Ok(Some(Success::ListFrontends(ResponseContent::FrontendList(
listed_frontends,
))))
Ok(Some(Success::ListFrontends(ResponseContent {
content_type: Some(ContentType::FrontendList(listed_frontends)),
})))
}

fn list_listeners(&self) -> anyhow::Result<Option<Success>> {
Ok(Some(Success::ListListeners(
ResponseContent::ListenersList(ListenersList {
Ok(Some(Success::ListListeners(ResponseContent {
content_type: Some(ContentType::ListenersList(ListenersList {
http_listeners: self.state.http_listeners.clone(),
https_listeners: self.state.https_listeners.clone(),
tcp_listeners: self.state.tcp_listeners.clone(),
}),
)))
})),
})))
}

pub async fn list_workers(&mut self) -> anyhow::Result<Option<Success>> {
Expand All @@ -423,9 +423,9 @@ impl CommandServer {

debug!("workers: {:#?}", workers);

Ok(Some(Success::ListWorkers(ResponseContent::Workers(
WorkerInfos { vec: workers },
))))
Ok(Some(Success::ListWorkers(ResponseContent {
content_type: Some(ContentType::Workers(WorkerInfos { vec: workers })),
})))
}

pub async fn launch_worker(
Expand Down Expand Up @@ -968,7 +968,9 @@ impl CommandServer {
return_success(
command_tx,
thread_client_id,
Success::Status(ResponseContent::Workers(worker_info_vec)),
Success::Status(ResponseContent {
content_type: Some(ContentType::Workers(worker_info_vec)),
}),
)
.await;
})
Expand Down Expand Up @@ -1080,18 +1082,17 @@ impl CommandServer {
)
.await;

let mut main_response_content = None;
match &request.request_type {
Some(RequestType::QueryClustersHashes(_)) => {
main_response_content = Some(ResponseContent::ClustersHashes(ClusterHashes {
let main_response_content = match &request.request_type {
Some(RequestType::QueryClustersHashes(_)) => Some(ResponseContent {
content_type: Some(ContentType::ClusterHashes(ClusterHashes {
map: self.state.hash_state(),
}))
}
Some(RequestType::QueryClusterById(cluster_id)) => {
main_response_content = Some(ResponseContent::Clusters(ClusterInformations {
})),
}),
Some(RequestType::QueryClusterById(cluster_id)) => Some(ResponseContent {
content_type: Some(ContentType::Clusters(ClusterInformations {
vec: vec![self.state.cluster_state(cluster_id)],
}))
}
})),
}),
Some(RequestType::QueryClustersByDomain(domain)) => {
let cluster_ids = get_cluster_ids_by_domain(
&self.state,
Expand All @@ -1102,10 +1103,11 @@ impl CommandServer {
.iter()
.map(|cluster_id| self.state.cluster_state(cluster_id))
.collect();
main_response_content =
Some(ResponseContent::Clusters(ClusterInformations { vec }));
Some(ResponseContent {
content_type: Some(ContentType::Clusters(ClusterInformations { vec })),
})
}
_ => {}
_ => None,
};

// all these are passed to the thread
Expand Down Expand Up @@ -1156,46 +1158,58 @@ impl CommandServer {
| &Some(RequestType::QueryClustersByDomain(_)) => {
let main = main_response_content.unwrap(); // we should refactor to avoid this unwrap()
worker_responses.insert(String::from("main"), main);
ResponseContent::WorkerResponses(worker_responses)
ResponseContent {
content_type: Some(ContentType::WorkerResponses(WorkerResponses {
map: worker_responses,
})),
}
}
&Some(RequestType::QueryCertificatesByDomain(_))
| &Some(RequestType::QueryCertificateByFingerprint(_))
| &Some(RequestType::QueryAllCertificates(_)) => {
info!("certificates query answer received: {:?}", worker_responses);
ResponseContent::WorkerResponses(worker_responses)
ResponseContent {
content_type: Some(ContentType::WorkerResponses(WorkerResponses {
map: worker_responses,
})),
}
}
Some(RequestType::QueryMetrics(options)) => {
if options.list {
let mut summed_proxy_metrics = Vec::new();
let mut summed_cluster_metrics = Vec::new();
for (_, response) in worker_responses {
if let ResponseContent::AvailableMetrics(AvailableMetrics {
if let Some(ContentType::AvailableMetrics(AvailableMetrics {
proxy_metrics,
cluster_metrics,
}) = response
})) = response.content_type
{
summed_proxy_metrics.append(&mut proxy_metrics.clone());
summed_cluster_metrics.append(&mut cluster_metrics.clone());
}
}
ResponseContent::AvailableMetrics(AvailableMetrics {
proxy_metrics: summed_proxy_metrics,
cluster_metrics: summed_cluster_metrics,
})
ResponseContent {
content_type: Some(ContentType::AvailableMetrics(AvailableMetrics {
proxy_metrics: summed_proxy_metrics,
cluster_metrics: summed_cluster_metrics,
})),
}
} else {
let workers_metrics = worker_responses
.into_iter()
.filter_map(|(worker_id, worker_response)| match worker_response {
ResponseContent::WorkerMetrics(worker_metrics) => {
Some((worker_id, worker_metrics))
}
ResponseContent {
content_type: Some(ContentType::WorkerMetrics(worker_metrics)),
} => Some((worker_id, worker_metrics)),
_ => None,
})
.collect();
ResponseContent::Metrics(AggregatedMetrics {
main: main_metrics,
workers: workers_metrics,
})
ResponseContent {
content_type: Some(ContentType::Metrics(AggregatedMetrics {
main: main_metrics,
workers: workers_metrics,
})),
}
}
}
_ => return, // very very unlikely
Expand Down
67 changes: 35 additions & 32 deletions bin/src/ctl/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ use serde::Serialize;

use sozu_command_lib::{
proto::command::{
request::RequestType, ListWorkers, QueryAllCertificates, QueryClusterByDomain,
QueryClustersHashes, QueryMetricsOptions, Request, ResponseStatus, RunState, UpgradeMain,
WorkerInfo,
request::RequestType, response_content::ContentType, ListWorkers, QueryAllCertificates,
QueryClusterByDomain, QueryClustersHashes, QueryMetricsOptions, Request, ResponseContent,
ResponseStatus, RunState, UpgradeMain, WorkerInfo,
},
response::{Response, ResponseContent},
response::{Response},
};

use crate::ctl::{
Expand Down Expand Up @@ -92,27 +92,19 @@ impl CommandManager {
println!("Success: {}", response.message);
}
match response.content {
Some(response_content) => match response_content {
ResponseContent::Metrics(_)
| ResponseContent::WorkerResponses(_)
| ResponseContent::WorkerMetrics(_)
| ResponseContent::ClustersHashes(_)
| ResponseContent::Clusters(_)
| ResponseContent::CertificateByFingerprint(_)
| ResponseContent::Certificates(_)
| ResponseContent::AvailableMetrics(_)
| ResponseContent::Event(_) => {}
ResponseContent::FrontendList(frontends) => {
Some(response_content) => match response_content.content_type {
Some(ContentType::FrontendList(frontends)) => {
print_frontend_list(frontends)
}
ResponseContent::Workers(worker_infos) => {
Some(ContentType::Workers(worker_infos)) => {
if json {
print_json_response(&worker_infos)?;
} else {
print_status(worker_infos);
}
}
ResponseContent::ListenersList(list) => print_listeners(list),
Some(ContentType::ListenersList(list)) => print_listeners(list),
_ => {}
},
None => {}
}
Expand Down Expand Up @@ -147,7 +139,10 @@ impl CommandManager {
);
}
ResponseStatus::Ok => {
if let Some(ResponseContent::Workers(ref worker_infos)) = response.content {
if let Some(ResponseContent {
content_type: Some(ContentType::Workers(ref worker_infos)),
}) = response.content
{
let mut table = Table::new();
table.set_format(*prettytable::format::consts::FORMAT_BOX_CHARS);
table.add_row(row!["Worker", "pid", "run state"]);
Expand All @@ -169,9 +164,9 @@ impl CommandManager {
loop {
let response = self.read_channel_message_with_timeout()?;

if id != response.id {
bail!("Error: received unexpected message: {:?}", response);
}
// if id != response.id {
// bail!("Error: received unexpected message: {:?}", response);
// }

match response.status {
ResponseStatus::Processing => {
Expand Down Expand Up @@ -300,15 +295,18 @@ impl CommandManager {
}
}
ResponseStatus::Ok => {
match response.content {
Some(ResponseContent::Metrics(aggregated_metrics_data)) => {
print_metrics(aggregated_metrics_data, json)?
}
Some(ResponseContent::AvailableMetrics(available)) => {
print_available_metrics(&available)?;
if let Some(response_content) = response.content {
match response_content.content_type {
Some(ContentType::Metrics(aggregated_metrics_data)) => {
print_metrics(aggregated_metrics_data, json)?
}
Some(ContentType::AvailableMetrics(available)) => {
print_available_metrics(&available)?;
}
_ => println!("Wrong kind of response here"),
}
_ => println!("Wrong kind of response here"),
}

break;
}
}
Expand Down Expand Up @@ -384,7 +382,12 @@ impl CommandManager {
bail!("could not query proxy state: {}", response.message);
}
ResponseStatus::Ok => {
print_query_response_data(cluster_id, domain, response.content, json)?;
match response.content {
Some(content) => {
print_query_response_data(cluster_id, domain, content, json)?
}
None => println!("No content in the response"),
}
break;
}
}
Expand Down Expand Up @@ -438,9 +441,9 @@ impl CommandManager {
}
ResponseStatus::Ok => {
match response.content {
Some(ResponseContent::WorkerResponses(data)) => {
print_certificates(data, json)?
}
Some(ResponseContent {
content_type: Some(ContentType::WorkerResponses(worker_responses)),
}) => print_certificates(worker_responses.map, json)?,
_ => bail!("unexpected response: {:?}", response.content),
}
break;
Expand Down

0 comments on commit dbab4b4

Please sign in to comment.