Skip to content

Commit

Permalink
feat: drop retry support
Browse files Browse the repository at this point in the history
  • Loading branch information
b-zee authored and bochaco committed Dec 12, 2022
1 parent 5da6a33 commit 7909ca5
Show file tree
Hide file tree
Showing 8 changed files with 42 additions and 233 deletions.
1 change: 0 additions & 1 deletion Cargo.toml
Expand Up @@ -15,7 +15,6 @@ edition = "2021"
name = "p2p_node"

[dependencies]
backoff = { version = "0.3.0", features = ["tokio"] }
bincode = "1.2.1"
bytes = { version = "1.0.1", features = ["serde"] }
futures = "~0.3.8"
Expand Down
7 changes: 1 addition & 6 deletions README.md
Expand Up @@ -43,12 +43,7 @@ To minimise this overhead, all opened connections are pooled and reused based on

### Fault tolerance

All operations are retried using an exponential back-off strategy (with jitter), configurable by how long to keep retrying for.
This can help ensure continuity over flaky connections.

**Note:** This means that all messages sent with this library will have 'at least once' delivery – ideally message handling should be idempotent, or else deduplication will have to be performed on receipt.

Additionally, APIs are available to connect to any peer from a list of peers.
APIs are available to connect to any peer from a list of peers.
Connections are established concurrently, and the first to succeed is kept, while the rest are discarded.
This allows connecting to any of a set of equivalent peers, finding a still-reachable peer in a set of previously known peers, etc.

Expand Down
119 changes: 2 additions & 117 deletions src/config.rs
Expand Up @@ -13,50 +13,16 @@ use quinn::{IdleTimeout, VarInt};

use rustls::{Certificate, ClientConfig, ServerName};
use serde::{Deserialize, Serialize};
use std::{future::Future, net::IpAddr, sync::Arc, time::Duration};
use std::{net::IpAddr, sync::Arc, time::Duration};

#[cfg(feature = "structopt")]
use structopt::StructOpt;

/// Default for [`RetryConfig::max_retry_interval`] (500 ms).
///
/// Together with the default max and multiplier,
/// gives 5-6 retries in ~30 s total retry time.
pub const DEFAULT_INITIAL_RETRY_INTERVAL: Duration = Duration::from_millis(500);
#[cfg(feature = "structopt")]
const DEFAULT_INITIAL_RETRY_INTERVAL_STR: &str = "500";

/// Default for [`Config::idle_timeout`] (18seconds).
///
/// Ostensibly a little inside the 20s that a lot of routers might cut off at.
pub const DEFAULT_IDLE_TIMEOUT: Duration = Duration::from_secs(18);

/// Default for [`RetryConfig::max_retry_interval`] (15 s).
///
/// Together with the default min and multiplier,
/// gives 5-6 retries in ~30 s total retry time.
pub const DEFAULT_MAX_RETRY_INTERVAL: Duration = Duration::from_secs(15);
#[cfg(feature = "structopt")]
const DEFAULT_MAX_RETRY_INTERVAL_STR: &str = "15";

/// Default for [`RetryConfig::retry_delay_multiplier`] (x1.5).
///
/// Together with the default max and initial,
/// gives 5-6 retries in ~30 s total retry time.
pub const DEFAULT_RETRY_INTERVAL_MULTIPLIER: f64 = 1.5;
#[cfg(feature = "structopt")]
const DEFAULT_RETRY_INTERVAL_MULTIPLIER_STR: &str = "1.5";

/// Default for [`RetryConfig::retry_delay_rand_factor`] (0.3).
pub const DEFAULT_RETRY_DELAY_RAND_FACTOR: f64 = 0.3;
#[cfg(feature = "structopt")]
const DEFAULT_RETRY_DELAY_RAND_FACTOR_STR: &str = "0.3";

/// Default for [`RetryConfig::retrying_max_elapsed_time`] (30 s).
pub const DEFAULT_RETRYING_MAX_ELAPSED_TIME: Duration = Duration::from_secs(30);
#[cfg(feature = "structopt")]
const DEFAULT_RETRYING_MAX_ELAPSED_TIME_STR: &str = "30";

