Skip to content

Commit

Permalink
Merge 0fa1687 into b76771e
Browse files Browse the repository at this point in the history
  • Loading branch information
joshuef committed Jul 15, 2021
2 parents b76771e + 0fa1687 commit aeafdbd
Show file tree
Hide file tree
Showing 11 changed files with 281 additions and 97 deletions.
2 changes: 0 additions & 2 deletions .github/workflows/master.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ on:
- master

env:
# Run all cargo commands with --verbose.
CARGO_TERM_VERBOSE: true
RUST_BACKTRACE: 1

jobs:
Expand Down
4 changes: 1 addition & 3 deletions .github/workflows/pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ name: PR
on: pull_request

env:
# Run all cargo commands with --verbose.
CARGO_TERM_VERBOSE: true
RUST_BACKTRACE: 1
# Deny all compiler warnings.
RUSTFLAGS: "-D warnings -C opt-level=2 -C codegen-units=8"
Expand Down Expand Up @@ -93,7 +91,7 @@ jobs:
- name: rust-tarpaulin code coverage check
uses: actions-rs/tarpaulin@v0.1
with:
args: '-v --release --out Lcov -- --skip echo_service' # test would timeout on CI
args: '-v --release --out Lcov -- --skip echo_service --skip multiple_connections_with_many_larger_concurrent_messages' # tests would timeout on CI
- name: Push code coverage results to coveralls.io
uses: coverallsapp/github-action@master
with:
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@
Cargo.lock
*.*~
.idea
*.log
3 changes: 2 additions & 1 deletion src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
use super::{
bootstrap_cache::BootstrapCache,
config::{Config, SerialisableCertificate},
endpoint::{DisconnectionEvents, Endpoint, IncomingConnections, IncomingMessages},
connections::DisconnectionEvents,
endpoint::{Endpoint, IncomingConnections, IncomingMessages},
error::{Error, Result},
peer_config::{self, DEFAULT_IDLE_TIMEOUT_MSEC, DEFAULT_KEEP_ALIVE_INTERVAL_MSEC},
};
Expand Down
3 changes: 3 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ pub struct Config {
/// Duration of a UPnP port mapping.
#[structopt(long)]
pub upnp_lease_duration: Option<u32>,
/// Duration to wait before retrying to resend / reconnect
#[structopt(long, default_value = "500")]
pub retry_interval: u64,
}

