Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: backfill job single block iterator #9245

Merged
merged 4 commits into from
Jul 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions crates/ethereum/evm/src/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,13 +363,11 @@ where
type Output = BlockExecutionOutput<Receipt>;
type Error = BlockExecutionError;

/// Executes the block and commits the state changes.
/// Executes the block and commits the changes to the internal state.
///
/// Returns the receipts of the transactions in the block.
///
/// Returns an error if the block could not be executed or failed verification.
///
/// State changes are committed to the database.
fn execute(mut self, input: Self::Input<'_>) -> Result<Self::Output, Self::Error> {
let BlockExecutionInput { block, total_difficulty } = input;
let EthExecuteOutput { receipts, requests, gas_used } =
Expand Down
2 changes: 1 addition & 1 deletion crates/evm/src/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ pub trait BatchExecutor<DB> {
/// Contains the state changes, transaction receipts, and total gas used in the block.
///
/// TODO(mattsse): combine with `ExecutionOutcome`
#[derive(Debug)]
#[derive(Debug, PartialEq, Eq)]
pub struct BlockExecutionOutput<T> {
/// The changed state of the block after execution.
pub state: BundleState,
Expand Down
300 changes: 245 additions & 55 deletions crates/exex/exex/src/backfill.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use reth_db_api::database::Database;
use reth_evm::execute::{BatchExecutor, BlockExecutionError, BlockExecutorProvider};
use reth_evm::execute::{
BatchExecutor, BlockExecutionError, BlockExecutionOutput, BlockExecutorProvider, Executor,
};
use reth_node_api::FullNodeComponents;
use reth_primitives::{Block, BlockNumber};
use reth_primitives::{Block, BlockNumber, BlockWithSenders, Receipt};
use reth_primitives_traits::format_gas_throughput;
use reth_provider::{Chain, FullProvider, ProviderError, TransactionVariant};
use reth_prune_types::PruneModes;
Expand Down Expand Up @@ -195,38 +197,124 @@ where
}
}

impl<E, DB, P> BackfillJob<E, DB, P> {
/// Converts the backfill job into a single block backfill job.
pub fn into_single_blocks(self) -> SingleBlockBackfillJob<E, DB, P> {
self.into()
}
}

impl<E, DB, P> From<BackfillJob<E, DB, P>> for SingleBlockBackfillJob<E, DB, P> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i'm ok with it, but we still need an into_single_blocks method on BackfillJob

fn from(value: BackfillJob<E, DB, P>) -> Self {
Self {
executor: value.executor,
provider: value.provider,
range: value.range,
_db: PhantomData,
}
}
}

/// Single block Backfill job started for a specific range.
///
/// It implements [`Iterator`] which executes a block each time the
/// iterator is advanced and yields ([`BlockWithSenders`], [`BlockExecutionOutput`])
#[derive(Debug)]
pub struct SingleBlockBackfillJob<E, DB, P> {
executor: E,
provider: P,
range: RangeInclusive<BlockNumber>,
_db: PhantomData<DB>,
}

impl<E, DB, P> Iterator for SingleBlockBackfillJob<E, DB, P>
where
E: BlockExecutorProvider,
DB: Database,
P: FullProvider<DB>,
shekhirin marked this conversation as resolved.
Show resolved Hide resolved
{
type Item = Result<(BlockWithSenders, BlockExecutionOutput<Receipt>), BlockExecutionError>;

fn next(&mut self) -> Option<Self::Item> {
self.range.next().map(|block_number| self.execute_block(block_number))
}
}

impl<E, DB, P> SingleBlockBackfillJob<E, DB, P>
where
E: BlockExecutorProvider,
DB: Database,
P: FullProvider<DB>,
{
fn execute_block(
&self,
block_number: u64,
) -> Result<(BlockWithSenders, BlockExecutionOutput<Receipt>), BlockExecutionError> {
let td = self
.provider
.header_td_by_number(block_number)?
.ok_or_else(|| ProviderError::HeaderNotFound(block_number.into()))?;

// Fetch the block with senders for execution.
let block_with_senders = self
.provider
.block_with_senders(block_number.into(), TransactionVariant::WithHash)?
.ok_or_else(|| ProviderError::HeaderNotFound(block_number.into()))?;

// Configure the executor to use the previous block's state.
let executor = self.executor.executor(StateProviderDatabase::new(
self.provider.history_by_block_number(block_number.saturating_sub(1))?,
));

trace!(target: "exex::backfill", number = block_number, txs = block_with_senders.block.body.len(), "Executing block");

let block_execution_output = executor.execute((&block_with_senders, td).into())?;

Ok((block_with_senders, block_execution_output))
}
}

