Skip to content
This repository has been archived by the owner on Jun 25, 2021. It is now read-only.

Commit

Permalink
fix(comm): try to re-connect after previously failed send
Browse files Browse the repository at this point in the history
  • Loading branch information
madadam committed Oct 20, 2020
1 parent 25ae87a commit 08d9410
Showing 1 changed file with 66 additions and 3 deletions.
69 changes: 66 additions & 3 deletions src/routing/comm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,15 @@ impl Comm {
conn
};

conn.send_uni(msg).await?;
let result = conn.send_uni(msg).await;

Ok(())
// In case of error, remove the cached connection so it can be re-established on the next
// attempt.
if result.is_err() {
let _ = self.node_conns.lock().await.remove(recipient);
}

result
}
}

Expand Down Expand Up @@ -293,10 +299,13 @@ mod tests {
use futures::future;
use std::{
net::{IpAddr, Ipv4Addr},
slice,
time::Duration,
};
use tokio::{net::UdpSocket, sync::mpsc, time};

const TIMEOUT: Duration = Duration::from_secs(1);

#[tokio::test]
async fn successful_send() -> Result<()> {
let comm = Comm::new(transport_config())?;
Expand Down Expand Up @@ -327,7 +336,7 @@ mod tests {

assert_eq!(peer0.rx.recv().await, Some(message));

assert!(time::timeout(Duration::from_millis(100), peer1.rx.recv())
assert!(time::timeout(TIMEOUT, peer1.rx.recv())
.await
.unwrap_or_default()
.is_none());
Expand Down Expand Up @@ -388,6 +397,60 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn send_after_reconnect() -> Result<()> {
let send_comm = Comm::new(transport_config())?;

let recv_transport = QuicP2p::with_config(Some(transport_config()), &[], false)?;
let recv_endpoint = recv_transport.new_endpoint()?;
let recv_addr = recv_endpoint.local_addr()?;
let mut recv_incoming_connections = recv_endpoint.listen()?;

// Send the first message.
let msg0 = Bytes::from_static(b"zero");
send_comm
.send_message_to_targets(slice::from_ref(&recv_addr), 1, msg0.clone())
.await?;

let mut msg0_received = false;

// Receive one message and drop the incoming stream.
{
if let Some(mut incoming_msgs) =
time::timeout(TIMEOUT, recv_incoming_connections.next()).await?
{
if let Some(msg) = time::timeout(TIMEOUT, incoming_msgs.next()).await? {
assert_eq!(msg.get_message_data(), msg0);
msg0_received = true;
}
}

assert!(msg0_received);
}

// Send the second message.
let msg1 = Bytes::from_static(b"one");
send_comm
.send_message_to_targets(slice::from_ref(&recv_addr), 1, msg1.clone())
.await?;

let mut msg1_received = false;

// Expect to receive the second message on a re-established connection.
if let Some(mut incoming_msgs) =
time::timeout(TIMEOUT, recv_incoming_connections.next()).await?
{
if let Some(msg) = time::timeout(TIMEOUT, incoming_msgs.next()).await? {
assert_eq!(msg.get_message_data(), msg1);
msg1_received = true;
}
}

assert!(msg1_received);

Ok(())
}

fn transport_config() -> qp2p::Config {
qp2p::Config {
ip: Some(IpAddr::V4(Ipv4Addr::LOCALHOST)),
Expand Down

0 comments on commit 08d9410

Please sign in to comment.