Skip to content

Commit

Permalink
pass initial state to workers in protobuf
Browse files Browse the repository at this point in the history
  • Loading branch information
Keksoj committed Feb 2, 2024
1 parent 39af839 commit 5852c6b
Show file tree
Hide file tree
Showing 9 changed files with 76 additions and 88 deletions.
12 changes: 9 additions & 3 deletions bin/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use sozu_command_lib::{
logging::setup_logging,
proto::command::{ServerConfig, WorkerRequest, WorkerResponse},
ready::Ready,
request::{read_requests_from_file, RequestError},
request::{read_initial_state_from_file, RequestError},
scm_socket::{Listeners, ScmSocket, ScmSocketError},
state::{ConfigState, StateError},
};
Expand Down Expand Up @@ -124,8 +124,12 @@ pub fn begin_worker_process(
worker_config
);
info!("worker {} starting...", id);
let initial_state = read_requests_from_file(&mut configuration_state_file)

// let initial_state = read_requests_from_file(&mut configuration_state_file)

let initial_state = read_initial_state_from_file(&mut configuration_state_file)
.map_err(WorkerError::ReadRequestsFromFile)?;
// .expect("could not parse configuration state data");

worker_to_main_channel
.nonblocking()
Expand Down Expand Up @@ -197,8 +201,10 @@ pub fn fork_main_into_worker(
})?;

state
.write_requests_to_file(&mut state_file)
// .write_requests_to_file(&mut state_file)
.write_initial_state_to_file(&mut state_file)
.map_err(WorkerError::WriteStateFile)?;
// .expect("Could not write state to file");

state_file.rewind().map_err(WorkerError::Rewind)?;

Expand Down
6 changes: 6 additions & 0 deletions command/src/command.proto
Original file line number Diff line number Diff line change
Expand Up @@ -684,4 +684,10 @@ message ListenersCount {
repeated string tls = 2;
// socket addresses of TCP listeners
repeated string tcp = 3;
}

// the Sōzu state, passed to a new worker.
// Consists in a collection of worker requests
message InitialState {
repeated WorkerRequest requests = 1;
}
64 changes: 14 additions & 50 deletions command/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,19 @@ use std::{
error,
fmt::{self, Display},
fs::File,
io::Read,
io::{BufReader, Read},
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
str::FromStr,
};

use nom::{HexDisplay, Offset};
use prost::Message;

use crate::{
buffer::fixed::Buffer,
parser::parse_several_requests,
proto::{
command::{
ip_address, request::RequestType, IpAddress, LoadBalancingAlgorithms, PathRuleKind,
Request, RequestHttpFrontend, RulePosition, SocketAddress, Uint128, WorkerRequest,
ip_address, request::RequestType, InitialState, IpAddress, LoadBalancingAlgorithms,
PathRuleKind, Request, RequestHttpFrontend, RulePosition, SocketAddress, Uint128,
WorkerRequest,
},
display::format_request_type,
},
Expand Down Expand Up @@ -134,51 +133,16 @@ impl fmt::Display for WorkerRequest {
}
}

