Skip to content

Commit

Permalink
feat: use tracing for logging
Browse files Browse the repository at this point in the history
use tracing test to init logging for tests
  • Loading branch information
joshuef committed Jul 12, 2021
1 parent a02f27d commit b406646
Show file tree
Hide file tree
Showing 8 changed files with 30 additions and 68 deletions.
7 changes: 5 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@ edition = "2018"
base64 = "~0.12.2"
bincode = "1.2.1"
futures = "~0.3.8"
log = "~0.4.8"
rcgen = "~0.8.4"
serde_json = "1.0.59"
structopt = "~0.3.15"
thiserror = "1.0.23"
webpki = "~0.21.3"
tracing-futures = "~0.2.5"
tracing= "~0.1.26"

[dependencies.bytes]
version = "1.0.1"
Expand Down Expand Up @@ -57,9 +58,11 @@ webpki = "~0.21.3"
]

[dev-dependencies]
flexi_logger = "~0.16.1"
anyhow = "1.0.36"
rand = "~0.7.3"
tracing-appender = "~0.1.2"
tracing-subscriber = "~0.2.15"
tracing-test = "0.1"

[dev-dependencies.tiny-keccak]
version = "2.0.2"
Expand Down
2 changes: 1 addition & 1 deletion src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ use super::{
peer_config::{self, DEFAULT_IDLE_TIMEOUT_MSEC, DEFAULT_KEEP_ALIVE_INTERVAL_MSEC},
};
use futures::future;
use log::{debug, error, info, trace};
use std::collections::HashSet;
use std::net::{SocketAddr, UdpSocket};
use std::path::PathBuf;
use tracing::{debug, error, info, trace};

/// In the absence of a port supplied by the user via the config we will first try using this
/// before using a random port.
Expand Down
2 changes: 1 addition & 1 deletion src/bootstrap_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
use crate::error::{Error, Result};
use bincode::{deserialize_from, serialize_into};
use dirs_next::home_dir;
use log::warn;
use serde::{de::DeserializeOwned, Serialize};
use std::{
collections::{HashSet, VecDeque},
Expand All @@ -24,6 +23,7 @@ use std::{
fs::File,
io::{BufReader, BufWriter},
};
use tracing::warn;

/// Maximum peers in the cache.
const MAX_CACHE_SIZE: usize = 200;
Expand Down
4 changes: 2 additions & 2 deletions src/connections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ use super::{
};
use bytes::Bytes;
use futures::{future, stream::StreamExt};
use log::{error, trace, warn};
use std::net::SocketAddr;
use tokio::sync::mpsc::UnboundedSender;
use tokio::time::{timeout, Duration};
use tracing::{error, trace, warn};

/// Connection instance to a node which can be used to send messages to it
#[derive(Clone)]
Expand Down Expand Up @@ -199,7 +199,7 @@ pub(super) fn listen_for_incoming_messages(
)
.await;

log::trace!("The connection to {:?} has been terminated.", src);
tracing::trace!("The connection to {:?} has been terminated.", src);
let _ = disconnection_tx.send(src);
remover.remove().await;
});
Expand Down
2 changes: 1 addition & 1 deletion src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ use super::{
Config,
};
use bytes::Bytes;
use log::{debug, error, info, trace, warn};
use std::{net::SocketAddr, time::Duration};
use tokio::sync::broadcast::{self, Receiver, Sender};
use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
use tokio::time::timeout;
use tracing::{debug, error, info, trace, warn};

/// Host name of the Quic communication certificate used by peers
// FIXME: make it configurable
Expand Down
2 changes: 1 addition & 1 deletion src/igd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@

use crate::error::{Error, Result};
use igd::SearchOptions;
use log::{debug, info, warn};
use std::net::SocketAddr;
use std::time::Duration;
use tokio::sync::broadcast::{error::TryRecvError, Receiver};
use tokio::time::{self, Instant};
use tracing::{debug, info, warn};

