diff --git a/sn_node/src/comm/mod.rs b/sn_node/src/comm/mod.rs
index 2f585bf4e8..9b1d2ba1fb 100644
--- a/sn_node/src/comm/mod.rs
+++ b/sn_node/src/comm/mod.rs
@@ -167,6 +167,40 @@ impl Comm {
         Ok(())
     }
 
+    // Test helper to send out Msgs in a blocking fashion
+    #[cfg(test)]
+    pub(crate) async fn send_out_bytes_sync(&self, peer: Peer, msg_id: MsgId, bytes: UsrMsgBytes) {
+        let watcher = self.send_to_one(peer, msg_id, bytes, None).await;
+        match watcher {
+            Ok(Some(watcher)) => {
+                let (send_was_successful, should_remove) = Self::is_sent(watcher, msg_id, peer)
+                    .await
+                    .expect("Error in is_sent");
+
+                if send_was_successful {
+                    trace!("Msg {msg_id:?} sent to {peer:?}");
+                } else if should_remove {
+                    // do cleanup of that peer
+                    let perhaps_session = self.sessions.remove(&peer);
+                    if let Some((_peer, session)) = perhaps_session {
+                        session.disconnect().await;
+                    }
+                }
+            }
+            Ok(None) => {}
+            Err(error) => {
+                error!(
+                    "Sending message (msg_id: {:?}) to {:?} (name {:?}) failed as we have disconnected from the peer. (Error is: {})",
+                    msg_id,
+                    peer.addr(),
+                    peer.name(),
+                    error,
+                );
+                let _peer = self.sessions.remove(&peer);
+            }
+        }
+    }
+
     // TODO: tweak messaging to just allow passthrough
     #[tracing::instrument(skip(self, bytes))]
     pub(crate) async fn send_out_bytes_to_peer_and_return_response(
diff --git a/sn_node/src/node/flow_ctrl/dispatcher.rs b/sn_node/src/node/flow_ctrl/dispatcher.rs
index 836ac32f0f..6a6e79df6b 100644
--- a/sn_node/src/node/flow_ctrl/dispatcher.rs
+++ b/sn_node/src/node/flow_ctrl/dispatcher.rs
@@ -234,7 +234,7 @@ impl Dispatcher {
 // Serializes and signs the msg if it's a Client message,
 // and produces one [`WireMsg`] instance per recipient -
 // the last step before passing it over to comms module.
-fn into_msg_bytes(
+pub(crate) fn into_msg_bytes(
     network_knowledge: &NetworkKnowledge,
     our_node_name: XorName,
     msg: NodeMsg,
diff --git a/sn_node/src/node/flow_ctrl/tests/cmd_utils.rs b/sn_node/src/node/flow_ctrl/tests/cmd_utils.rs
index 7d55ee046a..ba24dd9ecf 100644
--- a/sn_node/src/node/flow_ctrl/tests/cmd_utils.rs
+++ b/sn_node/src/node/flow_ctrl/tests/cmd_utils.rs
@@ -1,7 +1,11 @@
-use crate::node::{flow_ctrl::dispatcher::Dispatcher, messaging::Peers, Cmd};
-use assert_matches::assert_matches;
-use eyre::eyre;
-use eyre::Result;
+use crate::{
+    comm::MsgFromPeer,
+    node::{
+        flow_ctrl::dispatcher::{into_msg_bytes, Dispatcher},
+        messaging::Peers,
+        Cmd,
+    },
+};
 use sn_interface::{
     messaging::{
         data::ClientMsg,
@@ -12,7 +16,12 @@ use sn_interface::{
     network_knowledge::{test_utils::*, MembershipState, NodeState, RelocateDetails},
     types::{Keypair, Peer, ReplicatedData},
 };
+
+use assert_matches::assert_matches;
+use eyre::{eyre, Result};
 use std::collections::BTreeSet;
+use tokio::sync::mpsc::{self, error::TryRecvError};
+use xor_name::XorName;
 
 pub(crate) struct HandleOnlineStatus {
     pub(crate) node_approval_sent: bool,
@@ -217,3 +226,54 @@ impl Cmd {
     // }
     // }
 }
+
+impl Dispatcher {
+    // Sends out `NodeMsgs` to others synchronously (process_cmd() spawns tasks to do it).
+    // Optionally drop the msgs to the provided set of peers.
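+    // E.g. `dispatcher.mock_send_msg(cmd, Some(BTreeSet::from([dead_node]))).await` sends the
+    // msg to every recipient except `dead_node`; the dkg tests below use this to drop msgs.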
+    pub(crate) async fn mock_send_msg(&self, cmd: Cmd, filter_recp: Option<BTreeSet<XorName>>) {
+        if let Cmd::SendMsg {
+            msg,
+            msg_id,
+            recipients,
+            send_stream,
+            context,
+        } = cmd
+        {
+            let _ = send_stream;
+            let peer_msgs = {
+                into_msg_bytes(
+                    &context.network_knowledge,
+                    context.name,
+                    msg.clone(),
+                    msg_id,
+                    recipients,
+                )
+                .expect("cannot convert msg into bytes")
+            };
+
+            for (peer, msg_bytes) in peer_msgs {
+                if let Some(filter) = &filter_recp {
+                    if filter.contains(&peer.name()) {
+                        continue;
+                    }
+                }
+                context
+                    .comm
+                    .send_out_bytes_sync(peer, msg_id, msg_bytes)
+                    .await;
+                info!("Sent {msg} to {}", peer.name());
+            }
+        } else {
+            panic!("mock_send_msg expects Cmd::SendMsg, got {cmd:?}");
+        }
+    }
+}
+
+// Receive the next `MsgFromPeer` if the buffer is not empty. Returns `None` if the buffer is currently empty.
+pub(crate) fn get_next_msg(comm_rx: &mut mpsc::Receiver<MsgFromPeer>) -> Option<MsgFromPeer> {
+    match comm_rx.try_recv() {
+        Ok(msg) => Some(msg),
+        Err(TryRecvError::Empty) => None,
+        Err(TryRecvError::Disconnected) => panic!("the comm_rx channel is closed"),
+    }
+}
diff --git a/sn_node/src/node/flow_ctrl/tests/network_builder.rs b/sn_node/src/node/flow_ctrl/tests/network_builder.rs
index 1affbb4b62..1edfcc2172 100644
--- a/sn_node/src/node/flow_ctrl/tests/network_builder.rs
+++ b/sn_node/src/node/flow_ctrl/tests/network_builder.rs
@@ -248,21 +248,28 @@ impl TestNetworkBuilder {
             .map(|p| p.bit_count())
             .max()
             .expect("at-least one sap should be provided");
-        // the permutations of 0,1 with len = max_bit_count gives us the max_prefixes
-        let bits = ["0", "1"];
-        let max_prefixes: Vec<_> = (2..max_bit_count).fold(
-            bits.iter()
-                .flat_map(|b1| bits.iter().map(|&b2| b2.to_string() + *b1))
-                .collect(),
-            |acc, _| {
-                acc.iter()
-                    .flat_map(|b1| bits.iter().map(|&b2| b2.to_string() + b1))
-                    .collect()
-            },
-        );
+
         // max_prefixes are used to construct the `SectionTree`
-        let max_prefixes: BTreeSet<_> =
-            max_prefixes.into_iter().map(|str| prefix(&str)).collect();
+        let bits = ["0", "1"];
+        let max_prefixes = if max_bit_count == 0 {
+            BTreeSet::from([Prefix::default()])
+        } else if max_bit_count == 1 {
+            bits.iter().map(|bit| prefix(bit)).collect()
+        } else {
+            // the permutations of 0,1 with len = max_bit_count gives us the max_prefixes;
+            // works only if max_bit_count >= 2
+            let max_prefixes: Vec<_> = (2..max_bit_count).fold(
+                bits.iter()
+                    .flat_map(|b1| bits.iter().map(|&b2| b2.to_string() + *b1))
+                    .collect(),
+                |acc, _| {
+                    acc.iter()
+                        .flat_map(|b1| bits.iter().map(|&b2| b2.to_string() + b1))
+                        .collect()
+                },
+            );
+            max_prefixes.into_iter().map(|str| prefix(&str)).collect()
+        };
 
         // missing_prefixes are used to construct saps
         let mut missing_prefixes: BTreeSet<Prefix> = BTreeSet::new();
diff --git a/sn_node/src/node/messaging/dkg.rs b/sn_node/src/node/messaging/dkg.rs
index 310ebb48c9..efdf6b03a0 100644
--- a/sn_node/src/node/messaging/dkg.rs
+++ b/sn_node/src/node/messaging/dkg.rs
@@ -38,7 +38,7 @@ fn dkg_peers(our_index: usize, session_id: &DkgSessionId) -> BTreeSet<Peer> {
         .collect()
 }
 
-fn acknowledge_dkg_oucome(
+fn acknowledge_dkg_outcome(
     session_id: &DkgSessionId,
     participant_index: usize,
     pub_key_set: PublicKeySet,
@@ -432,7 +432,7 @@ impl MyNode {
             session_id.elders.len(),
             new_pubs.public_key(),
         );
-        cmds.push(acknowledge_dkg_oucome(
+        cmds.push(acknowledge_dkg_outcome(
             session_id, our_id, new_pubs, new_sec,
         ))
     }
@@ -683,7 +683,7 @@ impl MyNode {
             "Gossiping DKG outcome for s{} as we didn't notice SAP change",
             session_id.sh()
         );
-        let cmd = acknowledge_dkg_oucome(session_id, our_id.into(), new_pubs, new_sec);
+        let cmd = acknowledge_dkg_outcome(session_id, our_id.into(), new_pubs, new_sec);
         vec![cmd]
     }
     Ok(None) => {
@@ -788,16 +788,20 @@ impl MyNode {
 #[cfg(test)]
 mod tests {
     use super::MyNode;
-    use crate::node::{
-        flow_ctrl::{cmds::Cmd, tests::network_builder::TestNetworkBuilder},
-        messaging::Peers,
+    use crate::{
+        comm::MsgFromPeer,
+        node::flow_ctrl::{
+            cmds::Cmd,
+            dispatcher::Dispatcher,
+            tests::{cmd_utils::get_next_msg, network_builder::TestNetworkBuilder},
+        },
     };
     use sn_interface::{
         init_logger,
         messaging::{
             signature_aggregator::SignatureAggregator,
             system::{DkgSessionId, NodeMsg, Proposal},
-            MsgId, SectionSigShare,
+            MsgType, SectionSigShare,
         },
         network_knowledge::{supermajority, NodeState, SectionKeyShare, SectionsDAG},
         test_utils::{TestKeys, TestSectionTree},
@@ -813,89 +817,67 @@ mod tests {
         collections::{BTreeMap, BTreeSet},
         sync::Arc,
     };
-    use tokio::sync::RwLock;
+    use tokio::sync::{mpsc, RwLock};
     use xor_name::{Prefix, XorName};
 
     /// Simulate an entire round of dkg till termination; The dkg round creates a new keyshare set
-    /// without any elder change (i.e., the dkg is between the same set of elders). The test
-    /// collects the `NodeMsg`s and passes them to the recipient nodes directly instead of using the
-    /// comm module.
+    /// without any elder change (i.e., the dkg is between the same set of elders).
     #[tokio::test]
     async fn simulate_dkg_round() -> Result<()> {
         init_logger();
         let mut rng = rand::thread_rng();
         let node_count = 7;
-        let (mut node_instances, _) = MyNodeInstance::new_instances(node_count, &mut rng).await;
+        let (mut node_instances, mut comm_receivers, _) = create_elders(node_count, &mut rng).await;
 
-        // let current set of elders start the dkg round and capture the msgs that are outbound to the other nodes
-        let _ = MyNodeInstance::start_dkg(&mut node_instances).await?;
+        // let the current set of elders start the dkg round
+        let _ = start_dkg(&mut node_instances).await?;
 
         let mut new_sk_shares = BTreeMap::new();
-        let mut done = false;
-        while !done {
-            // For every msg in `msg_queue` for every node instance, 1) handle the msg 2) handle the cmds
-            // 3) if the cmds produce more msgs, add them to the `msg_queue` of the respective peer
-            let mut msgs_to_other_nodes = Vec::new();
-            for mock_node in node_instances.values() {
-                let node = mock_node.node.clone();
-                info!("\n\n NODE: {}", node.read().await.name());
-                let context = node.read().await.context();
-
-                while let Some((msg_id, msg, sender)) = mock_node.msg_queue.write().await.pop() {
-                    let cmds = MyNode::handle_valid_node_msg(
-                        node.clone(),
-                        context.clone(),
-                        msg_id,
-                        msg,
-                        sender,
-                        None,
-                    )
-                    .await?;
-
+        // run until all the node buffers are empty
+        let mut empty_nodes = BTreeSet::new();
+        while empty_nodes.len() != node_count {
+            empty_nodes = BTreeSet::new();
+            for dispatcher in node_instances.values() {
+                let name = dispatcher.node().read().await.name();
+                let comm_rx = comm_receivers
+                    .get_mut(&name)
+                    .ok_or_else(|| eyre!("comm_rx should be present"))?;
+                info!("\n\n NODE: {name}");
+
+                // tracks whether this node's buffer was empty on the first poll of this pass;
+                // if that holds for every node in a full pass, break out of the main loop
+                let mut empty_at_first_try = true;
+                while let Some(msg) = get_next_msg(comm_rx) {
+                    if empty_at_first_try {
+                        empty_at_first_try = false;
+                    }
+                    let cmds = dispatcher.mock_handle_node_msg(msg).await;
                     for cmd in cmds {
                         info!("Got cmd {}", cmd);
-                        match cmd {
-                            Cmd::SendMsg {
-                                msg,
-                                msg_id,
-                                recipients,
-                                ..
-                            } => {
-                                let new_msgs =
-                                    node.read().await.mock_send_msg(msg, msg_id, recipients);
-                                msgs_to_other_nodes.push(new_msgs);
-                            }
-                            Cmd::HandleDkgOutcome {
-                                section_auth,
-                                outcome,
-                            } => {
-                                // capture the sk_share here as we don't proceed with the SAP update
-                                let _ =
-                                    new_sk_shares.insert(node.read().await.name(), outcome.clone());
-                                let ((_, msg, _), _) = node
-                                    .write()
-                                    .await
-                                    .mock_dkg_outcome_proposal(section_auth, outcome)
-                                    .await;
-                                assert_matches!(msg, NodeMsg::Propose { proposal, .. } => {
-                                    assert_matches!(proposal, Proposal::RequestHandover(_))
-                                });
-                            }
-                            _ => panic!("got a different cmd {:?}", cmd),
+                        if let Cmd::SendMsg { .. } = &cmd {
+                            dispatcher.mock_send_msg(cmd, None).await;
+                        } else if let Cmd::HandleDkgOutcome {
+                            section_auth: _,
+                            outcome,
+                        } = &cmd
+                        {
+                            let _ = new_sk_shares.insert(name, outcome.clone());
+                            let dkg_cmds = dispatcher.process_cmd(cmd).await?;
+                            verify_dkg_outcome_cmds(dkg_cmds);
+                        } else {
+                            panic!("got a different cmd {:?}", cmd);
                         }
                     }
                 }
+                // the msg buffer was empty on the first poll
+                if empty_at_first_try {
+                    let _ = empty_nodes.insert(name);
+                }
             }
-
-            // add the msgs to the msg_queue of each node
-            MyNodeInstance::add_msgs_to_queue(&mut node_instances, msgs_to_other_nodes).await;
-
-            // done if the queues are empty
-            done = MyNodeInstance::is_msg_queue_empty(&node_instances).await;
         }
 
-        // dkg done, make sure the new key share is valid
-        MyNodeInstance::verify_new_key(&new_sk_shares, node_count).await;
+        // dkg done, make sure the new key_shares are valid
+        verify_new_key(&new_sk_shares, node_count).await;
 
         Ok(())
     }
@@ -906,35 +888,34 @@ mod tests {
         init_logger();
         let mut rng = rand::thread_rng();
         let node_count = 7;
-        let (mut node_instances, initial_sk_set) =
-            MyNodeInstance::new_instances(node_count, &mut rng).await;
+        let (mut node_instances, mut comm_receivers, initial_sk_set) =
+            create_elders(node_count, &mut rng).await;
 
-        // let current set of elders start the dkg round and capture the msgs that are outbound to the other nodes
-        let _ = MyNodeInstance::start_dkg(&mut node_instances).await?;
+        // let current set of elders start the dkg round
+        let _ = start_dkg(&mut node_instances).await?;
 
         let mut new_sk_shares: BTreeMap<XorName, SectionKeyShare> = BTreeMap::new();
         let mut new_sap: BTreeSet<SectionAuthorityProvider> = BTreeSet::new();
         let mut lagging = false;
-        while !MyNodeInstance::is_msg_queue_empty(&node_instances).await {
-            // For every msg in `msg_queue` for every node instance, 1) handle the msg 2) handle the cmds
-            // 3) if the cmds produce more msgs, add them to the `msg_queue` of the respective peer
-            let mut msgs_to_other_nodes = Vec::new();
-            for mock_node in node_instances.values() {
-                let node = mock_node.node.clone();
-                let name = node.read().await.name();
-                info!("\n\n NODE: {}", name);
-                let context = node.read().await.context();
-
-                while let Some((msg_id, msg, sender)) = mock_node.msg_queue.write().await.pop() {
-                    let cmds = MyNode::handle_valid_node_msg(
-                        node.clone(),
-                        context.clone(),
-                        msg_id,
-                        msg,
-                        sender,
-                        None,
-                    )
-                    .await?;
+        // run until all the node buffers are empty
+        let mut empty_nodes = BTreeSet::new();
+        while empty_nodes.len() != node_count {
+            empty_nodes = BTreeSet::new();
+            for dispatcher in node_instances.values() {
+                let name = dispatcher.node().read().await.name();
+                let comm_rx = comm_receivers
+                    .get_mut(&name)
+                    .ok_or_else(|| eyre!("comm_rx should be present"))?;
+                info!("\n\n NODE: {name}");
+
+                // tracks whether this node's buffer was empty on the first poll of this pass;
+                // if that holds for every node in a full pass, break out of the main loop
+                let mut empty_at_first_try = true;
+                while let Some(msg) = get_next_msg(comm_rx) {
+                    if empty_at_first_try {
+                        empty_at_first_try = false;
+                    }
+                    let cmds = dispatcher.mock_handle_node_msg(msg).await;
 
                     // If supermajority of the nodes have terminated, then the remaining nodes
                     // can be considered as 'lagging'. So use the supermajority of the shares
@@ -950,7 +931,7 @@ mod tests {
                             .clone()
                             .into_iter()
                             .next()
-                            .ok_or_else(|| eyre!("should contain 1"))?;
+                            .ok_or_else(|| eyre!("should contain a sap"))?;
                         let signed_sap = TestKeys::get_section_signed(
                             &new_sk_set.secret_key(),
                             new_sap.clone(),
@@ -984,7 +965,7 @@ mod tests {
                         let _updated = node_instances
                             .get(&lag)
                             .ok_or_else(|| eyre!("node will be present"))?
-                            .node
+                            .node()
                             .write()
                             .await
                             .network_knowledge
@@ -1001,71 +982,52 @@ mod tests {
 
                     for cmd in cmds {
                         info!("Got cmd {}", cmd);
-                        match cmd {
-                            Cmd::SendMsg {
-                                msg,
-                                msg_id,
-                                recipients,
-                                ..
-                            } => {
-                                let new_msgs =
-                                    node.read().await.mock_send_msg(msg, msg_id, recipients);
-                                msgs_to_other_nodes.push(new_msgs);
-                            }
-                            Cmd::HandleDkgOutcome {
-                                section_auth,
-                                outcome,
-                            } => {
-                                let _ =
-                                    new_sk_shares.insert(node.read().await.name(), outcome.clone());
-                                let _ = new_sap.insert(section_auth.clone());
-                                if !lagging {
-                                    let ((_, msg, _), _) = node
-                                        .write()
-                                        .await
-                                        .mock_dkg_outcome_proposal(section_auth, outcome)
-                                        .await;
-                                    assert_matches!(msg, NodeMsg::Propose { proposal, .. } => {
-                                        assert_matches!(proposal, Proposal::RequestHandover(_))
-                                    });
-                                } else {
-                                    // Since the dkg session is for the same prefix, the
-                                    // lagging node should just complete the elder handover
-                                    // without requesting handover.
-                                    let cmds = node
-                                        .write()
-                                        .await
-                                        .handle_dkg_outcome(section_auth, outcome)
-                                        .await?;
-
-                                    assert_eq!(cmds.len(), 2);
-                                    for cmd in cmds {
-                                        let msg =
-                                            assert_matches!(cmd, Cmd::SendMsg { msg, .. } => msg);
-
-                                        match msg {
-                                            NodeMsg::Propose {
-                                                proposal: Proposal::JoinsAllowed(..),
-                                                ..
-                                            } => (),
-                                            NodeMsg::AntiEntropy { .. } => (),
-                                            msg => panic!("Unexpected msg {msg}"),
-                                        }
+                        if let Cmd::SendMsg { .. } = &cmd {
+                            dispatcher.mock_send_msg(cmd, None).await;
+                        }
+                        // Dkg done, stop the test here
+                        else if let Cmd::HandleDkgOutcome {
+                            section_auth,
+                            outcome,
+                        } = &cmd
+                        {
+                            let _ = new_sk_shares.insert(name, outcome.clone());
+                            let _ = new_sap.insert(section_auth.clone());
+                            let cmds = dispatcher.process_cmd(cmd).await?;
+                            if !lagging {
+                                verify_dkg_outcome_cmds(cmds);
+                            } else {
+                                // Since the dkg session is for the same prefix, the
+                                // lagging node should just complete the elder handover
+                                // without requesting handover.
+                                assert_eq!(cmds.len(), 2);
+                                for cmd in cmds {
+                                    let msg = assert_matches!(cmd, Cmd::SendMsg { msg, .. } => msg);
+
+                                    match msg {
+                                        NodeMsg::Propose {
+                                            proposal: Proposal::JoinsAllowed(..),
+                                            ..
+                                        } => (),
+                                        NodeMsg::AntiEntropy { .. } => (),
+                                        msg => panic!("Unexpected msg {msg}"),
+                                    }
+                                }
                             }
-                            _ => panic!("got a different cmd {:?}", cmd),
+                        } else {
+                            panic!("got a different cmd {:?}", cmd);
                         }
                     }
                 }
+                // the msg buffer was empty on the first poll
+                if empty_at_first_try {
+                    let _ = empty_nodes.insert(name);
+                }
             }
-
-            // add the msgs to the msg_queue of each node
-            MyNodeInstance::add_msgs_to_queue(&mut node_instances, msgs_to_other_nodes).await;
         }
 
-        // dkg done, make sure the new key share is valid
-        MyNodeInstance::verify_new_key(&new_sk_shares, node_count).await;
+        // dkg done, make sure the new key_shares are valid
+        verify_new_key(&new_sk_shares, node_count).await;
 
         Ok(())
     }
@@ -1076,63 +1038,53 @@ mod tests {
         init_logger();
         let mut rng = rand::thread_rng();
         let node_count = 7;
-        let (mut node_instances, _initial_sk_set) =
-            MyNodeInstance::new_instances(node_count, &mut rng).await;
+        let (mut node_instances, mut comm_receivers, _) = create_elders(node_count, &mut rng).await;
 
-        let _ = MyNodeInstance::start_dkg(&mut node_instances).await?;
+        // let current set of elders start the dkg round
+        let _ = start_dkg(&mut node_instances).await?;
 
         let dead_node = node_instances
             .keys()
             .next()
             .cloned()
             .ok_or_else(|| eyre!("node_instances is not empty"))?;
-        let mut done = false;
-        while !done {
-            let mut msgs_to_other_nodes = Vec::new();
-            for mock_node in node_instances.values() {
-                let node = mock_node.node.clone();
-                let context = node.read().await.context();
-                info!("\n\n NODE: {}", node.read().await.name());
-                while let Some((msg_id, msg, sender)) = mock_node.msg_queue.write().await.pop() {
-                    let cmds = MyNode::handle_valid_node_msg(
-                        node.clone(),
-                        context.clone(),
-                        msg_id,
-                        msg,
-                        sender,
-                        None,
-                    )
-                    .await?;
-
+        // run until all the node buffers are empty (dead node's buffer is empty after it processes `dkg_start` msgs)
+        let mut empty_nodes = BTreeSet::new();
+        while empty_nodes.len() != node_count {
+            empty_nodes = BTreeSet::new();
+            for dispatcher in node_instances.values() {
+                let name = dispatcher.node().read().await.name();
+                let comm_rx = comm_receivers
+                    .get_mut(&name)
+                    .ok_or_else(|| eyre!("comm_rx should be present"))?;
+                info!("\n\n NODE: {name}");
+
+                // tracks whether this node's buffer was empty on the first poll of this pass;
+                // if that holds for every node in a full pass, break out of the main loop
+                let mut empty_at_first_try = true;
+                while let Some(msg) = get_next_msg(comm_rx) {
+                    if empty_at_first_try {
+                        empty_at_first_try = false;
+                    }
+                    let cmds = dispatcher.mock_handle_node_msg(msg).await;
                     for cmd in cmds {
                         info!("Got cmd {}", cmd);
-                        match cmd {
-                            Cmd::SendMsg {
-                                msg,
-                                msg_id,
-                                recipients,
-                                ..
-                            } => {
-                                let mut new_msgs =
-                                    node.read().await.mock_send_msg(msg, msg_id, recipients);
-                                // dead_node will not recieve the msg
-                                new_msgs.1.retain(|peer| peer.name() != dead_node);
-                                msgs_to_other_nodes.push(new_msgs);
-                            }
-                            _ => panic!("got a different cmd {:?}", cmd),
+                        if let Cmd::SendMsg { .. } = cmd {
+                            let filter = BTreeSet::from([dead_node]);
+                            dispatcher.mock_send_msg(cmd, Some(filter)).await;
+                        } else {
+                            panic!("got a different cmd {:?}", cmd);
                         }
                     }
                 }
+                // the msg buffer was empty on the first poll
+                if empty_at_first_try {
+                    let _ = empty_nodes.insert(name);
+                }
             }
-
-            // add the msgs to the msg_queue of each node
-            MyNodeInstance::add_msgs_to_queue(&mut node_instances, msgs_to_other_nodes).await;
-
-            // done if the queues are empty
-            done = MyNodeInstance::is_msg_queue_empty(&node_instances).await;
         }
 
-        // all the msgs are processed and we counldn't reach dkg termination
+        // all the msgs are processed and we couldn't reach dkg termination
         Ok(())
     }
@@ -1144,85 +1096,81 @@ mod tests {
         init_logger();
         let mut rng = rand::thread_rng();
        let node_count = 7;
-        let (mut node_instances, _) = MyNodeInstance::new_instances(node_count, &mut rng).await;
+        let (mut node_instances, mut comm_receivers, _) = create_elders(node_count, &mut rng).await;
 
-        // let current set of elders start the dkg round and capture the msgs that are outbound to the other nodes
-        let dkg_session_id = MyNodeInstance::start_dkg(&mut node_instances).await?;
+        // let current set of elders start the dkg round
+        let dkg_session_id = start_dkg(&mut node_instances).await?;
 
         let mut new_sk_shares = BTreeMap::new();
-        let mut done = false;
-        while !done {
-            let mut msgs_to_other_nodes = Vec::new();
-            for mock_node in node_instances.values() {
-                let node = mock_node.node.clone();
-                info!("\n\n NODE: {}", node.read().await.name());
-                let context = node.read().await.context();
-
-                while let Some((msg_id, msg, sender)) = mock_node.msg_queue.write().await.pop() {
-                    let cmds = MyNode::handle_valid_node_msg(
-                        node.clone(),
-                        context.clone(),
-                        msg_id,
-                        msg,
-                        sender,
-                        None,
-                    )
-                    .await?;
-
+        // run until we get all the sk_shares
+        while new_sk_shares.len() != node_count {
+            let mut empty_nodes = BTreeSet::new();
+            for dispatcher in node_instances.values() {
+                let name = dispatcher.node().read().await.name();
+                let comm_rx = comm_receivers
+                    .get_mut(&name)
+                    .ok_or_else(|| eyre!("comm_rx should be present"))?;
+                info!("\n\n NODE: {name}");
+
+                // tracks whether this node's buffer was empty on the first poll of this pass;
+                // if that holds for every node in a full pass, gossip may be needed (see below)
+                let mut empty_at_first_try = true;
+                while let Some(msg) = get_next_msg(comm_rx) {
+                    if empty_at_first_try {
+                        empty_at_first_try = false;
+                    }
+                    let cmds = dispatcher.mock_handle_node_msg(msg).await;
                     for cmd in cmds {
                         info!("Got cmd {}", cmd);
-                        match cmd {
-                            Cmd::SendMsg {
-                                msg,
-                                msg_id,
-                                recipients,
-                                ..
-                            } => {
-                                let mut new_msgs =
-                                    node.read().await.mock_send_msg(msg, msg_id, recipients);
-                                // randomly drop the msg to a peer; chance = 1/node_count
-                                new_msgs.1.retain(|_| rng.gen::<usize>() % node_count != 0);
-                                msgs_to_other_nodes.push(new_msgs);
-                            }
-                            Cmd::HandleDkgOutcome {
-                                section_auth,
-                                outcome,
-                            } => {
-                                // capture the sk_share here as we don't proceed with the SAP update
-                                let _ =
-                                    new_sk_shares.insert(node.read().await.name(), outcome.clone());
-                                let ((_, msg, _), _) = node
-                                    .write()
-                                    .await
-                                    .mock_dkg_outcome_proposal(section_auth, outcome)
-                                    .await;
-                                assert_matches!(msg, NodeMsg::Propose { proposal, .. } => {
-                                    assert_matches!(proposal, Proposal::RequestHandover(_))
-                                });
-                            }
-                            _ => panic!("got a different cmd {:?}", cmd),
+                        if let Cmd::SendMsg { .. } = &cmd {
+                            // (1/node_count) chance to drop a msg
+                            let drop_recp = if rng.gen::<usize>() % node_count == 0 {
+                                let recp = cmd.recipients()?;
+                                let recp_count = recp.len();
+                                let drop = recp
+                                    .into_iter()
+                                    .map(|p| p.name())
+                                    .nth(rng.gen::<usize>() % recp_count)
+                                    .ok_or_else(|| eyre!("Contains node_count peers"))?;
+                                Some(BTreeSet::from([drop]))
+                            } else {
+                                None
+                            };
+                            dispatcher.mock_send_msg(cmd, drop_recp).await;
+                        } else if let Cmd::HandleDkgOutcome {
+                            section_auth: _,
+                            outcome,
+                        } = &cmd
+                        {
+                            // capture the sk_share here as we don't proceed with the SAP update
+                            let _ = new_sk_shares.insert(name, outcome.clone());
+                            let dkg_cmds = dispatcher.process_cmd(cmd).await?;
+                            verify_dkg_outcome_cmds(dkg_cmds);
+                        } else {
+                            panic!("got a different cmd {:?}", cmd);
                        }
                    }
                }
+                // the msg buffer was empty on the first poll
+                if empty_at_first_try {
+                    let _ = empty_nodes.insert(name);
+                }
            }
-            // If the msg_queue is empty for all participant and if the current dkg
-            // session has not terminated, then send a gossip msg from a random node. This
+            // If the msg buffers are empty and if the current dkg session has not yet
+            // terminated, then send a gossip msg from a random node. This
             // allows everyone to catchup.(in the real network each node sends out a
-            // gossip if it has not recieved any valid dkg msg in 30 seconds).
+            // gossip if it has not received any valid dkg msg in 30 seconds).
-            if MyNodeInstance::is_msg_queue_empty(&node_instances).await
-                && msgs_to_other_nodes.is_empty()
-                && new_sk_shares.len() != node_count
-            {
+            if empty_nodes.len() == node_count && new_sk_shares.len() != node_count {
                 // select a random_node which has not terminated, since terminated node
                 // sends out HandleDkgOutcome cmd instead of NodeMsg
                 let random_node = loop {
-                    let random_node = &node_instances
+                    let random_node = node_instances
                         .values()
                         .nth(rng.gen::<usize>() % node_count)
-                        .ok_or_else(|| eyre!("there should be node_count nodes"))?
-                        .node;
+                        .ok_or_else(|| eyre!("there should be node_count nodes"))?;
                     if !random_node
+                        .node()
                         .read()
                         .await
                         .dkg_voter
@@ -1233,229 +1181,159 @@ mod tests {
                 };
                 info!(
                     "Sending gossip from random node {:?}",
-                    random_node.read().await.name()
+                    random_node.node().read().await.name()
                 );
-                let cmds = random_node.read().await.dkg_gossip_msgs();
+                let cmds = random_node.node().read().await.dkg_gossip_msgs();
                 for cmd in cmds {
                     info!("Got cmd {}", cmd);
-                    match cmd {
-                        Cmd::SendMsg {
-                            msg,
-                            msg_id,
-                            recipients,
-                            ..
-                        } => {
-                            let new_msgs = random_node
-                                .read()
-                                .await
-                                .mock_send_msg(msg, msg_id, recipients);
-                            msgs_to_other_nodes.push(new_msgs);
-                        }
-                        _ => panic!("should be send msg, got {cmd}"),
-                    }
+                    assert_matches!(&cmd, Cmd::SendMsg { .. });
+                    random_node.mock_send_msg(cmd, None).await;
                 }
             }
-
-            // add the msgs to the msg_queue of each node
-            MyNodeInstance::add_msgs_to_queue(&mut node_instances, msgs_to_other_nodes).await;
-
-            // done if we have generated all the sk_shares
-            done = new_sk_shares.len() == node_count;
         }
 
-        // dkg done, make sure the new key share is valid
-        MyNodeInstance::verify_new_key(&new_sk_shares, node_count).await;
+        // dkg done, make sure the new key_shares are valid
+        verify_new_key(&new_sk_shares, node_count).await;
 
         Ok(())
     }
 
     // Test helpers
-    type MockSystemMsg = (MsgId, NodeMsg, Peer);
-    struct MyNodeInstance {
-        node: Arc<RwLock<MyNode>>,
-        msg_queue: RwLock<Vec<MockSystemMsg>>,
+    /// Generate a set of `MyNode` instances
+    async fn create_elders(
+        elder_count: usize,
+        rng: impl RngCore,
+    ) -> (
+        BTreeMap<XorName, Dispatcher>,
+        BTreeMap<XorName, mpsc::Receiver<MsgFromPeer>>,
+        SecretKeySet,
+    ) {
+        let mut env = TestNetworkBuilder::new(rng)
+            .sap(Prefix::default(), elder_count, 0, None, None)
+            .build();
+        let sk_set = env.get_secret_key_set(Prefix::default(), None);
+        let node_instances = env
+            .get_nodes(Prefix::default(), elder_count, 0, None)
+            .into_iter()
+            .map(|node| {
+                let name = node.name();
+                let (dispatcher, _) = Dispatcher::new(Arc::new(RwLock::new(node)));
+                (name, dispatcher)
+            })
+            .collect::<BTreeMap<_, _>>();
+        let mut comm_receivers = BTreeMap::new();
+        for (name, node) in node_instances.iter() {
+            let pk = node.node().read().await.info().public_key();
+            let comm = env.take_comm_rx(pk);
+            let _ = comm_receivers.insert(*name, comm);
+        }
+        (node_instances, comm_receivers, sk_set)
     }
 
-    impl MyNodeInstance {
-        // Creates a set of MyNodeInstances. The network contains a genesis section with all the
-        // node_count present in it. The gen_sk_set is also returned
-        async fn new_instances<R: RngCore>(
-            node_count: usize,
-            rng: &mut R,
-        ) -> (BTreeMap<XorName, MyNodeInstance>, SecretKeySet) {
-            let env = TestNetworkBuilder::new(rng)
-                .sap(Prefix::default(), node_count, 0, None, None)
-                .build();
-
-            let node_instances = env
-                .get_nodes(Prefix::default(), node_count, 0, None)
-                .into_iter()
-                .map(|node| {
-                    let name = node.name();
-                    let mock = MyNodeInstance {
-                        node: Arc::new(RwLock::new(node)),
-                        msg_queue: RwLock::new(Vec::new()),
-                    };
-                    (name, mock)
-                })
-                .collect::<BTreeMap<_, _>>();
-            let sk_set = env.get_secret_key_set(Prefix::default(), None);
-            (node_instances, sk_set)
+    // Each node sends out DKG start msg to the other nodes
+    async fn start_dkg(nodes: &mut BTreeMap<XorName, Dispatcher>) -> Result<DkgSessionId> {
+        let mut elders = BTreeMap::new();
+        for (name, node) in nodes.iter() {
+            let _ = elders.insert(*name, node.node().read().await.addr);
         }
-
-        // Each node sends out DKG start msg and they are added to the msg queue for the other nodes
-        async fn start_dkg(nodes: &mut BTreeMap<XorName, MyNodeInstance>) -> Result<DkgSessionId> {
-            let mut elders = BTreeMap::new();
-            for (name, node) in nodes.iter() {
-                let _ = elders.insert(*name, node.node.read().await.addr);
-            }
-            let bootstrap_members = elders
-                .iter()
-                .map(|(name, addr)| {
-                    let peer = Peer::new(*name, *addr);
-                    NodeState::joined(peer, None)
-                })
-                .collect::<BTreeSet<_>>();
-            // A DKG session which just creates a new key for the same set of eleders
-            let session_id = DkgSessionId {
-                prefix: Prefix::default(),
-                elders,
-                section_chain_len: 1,
-                bootstrap_members,
-                membership_gen: 0,
-            };
-            let mut msgs_to_other_nodes = Vec::new();
-            for node in nodes.values() {
-                let mut node = node.node.write().await;
-                let mut cmd = node.send_dkg_start(session_id.clone())?;
-                assert_eq!(cmd.len(), 1);
-                let msg = assert_matches!(cmd.remove(0), Cmd::SendMsg { msg, msg_id, recipients, .. } => (msg, msg_id, recipients));
-                let msg = node.mock_send_msg(msg.0, msg.1, msg.2);
-                msgs_to_other_nodes.push(msg);
-            }
-            // add the msgs to the msg_queue of each node
-            Self::add_msgs_to_queue(nodes, msgs_to_other_nodes).await;
-            Ok(session_id)
+        let bootstrap_members = elders
+            .iter()
+            .map(|(name, addr)| {
+                let peer = Peer::new(*name, *addr);
+                NodeState::joined(peer, None)
+            })
+            .collect::<BTreeSet<_>>();
+        // A DKG session which just creates a new key for the same set of elders
+        let session_id = DkgSessionId {
+            prefix: Prefix::default(),
+            elders,
+            section_chain_len: 1,
+            bootstrap_members,
+            membership_gen: 0,
+        };
+        for node in nodes.values() {
+            let mut cmd = node
+                .node()
+                .write()
+                .await
+                .send_dkg_start(session_id.clone())?;
+            assert_eq!(cmd.len(), 1);
+            let cmd = cmd.remove(0);
+            assert_matches!(cmd, Cmd::SendMsg { .. });
+            node.mock_send_msg(cmd, None).await;
         }
+        Ok(session_id)
    }
 
-        // Given a list of node instances and a lit of NodeMsgs, add the msgs to the message queue of the recipients
-        async fn add_msgs_to_queue(
-            nodes: &mut BTreeMap<XorName, MyNodeInstance>,
-            msgs: Vec<(MockSystemMsg, Vec<Peer>)>,
-        ) {
-            for (system_msg, recipients) in msgs {
-                for recp in recipients {
-                    nodes
-                        .get(&recp.name())
-                        .expect("recp is present in node_instances")
-                        .msg_queue
-                        .write()
-                        .await
-                        .push(system_msg.clone());
-                }
-            }
+    // Verify that the newly generated key is valid. Aggregate the signature shares instead of
+    // using `TestKeys::get_sk_set_from_shares`.
+    async fn verify_new_key(new_sk_shares: &BTreeMap<XorName, SectionKeyShare>, node_count: usize) {
+        let mut pub_key_set = BTreeSet::new();
+        let mut sig_shares = Vec::new();
+        for key_share in new_sk_shares.values() {
+            let pk = key_share.public_key_set.public_key();
+            let _ = pub_key_set.insert(pk);
+
+            let sig_share = SectionSigShare::new(
+                key_share.public_key_set.clone(),
+                key_share.index,
+                &key_share.secret_key_share,
+                "msg".as_bytes(),
+            );
+            sig_shares.push(sig_share);
         }
-
-        async fn is_msg_queue_empty(nodes: &BTreeMap<XorName, MyNodeInstance>) -> bool {
-            let mut not_empty = false;
-            for node in nodes.values() {
-                if !node.msg_queue.read().await.is_empty() {
-                    not_empty = true;
-                }
+        assert_eq!(pub_key_set.len(), 1);
+        let mut agg = SignatureAggregator::default();
+        let mut sig_count = 1;
+        for sig_share in sig_shares {
+            // threshold = 4, i.e., we need 5 shares to generate the complete sig; thus the first 4
+            // return None, and the 5th one gives us the complete sig
+            if sig_count < supermajority(node_count) || sig_count > supermajority(node_count) {
+                assert!(agg
+                    .try_aggregate("msg".as_bytes(), sig_share)
+                    .expect("Failed to aggregate sigs")
+                    .is_none());
+            } else if sig_count == supermajority(node_count) {
+                let sig = agg
+                    .try_aggregate("msg".as_bytes(), sig_share)
+                    .expect("Failed to aggregate sigs")
+                    .expect("Should return the SectionSig");
+                assert!(sig.verify("msg".as_bytes()), "Failed to verify SectionSig");
            }
-            !not_empty
+            sig_count += 1;
        }
+        info!("the generated key is valid!");
+    }
 
-        // Verify that the newly generated key is valid. Aggregate the signature shares instead of
-        // using `TestKeys::get_sk_set_from_shares`.
-        async fn verify_new_key(
-            new_sk_shares: &BTreeMap<XorName, SectionKeyShare>,
-            node_count: usize,
-        ) {
-            let mut pub_key_set = BTreeSet::new();
-            let mut sig_shares = Vec::new();
-            for key_share in new_sk_shares.values() {
-                let pk = key_share.public_key_set.public_key();
-                let _ = pub_key_set.insert(pk);
-
-                let sig_share = SectionSigShare::new(
-                    key_share.public_key_set.clone(),
-                    key_share.index,
-                    &key_share.secret_key_share,
-                    "msg".as_bytes(),
-                );
-                sig_shares.push(sig_share);
-            }
-            assert_eq!(pub_key_set.len(), 1);
-            let mut agg = SignatureAggregator::default();
-            let mut sig_count = 1;
-            for sig_share in sig_shares {
-                // threshold = 4 i.e, we need 5 shares to gen the complete sig; Thus the first 4 return None, and 5th one
-                // gives us the complete sig;
-                if sig_count < supermajority(node_count) || sig_count > supermajority(node_count) {
-                    assert!(agg
-                        .try_aggregate("msg".as_bytes(), sig_share)
-                        .expect("Failed to aggregate sigs")
-                        .is_none());
-                } else if sig_count == supermajority(node_count) {
-                    let sig = agg
-                        .try_aggregate("msg".as_bytes(), sig_share)
-                        .expect("Failed to aggregate sigs")
-                        .expect("Should return the SectionSig");
-                    assert!(sig.verify("msg".as_bytes()), "Failed to verify SectionSig");
                 }
-                sig_count += 1;
+    fn verify_dkg_outcome_cmds(cmds: Vec<Cmd>) {
+        assert_eq!(cmds.len(), 2);
+        for cmd in cmds {
+            let msg = assert_matches!(cmd, Cmd::SendMsg { msg, .. } => msg);
+
+            match msg {
+                NodeMsg::Propose {
+                    proposal: Proposal::RequestHandover(_),
+                    ..
+                } => (),
+                NodeMsg::AntiEntropy { .. } => (),
+                msg => panic!("Unexpected msg {msg}"),
             }
-            info!("the generated key is valid!");
         }
     }
 
-    impl MyNode {
-        fn mock_send_msg(
-            &self,
-            msg: NodeMsg,
-            msg_id: MsgId,
-            recipients: Peers,
-        ) -> (MockSystemMsg, Vec<Peer>) {
-            info!("msg: {msg:?} msg_id {msg_id:?}, recipients {recipients:?}");
-            let current_node = Peer::new(self.name(), self.addr);
-
-            let recipients = match recipients {
-                Peers::Single(peer) => vec![peer],
-                Peers::Multiple(peers) => peers.into_iter().collect(),
-            };
-            let mock_system_msg: MockSystemMsg = (msg_id, msg, current_node);
-            info!("SendMsg output {}", mock_system_msg.2);
-            (mock_system_msg, recipients)
-        }
-
-        // if RequestHandover proposal is triggered, it will send out msgs to other nodes
-        async fn mock_dkg_outcome_proposal(
-            &mut self,
-            sap: SectionAuthorityProvider,
-            key_share: SectionKeyShare,
-        ) -> (MockSystemMsg, Vec<Peer>) {
-            for cmd in self
-                .handle_dkg_outcome(sap, key_share)
+    impl Dispatcher {
+        // The actual handler has an AE check; skip it here.
+        async fn mock_handle_node_msg(&self, msg: MsgFromPeer) -> Vec<Cmd> {
+            let context = self.node().read().await.context();
+            let origin = msg.sender;
+            let (msg_id, msg) = assert_matches!(
+                msg.wire_msg.into_msg().expect("Failed to deserialize wire_msg"),
+                MsgType::Node { msg_id, dst: _, msg } => (msg_id, msg)
+            );
+            MyNode::handle_valid_node_msg(self.node(), context, msg_id, msg, origin, None)
                 .await
-                .expect("Failed to handle DKG outcome")
-            {
-                let (msg, msg_id, recipients) = assert_matches!(cmd, Cmd::SendMsg { msg, msg_id, recipients, ..} => (msg, msg_id, recipients));
-
-                // contains only the SendMsg for RequestHandover proposal
-                if matches!(
-                    msg,
-                    NodeMsg::Propose {
-                        proposal: Proposal::RequestHandover(..),
-                        ..
-                    }
-                ) {
-                    return self.mock_send_msg(msg, msg_id, recipients);
-                }
-            }
-
-            panic!("Expected propose msg");
+                .expect("Error while handling node msg")
         }
     }
 }
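For reference, a minimal sketch of the message-pump pattern these tests now share (a hypothetical helper, not part of this patch; it assumes the `get_next_msg`, `mock_handle_node_msg` and `mock_send_msg` helpers introduced above):

    // Keep making passes over all nodes until one full pass finds every buffer empty.
    async fn pump_until_quiet(
        nodes: &BTreeMap<XorName, Dispatcher>,
        receivers: &mut BTreeMap<XorName, mpsc::Receiver<MsgFromPeer>>,
    ) -> Result<()> {
        let mut empty_nodes = BTreeSet::new();
        while empty_nodes.len() != nodes.len() {
            empty_nodes = BTreeSet::new();
            for (name, dispatcher) in nodes.iter() {
                let comm_rx = receivers
                    .get_mut(name)
                    .ok_or_else(|| eyre!("comm_rx should be present"))?;
                let mut empty_at_first_try = true;
                // drain this node's buffer, handling each msg and sending replies out
                while let Some(msg) = get_next_msg(comm_rx) {
                    empty_at_first_try = false;
                    for cmd in dispatcher.mock_handle_node_msg(msg).await {
                        if matches!(cmd, Cmd::SendMsg { .. }) {
                            dispatcher.mock_send_msg(cmd, None).await;
                        }
                    }
                }
                // a node is quiet if its buffer was already empty on the first poll
                if empty_at_first_try {
                    let _ = empty_nodes.insert(*name);
                }
            }
        }
        Ok(())
    }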