Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core): store and fetch templates from lmdb #4726

Merged
merged 8 commits into from
Sep 27, 2022
9 changes: 8 additions & 1 deletion applications/tari_app_grpc/proto/base_node.proto
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import "types.proto";
import "transaction.proto";
import "block.proto";
import "network.proto";
import "sidechain_types.proto";

package tari.rpc;

Expand Down Expand Up @@ -92,6 +93,8 @@ service BaseNode {
rpc GetActiveValidatorNodes(GetActiveValidatorNodesRequest) returns (stream ActiveValidatorNode);
rpc GetCommittee(GetCommitteeRequest) returns (GetCommitteeResponse);
rpc GetShardKey(GetShardKeyRequest) returns (GetShardKeyResponse);
// Get templates
rpc GetTemplateRegistrations(GetTemplateRegistrationsRequest) returns (stream TemplateRegistration);
}

message GetAssetMetadataRequest {
Expand Down Expand Up @@ -464,4 +467,8 @@ message GetShardKeyRequest {

message GetShardKeyResponse {
bytes shard_key = 1;
}
}

message GetTemplateRegistrationsRequest {
uint64 from_height = 1;
}
76 changes: 76 additions & 0 deletions applications/tari_base_node/src/grpc/base_node_grpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ impl tari_rpc::base_node_server::BaseNode for BaseNodeGrpcServer {
type GetMempoolTransactionsStream = mpsc::Receiver<Result<tari_rpc::GetMempoolTransactionsResponse, Status>>;
type GetNetworkDifficultyStream = mpsc::Receiver<Result<tari_rpc::NetworkDifficultyResponse, Status>>;
type GetPeersStream = mpsc::Receiver<Result<tari_rpc::GetPeersResponse, Status>>;
type GetTemplateRegistrationsStream = mpsc::Receiver<Result<tari_rpc::TemplateRegistration, Status>>;
type GetTokensInCirculationStream = mpsc::Receiver<Result<tari_rpc::ValueAtHeightResponse, Status>>;
type ListHeadersStream = mpsc::Receiver<Result<tari_rpc::BlockHeaderResponse, Status>>;
type SearchKernelsStream = mpsc::Receiver<Result<tari_rpc::HistoricalBlock, Status>>;
Expand Down Expand Up @@ -1541,6 +1542,81 @@ impl tari_rpc::base_node_server::BaseNode for BaseNodeGrpcServer {
);
Ok(Response::new(rx))
}

async fn get_template_registrations(
&self,
request: Request<tari_rpc::GetTemplateRegistrationsRequest>,
) -> Result<Response<Self::GetTemplateRegistrationsStream>, Status> {
let request = request.into_inner();
let report_error_flag = self.report_error_flag();
debug!(target: LOG_TARGET, "Incoming GRPC request for GetTemplateRegistrations");

let mut handler = self.node_service.clone();
let (mut tx, rx) = mpsc::channel(1000);

task::spawn(async move {
let template_registrations = match handler.get_template_registrations(request.from_height).await {
Err(err) => {
warn!(target: LOG_TARGET, "Error communicating with base node: {}", err,);
return;
},
Ok(data) => data,
};

for template_registration in template_registrations {
let template_registration = match tari_rpc::TemplateRegistration::try_from(template_registration) {
Ok(t) => t,
Err(e) => {
warn!(
target: LOG_TARGET,
"Error sending converting template registration for GRPC: {}", e
);
match tx
.send(Err(obscure_error_if_true(
report_error_flag,
Status::internal("Error converting template_registration"),
)))
.await
{
Ok(_) => (),
Err(send_err) => {
warn!(target: LOG_TARGET, "Error sending error to GRPC client: {}", send_err)
},
}
return;
},
};

match tx.send(Ok(template_registration)).await {
Ok(_) => (),
Err(err) => {
warn!(
target: LOG_TARGET,
"Error sending template registration via GRPC: {}", err
);
match tx
.send(Err(obscure_error_if_true(
report_error_flag,
Status::unknown("Error sending data"),
)))
.await
{
Ok(_) => (),
Err(send_err) => {
warn!(target: LOG_TARGET, "Error sending error to GRPC client: {}", send_err)
},
}
return;
},
}
}
});
debug!(
target: LOG_TARGET,
"Sending GetTemplateRegistrations response stream to client"
);
Ok(Response::new(rx))
}
}

