Skip to content

Commit

Permalink
refactor: some code and log msgs minor improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
bochaco committed Dec 27, 2022
1 parent 43dd1c6 commit 8e00563
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 22 deletions.
15 changes: 10 additions & 5 deletions src/connection.rs
Expand Up @@ -154,9 +154,11 @@ fn listen_on_uni_streams(connection: quinn::Connection, tx: Sender<IncomingMsg>)
let uni = connection.accept_uni().await.map_err(ConnectionError::from);
let mut recv = match uni {
Ok(recv) => recv,
// In case of a connection error, there is not much we can do.
Err(err) => {
trace!("Connection {conn_id}: incoming uni-stream: ERROR: {err}");
// In case of a connection error, there is not much we can do.
trace!(
"Connection {conn_id}: failure when awaiting incoming uni-streams: {err:?}"
);
// WARNING: This might block!
let _ = tx.send(Err(RecvError::ConnectionLost(err))).await;
break;
Expand Down Expand Up @@ -204,9 +206,11 @@ fn listen_on_bi_streams(
let bi = connection.accept_bi().await.map_err(ConnectionError::from);
let (send, mut recv) = match bi {
Ok(recv) => recv,
// In case of a connection error, there is not much we can do.
Err(err) => {
trace!("Connection {conn_id}: incoming bi-stream: ERROR: {err:?}");
// In case of a connection error, there is not much we can do.
trace!(
"Connection {conn_id}: failure when awaiting incoming bi-streams: {err:?}"
);
// WARNING: This might block!
let _ = tx.send(Err(RecvError::ConnectionLost(err))).await;
break;
Expand Down Expand Up @@ -248,9 +252,10 @@ fn listen_on_bi_streams(
};

// Pass the stream, so it can be used to respond to the user message.
let msg = msg.map(|msg| (msg, Some(SendStream::new(send, conn_id))));
let msg = msg.map(|msg| (msg, Some(SendStream::new(send, conn_id.clone()))));
// Send away the msg or error
let _ = tx.send(msg).await;
trace!("Incoming new msg on conn_id={conn_id} sent to user in upper layer");
});
}

Expand Down
33 changes: 16 additions & 17 deletions src/endpoint.rs
Expand Up @@ -24,7 +24,7 @@ use tracing::{error, info, trace, warn};
const ECHO_SERVICE_QUERY_TIMEOUT: Duration = Duration::from_secs(30);

/// Standard size of our channel bounds
const STANDARD_CHANNEL_SIZE: usize = 10000;
const STANDARD_CHANNEL_SIZE: usize = 10_000;

/// Channel on which incoming connections are notified on
#[derive(Debug)]
Expand Down Expand Up @@ -406,26 +406,25 @@ pub(super) fn listen_for_incoming_connections(
connection_tx: mpsc::Sender<(Connection, ConnectionIncoming)>,
) {
let _ = tokio::spawn(async move {
loop {
match quinn_endpoint.accept().await {
Some(quinn_conn) => match quinn_conn.await {
Ok(connection) => {
let connection = Connection::new(connection, quinn_endpoint.clone());
let conn_id = connection.0.id();
if connection_tx.send(connection).await.is_err() {
warn!("Dropping incoming connection conn_id={conn_id}, because receiver was dropped");
}
while let Some(quinn_conn) = quinn_endpoint.accept().await {
match quinn_conn.await {
Ok(connection) => {
let connection = Connection::new(connection, quinn_endpoint.clone());
let conn_id = connection.0.id();
trace!("Incoming new connection conn_id={conn_id}");
if connection_tx.send(connection).await.is_err() {
warn!("Dropping incoming connection conn_id={conn_id}, because receiver was dropped");
}
Err(err) => {
warn!("An incoming connection failed because of: {:?}", err);
}
},
None => {
trace!("quinn::Incoming::next() returned None. There will be no more incoming connections");
break;
}
Err(err) => {
warn!("An incoming connection failed because of: {:?}", err);
}
}
}

trace!(
"quinn::Endpoint::accept() returned None. There will be no more incoming connections"
);
});
}

Expand Down

0 comments on commit 8e00563

Please sign in to comment.