Skip to content

Commit

Permalink
chore: refactor DefaultStages to take StageConfig (#8173)
Browse files Browse the repository at this point in the history
  • Loading branch information
joshieDo committed May 12, 2024
1 parent 487f7e3 commit d9f9504
Show file tree
Hide file tree
Showing 17 changed files with 245 additions and 223 deletions.
17 changes: 6 additions & 11 deletions bin/reth/src/commands/debug_cmd/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use reth_provider::{
};
use reth_stages::{
sets::DefaultStages,
stages::{ExecutionStage, ExecutionStageThresholds, SenderRecoveryStage},
stages::{ExecutionStage, ExecutionStageThresholds},
Pipeline, StageSet,
};
use reth_static_file::StaticFileProducer;
Expand Down Expand Up @@ -109,6 +109,7 @@ impl Command {
.into_task_with(task_executor);

let stage_conf = &config.stages;
let prune_modes = config.prune.clone().map(|prune| prune.segments).unwrap_or_default();

let (tip_tx, tip_rx) = watch::channel(B256::ZERO);
let executor = block_executor!(self.chain.clone());
Expand All @@ -124,11 +125,9 @@ impl Command {
header_downloader,
body_downloader,
executor.clone(),
stage_conf.etl.clone(),
stage_conf.clone(),
prune_modes.clone(),
)
.set(SenderRecoveryStage {
commit_threshold: stage_conf.sender_recovery.commit_threshold,
})
.set(ExecutionStage::new(
executor,
ExecutionStageThresholds {
Expand All @@ -137,12 +136,8 @@ impl Command {
max_cumulative_gas: None,
max_duration: None,
},
stage_conf
.merkle
.clean_threshold
.max(stage_conf.account_hashing.clean_threshold)
.max(stage_conf.storage_hashing.clean_threshold),
config.prune.clone().map(|prune| prune.segments).unwrap_or_default(),
stage_conf.execution_external_clean_threshold(),
prune_modes,
ExExManagerHandle::empty(),
)),
)
Expand Down
33 changes: 5 additions & 28 deletions bin/reth/src/commands/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use reth_downloaders::{
file_client::{ChunkedFileReader, FileClient, DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE},
headers::reverse_headers::ReverseHeadersDownloaderBuilder,
};
use reth_exex::ExExManagerHandle;
use reth_interfaces::p2p::{
bodies::downloader::BodyDownloader,
headers::downloader::{HeaderDownloader, SyncTarget},
Expand All @@ -33,11 +32,7 @@ use reth_provider::{
BlockNumReader, ChainSpecProvider, HeaderProvider, HeaderSyncMode, ProviderError,
ProviderFactory, StageCheckpointReader, StaticFileProviderFactory,
};
use reth_stages::{
prelude::*,
stages::{ExecutionStage, ExecutionStageThresholds, SenderRecoveryStage},
Pipeline, StageSet,
};
use reth_stages::{prelude::*, Pipeline, StageSet};
use reth_static_file::StaticFileProducer;
use std::{path::PathBuf, sync::Arc};
use tokio::sync::watch;
Expand Down Expand Up @@ -273,29 +268,11 @@ where
consensus.clone(),
header_downloader,
body_downloader,
executor.clone(),
config.stages.etl.clone(),
)
.set(SenderRecoveryStage {
commit_threshold: config.stages.sender_recovery.commit_threshold,
})
.set(ExecutionStage::new(
executor,
ExecutionStageThresholds {
max_blocks: config.stages.execution.max_blocks,
max_changes: config.stages.execution.max_changes,
max_cumulative_gas: config.stages.execution.max_cumulative_gas,
max_duration: config.stages.execution.max_duration,
},
config
.stages
.merkle
.clean_threshold
.max(config.stages.account_hashing.clean_threshold)
.max(config.stages.storage_hashing.clean_threshold),
config.prune.as_ref().map(|prune| prune.segments.clone()).unwrap_or_default(),
ExExManagerHandle::empty(),
))
config.stages.clone(),
PruneModes::default(),
)
.builder()
.disable_all_if(&StageId::STATE_REQUIRED, || should_exec),
)
.build(provider_factory, static_file_producer);
Expand Down
60 changes: 45 additions & 15 deletions bin/reth/src/commands/stage/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ use crate::{
use clap::Parser;
use reth_beacon_consensus::EthBeaconConsensus;
use reth_cli_runner::CliContext;
use reth_config::{config::EtlConfig, Config};
use reth_config::{
config::{EtlConfig, HashingConfig, SenderRecoveryConfig, TransactionLookupConfig},
Config,
};
use reth_db::init_db;
use reth_downloaders::bodies::bodies::BodiesDownloaderBuilder;
use reth_exex::ExExManagerHandle;
Expand Down Expand Up @@ -165,6 +168,7 @@ impl Command {
Some(self.etl_dir.unwrap_or_else(|| EtlConfig::from_datadir(data_dir.data_dir()))),
self.etl_file_size.unwrap_or(EtlConfig::default_file_size()),
);
let prune_modes = config.prune.clone().map(|prune| prune.segments).unwrap_or_default();

let (mut exec_stage, mut unwind_stage): (Box<dyn Stage<_>>, Option<Box<dyn Stage<_>>>) =
match self.stage {
Expand Down Expand Up @@ -222,7 +226,12 @@ impl Command {
);
(Box::new(stage), None)
}
StageEnum::Senders => (Box::new(SenderRecoveryStage::new(batch_size)), None),
StageEnum::Senders => (
Box::new(SenderRecoveryStage::new(SenderRecoveryConfig {
commit_threshold: batch_size,
})),
None,
),
StageEnum::Execution => {
let executor = block_executor!(self.chain.clone());
(
Expand All @@ -235,31 +244,52 @@ impl Command {
max_duration: None,
},
config.stages.merkle.clean_threshold,
config.prune.map(|prune| prune.segments).unwrap_or_default(),
prune_modes,
ExExManagerHandle::empty(),
)),
None,
)
}
StageEnum::TxLookup => {
(Box::new(TransactionLookupStage::new(batch_size, etl_config, None)), None)
}
StageEnum::AccountHashing => {
(Box::new(AccountHashingStage::new(1, batch_size, etl_config)), None)
}
StageEnum::StorageHashing => {
(Box::new(StorageHashingStage::new(1, batch_size, etl_config)), None)
}
StageEnum::TxLookup => (
Box::new(TransactionLookupStage::new(
TransactionLookupConfig { chunk_size: batch_size },
etl_config,
prune_modes.transaction_lookup,
)),
None,
),
StageEnum::AccountHashing => (
Box::new(AccountHashingStage::new(
HashingConfig { clean_threshold: 1, commit_threshold: batch_size },
etl_config,
)),
None,
),
StageEnum::StorageHashing => (
Box::new(StorageHashingStage::new(
HashingConfig { clean_threshold: 1, commit_threshold: batch_size },
etl_config,
)),
None,
),
StageEnum::Merkle => (
Box::new(MerkleStage::default_execution()),
Box::new(MerkleStage::new_execution(config.stages.merkle.clean_threshold)),
Some(Box::new(MerkleStage::default_unwind())),
),
StageEnum::AccountHistory => (
Box::new(IndexAccountHistoryStage::default().with_etl_config(etl_config)),
Box::new(IndexAccountHistoryStage::new(
config.stages.index_account_history,
etl_config,
prune_modes.account_history,
)),
None,
),
StageEnum::StorageHistory => (
Box::new(IndexStorageHistoryStage::default().with_etl_config(etl_config)),
Box::new(IndexStorageHistoryStage::new(
config.stages.index_storage_history,
etl_config,
prune_modes.storage_history,
)),
None,
),
_ => return Ok(()),
Expand Down
29 changes: 7 additions & 22 deletions bin/reth/src/commands/stage/unwind.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,7 @@ use reth_provider::{
};
use reth_stages::{
sets::DefaultStages,
stages::{
AccountHashingStage, ExecutionStage, ExecutionStageThresholds, IndexAccountHistoryStage,
IndexStorageHistoryStage, MerkleStage, SenderRecoveryStage, StorageHashingStage,
TransactionLookupStage,
},
stages::{ExecutionStage, ExecutionStageThresholds},
Pipeline, StageSet,
};
use reth_static_file::StaticFileProducer;
Expand Down Expand Up @@ -133,6 +129,7 @@ impl Command {
let consensus: Arc<dyn Consensus> =
Arc::new(EthBeaconConsensus::new(provider_factory.chain_spec()));
let stage_conf = &config.stages;
let prune_modes = config.prune.clone().map(|prune| prune.segments).unwrap_or_default();

let (tip_tx, tip_rx) = watch::channel(B256::ZERO);
let executor = block_executor!(provider_factory.chain_spec());
Expand All @@ -148,11 +145,9 @@ impl Command {
NoopHeaderDownloader::default(),
NoopBodiesDownloader::default(),
executor.clone(),
stage_conf.etl.clone(),
stage_conf.clone(),
prune_modes.clone(),
)
.set(SenderRecoveryStage {
commit_threshold: stage_conf.sender_recovery.commit_threshold,
})
.set(ExecutionStage::new(
executor,
ExecutionStageThresholds {
Expand All @@ -161,20 +156,10 @@ impl Command {
max_cumulative_gas: None,
max_duration: None,
},
stage_conf
.merkle
.clean_threshold
.max(stage_conf.account_hashing.clean_threshold)
.max(stage_conf.storage_hashing.clean_threshold),
config.prune.clone().map(|prune| prune.segments).unwrap_or_default(),
stage_conf.execution_external_clean_threshold(),
prune_modes,
ExExManagerHandle::empty(),
))
.set(AccountHashingStage::default())
.set(StorageHashingStage::default())
.set(MerkleStage::default_unwind())
.set(TransactionLookupStage::default())
.set(IndexAccountHistoryStage::default())
.set(IndexStorageHistoryStage::default()),
)),
)
.build(
provider_factory.clone(),
Expand Down
13 changes: 13 additions & 0 deletions crates/config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,19 @@ pub struct StageConfig {
pub etl: EtlConfig,
}

impl StageConfig {
/// The highest threshold (in number of blocks) for switching between incremental and full
/// calculations across `MerkleStage`, `AccountHashingStage` and `StorageHashingStage`. This is
/// required to figure out if can prune or not changesets on subsequent pipeline runs during
/// `ExecutionStage`
pub fn execution_external_clean_threshold(&self) -> u64 {
self.merkle
.clean_threshold
.max(self.account_hashing.clean_threshold)
.max(self.storage_hashing.clean_threshold)
}
}

/// Header stage configuration.
#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Eq, Serialize)]
#[serde(default)]
Expand Down
5 changes: 3 additions & 2 deletions crates/consensus/beacon/src/engine/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::{
use reth_blockchain_tree::{
config::BlockchainTreeConfig, externals::TreeExternals, BlockchainTree, ShareableBlockchainTree,
};
use reth_config::config::EtlConfig;
use reth_config::config::StageConfig;
use reth_consensus::{test_utils::TestConsensus, Consensus};
use reth_db::{test_utils::TempDatabase, DatabaseEnv as DE};
use reth_downloaders::{
Expand Down Expand Up @@ -375,7 +375,8 @@ where
header_downloader,
body_downloader,
executor_factory.clone(),
EtlConfig::default(),
StageConfig::default(),
PruneModes::default(),
))
}
};
Expand Down
59 changes: 7 additions & 52 deletions crates/node/builder/src/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,7 @@ use reth_node_core::{
primitives::{BlockNumber, B256},
};
use reth_provider::{HeaderSyncMode, ProviderFactory};
use reth_stages::{
prelude::DefaultStages,
stages::{
AccountHashingStage, ExecutionStage, ExecutionStageThresholds, IndexAccountHistoryStage,
IndexStorageHistoryStage, MerkleStage, SenderRecoveryStage, StorageHashingStage,
TransactionLookupStage,
},
Pipeline, StageSet,
};
use reth_stages::{prelude::DefaultStages, stages::ExecutionStage, Pipeline, StageSet};
use reth_static_file::StaticFileProducer;
use reth_tasks::TaskExecutor;
use reth_tracing::tracing::debug;
Expand Down Expand Up @@ -131,56 +123,19 @@ where
header_downloader,
body_downloader,
executor.clone(),
stage_config.etl.clone(),
stage_config.clone(),
prune_modes.clone(),
)
.set(SenderRecoveryStage {
commit_threshold: stage_config.sender_recovery.commit_threshold,
})
.set(
ExecutionStage::new(
executor,
ExecutionStageThresholds {
max_blocks: stage_config.execution.max_blocks,
max_changes: stage_config.execution.max_changes,
max_cumulative_gas: stage_config.execution.max_cumulative_gas,
max_duration: stage_config.execution.max_duration,
},
stage_config
.merkle
.clean_threshold
.max(stage_config.account_hashing.clean_threshold)
.max(stage_config.storage_hashing.clean_threshold),
prune_modes.clone(),
stage_config.execution.into(),
stage_config.execution_external_clean_threshold(),
prune_modes,
exex_manager_handle,
)
.with_metrics_tx(metrics_tx),
)
.set(AccountHashingStage::new(
stage_config.account_hashing.clean_threshold,
stage_config.account_hashing.commit_threshold,
stage_config.etl.clone(),
))
.set(StorageHashingStage::new(
stage_config.storage_hashing.clean_threshold,
stage_config.storage_hashing.commit_threshold,
stage_config.etl.clone(),
))
.set(MerkleStage::new_execution(stage_config.merkle.clean_threshold))
.set(TransactionLookupStage::new(
stage_config.transaction_lookup.chunk_size,
stage_config.etl.clone(),
prune_modes.transaction_lookup,
))
.set(IndexAccountHistoryStage::new(
stage_config.index_account_history.commit_threshold,
prune_modes.account_history,
stage_config.etl.clone(),
))
.set(IndexStorageHistoryStage::new(
stage_config.index_storage_history.commit_threshold,
prune_modes.storage_history,
stage_config.etl.clone(),
)),
),
)
.build(provider_factory, static_file_producer);

Expand Down
Loading

0 comments on commit d9f9504

Please sign in to comment.