Skip to content

Commit

Permalink
fix: use --syncmode=execution-layer from op-node for optimistic p…
Browse files Browse the repository at this point in the history
…ipeline sync (paradigmxyz#7552)

Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
  • Loading branch information
2 people authored and mw2000 committed Jun 5, 2024
1 parent 5baacc1 commit 4dca796
Show file tree
Hide file tree
Showing 19 changed files with 365 additions and 111 deletions.
32 changes: 3 additions & 29 deletions bin/reth/src/optimism.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,7 @@

use clap::Parser;
use reth::cli::Cli;
use reth_node_builder::NodeHandle;
use reth_node_optimism::{
args::RollupArgs, rpc::SequencerClient, OptimismEngineTypes, OptimismNode,
};
use reth_provider::BlockReaderIdExt;
use reth_node_optimism::{args::RollupArgs, rpc::SequencerClient, OptimismNode};
use std::sync::Arc;

// We use jemalloc for performance reasons
Expand All @@ -27,7 +23,7 @@ fn main() {
}

if let Err(err) = Cli::<RollupArgs>::parse().run(|builder, rollup_args| async move {
let NodeHandle { node, node_exit_future } = builder
let handle = builder
.node(OptimismNode::new(rollup_args.clone()))
.extend_rpc_modules(move |ctx| {
// register sequencer tx forwarder
Expand All @@ -42,29 +38,7 @@ fn main() {
.launch()
.await?;

// If `enable_genesis_walkback` is set to true, the rollup client will need to
// perform the derivation pipeline from genesis, validating the data dir.
// When set to false, set the finalized, safe, and unsafe head block hashes
// on the rollup client using a fork choice update. This prevents the rollup
// client from performing the derivation pipeline from genesis, and instead
// starts syncing from the current tip in the DB.
if node.chain_spec().is_optimism() && !rollup_args.enable_genesis_walkback {
let client = node.rpc_server_handles.auth.http_client();
if let Ok(Some(head)) = node.provider.latest_header() {
reth_rpc_api::EngineApiClient::<OptimismEngineTypes>::fork_choice_updated_v2(
&client,
reth_rpc_types::engine::ForkchoiceState {
head_block_hash: head.hash(),
safe_block_hash: head.hash(),
finalized_block_hash: head.hash(),
},
None,
)
.await?;
}
}

node_exit_future.await
handle.node_exit_future.await
}) {
eprintln!("Error: {err:?}");
std::process::exit(1);
Expand Down
6 changes: 3 additions & 3 deletions crates/blockchain-tree/src/block_buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,13 @@ impl BlockBuffer {
removed
}

/// Discard all blocks that precede finalized block number from the buffer.
pub fn remove_old_blocks(&mut self, finalized_number: BlockNumber) {
/// Discard all blocks that precede block number from the buffer.
pub fn remove_old_blocks(&mut self, block_number: BlockNumber) {
let mut block_hashes_to_remove = Vec::new();

// discard all blocks that are at or before the given block number.
while let Some(entry) = self.earliest_blocks.first_entry() {
if *entry.key() > finalized_number {
if *entry.key() > block_number {
break
}
let block_hashes = entry.remove();
Expand Down
59 changes: 55 additions & 4 deletions crates/blockchain-tree/src/blockchain_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@ use reth_interfaces::{
};
use reth_primitives::{
BlockHash, BlockNumHash, BlockNumber, ForkBlock, GotExpected, Hardfork, PruneModes, Receipt,
SealedBlock, SealedBlockWithSenders, SealedHeader, U256,
SealedBlock, SealedBlockWithSenders, SealedHeader, StaticFileSegment, B256, U256,
};
use reth_provider::{
chain::{ChainSplit, ChainSplitTarget},
BlockExecutionWriter, BlockNumReader, BlockWriter, BundleStateWithReceipts,
CanonStateNotification, CanonStateNotificationSender, CanonStateNotifications, Chain,
ChainSpecProvider, DisplayBlocksChain, HeaderProvider, ProviderError,
StaticFileProviderFactory,
};
use reth_stages_api::{MetricEvent, MetricEventsSender};
use std::{
Expand Down Expand Up @@ -783,6 +784,11 @@ where
Ok(InsertPayloadOk::Inserted(status))
}

/// Discards all buffered blocks whose number is at or below `block`.
///
/// Delegates to the internal [`BlockBuffer`], which drops every buffered
/// entry with a block number less than or equal to `block` (removal is
/// inclusive of `block` itself, not only blocks that strictly precede it).
pub fn remove_old_blocks(&mut self, block: BlockNumber) {
self.state.buffered_blocks.remove_old_blocks(block);
}

/// Finalize blocks up until and including `finalized_block`, and remove them from the tree.
pub fn finalize_block(&mut self, finalized_block: BlockNumber) {
// remove blocks
Expand All @@ -797,7 +803,7 @@ where
}
}
// clean block buffer.
self.state.buffered_blocks.remove_old_blocks(finalized_block);
self.remove_old_blocks(finalized_block);
}

/// Reads the last `N` canonical hashes from the database and updates the block indices of the
Expand All @@ -817,6 +823,16 @@ where
) -> RethResult<()> {
self.finalize_block(last_finalized_block);

let last_canonical_hashes = self.update_block_hashes()?;

self.connect_buffered_blocks_to_hashes(last_canonical_hashes)?;

Ok(())
}

/// Update all block hashes. iterate over present and new list of canonical hashes and compare
/// them. Remove all mismatches, disconnect them and removes all chains.
pub fn update_block_hashes(&mut self) -> RethResult<BTreeMap<BlockNumber, B256>> {
let last_canonical_hashes = self
.externals
.fetch_latest_canonical_hashes(self.config.num_of_canonical_hashes() as usize)?;
Expand All @@ -831,9 +847,22 @@ where
}
}

self.connect_buffered_blocks_to_hashes(last_canonical_hashes)?;
Ok(last_canonical_hashes)
}

Ok(())
/// Refreshes the tracked canonical block hashes and prunes the block buffer.
///
/// First delegates to [`Self::update_block_hashes`] to reconcile the in-memory
/// canonical hashes with the database (removing mismatched chains), then drops
/// every buffered block at or below the canonical tip so stale pre-tip blocks
/// cannot linger in the buffer.
///
/// Returns the refreshed map of canonical block numbers to hashes.
pub fn update_block_hashes_and_clear_buffered(
&mut self,
) -> RethResult<BTreeMap<BlockNumber, BlockHash>> {
let canonical = self.update_block_hashes()?;

if let Some((&tip, _)) = canonical.last_key_value() {
    self.remove_old_blocks(tip);
}

Ok(canonical)
}

/// Reads the last `N` canonical hashes from the database and updates the block indices of the
Expand Down Expand Up @@ -1220,6 +1249,28 @@ where
&self,
revert_until: BlockNumber,
) -> Result<Option<Chain>, CanonicalError> {
// This should only happen when an optimistic sync target was re-orged.
//
// Static files generally contain finalized data. The blockchain tree only deals
// with unfinalized data. The only scenario where canonical reverts go past the highest
// static file is when an optimistic sync occurred and unfinalized data was written to
// static files.
if self
.externals
.provider_factory
.static_file_provider()
.get_highest_static_file_block(StaticFileSegment::Headers)
.unwrap_or_default() >
revert_until
{
trace!(
target: "blockchain_tree",
"Reverting optimistic canonical chain to block {}",
revert_until
);
return Err(CanonicalError::OptimisticTargetRevert(revert_until))
}

// read data that is needed for new sidechain
let provider_rw = self.externals.provider_factory.provider_rw()?;

Expand Down
6 changes: 6 additions & 0 deletions crates/blockchain-tree/src/noop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,12 @@ impl BlockchainTreeEngine for NoopBlockchainTree {
fn make_canonical(&self, block_hash: BlockHash) -> Result<CanonicalOutcome, CanonicalError> {
Err(BlockchainTreeError::BlockHashNotFoundInChain { block_hash }.into())
}

/// No-op implementation: there is no tree state to reconcile, so this
/// always succeeds with an empty canonical-hash map.
fn update_block_hashes_and_clear_buffered(
&self,
) -> RethResult<BTreeMap<BlockNumber, BlockHash>> {
Ok(Default::default())
}
}

impl BlockchainTreeViewer for NoopBlockchainTree {
Expand Down
9 changes: 9 additions & 0 deletions crates/blockchain-tree/src/shareable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,15 @@ where
res
}

/// Acquires the tree write lock, reconciles canonical hashes and clears the
/// buffered blocks, then refreshes the chain metrics before returning the
/// inner result.
fn update_block_hashes_and_clear_buffered(
&self,
) -> RethResult<BTreeMap<BlockNumber, BlockHash>> {
let mut guard = self.tree.write();
let outcome = guard.update_block_hashes_and_clear_buffered();
guard.update_chains_metrics();
outcome
}

fn connect_buffered_blocks_to_canonical_hashes(&self) -> RethResult<()> {
trace!(target: "blockchain_tree", "Connecting buffered blocks to canonical hashes");
let mut tree = self.tree.write();
Expand Down
12 changes: 9 additions & 3 deletions crates/consensus/beacon/src/engine/hooks/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,16 @@ impl EngineHooksController {
args: EngineHookContext,
db_write_active: bool,
) -> Poll<Result<PolledHook, EngineHookError>> {
// Hook with DB write access level is not allowed to run due to already running hook with DB
// write access level or active DB write according to passed argument
// Hook with DB write access level is not allowed to run due to any of the following
// reasons:
// - An already running hook with DB write access level
// - Active DB write according to passed argument
// - Missing a finalized block number. We might be on an optimistic sync scenario where we
// cannot skip the FCU with the finalized hash, otherwise CL might misbehave.
if hook.db_access_level().is_read_write() &&
(self.active_db_write_hook.is_some() || db_write_active)
(self.active_db_write_hook.is_some() ||
db_write_active ||
args.finalized_block_number.is_none())
{
return Poll::Pending
}
Expand Down
Loading

0 comments on commit 4dca796

Please sign in to comment.