Skip to content

Commit

Permalink
refactor!: Use Duration in config
Browse files Browse the repository at this point in the history
This needed a bit of 'plumbing', but it's nice to use properly typed
values in the configuration.

BREAKING CHANGE: The duration fields in config have all changed:

- `idle_timeout_msec` is renamed `idle_timeout` and is now a
  `DurationMsec<DefaultIdleTimeout>`.
- `keep_alive_interval_msec` is renamed `keep_alive_interval` and is now
  a `DurationMsec<DefaultKeepAliveInterval>`.
- `upnp_lease_duration` is now a
  `DurationMsec<DefaultUpnpLeaseDuration>`.
- `retry_duration_msec` is renamed `min_retry_duration` and is now a
  `DurationMsec<DefaultMinRetryDuration>`.
  • Loading branch information
Chris Connelly committed Aug 13, 2021
1 parent 82c721f commit c15cef0
Show file tree
Hide file tree
Showing 9 changed files with 209 additions and 84 deletions.
9 changes: 6 additions & 3 deletions examples/p2p_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@
use anyhow::Result;
use bytes::Bytes;
use qp2p::{Config, ConnId, QuicP2p};
use std::env;
use std::net::{Ipv4Addr, SocketAddr};
use std::{
env,
net::{Ipv4Addr, SocketAddr},
time::Duration,
};

