From 7df14978975a90ed91acd16c0b8d6e7fadd6f612 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Sat, 7 Dec 2019 11:25:25 +1100 Subject: [PATCH 1/7] Try merge in change to reduce fork choice calls --- beacon_node/network/src/sync/manager.rs | 46 ++++++-- .../network/src/sync/message_processor.rs | 23 +++- .../network/src/sync/range_sync/chain.rs | 46 +++++++- beacon_node/rest_api/src/validator.rs | 108 +++++++++++++----- 4 files changed, 183 insertions(+), 40 deletions(-) diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 421ab2ed595..a793204654b 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -68,7 +68,7 @@ use eth2_libp2p::rpc::RequestId; use eth2_libp2p::PeerId; use fnv::FnvHashMap; use futures::prelude::*; -use slog::{crit, debug, info, trace, warn, Logger}; +use slog::{crit, debug, error, info, trace, warn, Logger}; use smallvec::SmallVec; use std::collections::HashSet; use std::ops::Sub; @@ -367,6 +367,20 @@ impl SyncManager { match outcome { BlockProcessingOutcome::Processed { block_root } => { info!(self.log, "Processed block"; "block" => format!("{}", block_root)); + + match chain.fork_choice() { + Ok(()) => trace!( + self.log, + "Fork choice success after gossip block"; + "location" => "single block" + ), + Err(e) => error!( + self.log, + "Fork choice failed"; + "error" => format!("{:?}", e), + "location" => "single block" + ), + } } BlockProcessingOutcome::ParentUnknown { .. } => { // We don't know of the blocks parent, begin a parent lookup search @@ -531,6 +545,8 @@ impl SyncManager { self.request_parent(parent_request); self.network.downvote_peer(peer); } else { + let mut successes = 0; + // try and process the list of blocks up to the requested block while let Some(block) = parent_request.downloaded_blocks.pop() { // check if the chain exists @@ -542,8 +558,8 @@ impl SyncManager { self.request_parent(parent_request); break; } - Ok(BlockProcessingOutcome::Processed { .. }) - | Ok(BlockProcessingOutcome::BlockIsAlreadyKnown { .. }) => {} + Ok(BlockProcessingOutcome::Processed { .. }) => successes += 1, + Ok(BlockProcessingOutcome::BlockIsAlreadyKnown { .. }) => {} Ok(outcome) => { // it's a future slot or an invalid block, remove it and try again parent_request.failed_attempts += 1; @@ -555,7 +571,7 @@ impl SyncManager { self.network .downvote_peer(parent_request.last_submitted_peer.clone()); self.request_parent(parent_request); - return; + break; } Err(e) => { parent_request.failed_attempts += 1; @@ -566,14 +582,30 @@ impl SyncManager { self.network .downvote_peer(parent_request.last_submitted_peer.clone()); self.request_parent(parent_request); - return; + break; } } } else { - // chain doesn't exist - return early - return; + break; } } + + if let Some(chain) = self.chain.upgrade() { + match chain.fork_choice() { + Ok(()) => trace!( + self.log, + "Fork choice success"; + "block_imports" => successes, + "location" => "parent request" + ), + Err(e) => error!( + self.log, + "Fork choice failed"; + "error" => format!("{:?}", e), + "location" => "parent request" + ), + }; + } } } diff --git a/beacon_node/network/src/sync/message_processor.rs b/beacon_node/network/src/sync/message_processor.rs index bcb251e2207..e2f8c29d2f5 100644 --- a/beacon_node/network/src/sync/message_processor.rs +++ b/beacon_node/network/src/sync/message_processor.rs @@ -6,7 +6,7 @@ use beacon_chain::{ use eth2_libp2p::rpc::methods::*; use eth2_libp2p::rpc::{RPCEvent, RPCRequest, RPCResponse, RequestId}; use eth2_libp2p::PeerId; -use slog::{debug, o, trace, warn}; +use slog::{debug, error, o, trace, warn}; use ssz::Encode; use std::sync::Arc; use store::Store; @@ -438,6 +438,27 @@ impl MessageProcessor { BlockProcessingOutcome::Processed { .. } => { trace!(self.log, "Gossipsub block processed"; "peer_id" => format!("{:?}",peer_id)); + + // TODO: It would be better if we can run this _after_ we publish the block to + // reduce block propagation latency. + // + // The `MessageHandler` would be the place to put this, however it doesn't seem + // to have a reference to the `BeaconChain`. I will leave this for future + // works. + match self.chain.fork_choice() { + Ok(()) => trace!( + self.log, + "Fork choice success"; + "location" => "block gossip" + ), + Err(e) => error!( + self.log, + "Fork choice failed"; + "error" => format!("{:?}", e), + "location" => "block gossip" + ), + } + SHOULD_FORWARD_GOSSIP_BLOCK } BlockProcessingOutcome::ParentUnknown { .. } => { diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index f117090d983..56b1926a6f3 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -5,7 +5,7 @@ use eth2_libp2p::rpc::methods::*; use eth2_libp2p::rpc::RequestId; use eth2_libp2p::PeerId; use fnv::FnvHashMap; -use slog::{crit, debug, trace, warn, Logger}; +use slog::{crit, debug, error, trace, warn, Logger}; use std::cmp::Ordering; use std::collections::HashSet; use std::ops::Sub; @@ -243,16 +243,53 @@ impl SyncingChain { self.to_be_processed_id += 1; continue; } else { - warn!(log, "Processing batch"; "batch_id" => batch.id); - match process_batch(chain.clone(), batch, log) { + let mut successes = 0; + debug!(log, "Processing batch"; "batch_id" => batch.id); + match process_batch(chain.clone(), batch, &mut successes, log) { Ok(_) => { trace!(log, "Blocks Processed"; "current_slot" => batch.end_slot); // batch was successfully processed self.last_processed_id = self.to_be_processed_id; self.to_be_processed_id += 1; + + if let Some(chain) = chain.upgrade() { + match chain.fork_choice() { + Ok(()) => trace!( + log, + "Fork choice success"; + "location" => "batch import success" + ), + Err(e) => error!( + log, + "Fork choice failed"; + "error" => format!("{:?}", e), + "location" => "batch import success" + ), + } + } } Err(e) => { warn!(log, "Block processing error"; "error"=> format!("{:?}", e)); + + if successes > 0 { + if let Some(chain) = chain.upgrade() { + match chain.fork_choice() { + Ok(()) => trace!( + log, + "Fork choice success"; + "block_imports" => successes, + "location" => "batch import error" + ), + Err(e) => error!( + log, + "Fork choice failed"; + "error" => format!("{:?}", e), + "location" => "batch import error" + ), + } + } + } + // batch processing failed // this could be because this batch is invalid, or a previous invalidated batch // is invalid. We need to find out which and downvote the peer that has sent us @@ -489,6 +526,7 @@ impl SyncingChain { fn process_batch( chain: Weak>, batch: &Batch, + successes: &mut usize, log: &Logger, ) -> Result<(), String> { for block in &batch.downloaded_blocks { @@ -504,6 +542,8 @@ fn process_batch( "slot" => block.slot, "block_root" => format!("{}", block_root), ); + + *successes += 1 } BlockProcessingOutcome::ParentUnknown { parent } => { // blocks should be sequential and all parents should exist diff --git a/beacon_node/rest_api/src/validator.rs b/beacon_node/rest_api/src/validator.rs index 9eb85262366..28472ff7162 100644 --- a/beacon_node/rest_api/src/validator.rs +++ b/beacon_node/rest_api/src/validator.rs @@ -12,7 +12,7 @@ use futures::future::Future; use futures::stream::Stream; use hyper::{Body, Request}; use serde::{Deserialize, Serialize}; -use slog::{info, warn, Logger}; +use slog::{error, info, warn, Logger}; use ssz_derive::{Decode, Encode}; use std::sync::Arc; use types::beacon_state::EthSpec; @@ -219,35 +219,85 @@ pub fn publish_beacon_block( let response_builder = ResponseBuilder::new(&req); let body = req.into_body(); - Box::new(body - .concat2() - .map_err(|e| ApiError::ServerError(format!("Unable to get request body: {:?}",e))) - .and_then(|chunks| { - serde_json::from_slice(&chunks).map_err(|e| ApiError::BadRequest(format!("Unable to parse JSON into BeaconBlock: {:?}",e))) - }) - .and_then(move |block: BeaconBlock| { - let slot = block.slot; - match beacon_chain.process_block(block.clone()) { - Ok(BlockProcessingOutcome::Processed { block_root }) => { - // Block was processed, publish via gossipsub - info!(log, "Processed valid block from API, transmitting to network."; "block_slot" => slot, "block_root" => format!("{}", block_root)); - publish_beacon_block_to_network::(network_chan, block) - } - Ok(outcome) => { - warn!(log, "BeaconBlock could not be processed, but is being sent to the network anyway."; "outcome" => format!("{:?}", outcome)); - publish_beacon_block_to_network::(network_chan, block)?; - Err(ApiError::ProcessingError(format!( - "The BeaconBlock could not be processed, but has still been published: {:?}", - outcome - ))) - } - Err(e) => { - Err(ApiError::ServerError(format!( - "Error while processing block: {:?}", - e - ))) + Box::new( + body.concat2() + .map_err(|e| ApiError::ServerError(format!("Unable to get request body: {:?}", e))) + .and_then(|chunks| { + serde_json::from_slice(&chunks).map_err(|e| { + ApiError::BadRequest(format!("Unable to parse JSON into BeaconBlock: {:?}", e)) + }) + }) + .and_then(move |block: BeaconBlock| { + let slot = block.slot; + match beacon_chain.process_block(block.clone()) { + Ok(BlockProcessingOutcome::Processed { block_root }) => { + // Block was processed, publish via gossipsub + info!( + log, + "Block from local validator"; + "block_root" => format!("{}", block_root), + "block_slot" => slot, + ); + + publish_beacon_block_to_network::(network_chan, block)?; + + // Run the fork choice algorithm and enshrine a new canonical head, if + // found. + // + // The new head may or may not be the block we just received. + if let Err(e) = beacon_chain.fork_choice() { + error!( + log, + "Failed to find beacon chain head"; + "error" => format!("{:?}", e) + ); + } else { + // In the best case, validators should produce blocks that become the + // head. + // + // Potential reasons this may not be the case: + // + // - A quick re-org between block produce and publish. + // - Excessive time between block produce and publish. + // - A validator is using another beacon node to produce blocks and + // submitting them here. + if beacon_chain.head().beacon_block_root != block_root { + warn!( + log, + "Block from validator is not head"; + "desc" => "potential re-org", + ); + + } + } + + Ok(()) + } + Ok(outcome) => { + warn!( + log, + "Invalid block from local validator"; + "outcome" => format!("{:?}", outcome) + ); + + Err(ApiError::ProcessingError(format!( + "The BeaconBlock could not be processed and has not been published: {:?}", + outcome + ))) + } + Err(e) => { + error!( + log, + "Error whilst processing block"; + "error" => format!("{:?}", e) + ); + + Err(ApiError::ServerError(format!( + "Error while processing block: {:?}", + e + ))) + } } - } }).and_then(|_| { response_builder?.body_no_ssz(&()) })) From 87fdac9a1f05719e639d17a4d9fa663c9eb40bb8 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Sat, 7 Dec 2019 11:41:01 +1100 Subject: [PATCH 2/7] Remove fork choice from process block --- beacon_node/beacon_chain/src/beacon_chain.rs | 17 ----------------- beacon_node/beacon_chain/src/metrics.rs | 4 ---- beacon_node/rest_api/src/validator.rs | 6 +++--- 3 files changed, 3 insertions(+), 24 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 4f7d180ea27..e721a750251 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -1293,23 +1293,6 @@ impl BeaconChain { metrics::stop_timer(fork_choice_register_timer); - let find_head_timer = - metrics::start_timer(&metrics::BLOCK_PROCESSING_FORK_CHOICE_FIND_HEAD); - - // Execute the fork choice algorithm, enthroning a new head if discovered. - // - // Note: in the future we may choose to run fork-choice less often, potentially based upon - // some heuristic around number of attestations seen for the block. - if let Err(e) = self.fork_choice() { - error!( - self.log, - "fork choice failed to find head"; - "error" => format!("{:?}", e) - ) - }; - - metrics::stop_timer(find_head_timer); - metrics::inc_counter(&metrics::BLOCK_PROCESSING_SUCCESSES); metrics::observe( &metrics::OPERATIONS_PER_BLOCK_ATTESTATION, diff --git a/beacon_node/beacon_chain/src/metrics.rs b/beacon_node/beacon_chain/src/metrics.rs index bd1742b58b1..7d7fa5ede66 100644 --- a/beacon_node/beacon_chain/src/metrics.rs +++ b/beacon_node/beacon_chain/src/metrics.rs @@ -48,10 +48,6 @@ lazy_static! { "beacon_block_processing_fork_choice_register_seconds", "Time spent registering the new block with fork choice (but not finding head)" ); - pub static ref BLOCK_PROCESSING_FORK_CHOICE_FIND_HEAD: Result = try_create_histogram( - "beacon_block_processing_fork_choice_find_head_seconds", - "Time spent finding the new head after processing a new block" - ); /* * Block Production diff --git a/beacon_node/rest_api/src/validator.rs b/beacon_node/rest_api/src/validator.rs index a691def3885..5b3c703ccca 100644 --- a/beacon_node/rest_api/src/validator.rs +++ b/beacon_node/rest_api/src/validator.rs @@ -310,9 +310,9 @@ pub fn publish_beacon_block( ))) } } - }).and_then(|_| { - response_builder?.body_no_ssz(&()) - })) + }) + .and_then(|_| response_builder?.body_no_ssz(&())) + ) } /// HTTP Handler to produce a new Attestation from the current state, ready to be signed by a validator. From 0d0b2afef7d92df7e1b871c4a6a3523ee01c41f4 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Sat, 7 Dec 2019 11:43:37 +1100 Subject: [PATCH 3/7] Minor log fix --- beacon_node/network/src/sync/manager.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index a793204654b..c44e93514fe 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -371,7 +371,7 @@ impl SyncManager { match chain.fork_choice() { Ok(()) => trace!( self.log, - "Fork choice success after gossip block"; + "Fork choice success"; "location" => "single block" ), Err(e) => error!( From dfc0c055bb39515bdcd53f552a9ad4e9c0e06182 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Sat, 7 Dec 2019 11:57:24 +1100 Subject: [PATCH 4/7] Check successes > 0 --- beacon_node/network/src/sync/manager.rs | 32 +++++++++++++------------ 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index c44e93514fe..4889a7270f3 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -590,21 +590,23 @@ impl SyncManager { } } - if let Some(chain) = self.chain.upgrade() { - match chain.fork_choice() { - Ok(()) => trace!( - self.log, - "Fork choice success"; - "block_imports" => successes, - "location" => "parent request" - ), - Err(e) => error!( - self.log, - "Fork choice failed"; - "error" => format!("{:?}", e), - "location" => "parent request" - ), - }; + if successes > 0 { + if let Some(chain) = self.chain.upgrade() { + match chain.fork_choice() { + Ok(()) => trace!( + self.log, + "Fork choice success"; + "block_imports" => successes, + "location" => "parent request" + ), + Err(e) => error!( + self.log, + "Fork choice failed"; + "error" => format!("{:?}", e), + "location" => "parent request" + ), + }; + } } } } From 03febae853b9ebe7b273352561a36011985cf58d Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Sat, 7 Dec 2019 14:33:50 +1100 Subject: [PATCH 5/7] Fix failing beacon chain tests --- beacon_node/beacon_chain/src/test_utils.rs | 2 ++ beacon_node/beacon_chain/tests/tests.rs | 5 +++++ 2 files changed, 7 insertions(+) diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index 0ec92aa2807..737d7f9c4b4 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -258,6 +258,8 @@ where .process_block(block) .expect("should not error during block processing"); + self.chain.fork_choice().expect("should find head"); + if let BlockProcessingOutcome::Processed { block_root } = outcome { head_block_root = Some(block_root); diff --git a/beacon_node/beacon_chain/tests/tests.rs b/beacon_node/beacon_chain/tests/tests.rs index a06c652e3dd..a278d3f5b4b 100644 --- a/beacon_node/beacon_chain/tests/tests.rs +++ b/beacon_node/beacon_chain/tests/tests.rs @@ -494,6 +494,11 @@ fn run_skip_slot_test(skip_slots: u64) { }) ); + harness_b + .chain + .fork_choice() + .expect("should run fork choice"); + assert_eq!( harness_b.chain.head().beacon_block.slot, Slot::new(skip_slots + 1) From 11cf9a6ff8f8907dd20d63c54a93a07108261646 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Sat, 7 Dec 2019 17:59:09 +1100 Subject: [PATCH 6/7] Fix re-org warnings --- beacon_node/beacon_chain/src/beacon_chain.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index e721a750251..b4ddea8c303 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -1439,7 +1439,11 @@ impl BeaconChain { let previous_slot = self.head().beacon_block.slot; let new_slot = beacon_block.slot; - let is_reorg = self.head().beacon_block_root != beacon_block.parent_root; + let is_reorg = self.head().beacon_block_root + != beacon_state + .get_state_root(self.head().beacon_block.slot) + .map(|root| *root) + .unwrap_or_else(|_| Hash256::random()); // If we switched to a new chain (instead of building atop the present chain). if is_reorg { From 97a9ec54bf0a08154e73b63aea02a8b6383d2d91 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Sat, 7 Dec 2019 18:12:38 +1100 Subject: [PATCH 7/7] Fix mistake in prev commit --- beacon_node/beacon_chain/src/beacon_chain.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index b4ddea8c303..53630820c2c 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -1439,9 +1439,12 @@ impl BeaconChain { let previous_slot = self.head().beacon_block.slot; let new_slot = beacon_block.slot; + // Note: this will declare a re-org if we skip `SLOTS_PER_HISTORICAL_ROOT` blocks + // between calls to fork choice without swapping between chains. This seems like an + // extreme-enough scenario that a warning is fine. let is_reorg = self.head().beacon_block_root != beacon_state - .get_state_root(self.head().beacon_block.slot) + .get_block_root(self.head().beacon_block.slot) .map(|root| *root) .unwrap_or_else(|_| Hash256::random());