Skip to content

Commit

Permalink
better syntax and error management in launch_worker()
Browse files Browse the repository at this point in the history
  • Loading branch information
Keksoj authored and FlorentinDUBOIS committed Jul 13, 2022
1 parent 047c03a commit 8512878
Showing 1 changed file with 77 additions and 70 deletions.
147 changes: 77 additions & 70 deletions bin/src/command/orders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,7 @@ impl CommandServer {
path: &str,
) -> anyhow::Result<()> {
if let Ok(mut f) = fs::File::create(&path) {
let res = self.save_state_to_file(&mut f);

match res {
match self.save_state_to_file(&mut f) {
Ok(counter) => {
info!("wrote {} commands to {}", counter, path);
self.answer_success(
Expand All @@ -108,7 +106,10 @@ impl CommandServer {
error!("could not open file: {}", &path);
self.answer_error(client_id, message_id, "could not open file", None)
.await;
bail!(format!("could not open file: {}", &path));
Err(anyhow::Error::msg(format!(
"could not open file: {}",
&path
)))
}
}

Expand Down Expand Up @@ -381,87 +382,93 @@ impl CommandServer {
request_id: String,
_tag: &str,
) -> anyhow::Result<()> {
if let Ok(mut worker) = start_worker(
let mut worker = match start_worker(
self.next_id,
&self.config,
self.executable_path.clone(),
&self.state,
None,
) {
if let Some(sender) = self.clients.get_mut(&client_id) {
if let Err(e) = sender
.send(CommandResponse::new(
request_id.clone(),
CommandStatus::Processing,
"sending configuration orders".to_string(),
None,
))
.await
{
error!("could not send message to client {:?}: {:?}", client_id, e);
// should we have an early return here?
// return Err(anyhow::Error::from(e));
}
Ok(worker) => worker,
Err(e) => {
error!("Failed at creating worker");
self.answer_error(client_id, request_id, "failed creating worker", None)
.await;
return Err(e);
}
};

info!("created new worker: {}", worker.id);
if let Some(sender) = self.clients.get_mut(&client_id) {
if let Err(e) = sender
.send(CommandResponse::new(
request_id.clone(),
CommandStatus::Processing,
"sending configuration orders".to_string(),
None,
))
.await
{
error!("could not send message to client {:?}: {:?}", client_id, e);
// should we have an early return here?
// return Err(anyhow::Error::from(e));
}
}

self.next_id += 1;
/*
let worker_token = self.token_count + 1;
self.token_count = worker_token;
worker.token = Some(Token(worker_token));*/

/*debug!("registering new sock {:?} at token {:?} for tag {} and id {} (sock error: {:?})", worker.channel.sock,
worker_token, tag, worker.id, worker.channel.sock.take_error());
self.poll.registry().register(&mut worker.channel.sock, Token(worker_token),
Interest::READABLE | Interest::WRITABLE).unwrap();
worker.token = Some(Token(worker_token));
*/
let sock = worker.channel.take().unwrap().sock;
let (worker_tx, worker_rx) = channel(10000);
worker.sender = Some(worker_tx);

let stream = Async::new(unsafe {
let fd = sock.into_raw_fd();
UnixStream::from_raw_fd(fd)
})?;

let id = worker.id;
let command_tx = self.command_tx.clone();
//async fn worker(id: u32, sock: Async<UnixStream>, tx: Sender<CommandMessage>, rx: Receiver<()>) -> std::io::Result<()> {
smol::spawn(async move {
super::worker_loop(id, stream, command_tx, worker_rx)
.await
.unwrap();
})
.detach();
info!("created new worker: {}", worker.id);

info!(
"sending listeners: to the new worker: {:?}",
worker.scm.send_listeners(&Listeners {
http: Vec::new(),
tls: Vec::new(),
tcp: Vec::new(),
})
);
self.next_id += 1;
/*
let worker_token = self.token_count + 1;
self.token_count = worker_token;
worker.token = Some(Token(worker_token));*/

let activate_orders = self.state.generate_activate_orders();
let mut count = 0usize;
for order in activate_orders.into_iter() {
worker
.send(format!("{}-ACTIVATE-{}", id, count), order)
.await;
count += 1;
}
/*debug!("registering new sock {:?} at token {:?} for tag {} and id {} (sock error: {:?})", worker.channel.sock,
worker_token, tag, worker.id, worker.channel.sock.take_error());
self.poll.registry().register(&mut worker.channel.sock, Token(worker_token),
Interest::READABLE | Interest::WRITABLE).unwrap();
worker.token = Some(Token(worker_token));
*/
let sock = worker.channel.take().unwrap().sock;
let (worker_tx, worker_rx) = channel(10000);
worker.sender = Some(worker_tx);

self.workers.push(worker);
let stream = Async::new(unsafe {
let fd = sock.into_raw_fd();
UnixStream::from_raw_fd(fd)
})?;

self.answer_success(client_id, request_id, "", None).await;
} else {
self.answer_error(client_id, request_id, "failed creating worker", None)
let id = worker.id;
let command_tx = self.command_tx.clone();
//async fn worker(id: u32, sock: Async<UnixStream>, tx: Sender<CommandMessage>, rx: Receiver<()>) -> std::io::Result<()> {
smol::spawn(async move {
super::worker_loop(id, stream, command_tx, worker_rx)
.await
.unwrap();
})
.detach();

info!(
"sending listeners: to the new worker: {:?}",
worker.scm.send_listeners(&Listeners {
http: Vec::new(),
tls: Vec::new(),
tcp: Vec::new(),
})
);

let activate_orders = self.state.generate_activate_orders();
let mut count = 0usize;
for order in activate_orders.into_iter() {
worker
.send(format!("{}-ACTIVATE-{}", id, count), order)
.await;
count += 1;
}

self.workers.push(worker);

self.answer_success(client_id, request_id, "", None).await;

Ok(())
}

Expand Down

0 comments on commit 8512878

Please sign in to comment.