Skip to content

Commit

Permalink
rename CommandRequest to ClientRequest
Browse files Browse the repository at this point in the history
rename CommandRequestOrder to RequestContent
  • Loading branch information
Keksoj committed Mar 8, 2023
1 parent e3c8bec commit efb9c5d
Show file tree
Hide file tree
Showing 9 changed files with 192 additions and 204 deletions.
16 changes: 8 additions & 8 deletions bin/src/acme.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use rand::{distributions::Alphanumeric, thread_rng, Rng};
use sozu_command_lib::{
certificate::{calculate_fingerprint, split_certificate_chain},
channel::Channel,
command::{CommandRequest, CommandRequestOrder, CommandResponse, CommandStatus},
command::{ClientRequest, CommandResponse, CommandStatus, RequestContent},
config::Config,
worker::{
AddCertificate, Backend, CertificateAndKey, CertificateFingerprint, HttpFrontend, PathRule,
Expand Down Expand Up @@ -87,7 +87,7 @@ pub fn main(

let tls_versions = vec![TlsVersion::TLSv1_2, TlsVersion::TLSv1_3];

let mut channel: Channel<CommandRequest, CommandResponse> = Channel::new(stream, 10000, 20000);
let mut channel: Channel<ClientRequest, CommandResponse> = Channel::new(stream, 10000, 20000);
channel
.blocking()
.with_context(|| "Could not block channel")?;
Expand Down Expand Up @@ -277,7 +277,7 @@ fn generate_app_id(app_id: &str) -> String {
}

fn set_up_proxying(
channel: &mut Channel<CommandRequest, CommandResponse>,
channel: &mut Channel<ClientRequest, CommandResponse>,
frontend: &SocketAddr,
cluster_id: &str,
hostname: &str,
Expand Down Expand Up @@ -310,7 +310,7 @@ fn set_up_proxying(
}

fn remove_proxying(
channel: &mut Channel<CommandRequest, CommandResponse>,
channel: &mut Channel<ClientRequest, CommandResponse>,
frontend: &SocketAddr,
cluster_id: &str,
hostname: &str,
Expand Down Expand Up @@ -340,7 +340,7 @@ fn remove_proxying(
}

fn add_certificate(
channel: &mut Channel<CommandRequest, CommandResponse>,
channel: &mut Channel<ClientRequest, CommandResponse>,
frontend: &SocketAddr,
hostname: &str,
certificate_path: &str,
Expand Down Expand Up @@ -391,14 +391,14 @@ fn add_certificate(
}

fn order_command(
channel: &mut Channel<CommandRequest, CommandResponse>,
channel: &mut Channel<ClientRequest, CommandResponse>,
order: ProxyRequestOrder,
) -> anyhow::Result<()> {
let id = generate_id();
channel
.write_message(&CommandRequest::new(
.write_message(&ClientRequest::new(
id.clone(),
CommandRequestOrder::Proxy(Box::new(order.clone())),
RequestContent::Proxy(Box::new(order.clone())),
))
.with_context(|| "Could not write message on the channel")?;

Expand Down
10 changes: 5 additions & 5 deletions bin/src/command/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ use serde::{Deserialize, Serialize};

use sozu_command_lib::{
command::{
CommandRequest, CommandRequestOrder, CommandResponse, CommandResponseContent,
CommandStatus, Event, RunState,
ClientRequest, CommandResponse, CommandResponseContent, CommandStatus, Event,
RequestContent, RunState,
},
config::Config,
scm_socket::{Listeners, ScmSocket},
Expand Down Expand Up @@ -61,7 +61,7 @@ enum CommandMessage {
},
ClientRequest {
client_id: String,
request: CommandRequest,
request: ClientRequest,
},
WorkerResponse {
worker_id: u32,
Expand Down Expand Up @@ -531,7 +531,7 @@ impl CommandServer {

//FIXME: too many loops, this could be cleaner
for message in self.config.generate_config_messages() {
if let CommandRequestOrder::Proxy(order) = message.order {
if let RequestContent::Proxy(order) = message.content {
if let Err(e) = self.state.dispatch(&order) {
error!("Could not execute order on state: {:#}", e);
}
Expand Down Expand Up @@ -977,7 +977,7 @@ async fn client_loop(
Ok(msg) => msg,
};

match serde_json::from_slice::<CommandRequest>(&message) {
match serde_json::from_slice::<ClientRequest>(&message) {
Err(e) => {
error!("could not decode client message: {:?}", e);
break;
Expand Down
47 changes: 22 additions & 25 deletions bin/src/command/orders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,8 @@ use nom::{Err, HexDisplay, Offset};
use sozu_command_lib::{
buffer::fixed::Buffer,
command::{
CommandRequest, CommandRequestOrder, CommandResponse, CommandResponseContent,
CommandStatus, FrontendFilters, ListedFrontends, ListenersList, RunState, WorkerInfo,
PROTOCOL_VERSION,
ClientRequest, CommandResponse, CommandResponseContent, CommandStatus, FrontendFilters,
ListedFrontends, ListenersList, RequestContent, RunState, WorkerInfo, PROTOCOL_VERSION,
},
config::Config,
logging,
Expand All @@ -42,7 +41,7 @@ impl CommandServer {
pub async fn handle_client_request(
&mut self,
client_id: String,
request: CommandRequest,
request: ClientRequest,
) -> anyhow::Result<Success> {
trace!("Received order {:?}", request);
let request_identifier = RequestIdentifier {
Expand All @@ -51,28 +50,26 @@ impl CommandServer {
};
let cloned_identifier = request_identifier.clone();

let result: anyhow::Result<Option<Success>> = match request.order {
CommandRequestOrder::SaveState { path } => self.save_state(&path).await,
CommandRequestOrder::DumpState => self.dump_state().await,
CommandRequestOrder::ListWorkers => self.list_workers().await,
CommandRequestOrder::ListFrontends(filters) => self.list_frontends(filters).await,
CommandRequestOrder::ListListeners => self.list_listeners(),
CommandRequestOrder::LoadState { path } => {
let result: anyhow::Result<Option<Success>> = match request.content {
RequestContent::SaveState { path } => self.save_state(&path).await,
RequestContent::DumpState => self.dump_state().await,
RequestContent::ListWorkers => self.list_workers().await,
RequestContent::ListFrontends(filters) => self.list_frontends(filters).await,
RequestContent::ListListeners => self.list_listeners(),
RequestContent::LoadState { path } => {
self.load_state(
Some(request_identifier.client),
request_identifier.request,
&path,
)
.await
}
CommandRequestOrder::LaunchWorker(tag) => {
self.launch_worker(request_identifier, &tag).await
}
CommandRequestOrder::UpgradeMain => self.upgrade_main(request_identifier).await,
CommandRequestOrder::UpgradeWorker(worker_id) => {
RequestContent::LaunchWorker(tag) => self.launch_worker(request_identifier, &tag).await,
RequestContent::UpgradeMain => self.upgrade_main(request_identifier).await,
RequestContent::UpgradeWorker(worker_id) => {
self.upgrade_worker(request_identifier, worker_id).await
}
CommandRequestOrder::Proxy(proxy_request_order) => match *proxy_request_order {
RequestContent::Proxy(proxy_request_order) => match *proxy_request_order {
ProxyRequestOrder::ConfigureMetrics(config) => {
self.configure_metrics(request_identifier, config).await
}
Expand All @@ -86,14 +83,14 @@ impl CommandServer {
// but it goes in there instead:
order => self.worker_order(request_identifier, order).await,
},
CommandRequestOrder::SubscribeEvents => {
RequestContent::SubscribeEvents => {
self.event_subscribers.insert(client_id.clone());
Ok(Some(Success::SubscribeEvent(client_id.clone())))
}
CommandRequestOrder::ReloadConfiguration { path } => {
RequestContent::ReloadConfiguration { path } => {
self.reload_configuration(request_identifier, path).await
}
CommandRequestOrder::Status => self.status(request_identifier).await,
RequestContent::Status => self.status(request_identifier).await,
};

// Notify the command server by sending using his command_tx
Expand Down Expand Up @@ -135,9 +132,9 @@ impl CommandServer {

let result: anyhow::Result<usize> = (move || {
for command in orders {
let message = CommandRequest::new(
let message = ClientRequest::new(
format!("SAVE-{counter}"),
CommandRequestOrder::Proxy(Box::new(command)),
RequestContent::Proxy(Box::new(command)),
);

file.write_all(
Expand Down Expand Up @@ -211,7 +208,7 @@ impl CommandServer {
}

let mut offset = 0usize;
match parse_several_commands::<CommandRequest>(buffer.data()) {
match parse_several_commands::<ClientRequest>(buffer.data()) {
Ok((i, requests)) => {
if !i.is_empty() {
debug!("could not parse {} bytes", i.len());
Expand All @@ -233,7 +230,7 @@ impl CommandServer {
}

for request in requests {
if let CommandRequestOrder::Proxy(order) = request.order {
if let RequestContent::Proxy(order) = request.content {
message_counter += 1;

if self.state.dispatch(&order).is_ok() {
Expand Down Expand Up @@ -808,7 +805,7 @@ impl CommandServer {
.await;

for message in new_config.generate_config_messages() {
if let CommandRequestOrder::Proxy(order) = message.order {
if let RequestContent::Proxy(order) = message.content {
if self.state.dispatch(&order).is_ok() {
diff_counter += 1;

Expand Down
38 changes: 19 additions & 19 deletions bin/src/ctl/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use serde::Serialize;

use sozu_command_lib::{
command::{
CommandRequest, CommandRequestOrder, CommandResponse, CommandResponseContent,
CommandStatus, RunState, WorkerInfo,
ClientRequest, CommandResponse, CommandResponseContent, CommandStatus, RequestContent,
RunState, WorkerInfo,
},
worker::{
ProxyRequestOrder, Query, QueryCertificateType, QueryClusterDomain, QueryClusterType,
Expand Down Expand Up @@ -52,9 +52,9 @@ impl CommandManager {
fn send_request(
&mut self,
id: &str,
command_request_order: CommandRequestOrder,
command_request_order: RequestContent,
) -> anyhow::Result<()> {
let command_request = CommandRequest::new(id.to_string(), command_request_order);
let command_request = ClientRequest::new(id.to_string(), command_request_order);

self.channel
.write_message(&command_request)
Expand All @@ -67,26 +67,26 @@ impl CommandManager {
.with_context(|| "Command timeout. The proxy didn't send an answer")
}

pub fn order_command(&mut self, order: CommandRequestOrder) -> Result<(), anyhow::Error> {
pub fn order_command(&mut self, order: RequestContent) -> Result<(), anyhow::Error> {
self.order_command_to_all_workers(order, false)
}

pub fn order_command_with_json(
&mut self,
command_request_order: CommandRequestOrder,
command_request_order: RequestContent,
json: bool,
) -> Result<(), anyhow::Error> {
self.order_command_to_all_workers(command_request_order, json)
}

pub fn order_command_to_all_workers(
&mut self,
command_request_order: CommandRequestOrder,
command_request_order: RequestContent,
json: bool,
) -> Result<(), anyhow::Error> {
let id = generate_id();

let command_request = CommandRequest::new(id, command_request_order);
let command_request = ClientRequest::new(id, command_request_order);

println!("Sending command : {command_request:?}");

Expand Down Expand Up @@ -154,7 +154,7 @@ impl CommandManager {

let id = generate_tagged_id("LIST-WORKERS");

self.send_request(&id, CommandRequestOrder::ListWorkers)?;
self.send_request(&id, RequestContent::ListWorkers)?;

loop {
let response = self.read_channel_message_with_timeout()?;
Expand Down Expand Up @@ -186,7 +186,7 @@ impl CommandManager {
println!();

let id = generate_tagged_id("UPGRADE-MAIN");
self.send_request(&id, CommandRequestOrder::UpgradeMain)?;
self.send_request(&id, RequestContent::UpgradeMain)?;

println!("Upgrading main process");

Expand Down Expand Up @@ -257,7 +257,7 @@ impl CommandManager {
let id = generate_id();

//FIXME: we should be able to soft stop one specific worker
self.send_request(&id, CommandRequestOrder::UpgradeWorker(worker_id))?;
self.send_request(&id, RequestContent::UpgradeWorker(worker_id))?;

loop {
let response = self.read_channel_message_with_timeout()?;
Expand Down Expand Up @@ -290,14 +290,14 @@ impl CommandManager {
cluster_ids: Vec<String>,
backend_ids: Vec<String>,
) -> Result<(), anyhow::Error> {
let command = CommandRequestOrder::Proxy(Box::new(ProxyRequestOrder::Query(
Query::Metrics(QueryMetricsOptions {
let command = RequestContent::Proxy(Box::new(ProxyRequestOrder::Query(Query::Metrics(
QueryMetricsOptions {
list,
cluster_ids,
backend_ids,
metric_names,
}),
)));
},
))));

// a loop to reperform the query every refresh time
loop {
Expand Down Expand Up @@ -365,7 +365,7 @@ impl CommandManager {
}

let command = if let Some(ref cluster_id) = cluster_id {
CommandRequestOrder::Proxy(Box::new(ProxyRequestOrder::Query(Query::Clusters(
RequestContent::Proxy(Box::new(ProxyRequestOrder::Query(Query::Clusters(
QueryClusterType::ClusterId(cluster_id.to_string()),
))))
} else if let Some(ref domain) = domain {
Expand All @@ -384,11 +384,11 @@ impl CommandManager {
path: splitted.get(1).cloned().map(|path| format!("/{path}")), // We add the / again because of the splitn removing it
};

CommandRequestOrder::Proxy(Box::new(ProxyRequestOrder::Query(Query::Clusters(
RequestContent::Proxy(Box::new(ProxyRequestOrder::Query(Query::Clusters(
QueryClusterType::Domain(query_domain),
))))
} else {
CommandRequestOrder::Proxy(Box::new(ProxyRequestOrder::Query(Query::ClustersHashes)))
RequestContent::Proxy(Box::new(ProxyRequestOrder::Query(Query::ClustersHashes)))
};

let id = generate_id();
Expand Down Expand Up @@ -440,7 +440,7 @@ impl CommandManager {
}
};

let command = CommandRequestOrder::Proxy(Box::new(ProxyRequestOrder::Query(
let command = RequestContent::Proxy(Box::new(ProxyRequestOrder::Query(
Query::Certificates(query),
)));

Expand Down
6 changes: 3 additions & 3 deletions bin/src/ctl/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use anyhow::Context;

use sozu_command_lib::{
channel::Channel,
command::{CommandRequest, CommandResponse},
command::{ClientRequest, CommandResponse},
config::Config,
};

Expand All @@ -18,7 +18,7 @@ use crate::{
};

pub struct CommandManager {
channel: Channel<CommandRequest, CommandResponse>,
channel: Channel<ClientRequest, CommandResponse>,
timeout: Duration,
config: Config,
}
Expand Down Expand Up @@ -150,7 +150,7 @@ impl CommandManager {
}

/// creates a blocking channel
pub fn create_channel(config: &Config) -> anyhow::Result<Channel<CommandRequest, CommandResponse>> {
pub fn create_channel(config: &Config) -> anyhow::Result<Channel<ClientRequest, CommandResponse>> {
let mut channel = Channel::from_path(
&config.command_socket_path()?,
config.command_buffer_size,
Expand Down

0 comments on commit efb9c5d

Please sign in to comment.