Skip to content

Commit

Permalink
Merge #1862
Browse files Browse the repository at this point in the history
1862: tests: replace msg_queue with `Comm` r=joshuef a=RolandSherwin

- **fix(network_builder): fix edge case while calculating max_prefixes**
  - When `max_bit_count == 0 (or) 1`, the logic for calculating the permutations failed, this fixes it.
- **feat(test): synchronously send `NodeMsg` to other nodes**
  - This allows us to control the flow of msgs in certain tests allowing us to avoid using custom msg queues.
- **refactor(test): pass `NodeMsgs` via the `Comm` module**
  - The dkg tests bypassed the `Comm` module and used a queue to pass along the `NodeMsgs` for testing. This was due to the fact that the msgs were sent asynchronously, preventing any control of the flow.
  - Hence using the test-only synchronous msg sender allows us to do the above without using any extra queues.


Co-authored-by: RolandSherwin <RolandSherwin@protonmail.com>
  • Loading branch information
bors[bot] and RolandSherwin committed Dec 20, 2022
2 parents aed73cf + 781459f commit aed6549
Show file tree
Hide file tree
Showing 5 changed files with 451 additions and 472 deletions.
34 changes: 34 additions & 0 deletions sn_node/src/comm/mod.rs
Expand Up @@ -167,6 +167,40 @@ impl Comm {
Ok(())
}

// Test helper to send out Msgs in a blocking fashion
#[cfg(test)]
pub(crate) async fn send_out_bytes_sync(&self, peer: Peer, msg_id: MsgId, bytes: UsrMsgBytes) {
let watcher = self.send_to_one(peer, msg_id, bytes, None).await;
match watcher {
Ok(Some(watcher)) => {
let (send_was_successful, should_remove) = Self::is_sent(watcher, msg_id, peer)
.await
.expect("Error in is_sent");

if send_was_successful {
trace!("Msg {msg_id:?} sent to {peer:?}");
} else if should_remove {
// do cleanup of that peer
let perhaps_session = self.sessions.remove(&peer);
if let Some((_peer, session)) = perhaps_session {
session.disconnect().await;
}
}
}
Ok(None) => {}
Err(error) => {
error!(
"Sending message (msg_id: {:?}) to {:?} (name {:?}) failed as we have disconnected from the peer. (Error is: {})",
msg_id,
peer.addr(),
peer.name(),
error,
);
let _peer = self.sessions.remove(&peer);
}
}
}

