Skip to content

Commit

Permalink
put Query variants into Order, remove Query
Browse files Browse the repository at this point in the history
  • Loading branch information
Keksoj committed Mar 9, 2023
1 parent 1d5c72e commit a1a801e
Show file tree
Hide file tree
Showing 9 changed files with 130 additions and 127 deletions.
2 changes: 2 additions & 0 deletions bin/src/acme.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ pub fn main(
Ok(())
}

/*
fn generate_id() -> String {
let s: String = iter::repeat(())
.map(|()| thread_rng().sample(Alphanumeric))
Expand All @@ -266,6 +267,7 @@ fn generate_id() -> String {
.collect();
format!("ID-{s}")
}
*/

fn generate_app_id(app_id: &str) -> String {
let s: String = iter::repeat(())
Expand Down
57 changes: 30 additions & 27 deletions bin/src/command/orders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use sozu_command_lib::{
state::get_cluster_ids_by_domain,
worker::{
AggregatedMetricsData, InnerOrder, MetricsConfiguration, ProxyResponseContent,
ProxyResponseStatus, Query, QueryAnswer, QueryClusterType,
ProxyResponseStatus, QueryAnswer, QueryClusterType,
},
};

Expand Down Expand Up @@ -58,14 +58,20 @@ impl CommandServer {
Order::UpgradeMain => self.upgrade_main(client_id).await,
Order::UpgradeWorker(worker_id) => self.upgrade_worker(client_id, worker_id).await,
Order::ConfigureMetrics(config) => self.configure_metrics(client_id, config).await,
Order::Query(query) => self.query(client_id, query).await,
// Order::Query(query) => self.query(client_id, query).await,
Order::Logging(logging_filter) => self.set_logging_level(logging_filter),
Order::SubscribeEvents => {
self.event_subscribers.insert(client_id.clone());
Ok(Some(Success::SubscribeEvent(client_id.clone())))
}
Order::ReloadConfiguration { path } => self.reload_configuration(client_id, path).await,
Order::Status => self.status(client_id).await,

Order::QueryCertificates(_)
| Order::QueryClusters(_)
| Order::QueryClustersHashes
| Order::QueryMetrics(_) => self.query(client_id, order).await,

// any other case is an order for the workers, except for SoftStop and HardStop.
// TODO: we should have something like:
// RequestContent::SoftStop => self.do_something(),
Expand Down Expand Up @@ -1017,9 +1023,9 @@ impl CommandServer {
pub async fn query(
&mut self,
client_id: String,
query: Query,
order: Order,
) -> anyhow::Result<Option<Success>> {
debug!("Received this query: {:?}", query);
debug!("Received this query: {:?}", order);
let (query_tx, mut query_rx) = futures::channel::mpsc::channel(self.workers.len() * 2);
let mut count = 0usize;
for ref mut worker in self
Expand All @@ -1028,9 +1034,7 @@ impl CommandServer {
.filter(|worker| worker.run_state != RunState::Stopped)
{
let req_id = format!("{}-query-{}", client_id, worker.id);
worker
.send(req_id.clone(), Order::Query(query.clone()))
.await;
worker.send(req_id.clone(), order.clone()).await;
count += 1;
self.in_flight.insert(req_id, (query_tx.clone(), 1));
}
Expand All @@ -1043,11 +1047,11 @@ impl CommandServer {
.await;

let mut main_query_answer = None;
match &query {
Query::ClustersHashes => {
match &order {
Order::QueryClustersHashes => {
main_query_answer = Some(QueryAnswer::ClustersHashes(self.state.hash_state()));
}
Query::Clusters(query_type) => {
Order::QueryClusters(query_type) => {
main_query_answer = Some(QueryAnswer::Clusters(match query_type {
QueryClusterType::ClusterId(cluster_id) => {
vec![self.state.cluster_state(cluster_id)]
Expand All @@ -1065,8 +1069,9 @@ impl CommandServer {
}
}));
}
Query::Certificates(_) => {}
Query::Metrics(_) => {}
Order::QueryCertificates(_) => {}
Order::QueryMetrics(_) => {}
_ => {}
};

// all these are passed to the thread
Expand Down Expand Up @@ -1100,7 +1105,7 @@ impl CommandServer {
}
}

let mut proxy_responses_map: BTreeMap<String, QueryAnswer> = responses
let mut query_answers: BTreeMap<String, QueryAnswer> = responses
.into_iter()
.filter_map(|(worker_id, proxy_response)| {
if let Some(ProxyResponseContent::Query(d)) = proxy_response.content {
Expand All @@ -1111,31 +1116,29 @@ impl CommandServer {
})
.collect();

let success = match &query {
&Query::ClustersHashes | &Query::Clusters(_) => {
let success = match &order {
&Order::QueryClustersHashes | &Order::QueryClusters(_) => {
let main = main_query_answer.unwrap(); // we should refactor to avoid this unwrap()
proxy_responses_map.insert(String::from("main"), main);
Success::Query(CommandResponseContent::Query(proxy_responses_map))
query_answers.insert(String::from("main"), main);
Success::Query(CommandResponseContent::Query(query_answers))
}
&Query::Certificates(_) => {
info!(
"certificates query answer received: {:?}",
proxy_responses_map
);
Success::Query(CommandResponseContent::Query(proxy_responses_map))
&Order::QueryCertificates(_) => {
info!("certificates query answer received: {:?}", query_answers);
Success::Query(CommandResponseContent::Query(query_answers))
}
Query::Metrics(options) => {
debug!("metrics query answer received: {:?}", proxy_responses_map);
Order::QueryMetrics(options) => {
debug!("metrics query answer received: {:?}", query_answers);

if options.list {
Success::Query(CommandResponseContent::Query(proxy_responses_map))
Success::Query(CommandResponseContent::Query(query_answers))
} else {
Success::Query(CommandResponseContent::Metrics(AggregatedMetricsData {
main: main_metrics,
workers: proxy_responses_map,
workers: query_answers,
}))
}
}
_ => return, // very very unlikely
};

return_success(command_tx, cloned_identifier, success).await;
Expand Down
20 changes: 8 additions & 12 deletions bin/src/ctl/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@ use sozu_command_lib::{
command::{
CommandResponse, CommandResponseContent, CommandStatus, Order, RunState, WorkerInfo,
},
worker::{
Query, QueryCertificateType, QueryClusterDomain, QueryClusterType, QueryMetricsOptions,
},
worker::{QueryCertificateType, QueryClusterDomain, QueryClusterType, QueryMetricsOptions},
};

use crate::ctl::{
Expand Down Expand Up @@ -261,12 +259,12 @@ impl CommandManager {
cluster_ids: Vec<String>,
backend_ids: Vec<String>,
) -> Result<(), anyhow::Error> {
let command = Order::Query(Query::Metrics(QueryMetricsOptions {
let command = Order::QueryMetrics(QueryMetricsOptions {
list,
cluster_ids,
backend_ids,
metric_names,
}));
});

// a loop to reperform the query every refresh time
loop {
Expand Down Expand Up @@ -330,9 +328,7 @@ impl CommandManager {
}

let command = if let Some(ref cluster_id) = cluster_id {
Order::Query(Query::Clusters(QueryClusterType::ClusterId(
cluster_id.to_string(),
)))
Order::QueryClusters(QueryClusterType::ClusterId(cluster_id.to_string()))
} else if let Some(ref domain) = domain {
let splitted: Vec<String> =
domain.splitn(2, '/').map(|elem| elem.to_string()).collect();
Expand All @@ -349,9 +345,9 @@ impl CommandManager {
path: splitted.get(1).cloned().map(|path| format!("/{path}")), // We add the / again because of the splitn removing it
};

Order::Query(Query::Clusters(QueryClusterType::Domain(query_domain)))
Order::QueryClusters(QueryClusterType::Domain(query_domain))
} else {
Order::Query(Query::ClustersHashes)
Order::QueryClustersHashes
};

self.send_order(command)?;
Expand Down Expand Up @@ -399,9 +395,9 @@ impl CommandManager {
}
};

let command = Order::Query(Query::Certificates(query));
let order = Order::QueryCertificates(query);

self.send_order(command)?;
self.send_order(order)?;

loop {
let response = self.read_channel_message_with_timeout()?;
Expand Down
10 changes: 7 additions & 3 deletions command/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ use crate::{
worker::{
ActivateListener, AddCertificate, AggregatedMetricsData, Backend, Cluster,
DeactivateListener, HttpFrontend, HttpListenerConfig, HttpsListenerConfig,
MetricsConfiguration, ProxyEvent, Query, QueryAnswer, RemoveBackend, RemoveCertificate,
RemoveListener, ReplaceCertificate, TcpFrontend, TcpListenerConfig,
MetricsConfiguration, ProxyEvent, QueryAnswer, QueryCertificateType, QueryClusterType,
QueryMetricsOptions, RemoveBackend, RemoveCertificate, RemoveListener, ReplaceCertificate,
TcpFrontend, TcpListenerConfig,
},
};

Expand Down Expand Up @@ -81,7 +82,10 @@ pub enum Order {
ActivateListener(ActivateListener),
DeactivateListener(DeactivateListener),

Query(Query),
QueryCertificates(QueryCertificateType),
QueryClusters(QueryClusterType),
QueryClustersHashes,
QueryMetrics(QueryMetricsOptions),

SoftStop,
HardStop,
Expand Down
2 changes: 1 addition & 1 deletion command/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use toml;

use crate::{
certificate::split_certificate_chain,
command::{Order, PROTOCOL_VERSION},
command::Order,
worker::{
ActivateListener, AddCertificate, Backend, CertificateAndKey, Cluster, HttpFrontend,
HttpListenerConfig, HttpsListenerConfig, InnerOrder, ListenerType, LoadBalancingAlgorithms,
Expand Down
8 changes: 7 additions & 1 deletion command/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,15 @@ impl ConfigState {
// This is to avoid the error message
&Order::Logging(_)
| &Order::Status
| &Order::Query(_)
| &Order::SoftStop
| &Order::QueryCertificates(_)
| &Order::QueryClusters(_)
| &Order::QueryMetrics(_)
| &Order::QueryClustersHashes
| &Order::ConfigureMetrics(_)
| &Order::ReturnListenSockets
| &Order::HardStop => Ok(()),

other_order => {
bail!("state cannot handle order message: {:#?}", other_order);
}
Expand Down
15 changes: 4 additions & 11 deletions command/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -749,16 +749,6 @@ pub enum MetricsConfiguration {
Clear,
}

/// Details of a query for information, sent to a worker
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(tag = "type", content = "data", rename_all = "SCREAMING_SNAKE_CASE")]
pub enum Query {
Clusters(QueryClusterType),
Certificates(QueryCertificateType),
Metrics(QueryMetricsOptions),
ClustersHashes,
}

#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(tag = "type", content = "data", rename_all = "SCREAMING_SNAKE_CASE")]
pub enum QueryClusterType {
Expand Down Expand Up @@ -850,7 +840,7 @@ impl Order {
| Order::AddCertificate(_)
| Order::ReplaceCertificate(_)
| Order::RemoveCertificate(_)
| Order::Query(_) => proxy_destination.to_https_proxy = true,
| Order::QueryCertificates(_) => proxy_destination.to_https_proxy = true,

Order::AddTcpFrontend(_) | Order::RemoveTcpFrontend(_) => {
proxy_destination.to_tcp_proxy = true
Expand All @@ -863,6 +853,9 @@ impl Order {
| Order::SoftStop
| Order::HardStop
| Order::Status
| Order::QueryClusters(_)
| Order::QueryClustersHashes
| Order::QueryMetrics(_)
| Order::Logging(_) => {
proxy_destination.to_http_proxy = true;
proxy_destination.to_https_proxy = true;
Expand Down
9 changes: 4 additions & 5 deletions lib/src/https.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,8 @@ use crate::{
state::ClusterId,
worker::{
AddCertificate, CertificateFingerprint, Cluster, HttpFrontend, HttpsListenerConfig,
InnerOrder, ProxyResponse, ProxyResponseContent, ProxyResponseStatus, Query,
QueryAnswer, QueryAnswerCertificate, QueryCertificateType, RemoveCertificate, Route,
TlsVersion,
InnerOrder, ProxyResponse, ProxyResponseContent, ProxyResponseStatus, QueryAnswer,
QueryAnswerCertificate, QueryCertificateType, RemoveCertificate, Route, TlsVersion,
},
},
timer::TimeoutContainer,
Expand Down Expand Up @@ -1352,12 +1351,12 @@ impl ProxyConfiguration for HttpsProxy {
self.logging(logging_filter.clone())
.with_context(|| format!("Could not set logging level to {logging_filter}"))
}
Order::Query(Query::Certificates(QueryCertificateType::All)) => {
Order::QueryCertificates(QueryCertificateType::All) => {
info!("{} query all certificates", request_id);
self.query_all_certificates()
.with_context(|| "Could not query all certificates")
}
Order::Query(Query::Certificates(QueryCertificateType::Domain(domain))) => {
Order::QueryCertificates(QueryCertificateType::Domain(domain)) => {
info!("{} query certificate for domain {}", request_id, domain);
self.query_certificate_for_domain(domain.clone())
.with_context(|| format!("Could not query certificate for domain {domain}"))
Expand Down

0 comments on commit a1a801e

Please sign in to comment.