Always accept incoming connections from trusted peers #7140

Merged
merged 6 commits on Mar 15, 2024
58 changes: 45 additions & 13 deletions crates/net/network/src/peers/manager.rs
@@ -213,19 +213,14 @@ impl PeersManager {

/// Invoked when a new _incoming_ tcp connection is accepted.
///
/// returns an error if the inbound ip address is on the ban list or
/// we have reached our limit for max inbound connections
/// returns an error if the inbound ip address is on the ban list
pub(crate) fn on_incoming_pending_session(
&mut self,
addr: IpAddr,
) -> Result<(), InboundConnectionError> {
if self.ban_list.is_banned_ip(&addr) {
return Err(InboundConnectionError::IpBanned)
}
if !self.connection_info.has_in_capacity() {
return Err(InboundConnectionError::ExceedsLimit(self.connection_info.max_inbound))
}
// keep track of new connection
self.connection_info.inc_in();
Ok(())
}
@@ -284,6 +279,14 @@ impl PeersManager {
return
}
value.state = PeerConnectionState::In;
// if the peer is not trusted and we don't have capacity for more inbound connections,
// disconnect the peer
if !value.is_trusted() && !self.connection_info.has_in_capacity() {
self.queued_actions.push_back(PeerAction::Disconnect {
peer_id,
reason: Some(DisconnectReason::TooManyPeers),
Comment on lines +282 to +287

Collaborator:

I'm wondering if we actually have a bug here where we allow multiple incoming sessions from the same peer ID.

Could you please add a test case for this scenario, because I think we should check the peer's state here.

Similar setup as https://github.com/paradigmxyz/reth/blob/main/crates/net/network/src/peers/manager.rs#L1913-L1913

Contributor Author:

Here is the test I can imagine:

  1. Set up a peer with max_in_bound = 1.
  2. Receive an incoming session from another peer X and establish the session.
  3. Receive another incoming session from peer X; it should be rejected with AlreadyConnected, and the already-connected peer should not be disconnected or removed.

The point is that a connected peer shouldn't be disconnected by another incoming session. Am I in the right direction?

Collaborator:

> receive another incoming session from peer X again; it should be rejected with AlreadyConnected

Exactly.

Contributor Author:

Yes, there seems to be a bug: num_inbound should have been decreased in on_already_connected. I added a unit test and an integration test, PTAL.

Also, I think num_inbound should only be increased when the session is established, which may simplify the code that maintains it.

Collaborator:

> Also, I think num_inbound should only be increased when the session is established, which may simplify the code that maintains it.

I believe this makes sense; I'd like to introduce a new state PendingIn, similar to PendingOut.

We can do this separately.
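As a rough sketch of that deferred follow-up (illustrative only — the `PendingIn` variant, the simplified types, and the method names below are hypothetical stand-ins, not code from this PR), tracking pending inbound sessions separately would let `num_inbound` count only established sessions:

```rust
/// Simplified stand-ins for the types in peers/manager.rs; sketch only.
#[derive(Debug, PartialEq)]
enum PeerConnectionState {
    /// Proposed: inbound TCP accepted, handshake still in flight,
    /// mirroring the existing `PendingOut`.
    PendingIn,
    /// Session fully established.
    In,
}

struct ConnectionInfo {
    num_inbound: usize,
}

impl ConnectionInfo {
    /// `on_incoming_pending_session` would only record the pending state...
    fn incoming_accepted(&mut self) -> PeerConnectionState {
        PeerConnectionState::PendingIn
    }

    /// ...and the counter bump would move to session establishment, so a
    /// duplicate rejected via `on_already_connected` never needs a
    /// compensating `decr_in()`.
    fn incoming_established(&mut self, state: &mut PeerConnectionState) {
        *state = PeerConnectionState::In;
        self.num_inbound += 1;
    }
}

fn main() {
    let mut info = ConnectionInfo { num_inbound: 0 };
    let mut state = info.incoming_accepted();
    assert_eq!(state, PeerConnectionState::PendingIn); // not yet counted
    assert_eq!(info.num_inbound, 0);
    info.incoming_established(&mut state);
    assert_eq!(state, PeerConnectionState::In);
    assert_eq!(info.num_inbound, 1);
}
```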

});
}
}
Entry::Vacant(entry) => {
// peer is missing in the table, we add it but mark it as to be removed after
@@ -292,6 +295,14 @@
peer.remove_after_disconnect = true;
entry.insert(peer);
self.queued_actions.push_back(PeerAction::PeerAdded(peer_id));

// disconnect the peer if we don't have capacity for more inbound connections
if !self.connection_info.has_in_capacity() {
self.queued_actions.push_back(PeerAction::Disconnect {
peer_id,
reason: Some(DisconnectReason::TooManyPeers),
});
}
}
}
}
@@ -573,7 +584,10 @@ impl PeersManager {
/// to us at the same time and this connection is already established.
pub(crate) fn on_already_connected(&mut self, direction: Direction) {
match direction {
Direction::Incoming => {}
Direction::Incoming => {
// need to decrement the incoming counter
self.connection_info.decr_in();
}
Direction::Outgoing(_) => {
// need to decrement the outgoing counter
self.connection_info.decr_out();
@@ -875,7 +889,7 @@ impl ConnectionInfo {

/// Returns `true` if there's still capacity for a new incoming connection.
fn has_in_capacity(&self) -> bool {
self.num_inbound < self.max_inbound
self.num_inbound <= self.max_inbound
}

fn decr_state(&mut self, state: PeerConnectionState) {
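A note on the `<` to `<=` change (an illustrative reading with simplified stand-in types, not an excerpt from the crate): `inc_in()` runs in `on_incoming_pending_session`, before the capacity check in `on_incoming_session_established`, so by then `num_inbound` already includes the session being checked. `<=` therefore lets the session occupying the last slot pass:

```rust
/// Simplified stand-in for `ConnectionInfo`; illustration only.
struct ConnectionInfo {
    num_inbound: usize,
    max_inbound: usize,
}

impl ConnectionInfo {
    fn has_in_capacity(&self) -> bool {
        // `<=` because num_inbound already counts the session under test.
        self.num_inbound <= self.max_inbound
    }
}

fn main() {
    let mut info = ConnectionInfo { num_inbound: 0, max_inbound: 1 };

    info.num_inbound += 1; // first inbound session accepted while pending
    assert!(info.has_in_capacity()); // 1 <= 1: the first session is kept

    info.num_inbound += 1; // a second, non-trusted inbound session arrives
    assert!(!info.has_in_capacity()); // 2 > 1: disconnected with TooManyPeers
}
```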
@@ -1420,7 +1434,6 @@ impl Default for PeerBackoffDurations {

#[derive(Debug, Error)]
pub enum InboundConnectionError {
ExceedsLimit(usize),
IpBanned,
}

@@ -1449,7 +1462,7 @@ mod tests {
DisconnectReason,
};
use reth_net_common::ban_list::BanList;
use reth_network_api::ReputationChangeKind;
use reth_network_api::{Direction, ReputationChangeKind};
use reth_primitives::{PeerId, B512};
use std::{
collections::HashSet,
@@ -1989,6 +2002,28 @@ mod tests {
}
}

#[tokio::test]
async fn test_already_connected() {
let peer = PeerId::random();
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
let mut peers = PeersManager::default();
assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
peers.on_incoming_session_established(peer, socket_addr);

// peer should have been added and num_inbound should have been increased
let p = peers.peers.get_mut(&peer).expect("peer not found");
assert_eq!(p.addr, socket_addr);
assert_eq!(peers.connection_info.num_inbound, 1);

assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
peers.on_already_connected(Direction::Incoming);

// the duplicate session should be rejected: the peer entry is unchanged and num_inbound is not increased
let p = peers.peers.get_mut(&peer).expect("peer not found");
assert_eq!(p.addr, socket_addr);
assert_eq!(peers.connection_info.num_inbound, 1);
}

#[tokio::test]
async fn test_reputation_change_trusted_peer() {
let peer = PeerId::random();
@@ -2166,9 +2201,6 @@ mod tests {
Ok(_) => panic!(),
Err(err) => match err {
super::InboundConnectionError::IpBanned {} => {}
super::InboundConnectionError::ExceedsLimit { .. } => {
panic!()
}
},
}
}
39 changes: 0 additions & 39 deletions crates/net/network/src/session/mod.rs
@@ -53,9 +53,6 @@ pub use handle::{
use reth_eth_wire::multiplex::RlpxProtocolMultiplexer;
pub use reth_network_api::{Direction, PeerInfo};

/// Maximum allowed graceful disconnects at a time.
const MAX_GRACEFUL_DISCONNECTS: usize = 15;

/// Internal identifier for active sessions.
#[derive(Debug, Clone, Copy, PartialOrd, PartialEq, Eq, Hash)]
pub struct SessionId(usize);
@@ -113,8 +110,6 @@ pub struct SessionManager {
bandwidth_meter: BandwidthMeter,
/// Metrics for the session manager.
metrics: SessionManagerMetrics,
/// Tracks the number of active graceful disconnects for incoming connections.
graceful_disconnects_counter: GracefulDisconnects,
}

// === impl SessionManager ===
@@ -156,7 +151,6 @@ impl SessionManager {
bandwidth_meter,
extra_protocols,
metrics: Default::default(),
graceful_disconnects_counter: Default::default(),
}
}

@@ -310,27 +304,6 @@ impl SessionManager {
}
}

/// Sends a disconnect message to the peer with the given [DisconnectReason].
pub(crate) fn disconnect_incoming_connection(
&mut self,
stream: TcpStream,
reason: DisconnectReason,
) {
let counter = self.graceful_disconnects_counter.clone();
if counter.exceeds_limit() {
// simply drop the connection if there are too many active disconnects already
return
}
let secret_key = self.secret_key;

self.spawn(async move {
if let Ok(stream) = get_eciess_stream(stream, secret_key, Direction::Incoming).await {
let _ = UnauthedP2PStream::new(stream).send_disconnect(reason).await;
}
drop(counter)
});
}

/// Initiates a shutdown of all sessions.
///
/// It will trigger the disconnect on all the session tasks to gracefully terminate. The result
@@ -635,18 +608,6 @@ impl SessionManager {
}
}

/// Keep track of graceful disconnects for incoming connections.
#[derive(Debug, Clone, Default)]
struct GracefulDisconnects(Arc<()>);

impl GracefulDisconnects {
/// Returns true if the number of graceful disconnects exceeds the limit
/// [MAX_GRACEFUL_DISCONNECTS]
fn exceeds_limit(&self) -> bool {
Arc::strong_count(&self.0) > MAX_GRACEFUL_DISCONNECTS
}
}

/// Events produced by the [`SessionManager`]
#[derive(Debug)]
pub enum SessionEvent {
11 changes: 2 additions & 9 deletions crates/net/network/src/swarm.rs
@@ -10,7 +10,7 @@ use futures::Stream;
use reth_eth_wire::{
capability::{Capabilities, CapabilityMessage},
errors::EthStreamError,
DisconnectReason, EthVersion, Status,
EthVersion, Status,
};
use reth_primitives::PeerId;
use reth_provider::{BlockNumReader, BlockReader};
@@ -29,7 +29,7 @@ use tracing::trace;
///
/// A swarm emits [`SwarmEvent`]s when polled.
///
/// The manages the [`ConnectionListener`] and delegates new incoming connections to the
/// It manages the [`ConnectionListener`] and delegates new incoming connections to the
/// [`SessionManager`]. Outgoing connections are either initiated on demand or triggered by the
/// [`NetworkState`] and also delegated to the [`NetworkState`].
///
@@ -203,13 +203,6 @@
InboundConnectionError::IpBanned => {
trace!(target: "net", ?remote_addr, "The incoming ip address is in the ban list");
}
InboundConnectionError::ExceedsLimit(limit) => {
trace!(target: "net", %limit, ?remote_addr, "Exceeded incoming connection limit; disconnecting");
self.sessions.disconnect_incoming_connection(
stream,
DisconnectReason::TooManyPeers,
);
}
}
return None
}
117 changes: 117 additions & 0 deletions crates/net/network/tests/it/connect.rs
@@ -585,6 +585,123 @@ async fn test_disconnect_incoming_when_exceeded_incoming_connections() {
net_handle.terminate().await;
}

#[tokio::test(flavor = "multi_thread")]
async fn test_always_accept_incoming_connections_from_trusted_peers() {
reth_tracing::init_test_tracing();
let other_peer1 = new_random_peer(10, HashSet::new()).await;
let other_peer2 = new_random_peer(10, HashSet::new()).await;
let other_peer3 = new_random_peer(0, HashSet::new()).await;

// set up the peer with max_inbound = 1 and add other_peer3 as a trusted node
let peer = new_random_peer(
1,
HashSet::from([NodeRecord::new(other_peer3.local_addr(), *other_peer3.peer_id())]),
)
.await;

let handle = peer.handle().clone();
let other_peer_handle1 = other_peer1.handle().clone();
let other_peer_handle2 = other_peer2.handle().clone();
let other_peer_handle3 = other_peer3.handle().clone();

tokio::task::spawn(peer);
tokio::task::spawn(other_peer1);
tokio::task::spawn(other_peer2);
tokio::task::spawn(other_peer3);

let mut events = NetworkEventStream::new(handle.event_listener());
let mut events2 = NetworkEventStream::new(other_peer_handle2.event_listener());

// though we added other_peer3 as a trusted node, the connection to it should fail because
// other_peer3 doesn't allow inbound connections (its max_inbound is 0)
let (peer_id, reason) = events.next_session_closed().await.unwrap();
assert_eq!(peer_id, *other_peer_handle3.peer_id());
assert_eq!(reason, Some(DisconnectReason::TooManyPeers));

// incoming connection should succeed
other_peer_handle1.add_peer(*handle.peer_id(), handle.local_addr());
let peer_id = events.next_session_established().await.unwrap();
assert_eq!(peer_id, *other_peer_handle1.peer_id());
assert_eq!(handle.num_connected_peers(), 1);

// incoming connection should fail because exceeding max_inbound
other_peer_handle2.add_peer(*handle.peer_id(), handle.local_addr());
let (peer_id, reason) = events.next_session_closed().await.unwrap();
assert_eq!(peer_id, *other_peer_handle2.peer_id());
// fixme: this should be `Some(DisconnectReason::TooManyPeers)` but is `None`
assert_eq!(reason, None);

let (peer_id, reason) = events2.next_session_closed().await.unwrap();
assert_eq!(peer_id, *handle.peer_id());
assert_eq!(reason, Some(DisconnectReason::TooManyPeers));

// outbound connection from `other_peer3` should succeed
other_peer_handle3.add_peer(*handle.peer_id(), handle.local_addr());
let peer_id = events.next_session_established().await.unwrap();
assert_eq!(peer_id, *other_peer_handle3.peer_id());

// sleep is needed because the disconnect event arrives after the session_established event
tokio::time::sleep(Duration::from_secs(3)).await;
assert_eq!(handle.num_connected_peers(), 2);
}

#[tokio::test(flavor = "multi_thread")]
async fn test_rejected_by_already_connect() {
reth_tracing::init_test_tracing();
let other_peer1 = new_random_peer(10, HashSet::new()).await;
let other_peer2 = new_random_peer(10, HashSet::new()).await;

// set up the peer with max_inbound = 2
let peer = new_random_peer(2, HashSet::new()).await;

let handle = peer.handle().clone();
let other_peer_handle1 = other_peer1.handle().clone();
let other_peer_handle2 = other_peer2.handle().clone();

tokio::task::spawn(peer);
tokio::task::spawn(other_peer1);
tokio::task::spawn(other_peer2);

let mut events = NetworkEventStream::new(handle.event_listener());

// incoming connection should succeed
other_peer_handle1.add_peer(*handle.peer_id(), handle.local_addr());
let peer_id = events.next_session_established().await.unwrap();
assert_eq!(peer_id, *other_peer_handle1.peer_id());
assert_eq!(handle.num_connected_peers(), 1);

// incoming connection from the same peer should be rejected as already connected,
// and num_inbound should still be 1
other_peer_handle1.add_peer(*handle.peer_id(), handle.local_addr());
tokio::time::sleep(Duration::from_secs(1)).await;

// incoming connection from other_peer2 should succeed
other_peer_handle2.add_peer(*handle.peer_id(), handle.local_addr());
let peer_id = events.next_session_established().await.unwrap();
assert_eq!(peer_id, *other_peer_handle2.peer_id());

// wait 2 seconds and check that other_peer2 is not rejected by TooManyPeers
tokio::time::sleep(Duration::from_secs(2)).await;
assert_eq!(handle.num_connected_peers(), 2);
}

async fn new_random_peer(
max_in_bound: usize,
trusted_nodes: HashSet<NodeRecord>,
) -> NetworkManager<NoopProvider> {
let secret_key = SecretKey::new(&mut rand::thread_rng());
let peers_config =
PeersConfig::default().with_max_inbound(max_in_bound).with_trusted_nodes(trusted_nodes);

let config = NetworkConfigBuilder::new(secret_key)
.listener_port(0)
.disable_discovery()
.peer_config(peers_config)
.build(NoopProvider::default());

NetworkManager::new(config).await.unwrap()
}

#[tokio::test(flavor = "multi_thread")]
async fn test_connect_many() {
reth_tracing::init_test_tracing();