Always accept incoming connections from trusted peers #7140

Merged
merged 6 commits on Mar 15, 2024
Changes from 5 commits
41 changes: 18 additions & 23 deletions crates/net/network/src/peers/manager.rs
@@ -213,19 +213,14 @@ impl PeersManager {

/// Invoked when a new _incoming_ tcp connection is accepted.
///
/// returns an error if the inbound ip address is on the ban list or
/// we have reached our limit for max inbound connections
/// returns an error if the inbound ip address is on the ban list
pub(crate) fn on_incoming_pending_session(
&mut self,
addr: IpAddr,
) -> Result<(), InboundConnectionError> {
if self.ban_list.is_banned_ip(&addr) {
return Err(InboundConnectionError::IpBanned)
}
if !self.connection_info.has_in_capacity() {
return Err(InboundConnectionError::ExceedsLimit(self.connection_info.max_inbound))
}
// keep track of new connection
self.connection_info.inc_in();
Ok(())
}
@@ -284,6 +279,14 @@ impl PeersManager {
return
}
value.state = PeerConnectionState::In;
// if a peer is not trusted and we don't have capacity for more inbound connections,
// disconnect the peer
if !value.is_trusted() && !self.connection_info.has_in_capacity() {
self.queued_actions.push_back(PeerAction::Disconnect {
peer_id,
reason: Some(DisconnectReason::TooManyPeers),
Comment on lines +282 to +287

Collaborator

I'm wondering if we actually have a bug here where we allow multiple incoming sessions from the same peer id.

Could you please add a test case for this scenario? I think we should check the peer's state here.

Similar setup as https://github.com/paradigmxyz/reth/blob/main/crates/net/network/src/peers/manager.rs#L1913-L1913

Contributor Author

Here is the test I can imagine:

  1. Set up a peer with max_in_bound = 1.
  2. Receive an incoming session from another peer X and establish the session.
  3. Receive another incoming session from peer X; it should be rejected with AlreadyConnected, and the already-connected peer should not be disconnected or removed.

The point is that a connected peer shouldn't be disconnected by another incoming session. Am I on the right track?
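To make the accounting concern in this thread concrete, here is a minimal, self-contained model of the inbound-slot bookkeeping. It is a hypothetical stand-in, not reth's actual `ConnectionInfo`; the field and method names merely mirror the ones in the diff above:

```rust
/// Hypothetical model of the inbound-slot accounting under discussion.
struct ConnectionInfo {
    num_inbound: usize,
    max_inbound: usize,
}

impl ConnectionInfo {
    fn inc_in(&mut self) {
        self.num_inbound += 1;
    }

    fn decr_in(&mut self) {
        self.num_inbound -= 1;
    }
}

fn main() {
    let mut info = ConnectionInfo { num_inbound: 0, max_inbound: 1 };

    // First incoming session from peer X: the slot is taken when the pending
    // session is accepted, and kept once the session is established.
    info.inc_in();

    // Second incoming session from the same peer X: the slot is taken again
    // before the handshake reveals who is dialing.
    info.inc_in();

    // The handshake identifies peer X as already connected, so the session is
    // rejected with `AlreadyConnected`. Without this decrement the counter
    // leaks and one inbound slot is lost for good -- the bug found here.
    info.decr_in();

    assert_eq!(info.num_inbound, 1, "only the established session holds a slot");
    assert!(info.num_inbound <= info.max_inbound);
}
```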

Collaborator

> Receive another incoming session from peer X; it should be rejected with AlreadyConnected

exactly

Contributor Author

Yes, there does seem to be a bug: num_inbound should have been decreased in on_already_connected. I've added a unit test and an integration test, PTAL.

Also, I think num_inbound should only be increased once the session is established, which may simplify the bookkeeping.

Collaborator

> Also, I think num_inbound should only be increased once the session is established, which may simplify the bookkeeping.

I believe this makes sense; I'd like to introduce a new state PendingIn, similar to PendingOut.

We can do this separately.
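A minimal sketch of that suggested state split, assuming a hypothetical enum (reth's actual `PeerConnectionState` lives in crates/net/network/src/peers/manager.rs and already has a `PendingOut` variant):

```rust
/// Hypothetical sketch of the suggested state split: a `PendingIn` variant
/// symmetric to `PendingOut`, so `num_inbound` only counts established sessions.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PeerConnectionState {
    /// Not connected.
    Idle,
    /// Incoming TCP connection accepted, handshake still in flight.
    PendingIn,
    /// Outgoing dial in flight.
    PendingOut,
    /// Established incoming session.
    In,
    /// Established outgoing session.
    Out,
}

/// Only established inbound sessions would occupy an inbound slot; a session
/// rejected during the handshake (e.g. AlreadyConnected) never counts at all.
fn counts_against_inbound_limit(state: PeerConnectionState) -> bool {
    matches!(state, PeerConnectionState::In)
}
```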

});
}
}
Entry::Vacant(entry) => {
// peer is missing in the table, we add it but mark it as to be removed after
@@ -292,6 +295,14 @@
peer.remove_after_disconnect = true;
entry.insert(peer);
self.queued_actions.push_back(PeerAction::PeerAdded(peer_id));

// disconnect the peer if we don't have capacity for more inbound connections
if !self.connection_info.has_in_capacity() {
self.queued_actions.push_back(PeerAction::Disconnect {
peer_id,
reason: Some(DisconnectReason::TooManyPeers),
});
}
}
}
}
@@ -875,7 +886,7 @@ impl ConnectionInfo {

/// Returns `true` if there's still capacity for a new incoming connection.
fn has_in_capacity(&self) -> bool {
self.num_inbound < self.max_inbound
self.num_inbound <= self.max_inbound
}

fn decr_state(&mut self, state: PeerConnectionState) {
@@ -1241,18 +1252,6 @@ impl PeersConfig {
self
}

/// Maximum occupied slots for outbound connections.
pub fn with_max_pending_outbound(mut self, num_outbound: usize) -> Self {
self.connection_info.num_outbound = num_outbound;
self
}

/// Maximum occupied slots for inbound connections.
pub fn with_max_pending_inbound(mut self, num_inbound: usize) -> Self {
self.connection_info.num_inbound = num_inbound;
self
}
Collaborator

please add again

Contributor Author

Done.


/// Maximum allowed outbound connections.
pub fn with_max_outbound(mut self, max_outbound: usize) -> Self {
self.connection_info.max_outbound = max_outbound;
@@ -1420,7 +1419,6 @@ impl Default for PeerBackoffDurations {

#[derive(Debug, Error)]
pub enum InboundConnectionError {
ExceedsLimit(usize),
IpBanned,
}

@@ -2166,9 +2164,6 @@ mod tests {
Ok(_) => panic!(),
Err(err) => match err {
super::InboundConnectionError::IpBanned {} => {}
super::InboundConnectionError::ExceedsLimit { .. } => {
panic!()
}
},
}
}
39 changes: 0 additions & 39 deletions crates/net/network/src/session/mod.rs
@@ -53,9 +53,6 @@ pub use handle::{
use reth_eth_wire::multiplex::RlpxProtocolMultiplexer;
pub use reth_network_api::{Direction, PeerInfo};

/// Maximum allowed graceful disconnects at a time.
const MAX_GRACEFUL_DISCONNECTS: usize = 15;

/// Internal identifier for active sessions.
#[derive(Debug, Clone, Copy, PartialOrd, PartialEq, Eq, Hash)]
pub struct SessionId(usize);
@@ -113,8 +110,6 @@ pub struct SessionManager {
bandwidth_meter: BandwidthMeter,
/// Metrics for the session manager.
metrics: SessionManagerMetrics,
/// Tracks the number of active graceful disconnects for incoming connections.
graceful_disconnects_counter: GracefulDisconnects,
}

// === impl SessionManager ===
@@ -156,7 +151,6 @@ impl SessionManager {
bandwidth_meter,
extra_protocols,
metrics: Default::default(),
graceful_disconnects_counter: Default::default(),
}
}

@@ -310,27 +304,6 @@ impl SessionManager {
}
}

/// Sends a disconnect message to the peer with the given [DisconnectReason].
pub(crate) fn disconnect_incoming_connection(
&mut self,
stream: TcpStream,
reason: DisconnectReason,
) {
let counter = self.graceful_disconnects_counter.clone();
if counter.exceeds_limit() {
// simply drop the connection if there are too many active disconnects already
return
}
let secret_key = self.secret_key;

self.spawn(async move {
if let Ok(stream) = get_eciess_stream(stream, secret_key, Direction::Incoming).await {
let _ = UnauthedP2PStream::new(stream).send_disconnect(reason).await;
}
drop(counter)
});
}

/// Initiates a shutdown of all sessions.
///
/// It will trigger the disconnect on all the session tasks to gracefully terminate. The result
@@ -635,18 +608,6 @@ impl SessionManager {
}
}

/// Keep track of graceful disconnects for incoming connections.
#[derive(Debug, Clone, Default)]
struct GracefulDisconnects(Arc<()>);

impl GracefulDisconnects {
/// Returns true if the number of graceful disconnects exceeds the limit
/// [MAX_GRACEFUL_DISCONNECTS]
fn exceeds_limit(&self) -> bool {
Arc::strong_count(&self.0) > MAX_GRACEFUL_DISCONNECTS
}
}

/// Events produced by the [`SessionManager`]
#[derive(Debug)]
pub enum SessionEvent {
11 changes: 2 additions & 9 deletions crates/net/network/src/swarm.rs
@@ -10,7 +10,7 @@ use futures::Stream;
use reth_eth_wire::{
capability::{Capabilities, CapabilityMessage},
errors::EthStreamError,
DisconnectReason, EthVersion, Status,
EthVersion, Status,
};
use reth_primitives::PeerId;
use reth_provider::{BlockNumReader, BlockReader};
@@ -29,7 +29,7 @@ use tracing::trace;
///
/// A swarm emits [`SwarmEvent`]s when polled.
///
/// The manages the [`ConnectionListener`] and delegates new incoming connections to the
/// It manages the [`ConnectionListener`] and delegates new incoming connections to the
/// [`SessionManager`]. Outgoing connections are either initiated on demand or triggered by the
/// [`NetworkState`] and also delegated to the [`NetworkState`].
///
@@ -203,13 +203,6 @@
InboundConnectionError::IpBanned => {
trace!(target: "net", ?remote_addr, "The incoming ip address is in the ban list");
}
InboundConnectionError::ExceedsLimit(limit) => {
trace!(target: "net", %limit, ?remote_addr, "Exceeded incoming connection limit; disconnecting");
self.sessions.disconnect_incoming_connection(
stream,
DisconnectReason::TooManyPeers,
);
}
}
return None
}
77 changes: 77 additions & 0 deletions crates/net/network/tests/it/connect.rs
@@ -585,6 +585,83 @@ async fn test_disconnect_incoming_when_exceeded_incoming_connections() {
net_handle.terminate().await;
}

#[tokio::test(flavor = "multi_thread")]
async fn test_always_accept_incoming_connections_from_trusted_peers() {
reth_tracing::init_test_tracing();
let other_peer1 = new_random_peer(10, HashSet::new()).await;
let other_peer2 = new_random_peer(10, HashSet::new()).await;
let other_peer3 = new_random_peer(0, HashSet::new()).await;

// set up the peer with max_inbound = 1, and add other_peer3 as a trusted node
let peer = new_random_peer(
1,
HashSet::from([NodeRecord::new(other_peer3.local_addr(), *other_peer3.peer_id())]),
)
.await;

let handle = peer.handle().clone();
let other_peer_handle1 = other_peer1.handle().clone();
let other_peer_handle2 = other_peer2.handle().clone();
let other_peer_handle3 = other_peer3.handle().clone();

tokio::task::spawn(peer);
tokio::task::spawn(other_peer1);
tokio::task::spawn(other_peer2);
tokio::task::spawn(other_peer3);

let mut events = NetworkEventStream::new(handle.event_listener());
let mut events2 = NetworkEventStream::new(other_peer_handle2.event_listener());

// though we added other_peer3 as a trusted node, the incoming connection should fail because
// other_peer3 doesn't allow inbound connections
let (peer_id, reason) = events.next_session_closed().await.unwrap();
assert_eq!(peer_id, *other_peer_handle3.peer_id());
assert_eq!(reason, Some(DisconnectReason::TooManyPeers));

// incoming connection should succeed
other_peer_handle1.add_peer(*handle.peer_id(), handle.local_addr());
let peer_id = events.next_session_established().await.unwrap();
assert_eq!(peer_id, *other_peer_handle1.peer_id());
assert_eq!(handle.num_connected_peers(), 1);

// incoming connection should fail because exceeding max_inbound
other_peer_handle2.add_peer(*handle.peer_id(), handle.local_addr());
let (peer_id, reason) = events.next_session_closed().await.unwrap();
assert_eq!(peer_id, *other_peer_handle2.peer_id());
// fixme: this should be `Some(DisconnectReason::TooManyPeers)` but `None`
assert_eq!(reason, None);

let (peer_id, reason) = events2.next_session_closed().await.unwrap();
assert_eq!(peer_id, *handle.peer_id());
assert_eq!(reason, Some(DisconnectReason::TooManyPeers));

// outbound connection from `other_peer3` should succeed
other_peer_handle3.add_peer(*handle.peer_id(), handle.local_addr());
let peer_id = events.next_session_established().await.unwrap();
assert_eq!(peer_id, *other_peer_handle3.peer_id());

// sleep is needed because the disconnect event happens after the session_established event
tokio::time::sleep(Duration::from_secs(3)).await;
assert_eq!(handle.num_connected_peers(), 2);
}

async fn new_random_peer(
max_in_bound: usize,
trusted_nodes: HashSet<NodeRecord>,
) -> NetworkManager<NoopProvider> {
let secret_key = SecretKey::new(&mut rand::thread_rng());
let peers_config =
PeersConfig::default().with_max_inbound(max_in_bound).with_trusted_nodes(trusted_nodes);

let config = NetworkConfigBuilder::new(secret_key)
.listener_port(0)
.disable_discovery()
.peer_config(peers_config)
.build(NoopProvider::default());

NetworkManager::new(config).await.unwrap()
}

#[tokio::test(flavor = "multi_thread")]
async fn test_connect_many() {
reth_tracing::init_test_tracing();