Skip to content

Commit

Permalink
Unsubscribe from pre fork topics
Browse files Browse the repository at this point in the history
  • Loading branch information
pawanjay176 committed Apr 6, 2021
1 parent 241ddac commit 16cf433
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 21 deletions.
10 changes: 10 additions & 0 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2655,6 +2655,16 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self.spec.enr_fork_id(slot, self.genesis_validators_root)
}

/// Gets the current `ForkDigest`.
pub fn fork_digest(&self) -> [u8; 4] {
// If we are unable to read the slot clock we assume that it is prior to genesis and
// therefore use the genesis slot.
// TODO: check
let slot = self.slot().unwrap_or(self.spec.genesis_slot);

self.spec.fork_digest(slot, self.genesis_validators_root)
}

/// Calculates the `Duration` to the next fork, if one exists.
pub fn duration_to_next_fork(&self) -> Option<Duration> {
let epoch = self.spec.next_fork_epoch()?;
Expand Down
33 changes: 13 additions & 20 deletions beacon_node/eth2_libp2p/src/behaviour/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,19 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
self.unsubscribe(topic)
}

/// Unsubscribe from all topics that doesn't have the given fork_digest
pub fn unsubscribe_from_fork_topics(&mut self, except: [u8; 4]) {
for topic in self
.network_globals
.gossipsub_subscriptions
.read()
.iter()
.filter(|topic| topic.fork_id != except)
{
self.unsubscribe(topic);
}
}

/// Subscribes to a gossipsub topic.
pub fn subscribe(&mut self, topic: GossipTopic) -> bool {
// update the network globals
Expand Down Expand Up @@ -549,26 +562,6 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
.discovery_mut()
.update_eth2_enr(enr_fork_id.clone());

// unsubscribe from all gossip topics and re-subscribe to their new fork counterparts
let subscribed_topics = self
.network_globals
.gossipsub_subscriptions
.read()
.iter()
.cloned()
.collect::<Vec<GossipTopic>>();

// unsubscribe from all topics
for topic in &subscribed_topics {
self.unsubscribe(topic.clone());
}

// re-subscribe modifying the fork version
for mut topic in subscribed_topics {
*topic.digest() = enr_fork_id.fork_digest;
self.subscribe(topic);
}

// update the local reference
self.enr_fork_id = enr_fork_id;
}
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/eth2_libp2p/src/types/topics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub struct GossipTopic {
/// The encoding of the topic.
encoding: GossipEncoding,
/// The fork digest of the topic,
fork_digest: [u8; 4],
pub fork_digest: [u8; 4],
/// The kind of topic.
kind: GossipKind,
}
Expand Down
27 changes: 27 additions & 0 deletions beacon_node/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use eth2_libp2p::{types::{GossipKind, GossipEncoding, GossipTopic}, BehaviourEve
use eth2_libp2p::{MessageAcceptance, Service as LibP2PService};
use futures::prelude::*;
use slog::{debug, error, info, o, trace, warn};
use slot_clock::SlotClock;
use std::{net::SocketAddr, sync::Arc, time::Duration};
use store::HotColdDB;
use tokio::sync::mpsc;
Expand All @@ -24,6 +25,8 @@ mod tests;

/// The interval (in seconds) that various network metrics will update.
const METRIC_UPDATE_INTERVAL: u64 = 1;
/// Delay (in epochs) after a fork where we unsubscribe from pre-fork topics.
const UNSUBSCRIBE_DELAY: u64 = 2;

/// Types of messages that the network service can receive.
#[derive(Debug)]
Expand Down Expand Up @@ -113,6 +116,8 @@ pub struct NetworkService<T: BeaconChainTypes> {
discovery_auto_update: bool,
/// A delay that expires when a new fork takes place.
next_fork_update: Option<Sleep>,
/// A delay that expires when we need to unsubscribe from old fork topics
next_unsubscribe: Option<Sleep>,
/// Subscribe to all the subnets once synced.
subscribe_all_subnets: bool,
/// A timer for updating various network metrics.
Expand Down Expand Up @@ -158,6 +163,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {

// keep track of when our fork_id needs to be updated
let next_fork_update = next_fork_delay(&beacon_chain);
let next_unsubsribe = topic_unsubscribe_delay(&beacon_chain);

// launch libp2p service
let (network_globals, mut libp2p) = LibP2PService::new(
Expand Down Expand Up @@ -216,6 +222,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
upnp_mappings: (None, None),
discovery_auto_update: config.discv5_config.enr_update,
next_fork_update,
next_unsubscribe,
subscribe_all_subnets: config.subscribe_all_subnets,
metrics_update,
gossipsub_parameter_update,
Expand Down Expand Up @@ -578,6 +585,15 @@ fn spawn_service<T: BeaconChainTypes>(
}
}

// TODO: try sticking this logic in the next_fork_update delay
if let Some(delay) = &service.next_unsubscribe() {
if delay.is_elapsed() {
let current_fork_digest = service.beacon_chain.fork_digest();
self.libp2p.swarm.unsubscribe_from_fork_topics(current_fork_digest);
}
service.next_fork_update = topic_unsubscribe_delay(&service.beacon_chain);
}

metrics::update_bandwidth_metrics(service.libp2p.bandwidth.clone());
}
}, "network");
Expand All @@ -595,6 +611,17 @@ fn next_fork_delay<T: BeaconChainTypes>(
})
}

/// Returns a `Sleep` that triggers `UNSUBSCRIBE_DELAY` epochs after change in the beacon chain fork version.
/// If there is no scheduled fork, `None` is returned.
fn topic_unsubscribe_delay<T: BeaconChainTypes>(beacon_chain: &BeaconChain<T>) -> Option<tokio::time::Sleep> {
beacon_chain.duration_to_next_fork().map(|until_fork| {
let epoch_duration = beacon_chain.spec.seconds_per_slot * T::EthSpec::slots_per_epoch();
let delay = Duration::from_secs( UNSUBSCRIBE_DELAY * epoch_duration) ;
tokio::time::sleep_until(tokio::time::Instant::now() + until_fork + delay)
})
}


impl<T: BeaconChainTypes> Drop for NetworkService<T> {
fn drop(&mut self) {
// network thread is terminating
Expand Down

0 comments on commit 16cf433

Please sign in to comment.