enum BlockGroupType {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ pub enum NodeCommsRequest {
FetchValidatorNodesKeys { height: u64 },
FetchCommittee { height: u64, shard: [u8; 32] },
GetShardKey { height: u64, public_key: PublicKey },
FetchTemplateRegistrations { from_height: u64 },
}

#[derive(Debug, Serialize, Deserialize)]
Expand Down Expand Up @@ -106,6 +107,9 @@ impl Display for NodeCommsRequest {
GetShardKey { height, public_key } => {
write!(f, "GetShardKey height ({}), public key ({:?})", height, public_key)
},
FetchTemplateRegistrations { from_height } => {
write!(f, "FetchTemplateRegistrations ({})", from_height)
},
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,12 @@ use crate::{
blocks::{Block, ChainHeader, HistoricalBlock, NewBlockTemplate},
chain_storage::{ActiveValidatorNode, UtxoMinedInfo},
proof_of_work::Difficulty,
transactions::transaction_components::{Transaction, TransactionKernel, TransactionOutput},
transactions::transaction_components::{
CodeTemplateRegistration,
Transaction,
TransactionKernel,
TransactionOutput,
},
};

/// API Response enum
Expand Down Expand Up @@ -74,6 +79,7 @@ pub enum NodeCommsResponse {
FetchValidatorNodesKeysResponse(Vec<ActiveValidatorNode>),
FetchCommitteeResponse(Vec<ActiveValidatorNode>),
GetShardKeyResponse([u8; 32]),
FetchTemplateRegistrationsResponse(Vec<CodeTemplateRegistration>),
}

impl Display for NodeCommsResponse {
Expand Down Expand Up @@ -115,6 +121,7 @@ impl Display for NodeCommsResponse {
FetchValidatorNodesKeysResponse(_) => write!(f, "FetchValidatorNodesKeysResponse"),
FetchCommitteeResponse(_) => write!(f, "FetchCommitteeResponse"),
GetShardKeyResponse(_) => write!(f, "GetShardKeyResponse"),
FetchTemplateRegistrationsResponse(_) => write!(f, "FetchTemplateRegistrationsResponse"),
}
}
}
Expand Down
12 changes: 12 additions & 0 deletions base_layer/core/src/base_node/comms_interface/inbound_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,18 @@ where B: BlockchainBackend + 'static
let shard_key = self.blockchain_db.get_shard_key(height, public_key).await?;
Ok(NodeCommsResponse::GetShardKeyResponse(shard_key))
},
NodeCommsRequest::FetchTemplateRegistrations { from_height } => {
let template_registrations = self
.blockchain_db
.fetch_template_registrations(from_height)
.await?
.into_iter()
.map(|tr| tr.registration_data)
.collect();
Ok(NodeCommsResponse::FetchTemplateRegistrationsResponse(
template_registrations,
))
},
}
}

Expand Down
16 changes: 15 additions & 1 deletion base_layer/core/src/base_node/comms_interface/local_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use crate::{
blocks::{Block, ChainHeader, HistoricalBlock, NewBlockTemplate},
chain_storage::ActiveValidatorNode,
proof_of_work::PowAlgorithm,
transactions::transaction_components::{TransactionKernel, TransactionOutput},
transactions::transaction_components::{CodeTemplateRegistration, TransactionKernel, TransactionOutput},
};

pub type BlockEventSender = broadcast::Sender<Arc<BlockEvent>>;
Expand Down Expand Up @@ -312,4 +312,18 @@ impl LocalNodeCommsInterface {
_ => Err(CommsInterfaceError::UnexpectedApiResponse),
}
}

pub async fn get_template_registrations(
&mut self,
from_height: u64,
) -> Result<Vec<CodeTemplateRegistration>, CommsInterfaceError> {
match self
.request_sender
.call(NodeCommsRequest::FetchTemplateRegistrations { from_height })
.await??
{
NodeCommsResponse::FetchTemplateRegistrationsResponse(template_registrations) => Ok(template_registrations),
_ => Err(CommsInterfaceError::UnexpectedApiResponse),
}
}
}
4 changes: 3 additions & 1 deletion base_layer/core/src/chain_storage/async_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use tari_common_types::{
};
use tari_utilities::epoch_time::EpochTime;

