Skip to content

Commit

Permalink
feat: add sql query to obtain balance (#3446)
Browse files Browse the repository at this point in the history
Description
---
- Replaced the `get_balance` function containing multiple sql queries with a single raw sql query.
- Removed redundant code (`fn fetch_pending_outgoing_outputs`) as a result of this change.
- Added more test points for time-locked balance.
- **Note:** To properly test transaction validation in `async fn test_txo_validation()`, access to the backend to obtain pending incoming transactions is needed via ` fn fetch_pending_incoming_outputs`, although that function is not used in production code anymore. If it is also removed other methods will have to be added to the backend to obtain the data for testing. Retaining the current function was chosen in lieu of adding other code.

Motivation and Context
---
Get balance used a lot of RAM and was really slow due to multiple database interactions.

Comparison of old vs. new query time performance for a wallet with 251,000 UTXOs in the database shown below:

- **Full scale**

![image](https://user-images.githubusercontent.com/39146854/136885517-1cb6f274-b85a-4281-a6d8-0edf42842baa.png)

- **Y-axis zoomed in**

![image](https://user-images.githubusercontent.com/39146854/136885891-2339c7b5-0638-402c-a380-86cceb51b1b2.png)


How Has This Been Tested?
---
- Unit tests
- System level tests
  • Loading branch information
hansieodendaal committed Oct 12, 2021
1 parent 43b2033 commit e23ceec
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 83 deletions.
69 changes: 5 additions & 64 deletions base_layer/wallet/src/output_manager_service/storage/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use std::{
sync::Arc,
};
use tari_common_types::types::{BlindingFactor, Commitment, HashOutput, PrivateKey};
use tari_core::transactions::{tari_amount::MicroTari, transaction::TransactionOutput};
use tari_core::transactions::transaction::TransactionOutput;

const LOG_TARGET: &str = "wallet::output_manager_service::database";

Expand All @@ -51,7 +51,6 @@ pub trait OutputManagerBackend: Send + Sync + Clone {
/// Modify the state the of the backend with a write operation
fn write(&self, op: WriteOperation) -> Result<Option<DbValue>, OutputManagerStorageError>;
fn fetch_pending_incoming_outputs(&self) -> Result<Vec<DbUnblindedOutput>, OutputManagerStorageError>;
fn fetch_pending_outgoing_outputs(&self) -> Result<Vec<DbUnblindedOutput>, OutputManagerStorageError>;

fn set_received_output_mined_height(
&self,
Expand Down Expand Up @@ -119,6 +118,8 @@ pub trait OutputManagerBackend: Send + Sync + Clone {
fn set_coinbase_abandoned(&self, tx_id: TxId, abandoned: bool) -> Result<(), OutputManagerStorageError>;
/// Reinstate a cancelled inbound output
fn reinstate_cancelled_inbound_output(&self, tx_id: TxId) -> Result<(), OutputManagerStorageError>;
/// Return the available, time locked, pending incoming and pending outgoing balance
fn get_balance(&self, tip: Option<u64>) -> Result<Balance, OutputManagerStorageError>;
}

/// Holds the state of the KeyManager being used by the Output Manager Service
Expand Down Expand Up @@ -276,69 +277,9 @@ where T: OutputManagerBackend + 'static

pub async fn get_balance(&self, current_chain_tip: Option<u64>) -> Result<Balance, OutputManagerStorageError> {
let db_clone = self.db.clone();
let db_clone2 = self.db.clone();
let db_clone3 = self.db.clone();
let db_clone4 = self.db.clone();

let unspent_outputs = tokio::task::spawn_blocking(move || match db_clone.fetch(&DbKey::UnspentOutputs) {
Ok(None) => log_error(
DbKey::UnspentOutputs,
OutputManagerStorageError::UnexpectedResult("Could not retrieve unspent outputs".to_string()),
),
Ok(Some(DbValue::UnspentOutputs(uo))) => Ok(uo),
Ok(Some(other)) => unexpected_result(DbKey::UnspentOutputs, other),
Err(e) => log_error(DbKey::UnspentOutputs, e),
})
.await
.map_err(|err| OutputManagerStorageError::BlockingTaskSpawnError(err.to_string()))??;

let pending_incoming_outputs = tokio::task::spawn_blocking(move || db_clone2.fetch_pending_incoming_outputs())
.await
.map_err(|err| OutputManagerStorageError::BlockingTaskSpawnError(err.to_string()))??;

let pending_outgoing_outputs = tokio::task::spawn_blocking(move || db_clone3.fetch_pending_outgoing_outputs())
.await
.map_err(|err| OutputManagerStorageError::BlockingTaskSpawnError(err.to_string()))??;

let time_locked_balance = if let Some(tip) = current_chain_tip {
let time_locked_outputs = tokio::task::spawn_blocking(move || {
db_clone4.fetch(&DbKey::TimeLockedUnspentOutputs(tip))?.ok_or_else(|| {
OutputManagerStorageError::UnexpectedResult("Time-locked Outputs cannot be retrieved".to_string())
})
})
tokio::task::spawn_blocking(move || db_clone.get_balance(current_chain_tip))
.await
.map_err(|err| OutputManagerStorageError::BlockingTaskSpawnError(err.to_string()))??;
if let DbValue::UnspentOutputs(time_locked_uo) = time_locked_outputs {
Some(
time_locked_uo
.iter()
.fold(MicroTari::from(0), |acc, x| acc + x.unblinded_output.value),
)
} else {
None
}
} else {
None
};

let available_balance = unspent_outputs
.iter()
.fold(MicroTari::from(0), |acc, x| acc + x.unblinded_output.value);

let pending_incoming = pending_incoming_outputs
.iter()
.fold(MicroTari::from(0), |acc, x| acc + x.unblinded_output.value);

let pending_outgoing = pending_outgoing_outputs
.iter()
.fold(MicroTari::from(0), |acc, x| acc + x.unblinded_output.value);

Ok(Balance {
available_balance,
time_locked_balance,
pending_incoming_balance: pending_incoming,
pending_outgoing_balance: pending_outgoing,
})
.map_err(|err| OutputManagerStorageError::BlockingTaskSpawnError(err.to_string()))?
}

/// This method is called when a transaction is built to be sent. It will encumber unspent outputs against a pending
Expand Down
125 changes: 106 additions & 19 deletions base_layer/wallet/src/output_manager_service/storage/sqlite_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
use crate::{
output_manager_service::{
error::OutputManagerStorageError,
service::Balance,
storage::{
database::{DbKey, DbKeyValuePair, DbValue, KeyManagerState, OutputManagerBackend, WriteOperation},
models::{DbUnblindedOutput, KnownOneSidedPaymentScript, OutputStatus},
Expand All @@ -38,7 +39,7 @@ use crate::{
};
use aes_gcm::{aead::Error as AeadError, Aes256Gcm, Error};
use chrono::{NaiveDateTime, Utc};
use diesel::{prelude::*, result::Error as DieselError, SqliteConnection};
use diesel::{prelude::*, result::Error as DieselError, sql_query, SqliteConnection};
use log::*;
use std::{
convert::{TryFrom, TryInto},
Expand Down Expand Up @@ -330,24 +331,6 @@ impl OutputManagerBackend for OutputManagerSqliteDatabase {
.collect::<Result<Vec<_>, _>>()
}

fn fetch_pending_outgoing_outputs(&self) -> Result<Vec<DbUnblindedOutput>, OutputManagerStorageError> {
let conn = self.database_connection.acquire_lock();

let mut outputs = OutputSql::index_status(OutputStatus::EncumberedToBeSpent, &conn)?;
outputs.extend(OutputSql::index_status(
OutputStatus::ShortTermEncumberedToBeSpent,
&conn,
)?);
outputs.extend(OutputSql::index_status(OutputStatus::SpentMinedUnconfirmed, &conn)?);
for o in outputs.iter_mut() {
self.decrypt_if_necessary(o)?;
}
outputs
.iter()
.map(|o| DbUnblindedOutput::try_from(o.clone()))
.collect::<Result<Vec<_>, _>>()
}

fn write(&self, op: WriteOperation) -> Result<Option<DbValue>, OutputManagerStorageError> {
let conn = self.database_connection.acquire_lock();

Expand Down Expand Up @@ -650,6 +633,12 @@ impl OutputManagerBackend for OutputManagerSqliteDatabase {
}
}

fn get_balance(&self, tip: Option<u64>) -> Result<Balance, OutputManagerStorageError> {
let conn = self.database_connection.acquire_lock();

OutputSql::get_balance(tip, &(*conn))
}

fn cancel_pending_transaction(&self, tx_id: TxId) -> Result<(), OutputManagerStorageError> {
let conn = self.database_connection.acquire_lock();

Expand Down Expand Up @@ -1047,6 +1036,104 @@ impl OutputSql {
.first::<OutputSql>(conn)?)
}

/// Return the available, time locked, pending incoming and pending outgoing balance
pub fn get_balance(tip: Option<u64>, conn: &SqliteConnection) -> Result<Balance, OutputManagerStorageError> {
#[derive(QueryableByName, Clone)]
struct BalanceQueryResult {
#[sql_type = "diesel::sql_types::BigInt"]
amount: i64,
#[sql_type = "diesel::sql_types::Text"]
category: String,
}
let balance_query_result = if let Some(val) = tip {
let balance_query = sql_query(
"SELECT coalesce(sum(value), 0) as amount, 'available_balance' as category \
FROM outputs WHERE status = ? \
UNION ALL \
SELECT coalesce(sum(value), 0) as amount, 'time_locked_balance' as category \
FROM outputs WHERE status = ? AND maturity > ? \
UNION ALL \
SELECT coalesce(sum(value), 0) as amount, 'pending_incoming_balance' as category \
FROM outputs WHERE status = ? OR status = ? OR status = ? \
UNION ALL \
SELECT coalesce(sum(value), 0) as amount, 'pending_outgoing_balance' as category \
FROM outputs WHERE status = ? OR status = ? OR status = ?",
)
// available_balance
.bind::<diesel::sql_types::Integer, _>(OutputStatus::Unspent as i32)
// time_locked_balance
.bind::<diesel::sql_types::Integer, _>(OutputStatus::Unspent as i32)
.bind::<diesel::sql_types::BigInt, _>(val as i64)
// pending_incoming_balance
.bind::<diesel::sql_types::Integer, _>(OutputStatus::EncumberedToBeReceived as i32)
.bind::<diesel::sql_types::Integer, _>(OutputStatus::ShortTermEncumberedToBeReceived as i32)
.bind::<diesel::sql_types::Integer, _>(OutputStatus::UnspentMinedUnconfirmed as i32)
// pending_outgoing_balance
.bind::<diesel::sql_types::Integer, _>(OutputStatus::EncumberedToBeSpent as i32)
.bind::<diesel::sql_types::Integer, _>(OutputStatus::ShortTermEncumberedToBeSpent as i32)
.bind::<diesel::sql_types::Integer, _>(OutputStatus::SpentMinedUnconfirmed as i32);
balance_query.load::<BalanceQueryResult>(conn)?
} else {
let balance_query = sql_query(
"SELECT coalesce(sum(value), 0) as amount, 'available_balance' as category \
FROM outputs WHERE status = ? \
UNION ALL \
SELECT coalesce(sum(value), 0) as amount, 'pending_incoming_balance' as category \
FROM outputs WHERE status = ? OR status = ? OR status = ? \
UNION ALL \
SELECT coalesce(sum(value), 0) as amount, 'pending_outgoing_balance' as category \
FROM outputs WHERE status = ? OR status = ? OR status = ?",
)
// available_balance
.bind::<diesel::sql_types::Integer, _>(OutputStatus::Unspent as i32)
// pending_incoming_balance
.bind::<diesel::sql_types::Integer, _>(OutputStatus::EncumberedToBeReceived as i32)
.bind::<diesel::sql_types::Integer, _>(OutputStatus::ShortTermEncumberedToBeReceived as i32)
.bind::<diesel::sql_types::Integer, _>(OutputStatus::UnspentMinedUnconfirmed as i32)
// pending_outgoing_balance
.bind::<diesel::sql_types::Integer, _>(OutputStatus::EncumberedToBeSpent as i32)
.bind::<diesel::sql_types::Integer, _>(OutputStatus::ShortTermEncumberedToBeSpent as i32)
.bind::<diesel::sql_types::Integer, _>(OutputStatus::SpentMinedUnconfirmed as i32);
balance_query.load::<BalanceQueryResult>(conn)?
};
let mut available_balance = None;
let mut time_locked_balance = Some(None);
let mut pending_incoming_balance = None;
let mut pending_outgoing_balance = None;
for balance in balance_query_result.clone() {
match balance.category.as_str() {
"available_balance" => available_balance = Some(MicroTari::from(balance.amount as u64)),
"time_locked_balance" => time_locked_balance = Some(Some(MicroTari::from(balance.amount as u64))),
"pending_incoming_balance" => pending_incoming_balance = Some(MicroTari::from(balance.amount as u64)),
"pending_outgoing_balance" => pending_outgoing_balance = Some(MicroTari::from(balance.amount as u64)),
_ => {
return Err(OutputManagerStorageError::UnexpectedResult(
"Unexpected category in balance query".to_string(),
))
},
}
}

Ok(Balance {
available_balance: available_balance.ok_or_else(|| {
OutputManagerStorageError::UnexpectedResult("Available balance could not be calculated".to_string())
})?,
time_locked_balance: time_locked_balance.ok_or_else(|| {
OutputManagerStorageError::UnexpectedResult("Time locked balance could not be calculated".to_string())
})?,
pending_incoming_balance: pending_incoming_balance.ok_or_else(|| {
OutputManagerStorageError::UnexpectedResult(
"Pending incoming balance could not be calculated".to_string(),
)
})?,
pending_outgoing_balance: pending_outgoing_balance.ok_or_else(|| {
OutputManagerStorageError::UnexpectedResult(
"Pending outgoing balance could not be calculated".to_string(),
)
})?,
})
}

pub fn find_by_commitment(
commitment: &[u8],
conn: &SqliteConnection,
Expand Down
15 changes: 15 additions & 0 deletions base_layer/wallet/tests/output_manager_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -759,6 +759,7 @@ async fn test_get_balance() {
let balance = oms.get_balance().await.unwrap();

assert_eq!(output_val, balance.available_balance);
assert_eq!(output_val, balance.time_locked_balance.unwrap());
assert_eq!(recv_value + change_val, balance.pending_incoming_balance);
assert_eq!(output_val, balance.pending_outgoing_balance);
}
Expand All @@ -776,6 +777,10 @@ async fn sending_transaction_with_short_term_clear() {
let (_ti, uo) = make_input(&mut OsRng.clone(), available_balance, &factories.commitment);
oms.add_output(uo).await.unwrap();

let balance = oms.get_balance().await.unwrap();
assert_eq!(balance.available_balance, available_balance);
assert_eq!(balance.time_locked_balance.unwrap(), available_balance);

// Check that funds are encumbered and then unencumbered if the pending tx is not confirmed before restart
let _stp = oms
.prepare_transaction_to_send(
Expand All @@ -790,13 +795,16 @@ async fn sending_transaction_with_short_term_clear() {
.unwrap();

let balance = oms.get_balance().await.unwrap();
assert_eq!(balance.available_balance, MicroTari::from(0));
assert_eq!(balance.time_locked_balance.unwrap(), MicroTari::from(0));
assert_eq!(balance.pending_outgoing_balance, available_balance);

drop(oms);
let (mut oms, _, _shutdown, _, _, _, _, _) = setup_output_manager_service(backend.clone(), true).await;

let balance = oms.get_balance().await.unwrap();
assert_eq!(balance.available_balance, available_balance);
assert_eq!(balance.time_locked_balance.unwrap(), available_balance);

// Check that is the pending tx is confirmed that the encumberance persists after restart
let stp = oms
Expand All @@ -817,6 +825,8 @@ async fn sending_transaction_with_short_term_clear() {
let (mut oms, _, _shutdown, _, _, _, _, _) = setup_output_manager_service(backend, true).await;

let balance = oms.get_balance().await.unwrap();
assert_eq!(balance.available_balance, MicroTari::from(0));
assert_eq!(balance.time_locked_balance.unwrap(), MicroTari::from(0));
assert_eq!(balance.pending_outgoing_balance, available_balance);
}

Expand Down Expand Up @@ -1080,6 +1090,7 @@ async fn test_txo_validation() {
balance.available_balance,
MicroTari::from(output2_value) + MicroTari::from(output3_value)
);
assert_eq!(balance.available_balance, balance.time_locked_balance.unwrap());
assert_eq!(balance.pending_outgoing_balance, MicroTari::from(output1_value));
assert_eq!(
balance.pending_incoming_balance,
Expand Down Expand Up @@ -1179,6 +1190,7 @@ async fn test_txo_validation() {
balance.available_balance,
MicroTari::from(output2_value) + MicroTari::from(output3_value)
);
assert_eq!(balance.available_balance, balance.time_locked_balance.unwrap());

assert_eq!(oms.get_unspent_outputs().await.unwrap().len(), 2);

Expand Down Expand Up @@ -1226,6 +1238,7 @@ async fn test_txo_validation() {
);
assert_eq!(balance.pending_outgoing_balance, MicroTari::from(0));
assert_eq!(balance.pending_incoming_balance, MicroTari::from(0));
assert_eq!(balance.available_balance, balance.time_locked_balance.unwrap());

// Trigger another validation and only Output3 should be checked
oms.validate_txos().await.unwrap();
Expand Down Expand Up @@ -1331,6 +1344,7 @@ async fn test_txo_validation() {
balance.pending_incoming_balance,
MicroTari::from(output1_value) - MicroTari::from(900_300)
);
assert_eq!(balance.available_balance, balance.time_locked_balance.unwrap());

// Now we will update the mined_height in the responses so that the outputs on the reorged chain are confirmed
// Output 1: Spent in Block 5 - Confirmed
Expand Down Expand Up @@ -1390,6 +1404,7 @@ async fn test_txo_validation() {
);
assert_eq!(balance.pending_outgoing_balance, MicroTari::from(0));
assert_eq!(balance.pending_incoming_balance, MicroTari::from(0));
assert_eq!(balance.available_balance, balance.time_locked_balance.unwrap());
}

#[tokio::test]
Expand Down
9 changes: 9 additions & 0 deletions base_layer/wallet/tests/output_manager_service/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,15 @@ pub fn test_db_backend<T: OutputManagerBackend + 'static>(backend: T) {
assert_eq!(time_locked_outputs.len(), 0);
let time_locked_balance = unspent_outputs[4].unblinded_output.value;

for i in 0..4usize {
let balance = runtime.block_on(db.get_balance(Some(i as u64))).unwrap();
let mut sum = MicroTari::from(0);
for output in unspent_outputs.iter().take(5).skip(i + 1) {
sum += output.unblinded_output.value;
}
assert_eq!(balance.time_locked_balance.unwrap(), sum);
}

unspent_outputs.sort();

let outputs = runtime.block_on(db.fetch_sorted_unspent_outputs()).unwrap();
Expand Down

0 comments on commit e23ceec

Please sign in to comment.