Skip to content

Commit

Permalink
Merge branch 'development' into wallet-cli
Browse files Browse the repository at this point in the history
* development:
  fix: dont dial local shards if epoch manager has not synced (tari-project#370)
  fix: allow web UI to use OS-assigned JRPC address (tari-project#371)
  fix: dont initially register automatically (tari-project#372)
  fix: dont dial local shards if epoch manager has not synced (tari-project#369)
  • Loading branch information
sdbondi committed Feb 15, 2023
2 parents 82be965 + 9a9cf5e commit 8620154
Show file tree
Hide file tree
Showing 17 changed files with 376 additions and 549 deletions.
705 changes: 217 additions & 488 deletions Cargo.lock

Large diffs are not rendered by default.

27 changes: 22 additions & 5 deletions applications/tari_validator_node/src/dan_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@

use log::*;
use tari_comms::{connectivity::ConnectivityEvent, peer_manager::NodeId};
use tari_dan_core::{services::epoch_manager::EpochManager, workers::events::HotStuffEvent};
use tari_dan_core::{
services::epoch_manager::{EpochManager, EpochManagerError},
workers::events::HotStuffEvent,
};
use tari_shutdown::ShutdownSignal;
use tari_template_lib::Hash;

Expand All @@ -44,7 +47,9 @@ impl DanNode {

let mut connectivity_events = self.services.comms.connectivity().get_event_subscription();

self.dial_local_shard_peers().await?;
if let Err(err) = self.dial_local_shard_peers().await {
error!(target: LOG_TARGET, "Failed to dial local shard peers: {}", err);
}

let status = self.services.comms.connectivity().get_connectivity_status().await?;
if status.is_online() {
Expand Down Expand Up @@ -89,18 +94,30 @@ impl DanNode {

async fn dial_local_shard_peers(&mut self) -> Result<(), anyhow::Error> {
let epoch = self.services.epoch_manager.current_epoch().await?;
let shard_id = self
let res = self
.services
.epoch_manager
.get_validator_shard_key(epoch, self.services.comms.node_identity().public_key().clone())
.await?;
.await;

let shard_id = match res {
Ok(shard_id) => shard_id,
Err(EpochManagerError::BaseLayerConsensusConstantsNotSet) => {
info!(target: LOG_TARGET, "Epoch manager has not synced with base layer yet");
return Ok(());
},
Err(err) => {
return Err(err.into());
},
};

let local_shard_peers = self.services.epoch_manager.get_committee(epoch, shard_id).await?;
info!(
target: LOG_TARGET,
"Dialing {} local shard peers",
local_shard_peers.members.len()
);
// TODO: Peer sync may not have completed yet, so some addresses for local shard peers may not be known yet.

self.services
.comms
.connectivity()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ impl GrpcBaseNodeClient {
let consensus_constants = BaseLayerConsensusConstants {
validator_node_registration_expiry: result.validator_node_validity_period,
epoch_length: result.epoch_length,
validator_node_registration_min_deposit_amount: result
.validator_node_registration_min_deposit_amount
.into(),
};
Ok(consensus_constants)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ use tari_app_grpc::tari_rpc::{
BuildInfo,
CreateTemplateRegistrationRequest,
CreateTemplateRegistrationResponse,
GetBalanceRequest,
GetBalanceResponse,
RegisterValidatorNodeRequest,
RegisterValidatorNodeResponse,
TemplateRegistration,
Expand Down Expand Up @@ -68,6 +70,12 @@ impl GrpcWalletClient {
.ok_or_else(|| DigitalAssetError::FatalError("no connection".into()))
}

pub async fn get_balance(&mut self) -> Result<GetBalanceResponse, DigitalAssetError> {
let inner = self.connection().await?;
let resp = inner.get_balance(GetBalanceRequest {}).await?;
Ok(resp.into_inner())
}

pub async fn register_validator_node(
&mut self,
node_identity: &NodeIdentity,
Expand Down
5 changes: 4 additions & 1 deletion applications/tari_validator_node/src/http_ui/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@ use reqwest::StatusCode;

const LOG_TARGET: &str = "tari::validator_node::http_ui::server";

pub async fn run_http_ui_server(address: SocketAddr, json_rpc_address: Option<String>) -> Result<(), anyhow::Error> {
pub async fn run_http_ui_server(
address: SocketAddr,
json_rpc_address: Option<SocketAddr>,
) -> Result<(), anyhow::Error> {
let json_rpc_address = Arc::new(json_rpc_address);
let router = Router::new()
.route(
Expand Down
2 changes: 1 addition & 1 deletion applications/tari_validator_node/src/json_rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,4 @@ pub use handlers::JsonRpcHandlers;
mod jrpc_errors;
mod server;

pub use server::run_json_rpc;
pub use server::spawn_json_rpc;
9 changes: 5 additions & 4 deletions applications/tari_validator_node/src/json_rpc/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use super::handlers::JsonRpcHandlers;

const LOG_TARGET: &str = "tari::validator_node::json_rpc";

pub async fn run_json_rpc(preferred_address: SocketAddr, handlers: JsonRpcHandlers) -> Result<(), anyhow::Error> {
pub fn spawn_json_rpc(preferred_address: SocketAddr, handlers: JsonRpcHandlers) -> Result<SocketAddr, anyhow::Error> {
let router = Router::new()
.route("/", post(handler))
.route("/json_rpc", post(handler))
Expand All @@ -46,11 +46,12 @@ pub async fn run_json_rpc(preferred_address: SocketAddr, handlers: JsonRpcHandle
axum::Server::try_bind(&"127.0.0.1:0".parse().unwrap())
})?;
let server = server.serve(router.into_make_service());
info!(target: LOG_TARGET, "🌐 JSON-RPC listening on {}", server.local_addr());
server.await?;
let addr = server.local_addr();
info!(target: LOG_TARGET, "🌐 JSON-RPC listening on {}", addr);
tokio::spawn(server);

info!(target: LOG_TARGET, "💤 Stopping JSON-RPC");
Ok(())
Ok(addr)
}

async fn handler(Extension(handlers): Extension<Arc<JsonRpcHandlers>>, value: JsonRpcExtractor) -> JrpcResult {
Expand Down
12 changes: 5 additions & 7 deletions applications/tari_validator_node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ use crate::{
bootstrap::{spawn_services, Services},
dan_node::DanNode,
http_ui::server::run_http_ui_server,
json_rpc::{run_json_rpc, JsonRpcHandlers},
json_rpc::{spawn_json_rpc, JsonRpcHandlers},
p2p::services::networking::DAN_PEER_FEATURES,
};
pub use crate::{
Expand Down Expand Up @@ -132,23 +132,21 @@ pub async fn run_validator_node(config: &ApplicationConfig, shutdown_signal: Shu
.await?;

// Run the JSON-RPC API
if let Some(address) = config.validator_node.json_rpc_address {
let mut jrpc_address = config.validator_node.json_rpc_address;
if let Some(address) = jrpc_address.as_mut() {
info!(target: LOG_TARGET, "🌐 Started JSON-RPC server on {}", address);
let handlers = JsonRpcHandlers::new(
wallet_client,
base_node_client,
&services,
config.validator_node.clone(),
);
task::spawn(run_json_rpc(address, handlers));
*address = spawn_json_rpc(*address, handlers)?;
}

// Run the http ui
if let Some(address) = config.validator_node.http_ui_address {
task::spawn(run_http_ui_server(
address,
config.validator_node.json_rpc_address.map(|addr| addr.to_string()),
));
task::spawn(run_http_ui_server(address, jrpc_address));
}

run_dan_node(services, shutdown_signal).await?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ impl BaseLayerEpochManager {
}
}

pub async fn load_initial_state(&mut self) -> Result<(), EpochManagerError> {
pub fn load_initial_state(&mut self) -> Result<(), EpochManagerError> {
let tx = self.global_db.create_transaction()?;
let metadata = self.global_db.metadata(&tx);
self.current_epoch = metadata.get_metadata(MetadataKey::CurrentEpoch)?.unwrap_or(Epoch(0));
Expand Down Expand Up @@ -125,7 +125,9 @@ impl BaseLayerEpochManager {
Ok(())
}

async fn get_base_layer_consensus_constants(&mut self) -> Result<&BaseLayerConsensusConstants, EpochManagerError> {
pub async fn get_base_layer_consensus_constants(
&mut self,
) -> Result<&BaseLayerConsensusConstants, EpochManagerError> {
if let Some(ref constants) = self.base_layer_consensus_constants {
return Ok(constants);
}
Expand Down Expand Up @@ -471,6 +473,24 @@ impl BaseLayerEpochManager {

Ok(())
}

pub async fn remaining_registration_epochs(&mut self) -> Result<Option<Epoch>, EpochManagerError> {
let last_registration_epoch = match self.last_registration_epoch()? {
Some(epoch) => epoch,
None => return Ok(None),
};

let constants = self.get_base_layer_consensus_constants().await?;
let expiry = constants.validator_node_registration_expiry();

let num_blocks_since_last_reg = self
.current_epoch
.checked_sub(last_registration_epoch)
.expect("current epoch was less than the epoch we registered"); // Reorgs are not supported

// None indicates that we are not registered, or a previous registration has expired
Ok(expiry.checked_sub(num_blocks_since_last_reg))
}
}

fn get_committee_shard_range<TAddr>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use tari_comms::{types::CommsPublicKey, NodeIdentity};
use tari_core::{transactions::transaction_components::ValidatorNodeRegistration, ValidatorNodeMmr};
use tari_dan_common_types::{Epoch, ShardId};
use tari_dan_core::{
consensus_constants::ConsensusConstants,
consensus_constants::{BaseLayerConsensusConstants, ConsensusConstants},
models::{Committee, ValidatorNode},
services::epoch_manager::{EpochManagerError, ShardCommitteeAllocation},
};
Expand Down Expand Up @@ -131,6 +131,12 @@ pub enum EpochManagerRequest {
NotifyScanningComplete {
reply: Reply<()>,
},
RemainingRegistrationEpochs {
reply: Reply<Option<Epoch>>,
},
GetBaseLayerConsensusConstants {
reply: Reply<BaseLayerConsensusConstants>,
},
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -172,7 +178,7 @@ impl EpochManagerService {

pub async fn run(&mut self, mut shutdown: ShutdownSignal) -> Result<(), EpochManagerError> {
// first, load initial state
self.inner.load_initial_state().await?;
self.inner.load_initial_state()?;

loop {
tokio::select! {
Expand Down Expand Up @@ -253,6 +259,12 @@ impl EpochManagerService {
EpochManagerRequest::NotifyScanningComplete { reply } => {
handle(reply, self.inner.on_scanning_complete().await)
},
EpochManagerRequest::RemainingRegistrationEpochs { reply } => {
handle(reply, self.inner.remaining_registration_epochs().await)
},
EpochManagerRequest::GetBaseLayerConsensusConstants { reply } => {
handle(reply, self.inner.get_base_layer_consensus_constants().await.cloned())
},
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use tari_comms::types::CommsPublicKey;
use tari_core::{transactions::transaction_components::ValidatorNodeRegistration, ValidatorNodeMmr};
use tari_dan_common_types::{Epoch, ShardId};
use tari_dan_core::{
consensus_constants::BaseLayerConsensusConstants,
models::{Committee, ValidatorNode},
services::epoch_manager::{EpochManager, EpochManagerError, ShardCommitteeAllocation},
};
Expand Down Expand Up @@ -56,6 +57,15 @@ impl EpochManagerHandle {
rx.await.map_err(|_| EpochManagerError::ReceiveError)?
}

pub async fn get_base_layer_consensus_constants(&self) -> Result<BaseLayerConsensusConstants, EpochManagerError> {
let (tx, rx) = oneshot::channel();
self.tx_request
.send(EpochManagerRequest::GetBaseLayerConsensusConstants { reply: tx })
.await
.map_err(|_| EpochManagerError::SendError)?;
rx.await.map_err(|_| EpochManagerError::ReceiveError)?
}

pub async fn last_registration_epoch(&self) -> Result<Option<Epoch>, EpochManagerError> {
let (tx, rx) = oneshot::channel();
self.tx_request
Expand All @@ -75,6 +85,16 @@ impl EpochManagerHandle {
rx.await.map_err(|_| EpochManagerError::ReceiveError)?
}

/// Returns the number of epochs remaining for the current registration if registered, otherwise None
pub async fn remaining_registration_epochs(&self) -> Result<Option<Epoch>, EpochManagerError> {
let (tx, rx) = oneshot::channel();
self.tx_request
.send(EpochManagerRequest::RemainingRegistrationEpochs { reply: tx })
.await
.map_err(|_| EpochManagerError::SendError)?;
rx.await.map_err(|_| EpochManagerError::ReceiveError)?
}

pub async fn add_validator_node_registration(
&self,
block_height: u64,
Expand Down
Loading

0 comments on commit 8620154

Please sign in to comment.