feat: limit transaction size (#6154)
Description
---
Added checks to ensure that transactions negotiated between parties,
transactions to self, and imported transactions do not exceed the RPC
frame size limit when the transaction is broadcast to the base node.
This can easily happen when many dust inputs are collected to be
spent. If an oversized transaction is detected in any of the transaction
service protocols, it is cancelled so that the user can try to
create a new transaction with different parameters, for example by
reducing the recipient amount.
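
In essence, the new guard serializes the transaction and compares the byte
length against the RPC frame limit minus a safety margin. A minimal sketch of
the idea (not the exact implementation; `MAX_FRAME_SIZE` is an assumed
placeholder for `rpc::RPC_MAX_FRAME_SIZE`, and the real check is
`check_transaction_size` in `transaction_service/protocols/mod.rs` below):

```rust
// Minimal sketch only. MAX_FRAME_SIZE is an assumed placeholder for
// rpc::RPC_MAX_FRAME_SIZE; the 10 KiB margin mirrors the one used in the PR.
use serde::Serialize;

const MAX_FRAME_SIZE: usize = 3 * 1024 * 1024; // placeholder value
const SIZE_MARGIN: usize = 1024 * 10;

fn exceeds_broadcast_limit<T: Serialize>(tx: &T) -> Result<bool, bincode::Error> {
    // Serialize the transaction as it would be framed for the RPC call and
    // compare against the frame limit less the safety margin.
    let bytes = bincode::serialize(tx)?;
    Ok(bytes.len() > MAX_FRAME_SIZE.saturating_sub(SIZE_MARGIN))
}
```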

**Edit:**
- Added a wallet SQLite in-memory connection type to speed up tests with
large amounts of database activity (a usage sketch follows this list)
- Used fake outputs without valid bulletproof range proofs in the
`spend_dust` tests to speed them up even further, as suggested by @SWvheerden
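
A hedged sketch of how a test might pick up the new in-memory connection; the
crate path and the `tokio` test harness are assumptions, and only
`make_wallet_database_memory_connection` comes from this PR:

```rust
// Hedged sketch: the crate path is assumed; only
// make_wallet_database_memory_connection is introduced by this PR.
use minotari_wallet::test_utils::make_wallet_database_memory_connection;

#[tokio::test]
async fn spends_dust_against_an_in_memory_db() {
    // Backed by a ":memory:" SQLite pool, so no temp directory or file lock is needed.
    let connection = make_wallet_database_memory_connection();
    // ... build the wallet / transaction service on top of `connection` as usual ...
    drop(connection);
}
```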

Motivation and Context
---
See #6108

How Has This Been Tested?
---
Added unit tests:
- `test_spend_dust_to_self_in_oversized_transaction`
- `test_spend_dust_to_other_in_oversized_transaction`
- `test_spend_dust_happy_path`

What process can a PR reviewer use to test or verify this change?
---
- Code walk-through
- Review unit tests



Breaking Changes
---

- [x] None
- [ ] Requires data directory on base node to be deleted
- [ ] Requires hard fork
- [ ] Other - Please specify

hansieodendaal committed Mar 1, 2024
1 parent 97fc7b3 commit abd64d8
Showing 18 changed files with 704 additions and 128 deletions.
7 changes: 1 addition & 6 deletions base_layer/wallet/src/output_manager_service/handle.rs
@@ -45,10 +45,7 @@ use tower::Service;
use crate::output_manager_service::{
error::OutputManagerError,
service::{Balance, OutputInfoByTxId},
storage::{
database::OutputBackendQuery,
models::{DbWalletOutput, KnownOneSidedPaymentScript, SpendingPriority},
},
storage::models::{DbWalletOutput, KnownOneSidedPaymentScript, SpendingPriority},
UtxoSelectionCriteria,
};

@@ -90,7 +87,6 @@ pub enum OutputManagerRequest {
CancelTransaction(TxId),
GetSpentOutputs,
GetUnspentOutputs,
GetOutputsBy(OutputBackendQuery),
GetInvalidOutputs,
ValidateUtxos,
RevalidateTxos,
@@ -152,7 +148,6 @@ impl fmt::Display for OutputManagerRequest {
CancelTransaction(v) => write!(f, "CancelTransaction ({})", v),
GetSpentOutputs => write!(f, "GetSpentOutputs"),
GetUnspentOutputs => write!(f, "GetUnspentOutputs"),
GetOutputsBy(q) => write!(f, "GetOutputs({:#?})", q),
GetInvalidOutputs => write!(f, "GetInvalidOutputs"),
ValidateUtxos => write!(f, "ValidateUtxos"),
RevalidateTxos => write!(f, "RevalidateTxos"),
8 changes: 2 additions & 6 deletions base_layer/wallet/src/output_manager_service/service.rs
@@ -306,10 +306,6 @@ where
let outputs = self.fetch_unspent_outputs()?;
Ok(OutputManagerResponse::UnspentOutputs(outputs))
},
OutputManagerRequest::GetOutputsBy(q) => {
let outputs = self.fetch_outputs_by(q)?.into_iter().map(|v| v.into()).collect();
Ok(OutputManagerResponse::Outputs(outputs))
},
OutputManagerRequest::ValidateUtxos => {
self.validate_outputs().map(OutputManagerResponse::TxoValidationStarted)
},
@@ -1381,8 +1377,8 @@ where
Ok(self.resources.db.fetch_all_unspent_outputs()?)
}

pub fn fetch_outputs_by(&self, q: OutputBackendQuery) -> Result<Vec<DbWalletOutput>, OutputManagerError> {
Ok(self.resources.db.fetch_outputs_by(q)?)
pub fn fetch_outputs_by_query(&self, q: OutputBackendQuery) -> Result<Vec<DbWalletOutput>, OutputManagerError> {
Ok(self.resources.db.fetch_outputs_by_query(q)?)
}

pub fn fetch_invalid_outputs(&self) -> Result<Vec<DbWalletOutput>, OutputManagerError> {
@@ -101,5 +101,5 @@ pub trait OutputManagerBackend: Send + Sync + Clone {
current_tip_height: Option<u64>,
) -> Result<Vec<DbWalletOutput>, OutputManagerStorageError>;
fn fetch_outputs_by_tx_id(&self, tx_id: TxId) -> Result<Vec<DbWalletOutput>, OutputManagerStorageError>;
fn fetch_outputs_by(&self, q: OutputBackendQuery) -> Result<Vec<DbWalletOutput>, OutputManagerStorageError>;
fn fetch_outputs_by_query(&self, q: OutputBackendQuery) -> Result<Vec<DbWalletOutput>, OutputManagerStorageError>;
}
@@ -434,8 +434,11 @@ where T: OutputManagerBackend + 'static
Ok(outputs)
}

pub fn fetch_outputs_by(&self, q: OutputBackendQuery) -> Result<Vec<DbWalletOutput>, OutputManagerStorageError> {
self.db.fetch_outputs_by(q)
pub fn fetch_outputs_by_query(
&self,
q: OutputBackendQuery,
) -> Result<Vec<DbWalletOutput>, OutputManagerStorageError> {
self.db.fetch_outputs_by_query(q)
}
}

@@ -1037,9 +1037,9 @@ impl OutputManagerBackend for OutputManagerSqliteDatabase {
.collect::<Result<Vec<_>, _>>()
}

fn fetch_outputs_by(&self, q: OutputBackendQuery) -> Result<Vec<DbWalletOutput>, OutputManagerStorageError> {
fn fetch_outputs_by_query(&self, q: OutputBackendQuery) -> Result<Vec<DbWalletOutput>, OutputManagerStorageError> {
let mut conn = self.database_connection.get_pooled_connection()?;
Ok(OutputSql::fetch_outputs_by(q, &mut conn)?
Ok(OutputSql::fetch_outputs_by_query(q, &mut conn)?
.into_iter()
.filter_map(|x| {
x.to_db_wallet_output()
@@ -120,7 +120,7 @@ impl OutputSql {

/// Retrieves UTXOs by a set of given rules
#[allow(clippy::cast_sign_loss)]
pub fn fetch_outputs_by(
pub fn fetch_outputs_by_query(
q: OutputBackendQuery,
conn: &mut SqliteConnection,
) -> Result<Vec<OutputSql>, OutputManagerStorageError> {
21 changes: 21 additions & 0 deletions base_layer/wallet/src/storage/sqlite_utilities/mod.rs
@@ -74,6 +74,27 @@ pub fn run_migration_and_create_sqlite_connection<P: AsRef<Path>>(
Ok(WalletDbConnection::new(pool, Some(file_lock)))
}

pub fn run_migration_and_create_sqlite_memory_connection(
sqlite_pool_size: usize,
) -> Result<WalletDbConnection, WalletStorageError> {
let mut pool = SqliteConnectionPool::new(
String::from(":memory:"),
sqlite_pool_size,
true,
true,
Duration::from_secs(60),
);
pool.create_pool()?;
let mut connection = pool.get_pooled_connection()?;

const MIGRATIONS: EmbeddedMigrations = embed_migrations!("./migrations");
connection
.run_pending_migrations(MIGRATIONS)
.map_err(|err| WalletStorageError::DatabaseMigrationError(format!("Database migration failed {}", err)))?;

Ok(WalletDbConnection::new(pool, None))
}

pub fn acquire_exclusive_file_lock(db_path: &Path) -> Result<File, WalletStorageError> {
let lock_file_path = match db_path.file_name() {
None => {
6 changes: 6 additions & 0 deletions base_layer/wallet/src/test_utils.rs
@@ -30,6 +30,7 @@ use tempfile::{tempdir, TempDir};

use crate::storage::sqlite_utilities::{
run_migration_and_create_sqlite_connection,
run_migration_and_create_sqlite_memory_connection,
wallet_db_connection::WalletDbConnection,
};

@@ -58,6 +59,11 @@ pub fn make_wallet_database_connection(path: Option<String>) -> (WalletDbConnect
(connection, temp_dir)
}

/// A test helper to create a temporary in-memory wallet service database
pub fn make_wallet_database_memory_connection() -> WalletDbConnection {
run_migration_and_create_sqlite_memory_connection(16).unwrap()
}

pub fn create_consensus_rules() -> ConsensusManager {
ConsensusManager::builder(Network::LocalNet).build().unwrap()
}
6 changes: 6 additions & 0 deletions base_layer/wallet/src/transaction_service/error.rs
@@ -185,6 +185,12 @@ pub enum TransactionServiceError {
InvalidKeyId(String),
#[error("Invalid key manager data: `{0}`")]
KeyManagerServiceError(#[from] KeyManagerServiceError),
#[error("Serialization error: `{0}`")]
SerializationError(String),
#[error("Transaction exceed maximum byte size. Expected < {expected} but got {got}.")]
TransactionTooLarge { got: usize, expected: usize },
#[error("Pending Transaction was oversized")]
Oversized,
}

impl From<RangeProofError> for TransactionServiceError {
41 changes: 41 additions & 0 deletions base_layer/wallet/src/transaction_service/protocols/mod.rs
@@ -20,7 +20,48 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use bincode::serialize_into;
use log::{debug, error};
use serde::Serialize;
use tari_common_types::transaction::TxId;
use tari_comms::protocol::rpc;

use crate::transaction_service::error::{TransactionServiceError, TransactionServiceProtocolError};

pub mod transaction_broadcast_protocol;
pub mod transaction_receive_protocol;
pub mod transaction_send_protocol;
pub mod transaction_validation_protocol;

const LOG_TARGET: &str = "wallet::transaction_service::protocols";

/// Verify that the negotiated transaction is not too large to be broadcast
pub fn check_transaction_size<T: Serialize>(
transaction: &T,
tx_id: TxId,
) -> Result<(), TransactionServiceProtocolError<TxId>> {
let mut buf: Vec<u8> = Vec::new();
serialize_into(&mut buf, transaction).map_err(|e| {
TransactionServiceProtocolError::new(tx_id, TransactionServiceError::SerializationError(e.to_string()))
})?;
const SIZE_MARGIN: usize = 1024 * 10;
if buf.len() > rpc::RPC_MAX_FRAME_SIZE.saturating_sub(SIZE_MARGIN) {
let err = TransactionServiceProtocolError::new(tx_id, TransactionServiceError::TransactionTooLarge {
got: buf.len(),
expected: rpc::RPC_MAX_FRAME_SIZE.saturating_sub(SIZE_MARGIN),
});
error!(
target: LOG_TARGET,
"Transaction '{}' too large, cannot be broadcast ({:?}).",
tx_id, err
);
Err(err)
} else {
debug!(
target: LOG_TARGET,
"Transaction '{}' size ok, can be broadcast (got: {}, limit: {}).",
tx_id, buf.len(), rpc::RPC_MAX_FRAME_SIZE.saturating_sub(SIZE_MARGIN)
);
Ok(())
}
}
@@ -47,6 +47,7 @@ use crate::{
transaction_service::{
error::{TransactionServiceError, TransactionServiceProtocolError},
handle::TransactionEvent,
protocols::check_transaction_size,
service::TransactionServiceResources,
storage::{
database::TransactionBackend,
@@ -127,6 +128,10 @@ where
);
return Ok(self.tx_id);
}
if let Err(e) = check_transaction_size(&completed_tx.transaction, self.tx_id) {
self.cancel_transaction(TxCancellationReason::Oversized).await;
return Err(e);
}

loop {
tokio::select! {
@@ -44,6 +44,7 @@ use crate::{
transaction_service::{
error::{TransactionServiceError, TransactionServiceProtocolError},
handle::TransactionEvent,
protocols::check_transaction_size,
service::TransactionServiceResources,
storage::{
database::TransactionBackend,
@@ -159,6 +160,12 @@ where
Utc::now().naive_utc(),
);

// Verify that the negotiated transaction is not too large to be broadcast
if let Err(e) = check_transaction_size(&inbound_transaction, self.id) {
self.cancel_oversized_transaction().await?;
return Err(e);
}

self.resources
.db
.add_pending_inbound_transaction(inbound_transaction.tx_id, inbound_transaction.clone())
@@ -242,6 +249,12 @@ where
},
};

// Verify that the negotiated transaction is not too large to be broadcast
if let Err(e) = check_transaction_size(&inbound_tx, self.id) {
self.cancel_oversized_transaction().await?;
return Err(e);
}

// Determine the time remaining before this transaction times out
let elapsed_time = utc_duration_since(&inbound_tx.timestamp)
.map_err(|e| TransactionServiceProtocolError::new(self.id, e.into()))?;
@@ -469,6 +482,32 @@ where
"Cancelling Transaction Receive Protocol (TxId: {}) due to timeout after no counterparty response", self.id
);

self.cancel_transaction(TxCancellationReason::Timeout).await?;

info!(
target: LOG_TARGET,
"Pending Transaction (TxId: {}) timed out after no response from counterparty", self.id
);

Err(TransactionServiceProtocolError::new(
self.id,
TransactionServiceError::Timeout,
))
}

async fn cancel_oversized_transaction(&mut self) -> Result<(), TransactionServiceProtocolError<TxId>> {
info!(
target: LOG_TARGET,
"Cancelling Transaction Receive Protocol (TxId: {}) due to transaction being oversized", self.id
);

self.cancel_transaction(TxCancellationReason::Oversized).await
}

async fn cancel_transaction(
&mut self,
cancel_reason: TxCancellationReason,
) -> Result<(), TransactionServiceProtocolError<TxId>> {
self.resources.db.cancel_pending_transaction(self.id).map_err(|e| {
warn!(
target: LOG_TARGET,
@@ -486,10 +525,7 @@ where
let _size = self
.resources
.event_publisher
.send(Arc::new(TransactionEvent::TransactionCancelled(
self.id,
TxCancellationReason::Timeout,
)))
.send(Arc::new(TransactionEvent::TransactionCancelled(self.id, cancel_reason)))
.map_err(|e| {
trace!(
target: LOG_TARGET,
Expand All @@ -502,14 +538,6 @@ where
)
});

info!(
target: LOG_TARGET,
"Pending Transaction (TxId: {}) timed out after no response from counterparty", self.id
);

Err(TransactionServiceProtocolError::new(
self.id,
TransactionServiceError::Timeout,
))
Ok(())
}
}
