Skip to content

Commit

Permalink
refactor(comms): prevent early exit of control flow when sending an
Browse files Browse the repository at this point in the history
event via the streams

- this also prevents the creation of a new connection when send_message
is called
  • Loading branch information
lionel-faber committed Feb 11, 2021
1 parent db83038 commit 860ea5b
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 35 deletions.
14 changes: 4 additions & 10 deletions src/connections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,7 @@ pub(super) fn listen_for_incoming_connections(
}) => {
let peer_address = connection.remote_address();
let pool_handle = connection_pool.insert(peer_address, connection);
connection_tx
.send(peer_address)
.map_err(|err| Error::MpscChannelSend(err.to_string()))?;
let _ = connection_tx.send(peer_address);
listen_for_incoming_messages(
uni_streams,
bi_streams,
Expand Down Expand Up @@ -203,15 +201,11 @@ pub(super) fn listen_for_incoming_messages(
// When the message in handled internally we return Bytes::new() to prevent
// connection termination
if !message.is_empty() {
message_tx
.send((src, message))
.map_err(|err| Error::MpscChannelSend(err.to_string()))?;
let _ = message_tx.send((src, message));
}
} else {
log::trace!("The connection has been terminated.");
disconnection_tx
.send(*remover.remote_addr())
.map_err(|err| Error::MpscChannelSend(err.to_string()))?;
let _ = disconnection_tx.send(*remover.remote_addr());
remover.remove();
break;
}
Expand Down Expand Up @@ -322,7 +316,7 @@ mod tests {
use crate::{config::Config, wire_msg::WireMsg, Error};
use std::net::{IpAddr, Ipv4Addr};

#[tokio::test(core_threads = 10)]
#[tokio::test]
async fn echo_service() -> Result<(), Error> {
let qp2p = QuicP2p::with_config(
Some(Config {
Expand Down
6 changes: 2 additions & 4 deletions src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -386,11 +386,9 @@ impl Endpoint {
connection.open_bi().await
}

/// Sends a message to a peer. This will attempt to re-use any existing connections
/// with the said peer. If a connection doesn't exist already, a new connection will be created.
/// Sends a message to a peer. This will attempt to use an existing connection
/// to the destination peer. If a connection does not exist, this will fail with `Error::MissingConnection`
pub async fn send_message(&self, msg: Bytes, dest: &SocketAddr) -> Result<()> {
self.connect_to(dest).await?;

let connection = self.get_connection(dest).ok_or(Error::MissingConnection)?;
connection.send_uni(msg).await?;
Ok(())
Expand Down
3 changes: 0 additions & 3 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,4 @@ pub enum Error {
/// Missing connection
#[error("No connection to the dest peer")]
MissingConnection,
/// Unable to send message on channel
#[error("Unable to send message on channel")]
MpscChannelSend(String),
}
31 changes: 13 additions & 18 deletions src/tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use futures::future;
use std::time::Duration;
use tokio::time::timeout;

#[tokio::test(core_threads = 10)]
#[tokio::test]
async fn successful_connection() -> Result<()> {
utils::init_logging();

Expand All @@ -26,7 +26,7 @@ async fn successful_connection() -> Result<()> {
Ok(())
}

#[tokio::test(core_threads = 10)]
#[tokio::test]
async fn single_message() -> Result<()> {
utils::init_logging();

Expand Down Expand Up @@ -62,7 +62,7 @@ async fn single_message() -> Result<()> {
Ok(())
}

#[tokio::test(core_threads = 10)]
#[tokio::test]
async fn reuse_outgoing_connection() -> Result<()> {
utils::init_logging();

Expand Down Expand Up @@ -114,7 +114,7 @@ async fn reuse_outgoing_connection() -> Result<()> {
Ok(())
}

#[tokio::test(core_threads = 10)]
#[tokio::test]
async fn reuse_incoming_connection() -> Result<()> {
utils::init_logging();

Expand Down Expand Up @@ -168,20 +168,16 @@ async fn reuse_incoming_connection() -> Result<()> {
Ok(())
}

#[tokio::test(core_threads = 10)]
#[tokio::test]
async fn disconnection() -> Result<()> {
utils::init_logging();

let qp2p = new_qp2p()?;
let (
mut alice,
mut alice_incoming_connections,
_alice_incoming_messages,
mut alice_disconnections,
) = qp2p.new_endpoint().await?;
let (mut alice, mut alice_incoming_connections, _, mut alice_disconnections) =
qp2p.new_endpoint().await?;
let alice_addr = alice.socket_addr();

let (bob, mut bob_incoming_connections, _bob_incoming_messages, mut bob_disconnections) =
let (bob, mut bob_incoming_connections, _, mut bob_disconnections) =
qp2p.new_endpoint().await?;
let bob_addr = bob.socket_addr();

Expand Down Expand Up @@ -221,7 +217,7 @@ async fn disconnection() -> Result<()> {
Ok(())
}

#[tokio::test(core_threads = 10)]
#[tokio::test]
async fn simultaneous_incoming_and_outgoing_connections() -> Result<()> {
// If both peers call `connect_to` simultaneously (that is, before any of them receives the
// others connection first), two separate connections are created. This test verifies that
Expand All @@ -238,7 +234,7 @@ async fn simultaneous_incoming_and_outgoing_connections() -> Result<()> {
) = qp2p.new_endpoint().await?;
let alice_addr = alice.socket_addr();

let (mut bob, mut bob_incoming_connections, mut bob_incoming_messages, _bob_disconnections) =
let (mut bob, mut bob_incoming_connections, mut bob_incoming_messages, _) =
qp2p.new_endpoint().await?;
let bob_addr = bob.socket_addr();

Expand Down Expand Up @@ -307,17 +303,16 @@ async fn simultaneous_incoming_and_outgoing_connections() -> Result<()> {
Ok(())
}

#[tokio::test(core_threads = 10)]
#[tokio::test]
async fn multiple_concurrent_connects_to_the_same_peer() -> Result<()> {
utils::init_logging();

let qp2p = new_qp2p()?;
let (alice, mut alice_incoming_connections, mut alice_incoming_messages, _alice_disconnections) =
let (alice, mut alice_incoming_connections, mut alice_incoming_messages, _) =
qp2p.new_endpoint().await?;
let alice_addr = alice.socket_addr();

let (bob, _bob_incoming_connections, mut bob_incoming_messages, _bob_disconnections) =
qp2p.new_endpoint().await?;
let (bob, _, mut bob_incoming_messages, _) = qp2p.new_endpoint().await?;
let bob_addr = bob.socket_addr();

// Try to establish two connections to the same peer at the same time.
Expand Down

0 comments on commit 860ea5b

Please sign in to comment.