Skip to content

Commit

Permalink
feat(trie): parallel storage roots
Browse files Browse the repository at this point in the history
  • Loading branch information
rkrasiuk committed Mar 1, 2024
1 parent ba00472 commit f53984b
Show file tree
Hide file tree
Showing 9 changed files with 467 additions and 5 deletions.
26 changes: 26 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ members = [
"crates/tracing/",
"crates/transaction-pool/",
"crates/trie/",
"crates/trie-parallel/",
"examples/",
"examples/additional-rpc-namespace-in-cli/",
"examples/beacon-api-sse/",
Expand Down
4 changes: 3 additions & 1 deletion crates/storage/provider/src/providers/consistent_view.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use crate::{BlockNumReader, DatabaseProviderFactory, DatabaseProviderRO, ProviderError};
use reth_db::{cursor::DbCursorRO, database::Database, tables, transaction::DbTx};
use reth_interfaces::provider::{ConsistentViewError, ProviderResult};
use reth_interfaces::provider::ProviderResult;
use reth_primitives::{GotExpected, B256};
use std::marker::PhantomData;

pub use reth_interfaces::provider::ConsistentViewError;

/// A consistent view over state in the database.
///
/// View gets initialized with the latest or provided tip.
Expand Down
2 changes: 1 addition & 1 deletion crates/storage/provider/src/providers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ mod chain_info;
use chain_info::ChainInfoTracker;

mod consistent_view;
pub use consistent_view::ConsistentDbView;
pub use consistent_view::{ConsistentDbView, ConsistentViewError};

/// The main type for interacting with the blockchain.
///
Expand Down
50 changes: 50 additions & 0 deletions crates/trie-parallel/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
[package]
name = "reth-trie-parallel"
version.workspace = true
edition.workspace = true
rust-version.workspace = true
license.workspace = true
homepage.workspace = true
repository.workspace = true
description = "Parallel implementation of merkle root algorithm"

[lints]
workspace = true

[dependencies]
# reth
reth-primitives.workspace = true
reth-interfaces.workspace = true
reth-db.workspace = true
reth-trie.workspace = true
reth-provider.workspace = true
reth-tasks.workspace = true

# alloy
alloy-rlp.workspace = true
alloy-chains.workspace = true

# tracing
tracing.workspace = true

# misc
tokio = { workspace = true, default-features = false, features = ["sync"] }
rayon.workspace = true
thiserror.workspace = true
derive_more.workspace = true
auto_impl = "1"

[dev-dependencies]
# reth
reth-primitives = { workspace = true, features = ["test-utils", "arbitrary"] }
reth-db = { workspace = true, features = ["test-utils"] }
reth-provider = { workspace = true, features = ["test-utils"] }

# misc
proptest.workspace = true
tokio = { workspace = true, default-features = false, features = ["sync", "rt", "macros"] }
tokio-stream.workspace = true
once_cell.workspace = true
serde_json.workspace = true
similar-asserts.workspace = true
criterion.workspace = true
194 changes: 194 additions & 0 deletions crates/trie-parallel/src/async_root.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
use alloy_rlp::{BufMut, Encodable};
use reth_db::database::Database;
use reth_primitives::{
trie::{HashBuilder, Nibbles, TrieAccount},
B256,
};
use reth_provider::{
providers::{ConsistentDbView, ConsistentViewError},
DatabaseProviderFactory, ProviderError,
};
use reth_tasks::TaskSpawner;
use reth_trie::{
hashed_cursor::{HashedCursorFactory, HashedPostStateCursorFactory},
node_iter::{AccountNode, AccountNodeIter},
prefix_set::PrefixSet,
trie_cursor::TrieCursorFactory,
updates::{TrieKey, TrieUpdates},
walker::TrieWalker,
HashedPostState, StorageRoot, StorageRootError,
};
use std::{collections::HashMap, sync::Arc};
use thiserror::Error;
use tokio::sync::oneshot;
use tracing::*;

pub struct AsyncStateRoot<DB, Provider> {
/// Consistent view of the database.
view: ConsistentDbView<DB, Provider>,
/// Task spawner.
task_spawner: Arc<dyn TaskSpawner>,
/// Changed hashed state.
hashed_state: HashedPostState,
}

impl<DB, Provider> AsyncStateRoot<DB, Provider> {
pub fn new(
view: ConsistentDbView<DB, Provider>,
task_spawner: Arc<dyn TaskSpawner>,
hashed_state: HashedPostState,
) -> Self {
Self { view, task_spawner, hashed_state }
}
}

impl<DB, Provider> AsyncStateRoot<DB, Provider>
where
DB: Database + Clone + 'static,
Provider: DatabaseProviderFactory<DB> + Clone + Send + 'static,
{
pub async fn incremental_root(self) -> Result<B256, AsyncStateRootError> {
self.calculate(false).await.map(|(root, _)| root)
}

pub async fn incremental_root_with_updates(
self,
) -> Result<(B256, TrieUpdates), AsyncStateRootError> {
self.calculate(true).await
}

async fn calculate(
self,
retain_updates: bool,
) -> Result<(B256, TrieUpdates), AsyncStateRootError> {
let prefix_sets = self.hashed_state.construct_prefix_sets();
let storage_root_targets = self
.hashed_state
.accounts
.keys()
.map(|address| (*address, PrefixSet::default()))
.chain(prefix_sets.storage_prefix_sets)
.collect::<HashMap<_, _>>();
let hashed_state_sorted = Arc::new(self.hashed_state.into_sorted());

// Pre-calculate storage roots async for accounts which were changed.
debug!(target: "trie::async_state_root", len = storage_root_targets.len(), "pre-calculating storage roots");

let mut storage_roots = HashMap::with_capacity(storage_root_targets.len());
for (hashed_address, prefix_set) in storage_root_targets {
let (tx, rx) = oneshot::channel();
let view = self.view.clone();
let hashed_state_sorted = hashed_state_sorted.clone();
self.task_spawner.spawn(Box::pin(async move {
let result = view
.provider_ro()
.map_err(AsyncStateRootError::ConsistentView)
.and_then(|provider_ro| {
let tx = provider_ro.tx_ref();
let calculator = StorageRoot::new_hashed(
tx,
HashedPostStateCursorFactory::new(tx, &hashed_state_sorted),
hashed_address,
)
.with_prefix_set(prefix_set);

Ok(if retain_updates {
let (root, _, updates) = calculator.root_with_updates()?;
(root, Some(updates))
} else {
(calculator.root()?, None)
})
});

let _ = tx.send(result);
}));
storage_roots.insert(hashed_address, rx);
}

trace!(target: "trie::async_state_root", "calculating state root");
let mut trie_updates = TrieUpdates::default();

let provider_ro = self.view.provider_ro()?;
let tx = provider_ro.tx_ref();
let hashed_cursor_factory = HashedPostStateCursorFactory::new(tx, &hashed_state_sorted);
let trie_cursor_factory = tx;

let hashed_account_cursor =
hashed_cursor_factory.hashed_account_cursor().map_err(ProviderError::Database)?;
let trie_cursor =
trie_cursor_factory.account_trie_cursor().map_err(ProviderError::Database)?;

let walker = TrieWalker::new(trie_cursor, prefix_sets.account_prefix_set);
let mut hash_builder = HashBuilder::default();
let mut account_node_iter = AccountNodeIter::new(walker, hashed_account_cursor);

account_node_iter.walker.set_updates(retain_updates);
hash_builder.set_updates(retain_updates);

let mut account_rlp = Vec::with_capacity(128);
while let Some(node) = account_node_iter.try_next().map_err(ProviderError::Database)? {
match node {
AccountNode::Branch(node) => {
hash_builder.add_branch(node.key, node.value, node.children_are_in_trie);
}
AccountNode::Leaf(hashed_address, account) => {
let (storage_root, updates) = match storage_roots.remove(&hashed_address) {
Some(rx) => rx
.await
.map_err(|_| AsyncStateRootError::StorageRootChannelClosed)??,
None => {
let calculator = StorageRoot::new_hashed(
trie_cursor_factory,
hashed_cursor_factory.clone(),
hashed_address,
);

if retain_updates {
let (root, _, updates) = calculator.root_with_updates()?;
(root, Some(updates))
} else {
(calculator.root()?, None)
}
}
};

if let Some(updates) = updates {
trie_updates.extend(updates.into_iter());
}

let account = TrieAccount::from((account, storage_root));

account_rlp.clear();
account.encode(&mut account_rlp as &mut dyn BufMut);

hash_builder.add_leaf(Nibbles::unpack(hashed_address), &account_rlp);
}
}
}

let root = hash_builder.root();

let (_, walker_updates) = account_node_iter.walker.split();
let (_, hash_builder_updates) = hash_builder.split();

trie_updates.extend(walker_updates);
trie_updates.extend_with_account_updates(hash_builder_updates);
trie_updates.extend_with_deletes(
prefix_sets.destroyed_accounts.into_iter().map(TrieKey::StorageTrie),
);

Ok((root, trie_updates))
}
}

#[derive(Error, Debug)]
pub enum AsyncStateRootError {
#[error("storage root channel closed")]
StorageRootChannelClosed,
#[error(transparent)]
ConsistentView(#[from] ConsistentViewError),
#[error(transparent)]
StorageRoot(#[from] StorageRootError),
#[error(transparent)]
Provider(#[from] ProviderError),
}
14 changes: 14 additions & 0 deletions crates/trie-parallel/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
//! TODO:

#![doc(
html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
)]
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]

mod async_root;
pub use async_root::*;

mod parallel_root;
pub use parallel_root::*;

0 comments on commit f53984b

Please sign in to comment.