From f418d73eefbdca10c3adcea8c597df627368cef6 Mon Sep 17 00:00:00 2001 From: Martin Stefcek <35243812+Cifko@users.noreply.github.com> Date: Thu, 8 Dec 2022 05:18:47 -0500 Subject: [PATCH] feat: revalidate invalid utxo (#5020) Description --- Add `last_validation_timestamp` for `Invalid`/`CancelledInbound` transactions. Configurable interval for revalidation `num_of_seconds_to_revalidate_invalid_utxos`. Default is 3 days. When set to zero, the revalidation will be run every time. When revalidation failes, the `last_validation_timestamp` is updated. How Has This Been Tested? --- Partially (revalidation to valid tx). --- .../migrations/2022-12-07-110000/down.sql | 1 + .../migrations/2022-12-07-110000/up.sql | 1 + .../src/output_manager_service/config.rs | 3 + .../storage/database/backend.rs | 3 + .../storage/database/mod.rs | 11 ++++ .../storage/sqlite_db/mod.rs | 61 ++++++++++++++++- .../storage/sqlite_db/output_sql.rs | 20 ++++++ .../tasks/txo_validation_task.rs | 65 +++++++++++++++++++ base_layer/wallet/src/schema.rs | 1 + common/config/presets/d_console_wallet.toml | 4 ++ 10 files changed, 168 insertions(+), 2 deletions(-) create mode 100644 base_layer/wallet/migrations/2022-12-07-110000/down.sql create mode 100644 base_layer/wallet/migrations/2022-12-07-110000/up.sql diff --git a/base_layer/wallet/migrations/2022-12-07-110000/down.sql b/base_layer/wallet/migrations/2022-12-07-110000/down.sql new file mode 100644 index 0000000000..8d31ef38ed --- /dev/null +++ b/base_layer/wallet/migrations/2022-12-07-110000/down.sql @@ -0,0 +1 @@ +ALTER TABLE outputs DROP COLUMN last_validation_timestamp; diff --git a/base_layer/wallet/migrations/2022-12-07-110000/up.sql b/base_layer/wallet/migrations/2022-12-07-110000/up.sql new file mode 100644 index 0000000000..229cbcf496 --- /dev/null +++ b/base_layer/wallet/migrations/2022-12-07-110000/up.sql @@ -0,0 +1 @@ +ALTER TABLE outputs ADD last_validation_timestamp DATETIME NULL; diff --git a/base_layer/wallet/src/output_manager_service/config.rs b/base_layer/wallet/src/output_manager_service/config.rs index d08434de01..66c215e416 100644 --- a/base_layer/wallet/src/output_manager_service/config.rs +++ b/base_layer/wallet/src/output_manager_service/config.rs @@ -41,6 +41,8 @@ pub struct OutputManagerServiceConfig { /// If set to `true`, then outputs received via simple one-sided transactions, won't be automatically selected as /// inputs for further transactions, but can still be selected individually as specific outputs. pub autoignore_onesided_utxos: bool, + /// The number of seconds that have to pass for the wallet to run revalidation of invalid UTXOs on startup. + pub num_of_seconds_to_revalidate_invalid_utxos: u64, } impl Default for OutputManagerServiceConfig { @@ -51,6 +53,7 @@ impl Default for OutputManagerServiceConfig { num_confirmations_required: 3, tx_validator_batch_size: 100, autoignore_onesided_utxos: false, + num_of_seconds_to_revalidate_invalid_utxos: 60 * 60 * 24 * 3, } } } diff --git a/base_layer/wallet/src/output_manager_service/storage/database/backend.rs b/base_layer/wallet/src/output_manager_service/storage/database/backend.rs index ea31083027..3fc5b29e77 100644 --- a/base_layer/wallet/src/output_manager_service/storage/database/backend.rs +++ b/base_layer/wallet/src/output_manager_service/storage/database/backend.rs @@ -30,6 +30,8 @@ pub trait OutputManagerBackend: Send + Sync + Clone { fn fetch_sorted_unspent_outputs(&self) -> Result, OutputManagerStorageError>; /// Retrieve outputs that have been mined but not spent yet (have not been deleted) fn fetch_mined_unspent_outputs(&self) -> Result, OutputManagerStorageError>; + /// Retrieve outputs that are invalid + fn fetch_invalid_outputs(&self, timestamp: i64) -> Result, OutputManagerStorageError>; /// Retrieve outputs that have not been found or confirmed in the block chain yet fn fetch_unspent_mined_unconfirmed_outputs(&self) -> Result, OutputManagerStorageError>; /// Modify the state the of the backend with a write operation @@ -47,6 +49,7 @@ pub trait OutputManagerBackend: Send + Sync + Clone { ) -> Result<(), OutputManagerStorageError>; fn set_output_to_unmined_and_invalid(&self, hash: FixedHash) -> Result<(), OutputManagerStorageError>; + fn update_last_validation_timestamp(&self, hash: FixedHash) -> Result<(), OutputManagerStorageError>; fn set_outputs_to_be_revalidated(&self) -> Result<(), OutputManagerStorageError>; fn mark_output_as_spent( diff --git a/base_layer/wallet/src/output_manager_service/storage/database/mod.rs b/base_layer/wallet/src/output_manager_service/storage/database/mod.rs index a2a8dfebd1..fcfd689bca 100644 --- a/base_layer/wallet/src/output_manager_service/storage/database/mod.rs +++ b/base_layer/wallet/src/output_manager_service/storage/database/mod.rs @@ -285,6 +285,11 @@ where T: OutputManagerBackend + 'static Ok(utxos) } + pub fn fetch_invalid_outputs(&self, timestamp: i64) -> Result, OutputManagerStorageError> { + let utxos = self.db.fetch_invalid_outputs(timestamp)?; + Ok(utxos) + } + pub fn get_timelocked_outputs(&self, tip: u64) -> Result, OutputManagerStorageError> { let uo = match self.db.fetch(&DbKey::TimeLockedUnspentOutputs(tip)) { Ok(None) => log_error( @@ -416,6 +421,12 @@ where T: OutputManagerBackend + 'static Ok(()) } + pub fn update_last_validation_timestamp(&self, hash: HashOutput) -> Result<(), OutputManagerStorageError> { + let db = self.db.clone(); + db.update_last_validation_timestamp(hash)?; + Ok(()) + } + pub fn mark_output_as_spent( &self, hash: HashOutput, diff --git a/base_layer/wallet/src/output_manager_service/storage/sqlite_db/mod.rs b/base_layer/wallet/src/output_manager_service/storage/sqlite_db/mod.rs index 9f1604008d..52142a58c2 100644 --- a/base_layer/wallet/src/output_manager_service/storage/sqlite_db/mod.rs +++ b/base_layer/wallet/src/output_manager_service/storage/sqlite_db/mod.rs @@ -26,7 +26,7 @@ use std::{ }; use chacha20poly1305::XChaCha20Poly1305; -use chrono::NaiveDateTime; +use chrono::{NaiveDateTime, Utc}; use derivative::Derivative; use diesel::{ prelude::*, @@ -306,6 +306,29 @@ impl OutputManagerBackend for OutputManagerSqliteDatabase { .collect::, _>>() } + fn fetch_invalid_outputs(&self, timestamp: i64) -> Result, OutputManagerStorageError> { + let start = Instant::now(); + let conn = self.database_connection.get_pooled_connection()?; + let acquire_lock = start.elapsed(); + let outputs = OutputSql::index_invalid(&NaiveDateTime::from_timestamp_opt(timestamp, 0).unwrap(), &conn)?; + let cipher = acquire_read_lock!(self.cipher); + + if start.elapsed().as_millis() > 0 { + trace!( + target: LOG_TARGET, + "sqlite profile - fetch_invalid_outputs: lock {} + db_op {} = {} ms", + acquire_lock.as_millis(), + (start.elapsed() - acquire_lock).as_millis(), + start.elapsed().as_millis() + ); + } + + outputs + .into_iter() + .map(|o| o.to_db_unblinded_output(&cipher)) + .collect::, _>>() + } + fn fetch_unspent_mined_unconfirmed_outputs(&self) -> Result, OutputManagerStorageError> { let start = Instant::now(); let conn = self.database_connection.get_pooled_connection()?; @@ -449,6 +472,7 @@ impl OutputManagerBackend for OutputManagerSqliteDatabase { outputs::mined_timestamp.eq(timestamp), outputs::marked_deleted_at_height.eq::>(None), outputs::marked_deleted_in_block.eq::>>(None), + outputs::last_validation_timestamp.eq::>(None), )) .execute(&conn) .num_rows_affected_or_not_found(1)?; @@ -525,6 +549,29 @@ impl OutputManagerBackend for OutputManagerSqliteDatabase { Ok(()) } + fn update_last_validation_timestamp(&self, hash: FixedHash) -> Result<(), OutputManagerStorageError> { + let start = Instant::now(); + let conn = self.database_connection.get_pooled_connection()?; + let acquire_lock = start.elapsed(); + let hash = hash.to_vec(); + diesel::update(outputs::table.filter(outputs::hash.eq(hash))) + .set((outputs::last_validation_timestamp + .eq::>(NaiveDateTime::from_timestamp_opt(Utc::now().timestamp(), 0)),)) + .execute(&conn) + .num_rows_affected_or_not_found(1)?; + if start.elapsed().as_millis() > 0 { + trace!( + target: LOG_TARGET, + "sqlite profile - set_output_to_be_revalidated_in_the_future: lock {} + db_op {} = {} ms", + acquire_lock.as_millis(), + (start.elapsed() - acquire_lock).as_millis(), + start.elapsed().as_millis() + ); + } + + Ok(()) + } + fn mark_output_as_spent( &self, hash: FixedHash, @@ -703,7 +750,11 @@ impl OutputManagerBackend for OutputManagerSqliteDatabase { diesel::update( outputs::table.filter(outputs::status.eq(OutputStatus::ShortTermEncumberedToBeReceived as i32)), ) - .set((outputs::status.eq(OutputStatus::CancelledInbound as i32),)) + .set(( + outputs::status.eq(OutputStatus::CancelledInbound as i32), + outputs::last_validation_timestamp + .eq(NaiveDateTime::from_timestamp_opt(Utc::now().timestamp(), 0).unwrap()), + )) .execute(&conn)?; diesel::update(outputs::table.filter(outputs::status.eq(OutputStatus::ShortTermEncumberedToBeSpent as i32))) @@ -813,6 +864,9 @@ impl OutputManagerBackend for OutputManagerSqliteDatabase { output.update( UpdateOutput { status: Some(OutputStatus::CancelledInbound), + last_validation_timestamp: Some(Some( + NaiveDateTime::from_timestamp_opt(Utc::now().timestamp(), 0).unwrap(), + )), ..Default::default() }, &conn, @@ -1104,6 +1158,7 @@ pub struct UpdateOutput { metadata_signature_u_y: Option>, mined_height: Option>, mined_in_block: Option>>, + last_validation_timestamp: Option>, } #[derive(AsChangeset)] @@ -1119,6 +1174,7 @@ pub struct UpdateOutputSql { metadata_signature_u_y: Option>, mined_height: Option>, mined_in_block: Option>>, + last_validation_timestamp: Option>, } /// Map a Rust friendly UpdateOutput to the Sql data type form @@ -1135,6 +1191,7 @@ impl From for UpdateOutputSql { spent_in_tx_id: u.spent_in_tx_id.map(|o| o.map(TxId::as_i64_wrapped)), mined_height: u.mined_height.map(|t| t.map(|h| h as i64)), mined_in_block: u.mined_in_block, + last_validation_timestamp: u.last_validation_timestamp, } } } diff --git a/base_layer/wallet/src/output_manager_service/storage/sqlite_db/output_sql.rs b/base_layer/wallet/src/output_manager_service/storage/sqlite_db/output_sql.rs index 0ffc54bfb3..2a00e78aeb 100644 --- a/base_layer/wallet/src/output_manager_service/storage/sqlite_db/output_sql.rs +++ b/base_layer/wallet/src/output_manager_service/storage/sqlite_db/output_sql.rs @@ -106,6 +106,7 @@ pub struct OutputSql { pub encrypted_value: Vec, pub minimum_value_promise: i64, pub source: i32, + pub last_validation_timestamp: Option, } impl OutputSql { @@ -325,6 +326,25 @@ impl OutputSql { .load(conn)?) } + pub fn index_invalid( + timestamp: &NaiveDateTime, + conn: &SqliteConnection, + ) -> Result, OutputManagerStorageError> { + Ok(outputs::table + .filter( + outputs::status + .eq(OutputStatus::Invalid as i32) + .or(outputs::status.eq(OutputStatus::CancelledInbound as i32)), + ) + .filter( + outputs::last_validation_timestamp + .le(timestamp) + .or(outputs::last_validation_timestamp.is_null()), + ) + .order(outputs::id.asc()) + .load(conn)?) + } + pub fn first_by_mined_height_desc(conn: &SqliteConnection) -> Result, OutputManagerStorageError> { Ok(outputs::table .filter(outputs::mined_height.is_not_null()) diff --git a/base_layer/wallet/src/output_manager_service/tasks/txo_validation_task.rs b/base_layer/wallet/src/output_manager_service/tasks/txo_validation_task.rs index 4d0b04120a..644d7d60f8 100644 --- a/base_layer/wallet/src/output_manager_service/tasks/txo_validation_task.rs +++ b/base_layer/wallet/src/output_manager_service/tasks/txo_validation_task.rs @@ -25,6 +25,7 @@ use std::{ sync::Arc, }; +use chrono::{Duration, Utc}; use log::*; use tari_common_types::types::{BlockHash, FixedHash}; use tari_comms::{peer_manager::Peer, protocol::rpc::RpcError::RequestFailed}; @@ -108,6 +109,8 @@ where self.update_spent_outputs(&mut base_node_client, last_mined_header) .await?; + self.update_invalid_outputs(&mut base_node_client).await?; + self.publish_event(OutputManagerEvent::TxoValidationSuccess(self.operation_id)); debug!( target: LOG_TARGET, @@ -116,6 +119,68 @@ where Ok(self.operation_id) } + async fn update_invalid_outputs( + &self, + wallet_client: &mut BaseNodeWalletRpcClient, + ) -> Result<(), OutputManagerProtocolError> { + let invalid_outputs = self + .db + .fetch_invalid_outputs( + (Utc::now() - + Duration::seconds( + self.config + .num_of_seconds_to_revalidate_invalid_utxos + .try_into() + .map_err(|_| { + OutputManagerProtocolError::new(self.operation_id, OutputManagerError::InvalidConfig) + })?, + )) + .timestamp(), + ) + .for_protocol(self.operation_id)?; + + for batch in invalid_outputs.chunks(self.config.tx_validator_batch_size) { + let (mined, unmined, tip_height) = self + .query_base_node_for_outputs(batch, wallet_client) + .await + .for_protocol(self.operation_id)?; + debug!( + target: LOG_TARGET, + "Base node returned {} outputs as mined and {} outputs as unmined (Operation ID: {})", + mined.len(), + unmined.len(), + self.operation_id + ); + for (output, mined_height, mined_in_block, mmr_position, mined_timestamp) in &mined { + info!( + target: LOG_TARGET, + "Updating output comm:{}: hash {} as mined at height {} with current tip at {} (Operation ID: + {})", + output.commitment.to_hex(), + output.hash.to_hex(), + mined_height, + tip_height, + self.operation_id + ); + self.update_output_as_mined( + output, + mined_in_block, + *mined_height, + *mmr_position, + tip_height, + *mined_timestamp, + ) + .await?; + } + for output in unmined { + self.db + .update_last_validation_timestamp(output.hash) + .for_protocol(self.operation_id)?; + } + } + Ok(()) + } + #[allow(clippy::too_many_lines)] async fn update_spent_outputs( &self, diff --git a/base_layer/wallet/src/schema.rs b/base_layer/wallet/src/schema.rs index ffc7b95c7c..5ebed1597d 100644 --- a/base_layer/wallet/src/schema.rs +++ b/base_layer/wallet/src/schema.rs @@ -128,6 +128,7 @@ diesel::table! { encrypted_value -> Binary, minimum_value_promise -> BigInt, source -> Integer, + last_validation_timestamp -> Nullable, } } diff --git a/common/config/presets/d_console_wallet.toml b/common/config/presets/d_console_wallet.toml index c479f95f75..8f204e76ab 100644 --- a/common/config/presets/d_console_wallet.toml +++ b/common/config/presets/d_console_wallet.toml @@ -157,6 +157,10 @@ event_channel_size = 3500 # The number of batches the unconfirmed outputs will be divided into before being queried from the base node # (default = 100) #tx_validator_batch_size = 100 +# Number of seconds that have to pass for the wallet to run revalidation of invalid UTXOs on startup. +# If you set it to zero, the revalidation will be on every wallet rerun. Default is 3 days. +#num_of_seconds_to_revalidate_invalid_utxos = 259200 + [wallet.base_node] # Configuration for the wallet's base node service