Skip to content

Commit

Permalink
additional error logs and context
Browse files Browse the repository at this point in the history
  • Loading branch information
Keksoj authored and FlorentinDUBOIS committed Jul 13, 2022
1 parent 5f62687 commit 3c7ab65
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 58 deletions.
62 changes: 37 additions & 25 deletions bin/src/command/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,25 @@ enum CommandMessage {
MasterStop,
}

#[derive(Clone)]
#[derive(PartialEq, Eq, Clone, Debug)]
pub struct RequestIdentifier {
client: String, // the client who sent the request (ex: "CL-0")
request: String, // the request id (ex: "ID-MAN9QF")
}

impl RequestIdentifier {
pub fn new<T>(client: T, request: T) -> Self
where
T: ToString,
{
Self {
client: client.to_string(),
request: request.to_string(),
}
}
}

#[derive(PartialEq, Eq, Clone, Debug)]
pub enum Response {
Error(String),
// Todo: refactor the CLI, see issue #740
Expand All @@ -90,7 +103,7 @@ pub enum Response {

// Indicates success of either inner Sōzu logic and of handling the ClientRequest,
// in which case Success caries the response data.
#[derive(Clone)]
#[derive(PartialEq, Eq, Clone, Debug)]
pub enum Success {
ClientClose(String), // the client id
ClientNew(String), // the client id
Expand Down Expand Up @@ -252,12 +265,7 @@ impl CommandServer {
let id = worker.id;
let command_tx = command_tx.clone();
smol::spawn(async move {
if worker_loop(id, stream, command_tx, worker_rx)
.await
.is_err()
{
error!("The worker loop of worker {} crashed", id);
}
worker_loop(id, stream, command_tx, worker_rx).await;
})
.detach();
}
Expand Down Expand Up @@ -445,9 +453,7 @@ impl CommandServer {
let command_tx = tx.clone();
//async fn worker(id: u32, sock: Async<UnixStream>, tx: Sender<CommandMessage>, rx: Receiver<()>) -> std::io::Result<()> {
smol::spawn(async move {
worker_loop(id, stream, command_tx, worker_rx)
.await
.unwrap();
worker_loop(id, stream, command_tx, worker_rx).await;
})
.detach();

Expand Down Expand Up @@ -648,7 +654,7 @@ impl CommandServer {
self.next_id += 1;

let sock = worker.channel.take().unwrap().sock;
let (worker_tx, worker_rx) = channel(10000);
let (worker_tx, worker_rx) = channel(10_000);
worker.sender = Some(worker_tx);

let stream = Async::new(unsafe {
Expand All @@ -659,9 +665,7 @@ impl CommandServer {
let id = worker.id;
let command_tx = self.command_tx.clone();
smol::spawn(async move {
worker_loop(id, stream, command_tx, worker_rx)
.await
.unwrap();
worker_loop(id, stream, command_tx, worker_rx).await;
})
.detach();

Expand Down Expand Up @@ -901,7 +905,7 @@ async fn client_loop(
stream: Async<UnixStream>,
mut command_tx: Sender<CommandMessage>,
mut client_rx: Receiver<CommandResponse>,
) -> std::io::Result<()> {
) {
let stream = Arc::new(stream);
let mut s = stream.clone();

Expand Down Expand Up @@ -952,18 +956,23 @@ async fn client_loop(
}

// If the loop breaks, order the command server to close the client
if let Err(e) = command_tx.send(CommandMessage::ClientClose { id }).await {
error!("error writing to client: {:?}", e);
if let Err(send_error) = command_tx
.send(CommandMessage::ClientClose { id: id.to_owned() })
.await
{
error!(
"The client loop {} could not send ClientClose to the command server: {:?}",
id, send_error
);
}
Ok(())
}

async fn worker_loop(
id: u32,
stream: Async<UnixStream>,
mut command_tx: Sender<CommandMessage>,
mut worker_rx: Receiver<ProxyRequest>,
) -> std::io::Result<()> {
) {
let stream = Arc::new(stream);
let mut s = stream.clone();

Expand Down Expand Up @@ -1015,10 +1024,13 @@ async fn worker_loop(
}

// if the loop breaks, order the command server to close the worker
command_tx
.send(CommandMessage::WorkerClose { id })
if let Err(send_error) = command_tx
.send(CommandMessage::WorkerClose { id: id.to_owned() })
.await
.unwrap();

Ok(())
{
error!(
"The worker loop {} could not send WorkerClose to the CommandServer: {:?}",
id, send_error
);
}
}
68 changes: 35 additions & 33 deletions bin/src/command/orders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use serde_json;
use std::{
collections::{BTreeMap, HashSet},
fs::File,
io::{self, Read, Write},
io::{Read, Write},
os::unix::io::{FromRawFd, IntoRawFd},
os::unix::net::UnixStream,
time::Duration,
Expand Down Expand Up @@ -121,7 +121,7 @@ impl CommandServer {
let mut counter = 0usize;
let orders = self.state.generate_orders();

let result: io::Result<usize> = (move || {
let result: anyhow::Result<usize> = (move || {
for command in orders {
let message = CommandRequest::new(
format!("SAVE-{}", counter),
Expand All @@ -133,21 +133,31 @@ impl CommandServer {
&serde_json::to_string(&message)
.map(|s| s.into_bytes())
.unwrap_or(vec![]),
)?;
file.write_all(&b"\n\0"[..])?;
)
.with_context(|| {
format!(
"Could not add this instruction line to the saved state file: {:?}",
message
)
})?;

file.write_all(&b"\n\0"[..])
.with_context(|| "Could not add new line to the saved state file")?;

if counter % 1000 == 0 {
info!("writing command {}", counter);
file.sync_all()?;
file.sync_all()
.with_context(|| "Failed to sync the saved state file")?;
}
counter += 1;
}
file.sync_all()?;
file.sync_all()
.with_context(|| "Failed to sync the saved state file")?;

Ok(counter)
})();

Ok(result?)
Ok(result.with_context(|| "Could not write the state onto the state file")?)
}

pub async fn dump_state(&mut self) -> anyhow::Result<Option<Success>> {
Expand Down Expand Up @@ -270,7 +280,7 @@ impl CommandServer {
path, diff_counter
);

let command_tx = self.command_tx.clone();
let command_tx = self.command_tx.to_owned();
let path = path.to_owned();

smol::spawn(async move {
Expand All @@ -293,10 +303,7 @@ impl CommandServer {
// or pattern matching

let request_identifier = if let Some(client_id) = client_id {
RequestIdentifier {
client: client_id,
request: request_id,
}
RequestIdentifier::new(client_id, request_id)
} else {
if error == 0 {
info!("loading state: {} ok messages, 0 errors", ok);
Expand Down Expand Up @@ -458,9 +465,7 @@ impl CommandServer {
let command_tx = self.command_tx.clone();

smol::spawn(async move {
super::worker_loop(id, stream, command_tx, worker_rx)
.await
.unwrap();
super::worker_loop(id, stream, command_tx, worker_rx).await;
})
.detach();

Expand Down Expand Up @@ -696,6 +701,9 @@ impl CommandServer {
}
// shouldn't we use return_success() here, to notify the command server
// and the client?
// Answer: actually, the CLI could not take several success messages
// about upgraded workers. See issue #740. The best would be to return
// Processing here. See issue #740
})
.detach();
}
Expand All @@ -719,9 +727,7 @@ impl CommandServer {
let id = worker.id;
let command_tx = self.command_tx.clone();
smol::spawn(async move {
super::worker_loop(id, stream, command_tx, worker_rx)
.await
.unwrap();
super::worker_loop(id, stream, command_tx, worker_rx).await;
})
.detach();

Expand Down Expand Up @@ -1257,11 +1263,9 @@ impl CommandServer {
let RequestIdentifier {
client: client_id,
request: request_id,
} = request_identifier.clone();
} = request_identifier.to_owned();

let command_response: CommandResponse;

match response {
let command_response = match response {
Response::Ok(success) => {
let success_message = success.to_string();

Expand All @@ -1274,12 +1278,12 @@ impl CommandServer {
_ => None,
};

command_response = CommandResponse::new(
CommandResponse::new(
request_id.clone(),
CommandStatus::Ok,
success_message,
command_response_data,
);
)
}
// Todo: refactor the CLI to accept processing, see issue #740
// Response::Processing(_) => {
Expand All @@ -1290,15 +1294,13 @@ impl CommandServer {
// None,
// );
// }
Response::Error(error_message) => {
command_response = CommandResponse::new(
request_id.clone(),
CommandStatus::Error,
error_message,
None,
);
}
}
Response::Error(error_message) => CommandResponse::new(
request_id.clone(),
CommandStatus::Error,
error_message,
None,
),
};

trace!(
"Sending response to request {} of client {}: {:?}",
Expand Down

0 comments on commit 3c7ab65

Please sign in to comment.