Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 25 additions & 26 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion beacon_node/eth2_libp2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ rand = "0.7.3"
[dependencies.libp2p]
#version = "0.23.0"
git = "https://github.com/sigp/rust-libp2p"
rev = "5139ec3ace4ad52506f217d790f0a9425274caef"
rev = "3096cb6b89b2883a79ce5ffcb03d41778a09b695"
default-features = false
features = ["websocket", "identify", "mplex", "yamux", "noise", "gossipsub", "dns", "tcp-tokio"]

Expand Down
2 changes: 1 addition & 1 deletion beacon_node/eth2_libp2p/src/behaviour/handler/delegate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ impl<TSpec: EthSpec> DelegatingHandler<TSpec> {
}

/// Gives access to identify's handler.
pub fn identify(&self) -> &IdentifyHandler {
pub fn _identify(&self) -> &IdentifyHandler {
&self.identify_handler
}
}
Expand Down
11 changes: 4 additions & 7 deletions beacon_node/eth2_libp2p/src/behaviour/handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ impl<TSpec: EthSpec> ProtocolsHandler for BehaviourHandler<TSpec> {
fn inject_event(&mut self, event: Self::InEvent) {
match event {
BehaviourHandlerIn::Delegate(delegated_ev) => self.delegate.inject_event(delegated_ev),
/* Events comming from the behaviour */
/* Events coming from the behaviour */
BehaviourHandlerIn::Shutdown(last_message) => {
self.shutting_down = true;
self.delegate.rpc_mut().shutdown(last_message);
Expand Down Expand Up @@ -113,12 +113,9 @@ impl<TSpec: EthSpec> ProtocolsHandler for BehaviourHandler<TSpec> {
>,
> {
// Disconnect if the sub-handlers are ready.
if self.shutting_down {
let rpc_keep_alive = self.delegate.rpc().connection_keep_alive();
let identify_keep_alive = self.delegate.identify().connection_keep_alive();
if KeepAlive::No == rpc_keep_alive.max(identify_keep_alive) {
return Poll::Ready(ProtocolsHandlerEvent::Close(DelegateError::Disconnected));
}
// Currently we only respect the RPC handler.
if self.shutting_down && KeepAlive::No == self.delegate.rpc().connection_keep_alive() {
return Poll::Ready(ProtocolsHandlerEvent::Close(DelegateError::Disconnected));
}

match self.delegate.poll(cx) {
Expand Down
35 changes: 18 additions & 17 deletions beacon_node/eth2_libp2p/src/behaviour/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -694,25 +694,35 @@ impl<TSpec: EthSpec> NetworkBehaviour for Behaviour<TSpec> {
conn_id: &ConnectionId,
endpoint: &ConnectedPoint,
) {
// If the peer manager (and therefore the behaviour's) believe this peer connected, inform
// about the disconnection.
if self.network_globals.peers.read().is_connected(&peer_id) {
return;
}
delegate_to_behaviours!(self, inject_connection_closed, peer_id, conn_id, endpoint);
}

// This gets called once there are no more active connections.
fn inject_disconnected(&mut self, peer_id: &PeerId) {
// If the application/behaviour layers thinks this peer has connected inform it of the disconnect.
if self.network_globals.peers.read().is_connected(&peer_id) {
// Inform the application.
self.add_event(BehaviourEvent::PeerDisconnected(peer_id.clone()));
// Inform the behaviour.
delegate_to_behaviours!(self, inject_disconnected, peer_id);
}
// Inform the peer manager.
// NOTE: It may be the case that a rejected node, due to too many peers is disconnected
// here and the peer manager has no knowledge of its connection. We insert it here for
// reference so that peer manager can track this peer.
self.peer_manager.notify_disconnect(&peer_id);
// Inform the application.
self.add_event(BehaviourEvent::PeerDisconnected(peer_id.clone()));

// Update the prometheus metrics
metrics::inc_counter(&metrics::PEER_DISCONNECT_EVENT_COUNT);
metrics::set_gauge(
&metrics::PEERS_CONNECTED,
self.network_globals.connected_peers() as i64,
);

// Inform the behaviour.
delegate_to_behaviours!(self, inject_disconnected, peer_id);
}

// This gets called every time a connection is established.
Expand Down Expand Up @@ -741,6 +751,7 @@ impl<TSpec: EthSpec> NetworkBehaviour for Behaviour<TSpec> {
};

if goodbye_reason.is_some() {
debug!(self.log, "Disconnecting newly connected peer"; "peer_id" => peer_id.to_string(), "reason" => goodbye_reason.as_ref().expect("Is some").to_string());
self.peers_to_dc
.push_back((peer_id.clone(), goodbye_reason));
return;
Expand Down Expand Up @@ -771,18 +782,8 @@ impl<TSpec: EthSpec> NetworkBehaviour for Behaviour<TSpec> {

// This gets called on the initial connection establishment.
fn inject_connected(&mut self, peer_id: &PeerId) {
// Drop any connection from a banned peer. The goodbye and disconnects are handled in
// `inject_connection_established()`, which gets called first.
// The same holds if we reached the peer limit and the connected peer has no future duty.
if self.peer_manager.is_banned(peer_id)
|| (self.peer_manager.peer_limit_reached()
&& self
.network_globals
.peers
.read()
.peer_info(peer_id)
.map_or(true, |i| !i.has_future_duty()))
{
// If the PeerManager has connected this peer, inform the behaviours
if !self.network_globals.peers.read().is_connected(&peer_id) {
return;
}

Expand Down
23 changes: 14 additions & 9 deletions beacon_node/eth2_libp2p/src/rpc/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,15 +348,20 @@ where
// Check that we don't have outbound items pending for dialing, nor dialing, nor
// established. Also check that there are no established inbound substreams.
// Errors and events need to be reported back, so check those too.
let should_shutdown = if let HandlerState::ShuttingDown(_) = self.state {
self.dial_queue.is_empty()
&& self.outbound_substreams.is_empty()
&& self.inbound_substreams.is_empty()
&& self.pending_errors.is_empty()
&& self.events_out.is_empty()
&& self.dial_negotiated == 0
} else {
false
let should_shutdown = match self.state {
HandlerState::ShuttingDown(_) => {
self.dial_queue.is_empty()
&& self.outbound_substreams.is_empty()
&& self.inbound_substreams.is_empty()
&& self.pending_errors.is_empty()
&& self.events_out.is_empty()
&& self.dial_negotiated == 0
}
HandlerState::Deactivated => {
// Regardless of events, the timeout has expired. Force the disconnect.
true
}
_ => false,
};

match self.keep_alive {
Expand Down