Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ members = [
"common/lighthouse_metrics",
"common/lighthouse_version",
"common/logging",
"common/lru_cache",
"common/remote_beacon_node",
"common/rest_types",
"common/slot_clock",
Expand Down
1 change: 1 addition & 0 deletions beacon_node/network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,4 @@ lighthouse_metrics = { path = "../../common/lighthouse_metrics" }
environment = { path = "../../lighthouse/environment" }
itertools = "0.9.0"
num_cpus = "1.13.0"
lru_cache = { path = "../../common/lru_cache" }
10 changes: 5 additions & 5 deletions beacon_node/network/src/beacon_processor/chain_segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@ use eth2_libp2p::PeerId;
use slog::{debug, error, trace, warn};
use std::sync::Arc;
use tokio::sync::mpsc;
use types::{Epoch, EthSpec, SignedBeaconBlock};
use types::{Epoch, EthSpec, Hash256, SignedBeaconBlock};

/// Id associated to a block processing request, either a batch or a single block.
#[derive(Clone, Debug, PartialEq)]
pub enum ProcessId {
/// Processing Id of a range syncing batch.
RangeBatchId(ChainId, Epoch),
/// Processing Id of the parent lookup of a block
ParentLookup(PeerId),
/// Processing Id of the parent lookup of a block.
ParentLookup(PeerId, Hash256),
}

pub fn handle_chain_segment<T: BeaconChainTypes>(
Expand Down Expand Up @@ -71,7 +71,7 @@ pub fn handle_chain_segment<T: BeaconChainTypes>(
});
}
// this a parent lookup request from the sync manager
ProcessId::ParentLookup(peer_id) => {
ProcessId::ParentLookup(peer_id, chain_head) => {
debug!(
log, "Processing parent lookup";
"last_peer_id" => format!("{}", peer_id),
Expand All @@ -83,7 +83,7 @@ pub fn handle_chain_segment<T: BeaconChainTypes>(
(_, Err(e)) => {
warn!(log, "Parent lookup failed"; "last_peer_id" => format!("{}", peer_id), "error" => e);
sync_send
.send(SyncMessage::ParentLookupFailed(peer_id))
.send(SyncMessage::ParentLookupFailed{peer_id, chain_head})
.unwrap_or_else(|_| {
// on failure, inform to downvote the peer
debug!(
Expand Down
76 changes: 67 additions & 9 deletions beacon_node/network/src/sync/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ use eth2_libp2p::rpc::{methods::MAX_REQUEST_BLOCKS, BlocksByRootRequest, Goodbye
use eth2_libp2p::types::NetworkGlobals;
use eth2_libp2p::{PeerAction, PeerId};
use fnv::FnvHashMap;
use lru_cache::LRUCache;
use slog::{crit, debug, error, info, trace, warn, Logger};
use smallvec::SmallVec;
use ssz_types::VariableList;
Expand Down Expand Up @@ -105,8 +106,13 @@ pub enum SyncMessage<T: EthSpec> {
result: BatchProcessResult,
},

/// A parent lookup has failed for a block given by this `peer_id`.
ParentLookupFailed(PeerId),
/// A parent lookup has failed.
ParentLookupFailed {
/// The head of the chain of blocks that failed to process.
chain_head: Hash256,
/// The peer that instigated the chain lookup.
peer_id: PeerId,
},
}

/// The result of processing a multiple blocks (a chain segment).
Expand Down Expand Up @@ -161,6 +167,9 @@ pub struct SyncManager<T: BeaconChainTypes> {
/// A collection of parent block lookups.
parent_queue: SmallVec<[ParentRequests<T::EthSpec>; 3]>,

/// A cache of failed chain lookups to prevent duplicate searches.
failed_chains: LRUCache<Hash256>,

/// A collection of block hashes being searched for and a flag indicating if a result has been
/// received or not.
///
Expand Down Expand Up @@ -222,6 +231,7 @@ pub fn spawn<T: BeaconChainTypes>(
network_globals,
input_channel: sync_recv,
parent_queue: SmallVec::new(),
failed_chains: LRUCache::new(500),
single_block_lookups: FnvHashMap::default(),
log: log.clone(),
beacon_processor_send,
Expand Down Expand Up @@ -351,6 +361,22 @@ impl<T: BeaconChainTypes> SyncManager<T> {
return;
}
};

// check if the parent of this block isn't in our failed cache. If it is, this
// chain should be dropped and the peer downscored.
if self.failed_chains.contains(&block.message.parent_root) {
debug!(self.log, "Parent chain ignored due to past failure"; "block" => format!("{:?}", block.message.parent_root), "slot" => block.message.slot);
if !parent_request.downloaded_blocks.is_empty() {
// Add the root block to failed chains
self.failed_chains
.insert(parent_request.downloaded_blocks[0].canonical_root());
} else {
crit!(self.log, "Parent chain has no blocks");
}
self.network
.report_peer(peer_id, PeerAction::MidToleranceError);
return;
}
// add the block to response
parent_request.downloaded_blocks.push(block);
// queue for processing
Expand Down Expand Up @@ -510,6 +536,15 @@ impl<T: BeaconChainTypes> SyncManager<T> {
}
}

let block_root = block.canonical_root();
// If this block or it's parent is part of a known failed chain, ignore it.
if self.failed_chains.contains(&block.message.parent_root)
|| self.failed_chains.contains(&block_root)
{
debug!(self.log, "Block is from a past failed chain. Dropping"; "block_root" => format!("{:?}", block_root), "block_slot" => block.message.slot);
return;
}

// Make sure this block is not already being searched for
// NOTE: Potentially store a hashset of blocks for O(1) lookups
for parent_req in self.parent_queue.iter() {
Expand Down Expand Up @@ -697,6 +732,8 @@ impl<T: BeaconChainTypes> SyncManager<T> {
// If the last block in the queue has an unknown parent, we continue the parent
// lookup-search.

let chain_block_hash = parent_request.downloaded_blocks[0].canonical_root();

let newest_block = parent_request
.downloaded_blocks
.pop()
Expand All @@ -715,8 +752,10 @@ impl<T: BeaconChainTypes> SyncManager<T> {
self.request_parent(parent_request);
}
Ok(_) | Err(BlockError::BlockIsAlreadyKnown { .. }) => {
let process_id =
ProcessId::ParentLookup(parent_request.last_submitted_peer.clone());
let process_id = ProcessId::ParentLookup(
parent_request.last_submitted_peer.clone(),
chain_block_hash,
);
let blocks = parent_request.downloaded_blocks;

match self
Expand All @@ -742,6 +781,10 @@ impl<T: BeaconChainTypes> SyncManager<T> {
"outcome" => format!("{:?}", outcome),
"last_peer" => parent_request.last_submitted_peer.to_string(),
);

// Add this chain to cache of failed chains
self.failed_chains.insert(chain_block_hash);

// This currently can be a host of errors. We permit this due to the partial
// ambiguity.
// TODO: Refine the error types and score the peer appropriately.
Expand All @@ -764,8 +807,17 @@ impl<T: BeaconChainTypes> SyncManager<T> {
|| parent_request.downloaded_blocks.len() >= PARENT_DEPTH_TOLERANCE
{
let error = if parent_request.failed_attempts >= PARENT_FAIL_TOLERANCE {
// This is a peer-specific error and the chain could be continued with another
// peer. We don't consider this chain a failure and prevent retries with another
// peer.
"too many failed attempts"
} else {
if !parent_request.downloaded_blocks.is_empty() {
self.failed_chains
.insert(parent_request.downloaded_blocks[0].canonical_root());
} else {
crit!(self.log, "Parent lookup has no blocks");
}
"reached maximum lookup-depth"
};

Expand All @@ -774,6 +826,11 @@ impl<T: BeaconChainTypes> SyncManager<T> {
"ancestors_found" => parent_request.downloaded_blocks.len(),
"reason" => error
);
// Downscore the peer.
self.network.report_peer(
parent_request.last_submitted_peer,
PeerAction::LowToleranceError,
);
return; // drop the request
}

Expand Down Expand Up @@ -854,12 +911,13 @@ impl<T: BeaconChainTypes> SyncManager<T> {
result,
);
}
SyncMessage::ParentLookupFailed(peer_id) => {
SyncMessage::ParentLookupFailed {
chain_head,
peer_id,
} => {
// A peer sent an object (block or attestation) that referenced a parent.
// On request for this parent the peer indicated it did not have this
// block.
// This is not fatal. Peer's could prune old blocks so we moderately
// tolerate this behaviour.
// The processing of this chain failed.
self.failed_chains.insert(chain_head);
self.network
.report_peer(peer_id, PeerAction::MidToleranceError);
}
Expand Down
8 changes: 8 additions & 0 deletions common/lru_cache/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
[package]
name = "lru_cache"
version = "0.1.0"
authors = ["Sigma Prime <contact@sigmaprime.io>"]
edition = "2018"

[dependencies]
fnv = "1.0.7"
7 changes: 7 additions & 0 deletions common/lru_cache/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
//! A library to provide fast and efficient LRU Cache's without updating.

mod space;
mod time;

pub use space::LRUCache;
pub use time::LRUTimeCache;
93 changes: 93 additions & 0 deletions common/lru_cache/src/space.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
///! This implements a time-based LRU cache for fast checking of duplicates
use fnv::FnvHashSet;
use std::collections::VecDeque;

/// Cache that stores keys until the size is used up. Does not update elements for efficiency.
pub struct LRUCache<Key>
where
Key: Eq + std::hash::Hash + Clone,
{
/// The duplicate cache.
map: FnvHashSet<Key>,
/// An ordered list of keys by order.
list: VecDeque<Key>,
// The max size of the cache,
size: usize,
}

impl<Key> LRUCache<Key>
where
Key: Eq + std::hash::Hash + Clone,
{
pub fn new(size: usize) -> Self {
LRUCache {
map: FnvHashSet::default(),
list: VecDeque::new(),
size,
}
}

/// Determines if the key is in the cache.
pub fn contains(&self, key: &Key) -> bool {
self.map.contains(key)
}

// Inserts new elements and removes any expired elements.
//
// If the key was not present this returns `true`. If the value was already present this
// returns `false`.
pub fn insert(&mut self, key: Key) -> bool {
// check the cache before removing elements
let result = self.map.insert(key.clone());

// add the new key to the list, if it doesn't already exist.
if result {
self.list.push_back(key);
}
// remove any overflow keys
self.update();
result
}

/// Removes any expired elements from the cache.
fn update(&mut self) {
// remove any expired results
for _ in 0..self.map.len().saturating_sub(self.size) {
if let Some(key) = self.list.pop_front() {
self.map.remove(&key);
}
}
}
}

#[cfg(test)]
mod test {
use super::*;

#[test]
fn cache_added_entries_exist() {
let mut cache = LRUCache::new(5);

cache.insert("t");
cache.insert("e");

// Should report that 't' and 't' already exists
assert!(!cache.insert("t"));
assert!(!cache.insert("e"));
}

#[test]
fn cache_entries_get_removed() {
let mut cache = LRUCache::new(2);

cache.insert("t");
assert!(!cache.insert("t"));
cache.insert("e");
assert!(!cache.insert("e"));
// add another element to clear the first key
cache.insert("s");
assert!(!cache.insert("s"));
// should be removed from the cache
assert!(cache.insert("t"));
}
}
Loading