Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 8 additions & 18 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1293,23 +1293,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

metrics::stop_timer(fork_choice_register_timer);

let find_head_timer =
metrics::start_timer(&metrics::BLOCK_PROCESSING_FORK_CHOICE_FIND_HEAD);

// Execute the fork choice algorithm, enthroning a new head if discovered.
//
// Note: in the future we may choose to run fork-choice less often, potentially based upon
// some heuristic around number of attestations seen for the block.
if let Err(e) = self.fork_choice() {
error!(
self.log,
"fork choice failed to find head";
"error" => format!("{:?}", e)
)
};

metrics::stop_timer(find_head_timer);

metrics::inc_counter(&metrics::BLOCK_PROCESSING_SUCCESSES);
metrics::observe(
&metrics::OPERATIONS_PER_BLOCK_ATTESTATION,
Expand Down Expand Up @@ -1456,7 +1439,14 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let previous_slot = self.head().beacon_block.slot;
let new_slot = beacon_block.slot;

let is_reorg = self.head().beacon_block_root != beacon_block.parent_root;
// Note: this will declare a re-org if we skip `SLOTS_PER_HISTORICAL_ROOT` blocks
// between calls to fork choice without swapping between chains. This seems like an
// extreme-enough scenario that a warning is fine.
let is_reorg = self.head().beacon_block_root
!= beacon_state
.get_block_root(self.head().beacon_block.slot)
.map(|root| *root)
.unwrap_or_else(|_| Hash256::random());

// If we switched to a new chain (instead of building atop the present chain).
if is_reorg {
Expand Down
4 changes: 0 additions & 4 deletions beacon_node/beacon_chain/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,6 @@ lazy_static! {
"beacon_block_processing_fork_choice_register_seconds",
"Time spent registering the new block with fork choice (but not finding head)"
);
pub static ref BLOCK_PROCESSING_FORK_CHOICE_FIND_HEAD: Result<Histogram> = try_create_histogram(
"beacon_block_processing_fork_choice_find_head_seconds",
"Time spent finding the new head after processing a new block"
);

/*
* Block Production
Expand Down
2 changes: 2 additions & 0 deletions beacon_node/beacon_chain/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,8 @@ where
.process_block(block)
.expect("should not error during block processing");

self.chain.fork_choice().expect("should find head");

if let BlockProcessingOutcome::Processed { block_root } = outcome {
head_block_root = Some(block_root);

Expand Down
5 changes: 5 additions & 0 deletions beacon_node/beacon_chain/tests/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,11 @@ fn run_skip_slot_test(skip_slots: u64) {
})
);

harness_b
.chain
.fork_choice()
.expect("should run fork choice");

assert_eq!(
harness_b.chain.head().beacon_block.slot,
Slot::new(skip_slots + 1)
Expand Down
48 changes: 41 additions & 7 deletions beacon_node/network/src/sync/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ use eth2_libp2p::rpc::RequestId;
use eth2_libp2p::PeerId;
use fnv::FnvHashMap;
use futures::prelude::*;
use slog::{crit, debug, info, trace, warn, Logger};
use slog::{crit, debug, error, info, trace, warn, Logger};
use smallvec::SmallVec;
use std::collections::HashSet;
use std::ops::Sub;
Expand Down Expand Up @@ -367,6 +367,20 @@ impl<T: BeaconChainTypes> SyncManager<T> {
match outcome {
BlockProcessingOutcome::Processed { block_root } => {
info!(self.log, "Processed block"; "block" => format!("{}", block_root));

match chain.fork_choice() {
Ok(()) => trace!(
self.log,
"Fork choice success";
"location" => "single block"
),
Err(e) => error!(
self.log,
"Fork choice failed";
"error" => format!("{:?}", e),
"location" => "single block"
),
}
}
BlockProcessingOutcome::ParentUnknown { .. } => {
// We don't know of the blocks parent, begin a parent lookup search
Expand Down Expand Up @@ -531,6 +545,8 @@ impl<T: BeaconChainTypes> SyncManager<T> {
self.request_parent(parent_request);
self.network.downvote_peer(peer);
} else {
let mut successes = 0;

// try and process the list of blocks up to the requested block
while let Some(block) = parent_request.downloaded_blocks.pop() {
// check if the chain exists
Expand All @@ -542,8 +558,8 @@ impl<T: BeaconChainTypes> SyncManager<T> {
self.request_parent(parent_request);
break;
}
Ok(BlockProcessingOutcome::Processed { .. })
| Ok(BlockProcessingOutcome::BlockIsAlreadyKnown { .. }) => {}
Ok(BlockProcessingOutcome::Processed { .. }) => successes += 1,
Ok(BlockProcessingOutcome::BlockIsAlreadyKnown { .. }) => {}
Ok(outcome) => {
// it's a future slot or an invalid block, remove it and try again
parent_request.failed_attempts += 1;
Expand All @@ -555,7 +571,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
self.network
.downvote_peer(parent_request.last_submitted_peer.clone());
self.request_parent(parent_request);
return;
break;
Copy link
Member Author

@paulhauner paulhauner Dec 7, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Age please confirm break is equivalent to return here and below

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We want the break, so we can optionally run fork choice.. but we only want to run fork choice if we have processed a block.
You need to check if successes > 0, which it looks like you have forgotten

}
Err(e) => {
parent_request.failed_attempts += 1;
Expand All @@ -566,12 +582,30 @@ impl<T: BeaconChainTypes> SyncManager<T> {
self.network
.downvote_peer(parent_request.last_submitted_peer.clone());
self.request_parent(parent_request);
return;
break;
}
}
} else {
// chain doesn't exist - return early
return;
break;
}
}

if successes > 0 {
if let Some(chain) = self.chain.upgrade() {
match chain.fork_choice() {
Ok(()) => trace!(
self.log,
"Fork choice success";
"block_imports" => successes,
"location" => "parent request"
),
Err(e) => error!(
self.log,
"Fork choice failed";
"error" => format!("{:?}", e),
"location" => "parent request"
),
};
}
}
}
Expand Down
23 changes: 22 additions & 1 deletion beacon_node/network/src/sync/message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use beacon_chain::{
use eth2_libp2p::rpc::methods::*;
use eth2_libp2p::rpc::{RPCEvent, RPCRequest, RPCResponse, RequestId};
use eth2_libp2p::PeerId;
use slog::{debug, o, trace, warn};
use slog::{debug, error, o, trace, warn};
use ssz::Encode;
use std::sync::Arc;
use store::Store;
Expand Down Expand Up @@ -438,6 +438,27 @@ impl<T: BeaconChainTypes> MessageProcessor<T> {
BlockProcessingOutcome::Processed { .. } => {
trace!(self.log, "Gossipsub block processed";
"peer_id" => format!("{:?}",peer_id));

// TODO: It would be better if we can run this _after_ we publish the block to
// reduce block propagation latency.
//
// The `MessageHandler` would be the place to put this, however it doesn't seem
// to have a reference to the `BeaconChain`. I will leave this for future
// works.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah agreed. The threading will be refactored in the network re-write. Will deal with this then

match self.chain.fork_choice() {
Ok(()) => trace!(
self.log,
"Fork choice success";
"location" => "block gossip"
),
Err(e) => error!(
self.log,
"Fork choice failed";
"error" => format!("{:?}", e),
"location" => "block gossip"
),
}

SHOULD_FORWARD_GOSSIP_BLOCK
}
BlockProcessingOutcome::ParentUnknown { .. } => {
Expand Down
44 changes: 42 additions & 2 deletions beacon_node/network/src/sync/range_sync/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use eth2_libp2p::rpc::methods::*;
use eth2_libp2p::rpc::RequestId;
use eth2_libp2p::PeerId;
use fnv::FnvHashMap;
use slog::{crit, debug, trace, warn, Logger};
use slog::{crit, debug, error, trace, warn, Logger};
use std::cmp::Ordering;
use std::collections::HashSet;
use std::ops::Sub;
Expand Down Expand Up @@ -243,16 +243,53 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
self.to_be_processed_id += 1;
continue;
} else {
let mut successes = 0;
debug!(log, "Processing batch"; "batch_id" => batch.id);
match process_batch(chain.clone(), batch, log) {
match process_batch(chain.clone(), batch, &mut successes, log) {
Ok(_) => {
trace!(log, "Blocks Processed"; "current_slot" => batch.end_slot);
// batch was successfully processed
self.last_processed_id = self.to_be_processed_id;
self.to_be_processed_id += 1;

if let Some(chain) = chain.upgrade() {
match chain.fork_choice() {
Ok(()) => trace!(
log,
"Fork choice success";
"location" => "batch import success"
),
Err(e) => error!(
log,
"Fork choice failed";
"error" => format!("{:?}", e),
"location" => "batch import success"
),
}
}
}
Err(e) => {
warn!(log, "Block processing error"; "error"=> format!("{:?}", e));

if successes > 0 {
if let Some(chain) = chain.upgrade() {
match chain.fork_choice() {
Ok(()) => trace!(
log,
"Fork choice success";
"block_imports" => successes,
"location" => "batch import error"
),
Err(e) => error!(
log,
"Fork choice failed";
"error" => format!("{:?}", e),
"location" => "batch import error"
),
}
}
}

// batch processing failed
// this could be because this batch is invalid, or a previous invalidated batch
// is invalid. We need to find out which and downvote the peer that has sent us
Expand Down Expand Up @@ -496,6 +533,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
fn process_batch<T: BeaconChainTypes>(
chain: Weak<BeaconChain<T>>,
batch: &Batch<T::EthSpec>,
successes: &mut usize,
log: &Logger,
) -> Result<(), String> {
for block in &batch.downloaded_blocks {
Expand All @@ -511,6 +549,8 @@ fn process_batch<T: BeaconChainTypes>(
"slot" => block.slot,
"block_root" => format!("{}", block_root),
);

*successes += 1
}
BlockProcessingOutcome::ParentUnknown { parent } => {
// blocks should be sequential and all parents should exist
Expand Down
38 changes: 35 additions & 3 deletions beacon_node/rest_api/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,39 @@ pub fn publish_beacon_block<T: BeaconChainTypes>(
"block_slot" => slot,
);

publish_beacon_block_to_network::<T>(network_chan, block)
publish_beacon_block_to_network::<T>(network_chan, block)?;

// Run the fork choice algorithm and enshrine a new canonical head, if
// found.
//
// The new head may or may not be the block we just received.
if let Err(e) = beacon_chain.fork_choice() {
error!(
log,
"Failed to find beacon chain head";
"error" => format!("{:?}", e)
);
} else {
// In the best case, validators should produce blocks that become the
// head.
//
// Potential reasons this may not be the case:
//
// - A quick re-org between block produce and publish.
// - Excessive time between block produce and publish.
// - A validator is using another beacon node to produce blocks and
// submitting them here.
if beacon_chain.head().beacon_block_root != block_root {
warn!(
log,
"Block from validator is not head";
"desc" => "potential re-org",
);

}
}

Ok(())
}
Ok(outcome) => {
warn!(
Expand All @@ -278,8 +310,8 @@ pub fn publish_beacon_block<T: BeaconChainTypes>(
)))
}
}
})
.and_then(|_| response_builder?.body_no_ssz(&())),
})
.and_then(|_| response_builder?.body_no_ssz(&()))
)
}

Expand Down