Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding merkle proof to prev chunk header #1353

Merged
merged 14 commits into from Oct 1, 2019
287 changes: 149 additions & 138 deletions chain/chain/src/chain.rs

Large diffs are not rendered by default.

10 changes: 8 additions & 2 deletions chain/chain/src/store.rs
Expand Up @@ -24,6 +24,7 @@ use near_store::{
COL_STATE_DL_INFOS, COL_TRANSACTION_RESULT,
};

use crate::byzantine_assert;
use crate::error::{Error, ErrorKind};
use crate::types::{Block, BlockHeader, LatestKnown, ReceiptProofResponse, ReceiptResponse, Tip};

Expand Down Expand Up @@ -84,7 +85,7 @@ pub trait ChainStoreAccess {
return Err(ErrorKind::ChunksMissing(vec![header.clone()]).into());
}
Ok(shard_chunk) => {
assert_ne!(header.height_included, 0);
byzantine_assert!(header.height_included > 0);
if header.height_included == 0 {
return Err(ErrorKind::Other(format!(
"Invalid header: {:?} for chunk {:?}",
Expand Down Expand Up @@ -494,7 +495,10 @@ impl<'a, T: ChainStoreAccess> ChainStoreUpdate<'a, T> {
loop {
let header = self.get_block_header(&block_hash)?;

// TODO >= <= ?
if header.inner.height < last_chunk_height_included {
panic!("get_incoming_receipts_for_shard failed");
}

if header.inner.height == last_chunk_height_included {
break;
}
Expand All @@ -503,6 +507,8 @@ impl<'a, T: ChainStoreAccess> ChainStoreUpdate<'a, T> {

if let Ok(receipt_proofs) = self.get_incoming_receipts(&block_hash, shard_id) {
ret.push(ReceiptProofResponse(block_hash, receipt_proofs.clone()));
} else {
ret.push(ReceiptProofResponse(block_hash, vec![]));
}

block_hash = prev_hash;
Expand Down
82 changes: 76 additions & 6 deletions chain/chain/src/types.rs
Expand Up @@ -4,19 +4,20 @@ use borsh::{BorshDeserialize, BorshSerialize};

use near_crypto::{BlsSignature, BlsSigner};
pub use near_primitives::block::{Block, BlockHeader, Weight};
use near_primitives::errors::InvalidTxErrorOrStorageError;
use near_primitives::hash::{hash, CryptoHash};
use near_primitives::merkle::MerklePath;
use near_primitives::merkle::{merklize, MerklePath};
use near_primitives::receipt::Receipt;
use near_primitives::sharding::{ReceiptProof, ShardChunkHeader};
use near_primitives::sharding::{ChunkHash, ReceiptProof, ShardChunk, ShardChunkHeader};
use near_primitives::transaction::{ExecutionOutcomeWithId, SignedTransaction};
use near_primitives::types::{
AccountId, Balance, BlockIndex, EpochId, Gas, MerkleHash, ShardId, ValidatorStake,
};
use near_primitives::views::QueryResponse;
use near_store::{PartialStorage, StoreUpdate, WrappedTrieChanges};

use crate::error::Error;
use near_primitives::errors::InvalidTxErrorOrStorageError;
use crate::byzantine_assert;
use crate::error::{Error, ErrorKind};

#[derive(PartialEq, Eq, Clone, Debug, BorshSerialize, BorshDeserialize)]
pub struct ReceiptResponse(pub CryptoHash, pub Vec<Receipt>);
Expand Down Expand Up @@ -318,14 +319,50 @@ pub trait RuntimeAdapter: Send + Sync {
.filter(|&receipt| self.account_id_to_shard_id(&receipt.receiver_id) == shard_id)
.cloned()
.collect();
receipts_hashes.push(hash(&ReceiptList(shard_receipts).try_to_vec()?));
receipts_hashes.push(hash(&ReceiptList(shard_id, shard_receipts).try_to_vec()?));
}
Ok(receipts_hashes)
}

/// Check chunk validity.
fn check_chunk_validity(&self, chunk: &ShardChunk) -> Result<(), Error> {
// 1. Checking that chunk header is valid
// 1a. Checking chunk.header.hash
if chunk.header.hash != ChunkHash(hash(&chunk.header.inner.try_to_vec()?)) {
byzantine_assert!(false);
return Err(ErrorKind::Other("Incorrect chunk hash".to_string()).into());
}
// 1b. Checking signature
if !self.verify_chunk_header_signature(&chunk.header)? {
byzantine_assert!(false);
return Err(ErrorKind::Other("Incorrect chunk signature".to_string()).into());
}
// 2. Checking that chunk body is valid
// 2a. Checking chunk hash
if chunk.chunk_hash != chunk.header.hash {
byzantine_assert!(false);
return Err(ErrorKind::Other("Incorrect chunk hash".to_string()).into());
}
// 2b. Checking that chunk transactions are valid
let (tx_root, _) = merklize(&chunk.transactions);
if tx_root != chunk.header.inner.tx_root {
byzantine_assert!(false);
return Err(ErrorKind::Other("Incorrect chunk tx_root".to_string()).into());
}
// 2c. Checking that chunk receipts are valid
let outgoing_receipts_hashes = self.build_receipts_hashes(&chunk.receipts)?;
let (receipts_root, _) = merklize(&outgoing_receipts_hashes);
if receipts_root != chunk.header.inner.outgoing_receipts_root {
byzantine_assert!(false);
return Err(ErrorKind::Other("Incorrect chunk receipts root".to_string()).into());
}

Ok(())
}
}

#[derive(BorshSerialize, BorshDeserialize, Debug, Clone, Default)]
struct ReceiptList(Vec<Receipt>);
pub struct ReceiptList(pub ShardId, pub Vec<Receipt>);

/// The last known / checked height and time when we have processed it.
/// Required to keep track of skipped blocks and not fallback to produce blocks at lower height.
Expand Down Expand Up @@ -380,6 +417,39 @@ impl BlockApproval {
}
}

#[derive(Debug, Clone, PartialEq, Eq, BorshSerialize, BorshDeserialize)]
pub struct ShardStateSyncResponse {
pub chunk: ShardChunk,
pub chunk_proof: MerklePath,
pub prev_chunk_header: ShardChunkHeader,
pub prev_chunk_proof: MerklePath,
pub prev_payload: Vec<u8>,
pub incoming_receipts_proofs: Vec<ReceiptProofResponse>,
pub root_proofs: Vec<Vec<RootProof>>,
}

impl ShardStateSyncResponse {
pub fn new(
chunk: ShardChunk,
chunk_proof: MerklePath,
prev_chunk_header: ShardChunkHeader,
prev_chunk_proof: MerklePath,
prev_payload: Vec<u8>,
incoming_receipts_proofs: Vec<ReceiptProofResponse>,
root_proofs: Vec<Vec<RootProof>>,
) -> Self {
Self {
chunk,
chunk_proof,
prev_chunk_header,
prev_chunk_proof,
prev_payload,
incoming_receipts_proofs,
root_proofs,
}
}
}

#[cfg(test)]
mod tests {
use std::sync::Arc;
Expand Down
31 changes: 19 additions & 12 deletions chain/chunks/src/lib.rs
Expand Up @@ -379,7 +379,11 @@ impl ShardsManager {
})
.cloned()
.collect(),
ShardProof(from_shard_id, proofs[to_shard_id as usize].clone()),
ShardProof {
from_shard_id,
to_shard_id,
proof: proofs[to_shard_id as usize].clone(),
},
))
}
}
Expand Down Expand Up @@ -600,9 +604,10 @@ impl ShardsManager {
true,
) {
if proof_index == one_part.receipt_proofs.len()
|| shard_id != (one_part.receipt_proofs[proof_index].1).to_shard_id
|| !verify_path(
one_part.header.inner.outgoing_receipts_root,
&(one_part.receipt_proofs[proof_index].1).1,
&(one_part.receipt_proofs[proof_index].1).proof,
&receipts_hashes[shard_id as usize],
)
{
Expand Down Expand Up @@ -717,8 +722,8 @@ impl ShardsManager {
rent_paid: Balance,
validator_proposals: Vec<ValidatorStake>,
transactions: &Vec<SignedTransaction>,
receipts: &Vec<Receipt>,
receipts_root: CryptoHash,
outgoing_receipts: &Vec<Receipt>,
outgoing_receipts_root: CryptoHash,
tx_root: CryptoHash,
signer: Arc<dyn BlsSigner>,
) -> Result<EncodedShardChunk, Error> {
Expand All @@ -737,8 +742,8 @@ impl ShardsManager {
tx_root,
validator_proposals,
transactions,
receipts,
receipts_root,
outgoing_receipts,
outgoing_receipts_root,
signer,
)?;

Expand All @@ -753,16 +758,18 @@ impl ShardsManager {
pub fn distribute_encoded_chunk(
&mut self,
encoded_chunk: EncodedShardChunk,
receipts: Vec<Receipt>,
outgoing_receipts: Vec<Receipt>,
) {
// TODO: if the number of validators exceeds the number of parts, this logic must be changed
let prev_block_hash = encoded_chunk.header.inner.prev_block_hash;
let mut processed_one_part = false;
let chunk_hash = encoded_chunk.chunk_hash();
let shard_id = encoded_chunk.header.inner.shard_id;
let receipts_hashes = self.runtime_adapter.build_receipts_hashes(&receipts).unwrap();
let (receipts_root, receipts_proofs) = merklize(&receipts_hashes);
assert_eq!(encoded_chunk.header.inner.outgoing_receipts_root, receipts_root);
let outgoing_receipts_hashes =
self.runtime_adapter.build_receipts_hashes(&outgoing_receipts).unwrap();
let (outgoing_receipts_root, outgoing_receipts_proofs) =
merklize(&outgoing_receipts_hashes);
assert_eq!(encoded_chunk.header.inner.outgoing_receipts_root, outgoing_receipts_root);
for part_ord in 0..self.runtime_adapter.num_total_parts(&prev_block_hash) {
let part_ord = part_ord as u64;
let to_whom = self.runtime_adapter.get_part_owner(&prev_block_hash, part_ord).unwrap();
Expand All @@ -780,8 +787,8 @@ impl ShardsManager {
let one_part_receipt_proofs = self.receipts_recipient_filter(
shard_id,
&tracking_shards,
&receipts,
&receipts_proofs,
&outgoing_receipts,
&outgoing_receipts_proofs,
);
let one_part = encoded_chunk.create_chunk_one_part(
part_ord,
Expand Down
15 changes: 8 additions & 7 deletions chain/client/src/client.rs
Expand Up @@ -332,7 +332,7 @@ impl Client {
shard_id
);

let ReceiptResponse(_, receipts) = self.chain.get_outgoing_receipts_for_shard(
let ReceiptResponse(_, outgoing_receipts) = self.chain.get_outgoing_receipts_for_shard(
prev_block_hash,
shard_id,
last_header.height_included,
Expand All @@ -350,8 +350,9 @@ impl Client {
// 2. anyone who just asks for one's incoming receipts
// will receive a piece of incoming receipts only
// with merkle receipts proofs which can be checked locally
let receipts_hashes = self.runtime_adapter.build_receipts_hashes(&receipts)?;
let (receipts_root, _) = merklize(&receipts_hashes);
let outgoing_receipts_hashes =
self.runtime_adapter.build_receipts_hashes(&outgoing_receipts)?;
let (outgoing_receipts_root, _) = merklize(&outgoing_receipts_hashes);

let encoded_chunk = self.shards_mgr.create_encoded_shard_chunk(
prev_block_hash,
Expand All @@ -363,8 +364,8 @@ impl Client {
chunk_extra.rent_paid,
chunk_extra.validator_proposals.clone(),
&filtered_transactions,
&receipts,
receipts_root,
&outgoing_receipts,
outgoing_receipts_root,
tx_root,
block_producer.signer.clone(),
)?;
Expand All @@ -375,12 +376,12 @@ impl Client {
next_height,
shard_id,
filtered_transactions.len(),
receipts.len(),
outgoing_receipts.len(),
block_producer.account_id,
encoded_chunk.chunk_hash().0,
);

self.shards_mgr.distribute_encoded_chunk(encoded_chunk, receipts);
self.shards_mgr.distribute_encoded_chunk(encoded_chunk, outgoing_receipts);

Ok(())
}
Expand Down
27 changes: 4 additions & 23 deletions chain/client/src/client_actor.rs
Expand Up @@ -283,34 +283,19 @@ impl Handler<NetworkClientMessages> for ClientActor {
}
}
NetworkClientMessages::StateRequest(shard_id, hash) => {
if let Ok((
chunk,
chunk_proof,
prev_payload,
incoming_receipts_proofs,
root_proofs,
)) = self.client.chain.get_state_for_shard(shard_id, hash)
{
if let Ok(shard_state) = self.client.chain.get_state_for_shard(shard_id, hash) {
return NetworkClientResponses::StateResponse(StateResponseInfo {
shard_id,
hash,
chunk,
chunk_proof,
prev_payload,
incoming_receipts_proofs,
root_proofs,
shard_state,
});
}
NetworkClientResponses::NoResponse
}
NetworkClientMessages::StateResponse(StateResponseInfo {
shard_id,
hash,
chunk,
chunk_proof,
prev_payload,
incoming_receipts_proofs,
root_proofs,
shard_state,
}) => {
// Populate the hashmaps with shard statuses that might be interested in this state
// (naturally, the plural of statuses is statuseses)
Expand Down Expand Up @@ -346,11 +331,7 @@ impl Handler<NetworkClientMessages> for ClientActor {
&self.client.block_producer.as_ref().map(|bp| bp.account_id.clone()),
shard_id,
hash,
chunk,
chunk_proof,
prev_payload,
incoming_receipts_proofs,
root_proofs,
shard_state,
) {
Ok(()) => {
for shard_statuses in shard_statuseses {
Expand Down
2 changes: 1 addition & 1 deletion chain/client/tests/cross_shard_tx.rs
Expand Up @@ -439,7 +439,7 @@ mod tests {
}

// On X1 it takes ~1m 15s
near_network::test_utils::wait_or_panic(600000);
near_network::test_utils::wait_or_panic(120000);
})
.unwrap();
}
Expand Down
11 changes: 3 additions & 8 deletions chain/network/src/types.rs
Expand Up @@ -12,13 +12,12 @@ use borsh::{BorshDeserialize, BorshSerialize};
use chrono::{DateTime, Utc};
use tokio::net::TcpStream;

use near_chain::types::{ReceiptProofResponse, RootProof};
use near_chain::types::ShardStateSyncResponse;
use near_chain::{Block, BlockApproval, BlockHeader, Weight};
use near_crypto::{BlsSignature, PublicKey, ReadablePublicKey, SecretKey, Signature};
use near_primitives::hash::{hash, CryptoHash};
use near_primitives::merkle::MerklePath;
pub use near_primitives::sharding::ChunkPartMsg;
use near_primitives::sharding::{ChunkHash, ChunkOnePart, ShardChunk};
use near_primitives::sharding::{ChunkHash, ChunkOnePart};
use near_primitives::transaction::SignedTransaction;
use near_primitives::types::{AccountId, BlockIndex, EpochId, ShardId};
use near_primitives::utils::{from_timestamp, to_timestamp};
Expand Down Expand Up @@ -711,11 +710,7 @@ impl Message for NetworkRequests {
pub struct StateResponseInfo {
pub shard_id: ShardId,
pub hash: CryptoHash,
pub chunk: ShardChunk,
pub chunk_proof: MerklePath,
pub prev_payload: Vec<u8>,
pub incoming_receipts_proofs: Vec<ReceiptProofResponse>,
pub root_proofs: Vec<Vec<RootProof>>,
pub shard_state: ShardStateSyncResponse,
}

#[derive(Debug)]
Expand Down