handle_client_message returns Result<OrderSuccess>
Keksoj authored and FlorentinDUBOIS committed Jul 13, 2022
1 parent fba2f8e commit c4ec939
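In short: instead of unwrapping or logging inside each handler, the command server's handlers now return anyhow::Result<OrderSuccess>, and the run() loop logs the outcome (and decides on shutdowns) in one place. A minimal sketch of the pattern, using anyhow's bail!/with_context as the diff does, but with simplified stand-in names (Success, kill_worker and handle_worker_close here are illustrative, not the actual Sōzu types):

use anyhow::{bail, Context, Result};

// stand-in for the real OrderSuccess enum introduced by this commit
enum Success {
    WorkerKilled(u32),
}

impl std::fmt::Display for Success {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        match self {
            Self::WorkerKilled(id) => write!(f, "Successfully killed worker {}", id),
        }
    }
}

// pretend kill; the real code calls kill(Pid::from_raw(w.pid), Signal::SIGKILL)
fn kill_worker(_id: u32) -> Result<()> {
    Ok(())
}

// handlers now return Result<Success> instead of unwrapping or returning ()
fn handle_worker_close(found: bool, id: u32) -> Result<Success> {
    if !found {
        bail!(format!("Could not find worker {}", id));
    }
    kill_worker(id).with_context(|| "failed to kill the worker process")?;
    Ok(Success::WorkerKilled(id))
}

fn main() {
    // a single place logs success and failure, as run() now does
    match handle_worker_close(true, 0) {
        Ok(success) => println!("Order OK: {}", success),
        Err(error) => eprintln!("Failed order: {:#?}", error),
    }
}

The payoff is visible in run() below: every message arm evaluates to a Result, and one match logs either "Order OK" or "Failed order".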
Showing 2 changed files with 375 additions and 339 deletions.
253 changes: 191 additions & 62 deletions bin/src/command/mod.rs
@@ -20,7 +20,9 @@ use sozu_command::command::{
};
use sozu_command::config::Config;

use sozu_command::proxy::{
    ProxyRequest, ProxyRequestData, ProxyResponse, ProxyResponseData, ProxyResponseStatus,
};
use sozu_command::scm_socket::{Listeners, ScmSocket};
use sozu_command::state::ConfigState;

@@ -118,14 +120,17 @@ impl CommandServer {
                let fd = sock.into_raw_fd();
                UnixStream::from_raw_fd(fd)
            })
            .with_context(|| "Could not get a unix stream from the file descriptor")?;

            let id = worker.id;
            let command_tx = command_tx.clone();
            smol::spawn(async move {
                if worker_loop(id, stream, command_tx, worker_rx)
                    .await
                    .is_err()
                {
                    error!("The worker loop of worker {} crashed", id);
                }
            })
            .detach();
        }
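The spawn change above deserves a note: a detached task has no caller to propagate an error to, so unwrapping inside it would just kill the task silently; logging is the only useful thing left to do. A standalone sketch of the same pattern (the worker_loop body here is a dummy that always fails, purely to exercise the error path):

use std::time::Duration;

// dummy stand-in for the real worker_loop; it always fails
async fn worker_loop(id: u32) -> Result<(), String> {
    smol::Timer::after(Duration::from_millis(10)).await;
    Err(format!("worker {} lost its channel", id))
}

fn main() {
    smol::block_on(async {
        let id = 0;
        smol::spawn(async move {
            // log instead of unwrapping: a detached task has nowhere to propagate errors
            if worker_loop(id).await.is_err() {
                eprintln!("The worker loop of worker {} crashed", id);
            }
        })
        .detach();

        // give the detached task a moment to run before the executor exits
        smol::Timer::after(Duration::from_millis(50)).await;
    });
}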
@@ -507,7 +512,7 @@ impl CommandServer {
        Ok(())
    }

    async fn handle_worker_close(&mut self, id: u32) -> anyhow::Result<OrderSuccess> {
        info!("removing worker {}", id);

        if let Some(w) = self.workers.iter_mut().filter(|w| w.id == id).next() {
@@ -518,14 +523,14 @@
                    Ok(()) => info!("Worker {} has automatically restarted!", id),
                    Err(e) => error!("Could not restart worker {}: {}", id, e),
                }
                return Ok(OrderSuccess::WorkerRestarted(id));
            }

            info!("Closing the worker {}.", w.id);
            if !w.the_pid_is_alive() {
                info!("Worker {} is dead, setting to Stopped.", w.id);
                w.run_state = RunState::Stopped;
                return Ok(OrderSuccess::WorkerStopped(id));
            }

            info!("Worker {} is not dead but should be. Let's kill it.", w.id);
@@ -534,10 +539,68 @@
                Ok(()) => {
                    info!("Worker {} was successfully killed", id);
                    w.run_state = RunState::Stopped;
                    return Ok(OrderSuccess::WorkerKilled(id));
                }
                Err(e) => {
                    return Err(e).with_context(|| "failed to kill the worker process");
                }
            }
        }
        bail!(format!("Could not find worker {}", id))
    }

    async fn handle_worker_response(
        &mut self,
        id: u32,
        message: ProxyResponse,
    ) -> anyhow::Result<OrderSuccess> {
        debug!("worker {} sent back {:?}", id, message);
        if let Some(ProxyResponseData::Event(data)) = message.data {
            let event: Event = data.into();
            for client_id in self.event_subscribers.iter() {
                if let Some(tx) = self.clients.get_mut(client_id) {
                    let event = CommandResponse::new(
                        message.id.to_string(),
                        CommandStatus::Processing,
                        format!("{}", id),
                        Some(CommandResponseData::Event(event.clone())),
                    );
                    tx.send(event).await.with_context(|| {
                        format!("could not send message to client {}", client_id)
                    })?
                }
            }
            return Ok(OrderSuccess::PropagatedWorkerEvent);
        }
        match self.in_flight.remove(&message.id) {
            None => {
                // FIXME: this message happens a lot at startup because AddCluster
                // messages receive responses from each of the HTTP, HTTPS and TCP
                // proxies. The clusters list should be merged
                debug!("unknown message id: {}", message.id);
            }
            Some((mut tx, mut nb)) => {
                let message_id = message.id.clone();

                // if a worker returned Ok or Error, we're not expecting any more
                // messages with this id from it
                match message.status {
                    ProxyResponseStatus::Ok | ProxyResponseStatus::Error(_) => {
                        nb -= 1;
                    }
                    _ => {}
                };

                if tx.send(message.clone()).await.is_err() {
                    error!("Failed to send message: {}", message);
                };

                if nb > 0 {
                    self.in_flight.insert(message_id, (tx, nb));
                }
            }
        }
        Ok(OrderSuccess::WorkerResponse)
    }
}

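A note on the in_flight bookkeeping above: each request id maps to a response channel plus a count of final answers still expected, because one order can fan out to several proxies (HTTP, HTTPS and TCP) that all respond under the same id; the entry is only dropped once the count reaches zero. A simplified sketch of that counting, assuming stand-in types (Response, Status and the plain mpsc channel are not the real Sōzu structs):

use std::collections::HashMap;
use std::sync::mpsc::{channel, Sender};

enum Status {
    Ok,
    Error(String),
    Processing,
}

struct Response {
    id: String,
    status: Status,
}

// request id -> (where to forward responses, how many final answers are still expected)
struct InFlight {
    map: HashMap<String, (Sender<Response>, usize)>,
}

impl InFlight {
    fn handle(&mut self, response: Response) {
        if let Some((tx, mut nb)) = self.map.remove(&response.id) {
            // Ok or Error is a final answer from one proxy; Processing is not
            if matches!(response.status, Status::Ok | Status::Error(_)) {
                nb -= 1;
            }
            let id = response.id.clone();
            if tx.send(response).is_err() {
                eprintln!("Failed to forward response for {}", id);
            }
            // keep the entry around until every proxy has given its final answer
            if nb > 0 {
                self.map.insert(id, (tx, nb));
            }
        }
    }
}

fn main() {
    let (tx, rx) = channel();
    let mut in_flight = InFlight { map: HashMap::new() };
    // one order fanned out to three proxies: expect three final answers
    in_flight.map.insert("req-1".to_string(), (tx, 3));

    let msg = |status| Response { id: "req-1".to_string(), status };
    in_flight.handle(msg(Status::Processing)); // forwarded, count unchanged
    in_flight.handle(msg(Status::Ok));
    in_flight.handle(msg(Status::Error("tcp proxy failed".to_string())));
    in_flight.handle(msg(Status::Ok)); // count hits zero, entry dropped

    assert_eq!(rx.try_iter().count(), 4); // all four responses were forwarded
}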
@@ -635,7 +698,8 @@ pub fn start(
    if let Some(path) = saved_state_path {
        server
            .load_state(None, "INITIALIZATION".to_string(), &path)
            .await
            .with_context(|| format!("Loading {:?} failed", &path))?;
    }
gauge!("configuration.clusters", server.state.clusters.len());
gauge!("configuration.backends", server.backends_count);
@@ -672,75 +736,60 @@ enum CommandMessage {
impl CommandServer {
    pub async fn run(&mut self) {
        while let Some(msg) = self.command_rx.next().await {
            let result: anyhow::Result<OrderSuccess> = match msg {
                CommandMessage::ClientNew { id, sender } => {
                    debug!("adding new client {}", id);
                    self.clients.insert(id.to_owned(), sender);
                    Ok(OrderSuccess::ClientNew(id))
                }
                CommandMessage::ClientClose { id } => {
                    debug!("removing client {}", id);
                    self.clients.remove(&id);
                    self.event_subscribers.remove(&id);
                    Ok(OrderSuccess::ClientClose(id))
                }
                CommandMessage::ClientRequest { id, message } => {
                    debug!("client {} sent {:?}", id, message);
                    self.handle_client_message(id, message).await
                }
                CommandMessage::WorkerClose { id } => self
                    .handle_worker_close(id)
                    .await
                    .with_context(|| "Could not close worker"),
                CommandMessage::WorkerResponse { id, message } => self
                    .handle_worker_response(id, message)
                    .await
                    .with_context(|| "Could not handle worker response"),
                CommandMessage::MasterStop => {
                    info!("stopping main process");
                    Ok(OrderSuccess::MasterStop)
                }
            };

            match result {
                Ok(order_success) => {
                    info!("Order OK: {}", order_success);

                    // perform shutdowns
                    match order_success {
                        OrderSuccess::UpgradeMain(_) => {
                            // the main process has to shut down after the other has launched successfully
                            //FIXME: should do some cleanup before exiting
                            std::thread::sleep(std::time::Duration::from_secs(2));
                            std::process::exit(0);
                        }
                        OrderSuccess::MasterStop => {
                            // breaking the loop brings run() to return and ends Sōzu
                            // shouldn't we have the same break for both shutdowns?

                            break;
                        }
                        _ => {}
                    }
                }
                Err(error) => {
                    // log the error on the main process without stopping it
                    error!("Failed order: {:#?}", error);
                }
            }
        }
@@ -916,3 +965,83 @@ async fn worker_loop(

Ok(())
}

pub enum OrderSuccess {
ClientClose(String),
ClientNew(String),
ClientRequest,
DumpState(CommandResponseData), // contains the cloned state
LaunchWorker(u32),
ListFrontends(CommandResponseData),
ListWorkers(CommandResponseData),
LoadState(String, usize, usize), // path, ok, errors
MasterStop,
// this should contain CommandResponseData but the logic does not return anything
// is this logic gone into sozu_command_lib::proxy::Query::Metrics(_) ?
Metrics,
PropagatedWorkerEvent,
Query(CommandResponseData), // same remark
ReloadConfiguration,
SaveState(usize, String),
SubscribeEvent(String),
UpgradeMain(i32),
UpgradeWorker(u32),
WorkerKilled(u32),
WorkerOrder(Option<u32>),
WorkerResponse,
WorkerRestarted(u32),
WorkerStopped(u32),
}

impl std::fmt::Display for OrderSuccess {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
Self::ClientClose(id) => write!(f, "Close client: {}", id),
Self::ClientNew(id) => write!(f, "New client successfully added: {}", id),
Self::ClientRequest => write!(f, "Successfully executed client request"),
Self::DumpState(_) => write!(f, "Successfully gathered state from the main process"),
Self::LaunchWorker(worker_id) => {
write!(f, "Successfully launched worker {}", worker_id)
}
Self::ListFrontends(_) => write!(f, "Successfully gathered the list of frontends"),
Self::ListWorkers(_) => write!(f, "Listed all workers"),
Self::LoadState(path, ok, error) => write!(
f,
"Successfully loaded state from path {}, {} ok messages, {} errors",
path, ok, error
),
Self::MasterStop => write!(f, "stopping main process"),
Self::Metrics => write!(f, "Successfully fetched the metrics"),
Self::PropagatedWorkerEvent => {
write!(f, "Sent worker response to all subscribing clients")
}
Self::Query(_) => write!(f, "Ran the query successfully"),
Self::ReloadConfiguration => write!(f, "Successfully reloaded configuration"),
Self::SaveState(counter, path) => {
write!(f, "saved {} config messages to {}", counter, path)
}
Self::SubscribeEvent(client_id) => {
write!(f, "Successfully Added {} to subscribers", client_id)
}
Self::UpgradeMain(pid) => write!(
f,
"new main process launched with pid {}, closing the old one",
pid
),
Self::UpgradeWorker(id) => {
write!(f, "Successfully upgraded worker with new id: {}", id)
}
Self::WorkerKilled(id) => write!(f, "Successfully killed worker {}", id),
Self::WorkerOrder(worker) => {
if let Some(worker_id) = worker {
write!(f, "Successfully executed the order on worker {}", worker_id)
} else {
write!(f, "Successfully executed the order on worker")
}
}
Self::WorkerResponse => write!(f, "Successfully handled worker response"),
Self::WorkerRestarted(id) => write!(f, "Successfully restarted worker {}", id),
Self::WorkerStopped(id) => write!(f, "Successfully stopped worker {}", id),
}
}
}
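For illustration, these Display implementations are what run() prints on success via info!("Order OK: {}", order_success). With hypothetical values:

// hypothetical values, just to show the log line run() produces on success
let success = OrderSuccess::LoadState("./state.json".to_string(), 12, 0);
println!("Order OK: {}", success);
// prints: Order OK: Successfully loaded state from path ./state.json, 12 ok messages, 0 errors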
