feat: revalidate invalid utxo (#5020)
Description
---
Add a `last_validation_timestamp` column for `Invalid`/`CancelledInbound` outputs.
Add a configurable revalidation interval, `num_of_seconds_to_revalidate_invalid_utxos` (default: 3 days). When set to zero, revalidation runs every time validation does. When revalidation fails, `last_validation_timestamp` is updated so the output is not retried until the interval has elapsed again.
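
For reference, a minimal sketch (not an excerpt from this diff) of how the cutoff timestamp is derived from the new setting, using the same `chrono` calls as the validation task below:

```rust
// Minimal sketch mirroring the cutoff computed in the TXO validation task
// below; not verbatim commit code.
use chrono::{Duration, Utc};

fn revalidation_cutoff(num_of_seconds_to_revalidate_invalid_utxos: i64) -> i64 {
    // Invalid/CancelledInbound outputs whose last_validation_timestamp is NULL
    // or older than this cutoff are fetched for revalidation; with a value of
    // zero the cutoff is "now", so every such output is revalidated each run.
    (Utc::now() - Duration::seconds(num_of_seconds_to_revalidate_invalid_utxos)).timestamp()
}
```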

How Has This Been Tested?
---
Partially (revalidating an invalid transaction back to valid).
Cifko committed Dec 8, 2022
1 parent 4650153 commit f418d73
Showing 10 changed files with 168 additions and 2 deletions.
1 change: 1 addition & 0 deletions base_layer/wallet/migrations/2022-12-07-110000/down.sql
@@ -0,0 +1 @@
ALTER TABLE outputs DROP COLUMN last_validation_timestamp;
1 change: 1 addition & 0 deletions base_layer/wallet/migrations/2022-12-07-110000/up.sql
@@ -0,0 +1 @@
ALTER TABLE outputs ADD last_validation_timestamp DATETIME NULL;
3 changes: 3 additions & 0 deletions base_layer/wallet/src/output_manager_service/config.rs
@@ -41,6 +41,8 @@ pub struct OutputManagerServiceConfig {
/// If set to `true`, then outputs received via simple one-sided transactions, won't be automatically selected as
/// inputs for further transactions, but can still be selected individually as specific outputs.
pub autoignore_onesided_utxos: bool,
/// The number of seconds that have to pass for the wallet to run revalidation of invalid UTXOs on startup.
pub num_of_seconds_to_revalidate_invalid_utxos: u64,
}

impl Default for OutputManagerServiceConfig {
@@ -51,6 +53,7 @@ impl Default for OutputManagerServiceConfig {
num_confirmations_required: 3,
tx_validator_batch_size: 100,
autoignore_onesided_utxos: false,
num_of_seconds_to_revalidate_invalid_utxos: 60 * 60 * 24 * 3,
}
}
}
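
A hedged usage sketch (not part of this commit) showing the new field overridden while keeping the other defaults above; the crate/module path is an assumption:

```rust
// Hypothetical usage sketch; the import path is assumed, not taken from this diff.
use tari_wallet::output_manager_service::config::OutputManagerServiceConfig;

fn example_config() -> OutputManagerServiceConfig {
    OutputManagerServiceConfig {
        // Revalidate invalid UTXOs after one day instead of the 3-day default;
        // setting this to 0 revalidates them on every validation run.
        num_of_seconds_to_revalidate_invalid_utxos: 60 * 60 * 24,
        ..Default::default()
    }
}
```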
@@ -30,6 +30,8 @@ pub trait OutputManagerBackend: Send + Sync + Clone {
fn fetch_sorted_unspent_outputs(&self) -> Result<Vec<DbUnblindedOutput>, OutputManagerStorageError>;
/// Retrieve outputs that have been mined but not spent yet (have not been deleted)
fn fetch_mined_unspent_outputs(&self) -> Result<Vec<DbUnblindedOutput>, OutputManagerStorageError>;
/// Retrieve outputs that are invalid
fn fetch_invalid_outputs(&self, timestamp: i64) -> Result<Vec<DbUnblindedOutput>, OutputManagerStorageError>;
/// Retrieve outputs that have not been found or confirmed in the block chain yet
fn fetch_unspent_mined_unconfirmed_outputs(&self) -> Result<Vec<DbUnblindedOutput>, OutputManagerStorageError>;
/// Modify the state the of the backend with a write operation
@@ -47,6 +49,7 @@ pub trait OutputManagerBackend: Send + Sync + Clone {
) -> Result<(), OutputManagerStorageError>;

fn set_output_to_unmined_and_invalid(&self, hash: FixedHash) -> Result<(), OutputManagerStorageError>;
fn update_last_validation_timestamp(&self, hash: FixedHash) -> Result<(), OutputManagerStorageError>;
fn set_outputs_to_be_revalidated(&self) -> Result<(), OutputManagerStorageError>;

fn mark_output_as_spent(
@@ -285,6 +285,11 @@ where T: OutputManagerBackend + 'static
Ok(utxos)
}

pub fn fetch_invalid_outputs(&self, timestamp: i64) -> Result<Vec<DbUnblindedOutput>, OutputManagerStorageError> {
let utxos = self.db.fetch_invalid_outputs(timestamp)?;
Ok(utxos)
}

pub fn get_timelocked_outputs(&self, tip: u64) -> Result<Vec<DbUnblindedOutput>, OutputManagerStorageError> {
let uo = match self.db.fetch(&DbKey::TimeLockedUnspentOutputs(tip)) {
Ok(None) => log_error(
@@ -416,6 +421,12 @@ where T: OutputManagerBackend + 'static
Ok(())
}

pub fn update_last_validation_timestamp(&self, hash: HashOutput) -> Result<(), OutputManagerStorageError> {
let db = self.db.clone();
db.update_last_validation_timestamp(hash)?;
Ok(())
}

pub fn mark_output_as_spent(
&self,
hash: HashOutput,
@@ -26,7 +26,7 @@ use std::{
};

use chacha20poly1305::XChaCha20Poly1305;
use chrono::NaiveDateTime;
use chrono::{NaiveDateTime, Utc};
use derivative::Derivative;
use diesel::{
prelude::*,
@@ -306,6 +306,29 @@ impl OutputManagerBackend for OutputManagerSqliteDatabase {
.collect::<Result<Vec<_>, _>>()
}

fn fetch_invalid_outputs(&self, timestamp: i64) -> Result<Vec<DbUnblindedOutput>, OutputManagerStorageError> {
let start = Instant::now();
let conn = self.database_connection.get_pooled_connection()?;
let acquire_lock = start.elapsed();
let outputs = OutputSql::index_invalid(&NaiveDateTime::from_timestamp_opt(timestamp, 0).unwrap(), &conn)?;
let cipher = acquire_read_lock!(self.cipher);

if start.elapsed().as_millis() > 0 {
trace!(
target: LOG_TARGET,
"sqlite profile - fetch_invalid_outputs: lock {} + db_op {} = {} ms",
acquire_lock.as_millis(),
(start.elapsed() - acquire_lock).as_millis(),
start.elapsed().as_millis()
);
}

outputs
.into_iter()
.map(|o| o.to_db_unblinded_output(&cipher))
.collect::<Result<Vec<_>, _>>()
}

fn fetch_unspent_mined_unconfirmed_outputs(&self) -> Result<Vec<DbUnblindedOutput>, OutputManagerStorageError> {
let start = Instant::now();
let conn = self.database_connection.get_pooled_connection()?;
@@ -449,6 +472,7 @@ impl OutputManagerBackend for OutputManagerSqliteDatabase {
outputs::mined_timestamp.eq(timestamp),
outputs::marked_deleted_at_height.eq::<Option<i64>>(None),
outputs::marked_deleted_in_block.eq::<Option<Vec<u8>>>(None),
outputs::last_validation_timestamp.eq::<Option<NaiveDateTime>>(None),
))
.execute(&conn)
.num_rows_affected_or_not_found(1)?;
@@ -525,6 +549,29 @@ impl OutputManagerBackend for OutputManagerSqliteDatabase {
Ok(())
}

fn update_last_validation_timestamp(&self, hash: FixedHash) -> Result<(), OutputManagerStorageError> {
let start = Instant::now();
let conn = self.database_connection.get_pooled_connection()?;
let acquire_lock = start.elapsed();
let hash = hash.to_vec();
diesel::update(outputs::table.filter(outputs::hash.eq(hash)))
.set((outputs::last_validation_timestamp
.eq::<Option<NaiveDateTime>>(NaiveDateTime::from_timestamp_opt(Utc::now().timestamp(), 0)),))
.execute(&conn)
.num_rows_affected_or_not_found(1)?;
if start.elapsed().as_millis() > 0 {
trace!(
target: LOG_TARGET,
"sqlite profile - set_output_to_be_revalidated_in_the_future: lock {} + db_op {} = {} ms",
acquire_lock.as_millis(),
(start.elapsed() - acquire_lock).as_millis(),
start.elapsed().as_millis()
);
}

Ok(())
}

fn mark_output_as_spent(
&self,
hash: FixedHash,
@@ -703,7 +750,11 @@ impl OutputManagerBackend for OutputManagerSqliteDatabase {
diesel::update(
outputs::table.filter(outputs::status.eq(OutputStatus::ShortTermEncumberedToBeReceived as i32)),
)
.set((outputs::status.eq(OutputStatus::CancelledInbound as i32),))
.set((
outputs::status.eq(OutputStatus::CancelledInbound as i32),
outputs::last_validation_timestamp
.eq(NaiveDateTime::from_timestamp_opt(Utc::now().timestamp(), 0).unwrap()),
))
.execute(&conn)?;

diesel::update(outputs::table.filter(outputs::status.eq(OutputStatus::ShortTermEncumberedToBeSpent as i32)))
@@ -813,6 +864,9 @@ impl OutputManagerBackend for OutputManagerSqliteDatabase {
output.update(
UpdateOutput {
status: Some(OutputStatus::CancelledInbound),
last_validation_timestamp: Some(Some(
NaiveDateTime::from_timestamp_opt(Utc::now().timestamp(), 0).unwrap(),
)),
..Default::default()
},
&conn,
@@ -1104,6 +1158,7 @@ pub struct UpdateOutput {
metadata_signature_u_y: Option<Vec<u8>>,
mined_height: Option<Option<u64>>,
mined_in_block: Option<Option<Vec<u8>>>,
last_validation_timestamp: Option<Option<NaiveDateTime>>,
}

#[derive(AsChangeset)]
@@ -1119,6 +1174,7 @@ pub struct UpdateOutputSql {
metadata_signature_u_y: Option<Vec<u8>>,
mined_height: Option<Option<i64>>,
mined_in_block: Option<Option<Vec<u8>>>,
last_validation_timestamp: Option<Option<NaiveDateTime>>,
}

/// Map a Rust friendly UpdateOutput to the Sql data type form
@@ -1135,6 +1191,7 @@ impl From<UpdateOutput> for UpdateOutputSql {
spent_in_tx_id: u.spent_in_tx_id.map(|o| o.map(TxId::as_i64_wrapped)),
mined_height: u.mined_height.map(|t| t.map(|h| h as i64)),
mined_in_block: u.mined_in_block,
last_validation_timestamp: u.last_validation_timestamp,
}
}
}
@@ -106,6 +106,7 @@ pub struct OutputSql {
pub encrypted_value: Vec<u8>,
pub minimum_value_promise: i64,
pub source: i32,
pub last_validation_timestamp: Option<NaiveDateTime>,
}

impl OutputSql {
@@ -325,6 +326,25 @@ impl OutputSql {
.load(conn)?)
}

pub fn index_invalid(
timestamp: &NaiveDateTime,
conn: &SqliteConnection,
) -> Result<Vec<OutputSql>, OutputManagerStorageError> {
Ok(outputs::table
.filter(
outputs::status
.eq(OutputStatus::Invalid as i32)
.or(outputs::status.eq(OutputStatus::CancelledInbound as i32)),
)
.filter(
outputs::last_validation_timestamp
.le(timestamp)
.or(outputs::last_validation_timestamp.is_null()),
)
.order(outputs::id.asc())
.load(conn)?)
}

pub fn first_by_mined_height_desc(conn: &SqliteConnection) -> Result<Option<OutputSql>, OutputManagerStorageError> {
Ok(outputs::table
.filter(outputs::mined_height.is_not_null())
@@ -25,6 +25,7 @@ use std::{
sync::Arc,
};

use chrono::{Duration, Utc};
use log::*;
use tari_common_types::types::{BlockHash, FixedHash};
use tari_comms::{peer_manager::Peer, protocol::rpc::RpcError::RequestFailed};
@@ -108,6 +109,8 @@
self.update_spent_outputs(&mut base_node_client, last_mined_header)
.await?;

self.update_invalid_outputs(&mut base_node_client).await?;

self.publish_event(OutputManagerEvent::TxoValidationSuccess(self.operation_id));
debug!(
target: LOG_TARGET,
@@ -116,6 +119,68 @@
Ok(self.operation_id)
}

async fn update_invalid_outputs(
&self,
wallet_client: &mut BaseNodeWalletRpcClient,
) -> Result<(), OutputManagerProtocolError> {
let invalid_outputs = self
.db
.fetch_invalid_outputs(
(Utc::now() -
Duration::seconds(
self.config
.num_of_seconds_to_revalidate_invalid_utxos
.try_into()
.map_err(|_| {
OutputManagerProtocolError::new(self.operation_id, OutputManagerError::InvalidConfig)
})?,
))
.timestamp(),
)
.for_protocol(self.operation_id)?;

for batch in invalid_outputs.chunks(self.config.tx_validator_batch_size) {
let (mined, unmined, tip_height) = self
.query_base_node_for_outputs(batch, wallet_client)
.await
.for_protocol(self.operation_id)?;
debug!(
target: LOG_TARGET,
"Base node returned {} outputs as mined and {} outputs as unmined (Operation ID: {})",
mined.len(),
unmined.len(),
self.operation_id
);
for (output, mined_height, mined_in_block, mmr_position, mined_timestamp) in &mined {
info!(
target: LOG_TARGET,
"Updating output comm:{}: hash {} as mined at height {} with current tip at {} (Operation ID:
{})",
output.commitment.to_hex(),
output.hash.to_hex(),
mined_height,
tip_height,
self.operation_id
);
self.update_output_as_mined(
output,
mined_in_block,
*mined_height,
*mmr_position,
tip_height,
*mined_timestamp,
)
.await?;
}
for output in unmined {
self.db
.update_last_validation_timestamp(output.hash)
.for_protocol(self.operation_id)?;
}
}
Ok(())
}

#[allow(clippy::too_many_lines)]
async fn update_spent_outputs(
&self,
1 change: 1 addition & 0 deletions base_layer/wallet/src/schema.rs
@@ -128,6 +128,7 @@ diesel::table! {
encrypted_value -> Binary,
minimum_value_promise -> BigInt,
source -> Integer,
last_validation_timestamp -> Nullable<Timestamp>,
}
}

4 changes: 4 additions & 0 deletions common/config/presets/d_console_wallet.toml
@@ -157,6 +157,10 @@ event_channel_size = 3500
# The number of batches the unconfirmed outputs will be divided into before being queried from the base node
# (default = 100)
#tx_validator_batch_size = 100
# Number of seconds that have to pass for the wallet to run revalidation of invalid UTXOs on startup.
# If you set it to zero, the revalidation will be on every wallet rerun. Default is 3 days.
#num_of_seconds_to_revalidate_invalid_utxos = 259200


[wallet.base_node]
# Configuration for the wallet's base node service
