Skip to content

Commit

Permalink
Add Subnet enum to include SyncCommittee subnet_ids
Browse files Browse the repository at this point in the history
  • Loading branch information
pawanjay176 committed Apr 26, 2021
1 parent 40685d7 commit 2ae7861
Show file tree
Hide file tree
Showing 10 changed files with 123 additions and 83 deletions.
6 changes: 3 additions & 3 deletions beacon_node/eth2_libp2p/src/behaviour/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use crate::behaviour::gossipsub_scoring_parameters::PeerScoreSettings;
use crate::peer_manager::{
score::{PeerAction, ReportSource},
ConnectionDirection, PeerManager, PeerManagerEvent,
Expand All @@ -10,6 +9,7 @@ use crate::types::{
SubnetDiscovery,
};
use crate::Eth2Enr;
use crate::{behaviour::gossipsub_scoring_parameters::PeerScoreSettings, Subnet};
use crate::{error, metrics, Enr, NetworkConfig, NetworkGlobals, PubsubMessage, TopicHash};
use futures::prelude::*;
use handler::{BehaviourHandler, BehaviourHandlerIn, DelegateIn, DelegateOut};
Expand Down Expand Up @@ -327,7 +327,7 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
}

/// Subscribes to a specific subnet id;
pub fn subscribe_to_subnet(&mut self, subnet_id: SubnetId) -> bool {
pub fn subscribe_to_subnet(&mut self, subnet_id: Subnet) -> bool {
let topic = GossipTopic::new(
subnet_id.into(),
GossipEncoding::default(),
Expand All @@ -337,7 +337,7 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
}

/// Un-Subscribes from a specific subnet id;
pub fn unsubscribe_from_subnet(&mut self, subnet_id: SubnetId) -> bool {
pub fn unsubscribe_from_subnet(&mut self, subnet_id: Subnet) -> bool {
let topic = GossipTopic::new(
subnet_id.into(),
GossipEncoding::default(),
Expand Down
27 changes: 13 additions & 14 deletions beacon_node/eth2_libp2p/src/discovery/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,15 @@ pub use enr_ext::{peer_id_to_node_id, CombinedKeyExt, EnrExt};
pub use libp2p::core::identity::{Keypair, PublicKey};

use crate::{config, metrics};
use crate::{error, Enr, NetworkConfig, NetworkGlobals, SubnetDiscovery};
use crate::{error, Enr, NetworkConfig, NetworkGlobals, Subnet, SubnetDiscovery};
use discv5::{enr::NodeId, Discv5, Discv5Event};
use enr::{ATTESTATION_BITFIELD_ENR_KEY, ETH2_ENR_KEY, SYNC_COMMITTEE_BITFIELD_ENR_KEY};
use futures::prelude::*;
use futures::stream::FuturesUnordered;
use libp2p::core::PeerId;
use lru::LruCache;
use slog::{crit, debug, error, info, warn};
use ssz::{Decode, Encode};
use ssz_types::BitVector;
use ssz::Encode;
use std::{
collections::{HashMap, VecDeque},
net::{IpAddr, SocketAddr},
Expand Down Expand Up @@ -67,7 +66,7 @@ pub enum DiscoveryEvent {

#[derive(Debug, Clone, PartialEq)]
struct SubnetQuery {
subnet_id: SubnetId,
subnet_id: Subnet,
min_ttl: Option<Instant>,
retries: usize,
}
Expand Down Expand Up @@ -576,7 +575,7 @@ impl<TSpec: EthSpec> Discovery<TSpec> {

/// Adds a subnet query if one doesn't exist. If a subnet query already exists, this
/// updates the min_ttl field.
fn add_subnet_query(&mut self, subnet_id: SubnetId, min_ttl: Option<Instant>, retries: usize) {
fn add_subnet_query(&mut self, subnet_id: Subnet, min_ttl: Option<Instant>, retries: usize) {
// remove the entry and complete the query if greater than the maximum search count
if retries > MAX_DISCOVERY_RETRY {
debug!(
Expand Down Expand Up @@ -611,7 +610,7 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
retries,
});
// update the metrics and insert into the queue.
debug!(self.log, "Queuing subnet query"; "subnet" => *subnet_id, "retries" => retries);
debug!(self.log, "Queuing subnet query"; "subnet" => ?subnet_id, "retries" => retries);
self.queued_queries.push_back(query);
metrics::set_gauge(&metrics::DISCOVERY_QUEUE, self.queued_queries.len() as i64);
}
Expand Down Expand Up @@ -690,7 +689,7 @@ impl<TSpec: EthSpec> Discovery<TSpec> {

/// Runs a discovery request for a given group of subnets.
fn start_subnet_query(&mut self, subnet_queries: Vec<SubnetQuery>) {
let mut filtered_subnet_ids: Vec<SubnetId> = Vec::new();
let mut filtered_subnet_ids: Vec<Subnet> = Vec::new();

// find subnet queries that are still necessary
let filtered_subnet_queries: Vec<SubnetQuery> = subnet_queries
Expand All @@ -715,7 +714,7 @@ impl<TSpec: EthSpec> Discovery<TSpec> {

let target_peers = TARGET_SUBNET_PEERS - peers_on_subnet;
debug!(self.log, "Discovery query started for subnet";
"subnet_id" => *subnet_query.subnet_id,
"subnet_id" => ?subnet_query.subnet_id,
"connected_peers_on_subnet" => peers_on_subnet,
"target_subnet_peers" => TARGET_SUBNET_PEERS,
"peers_to_find" => target_peers,
Expand Down Expand Up @@ -823,7 +822,7 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
}
}
GroupedQueryType::Subnet(queries) => {
let subnets_searched_for: Vec<SubnetId> =
let subnets_searched_for: Vec<Subnet> =
queries.iter().map(|query| query.subnet_id).collect();
match query_result.1 {
Ok(r) if r.is_empty() => {
Expand Down Expand Up @@ -1001,7 +1000,7 @@ mod tests {
use enr::EnrBuilder;
use slog::{o, Drain};
use std::net::UdpSocket;
use types::MinimalEthSpec;
use types::{BitVector, MinimalEthSpec};

type E = MinimalEthSpec;

Expand Down Expand Up @@ -1053,7 +1052,7 @@ mod tests {
let mut discovery = build_discovery().await;
let now = Instant::now();
let mut subnet_query = SubnetQuery {
subnet_id: SubnetId::new(1),
subnet_id: Subnet::Attestation(SubnetId::new(1)),
min_ttl: Some(now),
retries: 0,
};
Expand Down Expand Up @@ -1100,7 +1099,7 @@ mod tests {

let now = Instant::now();
let subnet_query = SubnetQuery {
subnet_id: SubnetId::new(1),
subnet_id: Subnet::Attestation(SubnetId::new(1)),
min_ttl: Some(now + Duration::from_secs(10)),
retries: 0,
};
Expand Down Expand Up @@ -1147,12 +1146,12 @@ mod tests {

let query = GroupedQueryType::Subnet(vec![
SubnetQuery {
subnet_id: SubnetId::new(1),
subnet_id: Subnet::Attestation(SubnetId::new(1)),
min_ttl: instant1,
retries: 0,
},
SubnetQuery {
subnet_id: SubnetId::new(2),
subnet_id: Subnet::Attestation(SubnetId::new(2)),
min_ttl: instant2,
retries: 0,
},
Expand Down
57 changes: 26 additions & 31 deletions beacon_node/eth2_libp2p/src/discovery/subnet_predicate.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
///! The subnet predicate used for searching for a particular subnet.
use super::*;
use crate::types::{EnrAttestationBitfield, EnrSyncCommitteeBitfield};
use slog::trace;
use std::ops::Deref;

/// Returns the predicate for a given subnet.
pub fn subnet_predicate<TSpec>(
subnet_ids: Vec<SubnetId>,
subnet_ids: Vec<Subnet>,
log: &slog::Logger,
) -> impl Fn(&Enr) -> bool + Send
where
Expand All @@ -14,39 +15,33 @@ where
let log_clone = log.clone();

move |enr: &Enr| {
if let Some(bitfield_bytes) = enr.get(ATTESTATION_BITFIELD_ENR_KEY) {
let bitfield = match BitVector::<TSpec::SubnetBitfieldLength>::from_ssz_bytes(
bitfield_bytes,
) {
Ok(v) => v,
Err(e) => {
warn!(log_clone, "Could not decode ENR bitfield for peer"; "peer_id" => format!("{}", enr.peer_id()), "error" => format!("{:?}", e));
return false;
}
let attestation_bitfield: EnrAttestationBitfield<TSpec> =
match enr.attestation_bitfield::<TSpec>() {
Ok(b) => b,
Err(_e) => return false,
};

let matches: Vec<&SubnetId> = subnet_ids
.iter()
.filter(|id| bitfield.get(**id.deref() as usize).unwrap_or(false))
.collect();
// Pre-fork/fork-boundary enrs may not contain a syncnets field.
// Don't return early here
let sync_committee_bitfield: Result<EnrSyncCommitteeBitfield<TSpec>, _> =
enr.sync_committee_bitfield::<TSpec>();

if matches.is_empty() {
trace!(
log_clone,
"Peer found but not on any of the desired subnets";
"peer_id" => %enr.peer_id()
);
return false;
} else {
trace!(
log_clone,
"Peer found on desired subnet(s)";
"peer_id" => %enr.peer_id(),
"subnets" => ?matches.as_slice()
);
return true;
}
let predicate = subnet_ids.iter().any(|subnet| match subnet {
Subnet::Attestation(s) => attestation_bitfield
.get(*s.deref() as usize)
.unwrap_or(false),
Subnet::SyncCommittee(s) => sync_committee_bitfield
.as_ref()
.map_or(false, |b| b.get(*s.deref() as usize).unwrap_or(false)),
});

if !predicate {
trace!(
log_clone,
"Peer found but not on any of the desired subnets";
"peer_id" => %enr.peer_id()
);
}
false
return predicate;
}
}
4 changes: 3 additions & 1 deletion beacon_node/eth2_libp2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ impl<'de> Deserialize<'de> for PeerIdSerialized {
}
}

pub use crate::types::{error, Enr, GossipTopic, NetworkGlobals, PubsubMessage, SubnetDiscovery};
pub use crate::types::{
error, Enr, GossipTopic, NetworkGlobals, PubsubMessage, Subnet, SubnetDiscovery,
};
pub use behaviour::{BehaviourEvent, Gossipsub, PeerRequestId, Request, Response};
pub use config::Config as NetworkConfig;
pub use discovery::{CombinedKeyExt, EnrExt, Eth2Enr};
Expand Down
17 changes: 10 additions & 7 deletions beacon_node/eth2_libp2p/src/peer_manager/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
//! Implementation of a Lighthouse's peer management system.

pub use self::peerdb::*;
use crate::discovery::{subnet_predicate, Discovery, DiscoveryEvent, TARGET_SUBNET_PEERS};
use crate::rpc::{GoodbyeReason, MetaData, Protocol, RPCError, RPCResponseErrorCode};
use crate::types::SyncState;
use crate::{
discovery::{subnet_predicate, Discovery, DiscoveryEvent, TARGET_SUBNET_PEERS},
Subnet,
};
use crate::{error, metrics, Gossipsub};
use crate::{EnrExt, NetworkConfig, NetworkGlobals, PeerId, SubnetDiscovery};
use futures::prelude::*;
Expand All @@ -20,7 +23,7 @@ use std::{
task::{Context, Poll},
time::{Duration, Instant},
};
use types::{EthSpec, SubnetId};
use types::EthSpec;

pub use libp2p::core::{identity::Keypair, Multiaddr};

Expand Down Expand Up @@ -241,14 +244,14 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
self.network_globals
.peers
.write()
.extend_peers_on_subnet(s.subnet_id, min_ttl);
.extend_peers_on_subnet(&s.subnet_id, min_ttl);
}
// Already have target number of peers, no need for subnet discovery
let peers_on_subnet = self
.network_globals
.peers
.read()
.good_peers_on_subnet(s.subnet_id)
.good_peers_on_subnet(s.subnet_id.clone())
.count();
if peers_on_subnet >= TARGET_SUBNET_PEERS {
trace!(
Expand Down Expand Up @@ -282,14 +285,14 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
}

/// Adds a gossipsub subscription to a peer in the peerdb.
pub fn add_subscription(&self, peer_id: &PeerId, subnet_id: SubnetId) {
pub fn add_subscription(&self, peer_id: &PeerId, subnet_id: Subnet) {
if let Some(info) = self.network_globals.peers.write().peer_info_mut(peer_id) {
info.subnets.insert(subnet_id);
}
}

/// Removes a gossipsub subscription to a peer in the peerdb.
pub fn remove_subscription(&self, peer_id: &PeerId, subnet_id: SubnetId) {
pub fn remove_subscription(&self, peer_id: &PeerId, subnet_id: Subnet) {
if let Some(info) = self.network_globals.peers.write().peer_info_mut(peer_id) {
info.subnets.remove(&subnet_id);
}
Expand Down Expand Up @@ -658,7 +661,7 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {

/// Dial cached enrs in discovery service that are in the given `subnet_id` and aren't
/// in Connected, Dialing or Banned state.
fn dial_cached_enrs_in_subnet(&mut self, subnet_id: SubnetId) {
fn dial_cached_enrs_in_subnet(&mut self, subnet_id: Subnet) {
let predicate = subnet_predicate::<TSpec>(vec![subnet_id], &self.log);
let peers_to_dial: Vec<PeerId> = self
.discovery()
Expand Down
18 changes: 12 additions & 6 deletions beacon_node/eth2_libp2p/src/peer_manager/peer_info.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use super::client::Client;
use super::score::{PeerAction, Score, ScoreState};
use super::PeerSyncStatus;
use crate::rpc::MetaData;
use crate::Multiaddr;
use crate::{rpc::MetaData, types::Subnet};
use discv5::Enr;
use serde::{
ser::{SerializeStruct, Serializer},
Expand All @@ -12,7 +12,7 @@ use std::collections::HashSet;
use std::net::{IpAddr, SocketAddr};
use std::time::Instant;
use strum::AsRefStr;
use types::{EthSpec, SubnetId};
use types::EthSpec;
use PeerConnectionStatus::*;

/// Information about a given connected peer.
Expand Down Expand Up @@ -40,7 +40,7 @@ pub struct PeerInfo<T: EthSpec> {
/// connection.
pub meta_data: Option<MetaData<T>>,
/// Subnets the peer is connected to.
pub subnets: HashSet<SubnetId>,
pub subnets: HashSet<Subnet>,
/// The time we would like to retain this peer. After this time, the peer is no longer
/// necessary.
#[serde(skip)]
Expand Down Expand Up @@ -85,15 +85,21 @@ impl<T: EthSpec> PeerInfo<T> {
}

/// Returns if the peer is subscribed to a given `SubnetId` from the metadata attnets field.
pub fn on_subnet_metadata(&self, subnet_id: SubnetId) -> bool {
pub fn on_subnet_metadata(&self, subnet_id: &Subnet) -> bool {
if let Some(meta_data) = &self.meta_data {
return meta_data.attnets.get(*subnet_id as usize).unwrap_or(false);
match subnet_id {
Subnet::Attestation(id) => {
return meta_data.attnets.get(**id as usize).unwrap_or(false)
}
// TODO(pawan): add syncnets to metadata
Subnet::SyncCommittee(_id) => unimplemented!(),
}
}
false
}

/// Returns if the peer is subscribed to a given `SubnetId` from the gossipsub subscriptions.
pub fn on_subnet_gossipsub(&self, subnet_id: SubnetId) -> bool {
pub fn on_subnet_gossipsub(&self, subnet_id: &Subnet) -> bool {
self.subnets.contains(&subnet_id)
}

Expand Down
Loading

0 comments on commit 2ae7861

Please sign in to comment.