Skip to content

Commit

Permalink
add receipt buffers to state trie
Browse files Browse the repository at this point in the history
This adds one outgoing buffer for each other shard
to the trie.

This PR is a part of NEP-539.
  • Loading branch information
jakmeier committed Apr 21, 2024
1 parent 3743c40 commit a4a4a84
Show file tree
Hide file tree
Showing 8 changed files with 166 additions and 4 deletions.
20 changes: 20 additions & 0 deletions core/primitives/src/receipt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,5 +257,25 @@ pub struct PromiseYieldTimeout {
pub expires_at: BlockHeight,
}

/// Stores indices for a persistent queue for buffered receipts that couldn't be forwarded.
#[derive(Default, BorshSerialize, BorshDeserialize, Clone, PartialEq, Debug)]
pub struct BufferedReceiptIndices {
pub shard_buffer_indices: std::collections::BTreeMap<ShardId, ShardBufferedReceiptIndices>,
}

#[derive(Default, BorshSerialize, BorshDeserialize, Clone, PartialEq, Debug)]
pub struct ShardBufferedReceiptIndices {
// First inclusive index in the queue.
pub first_index: u64,
// Exclusive end index of the queue
pub next_available_index: u64,
}

impl ShardBufferedReceiptIndices {
pub fn len(&self) -> u64 {
self.next_available_index - self.first_index
}
}

/// Map of shard to list of receipts to send to it.
pub type ReceiptResult = HashMap<ShardId, Vec<Receipt>>;
32 changes: 31 additions & 1 deletion core/primitives/src/trie_key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::mem::size_of;
use borsh::{BorshDeserialize, BorshSerialize};

use near_crypto::PublicKey;
use near_primitives_core::types::ShardId;

use crate::hash::CryptoHash;
use crate::types::AccountId;
Expand Down Expand Up @@ -50,9 +51,14 @@ pub mod col {
/// This column id is used when storing the postponed PromiseYield receipts
/// (`primitives::receipt::Receipt`).
pub const PROMISE_YIELD_RECEIPT: u8 = 12;
/// Indics of outgoing receipts. A singleton per shard.
pub const BUFFERED_RECEIPT_INDICES: u8 = 13;
/// Outgoing receipts that need to be buffered due to congestion +
/// backpressure on the receiving shard.
pub const BUFFERED_RECEIPT: u8 = 14;
/// All columns except those used for the delayed receipts queue and the yielded promises
/// queue, which are both global state for the shard.
pub const COLUMNS_WITH_ACCOUNT_ID_IN_KEY: [(u8, &str); 9] = [
pub const COLUMNS_WITH_ACCOUNT_ID_IN_KEY: [(u8, &str); 11] = [
(ACCOUNT, "Account"),
(CONTRACT_CODE, "ContractCode"),
(ACCESS_KEY, "AccessKey"),
Expand All @@ -62,6 +68,8 @@ pub mod col {
(POSTPONED_RECEIPT, "PostponedReceipt"),
(CONTRACT_DATA, "ContractData"),
(PROMISE_YIELD_RECEIPT, "PromiseYieldReceipt"),
(BUFFERED_RECEIPT_INDICES, "BufferedReceiptIndices"),
(BUFFERED_RECEIPT, "BufferedReceipt"),
];
}

Expand Down Expand Up @@ -109,6 +117,14 @@ pub enum TrieKey {
/// Used to store the postponed promise yield receipt `primitives::receipt::Receipt`
/// for a given receiver's `AccountId` and a given `data_id`.
PromiseYieldReceipt { receiver_id: AccountId, data_id: CryptoHash },
/// Used to store indices of the buffered receipts queues per shard.
/// NOTE: It is a singleton per shard, holding indices for all outgoing shards.
BufferedReceiptIndices,
/// Used to store a buffered receipt `primitives::receipt::Receipt` for a
/// given index `u64` and receiving shard. There is one unique queue
/// per ordered shard pair. The trie for shard X stores all queues for pairs
/// (X,*).
BufferedReceipt { receiving_shard: ShardId, index: u64 },
}

/// Provides `len` function.
Expand Down Expand Up @@ -178,6 +194,12 @@ impl TrieKey {
+ ACCOUNT_DATA_SEPARATOR.len()
+ key.len()
}
TrieKey::BufferedReceiptIndices => col::BUFFERED_RECEIPT_INDICES.len(),
TrieKey::BufferedReceipt { index, receiving_shard } => {
col::BUFFERED_RECEIPT.len()
+ std::mem::size_of_val(receiving_shard)
+ std::mem::size_of_val(index)
}
}
}

