Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: limit transaction size #6154

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 1 addition & 6 deletions base_layer/wallet/src/output_manager_service/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,7 @@ use tower::Service;
use crate::output_manager_service::{
error::OutputManagerError,
service::{Balance, OutputInfoByTxId},
storage::{
database::OutputBackendQuery,
models::{DbWalletOutput, KnownOneSidedPaymentScript, SpendingPriority},
},
storage::models::{DbWalletOutput, KnownOneSidedPaymentScript, SpendingPriority},
UtxoSelectionCriteria,
};

Expand Down Expand Up @@ -90,7 +87,6 @@ pub enum OutputManagerRequest {
CancelTransaction(TxId),
GetSpentOutputs,
GetUnspentOutputs,
GetOutputsBy(OutputBackendQuery),
GetInvalidOutputs,
ValidateUtxos,
RevalidateTxos,
Expand Down Expand Up @@ -152,7 +148,6 @@ impl fmt::Display for OutputManagerRequest {
CancelTransaction(v) => write!(f, "CancelTransaction ({})", v),
GetSpentOutputs => write!(f, "GetSpentOutputs"),
GetUnspentOutputs => write!(f, "GetUnspentOutputs"),
GetOutputsBy(q) => write!(f, "GetOutputs({:#?})", q),
GetInvalidOutputs => write!(f, "GetInvalidOutputs"),
ValidateUtxos => write!(f, "ValidateUtxos"),
RevalidateTxos => write!(f, "RevalidateTxos"),
Expand Down
8 changes: 2 additions & 6 deletions base_layer/wallet/src/output_manager_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,10 +306,6 @@ where
let outputs = self.fetch_unspent_outputs()?;
Ok(OutputManagerResponse::UnspentOutputs(outputs))
},
OutputManagerRequest::GetOutputsBy(q) => {
let outputs = self.fetch_outputs_by(q)?.into_iter().map(|v| v.into()).collect();
Ok(OutputManagerResponse::Outputs(outputs))
},
OutputManagerRequest::ValidateUtxos => {
self.validate_outputs().map(OutputManagerResponse::TxoValidationStarted)
},
Expand Down Expand Up @@ -1381,8 +1377,8 @@ where
Ok(self.resources.db.fetch_all_unspent_outputs()?)
}

pub fn fetch_outputs_by(&self, q: OutputBackendQuery) -> Result<Vec<DbWalletOutput>, OutputManagerError> {
Ok(self.resources.db.fetch_outputs_by(q)?)
pub fn fetch_outputs_by_query(&self, q: OutputBackendQuery) -> Result<Vec<DbWalletOutput>, OutputManagerError> {
Ok(self.resources.db.fetch_outputs_by_query(q)?)
}

pub fn fetch_invalid_outputs(&self) -> Result<Vec<DbWalletOutput>, OutputManagerError> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,5 +101,5 @@ pub trait OutputManagerBackend: Send + Sync + Clone {
current_tip_height: Option<u64>,
) -> Result<Vec<DbWalletOutput>, OutputManagerStorageError>;
fn fetch_outputs_by_tx_id(&self, tx_id: TxId) -> Result<Vec<DbWalletOutput>, OutputManagerStorageError>;
fn fetch_outputs_by(&self, q: OutputBackendQuery) -> Result<Vec<DbWalletOutput>, OutputManagerStorageError>;
fn fetch_outputs_by_query(&self, q: OutputBackendQuery) -> Result<Vec<DbWalletOutput>, OutputManagerStorageError>;
}
Original file line number Diff line number Diff line change
Expand Up @@ -434,8 +434,11 @@ where T: OutputManagerBackend + 'static
Ok(outputs)
}

