proper handling of WorkerClose by the CommandServer
Keksoj authored and Geal committed Aug 18, 2021
1 parent 863ee59 commit 6b24a24
Showing 3 changed files with 99 additions and 106 deletions.
194 changes: 88 additions & 106 deletions bin/src/command/mod.rs
@@ -342,123 +342,102 @@ impl CommandServer {
         gauge!("configuration.frontends", self.frontends_count);
     }
 
-    // what is this supposed to do anyway
-    pub async fn check_worker_status(&mut self, worker_id: u32) {
+    // in case a worker has crashed while Running and automatic_worker_restart is set to true
+    pub async fn restart_worker(&mut self, worker_id: u32) {
         let ref mut worker = self
             .workers
             .get_mut(worker_id as usize)
             .expect("there should be a worker at that token");
-        let res = kill(Pid::from_raw(worker.pid), None);
 
-        if res.is_ok() {
-            // this is redundant, the whole function is called only
-            // if run_state is Stopping
-            if worker.run_state == RunState::Running {
-                error!(
-                    "worker process {} (PID = {}) not answering, killing and replacing",
-                    worker.id, worker.pid
-                );
-                if let Err(e) = kill(Pid::from_raw(worker.pid), Signal::SIGKILL) {
-                    error!("failed to kill the worker process: {:?}", e);
-                }
-            } else {
-                return;
-            }
-        } else {
-            // this is redundant, the whole function is called only
-            // if run_state is Stopping
-            if worker.run_state == RunState::Running {
-                error!(
-                    "worker process {} (PID = {}) stopped running, replacing",
-                    worker.id, worker.pid
-                );
-            } else if worker.run_state == RunState::Stopping {
-                info!(
-                    "worker process {} (PID = {}) not detected, assuming it stopped",
-                    worker.id, worker.pid
-                );
-                worker.run_state = RunState::Stopped;
-                return;
-            } else {
-                error!("failed to check process status: {:?}", res);
-                return;
+        match kill(Pid::from_raw(worker.pid), None) {
+            Ok(_) => {
+                error!(
+                    "worker process {} (PID = {}) is alive but the worker must have crashed. Killing and replacing",
+                    worker.id, worker.pid
+                );
+            },
+            Err(_) => {
+                error!(
+                    "worker process {} (PID = {}) not answering, killing and replacing",
+                    worker.id, worker.pid
+                );
             }
         }
 
-        worker.run_state = RunState::Stopping;
+        kill(Pid::from_raw(worker.pid), Signal::SIGKILL)
+            .map_err(|e| error!("failed to kill the worker process: {:?}", e))
+            .unwrap();
 
-        // this is redundant, the whole function is called only when this config variable
-        // is set to true
-        if self.config.worker_automatic_restart {
-            incr!("worker_restart");
+        worker.run_state = RunState::Stopped;
 
-            let id = self.next_id;
-            let listeners = Some(Listeners {
-                http: Vec::new(),
-                tls: Vec::new(),
-                tcp: Vec::new(),
-            });
+        incr!("worker_restart");
 
-            if let Ok(mut worker) = start_worker(
-                id,
-                &self.config,
-                self.executable_path.clone(),
-                &self.state,
-                listeners,
-            ) {
-                info!("created new worker: {}", id);
-                self.next_id += 1;
+        let id = self.next_id;
+        let listeners = Some(Listeners {
+            http: Vec::new(),
+            tls: Vec::new(),
+            tcp: Vec::new(),
+        });
 
-                let sock = worker.channel.take().unwrap().sock;
-                let (worker_tx, worker_rx) = channel(10000);
-                worker.sender = Some(worker_tx);
+        if let Ok(mut worker) = start_worker(
+            id,
+            &self.config,
+            self.executable_path.clone(),
+            &self.state,
+            listeners,
+        ) {
+            info!("created new worker: {}", id);
+            self.next_id += 1;
 
-                let stream = Async::new(unsafe {
-                    let fd = sock.into_raw_fd();
-                    UnixStream::from_raw_fd(fd)
-                })
-                .unwrap();
+            let sock = worker.channel.take().unwrap().sock;
+            let (worker_tx, worker_rx) = channel(10000);
+            worker.sender = Some(worker_tx);
 
-                let id = worker.id;
-                let command_tx = self.command_tx.clone();
-                smol::spawn(async move {
-                    worker_loop(id, stream, command_tx, worker_rx)
-                        .await
-                        .unwrap();
-                })
-                .detach();
+            let stream = Async::new(unsafe {
+                let fd = sock.into_raw_fd();
+                UnixStream::from_raw_fd(fd)
+            })
+            .unwrap();
 
-                let mut count = 0usize;
-                let mut orders = self.state.generate_activate_orders();
-                for order in orders.drain(..) {
-                    if let Err(e) = worker
-                        .sender
-                        .as_mut()
-                        .unwrap()
-                        .send(ProxyRequest {
-                            id: format!("RESTART-{}-ACTIVATE-{}", id, count),
-                            order,
-                        })
-                        .await {
-                        error!("could not send activate order to worker {:?}: {:?}", worker.id, e);
-                    }
-                    count += 1;
-                }
+            let id = worker.id;
+            let command_tx = self.command_tx.clone();
+            smol::spawn(async move {
+                worker_loop(id, stream, command_tx, worker_rx)
+                    .await
+                    .unwrap();
+            })
+            .detach();
 
-                if let Err(e) = worker
-                    .sender
-                    .as_mut()
-                    .unwrap()
-                    .send(ProxyRequest {
-                        id: format!("RESTART-{}-STATUS", id),
-                        order: ProxyRequestData::Status,
-                    })
-                    .await{
-                    error!("could not send status message to worker {:?}: {:?}", worker.id, e);
-                }
+            let mut count = 0usize;
+            let mut orders = self.state.generate_activate_orders();
+            for order in orders.drain(..) {
+                if let Err(e) = worker
+                    .sender
+                    .as_mut()
+                    .unwrap()
+                    .send(ProxyRequest {
+                        id: format!("RESTART-{}-ACTIVATE-{}", id, count),
+                        order,
+                    })
+                    .await {
+                    error!("could not send activate order to worker {:?}: {:?}", worker.id, e);
+                }
+                count += 1;
+            }
 
-                self.workers.push(worker);
-            }
-        }
-    }
+            if let Err(e) = worker
+                .sender
+                .as_mut()
+                .unwrap()
+                .send(ProxyRequest {
+                    id: format!("RESTART-{}-STATUS", id),
+                    order: ProxyRequestData::Status,
+                })
+                .await {
+                error!("could not send status message to worker {:?}: {:?}", worker.id, e);
+            }
+
+            self.workers.push(worker);
+        }
+    }
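
A note on the nix idiom the match above relies on: nix's kill takes an Option<Signal>, and passing None is the programmatic form of `kill -0`, which performs the kernel's existence and permission checks without delivering any signal. Below is a minimal standalone sketch of the probe-then-SIGKILL sequence (the function name and log strings are illustrative, not from the commit):

    use nix::sys::signal::{kill, Signal};
    use nix::unistd::Pid;

    fn probe_then_kill(raw_pid: i32) {
        let pid = Pid::from_raw(raw_pid);

        // signal `None` (kill -0) delivers nothing: Ok(()) only proves that
        // a process (possibly a zombie) with this pid currently exists
        match kill(pid, None) {
            Ok(()) => println!("process {} still exists, forcing it down", raw_pid),
            Err(e) => println!("process {} is already gone: {}", raw_pid, e),
        }

        // SIGKILL cannot be caught or ignored; an Err here is usually ESRCH,
        // meaning the process exited between the probe and the kill
        if let Err(e) = kill(pid, Signal::SIGKILL) {
            println!("could not SIGKILL {}: {}", raw_pid, e);
        }
    }

This is also why restart_worker() can log two different diagnoses yet send SIGKILL unconditionally: the signal is harmless if the process is already gone.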
@@ -613,22 +592,25 @@ impl CommandServer {
                     CommandMessage::WorkerClose { id } => {
                         info!("removing worker {}", id);
                         if let Some(w) = self.workers.iter_mut().filter(|w| w.id == id).next() {
-                            // there is little chance the run state would be Running,
-                            // the upgrade_worker() method sends a WorkerClose only after setting
-                            // the run state to Stopping
-                            if self.config.worker_automatic_restart && w.run_state == RunState::Running
-                            {
-                                // we should rename this for clarity
-                                self.check_worker_status(id).await;
+                            // In case a worker crashes and should be restarted
+                            if self.config.worker_automatic_restart && w.run_state == RunState::Running {
+                                self.restart_worker(id).await;
                                 return;
                             }
 
-                            // we could add some stuff to ensure the worker is down in other cases
-                            // but mainly we SHOULD set the worker's run state to Stopped at some point
-                            // suggestion:
-                            // else {
-                            //     info!("Setting the worker to stopped");
-                            //     self.ensure_that_the_worker_pid_is_actually_dead(id);
-                            //     w.run_state = RunState::Stopped;
-                            // }
+                            info!("Closing the worker {}.", w.id);
+                            if !w.the_pid_is_alive() {
+                                info!("Worker {} is dead, setting to Stopped.", w.id);
+                                w.run_state = RunState::Stopped;
+                                return;
+                            }
+
+                            info!("Worker {} is not dead but should be. Let's kill it.", w.id);
+                            kill(Pid::from_raw(w.pid), Signal::SIGKILL)
+                                .map_err(|e| error!("failed to kill the worker process: {:?}", e))
+                                .unwrap();
+
+                            w.run_state = RunState::Stopped;
                         }
                     }
                     CommandMessage::WorkerResponse { id, message } => {
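
Taken together, the rewritten handler resolves a WorkerClose in exactly one of three ways, where the old code could leave the run state dangling. A toy model of that decision (the enums below are local stand-ins for illustration, not sozu's actual types):

    #[derive(PartialEq)]
    enum RunState {
        Running,
        Stopping,
        Stopped,
    }

    enum CloseAction {
        Restart,     // crashed while Running and automatic restart is on
        AlreadyDead, // the pid is gone: just record Stopped
        ForceKill,   // alive but should not be: SIGKILL, then record Stopped
    }

    fn close_action(automatic_restart: bool, run_state: &RunState, pid_alive: bool) -> CloseAction {
        if automatic_restart && *run_state == RunState::Running {
            CloseAction::Restart
        } else if !pid_alive {
            CloseAction::AlreadyDead
        } else {
            CloseAction::ForceKill
        }
    }

Every path now ends with the worker either replaced or marked Stopped, which is the invariant the removed "we SHOULD set the worker's run state to Stopped at some point" comment was asking for.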
10 changes: 10 additions & 0 deletions bin/src/command/worker.rs
@@ -3,6 +3,8 @@ use libc::pid_t;
 use std::collections::VecDeque;
 use std::fmt;
 use std::os::unix::io::AsRawFd;
+use nix::sys::signal::kill;
+use nix::unistd::Pid;
 
 use sozu_command::channel::Channel;
 use sozu_command::command::RunState;
@@ -54,6 +56,14 @@ impl Worker {
         }
     }
 
+    pub fn the_pid_is_alive(&self) -> bool {
+        // send a kill -0 to check on the pid, if it's dead it should be an error
+        match kill(Pid::from_raw(self.pid), None) {
+            Ok(_) => true,
+            Err(_) => false,
+        }
+    }
+
     /*
     pub fn push_message(&mut self, message: ProxyRequest) {
         self.queue.push_back(message);
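
A side note on the new helper: the match is just asking whether the signal-0 probe succeeded, so the body could shrink to a one-liner if a later cleanup wants it (an equivalent form, not what the commit ships):

    pub fn the_pid_is_alive(&self) -> bool {
        // same kill -0 probe, collapsed into the Result query
        kill(Pid::from_raw(self.pid), None).is_ok()
    }

One caveat either way: kill(2) reports EPERM for a live process the caller is not allowed to signal, which this check would misread as dead. That cannot bite here, since the main process spawned the worker itself.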
1 change: 1 addition & 0 deletions ctl/src/command.rs
@@ -558,6 +558,7 @@ pub fn metrics(mut channel: Channel<CommandRequest,CommandResponse>, json: bool)
     ));
     //println!("message sent");
 
+    // we should add a timeout somehow, otherwise it hangs
     loop {
         match channel.read_message() {
             None => {
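
One possible shape for that timeout, sketched under two assumptions: that the channel's unix socket can be reached through AsRawFd (whether sozu's Channel exposes it here is an assumption), and the nix 0.x poll API of this era:

    use std::os::unix::io::AsRawFd;

    use nix::poll::{poll, PollFd, PollFlags};

    // wait up to `timeout_ms` for the socket to become readable, so that the
    // blocking read_message() call can no longer hang forever when the proxy
    // never answers
    fn wait_readable<S: AsRawFd>(sock: &S, timeout_ms: i32) -> bool {
        let mut fds = [PollFd::new(sock.as_raw_fd(), PollFlags::POLLIN)];
        matches!(poll(&mut fds, timeout_ms), Ok(n) if n > 0)
    }

The metrics loop would then bail out with an error message whenever wait_readable(...) returns false, instead of calling read_message() again.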
