Skip to content

Commit

Permalink
feat: use pipeline for reth stage unwind (#7085)
Browse files Browse the repository at this point in the history
Co-authored-by: joshieDo <ranriver@protonmail.com>
Co-authored-by: joshieDo <93316087+joshieDo@users.noreply.github.com>
Co-authored-by: Alexey Shekhirin <a.shekhirin@gmail.com>
  • Loading branch information
4 people committed Apr 2, 2024
1 parent 16c76b6 commit 3726cd1
Show file tree
Hide file tree
Showing 5 changed files with 248 additions and 51 deletions.
197 changes: 175 additions & 22 deletions bin/reth/src/commands/stage/unwind.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,37 @@ use crate::{
dirs::{DataDirPath, MaybePlatformPath},
};
use clap::{Parser, Subcommand};
use reth_db::{cursor::DbCursorRO, database::Database, open_db, tables, transaction::DbTx};
use reth_primitives::{BlockHashOrNumber, ChainSpec};
use reth_provider::{BlockExecutionWriter, ProviderFactory};
use reth_beacon_consensus::BeaconConsensus;
use reth_config::{Config, PruneConfig};
use reth_db::{database::Database, open_db};
use reth_downloaders::{
bodies::bodies::BodiesDownloaderBuilder,
headers::reverse_headers::ReverseHeadersDownloaderBuilder,
};
use reth_interfaces::consensus::Consensus;
use reth_node_core::{
args::{get_secret_key, NetworkArgs},
dirs::ChainPath,
};
use reth_node_ethereum::EthEvmConfig;
use reth_primitives::{BlockHashOrNumber, ChainSpec, PruneModes, B256};
use reth_provider::{
BlockExecutionWriter, BlockNumReader, ChainSpecProvider, HeaderSyncMode, ProviderFactory,
};
use reth_prune::PrunerBuilder;
use reth_stages::{
sets::DefaultStages,
stages::{
AccountHashingStage, ExecutionStage, ExecutionStageThresholds, IndexAccountHistoryStage,
IndexStorageHistoryStage, MerkleStage, SenderRecoveryStage, StorageHashingStage,
TransactionLookupStage,
},
Pipeline, StageSet,
};
use reth_static_file::StaticFileProducer;
use std::{ops::RangeInclusive, sync::Arc};
use tokio::sync::watch;
use tracing::info;

/// `reth stage unwind` command
#[derive(Debug, Parser)]
Expand Down Expand Up @@ -42,6 +69,9 @@ pub struct Command {
#[command(flatten)]
db: DatabaseArgs,

#[command(flatten)]
network: NetworkArgs,

#[command(subcommand)]
command: Subcommands,
}
Expand All @@ -55,28 +85,150 @@ impl Command {
if !db_path.exists() {
eyre::bail!("Database {db_path:?} does not exist.")
}
let config_path = data_dir.config_path();
let config: Config = confy::load_path(config_path).unwrap_or_default();

let db = open_db(db_path.as_ref(), self.db.database_args())?;

let range = self.command.unwind_range(&db)?;
let db = Arc::new(open_db(db_path.as_ref(), self.db.database_args())?);
let provider_factory =
ProviderFactory::new(db, self.chain.clone(), data_dir.static_files_path())?;

let range = self.command.unwind_range(provider_factory.clone())?;
if *range.start() == 0 {
eyre::bail!("Cannot unwind genesis block")
}

let factory = ProviderFactory::new(&db, self.chain.clone(), data_dir.static_files_path())?;
let provider = factory.provider_rw()?;
// Only execute a pipeline unwind if the start of the range overlaps the existing static
// files. If that's the case, then copy all available data from MDBX to static files, and
// only then, proceed with the unwind.
if let Some(highest_static_block) = provider_factory
.static_file_provider()
.get_highest_static_files()
.max()
.filter(|highest_static_file_block| highest_static_file_block >= range.start())
{
info!(target: "reth::cli", ?range, ?highest_static_block, "Executing a pipeline unwind.");
let mut pipeline =
self.build_pipeline(data_dir, config, provider_factory.clone()).await?;

let blocks_and_execution = provider
.take_block_and_execution_range(&self.chain, range)
.map_err(|err| eyre::eyre!("Transaction error on unwind: {err}"))?;
// Move all applicable data from database to static files.
pipeline.produce_static_files()?;

provider.commit()?;
// Run the pruner so we don't potentially end up with higher height in the database vs
// static files.
let mut pruner = PrunerBuilder::new(PruneConfig::default())
.prune_delete_limit(usize::MAX)
.build(provider_factory);
pruner.run(*range.end())?;

println!("Unwound {} blocks", blocks_and_execution.len());
pipeline.unwind((*range.start()).saturating_sub(1), None)?;
} else {
info!(target: "reth::cli", ?range, "Executing a database unwind.");
let provider = provider_factory.provider_rw()?;

let _ = provider
.take_block_and_execution_range(&self.chain, range.clone())
.map_err(|err| eyre::eyre!("Transaction error on unwind: {err}"))?;

provider.commit()?;
}

println!("Unwound {} blocks", range.count());

Ok(())
}

async fn build_pipeline<DB: Database + 'static>(
self,
data_dir: ChainPath<DataDirPath>,
config: Config,
provider_factory: ProviderFactory<Arc<DB>>,
) -> Result<Pipeline<Arc<DB>>, eyre::Error> {
// Even though we are not planning to download anything, we need to initialize Body and
// Header stage with a network client
let network_secret_path =
self.network.p2p_secret_key.clone().unwrap_or_else(|| data_dir.p2p_secret_path());
let p2p_secret_key = get_secret_key(&network_secret_path)?;
let default_peers_path = data_dir.known_peers_path();
let network = self
.network
.network_config(
&config,
provider_factory.chain_spec(),
p2p_secret_key,
default_peers_path,
)
.build(provider_factory.clone())
.start_network()
.await?;

let consensus: Arc<dyn Consensus> =
Arc::new(BeaconConsensus::new(provider_factory.chain_spec()));

// building network downloaders using the fetch client
let fetch_client = network.fetch_client().await?;
let header_downloader = ReverseHeadersDownloaderBuilder::new(config.stages.headers)
.build(fetch_client.clone(), Arc::clone(&consensus));
let body_downloader = BodiesDownloaderBuilder::new(config.stages.bodies).build(
fetch_client,
Arc::clone(&consensus),
provider_factory.clone(),
);
let stage_conf = &config.stages;

let (tip_tx, tip_rx) = watch::channel(B256::ZERO);
let factory = reth_revm::EvmProcessorFactory::new(
provider_factory.chain_spec(),
EthEvmConfig::default(),
);

let header_mode = HeaderSyncMode::Tip(tip_rx);
let pipeline = Pipeline::builder()
.with_tip_sender(tip_tx)
.add_stages(
DefaultStages::new(
provider_factory.clone(),
header_mode,
Arc::clone(&consensus),
header_downloader,
body_downloader,
factory.clone(),
stage_conf.etl.clone(),
)
.set(SenderRecoveryStage {
commit_threshold: stage_conf.sender_recovery.commit_threshold,
})
.set(ExecutionStage::new(
factory,
ExecutionStageThresholds {
max_blocks: None,
max_changes: None,
max_cumulative_gas: None,
max_duration: None,
},
stage_conf
.merkle
.clean_threshold
.max(stage_conf.account_hashing.clean_threshold)
.max(stage_conf.storage_hashing.clean_threshold),
config.prune.clone().map(|prune| prune.segments).unwrap_or_default(),
))
.set(AccountHashingStage::default())
.set(StorageHashingStage::default())
.set(MerkleStage::default_unwind())
.set(TransactionLookupStage::default())
.set(IndexAccountHistoryStage::default())
.set(IndexStorageHistoryStage::default()),
)
.build(
provider_factory.clone(),
StaticFileProducer::new(
provider_factory.clone(),
provider_factory.static_file_provider(),
PruneModes::default(),
),
);
Ok(pipeline)
}
}

