Skip to content

Commit

Permalink
[Memtrie] Implement trie iterator based on memtrie. (#11299)
Browse files Browse the repository at this point in the history
Closes #11278

This mostly copies from the on-disk trie implementation except adapting
it to memtrie data structures. This eliminates the sole remaining
disk-trie use during normal validator node operation (when memtrie is
enabled), remove_account.

The memtrie implementation requires a different API because there's no
way to grab a read lock and construct an iterator at the same time, due
to there necessarily being two layers of lifetimes. So whereas we used
to have Trie::iter(), we now must have Trie::lock_for_iter() and then
use the result to .iter().
  • Loading branch information
robin-near committed May 16, 2024
1 parent 1479c90 commit a7a4f31
Show file tree
Hide file tree
Showing 25 changed files with 519 additions and 119 deletions.
2 changes: 1 addition & 1 deletion chain/chain/src/flat_storage_creator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ impl FlatStorageShardCreator {
let path_end = trie.find_state_part_boundary(part_id.idx + 1, part_id.total).unwrap();
let hex_path_begin = Self::nibbles_to_hex(&path_begin);
debug!(target: "store", "Preload state part from {hex_path_begin}");
let mut trie_iter = trie.iter().unwrap();
let mut trie_iter = trie.disk_iter().unwrap();

let mut store_update = store.store_update();
let mut num_items = 0;
Expand Down
2 changes: 1 addition & 1 deletion chain/chain/src/store_validator/validate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -557,7 +557,7 @@ pub(crate) fn trie_changes_chunk_extra_exists(
// 4) Trie should exist for `shard_uid` and the root
let trie = sv.runtime.get_tries().get_trie_for_shard(*shard_uid, new_root);
let trie_iterator = unwrap_or_err!(
trie.iter(),
trie.disk_iter(),
"Trie Node Missing for shard {:?} root {:?}",
shard_uid,
new_root
Expand Down
6 changes: 3 additions & 3 deletions chain/chain/src/tests/garbage_collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ fn gc_fork_common(simple_chains: Vec<SimpleChain>, max_changes: usize) {

let mut state_root2 = state_roots2[simple_chain.from as usize];
let state_root1 = states1[simple_chain.from as usize].1[shard_to_check_trie as usize];
tries1.get_trie_for_shard(shard_uid, state_root1).iter().unwrap();
tries1.get_trie_for_shard(shard_uid, state_root1).disk_iter().unwrap();
assert_eq!(state_root1, state_root2);

for i in start_index..start_index + simple_chain.length {
Expand Down Expand Up @@ -277,13 +277,13 @@ fn gc_fork_common(simple_chains: Vec<SimpleChain>, max_changes: usize) {
);
let a = tries1
.get_trie_for_shard(shard_uid, state_root1)
.iter()
.disk_iter()
.unwrap()
.map(|item| item.unwrap().0)
.collect::<Vec<_>>();
let b = tries2
.get_trie_for_shard(shard_uid, state_root1)
.iter()
.disk_iter()
.unwrap()
.map(|item| item.unwrap().0)
.collect::<Vec<_>>();
Expand Down
10 changes: 8 additions & 2 deletions core/store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -966,8 +966,9 @@ pub fn remove_account(
state_update.remove(TrieKey::ContractCode { account_id: account_id.clone() });

// Removing access keys
let lock = state_update.trie().lock_for_iter();
let public_keys = state_update
.iter(&trie_key_parsers::get_raw_prefix_for_access_keys(account_id))?
.locked_iter(&trie_key_parsers::get_raw_prefix_for_access_keys(account_id), &lock)?
.map(|raw_key| {
trie_key_parsers::parse_public_key_from_access_key_key(&raw_key?, account_id).map_err(
|_e| {
Expand All @@ -978,13 +979,16 @@ pub fn remove_account(
)
})
.collect::<Result<Vec<_>, _>>()?;
drop(lock);

for public_key in public_keys {
state_update.remove(TrieKey::AccessKey { account_id: account_id.clone(), public_key });
}

// Removing contract data
let lock = state_update.trie().lock_for_iter();
let data_keys = state_update
.iter(&trie_key_parsers::get_raw_prefix_for_contract_data(account_id, &[]))?
.locked_iter(&trie_key_parsers::get_raw_prefix_for_contract_data(account_id, &[]), &lock)?
.map(|raw_key| {
trie_key_parsers::parse_data_key_from_contract_data_key(&raw_key?, account_id)
.map_err(|_e| {
Expand All @@ -995,6 +999,8 @@ pub fn remove_account(
.map(Vec::from)
})
.collect::<Result<Vec<_>, _>>()?;
drop(lock);

for key in data_keys {
state_update.remove(TrieKey::ContractData { account_id: account_id.clone(), key });
}
Expand Down
42 changes: 37 additions & 5 deletions core/store/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,19 @@ use crate::flat::{
use crate::metadata::{DbKind, DbVersion, DB_VERSION};
use crate::{
get, get_delayed_receipt_indices, get_promise_yield_indices, DBCol, NodeStorage, ShardTries,
StateSnapshotConfig, Store, TrieConfig,
StateSnapshotConfig, Store, Trie, TrieConfig,
};
use itertools::Itertools;
use near_primitives::account::id::AccountId;
use near_primitives::congestion_info::CongestionInfo;
use near_primitives::hash::CryptoHash;
use near_primitives::receipt::{DataReceipt, PromiseYieldTimeout, Receipt, ReceiptEnum, ReceiptV1};
use near_primitives::shard_layout::{ShardUId, ShardVersion};
use near_primitives::shard_layout::{get_block_shard_uid, ShardUId, ShardVersion};
use near_primitives::state::FlatStateValue;
use near_primitives::trie_key::TrieKey;
use near_primitives::types::chunk_extra::ChunkExtra;
use near_primitives::types::{NumShards, StateRoot};
use near_primitives::version::{ProtocolFeature, PROTOCOL_VERSION};
use rand::seq::SliceRandom;
use rand::Rng;
use std::collections::HashMap;
Expand Down Expand Up @@ -99,19 +102,22 @@ impl TestTriesBuilder {
self
}

pub fn with_in_memory_tries(mut self) -> Self {
self.enable_in_memory_tries = true;
pub fn with_in_memory_tries(mut self, enable: bool) -> Self {
self.enable_in_memory_tries = enable;
self
}

pub fn build(self) -> ShardTries {
if self.enable_in_memory_tries && !self.enable_flat_storage {
panic!("In-memory tries require flat storage");
}
let store = self.store.unwrap_or_else(create_test_store);
let shard_uids = (0..self.num_shards)
.map(|shard_id| ShardUId { shard_id: shard_id as u32, version: self.shard_version })
.collect::<Vec<_>>();
let flat_storage_manager = FlatStorageManager::new(store.clone());
let tries = ShardTries::new(
store,
store.clone(),
TrieConfig {
load_mem_tries_for_tracked_shards: self.enable_in_memory_tries,
..Default::default()
Expand Down Expand Up @@ -147,6 +153,32 @@ impl TestTriesBuilder {
}
}
if self.enable_in_memory_tries {
// ChunkExtra is needed for in-memory trie loading code to query state roots.
let congestion_info = ProtocolFeature::CongestionControl
.enabled(PROTOCOL_VERSION)
.then(CongestionInfo::default);
let chunk_extra = ChunkExtra::new(
PROTOCOL_VERSION,
&Trie::EMPTY_ROOT,
CryptoHash::default(),
Vec::new(),
0,
0,
0,
congestion_info,
);
let mut update_for_chunk_extra = store.store_update();
for shard_uid in &shard_uids {
update_for_chunk_extra
.set_ser(
DBCol::ChunkExtra,
&get_block_shard_uid(&CryptoHash::default(), shard_uid),
&chunk_extra,
)
.unwrap();
}
update_for_chunk_extra.commit().unwrap();

tries.load_mem_tries_for_enabled_shards(&shard_uids).unwrap();
}
tries
Expand Down
Loading

0 comments on commit a7a4f31

Please sign in to comment.