Skip to content

Commit

Permalink
fix(core)!: adds utxo and block info to get_template_registrations re…
Browse files Browse the repository at this point in the history
…quest (#4789)

Description
---
- adds UTXO and block info to get_template_registrations request
- adds end_height to get_template_registrations request
- Changes validator node registration key to <block_height, output_hash>
- uses CompositeKey for all composite keys
- Removed some unused (previous contract code) response types

Motivation and Context
---
DAN template scanner requires block hash info to determine last scanned height.
Indexing template validations by block height enable more efficient retrievals

NOTE: blockchain db will need to be resynced

How Has This Been Tested?
---
Manually - DAN block scanner
  • Loading branch information
sdbondi authored Oct 11, 2022
1 parent 64002e9 commit 9e81c7b
Show file tree
Hide file tree
Showing 20 changed files with 411 additions and 225 deletions.
28 changes: 26 additions & 2 deletions applications/tari_app_grpc/proto/base_node.proto
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ service BaseNode {
rpc GetCommittee(GetCommitteeRequest) returns (GetCommitteeResponse);
rpc GetShardKey(GetShardKeyRequest) returns (GetShardKeyResponse);
// Get templates
rpc GetTemplateRegistrations(GetTemplateRegistrationsRequest) returns (stream TemplateRegistration);
rpc GetTemplateRegistrations(GetTemplateRegistrationsRequest) returns (stream GetTemplateRegistrationResponse);
rpc GetSideChainUtxos(GetSideChainUtxosRequest) returns (stream GetSideChainUtxosResponse);
}

message GetAssetMetadataRequest {
Expand Down Expand Up @@ -471,5 +472,28 @@ message GetShardKeyResponse {
}

message GetTemplateRegistrationsRequest {
uint64 from_height = 1;
bytes start_hash = 1;
uint64 count = 2;
}

message GetTemplateRegistrationResponse {
bytes utxo_hash = 1;
TemplateRegistration registration = 2;
}

message BlockInfo {
uint64 height = 1;
bytes hash = 2;
bytes next_block_hash = 3;
}

message GetSideChainUtxosRequest {
bytes start_hash = 1;
uint64 count = 2;
}

message GetSideChainUtxosResponse {
BlockInfo block_info = 1;
repeated TransactionOutput outputs = 2;
}

2 changes: 1 addition & 1 deletion applications/tari_app_grpc/proto/validator_node.proto
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ message Authority {
bytes proxied_by = 3;
}

message InvokeMethodRequest{
message InvokeMethodRequest {
bytes contract_id = 1;
uint32 template_id = 2;
string method = 3;
Expand Down
221 changes: 168 additions & 53 deletions applications/tari_base_node/src/grpc/base_node_grpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use tari_app_grpc::{
tari_rpc::{CalcType, Sorting},
};
use tari_app_utilities::consts;
use tari_common_types::types::{Commitment, PublicKey, Signature};
use tari_common_types::types::{Commitment, FixedHash, PublicKey, Signature};
use tari_comms::{Bytes, CommsNode};
use tari_core::{
base_node::{
Expand Down Expand Up @@ -140,7 +140,8 @@ impl tari_rpc::base_node_server::BaseNode for BaseNodeGrpcServer {
type GetMempoolTransactionsStream = mpsc::Receiver<Result<tari_rpc::GetMempoolTransactionsResponse, Status>>;
type GetNetworkDifficultyStream = mpsc::Receiver<Result<tari_rpc::NetworkDifficultyResponse, Status>>;
type GetPeersStream = mpsc::Receiver<Result<tari_rpc::GetPeersResponse, Status>>;
type GetTemplateRegistrationsStream = mpsc::Receiver<Result<tari_rpc::TemplateRegistration, Status>>;
type GetSideChainUtxosStream = mpsc::Receiver<Result<tari_rpc::GetSideChainUtxosResponse, Status>>;
type GetTemplateRegistrationsStream = mpsc::Receiver<Result<tari_rpc::GetTemplateRegistrationResponse, Status>>;
type GetTokensInCirculationStream = mpsc::Receiver<Result<tari_rpc::ValueAtHeightResponse, Status>>;
type ListHeadersStream = mpsc::Receiver<Result<tari_rpc::BlockHeaderResponse, Status>>;
type SearchKernelsStream = mpsc::Receiver<Result<tari_rpc::HistoricalBlock, Status>>;
Expand Down Expand Up @@ -1484,7 +1485,6 @@ impl tari_rpc::base_node_server::BaseNode for BaseNodeGrpcServer {
request: Request<tari_rpc::GetActiveValidatorNodesRequest>,
) -> Result<Response<Self::GetActiveValidatorNodesStream>, Status> {
let request = request.into_inner();
let report_error_flag = self.report_error_flag();
debug!(target: LOG_TARGET, "Incoming GRPC request for GetActiveValidatorNodes");

let mut handler = self.node_service.clone();
Expand All @@ -1493,39 +1493,24 @@ impl tari_rpc::base_node_server::BaseNode for BaseNodeGrpcServer {
task::spawn(async move {
let active_validator_nodes = match handler.get_active_validator_nodes(request.height).await {
Err(err) => {
warn!(target: LOG_TARGET, "Error communicating with base node: {}", err,);
warn!(target: LOG_TARGET, "Base node service error: {}", err,);
return;
},
Ok(data) => data,
};
// dbg!(&active_validator_nodes);

for (public_key, shard_key) in active_validator_nodes {
let active_validator_node = tari_rpc::GetActiveValidatorNodesResponse {
public_key: public_key.to_vec(),
shard_key: shard_key.to_vec(),
};

match tx.send(Ok(active_validator_node)).await {
Ok(_) => (),
Err(err) => {
warn!(
target: LOG_TARGET,
"Error sending mempool transaction via GRPC: {}", err
);
match tx
.send(Err(obscure_error_if_true(
report_error_flag,
Status::unknown("Error sending data"),
)))
.await
{
Ok(_) => (),
Err(send_err) => {
warn!(target: LOG_TARGET, "Error sending error to GRPC client: {}", send_err)
},
}
return;
},
if tx.send(Ok(active_validator_node)).await.is_err() {
debug!(
target: LOG_TARGET,
"[get_active_validator_nodes] Client has disconnected before stream completed"
);
return;
}
}
});
Expand All @@ -1544,63 +1529,193 @@ impl tari_rpc::base_node_server::BaseNode for BaseNodeGrpcServer {
let report_error_flag = self.report_error_flag();
debug!(target: LOG_TARGET, "Incoming GRPC request for GetTemplateRegistrations");

let mut handler = self.node_service.clone();
let (mut tx, rx) = mpsc::channel(1000);
let (mut tx, rx) = mpsc::channel(10);

let start_hash = Some(request.start_hash)
.filter(|x| !x.is_empty())
.map(FixedHash::try_from)
.transpose()
.map_err(|_| Status::invalid_argument("Invalid start_hash"))?;

let mut node_service = self.node_service.clone();

let start_height = match start_hash {
Some(hash) => {
let header = node_service
.get_header_by_hash(hash)
.await
.map_err(|err| obscure_error_if_true(self.report_grpc_error, Status::internal(err.to_string())))?;
header
.map(|h| h.height())
.ok_or_else(|| Status::not_found("Start hash not found"))?
},
None => 0,
};

if request.count == 0 {
return Ok(Response::new(rx));
}

let end_height = start_height
.checked_add(request.count)
.ok_or_else(|| Status::invalid_argument("Request start height + count overflows u64"))?;

task::spawn(async move {
let template_registrations = match handler.get_template_registrations(request.from_height).await {
let template_registrations = match node_service.get_template_registrations(start_height, end_height).await {
Err(err) => {
warn!(target: LOG_TARGET, "Error communicating with base node: {}", err,);
warn!(target: LOG_TARGET, "Base node service error: {}", err);
return;
},
Ok(data) => data,
};

for template_registration in template_registrations {
let template_registration = match tari_rpc::TemplateRegistration::try_from(template_registration) {
let registration = match template_registration.registration_data.try_into() {
Ok(t) => t,
Err(e) => {
warn!(
target: LOG_TARGET,
"Error sending converting template registration for GRPC: {}", e
);
match tx
let _ignore = tx
.send(Err(obscure_error_if_true(
report_error_flag,
Status::internal("Error converting template_registration"),
Status::internal(format!("Error converting template_registration: {}", e)),
)))
.await
{
Ok(_) => (),
Err(send_err) => {
warn!(target: LOG_TARGET, "Error sending error to GRPC client: {}", send_err)
},
}
.await;
return;
},
};

match tx.send(Ok(template_registration)).await {
Ok(_) => (),
Err(err) => {
let resp = tari_rpc::GetTemplateRegistrationResponse {
utxo_hash: template_registration.output_hash.to_vec(),
registration: Some(registration),
};

if tx.send(Ok(resp)).await.is_err() {
debug!(
target: LOG_TARGET,
"[get_template_registrations] Client has disconnected before stream completed"
);
return;
}
}
});
debug!(
target: LOG_TARGET,
"Sending GetTemplateRegistrations response stream to client"
);
Ok(Response::new(rx))
}

async fn get_side_chain_utxos(
&self,
request: Request<tari_rpc::GetSideChainUtxosRequest>,
) -> Result<Response<Self::GetSideChainUtxosStream>, Status> {
let request = request.into_inner();
let report_error_flag = self.report_error_flag();
debug!(target: LOG_TARGET, "Incoming GRPC request for GetTemplateRegistrations");

let (mut tx, rx) = mpsc::channel(10);

let start_hash = Some(request.start_hash)
.filter(|x| !x.is_empty())
.map(FixedHash::try_from)
.transpose()
.map_err(|_| Status::invalid_argument("Invalid start_hash"))?;

let mut node_service = self.node_service.clone();

let start_header = match start_hash {
Some(hash) => node_service
.get_header_by_hash(hash)
.await
.map_err(|err| obscure_error_if_true(self.report_grpc_error, Status::internal(err.to_string())))?
.ok_or_else(|| Status::not_found("Start hash not found"))?,
None => node_service
.get_header(0)
.await
.map_err(|err| obscure_error_if_true(self.report_grpc_error, Status::internal(err.to_string())))?
.ok_or_else(|| Status::unavailable("Genesis block not available"))?,
};

if request.count == 0 {
return Ok(Response::new(rx));
}

let start_height = start_header.height();
let end_height = start_height
.checked_add(request.count - 1)
.ok_or_else(|| Status::invalid_argument("Request start height + count overflows u64"))?;

task::spawn(async move {
let mut current_header = start_header;

for height in start_height..=end_height {
let header_hash = *current_header.hash();
let utxos = match node_service.fetch_unspent_utxos_in_block(header_hash).await {
Ok(utxos) => utxos,
Err(e) => {
warn!(target: LOG_TARGET, "Base node service error: {}", e);
return;
},
};

let next_header = match node_service.get_header(height + 1).await {
Ok(h) => h,
Err(e) => {
let _ignore = tx.send(Err(obscure_error_if_true(
report_error_flag,
Status::internal(e.to_string()),
)));
return;
},
};

let sidechain_outputs = utxos
.into_iter()
.filter(|u| u.features.output_type.is_sidechain_type())
.collect::<Vec<_>>();

match sidechain_outputs.into_iter().map(TryInto::try_into).collect() {
Ok(outputs) => {
let resp = tari_rpc::GetSideChainUtxosResponse {
block_info: Some(tari_rpc::BlockInfo {
height: current_header.height(),
hash: header_hash.to_vec(),
next_block_hash: next_header.as_ref().map(|h| h.hash().to_vec()).unwrap_or_default(),
}),
outputs,
};

if tx.send(Ok(resp)).await.is_err() {
debug!(
target: LOG_TARGET,
"[get_template_registrations] Client has disconnected before stream completed"
);
return;
}
},
Err(e) => {
warn!(
target: LOG_TARGET,
"Error sending template registration via GRPC: {}", err
"Error sending converting sidechain output for GRPC: {}", e
);
match tx
let _ignore = tx
.send(Err(obscure_error_if_true(
report_error_flag,
Status::unknown("Error sending data"),
Status::internal(format!("Error converting sidechain output: {}", e)),
)))
.await
{
Ok(_) => (),
Err(send_err) => {
warn!(target: LOG_TARGET, "Error sending error to GRPC client: {}", send_err)
},
}
.await;
return;
},
};

match next_header {
Some(header) => {
current_header = header;
},
None => break,
}
}
});
Expand Down
18 changes: 14 additions & 4 deletions base_layer/core/src/base_node/comms_interface/comms_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use std::{
};

use serde::{Deserialize, Serialize};
use tari_common_types::types::{Commitment, HashOutput, PrivateKey, PublicKey, Signature};
use tari_common_types::types::{BlockHash, Commitment, HashOutput, PrivateKey, PublicKey, Signature};
use tari_utilities::hex::Hex;

use crate::{blocks::NewBlockTemplate, chain_storage::MmrTree, proof_of_work::PowAlgorithm};
Expand Down Expand Up @@ -76,7 +76,11 @@ pub enum NodeCommsRequest {
public_key: PublicKey,
},
FetchTemplateRegistrations {
from_height: u64,
start_height: u64,
end_height: u64,
},
FetchUnspentUtxosInBlock {
block_hash: BlockHash,
},
}

Expand Down Expand Up @@ -127,8 +131,14 @@ impl Display for NodeCommsRequest {
GetShardKey { height, public_key } => {
write!(f, "GetShardKey height ({}), public key ({:?})", height, public_key)
},
FetchTemplateRegistrations { from_height } => {
write!(f, "FetchTemplateRegistrations ({})", from_height)
FetchTemplateRegistrations {
start_height: start,
end_height: end,
} => {
write!(f, "FetchTemplateRegistrations ({}..={})", start, end)
},
FetchUnspentUtxosInBlock { block_hash } => {
write!(f, "FetchUnspentUtxosInBlock ({})", block_hash)
},
}
}
Expand Down
Loading

0 comments on commit 9e81c7b

Please sign in to comment.