Skip to content

Commit

Permalink
comments in bin imports, functions and struct fields, refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
Emmanuel Bosquet committed Sep 2, 2022
1 parent 2f44ccf commit 8f2f1c0
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 84 deletions.
64 changes: 35 additions & 29 deletions bin/src/command/orders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use sozu_command_lib::{
use sozu::metrics::METRICS;

use crate::{
command::{CommandMessage, CommandServer, RequestIdentifier, Response, Success},
command::{CommandMessage, CommandServer, RequestIdentifier, Response, Success, Worker},
upgrade::start_new_main_process,
worker::start_worker,
};
Expand Down Expand Up @@ -319,23 +319,26 @@ impl CommandServer {
};

// notify the command server
if error == 0 {
return_success(
command_tx,
request_identifier,
Success::LoadState(path.to_string(), ok, error),
)
.await;
} else {
return_error(
command_tx,
request_identifier,
format!(
"Loading state failed, ok: {}, error: {}, path: {}",
ok, error, path
),
)
.await;
match error {
0 => {
return_success(
command_tx,
request_identifier,
Success::LoadState(path.to_string(), ok, error),
)
.await;
}
_ => {
return_error(
command_tx,
request_identifier,
format!(
"Loading state failed, ok: {}, error: {}, path: {}",
ok, error, path
),
)
.await;
}
}
})
.detach();
Expand Down Expand Up @@ -519,15 +522,15 @@ impl CommandServer {
.with_context(|| "Could not start a new main process")?;

channel.set_blocking(true);
let res = channel.read_message();
debug!("upgrade channel sent {:?}", res);
let received_ok_from_new_process = channel.read_message();
debug!("upgrade channel sent {:?}", received_ok_from_new_process);

// signaling the accept loop that it should stop
if let Err(e) = self.accept_cancel.take().unwrap().send(()) {
error!("could not close the accept loop: {:?}", e);
}

match res {
match received_ok_from_new_process {
Some(true) => {
info!("wrote final message, closing");
Ok(Some(Success::UpgradeMain(pid)))
Expand Down Expand Up @@ -602,7 +605,7 @@ impl CommandServer {

let mut listeners = None;
{
let old_worker = self
let old_worker: &mut Worker = self
.workers
.iter_mut()
.filter(|worker| worker.id == id)
Expand Down Expand Up @@ -650,15 +653,18 @@ impl CommandServer {
loop {
info!("waiting for scm sockets");
old_worker.scm.set_blocking(true);
if let Some(l) = old_worker.scm.receive_listeners() {
listeners = Some(l);
break;
} else {
counter += 1;
if counter == 50 {
match old_worker.scm.receive_listeners() {
Some(l) => {
listeners = Some(l);
break;
}
std::thread::sleep(Duration::from_millis(100));
None => {
counter += 1;
if counter == 50 {
break;
}
std::thread::sleep(Duration::from_millis(100));
}
}
}
info!("got scm sockets");
Expand Down
1 change: 1 addition & 0 deletions bin/src/command/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use sozu_command_lib::{
pub struct Worker {
pub id: u32,
pub fd: i32,
/// for the worker to receive and respond to the main process
pub channel: Option<Channel<ProxyRequest, ProxyResponse>>,
pub pid: pid_t,
pub run_state: RunState,
Expand Down
8 changes: 8 additions & 0 deletions bin/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,17 @@ static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;
#[macro_use]
mod logging;

/// the arguments to the sozu command line
mod cli;
/// Receives orders from the CLI, transmits to workers
mod command;
/// The command line logic
mod ctl;
/// Forking & restarting the main process
mod upgrade;
/// Some unix helper functions
mod util;
/// Start and restart the worker UNIX processes
mod worker;

use std::panic;
Expand All @@ -52,6 +58,7 @@ fn main(args: Args) -> anyhow::Result<()> {
info!("main process stopped");
Ok(())
}
// this is used only by the CLI when upgrading
cli::SubCmd::Worker {
fd,
scm,
Expand All @@ -71,6 +78,7 @@ fn main(args: Args) -> anyhow::Result<()> {
max_command_buffer_size,
)
}
// this is used only by the CLI when upgrading
cli::SubCmd::Main {
fd,
upgrade_fd,
Expand Down
115 changes: 60 additions & 55 deletions command/src/scm_socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,70 +68,75 @@ impl ScmSocket {
self.send_msg(&message, &v)
}

// Todo: return anyhow::Result instead of Option
pub fn receive_listeners(&self) -> Option<Listeners> {
let mut buf = vec![0; MAX_BYTES_OUT];

let mut received_fds: [RawFd; MAX_FDS_OUT] = [0; MAX_FDS_OUT];

match self.rcv_msg(&mut buf, &mut received_fds) {
let (size, file_descriptor_length) = match self.rcv_msg(&mut buf, &mut received_fds) {
Ok((s, f)) => (s, f),
Err(e) => {
error!("could not receive listeners (from fd {}): {:?}", self.fd, e);
None
return None;
}
Ok((sz, fds_len)) => {
//println!("{} received :{:?}", self.fd, (sz, fds_len));
match from_utf8(&buf[..sz]) {
Ok(s) => match serde_json::from_str::<ListenersCount>(s) {
Err(e) => {
error!(
"could not parse listeners list (from fd {}): {:?}",
self.fd, e
);
None
}
Ok(mut listeners_count) => {
let mut index = 0;
let len = listeners_count.http.len();
let mut http = Vec::new();
http.extend(
listeners_count
.http
.drain(..)
.zip((&received_fds[index..index + len]).iter().cloned()),
);

index += len;
let len = listeners_count.tls.len();
let mut tls = Vec::new();
tls.extend(
listeners_count
.tls
.drain(..)
.zip((&received_fds[index..index + len]).iter().cloned()),
);

index += len;
let mut tcp = Vec::new();
tcp.extend(
listeners_count
.tcp
.drain(..)
.zip((&received_fds[index..fds_len]).iter().cloned()),
);

Some(Listeners { http, tls, tcp })
}
},
Err(e) => {
error!(
"could not parse listeners list (from fd {}): {:?}",
self.fd, e
);
None
}
}
};

//println!("{} received :{:?}", self.fd, (sz, fds_len));

let raw_listener_list = match from_utf8(&buf[..size]) {
Ok(s) => s,
Err(e) => {
error!(
"could not parse listeners list (from fd {}): {:?}",
self.fd, e
);
return None;
}
}
};

let mut listeners_count = match serde_json::from_str::<ListenersCount>(raw_listener_list) {
Err(e) => {
error!(
"could not parse listeners list (from fd {}): {:?}",
self.fd, e
);
return None;
}
Ok(lc) => lc,
};

let mut index = 0;
let len = listeners_count.http.len();
let mut http = Vec::new();
http.extend(
listeners_count
.http
.drain(..)
.zip((&received_fds[index..index + len]).iter().cloned()),
);

index += len;
let len = listeners_count.tls.len();
let mut tls = Vec::new();
tls.extend(
listeners_count
.tls
.drain(..)
.zip((&received_fds[index..index + len]).iter().cloned()),
);

index += len;
let mut tcp = Vec::new();
tcp.extend(
listeners_count.tcp.drain(..).zip(
(&received_fds[index..file_descriptor_length])
.iter()
.cloned(),
),
);

Some(Listeners { http, tls, tcp })
}

pub fn send_msg(&self, buf: &[u8], fds: &[RawFd]) -> NixResult<()> {
Expand Down

0 comments on commit 8f2f1c0

Please sign in to comment.