feat: basic state witness production (#10413)
Add state transition data to the state witness and validate chunk execution
against it. Merge `get_chunk_validation_job` into
`validate_chunk_state_witness` and remove the former.
All of the changes here look necessary to make this work. We plan to do a
shallow review now and improve the code later, to unblock adding the required
features separately.

Summary of changes:
* If a node is a chunk producer for any shard in the current or next epoch, it
records state transition data on disk. This is suboptimal, but it works for
the prototype.
* When a node produces a chunk, it aggregates all the transition data, creates
a state witness and sends it to the other validators (see the sketch after
this list).
* Aggregating source receipt chunks is not yet implemented and is temporarily
commented out, since everyone tracks receipts anyway. Validating new
transactions is also not implemented yet.
* In `validate_chunk_state_witness`, we use the more explicit
`apply_new_chunk`/`apply_old_chunk` to simplify the code. I temporarily made a
lot of structs `pub` to make this work; this can be refactored later if needed.
* `test_chunk_validation_basic` works again. It is adjusted to make validation
against the state much less trivial.
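
For orientation, here is a minimal, self-contained sketch of the
record-then-aggregate flow described above. All names and types are simplified
stand-ins for illustration only; they are not the real nearcore APIs.

```rust
// Simplified illustration of state witness production: chunk producers record
// per-chunk state transitions, then aggregate them into a witness when they
// produce the next chunk. Everything here is a stand-in, not nearcore code.

use std::collections::HashMap;

type CryptoHash = [u8; 32];
type ShardId = u64;

/// Per-chunk state transition that a chunk producer records on disk.
#[derive(Clone, Debug)]
struct StoredTransition {
    /// Partial trie nodes (storage proof) touched while applying the chunk.
    base_state: Vec<Vec<u8>>,
    /// Hash of the receipts that were applied in the chunk.
    receipts_hash: CryptoHash,
}

/// In-memory stand-in for a (block hash, shard id)-keyed storage column.
#[derive(Default)]
struct TransitionStore {
    data: HashMap<(CryptoHash, ShardId), StoredTransition>,
}

impl TransitionStore {
    /// Step 1: chunk producers save the transition for every block/shard they track.
    fn save(&mut self, block: CryptoHash, shard: ShardId, transition: StoredTransition) {
        self.data.insert((block, shard), transition);
    }

    /// Step 2: when producing a chunk, collect the transitions for all blocks
    /// since the previous chunk into a witness to send to validators.
    /// Returns `None` if any transition is missing.
    fn build_witness(
        &self,
        blocks: &[CryptoHash],
        shard: ShardId,
    ) -> Option<Vec<StoredTransition>> {
        blocks.iter().map(|b| self.data.get(&(*b, shard)).cloned()).collect()
    }
}

fn main() {
    let mut store = TransitionStore::default();
    store.save(
        [1u8; 32],
        0,
        StoredTransition { base_state: vec![], receipts_hash: [0u8; 32] },
    );
    // A chunk producer for shard 0 assembles a witness covering one block.
    let witness = store.build_witness(&[[1u8; 32]], 0);
    assert_eq!(witness.unwrap().len(), 1);
}
```

In the actual change, the transitions live in the new
`DBCol::StateTransitionData` column as `StoredChunkStateTransitionData`
entries, as the diffs below show.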

---------

Co-authored-by: Longarithm <the.aleksandr.logunov@gmail.com>
Longarithm committed Jan 13, 2024
1 parent c536f46 commit 57e84d8
Showing 13 changed files with 414 additions and 575 deletions.
364 changes: 54 additions & 310 deletions chain/chain/src/chain.rs

Large diffs are not rendered by default.

33 changes: 14 additions & 19 deletions chain/chain/src/chain_update.rs
@@ -149,25 +149,8 @@ impl<'a> ChainUpdate<'a> {
apply_results: Vec<ShardUpdateResult>,
) -> Result<(), Error> {
let _span = tracing::debug_span!(target: "chain", "apply_chunk_postprocessing").entered();
for result in apply_results {
match result {
ShardUpdateResult::Stateful(result) => {
self.process_apply_chunk_result(block, result)?
}
ShardUpdateResult::Stateless(results) => {
for (block_hash, shard_uid, chunk_extra) in results {
let expected_chunk_extra =
self.chain_store_update.get_chunk_extra(&block_hash, &shard_uid)?;
assert_eq!(
&chunk_extra,
expected_chunk_extra.as_ref(),
"For stateless validation, chunk extras for block {} and shard {} do not match",
block_hash,
shard_uid
);
}
}
}
for ShardUpdateResult::Stateful(result) in apply_results {
self.process_apply_chunk_result(block, result)?;
}
Ok(())
}
@@ -370,6 +353,12 @@ impl<'a> ChainUpdate<'a> {
apply_result.outcomes,
outcome_paths,
);
self.chain_store_update.save_state_transition_data(
*block_hash,
shard_id,
apply_result.proof,
apply_result.applied_receipts_hash,
);
if let Some(resharding_results) = resharding_results {
self.process_resharding_results(block, &shard_uid, resharding_results)?;
}
@@ -396,6 +385,12 @@ impl<'a> ChainUpdate<'a> {

self.chain_store_update.save_chunk_extra(block_hash, &shard_uid, new_extra);
self.chain_store_update.save_trie_changes(apply_result.trie_changes);
self.chain_store_update.save_state_transition_data(
*block_hash,
shard_uid.shard_id(),
apply_result.proof,
apply_result.applied_receipts_hash,
);

if let Some(resharding_config) = resharding_results {
self.process_resharding_results(block, &shard_uid, resharding_config)?;
3 changes: 3 additions & 0 deletions chain/chain/src/garbage_collection.rs
@@ -972,6 +972,9 @@ impl<'a> ChainStoreUpdate<'a> {
DBCol::HeaderHashesByHeight => {
store_update.delete(col, key);
}
DBCol::StateTransitionData => {
store_update.delete(col, key);
}
DBCol::DbVersion
| DBCol::BlockMisc
| DBCol::_GCCount
36 changes: 31 additions & 5 deletions chain/chain/src/store.rs
@@ -42,14 +42,15 @@ use near_primitives::utils::{
use near_primitives::version::ProtocolVersion;
use near_primitives::views::LightClientBlockView;
use near_store::{
DBCol, KeyForStateChanges, Store, StoreUpdate, WrappedTrieChanges, CHUNK_TAIL_KEY,
FINAL_HEAD_KEY, FORK_TAIL_KEY, HEADER_HEAD_KEY, HEAD_KEY, LARGEST_TARGET_HEIGHT_KEY,
LATEST_KNOWN_KEY, TAIL_KEY,
DBCol, KeyForStateChanges, PartialStorage, Store, StoreUpdate, WrappedTrieChanges,
CHUNK_TAIL_KEY, FINAL_HEAD_KEY, FORK_TAIL_KEY, HEADER_HEAD_KEY, HEAD_KEY,
LARGEST_TARGET_HEIGHT_KEY, LATEST_KNOWN_KEY, TAIL_KEY,
};

use crate::byzantine_assert;
use crate::chunks_store::ReadOnlyChunksStore;
use crate::types::{Block, BlockHeader, LatestKnown};
use near_primitives::chunk_validation::StoredChunkStateTransitionData;
use near_store::db::{StoreStatistics, STATE_SYNC_DUMP_KEY};
use std::sync::Arc;

@@ -1438,11 +1439,10 @@ pub struct ChainStoreUpdate<'a> {
final_head: Option<Tip>,
largest_target_height: Option<BlockHeight>,
trie_changes: Vec<WrappedTrieChanges>,

state_transition_data: HashMap<(CryptoHash, ShardId), StoredChunkStateTransitionData>,
// All state changes made by a chunk, this is only used for resharding.
add_state_changes_for_resharding: HashMap<(CryptoHash, ShardId), StateChangesForResharding>,
remove_state_changes_for_resharding: HashSet<(CryptoHash, ShardId)>,

add_blocks_to_catchup: Vec<(CryptoHash, CryptoHash)>,
// A pair (prev_hash, hash) to be removed from blocks to catchup
remove_blocks_to_catchup: Vec<(CryptoHash, CryptoHash)>,
@@ -1467,6 +1467,7 @@ impl<'a> ChainStoreUpdate<'a> {
final_head: None,
largest_target_height: None,
trie_changes: vec![],
state_transition_data: Default::default(),
add_state_changes_for_resharding: HashMap::new(),
remove_state_changes_for_resharding: HashSet::new(),
add_blocks_to_catchup: vec![],
@@ -2077,6 +2078,24 @@ impl<'a> ChainStoreUpdate<'a> {
self.trie_changes.push(trie_changes);
}

pub fn save_state_transition_data(
&mut self,
block_hash: CryptoHash,
shard_id: ShardId,
partial_storage: Option<PartialStorage>,
applied_receipts_hash: CryptoHash,
) {
if let Some(partial_storage) = partial_storage {
self.state_transition_data.insert(
(block_hash, shard_id),
StoredChunkStateTransitionData {
base_state: partial_storage.nodes,
receipts_hash: applied_receipts_hash,
},
);
}
}

pub fn add_state_changes_for_resharding(
&mut self,
block_hash: CryptoHash,
@@ -2526,6 +2545,13 @@ impl<'a> ChainStoreUpdate<'a> {
}
}

for ((block_hash, shard_id), state_transition_data) in self.state_transition_data.drain() {
store_update.set_ser(
DBCol::StateTransitionData,
&get_block_shard_id(&block_hash, shard_id),
&state_transition_data,
)?;
}
for ((block_hash, shard_id), state_changes) in self.add_state_changes_for_resharding.drain()
{
store_update.set_ser(
1 change: 1 addition & 0 deletions chain/chain/src/test_utils/kv_runtime.rs
@@ -1198,6 +1198,7 @@ impl RuntimeAdapter for KeyValueRuntime {
total_balance_burnt: 0,
proof: None,
processed_delayed_receipts: vec![],
applied_receipts_hash: hash(&borsh::to_vec(receipts).unwrap()),
})
}

5 changes: 5 additions & 0 deletions chain/chain/src/types.rs
@@ -110,6 +110,11 @@ pub struct ApplyChunkResult {
pub total_balance_burnt: Balance,
pub proof: Option<PartialStorage>,
pub processed_delayed_receipts: Vec<Receipt>,
/// Hash of Vec<Receipt> which were applied in a chunk, later used for
/// chunk validation with state witness.
/// Note that applied receipts are not necessarily executed as they can
/// be delayed.
pub applied_receipts_hash: CryptoHash,
}

impl ApplyChunkResult {
91 changes: 28 additions & 63 deletions chain/chain/src/update_shard.rs
@@ -9,7 +9,8 @@ use near_primitives::hash::CryptoHash;
use near_primitives::receipt::Receipt;
use near_primitives::sandbox::state_patch::SandboxStatePatch;
use near_primitives::shard_layout::ShardUId;
use near_primitives::sharding::ShardChunk;
use near_primitives::sharding::ShardChunkHeader;
use near_primitives::transaction::SignedTransaction;
use near_primitives::types::chunk_extra::ChunkExtra;
use near_primitives::types::{BlockHeight, Gas, StateChangesForResharding, StateRoot};
use std::collections::HashMap;
@@ -18,21 +19,21 @@ use std::collections::HashMap;
/// shard.
#[derive(Debug)]
pub struct NewChunkResult {
pub(crate) shard_uid: ShardUId,
pub(crate) gas_limit: Gas,
pub(crate) apply_result: ApplyChunkResult,
pub(crate) resharding_results: Option<ReshardingResults>,
pub shard_uid: ShardUId,
pub gas_limit: Gas,
pub apply_result: ApplyChunkResult,
pub resharding_results: Option<ReshardingResults>,
}

/// Result of updating a shard for some block when it doesn't have a new chunk
/// for this shard, so previous chunk header is copied.
#[derive(Debug)]
pub struct OldChunkResult {
pub(crate) shard_uid: ShardUId,
pub shard_uid: ShardUId,
/// Note that despite the naming, no transactions are applied in this case.
/// TODO(logunov): exclude receipts/txs context from all related types.
pub(crate) apply_result: ApplyChunkResult,
pub(crate) resharding_results: Option<ReshardingResults>,
pub apply_result: ApplyChunkResult,
pub resharding_results: Option<ReshardingResults>,
}

/// Result of updating a shard for some block when we apply only resharding
@@ -44,15 +45,11 @@ pub struct ReshardingResult {
pub(crate) results: Vec<ApplyResultForResharding>,
}

/// Result of processing shard update, covering both stateful and stateless scenarios.
/// Result of processing shard update.
#[derive(Debug)]
pub enum ShardUpdateResult {
/// Stateful scenario - processed update for a single block.
Stateful(ShardBlockUpdateResult),
/// Stateless scenario - processed update based on state witness in a chunk.
/// Contains `ChunkExtra`s - results for processing updates corresponding
/// to state witness.
Stateless(Vec<(CryptoHash, ShardUId, ChunkExtra)>),
}

/// Result for a shard update for a single block.
@@ -66,16 +63,17 @@ pub enum ShardBlockUpdateResult {
/// State roots of children shards which are ready.
type ReshardingStateRoots = HashMap<ShardUId, StateRoot>;

pub(crate) struct NewChunkData {
pub chunk: ShardChunk,
pub struct NewChunkData {
pub chunk_header: ShardChunkHeader,
pub transactions: Vec<SignedTransaction>,
pub receipts: Vec<Receipt>,
pub resharding_state_roots: Option<ReshardingStateRoots>,
pub block: ApplyChunkBlockContext,
pub is_first_block_with_chunk_of_version: bool,
pub storage_context: StorageContext,
}

pub(crate) struct OldChunkData {
pub struct OldChunkData {
pub prev_chunk_extra: ChunkExtra,
pub resharding_state_roots: Option<ReshardingStateRoots>,
pub block: ApplyChunkBlockContext,
@@ -106,7 +104,7 @@ pub(crate) enum ShardUpdateReason {
}

/// Information about shard to update.
pub(crate) struct ShardContext {
pub struct ShardContext {
pub shard_uid: ShardUId,
/// Whether node cares about shard in this epoch.
pub cares_about_shard_this_epoch: bool,
@@ -119,10 +117,11 @@ pub(crate) struct ShardContext {
}

/// Information about storage used for applying txs and receipts.
pub(crate) struct StorageContext {
pub struct StorageContext {
/// Data source used for processing shard update.
pub storage_data_source: StorageDataSource,
pub state_patch: SandboxStatePatch,
pub record_storage: bool,
}

/// Processes shard update with given block and shard.
@@ -155,52 +154,19 @@ pub(crate) fn process_shard_update(
})
}

/// Processes shard updates for the execution contexts range which must
/// correspond to missing chunks for some shard.
/// `current_chunk_extra` must correspond to `ChunkExtra` just before
/// execution; in the end it will correspond to the latest execution
/// result.
pub(crate) fn process_missing_chunks_range(
parent_span: &tracing::Span,
mut current_chunk_extra: ChunkExtra,
runtime: &dyn RuntimeAdapter,
epoch_manager: &dyn EpochManagerAdapter,
execution_contexts: Vec<(ApplyChunkBlockContext, ShardContext)>,
) -> Result<Vec<(CryptoHash, ShardUId, ChunkExtra)>, Error> {
let mut result = vec![];
for (block_context, shard_context) in execution_contexts {
let OldChunkResult { shard_uid, apply_result, resharding_results: _ } = apply_old_chunk(
parent_span,
OldChunkData {
block: block_context.clone(),
resharding_state_roots: None,
prev_chunk_extra: current_chunk_extra.clone(),
storage_context: StorageContext {
storage_data_source: StorageDataSource::DbTrieOnly,
state_patch: Default::default(),
},
},
shard_context,
runtime,
epoch_manager,
)?;
*current_chunk_extra.state_root_mut() = apply_result.new_root;
result.push((block_context.block_hash, shard_uid, current_chunk_extra.clone()));
}
Ok(result)
}
/// Applies new chunk, which includes applying transactions from chunk and
/// receipts filtered from outgoing receipts from previous chunks.
pub(crate) fn apply_new_chunk(
pub fn apply_new_chunk(
parent_span: &tracing::Span,
data: NewChunkData,
shard_context: ShardContext,
runtime: &dyn RuntimeAdapter,
epoch_manager: &dyn EpochManagerAdapter,
) -> Result<NewChunkResult, Error> {
let NewChunkData {
chunk_header,
transactions,
block,
chunk,
receipts,
resharding_state_roots,
is_first_block_with_chunk_of_version,
@@ -213,29 +179,28 @@ pub(crate) fn apply_new_chunk(
"new_chunk",
shard_id)
.entered();
let chunk_inner = chunk.cloned_header().take_inner();
let gas_limit = chunk_inner.gas_limit();
let gas_limit = chunk_header.gas_limit();

let _timer = CryptoHashTimer::new(chunk.chunk_hash().0);
let _timer = CryptoHashTimer::new(chunk_header.chunk_hash().0);
let storage_config = RuntimeStorageConfig {
state_root: *chunk_inner.prev_state_root(),
state_root: chunk_header.prev_state_root(),
use_flat_storage: true,
source: storage_context.storage_data_source,
state_patch: storage_context.state_patch,
record_storage: false,
record_storage: storage_context.record_storage,
};
match runtime.apply_chunk(
storage_config,
ApplyChunkShardContext {
shard_id,
last_validator_proposals: chunk_inner.prev_validator_proposals(),
last_validator_proposals: chunk_header.prev_validator_proposals(),
gas_limit,
is_new_chunk: true,
is_first_block_with_chunk_of_version,
},
block.clone(),
&receipts,
chunk.transactions(),
&transactions,
) {
Ok(apply_result) => {
let apply_split_result_or_state_changes = if shard_context.will_shard_layout_change {
@@ -263,7 +228,7 @@ pub(crate) fn apply_new_chunk(
/// Applies shard update corresponding to missing chunk.
/// (logunov) From what I know, the state update may include only validator
/// accounts update on epoch start.
fn apply_old_chunk(
pub fn apply_old_chunk(
parent_span: &tracing::Span,
data: OldChunkData,
shard_context: ShardContext,
@@ -284,7 +249,7 @@ fn apply_old_chunk(
use_flat_storage: true,
source: storage_context.storage_data_source,
state_patch: storage_context.state_patch,
record_storage: false,
record_storage: storage_context.record_storage,
};
match runtime.apply_chunk(
storage_config,
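
With `apply_new_chunk` and `apply_old_chunk` now exposed as `pub`,
`validate_chunk_state_witness` can replay chunk execution against the recorded
transition data and compare the outcome with what the chunk claims. Below is a
rough sketch of that check with stubbed-out types; these are not the real
nearcore signatures.

```rust
// Hedged sketch of the validation side: re-run chunk application against the
// partial state carried by the witness and check the receipts hash and the
// resulting state root. All types here are illustrative stand-ins.

type CryptoHash = [u8; 32];

/// What the witness carries for one transition (cf. StoredChunkStateTransitionData).
struct WitnessTransition {
    base_state: Vec<Vec<u8>>,
    receipts_hash: CryptoHash,
}

/// Result of re-applying a chunk against the partial state from the witness.
struct ReplayResult {
    new_state_root: CryptoHash,
    applied_receipts_hash: CryptoHash,
}

/// Validate one transition: `replay` stands in for apply_new_chunk/apply_old_chunk.
fn validate_transition(
    transition: &WitnessTransition,
    expected_state_root: CryptoHash,
    replay: impl Fn(&[Vec<u8>]) -> ReplayResult,
) -> Result<(), String> {
    let result = replay(&transition.base_state);
    if result.applied_receipts_hash != transition.receipts_hash {
        return Err("applied receipts do not match the witness".to_string());
    }
    if result.new_state_root != expected_state_root {
        return Err("post-state root does not match the chunk".to_string());
    }
    Ok(())
}

fn main() {
    let transition =
        WitnessTransition { base_state: vec![], receipts_hash: [7u8; 32] };
    // A trivial replay stub that produces a fixed result for the example.
    let replay = |_base: &[Vec<u8>]| ReplayResult {
        new_state_root: [1u8; 32],
        applied_receipts_hash: [7u8; 32],
    };
    assert!(validate_transition(&transition, [1u8; 32], replay).is_ok());
}
```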