Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
fix/chain_head: Ensure correct events for finalized branch (#13632)
Browse files Browse the repository at this point in the history
* chain_head/follow: Ensure correct events for finalized branch

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Reenable tests

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Do some clean ups and add some more docs

* Fix gramatic

* Update client/rpc-spec-v2/src/chain_head/chain_head_follow.rs

Co-authored-by: Sebastian Kunert <skunert49@gmail.com>

* rpc/chain_head: Introduce error for absent headers

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

---------

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Co-authored-by: Bastian Köcher <info@kchr.de>
Co-authored-by: Bastian Köcher <git@kchr.de>
Co-authored-by: Sebastian Kunert <skunert49@gmail.com>
  • Loading branch information
4 people committed Mar 22, 2023
1 parent c120cd3 commit d6899f8
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 78 deletions.
100 changes: 32 additions & 68 deletions client/rpc-spec-v2/src/chain_head/chain_head_follow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,7 @@ use sp_api::CallApiAt;
use sp_blockchain::{
Backend as BlockChainBackend, Error as BlockChainError, HeaderBackend, HeaderMetadata, Info,
};
use sp_runtime::{
traits::{Block as BlockT, Header as HeaderT, One},
Saturating,
};
use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor};
use std::{collections::HashSet, sync::Arc};

/// Generates the events of the `chainHead_follow` method.
Expand Down Expand Up @@ -116,7 +113,7 @@ struct StartupPoint<Block: BlockT> {
/// The head of the finalized chain.
pub finalized_hash: Block::Hash,
/// Last finalized block number.
pub finalized_number: <<Block as BlockT>::Header as HeaderT>::Number,
pub finalized_number: NumberFor<Block>,
}

impl<Block: BlockT> From<Info<Block>> for StartupPoint<Block> {
Expand Down Expand Up @@ -318,10 +315,7 @@ where
}

// Ensure we are only reporting blocks after the starting point.
let Some(block_number) = self.client.number(notification.hash)? else {
return Err(SubscriptionManagementError::BlockNumberAbsent)
};
if block_number < startup_point.finalized_number {
if *notification.header.number() < startup_point.finalized_number {
return Ok(Default::default())
}

Expand Down Expand Up @@ -349,59 +343,48 @@ where
return Ok(Default::default())
};

