fix: use --syncmode=execution-layer from op-node for optimistic pipeline sync #7552

Merged: 62 commits, May 7, 2024

Commits (62)
458ee1a
dont set finalized on op-reth start
joshieDo Apr 10, 2024
1704948
make UnknownSafeOrFinalizedBlock error with upper case
joshieDo Apr 10, 2024
12f5ee9
sync to head if finalized does not exist on optimism pipeline
joshieDo Apr 10, 2024
7df63db
add some docs and scenario on finished pipeline
joshieDo Apr 18, 2024
1f02388
Merge remote-tracking branch 'origin/main' into joshie/hot-op
joshieDo Apr 20, 2024
f56103a
Merge remote-tracking branch 'origin/main' into joshie/hot-op
joshieDo Apr 21, 2024
42158aa
enable optimism feature flags on missing dep
joshieDo Apr 21, 2024
de8928a
refactor e2e op tests
joshieDo Apr 21, 2024
1d51433
smol clean-up
joshieDo Apr 21, 2024
bd9a17b
add StageCheckpointReader to FullProvider
joshieDo Apr 22, 2024
72501de
add span node identifiers on e2e
joshieDo Apr 22, 2024
2c96574
e2e: chain of payloads + optimism reorg
joshieDo Apr 22, 2024
522d65c
on pipeline outcome, skip handling if it's optimistic sync but update…
joshieDo Apr 22, 2024
9320432
move node creation and peering into setup
joshieDo Apr 22, 2024
e7a6aa6
merge origin/main
joshieDo Apr 24, 2024
d28dc31
restore optimism wallet logic
joshieDo Apr 24, 2024
57e530b
fmt
joshieDo Apr 24, 2024
e4c11eb
fix: derank peers that responded with bad data
mattsse Apr 24, 2024
e377c57
reduce to 90 blocks
joshieDo Apr 25, 2024
13a426c
optimistic sync with reorg failure
joshieDo Apr 25, 2024
4c35e9d
Merge branch 'main' into joshie/hot-op
joshieDo Apr 25, 2024
5703bbd
doc fix
joshieDo Apr 25, 2024
63df12d
reset download_range on is_terminated
joshieDo Apr 25, 2024
3167f7f
handle reorgs from optimistic syncing
joshieDo Apr 25, 2024
93f0f64
remove unused test code
joshieDo Apr 25, 2024
3ad4cb2
wait_unwind more aggr and check header stage checkpoint
joshieDo Apr 29, 2024
c7ba89d
remove canonical block from buffered ones
joshieDo Apr 29, 2024
3dd46f7
rename to update_block_hashes_and_clear_buffered
joshieDo Apr 29, 2024
0fcfb8d
clippy
joshieDo Apr 29, 2024
7e05cc6
Merge remote-tracking branch 'origin/main' into joshie/hot-op
joshieDo Apr 29, 2024
dab38a0
remove optimistic syncing as an optimism feature flag only
joshieDo Apr 29, 2024
e22e187
add doc to OptimisticCanonicalRevert
joshieDo Apr 29, 2024
f9c584e
clippy fmt
joshieDo Apr 29, 2024
ce6de02
cargo doc
joshieDo Apr 29, 2024
0dcf372
add more docs on revert_canonical_from_database & static
joshieDo Apr 30, 2024
b44d842
Merge remote-tracking branch 'origin/main' into joshie/hot-op
joshieDo Apr 30, 2024
c2f7b39
import StaticFileProviderFactory
joshieDo Apr 30, 2024
c0c83c5
extend test
joshieDo Apr 30, 2024
a3f08cd
allow downloading subset, if the set has terminated
joshieDo May 1, 2024
0da350c
dont allow subsets
joshieDo May 1, 2024
fd51d36
only run pruner once we have a finalized block
joshieDo May 1, 2024
17f886c
review traces and docs
joshieDo May 2, 2024
982dd8b
Merge remote-tracking branch 'origin/main' into joshie/hot-op
joshieDo May 2, 2024
89077c6
clippy
joshieDo May 2, 2024
61e5898
unbreak continuous pipeline
joshieDo May 2, 2024
89b9645
handle err if optimistic revert
joshieDo May 2, 2024
5fdadde
Merge remote-tracking branch 'origin/main' into joshie/hot-op
joshieDo May 2, 2024
02d7d0a
add set_canonical_head
joshieDo May 2, 2024
963156a
dedup pipeline unwind target setting
joshieDo May 2, 2024
e18a704
renames
joshieDo May 2, 2024
a2b9ac3
use PayloadStatus::from_status instead
joshieDo May 2, 2024
b3e354e
cargo docs fix
joshieDo May 2, 2024
1a3939f
Merge remote-tracking branch 'origin/main' into joshie/hot-op
joshieDo May 3, 2024
6119832
clippy
joshieDo May 3, 2024
e0867c8
updated clippy
joshieDo May 3, 2024
d620b13
move update_block_hashes_and_clear_buffered impl to blockchain tree s…
joshieDo May 3, 2024
643c420
add TODO to doc
joshieDo May 3, 2024
53cacf4
whitespace
joshieDo May 3, 2024
42df97a
review comments
joshieDo May 6, 2024
0419207
remove unnecessary todo
joshieDo May 6, 2024
0dd1825
add more docs on PipelineTarget
joshieDo May 6, 2024
c17460b
fmt
joshieDo May 6, 2024
29 changes: 2 additions & 27 deletions bin/reth/src/optimism.rs
@@ -3,10 +3,7 @@
use clap::Parser;
use reth::cli::Cli;
use reth_node_builder::NodeHandle;
use reth_node_optimism::{
args::RollupArgs, rpc::SequencerClient, OptimismEngineTypes, OptimismNode,
};
use reth_provider::BlockReaderIdExt;
use reth_node_optimism::{args::RollupArgs, rpc::SequencerClient, OptimismNode};
use std::sync::Arc;

// We use jemalloc for performance reasons
@@ -27,7 +24,7 @@ fn main() {
}

if let Err(err) = Cli::<RollupArgs>::parse().run(|builder, rollup_args| async move {
let NodeHandle { node, node_exit_future } = builder
let NodeHandle { node: _node, node_exit_future } = builder
.node(OptimismNode::new(rollup_args.clone()))
.extend_rpc_modules(move |ctx| {
// register sequencer tx forwarder
@@ -42,28 +39,6 @@
.launch()
.await?;

// If `enable_genesis_walkback` is set to true, the rollup client will need to
// perform the derivation pipeline from genesis, validating the data dir.
// When set to false, set the finalized, safe, and unsafe head block hashes
// on the rollup client using a fork choice update. This prevents the rollup
// client from performing the derivation pipeline from genesis, and instead
// starts syncing from the current tip in the DB.
if node.chain_spec().is_optimism() && !rollup_args.enable_genesis_walkback {
let client = node.rpc_server_handles.auth.http_client();
if let Ok(Some(head)) = node.provider.latest_header() {
reth_rpc_api::EngineApiClient::<OptimismEngineTypes>::fork_choice_updated_v2(
&client,
reth_rpc_types::engine::ForkchoiceState {
head_block_hash: head.hash(),
safe_block_hash: head.hash(),
finalized_block_hash: head.hash(),
Review comment on lines -57 to -59 (Collaborator): I'm pretty sure this was problematic from the start

},
None,
)
.await?;
}
}

node_exit_future.await
}) {
eprintln!("Error: {err:?}");
6 changes: 3 additions & 3 deletions crates/blockchain-tree/src/block_buffer.rs
@@ -104,13 +104,13 @@ impl BlockBuffer {
removed
}

/// Discard all blocks that precede finalized block number from the buffer.
pub fn remove_old_blocks(&mut self, finalized_number: BlockNumber) {
/// Discard all blocks that precede block number from the buffer.
pub fn remove_old_blocks(&mut self, block_number: BlockNumber) {
let mut block_hashes_to_remove = Vec::new();

// discard all blocks that are before the finalized number.
while let Some(entry) = self.earliest_blocks.first_entry() {
if *entry.key() > finalized_number {
if *entry.key() > block_number {
break
}
let block_hashes = entry.remove();
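The generalized pruning loop in `remove_old_blocks` above can be sketched in isolation. This is a hypothetical, simplified model of the change — reth's real `BlockHash` and buffer bookkeeping are replaced with `String` hashes and a single `BTreeMap` for brevity:

```rust
use std::collections::BTreeMap;

// Simplified stand-in for reth's BlockBuffer: block number -> buffered hashes.
struct BlockBuffer {
    earliest_blocks: BTreeMap<u64, Vec<String>>,
}

impl BlockBuffer {
    /// Discard all blocks at or below `block_number`, returning their hashes.
    /// Mirrors the PR's rename from `finalized_number` to a generic cutoff.
    fn remove_old_blocks(&mut self, block_number: u64) -> Vec<String> {
        let mut removed = Vec::new();
        // BTreeMap keeps keys sorted, so the first entry is always the oldest.
        while let Some(entry) = self.earliest_blocks.first_entry() {
            if *entry.key() > block_number {
                break;
            }
            removed.extend(entry.remove());
        }
        removed
    }
}

fn main() {
    let mut buf = BlockBuffer { earliest_blocks: BTreeMap::new() };
    buf.earliest_blocks.insert(10, vec!["a".into()]);
    buf.earliest_blocks.insert(11, vec!["b".into()]);
    buf.earliest_blocks.insert(12, vec!["c".into()]);
    let removed = buf.remove_old_blocks(11);
    println!("removed {} buffered blocks", removed.len()); // blocks 10 and 11
}
```

The `first_entry` loop only ever touches the prefix of the map that falls below the cutoff, so pruning stays cheap even with a large buffer.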
41 changes: 36 additions & 5 deletions crates/blockchain-tree/src/blockchain_tree.rs
@@ -18,13 +18,14 @@ use reth_interfaces::{
};
use reth_primitives::{
BlockHash, BlockNumHash, BlockNumber, ForkBlock, GotExpected, Hardfork, PruneModes, Receipt,
SealedBlock, SealedBlockWithSenders, SealedHeader, U256,
SealedBlock, SealedBlockWithSenders, SealedHeader, StaticFileSegment, B256, U256,
};
use reth_provider::{
chain::{ChainSplit, ChainSplitTarget},
BlockExecutionWriter, BlockNumReader, BlockWriter, BundleStateWithReceipts,
CanonStateNotification, CanonStateNotificationSender, CanonStateNotifications, Chain,
ChainSpecProvider, DisplayBlocksChain, ExecutorFactory, HeaderProvider, ProviderError,
StaticFileProviderFactory,
};
use reth_stages_api::{MetricEvent, MetricEventsSender};
use std::{
@@ -770,6 +771,11 @@
Ok(InsertPayloadOk::Inserted(status))
}

/// Discard all blocks that precede block number from the buffer.
pub fn remove_old_blocks(&mut self, block: BlockNumber) {
self.state.buffered_blocks.remove_old_blocks(block);
}

/// Finalize blocks up until and including `finalized_block`, and remove them from the tree.
pub fn finalize_block(&mut self, finalized_block: BlockNumber) {
// remove blocks
@@ -784,7 +790,7 @@
}
}
// clean block buffer.
self.state.buffered_blocks.remove_old_blocks(finalized_block);
self.remove_old_blocks(finalized_block);
}

/// Reads the last `N` canonical hashes from the database and updates the block indices of the
@@ -804,6 +810,16 @@
) -> RethResult<()> {
self.finalize_block(last_finalized_block);

let last_canonical_hashes = self.update_block_hashes()?;

self.connect_buffered_blocks_to_hashes(last_canonical_hashes)?;

Ok(())
}

/// Update all block hashes. iterate over present and new list of canonical hashes and compare
/// them. Remove all mismatches, disconnect them and removes all chains.
pub fn update_block_hashes(&mut self) -> RethResult<BTreeMap<BlockNumber, B256>> {
let last_canonical_hashes = self
.externals
.fetch_latest_canonical_hashes(self.config.num_of_canonical_hashes() as usize)?;
@@ -818,9 +834,7 @@
}
}

self.connect_buffered_blocks_to_hashes(last_canonical_hashes)?;

Ok(())
Ok(last_canonical_hashes)
}

/// Reads the last `N` canonical hashes from the database and updates the block indices of the
@@ -1207,6 +1221,23 @@
&mut self,
revert_until: BlockNumber,
) -> Result<Option<Chain>, CanonicalError> {
if self
Review comment (Member): should be feature gated? feel free to introduce one if there isn't

.externals
.provider_factory
.static_file_provider()
.get_highest_static_file_block(StaticFileSegment::Headers)
.unwrap_or_default() >
revert_until
{
// This should only happen when an optimistic sync target was re-orged.
//
// Static files generally contain finalized data. The blockchain tree only deals
// with unfinalized data. The only scenario where canonical reverts go past the highest
// static file is when an optimistic sync occurred and unfinalized data was written to
// static files.
return Err(CanonicalError::OptimisticCanonicalRevert(revert_until))
}

// read data that is needed for new sidechain
let provider_rw = self.externals.provider_factory.provider_rw()?;

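The guard added to `revert_canonical_from_database` above can be distilled into a standalone sketch. Types and names here are simplified stand-ins for the reth originals (an `Option<u64>` replaces the static-file provider call, and only the one error variant relevant to this diff is modeled):

```rust
// Illustrative one-variant version of reth's CanonicalError.
#[derive(Debug, PartialEq)]
enum CanonicalError {
    OptimisticCanonicalRevert(u64),
}

/// Reject a canonical revert that would reach below the highest block already
/// written to static files. Static files normally hold only finalized data;
/// unfinalized data lands there only after an optimistic sync, so such a
/// revert implies the optimistic sync target was re-orged.
fn check_revert(
    highest_static_file_block: Option<u64>,
    revert_until: u64,
) -> Result<(), CanonicalError> {
    if highest_static_file_block.unwrap_or_default() > revert_until {
        return Err(CanonicalError::OptimisticCanonicalRevert(revert_until));
    }
    Ok(())
}

fn main() {
    // Static files reach block 100, but the revert target is 90: reject.
    assert!(check_revert(Some(100), 90).is_err());
    // Revert target above the static-file tip: fine.
    assert!(check_revert(Some(80), 90).is_ok());
    println!("revert guard ok");
}
```

The caller (the engine, in the hunks further down) reacts to this error by scheduling a pipeline unwind rather than treating it as fatal.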
6 changes: 6 additions & 0 deletions crates/blockchain-tree/src/noop.rs
@@ -59,6 +59,12 @@ impl BlockchainTreeEngine for NoopBlockchainTree {
fn make_canonical(&self, block_hash: BlockHash) -> Result<CanonicalOutcome, CanonicalError> {
Err(BlockchainTreeError::BlockHashNotFoundInChain { block_hash }.into())
}

fn update_block_hashes_and_clear_buffered(
&self,
) -> RethResult<BTreeMap<BlockNumber, BlockHash>> {
Ok(BTreeMap::new())
}
}

impl BlockchainTreeViewer for NoopBlockchainTree {
16 changes: 16 additions & 0 deletions crates/blockchain-tree/src/shareable.rs
@@ -82,6 +82,22 @@
res
}

fn update_block_hashes_and_clear_buffered(
&self,
) -> RethResult<BTreeMap<BlockNumber, BlockHash>> {
let mut tree = self.tree.write();
let res = tree.update_block_hashes();

if let Ok(Some((block, _))) =
res.as_ref().map(|last_canonical_hashes| last_canonical_hashes.last_key_value())
{
tree.remove_old_blocks(*block);
}

tree.update_chains_metrics();
res
}

fn connect_buffered_blocks_to_canonical_hashes(&self) -> RethResult<()> {
trace!(target: "blockchain_tree", "Connecting buffered blocks to canonical hashes");
let mut tree = self.tree.write();
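The hunk above prunes the buffer up to the last canonical hash returned by `update_block_hashes`. A minimal sketch of that pruning step, assuming a simplified map-based stand-in for the tree's buffered blocks (`clear_buffered` is an illustrative name, not the real API):

```rust
use std::collections::BTreeMap;

/// After refreshing canonical hashes, every buffered block at or below the
/// highest canonical number is stale and can be dropped — this mirrors the
/// `last_key_value` + `remove_old_blocks` pairing in the diff.
fn clear_buffered(
    canonical: &BTreeMap<u64, [u8; 32]>,
    buffered: &mut BTreeMap<u64, [u8; 32]>,
) {
    if let Some((&highest, _)) = canonical.last_key_value() {
        // Retain only buffered blocks strictly above the canonical tip.
        buffered.retain(|&num, _| num > highest);
    }
}

fn main() {
    let canonical = BTreeMap::from([(10u64, [0u8; 32])]);
    let mut buffered = BTreeMap::from([(9u64, [1u8; 32]), (11, [2u8; 32])]);
    clear_buffered(&canonical, &mut buffered);
    println!("buffered blocks remaining: {}", buffered.len()); // only block 11
}
```

In the real implementation both steps happen under a single write lock on the tree, so the canonical snapshot and the buffer pruning cannot interleave with other writers.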
6 changes: 6 additions & 0 deletions crates/consensus/beacon/src/engine/hooks/controller.rs
@@ -138,6 +138,12 @@ impl EngineHooksController {
return Poll::Pending
}

// If we don't have a finalized block, we might be on an optimistic sync scenario where we
// cannot skip the FCU with the finalized hash, otherwise CL might misbehave.
if hook.db_access_level().is_read_write() && args.finalized_block_number.is_none() {
return Poll::Pending
}

Review comment (Collaborator): that seems fine, I'd just clarify the comment and mention that we skip FCUs when the hook is active

if let Poll::Ready(event) = hook.poll(cx, args)? {
let result =
PolledHook { name: hook.name(), event, db_access_level: hook.db_access_level() };
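The new gating condition in the hook controller is small enough to model directly. A hedged sketch, with the hook and its args flattened into plain parameters (illustrative names, not reth's actual API):

```rust
/// Defer a read-write engine hook (e.g. pruning) while no finalized block is
/// known. Without a finalized block we may be mid-optimistic-sync; letting a
/// read-write hook run would force the engine to skip FCUs, which the CL
/// could treat as misbehavior.
fn should_defer_hook(db_read_write: bool, finalized_block_number: Option<u64>) -> bool {
    db_read_write && finalized_block_number.is_none()
}

fn main() {
    assert!(should_defer_hook(true, None));        // read-write + no finalized: wait
    assert!(!should_defer_hook(true, Some(100)));  // finalized known: proceed
    assert!(!should_defer_hook(false, None));      // read-only hooks are unaffected
    println!("hook gating ok");
}
```

This also matches the separate commit "only run pruner once we have a finalized block" in the commit list above.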
71 changes: 54 additions & 17 deletions crates/consensus/beacon/src/engine/mod.rs
@@ -22,8 +22,9 @@ use reth_interfaces::{
};
use reth_payload_builder::PayloadBuilderHandle;
use reth_primitives::{
constants::EPOCH_SLOTS, stage::StageId, BlockNumHash, BlockNumber, Head, Header, SealedBlock,
SealedHeader, B256,
constants::EPOCH_SLOTS,
stage::{PipelineTarget, StageId},
BlockNumHash, BlockNumber, Head, Header, SealedBlock, SealedHeader, B256,
};
use reth_provider::{
BlockIdReader, BlockReader, BlockSource, CanonChainTracker, ChainSpecProvider, ProviderError,
@@ -320,7 +321,7 @@
};

if let Some(target) = maybe_pipeline_target {
this.sync.set_pipeline_sync_target(target);
this.sync.set_pipeline_sync_target(target.into());
}

Ok((this, handle))
@@ -705,6 +706,20 @@
// threshold
return Some(state.finalized_block_hash)
}

// OPTIMISTIC SYNCING
Review thread:
- Member: can we feature gate this?
- Collaborator: or should we do this only for OP (chainspec.is_optimism) for now?
- Collaborator Author: i mean, its unlikely that it can happen on mainnet, but it can happen on testnets/devnets l1 for sure. i can add the feature gate or check... but what would be the reason?

//
// It can happen when the node is doing an
// optimistic sync, where the CL has no knowledge of the finalized hash,
// but is expecting the EL to sync as high
// as possible before finalizing.
//
// This usually doesn't happen on ETH mainnet since CLs use the more
// secure checkpoint syncing.
//
// However, optimism chains will do this. The risk of a reorg is however
// low.
return Some(state.head_block_hash)
}
Ok(Some(_)) => {
// we're fully synced to the finalized block
@@ -997,6 +1012,10 @@
// so we should not warn the user, since this will result in us attempting to sync
// to a new target and is considered normal operation during sync
}
CanonicalError::OptimisticCanonicalRevert(block_number) => {
self.sync.set_pipeline_sync_target(PipelineTarget::Unwind(*block_number));
return PayloadStatus::from_status(PayloadStatusEnum::Syncing)
}
_ => {
warn!(target: "consensus::engine", %error, ?state, "Failed to canonicalize the head hash");
// TODO(mattsse) better error handling before attempting to sync (FCU could be
@@ -1027,7 +1046,7 @@
if self.pipeline_run_threshold == 0 {
// use the pipeline to sync to the target
trace!(target: "consensus::engine", %target, "Triggering pipeline run to sync missing ancestors of the new head");
self.sync.set_pipeline_sync_target(target);
self.sync.set_pipeline_sync_target(target.into());
} else {
// trigger a full block download for missing hash, or the parent of its lowest buffered
// ancestor
@@ -1110,6 +1129,11 @@
return if error.is_fatal() {
error!(target: "consensus::engine", %error, "Encountered fatal error");
Err(BeaconOnNewPayloadError::Internal(Box::new(error)))
} else if let Some(block_number) = error.is_optimistic_revert() {
self.sync.set_pipeline_sync_target(PipelineTarget::Unwind(
block_number,
));
Err(BeaconOnNewPayloadError::Internal(Box::new(error)))
} else {
// If we could not make the sync target block canonical, we
// should return the error as an invalid payload status.
@@ -1383,6 +1407,9 @@
error,
hash
)
} else if let Some(block_number) = error.is_optimistic_revert() {
self.sync
.set_pipeline_sync_target(PipelineTarget::Unwind(block_number));
}
}
}
@@ -1433,7 +1460,7 @@
) {
// we don't have the block yet and the distance exceeds the allowed
// threshold
self.sync.set_pipeline_sync_target(target);
self.sync.set_pipeline_sync_target(target.into());
// we can exit early here because the pipeline will take care of syncing
return
}
@@ -1517,6 +1544,8 @@
// TODO: do not ignore this
let _ = self.blockchain.make_canonical(*target_hash.as_ref());
}
} else if let Some(block_number) = err.is_optimistic_revert() {
self.sync.set_pipeline_sync_target(PipelineTarget::Unwind(block_number));
}

Err((target.head_block_hash, err))
@@ -1576,8 +1605,19 @@
return Ok(())
}

let sync_target_state = match self.forkchoice_state_tracker.sync_target_state() {
Some(current_state) => current_state,
None => {
// This is only possible if the node was run with `debug.tip`
// argument and without CL.
warn!(target: "consensus::engine", "No fork choice state available");
return Ok(())
}
};

// update the canon chain if continuous is enabled
if self.sync.run_pipeline_continuously() {
if self.sync.run_pipeline_continuously() || sync_target_state.finalized_block_hash.is_zero()
{
Review comment (Member): this might break continuous syncing. previously, we first checked if it's continuous and only then looked at the forkchoice state which iirc is not set during continuous
Reply (Collaborator Author): fixed

let max_block = ctrl.block_number().unwrap_or_default();
let max_header = self.blockchain.sealed_header(max_block)
.inspect_err(|error| {
@@ -1587,15 +1627,12 @@
self.blockchain.set_canonical_head(max_header);
}

let sync_target_state = match self.forkchoice_state_tracker.sync_target_state() {
Some(current_state) => current_state,
None => {
// This is only possible if the node was run with `debug.tip`
// argument and without CL.
warn!(target: "consensus::engine", "No fork choice state available");
return Ok(())
}
};
if sync_target_state.finalized_block_hash.is_zero() {
self.blockchain.update_block_hashes_and_clear_buffered()?;
self.blockchain.connect_buffered_blocks_to_canonical_hashes()?;
// We are on an optimistic syncing process, better to wait for the next FCU to handle
return Ok(())
}

// Next, we check if we need to schedule another pipeline run or transition
// to live sync via tree.
Expand Down Expand Up @@ -1652,7 +1689,7 @@ where
// the tree update from executing too many blocks and blocking.
if let Some(target) = pipeline_target {
// run the pipeline to the target since the distance is sufficient
self.sync.set_pipeline_sync_target(target);
self.sync.set_pipeline_sync_target(target.into());
} else if let Some(number) =
self.blockchain.block_number(sync_target_state.finalized_block_hash)?
{
@@ -1664,7 +1701,7 @@
} else {
// We don't have the finalized block in the database, so we need to
// trigger another pipeline run.
self.sync.set_pipeline_sync_target(sync_target_state.finalized_block_hash);
self.sync.set_pipeline_sync_target(sync_target_state.finalized_block_hash.into());
}

Ok(())
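The `PipelineTarget` enum and the `.into()` conversions sprinkled through this file can be sketched roughly as follows. This is a hypothetical, simplified model: `[u8; 32]` stands in for reth's `B256`, and only the behavior visible in this diff is reproduced:

```rust
/// Simplified sketch of the PR's `PipelineTarget`: the pipeline can now be
/// pointed forward at a sync target hash, or backward at an unwind block
/// number (used when an optimistic sync target gets re-orged).
#[derive(Debug, PartialEq)]
enum PipelineTarget {
    Sync([u8; 32]), // run the pipeline forward to this block hash
    Unwind(u64),    // unwind the pipeline back to this block number
}

// The `target.into()` calls in the diff suggest a From impl on the hash type,
// so existing `set_pipeline_sync_target(hash)` call sites keep working.
impl From<[u8; 32]> for PipelineTarget {
    fn from(hash: [u8; 32]) -> Self {
        PipelineTarget::Sync(hash)
    }
}

fn main() {
    let forward: PipelineTarget = [0u8; 32].into();
    assert_eq!(forward, PipelineTarget::Sync([0u8; 32]));

    // On CanonicalError::OptimisticCanonicalRevert(n), the engine schedules:
    let backward = PipelineTarget::Unwind(90);
    assert_eq!(backward, PipelineTarget::Unwind(90));
    println!("pipeline targets ok");
}
```

Folding the unwind case into the same target type is what lets the engine recover from a re-orged optimistic sync target through the ordinary pipeline-scheduling path instead of a special-cased code path.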