pub fn fetch_outputs_by(&self, q: OutputBackendQuery) -> Result<Vec<DbWalletOutput>, OutputManagerStorageError> {
self.db.fetch_outputs_by(q)
pub fn fetch_outputs_by_query(
&self,
q: OutputBackendQuery,
) -> Result<Vec<DbWalletOutput>, OutputManagerStorageError> {
self.db.fetch_outputs_by_query(q)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1037,9 +1037,9 @@ impl OutputManagerBackend for OutputManagerSqliteDatabase {
.collect::<Result<Vec<_>, _>>()
}

fn fetch_outputs_by(&self, q: OutputBackendQuery) -> Result<Vec<DbWalletOutput>, OutputManagerStorageError> {
fn fetch_outputs_by_query(&self, q: OutputBackendQuery) -> Result<Vec<DbWalletOutput>, OutputManagerStorageError> {
let mut conn = self.database_connection.get_pooled_connection()?;
Ok(OutputSql::fetch_outputs_by(q, &mut conn)?
Ok(OutputSql::fetch_outputs_by_query(q, &mut conn)?
.into_iter()
.filter_map(|x| {
x.to_db_wallet_output()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ impl OutputSql {

/// Retrieves UTXOs by a set of given rules
#[allow(clippy::cast_sign_loss)]
pub fn fetch_outputs_by(
pub fn fetch_outputs_by_query(
q: OutputBackendQuery,
conn: &mut SqliteConnection,
) -> Result<Vec<OutputSql>, OutputManagerStorageError> {
Expand Down
21 changes: 21 additions & 0 deletions base_layer/wallet/src/storage/sqlite_utilities/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,27 @@ pub fn run_migration_and_create_sqlite_connection<P: AsRef<Path>>(
Ok(WalletDbConnection::new(pool, Some(file_lock)))
}

pub fn run_migration_and_create_sqlite_memory_connection(
sqlite_pool_size: usize,
) -> Result<WalletDbConnection, WalletStorageError> {
let mut pool = SqliteConnectionPool::new(
String::from(":memory:"),
sqlite_pool_size,
true,
true,
Duration::from_secs(60),
);
pool.create_pool()?;
let mut connection = pool.get_pooled_connection()?;

const MIGRATIONS: EmbeddedMigrations = embed_migrations!("./migrations");
connection
.run_pending_migrations(MIGRATIONS)
.map_err(|err| WalletStorageError::DatabaseMigrationError(format!("Database migration failed {}", err)))?;

Ok(WalletDbConnection::new(pool, None))
}

pub fn acquire_exclusive_file_lock(db_path: &Path) -> Result<File, WalletStorageError> {
let lock_file_path = match db_path.file_name() {
None => {
Expand Down
6 changes: 6 additions & 0 deletions base_layer/wallet/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use tempfile::{tempdir, TempDir};

use crate::storage::sqlite_utilities::{
run_migration_and_create_sqlite_connection,
run_migration_and_create_sqlite_memory_connection,
wallet_db_connection::WalletDbConnection,
};

Expand Down Expand Up @@ -58,6 +59,11 @@ pub fn make_wallet_database_connection(path: Option<String>) -> (WalletDbConnect
(connection, temp_dir)
}

/// A test helper to create a temporary wallet service memory databases
pub fn make_wallet_database_memory_connection() -> WalletDbConnection {
run_migration_and_create_sqlite_memory_connection(16).unwrap()
}

pub fn create_consensus_rules() -> ConsensusManager {
ConsensusManager::builder(Network::LocalNet).build().unwrap()
}
Expand Down
6 changes: 6 additions & 0 deletions base_layer/wallet/src/transaction_service/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,12 @@ pub enum TransactionServiceError {
InvalidKeyId(String),
#[error("Invalid key manager data: `{0}`")]
KeyManagerServiceError(#[from] KeyManagerServiceError),
#[error("Serialization error: `{0}`")]
SerializationError(String),
#[error("Transaction exceed maximum byte size. Expected < {expected} but got {got}.")]
TransactionTooLarge { got: usize, expected: usize },
#[error("Pending Transaction was oversized")]
Oversized,
}

impl From<RangeProofError> for TransactionServiceError {
Expand Down
41 changes: 41 additions & 0 deletions base_layer/wallet/src/transaction_service/protocols/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,48 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use bincode::serialize_into;
use log::{debug, error};
use serde::Serialize;
use tari_common_types::transaction::TxId;
use tari_comms::protocol::rpc;

use crate::transaction_service::error::{TransactionServiceError, TransactionServiceProtocolError};

pub mod transaction_broadcast_protocol;
pub mod transaction_receive_protocol;
pub mod transaction_send_protocol;
pub mod transaction_validation_protocol;

const LOG_TARGET: &str = "wallet::transaction_service::protocols";

/// Verify that the negotiated transaction is not too large to be broadcast
pub fn check_transaction_size<T: Serialize>(
transaction: &T,
tx_id: TxId,
) -> Result<(), TransactionServiceProtocolError<TxId>> {
let mut buf: Vec<u8> = Vec::new();
serialize_into(&mut buf, transaction).map_err(|e| {
TransactionServiceProtocolError::new(tx_id, TransactionServiceError::SerializationError(e.to_string()))
})?;
const SIZE_MARGIN: usize = 1024 * 10;
if buf.len() > rpc::RPC_MAX_FRAME_SIZE.saturating_sub(SIZE_MARGIN) {
let err = TransactionServiceProtocolError::new(tx_id, TransactionServiceError::TransactionTooLarge {
got: buf.len(),
expected: rpc::RPC_MAX_FRAME_SIZE.saturating_sub(SIZE_MARGIN),
});
error!(
target: LOG_TARGET,
"Transaction '{}' too large, cannot be broadcast ({:?}).",
tx_id, err
);
Err(err)
} else {
debug!(
target: LOG_TARGET,
"Transaction '{}' size ok, can be broadcast (got: {}, limit: {}).",
tx_id, buf.len(), rpc::RPC_MAX_FRAME_SIZE.saturating_sub(SIZE_MARGIN)
);
Ok(())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ use crate::{
transaction_service::{
error::{TransactionServiceError, TransactionServiceProtocolError},
handle::TransactionEvent,
protocols::check_transaction_size,
service::TransactionServiceResources,
storage::{
database::TransactionBackend,
Expand Down Expand Up @@ -127,6 +128,10 @@ where
);
return Ok(self.tx_id);
}
if let Err(e) = check_transaction_size(&completed_tx.transaction, self.tx_id) {
self.cancel_transaction(TxCancellationReason::Oversized).await;
return Err(e);
}

loop {
tokio::select! {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ use crate::{
transaction_service::{
error::{TransactionServiceError, TransactionServiceProtocolError},
handle::TransactionEvent,
protocols::check_transaction_size,
service::TransactionServiceResources,
storage::{
database::TransactionBackend,
Expand Down Expand Up @@ -159,6 +160,12 @@ where
Utc::now().naive_utc(),
);

// Verify that the negotiated transaction is not too large to be broadcast
if let Err(e) = check_transaction_size(&inbound_transaction, self.id) {
self.cancel_oversized_transaction().await?;
return Err(e);
}

self.resources
.db
.add_pending_inbound_transaction(inbound_transaction.tx_id, inbound_transaction.clone())
Expand Down Expand Up @@ -242,6 +249,12 @@ where
},
};

// Verify that the negotiated transaction is not too large to be broadcast
if let Err(e) = check_transaction_size(&inbound_tx, self.id) {
self.cancel_oversized_transaction().await?;
return Err(e);
}

// Determine the time remaining before this transaction times out
let elapsed_time = utc_duration_since(&inbound_tx.timestamp)
.map_err(|e| TransactionServiceProtocolError::new(self.id, e.into()))?;
Expand Down Expand Up @@ -469,6 +482,32 @@ where
"Cancelling Transaction Receive Protocol (TxId: {}) due to timeout after no counterparty response", self.id
);

self.cancel_transaction(TxCancellationReason::Timeout).await?;

info!(
target: LOG_TARGET,
"Pending Transaction (TxId: {}) timed out after no response from counterparty", self.id
);

Err(TransactionServiceProtocolError::new(
self.id,
TransactionServiceError::Timeout,
))
}

async fn cancel_oversized_transaction(&mut self) -> Result<(), TransactionServiceProtocolError<TxId>> {
info!(
target: LOG_TARGET,
"Cancelling Transaction Receive Protocol (TxId: {}) due to transaction being oversized", self.id
);

self.cancel_transaction(TxCancellationReason::Oversized).await
}

async fn cancel_transaction(
&mut self,
cancel_reason: TxCancellationReason,
) -> Result<(), TransactionServiceProtocolError<TxId>> {
self.resources.db.cancel_pending_transaction(self.id).map_err(|e| {
warn!(
target: LOG_TARGET,
Expand All @@ -486,10 +525,7 @@ where
let _size = self
.resources
.event_publisher
.send(Arc::new(TransactionEvent::TransactionCancelled(
self.id,
TxCancellationReason::Timeout,
)))
.send(Arc::new(TransactionEvent::TransactionCancelled(self.id, cancel_reason)))
.map_err(|e| {
trace!(
target: LOG_TARGET,
Expand All @@ -502,14 +538,6 @@ where
)
});

info!(
target: LOG_TARGET,
"Pending Transaction (TxId: {}) timed out after no response from counterparty", self.id
);

Err(TransactionServiceProtocolError::new(
self.id,
TransactionServiceError::Timeout,
))
Ok(())
}
}
Loading
Loading