Skip to content

Commit

Permalink
test(gossipsub): make CI test to be more strict, 0-tolerance for miss…
Browse files Browse the repository at this point in the history
…ed published messages
  • Loading branch information
bochaco committed Oct 4, 2023
1 parent da8be1c commit 894e86c
Showing 1 changed file with 74 additions and 55 deletions.
129 changes: 74 additions & 55 deletions sn_node/tests/msgs_over_gossipsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use common::safenode_proto::{
use sn_node::NodeEvent;

use eyre::Result;
use rand::Rng;
use std::{
net::{IpAddr, Ipv4Addr, SocketAddr},
time::Duration,
Expand All @@ -24,70 +25,85 @@ use tokio_stream::StreamExt;
use tonic::Request;

const NODE_COUNT: u8 = 25;
const NODES_SUBSCRIBED: u8 = NODE_COUNT / 6; // 4 out of 25 nodes will be subscribers
const TEST_CYCLES: u8 = 100;

#[tokio::test(flavor = "multi_thread")]
#[tokio::test]
async fn msgs_over_gossipsub() -> Result<()> {
let mut addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 12000);

for node_index in 1..NODE_COUNT + 1 {
// request current node to subscribe to a fresh new topic
addr.set_port(12000 + node_index as u16);
let random_num = rand::random::<u64>();
let topic = format!("TestTopic-{node_index}-{random_num}");
node_subscribe_to_topic(addr, topic.clone()).await?;

println!("Node {node_index} subscribed to {topic}");

let handle = tokio::spawn(async move {
let endpoint = format!("https://{addr}");
let mut rpc_client = SafeNodeClient::connect(endpoint).await?;
let response = rpc_client
.node_events(Request::new(NodeEventsRequest {}))
.await?;

println!("Listening to node events...");
let mut count = 0;

let _ = timeout(Duration::from_millis(4000), async {
let mut stream = response.into_inner();
while let Some(Ok(e)) = stream.next().await {
match NodeEvent::from_bytes(&e.event) {
Ok(NodeEvent::GossipsubMsg { topic, msg }) => {
println!(
"New gossipsub msg received on '{topic}': {}",
String::from_utf8(msg).unwrap()
);
count += 1;
if count == NODE_COUNT - 1 {
break;
for c in 0..TEST_CYCLES {
let topic = format!("TestTopic-{}", rand::random::<u64>());
println!("Testing cicle {}/{TEST_CYCLES} - topic: {topic}", c + 1);
println!("============================================================");

let mut subs_addrs = vec![];
let mut subs_handles = vec![];

// let's get a random subset of NODES_SUBSCRIBED number of nodes to subscribe to the topic
let random_index = rand::thread_rng().gen_range(1..NODE_COUNT - NODES_SUBSCRIBED);
for node_index in random_index..random_index + NODES_SUBSCRIBED {
// request current node to subscribe to the topic
addr.set_port(12000 + node_index as u16);
node_subscribe_to_topic(addr, topic.clone()).await?;
subs_addrs.push(addr);

println!("Node {node_index} subscribed to {topic}");

let handle = tokio::spawn(async move {
let endpoint = format!("https://{addr}");
let mut rpc_client = SafeNodeClient::connect(endpoint).await?;
let response = rpc_client
.node_events(Request::new(NodeEventsRequest {}))
.await?;

println!("Listening to node events...");
let mut count = 0;

let _ = timeout(Duration::from_millis(3000), async {
let mut stream = response.into_inner();
while let Some(Ok(e)) = stream.next().await {
match NodeEvent::from_bytes(&e.event) {
Ok(NodeEvent::GossipsubMsg { topic, msg }) => {
println!(
"New gossipsub msg received on '{topic}': {}",
String::from_utf8(msg).unwrap()
);
count += 1;
if count == NODE_COUNT - NODES_SUBSCRIBED {
break;
}
}
Ok(_) => { /* ignored */ }
Err(_) => {
println!("Error while parsing received NodeEvent");
}
}
Ok(_) => { /* ignored */ }
Err(_) => {
println!("Error while parsing received NodeEvent");
}
}
}
})
.await;
})
.await;

Ok::<u8, eyre::Error>(count)
});

Ok::<u8, eyre::Error>(count)
});
subs_handles.push((node_index, addr, handle));
}

tokio::time::sleep(Duration::from_millis(1000)).await;

// have all other nodes to publish each a different msg to that same topic
other_nodes_to_publish_on_topic(addr, topic.clone()).await?;

let count = handle.await??;
println!("Messages received by node {node_index}: {count}");
assert!(
count > 0,
"No message received by node at index {}",
node_index
);

node_unsubscribe_from_topic(addr, topic).await?;
other_nodes_to_publish_on_topic(subs_addrs, topic.clone()).await?;

for (node_index, addr, handle) in subs_handles.into_iter() {
let count = handle.await??;
println!("Messages received by node {node_index}: {count}");
assert_eq!(
count,
NODE_COUNT - NODES_SUBSCRIBED,
"Not enough messages received by node at index {}",
node_index
);
node_unsubscribe_from_topic(addr, topic.clone()).await?;
}
}

Ok(())
Expand Down Expand Up @@ -117,11 +133,14 @@ async fn node_unsubscribe_from_topic(addr: SocketAddr, topic: String) -> Result<
Ok(())
}

async fn other_nodes_to_publish_on_topic(filter_addr: SocketAddr, topic: String) -> Result<()> {
async fn other_nodes_to_publish_on_topic(
filter_addrs: Vec<SocketAddr>,
topic: String,
) -> Result<()> {
let mut addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 12000);
for node_index in 1..NODE_COUNT + 1 {
addr.set_port(12000 + node_index as u16);
if addr != filter_addr {
if filter_addrs.iter().all(|a| a != &addr) {
let msg = format!("TestMsgOnTopic-{topic}-from-{node_index}");

let endpoint = format!("https://{addr}");
Expand Down

0 comments on commit 894e86c

Please sign in to comment.