// We use a hard-coded server name for self-signed certificates.
pub(crate) const SERVER_NAME: &str = "maidsafe.net";

Expand Down Expand Up @@ -107,7 +73,7 @@ pub struct Config {
#[cfg_attr(feature = "structopt", structopt(long))]
pub external_port: Option<u16>,

/// External IP address of the computer on the WAN. This field is mandatory if the node is the genesis node.
/// External IP address of the computer on the WAN. This field is mandatory if the node is the genesis node.
/// In case of non-genesis nodes, the external IP address will be resolved using the Echo service.
#[cfg_attr(feature = "structopt", structopt(long))]
pub external_ip: Option<IpAddr>,
Expand Down Expand Up @@ -136,92 +102,13 @@ pub struct Config {
/// Must be nonzero for the peer to open any bidirectional streams.
#[cfg_attr(feature = "structopt", structopt(long))]
pub max_concurrent_bidi_streams: Option<u32>,

/// Retry configurations for establishing connections and sending messages.
/// Determines the retry behaviour of requests, by setting the back off strategy used.
#[serde(default)]
#[cfg_attr(feature = "structopt", structopt(flatten))]
pub retry_config: RetryConfig,
}

#[cfg(feature = "structopt")]
fn parse_millis(millis: &str) -> Result<Duration, std::num::ParseIntError> {
Ok(Duration::from_millis(millis.parse()?))
}

/// Retry configurations for establishing connections and sending messages.
/// Determines the retry behaviour of requests, by setting the back off strategy used.
#[cfg_attr(feature = "structopt", derive(StructOpt))]
#[derive(Clone, Debug, Copy, Serialize, Deserialize)]
pub struct RetryConfig {
/// The initial retry interval.
///
/// This is the first delay before a retry, for establishing connections and sending messages.
/// The subsequent delay will be decided by the `retry_delay_multiplier`.
#[cfg_attr(feature = "structopt", structopt(long, default_value = DEFAULT_INITIAL_RETRY_INTERVAL_STR, parse(try_from_str = parse_millis), value_name = "MILLIS"))]
pub initial_retry_interval: Duration,
/// The maximum value of the back off period. Once the retry interval reaches this
/// value it stops increasing.
///
/// This is the longest duration we will have,
/// for establishing connections and sending messages.
/// Retrying continues even after the duration times have reached this duration.
/// The number of retries before that happens, will be decided by the `retry_delay_multiplier`.
/// The number of retries after that, will be decided by the `retrying_max_elapsed_time`.
#[cfg_attr(feature = "structopt", structopt(long, default_value = DEFAULT_MAX_RETRY_INTERVAL_STR, parse(try_from_str = parse_millis), value_name = "MILLIS"))]
pub max_retry_interval: Duration,
/// The value to multiply the current interval with for each retry attempt.
#[cfg_attr(feature = "structopt", structopt(long, default_value = DEFAULT_RETRY_INTERVAL_MULTIPLIER_STR))]
pub retry_delay_multiplier: f64,
/// The randomization factor to use for creating a range around the retry interval.
///
/// A randomization factor of 0.5 results in a random period ranging between 50% below and 50%
/// above the retry interval.
#[cfg_attr(feature = "structopt", structopt(long, default_value = DEFAULT_RETRY_DELAY_RAND_FACTOR_STR))]
pub retry_delay_rand_factor: f64,
/// The maximum elapsed time after instantiating
///
/// Retrying continues until this time has elapsed.
/// The number of retries before that happens, will be decided by the other retry config options.
#[cfg_attr(feature = "structopt", structopt(long, default_value = DEFAULT_RETRYING_MAX_ELAPSED_TIME_STR, parse(try_from_str = parse_millis), value_name = "MILLIS"))]
pub retrying_max_elapsed_time: Duration,
}

impl RetryConfig {
// Perform `op` and retry on errors as specified by this configuration.
//
// Note that `backoff::Error<E>` implements `From<E>` for any `E` by creating a
// `backoff::Error::Transient`, meaning that errors will be retried unless explicitly returning
// `backoff::Error::Permanent`.
pub(crate) fn retry<R, E, Fn, Fut>(&self, op: Fn) -> impl Future<Output = Result<R, E>>
where
Fn: FnMut() -> Fut,
Fut: Future<Output = Result<R, backoff::Error<E>>>,
{
let backoff = backoff::ExponentialBackoff {
initial_interval: self.initial_retry_interval,
randomization_factor: self.retry_delay_rand_factor,
multiplier: self.retry_delay_multiplier,
max_interval: self.max_retry_interval,
max_elapsed_time: Some(self.retrying_max_elapsed_time),
..Default::default()
};
backoff::future::retry(backoff, op)
}
}

impl Default for RetryConfig {
fn default() -> Self {
Self {
initial_retry_interval: DEFAULT_INITIAL_RETRY_INTERVAL,
max_retry_interval: DEFAULT_MAX_RETRY_INTERVAL,
retry_delay_multiplier: DEFAULT_RETRY_INTERVAL_MULTIPLIER,
retry_delay_rand_factor: DEFAULT_RETRY_DELAY_RAND_FACTOR,
retrying_max_elapsed_time: DEFAULT_RETRYING_MAX_ELAPSED_TIME,
}
}
}

/// Config that has passed validation.
///
/// Generally this is a copy of [`Config`] without optional values where we would use defaults.
Expand All @@ -231,7 +118,6 @@ pub(crate) struct InternalConfig {
pub(crate) server: quinn::ServerConfig,
pub(crate) external_port: Option<u16>,
pub(crate) external_ip: Option<IpAddr>,
pub(crate) retry_config: Arc<RetryConfig>,
}

impl InternalConfig {
Expand Down Expand Up @@ -278,7 +164,6 @@ impl InternalConfig {
server,
external_port: config.external_port,
external_ip: config.external_ip,
retry_config: Arc::new(config.retry_config),
})
}

