Skip to content

Commit

Permalink
test(gossipsub): make CI test to be more strict, 0-tolerance for miss…
Browse files Browse the repository at this point in the history
…ed published messages
  • Loading branch information
bochaco committed Oct 4, 2023
1 parent 89ef8e2 commit cfb77f3
Showing 1 changed file with 78 additions and 56 deletions.
134 changes: 78 additions & 56 deletions sn_node/tests/msgs_over_gossipsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,70 +24,89 @@ use tokio_stream::StreamExt;
use tonic::Request;

const NODE_COUNT: u8 = 25;
const NODES_SUBSCRIBED: u8 = NODE_COUNT / 2; // 12 out of 25 nodes will be subscribers
const TEST_CYCLES: u8 = 20;

#[tokio::test(flavor = "multi_thread")]
#[tokio::test]
async fn msgs_over_gossipsub() -> Result<()> {
let mut addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 12000);

for node_index in 1..NODE_COUNT + 1 {
// request current node to subscribe to a fresh new topic
addr.set_port(12000 + node_index as u16);
let random_num = rand::random::<u64>();
let topic = format!("TestTopic-{node_index}-{random_num}");
node_subscribe_to_topic(addr, topic.clone()).await?;

println!("Node {node_index} subscribed to {topic}");

let handle = tokio::spawn(async move {
let endpoint = format!("https://{addr}");
let mut rpc_client = SafeNodeClient::connect(endpoint).await?;
let response = rpc_client
.node_events(Request::new(NodeEventsRequest {}))
.await?;

println!("Listening to node events...");
let mut count = 0;

let _ = timeout(Duration::from_millis(4000), async {
let mut stream = response.into_inner();
while let Some(Ok(e)) = stream.next().await {
match NodeEvent::from_bytes(&e.event) {
Ok(NodeEvent::GossipsubMsg { topic, msg }) => {
println!(
"New gossipsub msg received on '{topic}': {}",
String::from_utf8(msg).unwrap()
);
count += 1;
if count == NODE_COUNT - 1 {
break;
for c in 0..TEST_CYCLES {
let topic = format!("TestTopic-{}", rand::random::<u64>());
println!("Testing cicle {}/{TEST_CYCLES} - topic: {topic}", c + 1);
println!("============================================================");

let mut subs_addrs = vec![];
let mut subs_handles = vec![];

// get a random subset of NODES_SUBSCRIBED out of NODE_COUNT nodes to subscribe to the topic
let mut rng = rand::thread_rng();
let random_indexes =
rand::seq::index::sample(&mut rng, NODE_COUNT.into(), NODES_SUBSCRIBED.into())
.into_vec();

for node_index in random_indexes {
// request current node to subscribe to the topic
addr.set_port(12000 + node_index as u16);
node_subscribe_to_topic(addr, topic.clone()).await?;
subs_addrs.push(addr);

println!("Node {node_index} subscribed to {topic}");

let handle = tokio::spawn(async move {
let endpoint = format!("https://{addr}");
let mut rpc_client = SafeNodeClient::connect(endpoint).await?;
let response = rpc_client
.node_events(Request::new(NodeEventsRequest {}))
.await?;

println!("Listening to node events...");
let mut count = 0;

let _ = timeout(Duration::from_millis(6000), async {
let mut stream = response.into_inner();
while let Some(Ok(e)) = stream.next().await {
match NodeEvent::from_bytes(&e.event) {
Ok(NodeEvent::GossipsubMsg { topic, msg }) => {
println!(
"New gossipsub msg received on '{topic}': {}",
String::from_utf8(msg).unwrap()
);
count += 1;
if count == NODE_COUNT - NODES_SUBSCRIBED {
break;
}
}
Ok(_) => { /* ignored */ }
Err(_) => {
println!("Error while parsing received NodeEvent");
}
}
Ok(_) => { /* ignored */ }
Err(_) => {
println!("Error while parsing received NodeEvent");
}
}
}
})
.await;

Ok::<u8, eyre::Error>(count)
});
})
.await;

tokio::time::sleep(Duration::from_millis(1000)).await;
Ok::<u8, eyre::Error>(count)
});

// have all other nodes to publish each a different msg to that same topic
other_nodes_to_publish_on_topic(addr, topic.clone()).await?;
subs_handles.push((node_index, addr, handle));
}

let count = handle.await??;
println!("Messages received by node {node_index}: {count}");
assert!(
count > 0,
"No message received by node at index {}",
node_index
);
tokio::time::sleep(Duration::from_millis(3000)).await;

node_unsubscribe_from_topic(addr, topic).await?;
// have all other nodes to publish each a different msg to that same topic
other_nodes_to_publish_on_topic(subs_addrs, topic.clone()).await?;

for (node_index, addr, handle) in subs_handles.into_iter() {
let count = handle.await??;
println!("Messages received by node {node_index}: {count}");
assert_eq!(
count,
NODE_COUNT - NODES_SUBSCRIBED,
"Not enough messages received by node at index {}",
node_index
);
node_unsubscribe_from_topic(addr, topic.clone()).await?;
}
}

Ok(())
Expand Down Expand Up @@ -117,11 +136,14 @@ async fn node_unsubscribe_from_topic(addr: SocketAddr, topic: String) -> Result<
Ok(())
}

async fn other_nodes_to_publish_on_topic(filter_addr: SocketAddr, topic: String) -> Result<()> {
async fn other_nodes_to_publish_on_topic(
filter_addrs: Vec<SocketAddr>,
topic: String,
) -> Result<()> {
let mut addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 12000);
for node_index in 1..NODE_COUNT + 1 {
addr.set_port(12000 + node_index as u16);
if addr != filter_addr {
if filter_addrs.iter().all(|a| a != &addr) {
let msg = format!("TestMsgOnTopic-{topic}-from-{node_index}");

let endpoint = format!("https://{addr}");
Expand Down

0 comments on commit cfb77f3

Please sign in to comment.