Skip to content

Commit

Permalink
chore(fmt): apply rustfmt to examples and fix clippy errors
Browse files Browse the repository at this point in the history
  • Loading branch information
lionel-faber committed Mar 11, 2021
1 parent 426b872 commit 01c1586
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 54 deletions.
11 changes: 6 additions & 5 deletions examples/bootstrap_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ mod common;

use anyhow::Result;
use bytes::Bytes;
use common::{Rpc, Event, EventReceivers};
use common::{Event, EventReceivers, Rpc};
use log::{error, info, warn};
use qp2p::{Config, QuicP2p};
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -54,13 +54,14 @@ async fn main() -> Result<()> {
let qp2p = QuicP2p::with_config(
Some(bootstrap_node_config.quic_p2p_opts),
Default::default(),
false
false,
)?;
let (endpoint, incoming_connections, incoming_messages, disconnections) = qp2p.new_endpoint().await?;
let (endpoint, incoming_connections, incoming_messages, disconnections) =
qp2p.new_endpoint().await?;
let mut event_rx = EventReceivers {
incoming_connections,
incoming_messages,
disconnections
disconnections,
};

let our_addr = endpoint.socket_addr();
Expand Down Expand Up @@ -96,4 +97,4 @@ async fn main() -> Result<()> {
}

Ok(())
}
}
70 changes: 37 additions & 33 deletions examples/chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@

mod common;

use anyhow::{Context, Result, anyhow};
use anyhow::{anyhow, Context, Result};
use bytes::Bytes;
use common::{Event, EventReceivers};
use qp2p::{Config, QuicP2p, Endpoint};
use qp2p::{Config, Endpoint, QuicP2p};
use rand::{distributions::Standard, Rng};
use rustyline::config::Configurer;
use rustyline::error::ReadlineError;
Expand All @@ -23,8 +23,8 @@ use std::{
net::SocketAddr,
sync::{Arc, Mutex},
};
use tokio::task::JoinHandle;
use structopt::StructOpt;
use tokio::task::JoinHandle;

