Skip to content

Commit

Permalink
feat: bad block list for invalid blocks after sync
Browse files Browse the repository at this point in the history
  • Loading branch information
sdbondi committed Dec 1, 2021
1 parent 6e46957 commit b215d5c
Show file tree
Hide file tree
Showing 14 changed files with 294 additions and 34 deletions.
4 changes: 2 additions & 2 deletions base_layer/core/src/base_node/sync/block_sync/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@ pub enum BlockSyncError {
RpcRequestError(#[from] RpcStatus),
#[error("Chain storage error: {0}")]
ChainStorageError(#[from] ChainStorageError),
#[error("Peer sent invalid block body: {0}")]
ReceivedInvalidBlockBody(String),
#[error("Peer sent a block that did not form a chain. Expected hash = {expected}, got = {got}")]
PeerSentBlockThatDidNotFormAChain { expected: String, got: String },
#[error("Connectivity Error: {0}")]
Expand All @@ -48,4 +46,6 @@ pub enum BlockSyncError {
FailedToBan(ConnectivityError),
#[error("Failed to construct valid chain block")]
FailedToConstructChainBlock,
#[error("Peer violated the block sync protocol: {0}")]
ProtocolViolation(String),
}
58 changes: 50 additions & 8 deletions base_layer/core/src/base_node/sync/block_sync/synchronizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ use crate::{
sync::{hooks::Hooks, rpc, SyncPeer},
BlockSyncConfig,
},
blocks::{Block, ChainBlock},
blocks::{Block, BlockValidationError, ChainBlock},
chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend},
proto::base_node::SyncBlocksRequest,
tari_utilities::{hex::Hex, Hashable},
transactions::aggregated_body::AggregateBody,
validation::BlockSyncBodyValidation,
validation::{BlockSyncBodyValidation, ValidationError},
};
use futures::StreamExt;
use log::*;
Expand Down Expand Up @@ -96,7 +96,29 @@ impl<B: BlockchainBackend + 'static> BlockSynchronizer<B> {
self.db.cleanup_orphans().await?;
Ok(())
},
Err(err @ BlockSyncError::ValidationError(_)) | Err(err @ BlockSyncError::ReceivedInvalidBlockBody(_)) => {
Err(BlockSyncError::ValidationError(err)) => {
match &err {
ValidationError::BlockHeaderError(_) => {},
ValidationError::BlockError(BlockValidationError::MismatchedMmrRoots) |
ValidationError::BadBlockFound { .. } |
ValidationError::BlockError(BlockValidationError::MismatchedMmrSize { .. }) => {
let num_cleared = self.db.clear_all_pending_headers().await?;
warn!(
target: LOG_TARGET,
"Cleared {} incomplete headers from bad chain", num_cleared
);
},
_ => {},
}
warn!(
target: LOG_TARGET,
"Banning peer because provided block failed validation: {}", err
);
self.ban_peer(node_id, &err).await?;
Err(err.into())
},
Err(err @ BlockSyncError::ProtocolViolation(_)) => {
warn!(target: LOG_TARGET, "Banning peer: {}", err);
self.ban_peer(node_id, &err).await?;
Err(err)
},
Expand Down Expand Up @@ -166,9 +188,10 @@ impl<B: BlockchainBackend + 'static> BlockSynchronizer<B> {
.fetch_chain_header_by_block_hash(block.hash.clone())
.await?
.ok_or_else(|| {
BlockSyncError::ReceivedInvalidBlockBody("Peer sent hash for block header we do not have".into())
BlockSyncError::ProtocolViolation("Peer sent hash for block header we do not have".into())
})?;

let current_height = header.height();
let header_hash = header.hash().clone();

if header.header().prev_hash != prev_hash {
Expand All @@ -183,21 +206,40 @@ impl<B: BlockchainBackend + 'static> BlockSynchronizer<B> {
let body = block
.body
.map(AggregateBody::try_from)
.ok_or_else(|| BlockSyncError::ReceivedInvalidBlockBody("Block body was empty".to_string()))?
.map_err(BlockSyncError::ReceivedInvalidBlockBody)?;
.ok_or_else(|| BlockSyncError::ProtocolViolation("Block body was empty".to_string()))?
.map_err(BlockSyncError::ProtocolViolation)?;

debug!(
target: LOG_TARGET,
"Validating block body #{} (PoW = {}, {})",
header.height(),
current_height,
header.header().pow_algo(),
body.to_counts_string(),
);

let timer = Instant::now();
let (header, header_accum_data) = header.into_parts();

let block = self.block_validator.validate_body(Block::new(header, body)).await?;
let block = match self.block_validator.validate_body(Block::new(header, body)).await {
Ok(block) => block,
Err(err @ ValidationError::BadBlockFound { .. }) |
Err(err @ ValidationError::FatalStorageError(_)) |
Err(err @ ValidationError::AsyncTaskFailed(_)) |
Err(err @ ValidationError::CustomError(_)) => return Err(err.into()),
Err(err) => {
// Add to bad blocks
if let Err(err) = self
.db
.write_transaction()
.insert_bad_block(header_hash, current_height)
.commit()
.await
{
error!(target: LOG_TARGET, "Failed to insert bad block: {}", err);
}
return Err(err.into());
},
};

let block = ChainBlock::try_construct(Arc::new(block), header_accum_data)
.map(Arc::new)
Expand Down
11 changes: 9 additions & 2 deletions base_layer/core/src/base_node/sync/header_sync/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use crate::{
tari_utilities::{epoch_time::EpochTime, hash::Hashable, hex::Hex},
validation::helpers::{
check_header_timestamp_greater_than_median,
check_not_bad_block,
check_pow_data,
check_target_difficulty,
check_timestamp_ftl,
Expand Down Expand Up @@ -138,7 +139,13 @@ impl<B: BlockchainBackend + 'static> BlockHeaderSyncValidator<B> {
);
let achieved_target = check_target_difficulty(&header, target_difficulty, &self.randomx_factory)?;

check_pow_data(&header, &self.consensus_rules, &*self.db.inner().db_read_access()?)?;
let block_hash = header.hash();

{
let txn = self.db.inner().db_read_access()?;
check_not_bad_block(&*txn, &block_hash)?;
check_pow_data(&header, &self.consensus_rules, &*txn)?;
}

// Header is valid, add this header onto the validation state for the next round
// Mutable borrow done later in the function to allow multiple immutable borrows before this line. This has
Expand All @@ -159,7 +166,7 @@ impl<B: BlockchainBackend + 'static> BlockHeaderSyncValidator<B> {
state.target_difficulties.add_back(&header, target_difficulty);

let accumulated_data = BlockHeaderAccumulatedData::builder(&state.previous_accum)
.with_hash(header.hash())
.with_hash(block_hash)
.with_achieved_target_difficulty(achieved_target)
.with_total_kernel_offset(header.total_kernel_offset.clone())
.build()?;
Expand Down
9 changes: 9 additions & 0 deletions base_layer/core/src/chain_storage/async_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,8 @@ impl<B: BlockchainBackend + 'static> AsyncBlockchainDb<B> {

make_async_fn!(fetch_last_header() -> BlockHeader, "fetch_last_header");

make_async_fn!(clear_all_pending_headers() -> usize, "clear_all_pending_headers");

make_async_fn!(fetch_last_chain_header() -> ChainHeader, "fetch_last_chain_header");

make_async_fn!(fetch_tip_header() -> ChainHeader, "fetch_tip_header");
Expand All @@ -222,6 +224,8 @@ impl<B: BlockchainBackend + 'static> AsyncBlockchainDb<B> {

make_async_fn!(block_exists(block_hash: BlockHash) -> bool, "block_exists");

make_async_fn!(bad_block_exists(block_hash: BlockHash) -> bool, "bad_block_exists");

make_async_fn!(fetch_block(height: u64) -> HistoricalBlock, "fetch_block");

make_async_fn!(fetch_blocks<T: RangeBounds<u64>>(bounds: T) -> Vec<HistoricalBlock>, "fetch_blocks");
Expand Down Expand Up @@ -376,6 +380,11 @@ impl<'a, B: BlockchainBackend + 'static> AsyncDbTransaction<'a, B> {
self
}

pub fn insert_bad_block(&mut self, hash: HashOutput, height: u64) -> &mut Self {
self.transaction.insert_bad_block(hash, height);
self
}

pub async fn commit(&mut self) -> Result<(), ChainStorageError> {
let transaction = mem::take(&mut self.transaction);
self.db.write(transaction).await
Expand Down
7 changes: 7 additions & 0 deletions base_layer/core/src/chain_storage/blockchain_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,10 @@ pub trait BlockchainBackend: Send + Sync {
fn orphan_count(&self) -> Result<usize, ChainStorageError>;
/// Returns the stored header with the highest corresponding height.
fn fetch_last_header(&self) -> Result<BlockHeader, ChainStorageError>;

/// Clear all headers that are beyond the current height of longest chain, returning the number of headers that were
/// deleted.
fn clear_all_pending_headers(&self) -> Result<usize, ChainStorageError>;
/// Returns the stored header and accumulated data with the highest height.
fn fetch_last_chain_header(&self) -> Result<ChainHeader, ChainStorageError>;
/// Returns the stored header with the highest corresponding height.
Expand Down Expand Up @@ -181,4 +185,7 @@ pub trait BlockchainBackend: Send + Sync {
&self,
mmr_positions: Vec<u32>,
) -> Result<Vec<Option<(u64, HashOutput)>>, ChainStorageError>;

/// Check if a block hash is in the bad block list
fn bad_block_exists(&self, block_hash: HashOutput) -> Result<bool, ChainStorageError>;
}
23 changes: 18 additions & 5 deletions base_layer/core/src/chain_storage/blockchain_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ use crate::{
HeaderValidation,
OrphanValidation,
PostOrphanBodyValidation,
ValidationError,
},
};

Expand Down Expand Up @@ -853,6 +852,11 @@ where B: BlockchainBackend
Ok(())
}

pub fn clear_all_pending_headers(&self) -> Result<usize, ChainStorageError> {
let db = self.db_write_access()?;
db.clear_all_pending_headers()
}

/// Clean out the entire orphan pool
pub fn cleanup_all_orphans(&self) -> Result<(), ChainStorageError> {
let mut db = self.db_write_access()?;
Expand Down Expand Up @@ -949,6 +953,12 @@ where B: BlockchainBackend
Ok(db.contains(&DbKey::BlockHash(hash.clone()))? || db.contains(&DbKey::OrphanBlock(hash))?)
}

/// Returns true if this block exists in the chain, or is orphaned.
pub fn bad_block_exists(&self, hash: BlockHash) -> Result<bool, ChainStorageError> {
let db = self.db_read_access()?;
db.bad_block_exists(hash)
}

/// Atomically commit the provided transaction to the database backend. This function does not update the metadata.
pub fn commit(&self, txn: DbTransaction) -> Result<(), ChainStorageError> {
let mut db = self.db_write_access()?;
Expand Down Expand Up @@ -1263,10 +1273,13 @@ fn insert_best_block(txn: &mut DbTransaction, block: Arc<ChainBlock>) -> Result<
block_hash.to_hex()
);
if block.header().pow_algo() == PowAlgorithm::Monero {
let monero_seed = MoneroPowData::from_header(block.header())
.map_err(|e| ValidationError::CustomError(e.to_string()))?
.randomx_key;
txn.insert_monero_seed_height(monero_seed.to_vec(), block.height());
let monero_header =
MoneroPowData::from_header(block.header()).map_err(|e| ChainStorageError::InvalidArguments {
func: "insert_best_block",
arg: "block",
message: format!("block contained invalid or malformed monero PoW data: {}", e),
})?;
txn.insert_monero_seed_height(monero_header.randomx_key.to_vec(), block.height());
}

let height = block.height();
Expand Down
14 changes: 14 additions & 0 deletions base_layer/core/src/chain_storage/db_transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,15 @@ impl DbTransaction {
self
}

/// Inserts a block hash into the bad block list
pub fn insert_bad_block(&mut self, block_hash: HashOutput, height: u64) -> &mut Self {
self.operations.push(WriteOperation::InsertBadBlock {
hash: block_hash,
height,
});
self
}

/// Stores an orphan block. No checks are made as to whether this is actually an orphan. That responsibility lies
/// with the calling function.
/// The transaction will rollback and write will return an error if the orphan already exists.
Expand Down Expand Up @@ -298,6 +307,10 @@ pub enum WriteOperation {
witness_hash: HashOutput,
mmr_position: u32,
},
InsertBadBlock {
hash: HashOutput,
height: u64,
},
DeleteHeader(u64),
DeleteOrphan(HashOutput),
DeleteBlock(HashOutput),
Expand Down Expand Up @@ -440,6 +453,7 @@ impl fmt::Display for WriteOperation {
SetPrunedHeight { height, .. } => write!(f, "Set pruned height to {}", height),
DeleteHeader(height) => write!(f, "Delete header at height: {}", height),
DeleteOrphan(hash) => write!(f, "Delete orphan with hash: {}", hash.to_hex()),
InsertBadBlock { hash, height } => write!(f, "Insert bad block #{} {}", height, hash.to_hex()),
}
}
}
Expand Down
64 changes: 62 additions & 2 deletions base_layer/core/src/chain_storage/lmdb_db/lmdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use lmdb_zero::{
del,
error::{self, LmdbResultExt},
put,
traits::{AsLmdbBytes, FromLmdbBytes},
traits::{AsLmdbBytes, CreateCursor, FromLmdbBytes},
ConstTransaction,
Cursor,
CursorIter,
Expand Down Expand Up @@ -397,7 +397,7 @@ where
Ok(result)
}

