Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(wallet_ffi)!: add base node connectivity callback to wallet ffi #3796

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions base_layer/wallet_ffi/src/callback_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ use tari_comms::types::CommsPublicKey;
use tari_comms_dht::event::{DhtEvent, DhtEventReceiver};
use tari_shutdown::ShutdownSignal;
use tari_wallet::{
connectivity_service::OnlineStatus,
output_manager_service::{
handle::{OutputManagerEvent, OutputManagerEventReceiver, OutputManagerHandle},
service::Balance,
Expand All @@ -66,6 +67,7 @@ use tari_wallet::{
},
},
};
use tokio::sync::watch;

const LOG_TARGET: &str = "wallet::transaction_service::callback_handler";

Expand All @@ -85,6 +87,7 @@ where TBackend: TransactionBackend + 'static
callback_balance_updated: unsafe extern "C" fn(*mut Balance),
callback_transaction_validation_complete: unsafe extern "C" fn(u64, bool),
callback_saf_messages_received: unsafe extern "C" fn(),
callback_connectivity_status: unsafe extern "C" fn(u64),
db: TransactionDatabase<TBackend>,
transaction_service_event_stream: TransactionEventReceiver,
output_manager_service_event_stream: OutputManagerEventReceiver,
Expand All @@ -93,6 +96,7 @@ where TBackend: TransactionBackend + 'static
shutdown_signal: Option<ShutdownSignal>,
comms_public_key: CommsPublicKey,
balance_cache: Balance,
connectivity_status_watch: watch::Receiver<OnlineStatus>,
}

#[allow(clippy::too_many_arguments)]
Expand All @@ -107,6 +111,7 @@ where TBackend: TransactionBackend + 'static
dht_event_stream: DhtEventReceiver,
shutdown_signal: ShutdownSignal,
comms_public_key: CommsPublicKey,
connectivity_status_watch: watch::Receiver<OnlineStatus>,
callback_received_transaction: unsafe extern "C" fn(*mut InboundTransaction),
callback_received_transaction_reply: unsafe extern "C" fn(*mut CompletedTransaction),
callback_received_finalized_transaction: unsafe extern "C" fn(*mut CompletedTransaction),
Expand All @@ -120,6 +125,7 @@ where TBackend: TransactionBackend + 'static
callback_balance_updated: unsafe extern "C" fn(*mut Balance),
callback_transaction_validation_complete: unsafe extern "C" fn(u64, bool),
callback_saf_messages_received: unsafe extern "C" fn(),
callback_connectivity_status: unsafe extern "C" fn(u64),
) -> Self {
info!(
target: LOG_TARGET,
Expand Down Expand Up @@ -173,6 +179,10 @@ where TBackend: TransactionBackend + 'static
target: LOG_TARGET,
"SafMessagesReceivedCallback -> Assigning Fn: {:?}", callback_saf_messages_received
);
info!(
target: LOG_TARGET,
"ConnectivityStatusCallback -> Assigning Fn: {:?}", callback_connectivity_status
);

Self {
callback_received_transaction,
Expand All @@ -188,6 +198,7 @@ where TBackend: TransactionBackend + 'static
callback_balance_updated,
callback_transaction_validation_complete,
callback_saf_messages_received,
callback_connectivity_status,
db,
transaction_service_event_stream,
output_manager_service_event_stream,
Expand All @@ -196,6 +207,7 @@ where TBackend: TransactionBackend + 'static
shutdown_signal: Some(shutdown_signal),
comms_public_key,
balance_cache: Balance::zero(),
connectivity_status_watch,
}
}

Expand Down Expand Up @@ -302,6 +314,11 @@ where TBackend: TransactionBackend + 'static
Err(_e) => error!(target: LOG_TARGET, "Error reading from DHT event broadcast channel"),
}
}
Ok(_) = self.connectivity_status_watch.changed() => {
let status = *self.connectivity_status_watch.borrow();
trace!(target: LOG_TARGET, "Connectivity status change detected: {:?}", status);
self.connectivity_status_changed(status);
},
_ = shutdown_signal.wait() => {
info!(target: LOG_TARGET, "Transaction Callback Handler shutting down because the shutdown signal was received");
break;
Expand Down Expand Up @@ -516,4 +533,14 @@ where TBackend: TransactionBackend + 'static
(self.callback_saf_messages_received)();
}
}

