Skip to content
This repository has been archived by the owner on Jun 25, 2021. It is now read-only.

Commit

Permalink
feat: make the resend delay configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
madadam committed Sep 29, 2020
1 parent cbc57ba commit 8a0d043
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 19 deletions.
6 changes: 4 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,10 @@
unused_results,
clippy::needless_borrow
)]
// FIXME: find a way to not need this.
#![type_length_limit = "2259754"]
// FIXME: it seems the code in `Comm::send_message_to_targets` is triggering type-length limit
// reached error for some reason. This is a quick workaround, but we should probably look into it
// closely and find a proper fix (or establish that this is already a proper fix).
#![type_length_limit = "2268004"]

#[macro_use]
extern crate log;
Expand Down
12 changes: 4 additions & 8 deletions src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,10 @@ mod stage;
#[cfg(all(test, feature = "mock"))]
mod tests;

pub use self::event_stream::EventStream;
#[cfg(feature = "mock")]
pub use self::stage::{BOOTSTRAP_TIMEOUT, JOIN_TIMEOUT};

pub use event_stream::EventStream;

use self::{executor::Executor, stage::Stage};
use crate::{
error::{Error, Result},
Expand Down Expand Up @@ -83,16 +82,13 @@ impl Node {
let mut rng = MainRng::default();
let full_id = config.full_id.unwrap_or_else(|| FullId::gen(&mut rng));
let node_name = *full_id.public_id().name();
let transport_config = config.transport_config;
let network_params = config.network_params;
let is_genesis = config.first;

let (stage, incoming_conns, timer_rx, events_rx) = if is_genesis {
let (stage, incoming_conns, timer_rx, events_rx) = if config.first {
info!("{} Starting a new network as the seed node.", node_name);
Stage::first_node(transport_config, full_id, network_params).await?
Stage::first_node(config.transport_config, full_id, config.network_params).await?
} else {
info!("{} Bootstrapping a new node.", node_name);
Stage::bootstrap(transport_config, full_id, network_params).await?
Stage::bootstrap(config.transport_config, full_id, config.network_params).await?
};

let stage = Arc::new(Mutex::new(stage));
Expand Down
45 changes: 38 additions & 7 deletions src/node/stage/comm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use futures::{
stream::{FuturesUnordered, StreamExt},
};
use lru_time_cache::LruCache;
use qp2p::{Config, Connection, Endpoint, IncomingConnections, QuicP2p};
use qp2p::{Connection, Endpoint, IncomingConnections, QuicP2p};
use std::{net::SocketAddr, slice, sync::Arc, time::Duration};
use tokio::time;

Expand All @@ -22,18 +22,45 @@ const CONNECTIONS_CACHE_SIZE: usize = 1024;

/// Maximal number of resend attempts to the same target.
pub const RESEND_MAX_ATTEMPTS: u8 = 3;
/// Delay before attempting to resend a previously failed message.
/// Default delay before attempting to resend a previously failed message.
pub const RESEND_DELAY: Duration = Duration::from_secs(10);

/// Configuration for the communication component.
pub struct Config {
/// Config for the underlying network transport.
pub transport_config: qp2p::Config,
/// Delay before attempting to resend a message that previously failed to send.
pub resend_delay: Duration,
}

impl Default for Config {
fn default() -> Self {
Self {
transport_config: Default::default(),
resend_delay: RESEND_DELAY,
}
}
}

impl From<qp2p::Config> for Config {
fn from(transport_config: qp2p::Config) -> Self {
Self {
transport_config,
..Default::default()
}
}
}

// Communication component of the node to interact with other nodes.
#[derive(Clone)]
pub(crate) struct Comm {
inner: Arc<Inner>,
}

impl Comm {
pub async fn new(transport_config: Config) -> Result<Self> {
let quic_p2p = QuicP2p::with_config(Some(transport_config), Default::default(), true)?;
pub async fn new(config: Config) -> Result<Self> {
let quic_p2p =
QuicP2p::with_config(Some(config.transport_config), Default::default(), true)?;

// Don't bootstrap, just create an endpoint where to listen to
// the incoming messages from other nodes.
Expand All @@ -45,12 +72,14 @@ impl Comm {
_quic_p2p: quic_p2p,
endpoint,
node_conns,
resend_delay: config.resend_delay,
}),
})
}

pub async fn from_bootstrapping(transport_config: Config) -> Result<(Self, SocketAddr)> {
let mut quic_p2p = QuicP2p::with_config(Some(transport_config), Default::default(), true)?;
pub async fn from_bootstrapping(config: Config) -> Result<(Self, SocketAddr)> {
let mut quic_p2p =
QuicP2p::with_config(Some(config.transport_config), Default::default(), true)?;

// Bootstrap to the network returning the connection to a node.
let (endpoint, conn) = quic_p2p.bootstrap().await?;
Expand All @@ -66,6 +95,7 @@ impl Comm {
_quic_p2p: quic_p2p,
endpoint,
node_conns,
resend_delay: config.resend_delay,
}),
},
addr,
Expand Down Expand Up @@ -166,6 +196,7 @@ struct Inner {
_quic_p2p: QuicP2p,
endpoint: Endpoint,
node_conns: Mutex<LruCache<SocketAddr, Arc<Connection>>>,
resend_delay: Duration,
}

impl Inner {
Expand Down Expand Up @@ -199,7 +230,7 @@ impl Inner {
delay: bool,
) -> (SocketAddr, Result<()>) {
if delay {
time::delay_for(RESEND_DELAY).await;
time::delay_for(self.resend_delay).await;
}

let result = self.send(&recipient, msg).await;
Expand Down
4 changes: 2 additions & 2 deletions src/node/stage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ impl Stage {
mpsc::UnboundedReceiver<u64>,
mpsc::UnboundedReceiver<Event>,
)> {
let comm = Comm::new(transport_config).await?;
let comm = Comm::new(transport_config.into()).await?;
let connection_info = comm.our_connection_info()?;
let p2p_node = P2pNode::new(*full_id.public_id(), connection_info);

Expand Down Expand Up @@ -154,7 +154,7 @@ impl Stage {
mpsc::UnboundedReceiver<u64>,
mpsc::UnboundedReceiver<Event>,
)> {
let (comm, addr) = Comm::from_bootstrapping(transport_config).await?;
let (comm, addr) = Comm::from_bootstrapping(transport_config.into()).await?;

let (events_tx, events_rx) = mpsc::unbounded_channel();
let node_info = NodeInfo {
Expand Down

0 comments on commit 8a0d043

Please sign in to comment.