pub fn read_requests_from_file(file: &mut File) -> Result<Vec<WorkerRequest>, RequestError> {
let mut acc = Vec::new();
let mut buffer = Buffer::with_capacity(200000);
loop {
let previous = buffer.available_data();
pub fn read_initial_state_from_file(file: &mut File) -> Result<InitialState, RequestError> {
let mut buf_reader = BufReader::new(file);
let mut buffer = Vec::new();
buf_reader
.read_to_end(&mut buffer)
.map_err(|e| RequestError::FileError(e))?;

let bytes_read = file.read(buffer.space()).map_err(RequestError::FileError)?;

buffer.fill(bytes_read);

if buffer.available_data() == 0 {
trace!("read_requests_from_file: empty buffer");
break;
}

let mut offset = 0usize;
match parse_several_requests::<WorkerRequest>(buffer.data()) {
Ok((i, requests)) => {
if !i.is_empty() {
trace!("read_requests_from_file: could not parse {} bytes", i.len());
if previous == buffer.available_data() {
break;
}
}
offset = buffer.data().offset(i);

acc.push(requests);
}
Err(nom::Err::Incomplete(_)) => {
if buffer.available_data() == buffer.capacity() {
error!(
"read_requests_from_file: message too big, stopping parsing:\n{}",
buffer.data().to_hex(16)
);
break;
}
}
Err(parse_error) => {
return Err(RequestError::ParseError(parse_error.to_string()));
}
}
buffer.consume(offset);
}
let requests = acc.into_iter().flatten().collect();
Ok(requests)
let initial_state =
InitialState::decode(&buffer[..]).map_err(|e| RequestError::ParseError(e.to_string()))?;
Ok(initial_state)
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
Expand Down
38 changes: 34 additions & 4 deletions command/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,16 @@ use std::{
net::SocketAddr,
};

use prost::DecodeError;
use prost::{DecodeError, Message};

use crate::{
certificate::{self, calculate_fingerprint, Fingerprint},
proto::{
command::{
request::RequestType, ActivateListener, AddBackend, AddCertificate, CertificateAndKey,
Cluster, ClusterInformation, DeactivateListener, FrontendFilters, HttpListenerConfig,
HttpsListenerConfig, ListedFrontends, ListenerType, ListenersList, PathRule,
QueryCertificatesFilters, RemoveBackend, RemoveCertificate, RemoveListener,
HttpsListenerConfig, InitialState, ListedFrontends, ListenerType, ListenersList,
PathRule, QueryCertificatesFilters, RemoveBackend, RemoveCertificate, RemoveListener,
ReplaceCertificate, Request, RequestCounts, RequestHttpFrontend, RequestTcpFrontend,
SocketAddress, TcpListenerConfig, WorkerRequest,
},
Expand Down Expand Up @@ -539,7 +539,8 @@ impl ConfigState {
Ok(())
}

pub fn generate_requests(&self) -> Vec<Request> {
/// creates all requests needed to bootstrap the state
fn generate_requests(&self) -> Vec<Request> {
let mut v: Vec<Request> = Vec::new();

for listener in self.http_listeners.values() {
Expand Down Expand Up @@ -1380,6 +1381,35 @@ impl ConfigState {
}
}

// create requests needed for a worker to recreate the state
pub fn produce_initial_state(&self) -> InitialState {
let mut counter = 0usize;
let mut worker_requests = Vec::new();
for request in self.generate_requests() {
worker_requests.push(WorkerRequest::new(format!("SAVE-{counter}"), request));
counter += 1;
}
InitialState {
requests: worker_requests,
}
}

/// generate requests necessary to recreate the state,
/// in protobuf, to a temp file
pub fn write_initial_state_to_file(&self, file: &mut File) -> Result<usize, StateError> {
let initial_state = self.produce_initial_state();
let count = initial_state.requests.len();

let bytes_to_write = initial_state.encode_to_vec();
println!("writing {} in the temp file", bytes_to_write.len());
file.write_all(&bytes_to_write)
.map_err(StateError::FileError)?;

file.sync_all().map_err(StateError::FileError)?;

Ok(count)
}

/// generate requests necessary to recreate the state,
/// write them in a JSON form in a file, separated by \n\0,
/// returns the number of written requests
Expand Down
18 changes: 2 additions & 16 deletions e2e/src/sozu/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,7 @@ impl Worker {
.send_listeners(&listeners)
.expect("could not send listeners");

let initial_state = state
.generate_requests()
.into_iter()
.map(|request| WorkerRequest {
id: "initial_state".to_string(),
content: request,
})
.collect();
let initial_state = state.produce_initial_state();
let server = Server::try_new_from_config(
cmd_worker_to_main,
scm_worker_to_main,
Expand Down Expand Up @@ -133,14 +126,7 @@ impl Worker {
.expect("could not send listeners");

let thread_config = config.to_owned();
let initial_state = state
.generate_requests()
.into_iter()
.map(|request| WorkerRequest {
id: "initial_state".to_string(),
content: request,
})
.collect();
let initial_state = state.produce_initial_state();
let thread_name = name.to_owned();
let thread_scm_worker_to_main = scm_worker_to_main.to_owned();

Expand Down
2 changes: 1 addition & 1 deletion lib/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use sozu_command::{
logging,
proto::command::{
request::RequestType, Cluster, HttpListenerConfig, ListenerType, RemoveListener,
RequestHttpFrontend, ServerConfig, WorkerRequest, WorkerResponse,
RequestHttpFrontend, WorkerRequest, WorkerResponse,
},
ready::Ready,
response::HttpFrontend,
Expand Down
2 changes: 1 addition & 1 deletion lib/src/https.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use sozu_command::{
request::RequestType, response_content::ContentType, AddCertificate, CertificateSummary,
CertificatesByAddress, Cluster, HttpsListenerConfig, ListOfCertificatesByAddress,
ListenerType, RemoveCertificate, RemoveListener, ReplaceCertificate, RequestHttpFrontend,
ResponseContent, ServerConfig, TlsVersion, WorkerRequest, WorkerResponse,
ResponseContent, TlsVersion, WorkerRequest, WorkerResponse,
},
ready::Ready,
response::HttpFrontend,
Expand Down
16 changes: 8 additions & 8 deletions lib/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ use sozu_command::{
proto::command::{
request::RequestType, response_content::ContentType, ActivateListener, AddBackend,
CertificatesWithFingerprints, Cluster, ClusterHashes, ClusterInformations,
DeactivateListener, Event, HttpListenerConfig, HttpsListenerConfig, ListenerType,
LoadBalancingAlgorithms, LoadMetric, MetricsConfiguration, RemoveBackend, Request,
ResponseStatus, ServerConfig, TcpListenerConfig as CommandTcpListener, WorkerRequest,
WorkerResponse,
DeactivateListener, Event, HttpListenerConfig, HttpsListenerConfig, InitialState,
ListenerType, LoadBalancingAlgorithms, LoadMetric, MetricsConfiguration, RemoveBackend,
Request, ResponseStatus, ServerConfig, TcpListenerConfig as CommandTcpListener,
WorkerRequest, WorkerResponse,
},
ready::Ready,
scm_socket::{Listeners, ScmSocket, ScmSocketError},
Expand Down Expand Up @@ -243,7 +243,7 @@ impl Server {
worker_to_main_channel: ProxyChannel,
worker_to_main_scm: ScmSocket,
config: ServerConfig,
initial_state: Vec<WorkerRequest>,
initial_state: InitialState,
expects_initial_status: bool,
) -> Result<Self, ServerError> {
let event_loop = Poll::new().map_err(ServerError::CreatePoll)?;
Expand Down Expand Up @@ -321,7 +321,7 @@ impl Server {
https: Option<https::HttpsProxy>,
tcp: Option<tcp::TcpProxy>,
server_config: ServerConfig,
initial_state: Option<Vec<WorkerRequest>>,
initial_state: Option<InitialState>,
expects_initial_status: bool,
) -> Result<Self, ServerError> {
FEATURES.with(|_features| {
Expand Down Expand Up @@ -413,8 +413,8 @@ impl Server {
};

// initialize the worker with the state we got from a file
if let Some(requests) = initial_state {
for request in requests {
if let Some(state) = initial_state {
for request in state.requests {
trace!("generating initial config request: {:#?}", request);
server.notify_proxys(request);
}
Expand Down
6 changes: 1 addition & 5 deletions lib/src/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,7 @@ use mio::{
use rusty_ulid::Ulid;
use time::{Duration, Instant};

use sozu_command::{
config::MAX_LOOP_ITERATIONS,
proto::command::{request::RequestType, ServerConfig},
ObjectKind,
};
use sozu_command::{config::MAX_LOOP_ITERATIONS, proto::command::request::RequestType, ObjectKind};

use crate::{
backends::{Backend, BackendMap},
Expand Down

0 comments on commit 5852c6b

Please sign in to comment.