Skip to content

Commit

Permalink
count config messages loaded at startup
Browse files Browse the repository at this point in the history
  • Loading branch information
Geal authored and FlorentinDUBOIS committed Jul 13, 2022
1 parent daacf30 commit a673a3b
Showing 1 changed file with 47 additions and 5 deletions.
52 changes: 47 additions & 5 deletions bin/src/command/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,10 @@ impl CommandServer {
}

pub async fn load_static_application_configuration(&mut self) {
let (tx, mut rx) = futures::channel::mpsc::channel(self.workers.len() * 2);

let mut total_message_count = 0usize;

//FIXME: too many loops, this could be cleaner
for message in self.config.generate_config_messages() {
if let CommandRequestData::Proxy(order) = message.data {
Expand All @@ -321,17 +325,21 @@ impl CommandServer {
} else {
debug!("config generated {:?}", order);
}
let mut found = false;

let mut count = 0usize;
for ref mut worker in self.workers.iter_mut().filter(|worker| {
worker.run_state != RunState::Stopping && worker.run_state != RunState::Stopped
}) {
worker.send(message.id.clone(), order.clone()).await;
found = true;
count += 1;
}

if !found {
if count == 0 {
// FIXME: should send back error here
error!("no worker found");
} else {
self.in_flight.insert(message.id.clone(), (tx.clone(), count));
total_message_count += count;
}
}
}
Expand All @@ -341,6 +349,40 @@ impl CommandServer {
gauge!("configuration.clusters", self.state.clusters.len());
gauge!("configuration.backends", self.backends_count);
gauge!("configuration.frontends", self.frontends_count);

Task::spawn(async move {
let mut ok = 0usize;
let mut error = 0usize;

let mut i = 0;
while let Some(proxy_response) = rx.next().await {
match proxy_response.status {
ProxyResponseStatus::Ok => {
ok += 1;
}
ProxyResponseStatus::Processing => {
//info!("metrics processing");
continue;
}
ProxyResponseStatus::Error(e) => {
error!("error handling configuration message {}: {}", proxy_response.id, e);
error += 1;
}
};

i += 1;
if i == total_message_count {
break;
}
}

if error == 0 {
info!("loading state: {} ok messages, 0 errors", ok);
} else {
error!("loading state: {} ok messages, {} errors", ok, error);
}
})
.detach();
}

// in case a worker has crashed while Running and automatic_worker_restart is set to true
Expand Down Expand Up @@ -762,7 +804,7 @@ async fn client(

match serde_json::from_slice::<sozu_command::command::CommandRequest>(&message) {
Err(e) => {
error!("could not decode message: {:?}", e);
error!("could not decode client message: {:?}", e);
break;
}
Ok(message) => {
Expand Down Expand Up @@ -817,7 +859,7 @@ async fn worker_loop(

match serde_json::from_slice::<sozu_command::proxy::ProxyResponse>(&message) {
Err(e) => {
error!("could not decode message: {:?}", e);
error!("could not decode worker message: {:?}", e);
break;
}
Ok(message) => {
Expand Down

0 comments on commit a673a3b

Please sign in to comment.