Skip to content

Commit

Permalink
feat: remove spawn blocking calls from wallet db (output manager serv…
Browse files Browse the repository at this point in the history
…ice) (#3982)

Description
---
Removed spawn blocking calls for db operations from the wallet in the output manager service. (_This is the first PR in a couple of PRs required to implement this fully throughout the wallet code._)

We are using SQLite in WAL mode, which provides concurrent read access and single thread write access for the number of pooled connections configured. Implementing spawn blocking calls on top of that for every db operation is counter productive and easily results in interlock situations within the wallet code, as is the case currently with sending transactions in batch mode.

Although batch mode transactions still does not work with only this PR, it showed a definite improvement monitoring the action with tokio console; all interlocks are now within the transaction service and not in the output manager service anymore.

Motivation and Context
---
When sending multiple transactions in batch mode (with for example the `make-it-rain` command), the wallet locks up and no transactions are being sent.

How Has This Been Tested?
---
System level tests sending multiple transactions in batch mode to three receiving wallets.
Monitoring with tokio console.
  • Loading branch information
hansieodendaal committed Mar 31, 2022
1 parent b5797b7 commit cbf75ca
Show file tree
Hide file tree
Showing 23 changed files with 305 additions and 495 deletions.
2 changes: 1 addition & 1 deletion applications/tari_app_utilities/src/initialization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub fn init_configuration(
// Load and apply configuration file
let cfg = bootstrap.load_configuration()?;

// Initialise the logger
// Initialise the logger (Comment out to enable tokio tracing via tokio-console)
bootstrap.initialize_logging()?;

log::info!(target: LOG_TARGET, "{} ({})", application_type, consts::APP_VERSION);
Expand Down
7 changes: 6 additions & 1 deletion applications/tari_console_wallet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ tari_shutdown = { path = "../../infrastructure/shutdown" }
tari_key_manager = { path = "../../base_layer/key_manager" }
tari_utilities = { git = "https://github.com/tari-project/tari_utilities.git", tag = "v0.3.1" }

# Uncomment for tokio tracing via tokio-console (needs "tracing" featurs)
#console-subscriber = "0.1.3"
#tokio = { version = "1.14", features = ["signal", "tracing"] }
# Uncomment for normal use (non tokio-console tracing)
tokio = { version = "1.14", features = ["signal"] }

sha2 = "0.9.5"
digest = "0.9.0"
chrono = { version = "0.4.19", default-features = false }
Expand All @@ -36,7 +42,6 @@ rpassword = "5.0"
rustyline = "9.0"
strum = "0.22"
strum_macros = "0.22"
tokio = { version = "1.11", features = ["signal"] }
thiserror = "1.0.26"
tonic = "0.6.2"
tracing = "0.1.26"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,9 +322,9 @@ fn parse_make_it_rain(mut args: SplitWhitespace) -> Result<Vec<ParsedArgument>,
// txs per second
let txps = args.next().ok_or_else(|| ParseError::Empty("Txs/s".to_string()))?;
let txps = txps.parse::<f64>().map_err(ParseError::Float)?;
if txps > 25.0 {
println!("Maximum transaction rate is 25/sec");
return Err(ParseError::Invalid("Maximum transaction rate is 25/sec".to_string()));
if txps > 250.0 {
println!("Maximum transaction rate is 250/sec");
return Err(ParseError::Invalid("Maximum transaction rate is 250/sec".to_string()));
}
parsed_args.push(ParsedArgument::Float(txps));

Expand Down
3 changes: 3 additions & 0 deletions applications/tari_console_wallet/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ pub mod wallet_modes;

/// Application entry point
fn main() {
// Uncomment to enable tokio tracing via tokio-console
// console_subscriber::init();

match main_inner() {
Ok(_) => process::exit(0),
Err(err) => {
Expand Down
8 changes: 7 additions & 1 deletion applications/tari_mining_node/src/stratum/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ pub enum Error {
#[error("Can't create TLS connector: {0}")]
Tls(#[from] native_tls::Error),
#[error("Can't establish TLS connection: {0}")]
Tcp(#[from] native_tls::HandshakeError<std::net::TcpStream>),
Tcp(#[from] Box<native_tls::HandshakeError<std::net::TcpStream>>),
#[error("No connected stream")]
NotConnected,
#[error("Can't parse int: {0}")]
Expand All @@ -68,3 +68,9 @@ impl<T> From<std::sync::mpsc::SendError<T>> for Error {
Error::General(format!("Failed to send to a channel: {:?}", error))
}
}

impl From<native_tls::HandshakeError<std::net::TcpStream>> for Error {
fn from(error: native_tls::HandshakeError<std::net::TcpStream>) -> Self {
Error::General(format!("TLS handshake error: {:?}", error))
}
}
4 changes: 2 additions & 2 deletions applications/tari_mining_node/src/stratum/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use crate::stratum::error::Error;

pub(crate) enum Stream {
Stream(BufStream<TcpStream>),
TlsStream(BufStream<TlsStream<TcpStream>>),
TlsStream(Box<BufStream<TlsStream<TcpStream>>>),
}

impl Stream {
Expand All @@ -46,7 +46,7 @@ impl Stream {
let base_host = format!("{}.{}", split_url[split_url.len() - 2], split_url[split_url.len() - 1]);
let mut stream = connector.connect(&base_host, conn)?;
stream.get_mut().set_nonblocking(true)?;
Ok(Self::TlsStream(BufStream::new(stream)))
Ok(Self::TlsStream(Box::from(BufStream::new(stream))))
} else {
conn.set_nonblocking(true)?;
Ok(Self::Stream(BufStream::new(conn)))
Expand Down
2 changes: 2 additions & 0 deletions applications/tari_validator_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@ use crate::{
const LOG_TARGET: &str = "tari::validator_node::app";

fn main() {
// Uncomment to enable tokio tracing via tokio-console
// console_subscriber::init();

if let Err(err) = main_inner() {
let exit_code = err.exit_code;
eprintln!("{:?}", err);
Expand Down
7 changes: 6 additions & 1 deletion base_layer/wallet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ tari_storage = { version = "^0.30", path = "../../infrastructure/storage" }
tari_common_sqlite = { path = "../../common_sqlite" }
tari_utilities = { git = "https://github.com/tari-project/tari_utilities.git", tag = "v0.3.1" }

# Uncomment for tokio tracing via tokio-console (needs "tracing" featurs)
#console-subscriber = "0.1.3"
#tokio = { version = "1.14", features = ["sync", "macros", "tracing"] }
# Uncomment for normal use (non tokio-console tracing)
tokio = { version = "1.14", features = ["sync", "macros"] }

aes-gcm = "^0.8"
async-trait = "0.1.50"
argon2 = "0.2"
Expand All @@ -47,7 +53,6 @@ strum = "0.22"
strum_macros = "0.22"
tempfile = "3.1.0"
thiserror = "1.0.26"
tokio = { version = "1.11", features = ["sync", "macros"] }
tower = "0.4"
prost = "0.9"

Expand Down
1 change: 0 additions & 1 deletion base_layer/wallet/src/assets/asset_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ impl<T: OutputManagerBackend + 'static> AssetManager<T> {
let outputs = self
.output_database
.fetch_with_features(OutputFlags::ASSET_REGISTRATION)
.await
.map_err(|err| WalletError::OutputManagerError(err.into()))?;

debug!(
Expand Down
4 changes: 2 additions & 2 deletions base_layer/wallet/src/output_manager_service/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,14 +165,14 @@ pub enum OutputManagerStorageError {
DieselConnectionError(#[from] diesel::ConnectionError),
#[error("Database migration error: `{0}`")]
DatabaseMigrationError(String),
#[error("Blocking task spawn error: `{0}`")]
BlockingTaskSpawnError(String),
#[error("Wallet db is already encrypted and cannot be encrypted until the previous encryption is removed")]
AlreadyEncrypted,
#[error("Byte array error: `{0}`")]
ByteArrayError(#[from] ByteArrayError),
#[error("Aead error: `{0}`")]
AeadError(String),
#[error("Tried to insert a script that already exists in the database")]
DuplicateScript,
#[error("Tari script error : {0}")]
ScriptError(#[from] ScriptError),
#[error("Binary not stored as valid hex:{0}")]
Expand Down
32 changes: 8 additions & 24 deletions base_layer/wallet/src/output_manager_service/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use std::{fmt, fmt::Formatter, sync::Arc};
use aes_gcm::Aes256Gcm;
use tari_common_types::{
transaction::TxId,
types::{BlockHash, HashOutput, PrivateKey, PublicKey},
types::{HashOutput, PrivateKey, PublicKey},
};
use tari_core::{
covenants::Covenant,
Expand All @@ -52,11 +52,8 @@ use tower::Service;

use crate::output_manager_service::{
error::OutputManagerError,
service::Balance,
storage::{
models::{KnownOneSidedPaymentScript, SpendingPriority},
OutputStatus,
},
service::{Balance, OutputStatusesByTxId},
storage::models::{KnownOneSidedPaymentScript, SpendingPriority},
};

/// API Request enum
Expand Down Expand Up @@ -247,21 +244,12 @@ pub enum OutputManagerResponse {
RewoundOutputs(Vec<RecoveredOutput>),
ScanOutputs(Vec<RecoveredOutput>),
AddKnownOneSidedPaymentScript,
CreateOutputWithFeatures {
output: Box<UnblindedOutputBuilder>,
},
CreatePayToSelfWithOutputs {
transaction: Box<Transaction>,
tx_id: TxId,
},
CreateOutputWithFeatures { output: Box<UnblindedOutputBuilder> },
CreatePayToSelfWithOutputs { transaction: Box<Transaction>, tx_id: TxId },
ReinstatedCancelledInboundTx,
CoinbaseAbandonedSet,
ClaimHtlcTransaction((TxId, MicroTari, MicroTari, Transaction)),
OutputStatusesByTxId {
statuses: Vec<OutputStatus>,
mined_height: Option<u64>,
block_hash: Option<BlockHash>,
},
OutputStatusesByTxId(OutputStatusesByTxId),
}

pub type OutputManagerEventSender = broadcast::Sender<Arc<OutputManagerEvent>>;
Expand Down Expand Up @@ -846,17 +834,13 @@ impl OutputManagerHandle {
pub async fn get_output_statuses_by_tx_id(
&mut self,
tx_id: TxId,
) -> Result<(Vec<OutputStatus>, Option<u64>, Option<BlockHash>), OutputManagerError> {
) -> Result<OutputStatusesByTxId, OutputManagerError> {
match self
.handle
.call(OutputManagerRequest::GetOutputStatusesByTxId(tx_id))
.await??
{
OutputManagerResponse::OutputStatusesByTxId {
statuses,
mined_height,
block_hash,
} => Ok((statuses, mined_height, block_hash)),
OutputManagerResponse::OutputStatusesByTxId(output_statuses_by_tx_id) => Ok(output_statuses_by_tx_id),
_ => Err(OutputManagerError::UnexpectedApiResponse),
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ where
)?;
let tx_id = TxId::new_random();
let output_hex = db_output.commitment.to_hex();
if let Err(e) = self.db.add_unspent_output_with_tx_id(tx_id, db_output).await {
if let Err(e) = self.db.add_unspent_output_with_tx_id(tx_id, db_output) {
match e {
OutputManagerStorageError::DuplicateOutput => {
info!(
Expand Down
Loading

0 comments on commit cbf75ca

Please sign in to comment.