// TODO: tweak messaging to just allow passthrough
#[tracing::instrument(skip(self, bytes))]
pub(crate) async fn send_out_bytes_to_peer_and_return_response(
Expand Down
2 changes: 1 addition & 1 deletion sn_node/src/node/flow_ctrl/dispatcher.rs
Expand Up @@ -234,7 +234,7 @@ impl Dispatcher {
// Serializes and signs the msg if it's a Client message,
// and produces one [`WireMsg`] instance per recipient -
// the last step before passing it over to comms module.
fn into_msg_bytes(
pub(crate) fn into_msg_bytes(
network_knowledge: &NetworkKnowledge,
our_node_name: XorName,
msg: NodeMsg,
Expand Down
68 changes: 64 additions & 4 deletions sn_node/src/node/flow_ctrl/tests/cmd_utils.rs
@@ -1,7 +1,11 @@
use crate::node::{flow_ctrl::dispatcher::Dispatcher, messaging::Peers, Cmd};
use assert_matches::assert_matches;
use eyre::eyre;
use eyre::Result;
use crate::{
comm::MsgFromPeer,
node::{
flow_ctrl::dispatcher::{into_msg_bytes, Dispatcher},
messaging::Peers,
Cmd,
},
};
use sn_interface::{
messaging::{
data::ClientMsg,
Expand All @@ -12,7 +16,12 @@ use sn_interface::{
network_knowledge::{test_utils::*, MembershipState, NodeState, RelocateDetails},
types::{Keypair, Peer, ReplicatedData},
};

use assert_matches::assert_matches;
use eyre::{eyre, Result};
use std::collections::BTreeSet;
use tokio::sync::mpsc::{self, error::TryRecvError};
use xor_name::XorName;

pub(crate) struct HandleOnlineStatus {
pub(crate) node_approval_sent: bool,
Expand Down Expand Up @@ -217,3 +226,54 @@ impl Cmd {
// }
// }
}

impl Dispatcher {
// Sends out `NodeMsgs` to others synchronously, (process_cmd() spawns tasks to do it).
// Optionally drop the msgs to the provided set of peers.
pub(crate) async fn mock_send_msg(&self, cmd: Cmd, filter_recp: Option<BTreeSet<XorName>>) {
if let Cmd::SendMsg {
msg,
msg_id,
recipients,
send_stream,
context,
} = cmd
{
let _ = send_stream;
let peer_msgs = {
into_msg_bytes(
&context.network_knowledge,
context.name,
msg.clone(),
msg_id,
recipients,
)
.expect("cannot convert msg into bytes")
};

for (peer, msg_bytes) in peer_msgs {
if let Some(filter) = &filter_recp {
if filter.contains(&peer.name()) {
continue;
}
}
context
.comm
.send_out_bytes_sync(peer, msg_id, msg_bytes)
.await;
info!("Sent {msg} to {}", peer.name());
}
} else {
panic!("mock_send_msg expects Cmd::SendMsg, got {cmd:?}");
}
}
}

// Receive the next `MsgFromPeer` if the buffer is not empty. Returns None if the buffer is currently empty
pub(crate) fn get_next_msg(comm_rx: &mut mpsc::Receiver<MsgFromPeer>) -> Option<MsgFromPeer> {
match comm_rx.try_recv() {
Ok(msg) => Some(msg),
Err(TryRecvError::Empty) => None,
Err(TryRecvError::Disconnected) => panic!("the comm_rx channel is closed"),
}
}
35 changes: 21 additions & 14 deletions sn_node/src/node/flow_ctrl/tests/network_builder.rs
Expand Up @@ -248,21 +248,28 @@ impl<R: RngCore> TestNetworkBuilder<R> {
.map(|p| p.bit_count())
.max()
.expect("at-least one sap should be provided");
// the permutations of 0,1 with len = max_bit_count gives us the max_prefixes
let bits = ["0", "1"];
let max_prefixes: Vec<_> = (2..max_bit_count).fold(
bits.iter()
.flat_map(|b1| bits.iter().map(|&b2| b2.to_string() + *b1))
.collect(),
|acc, _| {
acc.iter()
.flat_map(|b1| bits.iter().map(|&b2| b2.to_string() + b1))
.collect()
},
);

// max_prefixes are used to construct the `SectionTree`
let max_prefixes: BTreeSet<_> =
max_prefixes.into_iter().map(|str| prefix(&str)).collect();
let bits = ["0", "1"];
let max_prefixes = if max_bit_count == 0 {
BTreeSet::from([Prefix::default()])
} else if max_bit_count == 1 {
bits.iter().map(|bit| prefix(bit)).collect()
} else {
// the permutations of 0,1 with len = max_bit_count gives us the max_prefixes
// works only if max_bit_count >= 2
let max_prefixes: Vec<_> = (2..max_bit_count).fold(
bits.iter()
.flat_map(|b1| bits.iter().map(|&b2| b2.to_string() + *b1))
.collect(),
|acc, _| {
acc.iter()
.flat_map(|b1| bits.iter().map(|&b2| b2.to_string() + b1))
.collect()
},
);
max_prefixes.into_iter().map(|str| prefix(&str)).collect()
};

// missing_prefixes are used to construct saps
let mut missing_prefixes: BTreeSet<Prefix> = BTreeSet::new();
Expand Down

0 comments on commit aed6549

Please sign in to comment.