// Find the parent hash.
let Some(first_number) = self.client.number(*first_hash)? else {
return Err(SubscriptionManagementError::BlockNumberAbsent)
};
let Some(parent) = self.client.hash(first_number.saturating_sub(One::one()))? else {
return Err(SubscriptionManagementError::BlockHashAbsent)
// Find the parent header.
let Some(first_header) = self.client.header(*first_hash)? else {
return Err(SubscriptionManagementError::BlockHeaderAbsent)
};

let last_finalized = finalized_block_hashes
.last()
.expect("At least one finalized hash inserted; qed");
let parents = std::iter::once(&parent).chain(finalized_block_hashes.iter());
for (hash, parent) in finalized_block_hashes.iter().zip(parents) {
// This block is already reported by the import notification.
let parents =
std::iter::once(first_header.parent_hash()).chain(finalized_block_hashes.iter());
for (i, (hash, parent)) in finalized_block_hashes.iter().zip(parents).enumerate() {
// Check if the block was already reported and thus, is already pinned.
if !self.sub_handle.pin_block(*hash)? {
continue
}

// Generate only the `NewBlock` event for this block.
if hash != last_finalized {
// Generate `NewBlock` events for all blocks beside the last block in the list
if i + 1 != finalized_block_hashes.len() {
// Generate only the `NewBlock` event for this block.
events.extend(self.generate_import_events(*hash, *parent, false));
continue
}

match self.best_block_cache {
Some(best_block_hash) => {
// If the best reported block is a children of the last finalized,
// then we had a gap in notification.
} else {
// If we end up here and the `best_block` is a descendent of the finalized block
// (last block in the list), it means that there were skipped notifications.
// Otherwise `pin_block` would had returned `true`.
//
// When the node falls out of sync and then syncs up to the tip of the chain, it can
// happen that we skip notifications. Then it is better to terminate the connection
// instead of trying to send notifications for all missed blocks.
if let Some(best_block_hash) = self.best_block_cache {
let ancestor = sp_blockchain::lowest_common_ancestor(
&*self.client,
*last_finalized,
*hash,
best_block_hash,
)?;

// A descendent of the finalized block was already reported
// before the `NewBlock` event containing the finalized block
// is reported.
if ancestor.hash == *last_finalized {
if ancestor.hash == *hash {
return Err(SubscriptionManagementError::Custom(
"A descendent of the finalized block was already reported".into(),
))
}
self.best_block_cache = Some(*hash);
},
// This is the first best block event that we generate.
None => {
self.best_block_cache = Some(*hash);
},
};
}

// This is the first time we see this block. Generate the `NewBlock` event; if this is
// the last block, also generate the `BestBlock` event.
events.extend(self.generate_import_events(*hash, *parent, true))
// Let's generate the `NewBlock` and `NewBestBlock` events for the block.
events.extend(self.generate_import_events(*hash, *parent, true))
}
}

Ok(events)
Expand Down Expand Up @@ -448,17 +431,13 @@ where
let last_finalized = notification.hash;

// Ensure we are only reporting blocks after the starting point.
let Some(block_number) = self.client.number(last_finalized)? else {
return Err(SubscriptionManagementError::BlockNumberAbsent)
};
if block_number < startup_point.finalized_number {
if *notification.header.number() < startup_point.finalized_number {
return Ok(Default::default())
}

// The tree route contains the exclusive path from the last finalized block to the block
// reported by the notification. Ensure the finalized block is also reported.
let mut finalized_block_hashes =
notification.tree_route.iter().cloned().collect::<Vec<_>>();
let mut finalized_block_hashes = notification.tree_route.to_vec();
finalized_block_hashes.push(last_finalized);

// If the finalized hashes were not reported yet, generate the `NewBlock` events.
Expand All @@ -476,9 +455,8 @@ where

match self.best_block_cache {
Some(block_cache) => {
// Check if the current best block is also reported as pruned.
let reported_pruned = pruned_block_hashes.iter().find(|&&hash| hash == block_cache);
if reported_pruned.is_none() {
// If the best block wasn't pruned, we are done here.
if !pruned_block_hashes.iter().any(|hash| *hash == block_cache) {
events.push(finalized_event);
return Ok(events)
}
Expand All @@ -499,20 +477,6 @@ where
events.push(finalized_event);
Ok(events)
} else {
let ancestor = sp_blockchain::lowest_common_ancestor(
&*self.client,
last_finalized,
best_block_hash,
)?;

// The client's best block must be a descendent of the last finalized block.
// In other words, the lowest common ancestor must be the last finalized block.
if ancestor.hash != last_finalized {
return Err(SubscriptionManagementError::Custom(
"The finalized block is not an ancestor of the best block".into(),
))
}

// The RPC needs to also submit a new best block changed before the
// finalized event.
self.best_block_cache = Some(best_block_hash);
Expand Down
6 changes: 2 additions & 4 deletions client/rpc-spec-v2/src/chain_head/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,8 @@ pub enum SubscriptionManagementError {
ExceededLimits,
/// Error originated from the blockchain (client or backend).
Blockchain(Error),
/// The database does not contain a block number.
BlockNumberAbsent,
/// The database does not contain a block hash.
BlockHashAbsent,
/// The database does not contain a block header.
BlockHeaderAbsent,
/// Custom error.
Custom(String),
}
Expand Down
6 changes: 0 additions & 6 deletions client/rpc-spec-v2/src/chain_head/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1024,9 +1024,6 @@ async fn follow_prune_best_block() {
}

#[tokio::test]
#[cfg(disable_flaky)]
#[allow(dead_code)]
// FIXME: https://github.com/paritytech/substrate/issues/11321
async fn follow_forks_pruned_block() {
let builder = TestClientBuilder::new();
let backend = builder.backend();
Expand Down Expand Up @@ -1140,9 +1137,6 @@ async fn follow_forks_pruned_block() {
}

#[tokio::test]
#[cfg(disable_flaky)]
#[allow(dead_code)]
// FIXME: https://github.com/paritytech/substrate/issues/11321
async fn follow_report_multiple_pruned_block() {
let builder = TestClientBuilder::new();
let backend = builder.backend();
Expand Down

0 comments on commit d6899f8

Please sign in to comment.