/// Automatically forwards a port and setups a tokio task to renew it periodically.
pub async fn forward_port(
Expand Down
48 changes: 19 additions & 29 deletions src/tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
// Software.

use super::{new_qp2p, new_qp2p_with_hcc, random_msg};
use crate::utils;
use anyhow::{anyhow, Result};
use bytes::Bytes;
use futures::future;
Expand All @@ -18,14 +17,13 @@ use std::{
};
use tiny_keccak::{Hasher, Sha3};
use tokio::time::timeout;

use tracing_test::traced_test;
/// SHA3-256 hash digest.
type Digest256 = [u8; 32];

#[tokio::test]
#[traced_test]
async fn successful_connection() -> Result<()> {
utils::init_logging();

let qp2p = new_qp2p()?;
let (peer1, mut peer1_incoming_connections, _, _) = qp2p.new_endpoint().await?;
let peer1_addr = peer1.socket_addr();
Expand All @@ -44,9 +42,8 @@ async fn successful_connection() -> Result<()> {
}

#[tokio::test]
#[traced_test]
async fn single_message() -> Result<()> {
utils::init_logging();

let qp2p = new_qp2p()?;
let (peer1, mut peer1_incoming_connections, mut peer1_incoming_messages, _) =
qp2p.new_endpoint().await?;
Expand Down Expand Up @@ -80,9 +77,8 @@ async fn single_message() -> Result<()> {
}

#[tokio::test]
#[traced_test]
async fn reuse_outgoing_connection() -> Result<()> {
utils::init_logging();

let qp2p = new_qp2p()?;
let (alice, _, _, _) = qp2p.new_endpoint().await?;
let alice_addr = alice.socket_addr();
Expand Down Expand Up @@ -132,9 +128,8 @@ async fn reuse_outgoing_connection() -> Result<()> {
}

#[tokio::test]
#[traced_test]
async fn reuse_incoming_connection() -> Result<()> {
utils::init_logging();

let qp2p = new_qp2p()?;
let (alice, mut alice_incoming_connections, mut alice_incoming_messages, _) =
qp2p.new_endpoint().await?;
Expand Down Expand Up @@ -186,9 +181,8 @@ async fn reuse_incoming_connection() -> Result<()> {
}

#[tokio::test]
#[traced_test]
async fn disconnection() -> Result<()> {
utils::init_logging();

let qp2p = new_qp2p()?;
let (alice, mut alice_incoming_connections, _, mut alice_disconnections) =
qp2p.new_endpoint().await?;
Expand Down Expand Up @@ -235,13 +229,12 @@ async fn disconnection() -> Result<()> {
}

#[tokio::test]
#[traced_test]
async fn simultaneous_incoming_and_outgoing_connections() -> Result<()> {
// If both peers call `connect_to` simultaneously (that is, before any of them receives the
// others connection first), two separate connections are created. This test verifies that
// everything still works correctly even in this case.

utils::init_logging();

let qp2p = new_qp2p()?;
let (
alice,
Expand Down Expand Up @@ -321,9 +314,8 @@ async fn simultaneous_incoming_and_outgoing_connections() -> Result<()> {
}

#[tokio::test]
#[traced_test]
async fn multiple_concurrent_connects_to_the_same_peer() -> Result<()> {
utils::init_logging();

let qp2p = new_qp2p()?;
let (alice, mut alice_incoming_connections, mut alice_incoming_messages, _) =
qp2p.new_endpoint().await?;
Expand Down Expand Up @@ -383,11 +375,10 @@ fn hash(bytes: &Bytes) -> Digest256 {
}

#[tokio::test]
#[traced_test]
async fn multiple_connections_with_many_concurrent_messages() -> Result<()> {
use futures::future;

utils::init_logging();

let num_senders: usize = 10;
let num_messages_each: usize = 100;
let num_messages_total: usize = 1000;
Expand All @@ -408,7 +399,7 @@ async fn multiple_connections_with_many_concurrent_messages() -> Result<()> {
let mut sending_tasks = Vec::new();

while let Some((src, msg)) = recv_incoming_messages.next().await {
log::info!("received from {:?} with message size {}", src, msg.len());
tracing::info!("received from {:?} with message size {}", src, msg.len());
assert_eq!(msg.len(), test_msgs[0].len());

let sending_endpoint = server_endpoint.clone();
Expand Down Expand Up @@ -451,23 +442,23 @@ async fn multiple_connections_with_many_concurrent_messages() -> Result<()> {

async move {
let mut hash_results = BTreeSet::new();
log::info!("connecting {}", id);
tracing::info!("connecting {}", id);
send_endpoint.connect_to(&server_addr).await?;
for (index, message) in messages.iter().enumerate().take(num_messages_each) {
let _ = hash_results.insert(hash(&message));
log::info!("sender #{} sending message #{}", id, index);
tracing::info!("sender #{} sending message #{}", id, index);
send_endpoint
.send_message(message.clone(), &server_addr)
.await?;
}

log::info!(
tracing::info!(
"sender #{} completed sending messages, starts listening",
id
);

while let Some((src, msg)) = recv_incoming_messages.next().await {
log::info!(
tracing::info!(
"#{} received from server {:?} with message size {}",
id,
src,
Expand All @@ -489,12 +480,11 @@ async fn multiple_connections_with_many_concurrent_messages() -> Result<()> {
}

#[tokio::test]
#[traced_test]
async fn many_messages() -> Result<()> {
use futures::future;
use std::{convert::TryInto, sync::Arc};

utils::init_logging();

let num_messages: usize = 10_000;

let qp2p = new_qp2p()?;
Expand All @@ -513,11 +503,11 @@ async fn many_messages() -> Result<()> {
tasks.push(tokio::spawn({
let endpoint = send_endpoint.clone();
async move {
log::info!("sending {}", id);
tracing::info!("sending {}", id);
let msg = id.to_le_bytes().to_vec().into();
endpoint.connect_to(&recv_addr).await?;
endpoint.send_message(msg, &recv_addr).await?;
log::info!("sent {}", id);
tracing::info!("sent {}", id);

Ok::<_, anyhow::Error>(())
}
Expand All @@ -532,7 +522,7 @@ async fn many_messages() -> Result<()> {
while let Some((src, msg)) = recv_incoming_messages.next().await {
let id = usize::from_le_bytes(msg[..].try_into().unwrap());
assert_eq!(src, send_addr);
log::info!("received {}", id);
tracing::info!("received {}", id);

num_received += 1;

Expand All @@ -553,8 +543,8 @@ async fn many_messages() -> Result<()> {
// that succeeds. We should still be able to establish a connection with the rest of the
// bootstrap contacts later.
#[tokio::test]
#[traced_test]
async fn connection_attempts_to_bootstrap_contacts_should_succeed() -> Result<()> {
utils::init_logging();
let qp2p = new_qp2p()?;

let (ep1, _, _, _) = qp2p.new_endpoint().await?;
Expand Down
31 changes: 0 additions & 31 deletions src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,34 +27,3 @@ pub fn bin_data_format(data: &[u8]) -> String {
data[len - 1]
)
}

#[cfg(test)]
pub(crate) fn init_logging() {
use flexi_logger::{DeferredNow, Logger};
use log::Record;
use std::io::Write;

// Custom formatter for logs
let do_format = move |writer: &mut dyn Write, clock: &mut DeferredNow, record: &Record| {
let handle = std::thread::current();
write!(
writer,
"[{}] {} {} [{}:{}] {}",
handle
.name()
.unwrap_or(&format!("Thread-{:?}", handle.id())),
record.level(),
clock.now().to_rfc3339(),
record.file().unwrap_or_default(),
record.line().unwrap_or_default(),
record.args()
)
};

Logger::with_env()
.format(do_format)
.suppress_timestamp()
.start()
.map(|_| ())
.unwrap_or(());
}

0 comments on commit b406646

Please sign in to comment.