diff --git a/base_layer/wallet/src/transaction_service/protocols/transaction_broadcast_protocol.rs b/base_layer/wallet/src/transaction_service/protocols/transaction_broadcast_protocol.rs index 5ea7cb7338..a3c9509728 100644 --- a/base_layer/wallet/src/transaction_service/protocols/transaction_broadcast_protocol.rs +++ b/base_layer/wallet/src/transaction_service/protocols/transaction_broadcast_protocol.rs @@ -99,7 +99,7 @@ where .await .ok_or_else(|| TransactionServiceProtocolError::new(self.tx_id, TransactionServiceError::Shutdown))?; - let completed_tx = match self.resources.db.get_completed_transaction(self.tx_id).await { + let completed_tx = match self.resources.db.get_completed_transaction(self.tx_id) { Ok(tx) => tx, Err(e) => { error!( @@ -275,7 +275,6 @@ where self.resources .db .broadcast_completed_transaction(self.tx_id) - .await .map_err(|e| TransactionServiceProtocolError::new(self.tx_id, TransactionServiceError::from(e)))?; let _size = self .resources @@ -430,7 +429,7 @@ where "Failed to Cancel outputs for TxId: {} after failed sending attempt with error {:?}", self.tx_id, e ); } - if let Err(e) = self.resources.db.reject_completed_transaction(self.tx_id, reason).await { + if let Err(e) = self.resources.db.reject_completed_transaction(self.tx_id, reason) { warn!( target: LOG_TARGET, "Failed to Cancel TxId: {} after failed sending attempt with error {:?}", self.tx_id, e diff --git a/base_layer/wallet/src/transaction_service/protocols/transaction_receive_protocol.rs b/base_layer/wallet/src/transaction_service/protocols/transaction_receive_protocol.rs index 43acb079cd..4eb3559efe 100644 --- a/base_layer/wallet/src/transaction_service/protocols/transaction_receive_protocol.rs +++ b/base_layer/wallet/src/transaction_service/protocols/transaction_receive_protocol.rs @@ -131,7 +131,6 @@ where .resources .db .transaction_exists(data.tx_id) - .await .map_err(|e| TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e)))? { trace!( @@ -167,7 +166,6 @@ where self.resources .db .add_pending_inbound_transaction(inbound_transaction.tx_id, inbound_transaction.clone()) - .await .map_err(|e| TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e)))?; let send_result = send_transaction_reply( @@ -182,7 +180,6 @@ where self.resources .db .increment_send_count(self.id) - .await .map_err(|e| TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e)))?; if send_result { @@ -237,7 +234,7 @@ where .ok_or_else(|| TransactionServiceProtocolError::new(self.id, TransactionServiceError::InvalidStateError))? .fuse(); - let inbound_tx = match self.resources.db.get_pending_inbound_transaction(self.id).await { + let inbound_tx = match self.resources.db.get_pending_inbound_transaction(self.id) { Ok(tx) => tx, Err(_e) => { debug!( @@ -295,7 +292,6 @@ where self.resources .db .increment_send_count(self.id) - .await .map_err(|e| TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e)))?; } @@ -339,7 +335,6 @@ where Ok(_) => self.resources .db .increment_send_count(self.id) - .await .map_err(|e| TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e)))?, Err(e) => warn!( target: LOG_TARGET, @@ -456,8 +451,7 @@ where self.resources .db - .complete_inbound_transaction(self.id, completed_transaction.clone()) - .await + .complete_inbound_transaction(self.id, completed_transaction) .map_err(|e| TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e)))?; info!( @@ -486,17 +480,13 @@ where "Cancelling Transaction Receive Protocol (TxId: {}) due to timeout after no counterparty response", self.id ); - self.resources - .db - .cancel_pending_transaction(self.id) - .await - .map_err(|e| { - warn!( - target: LOG_TARGET, - "Pending Transaction does not exist and could not be cancelled: {:?}", e - ); - TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e)) - })?; + self.resources.db.cancel_pending_transaction(self.id).map_err(|e| { + warn!( + target: LOG_TARGET, + "Pending Transaction does not exist and could not be cancelled: {:?}", e + ); + TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e)) + })?; self.resources .output_manager_service diff --git a/base_layer/wallet/src/transaction_service/protocols/transaction_send_protocol.rs b/base_layer/wallet/src/transaction_service/protocols/transaction_send_protocol.rs index 413b5d4b94..34f5ffb205 100644 --- a/base_layer/wallet/src/transaction_service/protocols/transaction_send_protocol.rs +++ b/base_layer/wallet/src/transaction_service/protocols/transaction_send_protocol.rs @@ -317,7 +317,6 @@ where .resources .db .transaction_exists(tx_id) - .await .map_err(|e| TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e)))? { let fee = sender_protocol @@ -337,14 +336,12 @@ where self.resources .db .add_pending_outbound_transaction(outbound_tx.tx_id, outbound_tx) - .await .map_err(|e| TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e)))?; } if transaction_status == TransactionStatus::Pending { self.resources .db .increment_send_count(self.id) - .await .map_err(|e| TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e)))?; } @@ -394,7 +391,6 @@ where .resources .db .get_pending_outbound_transaction(tx_id) - .await .map_err(|e| TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e)))?; if !outbound_tx.sender_protocol.is_collecting_single_signature() { @@ -452,7 +448,6 @@ where self.resources .db .increment_send_count(self.id) - .await .map_err(|e| TransactionServiceProtocolError::new(self.id, e.into()))? } }, @@ -499,7 +494,6 @@ where self.resources .db .increment_send_count(self.id) - .await .map_err(|e| TransactionServiceProtocolError::new( self.id, TransactionServiceError::from(e)) )?; @@ -521,7 +515,6 @@ where self.resources .db .increment_send_count(self.id) - .await .map_err(|e| TransactionServiceProtocolError::new( self.id, TransactionServiceError::from(e)) )? @@ -594,7 +587,6 @@ where self.resources .db .complete_outbound_transaction(tx_id, completed_transaction.clone()) - .await .map_err(|e| TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e)))?; info!( target: LOG_TARGET, @@ -615,7 +607,6 @@ where self.resources .db .increment_send_count(tx_id) - .await .map_err(|e| TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e)))?; let _size = self @@ -905,20 +896,15 @@ where self.resources .db .increment_send_count(self.id) - .await .map_err(|e| TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e)))?; - self.resources - .db - .cancel_pending_transaction(self.id) - .await - .map_err(|e| { - warn!( - target: LOG_TARGET, - "Pending Transaction does not exist and could not be cancelled: {:?}", e - ); - TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e)) - })?; + self.resources.db.cancel_pending_transaction(self.id).map_err(|e| { + warn!( + target: LOG_TARGET, + "Pending Transaction does not exist and could not be cancelled: {:?}", e + ); + TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e)) + })?; self.resources .output_manager_service diff --git a/base_layer/wallet/src/transaction_service/protocols/transaction_validation_protocol.rs b/base_layer/wallet/src/transaction_service/protocols/transaction_validation_protocol.rs index 28882f9752..92ddedbef6 100644 --- a/base_layer/wallet/src/transaction_service/protocols/transaction_validation_protocol.rs +++ b/base_layer/wallet/src/transaction_service/protocols/transaction_validation_protocol.rs @@ -112,7 +112,6 @@ where let unconfirmed_transactions = self .db .fetch_unconfirmed_transactions_info() - .await .for_protocol(self.operation_id) .unwrap(); @@ -216,7 +215,7 @@ where self.operation_id ); let op_id = self.operation_id; - while let Some(last_mined_transaction) = self.db.fetch_last_mined_transaction().await.for_protocol(op_id)? { + while let Some(last_mined_transaction) = self.db.fetch_last_mined_transaction().for_protocol(op_id)? { let mined_height = last_mined_transaction .mined_height .ok_or_else(|| { @@ -414,7 +413,6 @@ where num_confirmations >= self.config.num_confirmations_required, status.is_faux(), ) - .await .for_protocol(self.operation_id)?; if num_confirmations >= self.config.num_confirmations_required { @@ -488,12 +486,10 @@ where num_confirmations >= self.config.num_confirmations_required, false, ) - .await .for_protocol(self.operation_id)?; self.db .abandon_coinbase_transaction(tx_id) - .await .for_protocol(self.operation_id)?; self.publish_event(TransactionEvent::TransactionCancelled( @@ -510,7 +506,6 @@ where ) -> Result<(), TransactionServiceProtocolError> { self.db .set_transaction_as_unmined(tx_id) - .await .for_protocol(self.operation_id)?; if *status == TransactionStatus::Coinbase { diff --git a/base_layer/wallet/src/transaction_service/service.rs b/base_layer/wallet/src/transaction_service/service.rs index 7660d4af24..bba16e7217 100644 --- a/base_layer/wallet/src/transaction_service/service.rs +++ b/base_layer/wallet/src/transaction_service/service.rs @@ -379,7 +379,7 @@ where trace!(target: LOG_TARGET, "Handling Transaction Message, Trace: {}", msg.dht_header.message_tag); let result = self.accept_transaction(origin_public_key, inner_msg, - msg.dht_header.message_tag.as_value(), &mut receive_transaction_protocol_handles).await; + msg.dht_header.message_tag.as_value(), &mut receive_transaction_protocol_handles); match result { Err(TransactionServiceError::RepeatedMessageError) => { @@ -506,7 +506,7 @@ where Ok(join_result_inner) => self.complete_send_transaction_protocol( join_result_inner, &mut transaction_broadcast_protocol_handles - ).await, + ), Err(e) => error!(target: LOG_TARGET, "Error resolving Send Transaction Protocol: {:?}", e), }; } @@ -516,14 +516,14 @@ where Ok(join_result_inner) => self.complete_receive_transaction_protocol( join_result_inner, &mut transaction_broadcast_protocol_handles - ).await, + ), Err(e) => error!(target: LOG_TARGET, "Error resolving Send Transaction Protocol: {:?}", e), }; } Some(join_result) = transaction_broadcast_protocol_handles.next() => { trace!(target: LOG_TARGET, "Transaction Broadcast protocol has ended with result {:?}", join_result); match join_result { - Ok(join_result_inner) => self.complete_transaction_broadcast_protocol(join_result_inner).await, + Ok(join_result_inner) => self.complete_transaction_broadcast_protocol(join_result_inner), Err(e) => error!(target: LOG_TARGET, "Error resolving Broadcast Protocol: {:?}", e), }; } @@ -533,7 +533,7 @@ where Ok(join_result_inner) => self.complete_transaction_validation_protocol( join_result_inner, &mut transaction_broadcast_protocol_handles, - ).await, + ), Err(e) => error!(target: LOG_TARGET, "Error resolving Transaction Validation protocol: {:?}", e), }; } @@ -650,42 +650,34 @@ where .cancel_pending_transaction(tx_id) .await .map(|_| TransactionServiceResponse::TransactionCancelled), - TransactionServiceRequest::GetPendingInboundTransactions => { - Ok(TransactionServiceResponse::PendingInboundTransactions( - self.db.get_pending_inbound_transactions().await?, - )) - }, - TransactionServiceRequest::GetPendingOutboundTransactions => { - Ok(TransactionServiceResponse::PendingOutboundTransactions( - self.db.get_pending_outbound_transactions().await?, - )) - }, + TransactionServiceRequest::GetPendingInboundTransactions => Ok( + TransactionServiceResponse::PendingInboundTransactions(self.db.get_pending_inbound_transactions()?), + ), + TransactionServiceRequest::GetPendingOutboundTransactions => Ok( + TransactionServiceResponse::PendingOutboundTransactions(self.db.get_pending_outbound_transactions()?), + ), TransactionServiceRequest::GetCompletedTransactions => Ok( - TransactionServiceResponse::CompletedTransactions(self.db.get_completed_transactions().await?), + TransactionServiceResponse::CompletedTransactions(self.db.get_completed_transactions()?), ), TransactionServiceRequest::GetCancelledPendingInboundTransactions => { Ok(TransactionServiceResponse::PendingInboundTransactions( - self.db.get_cancelled_pending_inbound_transactions().await?, + self.db.get_cancelled_pending_inbound_transactions()?, )) }, TransactionServiceRequest::GetCancelledPendingOutboundTransactions => { Ok(TransactionServiceResponse::PendingOutboundTransactions( - self.db.get_cancelled_pending_outbound_transactions().await?, + self.db.get_cancelled_pending_outbound_transactions()?, )) }, - TransactionServiceRequest::GetCancelledCompletedTransactions => { - Ok(TransactionServiceResponse::CompletedTransactions( - self.db.get_cancelled_completed_transactions().await?, - )) - }, - TransactionServiceRequest::GetCompletedTransaction(tx_id) => { - Ok(TransactionServiceResponse::CompletedTransaction(Box::new( - self.db.get_completed_transaction(tx_id).await?, - ))) - }, + TransactionServiceRequest::GetCancelledCompletedTransactions => Ok( + TransactionServiceResponse::CompletedTransactions(self.db.get_cancelled_completed_transactions()?), + ), + TransactionServiceRequest::GetCompletedTransaction(tx_id) => Ok( + TransactionServiceResponse::CompletedTransaction(Box::new(self.db.get_completed_transaction(tx_id)?)), + ), TransactionServiceRequest::GetAnyTransaction(tx_id) => Ok(TransactionServiceResponse::AnyTransaction( - Box::new(self.db.get_any_transaction(tx_id).await?), + Box::new(self.db.get_any_transaction(tx_id)?), )), TransactionServiceRequest::ImportUtxoWithStatus { amount, @@ -707,11 +699,9 @@ where current_height, mined_timestamp, ) - .await .map(TransactionServiceResponse::UtxoImported), TransactionServiceRequest::SubmitTransactionToSelf(tx_id, tx, fee, amount, message) => self .submit_transaction_to_self(transaction_broadcast_join_handles, tx_id, tx, fee, amount, message) - .await .map(|_| TransactionServiceResponse::TransactionSubmitted), TransactionServiceRequest::GenerateCoinbaseTransaction(reward, fees, block_height) => self .generate_coinbase_transaction(reward, fees, block_height) @@ -728,13 +718,11 @@ where TransactionServiceRequest::ApplyEncryption(cipher) => self .db .apply_encryption(*cipher) - .await .map(|_| TransactionServiceResponse::EncryptionApplied) .map_err(TransactionServiceError::TransactionStorageError), TransactionServiceRequest::RemoveEncryption => self .db .remove_encryption() - .await .map(|_| TransactionServiceResponse::EncryptionRemoved) .map_err(TransactionServiceError::TransactionStorageError), TransactionServiceRequest::RestartTransactionProtocols => self @@ -742,11 +730,9 @@ where send_transaction_join_handles, receive_transaction_join_handles, ) - .await .map(|_| TransactionServiceResponse::ProtocolsRestarted), TransactionServiceRequest::RestartBroadcastProtocols => self .restart_broadcast_protocols(transaction_broadcast_join_handles) - .await .map(|_| TransactionServiceResponse::ProtocolsRestarted), TransactionServiceRequest::GetNumConfirmationsRequired => Ok( TransactionServiceResponse::NumConfirmationsRequired(self.resources.config.num_confirmations_required), @@ -940,8 +926,7 @@ where None, None, ), - ) - .await?; + )?; let _result = reply_channel .send(Ok(TransactionServiceResponse::TransactionSent(tx_id))) @@ -1162,8 +1147,7 @@ where None, None, ), - ) - .await?; + )?; Ok(Box::new((tx_id, pre_image, output))) } @@ -1218,7 +1202,7 @@ where .get_recipient_sender_offset_private_key(0) .map_err(|e| TransactionServiceProtocolError::new(tx_id, e.into()))?; let spend_key = PrivateKey::from_bytes( - CommsPublicKey::shared_secret(&sender_offset_private_key.clone(), &dest_pubkey.clone()).as_bytes(), + CommsPublicKey::shared_secret(&sender_offset_private_key, &dest_pubkey.clone()).as_bytes(), ) .map_err(|e| TransactionServiceProtocolError::new(tx_id, e.into()))?; @@ -1226,8 +1210,8 @@ where let rewind_blinding_key = PrivateKey::from_bytes(&hash_secret_key(&spend_key))?; let encryption_key = PrivateKey::from_bytes(&hash_secret_key(&rewind_blinding_key))?; let rewind_data = RewindData { - rewind_blinding_key: rewind_blinding_key.clone(), - encryption_key: encryption_key.clone(), + rewind_blinding_key, + encryption_key, }; let rtp = ReceiverTransactionProtocol::new_with_rewindable_output( @@ -1292,8 +1276,7 @@ where None, None, ), - ) - .await?; + )?; Ok(tx_id) } @@ -1438,8 +1421,7 @@ where None, None, ), - ) - .await?; + )?; Ok(tx_id) } @@ -1507,8 +1489,8 @@ where let tx_id = recipient_reply.tx_id; // First we check if this Reply is for a cancelled Pending Outbound Tx or a Completed Tx - let cancelled_outbound_tx = self.db.get_cancelled_pending_outbound_transaction(tx_id).await; - let completed_tx = self.db.get_completed_transaction_cancelled_or_not(tx_id).await; + let cancelled_outbound_tx = self.db.get_cancelled_pending_outbound_transaction(tx_id); + let completed_tx = self.db.get_completed_transaction_cancelled_or_not(tx_id); // This closure will check if the timestamps are beyond the cooldown period let check_cooldown = |timestamp: Option| { @@ -1548,7 +1530,7 @@ where ); tokio::spawn(send_transaction_cancelled_message( tx_id, - source_pubkey.clone(), + source_pubkey, self.resources.outbound_message_service.clone(), )); } else { @@ -1560,14 +1542,14 @@ where tokio::spawn(send_finalized_transaction_message( tx_id, ctx.transaction, - source_pubkey.clone(), + source_pubkey, self.resources.outbound_message_service.clone(), self.resources.config.direct_send_timeout, self.resources.config.transaction_routing_mechanism, )); } - if let Err(e) = self.resources.db.increment_send_count(tx_id).await { + if let Err(e) = self.resources.db.increment_send_count(tx_id) { warn!( target: LOG_TARGET, "Could not increment send count for completed transaction TxId {}: {:?}", tx_id, e @@ -1594,11 +1576,11 @@ where ); tokio::spawn(send_transaction_cancelled_message( tx_id, - source_pubkey.clone(), + source_pubkey, self.resources.outbound_message_service.clone(), )); - if let Err(e) = self.resources.db.increment_send_count(tx_id).await { + if let Err(e) = self.resources.db.increment_send_count(tx_id) { warn!( target: LOG_TARGET, "Could not increment send count for completed transaction TxId {}: {:?}", tx_id, e @@ -1622,7 +1604,7 @@ where } /// Handle the final clean up after a Send Transaction protocol completes - async fn complete_send_transaction_protocol( + fn complete_send_transaction_protocol( &mut self, join_result: Result>, transaction_broadcast_join_handles: &mut FuturesUnordered< @@ -1634,7 +1616,7 @@ where if val.transaction_status != TransactionStatus::Queued { let _sender = self.pending_transaction_reply_senders.remove(&val.tx_id); let _sender = self.send_transaction_cancellation_senders.remove(&val.tx_id); - let completed_tx = match self.db.get_completed_transaction(val.tx_id).await { + let completed_tx = match self.db.get_completed_transaction(val.tx_id) { Ok(v) => v, Err(e) => { error!( @@ -1646,7 +1628,6 @@ where }; let _result = self .broadcast_completed_transaction(completed_tx, transaction_broadcast_join_handles) - .await .map_err(|resp| { error!( target: LOG_TARGET, @@ -1683,7 +1664,7 @@ where /// Cancel a pending transaction async fn cancel_pending_transaction(&mut self, tx_id: TxId) -> Result<(), TransactionServiceError> { - self.db.cancel_pending_transaction(tx_id).await.map_err(|e| { + self.db.cancel_pending_transaction(tx_id).map_err(|e| { warn!( target: LOG_TARGET, "Pending Transaction does not exist and could not be cancelled: {:?}", e @@ -1733,7 +1714,7 @@ where // Check that an inbound transaction exists to be cancelled and that the Source Public key for that transaction // is the same as the cancellation message - if let Ok(inbound_tx) = self.db.get_pending_inbound_transaction(tx_id).await { + if let Ok(inbound_tx) = self.db.get_pending_inbound_transaction(tx_id) { if inbound_tx.source_public_key == source_pubkey { self.cancel_pending_transaction(tx_id).await?; } else { @@ -1749,13 +1730,13 @@ where } #[allow(clippy::map_entry)] - async fn restart_all_send_transaction_protocols( + fn restart_all_send_transaction_protocols( &mut self, join_handles: &mut FuturesUnordered< JoinHandle>>, >, ) -> Result<(), TransactionServiceError> { - let outbound_txs = self.db.get_pending_outbound_transactions().await?; + let outbound_txs = self.db.get_pending_outbound_transactions()?; for (tx_id, tx) in outbound_txs { let (sender_protocol, stage) = if tx.send_count > 0 { (None, TransactionSendProtocolStage::WaitForReply) @@ -1819,7 +1800,7 @@ where /// 'source_pubkey' - The pubkey from which the message was sent and to which the reply will be sent. /// 'sender_message' - Message from a sender containing the setup of the transaction being sent to you #[allow(clippy::too_many_lines)] - pub async fn accept_transaction( + pub fn accept_transaction( &mut self, source_pubkey: CommsPublicKey, sender_message: proto::TransactionSenderMessage, @@ -1844,7 +1825,7 @@ where ); // Check if this transaction has already been received and cancelled. - if let Ok(Some(any_tx)) = self.db.get_any_cancelled_transaction(data.tx_id).await { + if let Ok(Some(any_tx)) = self.db.get_any_cancelled_transaction(data.tx_id) { let tx = CompletedTransaction::from(any_tx); if tx.source_public_key != source_pubkey { @@ -1865,7 +1846,7 @@ where } // Check if this transaction has already been received. - if let Ok(inbound_tx) = self.db.get_pending_inbound_transaction(data.clone().tx_id).await { + if let Ok(inbound_tx) = self.db.get_pending_inbound_transaction(data.tx_id) { // Check that it is from the same person if inbound_tx.source_public_key != source_pubkey { return Err(TransactionServiceError::InvalidSourcePublicKey); @@ -1895,7 +1876,7 @@ where self.resources.config.direct_send_timeout, self.resources.config.transaction_routing_mechanism, )); - if let Err(e) = self.resources.db.increment_send_count(tx_id).await { + if let Err(e) = self.resources.db.increment_send_count(tx_id) { warn!( target: LOG_TARGET, "Could not increment send count for inbound transaction TxId {}: {:?}", tx_id, e @@ -1975,7 +1956,7 @@ where let sender = match self.finalized_transaction_senders.get_mut(&tx_id) { None => { // First check if perhaps we know about this inbound transaction but it was cancelled - match self.db.get_cancelled_pending_inbound_transaction(tx_id).await { + match self.db.get_cancelled_pending_inbound_transaction(tx_id) { Ok(t) => { if t.source_public_key != source_pubkey { debug!( @@ -1992,7 +1973,7 @@ where Restarting protocol", tx_id ); - self.db.uncancel_pending_transaction(tx_id).await?; + self.db.uncancel_pending_transaction(tx_id)?; self.output_manager_service .reinstate_cancelled_inbound_transaction_outputs(tx_id) .await?; @@ -2018,7 +1999,7 @@ where } /// Handle the final clean up after a Send Transaction protocol completes - async fn complete_receive_transaction_protocol( + fn complete_receive_transaction_protocol( &mut self, join_result: Result>, transaction_broadcast_join_handles: &mut FuturesUnordered< @@ -2030,7 +2011,7 @@ where let _public_key = self.finalized_transaction_senders.remove(&id); let _result = self.receiver_transaction_cancellation_senders.remove(&id); - let completed_tx = match self.db.get_completed_transaction(id).await { + let completed_tx = match self.db.get_completed_transaction(id) { Ok(v) => v, Err(e) => { warn!( @@ -2042,7 +2023,6 @@ where }; let _result = self .broadcast_completed_transaction(completed_tx, transaction_broadcast_join_handles) - .await .map_err(|e| { warn!( target: LOG_TARGET, @@ -2083,11 +2063,11 @@ where } } - async fn restart_all_receive_transaction_protocols( + fn restart_all_receive_transaction_protocols( &mut self, join_handles: &mut FuturesUnordered>>>, ) -> Result<(), TransactionServiceError> { - let inbound_txs = self.db.get_pending_inbound_transaction_sender_info().await?; + let inbound_txs = self.db.get_pending_inbound_transaction_sender_info()?; for txn in inbound_txs { self.restart_receive_transaction_protocol(txn.tx_id, txn.source_public_key, join_handles); } @@ -2128,7 +2108,7 @@ where } } - async fn restart_transaction_negotiation_protocols( + fn restart_transaction_negotiation_protocols( &mut self, send_transaction_join_handles: &mut FuturesUnordered< JoinHandle>>, @@ -2139,7 +2119,6 @@ where ) -> Result<(), TransactionServiceError> { trace!(target: LOG_TARGET, "Restarting transaction negotiation protocols"); self.restart_all_send_transaction_protocols(send_transaction_join_handles) - .await .map_err(|resp| { error!( target: LOG_TARGET, @@ -2149,7 +2128,6 @@ where })?; self.restart_all_receive_transaction_protocols(receive_transaction_join_handles) - .await .map_err(|resp| { error!( target: LOG_TARGET, @@ -2167,7 +2145,7 @@ where JoinHandle>>, >, ) -> Result { - self.resources.db.mark_all_transactions_as_unvalidated().await?; + self.resources.db.mark_all_transactions_as_unvalidated()?; self.start_transaction_validation_protocol(join_handles).await } @@ -2199,7 +2177,7 @@ where } /// Handle the final clean up after a Transaction Validation protocol completes - async fn complete_transaction_validation_protocol( + fn complete_transaction_validation_protocol( &mut self, join_result: Result>, transaction_broadcast_join_handles: &mut FuturesUnordered< @@ -2215,7 +2193,6 @@ where // Restart broadcast protocols for any transactions that were found to be no longer mined. let _ = self .restart_broadcast_protocols(transaction_broadcast_join_handles) - .await .map_err(|e| warn!(target: LOG_TARGET, "Error restarting broadcast protocols: {}", e)); }, Err(TransactionServiceProtocolError { id, error }) => { @@ -2233,7 +2210,7 @@ where } } - async fn restart_broadcast_protocols( + fn restart_broadcast_protocols( &mut self, broadcast_join_handles: &mut FuturesUnordered>>>, ) -> Result<(), TransactionServiceError> { @@ -2243,7 +2220,6 @@ where trace!(target: LOG_TARGET, "Restarting transaction broadcast protocols"); self.broadcast_completed_and_broadcast_transactions(broadcast_join_handles) - .await .map_err(|resp| { error!( target: LOG_TARGET, @@ -2258,7 +2234,7 @@ where } /// Start to protocol to Broadcast the specified Completed Transaction to the Base Node. - async fn broadcast_completed_transaction( + fn broadcast_completed_transaction( &mut self, completed_tx: CompletedTransaction, join_handles: &mut FuturesUnordered>>>, @@ -2303,7 +2279,7 @@ where /// Broadcast all valid and not cancelled completed transactions with status 'Completed' and 'Broadcast' to the base /// node. - async fn broadcast_completed_and_broadcast_transactions( + fn broadcast_completed_and_broadcast_transactions( &mut self, join_handles: &mut FuturesUnordered>>>, ) -> Result<(), TransactionServiceError> { @@ -2312,17 +2288,16 @@ where "Attempting to Broadcast all valid and not cancelled Completed Transactions with status 'Completed' and \ 'Broadcast'" ); - let txn_list = self.db.get_transactions_to_be_broadcast().await?; + let txn_list = self.db.get_transactions_to_be_broadcast()?; for completed_txn in txn_list { - self.broadcast_completed_transaction(completed_txn, join_handles) - .await?; + self.broadcast_completed_transaction(completed_txn, join_handles)?; } Ok(()) } /// Handle the final clean up after a Transaction Broadcast protocol completes - async fn complete_transaction_broadcast_protocol( + fn complete_transaction_broadcast_protocol( &mut self, join_result: Result>, ) { @@ -2386,7 +2361,7 @@ where } /// Add a completed transaction to the Transaction Manager to record directly importing a spendable UTXO. - pub async fn add_utxo_import_transaction_with_status( + pub fn add_utxo_import_transaction_with_status( &mut self, value: MicroTari, source_public_key: CommsPublicKey, @@ -2398,19 +2373,17 @@ where mined_timestamp: Option, ) -> Result { let tx_id = if let Some(id) = tx_id { id } else { TxId::new_random() }; - self.db - .add_utxo_import_transaction_with_status( - tx_id, - value, - source_public_key, - self.node_identity.public_key().clone(), - message, - maturity, - import_status.clone(), - current_height, - mined_timestamp, - ) - .await?; + self.db.add_utxo_import_transaction_with_status( + tx_id, + value, + source_public_key, + self.node_identity.public_key().clone(), + message, + maturity, + import_status.clone(), + current_height, + mined_timestamp, + )?; let transaction_event = match import_status { ImportStatus::Imported => TransactionEvent::TransactionImported(tx_id), ImportStatus::FauxUnconfirmed => TransactionEvent::FauxTransactionUnconfirmed { @@ -2434,7 +2407,7 @@ where } /// Submit a completed transaction to the Transaction Manager - async fn submit_transaction( + fn submit_transaction( &mut self, transaction_broadcast_join_handles: &mut FuturesUnordered< JoinHandle>>, @@ -2443,9 +2416,7 @@ where ) -> Result<(), TransactionServiceError> { let tx_id = completed_transaction.tx_id; trace!(target: LOG_TARGET, "Submit transaction ({}) to db.", tx_id); - self.db - .insert_completed_transaction(tx_id, completed_transaction) - .await?; + self.db.insert_completed_transaction(tx_id, completed_transaction)?; trace!( target: LOG_TARGET, "Launch the transaction broadcast protocol for submitted transaction ({}).", @@ -2457,14 +2428,13 @@ where transaction_status: TransactionStatus::Completed, }), transaction_broadcast_join_handles, - ) - .await; + ); Ok(()) } /// Submit a completed coin split transaction to the Transaction Manager. This is different from /// `submit_transaction` in that it will expose less information about the completed transaction. - pub async fn submit_transaction_to_self( + pub fn submit_transaction_to_self( &mut self, transaction_broadcast_join_handles: &mut FuturesUnordered< JoinHandle>>, @@ -2492,8 +2462,7 @@ where None, None, ), - ) - .await?; + )?; Ok(()) } @@ -2508,8 +2477,7 @@ where // first check if we already have a coinbase tx for this height and amount let find_result = self .db - .find_coinbase_transaction_at_block_height(block_height, amount) - .await?; + .find_coinbase_transaction_at_block_height(block_height, amount)?; let completed_transaction = match find_result { Some(completed_tx) => { @@ -2530,26 +2498,24 @@ where .output_manager_service .get_coinbase_transaction(tx_id, reward, fees, block_height) .await?; - self.db - .insert_completed_transaction( + self.db.insert_completed_transaction( + tx_id, + CompletedTransaction::new( tx_id, - CompletedTransaction::new( - tx_id, - self.node_identity.public_key().clone(), - self.node_identity.public_key().clone(), - amount, - MicroTari::from(0), - tx.clone(), - TransactionStatus::Coinbase, - format!("Coinbase Transaction for Block #{}", block_height), - Utc::now().naive_utc(), - TransactionDirection::Inbound, - Some(block_height), - None, - None, - ), - ) - .await?; + self.node_identity.public_key().clone(), + self.node_identity.public_key().clone(), + amount, + MicroTari::from(0), + tx.clone(), + TransactionStatus::Coinbase, + format!("Coinbase Transaction for Block #{}", block_height), + Utc::now().naive_utc(), + TransactionDirection::Inbound, + Some(block_height), + None, + None, + ), + )?; let _size = self .resources diff --git a/base_layer/wallet/src/transaction_service/storage/database.rs b/base_layer/wallet/src/transaction_service/storage/database.rs index 6fc21c2354..f018ba3088 100644 --- a/base_layer/wallet/src/transaction_service/storage/database.rs +++ b/base_layer/wallet/src/transaction_service/storage/database.rs @@ -280,173 +280,136 @@ where T: TransactionBackend + 'static Self { db: Arc::new(db) } } - pub async fn add_pending_inbound_transaction( + pub fn add_pending_inbound_transaction( &self, tx_id: TxId, inbound_tx: InboundTransaction, ) -> Result<(), TransactionStorageError> { - let db_clone = self.db.clone(); - tokio::task::spawn_blocking(move || { - db_clone.write(WriteOperation::Insert(DbKeyValuePair::PendingInboundTransaction( + self.db + .write(WriteOperation::Insert(DbKeyValuePair::PendingInboundTransaction( tx_id, Box::new(inbound_tx), - ))) - }) - .await - .map_err(|err| TransactionStorageError::BlockingTaskSpawnError(err.to_string()))??; - + )))?; Ok(()) } - pub async fn add_pending_outbound_transaction( + pub fn add_pending_outbound_transaction( &self, tx_id: TxId, outbound_tx: OutboundTransaction, ) -> Result<(), TransactionStorageError> { - let db_clone = self.db.clone(); - tokio::task::spawn_blocking(move || { - db_clone.write(WriteOperation::Insert(DbKeyValuePair::PendingOutboundTransaction( + self.db + .write(WriteOperation::Insert(DbKeyValuePair::PendingOutboundTransaction( tx_id, Box::new(outbound_tx), - ))) - }) - .await - .map_err(|err| TransactionStorageError::BlockingTaskSpawnError(err.to_string()))??; + )))?; Ok(()) } - pub async fn remove_pending_outbound_transaction(&self, tx_id: TxId) -> Result<(), TransactionStorageError> { - let db_clone = self.db.clone(); - tokio::task::spawn_blocking(move || { - db_clone.write(WriteOperation::Remove(DbKey::PendingOutboundTransaction(tx_id))) - }) - .await - .map_err(|err| TransactionStorageError::BlockingTaskSpawnError(err.to_string()))??; + pub fn remove_pending_outbound_transaction(&self, tx_id: TxId) -> Result<(), TransactionStorageError> { + self.db + .write(WriteOperation::Remove(DbKey::PendingOutboundTransaction(tx_id)))?; Ok(()) } /// Check if a transaction with the specified TxId exists in any of the collections - pub async fn transaction_exists(&self, tx_id: TxId) -> Result { - let db_clone = self.db.clone(); - let tx_id_clone = tx_id; - tokio::task::spawn_blocking(move || db_clone.transaction_exists(tx_id_clone)) - .await - .map_err(|err| TransactionStorageError::BlockingTaskSpawnError(err.to_string())) - .and_then(|inner_result| inner_result) + pub fn transaction_exists(&self, tx_id: TxId) -> Result { + self.db.transaction_exists(tx_id) } - pub async fn insert_completed_transaction( + pub fn insert_completed_transaction( &self, tx_id: TxId, transaction: CompletedTransaction, ) -> Result, TransactionStorageError> { - let db_clone = self.db.clone(); - - tokio::task::spawn_blocking(move || { - db_clone.write(WriteOperation::Insert(DbKeyValuePair::CompletedTransaction( + self.db + .write(WriteOperation::Insert(DbKeyValuePair::CompletedTransaction( tx_id, Box::new(transaction), ))) - }) - .await - .map_err(|err| TransactionStorageError::BlockingTaskSpawnError(err.to_string())) - .and_then(|inner_result| inner_result) } - pub async fn get_pending_outbound_transaction( + pub fn get_pending_outbound_transaction( &self, tx_id: TxId, ) -> Result { - self.get_pending_outbound_transaction_by_cancelled(tx_id, false).await + self.get_pending_outbound_transaction_by_cancelled(tx_id, false) } - pub async fn get_cancelled_pending_outbound_transaction( + pub fn get_cancelled_pending_outbound_transaction( &self, tx_id: TxId, ) -> Result { - self.get_pending_outbound_transaction_by_cancelled(tx_id, true).await + self.get_pending_outbound_transaction_by_cancelled(tx_id, true) } - pub async fn get_pending_outbound_transaction_by_cancelled( + pub fn get_pending_outbound_transaction_by_cancelled( &self, tx_id: TxId, cancelled: bool, ) -> Result { - let db_clone = self.db.clone(); let key = if cancelled { DbKey::CancelledPendingOutboundTransaction(tx_id) } else { DbKey::PendingOutboundTransaction(tx_id) }; - let t = tokio::task::spawn_blocking(move || match db_clone.fetch(&key) { + let t = match self.db.fetch(&key) { Ok(None) => Err(TransactionStorageError::ValueNotFound(key)), Ok(Some(DbValue::PendingOutboundTransaction(pt))) => Ok(pt), Ok(Some(other)) => unexpected_result(key, other), Err(e) => log_error(key, e), - }) - .await - .map_err(|err| TransactionStorageError::BlockingTaskSpawnError(err.to_string()))??; + }?; Ok(*t) } - pub async fn get_pending_inbound_transaction( - &self, - tx_id: TxId, - ) -> Result { - self.get_pending_inbound_transaction_by_cancelled(tx_id, false).await + pub fn get_pending_inbound_transaction(&self, tx_id: TxId) -> Result { + self.get_pending_inbound_transaction_by_cancelled(tx_id, false) } - pub async fn get_cancelled_pending_inbound_transaction( + pub fn get_cancelled_pending_inbound_transaction( &self, tx_id: TxId, ) -> Result { - self.get_pending_inbound_transaction_by_cancelled(tx_id, true).await + self.get_pending_inbound_transaction_by_cancelled(tx_id, true) } - pub async fn get_pending_inbound_transaction_by_cancelled( + pub fn get_pending_inbound_transaction_by_cancelled( &self, tx_id: TxId, cancelled: bool, ) -> Result { - let db_clone = self.db.clone(); let key = if cancelled { DbKey::CancelledPendingInboundTransaction(tx_id) } else { DbKey::PendingInboundTransaction(tx_id) }; - let t = tokio::task::spawn_blocking(move || match db_clone.fetch(&key) { + let t = match self.db.fetch(&key) { Ok(None) => Err(TransactionStorageError::ValueNotFound(key)), Ok(Some(DbValue::PendingInboundTransaction(pt))) => Ok(pt), Ok(Some(other)) => unexpected_result(key, other), Err(e) => log_error(key, e), - }) - .await - .map_err(|err| TransactionStorageError::BlockingTaskSpawnError(err.to_string()))??; + }?; Ok(*t) } - pub async fn get_completed_transaction( - &self, - tx_id: TxId, - ) -> Result { - self.get_completed_transaction_by_cancelled(tx_id, false).await + pub fn get_completed_transaction(&self, tx_id: TxId) -> Result { + self.get_completed_transaction_by_cancelled(tx_id, false) } - pub async fn get_cancelled_completed_transaction( + pub fn get_cancelled_completed_transaction( &self, tx_id: TxId, ) -> Result { - self.get_completed_transaction_by_cancelled(tx_id, true).await + self.get_completed_transaction_by_cancelled(tx_id, true) } - pub async fn get_completed_transaction_by_cancelled( + pub fn get_completed_transaction_by_cancelled( &self, tx_id: TxId, cancelled: bool, ) -> Result { - let db_clone = self.db.clone(); let key = DbKey::CompletedTransaction(tx_id); - let t = tokio::task::spawn_blocking(move || match db_clone.fetch(&DbKey::CompletedTransaction(tx_id)) { + let t = match self.db.fetch(&DbKey::CompletedTransaction(tx_id)) { Ok(None) => Err(TransactionStorageError::ValueNotFound(key)), Ok(Some(DbValue::CompletedTransaction(pt))) => { if (pt.cancelled.is_some()) == cancelled { @@ -457,99 +420,81 @@ where T: TransactionBackend + 'static }, Ok(Some(other)) => unexpected_result(key, other), Err(e) => log_error(key, e), - }) - .await - .map_err(|err| TransactionStorageError::BlockingTaskSpawnError(err.to_string()))??; + }?; Ok(*t) } - pub async fn get_imported_transactions(&self) -> Result, TransactionStorageError> { - let db_clone = self.db.clone(); - let t = tokio::task::spawn_blocking(move || db_clone.fetch_imported_transactions()) - .await - .map_err(|err| TransactionStorageError::BlockingTaskSpawnError(err.to_string()))??; + pub fn get_imported_transactions(&self) -> Result, TransactionStorageError> { + let t = self.db.fetch_imported_transactions()?; Ok(t) } - pub async fn get_unconfirmed_faux_transactions( - &self, - ) -> Result, TransactionStorageError> { - let db_clone = self.db.clone(); - let t = tokio::task::spawn_blocking(move || db_clone.fetch_unconfirmed_faux_transactions()) - .await - .map_err(|err| TransactionStorageError::BlockingTaskSpawnError(err.to_string()))??; + pub fn get_unconfirmed_faux_transactions(&self) -> Result, TransactionStorageError> { + let t = self.db.fetch_unconfirmed_faux_transactions()?; Ok(t) } - pub async fn get_confirmed_faux_transactions_from_height( + pub fn get_confirmed_faux_transactions_from_height( &self, height: u64, ) -> Result, TransactionStorageError> { - let db_clone = self.db.clone(); - let t = tokio::task::spawn_blocking(move || db_clone.fetch_confirmed_faux_transactions_from_height(height)) - .await - .map_err(|err| TransactionStorageError::BlockingTaskSpawnError(err.to_string()))??; + let t = self.db.fetch_confirmed_faux_transactions_from_height(height)?; Ok(t) } - pub async fn fetch_last_mined_transaction(&self) -> Result, TransactionStorageError> { + pub fn fetch_last_mined_transaction(&self) -> Result, TransactionStorageError> { self.db.fetch_last_mined_transaction() } /// Light weight method to return completed but unconfirmed transactions that were not imported - pub async fn fetch_unconfirmed_transactions_info( + pub fn fetch_unconfirmed_transactions_info( &self, ) -> Result, TransactionStorageError> { self.db.fetch_unconfirmed_transactions_info() } /// This method returns all completed transactions that must be broadcast - pub async fn get_transactions_to_be_broadcast(&self) -> Result, TransactionStorageError> { + pub fn get_transactions_to_be_broadcast(&self) -> Result, TransactionStorageError> { self.db.get_transactions_to_be_broadcast() } - pub async fn get_completed_transaction_cancelled_or_not( + pub fn get_completed_transaction_cancelled_or_not( &self, tx_id: TxId, ) -> Result { - let db_clone = self.db.clone(); let key = DbKey::CompletedTransaction(tx_id); - let t = tokio::task::spawn_blocking(move || match db_clone.fetch(&DbKey::CompletedTransaction(tx_id)) { + let t = match self.db.fetch(&DbKey::CompletedTransaction(tx_id)) { Ok(None) => Err(TransactionStorageError::ValueNotFound(key)), Ok(Some(DbValue::CompletedTransaction(pt))) => Ok(pt), Ok(Some(other)) => unexpected_result(key, other), Err(e) => log_error(key, e), - }) - .await - .map_err(|err| TransactionStorageError::BlockingTaskSpawnError(err.to_string()))??; + }?; Ok(*t) } - pub async fn get_pending_inbound_transactions( + pub fn get_pending_inbound_transactions( &self, ) -> Result, TransactionStorageError> { - self.get_pending_inbound_transactions_by_cancelled(false).await + self.get_pending_inbound_transactions_by_cancelled(false) } - pub async fn get_cancelled_pending_inbound_transactions( + pub fn get_cancelled_pending_inbound_transactions( &self, ) -> Result, TransactionStorageError> { - self.get_pending_inbound_transactions_by_cancelled(true).await + self.get_pending_inbound_transactions_by_cancelled(true) } - async fn get_pending_inbound_transactions_by_cancelled( + fn get_pending_inbound_transactions_by_cancelled( &self, cancelled: bool, ) -> Result, TransactionStorageError> { - let db_clone = self.db.clone(); - let key = if cancelled { DbKey::CancelledPendingInboundTransactions } else { DbKey::PendingInboundTransactions }; - let t = tokio::task::spawn_blocking(move || match db_clone.fetch(&key) { + let t = match self.db.fetch(&key) { Ok(None) => log_error( key, TransactionStorageError::UnexpectedResult( @@ -559,37 +504,33 @@ where T: TransactionBackend + 'static Ok(Some(DbValue::PendingInboundTransactions(pt))) => Ok(pt), Ok(Some(other)) => unexpected_result(key, other), Err(e) => log_error(key, e), - }) - .await - .map_err(|err| TransactionStorageError::BlockingTaskSpawnError(err.to_string()))??; + }?; Ok(t) } - pub async fn get_pending_outbound_transactions( + pub fn get_pending_outbound_transactions( &self, ) -> Result, TransactionStorageError> { - self.get_pending_outbound_transactions_by_cancelled(false).await + self.get_pending_outbound_transactions_by_cancelled(false) } - pub async fn get_cancelled_pending_outbound_transactions( + pub fn get_cancelled_pending_outbound_transactions( &self, ) -> Result, TransactionStorageError> { - self.get_pending_outbound_transactions_by_cancelled(true).await + self.get_pending_outbound_transactions_by_cancelled(true) } - async fn get_pending_outbound_transactions_by_cancelled( + fn get_pending_outbound_transactions_by_cancelled( &self, cancelled: bool, ) -> Result, TransactionStorageError> { - let db_clone = self.db.clone(); - let key = if cancelled { DbKey::CancelledPendingOutboundTransactions } else { DbKey::PendingOutboundTransactions }; - let t = tokio::task::spawn_blocking(move || match db_clone.fetch(&key) { + let t = match self.db.fetch(&key) { Ok(None) => log_error( key, TransactionStorageError::UnexpectedResult( @@ -599,75 +540,58 @@ where T: TransactionBackend + 'static Ok(Some(DbValue::PendingOutboundTransactions(pt))) => Ok(pt), Ok(Some(other)) => unexpected_result(key, other), Err(e) => log_error(key, e), - }) - .await - .map_err(|err| TransactionStorageError::BlockingTaskSpawnError(err.to_string()))??; + }?; Ok(t) } - pub async fn get_pending_transaction_counterparty_pub_key_by_tx_id( + pub fn get_pending_transaction_counterparty_pub_key_by_tx_id( &mut self, tx_id: TxId, ) -> Result { - let db_clone = self.db.clone(); - let pub_key = - tokio::task::spawn_blocking(move || db_clone.get_pending_transaction_counterparty_pub_key_by_tx_id(tx_id)) - .await - .map_err(|err| TransactionStorageError::BlockingTaskSpawnError(err.to_string()))??; + let pub_key = self.db.get_pending_transaction_counterparty_pub_key_by_tx_id(tx_id)?; Ok(pub_key) } - pub async fn get_completed_transactions( - &self, - ) -> Result, TransactionStorageError> { - self.get_completed_transactions_by_cancelled(false).await + pub fn get_completed_transactions(&self) -> Result, TransactionStorageError> { + self.get_completed_transactions_by_cancelled(false) } - pub async fn get_cancelled_completed_transactions( + pub fn get_cancelled_completed_transactions( &self, ) -> Result, TransactionStorageError> { - self.get_completed_transactions_by_cancelled(true).await + self.get_completed_transactions_by_cancelled(true) } - pub async fn get_any_transaction(&self, tx_id: TxId) -> Result, TransactionStorageError> { - let db_clone = self.db.clone(); + pub fn get_any_transaction(&self, tx_id: TxId) -> Result, TransactionStorageError> { let key = DbKey::AnyTransaction(tx_id); - let t = tokio::task::spawn_blocking(move || match db_clone.fetch(&key) { + let t = match self.db.fetch(&key) { Ok(None) => Ok(None), Ok(Some(DbValue::WalletTransaction(pt))) => Ok(Some(*pt)), Ok(Some(other)) => unexpected_result(key, other), Err(e) => log_error(key, e), - }) - .await - .map_err(|err| TransactionStorageError::BlockingTaskSpawnError(err.to_string()))??; + }?; Ok(t) } - pub async fn get_any_cancelled_transaction( + pub fn get_any_cancelled_transaction( &self, tx_id: TxId, ) -> Result, TransactionStorageError> { - let db_clone = self.db.clone(); - - let tx = tokio::task::spawn_blocking(move || db_clone.fetch_any_cancelled_transaction(tx_id)) - .await - .map_err(|err| TransactionStorageError::BlockingTaskSpawnError(err.to_string()))??; + let tx = self.db.fetch_any_cancelled_transaction(tx_id)?; Ok(tx) } - async fn get_completed_transactions_by_cancelled( + fn get_completed_transactions_by_cancelled( &self, cancelled: bool, ) -> Result, TransactionStorageError> { - let db_clone = self.db.clone(); - let key = if cancelled { DbKey::CancelledCompletedTransactions } else { DbKey::CompletedTransactions }; - let t = tokio::task::spawn_blocking(move || match db_clone.fetch(&key) { + let t = match self.db.fetch(&key) { Ok(None) => log_error( key, TransactionStorageError::UnexpectedResult("Could not retrieve completed transactions".to_string()), @@ -675,88 +599,55 @@ where T: TransactionBackend + 'static Ok(Some(DbValue::CompletedTransactions(pt))) => Ok(pt), Ok(Some(other)) => unexpected_result(key, other), Err(e) => log_error(key, e), - }) - .await - .map_err(|err| TransactionStorageError::BlockingTaskSpawnError(err.to_string()))??; + }?; Ok(t) } /// This method moves a `PendingOutboundTransaction` to the `CompleteTransaction` collection. - pub async fn complete_outbound_transaction( + pub fn complete_outbound_transaction( &self, tx_id: TxId, transaction: CompletedTransaction, ) -> Result<(), TransactionStorageError> { - let db_clone = self.db.clone(); - - tokio::task::spawn_blocking(move || db_clone.complete_outbound_transaction(tx_id, transaction)) - .await - .map_err(|err| TransactionStorageError::BlockingTaskSpawnError(err.to_string())) - .and_then(|inner_result| inner_result) + self.db.complete_outbound_transaction(tx_id, transaction) } /// This method moves a `PendingInboundTransaction` to the `CompleteTransaction` collection. - pub async fn complete_inbound_transaction( + pub fn complete_inbound_transaction( &self, tx_id: TxId, transaction: CompletedTransaction, ) -> Result<(), TransactionStorageError> { - let db_clone = self.db.clone(); - - tokio::task::spawn_blocking(move || db_clone.complete_inbound_transaction(tx_id, transaction)) - .await - .map_err(|err| TransactionStorageError::BlockingTaskSpawnError(err.to_string())) - .and_then(|inner_result| inner_result) + self.db.complete_inbound_transaction(tx_id, transaction) } - pub async fn reject_completed_transaction( + pub fn reject_completed_transaction( &self, tx_id: TxId, reason: TxCancellationReason, ) -> Result<(), TransactionStorageError> { - let db_clone = self.db.clone(); - tokio::task::spawn_blocking(move || db_clone.reject_completed_transaction(tx_id, reason)) - .await - .map_err(|err| TransactionStorageError::BlockingTaskSpawnError(err.to_string()))??; - Ok(()) + self.db.reject_completed_transaction(tx_id, reason) } - pub async fn cancel_pending_transaction(&self, tx_id: TxId) -> Result<(), TransactionStorageError> { - let db_clone = self.db.clone(); - tokio::task::spawn_blocking(move || db_clone.set_pending_transaction_cancellation_status(tx_id, true)) - .await - .map_err(|err| TransactionStorageError::BlockingTaskSpawnError(err.to_string()))??; - Ok(()) + pub fn cancel_pending_transaction(&self, tx_id: TxId) -> Result<(), TransactionStorageError> { + self.db.set_pending_transaction_cancellation_status(tx_id, true) } - pub async fn uncancel_pending_transaction(&self, tx_id: TxId) -> Result<(), TransactionStorageError> { - let db_clone = self.db.clone(); - tokio::task::spawn_blocking(move || db_clone.set_pending_transaction_cancellation_status(tx_id, false)) - .await - .map_err(|err| TransactionStorageError::BlockingTaskSpawnError(err.to_string()))??; - Ok(()) + pub fn uncancel_pending_transaction(&self, tx_id: TxId) -> Result<(), TransactionStorageError> { + self.db.set_pending_transaction_cancellation_status(tx_id, false) } - pub async fn mark_direct_send_success(&self, tx_id: TxId) -> Result<(), TransactionStorageError> { - let db_clone = self.db.clone(); - tokio::task::spawn_blocking(move || db_clone.mark_direct_send_success(tx_id)) - .await - .map_err(|err| TransactionStorageError::BlockingTaskSpawnError(err.to_string()))??; - Ok(()) + pub fn mark_direct_send_success(&self, tx_id: TxId) -> Result<(), TransactionStorageError> { + self.db.mark_direct_send_success(tx_id) } /// Indicated that the specified completed transaction has been broadcast into the mempool - pub async fn broadcast_completed_transaction(&self, tx_id: TxId) -> Result<(), TransactionStorageError> { - let db_clone = self.db.clone(); - - tokio::task::spawn_blocking(move || db_clone.broadcast_completed_transaction(tx_id)) - .await - .map_err(|err| TransactionStorageError::BlockingTaskSpawnError(err.to_string())) - .and_then(|inner_result| inner_result) + pub fn broadcast_completed_transaction(&self, tx_id: TxId) -> Result<(), TransactionStorageError> { + self.db.broadcast_completed_transaction(tx_id) } /// Faux transaction added to the database with imported status - pub async fn add_utxo_import_transaction_with_status( + pub fn add_utxo_import_transaction_with_status( &self, tx_id: TxId, amount: MicroTari, @@ -770,8 +661,8 @@ where T: TransactionBackend + 'static ) -> Result<(), TransactionStorageError> { let transaction = CompletedTransaction::new( tx_id, - source_public_key.clone(), - comms_public_key.clone(), + source_public_key, + comms_public_key, amount, MicroTari::from(0), Transaction::new( @@ -790,84 +681,50 @@ where T: TransactionBackend + 'static mined_timestamp, ); - let db_clone = self.db.clone(); - tokio::task::spawn_blocking(move || { - db_clone.write(WriteOperation::Insert(DbKeyValuePair::CompletedTransaction( + self.db + .write(WriteOperation::Insert(DbKeyValuePair::CompletedTransaction( tx_id, Box::new(transaction), - ))) - }) - .await - .map_err(|err| TransactionStorageError::BlockingTaskSpawnError(err.to_string()))??; + )))?; Ok(()) } - pub async fn cancel_coinbase_transaction_at_block_height( + pub fn cancel_coinbase_transaction_at_block_height( &self, block_height: u64, ) -> Result<(), TransactionStorageError> { - let db_clone = self.db.clone(); - - tokio::task::spawn_blocking(move || db_clone.cancel_coinbase_transaction_at_block_height(block_height)) - .await - .map_err(|err| TransactionStorageError::BlockingTaskSpawnError(err.to_string())) - .and_then(|inner_result| inner_result) + self.db.cancel_coinbase_transaction_at_block_height(block_height) } - pub async fn find_coinbase_transaction_at_block_height( + pub fn find_coinbase_transaction_at_block_height( &self, block_height: u64, amount: MicroTari, ) -> Result, TransactionStorageError> { - let db_clone = self.db.clone(); - - tokio::task::spawn_blocking(move || db_clone.find_coinbase_transaction_at_block_height(block_height, amount)) - .await - .map_err(|err| TransactionStorageError::BlockingTaskSpawnError(err.to_string())) - .and_then(|inner_result| inner_result) + self.db.find_coinbase_transaction_at_block_height(block_height, amount) } - pub async fn apply_encryption(&self, cipher: XChaCha20Poly1305) -> Result<(), TransactionStorageError> { - let db_clone = self.db.clone(); - tokio::task::spawn_blocking(move || db_clone.apply_encryption(cipher)) - .await - .map_err(|err| TransactionStorageError::BlockingTaskSpawnError(err.to_string())) - .and_then(|inner_result| inner_result) + pub fn apply_encryption(&self, cipher: XChaCha20Poly1305) -> Result<(), TransactionStorageError> { + self.db.apply_encryption(cipher) } - pub async fn remove_encryption(&self) -> Result<(), TransactionStorageError> { - let db_clone = self.db.clone(); - tokio::task::spawn_blocking(move || db_clone.remove_encryption()) - .await - .map_err(|err| TransactionStorageError::BlockingTaskSpawnError(err.to_string())) - .and_then(|inner_result| inner_result) + pub fn remove_encryption(&self) -> Result<(), TransactionStorageError> { + self.db.remove_encryption() } - pub async fn increment_send_count(&self, tx_id: TxId) -> Result<(), TransactionStorageError> { - let db_clone = self.db.clone(); - tokio::task::spawn_blocking(move || db_clone.increment_send_count(tx_id)) - .await - .map_err(|err| TransactionStorageError::BlockingTaskSpawnError(err.to_string()))??; - Ok(()) + pub fn increment_send_count(&self, tx_id: TxId) -> Result<(), TransactionStorageError> { + self.db.increment_send_count(tx_id) } - pub async fn set_transaction_as_unmined(&self, tx_id: TxId) -> Result<(), TransactionStorageError> { - let db_clone = self.db.clone(); - tokio::task::spawn_blocking(move || db_clone.set_transaction_as_unmined(tx_id)) - .await - .map_err(|err| TransactionStorageError::BlockingTaskSpawnError(err.to_string()))??; - Ok(()) + pub fn set_transaction_as_unmined(&self, tx_id: TxId) -> Result<(), TransactionStorageError> { + self.db.set_transaction_as_unmined(tx_id) } - pub async fn mark_all_transactions_as_unvalidated(&self) -> Result<(), TransactionStorageError> { - let db_clone = self.db.clone(); - tokio::task::spawn_blocking(move || db_clone.mark_all_transactions_as_unvalidated()) - .await - .map_err(|err| TransactionStorageError::BlockingTaskSpawnError(err.to_string()))??; - Ok(()) + pub fn mark_all_transactions_as_unvalidated(&self) -> Result<(), TransactionStorageError> { + self.db.mark_all_transactions_as_unvalidated() } - pub async fn set_transaction_mined_height( + pub fn set_transaction_mined_height( &self, tx_id: TxId, mined_height: u64, @@ -877,43 +734,29 @@ where T: TransactionBackend + 'static is_confirmed: bool, is_faux: bool, ) -> Result<(), TransactionStorageError> { - let db_clone = self.db.clone(); - tokio::task::spawn_blocking(move || { - db_clone.update_mined_height( - tx_id, - mined_height, - mined_in_block, - mined_timestamp, - num_confirmations, - is_confirmed, - is_faux, - ) - }) - .await - .map_err(|err| TransactionStorageError::BlockingTaskSpawnError(err.to_string()))??; - Ok(()) + self.db.update_mined_height( + tx_id, + mined_height, + mined_in_block, + mined_timestamp, + num_confirmations, + is_confirmed, + is_faux, + ) } - pub async fn get_pending_inbound_transaction_sender_info( + pub fn get_pending_inbound_transaction_sender_info( &self, ) -> Result, TransactionStorageError> { - let db_clone = self.db.clone(); - - let t = tokio::task::spawn_blocking(move || match db_clone.get_pending_inbound_transaction_sender_info() { + let t = match self.db.get_pending_inbound_transaction_sender_info() { Ok(v) => Ok(v), Err(e) => log_error(DbKey::PendingInboundTransactions, e), - }) - .await - .map_err(|err| TransactionStorageError::BlockingTaskSpawnError(err.to_string()))??; + }?; Ok(t) } - pub async fn abandon_coinbase_transaction(&self, tx_id: TxId) -> Result<(), TransactionStorageError> { - let db_clone = self.db.clone(); - tokio::task::spawn_blocking(move || db_clone.abandon_coinbase_transaction(tx_id)) - .await - .map_err(|err| TransactionStorageError::BlockingTaskSpawnError(err.to_string()))??; - Ok(()) + pub fn abandon_coinbase_transaction(&self, tx_id: TxId) -> Result<(), TransactionStorageError> { + self.db.abandon_coinbase_transaction(tx_id) } } diff --git a/base_layer/wallet/src/transaction_service/tasks/check_faux_transaction_status.rs b/base_layer/wallet/src/transaction_service/tasks/check_faux_transaction_status.rs index b92ca86a65..17542e8595 100644 --- a/base_layer/wallet/src/transaction_service/tasks/check_faux_transaction_status.rs +++ b/base_layer/wallet/src/transaction_service/tasks/check_faux_transaction_status.rs @@ -49,14 +49,14 @@ pub async fn check_faux_transactions( event_publisher: TransactionEventSender, tip_height: u64, ) { - let mut all_faux_transactions: Vec = match db.get_imported_transactions().await { + let mut all_faux_transactions: Vec = match db.get_imported_transactions() { Ok(txs) => txs, Err(e) => { error!(target: LOG_TARGET, "Problem retrieving imported transactions: {}", e); return; }, }; - let mut unconfirmed_faux = match db.get_unconfirmed_faux_transactions().await { + let mut unconfirmed_faux = match db.get_unconfirmed_faux_transactions() { Ok(txs) => txs, Err(e) => { error!( @@ -69,7 +69,7 @@ pub async fn check_faux_transactions( all_faux_transactions.append(&mut unconfirmed_faux); // Reorged faux transactions cannot be detected by excess signature, thus use last known confirmed transaction // height or current tip height with safety margin to determine if these should be returned - let last_mined_transaction = match db.fetch_last_mined_transaction().await { + let last_mined_transaction = match db.fetch_last_mined_transaction() { Ok(tx) => tx, Err(_) => None, }; @@ -79,7 +79,7 @@ pub async fn check_faux_transactions( } else { height_with_margin }; - let mut confirmed_faux = match db.get_confirmed_faux_transactions_from_height(check_height).await { + let mut confirmed_faux = match db.get_confirmed_faux_transactions_from_height(check_height) { Ok(txs) => txs, Err(e) => { error!( @@ -134,17 +134,15 @@ pub async fn check_faux_transactions( num_confirmations, is_valid, ); - let result = db - .set_transaction_mined_height( - tx.tx_id, - mined_height, - mined_in_block, - 0, - num_confirmations, - is_confirmed, - is_valid, - ) - .await; + let result = db.set_transaction_mined_height( + tx.tx_id, + mined_height, + mined_in_block, + 0, + num_confirmations, + is_confirmed, + is_valid, + ); if let Err(e) = result { error!( target: LOG_TARGET, diff --git a/base_layer/wallet/tests/transaction_service_tests/service.rs b/base_layer/wallet/tests/transaction_service_tests/service.rs index df9dcf37ad..faa064a0af 100644 --- a/base_layer/wallet/tests/transaction_service_tests/service.rs +++ b/base_layer/wallet/tests/transaction_service_tests/service.rs @@ -3328,8 +3328,8 @@ async fn test_coinbase_generation_and_monitoring() { ); // Now we will test validation where tx1 will not be found but tx2b will be unconfirmed, then confirmed. - let tx1 = db.get_completed_transaction(tx_id1).await.unwrap(); - let tx2b = db.get_completed_transaction(tx_id2b).await.unwrap(); + let tx1 = db.get_completed_transaction(tx_id1).unwrap(); + let tx2b = db.get_completed_transaction(tx_id2b).unwrap(); let mut block_headers = HashMap::new(); for i in 0..=4 { @@ -5072,7 +5072,10 @@ async fn transaction_service_tx_broadcast() { let tx1_fee = alice_completed_tx1.fee; - assert_eq!(alice_completed_tx1.status, TransactionStatus::Completed); + assert!( + alice_completed_tx1.status == TransactionStatus::Completed || + alice_completed_tx1.status == TransactionStatus::Broadcast + ); let _transactions = alice_ts_interface .base_node_rpc_mock_state @@ -5173,7 +5176,10 @@ async fn transaction_service_tx_broadcast() { .remove(&tx_id2) .expect("Transaction must be in collection"); - assert_eq!(alice_completed_tx2.status, TransactionStatus::Completed); + assert!( + alice_completed_tx2.status == TransactionStatus::Completed || + alice_completed_tx2.status == TransactionStatus::Broadcast + ); let _transactions = alice_ts_interface .base_node_rpc_mock_state @@ -5309,13 +5315,15 @@ async fn broadcast_all_completed_transactions_on_startup() { .wallet_connectivity_service_mock .set_base_node(alice_ts_interface.base_node_identity.to_peer()); + // Note: The event stream has to be assigned before the broadcast protocol is restarted otherwise the events will be + // dropped + let mut event_stream = alice_ts_interface.transaction_service_handle.get_event_stream(); assert!(alice_ts_interface .transaction_service_handle .restart_broadcast_protocols() .await .is_ok()); - let mut event_stream = alice_ts_interface.transaction_service_handle.get_event_stream(); let delay = sleep(Duration::from_secs(60)); tokio::pin!(delay); let mut found1 = false; diff --git a/base_layer/wallet/tests/transaction_service_tests/storage.rs b/base_layer/wallet/tests/transaction_service_tests/storage.rs index 6b0da3b13b..236e9e2ca3 100644 --- a/base_layer/wallet/tests/transaction_service_tests/storage.rs +++ b/base_layer/wallet/tests/transaction_service_tests/storage.rs @@ -60,10 +60,8 @@ use tari_wallet::{ }, }; use tempfile::tempdir; -use tokio::runtime::Runtime; pub fn test_db_backend(backend: T) { - let runtime = Runtime::new().unwrap(); let mut db = TransactionDatabase::new(backend); let factories = CryptoFactories::default(); let input = create_unblinded_output( @@ -123,25 +121,18 @@ pub fn test_db_backend(backend: T) { send_count: 0, last_send_timestamp: None, }); - assert!( - !runtime.block_on(db.transaction_exists(tx_id)).unwrap(), - "TxId should not exist" - ); + assert!(!db.transaction_exists(tx_id).unwrap(), "TxId should not exist"); - runtime - .block_on(db.add_pending_outbound_transaction(outbound_txs[i].tx_id, outbound_txs[i].clone())) + db.add_pending_outbound_transaction(outbound_txs[i].tx_id, outbound_txs[i].clone()) .unwrap(); - assert!( - runtime.block_on(db.transaction_exists(tx_id)).unwrap(), - "TxId should exist" - ); + assert!(db.transaction_exists(tx_id).unwrap(), "TxId should exist"); } - let retrieved_outbound_txs = runtime.block_on(db.get_pending_outbound_transactions()).unwrap(); + let retrieved_outbound_txs = db.get_pending_outbound_transactions().unwrap(); assert_eq!(outbound_txs.len(), messages.len()); for i in outbound_txs.iter().take(messages.len()) { - let retrieved_outbound_tx = runtime.block_on(db.get_pending_outbound_transaction(i.tx_id)).unwrap(); + let retrieved_outbound_tx = db.get_pending_outbound_transaction(i.tx_id).unwrap(); assert_eq!(&retrieved_outbound_tx, i); assert_eq!(retrieved_outbound_tx.send_count, 0); assert!(retrieved_outbound_tx.last_send_timestamp.is_none()); @@ -149,19 +140,12 @@ pub fn test_db_backend(backend: T) { assert_eq!(&retrieved_outbound_txs.get(&i.tx_id).unwrap(), &i); } - runtime - .block_on(db.increment_send_count(outbound_txs[0].tx_id)) - .unwrap(); - let retrieved_outbound_tx = runtime - .block_on(db.get_pending_outbound_transaction(outbound_txs[0].tx_id)) - .unwrap(); + db.increment_send_count(outbound_txs[0].tx_id).unwrap(); + let retrieved_outbound_tx = db.get_pending_outbound_transaction(outbound_txs[0].tx_id).unwrap(); assert_eq!(retrieved_outbound_tx.send_count, 1); assert!(retrieved_outbound_tx.last_send_timestamp.is_some()); - let any_outbound_tx = runtime - .block_on(db.get_any_transaction(outbound_txs[0].tx_id)) - .unwrap() - .unwrap(); + let any_outbound_tx = db.get_any_transaction(outbound_txs[0].tx_id).unwrap().unwrap(); if let WalletTransaction::PendingOutbound(tx) = any_outbound_tx { assert_eq!(tx, retrieved_outbound_tx); } else { @@ -192,20 +176,13 @@ pub fn test_db_backend(backend: T) { send_count: 0, last_send_timestamp: None, }); - assert!( - !runtime.block_on(db.transaction_exists(tx_id)).unwrap(), - "TxId should not exist" - ); - runtime - .block_on(db.add_pending_inbound_transaction(tx_id, inbound_txs[i].clone())) + assert!(!db.transaction_exists(tx_id).unwrap(), "TxId should not exist"); + db.add_pending_inbound_transaction(tx_id, inbound_txs[i].clone()) .unwrap(); - assert!( - runtime.block_on(db.transaction_exists(tx_id)).unwrap(), - "TxId should exist" - ); + assert!(db.transaction_exists(tx_id).unwrap(), "TxId should exist"); } - let retrieved_inbound_txs = runtime.block_on(db.get_pending_inbound_transactions()).unwrap(); + let retrieved_inbound_txs = db.get_pending_inbound_transactions().unwrap(); assert_eq!(inbound_txs.len(), messages.len()); for i in inbound_txs.iter().take(messages.len()) { let retrieved_tx = retrieved_inbound_txs.get(&i.tx_id).unwrap(); @@ -214,34 +191,29 @@ pub fn test_db_backend(backend: T) { assert!(retrieved_tx.last_send_timestamp.is_none()); } - runtime.block_on(db.increment_send_count(inbound_txs[0].tx_id)).unwrap(); - let retrieved_inbound_tx = runtime - .block_on(db.get_pending_inbound_transaction(inbound_txs[0].tx_id)) - .unwrap(); + db.increment_send_count(inbound_txs[0].tx_id).unwrap(); + let retrieved_inbound_tx = db.get_pending_inbound_transaction(inbound_txs[0].tx_id).unwrap(); assert_eq!(retrieved_inbound_tx.send_count, 1); assert!(retrieved_inbound_tx.last_send_timestamp.is_some()); - let any_inbound_tx = runtime - .block_on(db.get_any_transaction(inbound_txs[0].tx_id)) - .unwrap() - .unwrap(); + let any_inbound_tx = db.get_any_transaction(inbound_txs[0].tx_id).unwrap().unwrap(); if let WalletTransaction::PendingInbound(tx) = any_inbound_tx { assert_eq!(tx, retrieved_inbound_tx); } else { panic!("Should have found inbound tx"); } - let inbound_pub_key = runtime - .block_on(db.get_pending_transaction_counterparty_pub_key_by_tx_id(inbound_txs[0].tx_id)) + let inbound_pub_key = db + .get_pending_transaction_counterparty_pub_key_by_tx_id(inbound_txs[0].tx_id) .unwrap(); assert_eq!(inbound_pub_key, inbound_txs[0].source_public_key); - assert!(runtime - .block_on(db.get_pending_transaction_counterparty_pub_key_by_tx_id(100u64.into())) + assert!(db + .get_pending_transaction_counterparty_pub_key_by_tx_id(100u64.into()) .is_err()); - let outbound_pub_key = runtime - .block_on(db.get_pending_transaction_counterparty_pub_key_by_tx_id(outbound_txs[0].tx_id)) + let outbound_pub_key = db + .get_pending_transaction_counterparty_pub_key_by_tx_id(outbound_txs[0].tx_id) .unwrap(); assert_eq!(outbound_pub_key, outbound_txs[0].destination_public_key); @@ -281,20 +253,16 @@ pub fn test_db_backend(backend: T) { mined_in_block: None, mined_timestamp: None, }); - runtime - .block_on(db.complete_outbound_transaction(outbound_txs[i].tx_id, completed_txs[i].clone())) - .unwrap(); - runtime - .block_on( - db.complete_inbound_transaction(inbound_txs[i].tx_id, CompletedTransaction { - tx_id: inbound_txs[i].tx_id, - ..completed_txs[i].clone() - }), - ) + db.complete_outbound_transaction(outbound_txs[i].tx_id, completed_txs[i].clone()) .unwrap(); + db.complete_inbound_transaction(inbound_txs[i].tx_id, CompletedTransaction { + tx_id: inbound_txs[i].tx_id, + ..completed_txs[i].clone() + }) + .unwrap(); } - let retrieved_completed_txs = runtime.block_on(db.get_completed_transactions()).unwrap(); + let retrieved_completed_txs = db.get_completed_transactions().unwrap(); assert_eq!(retrieved_completed_txs.len(), 2 * messages.len()); for i in 0..messages.len() { @@ -311,254 +279,165 @@ pub fn test_db_backend(backend: T) { ); } - runtime - .block_on(db.increment_send_count(completed_txs[0].tx_id)) - .unwrap(); - runtime - .block_on(db.increment_send_count(completed_txs[0].tx_id)) - .unwrap(); - let retrieved_completed_tx = runtime - .block_on(db.get_completed_transaction(completed_txs[0].tx_id)) - .unwrap(); + db.increment_send_count(completed_txs[0].tx_id).unwrap(); + db.increment_send_count(completed_txs[0].tx_id).unwrap(); + let retrieved_completed_tx = db.get_completed_transaction(completed_txs[0].tx_id).unwrap(); assert_eq!(retrieved_completed_tx.send_count, 2); assert!(retrieved_completed_tx.last_send_timestamp.is_some()); assert!(retrieved_completed_tx.confirmations.is_none()); - assert!(runtime.block_on(db.fetch_last_mined_transaction()).unwrap().is_none()); + assert!(db.fetch_last_mined_transaction().unwrap().is_none()); - runtime - .block_on(db.set_transaction_mined_height(completed_txs[0].tx_id, 10, FixedHash::zero(), 0, 5, true, false)) + db.set_transaction_mined_height(completed_txs[0].tx_id, 10, FixedHash::zero(), 0, 5, true, false) .unwrap(); assert_eq!( - runtime - .block_on(db.fetch_last_mined_transaction()) - .unwrap() - .unwrap() - .tx_id, + db.fetch_last_mined_transaction().unwrap().unwrap().tx_id, completed_txs[0].tx_id ); - let retrieved_completed_tx = runtime - .block_on(db.get_completed_transaction(completed_txs[0].tx_id)) - .unwrap(); + let retrieved_completed_tx = db.get_completed_transaction(completed_txs[0].tx_id).unwrap(); assert_eq!(retrieved_completed_tx.confirmations, Some(5)); - let any_completed_tx = runtime - .block_on(db.get_any_transaction(completed_txs[0].tx_id)) - .unwrap() - .unwrap(); + let any_completed_tx = db.get_any_transaction(completed_txs[0].tx_id).unwrap().unwrap(); if let WalletTransaction::Completed(tx) = any_completed_tx { assert_eq!(tx, retrieved_completed_tx); } else { panic!("Should have found completed tx"); } - let completed_txs_map = runtime.block_on(db.get_completed_transactions()).unwrap(); + let completed_txs_map = db.get_completed_transactions().unwrap(); let num_completed_txs = completed_txs_map.len(); - assert_eq!( - runtime - .block_on(db.get_cancelled_completed_transactions()) - .unwrap() - .len(), - 0 - ); + assert_eq!(db.get_cancelled_completed_transactions().unwrap().len(), 0); let cancelled_tx_id = completed_txs_map[&1u64.into()].tx_id; - assert!(runtime - .block_on(db.get_cancelled_completed_transaction(cancelled_tx_id)) - .is_err()); - runtime - .block_on(db.reject_completed_transaction(cancelled_tx_id, TxCancellationReason::Unknown)) + assert!(db.get_cancelled_completed_transaction(cancelled_tx_id).is_err()); + db.reject_completed_transaction(cancelled_tx_id, TxCancellationReason::Unknown) .unwrap(); - let completed_txs_map = runtime.block_on(db.get_completed_transactions()).unwrap(); + let completed_txs_map = db.get_completed_transactions().unwrap(); assert_eq!(completed_txs_map.len(), num_completed_txs - 1); - runtime - .block_on(db.get_cancelled_completed_transaction(cancelled_tx_id)) + db.get_cancelled_completed_transaction(cancelled_tx_id) .expect("Should find cancelled transaction"); - let mut cancelled_txs = runtime.block_on(db.get_cancelled_completed_transactions()).unwrap(); + let mut cancelled_txs = db.get_cancelled_completed_transactions().unwrap(); assert_eq!(cancelled_txs.len(), 1); assert!(cancelled_txs.remove(&cancelled_tx_id).is_some()); - let any_cancelled_completed_tx = runtime - .block_on(db.get_any_transaction(cancelled_tx_id)) - .unwrap() - .unwrap(); + let any_cancelled_completed_tx = db.get_any_transaction(cancelled_tx_id).unwrap().unwrap(); if let WalletTransaction::Completed(tx) = any_cancelled_completed_tx { assert_eq!(tx.tx_id, cancelled_tx_id); } else { panic!("Should have found cancelled completed tx"); } - runtime - .block_on(db.add_pending_inbound_transaction( + db.add_pending_inbound_transaction( + 999u64.into(), + InboundTransaction::new( 999u64.into(), - InboundTransaction::new( - 999u64.into(), - PublicKey::from_secret_key(&PrivateKey::random(&mut OsRng)), - 22 * uT, - rtp, - TransactionStatus::Pending, - "To be cancelled".to_string(), - Utc::now().naive_utc(), - ), - )) - .unwrap(); + PublicKey::from_secret_key(&PrivateKey::random(&mut OsRng)), + 22 * uT, + rtp, + TransactionStatus::Pending, + "To be cancelled".to_string(), + Utc::now().naive_utc(), + ), + ) + .unwrap(); - assert_eq!( - runtime - .block_on(db.get_cancelled_pending_inbound_transactions()) - .unwrap() - .len(), - 0 - ); + assert_eq!(db.get_cancelled_pending_inbound_transactions().unwrap().len(), 0); - assert_eq!( - runtime.block_on(db.get_pending_inbound_transactions()).unwrap().len(), - 1 - ); + assert_eq!(db.get_pending_inbound_transactions().unwrap().len(), 1); assert!( - !runtime - .block_on(db.get_pending_inbound_transaction(999u64.into())) + !db.get_pending_inbound_transaction(999u64.into()) .unwrap() .direct_send_success ); - runtime.block_on(db.mark_direct_send_success(999u64.into())).unwrap(); + db.mark_direct_send_success(999u64.into()).unwrap(); assert!( - runtime - .block_on(db.get_pending_inbound_transaction(999u64.into())) + db.get_pending_inbound_transaction(999u64.into()) .unwrap() .direct_send_success ); - assert!(runtime - .block_on(db.get_cancelled_pending_inbound_transaction(999u64.into())) - .is_err()); - runtime.block_on(db.cancel_pending_transaction(999u64.into())).unwrap(); - runtime - .block_on(db.get_cancelled_pending_inbound_transaction(999u64.into())) + assert!(db.get_cancelled_pending_inbound_transaction(999u64.into()).is_err()); + db.cancel_pending_transaction(999u64.into()).unwrap(); + db.get_cancelled_pending_inbound_transaction(999u64.into()) .expect("Should find cancelled inbound tx"); - assert_eq!( - runtime - .block_on(db.get_cancelled_pending_inbound_transactions()) - .unwrap() - .len(), - 1 - ); + assert_eq!(db.get_cancelled_pending_inbound_transactions().unwrap().len(), 1); - assert_eq!( - runtime.block_on(db.get_pending_inbound_transactions()).unwrap().len(), - 0 - ); + assert_eq!(db.get_pending_inbound_transactions().unwrap().len(), 0); - let any_cancelled_inbound_tx = runtime - .block_on(db.get_any_transaction(999u64.into())) - .unwrap() - .unwrap(); + let any_cancelled_inbound_tx = db.get_any_transaction(999u64.into()).unwrap().unwrap(); if let WalletTransaction::PendingInbound(tx) = any_cancelled_inbound_tx { assert_eq!(tx.tx_id, TxId::from(999u64)); } else { panic!("Should have found cancelled inbound tx"); } - let mut cancelled_txs = runtime - .block_on(db.get_cancelled_pending_inbound_transactions()) - .unwrap(); + let mut cancelled_txs = db.get_cancelled_pending_inbound_transactions().unwrap(); assert_eq!(cancelled_txs.len(), 1); assert!(cancelled_txs.remove(&999u64.into()).is_some()); - runtime - .block_on(db.add_pending_outbound_transaction( + db.add_pending_outbound_transaction( + 998u64.into(), + OutboundTransaction::new( 998u64.into(), - OutboundTransaction::new( - 998u64.into(), - PublicKey::from_secret_key(&PrivateKey::random(&mut OsRng)), - 22 * uT, - stp.get_fee_amount().unwrap(), - stp, - TransactionStatus::Pending, - "To be cancelled".to_string(), - Utc::now().naive_utc(), - false, - ), - )) - .unwrap(); + PublicKey::from_secret_key(&PrivateKey::random(&mut OsRng)), + 22 * uT, + stp.get_fee_amount().unwrap(), + stp, + TransactionStatus::Pending, + "To be cancelled".to_string(), + Utc::now().naive_utc(), + false, + ), + ) + .unwrap(); assert!( - !runtime - .block_on(db.get_pending_outbound_transaction(998u64.into())) + !db.get_pending_outbound_transaction(998u64.into()) .unwrap() .direct_send_success ); - runtime.block_on(db.mark_direct_send_success(998u64.into())).unwrap(); + db.mark_direct_send_success(998u64.into()).unwrap(); assert!( - runtime - .block_on(db.get_pending_outbound_transaction(998u64.into())) + db.get_pending_outbound_transaction(998u64.into()) .unwrap() .direct_send_success ); - assert_eq!( - runtime - .block_on(db.get_cancelled_pending_outbound_transactions()) - .unwrap() - .len(), - 0 - ); + assert_eq!(db.get_cancelled_pending_outbound_transactions().unwrap().len(), 0); - assert_eq!( - runtime.block_on(db.get_pending_outbound_transactions()).unwrap().len(), - 1 - ); + assert_eq!(db.get_pending_outbound_transactions().unwrap().len(), 1); - assert!(runtime - .block_on(db.get_cancelled_pending_outbound_transaction(998u64.into())) - .is_err()); + assert!(db.get_cancelled_pending_outbound_transaction(998u64.into()).is_err()); - runtime.block_on(db.cancel_pending_transaction(998u64.into())).unwrap(); - runtime - .block_on(db.get_cancelled_pending_outbound_transaction(998u64.into())) + db.cancel_pending_transaction(998u64.into()).unwrap(); + db.get_cancelled_pending_outbound_transaction(998u64.into()) .expect("Should find cancelled outbound tx"); - assert_eq!( - runtime - .block_on(db.get_cancelled_pending_outbound_transactions()) - .unwrap() - .len(), - 1 - ); + assert_eq!(db.get_cancelled_pending_outbound_transactions().unwrap().len(), 1); - assert_eq!( - runtime.block_on(db.get_pending_outbound_transactions()).unwrap().len(), - 0 - ); + assert_eq!(db.get_pending_outbound_transactions().unwrap().len(), 0); - let mut cancelled_txs = runtime - .block_on(db.get_cancelled_pending_outbound_transactions()) - .unwrap(); + let mut cancelled_txs = db.get_cancelled_pending_outbound_transactions().unwrap(); assert_eq!(cancelled_txs.len(), 1); assert!(cancelled_txs.remove(&998u64.into()).is_some()); - let any_cancelled_outbound_tx = runtime - .block_on(db.get_any_transaction(998u64.into())) - .unwrap() - .unwrap(); + let any_cancelled_outbound_tx = db.get_any_transaction(998u64.into()).unwrap().unwrap(); if let WalletTransaction::PendingOutbound(tx) = any_cancelled_outbound_tx { assert_eq!(tx.tx_id, TxId::from(998u64)); } else { panic!("Should have found cancelled outbound tx"); } - let unmined_txs = runtime.block_on(db.fetch_unconfirmed_transactions_info()).unwrap(); + let unmined_txs = db.fetch_unconfirmed_transactions_info().unwrap(); assert_eq!(unmined_txs.len(), 4); - runtime - .block_on(db.set_transaction_as_unmined(completed_txs[0].tx_id)) - .unwrap(); + db.set_transaction_as_unmined(completed_txs[0].tx_id).unwrap(); - let unmined_txs = runtime.block_on(db.fetch_unconfirmed_transactions_info()).unwrap(); + let unmined_txs = db.fetch_unconfirmed_transactions_info().unwrap(); assert_eq!(unmined_txs.len(), 5); } diff --git a/base_layer/wallet/tests/transaction_service_tests/transaction_protocols.rs b/base_layer/wallet/tests/transaction_service_tests/transaction_protocols.rs index 9e1db00891..eb4c040aad 100644 --- a/base_layer/wallet/tests/transaction_service_tests/transaction_protocols.rs +++ b/base_layer/wallet/tests/transaction_service_tests/transaction_protocols.rs @@ -181,7 +181,7 @@ pub async fn add_transaction_to_database( ) { let factories = CryptoFactories::default(); let (_utxo, uo0) = make_input(&mut OsRng, 10 * amount, &factories.commitment).await; - let (txs1, _uou1) = schema_to_transaction(&[txn_schema!(from: vec![uo0.clone()], to: vec![amount])]); + let (txs1, _uou1) = schema_to_transaction(&[txn_schema!(from: vec![uo0], to: vec![amount])]); let tx1 = (*txs1[0]).clone(); let completed_tx1 = CompletedTransaction::new( tx_id, @@ -189,7 +189,7 @@ pub async fn add_transaction_to_database( CommsPublicKey::default(), amount, 200 * uT, - tx1.clone(), + tx1, status.unwrap_or(TransactionStatus::Completed), "Test".to_string(), Utc::now().naive_local(), @@ -198,7 +198,7 @@ pub async fn add_transaction_to_database( None, None, ); - db.insert_completed_transaction(tx_id, completed_tx1).await.unwrap(); + db.insert_completed_transaction(tx_id, completed_tx1).unwrap(); } /// Simple task that responds with a OutputManagerResponse::TransactionCancelled response to any request made on this @@ -254,7 +254,7 @@ async fn tx_broadcast_protocol_submit_success() { add_transaction_to_database(1u64.into(), 1 * T, None, None, resources.db.clone()).await; - let db_completed_tx = resources.db.get_completed_transaction(1u64.into()).await.unwrap(); + let db_completed_tx = resources.db.get_completed_transaction(1u64.into()).unwrap(); assert!(db_completed_tx.confirmations.is_none()); let protocol = TransactionBroadcastProtocol::new(1u64.into(), resources.clone(), timeout_watch.get_receiver()); @@ -352,7 +352,7 @@ async fn tx_broadcast_protocol_submit_rejection() { } // Check transaction is cancelled in db - let db_completed_tx = resources.db.get_completed_transaction(1u64.into()).await; + let db_completed_tx = resources.db.get_completed_transaction(1u64.into()); assert!(db_completed_tx.is_err()); // Check that the appropriate events were emitted @@ -461,7 +461,7 @@ async fn tx_broadcast_protocol_restart_protocol_as_query() { assert_eq!(result.unwrap(), TxId::from(1u64)); // Check transaction status is updated - let db_completed_tx = resources.db.get_completed_transaction(1u64.into()).await.unwrap(); + let db_completed_tx = resources.db.get_completed_transaction(1u64.into()).unwrap(); assert_eq!(db_completed_tx.status, TransactionStatus::Broadcast); } @@ -535,7 +535,7 @@ async fn tx_broadcast_protocol_submit_success_followed_by_rejection() { } // Check transaction is cancelled in db - let db_completed_tx = resources.db.get_completed_transaction(1u64.into()).await; + let db_completed_tx = resources.db.get_completed_transaction(1u64.into()); assert!(db_completed_tx.is_err()); // Check that the appropriate events were emitted @@ -621,7 +621,7 @@ async fn tx_broadcast_protocol_submit_already_mined() { assert_eq!(result.unwrap(), 1); // Check transaction status is updated - let db_completed_tx = resources.db.get_completed_transaction(1u64.into()).await.unwrap(); + let db_completed_tx = resources.db.get_completed_transaction(1u64.into()).unwrap(); assert_eq!(db_completed_tx.status, TransactionStatus::Completed); } @@ -719,7 +719,7 @@ async fn tx_broadcast_protocol_submit_and_base_node_gets_changed() { assert_eq!(result.unwrap(), TxId::from(1u64)); // Check transaction status is updated - let db_completed_tx = resources.db.get_completed_transaction(1u64.into()).await.unwrap(); + let db_completed_tx = resources.db.get_completed_transaction(1u64.into()).unwrap(); assert_eq!(db_completed_tx.status, TransactionStatus::Broadcast); } @@ -761,7 +761,7 @@ async fn tx_validation_protocol_tx_becomes_mined_unconfirmed_then_confirmed() { ) .await; - let tx2 = resources.db.get_completed_transaction(2u64.into()).await.unwrap(); + let tx2 = resources.db.get_completed_transaction(2u64.into()).unwrap(); let transaction_query_batch_responses = vec![TxQueryBatchResponseProto { signature: Some(SignatureProto::from( @@ -797,7 +797,7 @@ async fn tx_validation_protocol_tx_becomes_mined_unconfirmed_then_confirmed() { let result = join_handle.await.unwrap(); assert!(result.is_ok()); - let completed_txs = resources.db.get_completed_transactions().await.unwrap(); + let completed_txs = resources.db.get_completed_transactions().unwrap(); assert_eq!( completed_txs.get(&1u64.into()).unwrap().status, @@ -825,7 +825,7 @@ async fn tx_validation_protocol_tx_becomes_mined_unconfirmed_then_confirmed() { let result = join_handle.await.unwrap(); assert!(result.is_ok()); - let completed_txs = resources.db.get_completed_transactions().await.unwrap(); + let completed_txs = resources.db.get_completed_transactions().unwrap(); assert_eq!( completed_txs.get(&1u64.into()).unwrap().status, @@ -871,7 +871,7 @@ async fn tx_validation_protocol_tx_becomes_mined_unconfirmed_then_confirmed() { let result = join_handle.await.unwrap(); assert!(result.is_ok()); - let completed_txs = resources.db.get_completed_transactions().await.unwrap(); + let completed_txs = resources.db.get_completed_transactions().unwrap(); assert_eq!( completed_txs.get(&2u64.into()).unwrap().status, @@ -917,7 +917,7 @@ async fn tx_revalidation() { ) .await; - let tx2 = resources.db.get_completed_transaction(2u64.into()).await.unwrap(); + let tx2 = resources.db.get_completed_transaction(2u64.into()).unwrap(); // set tx2 as fully mined let transaction_query_batch_responses = vec![TxQueryBatchResponseProto { @@ -954,7 +954,7 @@ async fn tx_revalidation() { let result = join_handle.await.unwrap(); assert!(result.is_ok()); - let completed_txs = resources.db.get_completed_transactions().await.unwrap(); + let completed_txs = resources.db.get_completed_transactions().unwrap(); assert_eq!( completed_txs.get(&2u64.into()).unwrap().status, @@ -983,8 +983,8 @@ async fn tx_revalidation() { rpc_service_state.set_transaction_query_batch_responses(batch_query_response.clone()); // revalidate sets all to unvalidated, so lets check that thay are - resources.db.mark_all_transactions_as_unvalidated().await.unwrap(); - let completed_txs = resources.db.get_completed_transactions().await.unwrap(); + resources.db.mark_all_transactions_as_unvalidated().unwrap(); + let completed_txs = resources.db.get_completed_transactions().unwrap(); assert_eq!( completed_txs.get(&2u64.into()).unwrap().status, TransactionStatus::MinedConfirmed @@ -1005,7 +1005,7 @@ async fn tx_revalidation() { let result = join_handle.await.unwrap(); assert!(result.is_ok()); - let completed_txs = resources.db.get_completed_transactions().await.unwrap(); + let completed_txs = resources.db.get_completed_transactions().unwrap(); // data should now be updated and changed assert_eq!( completed_txs.get(&2u64.into()).unwrap().status, @@ -1073,13 +1073,13 @@ async fn tx_validation_protocol_reorg() { } rpc_service_state.set_blocks(block_headers.clone()); - let tx1 = resources.db.get_completed_transaction(1u64.into()).await.unwrap(); - let tx2 = resources.db.get_completed_transaction(2u64.into()).await.unwrap(); - let tx3 = resources.db.get_completed_transaction(3u64.into()).await.unwrap(); - let tx4 = resources.db.get_completed_transaction(4u64.into()).await.unwrap(); - let tx5 = resources.db.get_completed_transaction(5u64.into()).await.unwrap(); - let coinbase_tx1 = resources.db.get_completed_transaction(6u64.into()).await.unwrap(); - let coinbase_tx2 = resources.db.get_completed_transaction(7u64.into()).await.unwrap(); + let tx1 = resources.db.get_completed_transaction(1u64.into()).unwrap(); + let tx2 = resources.db.get_completed_transaction(2u64.into()).unwrap(); + let tx3 = resources.db.get_completed_transaction(3u64.into()).unwrap(); + let tx4 = resources.db.get_completed_transaction(4u64.into()).unwrap(); + let tx5 = resources.db.get_completed_transaction(5u64.into()).unwrap(); + let coinbase_tx1 = resources.db.get_completed_transaction(6u64.into()).unwrap(); + let coinbase_tx2 = resources.db.get_completed_transaction(7u64.into()).unwrap(); let transaction_query_batch_responses = vec![ TxQueryBatchResponseProto { @@ -1177,7 +1177,7 @@ async fn tx_validation_protocol_reorg() { let result = join_handle.await.unwrap(); assert!(result.is_ok()); - let completed_txs = resources.db.get_completed_transactions().await.unwrap(); + let completed_txs = resources.db.get_completed_transactions().unwrap(); let mut unconfirmed_count = 0; let mut confirmed_count = 0; for tx in completed_txs.values() { @@ -1296,7 +1296,7 @@ async fn tx_validation_protocol_reorg() { assert_eq!(rpc_service_state.take_get_header_by_height_calls().len(), 0); - let completed_txs = resources.db.get_completed_transactions().await.unwrap(); + let completed_txs = resources.db.get_completed_transactions().unwrap(); assert_eq!( completed_txs.get(&4u64.into()).unwrap().status, TransactionStatus::Completed @@ -1317,7 +1317,7 @@ async fn tx_validation_protocol_reorg() { completed_txs.get(&7u64.into()).unwrap().status, TransactionStatus::Coinbase ); - let cancelled_completed_txs = resources.db.get_cancelled_completed_transactions().await.unwrap(); + let cancelled_completed_txs = resources.db.get_cancelled_completed_transactions().unwrap(); assert!(matches!( cancelled_completed_txs.get(&6u64.into()).unwrap().cancelled, diff --git a/base_layer/wallet_ffi/src/callback_handler.rs b/base_layer/wallet_ffi/src/callback_handler.rs index d606f6ea71..4533ef0637 100644 --- a/base_layer/wallet_ffi/src/callback_handler.rs +++ b/base_layer/wallet_ffi/src/callback_handler.rs @@ -235,15 +235,15 @@ where TBackend: TransactionBackend + 'static trace!(target: LOG_TARGET, "Transaction Service Callback Handler event {:?}", msg); match (*msg).clone() { TransactionEvent::ReceivedTransaction(tx_id) => { - self.receive_transaction_event(tx_id).await; + self.receive_transaction_event(tx_id); self.trigger_balance_refresh().await; }, TransactionEvent::ReceivedTransactionReply(tx_id) => { - self.receive_transaction_reply_event(tx_id).await; + self.receive_transaction_reply_event(tx_id); self.trigger_balance_refresh().await; }, TransactionEvent::ReceivedFinalizedTransaction(tx_id) => { - self.receive_finalized_transaction_event(tx_id).await; + self.receive_finalized_transaction_event(tx_id); self.trigger_balance_refresh().await; }, TransactionEvent::TransactionSendResult(tx_id, status) => { @@ -251,27 +251,27 @@ where TBackend: TransactionBackend + 'static self.trigger_balance_refresh().await; }, TransactionEvent::TransactionCancelled(tx_id, reason) => { - self.receive_transaction_cancellation(tx_id, reason as u64).await; + self.receive_transaction_cancellation(tx_id, reason as u64); self.trigger_balance_refresh().await; }, TransactionEvent::TransactionBroadcast(tx_id) => { - self.receive_transaction_broadcast_event(tx_id).await; + self.receive_transaction_broadcast_event(tx_id); self.trigger_balance_refresh().await; }, TransactionEvent::TransactionMined{tx_id, is_valid: _} => { - self.receive_transaction_mined_event(tx_id).await; + self.receive_transaction_mined_event(tx_id); self.trigger_balance_refresh().await; }, TransactionEvent::TransactionMinedUnconfirmed{tx_id, num_confirmations, is_valid: _} => { - self.receive_transaction_mined_unconfirmed_event(tx_id, num_confirmations).await; + self.receive_transaction_mined_unconfirmed_event(tx_id, num_confirmations); self.trigger_balance_refresh().await; }, TransactionEvent::FauxTransactionConfirmed{tx_id, is_valid: _} => { - self.receive_faux_transaction_confirmed_event(tx_id).await; + self.receive_faux_transaction_confirmed_event(tx_id); self.trigger_balance_refresh().await; }, TransactionEvent::FauxTransactionUnconfirmed{tx_id, num_confirmations, is_valid: _} => { - self.receive_faux_transaction_unconfirmed_event(tx_id, num_confirmations).await; + self.receive_faux_transaction_unconfirmed_event(tx_id, num_confirmations); self.trigger_balance_refresh().await; }, TransactionEvent::TransactionValidationStateChanged(_request_key) => { @@ -358,8 +358,8 @@ where TBackend: TransactionBackend + 'static } } - async fn receive_transaction_event(&mut self, tx_id: TxId) { - match self.db.get_pending_inbound_transaction(tx_id).await { + fn receive_transaction_event(&mut self, tx_id: TxId) { + match self.db.get_pending_inbound_transaction(tx_id) { Ok(tx) => { debug!( target: LOG_TARGET, @@ -377,8 +377,8 @@ where TBackend: TransactionBackend + 'static } } - async fn receive_transaction_reply_event(&mut self, tx_id: TxId) { - match self.db.get_completed_transaction(tx_id).await { + fn receive_transaction_reply_event(&mut self, tx_id: TxId) { + match self.db.get_completed_transaction(tx_id) { Ok(tx) => { debug!( target: LOG_TARGET, @@ -393,8 +393,8 @@ where TBackend: TransactionBackend + 'static } } - async fn receive_finalized_transaction_event(&mut self, tx_id: TxId) { - match self.db.get_completed_transaction(tx_id).await { + fn receive_finalized_transaction_event(&mut self, tx_id: TxId) { + match self.db.get_completed_transaction(tx_id) { Ok(tx) => { debug!( target: LOG_TARGET, @@ -458,15 +458,15 @@ where TBackend: TransactionBackend + 'static } } - async fn receive_transaction_cancellation(&mut self, tx_id: TxId, reason: u64) { + fn receive_transaction_cancellation(&mut self, tx_id: TxId, reason: u64) { let mut transaction = None; - if let Ok(tx) = self.db.get_cancelled_completed_transaction(tx_id).await { + if let Ok(tx) = self.db.get_cancelled_completed_transaction(tx_id) { transaction = Some(tx); - } else if let Ok(tx) = self.db.get_cancelled_pending_outbound_transaction(tx_id).await { + } else if let Ok(tx) = self.db.get_cancelled_pending_outbound_transaction(tx_id) { let mut outbound_tx = CompletedTransaction::from(tx); outbound_tx.source_public_key = self.comms_public_key.clone(); transaction = Some(outbound_tx); - } else if let Ok(tx) = self.db.get_cancelled_pending_inbound_transaction(tx_id).await { + } else if let Ok(tx) = self.db.get_cancelled_pending_inbound_transaction(tx_id) { let mut inbound_tx = CompletedTransaction::from(tx); inbound_tx.destination_public_key = self.comms_public_key.clone(); transaction = Some(inbound_tx); @@ -491,8 +491,8 @@ where TBackend: TransactionBackend + 'static } } - async fn receive_transaction_broadcast_event(&mut self, tx_id: TxId) { - match self.db.get_completed_transaction(tx_id).await { + fn receive_transaction_broadcast_event(&mut self, tx_id: TxId) { + match self.db.get_completed_transaction(tx_id) { Ok(tx) => { debug!( target: LOG_TARGET, @@ -507,8 +507,8 @@ where TBackend: TransactionBackend + 'static } } - async fn receive_transaction_mined_event(&mut self, tx_id: TxId) { - match self.db.get_completed_transaction(tx_id).await { + fn receive_transaction_mined_event(&mut self, tx_id: TxId) { + match self.db.get_completed_transaction(tx_id) { Ok(tx) => { debug!( target: LOG_TARGET, @@ -523,8 +523,8 @@ where TBackend: TransactionBackend + 'static } } - async fn receive_transaction_mined_unconfirmed_event(&mut self, tx_id: TxId, confirmations: u64) { - match self.db.get_completed_transaction(tx_id).await { + fn receive_transaction_mined_unconfirmed_event(&mut self, tx_id: TxId, confirmations: u64) { + match self.db.get_completed_transaction(tx_id) { Ok(tx) => { debug!( target: LOG_TARGET, @@ -539,8 +539,8 @@ where TBackend: TransactionBackend + 'static } } - async fn receive_faux_transaction_confirmed_event(&mut self, tx_id: TxId) { - match self.db.get_completed_transaction(tx_id).await { + fn receive_faux_transaction_confirmed_event(&mut self, tx_id: TxId) { + match self.db.get_completed_transaction(tx_id) { Ok(tx) => { debug!( target: LOG_TARGET, @@ -555,8 +555,8 @@ where TBackend: TransactionBackend + 'static } } - async fn receive_faux_transaction_unconfirmed_event(&mut self, tx_id: TxId, confirmations: u64) { - match self.db.get_completed_transaction(tx_id).await { + fn receive_faux_transaction_unconfirmed_event(&mut self, tx_id: TxId, confirmations: u64) { + match self.db.get_completed_transaction(tx_id) { Ok(tx) => { debug!( target: LOG_TARGET, diff --git a/base_layer/wallet_ffi/src/callback_handler_tests.rs b/base_layer/wallet_ffi/src/callback_handler_tests.rs index 7e14b2bf1a..9448dbd422 100644 --- a/base_layer/wallet_ffi/src/callback_handler_tests.rs +++ b/base_layer/wallet_ffi/src/callback_handler_tests.rs @@ -247,8 +247,7 @@ mod test { "1".to_string(), Utc::now().naive_utc(), ); - runtime - .block_on(db.add_pending_inbound_transaction(1u64.into(), inbound_tx.clone())) + db.add_pending_inbound_transaction(1u64.into(), inbound_tx.clone()) .unwrap(); let completed_tx = CompletedTransaction::new( @@ -272,8 +271,7 @@ mod test { None, None, ); - runtime - .block_on(db.insert_completed_transaction(2u64.into(), completed_tx.clone())) + db.insert_completed_transaction(2u64.into(), completed_tx.clone()) .unwrap(); let stp = SenderTransactionProtocol::new_placeholder(); @@ -288,29 +286,25 @@ mod test { Utc::now().naive_utc(), false, ); - runtime - .block_on(db.add_pending_outbound_transaction(3u64.into(), outbound_tx.clone())) + db.add_pending_outbound_transaction(3u64.into(), outbound_tx.clone()) .unwrap(); - runtime.block_on(db.cancel_pending_transaction(3u64.into())).unwrap(); + db.cancel_pending_transaction(3u64.into()).unwrap(); let inbound_tx_cancelled = InboundTransaction { tx_id: 4u64.into(), ..inbound_tx.clone() }; - runtime - .block_on(db.add_pending_inbound_transaction(4u64.into(), inbound_tx_cancelled)) + db.add_pending_inbound_transaction(4u64.into(), inbound_tx_cancelled) .unwrap(); - runtime.block_on(db.cancel_pending_transaction(4u64.into())).unwrap(); + db.cancel_pending_transaction(4u64.into()).unwrap(); let completed_tx_cancelled = CompletedTransaction { tx_id: 5u64.into(), ..completed_tx.clone() }; - runtime - .block_on(db.insert_completed_transaction(5u64.into(), completed_tx_cancelled.clone())) + db.insert_completed_transaction(5u64.into(), completed_tx_cancelled.clone()) .unwrap(); - runtime - .block_on(db.reject_completed_transaction(5u64.into(), TxCancellationReason::Unknown)) + db.reject_completed_transaction(5u64.into(), TxCancellationReason::Unknown) .unwrap(); let faux_unconfirmed_tx = CompletedTransaction::new( @@ -334,8 +328,7 @@ mod test { Some(2), Some(NaiveDateTime::from_timestamp(0, 0)), ); - runtime - .block_on(db.insert_completed_transaction(6u64.into(), faux_unconfirmed_tx.clone())) + db.insert_completed_transaction(6u64.into(), faux_unconfirmed_tx.clone()) .unwrap(); let faux_confirmed_tx = CompletedTransaction::new( @@ -359,8 +352,7 @@ mod test { Some(5), Some(NaiveDateTime::from_timestamp(0, 0)), ); - runtime - .block_on(db.insert_completed_transaction(7u64.into(), faux_confirmed_tx.clone())) + db.insert_completed_transaction(7u64.into(), faux_confirmed_tx.clone()) .unwrap(); let (transaction_event_sender, transaction_event_receiver) = broadcast::channel(20);