diff --git a/base_layer/wallet_ffi/src/callback_handler.rs b/base_layer/wallet_ffi/src/callback_handler.rs index b4ec645952..f3a4c7d3b5 100644 --- a/base_layer/wallet_ffi/src/callback_handler.rs +++ b/base_layer/wallet_ffi/src/callback_handler.rs @@ -54,6 +54,7 @@ use tari_comms::types::CommsPublicKey; use tari_comms_dht::event::{DhtEvent, DhtEventReceiver}; use tari_shutdown::ShutdownSignal; use tari_wallet::{ + connectivity_service::OnlineStatus, output_manager_service::{ handle::{OutputManagerEvent, OutputManagerEventReceiver, OutputManagerHandle}, service::Balance, @@ -66,6 +67,7 @@ use tari_wallet::{ }, }, }; +use tokio::sync::watch; const LOG_TARGET: &str = "wallet::transaction_service::callback_handler"; @@ -85,6 +87,7 @@ where TBackend: TransactionBackend + 'static callback_balance_updated: unsafe extern "C" fn(*mut Balance), callback_transaction_validation_complete: unsafe extern "C" fn(u64, bool), callback_saf_messages_received: unsafe extern "C" fn(), + callback_connectivity_status: unsafe extern "C" fn(u64), db: TransactionDatabase, transaction_service_event_stream: TransactionEventReceiver, output_manager_service_event_stream: OutputManagerEventReceiver, @@ -93,6 +96,7 @@ where TBackend: TransactionBackend + 'static shutdown_signal: Option, comms_public_key: CommsPublicKey, balance_cache: Balance, + connectivity_status_watch: watch::Receiver, } #[allow(clippy::too_many_arguments)] @@ -107,6 +111,7 @@ where TBackend: TransactionBackend + 'static dht_event_stream: DhtEventReceiver, shutdown_signal: ShutdownSignal, comms_public_key: CommsPublicKey, + connectivity_status_watch: watch::Receiver, callback_received_transaction: unsafe extern "C" fn(*mut InboundTransaction), callback_received_transaction_reply: unsafe extern "C" fn(*mut CompletedTransaction), callback_received_finalized_transaction: unsafe extern "C" fn(*mut CompletedTransaction), @@ -120,6 +125,7 @@ where TBackend: TransactionBackend + 'static callback_balance_updated: unsafe extern "C" fn(*mut Balance), callback_transaction_validation_complete: unsafe extern "C" fn(u64, bool), callback_saf_messages_received: unsafe extern "C" fn(), + callback_connectivity_status: unsafe extern "C" fn(u64), ) -> Self { info!( target: LOG_TARGET, @@ -173,6 +179,10 @@ where TBackend: TransactionBackend + 'static target: LOG_TARGET, "SafMessagesReceivedCallback -> Assigning Fn: {:?}", callback_saf_messages_received ); + info!( + target: LOG_TARGET, + "ConnectivityStatusCallback -> Assigning Fn: {:?}", callback_connectivity_status + ); Self { callback_received_transaction, @@ -188,6 +198,7 @@ where TBackend: TransactionBackend + 'static callback_balance_updated, callback_transaction_validation_complete, callback_saf_messages_received, + callback_connectivity_status, db, transaction_service_event_stream, output_manager_service_event_stream, @@ -196,6 +207,7 @@ where TBackend: TransactionBackend + 'static shutdown_signal: Some(shutdown_signal), comms_public_key, balance_cache: Balance::zero(), + connectivity_status_watch, } } @@ -302,6 +314,11 @@ where TBackend: TransactionBackend + 'static Err(_e) => error!(target: LOG_TARGET, "Error reading from DHT event broadcast channel"), } } + Ok(_) = self.connectivity_status_watch.changed() => { + let status = *self.connectivity_status_watch.borrow(); + trace!(target: LOG_TARGET, "Connectivity status change detected: {:?}", status); + self.connectivity_status_changed(status); + }, _ = shutdown_signal.wait() => { info!(target: LOG_TARGET, "Transaction Callback Handler shutting down because the shutdown signal was received"); break; @@ -516,4 +533,14 @@ where TBackend: TransactionBackend + 'static (self.callback_saf_messages_received)(); } } + + fn connectivity_status_changed(&mut self, status: OnlineStatus) { + debug!( + target: LOG_TARGET, + "Calling Connectivity Status changed callback function" + ); + unsafe { + (self.callback_connectivity_status)(status as u64); + } + } } diff --git a/base_layer/wallet_ffi/src/callback_handler_tests.rs b/base_layer/wallet_ffi/src/callback_handler_tests.rs index e01b1c0410..64732fc9b1 100644 --- a/base_layer/wallet_ffi/src/callback_handler_tests.rs +++ b/base_layer/wallet_ffi/src/callback_handler_tests.rs @@ -45,6 +45,7 @@ mod test { use tari_service_framework::reply_channel; use tari_shutdown::Shutdown; use tari_wallet::{ + connectivity_service::OnlineStatus, output_manager_service::{ handle::{OutputManagerEvent, OutputManagerHandle}, service::Balance, @@ -60,7 +61,11 @@ mod test { }, }, }; - use tokio::{runtime::Runtime, sync::broadcast, time::Instant}; + use tokio::{ + runtime::Runtime, + sync::{broadcast, watch}, + time::Instant, + }; use crate::{callback_handler::CallbackHandler, output_manager_service_mock::MockOutputManagerService}; @@ -81,6 +86,7 @@ mod test { pub callback_balance_updated: u32, pub callback_transaction_validation_complete: u32, pub saf_messages_received: bool, + pub connectivity_status_callback_called: u64, } impl CallbackState { @@ -101,6 +107,7 @@ mod test { tx_cancellation_callback_called_inbound: false, tx_cancellation_callback_called_outbound: false, saf_messages_received: false, + connectivity_status_callback_called: 0, } } } @@ -200,6 +207,12 @@ mod test { drop(lock); } + unsafe extern "C" fn connectivity_status_callback(status: u64) { + let mut lock = CALLBACK_STATE.lock().unwrap(); + lock.connectivity_status_callback_called += status + 1; + drop(lock); + } + #[test] fn test_callback_handler() { let runtime = Runtime::new().unwrap(); @@ -300,6 +313,8 @@ mod test { runtime.spawn(mock_output_manager_service.run()); assert_eq!(balance, runtime.block_on(oms_handle.get_balance()).unwrap()); + let (connectivity_tx, connectivity_rx) = watch::channel(OnlineStatus::Offline); + let callback_handler = CallbackHandler::new( db, transaction_event_receiver, @@ -308,6 +323,7 @@ mod test { dht_event_receiver, shutdown_signal.to_signal(), PublicKey::from_secret_key(&PrivateKey::random(&mut OsRng)), + connectivity_rx, received_tx_callback, received_tx_reply_callback, received_tx_finalized_callback, @@ -321,6 +337,7 @@ mod test { balance_updated_callback, transaction_validation_complete_callback, saf_messages_received_callback, + connectivity_status_callback, ); runtime.spawn(callback_handler.start()); @@ -506,6 +523,14 @@ mod test { dht_event_sender .send(Arc::new(DhtEvent::StoreAndForwardMessagesReceived)) .unwrap(); + thread::sleep(Duration::from_secs(2)); + connectivity_tx.send(OnlineStatus::Offline).unwrap(); + thread::sleep(Duration::from_secs(2)); + connectivity_tx.send(OnlineStatus::Connecting).unwrap(); + thread::sleep(Duration::from_secs(2)); + connectivity_tx.send(OnlineStatus::Online).unwrap(); + thread::sleep(Duration::from_secs(2)); + connectivity_tx.send(OnlineStatus::Connecting).unwrap(); thread::sleep(Duration::from_secs(10)); @@ -525,6 +550,7 @@ mod test { assert_eq!(lock.callback_txo_validation_complete, 3); assert_eq!(lock.callback_balance_updated, 5); assert_eq!(lock.callback_transaction_validation_complete, 7); + assert_eq!(lock.connectivity_status_callback_called, 7); drop(lock); } diff --git a/base_layer/wallet_ffi/src/lib.rs b/base_layer/wallet_ffi/src/lib.rs index 91fc6d8e16..5041dd5084 100644 --- a/base_layer/wallet_ffi/src/lib.rs +++ b/base_layer/wallet_ffi/src/lib.rs @@ -139,6 +139,7 @@ use tari_p2p::{ use tari_shutdown::Shutdown; use tari_utilities::{hex, hex::Hex}; use tari_wallet::{ + connectivity_service::WalletConnectivityInterface, contacts_service::storage::database::Contact, error::{WalletError, WalletStorageError}, storage::{ @@ -3260,6 +3261,13 @@ unsafe fn init_logging( /// `callback_saf_message_received` - The callback function pointer that will be called when the Dht has determined that /// is has connected to enough of its neighbours to be confident that it has received any SAF messages that were waiting /// for it. +/// `callback_connectivity_status` - This callback is called when the status of connection to the set base node +/// changes. it will return an enum encoded as an integer as follows: +/// pub enum OnlineStatus { +/// Connecting, // 0 +/// Online, // 1 +/// Offline, // 2 +/// } /// `recovery_in_progress` - Pointer to an bool which will be modified to indicate if there is an outstanding recovery /// that should be completed or not to an error code should one occur, may not be null. Functions as an out parameter. /// `error_out` - Pointer to an int which will be modified @@ -3292,6 +3300,7 @@ pub unsafe extern "C" fn wallet_create( callback_balance_updated: unsafe extern "C" fn(*mut TariBalance), callback_transaction_validation_complete: unsafe extern "C" fn(u64, bool), callback_saf_messages_received: unsafe extern "C" fn(), + callback_connectivity_status: unsafe extern "C" fn(u64), recovery_in_progress: *mut bool, error_out: *mut c_int, ) -> *mut TariWallet { @@ -3483,6 +3492,7 @@ pub unsafe extern "C" fn wallet_create( w.dht_service.subscribe_dht_events(), w.comms.shutdown_signal(), w.comms.node_identity().public_key().clone(), + w.wallet_connectivity.get_connectivity_status_watch(), callback_received_transaction, callback_received_transaction_reply, callback_received_finalized_transaction, @@ -3496,6 +3506,7 @@ pub unsafe extern "C" fn wallet_create( callback_balance_updated, callback_transaction_validation_complete, callback_saf_messages_received, + callback_connectivity_status, ); runtime.spawn(callback_handler.start()); @@ -6199,6 +6210,10 @@ mod test { // assert!(true); //optimized out by compiler } + unsafe extern "C" fn connectivity_status_callback(_status: u64) { + // assert!(true); //optimized out by compiler + } + const NETWORK_STRING: &str = "dibbler"; #[test] @@ -6571,6 +6586,7 @@ mod test { balance_updated_callback, transaction_validation_complete_callback, saf_messages_received_callback, + connectivity_status_callback, recovery_in_progress_ptr, error_ptr, ); @@ -6607,6 +6623,7 @@ mod test { balance_updated_callback, transaction_validation_complete_callback, saf_messages_received_callback, + connectivity_status_callback, recovery_in_progress_ptr, error_ptr, ); @@ -6709,6 +6726,7 @@ mod test { balance_updated_callback, transaction_validation_complete_callback, saf_messages_received_callback, + connectivity_status_callback, recovery_in_progress_ptr, error_ptr, ); @@ -6756,6 +6774,7 @@ mod test { balance_updated_callback, transaction_validation_complete_callback, saf_messages_received_callback, + connectivity_status_callback, recovery_in_progress_ptr, error_ptr, ); @@ -6786,6 +6805,7 @@ mod test { balance_updated_callback, transaction_validation_complete_callback, saf_messages_received_callback, + connectivity_status_callback, recovery_in_progress_ptr, error_ptr, ); @@ -6811,6 +6831,7 @@ mod test { balance_updated_callback, transaction_validation_complete_callback, saf_messages_received_callback, + connectivity_status_callback, recovery_in_progress_ptr, error_ptr, ); @@ -6857,6 +6878,7 @@ mod test { balance_updated_callback, transaction_validation_complete_callback, saf_messages_received_callback, + connectivity_status_callback, recovery_in_progress_ptr, error_ptr, ); @@ -6932,6 +6954,7 @@ mod test { balance_updated_callback, transaction_validation_complete_callback, saf_messages_received_callback, + connectivity_status_callback, recovery_in_progress_ptr, error_ptr, ); @@ -7138,6 +7161,7 @@ mod test { balance_updated_callback, transaction_validation_complete_callback, saf_messages_received_callback, + connectivity_status_callback, recovery_in_progress_ptr, error_ptr, ); @@ -7192,6 +7216,7 @@ mod test { balance_updated_callback, transaction_validation_complete_callback, saf_messages_received_callback, + connectivity_status_callback, recovery_in_progress_ptr, error_ptr, ); diff --git a/base_layer/wallet_ffi/wallet.h b/base_layer/wallet_ffi/wallet.h index 742f3971f8..889aaea741 100644 --- a/base_layer/wallet_ffi/wallet.h +++ b/base_layer/wallet_ffi/wallet.h @@ -478,6 +478,13 @@ struct TariPublicKeys *comms_list_connected_public_keys(struct TariWallet *walle /// `callback_saf_message_received` - The callback function pointer that will be called when the Dht has determined that /// is has connected to enough of its neighbours to be confident that it has received any SAF messages that were waiting /// for it. +/// `callback_connectivity_status` - This callback is called when the status of connection to the set base node changes. +/// it will return an enum encoded as an integer as follows: +/// pub enum OnlineStatus { +/// Connecting, // 0 +/// Online, // 1 +/// Offline, // 2 +/// } /// `recovery_in_progress` - Pointer to an bool which will be modified to indicate if there is an outstanding recovery /// that should be completed or not to an error code should one occur, may not be null. Functions as an out parameter. /// `error_out` - Pointer to an int which will be modified @@ -515,6 +522,7 @@ struct TariWallet *wallet_create(struct TariCommsConfig *config, void (*callback_balance_updated)(struct TariBalance *), void (*callback_transaction_validation_complete)(unsigned long long, bool), void (*callback_saf_message_received)(), + void (*callback_connectivity_status)(unsigned long long), bool *recovery_in_progress, int *error_out); diff --git a/integration_tests/helpers/ffi/ffiInterface.js b/integration_tests/helpers/ffi/ffiInterface.js index 048041480a..2f2f455fa2 100644 --- a/integration_tests/helpers/ffi/ffiInterface.js +++ b/integration_tests/helpers/ffi/ffiInterface.js @@ -291,6 +291,7 @@ class InterfaceFFI { this.ptr, this.ptr, this.ptr, + this.ptr, this.boolPtr, this.intPtr, ], @@ -1165,6 +1166,9 @@ class InterfaceFFI { fn ); } + static createCallbackConnectivityStatus(fn) { + return ffi.Callback(this.void, [this.ulonglong], fn); + } //endregion static walletCreate( @@ -1186,7 +1190,8 @@ class InterfaceFFI { callback_txo_validation_complete, callback_balance_updated, callback_transaction_validation_complete, - callback_saf_message_received + callback_saf_message_received, + callback_connectivity_status ) { let error = this.initError(); let recovery_in_progress = this.initBool(); @@ -1211,6 +1216,7 @@ class InterfaceFFI { callback_balance_updated, callback_transaction_validation_complete, callback_saf_message_received, + callback_connectivity_status, recovery_in_progress, error ); diff --git a/integration_tests/helpers/ffi/wallet.js b/integration_tests/helpers/ffi/wallet.js index d48a4e3cda..d307660dd2 100644 --- a/integration_tests/helpers/ffi/wallet.js +++ b/integration_tests/helpers/ffi/wallet.js @@ -43,6 +43,7 @@ class Wallet { callback_balance_updated; callback_transaction_validation_complete; callback_saf_message_received; + callback_connectivity_status; recoveryProgressCallback; getTxoValidationStatus() { @@ -141,6 +142,10 @@ class Wallet { this.recoveryProgressCallback = InterfaceFFI.createRecoveryProgressCallback( this.onRecoveryProgress ); + this.callback_connectivity_status = + InterfaceFFI.createCallbackConnectivityStatus( + this.onConnectivityStatusChange + ); //endregion this.receivedTransaction = 0; @@ -180,7 +185,8 @@ class Wallet { this.callback_txo_validation_complete, this.callback_balance_updated, this.callback_transaction_validation_complete, - this.callback_saf_message_received + this.callback_saf_message_received, + this.callback_connectivity_status ); } @@ -328,6 +334,10 @@ class Wallet { return InterfaceFFI.walletIsRecoveryInProgress(this.ptr); } + onConnectivityStatusChange = (status) => { + console.log("Connectivity Status Changed to ", status); + }; + getPublicKey() { let ptr = InterfaceFFI.walletGetPublicKey(this.ptr); let pk = new PublicKey(); @@ -463,6 +473,7 @@ class Wallet { this.callback_transaction_validation_complete = this.callback_saf_message_received = this.recoveryProgressCallback = + this.callback_connectivity_status = undefined; // clear callback function pointers } }