/// `reth stage unwind` subcommand
Expand All @@ -94,21 +246,22 @@ impl Subcommands {
/// Returns the block range to unwind.
///
/// This returns an inclusive range: [target..=latest]
fn unwind_range<DB: Database>(&self, db: DB) -> eyre::Result<RangeInclusive<u64>> {
let tx = db.tx()?;
let mut cursor = tx.cursor_read::<tables::CanonicalHeaders>()?;
let last = cursor.last()?.ok_or_else(|| eyre::eyre!("No blocks in database"))?;

fn unwind_range<DB: Database>(
&self,
factory: ProviderFactory<DB>,
) -> eyre::Result<RangeInclusive<u64>> {
let provider = factory.provider()?;
let last = provider.last_block_number()?;
let target = match self {
Subcommands::ToBlock { target } => match target {
BlockHashOrNumber::Hash(hash) => tx
.get::<tables::HeaderNumbers>(*hash)?
BlockHashOrNumber::Hash(hash) => provider
.block_number(*hash)?
.ok_or_else(|| eyre::eyre!("Block hash not found in database: {hash:?}"))?,
BlockHashOrNumber::Number(num) => *num,
},
Subcommands::NumBlocks { amount } => last.0.saturating_sub(*amount),
Subcommands::NumBlocks { amount } => last.saturating_sub(*amount),
} + 1;
Ok(target..=last.0)
Ok(target..=last)
}
}

Expand Down
5 changes: 5 additions & 0 deletions crates/primitives/src/static_file/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ impl HighestStaticFiles {
StaticFileSegment::Receipts => &mut self.receipts,
}
}

/// Returns the maximum block of all segments.
pub fn max(&self) -> Option<u64> {
[self.headers, self.transactions, self.receipts].iter().filter_map(|&option| option).max()
}
}

