Skip to content

Commit

Permalink
Merge #1887
Browse files Browse the repository at this point in the history
1887: fix(storage): write all Register cmds to disk even if one or more fail r=bochaco a=bochaco

- When writting Register cmds log to disk, we log and return the error for any of them failing, but we don't prevent the rest to be written to disk.
- Enable multi-threaded mode for sn_api tests in Bors.
- Some minor improvements to log msgs.

Co-authored-by: bochaco <gabrielviganotti@gmail.com>
  • Loading branch information
bors[bot] and bochaco committed Dec 14, 2022
2 parents 79439fb + b67adb7 commit 2b66221
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 28 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/merge.yml
Expand Up @@ -605,7 +605,7 @@ jobs:
- name: Run API tests
env:
RUST_LOG: "sn_client=trace"
run: cd sn_api && cargo test --release --features check-replicas -- --test-threads=1
run: cd sn_api && cargo test --release --features check-replicas
timeout-minutes: 30

- name: Are nodes still running...?
Expand Down
12 changes: 9 additions & 3 deletions sn_client/src/connections/link.rs
Expand Up @@ -47,7 +47,10 @@ impl Link {
let peer = self.peer;
debug!("sending bidi msg out... {msg_id:?} to {peer:?}");
let conn = self.get_or_connect(msg_id).await?;
debug!("connection got {msg_id:?} to {peer:?}");
debug!(
"connection got {msg_id:?} to {peer:?}, conn_id={}",
conn.id()
);
let (mut send_stream, recv_stream) = conn.open_bi().await.map_err(LinkError::Connection)?;

debug!("{msg_id:?} to {peer:?} bidi opened");
Expand Down Expand Up @@ -110,11 +113,14 @@ impl Link {

// let's double check we havent got a connection meanwhile
if let Some(conn) = conns_write_lock.iter().next().map(|(_, c)| c.clone()) {
debug!("{msg_id:?} Connection already exists in Link, so returning that instead of creating a fresh connection");
debug!(
"{msg_id:?} Connection already exists in Link to {peer:?}, using that, conn_id={}",
conn.id()
);
return Ok(conn);
}

debug!("{msg_id:?} creating connnnn to {peer:?}");
debug!("{msg_id:?} creating conn to {peer:?}");
let (conn, _incoming_msgs) = self
.endpoint
.connect_to(&peer.addr())
Expand Down
8 changes: 3 additions & 5 deletions sn_client/src/sessions/messaging.rs
Expand Up @@ -7,7 +7,7 @@
// permissions and limitations relating to use of the SAFE Network Software.

use super::{MsgResponse, QueryResult, Session};
use crate::{Error, LinkError, Result};
use crate::{Error, Result};
use sn_interface::{
messaging::{
data::{DataQuery, DataQueryVariant, QueryResponse},
Expand Down Expand Up @@ -486,12 +486,10 @@ impl Session {
.recv_stream_listener(msg_id, peer, peer_index, recv_stream)
.await;
}
Err(error @ LinkError::Send(_) | error @ LinkError::Connection(_))
if !connect_now =>
{
Err(error) if !connect_now => {
// Let's retry (only once) to reconnect to this peer and send the msg.
error!(
"Connection lost to {peer:?}. Failed to send {msg_id:?} on a new \
"Failed to send {msg_id:?} to {peer:?} on a new \
bi-stream: {error:?}. Creating a new connection to retry once ..."
);
session.peer_links.remove_link_from_peer_links(&peer).await;
Expand Down
5 changes: 1 addition & 4 deletions sn_node/src/comm/link.rs
Expand Up @@ -139,10 +139,7 @@ impl Link {
};

let stream_id = send_stream.id();
trace!(
"bidi {stream_id} openeed for {msg_id:?} to: {:?}",
self.peer
);
trace!("bidi {stream_id} opened for {msg_id:?} to: {:?}", self.peer);
send_stream.set_priority(10);
match send_stream.send_user_msg(bytes.clone()).await {
Ok(_) => {}
Expand Down
33 changes: 18 additions & 15 deletions sn_node/src/storage/register_store.rs
Expand Up @@ -117,10 +117,7 @@ impl RegisterStore {
};

if !path.exists() {
trace!(
"Register log does not exist, creating a new one {}",
path.display()
);
trace!("Register log does not exist yet: {}", path.display());
return Ok(stored_reg);
}

Expand All @@ -146,9 +143,8 @@ impl RegisterStore {
}
other => {
warn!(
"Ignoring corrupted register cmd from storage found at {}: {:?}",
filepath.display(),
other
"Ignoring corrupted register cmd from storage found at {}: {other:?}",
filepath.display()
)
}
}
Expand All @@ -170,17 +166,24 @@ impl RegisterStore {

create_dir_all(path).await?;

let mut last_err = None;
for cmd in log {
// TODO do we want to fail here if one entry fails?
self.write_register_cmd(cmd, path).await?;
if let Err(err) = self.write_register_cmd(cmd, path).await {
error!("Failed to write Register cmd {cmd:?} to disk: {err:?}");
last_err = Some(err);
}
}

trace!(
"Log of {} cmd/s written successfully at {}",
log.len(),
path.display()
);
Ok(())
if let Some(err) = last_err {
Err(err)
} else {
trace!(
"Log of {} cmd/s written successfully at {}",
log.len(),
path.display()
);
Ok(())
}
}

/// Persists a RegisterCmd to disk
Expand Down

0 comments on commit 2b66221

Please sign in to comment.