Skip to content

Commit

Permalink
feat: add validator node checkpointing (#4217)
Browse files Browse the repository at this point in the history
Description
---
feat(wallet): create ContractCheckpoint outputs via grpc
feat(base-node): add get_current_contract_outputs grpc call
feat(base-layer/core): add checkpoint data to SidechainFeatures
feat(validator-node): spawn consensus workers when new constitution is found

Motivation and Context
---
Validator node periodically submits basic checkpoints containing a merkle root of the state. 

TODOs:
- Validator node should resume active contracts after restart
- Wallet should spend previous checkpoint
- Checkpoint committee signatures need to be populated
- Validator node needs to keep track of contracts that it has accepted
- Checkpoint proofs need to be finalised and implemented
- Constitution amendments need to be recognised and handled in the VN

How Has This Been Tested?
---
Manually and partially with existing tests
  • Loading branch information
sdbondi committed Jun 22, 2022
1 parent 48af38d commit 8b0add0
Show file tree
Hide file tree
Showing 85 changed files with 1,443 additions and 743 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 18 additions & 0 deletions applications/tari_app_grpc/proto/base_node.proto
Expand Up @@ -94,6 +94,8 @@ service BaseNode {

// Get all constitutions where the public key is in the committee
rpc GetConstitutions(GetConstitutionsRequest) returns (stream GetConstitutionsResponse);
// Get the current contract outputs matching the given contract id and type
rpc GetCurrentContractOutputs(GetCurrentContractOutputsRequest) returns (GetCurrentContractOutputsResponse);
}

message GetAssetMetadataRequest {
Expand Down Expand Up @@ -456,3 +458,19 @@ message GetConstitutionsResponse {
uint64 mined_height = 3;
bytes header_hash = 4;
}

message GetCurrentContractOutputsRequest {
bytes contract_id = 1;
uint32 output_type = 2;
}

message GetCurrentContractOutputsResponse {
repeated UtxoMinedInfo outputs = 1;
}

message UtxoMinedInfo {
TransactionOutput output = 1;
uint32 mmr_position = 2;
uint64 mined_height = 3;
bytes header_hash = 4;
}
6 changes: 6 additions & 0 deletions applications/tari_app_grpc/proto/types.proto
Expand Up @@ -240,6 +240,7 @@ message SideChainFeatures {
ContractUpdateProposal update_proposal = 5;
ContractUpdateProposalAcceptance update_proposal_acceptance = 6;
ContractAmendment amendment = 7;
ContractCheckpoint checkpoint = 8;
}

message ContractConstitution {
Expand All @@ -260,6 +261,11 @@ message CommitteeMembers {
repeated bytes members = 1;
}

message ContractCheckpoint {
bytes merkle_root = 1;
CommitteeSignatures signatures = 2;
}

message CheckpointParameters {
uint64 abandoned_interval = 1;
uint32 minimum_quorum_required = 2;
Expand Down
4 changes: 2 additions & 2 deletions applications/tari_app_grpc/proto/validator_node.proto
Expand Up @@ -97,7 +97,7 @@ message GetTokenDataResponse {
//}

message InvokeReadMethodRequest{
bytes asset_public_key = 1;
bytes contract_id = 1;
uint32 template_id = 2;
string method = 3;
bytes args = 4;
Expand All @@ -115,7 +115,7 @@ message Authority {
}

message InvokeMethodRequest{
bytes asset_public_key = 1;
bytes contract_id = 1;
uint32 template_id = 2;
string method = 3;
bytes args = 4;
Expand Down
8 changes: 3 additions & 5 deletions applications/tari_app_grpc/proto/wallet.proto
Expand Up @@ -269,7 +269,7 @@ message RegisterAssetResponse {
}

message CreateInitialAssetCheckpointRequest {
bytes asset_public_key = 1;
bytes contract_id = 1;
bytes merkle_root = 2;
repeated bytes committee = 3;
}
Expand All @@ -279,10 +279,8 @@ message CreateInitialAssetCheckpointResponse {
}

message CreateFollowOnAssetCheckpointRequest {
bytes asset_public_key = 1;
bytes unique_id = 2;
bytes merkle_root = 3;
repeated bytes next_committee = 4;
bytes contract_id = 1;
bytes merkle_root = 2;
}

message CreateFollowOnAssetCheckpointResponse {
Expand Down
28 changes: 28 additions & 0 deletions applications/tari_app_grpc/src/conversions/sidechain_features.rs
Expand Up @@ -33,6 +33,7 @@ use tari_core::transactions::transaction_components::{
ContractAcceptance,
ContractAcceptanceRequirements,
ContractAmendment,
ContractCheckpoint,
ContractConstitution,
ContractDefinition,
ContractSpecification,
Expand All @@ -58,6 +59,7 @@ impl From<SideChainFeatures> for grpc::SideChainFeatures {
update_proposal: value.update_proposal.map(Into::into),
update_proposal_acceptance: value.update_proposal_acceptance.map(Into::into),
amendment: value.amendment.map(Into::into),
checkpoint: value.checkpoint.map(Into::into),
}
}
}
Expand All @@ -78,6 +80,7 @@ impl TryFrom<grpc::SideChainFeatures> for SideChainFeatures {
.map(ContractUpdateProposalAcceptance::try_from)
.transpose()?;
let amendment = features.amendment.map(ContractAmendment::try_from).transpose()?;
let checkpoint = features.checkpoint.map(ContractCheckpoint::try_from).transpose()?;

Ok(Self {
contract_id: features.contract_id.try_into().map_err(|_| "Invalid contract_id")?,
Expand All @@ -87,6 +90,7 @@ impl TryFrom<grpc::SideChainFeatures> for SideChainFeatures {
update_proposal,
update_proposal_acceptance,
amendment,
checkpoint,
})
}
}
Expand Down Expand Up @@ -130,6 +134,7 @@ impl TryFrom<grpc::CreateConstitutionDefinitionRequest> for SideChainFeatures {
update_proposal: None,
update_proposal_acceptance: None,
amendment: None,
checkpoint: None,
})
}
}
Expand Down Expand Up @@ -298,6 +303,29 @@ impl TryFrom<grpc::ContractConstitution> for ContractConstitution {
}
}

//---------------------------------- ContractCheckpoint --------------------------------------------//
impl From<ContractCheckpoint> for grpc::ContractCheckpoint {
fn from(value: ContractCheckpoint) -> Self {
Self {
merkle_root: value.merkle_root.to_vec(),
signatures: Some(value.signatures.into()),
}
}
}

impl TryFrom<grpc::ContractCheckpoint> for ContractCheckpoint {
type Error = String;

fn try_from(value: grpc::ContractCheckpoint) -> Result<Self, Self::Error> {
let merkle_root = value.merkle_root.try_into().map_err(|_| "Invalid merkle root")?;
let signatures = value.signatures.map(TryInto::try_into).transpose()?.unwrap_or_default();
Ok(Self {
merkle_root,
signatures,
})
}
}

//---------------------------------- ContractAcceptanceRequirements --------------------------------------------//
impl From<ContractAcceptanceRequirements> for grpc::ContractAcceptanceRequirements {
fn from(value: ContractAcceptanceRequirements) -> Self {
Expand Down
59 changes: 52 additions & 7 deletions applications/tari_base_node/src/grpc/base_node_grpc_server.rs
Expand Up @@ -132,6 +132,11 @@ pub async fn get_heights(
) -> Result<(u64, u64), Status> {
block_heights(handler, request.start_height, request.end_height, request.from_tip).await
}
impl BaseNodeGrpcServer {
fn report_error(&self, status: Status) -> Status {
report_error(self.report_grpc_error, status)
}
}

#[tonic::async_trait]
impl tari_rpc::base_node_server::BaseNode for BaseNodeGrpcServer {
Expand Down Expand Up @@ -452,6 +457,50 @@ impl tari_rpc::base_node_server::BaseNode for BaseNodeGrpcServer {
Ok(Response::new(rx))
}

async fn get_current_contract_outputs(
&self,
request: Request<tari_rpc::GetCurrentContractOutputsRequest>,
) -> Result<Response<tari_rpc::GetCurrentContractOutputsResponse>, Status> {
let request = request.into_inner();

let contract_id = FixedHash::try_from(request.contract_id.as_slice())
.map_err(|err| Status::invalid_argument(format!("Contract ID is not a valid: {}", err)))?;
debug!(
target: LOG_TARGET,
"Incoming GRPC request for GetCurrentContractOutputs: contract_id: {}", contract_id
);

let output_type = u8::try_from(request.output_type)
.ok()
.and_then(OutputType::from_byte)
.ok_or_else(|| Status::invalid_argument("Invalid output_type"))?;

let mut node_service = self.node_service.clone();
let outputs = node_service
.get_outputs_for_contract(contract_id, output_type)
.await
.map_err(|err| self.report_error(Status::internal(err.to_string())))?;

let outputs = outputs
.into_iter()
.map(|output| {
let unpruned = output
.output
.as_transaction_output()
.ok_or_else(|| Status::failed_precondition("Checkpoint output has been pruned"))?;

Ok(tari_rpc::UtxoMinedInfo {
output: Some(unpruned.clone().into()),
mmr_position: output.mmr_position,
mined_height: output.mined_height,
header_hash: output.header_hash,
})
})
.collect::<Result<_, Status>>()?;

Ok(Response::new(tari_rpc::GetCurrentContractOutputsResponse { outputs }))
}

async fn get_tokens(
&self,
request: Request<tari_rpc::GetTokensRequest>,
Expand All @@ -460,14 +509,8 @@ impl tari_rpc::base_node_server::BaseNode for BaseNodeGrpcServer {
let request = request.into_inner();
debug!(
target: LOG_TARGET,
"Incoming GRPC request for GetTokens: asset_pub_key: {}, unique_ids: [{}]",
"Incoming GRPC request for GetTokens: asset_pub_key: {}",
request.asset_public_key.to_hex(),
request
.unique_ids
.iter()
.map(|s| s.to_hex())
.collect::<Vec<_>>()
.join(",")
);

let pub_key = PublicKey::from_bytes(&request.asset_public_key).map_err(|err| {
Expand Down Expand Up @@ -1837,6 +1880,7 @@ impl tari_rpc::base_node_server::BaseNode for BaseNodeGrpcServer {
) -> Result<Response<Self::GetConstitutionsStream>, Status> {
let report_error_flag = self.report_error_flag();
let request = request.into_inner();
println!("{:?}", request);
let dan_node_public_key = PublicKey::from_bytes(&request.dan_node_public_key)
.map_err(|_| Status::invalid_argument("Dan node public key is not a valid public key"))?;

Expand All @@ -1846,6 +1890,7 @@ impl tari_rpc::base_node_server::BaseNode for BaseNodeGrpcServer {
.transpose()
.map_err(|_| Status::invalid_argument("Block hash has an invalid length"))?;

println!("{:?}", start_hash);
let mut node_service = self.node_service.clone();
// Check the start_hash is correct, or if not provided, start from genesis
let start_header = match start_hash {
Expand Down
Expand Up @@ -24,7 +24,7 @@ use crate::error::CollectiblesError;
use futures::StreamExt;
use log::debug;
use tari_app_grpc::tari_rpc as grpc;
use tari_common_types::types::{PublicKey, COMMITTEE_DEFINITION_ID};
use tari_common_types::types::PublicKey;
use tari_utilities::{ByteArray, ByteArrayError};

const LOG_TARGET: &str = "collectibles::base";
Expand Down Expand Up @@ -102,7 +102,7 @@ impl BaseNodeClient {
let client = self.client_mut();
let request = grpc::GetTokensRequest {
asset_public_key: Vec::from(asset_public_key.as_bytes()),
unique_ids: vec![COMMITTEE_DEFINITION_ID.into()],
unique_ids: vec![],
};

debug!(target: LOG_TARGET, "get sidechain request {:?}", request);
Expand Down
@@ -1,3 +1,4 @@
use std::convert::TryInto;
// Copyright 2021. The Tari Project
//
// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
Expand Down Expand Up @@ -55,7 +56,8 @@ impl GrpcValidatorNodeClient {
args: Vec<u8>,
) -> Result<Vec<u8>, CollectiblesError> {
let req = grpc::InvokeReadMethodRequest {
asset_public_key: Vec::from(asset_public_key.as_bytes()),
// TODO: contract id
contract_id: asset_public_key.to_vec(),
template_id,
method,
args,
Expand Down Expand Up @@ -86,7 +88,8 @@ impl GrpcValidatorNodeClient {
args: Vec<u8>,
) -> Result<Vec<u8>, CollectiblesError> {
let req = grpc::InvokeMethodRequest {
asset_public_key: Vec::from(asset_public_key.as_bytes()),
// TODO: contract id
contract_id: asset_public_key.as_bytes().try_into().unwrap(),
template_id,
method,
args,
Expand Down
Expand Up @@ -119,7 +119,8 @@ impl WalletClient {
let inner = self.get_inner_mut()?;
let committee = vec![];
let request = grpc::CreateInitialAssetCheckpointRequest {
asset_public_key: Vec::from_hex(asset_public_key)?,
// TODO: contract id
contract_id: Vec::from_hex(asset_public_key)?,
merkle_root,
committee,
};
Expand Down
33 changes: 19 additions & 14 deletions applications/tari_console_wallet/src/grpc/wallet_grpc_server.rs
Expand Up @@ -82,10 +82,7 @@ use tari_app_grpc::{
TransferResult,
},
};
use tari_common_types::{
array::copy_into_fixed_array,
types::{BlockHash, FixedHash, PublicKey, Signature},
};
use tari_common_types::types::{BlockHash, FixedHash, PublicKey, Signature};
use tari_comms::{multiaddr::Multiaddr, types::CommsPublicKey, CommsNode};
use tari_core::transactions::{
tari_amount::MicroTari,
Expand Down Expand Up @@ -684,7 +681,7 @@ impl wallet_server::Wallet for WalletGrpcServer {
.await
.map_err(|e| Status::internal(e.to_string()))?;

let contract_id_hex = contract_id.to_vec().to_hex();
let contract_id_hex = contract_id.to_hex();
let message = format!("Contract acceptance for contract with id={}", contract_id_hex);
transaction_service
.submit_transaction(tx_id, transaction, 0.into(), message)
Expand Down Expand Up @@ -812,18 +809,22 @@ impl wallet_server::Wallet for WalletGrpcServer {
let mut transaction_service = self.wallet.transaction_service.clone();
let message = request.into_inner();

let asset_public_key = PublicKey::from_bytes(message.asset_public_key.as_slice())
let contract_id = message
.contract_id
.try_into()
.map_err(|e| Status::invalid_argument(format!("Asset public key was not a valid pub key:{}", e)))?;

let merkle_root = copy_into_fixed_array(&message.merkle_root)
let merkle_root = message
.merkle_root
.try_into()
.map_err(|_| Status::invalid_argument("Merkle root has an incorrect length"))?;

let (tx_id, transaction) = asset_manager
.create_initial_asset_checkpoint(&asset_public_key, merkle_root.into())
.create_initial_asset_checkpoint(contract_id, merkle_root)
.await
.map_err(|e| Status::internal(e.to_string()))?;

let message = format!("Initial asset checkpoint for {}", asset_public_key);
let message = format!("Initial asset checkpoint for {}", contract_id);
let _ = transaction_service
.submit_transaction(tx_id, transaction, 0.into(), message)
.await
Expand All @@ -840,18 +841,22 @@ impl wallet_server::Wallet for WalletGrpcServer {
let mut transaction_service = self.wallet.transaction_service.clone();
let message = request.into_inner();

let asset_public_key = PublicKey::from_bytes(message.asset_public_key.as_slice())
.map_err(|e| Status::invalid_argument(format!("Asset public key was not a valid pub key:{}", e)))?;
let contract_id = message
.contract_id
.try_into()
.map_err(|e| Status::invalid_argument(format!("Contract ID was not valid :{}", e)))?;

let merkle_root = copy_into_fixed_array(&message.merkle_root)
let merkle_root = message
.merkle_root
.try_into()
.map_err(|_| Status::invalid_argument("Incorrect merkle root length"))?;

let (tx_id, transaction) = asset_manager
.create_follow_on_asset_checkpoint(&asset_public_key, message.unique_id.as_slice(), merkle_root.into())
.create_follow_on_asset_checkpoint(contract_id, merkle_root)
.await
.map_err(|e| Status::internal(e.to_string()))?;

let message = format!("Asset state checkpoint for {}", asset_public_key);
let message = format!("Sidechain state checkpoint for {}", contract_id);
let _ = transaction_service
.submit_transaction(tx_id, transaction, 0.into(), message)
.await
Expand Down

0 comments on commit 8b0add0

Please sign in to comment.