fix: use --syncmode=execution-layer from op-node for optimistic pipeline sync #7552

Merged · 62 commits · May 7, 2024

Commits (changes below shown from 55 of the 62 commits)
458ee1a
dont set finalized on op-reth start
joshieDo Apr 10, 2024
1704948
make UnknownSafeOrFinalizedBlock error with upper case
joshieDo Apr 10, 2024
12f5ee9
sync to head if finalized does not exist on optimism pipeline
joshieDo Apr 10, 2024
7df63db
add some docs and scenario on finished pipeline
joshieDo Apr 18, 2024
1f02388
Merge remote-tracking branch 'origin/main' into joshie/hot-op
joshieDo Apr 20, 2024
f56103a
Merge remote-tracking branch 'origin/main' into joshie/hot-op
joshieDo Apr 21, 2024
42158aa
enable optimism feature flags on missing dep
joshieDo Apr 21, 2024
de8928a
refactor e2e op tests
joshieDo Apr 21, 2024
1d51433
smol clean-up
joshieDo Apr 21, 2024
bd9a17b
add StageCheckpointReader to FullProvider
joshieDo Apr 22, 2024
72501de
add span node identifiers on e2e
joshieDo Apr 22, 2024
2c96574
e2e: chain of payloads + optimism reorg
joshieDo Apr 22, 2024
522d65c
on pipeline outcome, skip handling if it's optimistic sync but update…
joshieDo Apr 22, 2024
9320432
move node creation and peering into setup
joshieDo Apr 22, 2024
e7a6aa6
merge origin/main
joshieDo Apr 24, 2024
d28dc31
restore optimism wallet logic
joshieDo Apr 24, 2024
57e530b
fmt
joshieDo Apr 24, 2024
e4c11eb
fix: derank peers that responded with bad data
mattsse Apr 24, 2024
e377c57
reduce to 90 blocks
joshieDo Apr 25, 2024
13a426c
optimistic sync with reorg failure
joshieDo Apr 25, 2024
4c35e9d
Merge branch 'main' into joshie/hot-op
joshieDo Apr 25, 2024
5703bbd
doc fix
joshieDo Apr 25, 2024
63df12d
reset download_range on is_terminated
joshieDo Apr 25, 2024
3167f7f
handle reorgs from optimistic syncing
joshieDo Apr 25, 2024
93f0f64
remove unused test code
joshieDo Apr 25, 2024
3ad4cb2
wait_unwind more aggr and check header stage checkpoint
joshieDo Apr 29, 2024
c7ba89d
remove canonical block from buffered ones
joshieDo Apr 29, 2024
3dd46f7
rename to update_block_hashes_and_clear_buffered
joshieDo Apr 29, 2024
0fcfb8d
clippy
joshieDo Apr 29, 2024
7e05cc6
Merge remote-tracking branch 'origin/main' into joshie/hot-op
joshieDo Apr 29, 2024
dab38a0
remove optimistic syncing as an optimism feature flag only
joshieDo Apr 29, 2024
e22e187
add doc to OptimisticCanonicalRevert
joshieDo Apr 29, 2024
f9c584e
clippy fmt
joshieDo Apr 29, 2024
ce6de02
cargo doc
joshieDo Apr 29, 2024
0dcf372
add more docs on revert_canonical_from_database & static
joshieDo Apr 30, 2024
b44d842
Merge remote-tracking branch 'origin/main' into joshie/hot-op
joshieDo Apr 30, 2024
c2f7b39
import StaticFileProviderFactory
joshieDo Apr 30, 2024
c0c83c5
extend test
joshieDo Apr 30, 2024
a3f08cd
allow downloading subset, if the set has terminated
joshieDo May 1, 2024
0da350c
dont allow subsets
joshieDo May 1, 2024
fd51d36
only run pruner once we have a finalized block
joshieDo May 1, 2024
17f886c
review traces and docs
joshieDo May 2, 2024
982dd8b
Merge remote-tracking branch 'origin/main' into joshie/hot-op
joshieDo May 2, 2024
89077c6
clippy
joshieDo May 2, 2024
61e5898
unbreak continuous pipeline
joshieDo May 2, 2024
89b9645
handle err if optimistic revert
joshieDo May 2, 2024
5fdadde
Merge remote-tracking branch 'origin/main' into joshie/hot-op
joshieDo May 2, 2024
02d7d0a
add set_canonical_head
joshieDo May 2, 2024
963156a
dedup pipeline unwind target setting
joshieDo May 2, 2024
e18a704
renames
joshieDo May 2, 2024
a2b9ac3
use PayloadStatus::from_status instead
joshieDo May 2, 2024
b3e354e
cargo docs fix
joshieDo May 2, 2024
1a3939f
Merge remote-tracking branch 'origin/main' into joshie/hot-op
joshieDo May 3, 2024
6119832
clippy
joshieDo May 3, 2024
e0867c8
updated clippy
joshieDo May 3, 2024
d620b13
move update_block_hashes_and_clear_buffered impl to blockchain tree s…
joshieDo May 3, 2024
643c420
add TODO to doc
joshieDo May 3, 2024
53cacf4
whitespace
joshieDo May 3, 2024
42df97a
review comments
joshieDo May 6, 2024
0419207
remove unnecessary todo
joshieDo May 6, 2024
0dd1825
add more docs on PipelineTarget
joshieDo May 6, 2024
c17460b
fmt
joshieDo May 6, 2024
32 changes: 3 additions & 29 deletions bin/reth/src/optimism.rs
@@ -2,11 +2,7 @@

 use clap::Parser;
 use reth::cli::Cli;
-use reth_node_builder::NodeHandle;
-use reth_node_optimism::{
-    args::RollupArgs, rpc::SequencerClient, OptimismEngineTypes, OptimismNode,
-};
-use reth_provider::BlockReaderIdExt;
+use reth_node_optimism::{args::RollupArgs, rpc::SequencerClient, OptimismNode};
 use std::sync::Arc;

 // We use jemalloc for performance reasons
@@ -27,7 +23,7 @@ fn main() {
     }

     if let Err(err) = Cli::<RollupArgs>::parse().run(|builder, rollup_args| async move {
-        let NodeHandle { node, node_exit_future } = builder
+        let handle = builder
             .node(OptimismNode::new(rollup_args.clone()))
             .extend_rpc_modules(move |ctx| {
                 // register sequencer tx forwarder
@@ -42,29 +38,7 @@ fn main() {
             .launch()
             .await?;

-        // If `enable_genesis_walkback` is set to true, the rollup client will need to
-        // perform the derivation pipeline from genesis, validating the data dir.
-        // When set to false, set the finalized, safe, and unsafe head block hashes
-        // on the rollup client using a fork choice update. This prevents the rollup
-        // client from performing the derivation pipeline from genesis, and instead
-        // starts syncing from the current tip in the DB.
-        if node.chain_spec().is_optimism() && !rollup_args.enable_genesis_walkback {
-            let client = node.rpc_server_handles.auth.http_client();
-            if let Ok(Some(head)) = node.provider.latest_header() {
-                reth_rpc_api::EngineApiClient::<OptimismEngineTypes>::fork_choice_updated_v2(
-                    &client,
-                    reth_rpc_types::engine::ForkchoiceState {
-                        head_block_hash: head.hash(),
-                        safe_block_hash: head.hash(),
-                        finalized_block_hash: head.hash(),
-                    },
-                    None,
-                )
-                .await?;
-            }
-        }
-
-        node_exit_future.await
+        handle.node_exit_future.await
     }) {
         eprintln!("Error: {err:?}");
         std::process::exit(1);

Review comment on lines -57 to -59 (Collaborator): I'm pretty sure this was problematic from the start.
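For context on the deleted block: the old startup hook issued a single forkchoice update that reported the local tip as head, safe, and finalized all at once. A minimal sketch of the state it sent (same `ForkchoiceState` fields as the removed code; the helper function itself is hypothetical) shows why the reviewer calls it problematic — the node claimed finality for a block no CL had finalized:

```rust
use reth_primitives::B256;
use reth_rpc_types::engine::ForkchoiceState;

/// Hypothetical helper reproducing the removed startup behavior: one FCU
/// where head == safe == finalized == local tip.
fn startup_fcu_state(local_tip: B256) -> ForkchoiceState {
    ForkchoiceState {
        head_block_hash: local_tip,
        // The local tip was never actually finalized by a CL, so advertising
        // it as safe/finalized could mislead the rollup client.
        safe_block_hash: local_tip,
        finalized_block_hash: local_tip,
    }
}
```

With op-node running with `--syncmode=execution-layer`, the rollup client drives sync itself and sends FCUs whose finalized hash may initially be zero; the engine changes below make reth treat that as an optimistic-sync signal instead of an error.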
6 changes: 3 additions & 3 deletions crates/blockchain-tree/src/block_buffer.rs
@@ -104,13 +104,13 @@
         removed
     }

-    /// Discard all blocks that precede finalized block number from the buffer.
-    pub fn remove_old_blocks(&mut self, finalized_number: BlockNumber) {
+    /// Discard all blocks that precede block number from the buffer.
+    pub fn remove_old_blocks(&mut self, block_number: BlockNumber) {
         let mut block_hashes_to_remove = Vec::new();

         // discard all blocks that are before the finalized number.
         while let Some(entry) = self.earliest_blocks.first_entry() {
-            if *entry.key() > finalized_number {
+            if *entry.key() > block_number {
                 break
             }
             let block_hashes = entry.remove();
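The rename matters because `remove_old_blocks` is now called with any canonical cutoff, not only a finalized number. A self-contained sketch of the pruning pattern (simplified types; the real `BlockBuffer` tracks additional indices):

```rust
use std::collections::BTreeMap;

type BlockHash = [u8; 32];

struct Buffer {
    /// block number -> hashes buffered at that height (lowest key first)
    earliest_blocks: BTreeMap<u64, Vec<BlockHash>>,
}

impl Buffer {
    /// Drop everything at or below `block_number`, mirroring the loop above:
    /// `first_entry()` always yields the lowest height, so iteration can stop
    /// at the first entry past the cutoff.
    fn remove_old_blocks(&mut self, block_number: u64) -> Vec<BlockHash> {
        let mut removed = Vec::new();
        while let Some(entry) = self.earliest_blocks.first_entry() {
            if *entry.key() > block_number {
                break;
            }
            removed.extend(entry.remove());
        }
        removed
    }
}
```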
46 changes: 41 additions & 5 deletions crates/blockchain-tree/src/blockchain_tree.rs
@@ -19,13 +19,14 @@ use reth_interfaces::{
 };
 use reth_primitives::{
     BlockHash, BlockNumHash, BlockNumber, ForkBlock, GotExpected, Hardfork, PruneModes, Receipt,
-    SealedBlock, SealedBlockWithSenders, SealedHeader, U256,
+    SealedBlock, SealedBlockWithSenders, SealedHeader, StaticFileSegment, B256, U256,
 };
 use reth_provider::{
     chain::{ChainSplit, ChainSplitTarget},
     BlockExecutionWriter, BlockNumReader, BlockWriter, BundleStateWithReceipts,
     CanonStateNotification, CanonStateNotificationSender, CanonStateNotifications, Chain,
     ChainSpecProvider, DisplayBlocksChain, HeaderProvider, ProviderError,
+    StaticFileProviderFactory,
 };
 use reth_stages_api::{MetricEvent, MetricEventsSender};
 use std::{
@@ -783,6 +784,11 @@
         Ok(InsertPayloadOk::Inserted(status))
     }

+    /// Discard all blocks that precede block number from the buffer.
+    pub fn remove_old_blocks(&mut self, block: BlockNumber) {
+        self.state.buffered_blocks.remove_old_blocks(block);
+    }
+
     /// Finalize blocks up until and including `finalized_block`, and remove them from the tree.
     pub fn finalize_block(&mut self, finalized_block: BlockNumber) {
         // remove blocks
@@ -797,7 +803,7 @@
             }
         }
         // clean block buffer.
-        self.state.buffered_blocks.remove_old_blocks(finalized_block);
+        self.remove_old_blocks(finalized_block);
     }

/// Reads the last `N` canonical hashes from the database and updates the block indices of the
Expand All @@ -817,6 +823,16 @@ where
) -> RethResult<()> {
self.finalize_block(last_finalized_block);

let last_canonical_hashes = self.update_block_hashes()?;

self.connect_buffered_blocks_to_hashes(last_canonical_hashes)?;

Ok(())
}

/// Update all block hashes. iterate over present and new list of canonical hashes and compare
/// them. Remove all mismatches, disconnect them and removes all chains.
pub fn update_block_hashes(&mut self) -> RethResult<BTreeMap<BlockNumber, B256>> {
let last_canonical_hashes = self
.externals
.fetch_latest_canonical_hashes(self.config.num_of_canonical_hashes() as usize)?;
Expand All @@ -831,9 +847,7 @@ where
}
}

self.connect_buffered_blocks_to_hashes(last_canonical_hashes)?;

Ok(())
Ok(last_canonical_hashes)
}

     /// Reads the last `N` canonical hashes from the database and updates the block indices of the
@@ -1220,6 +1234,28 @@
         &self,
         revert_until: BlockNumber,
     ) -> Result<Option<Chain>, CanonicalError> {
+        if self
+            .externals
+            .provider_factory
+            .static_file_provider()
+            .get_highest_static_file_block(StaticFileSegment::Headers)
+            .unwrap_or_default() >
+            revert_until
+        {
+            trace!(
+                target: "blockchain_tree",
+                "Reverting optimistic canonical chain to block {}",
+                revert_until
+            );
+            // This should only happen when an optimistic sync target was re-orged.
+            //
+            // Static files generally contain finalized data. The blockchain tree only deals
+            // with unfinalized data. The only scenario where canonical reverts go past the
+            // highest static file is when an optimistic sync occurred and unfinalized data was
+            // written to static files.
+            return Err(CanonicalError::OptimisticTargetRevert(revert_until))
+        }
+
         // read data that is needed for new sidechain
         let provider_rw = self.externals.provider_factory.provider_rw()?;

Review comment (Member, resolved): should be feature gated? feel free to introduce one if there isn't
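The guard encodes one invariant: static files normally hold only finalized data, so a canonical revert that reaches below the highest static-file header can only follow an optimistic sync. A sketch of the check in isolation (the provider call is stubbed out; `OptimisticTargetRevert` is the real error variant this PR adds):

```rust
/// Simplified stand-in for `CanonicalError`, with only the variant added here.
#[derive(Debug)]
enum RevertError {
    OptimisticTargetRevert(u64),
}

/// `highest_static_header` stands in for
/// `static_file_provider().get_highest_static_file_block(StaticFileSegment::Headers)`.
fn guard_optimistic_revert(
    highest_static_header: Option<u64>,
    revert_until: u64,
) -> Result<(), RevertError> {
    if highest_static_header.unwrap_or_default() > revert_until {
        // The engine turns this error into PipelineTarget::Unwind(revert_until).
        return Err(RevertError::OptimisticTargetRevert(revert_until));
    }
    Ok(())
}
```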
6 changes: 6 additions & 0 deletions crates/blockchain-tree/src/noop.rs
@@ -68,6 +68,12 @@ impl BlockchainTreeEngine for NoopBlockchainTree {
     fn make_canonical(&self, block_hash: BlockHash) -> Result<CanonicalOutcome, CanonicalError> {
         Err(BlockchainTreeError::BlockHashNotFoundInChain { block_hash }.into())
     }
+
+    fn update_block_hashes_and_clear_buffered(
+        &self,
+    ) -> RethResult<BTreeMap<BlockNumber, BlockHash>> {
+        Ok(BTreeMap::new())
+    }
 }

 impl BlockchainTreeViewer for NoopBlockchainTree {
16 changes: 16 additions & 0 deletions crates/blockchain-tree/src/shareable.rs
@@ -83,6 +83,22 @@
         res
     }

+    fn update_block_hashes_and_clear_buffered(
+        &self,
+    ) -> RethResult<BTreeMap<BlockNumber, BlockHash>> {
+        let mut tree = self.tree.write();
+        let res = tree.update_block_hashes();
+
+        if let Ok(Some((block, _))) =
+            res.as_ref().map(|last_canonical_hashes| last_canonical_hashes.last_key_value())
+        {
+            tree.remove_old_blocks(*block);
+        }
+
+        tree.update_chains_metrics();
+        res
+    }
+
     fn connect_buffered_blocks_to_canonical_hashes(&self) -> RethResult<()> {
         trace!(target: "blockchain_tree", "Connecting buffered blocks to canonical hashes");
         let mut tree = self.tree.write();
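The wrapper takes the write lock once, refreshes the canonical hashes, then prunes the buffer up to the newest canonical block; `res.as_ref()` is what lets it inspect the map without giving up ownership of the result it still has to return. A reduced sketch of that borrow pattern (simplified types, illustrative error type):

```rust
use std::collections::BTreeMap;

/// Reduced sketch: derive the prune cutoff from a canonical-hashes result
/// without consuming it, as `update_block_hashes_and_clear_buffered` does.
fn prune_cutoff(res: &Result<BTreeMap<u64, [u8; 32]>, String>) -> Option<u64> {
    res.as_ref()
        .ok()
        .and_then(|hashes| hashes.last_key_value())
        .map(|(number, _)| *number)
}
```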
12 changes: 9 additions & 3 deletions crates/consensus/beacon/src/engine/hooks/controller.rs
@@ -130,10 +130,16 @@ impl EngineHooksController {
         args: EngineHookContext,
         db_write_active: bool,
     ) -> Poll<Result<PolledHook, EngineHookError>> {
-        // Hook with DB write access level is not allowed to run due to already running hook with DB
-        // write access level or active DB write according to passed argument
+        // Hook with DB write access level is not allowed to run due to any of the following
+        // reasons:
+        // - An already running hook with DB write access level
+        // - Active DB write according to passed argument
+        // - Missing a finalized block number. We might be on an optimistic sync scenario where we
+        //   cannot skip the FCU with the finalized hash, otherwise CL might misbehave.
         if hook.db_access_level().is_read_write() &&
-            (self.active_db_write_hook.is_some() || db_write_active)
+            (self.active_db_write_hook.is_some() ||
+                db_write_active ||
+                args.finalized_block_number.is_none())
         {
             return Poll::Pending
         }
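The new third condition is the piece that protects optimistic sync: a read-write hook such as the pruner must not touch the database before a finalized block exists, because the next FCU may still unwind that history. A compact sketch of the widened gate (names assumed, not the real `EngineHooksController` API):

```rust
/// Returns true when a read-write hook may run. All three conditions must be
/// clear: no other RW hook active, no engine DB write in flight, and a
/// finalized block known (i.e. we are past any optimistic-sync window).
fn rw_hook_may_run(
    active_db_write_hook: bool,
    db_write_active: bool,
    finalized_block_number: Option<u64>,
) -> bool {
    !(active_db_write_hook || db_write_active || finalized_block_number.is_none())
}
```

With `finalized_block_number` still `None` during optimistic sync, the hook simply stays `Poll::Pending` until finality is known.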
85 changes: 62 additions & 23 deletions crates/consensus/beacon/src/engine/mod.rs
@@ -15,8 +15,9 @@ use reth_interfaces::{
 use reth_payload_builder::PayloadBuilderHandle;
 use reth_payload_validator::ExecutionPayloadValidator;
 use reth_primitives::{
-    constants::EPOCH_SLOTS, stage::StageId, BlockNumHash, BlockNumber, Head, Header, SealedBlock,
-    SealedHeader, B256,
+    constants::EPOCH_SLOTS,
+    stage::{PipelineTarget, StageId},
+    BlockNumHash, BlockNumber, Head, Header, SealedBlock, SealedHeader, B256,
 };
 use reth_provider::{
     BlockIdReader, BlockReader, BlockSource, CanonChainTracker, ChainSpecProvider, ProviderError,
@@ -316,7 +317,7 @@
         };

         if let Some(target) = maybe_pipeline_target {
-            this.sync.set_pipeline_sync_target(target);
+            this.sync.set_pipeline_sync_target(target.into());
         }

         Ok((this, handle))
@@ -668,6 +669,21 @@
             // threshold
             return Some(state.finalized_block_hash)
         }
+
+        // OPTIMISTIC SYNCING
+        //
+        // It can happen when the node is doing an optimistic sync, where the CL has no
+        // knowledge of the finalized hash, but is expecting the EL to sync as high as
+        // possible before finalizing.
+        //
+        // This usually doesn't happen on ETH mainnet since CLs use the more secure
+        // checkpoint syncing.
+        //
+        // However, optimism chains will do this. The risk of a reorg is however low.
+        debug!(target: "consensus::engine", hash=?state.head_block_hash, "Setting head hash as an optimistic pipeline target.");
+        return Some(state.head_block_hash)
     }
     Ok(Some(_)) => {
         // we're fully synced to the finalized block

Review thread on the OPTIMISTIC SYNCING comment (resolved):
- Member: can we feature gate this?
- Collaborator: or should we do this only for OP (chainspec.is_optimism) for now?
- joshieDo (author, May 2, 2024): i mean, its unlikely that it can happen on mainnet, but it can happen on testnets/devnets l1 for sure. i can add the feature gate or check... but what would be the reason?
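A condensed sketch of the resulting decision (standalone and deliberately simplified: the real code also applies the pipeline-run distance threshold and inspects the local database for the finalized block):

```rust
type Hash = [u8; 32];
const ZERO_HASH: Hash = [0u8; 32];

/// Simplified: choose the hash the pipeline should sync toward.
fn pipeline_target(finalized: Hash, head: Hash, finalized_known_locally: bool) -> Option<Hash> {
    if finalized == ZERO_HASH {
        // Optimistic sync: the CL sent no finalized hash; sync to head and
        // let a later FCU finalize (or unwind) the result.
        return Some(head);
    }
    if !finalized_known_locally {
        // Normal pipeline sync toward the announced finalized block.
        return Some(finalized);
    }
    // Finalized block already known locally: no pipeline run needed.
    None
}
```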
@@ -960,6 +976,10 @@
             // so we should not warn the user, since this will result in us attempting to sync
             // to a new target and is considered normal operation during sync
         }
+        CanonicalError::OptimisticTargetRevert(block_number) => {
+            self.sync.set_pipeline_sync_target(PipelineTarget::Unwind(*block_number));
+            return PayloadStatus::from_status(PayloadStatusEnum::Syncing)
+        }
         _ => {
             warn!(target: "consensus::engine", %error, ?state, "Failed to canonicalize the head hash");
             // TODO(mattsse) better error handling before attempting to sync (FCU could be
@@ -990,7 +1010,7 @@
         if self.pipeline_run_threshold == 0 {
             // use the pipeline to sync to the target
             trace!(target: "consensus::engine", %target, "Triggering pipeline run to sync missing ancestors of the new head");
-            self.sync.set_pipeline_sync_target(target);
+            self.sync.set_pipeline_sync_target(target.into());
         } else {
             // trigger a full block download for missing hash, or the parent of its lowest buffered
             // ancestor
@@ -1340,7 +1360,7 @@
     ) {
         // we don't have the block yet and the distance exceeds the allowed
         // threshold
-        self.sync.set_pipeline_sync_target(target);
+        self.sync.set_pipeline_sync_target(target.into());
         // we can exit early here because the pipeline will take care of syncing
         return
     }
@@ -1424,6 +1444,8 @@
             // TODO: do not ignore this
             let _ = self.blockchain.make_canonical(*target_hash.as_ref());
         }
+    } else if let Some(block_number) = err.optimistic_revert_block_number() {
+        self.sync.set_pipeline_sync_target(PipelineTarget::Unwind(block_number));
     }

     Err((target.head_block_hash, err))
@@ -1485,13 +1507,7 @@

         // update the canon chain if continuous is enabled
         if self.sync.run_pipeline_continuously() {
-            let max_block = ctrl.block_number().unwrap_or_default();
-            let max_header = self.blockchain.sealed_header(max_block)
-                .inspect_err(|error| {
-                    error!(target: "consensus::engine", %error, "Error getting canonical header for continuous sync");
-                })?
-                .ok_or_else(|| ProviderError::HeaderNotFound(max_block.into()))?;
-            self.blockchain.set_canonical_head(max_header);
+            self.set_canonical_head(ctrl.block_number().unwrap_or_default())?;
         }

         let sync_target_state = match self.forkchoice_state_tracker.sync_target_state() {
@@ -1504,6 +1520,14 @@
             }
         };

+        if sync_target_state.finalized_block_hash.is_zero() {
+            self.set_canonical_head(ctrl.block_number().unwrap_or_default())?;
+            self.blockchain.update_block_hashes_and_clear_buffered()?;
+            self.blockchain.connect_buffered_blocks_to_canonical_hashes()?;
+            // We are on an optimistic syncing process, better to wait for the next FCU to handle
+            return Ok(())
+        }
+
         // Next, we check if we need to schedule another pipeline run or transition
         // to live sync via tree.
         // This can arise if we buffer the forkchoice head, and if the head is an
@@ -1559,7 +1583,7 @@
             // the tree update from executing too many blocks and blocking.
             if let Some(target) = pipeline_target {
                 // run the pipeline to the target since the distance is sufficient
-                self.sync.set_pipeline_sync_target(target);
+                self.sync.set_pipeline_sync_target(target.into());
             } else if let Some(number) =
                 self.blockchain.block_number(sync_target_state.finalized_block_hash)?
             {
@@ -1571,12 +1595,23 @@
             } else {
                 // We don't have the finalized block in the database, so we need to
                 // trigger another pipeline run.
-                self.sync.set_pipeline_sync_target(sync_target_state.finalized_block_hash);
+                self.sync.set_pipeline_sync_target(sync_target_state.finalized_block_hash.into());
             }

         Ok(())
     }

+    fn set_canonical_head(&self, max_block: BlockNumber) -> RethResult<()> {
+        let max_header = self.blockchain.sealed_header(max_block)
+            .inspect_err(|error| {
+                error!(target: "consensus::engine", %error, "Error getting canonical header for continuous sync");
+            })?
+            .ok_or_else(|| ProviderError::HeaderNotFound(max_block.into()))?;
+        self.blockchain.set_canonical_head(max_header);
+
+        Ok(())
+    }
+
     fn on_hook_result(&self, polled_hook: PolledHook) -> Result<(), BeaconConsensusEngineError> {
         if let EngineHookEvent::Finished(Err(error)) = &polled_hook.event {
             error!(
@@ -1725,16 +1760,20 @@
                     Err(BeaconOnNewPayloadError::Internal(Box::new(error.clone())));
                 let _ = tx.send(response);
                 return Err(RethError::Canonical(error))
-            }
-
-            // If we could not make the sync target block canonical,
-            // we should return the error as an invalid payload status.
-            PayloadStatus::new(
-                PayloadStatusEnum::Invalid { validation_error: error.to_string() },
-                // TODO: return a proper latest valid hash
-                // See: <https://github.com/paradigmxyz/reth/issues/7146>
-                self.forkchoice_state_tracker.last_valid_head(),
-            )
+            } else if error.optimistic_revert_block_number().is_some() {
+                // engine already set the pipeline unwind target on
+                // `try_make_sync_target_canonical`
+                PayloadStatus::from_status(PayloadStatusEnum::Syncing)
+            } else {
+                // If we could not make the sync target block canonical,
+                // we should return the error as an invalid payload status.
+                PayloadStatus::new(
+                    PayloadStatusEnum::Invalid { validation_error: error.to_string() },
+                    // TODO: return a proper latest valid hash
+                    // See: <https://github.com/paradigmxyz/reth/issues/7146>
+                    self.forkchoice_state_tracker.last_valid_head(),
+                )
+            }
         };

Review thread on lines +1763 to +1766 (resolved):
- joshieDo (author, May 2, 2024): unsure here if returning Syncing is the best
- Collaborator: this should be okay
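Most of the mechanical churn above (`target.into()`, `PipelineTarget::Unwind(..)`) follows from `set_pipeline_sync_target` now accepting a target enum rather than a bare hash. A sketch consistent with this diff's usage (the real type lives in `reth_primitives::stage`, per the import hunk above; treat the exact shape shown here as an assumption):

```rust
use reth_primitives::B256;

/// Sketch of the pipeline target the engine now schedules.
#[derive(Clone, Copy, Debug)]
pub enum PipelineTarget {
    /// Run the pipeline forward until this block hash is reached.
    Sync(B256),
    /// Unwind the pipeline back to this block number; used when an
    /// optimistic sync target turns out to have been re-orged.
    Unwind(u64),
}

impl From<B256> for PipelineTarget {
    fn from(hash: B256) -> Self {
        // This conversion is what keeps existing call sites terse:
        // `set_pipeline_sync_target(target.into())`.
        Self::Sync(hash)
    }
}
```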