Skip to content

Commit

Permalink
define buffer sizes in u64
Browse files Browse the repository at this point in the history
since buffer size config will be transmitted in protobuf using
the uint64 type.
  • Loading branch information
Keksoj committed Feb 2, 2024
1 parent 4f2d760 commit 630b1a7
Show file tree
Hide file tree
Showing 8 changed files with 58 additions and 68 deletions.
8 changes: 4 additions & 4 deletions bin/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,12 @@ pub enum SubCmd {
help = "Worker's channel buffer size",
default_value = "1000000"
)]
command_buffer_size: usize,
command_buffer_size: u64,
#[clap(
long = "max-command-buffer-size",
help = "Worker's channel max buffer size"
)]
max_command_buffer_size: Option<usize>,
max_command_buffer_size: Option<u64>,
},
#[clap(
name = "main",
Expand All @@ -95,12 +95,12 @@ pub enum SubCmd {
help = "Main process channel buffer size",
default_value = "1000000"
)]
command_buffer_size: usize,
command_buffer_size: u64,
#[clap(
long = "max-command-buffer-size",
help = "Main process channel max buffer size"
)]
max_command_buffer_size: Option<usize>,
max_command_buffer_size: Option<u64>,
},

// sozu command line
Expand Down
2 changes: 1 addition & 1 deletion bin/src/command/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ impl CommandHub {
if let Err(err) = self.register(token, &mut stream) {
error!("Could not register client: {}", err);
}
let channel = Channel::new(stream, 4096, usize::MAX);
let channel = Channel::new(stream, 4096, u64::MAX);
let id = self.next_client_id();
let session = ClientSession::new(channel, id, token);
info!("register new client: {}", id);
Expand Down
4 changes: 2 additions & 2 deletions bin/src/upgrade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,8 @@ pub fn fork_main_into_new_main(
pub fn begin_new_main_process(
new_to_old_channel_fd: i32,
upgrade_file_fd: i32,
command_buffer_size: usize,
max_command_buffer_size: usize,
command_buffer_size: u64,
max_command_buffer_size: u64,
) -> Result<(), UpgradeError> {
let mut fork_confirmation_channel: Channel<bool, ()> = Channel::new(
unsafe { UnixStream::from_raw_fd(new_to_old_channel_fd) },
Expand Down
8 changes: 4 additions & 4 deletions bin/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ pub fn begin_worker_process(
worker_to_main_scm_fd: i32,
configuration_state_fd: i32,
id: i32,
command_buffer_size: usize,
max_command_buffer_size: usize,
command_buffer_size: u64,
max_command_buffer_size: u64,
) -> Result<(), WorkerError> {
let mut worker_to_main_channel: Channel<WorkerResponse, ServerConfig> = Channel::new(
unsafe { UnixStream::from_raw_fd(worker_to_main_channel_fd) },
Expand Down Expand Up @@ -232,8 +232,8 @@ pub fn fork_main_into_worker(

let mut main_to_worker_channel: Channel<ServerConfig, WorkerResponse> = Channel::new(
main_to_worker,
worker_config.command_buffer_size as usize,
worker_config.max_command_buffer_size as usize,
worker_config.command_buffer_size,
worker_config.max_command_buffer_size,
);

// DISCUSS: should we really block the channel just to write on it?
Expand Down
39 changes: 21 additions & 18 deletions command/src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ pub struct Channel<Tx, Rx> {
pub sock: MioUnixStream,
pub front_buf: Buffer,
pub back_buf: Buffer,
max_buffer_size: usize,
max_buffer_size: u64,
pub readiness: Ready,
pub interest: Ready,
blocking: bool,
Expand Down Expand Up @@ -84,20 +84,20 @@ impl<Tx: Debug + Serialize, Rx: Debug + DeserializeOwned> Channel<Tx, Rx> {
/// Creates a nonblocking channel on a given socket path
pub fn from_path(
path: &str,
buffer_size: usize,
max_buffer_size: usize,
buffer_size: u64,
max_buffer_size: u64,
) -> Result<Channel<Tx, Rx>, ChannelError> {
let unix_stream = MioUnixStream::connect(path)
.map_err(|io_error| ChannelError::Connection(Some(io_error)))?;
Ok(Channel::new(unix_stream, buffer_size, max_buffer_size))
}

/// Creates a nonblocking channel, using a unix stream
pub fn new(sock: MioUnixStream, buffer_size: usize, max_buffer_size: usize) -> Channel<Tx, Rx> {
pub fn new(sock: MioUnixStream, buffer_size: u64, max_buffer_size: u64) -> Channel<Tx, Rx> {
Channel {
sock,
front_buf: Buffer::with_capacity(buffer_size),
back_buf: Buffer::with_capacity(buffer_size),
front_buf: Buffer::with_capacity(buffer_size as usize),
back_buf: Buffer::with_capacity(buffer_size as usize),
max_buffer_size,
readiness: Ready::EMPTY,
interest: Ready::READABLE,
Expand Down Expand Up @@ -300,11 +300,14 @@ impl<Tx: Debug + Serialize, Rx: Debug + DeserializeOwned> Channel<Tx, Rx> {
Some(position) => self.read_and_parse_from_front_buffer(position),
None => {
if self.front_buf.available_space() == 0 {
if self.front_buf.capacity() == self.max_buffer_size {
if (self.front_buf.capacity() as u64) == self.max_buffer_size {
error!("command buffer full, cannot grow more, ignoring");
} else {
println!("growing channel");
let new_size = min(self.front_buf.capacity() + 5000, self.max_buffer_size);
let new_size = min(
self.front_buf.capacity() + 5000,
self.max_buffer_size as usize,
);
self.front_buf.grow(new_size);
}
}
Expand Down Expand Up @@ -340,10 +343,13 @@ impl<Tx: Debug + Serialize, Rx: Debug + DeserializeOwned> Channel<Tx, Rx> {
Some(position) => break self.read_and_parse_from_front_buffer(position),
None => {
if self.front_buf.available_space() == 0 {
if self.front_buf.capacity() == self.max_buffer_size {
break Err(ChannelError::BufferFull);
if (self.front_buf.capacity() as u64) == self.max_buffer_size {
return Err(ChannelError::BufferFull);
}
let new_size = min(self.front_buf.capacity() + 5000, self.max_buffer_size);
let new_size = min(
self.front_buf.capacity() + 5000,
self.max_buffer_size as usize,
);
self.front_buf.grow(new_size);
}
match self.sock.read(self.front_buf.space()) {
Expand Down Expand Up @@ -399,7 +405,7 @@ impl<Tx: Debug + Serialize, Rx: Debug + DeserializeOwned> Channel<Tx, Rx> {

if message_len > self.back_buf.available_space() {
if message_len - self.back_buf.available_space() + self.back_buf.capacity()
> self.max_buffer_size
> (self.max_buffer_size as usize)
{
return Err(ChannelError::MessageTooLarge(self.back_buf.capacity()));
}
Expand Down Expand Up @@ -434,7 +440,7 @@ impl<Tx: Debug + Serialize, Rx: Debug + DeserializeOwned> Channel<Tx, Rx> {

if msg_len > self.back_buf.available_space() {
if msg_len - self.back_buf.available_space() + self.back_buf.capacity()
> self.max_buffer_size
> (self.max_buffer_size as usize)
{
return Err(ChannelError::MessageTooLarge(self.back_buf.capacity()));
}
Expand Down Expand Up @@ -473,7 +479,7 @@ impl<Tx: Debug + DeserializeOwned + Serialize, Rx: Debug + DeserializeOwned + Se
Channel<Tx, Rx>
{
/// creates a channel pair: `(blocking_channel, nonblocking_channel)`
pub fn generate(buffer_size: usize, max_buffer_size: usize) -> ChannelResult<Tx, Rx> {
pub fn generate(buffer_size: u64, max_buffer_size: u64) -> ChannelResult<Tx, Rx> {
let (command, proxy) = MioUnixStream::pair().map_err(ChannelError::Read)?;
let proxy_channel = Channel::new(proxy, buffer_size, max_buffer_size);
let mut command_channel = Channel::new(command, buffer_size, max_buffer_size);
Expand All @@ -482,10 +488,7 @@ impl<Tx: Debug + DeserializeOwned + Serialize, Rx: Debug + DeserializeOwned + Se
}

/// creates a pair of nonblocking channels
pub fn generate_nonblocking(
buffer_size: usize,
max_buffer_size: usize,
) -> ChannelResult<Tx, Rx> {
pub fn generate_nonblocking(buffer_size: u64, max_buffer_size: u64) -> ChannelResult<Tx, Rx> {
let (command, proxy) = MioUnixStream::pair().map_err(ChannelError::Read)?;
let proxy_channel = Channel::new(proxy, buffer_size, max_buffer_size);
let command_channel = Channel::new(command, buffer_size, max_buffer_size);
Expand Down
40 changes: 20 additions & 20 deletions command/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,22 +144,22 @@ pub const DEFAULT_WORKER_AUTOMATIC_RESTART: bool = true;
pub const DEFAULT_AUTOMATIC_STATE_SAVE: bool = false;

/// minimum number of buffers (1)
pub const DEFAULT_MIN_BUFFERS: usize = 1;
pub const DEFAULT_MIN_BUFFERS: u64 = 1;

/// maximum number of buffers (1 000)
pub const DEFAULT_MAX_BUFFERS: usize = 1_000;
pub const DEFAULT_MAX_BUFFERS: u64 = 1_000;

/// size of the buffers, in bytes (16 KB)
pub const DEFAULT_BUFFER_SIZE: usize = 16_393;
pub const DEFAULT_BUFFER_SIZE: u64 = 16_393;

/// maximum number of simultaneous connections (10 000)
pub const DEFAULT_MAX_CONNECTIONS: usize = 10_000;

/// size of the buffer for the channels, in bytes. Must be bigger than the size of the data received. (1 MB)
pub const DEFAULT_COMMAND_BUFFER_SIZE: usize = 1_000_000;
pub const DEFAULT_COMMAND_BUFFER_SIZE: u64 = 1_000_000;

/// maximum size of the buffer for the channels, in bytes. (2 MB)
pub const DEFAULT_MAX_COMMAND_BUFFER_SIZE: usize = 2_000_000;
pub const DEFAULT_MAX_COMMAND_BUFFER_SIZE: u64 = 2_000_000;

/// wether to avoid register cluster metrics in the local drain
pub const DEFAULT_DISABLE_CLUSTER_METRICS: bool = false;
Expand Down Expand Up @@ -1077,12 +1077,12 @@ impl ClusterConfig {
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Default, Deserialize)]
pub struct FileConfig {
pub command_socket: Option<String>,
pub command_buffer_size: Option<usize>,
pub max_command_buffer_size: Option<usize>,
pub command_buffer_size: Option<u64>,
pub max_command_buffer_size: Option<u64>,
pub max_connections: Option<usize>,
pub min_buffers: Option<usize>,
pub max_buffers: Option<usize>,
pub buffer_size: Option<usize>,
pub min_buffers: Option<u64>,
pub max_buffers: Option<u64>,
pub buffer_size: Option<u64>,
pub saved_state: Option<String>,
#[serde(default)]
pub automatic_state_save: Option<bool>,
Expand Down Expand Up @@ -1434,12 +1434,12 @@ impl ConfigBuilder {
pub struct Config {
pub config_path: String,
pub command_socket: String,
pub command_buffer_size: usize,
pub max_command_buffer_size: usize,
pub command_buffer_size: u64,
pub max_command_buffer_size: u64,
pub max_connections: usize,
pub min_buffers: usize,
pub max_buffers: usize,
pub buffer_size: usize,
pub min_buffers: u64,
pub max_buffers: u64,
pub buffer_size: u64,
pub saved_state: Option<String>,
#[serde(default)]
pub automatic_state_save: bool,
Expand Down Expand Up @@ -1763,14 +1763,14 @@ impl ServerConfig {
connect_timeout: config.connect_timeout,
zombie_check_interval: config.zombie_check_interval,
accept_queue_timeout: config.accept_queue_timeout,
min_buffers: config.min_buffers as u64,
max_buffers: config.max_buffers as u64,
buffer_size: config.buffer_size as u64,
min_buffers: config.min_buffers,
max_buffers: config.max_buffers,
buffer_size: config.buffer_size,
log_level: config.log_level.clone(),
log_target: config.log_target.clone(),
log_access_target: config.log_access_target.clone(),
command_buffer_size: config.command_buffer_size as u64,
max_command_buffer_size: config.max_command_buffer_size as u64,
command_buffer_size: config.command_buffer_size,
max_command_buffer_size: config.max_command_buffer_size,
metrics,
}
}
Expand Down
9 changes: 0 additions & 9 deletions command/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,15 +122,6 @@ impl Request {
}
}

/*
/// This is sent only from Sōzu to Sōzu
#[derive(Debug, Clone, Serialize, PartialEq, Eq, Deserialize)]
pub struct WorkerRequest {
pub id: MessageId,
pub content: Request,
}
*/

impl WorkerRequest {
pub fn new(id: String, content: Request) -> Self {
Self { id, content }
Expand Down
16 changes: 6 additions & 10 deletions e2e/src/sozu/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,9 @@ impl Worker {
) -> (ScmSocket, Channel<WorkerRequest, WorkerResponse>, Server) {
let (scm_main_to_worker, scm_worker_to_main) =
UnixStream::pair().expect("could not create unix stream pair");
let (cmd_main_to_worker, cmd_worker_to_main) = Channel::generate(
config.command_buffer_size as usize,
config.max_command_buffer_size as usize,
)
.expect("could not create a channel");
let (cmd_main_to_worker, cmd_worker_to_main) =
Channel::generate(config.command_buffer_size, config.max_command_buffer_size)
.expect("could not create a channel");

set_no_close_exec(scm_main_to_worker.as_raw_fd());
set_no_close_exec(scm_worker_to_main.as_raw_fd());
Expand Down Expand Up @@ -119,11 +117,9 @@ impl Worker {
let name = name.into();
let (scm_main_to_worker, scm_worker_to_main) =
UnixStream::pair().expect("could not create unix stream pair");
let (cmd_main_to_worker, cmd_worker_to_main) = Channel::generate(
config.command_buffer_size as usize,
config.max_command_buffer_size as usize,
)
.expect("could not create a channel");
let (cmd_main_to_worker, cmd_worker_to_main) =
Channel::generate(config.command_buffer_size, config.max_command_buffer_size)
.expect("could not create a channel");

set_no_close_exec(scm_main_to_worker.as_raw_fd());
set_no_close_exec(scm_worker_to_main.as_raw_fd());
Expand Down

0 comments on commit 630b1a7

Please sign in to comment.