diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 55aa1db6a32..1a3dea089bd 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -2655,6 +2655,16 @@ impl BeaconChain { self.spec.enr_fork_id(slot, self.genesis_validators_root) } + /// Gets the current `ForkDigest`. + pub fn fork_digest(&self) -> [u8; 4] { + // If we are unable to read the slot clock we assume that it is prior to genesis and + // therefore use the genesis slot. + // TODO: check + let slot = self.slot().unwrap_or(self.spec.genesis_slot); + + self.spec.fork_digest(slot, self.genesis_validators_root) + } + /// Calculates the `Duration` to the next fork, if one exists. pub fn duration_to_next_fork(&self) -> Option { let epoch = self.spec.next_fork_epoch()?; diff --git a/beacon_node/eth2_libp2p/src/behaviour/mod.rs b/beacon_node/eth2_libp2p/src/behaviour/mod.rs index 5a84dba3f37..4912ff6a2b5 100644 --- a/beacon_node/eth2_libp2p/src/behaviour/mod.rs +++ b/beacon_node/eth2_libp2p/src/behaviour/mod.rs @@ -352,6 +352,19 @@ impl Behaviour { self.unsubscribe(topic) } + /// Unsubscribe from all topics that doesn't have the given fork_digest + pub fn unsubscribe_from_fork_topics(&mut self, except: [u8; 4]) { + for topic in self + .network_globals + .gossipsub_subscriptions + .read() + .iter() + .filter(|topic| topic.fork_id != except) + { + self.unsubscribe(topic); + } + } + /// Subscribes to a gossipsub topic. pub fn subscribe(&mut self, topic: GossipTopic) -> bool { // update the network globals @@ -549,26 +562,6 @@ impl Behaviour { .discovery_mut() .update_eth2_enr(enr_fork_id.clone()); - // unsubscribe from all gossip topics and re-subscribe to their new fork counterparts - let subscribed_topics = self - .network_globals - .gossipsub_subscriptions - .read() - .iter() - .cloned() - .collect::>(); - - // unsubscribe from all topics - for topic in &subscribed_topics { - self.unsubscribe(topic.clone()); - } - - // re-subscribe modifying the fork version - for mut topic in subscribed_topics { - *topic.digest() = enr_fork_id.fork_digest; - self.subscribe(topic); - } - // update the local reference self.enr_fork_id = enr_fork_id; } diff --git a/beacon_node/eth2_libp2p/src/types/topics.rs b/beacon_node/eth2_libp2p/src/types/topics.rs index 7ecc56e5c5f..4c0fe7d8c0f 100644 --- a/beacon_node/eth2_libp2p/src/types/topics.rs +++ b/beacon_node/eth2_libp2p/src/types/topics.rs @@ -33,7 +33,7 @@ pub struct GossipTopic { /// The encoding of the topic. encoding: GossipEncoding, /// The fork digest of the topic, - fork_digest: [u8; 4], + pub fork_digest: [u8; 4], /// The kind of topic. kind: GossipKind, } diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 0e826adabaf..6422f279b4e 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -14,6 +14,7 @@ use eth2_libp2p::{types::{GossipKind, GossipEncoding, GossipTopic}, BehaviourEve use eth2_libp2p::{MessageAcceptance, Service as LibP2PService}; use futures::prelude::*; use slog::{debug, error, info, o, trace, warn}; +use slot_clock::SlotClock; use std::{net::SocketAddr, sync::Arc, time::Duration}; use store::HotColdDB; use tokio::sync::mpsc; @@ -24,6 +25,8 @@ mod tests; /// The interval (in seconds) that various network metrics will update. const METRIC_UPDATE_INTERVAL: u64 = 1; +/// Delay (in epochs) after a fork where we unsubscribe from pre-fork topics. +const UNSUBSCRIBE_DELAY: u64 = 2; /// Types of messages that the network service can receive. #[derive(Debug)] @@ -113,6 +116,8 @@ pub struct NetworkService { discovery_auto_update: bool, /// A delay that expires when a new fork takes place. next_fork_update: Option, + /// A delay that expires when we need to unsubscribe from old fork topics + next_unsubscribe: Option, /// Subscribe to all the subnets once synced. subscribe_all_subnets: bool, /// A timer for updating various network metrics. @@ -158,6 +163,7 @@ impl NetworkService { // keep track of when our fork_id needs to be updated let next_fork_update = next_fork_delay(&beacon_chain); + let next_unsubsribe = topic_unsubscribe_delay(&beacon_chain); // launch libp2p service let (network_globals, mut libp2p) = LibP2PService::new( @@ -216,6 +222,7 @@ impl NetworkService { upnp_mappings: (None, None), discovery_auto_update: config.discv5_config.enr_update, next_fork_update, + next_unsubscribe, subscribe_all_subnets: config.subscribe_all_subnets, metrics_update, gossipsub_parameter_update, @@ -578,6 +585,15 @@ fn spawn_service( } } + // TODO: try sticking this logic in the next_fork_update delay + if let Some(delay) = &service.next_unsubscribe() { + if delay.is_elapsed() { + let current_fork_digest = service.beacon_chain.fork_digest(); + self.libp2p.swarm.unsubscribe_from_fork_topics(current_fork_digest); + } + service.next_fork_update = topic_unsubscribe_delay(&service.beacon_chain); + } + metrics::update_bandwidth_metrics(service.libp2p.bandwidth.clone()); } }, "network"); @@ -595,6 +611,17 @@ fn next_fork_delay( }) } +/// Returns a `Sleep` that triggers `UNSUBSCRIBE_DELAY` epochs after change in the beacon chain fork version. +/// If there is no scheduled fork, `None` is returned. +fn topic_unsubscribe_delay(beacon_chain: &BeaconChain) -> Option { + beacon_chain.duration_to_next_fork().map(|until_fork| { + let epoch_duration = beacon_chain.spec.seconds_per_slot * T::EthSpec::slots_per_epoch(); + let delay = Duration::from_secs( UNSUBSCRIBE_DELAY * epoch_duration) ; + tokio::time::sleep_until(tokio::time::Instant::now() + until_fork + delay) + }) +} + + impl Drop for NetworkService { fn drop(&mut self) { // network thread is terminating