Skip to content

Commit

Permalink
chore: expand the workload simulation test
Browse files Browse the repository at this point in the history
  • Loading branch information
maqi committed Jun 9, 2021
1 parent 4fe22ad commit 5156da2
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 23 deletions.
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ flexi_logger = "~0.16.1"
anyhow = "1.0.36"
rand = "~0.7.3"

[dev-dependencies.tiny-keccak]
version = "2.0.2"
features = [ "sha3" ]

[target."cfg(any(all(unix, not(any(target_os = \"android\", target_os = \"androideabi\", target_os = \"ios\"))), windows))".dependencies]
dirs-next = "2.0.0"

Expand Down
106 changes: 85 additions & 21 deletions src/tests/common.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,27 @@
// Copyright 2021 MaidSafe.net limited.
//
// This SAFE Network Software is licensed to you under the MIT license <LICENSE-MIT
// http://opensource.org/licenses/MIT> or the Modified BSD license <LICENSE-BSD
// https://opensource.org/licenses/BSD-3-Clause>, at your option. This file may not be copied,
// modified, or distributed except according to those terms. Please review the Licences for the
// specific language governing permissions and limitations relating to use of the SAFE Network
// Software.

use super::{new_qp2p, new_qp2p_with_hcc, random_msg};
use crate::utils;
use anyhow::{anyhow, Result};
use bytes::Bytes;
use futures::future;
use std::{collections::HashSet, time::Duration};
use std::{
collections::{BTreeSet, HashSet},
time::Duration,
};
use tiny_keccak::{Hasher, Sha3};
use tokio::time::timeout;

/// SHA3-256 hash digest.
type Digest256 = [u8; 32];