fn connectivity_status_changed(&mut self, status: OnlineStatus) {
debug!(
target: LOG_TARGET,
"Calling Connectivity Status changed callback function"
);
unsafe {
(self.callback_connectivity_status)(status as u64);
}
}
}
28 changes: 27 additions & 1 deletion base_layer/wallet_ffi/src/callback_handler_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ mod test {
use tari_service_framework::reply_channel;
use tari_shutdown::Shutdown;
use tari_wallet::{
connectivity_service::OnlineStatus,
output_manager_service::{
handle::{OutputManagerEvent, OutputManagerHandle},
service::Balance,
Expand All @@ -60,7 +61,11 @@ mod test {
},
},
};
use tokio::{runtime::Runtime, sync::broadcast, time::Instant};
use tokio::{
runtime::Runtime,
sync::{broadcast, watch},
time::Instant,
};

use crate::{callback_handler::CallbackHandler, output_manager_service_mock::MockOutputManagerService};

Expand All @@ -81,6 +86,7 @@ mod test {
pub callback_balance_updated: u32,
pub callback_transaction_validation_complete: u32,
pub saf_messages_received: bool,
pub connectivity_status_callback_called: u64,
}

impl CallbackState {
Expand All @@ -101,6 +107,7 @@ mod test {
tx_cancellation_callback_called_inbound: false,
tx_cancellation_callback_called_outbound: false,
saf_messages_received: false,
connectivity_status_callback_called: 0,
}
}
}
Expand Down Expand Up @@ -200,6 +207,12 @@ mod test {
drop(lock);
}

unsafe extern "C" fn connectivity_status_callback(status: u64) {
let mut lock = CALLBACK_STATE.lock().unwrap();
lock.connectivity_status_callback_called += status + 1;
drop(lock);
}

