Skip to content

Commit

Permalink
feat: wallet connectivity service (#3159)
Browse files Browse the repository at this point in the history
<!--- Provide a general summary of your changes in the Title above -->

## Description
<!--- Describe your changes in detail -->
Adds a service responsible for wallet connectivity. This service
is responsible for and abstracts any complexity in the management of
the base node connections and RPC session management.

This PR makes use of this service in the base node montoring service but
does not "plumb" the WalletConenctivityService into the protocols. This
is left as a TODO, but we should expect this to remove many lines of
code and greaty simplify these protocols by removing the budren of
connection management in the various wallet components.

A number of simplifications on the peer connection and substream code,
debatably reducing places that bugs could hide.

## Motivation and Context
<!--- Why is this change required? What problem does it solve? -->
<!--- If it fixes an open issue, please link to the issue here. -->
Follow on from #3152, making use of the pool in the wallet base node monitoring. The rest of the protocols should follow. 

## How Has This Been Tested?
<!--- Please describe in detail how you tested your changes. -->
<!--- Include details of your testing environment, and the tests you ran to -->
<!--- see how your change affects other areas of the code, etc. -->
Extensive service unit tests
Existing tests pass

Washing machine test on local wallet, with the broadcast protocol modified to use the wallet connectivity 
Base node RPC sessions max out at 10 each (2 wallets connected to same base node) even when pushing through 426 transactions.
![image](https://user-images.githubusercontent.com/1057902/128175630-52617736-f71f-4865-a915-6d6c9b00e8d5.png)

Manually tested wallet connectivity states (Connecting -> Online | Connecting -> Offline | Connecting -> Online -> Offline -> Connecting -> Online)

## Checklist:
<!--- Go over all the following points, and put an `x` in all the boxes that apply. -->
* [x] I'm merging against the `development` branch.
* [x] I have squashed my commits into a single commit.
  • Loading branch information
aviator-app[bot] committed Aug 10, 2021
2 parents 2ded2ac + e643b42 commit 54e8c8e
Show file tree
Hide file tree
Showing 31 changed files with 1,196 additions and 364 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Expand Up @@ -21,7 +21,7 @@
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use crate::ui::{components::Component, state::AppState};
use tari_wallet::base_node_service::service::OnlineState;
use tari_wallet::connectivity_service::OnlineStatus;
use tui::{
backend::Backend,
layout::Rect,
Expand All @@ -45,17 +45,17 @@ impl<B: Backend> Component<B> for BaseNode {
let base_node_state = app_state.get_base_node_state();

let chain_info = match base_node_state.online {
OnlineState::Connecting => Spans::from(vec![
OnlineStatus::Connecting => Spans::from(vec![
Span::styled("Chain Tip:", Style::default().fg(Color::Magenta)),
Span::raw(" "),
Span::styled("Connecting...", Style::default().fg(Color::Reset)),
]),
OnlineState::Offline => Spans::from(vec![
OnlineStatus::Offline => Spans::from(vec![
Span::styled("Chain Tip:", Style::default().fg(Color::Magenta)),
Span::raw(" "),
Span::styled("Offline", Style::default().fg(Color::Red)),
]),
OnlineState::Online => {
OnlineStatus::Online => {
if let Some(metadata) = base_node_state.clone().chain_metadata {
let tip = metadata.height_of_longest_chain();

Expand Down
91 changes: 38 additions & 53 deletions applications/tari_console_wallet/src/ui/state/app_state.rs
Expand Up @@ -68,7 +68,10 @@ use tari_wallet::{
util::emoji::EmojiId,
WalletSqlite,
};
use tokio::sync::{watch, RwLock};
use tokio::{
sync::{watch, RwLock},
task,
};

const LOG_TARGET: &str = "wallet::console_wallet::app_state";

Expand Down Expand Up @@ -665,15 +668,7 @@ impl AppStateInner {
)
.await?;

if let Err(e) = self
.wallet
.transaction_service
.validate_transactions(ValidationRetryStrategy::UntilSuccess)
.await
{
error!(target: LOG_TARGET, "Problem validating transactions: {}", e);
}
self.validate_outputs().await;
self.spawn_transaction_revalidation_task();

self.data.base_node_previous = self.data.base_node_selected.clone();
self.data.base_node_selected = peer.clone();
Expand Down Expand Up @@ -701,15 +696,7 @@ impl AppStateInner {
)
.await?;

if let Err(e) = self
.wallet
.transaction_service
.validate_transactions(ValidationRetryStrategy::UntilSuccess)
.await
{
error!(target: LOG_TARGET, "Problem validating transactions: {}", e);
}
self.validate_outputs().await;
self.spawn_transaction_revalidation_task();

self.data.base_node_previous = self.data.base_node_selected.clone();
self.data.base_node_selected = peer.clone();
Expand Down Expand Up @@ -751,15 +738,7 @@ impl AppStateInner {
)
.await?;

if let Err(e) = self
.wallet
.transaction_service
.validate_transactions(ValidationRetryStrategy::UntilSuccess)
.await
{
error!(target: LOG_TARGET, "Problem validating transactions: {}", e);
}
self.validate_outputs().await;
self.spawn_transaction_revalidation_task();

self.data.base_node_peer_custom = None;
self.data.base_node_selected = previous;
Expand All @@ -778,33 +757,39 @@ impl AppStateInner {
Ok(())
}

pub async fn validate_outputs(&mut self) {
if let Err(e) = self
.wallet
.output_manager_service
.validate_txos(TxoValidationType::Unspent, ValidationRetryStrategy::UntilSuccess)
.await
{
error!(target: LOG_TARGET, "Problem validating UTXOs: {}", e);
}
pub fn spawn_transaction_revalidation_task(&mut self) {
let mut txn_service = self.wallet.transaction_service.clone();
let mut output_manager_service = self.wallet.output_manager_service.clone();

if let Err(e) = self
.wallet
.output_manager_service
.validate_txos(TxoValidationType::Spent, ValidationRetryStrategy::UntilSuccess)
.await
{
error!(target: LOG_TARGET, "Problem validating STXOs: {}", e);
}
task::spawn(async move {
if let Err(e) = txn_service
.validate_transactions(ValidationRetryStrategy::UntilSuccess)
.await
{
error!(target: LOG_TARGET, "Problem validating transactions: {}", e);
}

if let Err(e) = self
.wallet
.output_manager_service
.validate_txos(TxoValidationType::Invalid, ValidationRetryStrategy::UntilSuccess)
.await
{
error!(target: LOG_TARGET, "Problem validating Invalid TXOs: {}", e);
}
if let Err(e) = output_manager_service
.validate_txos(TxoValidationType::Unspent, ValidationRetryStrategy::UntilSuccess)
.await
{
error!(target: LOG_TARGET, "Problem validating UTXOs: {}", e);
}

if let Err(e) = output_manager_service
.validate_txos(TxoValidationType::Spent, ValidationRetryStrategy::UntilSuccess)
.await
{
error!(target: LOG_TARGET, "Problem validating STXOs: {}", e);
}

if let Err(e) = output_manager_service
.validate_txos(TxoValidationType::Invalid, ValidationRetryStrategy::UntilSuccess)
.await
{
error!(target: LOG_TARGET, "Problem validating Invalid TXOs: {}", e);
}
});
}
}

Expand Down
Expand Up @@ -70,7 +70,9 @@ impl CrosstermEvents {
) {
Ok(true) => {
if let Ok(CEvent::Key(key)) = event::read() {
tx.send(Event::Input(key)).unwrap();
if let Err(e) = tx.send(Event::Input(key)) {
warn!(target: LOG_TARGET, "Error sending Tick event on MPSC channel: {}", e);
}
}
},
Ok(false) => {},
Expand Down
3 changes: 3 additions & 0 deletions base_layer/wallet/src/base_node_service/config.rs
Expand Up @@ -28,13 +28,15 @@ const LOG_TARGET: &str = "wallet::base_node_service::config";
#[derive(Clone, Debug)]
pub struct BaseNodeServiceConfig {
pub base_node_monitor_refresh_interval: Duration,
pub base_node_rpc_pool_size: usize,
pub request_max_age: Duration,
}

impl Default for BaseNodeServiceConfig {
fn default() -> Self {
Self {
base_node_monitor_refresh_interval: Duration::from_secs(5),
base_node_rpc_pool_size: 10,
request_max_age: Duration::from_secs(60),
}
}
Expand All @@ -51,6 +53,7 @@ impl BaseNodeServiceConfig {
Self {
base_node_monitor_refresh_interval: Duration::from_secs(refresh_interval),
request_max_age: Duration::from_secs(request_max_age),
..Default::default()
}
}
}
4 changes: 3 additions & 1 deletion base_layer/wallet/src/base_node_service/error.rs
Expand Up @@ -20,7 +20,7 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use crate::error::WalletStorageError;
use crate::{connectivity_service::WalletConnectivityError, error::WalletStorageError};
use tari_comms::{connectivity::ConnectivityError, protocol::rpc::RpcError};
use tari_comms_dht::outbound::DhtOutboundError;
use tari_service_framework::reply_channel::TransportChannelError;
Expand All @@ -46,4 +46,6 @@ pub enum BaseNodeServiceError {
InvalidBaseNodeResponse(String),
#[error("Wallet storage error: `{0}`")]
WalletStorageError(#[from] WalletStorageError),
#[error("Wallet connectivity error: `{0}`")]
WalletConnectivityError(#[from] WalletConnectivityError),
}
6 changes: 2 additions & 4 deletions base_layer/wallet/src/base_node_service/handle.rs
Expand Up @@ -22,11 +22,9 @@

use super::{error::BaseNodeServiceError, service::BaseNodeState};
use futures::{stream::Fuse, StreamExt};
use std::sync::Arc;
use tari_comms::peer_manager::Peer;

use std::time::Duration;
use std::{sync::Arc, time::Duration};
use tari_common_types::chain_metadata::ChainMetadata;
use tari_comms::peer_manager::Peer;
use tari_service_framework::reply_channel::SenderService;
use tokio::sync::broadcast;
use tower::Service;
Expand Down
17 changes: 10 additions & 7 deletions base_layer/wallet/src/base_node_service/mock_base_node_service.rs
Expand Up @@ -20,10 +20,13 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use crate::base_node_service::{
error::BaseNodeServiceError,
handle::{BaseNodeServiceRequest, BaseNodeServiceResponse},
service::{BaseNodeState, OnlineState},
use crate::{
base_node_service::{
error::BaseNodeServiceError,
handle::{BaseNodeServiceRequest, BaseNodeServiceResponse},
service::BaseNodeState,
},
connectivity_service::OnlineStatus,
};
use futures::StreamExt;
use tari_common_types::chain_metadata::ChainMetadata;
Expand Down Expand Up @@ -81,9 +84,9 @@ impl MockBaseNodeService {
let (chain_metadata, is_synced, online) = match height {
Some(height) => {
let metadata = ChainMetadata::new(height, Vec::new(), 0, 0, 0);
(Some(metadata), Some(true), OnlineState::Online)
(Some(metadata), Some(true), OnlineStatus::Online)
},
None => (None, None, OnlineState::Offline),
None => (None, None, OnlineStatus::Offline),
};
self.state = BaseNodeState {
chain_metadata,
Expand All @@ -102,7 +105,7 @@ impl MockBaseNodeService {
is_synced: Some(true),
updated: None,
latency: None,
online: OnlineState::Online,
online: OnlineStatus::Online,
base_node_peer: None,
}
}
Expand Down
6 changes: 3 additions & 3 deletions base_layer/wallet/src/base_node_service/mod.rs
Expand Up @@ -30,10 +30,10 @@ mod monitor;

use crate::{
base_node_service::{config::BaseNodeServiceConfig, handle::BaseNodeServiceHandle, service::BaseNodeService},
connectivity_service::WalletConnectivityHandle,
storage::database::{WalletBackend, WalletDatabase},
};
use log::*;
use tari_comms::connectivity::ConnectivityRequester;
use tari_service_framework::{
async_trait,
reply_channel,
Expand Down Expand Up @@ -80,12 +80,12 @@ where T: WalletBackend + 'static
let db = self.db.clone();

context.spawn_when_ready(move |handles| async move {
let connectivity_manager = handles.expect_handle::<ConnectivityRequester>();
let wallet_connectivity = handles.expect_handle::<WalletConnectivityHandle>();

let service = BaseNodeService::new(
config,
request_stream,
connectivity_manager,
wallet_connectivity,
event_publisher,
handles.get_shutdown_signal(),
db,
Expand Down

0 comments on commit 54e8c8e

Please sign in to comment.