#[tokio::test]
async fn successful_connection() -> Result<()> {
utils::init_logging();
Expand Down Expand Up @@ -40,7 +57,7 @@ async fn single_message() -> Result<()> {

// Peer 2 connects and sends a message
peer2.connect_to(&peer1_addr).await?;
let msg_from_peer2 = random_msg();
let msg_from_peer2 = random_msg(1024);
peer2
.send_message(msg_from_peer2.clone(), &peer1_addr)
.await?;
Expand Down Expand Up @@ -76,7 +93,7 @@ async fn reuse_outgoing_connection() -> Result<()> {

// Connect for the first time and send a message.
alice.connect_to(&bob_addr).await?;
let msg0 = random_msg();
let msg0 = random_msg(1024);
alice.send_message(msg0.clone(), &bob_addr).await?;

// Bob should recieve an incoming connection and message
Expand All @@ -95,7 +112,7 @@ async fn reuse_outgoing_connection() -> Result<()> {

// Try connecting again and send a message
alice.connect_to(&bob_addr).await?;
let msg1 = random_msg();
let msg1 = random_msg(1024);
alice.send_message(msg1.clone(), &bob_addr).await?;

// Bob *should not* get an incoming connection since there is already a connection established
Expand Down Expand Up @@ -129,7 +146,7 @@ async fn reuse_incoming_connection() -> Result<()> {

// Connect for the first time and send a message.
alice.connect_to(&bob_addr).await?;
let msg0 = random_msg();
let msg0 = random_msg(1024);
alice.send_message(msg0.clone(), &bob_addr).await?;

// Bob should recieve an incoming connection and message
Expand All @@ -148,7 +165,7 @@ async fn reuse_incoming_connection() -> Result<()> {

// Bob tries to connect to alice and sends a message
bob.connect_to(&alice_addr).await?;
let msg1 = random_msg();
let msg1 = random_msg(1024);
bob.send_message(msg1.clone(), &alice_addr).await?;

// Alice *will not* get an incoming connection since there is already a connection established
Expand Down Expand Up @@ -252,10 +269,10 @@ async fn simultaneous_incoming_and_outgoing_connections() -> Result<()> {
anyhow!("No incoming connectino from Alice");
}

let msg0 = random_msg();
let msg0 = random_msg(1024);
alice.send_message(msg0.clone(), &bob_addr).await?;

let msg1 = random_msg();
let msg1 = random_msg(1024);
bob.send_message(msg1.clone(), &alice_addr).await?;

if let Some((src, message)) = alice_incoming_messages.next().await {
Expand Down Expand Up @@ -292,7 +309,7 @@ async fn simultaneous_incoming_and_outgoing_connections() -> Result<()> {
anyhow!("Unexpected incoming connection from {}", connecting_peer);
}

let msg2 = random_msg();
let msg2 = random_msg(1024);
bob.send_message(msg2.clone(), &alice_addr).await?;

if let Some((src, message)) = alice_incoming_messages.next().await {
Expand Down Expand Up @@ -333,10 +350,10 @@ async fn multiple_concurrent_connects_to_the_same_peer() -> Result<()> {
}

// Send two messages, one from each end
let msg0 = random_msg();
let msg0 = random_msg(1024);
alice.send_message(msg0.clone(), &bob_addr).await?;

let msg1 = random_msg();
let msg1 = random_msg(1024);
bob.send_message(msg1.clone(), &alice_addr).await?;

// Both messages are received at the other end
Expand All @@ -357,21 +374,29 @@ async fn multiple_concurrent_connects_to_the_same_peer() -> Result<()> {
Ok(())
}

fn hash(bytes: &Bytes) -> Digest256 {
let mut hasher = Sha3::v256();
let mut hash = Digest256::default();
hasher.update(bytes);
hasher.finalize(&mut hash);
hash
}

#[tokio::test]
async fn multiple_connections_with_many_concurrent_messages() -> Result<()> {
use futures::future;

utils::init_logging();

let num_senders: usize = 10;
let num_messages_each: usize = 10;
let num_messages_total: usize = 100;
let num_messages_each: usize = 100;
let num_messages_total: usize = 1000;

let qp2p = new_qp2p()?;
let (recv_endpoint, _, mut recv_incoming_messages, _) = qp2p.new_endpoint().await?;
let recv_addr = recv_endpoint.socket_addr();
let (server_endpoint, _, mut recv_incoming_messages, _) = qp2p.new_endpoint().await?;
let server_addr = server_endpoint.socket_addr();

This comment has been minimized.

Copy link
@dirvine

dirvine Jun 9, 2021

Member

Be great to not use the word server ;-) Took me years to kill that one. Can we call it node please?


let test_msgs: Vec<_> = (0..num_messages_each).map(|_| random_msg()).collect();
let test_msgs: Vec<_> = (0..num_messages_each).map(|_| random_msg(1024)).collect();
let sending_msgs = test_msgs.clone();

let mut tasks = Vec::new();
Expand All @@ -380,18 +405,39 @@ async fn multiple_connections_with_many_concurrent_messages() -> Result<()> {
tasks.push(tokio::spawn({
async move {
let mut num_received = 0;
let mut sending_tasks = Vec::new();

while let Some((src, msg)) = recv_incoming_messages.next().await {
log::info!("received from {:?} with message size {}", src, msg.len());
assert_eq!(msg.len(), test_msgs[0].len());

num_received += 1;
let sending_endpoint = server_endpoint.clone();

sending_tasks.push(tokio::spawn({
async move {
// Hash the inputs for couple times to simulate certain workload.
let hash_result = hash(&msg);
for _ in 0..5 {
let _ = hash(&msg);
}
// Send the hash result back.
sending_endpoint.connect_to(&src).await?;
sending_endpoint
.send_message(hash_result.to_vec().into(), &src)
.await?;

Ok::<_, anyhow::Error>(())
}
}));

num_received += 1;
if num_received >= num_messages_total {
break;
}
}

let _ = future::try_join_all(sending_tasks).await?;

Ok(())
}
}));
Expand All @@ -401,19 +447,37 @@ async fn multiple_connections_with_many_concurrent_messages() -> Result<()> {
let messages = sending_msgs.clone();
tasks.push(tokio::spawn({
let qp2p = new_qp2p()?;
let (send_endpoint, _, _, _) = qp2p.new_endpoint().await?;
let (send_endpoint, _, mut recv_incoming_messages, _) = qp2p.new_endpoint().await?;

async move {
let mut hash_results = BTreeSet::new();
log::info!("connecting {}", id);
send_endpoint.connect_to(&recv_addr).await?;
send_endpoint.connect_to(&server_addr).await?;
for (index, message) in messages.iter().enumerate().take(num_messages_each) {
let _ = hash_results.insert(hash(&message));
log::info!("sender #{} sending message #{}", id, index);
send_endpoint
.send_message(message.clone(), &recv_addr)
.send_message(message.clone(), &server_addr)
.await?;
}

log::info!("sender #{} completed sending messages", id);
log::info!(
"sender #{} completed sending messages, starts listening",
id
);

while let Some((src, msg)) = recv_incoming_messages.next().await {
log::info!(
"#{} received from server {:?} with message size {}",
id,
src,
msg.len()
);
assert!(hash_results.remove(&msg[..]));
if hash_results.is_empty() {
break;
}
}

Ok::<_, anyhow::Error>(())
}
Expand Down
13 changes: 11 additions & 2 deletions src/tests/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
// Copyright 2021 MaidSafe.net limited.
//
// This SAFE Network Software is licensed to you under the MIT license <LICENSE-MIT
// http://opensource.org/licenses/MIT> or the Modified BSD license <LICENSE-BSD
// https://opensource.org/licenses/BSD-3-Clause>, at your option. This file may not be copied,
// modified, or distributed except according to those terms. Please review the Licences for the
// specific language governing permissions and limitations relating to use of the SAFE Network
// Software.

use crate::{Config, QuicP2p};
use anyhow::Result;
use bytes::Bytes;
Expand Down Expand Up @@ -29,7 +38,7 @@ pub fn new_qp2p_with_hcc(hard_coded_contacts: HashSet<SocketAddr>) -> Result<Qui
Ok(qp2p)
}

pub fn random_msg() -> Bytes {
let random_bytes: Vec<u8> = (0..1024).map(|_| rand::random::<u8>()).collect();
pub fn random_msg(size: usize) -> Bytes {
let random_bytes: Vec<u8> = (0..size).map(|_| rand::random::<u8>()).collect();
Bytes::from(random_bytes)
}

0 comments on commit 5156da2

Please sign in to comment.