#[test]
fn test_callback_handler() {
let runtime = Runtime::new().unwrap();
Expand Down Expand Up @@ -300,6 +313,8 @@ mod test {
runtime.spawn(mock_output_manager_service.run());
assert_eq!(balance, runtime.block_on(oms_handle.get_balance()).unwrap());

let (connectivity_tx, connectivity_rx) = watch::channel(OnlineStatus::Offline);

let callback_handler = CallbackHandler::new(
db,
transaction_event_receiver,
Expand All @@ -308,6 +323,7 @@ mod test {
dht_event_receiver,
shutdown_signal.to_signal(),
PublicKey::from_secret_key(&PrivateKey::random(&mut OsRng)),
connectivity_rx,
received_tx_callback,
received_tx_reply_callback,
received_tx_finalized_callback,
Expand All @@ -321,6 +337,7 @@ mod test {
balance_updated_callback,
transaction_validation_complete_callback,
saf_messages_received_callback,
connectivity_status_callback,
);

runtime.spawn(callback_handler.start());
Expand Down Expand Up @@ -506,6 +523,14 @@ mod test {
dht_event_sender
.send(Arc::new(DhtEvent::StoreAndForwardMessagesReceived))
.unwrap();
thread::sleep(Duration::from_secs(2));
connectivity_tx.send(OnlineStatus::Offline).unwrap();
thread::sleep(Duration::from_secs(2));
connectivity_tx.send(OnlineStatus::Connecting).unwrap();
thread::sleep(Duration::from_secs(2));
connectivity_tx.send(OnlineStatus::Online).unwrap();
thread::sleep(Duration::from_secs(2));
connectivity_tx.send(OnlineStatus::Connecting).unwrap();

thread::sleep(Duration::from_secs(10));

Expand All @@ -525,6 +550,7 @@ mod test {
assert_eq!(lock.callback_txo_validation_complete, 3);
assert_eq!(lock.callback_balance_updated, 5);
assert_eq!(lock.callback_transaction_validation_complete, 7);
assert_eq!(lock.connectivity_status_callback_called, 7);

drop(lock);
}
Expand Down
25 changes: 25 additions & 0 deletions base_layer/wallet_ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ use tari_p2p::{
use tari_shutdown::Shutdown;
use tari_utilities::{hex, hex::Hex};
use tari_wallet::{
connectivity_service::WalletConnectivityInterface,
contacts_service::storage::database::Contact,
error::{WalletError, WalletStorageError},
storage::{
Expand Down Expand Up @@ -3260,6 +3261,13 @@ unsafe fn init_logging(
/// `callback_saf_message_received` - The callback function pointer that will be called when the Dht has determined that
/// is has connected to enough of its neighbours to be confident that it has received any SAF messages that were waiting
/// for it.
/// `callback_connectivity_status` - This callback is called when the status of connection to the set base node
/// changes. it will return an enum encoded as an integer as follows:
/// pub enum OnlineStatus {
/// Connecting, // 0
/// Online, // 1
/// Offline, // 2
/// }
/// `recovery_in_progress` - Pointer to an bool which will be modified to indicate if there is an outstanding recovery
/// that should be completed or not to an error code should one occur, may not be null. Functions as an out parameter.
/// `error_out` - Pointer to an int which will be modified
Expand Down Expand Up @@ -3292,6 +3300,7 @@ pub unsafe extern "C" fn wallet_create(
callback_balance_updated: unsafe extern "C" fn(*mut TariBalance),
callback_transaction_validation_complete: unsafe extern "C" fn(u64, bool),
callback_saf_messages_received: unsafe extern "C" fn(),
callback_connectivity_status: unsafe extern "C" fn(u64),
recovery_in_progress: *mut bool,
error_out: *mut c_int,
) -> *mut TariWallet {
Expand Down Expand Up @@ -3483,6 +3492,7 @@ pub unsafe extern "C" fn wallet_create(
w.dht_service.subscribe_dht_events(),
w.comms.shutdown_signal(),
w.comms.node_identity().public_key().clone(),
w.wallet_connectivity.get_connectivity_status_watch(),
callback_received_transaction,
callback_received_transaction_reply,
callback_received_finalized_transaction,
Expand All @@ -3496,6 +3506,7 @@ pub unsafe extern "C" fn wallet_create(
callback_balance_updated,
callback_transaction_validation_complete,
callback_saf_messages_received,
callback_connectivity_status,
);

runtime.spawn(callback_handler.start());
Expand Down Expand Up @@ -6199,6 +6210,10 @@ mod test {
// assert!(true); //optimized out by compiler
}

unsafe extern "C" fn connectivity_status_callback(_status: u64) {
// assert!(true); //optimized out by compiler
}

const NETWORK_STRING: &str = "dibbler";

#[test]
Expand Down Expand Up @@ -6571,6 +6586,7 @@ mod test {
balance_updated_callback,
transaction_validation_complete_callback,
saf_messages_received_callback,
connectivity_status_callback,
recovery_in_progress_ptr,
error_ptr,
);
Expand Down Expand Up @@ -6607,6 +6623,7 @@ mod test {
balance_updated_callback,
transaction_validation_complete_callback,
saf_messages_received_callback,
connectivity_status_callback,
recovery_in_progress_ptr,
error_ptr,
);
Expand Down Expand Up @@ -6709,6 +6726,7 @@ mod test {
balance_updated_callback,
transaction_validation_complete_callback,
saf_messages_received_callback,
connectivity_status_callback,
recovery_in_progress_ptr,
error_ptr,
);
Expand Down Expand Up @@ -6756,6 +6774,7 @@ mod test {
balance_updated_callback,
transaction_validation_complete_callback,
saf_messages_received_callback,
connectivity_status_callback,
recovery_in_progress_ptr,
error_ptr,
);
Expand Down Expand Up @@ -6786,6 +6805,7 @@ mod test {
balance_updated_callback,
transaction_validation_complete_callback,
saf_messages_received_callback,
connectivity_status_callback,
recovery_in_progress_ptr,
error_ptr,
);
Expand All @@ -6811,6 +6831,7 @@ mod test {
balance_updated_callback,
transaction_validation_complete_callback,
saf_messages_received_callback,
connectivity_status_callback,
recovery_in_progress_ptr,
error_ptr,
);
Expand Down Expand Up @@ -6857,6 +6878,7 @@ mod test {
balance_updated_callback,
transaction_validation_complete_callback,
saf_messages_received_callback,
connectivity_status_callback,
recovery_in_progress_ptr,
error_ptr,
);
Expand Down Expand Up @@ -6932,6 +6954,7 @@ mod test {
balance_updated_callback,
transaction_validation_complete_callback,
saf_messages_received_callback,
connectivity_status_callback,
recovery_in_progress_ptr,
error_ptr,
);
Expand Down Expand Up @@ -7138,6 +7161,7 @@ mod test {
balance_updated_callback,
transaction_validation_complete_callback,
saf_messages_received_callback,
connectivity_status_callback,
recovery_in_progress_ptr,
error_ptr,
);
Expand Down Expand Up @@ -7192,6 +7216,7 @@ mod test {
balance_updated_callback,
transaction_validation_complete_callback,
saf_messages_received_callback,
connectivity_status_callback,
recovery_in_progress_ptr,
error_ptr,
);
Expand Down
8 changes: 8 additions & 0 deletions base_layer/wallet_ffi/wallet.h
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,13 @@ struct TariPublicKeys *comms_list_connected_public_keys(struct TariWallet *walle
/// `callback_saf_message_received` - The callback function pointer that will be called when the Dht has determined that
/// is has connected to enough of its neighbours to be confident that it has received any SAF messages that were waiting
/// for it.
/// `callback_connectivity_status` - This callback is called when the status of connection to the set base node changes.
/// it will return an enum encoded as an integer as follows:
/// pub enum OnlineStatus {
/// Connecting, // 0
/// Online, // 1
/// Offline, // 2
/// }
/// `recovery_in_progress` - Pointer to an bool which will be modified to indicate if there is an outstanding recovery
/// that should be completed or not to an error code should one occur, may not be null. Functions as an out parameter.
/// `error_out` - Pointer to an int which will be modified
Expand Down Expand Up @@ -515,6 +522,7 @@ struct TariWallet *wallet_create(struct TariCommsConfig *config,
void (*callback_balance_updated)(struct TariBalance *),
void (*callback_transaction_validation_complete)(unsigned long long, bool),
void (*callback_saf_message_received)(),
void (*callback_connectivity_status)(unsigned long long),
bool *recovery_in_progress,
int *error_out);

Expand Down
8 changes: 7 additions & 1 deletion integration_tests/helpers/ffi/ffiInterface.js
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ class InterfaceFFI {
this.ptr,
this.ptr,
this.ptr,
this.ptr,
this.boolPtr,
this.intPtr,
],
Expand Down Expand Up @@ -1165,6 +1166,9 @@ class InterfaceFFI {
fn
);
}
static createCallbackConnectivityStatus(fn) {
return ffi.Callback(this.void, [this.ulonglong], fn);
}
//endregion

static walletCreate(
Expand All @@ -1186,7 +1190,8 @@ class InterfaceFFI {
callback_txo_validation_complete,
callback_balance_updated,
callback_transaction_validation_complete,
callback_saf_message_received
callback_saf_message_received,
callback_connectivity_status
) {
let error = this.initError();
let recovery_in_progress = this.initBool();
Expand All @@ -1211,6 +1216,7 @@ class InterfaceFFI {
callback_balance_updated,
callback_transaction_validation_complete,
callback_saf_message_received,
callback_connectivity_status,
recovery_in_progress,
error
);
Expand Down
Loading