#[cfg(test)]
mod tests {
use crate::BackfillJobFactory;
use eyre::OptionExt;
use reth_blockchain_tree::noop::NoopBlockchainTree;
use reth_chainspec::{ChainSpecBuilder, EthereumHardfork, MAINNET};
use reth_chainspec::{ChainSpec, ChainSpecBuilder, EthereumHardfork, MAINNET};
use reth_db_common::init::init_genesis;
use reth_evm::execute::{BatchExecutor, BlockExecutorProvider};
use reth_evm::execute::{
BlockExecutionInput, BlockExecutionOutput, BlockExecutorProvider, Executor,
};
use reth_evm_ethereum::execute::EthExecutorProvider;
use reth_primitives::{
b256, constants::ETH_TO_WEI, public_key_to_address, Address, Block, Genesis,
GenesisAccount, Header, Transaction, TxEip2930, TxKind, U256,
b256, constants::ETH_TO_WEI, public_key_to_address, Address, Block, BlockWithSenders,
Genesis, GenesisAccount, Header, Receipt, Requests, SealedBlockWithSenders, Transaction,
TxEip2930, TxKind, U256,
};
use reth_provider::{
providers::BlockchainProvider, test_utils::create_test_provider_factory_with_chain_spec,
BlockWriter, LatestStateProviderRef,
BlockWriter, ExecutionOutcome, LatestStateProviderRef, ProviderFactory,
};
use reth_revm::database::StateProviderDatabase;
use reth_testing_utils::generators::{self, sign_tx_with_key_pair};
use secp256k1::Keypair;
use std::sync::Arc;

#[tokio::test]
async fn test_backfill() -> eyre::Result<()> {
reth_tracing::init_test_tracing();

// Create a key pair for the sender
let key_pair = Keypair::new_global(&mut generators::rng());
let address = public_key_to_address(key_pair.public_key());
fn to_execution_outcome(
block_number: u64,
block_execution_output: &BlockExecutionOutput<Receipt>,
) -> ExecutionOutcome {
ExecutionOutcome {
bundle: block_execution_output.state.clone(),
receipts: block_execution_output.receipts.clone().into(),
first_block: block_number,
requests: vec![Requests(block_execution_output.requests.clone())],
}
}

// Create a chain spec with a genesis state that contains the sender
let chain_spec = Arc::new(
fn chain_spec(address: Address) -> Arc<ChainSpec> {
// Create a chain spec with a genesis state that contains the
// provided sender
Arc::new(
ChainSpecBuilder::default()
.chain(MAINNET.chain)
.genesis(Genesis {
Expand All @@ -239,16 +327,53 @@ mod tests {
})
.paris_activated()
.build(),
);
)
}

let executor = EthExecutorProvider::ethereum(chain_spec.clone());
let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec.clone());
init_genesis(provider_factory.clone())?;
let blockchain_db = BlockchainProvider::new(
provider_factory.clone(),
Arc::new(NoopBlockchainTree::default()),
fn execute_block_and_commit_to_database<DB>(
provider_factory: &ProviderFactory<DB>,
chain_spec: Arc<ChainSpec>,
block: &BlockWithSenders,
) -> eyre::Result<BlockExecutionOutput<Receipt>>
where
DB: reth_db_api::database::Database,
{
let provider = provider_factory.provider()?;

// Execute the block to produce a block execution output
let mut block_execution_output = EthExecutorProvider::ethereum(chain_spec)
.executor(StateProviderDatabase::new(LatestStateProviderRef::new(
provider.tx_ref(),
provider.static_file_provider().clone(),
)))
.execute(BlockExecutionInput { block, total_difficulty: U256::ZERO })?;
block_execution_output.state.reverts.sort();

// Convert the block execution output to an execution outcome for committing to the database
let execution_outcome = to_execution_outcome(block.number, &block_execution_output);

// Commit the block's execution outcome to the database
let provider_rw = provider_factory.provider_rw()?;
let block = block.clone().seal_slow();
provider_rw.append_blocks_with_state(
vec![block],
execution_outcome,
Default::default(),
Default::default(),
)?;
provider_rw.commit()?;

Ok(block_execution_output)
}

fn blocks_and_execution_outputs<DB>(
provider_factory: ProviderFactory<DB>,
chain_spec: Arc<ChainSpec>,
key_pair: Keypair,
) -> eyre::Result<Vec<(SealedBlockWithSenders, BlockExecutionOutput<Receipt>)>>
where
DB: reth_db_api::database::Database,
{
// First block has a transaction that transfers some ETH to zero address
let block1 = Block {
header: Header {
Expand Down Expand Up @@ -279,52 +404,69 @@ mod tests {
.with_recovered_senders()
.ok_or_eyre("failed to recover senders")?;

// Second block has no state changes
// Second block resends the same transaction with increased nonce
let block2 = Block {
header: Header {
parent_hash: block1.hash_slow(),
parent_hash: block1.header.hash_slow(),
receipts_root: b256!(
"d3a6acf9a244d78b33831df95d472c4128ea85bf079a1d41e32ed0b7d2244c9e"
),
difficulty: chain_spec.fork(EthereumHardfork::Paris).ttd().expect("Paris TTD"),
number: 2,
gas_limit: 21000,
gas_used: 21000,
..Default::default()
},
body: vec![sign_tx_with_key_pair(
key_pair,
Transaction::Eip2930(TxEip2930 {
chain_id: chain_spec.chain.id(),
nonce: 1,
gas_limit: 21000,
gas_price: 1_500_000_000,
to: TxKind::Call(Address::ZERO),
value: U256::from(0.1 * ETH_TO_WEI as f64),
..Default::default()
}),
)],
..Default::default()
}
.with_recovered_senders()
.ok_or_eyre("failed to recover senders")?;

let provider = provider_factory.provider()?;
// Execute only the first block on top of genesis state
let mut outcome_single = EthExecutorProvider::ethereum(chain_spec.clone())
.batch_executor(StateProviderDatabase::new(LatestStateProviderRef::new(
provider.tx_ref(),
provider.static_file_provider().clone(),
)))
.execute_and_verify_batch([(&block1, U256::ZERO).into()])?;
outcome_single.bundle.reverts.sort();
// Execute both blocks on top of the genesis state
let outcome_batch = EthExecutorProvider::ethereum(chain_spec)
.batch_executor(StateProviderDatabase::new(LatestStateProviderRef::new(
provider.tx_ref(),
provider.static_file_provider().clone(),
)))
.execute_and_verify_batch([
(&block1, U256::ZERO).into(),
(&block2, U256::ZERO).into(),
])?;
drop(provider);
let block_output1 =
execute_block_and_commit_to_database(&provider_factory, chain_spec.clone(), &block1)?;
let block_output2 =
execute_block_and_commit_to_database(&provider_factory, chain_spec, &block2)?;

let block1 = block1.seal_slow();
let block2 = block2.seal_slow();

// Update the state with the execution results of both blocks
let provider_rw = provider_factory.provider_rw()?;
provider_rw.append_blocks_with_state(
vec![block1.clone(), block2],
outcome_batch,
Default::default(),
Default::default(),
Ok(vec![(block1, block_output1), (block2, block_output2)])
}

#[test]
fn test_backfill() -> eyre::Result<()> {
reth_tracing::init_test_tracing();

// Create a key pair for the sender
let key_pair = Keypair::new_global(&mut generators::rng());
let address = public_key_to_address(key_pair.public_key());

let chain_spec = chain_spec(address);

let executor = EthExecutorProvider::ethereum(chain_spec.clone());
let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec.clone());
init_genesis(provider_factory.clone())?;
let blockchain_db = BlockchainProvider::new(
provider_factory.clone(),
Arc::new(NoopBlockchainTree::default()),
)?;
provider_rw.commit()?;

let blocks_and_execution_outputs =
blocks_and_execution_outputs(provider_factory, chain_spec, key_pair)?;
let (block, block_execution_output) = blocks_and_execution_outputs.first().unwrap();
let execution_outcome = to_execution_outcome(block.number, block_execution_output);

// Backfill the first block
let factory = BackfillJobFactory::new(executor, blockchain_db);
Expand All @@ -336,8 +478,56 @@ mod tests {
assert_eq!(chains.len(), 1);
let mut chain = chains.into_iter().next().unwrap();
chain.execution_outcome_mut().bundle.reverts.sort();
assert_eq!(chain.blocks(), &[(1, block1)].into());
assert_eq!(chain.execution_outcome(), &outcome_single);
assert_eq!(chain.blocks(), &[(1, block.clone())].into());
assert_eq!(chain.execution_outcome(), &execution_outcome);

Ok(())
}

#[test]
fn test_single_block_backfill() -> eyre::Result<()> {
reth_tracing::init_test_tracing();

// Create a key pair for the sender
let key_pair = Keypair::new_global(&mut generators::rng());
let address = public_key_to_address(key_pair.public_key());

let chain_spec = chain_spec(address);

let executor = EthExecutorProvider::ethereum(chain_spec.clone());
let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec.clone());
init_genesis(provider_factory.clone())?;
let blockchain_db = BlockchainProvider::new(
provider_factory.clone(),
Arc::new(NoopBlockchainTree::default()),
)?;

let blocks_and_execution_outcomes =
blocks_and_execution_outputs(provider_factory, chain_spec, key_pair)?;

// Backfill the first block
let factory = BackfillJobFactory::new(executor, blockchain_db);
let job = factory.backfill(1..=1);
let single_job = job.into_single_blocks();
let block_execution_it = single_job.into_iter();

// Assert that the backfill job only produces a single block
let blocks_and_outcomes = block_execution_it.collect::<Vec<_>>();
assert_eq!(blocks_and_outcomes.len(), 1);

// Assert that the backfill job single block iterator produces the expected output for each
// block
for (i, res) in blocks_and_outcomes.into_iter().enumerate() {
let (block, mut execution_output) = res?;
execution_output.state.reverts.sort();

let sealed_block_with_senders = blocks_and_execution_outcomes[i].0.clone();
let expected_block = sealed_block_with_senders.unseal();
let expected_output = &blocks_and_execution_outcomes[i].1;

assert_eq!(block, expected_block);
assert_eq!(&execution_output, expected_output);
}

Ok(())
}
Expand Down
Loading