Skip to content

Commit

Permalink
Refine naming and config API.
Browse files Browse the repository at this point in the history
  • Loading branch information
Roman S. Borschel committed Nov 23, 2020
1 parent ee14e9c commit 4e447e9
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 62 deletions.
2 changes: 2 additions & 0 deletions core/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# 0.25.0 [unreleased]

- The `NetworkConfig` API is now a builder that moves `self`.

- New configurable connection limits for established connections and
dedicated connection counters. Removed the connection limit dedicated
to outgoing pending connection _per peer_. Connection limits are now
Expand Down
87 changes: 57 additions & 30 deletions core/src/connection/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ impl<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer::OutboundSubstream: Send + 'static,
{
self.counters.check_max_incoming()?;
self.counters.check_max_pending_incoming()?;
let endpoint = info.to_connected_point();
Ok(self.add_pending(future, handler, endpoint, None))
}
Expand Down Expand Up @@ -286,7 +286,7 @@ impl<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer::OutboundSubstream: Send + 'static,
{
self.counters.check_max_outgoing()?;
self.counters.check_max_pending_outgoing()?;
let endpoint = info.to_connected_point();
Ok(self.add_pending(future, handler, endpoint, info.peer_id.cloned()))
}
Expand Down Expand Up @@ -893,9 +893,9 @@ pub struct ConnectionCounters {
/// The effective connection limits.
limits: ConnectionLimits,
/// The current number of incoming connections.
incoming: u32,
pending_incoming: u32,
/// The current number of outgoing connections.
outgoing: u32,
pending_outgoing: u32,
/// The current number of established inbound connections.
established_incoming: u32,
/// The current number of established outbound connections.
Expand All @@ -906,8 +906,8 @@ impl ConnectionCounters {
fn new(limits: ConnectionLimits) -> Self {
Self {
limits,
incoming: 0,
outgoing: 0,
pending_incoming: 0,
pending_outgoing: 0,
established_incoming: 0,
established_outgoing: 0,
}
Expand All @@ -925,17 +925,17 @@ impl ConnectionCounters {

/// The total number of pending connections, both incoming and outgoing.
pub fn num_pending(&self) -> u32 {
self.incoming + self.outgoing
self.pending_incoming + self.pending_outgoing
}

/// The number of incoming connections being established.
pub fn num_incoming(&self) -> u32 {
self.incoming
pub fn num_pending_incoming(&self) -> u32 {
self.pending_incoming
}

/// The number of outgoing connections being established.
pub fn num_outgoing(&self) -> u32 {
self.outgoing
pub fn num_pending_outgoing(&self) -> u32 {
self.pending_outgoing
}

/// The number of established incoming connections.
Expand All @@ -955,15 +955,15 @@ impl ConnectionCounters {

fn inc_pending(&mut self, endpoint: &ConnectedPoint) {
match endpoint {
ConnectedPoint::Dialer { .. } => { self.outgoing += 1; }
ConnectedPoint::Listener { .. } => { self.incoming += 1; }
ConnectedPoint::Dialer { .. } => { self.pending_outgoing += 1; }
ConnectedPoint::Listener { .. } => { self.pending_incoming += 1; }
}
}

fn dec_pending(&mut self, endpoint: &ConnectedPoint) {
match endpoint {
ConnectedPoint::Dialer { .. } => { self.outgoing -= 1; }
ConnectedPoint::Listener { .. } => { self.incoming -= 1; }
ConnectedPoint::Dialer { .. } => { self.pending_outgoing -= 1; }
ConnectedPoint::Listener { .. } => { self.pending_incoming -= 1; }
}
}

Expand All @@ -981,12 +981,12 @@ impl ConnectionCounters {
}
}

fn check_max_outgoing(&self) -> Result<(), ConnectionLimit> {
Self::check(self.outgoing, self.limits.max_outgoing)
fn check_max_pending_outgoing(&self) -> Result<(), ConnectionLimit> {
Self::check(self.pending_outgoing, self.limits.max_pending_outgoing)
}

fn check_max_incoming(&self) -> Result<(), ConnectionLimit> {
Self::check(self.incoming, self.limits.max_incoming)
fn check_max_pending_incoming(&self) -> Result<(), ConnectionLimit> {
Self::check(self.pending_incoming, self.limits.max_pending_incoming)
}

fn check_max_established(&self, endpoint: &ConnectedPoint)
Expand Down Expand Up @@ -1031,17 +1031,44 @@ fn num_peer_established(
/// By default no connection limits apply.
#[derive(Debug, Clone, Default)]
pub struct ConnectionLimits {
/// The maximum number of concurrently incoming connections being established.
pub max_incoming: Option<u32>,
/// The maximum number of concurrently outgoing connections being established.
pub max_outgoing: Option<u32>,
/// The maximum number of concurrent established inbound connections.
pub max_established_incoming: Option<u32>,
/// The maximum number of concurrent established outbound connections.
pub max_established_outgoing: Option<u32>,
/// The maximum number of concurrent established connections per peer,
/// regardless of direction.
pub max_established_per_peer: Option<u32>,
max_pending_incoming: Option<u32>,
max_pending_outgoing: Option<u32>,
max_established_incoming: Option<u32>,
max_established_outgoing: Option<u32>,
max_established_per_peer: Option<u32>,
}

impl ConnectionLimits {
/// Configures the maximum number of concurrently incoming connections being established.
pub fn with_max_pending_incoming(mut self, limit: Option<u32>) -> Self {
self.max_pending_incoming = limit;
self
}

/// Configures the maximum number of concurrently outgoing connections being established.
pub fn with_max_pending_outgoing(mut self, limit: Option<u32>) -> Self {
self.max_pending_outgoing = limit;
self
}

/// Configures the maximum number of concurrent established inbound connections.
pub fn with_max_established_incoming(mut self, limit: Option<u32>) -> Self {
self.max_established_incoming = limit;
self
}

/// Configures the maximum number of concurrent established outbound connections.
pub fn with_max_established_outgoing(mut self, limit: Option<u32>) -> Self {
self.max_established_outgoing = limit;
self
}

/// Configures the maximum number of concurrent established connections per peer,
/// regardless of direction (incoming or outgoing).
pub fn with_max_established_per_peer(mut self, limit: Option<u32>) -> Self {
self.max_established_per_peer = limit;
self
}
}

/// Information about a former established connection to a peer
Expand Down
26 changes: 15 additions & 11 deletions core/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -630,15 +630,20 @@ pub struct NetworkConfig {
}

impl NetworkConfig {
/// Sets the executor to use for spawning connection background tasks.
pub fn set_executor(&mut self, e: Box<dyn Executor + Send>) -> &mut Self {
/// Configures the executor to use for spawning connection background tasks.
pub fn with_executor(mut self, e: Box<dyn Executor + Send>) -> Self {
self.manager_config.executor = Some(e);
self
}

/// Gets the currently configured executor for connection background tasks.
pub fn executor(&self) -> Option<&Box<dyn Executor + Send>> {
self.manager_config.executor.as_ref()
/// Configures the executor to use for spawning connection background tasks,
/// only if no executor has already been configured.
pub fn or_else_with_executor<F>(mut self, f: F) -> Self
where
F: FnOnce() -> Option<Box<dyn Executor + Send>>
{
self.manager_config.executor = self.manager_config.executor.or_else(f);
self
}

/// Sets the maximum number of events sent to a connection's background task
Expand All @@ -648,7 +653,7 @@ impl NetworkConfig {
/// When the buffer for a particular connection is full, `notify_handler` will no
/// longer be able to deliver events to the associated `ConnectionHandler`,
/// thus exerting back-pressure on the connection and peer API.
pub fn set_notify_handler_buffer_size(&mut self, n: NonZeroUsize) -> &mut Self {
pub fn with_notify_handler_buffer_size(mut self, n: NonZeroUsize) -> Self {
self.manager_config.task_command_buffer_size = n.get() - 1;
self
}
Expand All @@ -659,13 +664,13 @@ impl NetworkConfig {
/// When the buffer is full, the background tasks of all connections will stall.
/// In this way, the consumers of network events exert back-pressure on
/// the network connection I/O.
pub fn set_connection_event_buffer_size(&mut self, n: usize) -> &mut Self {
pub fn with_connection_event_buffer_size(mut self, n: usize) -> Self {
self.manager_config.task_event_buffer_size = n;
self
}

/// Sets the connection limits to enforce.
pub fn set_connection_limits(&mut self, limits: ConnectionLimits) -> &mut Self {
pub fn with_connection_limits(mut self, limits: ConnectionLimits) -> Self {
self.limits = limits;
self
}
Expand All @@ -684,10 +689,9 @@ mod tests {
#[test]
fn set_executor() {
NetworkConfig::default()
.set_executor(Box::new(Dummy))
.set_executor(Box::new(|f| {
.with_executor(Box::new(Dummy))
.with_executor(Box::new(|f| {
async_std::task::spawn(f);
}));
}

}
17 changes: 6 additions & 11 deletions core/tests/connection_limits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,8 @@ use util::{TestHandler, test_network};
fn max_outgoing() {
let outgoing_limit = rand::thread_rng().gen_range(1, 10);

let mut limits = ConnectionLimits::default();
limits.max_outgoing = Some(outgoing_limit);
let mut cfg = NetworkConfig::default();
cfg.set_connection_limits(limits);
let limits = ConnectionLimits::default().with_max_pending_outgoing(Some(outgoing_limit));
let cfg = NetworkConfig::default().with_connection_limits(limits);
let mut network = test_network(cfg);

let target = PeerId::random();
Expand All @@ -58,7 +56,7 @@ fn max_outgoing() {

let info = network.info();
assert_eq!(info.num_peers(), 0);
assert_eq!(info.connection_counters().num_outgoing(), outgoing_limit);
assert_eq!(info.connection_counters().num_pending_outgoing(), outgoing_limit);

// Abort all dialing attempts.
let mut peer = network.peer(target.clone())
Expand All @@ -70,19 +68,16 @@ fn max_outgoing() {
attempt.abort();
}

assert_eq!(network.info().connection_counters().num_outgoing(), 0);
assert_eq!(network.info().connection_counters().num_pending_outgoing(), 0);
}

#[test]
fn max_established_incoming() {
let limit = rand::thread_rng().gen_range(1, 10);

fn config(limit: u32) -> NetworkConfig {
let mut limits = ConnectionLimits::default();
limits.max_established_incoming = Some(limit);
let mut cfg = NetworkConfig::default();
cfg.set_connection_limits(limits);
cfg
let limits = ConnectionLimits::default().with_max_established_incoming(Some(limit));
NetworkConfig::default().with_connection_limits(limits)
}

let mut network1 = test_network(config(limit));
Expand Down
21 changes: 11 additions & 10 deletions swarm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -988,7 +988,7 @@ where TBehaviour: NetworkBehaviour,
/// By default, unless another executor has been configured,
/// [`SwarmBuilder::build`] will try to set up a `ThreadPool`.
pub fn executor(mut self, e: Box<dyn Executor + Send>) -> Self {
self.network_config.set_executor(e);
self.network_config = self.network_config.with_executor(e);
self
}

Expand All @@ -1002,7 +1002,7 @@ where TBehaviour: NetworkBehaviour,
/// be sleeping more often than necessary. Increasing this value increases
/// the overall memory usage.
pub fn notify_handler_buffer_size(mut self, n: NonZeroUsize) -> Self {
self.network_config.set_notify_handler_buffer_size(n);
self.network_config = self.network_config.with_notify_handler_buffer_size(n);
self
}

Expand Down Expand Up @@ -1030,13 +1030,13 @@ where TBehaviour: NetworkBehaviour,
/// event is emitted and the moment when it is received by the
/// [`NetworkBehaviour`].
pub fn connection_event_buffer_size(mut self, n: usize) -> Self {
self.network_config.set_connection_event_buffer_size(n);
self.network_config = self.network_config.with_connection_event_buffer_size(n);
self
}

/// Configures the connection limits.
pub fn connection_limits(mut self, limits: ConnectionLimits) -> Self {
self.network_config.set_connection_limits(limits);
self.network_config = self.network_config.with_connection_limits(limits);
self
}

Expand All @@ -1050,20 +1050,21 @@ where TBehaviour: NetworkBehaviour,
.map(|info| info.protocol_name().to_vec())
.collect();

let mut network_cfg = self.network_config;

// If no executor has been explicitly configured, try to set up a thread pool.
if network_cfg.executor().is_none() {
let network_cfg = self.network_config.or_else_with_executor(|| {
match ThreadPoolBuilder::new()
.name_prefix("libp2p-swarm-task-")
.create()
{
Ok(tp) => {
network_cfg.set_executor(Box::new(move |f| tp.spawn_ok(f)));
Some(Box::new(move |f| tp.spawn_ok(f)))
},
Err(err) => log::warn!("Failed to create executor thread pool: {:?}", err)
Err(err) => {
log::warn!("Failed to create executor thread pool: {:?}", err);
None
}
}
}
});

let network = Network::new(self.transport, self.local_peer_id, network_cfg);

Expand Down

0 comments on commit 4e447e9

Please sign in to comment.