Skip to content

Commit

Permalink
Merge 336d6a9 into 8affa4d
Browse files Browse the repository at this point in the history
  • Loading branch information
lionel-faber committed Jul 30, 2021
2 parents 8affa4d + 336d6a9 commit 621613e
Show file tree
Hide file tree
Showing 10 changed files with 335 additions and 47 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ tracing = "~0.1.26"
[dev-dependencies]
anyhow = "1.0.36"
rand = "~0.7.3"
tracing-test = "0.1"
tracing-test = "0.1.0"
tracing-subscriber = "0.2.19"

[dev-dependencies.tiny-keccak]
version = "2.0.2"
Expand Down
2 changes: 1 addition & 1 deletion src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ impl QuicP2p {
// Private helpers

// Bind a new socket with a local address
fn bind(
pub(crate) fn bind(
endpoint_cfg: quinn::ServerConfig,
local_addr: SocketAddr,
allow_random_port: bool,
Expand Down
2 changes: 1 addition & 1 deletion src/connection_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ impl ConnectionPool {

keys_to_remove
.iter()
.filter_map(|key| store.map.remove(&key))
.filter_map(|key| store.map.remove(key))
.collect::<Vec<_>>()
}

Expand Down
2 changes: 1 addition & 1 deletion src/connections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ mod tests {
use crate::{config::Config, wire_msg::WireMsg, Error};
use std::net::{IpAddr, Ipv4Addr};

#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn echo_service() -> Result<(), Error> {
let qp2p = QuicP2p::with_config(
Some(Config {
Expand Down
2 changes: 1 addition & 1 deletion src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ impl Endpoint {
info!("Verifying provided public IP address");
endpoint.connect_to(contact).await?;
let connection = endpoint
.get_connection(&contact)
.get_connection(contact)
.await
.ok_or(Error::MissingConnection)?;
let (mut send, mut recv) = connection.open_bi().await?;
Expand Down
1 change: 0 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
unused_parens,
while_true,
clippy::unicode_not_nfc,
clippy::wrong_pub_self_convention,
warnings
)]
#![warn(
Expand Down
3 changes: 1 addition & 2 deletions src/peer_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@ pub fn new_our_cfg(
quinn::ServerConfigBuilder::new(our_cfg)
};
let _ = our_cfg_builder
.certificate(quinn::CertificateChain::from_certs(vec![our_cert]), our_key)?
.use_stateless_retry(true);
.certificate(quinn::CertificateChain::from_certs(vec![our_cert]), our_key)?;

Ok(our_cfg_builder.build())
}
Expand Down
55 changes: 16 additions & 39 deletions src/tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,18 @@
// specific language governing permissions and limitations relating to use of the SAFE Network
// Software.

use super::{new_qp2p, new_qp2p_with_hcc, random_msg};
use super::{hash, new_qp2p, new_qp2p_with_hcc, random_msg};
use anyhow::{anyhow, Result};
use bytes::Bytes;
use futures::{future, stream::FuturesUnordered, StreamExt};
use std::{
collections::{BTreeSet, HashSet},
time::Duration,
};
use tiny_keccak::{Hasher, Sha3};
use tokio::time::timeout;
use tracing::info;
use tracing_test::traced_test;
/// SHA3-256 hash digest.
type Digest256 = [u8; 32];

#[tokio::test]
#[traced_test]
#[tokio::test(flavor = "multi_thread")]
async fn successful_connection() -> Result<()> {
let qp2p = new_qp2p()?;
let (peer1, mut peer1_incoming_connections, _, _) = qp2p.new_endpoint().await?;
Expand All @@ -42,8 +37,7 @@ async fn successful_connection() -> Result<()> {
Ok(())
}

#[tokio::test]
#[traced_test]
#[tokio::test(flavor = "multi_thread")]
async fn single_message() -> Result<()> {
let qp2p = new_qp2p()?;
let (peer1, mut peer1_incoming_connections, mut peer1_incoming_messages, _) =
Expand Down Expand Up @@ -77,8 +71,7 @@ async fn single_message() -> Result<()> {
Ok(())
}

#[tokio::test]
#[traced_test]
#[tokio::test(flavor = "multi_thread")]
async fn reuse_outgoing_connection() -> Result<()> {
let qp2p = new_qp2p()?;
let (alice, _, _, _) = qp2p.new_endpoint().await?;
Expand Down Expand Up @@ -128,8 +121,7 @@ async fn reuse_outgoing_connection() -> Result<()> {
Ok(())
}

#[tokio::test]
#[traced_test]
#[tokio::test(flavor = "multi_thread")]
async fn reuse_incoming_connection() -> Result<()> {
let qp2p = new_qp2p()?;
let (alice, mut alice_incoming_connections, mut alice_incoming_messages, _) =
Expand Down Expand Up @@ -181,8 +173,7 @@ async fn reuse_incoming_connection() -> Result<()> {
Ok(())
}

#[tokio::test]
#[traced_test]
#[tokio::test(flavor = "multi_thread")]
async fn disconnection() -> Result<()> {
let qp2p = new_qp2p()?;
let (alice, mut alice_incoming_connections, _, mut alice_disconnections) =
Expand Down Expand Up @@ -229,8 +220,7 @@ async fn disconnection() -> Result<()> {
Ok(())
}

#[tokio::test]
#[traced_test]
#[tokio::test(flavor = "multi_thread")]
async fn simultaneous_incoming_and_outgoing_connections() -> Result<()> {
// If both peers call `connect_to` simultaneously (that is, before any of them receives the
// others connection first), two separate connections are created. This test verifies that
Expand Down Expand Up @@ -314,8 +304,7 @@ async fn simultaneous_incoming_and_outgoing_connections() -> Result<()> {
Ok(())
}

#[tokio::test]
#[traced_test]
#[tokio::test(flavor = "multi_thread")]
async fn multiple_concurrent_connects_to_the_same_peer() -> Result<()> {
let qp2p = new_qp2p()?;
let (alice, mut alice_incoming_connections, mut alice_incoming_messages, _) =
Expand Down Expand Up @@ -367,16 +356,7 @@ async fn multiple_concurrent_connects_to_the_same_peer() -> Result<()> {
Ok(())
}

fn hash(bytes: &Bytes) -> Digest256 {
let mut hasher = Sha3::v256();
let mut hash = Digest256::default();
hasher.update(bytes);
hasher.finalize(&mut hash);
hash
}

#[tokio::test]
#[traced_test]
#[tokio::test(flavor = "multi_thread")]
async fn multiple_connections_with_many_concurrent_messages() -> Result<()> {
use futures::future;

Expand Down Expand Up @@ -445,7 +425,7 @@ async fn multiple_connections_with_many_concurrent_messages() -> Result<()> {
let mut hash_results = BTreeSet::new();
send_endpoint.connect_to(&server_addr).await?;
for (index, message) in messages.iter().enumerate().take(num_messages_each) {
let _ = hash_results.insert(hash(&message));
let _ = hash_results.insert(hash(message));
info!("sender #{} sending message #{}", id, index);
send_endpoint
.send_message(message.clone(), &server_addr)
Expand Down Expand Up @@ -483,7 +463,7 @@ async fn multiple_connections_with_many_concurrent_messages() -> Result<()> {
Ok(())
}

#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
#[traced_test]
async fn multiple_connections_with_many_larger_concurrent_messages() -> Result<()> {
let num_senders: usize = 10;
Expand Down Expand Up @@ -528,7 +508,7 @@ async fn multiple_connections_with_many_larger_concurrent_messages() -> Result<(
assert!(!logs_contain("error"));

num_received += 1;
println!("Server received count: {}", num_received);
// println!("Server received count: {}", num_received);
if num_received >= num_messages_total {
break;
}
Expand All @@ -554,7 +534,7 @@ async fn multiple_connections_with_many_larger_concurrent_messages() -> Result<(

send_endpoint.connect_to(&server_addr).await?;
for (index, message) in messages.iter().enumerate().take(num_messages_each) {
let _ = hash_results.insert(hash(&message));
let _ = hash_results.insert(hash(message));

info!("sender #{} sending message #{}", id, index);
send_endpoint
Expand Down Expand Up @@ -598,8 +578,7 @@ async fn multiple_connections_with_many_larger_concurrent_messages() -> Result<(
Ok(())
}

#[tokio::test]
#[traced_test]
#[tokio::test(flavor = "multi_thread")]
async fn many_messages() -> Result<()> {
use futures::future;
use std::{convert::TryInto, sync::Arc};
Expand Down Expand Up @@ -661,8 +640,7 @@ async fn many_messages() -> Result<()> {
// When we bootstrap with multiple bootstrap contacts, we will use the first connection
// that succeeds. We should still be able to establish a connection with the rest of the
// bootstrap contacts later.
#[tokio::test]
#[traced_test]
#[tokio::test(flavor = "multi_thread")]
async fn connection_attempts_to_bootstrap_contacts_should_succeed() -> Result<()> {
let qp2p = new_qp2p()?;

Expand All @@ -686,8 +664,7 @@ async fn connection_attempts_to_bootstrap_contacts_should_succeed() -> Result<()
Ok(())
}

#[tokio::test]
#[traced_test]
#[tokio::test(flavor = "multi_thread")]
async fn reachability() -> Result<()> {
let qp2p = new_qp2p()?;

Expand Down
13 changes: 13 additions & 0 deletions src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,13 @@ use std::{
collections::HashSet,
net::{IpAddr, Ipv4Addr, SocketAddr},
};
use tiny_keccak::{Hasher, Sha3};

/// SHA3-256 hash digest.
type Digest256 = [u8; 32];

mod common;
mod quinn;

/// Constructs a `QuicP2p` node with some sane defaults for testing.
pub fn new_qp2p() -> Result<QuicP2p> {
Expand Down Expand Up @@ -46,3 +51,11 @@ pub fn random_msg(size: usize) -> Bytes {
let random_bytes: Vec<u8> = (0..size).map(|_| rand::random::<u8>()).collect();
Bytes::from(random_bytes)
}

pub fn hash(bytes: &Bytes) -> Digest256 {
let mut hasher = Sha3::v256();
let mut hash = Digest256::default();
hasher.update(bytes);
hasher.finalize(&mut hash);
hash
}
Loading

0 comments on commit 621613e

Please sign in to comment.