From ac42d5e0aacbed439438c823c910fd90f1b63adc Mon Sep 17 00:00:00 2001 From: Hansie Odendaal Date: Mon, 25 Oct 2021 18:10:05 +0200 Subject: [PATCH 1/2] Optimize get transactions query Optimized the get transactiosn query for transactions that need to be broadcast/rebroadcast by sending a single diesel sql query that only returns the result, isntead of multiple queries that return all the transactions in the database with filtering and selection in the Rust code. --- .../wallet/src/transaction_service/service.rs | 25 ++--- .../transaction_service/storage/database.rs | 7 ++ .../transaction_service/storage/sqlite_db.rs | 106 ++++++++++++++++++ .../tests/output_manager_service/service.rs | 17 +++ 4 files changed, 142 insertions(+), 13 deletions(-) diff --git a/base_layer/wallet/src/transaction_service/service.rs b/base_layer/wallet/src/transaction_service/service.rs index b77d9d5957..cbd5601de3 100644 --- a/base_layer/wallet/src/transaction_service/service.rs +++ b/base_layer/wallet/src/transaction_service/service.rs @@ -1625,7 +1625,7 @@ where } trace!(target: LOG_TARGET, "Restarting transaction broadcast protocols"); - self.broadcast_all_completed_transactions(broadcast_join_handles) + self.broadcast_selected_completed_transactions(broadcast_join_handles) .await .map_err(|resp| { error!( @@ -1682,22 +1682,21 @@ where Ok(()) } - /// Go through all completed transactions that have not yet been broadcast and broadcast all of them to the base + /// Broadcast all valid and not cancelled completed transactions with status 'Completed' and 'Broadcast' to the base /// node. - async fn broadcast_all_completed_transactions( + async fn broadcast_selected_completed_transactions( &mut self, join_handles: &mut FuturesUnordered>>, ) -> Result<(), TransactionServiceError> { - trace!(target: LOG_TARGET, "Attempting to Broadcast all Completed Transactions"); - let completed_txs = self.db.get_completed_transactions().await?; - for (_, completed_tx) in completed_txs { - if completed_tx.valid && - (completed_tx.status == TransactionStatus::Completed || - completed_tx.status == TransactionStatus::Broadcast) && - !completed_tx.is_coinbase() - { - self.broadcast_completed_transaction(completed_tx, join_handles).await?; - } + trace!( + target: LOG_TARGET, + "Attempting to Broadcast all valid and not cancelled Completed Transactions with status 'Completed' and \ + 'Broadcast'" + ); + let txn_list = self.db.get_transactions_to_be_broadcast().await?; + for completed_txn in txn_list { + self.broadcast_completed_transaction(completed_txn, join_handles) + .await?; } Ok(()) diff --git a/base_layer/wallet/src/transaction_service/storage/database.rs b/base_layer/wallet/src/transaction_service/storage/database.rs index 0ed8840538..56e5c8a454 100644 --- a/base_layer/wallet/src/transaction_service/storage/database.rs +++ b/base_layer/wallet/src/transaction_service/storage/database.rs @@ -56,6 +56,8 @@ pub trait TransactionBackend: Send + Sync + Clone { fn fetch_unconfirmed_transactions(&self) -> Result, TransactionStorageError>; + fn get_transactions_to_be_broadcast(&self) -> Result, TransactionStorageError>; + /// Check if a record with the provided key exists in the backend. fn contains(&self, key: &DbKey) -> Result; /// Modify the state the of the backend with a write operation @@ -424,6 +426,11 @@ where T: TransactionBackend + 'static self.db.fetch_unconfirmed_transactions() } + /// This method returns all completed transactions that must be re-broadcast + pub async fn get_transactions_to_be_broadcast(&self) -> Result, TransactionStorageError> { + self.db.get_transactions_to_be_broadcast() + } + pub async fn get_completed_transaction_cancelled_or_not( &self, tx_id: TxId, diff --git a/base_layer/wallet/src/transaction_service/storage/sqlite_db.rs b/base_layer/wallet/src/transaction_service/storage/sqlite_db.rs index 0def1078b8..6396fd1f76 100644 --- a/base_layer/wallet/src/transaction_service/storage/sqlite_db.rs +++ b/base_layer/wallet/src/transaction_service/storage/sqlite_db.rs @@ -1034,6 +1034,42 @@ impl TransactionBackend for TransactionServiceSqliteDatabase { Ok(result) } + fn get_transactions_to_be_broadcast(&self) -> Result, TransactionStorageError> { + let start = Instant::now(); + let conn = self.database_connection.acquire_lock(); + let acquire_lock = start.elapsed(); + let txs = completed_transactions::table + .filter(completed_transactions::valid.eq(true as i32)) + .filter( + completed_transactions::status + .eq(TransactionStatus::Completed as i32) + .or(completed_transactions::status.eq(TransactionStatus::Broadcast as i32)), + ) + .filter( + completed_transactions::coinbase_block_height + .is_null() + .or(completed_transactions::coinbase_block_height.eq(0)), + ) + .filter(completed_transactions::cancelled.eq(false as i32)) + .order_by(completed_transactions::tx_id) + .load::(&*conn)?; + + let mut result = vec![]; + for mut tx in txs { + self.decrypt_if_necessary(&mut tx)?; + result.push(tx.try_into()?); + } + trace!( + target: LOG_TARGET, + "sqlite profile - get_transactions_to_be_broadcast: lock {} + db_op {} = {} ms", + acquire_lock.as_millis(), + (start.elapsed() - acquire_lock).as_millis(), + start.elapsed().as_millis() + ); + + Ok(result) + } + fn mark_all_transactions_as_unvalidated(&self) -> Result<(), TransactionStorageError> { let start = Instant::now(); let conn = self.database_connection.acquire_lock(); @@ -2362,4 +2398,74 @@ mod test { assert!(db3.fetch(&DbKey::PendingOutboundTransactions).is_ok()); assert!(db3.fetch(&DbKey::CompletedTransactions).is_ok()); } + + #[test] + fn test_get_tranactions_to_be_rebroadcast() { + let db_name = format!("{}.sqlite3", string(8).as_str()); + let temp_dir = tempdir().unwrap(); + let db_folder = temp_dir.path().to_str().unwrap().to_string(); + let db_path = format!("{}{}", db_folder, db_name); + + embed_migrations!("./migrations"); + let conn = SqliteConnection::establish(&db_path).unwrap_or_else(|_| panic!("Error connecting to {}", db_path)); + + embedded_migrations::run_with_output(&conn, &mut std::io::stdout()).expect("Migration failed"); + + for i in 0..1000 { + let (valid, cancelled, status, coinbase_block_height) = match i % 13 { + 0 => (true, i % 3 == 0, TransactionStatus::Completed, None), + 1 => (true, i % 5 == 0, TransactionStatus::Broadcast, None), + 2 => (true, i % 7 == 0, TransactionStatus::Completed, Some(i % 2)), + 3 => (true, i % 11 == 0, TransactionStatus::Broadcast, Some(i % 2)), + 4 => (i % 13 == 0, false, TransactionStatus::Completed, None), + 5 => (i % 17 == 0, false, TransactionStatus::Broadcast, None), + 6 => (true, false, TransactionStatus::Pending, None), + 7 => (true, false, TransactionStatus::Coinbase, None), + 8 => (true, false, TransactionStatus::MinedUnconfirmed, None), + 9 => (true, false, TransactionStatus::Imported, None), + 10 => (true, false, TransactionStatus::MinedConfirmed, None), + _ => (true, false, TransactionStatus::Completed, Some(i)), + }; + let completed_tx = CompletedTransaction { + tx_id: i, + source_public_key: PublicKey::from_secret_key(&PrivateKey::random(&mut OsRng)), + destination_public_key: PublicKey::from_secret_key(&PrivateKey::random(&mut OsRng)), + amount: MicroTari::from(100), + fee: MicroTari::from(100), + transaction: Transaction::new( + vec![], + vec![], + vec![], + PrivateKey::random(&mut OsRng), + PrivateKey::random(&mut OsRng), + ), + status, + message: "Yo!".to_string(), + timestamp: Utc::now().naive_utc(), + cancelled, + direction: TransactionDirection::Unknown, + coinbase_block_height, + send_count: 0, + last_send_timestamp: None, + valid, + confirmations: None, + mined_height: None, + mined_in_block: None, + }; + let completed_tx_sql = CompletedTransactionSql::try_from(completed_tx).unwrap(); + completed_tx_sql.commit(&conn).unwrap(); + } + + let connection = WalletDbConnection::new(conn, None); + let db1 = TransactionServiceSqliteDatabase::new(connection, None); + + let txn_list = db1.get_transactions_to_be_broadcast().unwrap(); + assert_eq!(db1.get_transactions_to_be_broadcast().unwrap().len(), 185); + for txn in &txn_list { + assert!(txn.status == TransactionStatus::Completed || txn.status == TransactionStatus::Broadcast); + assert!(txn.valid); + assert!(!txn.cancelled); + assert!(txn.coinbase_block_height == None || txn.coinbase_block_height == Some(0)); + } + } } diff --git a/base_layer/wallet/tests/output_manager_service/service.rs b/base_layer/wallet/tests/output_manager_service/service.rs index d1a04f0298..b107501e46 100644 --- a/base_layer/wallet/tests/output_manager_service/service.rs +++ b/base_layer/wallet/tests/output_manager_service/service.rs @@ -1381,6 +1381,23 @@ async fn test_txo_validation() { .await .unwrap(); + // This is needed on a fast computer, otherwise the balance have not been updated correctly yet with the next step + let mut event_stream = oms.get_event_stream(); + let delay = sleep(Duration::from_secs(10)); + tokio::pin!(delay); + loop { + tokio::select! { + event = event_stream.recv() => { + if let OutputManagerEvent::TxoValidationSuccess(_) = &*event.unwrap(){ + break; + } + }, + () = &mut delay => { + break; + }, + } + } + let balance = oms.get_balance().await.unwrap(); assert_eq!( balance.available_balance, From 2e52b13e3fa0e680857676dafe0f9e358aac3461 Mon Sep 17 00:00:00 2001 From: Hansie Odendaal Date: Tue, 26 Oct 2021 09:57:31 +0200 Subject: [PATCH 2/2] Applied review comments --- base_layer/wallet/src/transaction_service/service.rs | 8 +++++--- .../wallet/src/transaction_service/storage/database.rs | 2 +- integration_tests/features/WalletCli.feature | 2 +- integration_tests/features/support/steps.js | 2 +- 4 files changed, 8 insertions(+), 6 deletions(-) diff --git a/base_layer/wallet/src/transaction_service/service.rs b/base_layer/wallet/src/transaction_service/service.rs index cbd5601de3..48e1b579e2 100644 --- a/base_layer/wallet/src/transaction_service/service.rs +++ b/base_layer/wallet/src/transaction_service/service.rs @@ -1625,12 +1625,14 @@ where } trace!(target: LOG_TARGET, "Restarting transaction broadcast protocols"); - self.broadcast_selected_completed_transactions(broadcast_join_handles) + self.broadcast_completed_and_broadcast_transactions(broadcast_join_handles) .await .map_err(|resp| { error!( target: LOG_TARGET, - "Error broadcasting all completed transactions: {:?}", resp + "Error broadcasting all valid and not cancelled Completed Transactions with status 'Completed' \ + and 'Broadcast': {:?}", + resp ); resp })?; @@ -1684,7 +1686,7 @@ where /// Broadcast all valid and not cancelled completed transactions with status 'Completed' and 'Broadcast' to the base /// node. - async fn broadcast_selected_completed_transactions( + async fn broadcast_completed_and_broadcast_transactions( &mut self, join_handles: &mut FuturesUnordered>>, ) -> Result<(), TransactionServiceError> { diff --git a/base_layer/wallet/src/transaction_service/storage/database.rs b/base_layer/wallet/src/transaction_service/storage/database.rs index 56e5c8a454..754754b845 100644 --- a/base_layer/wallet/src/transaction_service/storage/database.rs +++ b/base_layer/wallet/src/transaction_service/storage/database.rs @@ -426,7 +426,7 @@ where T: TransactionBackend + 'static self.db.fetch_unconfirmed_transactions() } - /// This method returns all completed transactions that must be re-broadcast + /// This method returns all completed transactions that must be broadcast pub async fn get_transactions_to_be_broadcast(&self) -> Result, TransactionStorageError> { self.db.get_transactions_to_be_broadcast() } diff --git a/integration_tests/features/WalletCli.feature b/integration_tests/features/WalletCli.feature index 4a7b4021fa..bdbd501980 100644 --- a/integration_tests/features/WalletCli.feature +++ b/integration_tests/features/WalletCli.feature @@ -81,7 +81,7 @@ Feature: Wallet CLI # TODO: base node connection. And I wait 30 seconds And I stop wallet SENDER - And I make it rain from wallet SENDER 1 tx / sec 10 sec 8000 uT 100 increment to RECEIVER via command line + And I make it rain from wallet SENDER 1 tx per sec 10 sec 8000 uT 100 increment to RECEIVER via command line Then wallet SENDER has at least 10 transactions that are all TRANSACTION_STATUS_BROADCAST and valid Then wallet RECEIVER has at least 10 transactions that are all TRANSACTION_STATUS_BROADCAST and valid And mining node MINE mines 5 blocks diff --git a/integration_tests/features/support/steps.js b/integration_tests/features/support/steps.js index 944139d388..fc372d8004 100644 --- a/integration_tests/features/support/steps.js +++ b/integration_tests/features/support/steps.js @@ -3547,7 +3547,7 @@ When( ); Then( - "I make it rain from wallet {word} {int} tx / sec {int} sec {int} uT {int} increment to {word} via command line", + "I make it rain from wallet {word} {int} tx per sec {int} sec {int} uT {int} increment to {word} via command line", { timeout: 300 * 1000 }, async function (sender, freq, duration, amount, amount_inc, receiver) { let wallet = this.getWallet(sender);