Expand Down
48 changes: 10 additions & 38 deletions src/connection.rs
@@ -1,7 +1,7 @@
//! A message-oriented API wrapping the underlying QUIC library (`quinn`).

use crate::{
config::{RetryConfig, SERVER_NAME},
config::SERVER_NAME,
error::{ConnectionError, RecvError, RpcError, SendError, StreamError},
wire_msg::{UsrMsgBytes, WireMsg},
};
Expand Down Expand Up @@ -31,7 +31,6 @@ type ResponseStream = SendStream;
#[derive(Clone)]
pub struct Connection {
inner: quinn::Connection,
default_retry_config: Option<Arc<RetryConfig>>,

// A reference to the 'alive' marker for the connection. This isn't read by `Connection`, but
// must be held to keep background listeners alive until both halves of the connection are
Expand All @@ -42,7 +41,6 @@ pub struct Connection {
impl Connection {
pub(crate) fn new(
endpoint: quinn::Endpoint,
default_retry_config: Option<Arc<RetryConfig>>,
connection: quinn::NewConnection,
) -> (Connection, ConnectionIncoming) {
// this channel serves to keep the background message listener alive so long as one side of
Expand All @@ -52,7 +50,6 @@ impl Connection {
let peer_address = connection.connection.remote_address();
let conn = Self {
inner: connection.connection,
default_retry_config,
_alive_tx: Arc::clone(&alive_tx),
};
let conn_id = conn.id();
Expand Down Expand Up @@ -88,17 +85,14 @@ impl Connection {
self.inner.remote_address()
}

/// Send a message to the peer with default retry configuration.
/// Send a message to the peer.
///
/// The message will be sent on a unidirectional QUIC stream, meaning the application is
/// responsible for correlating any anticipated responses from incoming streams.
///
/// The priority will be `0` and retry behaviour will be determined by the
/// [`Config`](crate::Config) that was used to construct the [`Endpoint`] this connection
/// belongs to. See [`send_with`](Self::send_with) if you want to send a message with specific
/// configuration.
/// The priority will be `0`.
pub async fn send(&self, bytes: UsrMsgBytes) -> Result<(), SendError> {
self.send_with(bytes, 0, None).await
self.send_with(bytes, 0).await
}

/// Send a message to the peer using the given configuration.
Expand All @@ -108,26 +102,8 @@ impl Connection {
&self,
user_msg_bytes: UsrMsgBytes,
priority: i32,
retry_config: Option<&RetryConfig>,
) -> Result<(), SendError> {
match retry_config.or(self.default_retry_config.as_deref()) {
Some(retry_config) => {
retry_config
.retry(|| async {
self.send_uni(user_msg_bytes.clone(), priority)
.await
.map_err(|error| match &error {
// don't retry on connection loss, since we can't recover that from here
SendError::ConnectionLost(_) => backoff::Error::Permanent(error),
_ => backoff::Error::Transient(error),
})
})
.await?;
}
None => {
self.send_uni(user_msg_bytes, priority).await?;
}
}
self.send_uni(user_msg_bytes, priority).await?;
Ok(())
}

Expand Down Expand Up @@ -642,13 +618,12 @@ mod tests {
{
let (p1_tx, mut p1_rx) = Connection::new(
peer1.clone(),
None,
peer1.connect(peer2.local_addr()?, SERVER_NAME)?.await?,
);

let (p2_tx, mut p2_rx) =
if let Some(connection) = timeout(peer2_incoming.then(|c| c).try_next()).await?? {
Connection::new(peer2.clone(), None, connection)
Connection::new(peer2.clone(), connection)
} else {
bail!("did not receive incoming connection when one was expected");
};
Expand Down Expand Up @@ -702,13 +677,12 @@ mod tests {
// open a connection between the two peers
let (p1_tx, _) = Connection::new(
peer1.clone(),
None,
peer1.connect(peer2.local_addr()?, SERVER_NAME)?.await?,
);

let (_, mut p2_rx) =
if let Some(connection) = timeout(peer2_incoming.then(|c| c).try_next()).await?? {
Connection::new(peer2.clone(), None, connection)
Connection::new(peer2.clone(), connection)
} else {
bail!("did not receive incoming connection when one was expected");
};
Expand Down Expand Up @@ -747,14 +721,13 @@ mod tests {
{
let (p1_tx, _) = Connection::new(
peer1.clone(),
None,
peer1.connect(peer2.local_addr()?, SERVER_NAME)?.await?,
);

// we need to accept the connection on p2, or the message won't be processed
let _p2_handle =
if let Some(connection) = timeout(peer2_incoming.then(|c| c).try_next()).await?? {
Connection::new(peer2.clone(), None, connection)
Connection::new(peer2.clone(), connection)
} else {
bail!("did not receive incoming connection when one was expected");
};
Expand Down Expand Up @@ -799,14 +772,13 @@ mod tests {
{
let (p1_tx, _) = Connection::new(
peer1.clone(),
None,
peer1.connect(peer2.local_addr()?, SERVER_NAME)?.await?,
);

// we need to accept the connection on p2, or the message won't be processed
let _p2_handle =
if let Some(connection) = timeout(peer2_incoming.then(|c| c).try_next()).await?? {
Connection::new(peer2.clone(), None, connection)
Connection::new(peer2.clone(), connection)
} else {
bail!("did not receive incoming connection when one was expected");
};
Expand All @@ -819,7 +791,7 @@ mod tests {
// we need to accept the connection on p1, or the message won't be processed
let _p1_handle =
if let Some(connection) = timeout(peer1_incoming.then(|c| c).try_next()).await?? {
Connection::new(peer1.clone(), None, connection)
Connection::new(peer1.clone(), connection)
} else {
bail!("did not receive incoming connection when one was expected");
};
Expand Down

0 comments on commit 7909ca5

Please sign in to comment.