Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: make wallet db calls synchronous rather than async #3982

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion applications/tari_app_utilities/src/initialization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub fn init_configuration(
// Load and apply configuration file
let cfg = bootstrap.load_configuration()?;

// Initialise the logger
// Initialise the logger (Comment out to enable tokio tracing via tokio-console)
bootstrap.initialize_logging()?;

log::info!(target: LOG_TARGET, "{} ({})", application_type, consts::APP_VERSION);
Expand Down
7 changes: 6 additions & 1 deletion applications/tari_console_wallet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ tari_shutdown = { path = "../../infrastructure/shutdown" }
tari_key_manager = { path = "../../base_layer/key_manager" }
tari_utilities = { git = "https://github.com/tari-project/tari_utilities.git", tag = "v0.3.1" }

# Uncomment for tokio tracing via tokio-console (needs "tracing" featurs)
#console-subscriber = "0.1.3"
#tokio = { version = "1.14", features = ["signal", "tracing"] }
# Uncomment for normal use (non tokio-console tracing)
tokio = { version = "1.14", features = ["signal"] }

sha2 = "0.9.5"
digest = "0.9.0"
chrono = { version = "0.4.19", default-features = false }
Expand All @@ -36,7 +42,6 @@ rpassword = "5.0"
rustyline = "9.0"
strum = "0.22"
strum_macros = "0.22"
tokio = { version = "1.11", features = ["signal"] }
thiserror = "1.0.26"
tonic = "0.6.2"
tracing = "0.1.26"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,9 +322,9 @@ fn parse_make_it_rain(mut args: SplitWhitespace) -> Result<Vec<ParsedArgument>,
// txs per second
let txps = args.next().ok_or_else(|| ParseError::Empty("Txs/s".to_string()))?;
let txps = txps.parse::<f64>().map_err(ParseError::Float)?;
if txps > 25.0 {
println!("Maximum transaction rate is 25/sec");
return Err(ParseError::Invalid("Maximum transaction rate is 25/sec".to_string()));
if txps > 250.0 {
println!("Maximum transaction rate is 250/sec");
return Err(ParseError::Invalid("Maximum transaction rate is 250/sec".to_string()));
}
parsed_args.push(ParsedArgument::Float(txps));

Expand Down
3 changes: 3 additions & 0 deletions applications/tari_console_wallet/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ pub mod wallet_modes;

/// Application entry point
fn main() {
// Uncomment to enable tokio tracing via tokio-console
// console_subscriber::init();

match main_inner() {
Ok(_) => process::exit(0),
Err(err) => {
Expand Down
8 changes: 7 additions & 1 deletion applications/tari_mining_node/src/stratum/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ pub enum Error {
#[error("Can't create TLS connector: {0}")]
Tls(#[from] native_tls::Error),
#[error("Can't establish TLS connection: {0}")]
Tcp(#[from] native_tls::HandshakeError<std::net::TcpStream>),
Tcp(#[from] Box<native_tls::HandshakeError<std::net::TcpStream>>),
#[error("No connected stream")]
NotConnected,
#[error("Can't parse int: {0}")]
Expand All @@ -68,3 +68,9 @@ impl<T> From<std::sync::mpsc::SendError<T>> for Error {
Error::General(format!("Failed to send to a channel: {:?}", error))
}
}

impl From<native_tls::HandshakeError<std::net::TcpStream>> for Error {
fn from(error: native_tls::HandshakeError<std::net::TcpStream>) -> Self {
Error::General(format!("TLS handshake error: {:?}", error))
}
}
4 changes: 2 additions & 2 deletions applications/tari_mining_node/src/stratum/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use crate::stratum::error::Error;

pub(crate) enum Stream {
Stream(BufStream<TcpStream>),
TlsStream(BufStream<TlsStream<TcpStream>>),
TlsStream(Box<BufStream<TlsStream<TcpStream>>>),
}

impl Stream {
Expand All @@ -46,7 +46,7 @@ impl Stream {
let base_host = format!("{}.{}", split_url[split_url.len() - 2], split_url[split_url.len() - 1]);
let mut stream = connector.connect(&base_host, conn)?;
stream.get_mut().set_nonblocking(true)?;
Ok(Self::TlsStream(BufStream::new(stream)))
Ok(Self::TlsStream(Box::from(BufStream::new(stream))))
} else {
conn.set_nonblocking(true)?;
Ok(Self::Stream(BufStream::new(conn)))
Expand Down
2 changes: 2 additions & 0 deletions applications/tari_validator_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@ use crate::{
const LOG_TARGET: &str = "tari::validator_node::app";

fn main() {
// Uncomment to enable tokio tracing via tokio-console
// console_subscriber::init();

if let Err(err) = main_inner() {
let exit_code = err.exit_code;
eprintln!("{:?}", err);
Expand Down
7 changes: 6 additions & 1 deletion base_layer/wallet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ tari_storage = { version = "^0.30", path = "../../infrastructure/storage" }
tari_common_sqlite = { path = "../../common_sqlite" }
tari_utilities = { git = "https://github.com/tari-project/tari_utilities.git", tag = "v0.3.1" }

# Uncomment for tokio tracing via tokio-console (needs "tracing" featurs)
#console-subscriber = "0.1.3"
#tokio = { version = "1.14", features = ["sync", "macros", "tracing"] }
# Uncomment for normal use (non tokio-console tracing)
tokio = { version = "1.14", features = ["sync", "macros"] }

aes-gcm = "^0.8"
async-trait = "0.1.50"
argon2 = "0.2"
Expand All @@ -47,7 +53,6 @@ strum = "0.22"
strum_macros = "0.22"
tempfile = "3.1.0"
thiserror = "1.0.26"
tokio = { version = "1.11", features = ["sync", "macros"] }
tower = "0.4"
prost = "0.9"

Expand Down
1 change: 0 additions & 1 deletion base_layer/wallet/src/assets/asset_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ impl<T: OutputManagerBackend + 'static> AssetManager<T> {
let outputs = self
.output_database
.fetch_with_features(OutputFlags::ASSET_REGISTRATION)
.await
.map_err(|err| WalletError::OutputManagerError(err.into()))?;

debug!(
Expand Down
4 changes: 2 additions & 2 deletions base_layer/wallet/src/output_manager_service/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,14 +165,14 @@ pub enum OutputManagerStorageError {
DieselConnectionError(#[from] diesel::ConnectionError),
#[error("Database migration error: `{0}`")]
DatabaseMigrationError(String),
#[error("Blocking task spawn error: `{0}`")]
BlockingTaskSpawnError(String),
#[error("Wallet db is already encrypted and cannot be encrypted until the previous encryption is removed")]
AlreadyEncrypted,
#[error("Byte array error: `{0}`")]
ByteArrayError(#[from] ByteArrayError),
#[error("Aead error: `{0}`")]
AeadError(String),
#[error("Tried to insert a script that already exists in the database")]
DuplicateScript,
#[error("Tari script error : {0}")]
ScriptError(#[from] ScriptError),
#[error("Binary not stored as valid hex:{0}")]
Expand Down
32 changes: 8 additions & 24 deletions base_layer/wallet/src/output_manager_service/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use std::{fmt, fmt::Formatter, sync::Arc};
use aes_gcm::Aes256Gcm;
use tari_common_types::{
transaction::TxId,
types::{BlockHash, HashOutput, PrivateKey, PublicKey},
types::{HashOutput, PrivateKey, PublicKey},
};
use tari_core::{
covenants::Covenant,
Expand All @@ -52,11 +52,8 @@ use tower::Service;

use crate::output_manager_service::{
error::OutputManagerError,
service::Balance,
storage::{
models::{KnownOneSidedPaymentScript, SpendingPriority},
OutputStatus,
},
service::{Balance, OutputStatusesByTxId},
storage::models::{KnownOneSidedPaymentScript, SpendingPriority},
};

/// API Request enum
Expand Down Expand Up @@ -247,21 +244,12 @@ pub enum OutputManagerResponse {
RewoundOutputs(Vec<RecoveredOutput>),
ScanOutputs(Vec<RecoveredOutput>),
AddKnownOneSidedPaymentScript,
CreateOutputWithFeatures {
output: Box<UnblindedOutputBuilder>,
},
CreatePayToSelfWithOutputs {
transaction: Box<Transaction>,
tx_id: TxId,
},
CreateOutputWithFeatures { output: Box<UnblindedOutputBuilder> },
CreatePayToSelfWithOutputs { transaction: Box<Transaction>, tx_id: TxId },
ReinstatedCancelledInboundTx,
CoinbaseAbandonedSet,
ClaimHtlcTransaction((TxId, MicroTari, MicroTari, Transaction)),
OutputStatusesByTxId {
statuses: Vec<OutputStatus>,
mined_height: Option<u64>,
block_hash: Option<BlockHash>,
},
OutputStatusesByTxId(OutputStatusesByTxId),
}

pub type OutputManagerEventSender = broadcast::Sender<Arc<OutputManagerEvent>>;
Expand Down Expand Up @@ -846,17 +834,13 @@ impl OutputManagerHandle {
pub async fn get_output_statuses_by_tx_id(
&mut self,
tx_id: TxId,
) -> Result<(Vec<OutputStatus>, Option<u64>, Option<BlockHash>), OutputManagerError> {
) -> Result<OutputStatusesByTxId, OutputManagerError> {
match self
.handle
.call(OutputManagerRequest::GetOutputStatusesByTxId(tx_id))
.await??
{
OutputManagerResponse::OutputStatusesByTxId {
statuses,
mined_height,
block_hash,
} => Ok((statuses, mined_height, block_hash)),
OutputManagerResponse::OutputStatusesByTxId(output_statuses_by_tx_id) => Ok(output_statuses_by_tx_id),
_ => Err(OutputManagerError::UnexpectedApiResponse),
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ where
)?;
let tx_id = TxId::new_random();
let output_hex = db_output.commitment.to_hex();
if let Err(e) = self.db.add_unspent_output_with_tx_id(tx_id, db_output).await {
if let Err(e) = self.db.add_unspent_output_with_tx_id(tx_id, db_output) {
match e {
OutputManagerStorageError::DuplicateOutput => {
info!(
Expand Down
Loading