Skip to content

Commit

Permalink
feat: add networking grpc calls to wallet and base node (#3100)
Browse files Browse the repository at this point in the history
  • Loading branch information
stringhandler committed Jul 21, 2021
2 parents cb97072 + 6594f1a commit 17f37fb
Show file tree
Hide file tree
Showing 26 changed files with 5,239 additions and 123 deletions.
6 changes: 6 additions & 0 deletions applications/tari_app_grpc/proto/base_node.proto
Expand Up @@ -74,6 +74,12 @@ service BaseNode {
rpc GetPeers(GetPeersRequest) returns (stream GetPeersResponse);
rpc GetMempoolTransactions(GetMempoolTransactionsRequest) returns (stream GetMempoolTransactionsResponse);
rpc TransactionState(TransactionStateRequest) returns (TransactionStateResponse);
// This returns the node's network identity
rpc Identify (Empty) returns (NodeIdentity);
// Get Base Node network connectivity status
rpc GetNetworkStatus(Empty) returns (NetworkStatusResponse);
// List currently connected peers
rpc ListConnectedPeers(Empty) returns (ListConnectedPeersResponse);
}

message SubmitBlockResponse {
Expand Down
75 changes: 52 additions & 23 deletions applications/tari_app_grpc/proto/types.proto
Expand Up @@ -271,14 +271,31 @@ message ConsensusConstants {
uint64 block_weight_kernels = 16;
}

message UnblindedOutput {
// Value of the output
uint64 value = 1;
// Spending key of the output
bytes spending_key = 2;
// Options for an output's structure or use
OutputFeatures features = 3;
// Tari script serialised script
bytes script = 4;
// Tari script input data for spending
bytes input_data = 5;
// Tari script private key
bytes script_private_key = 7;
// Tari script offset pubkey, K_O
bytes sender_offset_public_key = 8;
// UTXO signature with the script offset private key, k_O
ComSignature metadata_signature = 9;
}

// ----------------------------- Network Types ----------------------------- //

message Address{
bytes address =1;
string last_seen = 2;
uint32 connection_attempts = 3;
uint32 rejected_message_count = 4;
uint64 avg_latency = 5;
message NodeIdentity {
bytes public_key = 1;
string public_address = 2;
bytes node_id = 3;
}

message Peer{
Expand All @@ -305,21 +322,33 @@ message Peer{
string user_agent = 12;
}

message UnblindedOutput {
// Value of the output
uint64 value = 1;
// Spending key of the output
bytes spending_key = 2;
// Options for an output's structure or use
OutputFeatures features = 3;
// Tari script serialised script
bytes script = 4;
// Tari script input data for spending
bytes input_data = 5;
// Tari script private key
bytes script_private_key = 7;
// Tari script offset pubkey, K_O
bytes sender_offset_public_key = 8;
// UTXO signature with the script offset private key, k_O
ComSignature metadata_signature = 9;
enum ConnectivityStatus {
Initializing = 0;
Online = 1;
Degraded = 2;
Offline = 3;
}

message NetworkStatusResponse {
ConnectivityStatus status = 1;
uint32 avg_latency_ms = 2;
uint32 num_node_connections = 3;
// TODO: Implement these
// uint32 incoming_bps = 4;
// uint32 outgoing_bps = 5;
// uint64 total_bytes_read = 6;
// uint64 total_bytes_written = 7;
}

message Address{
bytes address =1;
string last_seen = 2;
uint32 connection_attempts = 3;
uint32 rejected_message_count = 4;
uint64 avg_latency = 5;
}

message ListConnectedPeersResponse {
repeated Peer connected_peers = 1;
}

5 changes: 5 additions & 0 deletions applications/tari_app_grpc/proto/wallet.proto
Expand Up @@ -46,6 +46,10 @@ service Wallet {
rpc CoinSplit (CoinSplitRequest) returns (CoinSplitResponse);
// Import Utxo to wallet
rpc ImportUtxos (ImportUtxosRequest) returns (ImportUtxosResponse);
// Get Base Node network connectivity status
rpc GetNetworkStatus(Empty) returns (NetworkStatusResponse);
// List currently connected peers
rpc ListConnectedPeers(Empty) returns (ListConnectedPeersResponse);
}

message GetVersionRequest { }
Expand Down Expand Up @@ -179,3 +183,4 @@ message ImportUtxosRequest {
message ImportUtxosResponse {
repeated uint64 tx_ids = 1;
}

14 changes: 13 additions & 1 deletion applications/tari_app_grpc/src/conversions/peer.rs
Expand Up @@ -21,7 +21,7 @@
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use crate::{conversions::datetime_to_timestamp, tari_rpc as grpc};
use tari_comms::{net_address::MutliaddrWithStats, peer_manager::Peer};
use tari_comms::{connectivity::ConnectivityStatus, net_address::MutliaddrWithStats, peer_manager::Peer};
use tari_crypto::tari_utilities::ByteArray;

impl From<Peer> for grpc::Peer {
Expand Down Expand Up @@ -90,3 +90,15 @@ impl From<MutliaddrWithStats> for grpc::Address {
}
}
}

impl From<ConnectivityStatus> for grpc::ConnectivityStatus {
fn from(status: ConnectivityStatus) -> Self {
use ConnectivityStatus::*;
match status {
Initializing => grpc::ConnectivityStatus::Initializing,
Online(_) => grpc::ConnectivityStatus::Online,
Degraded(_) => grpc::ConnectivityStatus::Degraded,
Offline => grpc::ConnectivityStatus::Offline,
}
}
}
21 changes: 19 additions & 2 deletions applications/tari_base_node/src/builder.rs
Expand Up @@ -23,7 +23,7 @@
use crate::bootstrap::BaseNodeBootstrapper;
use log::*;
use std::sync::Arc;
use tari_common::{DatabaseType, GlobalConfig};
use tari_common::{configuration::Network, DatabaseType, GlobalConfig};
use tari_comms::{peer_manager::NodeIdentity, protocol::rpc::RpcServerHandle, CommsNode};
use tari_comms_dht::Dht;
use tari_core::{
Expand All @@ -45,7 +45,7 @@ use tari_core::{
DifficultyCalculator,
},
};
use tari_p2p::auto_update::SoftwareUpdaterHandle;
use tari_p2p::{auto_update::SoftwareUpdaterHandle, services::liveness::LivenessHandle};
use tari_service_framework::ServiceHandles;
use tari_shutdown::ShutdownSignal;
use tokio::sync::watch;
Expand All @@ -57,6 +57,7 @@ const LOG_TARGET: &str = "c::bn::initialization";
/// on the comms stack.
pub struct BaseNodeContext {
config: Arc<GlobalConfig>,
consensus_rules: ConsensusManager,
blockchain_db: BlockchainDatabase<LMDBDatabase>,
base_node_comms: CommsNode,
base_node_dht: Dht,
Expand Down Expand Up @@ -98,6 +99,11 @@ impl BaseNodeContext {
&self.base_node_comms
}

/// Returns the liveness service handle
pub fn liveness(&self) -> LivenessHandle {
self.base_node_handles.expect_handle()
}

/// Returns the base node state machine
pub fn state_machine(&self) -> StateMachineHandle {
self.base_node_handles.expect_handle()
Expand Down Expand Up @@ -128,6 +134,16 @@ impl BaseNodeContext {
self.blockchain_db.clone()
}

/// Returns the configured network
pub fn network(&self) -> Network {
self.config.network
}

/// Returns the consensus rules
pub fn consensus_rules(&self) -> &ConsensusManager {
&self.consensus_rules
}

/// Return the state machine channel to provide info updates
pub fn get_state_machine_info_channel(&self) -> watch::Receiver<StatusInfo> {
self.base_node_handles
Expand Down Expand Up @@ -247,6 +263,7 @@ async fn build_node_context(

Ok(BaseNodeContext {
config,
consensus_rules: rules,
blockchain_db,
base_node_comms,
base_node_dht,
Expand Down
112 changes: 86 additions & 26 deletions applications/tari_base_node/src/grpc/base_node_grpc_server.rs
Expand Up @@ -19,24 +19,24 @@
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
use crate::grpc::{
blocks::{block_fees, block_heights, block_size, GET_BLOCKS_MAX_HEIGHTS, GET_BLOCKS_PAGE_SIZE},
helpers::{mean, median},
use crate::{
builder::BaseNodeContext,
grpc::{
blocks::{block_fees, block_heights, block_size, GET_BLOCKS_MAX_HEIGHTS, GET_BLOCKS_PAGE_SIZE},
helpers::{mean, median},
},
};
use log::*;
use std::{
cmp,
convert::{TryFrom, TryInto},
sync::Arc,
};

use tari_app_grpc::{
tari_rpc,
tari_rpc::{CalcType, Sorting},
};
use tari_app_utilities::consts;
use tari_common::configuration::Network;
use tari_comms::PeerManager;
use tari_comms::CommsNode;
use tari_core::{
base_node::{
comms_interface::Broadcast,
Expand All @@ -46,13 +46,13 @@ use tari_core::{
},
blocks::{Block, BlockHeader, NewBlockTemplate},
consensus::{emission::Emission, ConsensusManager, NetworkConsensus},
crypto::tari_utilities::hex::Hex,
crypto::tari_utilities::{hex::Hex, ByteArray},
mempool::{service::LocalMempoolService, TxStorageResponse},
proof_of_work::PowAlgorithm,
transactions::{transaction::Transaction, types::Signature},
};
use tari_crypto::tari_utilities::{message_format::MessageFormat, Hashable};
use tari_p2p::auto_update::SoftwareUpdaterHandle;
use tari_p2p::{auto_update::SoftwareUpdaterHandle, services::liveness::LivenessHandle};
use tokio::{sync::mpsc, task};
use tonic::{Request, Response, Status};

Expand All @@ -78,28 +78,23 @@ pub struct BaseNodeGrpcServer {
mempool_service: LocalMempoolService,
network: NetworkConsensus,
state_machine_handle: StateMachineHandle,
peer_manager: Arc<PeerManager>,
consensus_rules: ConsensusManager,
software_updater: SoftwareUpdaterHandle,
comms: CommsNode,
liveness: LivenessHandle,
}

impl BaseNodeGrpcServer {
pub fn new(
local_node: LocalNodeCommsInterface,
local_mempool: LocalMempoolService,
network: Network,
state_machine_handle: StateMachineHandle,
peer_manager: Arc<PeerManager>,
software_updater: SoftwareUpdaterHandle,
) -> Self {
pub fn from_base_node_context(ctx: &BaseNodeContext) -> Self {
Self {
node_service: local_node,
mempool_service: local_mempool,
consensus_rules: ConsensusManager::builder(network).build(),
network: network.into(),
state_machine_handle,
peer_manager,
software_updater,
node_service: ctx.local_node(),
mempool_service: ctx.local_mempool(),
network: ctx.network().into(),
state_machine_handle: ctx.state_machine(),
consensus_rules: ctx.consensus_rules().clone(),
software_updater: ctx.software_updater(),
comms: ctx.base_node_comms().clone(),
liveness: ctx.liveness(),
}
}
}
Expand Down Expand Up @@ -608,7 +603,8 @@ impl tari_rpc::base_node_server::BaseNode for BaseNodeGrpcServer {
debug!(target: LOG_TARGET, "Incoming GRPC request for get all peers");

let peers = self
.peer_manager
.comms
.peer_manager()
.all()
.await
.map_err(|e| Status::unknown(e.to_string()))?;
Expand Down Expand Up @@ -1051,6 +1047,70 @@ impl tari_rpc::base_node_server::BaseNode for BaseNodeGrpcServer {
None => Err(Status::not_found(format!("Header not found with hash `{}`", hash_hex))),
}
}

async fn identify(&self, _: Request<tari_rpc::Empty>) -> Result<Response<tari_rpc::NodeIdentity>, Status> {
let identity = self.comms.node_identity_ref();
Ok(Response::new(tari_rpc::NodeIdentity {
public_key: identity.public_key().to_vec(),
public_address: identity.public_address().to_string(),
node_id: identity.node_id().to_vec(),
}))
}

async fn get_network_status(
&self,
_: Request<tari_rpc::Empty>,
) -> Result<Response<tari_rpc::NetworkStatusResponse>, Status> {
let status = self
.comms
.connectivity()
.get_connectivity_status()
.await
.map_err(|err| Status::internal(err.to_string()))?;

let latency = self
.liveness
.clone()
.get_network_avg_latency()
.await
.map_err(|err| Status::internal(err.to_string()))?;

let resp = tari_rpc::NetworkStatusResponse {
status: tari_rpc::ConnectivityStatus::from(status) as i32,
avg_latency_ms: latency.unwrap_or_default(),
num_node_connections: status.num_connected_nodes() as u32,
};

Ok(Response::new(resp))
}

async fn list_connected_peers(
&self,
_: Request<tari_rpc::Empty>,
) -> Result<Response<tari_rpc::ListConnectedPeersResponse>, Status> {
let mut connectivity = self.comms.connectivity();
let peer_manager = self.comms.peer_manager();
let connected_peers = connectivity
.get_active_connections()
.await
.map_err(|err| Status::internal(err.to_string()))?;

let mut peers = Vec::with_capacity(connected_peers.len());
for peer in connected_peers {
peers.push(
peer_manager
.find_by_node_id(peer.peer_node_id())
.await
.map_err(|err| Status::internal(err.to_string()))?,
);
}

let resp = tari_rpc::ListConnectedPeersResponse {
connected_peers: peers.into_iter().map(Into::into).collect(),
};

Ok(Response::new(resp))
}
}

enum BlockGroupType {
Expand Down
10 changes: 1 addition & 9 deletions applications/tari_base_node/src/main.rs
Expand Up @@ -216,15 +216,7 @@ async fn run_node(node_config: Arc<GlobalConfig>, bootstrap: ConfigBootstrap) ->

if node_config.grpc_enabled {
// Go, GRPC, go go
let grpc = crate::grpc::base_node_grpc_server::BaseNodeGrpcServer::new(
ctx.local_node(),
ctx.local_mempool(),
node_config.network,
ctx.state_machine(),
ctx.base_node_comms().peer_manager(),
ctx.software_updater(),
);

let grpc = crate::grpc::base_node_grpc_server::BaseNodeGrpcServer::from_base_node_context(&ctx);
task::spawn(run_grpc(grpc, node_config.grpc_base_node_address, shutdown.to_signal()));
}

Expand Down

0 comments on commit 17f37fb

Please sign in to comment.