diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 4f7d180ea27..53630820c2c 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -1293,23 +1293,6 @@ impl BeaconChain { metrics::stop_timer(fork_choice_register_timer); - let find_head_timer = - metrics::start_timer(&metrics::BLOCK_PROCESSING_FORK_CHOICE_FIND_HEAD); - - // Execute the fork choice algorithm, enthroning a new head if discovered. - // - // Note: in the future we may choose to run fork-choice less often, potentially based upon - // some heuristic around number of attestations seen for the block. - if let Err(e) = self.fork_choice() { - error!( - self.log, - "fork choice failed to find head"; - "error" => format!("{:?}", e) - ) - }; - - metrics::stop_timer(find_head_timer); - metrics::inc_counter(&metrics::BLOCK_PROCESSING_SUCCESSES); metrics::observe( &metrics::OPERATIONS_PER_BLOCK_ATTESTATION, @@ -1456,7 +1439,14 @@ impl BeaconChain { let previous_slot = self.head().beacon_block.slot; let new_slot = beacon_block.slot; - let is_reorg = self.head().beacon_block_root != beacon_block.parent_root; + // Note: this will declare a re-org if we skip `SLOTS_PER_HISTORICAL_ROOT` blocks + // between calls to fork choice without swapping between chains. This seems like an + // extreme-enough scenario that a warning is fine. + let is_reorg = self.head().beacon_block_root + != beacon_state + .get_block_root(self.head().beacon_block.slot) + .map(|root| *root) + .unwrap_or_else(|_| Hash256::random()); // If we switched to a new chain (instead of building atop the present chain). if is_reorg { diff --git a/beacon_node/beacon_chain/src/metrics.rs b/beacon_node/beacon_chain/src/metrics.rs index bd1742b58b1..7d7fa5ede66 100644 --- a/beacon_node/beacon_chain/src/metrics.rs +++ b/beacon_node/beacon_chain/src/metrics.rs @@ -48,10 +48,6 @@ lazy_static! { "beacon_block_processing_fork_choice_register_seconds", "Time spent registering the new block with fork choice (but not finding head)" ); - pub static ref BLOCK_PROCESSING_FORK_CHOICE_FIND_HEAD: Result = try_create_histogram( - "beacon_block_processing_fork_choice_find_head_seconds", - "Time spent finding the new head after processing a new block" - ); /* * Block Production diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index 0ec92aa2807..737d7f9c4b4 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -258,6 +258,8 @@ where .process_block(block) .expect("should not error during block processing"); + self.chain.fork_choice().expect("should find head"); + if let BlockProcessingOutcome::Processed { block_root } = outcome { head_block_root = Some(block_root); diff --git a/beacon_node/beacon_chain/tests/tests.rs b/beacon_node/beacon_chain/tests/tests.rs index a06c652e3dd..a278d3f5b4b 100644 --- a/beacon_node/beacon_chain/tests/tests.rs +++ b/beacon_node/beacon_chain/tests/tests.rs @@ -494,6 +494,11 @@ fn run_skip_slot_test(skip_slots: u64) { }) ); + harness_b + .chain + .fork_choice() + .expect("should run fork choice"); + assert_eq!( harness_b.chain.head().beacon_block.slot, Slot::new(skip_slots + 1) diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 421ab2ed595..4889a7270f3 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -68,7 +68,7 @@ use eth2_libp2p::rpc::RequestId; use eth2_libp2p::PeerId; use fnv::FnvHashMap; use futures::prelude::*; -use slog::{crit, debug, info, trace, warn, Logger}; +use slog::{crit, debug, error, info, trace, warn, Logger}; use smallvec::SmallVec; use std::collections::HashSet; use std::ops::Sub; @@ -367,6 +367,20 @@ impl SyncManager { match outcome { BlockProcessingOutcome::Processed { block_root } => { info!(self.log, "Processed block"; "block" => format!("{}", block_root)); + + match chain.fork_choice() { + Ok(()) => trace!( + self.log, + "Fork choice success"; + "location" => "single block" + ), + Err(e) => error!( + self.log, + "Fork choice failed"; + "error" => format!("{:?}", e), + "location" => "single block" + ), + } } BlockProcessingOutcome::ParentUnknown { .. } => { // We don't know of the blocks parent, begin a parent lookup search @@ -531,6 +545,8 @@ impl SyncManager { self.request_parent(parent_request); self.network.downvote_peer(peer); } else { + let mut successes = 0; + // try and process the list of blocks up to the requested block while let Some(block) = parent_request.downloaded_blocks.pop() { // check if the chain exists @@ -542,8 +558,8 @@ impl SyncManager { self.request_parent(parent_request); break; } - Ok(BlockProcessingOutcome::Processed { .. }) - | Ok(BlockProcessingOutcome::BlockIsAlreadyKnown { .. }) => {} + Ok(BlockProcessingOutcome::Processed { .. }) => successes += 1, + Ok(BlockProcessingOutcome::BlockIsAlreadyKnown { .. }) => {} Ok(outcome) => { // it's a future slot or an invalid block, remove it and try again parent_request.failed_attempts += 1; @@ -555,7 +571,7 @@ impl SyncManager { self.network .downvote_peer(parent_request.last_submitted_peer.clone()); self.request_parent(parent_request); - return; + break; } Err(e) => { parent_request.failed_attempts += 1; @@ -566,12 +582,30 @@ impl SyncManager { self.network .downvote_peer(parent_request.last_submitted_peer.clone()); self.request_parent(parent_request); - return; + break; } } } else { - // chain doesn't exist - return early - return; + break; + } + } + + if successes > 0 { + if let Some(chain) = self.chain.upgrade() { + match chain.fork_choice() { + Ok(()) => trace!( + self.log, + "Fork choice success"; + "block_imports" => successes, + "location" => "parent request" + ), + Err(e) => error!( + self.log, + "Fork choice failed"; + "error" => format!("{:?}", e), + "location" => "parent request" + ), + }; } } } diff --git a/beacon_node/network/src/sync/message_processor.rs b/beacon_node/network/src/sync/message_processor.rs index bcb251e2207..e2f8c29d2f5 100644 --- a/beacon_node/network/src/sync/message_processor.rs +++ b/beacon_node/network/src/sync/message_processor.rs @@ -6,7 +6,7 @@ use beacon_chain::{ use eth2_libp2p::rpc::methods::*; use eth2_libp2p::rpc::{RPCEvent, RPCRequest, RPCResponse, RequestId}; use eth2_libp2p::PeerId; -use slog::{debug, o, trace, warn}; +use slog::{debug, error, o, trace, warn}; use ssz::Encode; use std::sync::Arc; use store::Store; @@ -438,6 +438,27 @@ impl MessageProcessor { BlockProcessingOutcome::Processed { .. } => { trace!(self.log, "Gossipsub block processed"; "peer_id" => format!("{:?}",peer_id)); + + // TODO: It would be better if we can run this _after_ we publish the block to + // reduce block propagation latency. + // + // The `MessageHandler` would be the place to put this, however it doesn't seem + // to have a reference to the `BeaconChain`. I will leave this for future + // works. + match self.chain.fork_choice() { + Ok(()) => trace!( + self.log, + "Fork choice success"; + "location" => "block gossip" + ), + Err(e) => error!( + self.log, + "Fork choice failed"; + "error" => format!("{:?}", e), + "location" => "block gossip" + ), + } + SHOULD_FORWARD_GOSSIP_BLOCK } BlockProcessingOutcome::ParentUnknown { .. } => { diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index 2f8425beffc..0f0d4118328 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -5,7 +5,7 @@ use eth2_libp2p::rpc::methods::*; use eth2_libp2p::rpc::RequestId; use eth2_libp2p::PeerId; use fnv::FnvHashMap; -use slog::{crit, debug, trace, warn, Logger}; +use slog::{crit, debug, error, trace, warn, Logger}; use std::cmp::Ordering; use std::collections::HashSet; use std::ops::Sub; @@ -243,16 +243,53 @@ impl SyncingChain { self.to_be_processed_id += 1; continue; } else { + let mut successes = 0; debug!(log, "Processing batch"; "batch_id" => batch.id); - match process_batch(chain.clone(), batch, log) { + match process_batch(chain.clone(), batch, &mut successes, log) { Ok(_) => { trace!(log, "Blocks Processed"; "current_slot" => batch.end_slot); // batch was successfully processed self.last_processed_id = self.to_be_processed_id; self.to_be_processed_id += 1; + + if let Some(chain) = chain.upgrade() { + match chain.fork_choice() { + Ok(()) => trace!( + log, + "Fork choice success"; + "location" => "batch import success" + ), + Err(e) => error!( + log, + "Fork choice failed"; + "error" => format!("{:?}", e), + "location" => "batch import success" + ), + } + } } Err(e) => { warn!(log, "Block processing error"; "error"=> format!("{:?}", e)); + + if successes > 0 { + if let Some(chain) = chain.upgrade() { + match chain.fork_choice() { + Ok(()) => trace!( + log, + "Fork choice success"; + "block_imports" => successes, + "location" => "batch import error" + ), + Err(e) => error!( + log, + "Fork choice failed"; + "error" => format!("{:?}", e), + "location" => "batch import error" + ), + } + } + } + // batch processing failed // this could be because this batch is invalid, or a previous invalidated batch // is invalid. We need to find out which and downvote the peer that has sent us @@ -496,6 +533,7 @@ impl SyncingChain { fn process_batch( chain: Weak>, batch: &Batch, + successes: &mut usize, log: &Logger, ) -> Result<(), String> { for block in &batch.downloaded_blocks { @@ -511,6 +549,8 @@ fn process_batch( "slot" => block.slot, "block_root" => format!("{}", block_root), ); + + *successes += 1 } BlockProcessingOutcome::ParentUnknown { parent } => { // blocks should be sequential and all parents should exist diff --git a/beacon_node/rest_api/src/validator.rs b/beacon_node/rest_api/src/validator.rs index b3da4593612..5b3c703ccca 100644 --- a/beacon_node/rest_api/src/validator.rs +++ b/beacon_node/rest_api/src/validator.rs @@ -251,7 +251,39 @@ pub fn publish_beacon_block( "block_slot" => slot, ); - publish_beacon_block_to_network::(network_chan, block) + publish_beacon_block_to_network::(network_chan, block)?; + + // Run the fork choice algorithm and enshrine a new canonical head, if + // found. + // + // The new head may or may not be the block we just received. + if let Err(e) = beacon_chain.fork_choice() { + error!( + log, + "Failed to find beacon chain head"; + "error" => format!("{:?}", e) + ); + } else { + // In the best case, validators should produce blocks that become the + // head. + // + // Potential reasons this may not be the case: + // + // - A quick re-org between block produce and publish. + // - Excessive time between block produce and publish. + // - A validator is using another beacon node to produce blocks and + // submitting them here. + if beacon_chain.head().beacon_block_root != block_root { + warn!( + log, + "Block from validator is not head"; + "desc" => "potential re-org", + ); + + } + } + + Ok(()) } Ok(outcome) => { warn!( @@ -278,8 +310,8 @@ pub fn publish_beacon_block( ))) } } - }) - .and_then(|_| response_builder?.body_no_ssz(&())), + }) + .and_then(|_| response_builder?.body_no_ssz(&())) ) }