Expand Down Expand Up @@ -250,6 +272,12 @@ impl TrieKey {
buf.push(ACCOUNT_DATA_SEPARATOR);
buf.extend(data_id.as_ref());
}
TrieKey::BufferedReceiptIndices => buf.push(col::BUFFERED_RECEIPT_INDICES),
TrieKey::BufferedReceipt { index, receiving_shard } => {
buf.push(col::BUFFERED_RECEIPT);
buf.extend(&receiving_shard.to_le_bytes());
buf.extend(&index.to_le_bytes());
}
};
debug_assert_eq!(expected_len, buf.len() - start_len);
}
Expand All @@ -276,6 +304,8 @@ impl TrieKey {
TrieKey::PromiseYieldIndices => None,
TrieKey::PromiseYieldTimeout { .. } => None,
TrieKey::PromiseYieldReceipt { receiver_id, .. } => Some(receiver_id.clone()),
TrieKey::BufferedReceiptIndices => None,
TrieKey::BufferedReceipt { .. } => None,
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions core/primitives/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,8 @@ impl StateChanges {
TrieKey::PromiseYieldIndices => {}
TrieKey::PromiseYieldTimeout { .. } => {}
TrieKey::PromiseYieldReceipt { .. } => {}
TrieKey::BufferedReceiptIndices => {}
TrieKey::BufferedReceipt { .. } => {}
}
}

Expand Down
39 changes: 36 additions & 3 deletions core/store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,16 @@ use metadata::{DbKind, DbVersion, KIND_KEY, VERSION_KEY};
use near_crypto::PublicKey;
use near_fmt::{AbbrBytes, StorageKey};
use near_primitives::account::{AccessKey, Account};
use near_primitives::errors::IntegerOverflowError;
pub use near_primitives::errors::{MissingTrieValueContext, StorageError};
use near_primitives::hash::CryptoHash;
use near_primitives::receipt::{
DelayedReceiptIndices, PromiseYieldIndices, PromiseYieldTimeout, Receipt, ReceiptEnum,
ReceivedData,
BufferedReceiptIndices, DelayedReceiptIndices, PromiseYieldIndices, PromiseYieldTimeout,
Receipt, ReceiptEnum, ReceivedData, ShardBufferedReceiptIndices,
};
pub use near_primitives::shard_layout::ShardUId;
use near_primitives::trie_key::{trie_key_parsers, TrieKey};
use near_primitives::types::{AccountId, BlockHeight, StateRoot};
use near_primitives::types::{AccountId, BlockHeight, ShardId, StateRoot};
use near_vm_runner::{CompiledContractInfo, ContractCode, ContractRuntimeCache};
use once_cell::sync::Lazy;
use std::fs::File;
Expand Down Expand Up @@ -902,6 +903,38 @@ pub fn has_promise_yield_receipt(
trie.contains_key(&TrieKey::PromiseYieldReceipt { receiver_id, data_id })
}

pub fn get_buffered_receipt_indices(
trie: &dyn TrieAccess,
) -> Result<BufferedReceiptIndices, StorageError> {
Ok(get(trie, &TrieKey::BufferedReceiptIndices)?.unwrap_or_default())
}

pub fn remove_buffered_receipt(
state_update: &mut TrieUpdate,
index: u64,
receiving_shard: ShardId,
) {
state_update.remove(TrieKey::BufferedReceipt { index, receiving_shard });
}

// Adds the given receipt into the end of the buffered receipt queue for the
// given receiver shard.
pub fn push_buffered_receipt(
state_update: &mut TrieUpdate,
receipts_indices: &mut ShardBufferedReceiptIndices,
receipt: &Receipt,
receiving_shard: ShardId,
) -> Result<(), IntegerOverflowError> {
set(
state_update,
TrieKey::BufferedReceipt { index: receipts_indices.next_available_index, receiving_shard },
receipt,
);
receipts_indices.next_available_index =
receipts_indices.next_available_index.checked_add(1).ok_or(IntegerOverflowError)?;
Ok(())
}