/// Each static file has a fixed number of blocks. This gives out the range where the requested
Expand Down
2 changes: 1 addition & 1 deletion crates/stages/src/pipeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ where
///
/// CAUTION: This method locks the static file producer Mutex, hence can block the thread if the
/// lock is occupied.
fn produce_static_files(&mut self) -> RethResult<()> {
pub fn produce_static_files(&mut self) -> RethResult<()> {
let mut static_file_producer = self.static_file_producer.lock();

let provider = self.provider_factory.provider()?;
Expand Down
70 changes: 46 additions & 24 deletions crates/stages/src/stages/bodies.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,12 @@ use reth_interfaces::{
};
use reth_primitives::{
stage::{EntitiesCheckpoint, StageCheckpoint, StageId},
StaticFileSegment,
StaticFileSegment, TxNumber,
};
use reth_provider::{
providers::{StaticFileProvider, StaticFileWriter},
BlockReader, DatabaseProviderRW, HeaderProvider, ProviderError, StatsReader,
};
use reth_provider::{providers::StaticFileWriter, DatabaseProviderRW, HeaderProvider, StatsReader};
use std::{
cmp::Ordering,
task::{ready, Context, Poll},
Expand Down Expand Up @@ -145,17 +148,11 @@ impl<DB: Database, D: BodyDownloader> Stage<DB> for BodyStage<D> {
// error will trigger an unwind, that will bring the database to the same height as the
// static files.
Ordering::Less => {
let last_block = static_file_provider
.get_highest_static_file_block(StaticFileSegment::Transactions)
.unwrap_or_default();

let missing_block =
Box::new(provider.sealed_header(last_block + 1)?.unwrap_or_default());

return Err(StageError::MissingStaticFileData {
block: missing_block,
segment: StaticFileSegment::Transactions,
})
return Err(missing_static_data_error(
next_static_file_tx_num.saturating_sub(1),
static_file_provider,
provider,
)?)
}
Ordering::Equal => {}
}
Expand Down Expand Up @@ -311,17 +308,11 @@ impl<DB: Database, D: BodyDownloader> Stage<DB> for BodyStage<D> {
// If there are more transactions on database, then we are missing static file data and we
// need to unwind further.
if db_tx_num > static_file_tx_num {
let last_block = static_file_provider
.get_highest_static_file_block(StaticFileSegment::Transactions)
.unwrap_or_default();

let missing_block =
Box::new(provider.sealed_header(last_block + 1)?.unwrap_or_default());

return Err(StageError::MissingStaticFileData {
block: missing_block,
segment: StaticFileSegment::Transactions,
})
return Err(missing_static_data_error(
static_file_tx_num,
static_file_provider,
provider,
)?)
}

// Unwinds static file
Expand All @@ -335,6 +326,37 @@ impl<DB: Database, D: BodyDownloader> Stage<DB> for BodyStage<D> {
}
}

fn missing_static_data_error<DB: Database>(
last_tx_num: TxNumber,
static_file_provider: &StaticFileProvider,
provider: &DatabaseProviderRW<DB>,
) -> Result<StageError, ProviderError> {
let mut last_block = static_file_provider
.get_highest_static_file_block(StaticFileSegment::Transactions)
.unwrap_or_default();

// To be extra safe, we make sure that the last tx num matches the last block from its indices.
// If not, get it.
loop {
if let Some(indices) = provider.block_body_indices(last_block)? {
if indices.last_tx_num() <= last_tx_num {
break
}
}
if last_block == 0 {
break
}
last_block -= 1;
}

let missing_block = Box::new(provider.sealed_header(last_block + 1)?.unwrap_or_default());

Ok(StageError::MissingStaticFileData {
block: missing_block,
segment: StaticFileSegment::Transactions,
})
}

// TODO(alexey): ideally, we want to measure Bodies stage progress in bytes, but it's hard to know
// beforehand how many bytes we need to download. So the good solution would be to measure the
// progress in gas as a proxy to size. Execution stage uses a similar approach.
Expand Down
25 changes: 21 additions & 4 deletions crates/stages/src/stages/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -578,13 +578,30 @@ where
start_block.saturating_sub(1),
)?,
Ordering::Less => {
let last_block = static_file_provider
let mut last_block = static_file_provider
.get_highest_static_file_block(StaticFileSegment::Receipts)
.unwrap_or(0);

let missing_block = Box::new(
tx.get::<tables::Headers>(last_block + 1)?.unwrap_or_default().seal_slow(),
);
let last_receipt_num = static_file_provider
.get_highest_static_file_tx(StaticFileSegment::Receipts)
.unwrap_or(0);

// To be extra safe, we make sure that the last receipt num matches the last block from
// its indices. If not, get it.
loop {
if let Some(indices) = provider.block_body_indices(last_block)? {
if indices.last_tx_num() <= last_receipt_num {
break
}
}
if last_block == 0 {
break
}
last_block -= 1;
}

let missing_block =
Box::new(provider.sealed_header(last_block + 1)?.unwrap_or_default());

return Err(StageError::MissingStaticFileData {
block: missing_block,
Expand Down

0 comments on commit 3726cd1

Please sign in to comment.