Skip to content

Commit

Permalink
feat: add sync rpc client pool to wallet connectivity (#3199)
Browse files Browse the repository at this point in the history
  • Loading branch information
stringhandler committed Aug 17, 2021
2 parents 26e9748 + d7e7be0 commit 305aeda
Show file tree
Hide file tree
Showing 8 changed files with 167 additions and 42 deletions.
31 changes: 19 additions & 12 deletions base_layer/core/src/base_node/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,6 @@
mod service;
#[cfg(feature = "base_node")]
use crate::base_node::StateMachineHandle;
use crate::proto::{
base_node::{
FetchMatchingUtxos,
FetchUtxosResponse,
Signatures,
TipInfoResponse,
TxQueryBatchResponses,
TxQueryResponse,
TxSubmissionResponse,
},
types::{Signature, Transaction},
};
#[cfg(feature = "base_node")]
use crate::{
chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend},
Expand All @@ -44,6 +32,22 @@ use crate::{
#[cfg(feature = "base_node")]
pub use service::BaseNodeWalletRpcService;

use crate::{
proto,
proto::{
base_node::{
FetchMatchingUtxos,
FetchUtxosResponse,
Signatures,
TipInfoResponse,
TxQueryBatchResponses,
TxQueryResponse,
TxSubmissionResponse,
},
types::{Signature, Transaction},
},
};

use tari_comms::protocol::rpc::{Request, Response, RpcStatus};
use tari_comms_rpc_macros::tari_rpc;

Expand Down Expand Up @@ -72,6 +76,9 @@ pub trait BaseNodeWalletService: Send + Sync + 'static {

#[rpc(method = 5)]
async fn get_tip_info(&self, request: Request<()>) -> Result<Response<TipInfoResponse>, RpcStatus>;

#[rpc(method = 6)]
async fn get_header(&self, request: Request<u64>) -> Result<Response<proto::core::BlockHeader>, RpcStatus>;
}

#[cfg(feature = "base_node")]
Expand Down
13 changes: 13 additions & 0 deletions base_layer/core/src/base_node/rpc/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use crate::{
base_node::{rpc::BaseNodeWalletService, state_machine_service::states::StateInfo, StateMachineHandle},
chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend, PrunedOutput},
mempool::{service::MempoolHandle, TxStorageResponse},
proto,
proto::{
base_node::{
FetchMatchingUtxos,
Expand Down Expand Up @@ -327,4 +328,16 @@ impl<B: BlockchainBackend + 'static> BaseNodeWalletService for BaseNodeWalletRpc
is_synced,
}))
}

