Skip to content

Commit

Permalink
Optimize txn validation for wallet
Browse files Browse the repository at this point in the history
Optimized the transaction validation memory footprint by restricting all
queries to the database to only return the information that is needed.
This holds two considerable advantages for databases with many or large
transactions:
- The memory foorprint is reduced.
- Overall performance is increased.
  • Loading branch information
hansieodendaal committed Nov 4, 2021
1 parent ef4f84f commit 4776570
Show file tree
Hide file tree
Showing 14 changed files with 354 additions and 93 deletions.
2 changes: 1 addition & 1 deletion base_layer/core/src/chain_storage/blockchain_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1141,7 +1141,7 @@ pub fn calculate_mmr_roots<T: BlockchainBackend>(db: &T, block: &Block) -> Resul

output_mmr.compress();

// TODO: #testnetreset clean up this code
// TODO: #testnet_reset clean up this code
let input_mr = if header.version == 1 {
MutableMmr::<HashDigest, _>::new(input_mmr.get_pruned_hash_set()?, Bitmap::create())?.get_merkle_root()?
} else {
Expand Down
2 changes: 1 addition & 1 deletion base_layer/core/src/transactions/aggregated_body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ impl AggregateBody {
}
self.inputs.sort();
self.outputs.sort();
// TODO: #testnetreset clean up this code
// TODO: #testnet_reset clean up this code
if version <= 1 {
self.kernels.sort_by(|a, b| a.deprecated_cmp(b));
} else {
Expand Down
4 changes: 2 additions & 2 deletions base_layer/core/src/transactions/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -779,7 +779,7 @@ impl TransactionOutput {
Challenge::new()
.chain(public_commitment_nonce.as_bytes())
.chain(script.as_bytes())
// TODO: Use consensus encoded bytes #testnet reset
// TODO: Use consensus encoded bytes #testnet_reset
.chain(features.to_v1_bytes())
.chain(sender_offset_public_key.as_bytes())
.chain(commitment.as_bytes())
Expand Down Expand Up @@ -887,7 +887,7 @@ impl TransactionOutput {
impl Hashable for TransactionOutput {
fn hash(&self) -> Vec<u8> {
HashDigest::new()
// TODO: use consensus encoding #testnetreset
// TODO: use consensus encoding #testnet_reset
.chain(self.features.to_v1_bytes())
.chain(self.commitment.as_bytes())
// .chain(range proof) // See docs as to why we exclude this
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
PRAGMA foreign_keys=OFF;
ALTER TABLE completed_transactions
RENAME TO completed_transactions_old;
CREATE TABLE completed_transactions (
tx_id INTEGER PRIMARY KEY NOT NULL,
source_public_key BLOB NOT NULL,
destination_public_key BLOB NOT NULL,
amount INTEGER NOT NULL,
fee INTEGER NOT NULL,
transaction_protocol TEXT NOT NULL,
status INTEGER NOT NULL,
message TEXT NOT NULL,
timestamp DATETIME NOT NULL,
cancelled INTEGER NOT NULL DEFAULT 0,
direction INTEGER NULL DEFAULT NULL,
coinbase_block_height INTEGER NULL DEFAULT NULL,
send_count INTEGER NOT NULL DEFAULT 0,
last_send_timestamp DATETIME NULL DEFAULT NULL,
valid INTEGER NOT NULL DEFAULT 0,
confirmations INTEGER NULL DEFAULT NULL,
mined_height BIGINT NULL DEFAULT NULL,
mined_in_block BLOB NULL DEFAULT NULL

);
INSERT INTO completed_transactions (tx_id, source_public_key, destination_public_key, amount, fee, transaction_protocol,
status, message, timestamp, cancelled, direction, coinbase_block_height, send_count,
last_send_timestamp, valid, confirmations)
SELECT tx_id,
source_public_key,
destination_public_key,
amount,
fee,
transaction_protocol,
status,
message,
timestamp,
cancelled,
direction,
coinbase_block_height,
send_count,
last_send_timestamp,
valid,
confirmations,
mined_height,
mined_in_block
FROM completed_transactions_old;
DROP TABLE completed_transactions_old;
PRAGMA foreign_keys=ON;
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
ALTER TABLE completed_transactions
ADD transaction_signature_nonce BLOB NOT NULL DEFAULT 0;

ALTER TABLE completed_transactions
ADD transaction_signature_key BLOB NOT NULL DEFAULT 0;
2 changes: 2 additions & 0 deletions base_layer/wallet/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ table! {
confirmations -> Nullable<BigInt>,
mined_height -> Nullable<BigInt>,
mined_in_block -> Nullable<Binary>,
transaction_signature_nonce -> Binary,
transaction_signature_key -> Binary,
}
}

Expand Down
4 changes: 4 additions & 0 deletions base_layer/wallet/src/transaction_service/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,10 @@ pub enum TransactionKeyError {
Source(ByteArrayError),
#[error("Invalid destination PublicKey")]
Destination(ByteArrayError),
#[error("Invalid transaction signature nonce")]
SignatureNonce(ByteArrayError),
#[error("Invalid transaction signature key")]
SignatureKey(ByteArrayError),
}

#[derive(Debug, Error)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use crate::{
handle::{TransactionEvent, TransactionEventSender},
storage::{
database::{TransactionBackend, TransactionDatabase},
models::CompletedTransaction,
sqlite_db::UnconfirmedTransactionInfo,
},
},
};
Expand All @@ -39,7 +39,10 @@ use std::{
convert::{TryFrom, TryInto},
sync::Arc,
};
use tari_common_types::{transaction::TransactionStatus, types::BlockHash};
use tari_common_types::{
transaction::{TransactionStatus, TxId},
types::BlockHash,
};
use tari_comms::protocol::rpc::{RpcError::RequestFailed, RpcStatusCode::NotFound};
use tari_core::{
base_node::{
Expand Down Expand Up @@ -99,14 +102,15 @@ where
target: LOG_TARGET,
"Checking if transactions have been mined since last we checked"
);
let unmined_transactions = self
// Fetch completed but unconfirmed transactions that were not imported
let unconfirmed_transactions = self
.db
.fetch_unconfirmed_transactions()
.fetch_unconfirmed_transactions_info()
.await
.for_protocol(self.operation_id)
.unwrap();

for batch in unmined_transactions.chunks(self.config.max_tx_query_batch_size) {
for batch in unconfirmed_transactions.chunks(self.config.max_tx_query_batch_size) {
let (mined, unmined, tip_info) = self
.query_base_node_for_transactions(batch, &mut *base_node_wallet_client)
.await
Expand All @@ -117,36 +121,46 @@ where
mined.len(),
unmined.len()
);
for (tx, mined_height, mined_in_block, num_confirmations) in &mined {
info!(target: LOG_TARGET, "Updating transaction {} as mined", tx.tx_id);
self.update_transaction_as_mined(tx, mined_in_block, *mined_height, *num_confirmations)
.await?;
for (mined_tx, mined_height, mined_in_block, num_confirmations) in &mined {
info!(target: LOG_TARGET, "Updating transaction {} as mined", mined_tx.tx_id);
self.update_transaction_as_mined(
mined_tx.tx_id,
&mined_tx.status,
mined_in_block,
*mined_height,
*num_confirmations,
)
.await?;
}
if let Some((tip_height, tip_block)) = tip_info {
for tx in &unmined {
for unmined_tx in &unmined {
// Treat coinbases separately
if tx.is_coinbase() {
if tx.coinbase_block_height.unwrap_or_default() <= tip_height {
info!(target: LOG_TARGET, "Updated coinbase {} as abandoned", tx.tx_id);
if unmined_tx.is_coinbase() {
if unmined_tx.coinbase_block_height.unwrap_or_default() <= tip_height {
info!(target: LOG_TARGET, "Updated coinbase {} as abandoned", unmined_tx.tx_id);
self.update_coinbase_as_abandoned(
tx,
unmined_tx.tx_id,
&tip_block,
tip_height,
tip_height.saturating_sub(tx.coinbase_block_height.unwrap_or_default()),
tip_height.saturating_sub(unmined_tx.coinbase_block_height.unwrap_or_default()),
)
.await?;
} else {
info!(
target: LOG_TARGET,
"Coinbase not found, but it is for a block that is not yet in the chain. Coinbase \
height: {}, tip height:{}",
tx.coinbase_block_height.unwrap_or_default(),
unmined_tx.coinbase_block_height.unwrap_or_default(),
tip_height
);
}
} else {
info!(target: LOG_TARGET, "Updated transaction {} as unmined", tx.tx_id);
self.update_transaction_as_unmined(tx).await?;
info!(
target: LOG_TARGET,
"Updated transaction {} as unmined", unmined_tx.tx_id
);
self.update_transaction_as_unmined(unmined_tx.tx_id, &unmined_tx.status)
.await?;
}
}
}
Expand Down Expand Up @@ -216,7 +230,8 @@ where
.map(|k| k.excess.to_hex())
.unwrap()
);
self.update_transaction_as_unmined(&last_mined_transaction).await?;
self.update_transaction_as_unmined(last_mined_transaction.tx_id, &last_mined_transaction.status)
.await?;
} else {
info!(
target: LOG_TARGET,
Expand All @@ -230,12 +245,12 @@ where

async fn query_base_node_for_transactions(
&self,
batch: &[CompletedTransaction],
batch: &[UnconfirmedTransactionInfo],
base_node_client: &mut BaseNodeWalletRpcClient,
) -> Result<
(
Vec<(CompletedTransaction, u64, BlockHash, u64)>,
Vec<CompletedTransaction>,
Vec<(UnconfirmedTransactionInfo, u64, BlockHash, u64)>,
Vec<UnconfirmedTransactionInfo>,
Option<(u64, BlockHash)>,
),
TransactionServiceError,
Expand All @@ -244,11 +259,8 @@ where
let mut unmined = vec![];

let mut batch_signatures = HashMap::new();
for tx in batch.iter() {
// Imported transactions do not have a signature
if let Some(sig) = tx.transaction.first_kernel_excess_sig() {
batch_signatures.insert(sig.clone(), tx);
}
for tx_info in batch.iter() {
batch_signatures.insert(tx_info.signature.clone(), tx_info);
}

if batch_signatures.is_empty() {
Expand All @@ -259,7 +271,7 @@ where
info!(
target: LOG_TARGET,
"Asking base node for location of {} transactions by excess signature",
batch.len()
batch_signatures.len()
);

let batch_response = base_node_client
Expand All @@ -275,16 +287,16 @@ where
let response = TxQueryBatchResponse::try_from(response_proto)
.map_err(TransactionServiceError::ProtobufConversionError)?;
let sig = response.signature;
if let Some(completed_tx) = batch_signatures.get(&sig) {
if let Some(unconfirmed_tx) = batch_signatures.get(&sig) {
if response.location == TxLocation::Mined {
mined.push((
(*completed_tx).clone(),
(*unconfirmed_tx).clone(),
response.block_height,
response.block_hash.unwrap(),
response.confirmations,
));
} else {
unmined.push((*completed_tx).clone());
unmined.push((*unconfirmed_tx).clone());
}
}
}
Expand Down Expand Up @@ -333,14 +345,15 @@ where
#[allow(clippy::ptr_arg)]
async fn update_transaction_as_mined(
&mut self,
tx: &CompletedTransaction,
tx_id: TxId,
status: &TransactionStatus,
mined_in_block: &BlockHash,
mined_height: u64,
num_confirmations: u64,
) -> Result<(), TransactionServiceProtocolError> {
self.db
.set_transaction_mined_height(
tx.tx_id,
tx_id,
true,
mined_height,
mined_in_block.clone(),
Expand All @@ -351,23 +364,20 @@ where
.for_protocol(self.operation_id)?;

if num_confirmations >= self.config.num_confirmations_required {
self.publish_event(TransactionEvent::TransactionMined {
tx_id: tx.tx_id,
is_valid: true,
})
self.publish_event(TransactionEvent::TransactionMined { tx_id, is_valid: true })
} else {
self.publish_event(TransactionEvent::TransactionMinedUnconfirmed {
tx_id: tx.tx_id,
tx_id,
num_confirmations,
is_valid: true,
})
}

if tx.status == TransactionStatus::Coinbase {
if let Err(e) = self.output_manager_handle.set_coinbase_abandoned(tx.tx_id, false).await {
if *status == TransactionStatus::Coinbase {
if let Err(e) = self.output_manager_handle.set_coinbase_abandoned(tx_id, false).await {
warn!(
target: LOG_TARGET,
"Could not mark coinbase output for TxId: {} as not abandoned: {}", tx.tx_id, e
"Could not mark coinbase output for TxId: {} as not abandoned: {}", tx_id, e
);
};
}
Expand All @@ -378,14 +388,14 @@ where
#[allow(clippy::ptr_arg)]
async fn update_coinbase_as_abandoned(
&mut self,
tx: &CompletedTransaction,
tx_id: TxId,
mined_in_block: &BlockHash,
mined_height: u64,
num_confirmations: u64,
) -> Result<(), TransactionServiceProtocolError> {
self.db
.set_transaction_mined_height(
tx.tx_id,
tx_id,
false,
mined_height,
mined_in_block.clone(),
Expand All @@ -395,37 +405,38 @@ where
.await
.for_protocol(self.operation_id)?;

if let Err(e) = self.output_manager_handle.set_coinbase_abandoned(tx.tx_id, true).await {
if let Err(e) = self.output_manager_handle.set_coinbase_abandoned(tx_id, true).await {
warn!(
target: LOG_TARGET,
"Could not mark coinbase output for TxId: {} as abandoned: {}", tx.tx_id, e
"Could not mark coinbase output for TxId: {} as abandoned: {}", tx_id, e
);
};

self.publish_event(TransactionEvent::TransactionCancelled(tx.tx_id));
self.publish_event(TransactionEvent::TransactionCancelled(tx_id));

Ok(())
}

async fn update_transaction_as_unmined(
&mut self,
tx: &CompletedTransaction,
tx_id: TxId,
status: &TransactionStatus,
) -> Result<(), TransactionServiceProtocolError> {
self.db
.set_transaction_as_unmined(tx.tx_id)
.set_transaction_as_unmined(tx_id)
.await
.for_protocol(self.operation_id)?;

if tx.status == TransactionStatus::Coinbase {
if let Err(e) = self.output_manager_handle.set_coinbase_abandoned(tx.tx_id, false).await {
if *status == TransactionStatus::Coinbase {
if let Err(e) = self.output_manager_handle.set_coinbase_abandoned(tx_id, false).await {
warn!(
target: LOG_TARGET,
"Could not mark coinbase output for TxId: {} as not abandoned: {}", tx.tx_id, e
"Could not mark coinbase output for TxId: {} as not abandoned: {}", tx_id, e
);
};
}

self.publish_event(TransactionEvent::TransactionBroadcast(tx.tx_id));
self.publish_event(TransactionEvent::TransactionBroadcast(tx_id));
Ok(())
}
}

0 comments on commit 4776570

Please sign in to comment.