Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions beacon_node/lighthouse_network/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ use crate::rpc::*;
use crate::service::behaviour::BehaviourEvent;
pub use crate::service::behaviour::Gossipsub;
use crate::types::{
subnet_from_topic_hash, GossipEncoding, GossipKind, GossipTopic, SnappyTransform, Subnet,
SubnetDiscovery,
fork_core_topics, subnet_from_topic_hash, GossipEncoding, GossipKind, GossipTopic,
SnappyTransform, Subnet, SubnetDiscovery,
};
use crate::EnrExt;
use crate::Eth2Enr;
Expand All @@ -41,6 +41,7 @@ use std::{
sync::Arc,
task::{Context, Poll},
};
use types::ForkName;
use types::{
consts::altair::SYNC_COMMITTEE_SUBNET_COUNT, EnrForkId, EthSpec, ForkContext, Slot, SubnetId,
};
Expand Down Expand Up @@ -559,13 +560,20 @@ impl<AppReqId: ReqId, TSpec: EthSpec> Network<AppReqId, TSpec> {
self.unsubscribe(gossip_topic)
}

/// Subscribe to all currently subscribed topics with the new fork digest.
pub fn subscribe_new_fork_topics(&mut self, new_fork_digest: [u8; 4]) {
/// Subscribe to all required topics for the `new_fork` with the given `new_fork_digest`.
pub fn subscribe_new_fork_topics(&mut self, new_fork: ForkName, new_fork_digest: [u8; 4]) {
// Subscribe to existing topics with new fork digest
let subscriptions = self.network_globals.gossipsub_subscriptions.read().clone();
for mut topic in subscriptions.into_iter() {
topic.fork_digest = new_fork_digest;
self.subscribe(topic);
}

// Subscribe to core topics for the new fork
for kind in fork_core_topics(&new_fork) {
let topic = GossipTopic::new(kind, GossipEncoding::default(), new_fork_digest);
self.subscribe(topic);
}
}

/// Unsubscribe from all topics that doesn't have the given fork_digest
Expand Down
4 changes: 2 additions & 2 deletions beacon_node/lighthouse_network/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,6 @@ pub use pubsub::{PubsubMessage, SnappyTransform};
pub use subnet::{Subnet, SubnetDiscovery};
pub use sync_state::{BackFillState, SyncState};
pub use topics::{
subnet_from_topic_hash, GossipEncoding, GossipKind, GossipTopic, CORE_TOPICS,
LIGHT_CLIENT_GOSSIP_TOPICS,
core_topics_to_subscribe, fork_core_topics, subnet_from_topic_hash, GossipEncoding, GossipKind,
GossipTopic, LIGHT_CLIENT_GOSSIP_TOPICS,
};
43 changes: 39 additions & 4 deletions beacon_node/lighthouse_network/src/types/topics.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use libp2p::gossipsub::{IdentTopic as Topic, TopicHash};
use serde_derive::{Deserialize, Serialize};
use strum::AsRefStr;
use types::{SubnetId, SyncSubnetId};
use types::{ForkName, SubnetId, SyncSubnetId};

use crate::Subnet;

Expand All @@ -22,21 +22,45 @@ pub const BLS_TO_EXECUTION_CHANGE_TOPIC: &str = "bls_to_execution_change";
pub const LIGHT_CLIENT_FINALITY_UPDATE: &str = "light_client_finality_update";
pub const LIGHT_CLIENT_OPTIMISTIC_UPDATE: &str = "light_client_optimistic_update";

pub const CORE_TOPICS: [GossipKind; 7] = [
pub const BASE_CORE_TOPICS: [GossipKind; 5] = [
GossipKind::BeaconBlock,
GossipKind::BeaconAggregateAndProof,
GossipKind::VoluntaryExit,
GossipKind::ProposerSlashing,
GossipKind::AttesterSlashing,
GossipKind::SignedContributionAndProof,
GossipKind::BlsToExecutionChange,
];

pub const ALTAIR_CORE_TOPICS: [GossipKind; 1] = [GossipKind::SignedContributionAndProof];

pub const CAPELLA_CORE_TOPICS: [GossipKind; 1] = [GossipKind::BlsToExecutionChange];

pub const LIGHT_CLIENT_GOSSIP_TOPICS: [GossipKind; 2] = [
GossipKind::LightClientFinalityUpdate,
GossipKind::LightClientOptimisticUpdate,
];

/// Returns the core topics associated with each fork that are new to the previous fork
pub fn fork_core_topics(fork_name: &ForkName) -> Vec<GossipKind> {
match fork_name {
ForkName::Base => BASE_CORE_TOPICS.to_vec(),
ForkName::Altair => ALTAIR_CORE_TOPICS.to_vec(),
ForkName::Merge => vec![],
ForkName::Capella => CAPELLA_CORE_TOPICS.to_vec(),
}
}

/// Returns all the topics that we need to subscribe to for a given fork
/// including topics from older forks and new topics for the current fork.
pub fn core_topics_to_subscribe(mut current_fork: ForkName) -> Vec<GossipKind> {
let mut topics = fork_core_topics(&current_fork);
while let Some(previous_fork) = current_fork.previous_fork() {
let previous_fork_topics = fork_core_topics(&previous_fork);
topics.extend(previous_fork_topics);
current_fork = previous_fork;
}
topics
}

/// A gossipsub topic which encapsulates the type of messages that should be sent and received over
/// the pubsub protocol and the way the messages should be encoded.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)]
Expand Down Expand Up @@ -390,4 +414,15 @@ mod tests {
assert_eq!("proposer_slashing", ProposerSlashing.as_ref());
assert_eq!("attester_slashing", AttesterSlashing.as_ref());
}

#[test]
fn test_core_topics_to_subscribe() {
let mut all_topics = Vec::new();
all_topics.extend(CAPELLA_CORE_TOPICS);
all_topics.extend(ALTAIR_CORE_TOPICS);
all_topics.extend(BASE_CORE_TOPICS);

let latest_fork = *ForkName::list_all().last().unwrap();
assert_eq!(core_topics_to_subscribe(latest_fork), all_topics);
}
}
6 changes: 3 additions & 3 deletions beacon_node/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use lighthouse_network::{
Context, PeerAction, PeerRequestId, PubsubMessage, ReportSource, Request, Response, Subnet,
};
use lighthouse_network::{
types::{GossipEncoding, GossipTopic},
types::{core_topics_to_subscribe, GossipEncoding, GossipTopic},
MessageId, NetworkEvent, NetworkGlobals, PeerId,
};
use slog::{crit, debug, error, info, o, trace, warn};
Expand Down Expand Up @@ -445,7 +445,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
let fork_version = self.beacon_chain.spec.fork_version_for_name(fork_name);
let fork_digest = ChainSpec::compute_fork_digest(fork_version, self.beacon_chain.genesis_validators_root);
info!(self.log, "Subscribing to new fork topics");
self.libp2p.subscribe_new_fork_topics(fork_digest);
self.libp2p.subscribe_new_fork_topics(fork_name, fork_digest);
self.next_fork_subscriptions = Box::pin(None.into());
}
else {
Expand Down Expand Up @@ -684,7 +684,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
}

let mut subscribed_topics: Vec<GossipTopic> = vec![];
for topic_kind in lighthouse_network::types::CORE_TOPICS.iter() {
for topic_kind in core_topics_to_subscribe(self.fork_context.current_fork()) {
for fork_digest in self.required_gossip_fork_digests() {
let topic = GossipTopic::new(
topic_kind.clone(),
Expand Down