Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: edge cases causing bans during header/block sync #3661

Merged
merged 5 commits into from
Jan 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion applications/tari_base_node/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use tari_core::{
state_machine_service::{initializer::BaseNodeStateMachineInitializer, states::HorizonSyncConfig},
BaseNodeStateMachineConfig,
BlockSyncConfig,
LocalNodeCommsInterface,
StateMachineHandle,
},
chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend, BlockchainDatabase},
Expand Down Expand Up @@ -211,6 +212,7 @@ where B: BlockchainBackend + 'static
config: &GlobalConfig,
) -> UnspawnedCommsNode {
let dht = handles.expect_handle::<Dht>();
let base_node_service = handles.expect_handle::<LocalNodeCommsInterface>();
let builder = RpcServer::builder();
let builder = match config.rpc_max_simultaneous_sessions {
Some(limit) => builder.with_maximum_simultaneous_sessions(limit),
Expand All @@ -228,7 +230,10 @@ where B: BlockchainBackend + 'static
// Add your RPC services here ‍🏴‍☠️️☮️🌊
let rpc_server = rpc_server
.add_service(dht.rpc_service())
.add_service(base_node::create_base_node_sync_rpc_service(db.clone()))
.add_service(base_node::create_base_node_sync_rpc_service(
db.clone(),
base_node_service,
))
.add_service(mempool::create_mempool_rpc_service(
handles.expect_handle::<MempoolHandle>(),
))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,15 @@ where T: BlockchainBackend + 'static
) -> Result<(), CommsInterfaceError> {
let NewBlock { block_hash } = new_block;

if self.blockchain_db.inner().is_add_block_disabled() {
info!(
target: LOG_TARGET,
"Ignoring block message ({}) because add_block is locked",
block_hash.to_hex()
);
return Ok(());
}

// Only a single block request can complete at a time.
// As multiple NewBlock requests arrive from propagation, this semaphore prevents multiple requests to nodes for
// the same full block. The first request that succeeds will stop the node from requesting the block from any
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,16 @@ use crate::{
comms_interface::LocalNodeCommsInterface,
state_machine_service::{
states,
states::{BaseNodeState, HorizonSyncConfig, StateEvent, StateInfo, StatusInfo, SyncPeerConfig, SyncStatus},
states::{
BaseNodeState,
HeaderSyncState,
HorizonSyncConfig,
StateEvent,
StateInfo,
StatusInfo,
SyncPeerConfig,
SyncStatus,
},
},
sync::{BlockSyncConfig, SyncValidators},
},
Expand Down Expand Up @@ -137,22 +146,51 @@ impl<B: BlockchainBackend + 'static> BaseNodeStateMachine<B> {
/// Describes the Finite State Machine for the base node. This function describes _every possible_ state
/// transition for the node given its current state and an event that gets triggered.
pub fn transition(&self, state: BaseNodeState, event: StateEvent) -> BaseNodeState {
let db = self.db.inner();
use self::{BaseNodeState::*, StateEvent::*, SyncStatus::*};
match (state, event) {
(Starting(s), Initialized) => Listening(s.into()),
(Listening(_), FallenBehind(Lagging(_, sync_peers))) => HeaderSync(sync_peers.into()),
(HeaderSync(s), HeaderSyncFailed) => Waiting(s.into()),
(HeaderSync(s), Continue | NetworkSilence) => Listening(s.into()),
(
Listening(_),
FallenBehind(Lagging {
local: local_metadata,
sync_peers,
..
}),
) => {
db.set_disable_add_block_flag();
HeaderSync(HeaderSyncState::new(sync_peers, local_metadata))
},
(HeaderSync(s), HeaderSyncFailed) => {
db.clear_disable_add_block_flag();
Waiting(s.into())
},
(HeaderSync(s), Continue | NetworkSilence) => {
db.clear_disable_add_block_flag();
Listening(s.into())
},
(HeaderSync(s), HeadersSynchronized(_)) => DecideNextSync(s.into()),

(DecideNextSync(_), ProceedToHorizonSync(peer)) => HorizonStateSync(peer.into()),
(DecideNextSync(s), Continue) => Listening(s.into()),
(DecideNextSync(s), Continue) => {
db.clear_disable_add_block_flag();
Listening(s.into())
},
(HorizonStateSync(s), HorizonStateSynchronized) => BlockSync(s.into()),
(HorizonStateSync(s), HorizonStateSyncFailure) => Waiting(s.into()),
(HorizonStateSync(s), HorizonStateSyncFailure) => {
db.clear_disable_add_block_flag();
Waiting(s.into())
},

(DecideNextSync(_), ProceedToBlockSync(peer)) => BlockSync(peer.into()),
(BlockSync(s), BlocksSynchronized) => Listening(s.into()),
(BlockSync(s), BlockSyncFailed) => Waiting(s.into()),
(BlockSync(s), BlocksSynchronized) => {
db.clear_disable_add_block_flag();
Listening(s.into())
},
(BlockSync(s), BlockSyncFailed) => {
db.clear_disable_add_block_flag();
Waiting(s.into())
},

(Waiting(s), Continue) => Listening(s.into()),
(_, FatalError(s)) => Shutdown(states::Shutdown::with_reason(s)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::base_node::{
state_machine_service::states::{
BlockSync,
DecideNextSync,
HeaderSync,
HeaderSyncState,
HorizonStateSync,
Listening,
ListeningInfo,
Expand All @@ -44,7 +44,7 @@ use crate::base_node::{
#[derive(Debug)]
pub enum BaseNodeState {
Starting(Starting),
HeaderSync(HeaderSync),
HeaderSync(HeaderSyncState),
DecideNextSync(DecideNextSync),
HorizonStateSync(HorizonStateSync),
BlockSync(BlockSync),
Expand Down Expand Up @@ -86,7 +86,11 @@ impl<E: std::error::Error> From<E> for StateEvent {
#[derive(Debug, Clone, PartialEq)]
pub enum SyncStatus {
// We are behind the chain tip.
Lagging(ChainMetadata, Vec<SyncPeer>),
Lagging {
local: ChainMetadata,
network: ChainMetadata,
sync_peers: Vec<SyncPeer>,
},
UpToDate,
}

Expand All @@ -104,12 +108,14 @@ impl Display for SyncStatus {
fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), Error> {
use SyncStatus::*;
match self {
Lagging(m, v) => write!(
Lagging {
network, sync_peers, ..
} => write!(
f,
"Lagging behind {} peers (#{}, Difficulty: {})",
v.len(),
m.height_of_longest_chain(),
m.accumulated_difficulty(),
sync_peers.len(),
network.height_of_longest_chain(),
network.accumulated_difficulty(),
),
UpToDate => f.write_str("UpToDate"),
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@
use std::{cmp::Ordering, time::Instant};

use log::*;
use tari_common_types::chain_metadata::ChainMetadata;

use crate::{
base_node::{
comms_interface::BlockEvent,
state_machine_service::states::{BlockSyncInfo, Listening, StateEvent, StateInfo, StatusInfo},
state_machine_service::states::{BlockSyncInfo, StateEvent, StateInfo, StatusInfo},
sync::{BlockHeaderSyncError, HeaderSynchronizer, SyncPeer},
BaseNodeStateMachine,
},
Expand All @@ -36,14 +37,15 @@ use crate::{

const LOG_TARGET: &str = "c::bn::header_sync";

#[derive(Clone, Debug, Default)]
pub struct HeaderSync {
#[derive(Clone, Debug)]
pub struct HeaderSyncState {
sync_peers: Vec<SyncPeer>,
is_synced: bool,
local_metadata: ChainMetadata,
}

impl HeaderSync {
pub fn new(mut sync_peers: Vec<SyncPeer>) -> Self {
impl HeaderSyncState {
pub fn new(mut sync_peers: Vec<SyncPeer>, local_metadata: ChainMetadata) -> Self {
// Sort by latency lowest to highest
sync_peers.sort_by(|a, b| match (a.latency(), b.latency()) {
(None, None) => Ordering::Equal,
Expand All @@ -55,6 +57,7 @@ impl HeaderSync {
Self {
sync_peers,
is_synced: false,
local_metadata,
}
}

Expand All @@ -77,6 +80,7 @@ impl HeaderSync {
shared.connectivity.clone(),
&self.sync_peers,
shared.randomx_factory.clone(),
&self.local_metadata,
);

let status_event_sender = shared.status_event_sender.clone();
Expand Down Expand Up @@ -141,14 +145,3 @@ impl HeaderSync {
}
}
}

impl From<Listening> for HeaderSync {
fn from(_: Listening) -> Self {
Default::default()
}
}
impl From<Vec<SyncPeer>> for HeaderSync {
fn from(peers: Vec<SyncPeer>) -> Self {
Self::new(peers)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use crate::{
states::{
BlockSync,
DecideNextSync,
HeaderSync,
HeaderSyncState,
StateEvent,
StateEvent::FatalError,
StateInfo,
Expand Down Expand Up @@ -195,7 +195,7 @@ impl Listening {
if self.is_synced &&
best_metadata.height_of_longest_chain() == local.height_of_longest_chain() + 1 &&
time_since_better_block
.map(|ts: Instant| ts.elapsed() < Duration::from_secs(30))
.map(|ts: Instant| ts.elapsed() < Duration::from_secs(60))
.unwrap_or(true)
{
if time_since_better_block.is_none() {
Expand All @@ -217,7 +217,7 @@ impl Listening {
peer_metadata_list
};

let local = match shared.db.get_chain_metadata().await {
let local_metadata = match shared.db.get_chain_metadata().await {
Ok(m) => m,
Err(e) => {
return FatalError(format!("Could not get local blockchain metadata. {}", e));
Expand All @@ -227,7 +227,7 @@ impl Listening {

let sync_mode = determine_sync_mode(
shared.config.blocks_behind_before_considered_lagging,
&local,
&local_metadata,
best_metadata,
sync_peers,
);
Expand Down Expand Up @@ -266,8 +266,8 @@ impl From<Waiting> for Listening {
}
}

impl From<HeaderSync> for Listening {
fn from(sync: HeaderSync) -> Self {
impl From<HeaderSyncState> for Listening {
fn from(sync: HeaderSyncState) -> Self {
Self {
is_synced: sync.is_synced(),
}
Expand Down Expand Up @@ -356,12 +356,15 @@ fn determine_sync_mode(
return UpToDate;
};

let sync_peers = sync_peers.into_iter().cloned().collect();
debug!(
target: LOG_TARGET,
"Lagging (local height = {}, network height = {})", local_tip_height, network_tip_height
);
Lagging(network.clone(), sync_peers)
Lagging {
local: local.clone(),
network: network.clone(),
sync_peers: sync_peers.into_iter().cloned().collect(),
}
} else {
info!(
target: LOG_TARGET,
Expand Down Expand Up @@ -497,28 +500,28 @@ mod test {

let network = ChainMetadata::new(0, Vec::new(), 0, 0, 500_001);
match determine_sync_mode(0, &local, &network, vec![]) {
SyncStatus::Lagging(n, _) => assert_eq!(n, network),
SyncStatus::Lagging { network: n, .. } => assert_eq!(n, network),
_ => panic!(),
}

let local = ChainMetadata::new(100, Vec::new(), 50, 50, 500_000);
let network = ChainMetadata::new(150, Vec::new(), 0, 0, 500_001);
match determine_sync_mode(0, &local, &network, vec![]) {
SyncStatus::Lagging(n, _) => assert_eq!(n, network),
SyncStatus::Lagging { network: n, .. } => assert_eq!(n, network),
_ => panic!(),
}

let local = ChainMetadata::new(0, Vec::new(), 50, 50, 500_000);
let network = ChainMetadata::new(100, Vec::new(), 0, 0, 500_001);
match determine_sync_mode(0, &local, &network, vec![]) {
SyncStatus::Lagging(n, _) => assert_eq!(n, network),
SyncStatus::Lagging { network: n, .. } => assert_eq!(n, network),
_ => panic!(),
}

let local = ChainMetadata::new(99, Vec::new(), 50, 50, 500_000);
let network = ChainMetadata::new(150, Vec::new(), 0, 0, 500_001);
match determine_sync_mode(0, &local, &network, vec![]) {
SyncStatus::Lagging(n, _) => assert_eq!(n, network),
SyncStatus::Lagging { network: n, .. } => assert_eq!(n, network),
_ => panic!(),
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ pub(crate) mod helpers;
pub use helpers::SyncPeerConfig;

mod header_sync;
pub use header_sync::HeaderSync;
pub use header_sync::HeaderSyncState;

mod sync_decide;
pub use sync_decide::DecideNextSync;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use log::*;
use crate::{
base_node::{
state_machine_service::{
states::{HeaderSync, StateEvent},
states::{HeaderSyncState, StateEvent},
BaseNodeStateMachine,
},
sync::SyncPeer,
Expand Down Expand Up @@ -118,8 +118,8 @@ fn find_best_latency<'a, I: IntoIterator<Item = &'a SyncPeer>>(iter: I) -> Optio
.cloned()
}

impl From<HeaderSync> for DecideNextSync {
fn from(sync: HeaderSync) -> Self {
impl From<HeaderSyncState> for DecideNextSync {
fn from(sync: HeaderSyncState) -> Self {
Self {
sync_peers: sync.into_sync_peers(),
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use std::time::Duration;
use log::info;
use tokio::time::sleep;

use crate::base_node::state_machine_service::states::{BlockSync, HeaderSync, HorizonStateSync, StateEvent};
use crate::base_node::state_machine_service::states::{BlockSync, HeaderSyncState, HorizonStateSync, StateEvent};

const LOG_TARGET: &str = "c::bn::state_machine_service::states::waiting";

Expand Down Expand Up @@ -68,8 +68,8 @@ impl From<BlockSync> for Waiting {
}
}

impl From<HeaderSync> for Waiting {
fn from(_: HeaderSync) -> Self {
impl From<HeaderSyncState> for Waiting {
fn from(_: HeaderSyncState) -> Self {
Default::default()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,10 @@ impl<B: BlockchainBackend + 'static> BlockSynchronizer<B> {
.fetch_chain_header_by_block_hash(block.hash.clone())
.await?
.ok_or_else(|| {
BlockSyncError::ProtocolViolation("Peer sent hash for block header we do not have".into())
BlockSyncError::ProtocolViolation(format!(
"Peer sent hash ({}) for block header we do not have",
block.hash.to_hex()
))
})?;

let current_height = header.height();
Expand Down
Loading