struct PeerList {
peers: Vec<SocketAddr>,
Expand Down Expand Up @@ -76,16 +76,13 @@ struct CliArgs {
async fn main() -> Result<()> {
let CliArgs { quic_p2p_opts } = CliArgs::from_args();

let qp2p = QuicP2p::with_config(
Some(quic_p2p_opts),
Default::default(),
false
)?;
let (mut endpoint, incoming_connections, incoming_messages, disconnections) = qp2p.new_endpoint().await?;
let qp2p = QuicP2p::with_config(Some(quic_p2p_opts), Default::default(), false)?;
let (endpoint, incoming_connections, incoming_messages, disconnections) =
qp2p.new_endpoint().await?;
let event_rx = EventReceivers {
incoming_connections,
incoming_messages,
disconnections
disconnections,
};

print_logo();
Expand All @@ -108,7 +105,7 @@ async fn main() -> Result<()> {
let mut peerlist = peerlist.lock().unwrap();
let result = match cmd {
"ourinfo" => {
print_ourinfo(&mut endpoint)?;
print_ourinfo(&endpoint);
Ok(())
}
"addpeer" => peerlist.insert_from_str(&args.collect::<Vec<_>>().join(" ")),
Expand All @@ -118,12 +115,12 @@ async fn main() -> Result<()> {
}
"delpeer" => args
.next()
.ok_or(anyhow!("Missing index argument"))
.and_then(|idx| idx.parse().or(Err(anyhow!("Invalid index argument"))))
.ok_or_else(|| anyhow!("Missing index argument"))
.and_then(|idx| idx.parse().map_err(|_| anyhow!("Invalid index argument")))
.and_then(|idx| peerlist.remove(idx))
.and(Ok(())),
"send" => on_cmd_send(&mut args, &peerlist, &mut endpoint).await,
"sendrand" => on_cmd_send_rand(&mut args, &peerlist, &mut endpoint).await,
"send" => on_cmd_send(&mut args, &peerlist, &endpoint).await,
"sendrand" => on_cmd_send_rand(&mut args, &peerlist, &endpoint).await,
"quit" | "exit" => break 'outer,
"help" => {
println!(
Expand Down Expand Up @@ -154,10 +151,11 @@ async fn on_cmd_send<'a>(
peer_list: &PeerList,
endpoint: &Endpoint,
) -> Result<()> {
let peer = args.next()
let peer = args
.next()
.with_context(|| "Missing index argument")
.and_then(|idx| idx.parse().or(Err(anyhow!("Invalid index argument"))))
.and_then(|idx| peer_list.get(idx).ok_or(anyhow!("Index out of bounds")))?;
.and_then(|idx| idx.parse().map_err(|_| anyhow!("Invalid index argument")))
.and_then(|idx| peer_list.get(idx).ok_or_else(|| anyhow!("Index out of bounds")))?;
let msg = Bytes::from(args.collect::<Vec<_>>().join(" "));
endpoint.send_message(msg, peer).await.map_err(From::from)
}
Expand All @@ -169,21 +167,29 @@ async fn on_cmd_send_rand<'a>(
peer_list: &PeerList,
endpoint: &Endpoint,
) -> Result<()> {
let (addr, msg_len) = args.next()
.ok_or(anyhow!("Missing index argument"))
.and_then(|idx| idx.parse().or(Err(anyhow!("Invalid index argument"))))
.and_then(|idx| peer_list.get(idx).ok_or(anyhow!("Index out of bounds")))
let (addr, msg_len) = args
.next()
.ok_or_else(|| anyhow!("Missing index argument"))
.and_then(|idx| idx.parse().map_err(|_| anyhow!("Invalid index argument")))
.and_then(|idx| peer_list.get(idx).ok_or_else(|| anyhow!("Index out of bounds")))
.and_then(|peer| {
args.next()
.ok_or(anyhow!("Missing bytes count"))
.and_then(|bytes| bytes.parse::<usize>().or(Err(anyhow!("Invalid bytes count argument"))))
.ok_or_else(|| anyhow!("Missing bytes count"))
.and_then(|bytes| {
bytes
.parse::<usize>()
.map_err(|_| anyhow!("Invalid bytes count argument"))
})
.map(|bytes_to_send| (peer, bytes_to_send))
})?;
let data = Bytes::from(random_vec(msg_len));
endpoint.send_message(data,addr).await.map_err(From::from)
let data = Bytes::from(random_vec(msg_len));
endpoint.send_message(data, addr).await.map_err(From::from)
}

fn handle_qp2p_events(mut event_rx: EventReceivers, peer_list: Arc<Mutex<PeerList>>) -> JoinHandle<()> {
fn handle_qp2p_events(
mut event_rx: EventReceivers,
peer_list: Arc<Mutex<PeerList>>,
) -> JoinHandle<()> {
tokio::spawn(async move {
while let Some(event) = event_rx.recv().await {
match event {
Expand All @@ -195,11 +201,10 @@ fn handle_qp2p_events(mut event_rx: EventReceivers, peer_list: Arc<Mutex<PeerLis
println!(
"[{}] {}",
src,
String::from_utf8(msg.to_vec()).unwrap_or("Invalid String".to_string())
String::from_utf8(msg.to_vec()).unwrap_or_else(|_| "Invalid String".to_string())
);
}
}
// event => println!("Unexpected quic-p2p event: {:?}", event),
} // event => println!("Unexpected quic-p2p event: {:?}", event),
}
}
})
Expand All @@ -215,11 +220,10 @@ fn parse_socket_addr(input: &str) -> Result<SocketAddr> {
input.parse().map_err(|_| anyhow!("Invalid socket address"))
}

fn print_ourinfo(endpoint: &Endpoint) -> Result<()> {
fn print_ourinfo(endpoint: &Endpoint) {
let ourinfo = endpoint.socket_addr();

println!("Our info: {}", ourinfo);
Ok(())
}

fn print_logo() {
Expand All @@ -240,4 +244,4 @@ fn random_vec(size: usize) -> Vec<u8> {
.sample_iter(&Standard)
.take(size)
.collect()
}
}
25 changes: 13 additions & 12 deletions examples/client_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,16 @@

mod common;

use anyhow::{Context, Result};
use bytes::Bytes;
use common::{EventReceivers, Rpc, Event};
use common::{Event, EventReceivers, Rpc};
use crc::crc32;
use log::{debug, error, info};
use qp2p::{Config, Endpoint, QuicP2p};
use rand::{self, distributions::Standard, seq::IteratorRandom, Rng};
use std::collections::{HashMap, HashSet};
use std::net::SocketAddr;
use structopt::StructOpt;
use anyhow::{Context, Result};
use tracing_subscriber::EnvFilter;

/// Client node will be connecting to bootstrap node from which it will receive contacts
Expand Down Expand Up @@ -84,26 +84,23 @@ impl ClientNode {
.with_context(|| "No valid bootstrap node was provided.".to_string())?;

// let (event_tx, event_rx) = new_unbounded_channels();
let qp2p = QuicP2p::with_config(
Some(config),
&[],
false
)?;
let qp2p = QuicP2p::with_config(Some(config), &[], false)?;

let large_msg = Bytes::from(random_data_with_hash(LARGE_MSG_SIZE));
assert!(hash_correct(&large_msg));

let small_msg = Bytes::from(random_data_with_hash(SMALL_MSG_SIZE));
assert!(hash_correct(&small_msg));

let (endpoint, incoming_connections, incoming_messages, disconnections) = qp2p.new_endpoint().await?;
let (endpoint, incoming_connections, incoming_messages, disconnections) =
qp2p.new_endpoint().await?;
let our_addr = endpoint.socket_addr();
info!("Our address: {}", our_addr);

let event_rx = EventReceivers {
incoming_connections,
incoming_messages,
disconnections
disconnections,
};

Ok(Self {
Expand Down Expand Up @@ -150,8 +147,12 @@ impl ClientNode {
info!("Connected to bootstrap node. Waiting for other node contacts...");
} else if self.client_nodes.contains(&peer) {
// TODO: handle tokens properly. Currently just hardcoding to 0 in example
self.endpoint.send_message(self.large_msg.clone(), &peer).await?;
self.endpoint.send_message(self.small_msg.clone(), &peer).await?;
self.endpoint
.send_message(self.large_msg.clone(), &peer)
.await?;
self.endpoint
.send_message(self.small_msg.clone(), &peer)
.await?;
self.sent_messages += 1;
}
Ok(())
Expand Down Expand Up @@ -240,4 +241,4 @@ fn random_vec(size: usize) -> Vec<u8> {
.sample_iter(&Standard)
.take(size)
.collect()
}
}
7 changes: 3 additions & 4 deletions examples/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use std::net::SocketAddr;
use bytes::Bytes;
use crossbeam_channel as mpmc;
use futures::{select, FutureExt};
use qp2p::{IncomingConnections, IncomingMessages, DisconnectionEvents};
use qp2p::{DisconnectionEvents, IncomingConnections, IncomingMessages};
use serde::{Deserialize, Serialize};

/// Remote procedure call for our examples to communicate.
Expand All @@ -33,10 +33,9 @@ pub enum Event {
pub struct EventReceivers {
pub incoming_messages: IncomingMessages,
pub incoming_connections: IncomingConnections,
pub disconnections: DisconnectionEvents
pub disconnections: DisconnectionEvents,
}


#[allow(unused)]
pub struct EventSenders {
pub node_tx: mpmc::Sender<Event>,
Expand Down Expand Up @@ -93,4 +92,4 @@ impl EventReceivers {

// event.ok()
// }
// }
// }

0 comments on commit 01c1586

Please sign in to comment.