use super::ActiveValidatorNode;
use super::{ActiveValidatorNode, TemplateRegistration};
use crate::{
blocks::{
Block,
Expand Down Expand Up @@ -271,6 +271,8 @@ impl<B: BlockchainBackend + 'static> AsyncBlockchainDb<B> {
make_async_fn!(fetch_committee(height: u64, shard: [u8;32]) -> Vec<ActiveValidatorNode>, "fetch_committee");

make_async_fn!(get_shard_key(height:u64, public_key: PublicKey) -> [u8;32], "get_shard_key");

make_async_fn!(fetch_template_registrations(from_height: u64) -> Vec<TemplateRegistration>, "fetch_template_registrations");
}

impl<B: BlockchainBackend + 'static> From<BlockchainDatabase<B>> for AsyncBlockchainDb<B> {
Expand Down
3 changes: 2 additions & 1 deletion base_layer/core/src/chain_storage/blockchain_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use tari_common_types::{
types::{Commitment, HashOutput, PublicKey, Signature},
};

use super::ActiveValidatorNode;
use super::{ActiveValidatorNode, TemplateRegistration};
use crate::{
blocks::{
Block,
Expand Down Expand Up @@ -196,4 +196,5 @@ pub trait BlockchainBackend: Send + Sync {
fn fetch_active_validator_nodes(&self, height: u64) -> Result<Vec<ActiveValidatorNode>, ChainStorageError>;
fn fetch_committee(&self, height: u64, shard: [u8; 32]) -> Result<Vec<ActiveValidatorNode>, ChainStorageError>;
fn get_shard_key(&self, height: u64, public_key: PublicKey) -> Result<[u8; 32], ChainStorageError>;
fn fetch_template_registrations(&self, from_height: u64) -> Result<Vec<TemplateRegistration>, ChainStorageError>;
}
10 changes: 9 additions & 1 deletion base_layer/core/src/chain_storage/blockchain_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use tari_common_types::{
use tari_mmr::pruned_hashset::PrunedHashSet;
use tari_utilities::{epoch_time::EpochTime, hex::Hex, ByteArray};

use super::ActiveValidatorNode;
use super::{ActiveValidatorNode, TemplateRegistration};
use crate::{
blocks::{
Block,
Expand Down Expand Up @@ -1181,6 +1181,14 @@ where B: BlockchainBackend
let db = self.db_read_access()?;
db.fetch_committee(height, shard)
}

pub fn fetch_template_registrations(
&self,
from_height: u64,
) -> Result<Vec<TemplateRegistration>, ChainStorageError> {
let db = self.db_read_access()?;
db.fetch_template_registrations(from_height)
}
}

fn unexpected_result<T>(request: DbKey, response: DbValue) -> Result<T, ChainStorageError> {
Expand Down
8 changes: 7 additions & 1 deletion base_layer/core/src/chain_storage/db_transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use croaring::Bitmap;
use tari_common_types::types::{BlockHash, Commitment, HashOutput, PublicKey};
use tari_utilities::hex::Hex;

use super::ActiveValidatorNode;
use super::{ActiveValidatorNode, TemplateRegistration};
use crate::{
blocks::{Block, BlockHeader, BlockHeaderAccumulatedData, ChainBlock, ChainHeader, UpdateBlockAccumulatedData},
chain_storage::{error::ChainStorageError, HorizonData, Reorg},
Expand Down Expand Up @@ -365,6 +365,9 @@ pub enum WriteOperation {
DeleteValidatorNode {
public_key: PublicKey,
},
InsertTemplateRegistration {
template_registration: TemplateRegistration,
},
}

impl fmt::Display for WriteOperation {
Expand Down Expand Up @@ -465,6 +468,9 @@ impl fmt::Display for WriteOperation {
write!(f, "Inserting VN {:?}", validator_node)
},
DeleteValidatorNode { public_key } => write!(f, "Delete VN key {}", public_key),
InsertTemplateRegistration { template_registration } => {
write!(f, "Inserting Template {:?}", template_registration)
},
}
}
}
Expand Down
Loading