Skip to content

Commit

Permalink
feat(trie): parallel storage roots (#6903)
Browse files Browse the repository at this point in the history
  • Loading branch information
rkrasiuk committed Mar 8, 2024
1 parent 820b122 commit 9569692
Show file tree
Hide file tree
Showing 23 changed files with 1,215 additions and 122 deletions.
23 changes: 23 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Expand Up @@ -57,6 +57,7 @@ members = [
"crates/tracing/",
"crates/transaction-pool/",
"crates/trie/",
"crates/trie-parallel/",
"examples/",
"examples/additional-rpc-namespace-in-cli/",
"examples/beacon-api-sse/",
Expand Down Expand Up @@ -182,6 +183,7 @@ reth-tokio-util = { path = "crates/tokio-util" }
reth-tracing = { path = "crates/tracing" }
reth-transaction-pool = { path = "crates/transaction-pool" }
reth-trie = { path = "crates/trie" }
reth-trie-parallel = { path = "crates/trie-parallel" }

# revm
revm = { version = "6.1.0", features = ["std", "secp256k1"], default-features = false }
Expand Down
2 changes: 1 addition & 1 deletion bin/reth/Cargo.toml
Expand Up @@ -45,7 +45,7 @@ reth-basic-payload-builder.workspace = true
reth-discv4.workspace = true
reth-prune.workspace = true
reth-static-file = { workspace = true, features = ["clap"] }
reth-trie.workspace = true
reth-trie = { workspace = true, features = ["metrics"] }
reth-nippy-jar.workspace = true
reth-node-api.workspace = true
reth-node-ethereum.workspace = true
Expand Down
2 changes: 1 addition & 1 deletion crates/blockchain-tree/Cargo.toml
Expand Up @@ -17,7 +17,7 @@ reth-interfaces.workspace = true
reth-db.workspace = true
reth-provider.workspace = true
reth-stages.workspace = true
reth-trie.workspace = true
reth-trie = { workspace = true, features = ["metrics"] }

# common
parking_lot.workspace = true
Expand Down
2 changes: 1 addition & 1 deletion crates/interfaces/src/provider.rs
Expand Up @@ -162,7 +162,7 @@ pub enum ConsistentViewError {
Syncing(BlockNumber),
/// Error thrown on inconsistent database view.
#[error("inconsistent database state: {tip:?}")]
InconsistentView {
Inconsistent {
/// The tip diff.
tip: GotExpected<Option<B256>>,
},
Expand Down
2 changes: 1 addition & 1 deletion crates/stages/Cargo.toml
Expand Up @@ -18,7 +18,7 @@ reth-interfaces.workspace = true
reth-db.workspace = true
reth-codecs.workspace = true
reth-provider.workspace = true
reth-trie.workspace = true
reth-trie = { workspace = true, features = ["metrics"] }
reth-tokio-util.workspace = true
reth-etl.workspace = true
reth-static-file.workspace = true
Expand Down
2 changes: 1 addition & 1 deletion crates/storage/provider/Cargo.toml
Expand Up @@ -16,7 +16,7 @@ workspace = true
reth-primitives.workspace = true
reth-interfaces.workspace = true
reth-db.workspace = true
reth-trie.workspace = true
reth-trie = { workspace = true, features = ["metrics"] }
reth-nippy-jar.workspace = true
reth-codecs.workspace = true
reth-node-api.workspace = true
Expand Down
11 changes: 9 additions & 2 deletions crates/storage/provider/src/providers/consistent_view.rs
@@ -1,9 +1,11 @@
use crate::{BlockNumReader, DatabaseProviderFactory, DatabaseProviderRO, ProviderError};
use reth_db::{cursor::DbCursorRO, database::Database, tables, transaction::DbTx};
use reth_interfaces::provider::{ConsistentViewError, ProviderResult};
use reth_interfaces::provider::ProviderResult;
use reth_primitives::{GotExpected, B256};
use std::marker::PhantomData;

pub use reth_interfaces::provider::ConsistentViewError;

/// A consistent view over state in the database.
///
/// View gets initialized with the latest or provided tip.
Expand All @@ -14,6 +16,11 @@ use std::marker::PhantomData;
///
/// The view should only be used outside of staged-sync.
/// Otherwise, any attempt to create a provider will result in [ConsistentViewError::Syncing].
///
/// When using the view, the consumer should either
/// 1) have a failover for when the state changes and handle [ConsistentViewError::Inconsistent]
/// appropriately.
/// 2) be sure that the state does not change.
#[derive(Clone, Debug)]
pub struct ConsistentDbView<DB, Provider> {
database: PhantomData<DB>,
Expand Down Expand Up @@ -56,7 +63,7 @@ where

let tip = last_entry.map(|(_, hash)| hash);
if self.tip != tip {
return Err(ConsistentViewError::InconsistentView {
return Err(ConsistentViewError::Inconsistent {
tip: GotExpected { got: tip, expected: self.tip },
})
}
Expand Down
14 changes: 10 additions & 4 deletions crates/storage/provider/src/providers/database/mod.rs
Expand Up @@ -5,10 +5,10 @@ use crate::{
},
to_range,
traits::{BlockSource, ReceiptProvider},
BlockHashReader, BlockNumReader, BlockReader, ChainSpecProvider, EvmEnvProvider,
HeaderProvider, HeaderSyncGap, HeaderSyncGapProvider, HeaderSyncMode, ProviderError,
PruneCheckpointReader, StageCheckpointReader, StateProviderBox, TransactionVariant,
TransactionsProvider, WithdrawalsProvider,
BlockHashReader, BlockNumReader, BlockReader, ChainSpecProvider, DatabaseProviderFactory,
EvmEnvProvider, HeaderProvider, HeaderSyncGap, HeaderSyncGapProvider, HeaderSyncMode,
ProviderError, PruneCheckpointReader, StageCheckpointReader, StateProviderBox,
TransactionVariant, TransactionsProvider, WithdrawalsProvider,
};
use reth_db::{database::Database, init_db, models::StoredBlockBodyIndices, DatabaseEnv};
use reth_interfaces::{provider::ProviderResult, RethError, RethResult};
Expand Down Expand Up @@ -208,6 +208,12 @@ impl<DB: Database> ProviderFactory<DB> {
}
}

impl<DB: Database> DatabaseProviderFactory<DB> for ProviderFactory<DB> {
fn database_provider_ro(&self) -> ProviderResult<DatabaseProviderRO<DB>> {
self.provider()
}
}

impl<DB: Database> HeaderSyncGapProvider for ProviderFactory<DB> {
fn sync_gap(
&self,
Expand Down
2 changes: 1 addition & 1 deletion crates/storage/provider/src/providers/mod.rs
Expand Up @@ -59,7 +59,7 @@ mod chain_info;
use chain_info::ChainInfoTracker;

mod consistent_view;
pub use consistent_view::ConsistentDbView;
pub use consistent_view::{ConsistentDbView, ConsistentViewError};

/// The main type for interacting with the blockchain.
///
Expand Down
65 changes: 65 additions & 0 deletions crates/trie-parallel/Cargo.toml
@@ -0,0 +1,65 @@
[package]
name = "reth-trie-parallel"
version.workspace = true
edition.workspace = true
rust-version.workspace = true
license.workspace = true
homepage.workspace = true
repository.workspace = true
description = "Parallel implementation of merkle root algorithm"

[lints]
workspace = true

[dependencies]
# reth
reth-primitives.workspace = true
reth-db.workspace = true
reth-trie.workspace = true
reth-provider.workspace = true

# alloy
alloy-rlp.workspace = true

# tracing
tracing.workspace = true

# misc
thiserror.workspace = true
derive_more.workspace = true

# `async` feature
reth-tasks = { workspace = true, optional = true }
tokio = { workspace = true, optional = true, default-features = false }
itertools = { workspace = true, optional = true }

# `parallel` feature
rayon = { workspace = true, optional = true }

# `metrics` feature
reth-metrics = { workspace = true, optional = true }
metrics = { workspace = true, optional = true }

[dev-dependencies]
# reth
reth-primitives = { workspace = true, features = ["test-utils", "arbitrary"] }
reth-provider = { workspace = true, features = ["test-utils"] }
reth-trie = { workspace = true, features = ["test-utils"] }

# misc
rand.workspace = true
tokio = { workspace = true, default-features = false, features = ["sync", "rt", "macros"] }
rayon.workspace = true
criterion = { workspace = true, features = ["async_tokio"] }
proptest.workspace = true

[features]
default = ["metrics"]
metrics = ["reth-metrics", "dep:metrics", "reth-trie/metrics"]
async = ["reth-tasks/rayon", "tokio/sync", "itertools"]
parallel = ["rayon"]

[[bench]]
name = "root"
required-features = ["async", "parallel"]
harness = false
135 changes: 135 additions & 0 deletions crates/trie-parallel/benches/root.rs
@@ -0,0 +1,135 @@
#![allow(missing_docs, unreachable_pub)]
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use proptest::{prelude::*, strategy::ValueTree, test_runner::TestRunner};
use rayon::ThreadPoolBuilder;
use reth_primitives::{Account, B256, U256};
use reth_provider::{
bundle_state::HashedStateChanges, providers::ConsistentDbView,
test_utils::create_test_provider_factory,
};
use reth_tasks::pool::BlockingTaskPool;
use reth_trie::{
hashed_cursor::HashedPostStateCursorFactory, HashedPostState, HashedStorage, StateRoot,
};
use reth_trie_parallel::{async_root::AsyncStateRoot, parallel_root::ParallelStateRoot};
use std::collections::HashMap;

/// Criterion benchmark comparing three state root implementations — the
/// synchronous incremental root, the rayon-parallel root, and the async
/// (blocking-task-pool) root — over identical pre-seeded database state.
pub fn calculate_state_root(c: &mut Criterion) {
    let mut group = c.benchmark_group("Calculate State Root");
    group.sample_size(20);

    let rt = tokio::runtime::Runtime::new().unwrap();
    let task_pool = BlockingTaskPool::new(ThreadPoolBuilder::default().build().unwrap());

    for account_count in [1_000, 3_000, 5_000, 10_000] {
        let (initial_state, update) = generate_test_data(account_count);
        let factory = create_test_provider_factory();

        // Seed the database with the initial hashed state and persist the
        // resulting trie nodes so every benchmark variant starts from the
        // same committed baseline.
        {
            let rw = factory.provider_rw().unwrap();
            HashedStateChanges(initial_state).write_to_db(rw.tx_ref()).unwrap();
            let (_, trie_updates) =
                StateRoot::from_tx(rw.tx_ref()).root_with_updates().unwrap();
            trie_updates.flush(rw.tx_ref()).unwrap();
            rw.commit().unwrap();
        }

        let view = ConsistentDbView::new(factory.clone());

        // Baseline: single-threaded incremental state root over the update.
        group.bench_function(BenchmarkId::new("sync root", account_count), |b| {
            b.to_async(&rt).iter_with_setup(
                || {
                    let sorted = update.clone().into_sorted();
                    let prefix_sets = update.construct_prefix_sets();
                    (factory.provider().unwrap(), sorted, prefix_sets)
                },
                |(provider, sorted, prefix_sets)| async move {
                    StateRoot::from_tx(provider.tx_ref())
                        .with_hashed_cursor_factory(HashedPostStateCursorFactory::new(
                            provider.tx_ref(),
                            &sorted,
                        ))
                        .with_prefix_sets(prefix_sets)
                        .root()
                },
            )
        });

        // Storage roots computed on rayon worker threads.
        group.bench_function(BenchmarkId::new("parallel root", account_count), |b| {
            b.to_async(&rt).iter_with_setup(
                || ParallelStateRoot::new(view.clone(), update.clone()),
                |calculator| async { calculator.incremental_root() },
            );
        });

        // Storage roots dispatched onto the shared blocking task pool.
        group.bench_function(BenchmarkId::new("async root", account_count), |b| {
            b.to_async(&rt).iter_with_setup(
                || AsyncStateRoot::new(view.clone(), task_pool.clone(), update.clone()),
                |calculator| calculator.incremental_root(),
            );
        });
    }
}

/// Builds random benchmark input: a full database state of `size` non-empty
/// accounts (each holding 1000 non-zero storage slots) and a post-state
/// update that re-randomizes half of the slots for half of the accounts.
fn generate_test_data(size: usize) -> (HashedPostState, HashedPostState) {
    use proptest::{collection::hash_map, sample::subsequence};

    let slots_per_account = 1_000;
    let mut runner = TestRunner::new(ProptestConfig::default());

    // Full database state: address -> (account, storage map).
    let db_state = hash_map(
        any::<B256>(),
        (
            any::<Account>().prop_filter("non empty account", |a| !a.is_empty()),
            hash_map(
                any::<B256>(),
                any::<U256>().prop_filter("non zero value", |v| !v.is_zero()),
                slots_per_account,
            ),
        ),
        size,
    )
    .new_tree(&mut runner)
    .unwrap()
    .current();

    // Pick half of the accounts to update.
    let all_addresses = db_state.keys().cloned().collect::<Vec<_>>();
    let touched_addresses =
        subsequence(all_addresses, size / 2).new_tree(&mut runner).unwrap().current();

    // For every touched account, pick half of its existing slots and assign
    // each a freshly generated value.
    let mut updated_storages = HashMap::new();
    for address in touched_addresses {
        let (_, storage) = db_state.get(&address).unwrap();
        let slot_keys = storage.keys().cloned().collect::<Vec<_>>();
        let touched_slots = subsequence(slot_keys, slots_per_account / 2)
            .new_tree(&mut runner)
            .unwrap()
            .current();

        let mut new_slots = HashMap::new();
        for slot in touched_slots {
            new_slots.insert(slot, any::<U256>().new_tree(&mut runner).unwrap().current());
        }
        updated_storages.insert(address, new_slots);
    }

    let initial_state = HashedPostState::default()
        .with_accounts(
            db_state.iter().map(|(address, (account, _))| (*address, Some(*account))),
        )
        .with_storages(db_state.into_iter().map(|(address, (_, storage))| {
            (address, HashedStorage::from_iter(false, storage))
        }));

    let update = HashedPostState::default().with_storages(
        updated_storages
            .into_iter()
            .map(|(address, storage)| (address, HashedStorage::from_iter(false, storage))),
    );

    (initial_state, update)
}

// Register the benchmark group with criterion and generate the bench `main`.
criterion_group!(state_root, calculate_state_root);
criterion_main!(state_root);

0 comments on commit 9569692

Please sign in to comment.