Skip to content

Commit

Permalink
rename fields to better detailed names
Browse files Browse the repository at this point in the history
  • Loading branch information
Keksoj committed Sep 2, 2022
1 parent a30eb4e commit 0955dcd
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 65 deletions.
14 changes: 7 additions & 7 deletions bin/src/command/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ impl CommandServer {
let state: ConfigState = Default::default();

for worker in workers.iter_mut() {
let sock = worker.channel.take().unwrap().sock;
let sock = worker.command_channel.take().unwrap().sock;
let (worker_tx, worker_rx) = channel(10000);
worker.sender = Some(worker_tx);

Expand Down Expand Up @@ -475,14 +475,14 @@ impl CommandServer {
.detach();

Some(Worker {
fd: serialized.fd,
command_channel_fd: serialized.fd,
id: serialized.id,
channel: None,
command_channel: None,
sender,
pid: serialized.pid,
run_state: serialized.run_state.clone(),
queue: serialized.queue.clone().into(),
scm: ScmSocket::new(serialized.scm),
scm_socket: ScmSocket::new(serialized.scm),
})
})
.collect();
Expand Down Expand Up @@ -515,7 +515,7 @@ impl CommandServer {
pub fn disable_cloexec_before_upgrade(&mut self) -> anyhow::Result<()> {
for ref mut worker in self.workers.iter_mut() {
if worker.run_state == RunState::Running {
let _ = util::disable_close_on_exec(worker.fd).map_err(|e| {
let _ = util::disable_close_on_exec(worker.command_channel_fd).map_err(|e| {
error!(
"could not disable close on exec for worker {}: {}",
worker.id, e
Expand All @@ -534,7 +534,7 @@ impl CommandServer {
pub fn enable_cloexec_after_upgrade(&mut self) -> anyhow::Result<()> {
for ref mut worker in self.workers.iter_mut() {
if worker.run_state == RunState::Running {
let _ = util::enable_close_on_exec(worker.fd).map_err(|e| {
let _ = util::enable_close_on_exec(worker.command_channel_fd).map_err(|e| {
error!(
"could not enable close on exec for worker {}: {}",
worker.id, e
Expand Down Expand Up @@ -673,7 +673,7 @@ impl CommandServer {
info!("created new worker: {}", id);
self.next_id += 1;

let sock = worker.channel.take().unwrap().sock;
let sock = worker.command_channel.take().unwrap().sock;
let (worker_tx, worker_rx) = channel(10_000);
worker.sender = Some(worker_tx);

Expand Down
14 changes: 7 additions & 7 deletions bin/src/command/orders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ impl CommandServer {

self.next_id += 1;

let sock = worker.channel.take().unwrap().sock;
let sock = worker.command_channel.take().unwrap().sock;
let (worker_tx, worker_rx) = channel(10000);
worker.sender = Some(worker_tx);

Expand All @@ -476,7 +476,7 @@ impl CommandServer {

info!(
"sending listeners: to the new worker: {:?}",
worker.scm.send_listeners(&Listeners {
worker.scm_socket.send_listeners(&Listeners {
http: Vec::new(),
tls: Vec::new(),
tcp: Vec::new(),
Expand Down Expand Up @@ -588,7 +588,7 @@ impl CommandServer {

self.next_id += 1;

let sock = worker.channel.take().unwrap().sock;
let sock = worker.command_channel.take().unwrap().sock;
let (worker_tx, worker_rx) = channel(10000);
worker.sender = Some(worker_tx);

Expand Down Expand Up @@ -652,16 +652,16 @@ impl CommandServer {
//FIXME: use blocking
loop {
info!("waiting for scm sockets");
old_worker.scm.set_blocking(true);
match old_worker.scm.receive_listeners() {
old_worker.scm_socket.set_blocking(true);
match old_worker.scm_socket.receive_listeners() {
Ok(l) => {
listeners = Some(l);
break;
}
Err(error) => {
error!(
"Could not receive listerners from scm socket with file descriptor {}:\n{:?}",
old_worker.scm.fd, error
old_worker.scm_socket.fd, error
);
counter += 1;
if counter == 50 {
Expand Down Expand Up @@ -720,7 +720,7 @@ impl CommandServer {
Some(l) => {
info!(
"sending listeners: to the new worker: {:?}",
worker.scm.send_listeners(&l)
worker.scm_socket.send_listeners(&l)
);
l.close();
}
Expand Down
20 changes: 11 additions & 9 deletions bin/src/command/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,33 +14,35 @@ use sozu_command_lib::{

pub struct Worker {
pub id: u32,
pub fd: i32,
/// for the worker to receive and respond to the main process
pub channel: Option<Channel<ProxyRequest, ProxyResponse>>,
/// for the worker to receive requests and respond to the main process
pub command_channel: Option<Channel<ProxyRequest, ProxyResponse>>,
/// file descriptor of the command channel
pub command_channel_fd: i32,
pub pid: pid_t,
pub run_state: RunState,
pub queue: VecDeque<ProxyRequest>,
pub scm: ScmSocket,
/// used to receive listeners
pub scm_socket: ScmSocket,
pub sender: Option<futures::channel::mpsc::Sender<ProxyRequest>>,
}

impl Worker {
pub fn new(
id: u32,
pid: pid_t,
channel: Channel<ProxyRequest, ProxyResponse>,
scm: ScmSocket,
command_channel: Channel<ProxyRequest, ProxyResponse>,
scm_socket: ScmSocket,
_: &Config,
) -> Worker {
Worker {
id,
fd: channel.sock.as_raw_fd(),
channel: Some(channel),
command_channel_fd: command_channel.sock.as_raw_fd(),
command_channel: Some(command_channel),
sender: None,
pid,
run_state: RunState::Running,
queue: VecDeque::new(),
scm,
scm_socket,
}
}

Expand Down
4 changes: 2 additions & 2 deletions bin/src/upgrade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,13 @@ pub struct SerializedWorker {
impl SerializedWorker {
pub fn from_worker(worker: &Worker) -> SerializedWorker {
SerializedWorker {
fd: worker.fd,
fd: worker.command_channel_fd,
pid: worker.pid,
id: worker.id,
run_state: worker.run_state,
//token: worker.token.clone().map(|Token(t)| t),
queue: worker.queue.clone().into(),
scm: worker.scm.raw_fd(),
scm: worker.scm_socket.raw_fd(),
}
}
}
Expand Down
79 changes: 40 additions & 39 deletions bin/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,24 +52,26 @@ pub fn start_workers(executable_path: String, config: &Config) -> anyhow::Result
tcp: Vec::new(),
});

let (pid, command, scm) = start_worker_process(
let (pid, command_channel, scm_socket) = start_worker_process(
&index.to_string(),
config,
executable_path.clone(),
&state,
listeners,
)?;
let mut w = Worker::new(index as u32, pid, command, scm, config);
let mut worker = Worker::new(index as u32, pid, command_channel, scm_socket, config);

// the new worker expects a status message at startup
if let Some(channel) = w.channel.as_mut() {
channel.set_blocking(true);
channel.write_message(&ProxyRequest {
if let Some(command_channel) = worker.command_channel.as_mut() {
command_channel.set_blocking(true);
command_channel.write_message(&ProxyRequest {
id: format!("start-status-{}", index),
order: ProxyRequestData::Status,
});
channel.set_nonblocking(true);
command_channel.set_nonblocking(true);
}
workers.push(w);

workers.push(worker);
}
info!("Created workers");
Ok(workers)
Expand All @@ -89,26 +91,26 @@ pub fn start_worker(
}

pub fn begin_worker_process(
fd: i32,
scm: i32,
command_socket_fd: i32,
scm_socket_fd: i32,
configuration_state_fd: i32,
id: i32,
command_buffer_size: usize,
max_command_buffer_size: usize,
) -> Result<(), anyhow::Error> {
let mut command: Channel<ProxyResponse, Config> = Channel::new(
unsafe { UnixStream::from_raw_fd(fd) },
let mut command_channel: Channel<ProxyResponse, Config> = Channel::new(
unsafe { UnixStream::from_raw_fd(command_socket_fd) },
command_buffer_size,
max_command_buffer_size,
);

command.set_nonblocking(false);
command_channel.set_nonblocking(false);

let configuration_state_file = unsafe { File::from_raw_fd(configuration_state_fd) };
let config_state: ConfigState = serde_json::from_reader(configuration_state_file)
.with_context(|| "could not parse configuration state data")?;

let worker_config = command
let worker_config = command_channel
.read_message()
.with_context(|| "worker could not read configuration from socket")?;
//println!("got message: {:?}", worker_config);
Expand Down Expand Up @@ -136,9 +138,9 @@ pub fn begin_worker_process(
);
info!("worker {} starting...", id);

command.set_nonblocking(true);
let mut command: Channel<ProxyResponse, ProxyRequest> = command.into();
command.readiness.insert(Ready::readable());
command_channel.set_nonblocking(true);
let mut command_channel: Channel<ProxyResponse, ProxyRequest> = command_channel.into();
command_channel.readiness.insert(Ready::readable());

if let Some(metrics) = worker_config.metrics.as_ref() {
metrics::setup(
Expand All @@ -150,8 +152,8 @@ pub fn begin_worker_process(
}

let mut server = Server::new_from_config(
command,
ScmSocket::new(scm),
command_channel,
ScmSocket::new(scm_socket_fd),
worker_config,
config_state,
true,
Expand Down Expand Up @@ -182,39 +184,38 @@ pub fn start_worker_process(
.seek(SeekFrom::Start(0))
.with_context(|| "could not seek to beginning of file")?;

let (server, client) = UnixStream::pair()?;
let (scm_server_fd, scm_client) = UnixStream::pair()?;
let (command_socket_main, command_socket_worker) = UnixStream::pair()?;
let (scm_socket_main, scm_socket_worker) = UnixStream::pair()?;

let scm_server = ScmSocket::new(scm_server_fd.into_raw_fd());
let scm_main = ScmSocket::new(scm_socket_main.into_raw_fd());

util::disable_close_on_exec(client.as_raw_fd())?;
util::disable_close_on_exec(scm_client.as_raw_fd())?;
util::disable_close_on_exec(command_socket_worker.as_raw_fd())?;
util::disable_close_on_exec(scm_socket_worker.as_raw_fd())?;

let mut command: Channel<Config, ProxyResponse> = Channel::new(
server,
let mut command_channel: Channel<Config, ProxyResponse> = Channel::new(
command_socket_main,
config.command_buffer_size,
config.max_command_buffer_size,
);
command.set_nonblocking(false);
command_channel.set_nonblocking(false);

info!("{} launching worker", id);
debug!("executable path is {}", executable_path);
match unsafe { fork() } {
Ok(ForkResult::Parent { child }) => {
info!("{} worker launched: {}", id, child);
command.write_message(config);
command.set_nonblocking(true);

if let Some(l) = listeners {
info!("sending listeners to new worker: {:?}", l);
let res = scm_server.send_listeners(&l);
info!("sent listeners from main: {:?}", res);
l.close();
command_channel.write_message(config);
command_channel.set_nonblocking(true);

if let Some(listeners) = listeners {
info!("sending listeners to new worker: {:?}", listeners);
let result = scm_main.send_listeners(&listeners);
info!("sent listeners from main: {:?}", result);
listeners.close();
};
util::disable_close_on_exec(scm_server.fd)?;
util::disable_close_on_exec(scm_main.fd)?;

let command: Channel<ProxyRequest, ProxyResponse> = command.into();
Ok((child.into(), command, scm_server))
Ok((child.into(), command_channel.into(), scm_main))
}
Ok(ForkResult::Child) => {
trace!("child({}):\twill spawn a child", unsafe { libc::getpid() });
Expand All @@ -223,9 +224,9 @@ pub fn start_worker_process(
.arg("--id")
.arg(id)
.arg("--fd")
.arg(client.as_raw_fd().to_string())
.arg(command_socket_worker.as_raw_fd().to_string())
.arg("--scm")
.arg(scm_client.as_raw_fd().to_string())
.arg(scm_socket_worker.as_raw_fd().to_string())
.arg("--configuration-state-fd")
.arg(state_file.as_raw_fd().to_string())
.arg("--command-buffer-size")
Expand Down
2 changes: 1 addition & 1 deletion command/src/scm_socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::{
str::from_utf8,
};

use anyhow::{bail, Context};
use anyhow::Context;
use mio::net::TcpListener;
use nix::{cmsg_space, sys::socket, Result as NixResult};
use serde_json;
Expand Down

0 comments on commit 0955dcd

Please sign in to comment.