Skip to content

Commit

Permalink
feat!: expose reason for transaction cancellation callback in wallet_ffi
Browse files Browse the repository at this point in the history
Update cancellation callback and transaction protocols
Update unit tests
Update integration tests
Update wallet header
Clippy fixes
  • Loading branch information
StriderDM committed Nov 22, 2021
1 parent b5a88e3 commit df2e4d2
Show file tree
Hide file tree
Showing 18 changed files with 98 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ impl WalletEventMonitor {
self.trigger_balance_refresh();
notifier.transaction_mined(tx_id);
},
TransactionEvent::TransactionCancelled(tx_id) => {
TransactionEvent::TransactionCancelled(tx_id, _) => {
self.trigger_tx_state_refresh(tx_id).await;
self.trigger_balance_refresh();
notifier.transaction_cancelled(tx_id);
Expand Down
4 changes: 2 additions & 2 deletions base_layer/p2p/src/services/liveness/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ impl LivenessState {
self.failed_pings
.entry(node_id)
.and_modify(|v| {
*v = *v + 1;
*v += 1;
})
.or_insert(1);
}
Expand All @@ -167,7 +167,7 @@ impl LivenessState {
/// a latency sample is added and calculated. The given `peer` must match the recorded peer
pub fn record_pong(&mut self, nonce: u64, sent_by: &NodeId) -> Option<u32> {
self.inc_pongs_received();
self.failed_pings.remove_entry(&sent_by);
self.failed_pings.remove_entry(sent_by);

let (node_id, _) = self.inflight_pings.get(&nonce)?;
if node_id == sent_by {
Expand Down
2 changes: 1 addition & 1 deletion base_layer/wallet/src/transaction_service/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ pub enum TransactionEvent {
TransactionDirectSendResult(TxId, bool),
TransactionCompletedImmediately(TxId),
TransactionStoreForwardSendResult(TxId, bool),
TransactionCancelled(TxId),
TransactionCancelled(TxId, u64),
TransactionBroadcast(TxId),
TransactionImported(TxId),
TransactionMined {
Expand Down
11 changes: 11 additions & 0 deletions base_layer/wallet/src/transaction_service/protocols/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,17 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

#[derive(Clone, Copy, Debug, PartialEq)]
pub enum TxRejection {
Unknown,
UserCancelled,
Timeout,
DoubleSpend,
Orphan,
TimeLocked,
InvalidTransaction,
}

pub mod transaction_broadcast_protocol;
pub mod transaction_receive_protocol;
pub mod transaction_send_protocol;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ use crate::{
transaction_service::{
error::{TransactionServiceError, TransactionServiceProtocolError},
handle::TransactionEvent,
protocols::TxRejection,
service::TransactionServiceResources,
storage::{database::TransactionBackend, models::CompletedTransaction},
},
Expand Down Expand Up @@ -215,10 +216,31 @@ where

self.cancel_transaction().await;

let reason = match response.rejection_reason {
TxSubmissionRejectionReason::None | TxSubmissionRejectionReason::ValidationFailed => {
TransactionServiceError::MempoolRejectionInvalidTransaction
},
TxSubmissionRejectionReason::DoubleSpend => TransactionServiceError::MempoolRejectionDoubleSpend,
TxSubmissionRejectionReason::Orphan => TransactionServiceError::MempoolRejectionOrphan,
TxSubmissionRejectionReason::TimeLocked => TransactionServiceError::MempoolRejectionTimeLocked,
_ => TransactionServiceError::UnexpectedBaseNodeResponse,
};

let cancellation_event_reason = match reason {
TransactionServiceError::MempoolRejectionInvalidTransaction => TxRejection::InvalidTransaction,
TransactionServiceError::MempoolRejectionDoubleSpend => TxRejection::DoubleSpend,
TransactionServiceError::MempoolRejectionOrphan => TxRejection::Orphan,
TransactionServiceError::MempoolRejectionTimeLocked => TxRejection::TimeLocked,
_ => TxRejection::Unknown,
};

let _ = self
.resources
.event_publisher
.send(Arc::new(TransactionEvent::TransactionCancelled(self.tx_id)))
.send(Arc::new(TransactionEvent::TransactionCancelled(
self.tx_id,
cancellation_event_reason as u64,
)))
.map_err(|e| {
trace!(
target: LOG_TARGET,
Expand All @@ -228,15 +250,6 @@ where
e
});

let reason = match response.rejection_reason {
TxSubmissionRejectionReason::None | TxSubmissionRejectionReason::ValidationFailed => {
TransactionServiceError::MempoolRejectionInvalidTransaction
},
TxSubmissionRejectionReason::DoubleSpend => TransactionServiceError::MempoolRejectionDoubleSpend,
TxSubmissionRejectionReason::Orphan => TransactionServiceError::MempoolRejectionOrphan,
TxSubmissionRejectionReason::TimeLocked => TransactionServiceError::MempoolRejectionTimeLocked,
_ => TransactionServiceError::UnexpectedBaseNodeResponse,
};
return Err(TransactionServiceProtocolError::new(self.tx_id, reason));
} else if response.rejection_reason == TxSubmissionRejectionReason::AlreadyMined {
info!(
Expand Down Expand Up @@ -342,7 +355,10 @@ where
let _ = self
.resources
.event_publisher
.send(Arc::new(TransactionEvent::TransactionCancelled(self.tx_id)))
.send(Arc::new(TransactionEvent::TransactionCancelled(
self.tx_id,
TxRejection::InvalidTransaction as u64,
)))
.map_err(|e| {
trace!(
target: LOG_TARGET,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use tari_common_types::transaction::{TransactionDirection, TransactionStatus, Tx
use tari_comms::types::CommsPublicKey;
use tokio::sync::{mpsc, oneshot};

use crate::connectivity_service::WalletConnectivityInterface;
use crate::{connectivity_service::WalletConnectivityInterface, transaction_service::protocols::TxRejection};
use tari_common_types::types::HashOutput;
use tari_core::transactions::{
transaction_entities::Transaction,
Expand Down Expand Up @@ -504,7 +504,10 @@ where
let _ = self
.resources
.event_publisher
.send(Arc::new(TransactionEvent::TransactionCancelled(self.id)))
.send(Arc::new(TransactionEvent::TransactionCancelled(
self.id,
TxRejection::Timeout as u64,
)))
.map_err(|e| {
trace!(
target: LOG_TARGET,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use crate::{
config::TransactionRoutingMechanism,
error::{TransactionServiceError, TransactionServiceProtocolError},
handle::{TransactionEvent, TransactionServiceResponse},
protocols::TxRejection,
service::TransactionServiceResources,
storage::{
database::TransactionBackend,
Expand Down Expand Up @@ -826,7 +827,10 @@ where
let _ = self
.resources
.event_publisher
.send(Arc::new(TransactionEvent::TransactionCancelled(self.id)))
.send(Arc::new(TransactionEvent::TransactionCancelled(
self.id,
TxRejection::Timeout as u64,
)))
.map_err(|e| {
trace!(
target: LOG_TARGET,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ pub struct TransactionValidationProtocol<TTransactionBackend, TWalletConnectivit
event_publisher: TransactionEventSender,
output_manager_handle: OutputManagerHandle,
}
use crate::transaction_service::protocols::TxRejection;
use tari_common_types::types::Signature;

#[allow(unused_variables)]
Expand Down Expand Up @@ -416,7 +417,10 @@ where
);
};

self.publish_event(TransactionEvent::TransactionCancelled(tx_id));
self.publish_event(TransactionEvent::TransactionCancelled(
tx_id,
TxRejection::Orphan as u64,
));

Ok(())
}
Expand Down
6 changes: 5 additions & 1 deletion base_layer/wallet/src/transaction_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use crate::{
transaction_receive_protocol::{TransactionReceiveProtocol, TransactionReceiveProtocolStage},
transaction_send_protocol::{TransactionSendProtocol, TransactionSendProtocolStage},
transaction_validation_protocol::TransactionValidationProtocol,
TxRejection,
},
storage::{
database::{TransactionBackend, TransactionDatabase},
Expand Down Expand Up @@ -1301,7 +1302,10 @@ where

let _ = self
.event_publisher
.send(Arc::new(TransactionEvent::TransactionCancelled(tx_id)))
.send(Arc::new(TransactionEvent::TransactionCancelled(
tx_id,
TxRejection::UserCancelled as u64,
)))
.map_err(|e| {
trace!(
target: LOG_TARGET,
Expand Down
14 changes: 7 additions & 7 deletions base_layer/wallet/tests/transaction_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2184,7 +2184,7 @@ fn test_transaction_cancellation() {
loop {
tokio::select! {
event = alice_event_stream.recv() => {
if let TransactionEvent::TransactionCancelled(_) = &*event.unwrap() {
if let TransactionEvent::TransactionCancelled(..) = &*event.unwrap() {
cancelled = true;
break;
}
Expand Down Expand Up @@ -2380,7 +2380,7 @@ fn test_transaction_cancellation() {
loop {
tokio::select! {
event = alice_event_stream.recv() => {
if let TransactionEvent::TransactionCancelled(_) = &*event.unwrap() {
if let TransactionEvent::TransactionCancelled(..) = &*event.unwrap() {
cancelled = true;
break;
}
Expand Down Expand Up @@ -3524,7 +3524,7 @@ fn test_coinbase_abandoned() {
loop {
tokio::select! {
event = alice_event_stream.recv() => {
if let TransactionEvent::TransactionCancelled(tx_id) = &*event.unwrap() {
if let TransactionEvent::TransactionCancelled(tx_id, _) = &*event.unwrap() {
if tx_id == &tx_id1 {
count += 1;
}
Expand Down Expand Up @@ -3684,7 +3684,7 @@ fn test_coinbase_abandoned() {
count += 1;
}
},
TransactionEvent::TransactionCancelled(tx_id) => {
TransactionEvent::TransactionCancelled(tx_id, _) => {
if tx_id == &tx_id2 {
count += 1;
}
Expand Down Expand Up @@ -3771,7 +3771,7 @@ fn test_coinbase_abandoned() {
count += 1;
}
},
TransactionEvent::TransactionCancelled(tx_id) => {
TransactionEvent::TransactionCancelled(tx_id, _) => {
if tx_id == &tx_id1 {
count += 1;
}
Expand Down Expand Up @@ -4755,7 +4755,7 @@ fn test_transaction_timeout_cancellation() {
loop {
tokio::select! {
event = carol_event_stream.recv() => {
if let TransactionEvent::TransactionCancelled(t) = &*event.unwrap() {
if let TransactionEvent::TransactionCancelled(t, _) = &*event.unwrap() {
if t == &tx_id {
transaction_cancelled = true;
break;
Expand Down Expand Up @@ -5074,7 +5074,7 @@ fn transaction_service_tx_broadcast() {
loop {
tokio::select! {
event = alice_event_stream.recv() => {
if let TransactionEvent::TransactionCancelled(tx_id) = &*event.unwrap(){
if let TransactionEvent::TransactionCancelled(tx_id, _) = &*event.unwrap(){
if tx_id == &tx_id2 {
tx2_cancelled = true;
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ async fn tx_broadcast_protocol_submit_rejection() {
loop {
tokio::select! {
event = event_stream.recv() => {
if let TransactionEvent::TransactionCancelled(_) = &*event.unwrap() {
if let TransactionEvent::TransactionCancelled(..) = &*event.unwrap() {
cancelled = true;
}
},
Expand Down Expand Up @@ -547,7 +547,7 @@ async fn tx_broadcast_protocol_submit_success_followed_by_rejection() {
loop {
tokio::select! {
event = event_stream.recv() => {
if let TransactionEvent::TransactionCancelled(_) = &*event.unwrap() {
if let TransactionEvent::TransactionCancelled(..) = &*event.unwrap() {
cancelled = true;
}
},
Expand Down
2 changes: 1 addition & 1 deletion base_layer/wallet/tests/wallet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -632,7 +632,7 @@ fn test_store_and_forward_send_tx() {
event = carol_event_stream.recv() => {
match &*event.unwrap() {
TransactionEvent::ReceivedTransaction(_) => tx_recv = true,
TransactionEvent::TransactionCancelled(_) => tx_cancelled = true,
TransactionEvent::TransactionCancelled(..) => tx_cancelled = true,
_ => (),
}
if tx_recv && tx_cancelled {
Expand Down
12 changes: 6 additions & 6 deletions base_layer/wallet_ffi/src/callback_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ where TBackend: TransactionBackend + 'static
callback_transaction_mined_unconfirmed: unsafe extern "C" fn(*mut CompletedTransaction, u64),
callback_direct_send_result: unsafe extern "C" fn(TxId, bool),
callback_store_and_forward_send_result: unsafe extern "C" fn(TxId, bool),
callback_transaction_cancellation: unsafe extern "C" fn(*mut CompletedTransaction),
callback_transaction_cancellation: unsafe extern "C" fn(*mut CompletedTransaction, u64),
callback_txo_validation_complete: unsafe extern "C" fn(u64, u8),
callback_balance_updated: unsafe extern "C" fn(*mut Balance),
callback_transaction_validation_complete: unsafe extern "C" fn(u64, u8),
Expand Down Expand Up @@ -123,7 +123,7 @@ where TBackend: TransactionBackend + 'static
callback_transaction_mined_unconfirmed: unsafe extern "C" fn(*mut CompletedTransaction, u64),
callback_direct_send_result: unsafe extern "C" fn(TxId, bool),
callback_store_and_forward_send_result: unsafe extern "C" fn(TxId, bool),
callback_transaction_cancellation: unsafe extern "C" fn(*mut CompletedTransaction),
callback_transaction_cancellation: unsafe extern "C" fn(*mut CompletedTransaction, u64),
callback_txo_validation_complete: unsafe extern "C" fn(TxId, u8),
callback_balance_updated: unsafe extern "C" fn(*mut Balance),
callback_transaction_validation_complete: unsafe extern "C" fn(TxId, u8),
Expand Down Expand Up @@ -242,8 +242,8 @@ where TBackend: TransactionBackend + 'static
self.receive_store_and_forward_send_result(tx_id, result);
self.trigger_balance_refresh().await;
},
TransactionEvent::TransactionCancelled(tx_id) => {
self.receive_transaction_cancellation(tx_id).await;
TransactionEvent::TransactionCancelled(tx_id, reason) => {
self.receive_transaction_cancellation(tx_id, reason).await;
self.trigger_balance_refresh().await;
},
TransactionEvent::TransactionBroadcast(tx_id) => {
Expand Down Expand Up @@ -425,7 +425,7 @@ where TBackend: TransactionBackend + 'static
}
}

async fn receive_transaction_cancellation(&mut self, tx_id: TxId) {
async fn receive_transaction_cancellation(&mut self, tx_id: TxId, reason: u64) {
let mut transaction = None;
if let Ok(tx) = self.db.get_cancelled_completed_transaction(tx_id).await {
transaction = Some(tx);
Expand All @@ -451,7 +451,7 @@ where TBackend: TransactionBackend + 'static
);
let boxing = Box::into_raw(Box::new(tx));
unsafe {
(self.callback_transaction_cancellation)(boxing);
(self.callback_transaction_cancellation)(boxing, reason);
}
},
}
Expand Down
18 changes: 14 additions & 4 deletions base_layer/wallet_ffi/src/callback_handler_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ mod test {
};

use crate::{callback_handler::CallbackHandler, output_manager_service_mock::MockOutputManagerService};
use tari_wallet::transaction_service::protocols::TxRejection;

struct CallbackState {
pub received_tx_callback_called: bool,
Expand Down Expand Up @@ -168,7 +169,7 @@ mod test {
drop(lock);
}

unsafe extern "C" fn tx_cancellation_callback(tx: *mut CompletedTransaction) {
unsafe extern "C" fn tx_cancellation_callback(tx: *mut CompletedTransaction, _reason: u64) {
let mut lock = CALLBACK_STATE.lock().unwrap();
match (*tx).tx_id {
3 => lock.tx_cancellation_callback_called_inbound = true,
Expand Down Expand Up @@ -415,7 +416,10 @@ mod test {
mock_output_manager_service_state.set_balance(balance.clone());
// Balance updated should be detected with following event, total = 4 times
transaction_event_sender
.send(Arc::new(TransactionEvent::TransactionCancelled(3u64)))
.send(Arc::new(TransactionEvent::TransactionCancelled(
3u64,
TxRejection::UserCancelled as u64,
)))
.unwrap();
let start = Instant::now();
while start.elapsed().as_secs() < 10 {
Expand All @@ -431,11 +435,17 @@ mod test {
assert_eq!(callback_balance_updated, 4);

transaction_event_sender
.send(Arc::new(TransactionEvent::TransactionCancelled(4u64)))
.send(Arc::new(TransactionEvent::TransactionCancelled(
4u64,
TxRejection::UserCancelled as u64,
)))
.unwrap();

transaction_event_sender
.send(Arc::new(TransactionEvent::TransactionCancelled(5u64)))
.send(Arc::new(TransactionEvent::TransactionCancelled(
5u64,
TxRejection::UserCancelled as u64,
)))
.unwrap();

oms_event_sender
Expand Down

0 comments on commit df2e4d2

Please sign in to comment.