Skip to content
This repository has been archived by the owner on Jun 25, 2021. It is now read-only.

Commit

Permalink
fix(comm): dont hold on to messages sent on a channel that is unused
Browse files Browse the repository at this point in the history
- dont try to connect to a client when sending back a message
- also fixes the logic of a the send_after_reconnect test
  • Loading branch information
lionel-faber committed Feb 15, 2021
1 parent c95f27c commit 92856cd
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 31 deletions.
41 changes: 11 additions & 30 deletions src/routing/comm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use tokio::{sync::mpsc, task};
pub(crate) struct Comm {
_quic_p2p: QuicP2p,
endpoint: Endpoint,
_incoming_connections: qp2p::IncomingConnections,
// Sender for connection events. Kept here so we can clone it and pass it to the incoming
// messages handler every time we establish new connection. It's kept in an `Option` so we can
// take it out and drop it on `terminate` which together with all the incoming message handlers
Expand All @@ -40,8 +39,7 @@ impl Comm {

// Don't bootstrap, just create an endpoint where to listen to
// the incoming messages from other nodes.
let (endpoint, incoming_connections, incoming_messages, disconnections) =
quic_p2p.new_endpoint().await?;
let (endpoint, _, incoming_messages, disconnections) = quic_p2p.new_endpoint().await?;

let _ = task::spawn(handle_incoming_messages(
incoming_messages,
Expand All @@ -55,7 +53,6 @@ impl Comm {

Ok(Self {
_quic_p2p: quic_p2p,
_incoming_connections: incoming_connections,
endpoint,
event_tx: RwLock::new(Some(event_tx)),
})
Expand All @@ -68,7 +65,7 @@ impl Comm {
let quic_p2p = QuicP2p::with_config(Some(transport_config), Default::default(), true)?;

// Bootstrap to the network returning the connection to a node.
let (endpoint, incoming_connections, incoming_messages, disconnections, bootstrap_addr) =
let (endpoint, _, incoming_messages, disconnections, bootstrap_addr) =
quic_p2p.bootstrap().await?;

let _ = task::spawn(handle_incoming_messages(
Expand All @@ -84,7 +81,6 @@ impl Comm {
Ok((
Self {
_quic_p2p: quic_p2p,
_incoming_connections: incoming_connections,
endpoint,
event_tx: RwLock::new(Some(event_tx)),
},
Expand Down Expand Up @@ -112,25 +108,10 @@ impl Comm {
recipient: &SocketAddr,
msg: Bytes,
) -> Result<(), SendError> {
// This will attempt to used a cached connection
if self
.endpoint
.send_message(msg.clone(), recipient)
self.endpoint
.send_message(msg, recipient)
.await
.is_err()
{
// If the sending of a message failed the connection would no longer
// exist in the pool. Calling send_message agail will use a new connection.
if let Err(err) = self.endpoint.send_message(msg, recipient).await {
error!(
"Sending message to client {:?} failed with error {:?}",
recipient, err
);
} else {
return Ok(());
}
}
Err(SendError)
.map_err(|_| SendError)
}

/// Sends a message to multiple recipients. Attempts to send to `delivery_group_size`
Expand Down Expand Up @@ -227,7 +208,8 @@ impl Comm {
}

// If the sending of a message failed the connection would no longer
// exist in the pool. Calling send_message again will use a new connection.
// exist in the pool. So we connect again and then send the message.
self.endpoint.connect_to(recipient).await?;
self.endpoint.send_message(msg, recipient).await
}
}
Expand Down Expand Up @@ -416,9 +398,7 @@ mod tests {
let send_comm = Comm::new(transport_config(), tx).await?;

let recv_transport = QuicP2p::with_config(Some(transport_config()), &[], false)?;
#[allow(unused)]
let (recv_endpoint, incoming_connection, mut incoming_msgs, disconnections) =
recv_transport.new_endpoint().await?;
let (mut recv_endpoint, _, mut incoming_msgs, _) = recv_transport.new_endpoint().await?;
let recv_addr = recv_endpoint.socket_addr();

// Send the first message.
Expand All @@ -430,11 +410,12 @@ mod tests {

let mut msg0_received = false;

// Receive one message and drop the incoming stream.
// Receive one message and disconnect from the peer
{
if let Some((_src, msg)) = time::timeout(TIMEOUT, incoming_msgs.next()).await? {
if let Some((src, msg)) = time::timeout(TIMEOUT, incoming_msgs.next()).await? {
assert_eq!(msg, msg0);
msg0_received = true;
recv_endpoint.disconnect_from(&src)?;
}
assert!(msg0_received);
}
Expand Down
2 changes: 1 addition & 1 deletion src/routing/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ async fn receive_mismatching_get_section_request_as_adult() -> Result<()> {

let node = create_node();
let state = Approved::new(node, section, None, mpsc::unbounded_channel().0);
let stage = Stage::new(state, create_comm()?);
let stage = Stage::new(state, create_comm().await?);

let new_node_name = bad_prefix.substituted_in(rand::random());
let new_node_addr = gen_addr();
Expand Down

0 comments on commit 92856cd

Please sign in to comment.