pub fn set_access_key(
state_update: &mut TrieUpdate,
account_id: AccountId,
Expand Down
1 change: 1 addition & 0 deletions core/store/src/trie/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ pub mod mem;
mod nibble_slice;
mod prefetching_trie_storage;
mod raw_node;
pub mod receipts_column_helper;
pub mod resharding;
mod shard_tries;
mod state_parts;
Expand Down
67 changes: 67 additions & 0 deletions core/store/src/trie/receipts_column_helper.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
use near_primitives::errors::StorageError;
use near_primitives::receipt::{Receipt, ShardBufferedReceiptIndices};
use near_primitives::trie_key::TrieKey;
use near_primitives::types::ShardId;

use crate::{get, TrieAccess};

/// Read-only iterator over receipt queues stored in the state trie.
pub struct ReceiptIterator<'a> {
trie_keys: Box<dyn Iterator<Item = TrieKey>>,
trie: &'a dyn TrieAccess,
}

impl<'a> ReceiptIterator<'a> {
pub fn delayed_receipts(trie: &'a dyn TrieAccess) -> Result<Self, StorageError> {
let indices = crate::get_delayed_receipt_indices(trie)?;
Ok(Self {
trie_keys: Box::new(
(indices.first_index..indices.next_available_index)
.map(move |index| TrieKey::DelayedReceipt { index }),
),
trie,
})
}

/// Iterates over all receipts in any receipt buffer.
pub fn buffered_receipts(trie: &'a dyn TrieAccess) -> Result<Self, StorageError> {
let all_indices = crate::get_buffered_receipt_indices(trie)?;
let trie_keys_iter =
all_indices.shard_buffer_indices.into_iter().flat_map(|(shard, indices)| {
(indices.first_index..indices.next_available_index)
.map(move |index| TrieKey::BufferedReceipt { index, receiving_shard: shard })
});
Ok(Self { trie_keys: Box::new(trie_keys_iter), trie })
}

/// Iterates over receipts in a receipt buffer to a specific receiver shard.
pub fn shard_buffered_receipts(
trie: &'a dyn TrieAccess,
receiving_shard: ShardId,
indices: &ShardBufferedReceiptIndices,
) -> Result<Self, StorageError> {
Ok(Self {
trie_keys: Box::new(
(indices.first_index..indices.next_available_index)
.map(move |index| TrieKey::BufferedReceipt { index, receiving_shard }),
),
trie,
})
}
}

impl<'a> Iterator for ReceiptIterator<'a> {
type Item = Result<Receipt, StorageError>;

fn next(&mut self) -> Option<Self::Item> {
let key = self.trie_keys.next()?;
let result = match get(self.trie, &key) {
Err(e) => Err(e),
Ok(None) => Err(StorageError::StorageInconsistentState(
"Receipt referenced by index should be in the state".to_owned(),
)),
Ok(Some(receipt)) => Ok(receipt),
};
Some(result)
}
}
3 changes: 3 additions & 0 deletions core/store/src/trie/resharding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ impl ShardTries {
// in `changes.processed_yield_timeouts`.
}
},
// TODO: figure out what we need to do here
TrieKey::BufferedReceiptIndices => todo!(),
TrieKey::BufferedReceipt { .. } => todo!(),
TrieKey::Account { account_id }
| TrieKey::ContractCode { account_id }
| TrieKey::AccessKey { account_id, .. }
Expand Down
6 changes: 6 additions & 0 deletions tools/state-viewer/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,8 @@ pub enum RecordType {
DelayedReceiptOrIndices = col::DELAYED_RECEIPT_OR_INDICES,
ContractData = col::CONTRACT_DATA,
PromiseYieldReceipt = col::PROMISE_YIELD_RECEIPT,
BufferedReceiptIndices = col::BUFFERED_RECEIPT_INDICES,
BufferedReceipt = col::BUFFERED_RECEIPT,
}

impl clap::ValueEnum for RecordType {
Expand Down Expand Up @@ -735,6 +737,10 @@ impl clap::ValueEnum for RecordType {
Self::PromiseYieldReceipt => {
Some(clap::builder::PossibleValue::new("promise-yield-receipt"))
}
Self::BufferedReceipt => Some(clap::builder::PossibleValue::new("buffered-receipt")),
Self::BufferedReceiptIndices => {
Some(clap::builder::PossibleValue::new("buffered-receipt-indices"))
}
}
}
}
Expand Down

0 comments on commit a4a4a84

Please sign in to comment.