/// To be used to read and write our certificate and private key to disk esp. as a part of our
Expand Down
2 changes: 1 addition & 1 deletion src/connection_deduplicator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ impl ConnectionDeduplicator {
pub(crate) enum Error {
#[error("Connect error")]
Connect(#[from] quinn::ConnectError),
#[error("Connection error")]
#[error("Quinn Connection error during deduplication")]
Connection(#[from] quinn::ConnectionError),
}

Expand Down
98 changes: 64 additions & 34 deletions src/connections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ use super::{
use bytes::Bytes;
use futures::{future, stream::StreamExt};
use std::net::SocketAddr;
use tokio::sync::mpsc::Sender;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::time::{timeout, Duration};
use tracing::{debug, error, trace, warn};
use tracing::{trace, warn};

/// Connection instance to a node which can be used to send messages to it
#[derive(Clone)]
Expand All @@ -28,6 +28,17 @@ pub(crate) struct Connection {
remover: ConnectionRemover,
}

/// Disconnection events, and the result that led to disconnection.
pub struct DisconnectionEvents(pub Receiver<SocketAddr>);

/// Disconnection
impl DisconnectionEvents {
/// Blocks until there is a disconnection event and returns the address of the disconnected peer
pub async fn next(&mut self) -> Option<SocketAddr> {
self.0.recv().await
}
}

impl Connection {
pub(crate) fn new(quic_conn: quinn::Connection, remover: ConnectionRemover) -> Self {
Self { quic_conn, remover }
Expand Down Expand Up @@ -167,10 +178,7 @@ pub(super) fn listen_for_incoming_connections(
);
}
Err(err) => {
error!(
"An incoming connection failed because of an error: {:?}",
err
);
warn!("An incoming connection failed because of: {:?}", err);
}
},
None => {
Expand All @@ -193,15 +201,22 @@ pub(super) fn listen_for_incoming_messages(
) {
let src = *remover.remote_addr();
let _ = tokio::spawn(async move {
debug!("qp2p another incoming listerner spawned");
let _ = future::join(
match future::try_join(
read_on_uni_streams(&mut uni_streams, src, message_tx.clone()),
read_on_bi_streams(&mut bi_streams, src, message_tx, &endpoint),
)
.await;
.await
{
Ok(_) => {
let _ = disconnection_tx.send(src).await;
}
Err(error) => {
trace!("Issue on stream reading from: {:?} :: {:?}", src, error);
let _ = disconnection_tx.send(src).await;
}
}

trace!("The connection to {:?} has been terminated.", src);
let _ = disconnection_tx.send(src).await;
remover.remove().await;
});
}
Expand All @@ -211,30 +226,35 @@ async fn read_on_uni_streams(
uni_streams: &mut quinn::IncomingUniStreams,
peer_addr: SocketAddr,
message_tx: Sender<(SocketAddr, Bytes)>,
) {
) -> Result<()> {
while let Some(result) = uni_streams.next().await {
match result {
Err(quinn::ConnectionError::ApplicationClosed { .. }) => {
trace!("Connection terminated by peer {:?}.", peer_addr);
break;
Err(quinn::ConnectionError::ConnectionClosed(_))
| Err(quinn::ConnectionError::ApplicationClosed(_)) => {
trace!("Connection closed by peer {:?}.", peer_addr);
return Err(Error::QuinnConnectionClosed);
}
Err(err) => {
warn!(
"Failed to read incoming message on uni-stream for peer {:?} with error: {:?}",
"Failed to read incoming message on uni-stream for peer {:?} with: {:?}",
peer_addr, err
);
break;
return Err(Error::from(err));
}
Ok(mut recv) => loop {
match read_bytes(&mut recv).await {
Ok(WireMsg::UserMsg(bytes)) => {
trace!("bytes received fine from: {:?} ", peer_addr);
let _ = message_tx.send((peer_addr, bytes)).await;
}
Ok(msg) => error!("Unexpected message type: {:?}", msg),
Err(Error::StreamRead(quinn::ReadExactError::FinishedEarly)) => break,
Ok(msg) => warn!("Unexpected message type: {:?}", msg),
Err(Error::StreamRead(quinn::ReadExactError::FinishedEarly)) => {
warn!("Stream read finished early");
break;
}
Err(err) => {
error!(
"Failed reading from a uni-stream for peer {:?} with error: {:?}",
warn!(
"Failed reading from a uni-stream for peer {:?} with: {:?}",
peer_addr, err
);
break;
Expand All @@ -243,6 +263,7 @@ async fn read_on_uni_streams(
},
}
}
Ok(())
}

// Read messages sent by peer in a bidirectional stream.
Expand All @@ -251,20 +272,20 @@ async fn read_on_bi_streams(
peer_addr: SocketAddr,
message_tx: Sender<(SocketAddr, Bytes)>,
endpoint: &Endpoint,
) {
) -> Result<()> {
while let Some(result) = bi_streams.next().await {
match result {
Err(quinn::ConnectionError::ApplicationClosed { .. })
| Err(quinn::ConnectionError::ConnectionClosed { .. }) => {
trace!("Connection terminated by peer {:?}.", peer_addr);
break;
Err(quinn::ConnectionError::ConnectionClosed(_))
| Err(quinn::ConnectionError::ApplicationClosed(_)) => {
trace!("Connection closed by peer {:?}.", peer_addr);
return Err(Error::QuinnConnectionClosed);
}
Err(err) => {
warn!(
"Failed to read incoming message on bi-stream for peer {:?} with error: {:?}",
"Failed to read incoming message on bi-stream for peer {:?} with: {:?}",
peer_addr, err
);
break;
return Err(Error::from(err));
}
Ok((mut send, mut recv)) => loop {
match read_bytes(&mut recv).await {
Expand All @@ -273,10 +294,12 @@ async fn read_on_bi_streams(
}
Ok(WireMsg::EndpointEchoReq) => {
if let Err(error) = handle_endpoint_echo_req(peer_addr, &mut send).await {
error!(
"Failed to handle Echo Request for peer {:?} with error: {:?}",
warn!(
"Failed to handle Echo Request for peer {:?} with: {:?}",
peer_addr, error
);

return Err(error);
}
}
Ok(WireMsg::EndpointVerificationReq(address_sent)) => {
Expand All @@ -288,19 +311,24 @@ async fn read_on_bi_streams(
)
.await
{
error!("Failed to handle Endpoint verification request for peer {:?} with error: {:?}", peer_addr, error);
warn!("Failed to handle Endpoint verification request for peer {:?} with: {:?}", peer_addr, error);

return Err(error);
}
}
Ok(msg) => {
error!(
warn!(
"Unexpected message type from peer {:?}: {:?}",
peer_addr, msg
);
}
Err(Error::StreamRead(quinn::ReadExactError::FinishedEarly)) => break,
Err(Error::StreamRead(quinn::ReadExactError::FinishedEarly)) => {
warn!("Stream finished early");
break;
}
Err(err) => {
error!(
"Failed reading from a bi-stream for peer {:?} with error: {:?}",
warn!(
"Failed reading from a bi-stream for peer {:?} with: {:?}",
peer_addr, err
);
break;
Expand All @@ -309,6 +337,8 @@ async fn read_on_bi_streams(
},
}
}

Ok(())
}

async fn handle_endpoint_echo_req(
Expand Down
Loading

0 comments on commit aeafdbd

Please sign in to comment.