Skip to content

Commit

Permalink
Subscribe to required gossip topics
Browse files Browse the repository at this point in the history
  • Loading branch information
pawanjay176 committed Apr 6, 2021
1 parent f112802 commit 241ddac
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 22 deletions.
9 changes: 6 additions & 3 deletions beacon_node/eth2_libp2p/src/behaviour/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,10 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
.eth2()
.expect("Local ENR must have a fork id");

let possible_fork_digests = vec![enr_fork_id.fork_digest];
let possible_fork_digests = vec![
chain_spec.altair_fork_digest(genesis_validators_root),
chain_spec.genesis_fork_digest(genesis_validators_root),
];
let filter = MaxCountSubscriptionFilter {
filter: Self::create_whitelist_filter(possible_fork_digests, 64), //TODO change this to a constant
max_subscribed_topics: 200, //TODO change this to a constant
Expand Down Expand Up @@ -350,7 +353,7 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
}

/// Subscribes to a gossipsub topic.
fn subscribe(&mut self, topic: GossipTopic) -> bool {
pub fn subscribe(&mut self, topic: GossipTopic) -> bool {
// update the network globals
self.network_globals
.gossipsub_subscriptions
Expand All @@ -372,7 +375,7 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
}

/// Unsubscribe from a gossipsub topic.
fn unsubscribe(&mut self, topic: GossipTopic) -> bool {
pub fn unsubscribe(&mut self, topic: GossipTopic) -> bool {
// update the network globals
self.network_globals
.gossipsub_subscriptions
Expand Down
74 changes: 55 additions & 19 deletions beacon_node/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use eth2_libp2p::{
rpc::{GoodbyeReason, RPCResponseErrorCode, RequestId},
Libp2pEvent, PeerAction, PeerRequestId, PubsubMessage, ReportSource, Request, Response,
};
use eth2_libp2p::{types::GossipKind, BehaviourEvent, MessageId, NetworkGlobals, PeerId};
use eth2_libp2p::{types::{GossipKind, GossipEncoding, GossipTopic}, BehaviourEvent, MessageId, NetworkGlobals, PeerId};
use eth2_libp2p::{MessageAcceptance, Service as LibP2PService};
use futures::prelude::*;
use slog::{debug, error, info, o, trace, warn};
Expand Down Expand Up @@ -226,6 +226,29 @@ impl<T: BeaconChainTypes> NetworkService<T> {

Ok((network_globals, network_send))
}

/// Returns the required fork digests that gossipsub needs to subscribe to based on the current slot.
/// For current_slot < fork_slot, this function returns both the pre-fork and post-fork
/// digests since we should be subscribed to post fork topics before the fork.
pub fn required_gossip_fork_digests(&self) -> Result<Vec<[u8; 4]>, String> {
let current_slot = self
.beacon_chain
.slot()
.map_err(|e| format!("Failed to get current slot: {}", e))?;
let spec = self.beacon_chain.spec;
let genesis_validators_root = self.beacon_chain.genesis_validators_root;
// Return both pre-altair and post-altair fork digests
if current_slot < spec.altair_fork_slot {
Ok(vec![
spec.genesis_fork_digest(genesis_validators_root),
spec.altair_fork_digest(genesis_validators_root),
])
}
// Return only the altair fork digest
else {
Ok(vec![spec.altair_fork_digest(genesis_validators_root)])
}
}
}

fn spawn_service<T: BeaconChainTypes>(
Expand Down Expand Up @@ -371,29 +394,36 @@ fn spawn_service<T: BeaconChainTypes>(
}
}
NetworkMessage::SubscribeCoreTopics => {
let mut subscribed_topics: Vec<GossipKind> = vec![];
let already_subscribed = service.network_globals.gossipsub_subscriptions.read().clone();
let already_subscribed = already_subscribed.iter().map(|x| x.kind()).collect::<std::collections::HashSet<_>>();
for topic_kind in eth2_libp2p::types::CORE_TOPICS.iter().filter(|topic| already_subscribed.get(topic).is_none()) {
if service.libp2p.swarm.subscribe_kind(topic_kind.clone()) {
subscribed_topics.push(topic_kind.clone());
} else {
warn!(service.log, "Could not subscribe to topic"; "topic" => %topic_kind);
let mut subscribed_topics: Vec<GossipTopic> = vec![];
// TODO: not sure why we need to filter already subscribed
let _already_subscribed = service.network_globals.gossipsub_subscriptions.read().clone();
for topic_kind in eth2_libp2p::types::CORE_TOPICS.iter() {
for fork_digest in service.required_gossip_fork_digests() {
let topic = GossipTopic::new(topic_kind, GossiEncoding::default(), fork_digest);
if service.libp2p.swarm.subscribe(topic.clone()) {
subscribed_topics.push(topic);
} else {
warn!(service.log, "Could not subscribe to topic"; "topic" => %topic_kind);
}
}

}

// if we are to subscribe to all subnets we do it here
//
if service.subscribe_all_subnets {
for subnet_id in 0..<<T as BeaconChainTypes>::EthSpec as EthSpec>::SubnetBitfieldLength::to_u64() {
let subnet_id = SubnetId::new(subnet_id);
let topic_kind = eth2_libp2p::types::GossipKind::Attestation(subnet_id);
if service.libp2p.swarm.subscribe_kind(topic_kind.clone()) {
// Update the ENR bitfield.
service.libp2p.swarm.update_enr_subnet(subnet_id, true);
subscribed_topics.push(topic_kind.clone());
} else {
warn!(service.log, "Could not subscribe to topic"; "topic" => %topic_kind);
}
for fork_digest in service.required_gossip_fork_digests() {
let topic = GossipTopic::new(subnet_id.into(), GossipEncoding::default(), fork_digest);
if service.libp2p.swarm.subscribe(topic.clone()) {
// Update the ENR bitfield.
service.libp2p.swarm.update_enr_subnet(subnet_id, true);
subscribed_topics.push(topic);
} else {
warn!(service.log, "Could not subscribe to topic"; "topic" => %topic_kind);
}
}
}
}

Expand All @@ -407,10 +437,16 @@ fn spawn_service<T: BeaconChainTypes>(
Some(attestation_service_message) = service.attestation_service.next() => {
match attestation_service_message {
AttServiceMessage::Subscribe(subnet_id) => {
service.libp2p.swarm.subscribe_to_subnet(subnet_id);
for fork_digest in service.required_gossip_fork_digests() {
let topic = GossipTopic::new(subnet_id.into(), GossipEncoding::default(), fork_digest);
service.libp2p.swarm.subscribe(topic);
}
}
AttServiceMessage::Unsubscribe(subnet_id) => {
service.libp2p.swarm.unsubscribe_from_subnet(subnet_id);
for fork_digest in service.required_gossip_fork_digests() {
let topic = GossipTopic::new(subnet_id.into(), GossipEncoding::default(), fork_digest);
service.libp2p.swarm.unsubscribe(topic);
}
}
AttServiceMessage::EnrAdd(subnet_id) => {
service.libp2p.swarm.update_enr_subnet(subnet_id, true);
Expand Down

0 comments on commit 241ddac

Please sign in to comment.