Skip to content
This repository has been archived by the owner on Jun 25, 2021. It is now read-only.

Commit

Permalink
feat(upnp): use new version of qp2p with UPnP and echo service
Browse files Browse the repository at this point in the history
  • Loading branch information
lionel-faber committed Nov 12, 2020
1 parent bcecd27 commit afb609e
Show file tree
Hide file tree
Showing 7 changed files with 24 additions and 22 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ hex_fmt = "~0.3.0"
itertools = "~0.9.0"
log = "~0.4.8"
lru_time_cache = "~0.11.0"
qp2p = { version = "~0.8.5", features = ["upnp"] }
qp2p = "~0.8.8"
rand = "~0.7.3"
rand_chacha = "~0.2.2"
serde = { version = "1.0.117", features = ["derive"] }
Expand Down
1 change: 1 addition & 0 deletions examples/minimal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ async fn start_node(

let contact_info = node
.our_connection_info()
.await
.expect("Failed to obtain node's contact info.");

info!(
Expand Down
22 changes: 11 additions & 11 deletions src/routing/comm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ impl Comm {
Ok(IncomingMessages::new(self.endpoint.listen()?))
}

pub fn our_connection_info(&self) -> Result<SocketAddr> {
self.endpoint.our_addr().map_err(|err| {
pub async fn our_connection_info(&self) -> Result<SocketAddr> {
self.endpoint.socket_addr().await.map_err(|err| {
error!("Failed to retrieve our connection info: {:?}", err);
err.into()
})
Expand Down Expand Up @@ -405,8 +405,8 @@ mod tests {
async fn successful_send() -> Result<()> {
let comm = Comm::new(transport_config())?;

let mut peer0 = Peer::new()?;
let mut peer1 = Peer::new()?;
let mut peer0 = Peer::new().await?;
let mut peer1 = Peer::new().await?;

let message = Bytes::from_static(b"hello world");
comm.send_message_to_targets(&[peer0.addr, peer1.addr], 2, message.clone())
Expand All @@ -422,8 +422,8 @@ mod tests {
async fn successful_send_to_subset() -> Result<()> {
let comm = Comm::new(transport_config())?;

let mut peer0 = Peer::new()?;
let mut peer1 = Peer::new()?;
let mut peer0 = Peer::new().await?;
let mut peer1 = Peer::new().await?;

let message = Bytes::from_static(b"hello world");
comm.send_message_to_targets(&[peer0.addr, peer1.addr], 1, message.clone())
Expand Down Expand Up @@ -459,7 +459,7 @@ mod tests {
#[tokio::test]
async fn successful_send_after_failed_attempts() -> Result<()> {
let comm = Comm::new(transport_config())?;
let mut peer = Peer::new()?;
let mut peer = Peer::new().await?;
let invalid_addr = get_invalid_addr().await?;

let message = Bytes::from_static(b"hello world");
Expand All @@ -474,7 +474,7 @@ mod tests {
#[tokio::test]
async fn partially_successful_send() -> Result<()> {
let comm = Comm::new(transport_config())?;
let mut peer = Peer::new()?;
let mut peer = Peer::new().await?;
let invalid_addr = get_invalid_addr().await?;

let message = Bytes::from_static(b"hello world");
Expand All @@ -498,7 +498,7 @@ mod tests {

let recv_transport = QuicP2p::with_config(Some(transport_config()), &[], false)?;
let recv_endpoint = recv_transport.new_endpoint()?;
let recv_addr = recv_endpoint.local_addr()?;
let recv_addr = recv_endpoint.socket_addr().await?;
let mut recv_incoming_connections = recv_endpoint.listen()?;

// Send the first message.
Expand Down Expand Up @@ -560,11 +560,11 @@ mod tests {
}

impl Peer {
fn new() -> Result<Self> {
async fn new() -> Result<Self> {
let transport = QuicP2p::with_config(Some(transport_config()), &[], false)?;

let endpoint = transport.new_endpoint()?;
let addr = endpoint.local_addr()?;
let addr = endpoint.socket_addr().await?;
let mut incoming_connections = endpoint.listen()?;

let (tx, rx) = mpsc::channel(1);
Expand Down
9 changes: 5 additions & 4 deletions src/routing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use tokio::sync::mpsc;
use xor_name::{Prefix, XorName};

/// Routing configuration.
#[derive(Debug)]
pub struct Config {
/// If true, configures the node to start a new network instead of joining an existing one.
pub first: bool,
Expand Down Expand Up @@ -93,7 +94,7 @@ impl Routing {
let comm = Comm::new(config.transport_config)?;
let incoming_msgs = comm.listen()?;

let node = Node::new(keypair, comm.our_connection_info()?);
let node = Node::new(keypair, comm.our_connection_info().await?);
let state = Approved::first_node(node, event_tx)?;

state.send_event(Event::PromotedToElder);
Expand All @@ -104,7 +105,7 @@ impl Routing {
let (comm, bootstrap_addr) = Comm::from_bootstrapping(config.transport_config).await?;
let mut incoming_msgs = comm.listen()?;

let node = Node::new(keypair, comm.our_connection_info()?);
let node = Node::new(keypair, comm.our_connection_info().await?);
let (node, section, backlog) =
bootstrap::infant(node, &comm, &mut incoming_msgs, bootstrap_addr).await?;
let state = Approved::new(node, section, None, event_tx);
Expand Down Expand Up @@ -163,8 +164,8 @@ impl Routing {
}

/// Returns connection info of this node.
pub fn our_connection_info(&self) -> Result<SocketAddr> {
self.stage.comm.our_connection_info()
pub async fn our_connection_info(&self) -> Result<SocketAddr> {
self.stage.comm.our_connection_info().await
}

/// Prefix of our section
Expand Down
4 changes: 2 additions & 2 deletions tests/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ async fn test_node_bootstrapping() -> Result<()> {
});

// bootstrap a second node with genesis
let genesis_contact = genesis_node.our_connection_info()?;
let genesis_contact = genesis_node.our_connection_info().await?;
let (node1, _event_stream) = RoutingBuilder::new(None)
.with_contact(genesis_contact)
.create()
Expand Down Expand Up @@ -86,7 +86,7 @@ async fn test_startup_section_bootstrapping() -> Result<()> {
});

// bootstrap several nodes with genesis to form a section
let genesis_contact = genesis_node.our_connection_info()?;
let genesis_contact = genesis_node.our_connection_info().await?;
let nodes_joining_tasks: Vec<_> = (0..other_node_count)
.map(|_| async {
let (node, mut event_stream) = RoutingBuilder::new(None)
Expand Down
6 changes: 3 additions & 3 deletions tests/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ async fn test_messages_client_node() -> Result<()> {
content, mut send, ..
} => {
assert_eq!(content, Bytes::from_static(msg));
send.send(Bytes::from_static(response)).await?;
send.send_user_msg(Bytes::from_static(response)).await?;
break;
}
_other => {}
Expand All @@ -40,7 +40,7 @@ async fn test_messages_client_node() -> Result<()> {
});

// create a client which sends a message to the node
let node_addr = node.our_connection_info()?;
let node_addr = node.our_connection_info().await?;
let mut config = TransportConfig::default();
config.ip = Some(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)));

Expand All @@ -63,7 +63,7 @@ async fn test_messages_between_nodes() -> Result<()> {
let response = b"good bye!";

let (node1, mut event_stream) = RoutingBuilder::new(None).first().create().await?;
let node1_contact = node1.our_connection_info()?;
let node1_contact = node1.our_connection_info().await?;
let node1_name = node1.name().await;

// spawn node events listener
Expand Down
2 changes: 1 addition & 1 deletion tests/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ pub async fn create_connected_nodes(count: usize) -> Result<Vec<(Routing, EventS
let (node, mut event_stream) = RoutingBuilder::new(None).first().create().await?;
assert_next_event!(event_stream, Event::PromotedToElder);

let bootstrap_contact = node.our_connection_info()?;
let bootstrap_contact = node.our_connection_info().await?;

nodes.push((node, event_stream));

Expand Down

0 comments on commit afb609e

Please sign in to comment.