/// Fetches all the size of all key/values in the given DB. Returns the number of entries, the total size of all the
/// Fetches the size of all key/values in the given DB. Returns the number of entries, the total size of all the
/// keys and values in bytes.
pub fn fetch_db_entry_sizes(txn: &ConstTransaction<'_>, db: &Database) -> Result<(u64, u64, u64), ChainStorageError> {
let access = txn.access();
Expand All @@ -412,3 +412,63 @@ pub fn fetch_db_entry_sizes(txn: &ConstTransaction<'_>, db: &Database) -> Result
}
Ok((num_entries, total_key_size, total_value_size))
}

pub fn lmdb_delete_each_where<K, V, F>(
txn: &WriteTransaction<'_>,
db: &Database,
mut predicate: F,
) -> Result<usize, ChainStorageError>
where
K: FromLmdbBytes + ?Sized,
V: DeserializeOwned,
F: FnMut(&K, V) -> Option<bool>,
{
let mut cursor = txn.cursor(db)?;
let mut access = txn.access();
let mut num_deleted = 0;
while let Some((k, v)) = cursor.next::<K, [u8]>(&access).to_opt()? {
match deserialize(v) {
Ok(v) => match predicate(k, v) {
Some(true) => {
cursor.del(&mut access, del::Flags::empty())?;
num_deleted += 1;
},
Some(false) => continue,
None => {
break;
},
},
Err(e) => {
error!(
target: LOG_TARGET,
"Could not could not deserialize value from lmdb: {:?}", e
);
return Err(ChainStorageError::AccessError(e.to_string()));
},
}
}
Ok(num_deleted)
}

/// Deletes all keys after the given one (exclusive)
pub fn lmdb_delete_all_after<K>(
txn: &WriteTransaction<'_>,
db: &Database,
key: &K,
) -> Result<usize, ChainStorageError>
where
K: AsLmdbBytes + ?Sized,
{
let mut cursor = txn.cursor(db)?;
let mut access = txn.access();
if cursor.seek_k::<_, [u8]>(&access, key).to_opt()?.is_none() {
return Ok(0);
}

let mut num_deleted = 0;
while cursor.next::<[u8], [u8]>(&access).to_opt()?.is_some() {
cursor.del(&mut access, del::NODUPDATA)?;
num_deleted += 1;
}
Ok(num_deleted)
}
Loading

0 comments on commit b215d5c

Please sign in to comment.