Skip to content

Commit

Permalink
Gossipsub topic filters (#1767)
Browse files Browse the repository at this point in the history
## Proposed Changes

Adds a gossipsub topic filter that only allows subscribing and incoming subscriptions from valid ETH2 topics.

## Additional Info

Currently the preparation of the valid topic hashes uses only the current fork id but in the future it must also use all possible future fork ids for planned forks. This has to get added when hard coded forks get implemented.

DO NOT MERGE: We first need to merge the libp2p changes (see sigp/rust-libp2p#70) so that we can refer from here to a commit hash inside the lighthouse branch.
  • Loading branch information
blacktemplar committed Oct 14, 2020
1 parent 8248afa commit a0634cc
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 17 deletions.
26 changes: 13 additions & 13 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion beacon_node/eth2_libp2p/Cargo.toml
Expand Up @@ -42,7 +42,7 @@ regex = "1.3.9"
[dependencies.libp2p]
#version = "0.23.0"
git = "https://github.com/sigp/rust-libp2p"
rev = "fb4fda2e393fc113577ef45f0ecdfe68e24f13dd"
rev = "a731aa803d986977c25a77ed2b002d9578f7377c"
default-features = false
features = ["websocket", "identify", "mplex", "noise", "gossipsub", "dns", "tcp-tokio"]

Expand Down
49 changes: 46 additions & 3 deletions beacon_node/eth2_libp2p/src/behaviour/mod.rs
Expand Up @@ -6,6 +6,9 @@ use crate::Eth2Enr;
use crate::{error, metrics, Enr, NetworkConfig, NetworkGlobals, PubsubMessage, TopicHash};
use futures::prelude::*;
use handler::{BehaviourHandler, BehaviourHandlerIn, DelegateIn, DelegateOut};
use libp2p::gossipsub::subscription_filter::{
MaxCountSubscriptionFilter, WhitelistSubscriptionFilter,
};
use libp2p::{
core::{
connection::{ConnectedPoint, ConnectionId, ListenerId},
Expand All @@ -25,6 +28,7 @@ use libp2p::{
};
use slog::{crit, debug, o, trace, warn};
use ssz::Encode;
use std::collections::HashSet;
use std::fs::File;
use std::io::Write;
use std::path::PathBuf;
Expand All @@ -43,7 +47,8 @@ const MAX_IDENTIFY_ADDRESSES: usize = 10;
/// Identifier of requests sent by a peer.
pub type PeerRequestId = (ConnectionId, SubstreamId);

pub type Gossipsub = GenericGossipsub<MessageData>;
pub type SubscriptionFilter = MaxCountSubscriptionFilter<WhitelistSubscriptionFilter>;
pub type Gossipsub = GenericGossipsub<MessageData, SubscriptionFilter>;
pub type GossipsubEvent = GenericGossipsubEvent<MessageData>;

/// The types of events than can be obtained from polling the behaviour.
Expand Down Expand Up @@ -149,8 +154,19 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
.eth2()
.expect("Local ENR must have a fork id");

let gossipsub = Gossipsub::new(MessageAuthenticity::Anonymous, net_conf.gs_config.clone())
.map_err(|e| format!("Could not construct gossipsub: {:?}", e))?;
let possible_fork_digests = vec![enr_fork_id.fork_digest];
let filter = MaxCountSubscriptionFilter {
filter: Self::create_whitelist_filter(possible_fork_digests, 64), //TODO change this to a constant
max_subscribed_topics: 200, //TODO change this to a constant
max_subscriptions_per_request: 100, //this is according to the current go implementation
};

let gossipsub = Gossipsub::new_with_subscription_filter(
MessageAuthenticity::Anonymous,
net_conf.gs_config.clone(),
filter,
)
.map_err(|e| format!("Could not construct gossipsub: {:?}", e))?;

// Temporarily disable scoring until parameters are tested.
/*
Expand Down Expand Up @@ -785,6 +801,33 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
waker.wake_by_ref();
}
}

/// Creates a whitelist topic filter that covers all possible topics using the given set of
/// possible fork digests.
fn create_whitelist_filter(
possible_fork_digests: Vec<[u8; 4]>,
attestation_subnet_count: u64,
) -> WhitelistSubscriptionFilter {
let mut possible_hashes = HashSet::new();
for fork_digest in possible_fork_digests {
let mut add = |kind| {
let topic: Topic =
GossipTopic::new(kind, GossipEncoding::SSZSnappy, fork_digest).into();
possible_hashes.insert(topic.hash());
};

use GossipKind::*;
add(BeaconBlock);
add(BeaconAggregateAndProof);
add(VoluntaryExit);
add(ProposerSlashing);
add(AttesterSlashing);
for id in 0..attestation_subnet_count {
add(Attestation(SubnetId::new(id)));
}
}
WhitelistSubscriptionFilter(possible_hashes)
}
}

/// Calls the given function with the given args on all sub behaviours.
Expand Down

0 comments on commit a0634cc

Please sign in to comment.