Skip to content

Commit

Permalink
chore: remove unbounded channels
Browse files Browse the repository at this point in the history
  • Loading branch information
joshuef committed Jul 13, 2021
1 parent 964558c commit 9455975
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 24 deletions.
32 changes: 17 additions & 15 deletions src/connections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ use super::{
use bytes::Bytes;
use futures::{future, stream::StreamExt};
use std::net::SocketAddr;
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::mpsc::Sender;
use tokio::time::{timeout, Duration};
use tracing::{error, trace, warn};
use tracing::{debug, error, trace, warn};

/// Connection instance to a node which can be used to send messages to it
#[derive(Clone)]
Expand Down Expand Up @@ -139,9 +139,9 @@ pub async fn send_msg(mut send_stream: &mut quinn::SendStream, msg: Bytes) -> Re
pub(super) fn listen_for_incoming_connections(
mut quinn_incoming: quinn::Incoming,
connection_pool: ConnectionPool,
message_tx: UnboundedSender<(SocketAddr, Bytes)>,
connection_tx: UnboundedSender<SocketAddr>,
disconnection_tx: UnboundedSender<SocketAddr>,
message_tx: Sender<(SocketAddr, Bytes)>,
connection_tx: Sender<SocketAddr>,
disconnection_tx: Sender<SocketAddr>,
endpoint: Endpoint,
) {
let _ = tokio::spawn(async move {
Expand All @@ -156,7 +156,7 @@ pub(super) fn listen_for_incoming_connections(
}) => {
let peer_address = connection.remote_address();
let pool_handle = connection_pool.insert(peer_address, connection).await;
let _ = connection_tx.send(peer_address);
let _ = connection_tx.send(peer_address).await;
listen_for_incoming_messages(
uni_streams,
bi_streams,
Expand Down Expand Up @@ -187,20 +187,21 @@ pub(super) fn listen_for_incoming_messages(
mut uni_streams: quinn::IncomingUniStreams,
mut bi_streams: quinn::IncomingBiStreams,
remover: ConnectionRemover,
message_tx: UnboundedSender<(SocketAddr, Bytes)>,
disconnection_tx: UnboundedSender<SocketAddr>,
message_tx: Sender<(SocketAddr, Bytes)>,
disconnection_tx: Sender<SocketAddr>,
endpoint: Endpoint,
) {
let src = *remover.remote_addr();
let _ = tokio::spawn(async move {
debug!("qp2p another incoming listerner spawned");
let _ = future::join(
read_on_uni_streams(&mut uni_streams, src, message_tx.clone()),
read_on_bi_streams(&mut bi_streams, src, message_tx, &endpoint),
)
.await;

tracing::trace!("The connection to {:?} has been terminated.", src);
let _ = disconnection_tx.send(src);
trace!("The connection to {:?} has been terminated.", src);
let _ = disconnection_tx.send(src).await;
remover.remove().await;
});
}
Expand All @@ -209,7 +210,7 @@ pub(super) fn listen_for_incoming_messages(
async fn read_on_uni_streams(
uni_streams: &mut quinn::IncomingUniStreams,
peer_addr: SocketAddr,
message_tx: UnboundedSender<(SocketAddr, Bytes)>,
message_tx: Sender<(SocketAddr, Bytes)>,
) {
while let Some(result) = uni_streams.next().await {
match result {
Expand All @@ -227,7 +228,7 @@ async fn read_on_uni_streams(
Ok(mut recv) => loop {
match read_bytes(&mut recv).await {
Ok(WireMsg::UserMsg(bytes)) => {
let _ = message_tx.send((peer_addr, bytes));
let _ = message_tx.send((peer_addr, bytes)).await;
}
Ok(msg) => error!("Unexpected message type: {:?}", msg),
Err(Error::StreamRead(quinn::ReadExactError::FinishedEarly)) => break,
Expand All @@ -248,12 +249,13 @@ async fn read_on_uni_streams(
async fn read_on_bi_streams(
bi_streams: &mut quinn::IncomingBiStreams,
peer_addr: SocketAddr,
message_tx: UnboundedSender<(SocketAddr, Bytes)>,
message_tx: Sender<(SocketAddr, Bytes)>,
endpoint: &Endpoint,
) {
while let Some(result) = bi_streams.next().await {
match result {
Err(quinn::ConnectionError::ApplicationClosed { .. }) => {
Err(quinn::ConnectionError::ApplicationClosed { .. })
| Err(quinn::ConnectionError::ConnectionClosed { .. }) => {
trace!("Connection terminated by peer {:?}.", peer_addr);
break;
}
Expand All @@ -267,7 +269,7 @@ async fn read_on_bi_streams(
Ok((mut send, mut recv)) => loop {
match read_bytes(&mut recv).await {
Ok(WireMsg::UserMsg(bytes)) => {
let _ = message_tx.send((peer_addr, bytes));
let _ = message_tx.send((peer_addr, bytes)).await;
}
Ok(WireMsg::EndpointEchoReq) => {
if let Err(error) = handle_endpoint_echo_req(peer_addr, &mut send).await {
Expand Down
22 changes: 13 additions & 9 deletions src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ use super::{
use bytes::Bytes;
use std::{net::SocketAddr, time::Duration};
use tokio::sync::broadcast::{self, Receiver, Sender};
use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
use tokio::sync::mpsc::{self, Receiver as MpscReceiver, Sender as MpscSender};

use tokio::time::timeout;
use tracing::{debug, error, info, trace, warn};

Expand All @@ -39,8 +40,11 @@ const PORT_FORWARD_TIMEOUT: u64 = 30;
// Number of seconds before timing out the echo service query.
const ECHO_SERVICE_QUERY_TIMEOUT: u64 = 30;

/// Standard size of our channel bounds
const STANDARD_CHANNEL_SIZE: usize = 5;

/// Channel on which incoming messages can be listened to
pub struct IncomingMessages(pub(crate) UnboundedReceiver<(SocketAddr, Bytes)>);
pub struct IncomingMessages(pub(crate) MpscReceiver<(SocketAddr, Bytes)>);

impl IncomingMessages {
/// Blocks and returns the next incoming message and the source peer address
Expand All @@ -50,7 +54,7 @@ impl IncomingMessages {
}

/// Channel on which incoming connections are notified on
pub struct IncomingConnections(pub(crate) UnboundedReceiver<SocketAddr>);
pub struct IncomingConnections(pub(crate) MpscReceiver<SocketAddr>);

impl IncomingConnections {
/// Blocks until there is an incoming connection and returns the address of the
Expand All @@ -61,7 +65,7 @@ impl IncomingConnections {
}

/// Disconnection
pub struct DisconnectionEvents(pub(crate) UnboundedReceiver<SocketAddr>);
pub struct DisconnectionEvents(pub(crate) MpscReceiver<SocketAddr>);

impl DisconnectionEvents {
/// Blocks until there is a disconnection event and returns the address of the disconnected peer
Expand All @@ -77,8 +81,8 @@ pub struct Endpoint {
local_addr: SocketAddr,
public_addr: Option<SocketAddr>,
quic_endpoint: quinn::Endpoint,
message_tx: UnboundedSender<(SocketAddr, Bytes)>,
disconnection_tx: UnboundedSender<SocketAddr>,
message_tx: MpscSender<(SocketAddr, Bytes)>,
disconnection_tx: MpscSender<SocketAddr>,
client_cfg: quinn::ClientConfig,
bootstrap_nodes: Vec<SocketAddr>,
qp2p_config: Config,
Expand Down Expand Up @@ -117,9 +121,9 @@ impl Endpoint {
};
let connection_pool = ConnectionPool::new();

let (message_tx, message_rx) = mpsc::unbounded_channel();
let (connection_tx, connection_rx) = mpsc::unbounded_channel();
let (disconnection_tx, disconnection_rx) = mpsc::unbounded_channel();
let (message_tx, message_rx) = mpsc::channel(STANDARD_CHANNEL_SIZE);
let (connection_tx, connection_rx) = mpsc::channel(STANDARD_CHANNEL_SIZE);
let (disconnection_tx, disconnection_rx) = mpsc::channel(STANDARD_CHANNEL_SIZE);
let (termination_tx, termination_rx) = broadcast::channel(1);

let mut endpoint = Self {
Expand Down

0 comments on commit 9455975

Please sign in to comment.