async fn get_header(&self, request: Request<u64>) -> Result<Response<proto::core::BlockHeader>, RpcStatus> {
let height = request.into_message();
let header = self
.db()
.fetch_header(height)
.await
.map_err(RpcStatus::log_internal_error(LOG_TARGET))?
.ok_or_else(|| RpcStatus::not_found(format!("Header not found at height {}", height)))?;

Ok(Response::new(header.into()))
}
}
4 changes: 2 additions & 2 deletions base_layer/wallet/src/base_node_service/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,11 @@ impl<T: WalletBackend + 'static> BaseNodeMonitor<T> {
let peer_node_id = self.update_connectivity_status().await;
let mut client = self
.wallet_connectivity
.obtain_base_node_rpc_client()
.obtain_base_node_wallet_rpc_client()
.await
.ok_or(BaseNodeMonitorError::NodeShuttingDown)?;
let latency = client.get_last_request_latency().await?;
trace!(
debug!(
target: LOG_TARGET,
"Base node {} latency: {} ms",
peer_node_id,
Expand Down
23 changes: 20 additions & 3 deletions base_layer/wallet/src/connectivity_service/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,12 @@ use futures::{
SinkExt,
};
use tari_comms::{peer_manager::NodeId, protocol::rpc::RpcClientLease};
use tari_core::base_node::rpc;
use tari_core::base_node::{rpc::BaseNodeWalletRpcClient, sync::rpc::BaseNodeSyncRpcClient};
use tokio::sync::watch;

pub enum WalletConnectivityRequest {
ObtainBaseNodeWalletRpcClient(oneshot::Sender<RpcClientLease<rpc::BaseNodeWalletRpcClient>>),
ObtainBaseNodeWalletRpcClient(oneshot::Sender<RpcClientLease<BaseNodeWalletRpcClient>>),
ObtainBaseNodeSyncRpcClient(oneshot::Sender<RpcClientLease<BaseNodeSyncRpcClient>>),
SetBaseNode(NodeId),
}

Expand Down Expand Up @@ -68,7 +69,7 @@ impl WalletConnectivityHandle {
/// node/nodes. It will be block until this is happens. The ONLY other time it will return is if the node is
/// shutting down, where it will return None. Use this function whenever no work can be done without a
/// BaseNodeWalletRpcClient RPC session.
pub async fn obtain_base_node_rpc_client(&mut self) -> Option<RpcClientLease<rpc::BaseNodeWalletRpcClient>> {
pub async fn obtain_base_node_wallet_rpc_client(&mut self) -> Option<RpcClientLease<BaseNodeWalletRpcClient>> {
let (reply_tx, reply_rx) = oneshot::channel();
// Under what conditions do the (1) mpsc channel and (2) oneshot channel error?
// (1) when the receiver has been dropped
Expand All @@ -85,6 +86,22 @@ impl WalletConnectivityHandle {
reply_rx.await.ok()
}

/// Obtain a BaseNodeSyncRpcClient.
///
/// This can be relied on to obtain a pooled BaseNodeSyncRpcClient rpc session from a currently selected base
/// node/nodes. It will be block until this is happens. The ONLY other time it will return is if the node is
/// shutting down, where it will return None. Use this function whenever no work can be done without a
/// BaseNodeSyncRpcClient RPC session.
pub async fn obtain_base_node_sync_rpc_client(&mut self) -> Option<RpcClientLease<BaseNodeSyncRpcClient>> {
let (reply_tx, reply_rx) = oneshot::channel();
self.sender
.send(WalletConnectivityRequest::ObtainBaseNodeSyncRpcClient(reply_tx))
.await
.ok()?;

reply_rx.await.ok()
}

pub async fn get_connectivity_status(&mut self) -> OnlineStatus {
self.online_status_rx.recv().await.unwrap_or(OnlineStatus::Offline)
}
Expand Down
111 changes: 93 additions & 18 deletions base_layer/wallet/src/connectivity_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use tari_comms::{
peer_manager::NodeId,
protocol::rpc::{RpcClientLease, RpcClientPool},
};
use tari_core::base_node::rpc;
use tari_core::base_node::{rpc::BaseNodeWalletRpcClient, sync::rpc::BaseNodeSyncRpcClient};
use tokio::time;

const LOG_TARGET: &str = "wallet::connectivity";
Expand All @@ -54,9 +54,14 @@ pub struct WalletConnectivityService {
request_stream: Fuse<mpsc::Receiver<WalletConnectivityRequest>>,
connectivity: ConnectivityRequester,
base_node_watch: Watch<Option<NodeId>>,
base_node_wallet_rpc_client_pool: Option<RpcClientPool<rpc::BaseNodeWalletRpcClient>>,
pools: Option<ClientPoolContainer>,
online_status_watch: Watch<OnlineStatus>,
pending_base_node_rpc_requests: Vec<oneshot::Sender<RpcClientLease<rpc::BaseNodeWalletRpcClient>>>,
pending_requests: Vec<ReplyOneshot>,
}

struct ClientPoolContainer {
pub base_node_wallet_rpc_client: RpcClientPool<BaseNodeWalletRpcClient>,
pub base_node_sync_rpc_client: RpcClientPool<BaseNodeSyncRpcClient>,
}

impl WalletConnectivityService {
Expand All @@ -72,8 +77,8 @@ impl WalletConnectivityService {
request_stream: request_stream.fuse(),
connectivity,
base_node_watch,
base_node_wallet_rpc_client_pool: None,
pending_base_node_rpc_requests: Vec::new(),
pools: None,
pending_requests: Vec::new(),
online_status_watch,
}
}
Expand All @@ -100,7 +105,10 @@ impl WalletConnectivityService {
use WalletConnectivityRequest::*;
match request {
ObtainBaseNodeWalletRpcClient(reply) => {
self.handle_get_base_node_wallet_rpc_client(reply).await;
self.handle_pool_request(reply.into()).await;
},
ObtainBaseNodeSyncRpcClient(reply) => {
self.handle_pool_request(reply.into()).await;
},

SetBaseNode(peer) => {
Expand All @@ -109,12 +117,20 @@ impl WalletConnectivityService {
}
}

async fn handle_pool_request(&mut self, reply: ReplyOneshot) {
use ReplyOneshot::*;
match reply {
WalletRpc(tx) => self.handle_get_base_node_wallet_rpc_client(tx).await,
SyncRpc(tx) => self.handle_get_base_node_sync_rpc_client(tx).await,
}
}

async fn handle_get_base_node_wallet_rpc_client(
&mut self,
reply: oneshot::Sender<RpcClientLease<rpc::BaseNodeWalletRpcClient>>,
reply: oneshot::Sender<RpcClientLease<BaseNodeWalletRpcClient>>,
) {
match self.base_node_wallet_rpc_client_pool {
Some(ref pool) => match pool.get().await {
match self.pools {
Some(ref pools) => match pools.base_node_wallet_rpc_client.get().await {
Ok(client) => {
let _ = reply.send(client);
},
Expand All @@ -124,16 +140,47 @@ impl WalletConnectivityService {
"Base node connection failed: {}. Reconnecting...", e
);
self.trigger_reconnect();
self.pending_base_node_rpc_requests.push(reply);
self.pending_requests.push(reply.into());
},
},
None => {
self.pending_base_node_rpc_requests.push(reply);
self.pending_requests.push(reply.into());
if self.base_node_watch.borrow().is_none() {
warn!(
target: LOG_TARGET,
"{} requests are waiting for base node to be set",
self.pending_base_node_rpc_requests.len()
self.pending_requests.len()
);
}
},
}
}

async fn handle_get_base_node_sync_rpc_client(
&mut self,
reply: oneshot::Sender<RpcClientLease<BaseNodeSyncRpcClient>>,
) {
match self.pools {
Some(ref pools) => match pools.base_node_sync_rpc_client.get().await {
Ok(client) => {
let _ = reply.send(client);
},
Err(e) => {
warn!(
target: LOG_TARGET,
"Base node connection failed: {}. Reconnecting...", e
);
self.trigger_reconnect();
self.pending_requests.push(reply.into());
},
},
None => {
self.pending_requests.push(reply.into());
if self.base_node_watch.borrow().is_none() {
warn!(
target: LOG_TARGET,
"{} requests are waiting for base node to be set",
self.pending_requests.len()
);
}
},
Expand All @@ -151,12 +198,12 @@ impl WalletConnectivityService {
}

fn set_base_node_peer(&mut self, peer: NodeId) {
self.base_node_wallet_rpc_client_pool = None;
self.pools = None;
self.base_node_watch.broadcast(Some(peer));
}

async fn setup_base_node_connection(&mut self, peer: NodeId) {
self.base_node_wallet_rpc_client_pool = None;
self.pools = None;
loop {
debug!(
target: LOG_TARGET,
Expand Down Expand Up @@ -194,8 +241,10 @@ impl WalletConnectivityService {
"Successfully established peer connection to base node {}",
conn.peer_node_id()
);
let pool = conn.create_rpc_client_pool(self.config.base_node_rpc_pool_size);
self.base_node_wallet_rpc_client_pool = Some(pool);
self.pools = Some(ClientPoolContainer {
base_node_sync_rpc_client: conn.create_rpc_client_pool(self.config.base_node_rpc_pool_size),
base_node_wallet_rpc_client: conn.create_rpc_client_pool(self.config.base_node_rpc_pool_size),
});
self.notify_pending_requests().await?;
debug!(
target: LOG_TARGET,
Expand All @@ -206,14 +255,40 @@ impl WalletConnectivityService {
}

async fn notify_pending_requests(&mut self) -> Result<(), WalletConnectivityError> {
let current_pending = mem::take(&mut self.pending_base_node_rpc_requests);
let current_pending = mem::take(&mut self.pending_requests);
for reply in current_pending {
if reply.is_canceled() {
continue;
}

self.handle_get_base_node_wallet_rpc_client(reply).await;
self.handle_pool_request(reply).await;
}
Ok(())
}
}

enum ReplyOneshot {
WalletRpc(oneshot::Sender<RpcClientLease<BaseNodeWalletRpcClient>>),
SyncRpc(oneshot::Sender<RpcClientLease<BaseNodeSyncRpcClient>>),
}

impl ReplyOneshot {
pub fn is_canceled(&self) -> bool {
use ReplyOneshot::*;
match self {
WalletRpc(tx) => tx.is_canceled(),
SyncRpc(tx) => tx.is_canceled(),
}
}
}

impl From<oneshot::Sender<RpcClientLease<BaseNodeWalletRpcClient>>> for ReplyOneshot {
fn from(tx: oneshot::Sender<RpcClientLease<BaseNodeWalletRpcClient>>) -> Self {
ReplyOneshot::WalletRpc(tx)
}
}
impl From<oneshot::Sender<RpcClientLease<BaseNodeSyncRpcClient>>> for ReplyOneshot {
fn from(tx: oneshot::Sender<RpcClientLease<BaseNodeSyncRpcClient>>) -> Self {
ReplyOneshot::SyncRpc(tx)
}
}
12 changes: 6 additions & 6 deletions base_layer/wallet/src/connectivity_service/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ async fn it_dials_peer_when_base_node_is_set() {
// Now a connection will given to the service
mock_state.add_active_connection(conn).await;

let rpc_client = handle.obtain_base_node_rpc_client().await.unwrap();
let rpc_client = handle.obtain_base_node_wallet_rpc_client().await.unwrap();
assert!(rpc_client.is_connected());
}

Expand All @@ -106,7 +106,7 @@ async fn it_resolves_many_pending_rpc_session_requests() {
let pending_requests = iter::repeat_with(|| {
let mut handle = handle.clone();
task::spawn(async move {
let rpc_client = handle.obtain_base_node_rpc_client().await.unwrap();
let rpc_client = handle.obtain_base_node_wallet_rpc_client().await.unwrap();
rpc_client.is_connected()
})
})
Expand Down Expand Up @@ -140,7 +140,7 @@ async fn it_changes_to_a_new_base_node() {
assert_eq!(mock_state.count_calls_containing("AddManagedPeer").await, 1);
let _ = mock_state.take_calls().await;

let rpc_client = handle.obtain_base_node_rpc_client().await.unwrap();
let rpc_client = handle.obtain_base_node_wallet_rpc_client().await.unwrap();
assert!(rpc_client.is_connected());

// Initiate a connection to the base node
Expand All @@ -150,7 +150,7 @@ async fn it_changes_to_a_new_base_node() {
mock_state.expect_dial_peer(base_node_peer2.node_id()).await;
assert_eq!(mock_state.count_calls_containing("AddManagedPeer").await, 1);

let rpc_client = handle.obtain_base_node_rpc_client().await.unwrap();
let rpc_client = handle.obtain_base_node_wallet_rpc_client().await.unwrap();
assert!(rpc_client.is_connected());
}

Expand Down Expand Up @@ -178,7 +178,7 @@ async fn it_gracefully_handles_connect_fail_reconnect() {
let barrier = barrier.clone();
async move {
barrier.wait().await;
let rpc_client = handle.obtain_base_node_rpc_client().await.unwrap();
let rpc_client = handle.obtain_base_node_wallet_rpc_client().await.unwrap();
assert!(rpc_client.is_connected());
}
});
Expand Down Expand Up @@ -215,7 +215,7 @@ async fn it_gracefully_handles_multiple_connection_failures() {
let barrier = barrier.clone();
async move {
barrier.wait().await;
let rpc_client = handle.obtain_base_node_rpc_client().await.unwrap();
let rpc_client = handle.obtain_base_node_wallet_rpc_client().await.unwrap();
assert!(rpc_client.is_connected());
}
});
Expand Down
2 changes: 1 addition & 1 deletion base_layer/wallet/src/output_manager_service/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ impl fmt::Display for OutputManagerRequest {
GetPublicRewindKeys => write!(f, "GetPublicRewindKeys"),
FeeEstimate(_) => write!(f, "FeeEstimate"),
ScanForRecoverableOutputs(_) => write!(f, "ScanForRecoverableOutputs"),
ScanOutputs(_) => write!(f, "ScanRewindAndImportOutputs"),
ScanOutputs(_) => write!(f, "ScanOutputs"),
AddKnownOneSidedPaymentScript(_) => write!(f, "AddKnownOneSidedPaymentScript"),
ReinstateCancelledInboundTx(_) => write!(f, "ReinstateCancelledInboundTx"),
}
Expand Down
Loading

0 comments on commit 305aeda

Please sign in to comment.