Skip to content

Commit

Permalink
feat(validator-node): initial state sync implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
sdbondi committed Feb 14, 2022
1 parent b1492a0 commit 0a3ddf2
Show file tree
Hide file tree
Showing 65 changed files with 2,130 additions and 661 deletions.
889 changes: 511 additions & 378 deletions Cargo.lock

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,13 @@ pub fn copy_config_file<S: AsRef<Path>>(
file: &str,
) -> Result<(), LauncherError> {
let path = Path::new("assets").join(file);
let config_path = resolve_path(config, package_info, &path, Some(BaseDirectory::Resource))?;
let config_path = resolve_path(
config,
package_info,
&Default::default(),
&path,
Some(BaseDirectory::Resource),
)?;
let cfg = std::fs::read_to_string(&config_path).expect("The config assets were not bundled with the App");
info!("Log Configuration file ({}) loaded", file);
debug!("{}", cfg);
Expand Down
4 changes: 2 additions & 2 deletions applications/tari_base_node/src/grpc/base_node_grpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -602,8 +602,8 @@ impl tari_rpc::base_node_server::BaseNode for BaseNodeGrpcServer {
let response = tari_rpc::ListAssetRegistrationsResponse {
asset_public_key: output
.features
.mint_non_fungible
.map(|mint| mint.asset_public_key.to_vec())
.asset
.map(|asset| asset.public_key.to_vec())
.unwrap_or_default(),
unique_id: output.features.unique_id.unwrap_or_default(),
owner_commitment: output.commitment.to_vec(),
Expand Down
2 changes: 1 addition & 1 deletion applications/tari_base_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ async fn run_grpc(
.serve_with_shutdown(grpc_address, interrupt_signal.map(|_| ()))
.await
.map_err(|err| {
error!(target: LOG_TARGET, "GRPC encountered an error:{}", err);
error!(target: LOG_TARGET, "GRPC encountered an error: {}", err);
err
})?;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,11 @@ pub(crate) async fn assets_create_initial_checkpoint(
committee: Vec<String>,
state: tauri::State<'_, ConcurrentAppState>,
) -> Result<(), Status> {
let mut client = state.connect_validator_node_client().await?;
client
.get_initial_checkpoint_merkle_root(&asset_pub_key)
.await;

let mmr = MerkleMountainRange::<Blake256, _>::new(MemBackendVec::new());

let root = mmr.get_merkle_root().unwrap();
Expand Down
43 changes: 42 additions & 1 deletion applications/tari_validator_node/proto/dan/validator_node.proto
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,48 @@ message GetSidechainBlocksRequest {
bytes end_hash = 3;
}


message GetSidechainBlocksResponse {
tari.dan.common.SideChainBlock block = 1;
}

message GetSidechainStateRequest {
bytes asset_public_key = 1;
}

message GetSidechainStateResponse {
oneof state {
string schema = 1;
KeyValue key_value = 2;
}
}

message KeyValue {
bytes key = 1;
bytes value = 2;
}

message GetStateOpLogsRequest {
bytes asset_public_key = 1;
uint64 height = 2;
}

message GetStateOpLogsResponse {
repeated StateOpLog op_logs = 1;
}

message StateOpLog {
uint64 height = 1;
string operation = 2;
string schema = 3;
bytes key = 4;
bytes value = 5;
bytes merkle_root = 6;
}

message GetTipNodeRequest{
bytes asset_public_key = 1;
}

message GetTipNodeResponse {
tari.dan.common.Node tip_node = 1;
}
24 changes: 21 additions & 3 deletions applications/tari_validator_node/src/dan_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

use std::{collections::HashMap, sync::Arc, time::Duration};

use log::info;
use log::*;
use tari_common::{
configuration::ValidatorNodeConfig,
exit_codes::{ExitCode, ExitError},
Expand Down Expand Up @@ -59,6 +59,7 @@ use crate::{
inbound_connection_service::TariCommsInboundConnectionService,
outbound_connection_service::TariCommsOutboundService,
},
TariCommsValidatorNodeClientFactory,
};

const LOG_TARGET: &str = "tari::validator_node::app";
Expand Down Expand Up @@ -108,16 +109,29 @@ impl DanNode {
.get_assets_for_dan_node(node_identity.public_key().clone())
.await
.unwrap();
info!(
target: LOG_TARGET,
"Base node returned {} asset(s) to process",
assets.len()
);
for asset in assets {
if tasks.contains_key(&asset.public_key) {
debug!(
target: LOG_TARGET,
"Asset task already running for asset '{}'", asset.public_key
);
continue;
}
if let Some(allow_list) = &dan_config.assets_allow_list {
if !allow_list.contains(&asset.public_key.to_hex()) {
debug!(
target: LOG_TARGET,
"Asset '{}' is not whitelisted for processing ", asset.public_key
);
continue;
}
}
info!(target: LOG_TARGET, "Adding asset {:?}", asset.public_key);
info!(target: LOG_TARGET, "Adding asset '{}'", asset.public_key);
let node_identity = node_identity.as_ref().clone();
let mempool = mempool_service.clone();
let handles = handles.clone();
Expand All @@ -128,7 +142,7 @@ impl DanNode {
tasks.insert(
asset.public_key.clone(),
task::spawn(DanNode::start_asset_worker(
asset.clone(),
asset,
node_identity,
mempool,
handles,
Expand Down Expand Up @@ -199,6 +213,9 @@ impl DanNode {
let chain_storage = SqliteStorageService {};
let wallet_client = GrpcWalletClient::new(config.wallet_grpc_address);
let checkpoint_manager = ConcreteCheckpointManager::new(asset_definition.clone(), wallet_client);
let connectivity = handles.expect_handle();
let validator_node_client_factory =
TariCommsValidatorNodeClientFactory::new(connectivity, dht.discovery_service_requester());
let mut consensus_worker = ConsensusWorker::<DefaultServiceSpecification>::new(
receiver,
outbound,
Expand All @@ -214,6 +231,7 @@ impl DanNode {
db_factory,
chain_storage,
checkpoint_manager,
validator_node_client_factory,
);

consensus_worker
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
use std::{convert::TryInto, net::SocketAddr};

use async_trait::async_trait;
use log::*;
use tari_app_grpc::tari_rpc as grpc;
use tari_common_types::types::PublicKey;
use tari_crypto::tari_utilities::ByteArray;
Expand All @@ -32,6 +33,8 @@ use tari_dan_core::{
DigitalAssetError,
};

const LOG_TARGET: &str = "tari::validator_node::app";

#[derive(Clone)]
pub struct GrpcBaseNodeClient {
endpoint: SocketAddr,
Expand Down Expand Up @@ -112,18 +115,23 @@ impl BaseNodeClient for GrpcBaseNodeClient {
self.inner.as_mut().unwrap()
},
};
let request = grpc::ListAssetRegistrationsRequest { offset: 0, count: 0 };
// TODO: probably should use output mmr indexes here
let request = grpc::ListAssetRegistrationsRequest { offset: 0, count: 100 };
let mut result = inner.list_asset_registrations(request).await.unwrap().into_inner();
let mut assets: Vec<AssetDefinition> = vec![];
let tip = self.get_tip_info().await?;
while let Some(r) = result.message().await.unwrap() {
if let Ok(asset_public_key) = PublicKey::from_bytes(r.unique_id.as_bytes()) {
if let Ok(asset_public_key) = PublicKey::from_bytes(r.asset_public_key.as_bytes()) {
if let Some(checkpoint) = self
.get_current_checkpoint(tip.height_of_longest_chain, asset_public_key.clone(), vec![3u8; 32])
.await?
{
if let Some(committee) = checkpoint.get_side_chain_committee() {
if committee.contains(&dan_node_public_key) {
debug!(
target: LOG_TARGET,
"Node is on committee for asset : {}", asset_public_key
);
assets.push(AssetDefinition {
public_key: asset_public_key,
template_parameters: r
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,10 +143,10 @@ impl<TServiceSpecification: ServiceSpecification + 'static> rpc::validator_node_
.get_state_db(&asset_public_key)
.map_err(|e| Status::internal(format!("Could not create state db: {}", e)))?
{
let mut unit_of_work = state.new_unit_of_work();
let mut state_db_reader = state.reader();
let response_bytes = self
.asset_processor
.invoke_read_method(template_id, request.method, &request.args, &mut unit_of_work)
.invoke_read_method(template_id, request.method, &request.args, &mut state_db_reader)
.map_err(|e| Status::internal(format!("Could not invoke read method: {}", e)))?;
Ok(Response::new(rpc::InvokeReadMethodResponse {
result: response_bytes.unwrap_or_default(),
Expand Down
8 changes: 6 additions & 2 deletions applications/tari_validator_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ fn main_inner() -> Result<(), ExitError> {

async fn run_node(config: GlobalConfig, create_id: bool) -> Result<(), ExitError> {
let shutdown = Shutdown::new();
let validator_node_config = config
.validator_node
.as_ref()
.ok_or_else(|| ExitCodes::ConfigError("validator_node configuration not found".to_string()))?;

fs::create_dir_all(&config.peer_db_path).map_err(|err| ExitError::new(ExitCode::ConfigError, err))?;
let node_identity = setup_node_identity(
Expand Down Expand Up @@ -125,7 +129,7 @@ async fn run_node(config: GlobalConfig, create_id: bool) -> Result<(), ExitError
handles.expect_handle::<Dht>().discovery_service_requester(),
);
let asset_proxy: ConcreteAssetProxy<DefaultServiceSpecification> = ConcreteAssetProxy::new(
GrpcBaseNodeClient::new(config.validator_node.clone().unwrap().base_node_grpc_address),
GrpcBaseNodeClient::new(validator_node_config.base_node_grpc_address),
validator_node_client_factory,
5,
mempool_service.clone(),
Expand Down Expand Up @@ -197,7 +201,7 @@ async fn run_grpc<TServiceSpecification: ServiceSpecification + 'static>(
.serve_with_shutdown(grpc_address, shutdown_signal.map(|_| ()))
.await
.map_err(|err| {
error!(target: LOG_TARGET, "GRPC encountered an error:{}", err);
error!(target: LOG_TARGET, "GRPC encountered an error: {}", err);
err
})?;

Expand Down
115 changes: 96 additions & 19 deletions applications/tari_validator_node/src/p2p/proto/conversions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,27 @@ use std::convert::{TryFrom, TryInto};

use tari_common_types::types::PublicKey;
use tari_crypto::tari_utilities::ByteArray;
use tari_dan_core::models::{
CheckpointData,
HotStuffMessage,
HotStuffMessageType,
HotStuffTreeNode,
Instruction,
InstructionSet,
Node,
QuorumCertificate,
SideChainBlock,
Signature,
StateRoot,
TariDanPayload,
TemplateId,
TreeNodeHash,
ViewId,
use tari_dan_core::{
models::{
CheckpointData,
HotStuffMessage,
HotStuffMessageType,
HotStuffTreeNode,
Instruction,
InstructionSet,
KeyValue,
Node,
QuorumCertificate,
SideChainBlock,
Signature,
StateOpLogEntry,
StateRoot,
TariDanPayload,
TemplateId,
TreeNodeHash,
ViewId,
},
storage::state::DbStateOpLogEntry,
};

use crate::p2p::proto;
Expand Down Expand Up @@ -159,14 +164,19 @@ impl TryFrom<proto::consensus::HotStuffTreeNode> for HotStuffTreeNode<TariDanPay
if value.parent.is_empty() {
return Err("parent not provided".to_string());
}
let state_root = value
.state_root
.try_into()
.map(StateRoot::new)
.map_err(|_| "Incorrect length for state_root")?;
Ok(Self::new(
TreeNodeHash::try_from(value.parent).map_err(|err| err.to_string())?,
TreeNodeHash::try_from(value.parent).map_err(|_| "Incorrect length for parent")?,
value
.payload
.map(|p| p.try_into())
.transpose()?
.ok_or_else(|| "payload not provided".to_string())?,
StateRoot::new(value.state_root),
.ok_or_else(|| "payload not provided")?,
state_root,
value.height,
))
}
Expand Down Expand Up @@ -281,3 +291,70 @@ impl TryFrom<proto::common::Node> for Node {
Ok(Self::new(hash, parent, height, is_committed))
}
}

impl From<KeyValue> for proto::validator_node::KeyValue {
fn from(kv: KeyValue) -> Self {
Self {
key: kv.key,
value: kv.value,
}
}
}

impl TryFrom<proto::validator_node::KeyValue> for KeyValue {
type Error = String;

fn try_from(kv: proto::validator_node::KeyValue) -> Result<Self, Self::Error> {
if kv.key.is_empty() {
return Err("KeyValue: key cannot be empty".to_string());
}

Ok(Self {
key: kv.key,
value: kv.value,
})
}
}

impl From<StateOpLogEntry> for proto::validator_node::StateOpLog {
fn from(entry: StateOpLogEntry) -> Self {
let DbStateOpLogEntry {
height,
merkle_root,
operation,
schema,
key,
value,
} = entry.into_inner();
Self {
height,
merkle_root: merkle_root.map(|r| r.as_bytes().to_vec()).unwrap_or_default(),
operation: operation.as_op_str().to_string(),
schema,
key,
value: value.unwrap_or_default(),
}
}
}
impl TryFrom<proto::validator_node::StateOpLog> for StateOpLogEntry {
type Error = String;

fn try_from(value: proto::validator_node::StateOpLog) -> Result<Self, Self::Error> {
Ok(DbStateOpLogEntry {
height: value.height,
merkle_root: Some(value.merkle_root)
.filter(|r| !r.is_empty())
.map(TryInto::try_into)
.transpose()
.map_err(|_| "Invalid merkle root value".to_string())?,
operation: value
.operation
.parse()
.map_err(|_| "Invalid oplog operation string".to_string())?,
schema: value.schema,
key: value.key,
value: Some(value.value).filter(|v| !v.is_empty()),
}
.into())
}
}
Loading

0 comments on commit 0a3ddf2

Please sign in to comment.