#[derive(Default, Ord, PartialEq, PartialOrd, Eq, Clone, Copy)]
struct XId(pub [u8; 32]);
Expand All @@ -37,7 +40,7 @@ async fn main() -> Result<()> {

// instantiate QuicP2p with custom config
let qp2p: QuicP2p<XId> = QuicP2p::with_config(Config {
idle_timeout_msec: Some(1000 * 3600), // 1 hour idle timeout.
idle_timeout: Duration::from_secs(60 * 60).into(), // 1 hour idle timeout.
..Default::default()
})?;

Expand Down
25 changes: 8 additions & 17 deletions src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,12 @@ use super::{
connections::DisconnectionEvents,
endpoint::{Endpoint, IncomingConnections, IncomingMessages},
error::{Error, Result},
peer_config::{self, DEFAULT_IDLE_TIMEOUT_MSEC, DEFAULT_KEEP_ALIVE_INTERVAL_MSEC},
peer_config,
};
use std::marker::PhantomData;
use std::net::{SocketAddr, UdpSocket};
use tracing::{debug, error, trace};

/// Default duration of a UPnP lease, in seconds.
pub(crate) const DEFAULT_UPNP_LEASE_DURATION_SEC: u32 = 120;

const MAIDSAFE_DOMAIN: &str = "maidsafe.net";

/// Main QuicP2p instance to communicate with QuicP2p using an async API
Expand Down Expand Up @@ -59,33 +56,27 @@ impl<I: ConnId> QuicP2p<I> {
pub fn with_config(cfg: Config) -> Result<Self> {
debug!("Config passed in to qp2p: {:?}", cfg);

let idle_timeout_msec = cfg.idle_timeout_msec.unwrap_or(DEFAULT_IDLE_TIMEOUT_MSEC);

let keep_alive_interval_msec = cfg
.keep_alive_interval_msec
.unwrap_or(DEFAULT_KEEP_ALIVE_INTERVAL_MSEC);

let (key, cert) = {
let our_complete_cert =
SerialisableCertificate::new(vec![MAIDSAFE_DOMAIN.to_string()])?;
our_complete_cert.obtain_priv_key_and_cert()?
};

let endpoint_cfg =
peer_config::new_our_cfg(idle_timeout_msec, keep_alive_interval_msec, cert, key)?;
let idle_timeout = cfg.idle_timeout.into();
let keep_alive_interval = cfg.keep_alive_interval.into();
let upnp_lease_duration = cfg.upnp_lease_duration.into();
let min_retry_duration = cfg.min_retry_duration.into();

let client_cfg = peer_config::new_client_cfg(idle_timeout_msec, keep_alive_interval_msec)?;
let endpoint_cfg = peer_config::new_our_cfg(idle_timeout, keep_alive_interval, cert, key)?;

let upnp_lease_duration = cfg
.upnp_lease_duration
.unwrap_or(DEFAULT_UPNP_LEASE_DURATION_SEC);
let client_cfg = peer_config::new_client_cfg(idle_timeout, keep_alive_interval)?;

let qp2p_config = InternalConfig {
forward_port: cfg.forward_port,
external_port: cfg.external_port,
external_ip: cfg.external_ip,
upnp_lease_duration,
retry_duration_msec: cfg.retry_duration_msec,
min_retry_duration,
};

Ok(Self {
Expand Down
178 changes: 159 additions & 19 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@
// specific language governing permissions and limitations relating to use of the SAFE Network
// Software.

//! Configuration for `Endpoint`s.

use crate::{
error::{Error, Result},
utils,
};
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use std::{fmt, net::IpAddr, str::FromStr};
use std::{fmt, marker::PhantomData, net::IpAddr, str::FromStr, time::Duration};
use structopt::StructOpt;

/// QuicP2p configurations
Expand All @@ -23,36 +25,174 @@ pub struct Config {
/// is run locally on the network loopback or on a local area network.
#[structopt(long)]
pub forward_port: bool,

/// External port number assigned to the socket address of the program.
/// If this is provided, QP2p considers that the local port provided has been mapped to the
/// provided external port number and automatic port forwarding will be skipped.
#[structopt(long)]
pub external_port: Option<u16>,

/// External IP address of the computer on the WAN. This field is mandatory if the node is the genesis node and
/// port forwarding is not available. In case of non-genesis nodes, the external IP address will be resolved
/// using the Echo service.
#[structopt(long)]
pub external_ip: Option<IpAddr>,
/// If we hear nothing from the peer in the given interval we declare it offline to us. If none
/// supplied we'll default to the documented constant.

/// How long to wait to hear from a peer before timing out a connection.
///
/// The interval is in milliseconds. A value of 0 disables this feature.
#[structopt(long)]
pub idle_timeout_msec: Option<u64>,
/// Interval to send keep-alives if we are idling so that the peer does not disconnect from us
/// declaring us offline. If none is supplied we'll default to the documented constant.
/// In the absence of any keep-alive messages, connections will be closed if they remain idle
/// for at least this duration.
///
/// The interval is in milliseconds. A value of 0 disables this feature.
#[structopt(long)]
pub keep_alive_interval_msec: Option<u32>,
/// Duration of a UPnP port mapping.
#[structopt(long)]
pub upnp_lease_duration: Option<u32>,
/// See [`DefaultIdleTimeout`] for the default value. This is based on average time in which
/// routers would close the UDP mapping to the peer if they see no conversation between them.
#[serde(default)]
#[structopt(long, default_value)]
pub idle_timeout: DurationMsec<DefaultIdleTimeout>,

/// Interval at which to send keep-alives to maintain otherwise idle connections.
///
/// Keep-alives prevent otherwise idle connections from timing out.
///
/// See [`DefaultKeepAliveInterval`] for the default value.
#[serde(default)]
#[structopt(long, default_value)]
pub keep_alive_interval: DurationMsec<DefaultKeepAliveInterval>,

/// How long UPnP port mappings will last.
///
/// Note that UPnP port mappings will be automatically renewed on this interval. See
/// [`DefaultUpnpLeaseDuration`] for the default value, which should be suitable in most cases
/// but some routers may clear UPnP port mapping more frequently.
#[serde(default)]
#[structopt(long, default_value)]
pub upnp_lease_duration: DurationMsec<DefaultUpnpLeaseDuration>,

/// How long to retry establishing connections and sending messages.
///
/// The duration is in milliseconds. Setting this to 0 will effectively disable retries.
#[structopt(long, default_value = "30000")]
pub retry_duration_msec: u64,
/// Retrying will continue for *at least* this duration, but potentially longer as an
/// in-progress back-off delay will not be interrupted.
///
/// See [`DefaultMinRetryDuration`] for the default value.
#[serde(default)]
#[structopt(long, default_value)]
pub min_retry_duration: DurationMsec<DefaultMinRetryDuration>,
}

/// A [`Duration`] that parses and serializes to milliseconds, with specific [`Default`]s.
///
/// This allows [`Config`] to derive [`StructOpt`] and [`Deserialize`]/[`Serialize`]. The generic
/// parameter determines the default value.
///
/// Note that any `Duration` can be converted to/from a `DurationMsec<D>`, for any `D`, since the
/// `D` is only relevant for the implementation of `Default`.
///
/// ```
/// # use std::time::Duration;
/// use qp2p::{Config, config::DefaultKeepAliveInterval};
///
/// let config = qp2p::Config {
/// idle_timeout: Duration::from_secs(123).into(),
/// ..Default::default()
/// };
///
/// assert_eq!(Duration::from(config.idle_timeout), Duration::from_secs(123));
///
/// assert_eq!(
/// Duration::from(config.keep_alive_interval),
/// DefaultKeepAliveInterval.into()
/// );
/// ```
#[derive(Clone, Copy, Debug, Eq, PartialEq, Deserialize, Serialize)]
#[serde(transparent)]
pub struct DurationMsec<D> {
duration: Duration,
_default_marker: PhantomData<D>,
}

impl<D: Default + Into<Duration>> Default for DurationMsec<D> {
fn default() -> Self {
Self {
duration: D::default().into(),
_default_marker: PhantomData,
}
}
}

impl<D> fmt::Display for DurationMsec<D> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}", self.duration.as_millis())
}
}

impl<D> From<Duration> for DurationMsec<D> {
fn from(duration: Duration) -> Self {
Self {
duration,
_default_marker: PhantomData,
}
}
}

impl<D> From<DurationMsec<D>> for Duration {
fn from(duration: DurationMsec<D>) -> Self {
duration.duration
}
}

impl<D> FromStr for DurationMsec<D> {
type Err = std::num::ParseIntError;

fn from_str(s: &str) -> Result<Self, Self::Err> {
let duration = Duration::from_millis(s.parse()?);
Ok(Self {
duration,
_default_marker: PhantomData,
})
}
}

/// Marker struct for the default value for [`Config::idle_timeout`].
///
/// The default duration is 60 seconds.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct DefaultIdleTimeout;
impl From<DefaultIdleTimeout> for Duration {
fn from(_: DefaultIdleTimeout) -> Self {
Duration::from_secs(60)
}
}

/// Marker struct for the default value for [`Config::keep_alive_interval`].
///
/// The default duration is 20 seconds.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct DefaultKeepAliveInterval;
impl From<DefaultKeepAliveInterval> for Duration {
fn from(_: DefaultKeepAliveInterval) -> Self {
Duration::from_secs(20)
}
}

/// Marker struct for the default value for [`Config::upnp_lease_duration`].
///
/// The default duration is 120 seconds.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct DefaultUpnpLeaseDuration;
impl From<DefaultUpnpLeaseDuration> for Duration {
fn from(_: DefaultUpnpLeaseDuration) -> Self {
Duration::from_secs(120)
}
}

/// Marker struct for the default value for [`Config::min_retry_duration`].
///
/// The default duration is 30 seconds.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct DefaultMinRetryDuration;
impl From<DefaultMinRetryDuration> for Duration {
fn from(_: DefaultMinRetryDuration) -> Self {
Duration::from_secs(30)
}
}

/// Config that has passed validation.
Expand All @@ -63,8 +203,8 @@ pub(crate) struct InternalConfig {
pub(crate) forward_port: bool,
pub(crate) external_port: Option<u16>,
pub(crate) external_ip: Option<IpAddr>,
pub(crate) upnp_lease_duration: u32,
pub(crate) retry_duration_msec: u64,
pub(crate) upnp_lease_duration: Duration,
pub(crate) min_retry_duration: Duration,
}

/// To be used to read and write our certificate and private key to disk esp. as a part of our
Expand Down
5 changes: 3 additions & 2 deletions src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,8 @@ impl<I: ConnId> Endpoint<I> {
/// Connects to another peer, retries for `config.retry_duration_msec` if the connection fails.
///
/// **Note:** this method is intended for use when it's necessary to connect to a specific peer.
/// See [`connect_to_any`] if you just need a connection with any of a set of peers.
/// See [`connect_to_any`](Self::connect_to_any) if you just need a connection with any of a set
/// of peers.
///
/// Returns `Connection` which is a handle for sending messages to the peer and
/// `IncomingMessages` which is a stream of messages received from the peer.
Expand Down Expand Up @@ -634,7 +635,7 @@ impl<I: ConnId> Endpoint<I> {
Fut: futures::Future<Output = Result<R, backoff::Error<E>>>,
{
let backoff = ExponentialBackoff {
max_elapsed_time: Some(Duration::from_millis(self.qp2p_config.retry_duration_msec)),
max_elapsed_time: Some(self.qp2p_config.min_retry_duration),
..Default::default()
};
retry(backoff, op)
Expand Down
12 changes: 8 additions & 4 deletions src/igd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,18 @@ use tracing::{debug, info, warn};
/// Automatically forwards a port and setups a tokio task to renew it periodically.
pub(crate) async fn forward_port(
local_addr: SocketAddr,
lease_duration: u32,
lease_interval: Duration,
mut termination_rx: Receiver<()>,
) -> Result<u16> {
let igd_res = add_port(local_addr, lease_duration).await;
// Cap `lease_interval` at `u32::MAX` seconds due to limits on the IGD API. Since this is an
// outrageous length of time (~136 years) we just do so silently.
let lease_interval = lease_interval.min(Duration::from_secs(u32::MAX.into()));
let lease_interval_u32 = lease_interval.as_secs() as u32;

let igd_res = add_port(local_addr, lease_interval_u32).await;

if let Ok(ext_port) = &igd_res {
// Start a tokio task to renew the lease periodically.
let lease_interval = Duration::from_secs(lease_duration.into());
let ext_port = *ext_port;
let _ = tokio::spawn(async move {
let mut timer = time::interval_at(Instant::now() + lease_interval, lease_interval);
Expand All @@ -37,7 +41,7 @@ pub(crate) async fn forward_port(
}
debug!("Renewing IGD lease for port {}", local_addr);

let renew_res = renew_port(local_addr, ext_port, lease_duration).await;
let renew_res = renew_port(local_addr, ext_port, lease_interval_u32).await;
match renew_res {
Ok(()) => {}
Err(e) => {
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
)]

mod api;
mod config;
pub mod config;
mod connection_deduplicator;
mod connection_pool;
mod connections;
Expand Down
Loading

0 comments on commit c15cef0

Please sign in to comment.