Skip to content

Commit

Permalink
cleanup & docs
Browse files Browse the repository at this point in the history
  • Loading branch information
rkrasiuk committed Mar 1, 2024
1 parent f53984b commit 28d1a6f
Show file tree
Hide file tree
Showing 8 changed files with 266 additions and 216 deletions.
111 changes: 50 additions & 61 deletions crates/trie-parallel/src/async_root.rs
@@ -1,3 +1,4 @@
use crate::StorageRootTargets;
use alloy_rlp::{BufMut, Encodable};
use reth_db::database::Database;
use reth_primitives::{
Expand All @@ -10,11 +11,10 @@ use reth_provider::{
};
use reth_tasks::TaskSpawner;
use reth_trie::{
hashed_cursor::{HashedCursorFactory, HashedPostStateCursorFactory},
hashed_cursor::HashedPostStateCursorFactory,
node_iter::{AccountNode, AccountNodeIter},
prefix_set::PrefixSet,
trie_cursor::TrieCursorFactory,
updates::{TrieKey, TrieUpdates},
updates::TrieUpdates,
walker::TrieWalker,
HashedPostState, StorageRoot, StorageRootError,
};
Expand All @@ -23,6 +23,7 @@ use thiserror::Error;
use tokio::sync::oneshot;
use tracing::*;

#[derive(Debug)]
pub struct AsyncStateRoot<DB, Provider> {
/// Consistent view of the database.
view: ConsistentDbView<DB, Provider>,
Expand All @@ -33,6 +34,7 @@ pub struct AsyncStateRoot<DB, Provider> {
}

impl<DB, Provider> AsyncStateRoot<DB, Provider> {
/// Create new async state root calculator.
pub fn new(
view: ConsistentDbView<DB, Provider>,
task_spawner: Arc<dyn TaskSpawner>,
Expand All @@ -47,10 +49,12 @@ where
DB: Database + Clone + 'static,
Provider: DatabaseProviderFactory<DB> + Clone + Send + 'static,
{
/// Calculate incremental state root asynchronously.
pub async fn incremental_root(self) -> Result<B256, AsyncStateRootError> {
self.calculate(false).await.map(|(root, _)| root)
}

/// Calculate incremental state root with updates asynchronously.
pub async fn incremental_root_with_updates(
self,
) -> Result<(B256, TrieUpdates), AsyncStateRootError> {
Expand All @@ -62,13 +66,8 @@ where
retain_updates: bool,
) -> Result<(B256, TrieUpdates), AsyncStateRootError> {
let prefix_sets = self.hashed_state.construct_prefix_sets();
let storage_root_targets = self
.hashed_state
.accounts
.keys()
.map(|address| (*address, PrefixSet::default()))
.chain(prefix_sets.storage_prefix_sets)
.collect::<HashMap<_, _>>();
let storage_root_targets =
StorageRootTargets::new(&self.hashed_state, prefix_sets.storage_prefix_sets);
let hashed_state_sorted = Arc::new(self.hashed_state.into_sorted());

// Pre-calculate storage roots async for accounts which were changed.
Expand All @@ -84,20 +83,17 @@ where
.provider_ro()
.map_err(AsyncStateRootError::ConsistentView)
.and_then(|provider_ro| {
let tx = provider_ro.tx_ref();
let calculator = StorageRoot::new_hashed(
tx,
HashedPostStateCursorFactory::new(tx, &hashed_state_sorted),
StorageRoot::new_hashed(
provider_ro.tx_ref(),
HashedPostStateCursorFactory::new(
provider_ro.tx_ref(),
&hashed_state_sorted,
),
hashed_address,
)
.with_prefix_set(prefix_set);

Ok(if retain_updates {
let (root, _, updates) = calculator.root_with_updates()?;
(root, Some(updates))
} else {
(calculator.root()?, None)
})
.with_prefix_set(prefix_set)
.calculate(retain_updates)
.map_err(AsyncStateRootError::StorageRoot)
});

let _ = tx.send(result);
Expand All @@ -113,17 +109,15 @@ where
let hashed_cursor_factory = HashedPostStateCursorFactory::new(tx, &hashed_state_sorted);
let trie_cursor_factory = tx;

let hashed_account_cursor =
hashed_cursor_factory.hashed_account_cursor().map_err(ProviderError::Database)?;
let trie_cursor =
trie_cursor_factory.account_trie_cursor().map_err(ProviderError::Database)?;

let walker = TrieWalker::new(trie_cursor, prefix_sets.account_prefix_set);
let mut hash_builder = HashBuilder::default();
let mut account_node_iter = AccountNodeIter::new(walker, hashed_account_cursor);

account_node_iter.walker.set_updates(retain_updates);
hash_builder.set_updates(retain_updates);
let mut hash_builder = HashBuilder::default().with_updates(retain_updates);
let walker = TrieWalker::new(trie_cursor, prefix_sets.account_prefix_set)
.with_updates(retain_updates);
let mut account_node_iter =
AccountNodeIter::from_factory(walker, hashed_cursor_factory.clone())
.map_err(ProviderError::Database)?;

let mut account_rlp = Vec::with_capacity(128);
while let Some(node) = account_node_iter.try_next().map_err(ProviderError::Database)? {
Expand All @@ -132,63 +126,58 @@ where
hash_builder.add_branch(node.key, node.value, node.children_are_in_trie);
}
AccountNode::Leaf(hashed_address, account) => {
let (storage_root, updates) = match storage_roots.remove(&hashed_address) {
Some(rx) => rx
.await
.map_err(|_| AsyncStateRootError::StorageRootChannelClosed)??,
None => {
let calculator = StorageRoot::new_hashed(
trie_cursor_factory,
hashed_cursor_factory.clone(),
hashed_address,
);

if retain_updates {
let (root, _, updates) = calculator.root_with_updates()?;
(root, Some(updates))
} else {
(calculator.root()?, None)
}
}
let (storage_root, _, updates) = match storage_roots.remove(&hashed_address) {
Some(rx) => rx.await.map_err(|_| {
AsyncStateRootError::StorageRootChannelClosed { hashed_address }
})??,
None => StorageRoot::new_hashed(
trie_cursor_factory,
hashed_cursor_factory.clone(),
hashed_address,
)
.calculate(retain_updates)?,
};

if let Some(updates) = updates {
if retain_updates {
trie_updates.extend(updates.into_iter());
}

let account = TrieAccount::from((account, storage_root));

account_rlp.clear();
let account = TrieAccount::from((account, storage_root));
account.encode(&mut account_rlp as &mut dyn BufMut);

hash_builder.add_leaf(Nibbles::unpack(hashed_address), &account_rlp);
}
}
}

let root = hash_builder.root();

let (_, walker_updates) = account_node_iter.walker.split();
let (_, hash_builder_updates) = hash_builder.split();

trie_updates.extend(walker_updates);
trie_updates.extend_with_account_updates(hash_builder_updates);
trie_updates.extend_with_deletes(
prefix_sets.destroyed_accounts.into_iter().map(TrieKey::StorageTrie),
trie_updates.finalize_state_updates(
account_node_iter.walker,
hash_builder,
prefix_sets.destroyed_accounts,
);

Ok((root, trie_updates))
}
}

/// Error during async state root calculation.
#[derive(Error, Debug)]
pub enum AsyncStateRootError {
#[error("storage root channel closed")]
StorageRootChannelClosed,
/// Storage root channel for a given address was closed.
#[error("storage root channel for {hashed_address} got closed")]
StorageRootChannelClosed {
/// The hashed address for which channel was closed.
hashed_address: B256,
},
/// Consistency error on attempt to create new database provider.
#[error(transparent)]
ConsistentView(#[from] ConsistentViewError),
/// Error while calculating storage root.
#[error(transparent)]
StorageRoot(#[from] StorageRootError),
/// Provider error.
#[error(transparent)]
Provider(#[from] ProviderError),
}
3 changes: 3 additions & 0 deletions crates/trie-parallel/src/lib.rs
Expand Up @@ -7,6 +7,9 @@
)]
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]

mod storage_root_targets;
pub(crate) use storage_root_targets::StorageRootTargets;

mod async_root;
pub use async_root::*;

Expand Down

0 comments on commit 28d1a6f

Please sign in to comment.