22 changes: 13 additions & 9 deletions beacon_node/eth2_libp2p/src/peer_manager/mod.rs
@@ -3,6 +3,7 @@
pub use self::peerdb::*;
use crate::discovery::{subnet_predicate, Discovery, DiscoveryEvent, TARGET_SUBNET_PEERS};
use crate::rpc::{GoodbyeReason, MetaData, Protocol, RPCError, RPCResponseErrorCode};
use crate::types::SyncState;
use crate::{error, metrics};
use crate::{EnrExt, NetworkConfig, NetworkGlobals, PeerId, SubnetDiscovery};
use futures::prelude::*;
@@ -844,16 +845,19 @@ impl<TSpec: EthSpec> Stream for PeerManager<TSpec> {
}
}

loop {
match self.status_peers.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(peer_id))) => {
self.status_peers.insert(peer_id.clone());
self.events.push(PeerManagerEvent::Status(peer_id))
}
Poll::Ready(Some(Err(e))) => {
error!(self.log, "Failed to check for peers to ping"; "error" => e.to_string())
if !matches!(self.network_globals.sync_state(), SyncState::SyncingFinalized{..}|SyncState::SyncingHead{..})
{
loop {
match self.status_peers.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(peer_id))) => {
self.status_peers.insert(peer_id.clone());
self.events.push(PeerManagerEvent::Status(peer_id))
}
Poll::Ready(Some(Err(e))) => {
error!(self.log, "Failed to check for peers to ping"; "error" => e.to_string())
}
Poll::Ready(None) | Poll::Pending => break,
}
Poll::Ready(None) | Poll::Pending => break,
}
}

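Note on the hunk above: the status-peer polling loop is now skipped while the node is in a long-range (finalized or head) batch sync. A minimal standalone sketch of that gate follows; the `SyncState` enum and `should_status_peers` helper are simplified stand-ins, not the crate's own definitions.

```rust
// Sketch only: simplified stand-ins for the crate's SyncState and the polling
// guard added above; not the eth2_libp2p definitions.
#[allow(dead_code)]
enum SyncState {
    SyncingFinalized { start_slot: u64, target_slot: u64 },
    SyncingHead { start_slot: u64, target_slot: u64 },
    SyncTransition,
    Synced,
    Stalled,
}

/// Mirrors the `!matches!(...)` guard: STATUS requests are only scheduled
/// while the node is not in a long-range (batch) sync.
fn should_status_peers(state: &SyncState) -> bool {
    !matches!(
        state,
        SyncState::SyncingFinalized { .. } | SyncState::SyncingHead { .. }
    )
}

fn main() {
    assert!(should_status_peers(&SyncState::Synced));
    assert!(should_status_peers(&SyncState::SyncTransition));
    assert!(!should_status_peers(&SyncState::SyncingHead {
        start_slot: 0,
        target_slot: 64,
    }));
}
```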
6 changes: 6 additions & 0 deletions beacon_node/eth2_libp2p/src/types/sync_state.rs
@@ -10,6 +10,9 @@ pub enum SyncState {
/// The node is performing a long-range (batch) sync over one or many head chains.
/// In this state parent lookups are disabled.
SyncingHead { start_slot: Slot, target_slot: Slot },
/// The node has identified the need for sync operations and is transitioning to a syncing
/// state.
SyncTransition,
/// The node is up to date with all known peers and is connected to at least one
/// fully synced peer. In this state, parent lookups are enabled.
Synced,
@@ -25,6 +28,7 @@ impl PartialEq for SyncState {
(SyncState::SyncingHead { .. }, SyncState::SyncingHead { .. }) => true,
(SyncState::Synced, SyncState::Synced) => true,
(SyncState::Stalled, SyncState::Stalled) => true,
(SyncState::SyncTransition, SyncState::SyncTransition) => true,
_ => false,
}
}
@@ -36,6 +40,7 @@ impl SyncState {
match self {
SyncState::SyncingFinalized { .. } => true,
SyncState::SyncingHead { .. } => true,
SyncState::SyncTransition => true,
SyncState::Synced => false,
SyncState::Stalled => false,
}
@@ -54,6 +59,7 @@ impl std::fmt::Display for SyncState {
SyncState::SyncingHead { .. } => write!(f, "Syncing Head Chain"),
SyncState::Synced { .. } => write!(f, "Synced"),
SyncState::Stalled { .. } => write!(f, "Stalled"),
SyncState::SyncTransition => write!(f, "Searching syncing peers"),
}
}
}
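For reference, a compact sketch of how the new `SyncTransition` variant is expected to behave across `is_syncing` and `Display`. Unit variants replace the slotted ones, and the `Display` text for `SyncingFinalized` is an assumption since that arm sits outside the hunk.

```rust
use std::fmt;

enum SyncState {
    SyncingFinalized,
    SyncingHead,
    SyncTransition,
    Synced,
    Stalled,
}

impl SyncState {
    /// The transition state still counts as "syncing", so parent lookups
    /// remain disabled while range sync spins up.
    fn is_syncing(&self) -> bool {
        matches!(
            self,
            SyncState::SyncingFinalized | SyncState::SyncingHead | SyncState::SyncTransition
        )
    }
}

impl fmt::Display for SyncState {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            SyncState::SyncingHead => write!(f, "Syncing Head Chain"),
            SyncState::SyncTransition => write!(f, "Searching syncing peers"),
            SyncState::Synced => write!(f, "Synced"),
            SyncState::Stalled => write!(f, "Stalled"),
            // Assumed text: this arm is not shown in the hunk above.
            SyncState::SyncingFinalized => write!(f, "Syncing Finalized Chain"),
        }
    }
}

fn main() {
    assert!(SyncState::SyncTransition.is_syncing());
    assert!(!SyncState::Synced.is_syncing());
    println!("{}", SyncState::SyncTransition); // "Searching syncing peers"
}
```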
14 changes: 7 additions & 7 deletions beacon_node/http_api/src/lib.rs
@@ -330,7 +330,7 @@ pub fn serve<T: BeaconChainTypes>(
)))
}
}
SyncState::SyncingHead { .. } => Ok(()),
SyncState::SyncingHead { .. } | SyncState::SyncTransition => Ok(()),
SyncState::Synced => Ok(()),
SyncState::Stalled => Err(warp_utils::reject::not_synced(
"sync is stalled".to_string(),
@@ -1231,12 +1231,12 @@ pub fn serve<T: BeaconChainTypes>(
.and(network_globals.clone())
.and_then(|network_globals: Arc<NetworkGlobals<T::EthSpec>>| {
blocking_task(move || match *network_globals.sync_state.read() {
SyncState::SyncingFinalized { .. } | SyncState::SyncingHead { .. } => {
Ok(warp::reply::with_status(
warp::reply(),
warp::http::StatusCode::PARTIAL_CONTENT,
))
}
SyncState::SyncingFinalized { .. }
| SyncState::SyncingHead { .. }
| SyncState::SyncTransition => Ok(warp::reply::with_status(
warp::reply(),
warp::http::StatusCode::PARTIAL_CONTENT,
)),
SyncState::Synced => Ok(warp::reply::with_status(
warp::reply(),
warp::http::StatusCode::OK,
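Both hunks above treat `SyncTransition` like any other syncing state for the HTTP API. A rough sketch of the second hunk's mapping, with plain `u16` status codes standing in for warp's types; the `Stalled` arm is an assumption, since it lies outside the visible hunk.

```rust
// Sketch only: how the syncing/health endpoint maps sync state to an HTTP
// status per the second hunk above.
enum SyncState {
    SyncingFinalized,
    SyncingHead,
    SyncTransition,
    Synced,
    Stalled,
}

fn health_status(state: &SyncState) -> u16 {
    match state {
        SyncState::SyncingFinalized
        | SyncState::SyncingHead
        | SyncState::SyncTransition => 206, // PARTIAL_CONTENT
        SyncState::Synced => 200,           // OK
        SyncState::Stalled => 503,          // assumed: not shown in the hunk
    }
}

fn main() {
    assert_eq!(health_status(&SyncState::SyncTransition), 206);
    assert_eq!(health_status(&SyncState::Synced), 200);
}
```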
5 changes: 1 addition & 4 deletions beacon_node/network/src/sync/manager.rs
@@ -691,10 +691,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
{
SyncState::Synced
} else if peers.advanced_peers().next().is_some() {
SyncState::SyncingHead {
start_slot: head,
target_slot: current_slot,
}
SyncState::SyncTransition
} else if peers.synced_peers().next().is_none() {
SyncState::Stalled
} else {
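Context for the change above: when only advanced peers are available, the manager now reports the new `SyncTransition` state instead of fabricating a `SyncingHead` range. A simplified sketch of that decision; peer sets are reduced to booleans, and the final branch is a placeholder because it falls outside the hunk.

```rust
#[derive(Debug, PartialEq)]
enum SyncState {
    SyncingHead { start_slot: u64, target_slot: u64 },
    SyncTransition,
    Synced,
    Stalled,
}

// Sketch of the state-resolution logic in the hunk above, with simplified inputs.
fn resolve_state(is_synced: bool, has_advanced_peers: bool, has_synced_peers: bool) -> SyncState {
    if is_synced {
        SyncState::Synced
    } else if has_advanced_peers {
        // Previously: SyncingHead { start_slot: head, target_slot: current_slot }
        SyncState::SyncTransition
    } else if !has_synced_peers {
        SyncState::Stalled
    } else {
        // Placeholder: the real final branch is not shown in the hunk above.
        SyncState::SyncingHead { start_slot: 0, target_slot: 0 }
    }
}

fn main() {
    assert_eq!(resolve_state(false, true, true), SyncState::SyncTransition);
    assert_eq!(resolve_state(false, false, false), SyncState::Stalled);
    assert_eq!(resolve_state(true, false, true), SyncState::Synced);
}
```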
32 changes: 29 additions & 3 deletions beacon_node/network/src/sync/range_sync/chain.rs
@@ -91,6 +91,9 @@ pub struct SyncingChain<T: BeaconChainTypes> {
/// The current processing batch, if any.
current_processing_batch: Option<BatchId>,

/// Batches validated by this chain.
validated_batches: u8,

/// A multi-threaded, non-blocking processor for applying messages to the beacon chain.
beacon_processor_send: Sender<BeaconWorkEvent<T::EthSpec>>,

@@ -140,6 +143,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
attempted_optimistic_starts: HashSet::default(),
state: ChainSyncingState::Stopped,
current_processing_batch: None,
validated_batches: 0,
beacon_processor_send,
log: log.new(o!("chain" => id)),
}
@@ -155,6 +159,16 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
self.id
}

/// Peers currently syncing this chain.
pub fn peers<'a>(&'a self) -> impl Iterator<Item = PeerId> + 'a {
self.peers.keys().cloned()
}

/// Progress in epochs made by the chain
pub fn validated_epochs(&self) -> u64 {
self.validated_batches as u64 * EPOCHS_PER_BATCH
}

/// Removes a peer from the chain.
/// If the peer has active batches, those are considered failed and re-requested.
pub fn remove_peer(
@@ -447,6 +461,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
self.advance_chain(network, batch_id);
// we register so that on chain switching we don't try it again
self.attempted_optimistic_starts.insert(batch_id);
self.processing_target += EPOCHS_PER_BATCH;
} else if let Some(epoch) = self.optimistic_start {
// check if this batch corresponds to an optimistic batch. In this case, we
// reject it as an optimistic candidate since the batch was empty
@@ -456,11 +471,11 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
false, /* do not re-request */
"batch was empty",
)?;
} else {
self.processing_target += EPOCHS_PER_BATCH;
}
}

self.processing_target += EPOCHS_PER_BATCH;

// check if the chain has completed syncing
if self.current_processed_slot() >= self.target_head_slot {
// chain is completed
@@ -574,6 +589,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
let removed_batches = std::mem::replace(&mut self.batches, remaining_batches);

for (id, batch) in removed_batches.into_iter() {
self.validated_batches = self.validated_batches.saturating_add(1);
// only for batches awaiting validation can we be sure the last attempt is
// right, and thus, that any different attempt is wrong
match batch.state() {
@@ -1024,6 +1040,7 @@ impl<T: BeaconChainTypes> slog::KV for SyncingChain<T> {
)?;
serializer.emit_usize("batches", self.batches.len())?;
serializer.emit_usize("peers", self.peers.len())?;
serializer.emit_u8("validated_batches", self.validated_batches)?;
slog::Result::Ok(())
}
}
@@ -1037,7 +1054,7 @@ impl From<WrongBatchState> for RemoveChain {

impl std::fmt::Debug for RemoveChain {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
// needed to avoid Debuggins Strings
// needed to avoid Debugging Strings
match self {
RemoveChain::ChainCompleted => f.write_str("ChainCompleted"),
RemoveChain::EmptyPeerPool => f.write_str("EmptyPeerPool"),
@@ -1047,3 +1064,12 @@ impl std::fmt::Debug for RemoveChain {
}
}
}

impl RemoveChain {
pub fn is_critical(&self) -> bool {
matches!(
self,
RemoveChain::WrongBatchState(..) | RemoveChain::WrongChainState(..)
)
}
}
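Two additions in this file are worth a sketch: progress is now tracked as validated batches (converted to epochs for reporting), and `RemoveChain::is_critical` separates internal-state errors from ordinary removals. A standalone approximation follows; `EPOCHS_PER_BATCH` is an assumed value and the `RemoveChain` variant list is abbreviated to the variants visible in the diff.

```rust
/// Assumed value for the sketch; the real constant lives in range_sync.
const EPOCHS_PER_BATCH: u64 = 2;

struct ChainProgress {
    /// Bumped each time a batch is removed from the buffer as validated.
    validated_batches: u8,
}

impl ChainProgress {
    fn on_batch_validated(&mut self) {
        // saturating_add prevents wrap-around on very long-lived chains
        self.validated_batches = self.validated_batches.saturating_add(1);
    }

    /// Progress made by the chain, reported in epochs.
    fn validated_epochs(&self) -> u64 {
        self.validated_batches as u64 * EPOCHS_PER_BATCH
    }
}

enum RemoveChain {
    ChainCompleted,
    EmptyPeerPool,
    WrongBatchState(String),
    WrongChainState(String),
}

impl RemoveChain {
    /// Only wrong-state removals indicate a logic error worth escalating.
    fn is_critical(&self) -> bool {
        matches!(
            self,
            RemoveChain::WrongBatchState(..) | RemoveChain::WrongChainState(..)
        )
    }
}

fn main() {
    let mut progress = ChainProgress { validated_batches: 0 };
    for _ in 0..3 {
        progress.on_batch_validated();
    }
    assert_eq!(progress.validated_epochs(), 3 * EPOCHS_PER_BATCH);
    assert!(!RemoveChain::ChainCompleted.is_critical());
    assert!(!RemoveChain::EmptyPeerPool.is_critical());
    assert!(RemoveChain::WrongChainState("bad target".into()).is_critical());
}
```

The log-level changes in chain_collection.rs below apply the same predicate: critical removals are escalated, everything else stays at the quieter level.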
25 changes: 19 additions & 6 deletions beacon_node/network/src/sync/range_sync/chain_collection.rs
@@ -11,7 +11,7 @@ use crate::sync::PeerSyncInfo;
use beacon_chain::{BeaconChain, BeaconChainTypes};
use eth2_libp2p::PeerId;
use fnv::FnvHashMap;
use slog::{debug, error};
use slog::{crit, debug, error};
use smallvec::SmallVec;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
@@ -300,7 +300,7 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
match old_id {
Some(Some(old_id)) => debug!(self.log, "Switching finalized chains";
"old_id" => old_id, &chain),
None => debug!(self.log, "Syncing new chain"; &chain),
None => debug!(self.log, "Syncing new finalized chain"; &chain),
Some(None) => {
// this is the same chain. We try to advance it.
}
@@ -311,8 +311,12 @@

if let Err(remove_reason) = chain.start_syncing(network, local_epoch, local_head_epoch)
{
// this happens only if sending a batch over the `network` fails a lot
error!(self.log, "Chain removed while switching chains"; "chain" => new_id, "reason" => ?remove_reason);
if remove_reason.is_critical() {
crit!(self.log, "Chain removed while switching chains"; "chain" => new_id, "reason" => ?remove_reason);
} else {
// this happens only if sending a batch over the `network` fails a lot
error!(self.log, "Chain removed while switching chains"; "chain" => new_id, "reason" => ?remove_reason);
}
self.finalized_chains.remove(&new_id);
self.on_chain_removed(&new_id, true);
}
@@ -330,6 +334,7 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
) {
// Include the awaiting head peers
for (peer_id, peer_sync_info) in awaiting_head_peers.drain() {
debug!(self.log, "Including head peer"; "peer_id" => %peer_id);
self.add_peer_or_create_chain(
local_epoch,
peer_sync_info.head_root,
@@ -368,7 +373,11 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
chain.start_syncing(network, local_epoch, local_head_epoch)
{
self.head_chains.remove(&id);
error!(self.log, "Chain removed while switching head chains"; "chain" => id, "reason" => ?remove_reason);
if remove_reason.is_critical() {
crit!(self.log, "Chain removed while switching head chains"; "chain" => id, "reason" => ?remove_reason);
} else {
error!(self.log, "Chain removed while switching head chains"; "chain" => id, "reason" => ?remove_reason);
}
} else {
syncing_chains.push(id);
}
Expand Down Expand Up @@ -482,7 +491,11 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
debug_assert_eq!(chain.target_head_root, target_head_root);
debug_assert_eq!(chain.target_head_slot, target_head_slot);
if let Err(remove_reason) = chain.add_peer(network, peer) {
debug!(self.log, "Chain removed after adding peer"; "chain" => id, "reason" => ?remove_reason);
if remove_reason.is_critical() {
error!(self.log, "Chain removed after adding peer"; "chain" => id, "reason" => ?remove_reason);
} else {
debug!(self.log, "Chain removed after adding peer"; "chain" => id, "reason" => ?remove_reason);
}
let chain = entry.remove();
self.on_chain_removed(&id, chain.is_syncing());
}