diff --git a/applications/minotari_console_wallet/src/ui/mod.rs b/applications/minotari_console_wallet/src/ui/mod.rs
index 93f14443a5..5413c7ee22 100644
--- a/applications/minotari_console_wallet/src/ui/mod.rs
+++ b/applications/minotari_console_wallet/src/ui/mod.rs
@@ -108,6 +108,7 @@ fn crossterm_loop(mut app: App>) -> Result<(), ExitErro
             error!(target: LOG_TARGET, "Error drawing interface. {}", e);
             ExitCode::InterfaceError
         })?;
+        #[allow(clippy::blocks_in_conditions)]
         match events.next().map_err(|e| {
             error!(target: LOG_TARGET, "Error reading input event: {}", e);
             ExitCode::InterfaceError
diff --git a/base_layer/core/src/base_node/comms_interface/inbound_handlers.rs b/base_layer/core/src/base_node/comms_interface/inbound_handlers.rs
index 1150e1c345..727df42046 100644
--- a/base_layer/core/src/base_node/comms_interface/inbound_handlers.rs
+++ b/base_layer/core/src/base_node/comms_interface/inbound_handlers.rs
@@ -344,6 +344,7 @@ where B: BlockchainBackend + 'static
                     "A peer has requested a block with hash {}",
                     block_hex
                 );
+                #[allow(clippy::blocks_in_conditions)]
                 let maybe_block = match self
                     .blockchain_db
                     .fetch_block_by_hash(hash, true)
diff --git a/base_layer/core/src/base_node/sync/rpc/service.rs b/base_layer/core/src/base_node/sync/rpc/service.rs
index 1c35d079c4..81db956111 100644
--- a/base_layer/core/src/base_node/sync/rpc/service.rs
+++ b/base_layer/core/src/base_node/sync/rpc/service.rs
@@ -110,6 +110,7 @@ impl BaseNodeSyncRpcService {
 #[tari_comms::async_trait]
 impl BaseNodeSyncService for BaseNodeSyncRpcService {
     #[instrument(level = "trace", name = "sync_rpc::sync_blocks", skip(self), err)]
+    #[allow(clippy::blocks_in_conditions)]
     async fn sync_blocks(
         &self,
         request: Request,
@@ -273,6 +274,7 @@ impl BaseNodeSyncService for BaseNodeSyncRpcServ
     }
 
     #[instrument(level = "trace", name = "sync_rpc::sync_headers", skip(self), err)]
+    #[allow(clippy::blocks_in_conditions)]
     async fn sync_headers(
         &self,
         request: Request,
@@ -373,6 +375,7 @@ impl BaseNodeSyncService for BaseNodeSyncRpcServ
     }
 
     #[instrument(level = "trace", skip(self), err)]
+    #[allow(clippy::blocks_in_conditions)]
     async fn get_header_by_height(
         &self,
         request: Request,
@@ -389,6 +392,7 @@ impl BaseNodeSyncService for BaseNodeSyncRpcServ
     }
 
     #[instrument(level = "debug", skip(self), err)]
+    #[allow(clippy::blocks_in_conditions)]
     async fn find_chain_split(
         &self,
         request: Request,
@@ -452,6 +456,7 @@ impl BaseNodeSyncService for BaseNodeSyncRpcServ
     }
 
     #[instrument(level = "trace", skip(self), err)]
+    #[allow(clippy::blocks_in_conditions)]
     async fn get_chain_metadata(&self, _: Request<()>) -> Result, RpcStatus> {
         let chain_metadata = self
             .db()
@@ -462,6 +467,7 @@ impl BaseNodeSyncService for BaseNodeSyncRpcServ
     }
 
     #[instrument(level = "trace", skip(self), err)]
+    #[allow(clippy::blocks_in_conditions)]
     async fn sync_kernels(
         &self,
         request: Request,
@@ -588,6 +594,7 @@ impl BaseNodeSyncService for BaseNodeSyncRpcServ
     }
 
     #[instrument(level = "trace", skip(self), err)]
+    #[allow(clippy::blocks_in_conditions)]
     async fn sync_utxos(&self, request: Request) -> Result, RpcStatus> {
         let req = request.message();
         let peer_node_id = request.context().peer_node_id();
diff --git a/base_layer/core/src/chain_storage/lmdb_db/helpers.rs b/base_layer/core/src/chain_storage/lmdb_db/helpers.rs
index 1bf6b2e60d..064b7be75b 100644
--- a/base_layer/core/src/chain_storage/lmdb_db/helpers.rs
+++ b/base_layer/core/src/chain_storage/lmdb_db/helpers.rs
@@ -20,23 +20,53 @@
 // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
 // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
+use std::time::Instant;
+
 use lmdb_zero::error;
 use log::*;
 use serde::{de::DeserializeOwned, Serialize};
+use tari_storage::lmdb_store::BYTES_PER_MB;
 
 use crate::chain_storage::ChainStorageError;
 
 pub const LOG_TARGET: &str = "c::cs::lmdb_db::lmdb";
 
-pub fn serialize<T>(data: &T) -> Result<Vec<u8>, ChainStorageError>
+/// Serialize the given data into a byte vector
+/// Note:
+/// `size_hint` is given as an option as checking what the serialized would be is expensive
+/// for large data structures at ~30% overhead
+pub fn serialize<T>(data: &T, size_hint: Option<usize>) -> Result<Vec<u8>, ChainStorageError>
 where T: Serialize {
-    let size = bincode::serialized_size(&data).map_err(|e| ChainStorageError::AccessError(e.to_string()))?;
-    #[allow(clippy::cast_possible_truncation)]
-    let mut buf = Vec::with_capacity(size as usize);
+    let start = Instant::now();
+    let mut buf = if let Some(size) = size_hint {
+        Vec::with_capacity(size)
+    } else {
+        let size = bincode::serialized_size(&data).map_err(|e| ChainStorageError::AccessError(e.to_string()))?;
+        #[allow(clippy::cast_possible_truncation)]
+        Vec::with_capacity(size as usize)
+    };
+    let check_time = start.elapsed();
     bincode::serialize_into(&mut buf, data).map_err(|e| {
         error!(target: LOG_TARGET, "Could not serialize lmdb: {:?}", e);
         ChainStorageError::AccessError(e.to_string())
     })?;
+    if buf.len() >= BYTES_PER_MB {
+        let serialize_time = start.elapsed() - check_time;
+        trace!(
+            "lmdb_replace - {} MB, serialize check in {:.2?}, serialize in {:.2?}",
+            buf.len() / BYTES_PER_MB,
+            check_time,
+            serialize_time
+        );
+    }
+    if let Some(size) = size_hint {
+        if buf.len() > size {
+            warn!(
+                target: LOG_TARGET,
+                "lmdb_replace - Serialized size hint was too small. Expected {}, got {}", size, buf.len()
+            );
+        }
+    }
     Ok(buf)
 }
 
diff --git a/base_layer/core/src/chain_storage/lmdb_db/lmdb.rs b/base_layer/core/src/chain_storage/lmdb_db/lmdb.rs
index fb85cbb95f..2b104b7289 100644
--- a/base_layer/core/src/chain_storage/lmdb_db/lmdb.rs
+++ b/base_layer/core/src/chain_storage/lmdb_db/lmdb.rs
@@ -20,7 +20,7 @@
 // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
 // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
-use std::fmt::Debug;
+use std::{fmt::Debug, time::Instant};
 
 use lmdb_zero::{
     del,
@@ -37,6 +37,7 @@ use lmdb_zero::{
 };
 use log::*;
 use serde::{de::DeserializeOwned, Serialize};
+use tari_storage::lmdb_store::BYTES_PER_MB;
 use tari_utilities::hex::to_hex;
 
 use crate::chain_storage::{
@@ -62,7 +63,7 @@ where
     K: AsLmdbBytes + ?Sized + Debug,
     V: Serialize + Debug,
 {
-    let val_buf = serialize(val)?;
+    let val_buf = serialize(val, None)?;
     match txn.access().put(db, key, &val_buf, put::NOOVERWRITE) {
         Ok(_) => {
             trace!(
@@ -112,7 +113,7 @@ where
     K: AsLmdbBytes + ?Sized,
     V: Serialize,
 {
-    let val_buf = serialize(val)?;
+    let val_buf = serialize(val, None)?;
     txn.access().put(db, key, &val_buf, put::Flags::empty()).map_err(|e| {
         if let lmdb_zero::Error::Code(code) = &e {
             if *code == lmdb_zero::error::MAP_FULL {
@@ -128,13 +129,20 @@ where
 }
 
 /// Inserts or replaces the item at the given key. If the key does not exist, a new entry is created
-pub fn lmdb_replace<K, V>(txn: &WriteTransaction<'_>, db: &Database, key: &K, val: &V) -> Result<(), ChainStorageError>
+pub fn lmdb_replace<K, V>(
+    txn: &WriteTransaction<'_>,
+    db: &Database,
+    key: &K,
+    val: &V,
+    size_hint: Option<usize>,
+) -> Result<(), ChainStorageError>
 where
     K: AsLmdbBytes + ?Sized,
     V: Serialize,
 {
-    let val_buf = serialize(val)?;
-    txn.access().put(db, key, &val_buf, put::Flags::empty()).map_err(|e| {
+    let val_buf = serialize(val, size_hint)?;
+    let start = Instant::now();
+    let res = txn.access().put(db, key, &val_buf, put::Flags::empty()).map_err(|e| {
         if let lmdb_zero::Error::Code(code) = &e {
             if *code == lmdb_zero::error::MAP_FULL {
                 return ChainStorageError::DbResizeRequired(Some(val_buf.len()));
@@ -145,7 +153,16 @@ where
             "Could not replace value in lmdb transaction: {:?}", e
         );
         ChainStorageError::AccessError(e.to_string())
-    })
+    });
+    if val_buf.len() >= BYTES_PER_MB {
+        let write_time = start.elapsed();
+        trace!(
+            "lmdb_replace - {} MB, lmdb write in {:.2?}",
+            val_buf.len() / BYTES_PER_MB,
+            write_time
+        );
+    }
+    res
 }
 
 /// Deletes the given key. An error is returned if the key does not exist
@@ -175,7 +192,7 @@ where
     K: AsLmdbBytes + ?Sized,
     V: Serialize,
 {
-    txn.access().del_item(db, key, &serialize(value)?)?;
+    txn.access().del_item(db, key, &serialize(value, None)?)?;
     Ok(())
 }
 
diff --git a/base_layer/core/src/chain_storage/lmdb_db/lmdb_db.rs b/base_layer/core/src/chain_storage/lmdb_db/lmdb_db.rs
index a74cca1897..64fb8f5597 100644
--- a/base_layer/core/src/chain_storage/lmdb_db/lmdb_db.rs
+++ b/base_layer/core/src/chain_storage/lmdb_db/lmdb_db.rs
@@ -20,7 +20,7 @@
 // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
 // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
-use std::{convert::TryFrom, fmt, fs, fs::File, ops::Deref, path::Path, sync::Arc, time::Instant};
+use std::{cmp::max, convert::TryFrom, fmt, fs, fs::File, ops::Deref, path::Path, sync::Arc, time::Instant};
 
 use fs2::FileExt;
 use lmdb_zero::{
@@ -41,7 +41,7 @@ use tari_common_types::{
     types::{BlockHash, Commitment, FixedHash, HashOutput, PublicKey, Signature},
 };
 use tari_mmr::sparse_merkle_tree::{DeleteResult, NodeKey, ValueHash};
-use tari_storage::lmdb_store::{db, LMDBBuilder, LMDBConfig, LMDBStore};
+use tari_storage::lmdb_store::{db, LMDBBuilder, LMDBConfig, LMDBStore, BYTES_PER_MB};
 use tari_utilities::{
     hex::{to_hex, Hex},
     ByteArray,
@@ -63,7 +63,6 @@ use crate::{
     error::{ChainStorageError, OrNotFound},
     lmdb_db::{
         composite_key::{CompositeKey, InputKey, OutputKey},
-        helpers::serialize,
         lmdb::{
             fetch_db_entry_sizes,
             lmdb_clear,
@@ -331,23 +330,10 @@ impl LMDBDatabase {
         #[allow(clippy::enum_glob_use)]
         use WriteOperation::*;
 
-        // Ensure there will be enough space in the database to insert the block before it is attempted; this is more
-        // efficient than relying on an error if the LMDB environment map size was reached with each component's insert
-        // operation, with cleanup, resize and re-try. This will also prevent block sync from stalling due to LMDB
-        // environment map size being reached.
-        if txn.operations().iter().any(|op| {
-            matches!(op, InsertOrphanBlock { .. }) ||
-                matches!(op, InsertTipBlockBody { .. }) ||
-                matches!(op, InsertChainOrphanBlock { .. })
-        }) {
-            unsafe {
-                LMDBStore::resize_if_required(&self.env, &self.env_config)?;
-            }
-        }
-
+        let number_of_operations = txn.operations().len();
         let write_txn = self.write_transaction()?;
-        for op in txn.operations() {
-            trace!(target: LOG_TARGET, "[apply_db_transaction] WriteOperation: {}", op);
+        for (i, op) in txn.operations().iter().enumerate() {
+            trace!(target: LOG_TARGET, "[apply_db_transaction] WriteOperation: {} ({} of {})", op, i + 1, number_of_operations);
             match op {
                 InsertOrphanBlock(block) => self.insert_orphan_block(&write_txn, block)?,
                 InsertChainHeader { header } => {
@@ -499,7 +485,7 @@ impl LMDBDatabase {
                     self.insert_bad_block_and_cleanup(&write_txn, hash, *height, reason.to_string())?;
                 },
                 InsertReorg { reorg } => {
-                    lmdb_replace(&write_txn, &self.reorgs, &reorg.local_time.timestamp(), &reorg)?;
+                    lmdb_replace(&write_txn, &self.reorgs, &reorg.local_time.timestamp(), &reorg, None)?;
                 },
                 ClearAllReorgs => {
                     lmdb_clear(&write_txn, &self.reorgs)?;
@@ -514,40 +500,41 @@ impl LMDBDatabase {
         Ok(())
     }
 
-    fn all_dbs(&self) -> [(&'static str, &DatabaseRef); 26] {
+    fn all_dbs(&self) -> [(&'static str, &DatabaseRef); 27] {
         [
-            ("metadata_db", &self.metadata_db),
-            ("headers_db", &self.headers_db),
-            ("header_accumulated_data_db", &self.header_accumulated_data_db),
-            ("block_accumulated_data_db", &self.block_accumulated_data_db),
-            ("block_hashes_db", &self.block_hashes_db),
-            ("utxos_db", &self.utxos_db),
-            ("inputs_db", &self.inputs_db),
-            ("txos_hash_to_index_db", &self.txos_hash_to_index_db),
-            ("kernels_db", &self.kernels_db),
-            ("kernel_excess_index", &self.kernel_excess_index),
-            ("kernel_excess_sig_index", &self.kernel_excess_sig_index),
-            ("kernel_mmr_size_index", &self.kernel_mmr_size_index),
-            ("utxo_commitment_index", &self.utxo_commitment_index),
-            ("contract_index", &self.contract_index),
-            ("unique_id_index", &self.unique_id_index),
+            (LMDB_DB_METADATA, &self.metadata_db),
+            (LMDB_DB_HEADERS, &self.headers_db),
+            (LMDB_DB_HEADER_ACCUMULATED_DATA, &self.header_accumulated_data_db),
+            (LMDB_DB_BLOCK_ACCUMULATED_DATA, &self.block_accumulated_data_db),
+            (LMDB_DB_BLOCK_HASHES, &self.block_hashes_db),
+            (LMDB_DB_UTXOS, &self.utxos_db),
+            (LMDB_DB_INPUTS, &self.inputs_db),
+            (LMDB_DB_TXOS_HASH_TO_INDEX, &self.txos_hash_to_index_db),
+            (LMDB_DB_KERNELS, &self.kernels_db),
+            (LMDB_DB_KERNEL_EXCESS_INDEX, &self.kernel_excess_index),
+            (LMDB_DB_KERNEL_EXCESS_SIG_INDEX, &self.kernel_excess_sig_index),
+            (LMDB_DB_KERNEL_MMR_SIZE_INDEX, &self.kernel_mmr_size_index),
+            (LMDB_DB_UTXO_COMMITMENT_INDEX, &self.utxo_commitment_index),
+            (LMDB_DB_CONTRACT_ID_INDEX, &self.contract_index),
+            (LMDB_DB_UNIQUE_ID_INDEX, &self.unique_id_index),
             (
-                "deleted_txo_hash_to_header_index",
+                LMDB_DB_DELETED_TXO_HASH_TO_HEADER_INDEX,
                 &self.deleted_txo_hash_to_header_index,
             ),
-            ("orphans_db", &self.orphans_db),
+            (LMDB_DB_ORPHANS, &self.orphans_db),
             (
-                "orphan_header_accumulated_data_db",
+                LMDB_DB_ORPHAN_HEADER_ACCUMULATED_DATA,
                 &self.orphan_header_accumulated_data_db,
             ),
-            ("monero_seed_height_db", &self.monero_seed_height_db),
-            ("orphan_chain_tips_db", &self.orphan_chain_tips_db),
-            ("orphan_parent_map_index", &self.orphan_parent_map_index),
-            ("bad_blocks", &self.bad_blocks),
-            ("reorgs", &self.reorgs),
-            ("validator_nodes", &self.validator_nodes),
-            ("validator_nodes_mapping", &self.validator_nodes_mapping),
-            ("template_registrations", &self.template_registrations),
+            (LMDB_DB_MONERO_SEED_HEIGHT, &self.monero_seed_height_db),
+            (LMDB_DB_ORPHAN_CHAIN_TIPS, &self.orphan_chain_tips_db),
+            (LMDB_DB_ORPHAN_PARENT_MAP_INDEX, &self.orphan_parent_map_index),
+            (LMDB_DB_BAD_BLOCK_LIST, &self.bad_blocks),
+            (LMDB_DB_REORGS, &self.reorgs),
+            (LMDB_DB_VALIDATOR_NODES, &self.validator_nodes),
+            (LMDB_DB_TIP_UTXO_SMT, &self.tip_utxo_smt),
+            (LMDB_DB_VALIDATOR_NODES_MAPPING, &self.validator_nodes_mapping),
+            (LMDB_DB_TEMPLATE_REGISTRATIONS, &self.template_registrations),
         ]
     }
 
@@ -591,7 +578,7 @@ impl LMDBDatabase {
                 mined_height: header_height,
                 mined_timestamp: header_timestamp,
             },
-            "utxos_db",
+            LMDB_DB_UTXOS,
         )?;
 
         Ok(())
@@ -731,7 +718,7 @@ impl LMDBDatabase {
         k: MetadataKey,
         v: &MetadataValue,
     ) -> Result<(), ChainStorageError> {
-        lmdb_replace(txn, &self.metadata_db, &k.as_u32(), v)?;
+        lmdb_replace(txn, &self.metadata_db, &k.as_u32(), v, None)?;
         Ok(())
     }
 
@@ -1420,24 +1407,40 @@ impl LMDBDatabase {
     }
 
     fn insert_tip_smt(&self, txn: &WriteTransaction<'_>, smt: &OutputSmt) -> Result<(), ChainStorageError> {
+        let start = Instant::now();
         let k = MetadataKey::TipSmt;
-        match lmdb_replace(txn, &self.tip_utxo_smt, &k.as_u32(), smt) {
+        // This is best effort, if it fails (typically when the entry does not yet exist) we just log it
+        if let Err(e) = lmdb_delete(txn, &self.tip_utxo_smt, &k.as_u32(), LMDB_DB_TIP_UTXO_SMT) {
+            debug!(
+                "Could NOT delete '{}' db with key '{}' ({})",
+                LMDB_DB_TIP_UTXO_SMT,
+                to_hex(k.as_u32().as_lmdb_bytes()),
+                e
+            );
+        }
+
+        #[allow(clippy::cast_possible_truncation)]
+        let estimated_bytes = smt.size().saturating_mul(225) as usize;
+        match lmdb_replace(txn, &self.tip_utxo_smt, &k.as_u32(), smt, Some(estimated_bytes)) {
             Ok(_) => {
                 trace!(
-                    "Inserted {} bytes with key '{}' into 'tip_utxo_smt' (size {})",
-                    serialize(smt).unwrap_or_default().len(),
+                    "Inserted ~{} MB with key '{}' into '{}' (size {}) in {:.2?}",
+                    estimated_bytes / BYTES_PER_MB,
                     to_hex(k.as_u32().as_lmdb_bytes()),
-                    smt.size()
+                    LMDB_DB_TIP_UTXO_SMT,
+                    smt.size(),
+                    start.elapsed()
                 );
                 Ok(())
             },
             Err(e) => {
                 if let ChainStorageError::DbResizeRequired(Some(val)) = e {
                     trace!(
-                        "Could NOT insert {} bytes with key '{}' into 'tip_utxo_smt' (size {})",
-                        val,
+                        "Could NOT insert {} MB with key '{}' into '{}' (size {})",
+                        val / BYTES_PER_MB,
                         to_hex(k.as_u32().as_lmdb_bytes()),
+                        LMDB_DB_TIP_UTXO_SMT,
                         smt.size()
                     );
                 }
                 Err(e)
@@ -1469,7 +1472,13 @@ impl LMDBDatabase {
             block_accum_data.kernels = kernel_hash_set;
         }
 
-        lmdb_replace(write_txn, &self.block_accumulated_data_db, &height, &block_accum_data)?;
+        lmdb_replace(
+            write_txn,
+            &self.block_accumulated_data_db,
+            &height,
+            &block_accum_data,
+            None,
+        )?;
         Ok(())
     }
 
@@ -1481,7 +1490,7 @@ impl LMDBDatabase {
     ) -> Result<(), ChainStorageError> {
         let current_height = lmdb_get(write_txn, &self.monero_seed_height_db, seed)?.unwrap_or(std::u64::MAX);
         if height < current_height {
-            lmdb_replace(write_txn, &self.monero_seed_height_db, seed, &height)?;
+            lmdb_replace(write_txn, &self.monero_seed_height_db, seed, &height, None)?;
         };
         Ok(())
     }
@@ -1524,7 +1533,7 @@ impl LMDBDatabase {
             buffer.copy_from_slice(&key_bytes[0..32]);
             let key = OutputKey::new(&FixedHash::from(buffer), &input.output_hash())?;
             debug!(target: LOG_TARGET, "Pruning output from 'utxos_db': key '{}'", key.0);
-            lmdb_delete(write_txn, &self.utxos_db, &key.convert_to_comp_key(), "utxos_db")?;
+            lmdb_delete(write_txn, &self.utxos_db, &key.convert_to_comp_key(), LMDB_DB_UTXOS)?;
         };
         // From 'txos_hash_to_index_db::utxos_db'
         debug!(
@@ -1536,7 +1545,7 @@ impl LMDBDatabase {
             write_txn,
             &self.txos_hash_to_index_db,
             input.output_hash().as_slice(),
-            "utxos_db",
+            LMDB_DB_UTXOS,
         )?;
     }
 
@@ -1566,14 +1575,14 @@ impl LMDBDatabase {
                     write_txn,
                     &self.txos_hash_to_index_db,
                     output_hash.as_slice(),
-                    "utxos_db",
+                    LMDB_DB_UTXOS,
                 )?;
 
                 let mut buffer = [0u8; 32];
                 buffer.copy_from_slice(&key_bytes[0..32]);
                 let key = OutputKey::new(&FixedHash::from(buffer), output_hash)?;
                 debug!(target: LOG_TARGET, "Pruning output from 'utxos_db': key '{}'", key.0);
-                lmdb_delete(write_txn, &self.utxos_db, &key.convert_to_comp_key(), "utxos_db")?;
+                lmdb_delete(write_txn, &self.utxos_db, &key.convert_to_comp_key(), LMDB_DB_UTXOS)?;
             },
             None => return Err(ChainStorageError::InvalidOperation("Output key not found".to_string())),
         }
@@ -1639,7 +1648,7 @@ impl LMDBDatabase {
         #[cfg(not(test))]
         const CLEAN_BAD_BLOCKS_BEFORE_REL_HEIGHT: u64 = 0;
 
-        lmdb_replace(txn, &self.bad_blocks, hash.deref(), &(height, reason))?;
+        lmdb_replace(txn, &self.bad_blocks, hash.deref(), &(height, reason), None)?;
         // Clean up bad blocks that are far from the tip
         let metadata = fetch_metadata(txn, &self.metadata_db)?;
         let deleted_before_height = metadata
@@ -1791,10 +1800,35 @@ impl BlockchainBackend for LMDBDatabase {
             return Ok(());
         }
 
+        // Ensure there will be enough space in the database to insert the block and replace the SMT before it is
+        // attempted; this is more efficient than relying on an error if the LMDB environment map size was reached with
+        // the write operation, with cleanup, resize and re-try afterwards.
+        let block_operations = txn.operations().iter().filter(|op| {
+            matches!(op, WriteOperation::InsertOrphanBlock { .. }) ||
+                matches!(op, WriteOperation::InsertTipBlockBody { .. }) ||
+                matches!(op, WriteOperation::InsertChainOrphanBlock { .. })
+        });
+        let count = block_operations.count();
+        if count > 0 {
+            let (mapsize, size_used_bytes, size_left_bytes) = LMDBStore::get_stats(&self.env)?;
+            trace!(
+                target: LOG_TARGET,
+                "[apply_db_transaction] Block insert operations: {}, mapsize: {} MB, used: {} MB, remaining: {} MB",
+                count, mapsize / BYTES_PER_MB, size_used_bytes / BYTES_PER_MB, size_left_bytes / BYTES_PER_MB
+            );
+            unsafe {
+                LMDBStore::resize_if_required(
+                    &self.env,
+                    &self.env_config,
+                    Some(max(self.env_config.grow_size_bytes(), 128 * BYTES_PER_MB)),
+                )?;
+            }
+        }
+
         let mark = Instant::now();
-        // Resize this many times before assuming something is not right
-        const MAX_RESIZES: usize = 5;
-        for i in 0..MAX_RESIZES {
+        // Resize this many times before assuming something is not right (up to 1 GB)
+        let max_resizes = 1024 * BYTES_PER_MB / self.env_config.grow_size_bytes();
+        for i in 0..max_resizes {
             let num_operations = txn.operations().len();
             match self.apply_db_transaction(&txn) {
                 Ok(_) => {
@@ -1807,7 +1841,7 @@ impl BlockchainBackend for LMDBDatabase {
 
                     return Ok(());
                 },
-                Err(ChainStorageError::DbResizeRequired(shortfall)) => {
+                Err(ChainStorageError::DbResizeRequired(size_that_could_not_be_written)) => {
                     info!(
                         target: LOG_TARGET,
                         "Database resize required (resized {} time(s) in this transaction)",
@@ -1818,7 +1852,7 @@ impl BlockchainBackend for LMDBDatabase {
                     // BlockchainDatabase, so we know there are no other threads taking out LMDB transactions when this
                     // is called.
                     unsafe {
-                        LMDBStore::resize(&self.env, &self.env_config, shortfall)?;
+                        LMDBStore::resize(&self.env, &self.env_config, size_that_could_not_be_written)?;
                     }
                 },
                 Err(e) => {
@@ -2749,6 +2783,7 @@ fn run_migrations(db: &LMDBDatabase) -> Result<(), ChainStorageError> {
             &db.metadata_db,
             &k.as_u32(),
             &MetadataValue::MigrationVersion(MIGRATION_VERSION),
+            None,
         )?;
         txn.commit()?;
     }
diff --git a/base_layer/core/src/chain_storage/tests/blockchain_database.rs b/base_layer/core/src/chain_storage/tests/blockchain_database.rs
index 5469ab685b..dd18656bc6 100644
--- a/base_layer/core/src/chain_storage/tests/blockchain_database.rs
+++ b/base_layer/core/src/chain_storage/tests/blockchain_database.rs
@@ -428,7 +428,7 @@ mod fetch_total_size_stats {
         let _block_and_outputs = add_many_chained_blocks(2, &db, &key_manager).await;
         let stats = db.fetch_total_size_stats().unwrap();
         assert_eq!(
-            stats.sizes().iter().find(|s| s.name == "utxos_db").unwrap().num_entries,
+            stats.sizes().iter().find(|s| s.name == "utxos").unwrap().num_entries,
             genesis_output_count + 2
         );
     }
diff --git a/base_layer/core/src/validation/aggregate_body/aggregate_body_chain_validator.rs b/base_layer/core/src/validation/aggregate_body/aggregate_body_chain_validator.rs
index ffb95f148b..d1a799233c 100644
--- a/base_layer/core/src/validation/aggregate_body/aggregate_body_chain_validator.rs
+++ b/base_layer/core/src/validation/aggregate_body/aggregate_body_chain_validator.rs
@@ -110,7 +110,6 @@ fn validate_input_not_pruned(
     db: &B,
 ) -> Result, ValidationError> {
     let mut inputs: Vec = body.inputs().clone();
-    let outputs: Vec = body.outputs().clone();
     for input in &mut inputs {
         if input.is_compact() {
             let output = match db.fetch_output(&input.output_hash()) {
@@ -118,7 +117,7 @@ fn validate_input_not_pruned(
                 Some(output_mined_info) => output_mined_info.output,
                 None => {
                     let input_output_hash = input.output_hash();
-                    if let Some(found) = outputs.iter().find(|o| o.hash() == input_output_hash) {
+                    if let Some(found) = body.outputs().iter().find(|o| o.hash() == input_output_hash) {
                         found.clone()
                     } else {
                         warn!(
diff --git a/base_layer/wallet_ffi/src/lib.rs b/base_layer/wallet_ffi/src/lib.rs
index 8fb30829e3..ef7cdff9de 100644
--- a/base_layer/wallet_ffi/src/lib.rs
+++ b/base_layer/wallet_ffi/src/lib.rs
@@ -4924,6 +4924,7 @@ pub unsafe extern "C" fn comms_list_connected_public_keys(
     let mut connectivity = (*wallet).wallet.comms.connectivity();
     let peer_manager = (*wallet).wallet.comms.peer_manager();
 
+    #[allow(clippy::blocks_in_conditions)]
     match (*wallet).runtime.block_on(async move {
         let connections = connectivity.get_active_connections().await?;
         let mut public_keys = Vec::with_capacity(connections.len());
@@ -6412,6 +6413,7 @@ pub unsafe extern "C" fn wallet_get_seed_peers(wallet: *mut TariWallet, error_ou
     }
     let peer_manager = (*wallet).wallet.comms.peer_manager();
     let query = PeerQuery::new().select_where(|p| p.is_seed());
+    #[allow(clippy::blocks_in_conditions)]
     match (*wallet).runtime.block_on(async move {
         let peers = peer_manager.perform_query(query).await?;
         let mut public_keys = Vec::with_capacity(peers.len());
diff --git a/common/config/presets/g_miner.toml b/common/config/presets/g_miner.toml
index 35b58500f9..45e473b9c9 100644
--- a/common/config/presets/g_miner.toml
+++ b/common/config/presets/g_miner.toml
@@ -42,11 +42,6 @@
 # Base node reconnect timeout after any GRPC or miner error (default: 10 s)
 #wait_timeout_on_error = 10
 
-# The extra data to store in the coinbase, usually some data about the mining pool.
-# Note that this data is publicly readable, but it is suggested you populate it so that
-# pool dominance can be seen before any one party has more than 51%. (default = "minotari_miner")
-#coinbase_extra = "minotari_miner"
-
 # The Tari wallet address (valid address in hex) where the mining funds will be sent to - must be assigned
 # e.g. "78e724f466d202abdee0f23c261289074e4a2fc9eb61e83e0179eead76ce2d3f17"
 #wallet_payment_address = "YOUR_WALLET_TARI_ADDRESS"
diff --git a/infrastructure/storage/src/lmdb_store/mod.rs b/infrastructure/storage/src/lmdb_store/mod.rs
index 2833649193..2e2cf94c86 100644
--- a/infrastructure/storage/src/lmdb_store/mod.rs
+++ b/infrastructure/storage/src/lmdb_store/mod.rs
@@ -28,4 +28,4 @@ pub use lmdb_zero::{
     db,
     traits::{AsLmdbBytes, FromLmdbBytes},
 };
-pub use store::{DatabaseRef, LMDBBuilder, LMDBConfig, LMDBDatabase, LMDBStore};
+pub use store::{DatabaseRef, LMDBBuilder, LMDBConfig, LMDBDatabase, LMDBStore, BYTES_PER_MB};
diff --git a/infrastructure/storage/src/lmdb_store/store.rs b/infrastructure/storage/src/lmdb_store/store.rs
index 471de6a3a6..bf2cc33fc3 100644
--- a/infrastructure/storage/src/lmdb_store/store.rs
+++ b/infrastructure/storage/src/lmdb_store/store.rs
@@ -9,6 +9,7 @@ use std::{
     convert::TryInto,
     path::{Path, PathBuf},
     sync::Arc,
+    time::Instant,
 };
 
 use lmdb_zero::{
@@ -41,7 +42,7 @@ use crate::{
 };
 
 const LOG_TARGET: &str = "lmdb";
-const BYTES_PER_MB: usize = 1024 * 1024;
+pub const BYTES_PER_MB: usize = 1024 * 1024;
 
 /// An atomic pointer to an LMDB database instance
 pub type DatabaseRef = Arc<Database<'static>>;
@@ -92,7 +93,8 @@ impl LMDBConfig {
 
 impl Default for LMDBConfig {
     fn default() -> Self {
-        Self::new_from_mb(16, 16, 8)
+        // Do not choose these values too small, as the entire SMT is replaced for every new block
+        Self::new_from_mb(128, 128, 64)
     }
 }
 
@@ -186,7 +188,7 @@ impl LMDBBuilder {
             let flags = self.env_flags | open::NOTLS;
             let env = builder.open(&path, flags, 0o600)?;
             // SAFETY: no transactions can be open at this point
-            LMDBStore::resize_if_required(&env, &self.env_config)?;
+            LMDBStore::resize_if_required(&env, &self.env_config, None)?;
             Arc::new(env)
         };
 
@@ -346,16 +348,15 @@ pub struct LMDBStore {
 }
 
 impl LMDBStore {
-    /// Close all databases and close the environment. You cannot be guaranteed that the dbs will be closed after
-    /// calling this function because there still may be threads accessing / writing to a database that will block
-    /// this call. However, in that case `shutdown` returns an error.
+    /// Force flush the data buffers to disk.
     pub fn flush(&self) -> Result<(), lmdb_zero::error::Error> {
-        trace!(target: LOG_TARGET, "Forcing flush of buffers to disk");
+        let start = Instant::now();
         self.env.sync(true)?;
-        debug!(target: LOG_TARGET, "LMDB Buffers have been flushed");
+        trace!(target: LOG_TARGET, "LMDB buffers flushed in {:.2?}", start.elapsed());
         Ok(())
     }
 
+    /// Write log information about the LMDB environment and databases to the log.
     pub fn log_info(&self) {
         match self.env.info() {
             Err(e) => warn!(
@@ -406,10 +407,12 @@ impl LMDBStore {
         self.databases.get(db_name).cloned()
     }
 
+    /// Returns the LMDB environment configuration
    pub fn env_config(&self) -> LMDBConfig {
         self.env_config.clone()
     }
 
+    /// Returns the LMDB environment with handle
     pub fn env(&self) -> Arc<Environment> {
         self.env.clone()
     }
@@ -421,22 +424,39 @@ impl LMDBStore {
     /// not check for this condition, the caller must ensure it explicitly.
     ///
     ///
-    pub unsafe fn resize_if_required(env: &Environment, config: &LMDBConfig) -> Result<(), LMDBError> {
-        let env_info = env.info()?;
-        let stat = env.stat()?;
-        let size_used_bytes = stat.psize as usize * env_info.last_pgno;
-        let size_left_bytes = env_info.mapsize - size_used_bytes;
-
-        if size_left_bytes <= config.resize_threshold_bytes {
+    pub unsafe fn resize_if_required(
+        env: &Environment,
+        config: &LMDBConfig,
+        increase_threshold_by: Option<usize>,
+    ) -> Result<(), LMDBError> {
+        let (mapsize, size_used_bytes, size_left_bytes) = LMDBStore::get_stats(env)?;
+        if size_left_bytes <= config.resize_threshold_bytes + increase_threshold_by.unwrap_or_default() {
             debug!(
                 target: LOG_TARGET,
-                "Resize required: Used bytes: {}, Remaining bytes: {}", size_used_bytes, size_left_bytes
+                "Resize required: mapsize: {} MB, used: {} MB, remaining: {} MB",
+                mapsize / BYTES_PER_MB,
+                size_used_bytes / BYTES_PER_MB,
+                size_left_bytes / BYTES_PER_MB
             );
-            Self::resize(env, config, None)?;
+            Self::resize(env, config, Some(increase_threshold_by.unwrap_or_default()))?;
         }
         Ok(())
     }
 
+    /// Returns the LMDB environment statistics.
+    /// Note:
+    /// In Windows and Ubuntu, this function does not always return the actual used size of the
+    /// database on disk when the database has grown large (> 700MB), reason unknown (not tested
+    /// on Mac).
+    pub fn get_stats(env: &Environment) -> Result<(usize, usize, usize), LMDBError> {
+        let env_info = env.info()?;
+        let stat = env.stat()?;
+        let size_used_bytes = stat.psize as usize * env_info.last_pgno;
+        let size_left_bytes = env_info.mapsize - size_used_bytes;
+
+        Ok((env_info.mapsize, size_used_bytes, size_left_bytes))
+    }
+
     /// Grows the LMDB environment by the configured amount
     ///
     /// # Safety
@@ -444,19 +464,25 @@ impl LMDBStore {
     /// not check for this condition, the caller must ensure it explicitly.
     ///
     ///
-    pub unsafe fn resize(env: &Environment, config: &LMDBConfig, shortfall: Option<usize>) -> Result<(), LMDBError> {
+    pub unsafe fn resize(
+        env: &Environment,
+        config: &LMDBConfig,
+        increase_threshold_by: Option<usize>,
+    ) -> Result<(), LMDBError> {
+        let start = Instant::now();
         let env_info = env.info()?;
         let current_mapsize = env_info.mapsize;
-        env.set_mapsize(current_mapsize + config.grow_size_bytes + shortfall.unwrap_or_default())?;
+        env.set_mapsize(current_mapsize + config.grow_size_bytes + increase_threshold_by.unwrap_or_default())?;
         let env_info = env.info()?;
         let new_mapsize = env_info.mapsize;
         debug!(
             target: LOG_TARGET,
-            "({}) LMDB MB, mapsize was grown from {:?} MB to {:?} MB, increased by {:?} MB",
+            "({}) LMDB MB, mapsize was grown from {} MB to {} MB, increased by {} MB, in {:.2?}",
             env.path()?.to_str()?,
             current_mapsize / BYTES_PER_MB,
             new_mapsize / BYTES_PER_MB,
-            (config.grow_size_bytes + shortfall.unwrap_or_default()) / BYTES_PER_MB,
+            (config.grow_size_bytes + increase_threshold_by.unwrap_or_default()) / BYTES_PER_MB,
+            start.elapsed()
         );
 
         Ok(())
@@ -479,15 +505,17 @@ impl LMDBDatabase {
         K: AsLmdbBytes + ?Sized,
         V: Serialize,
     {
-        const MAX_RESIZES: usize = 5;
+        // Resize this many times before assuming something is not right (up to 1 GB)
+        let max_resizes = 1024 * BYTES_PER_MB / self.env_config.grow_size_bytes();
         let value = LMDBWriteTransaction::convert_value(value)?;
-        for _ in 0..MAX_RESIZES {
+        for i in 0..max_resizes {
             match self.write(key, &value) {
                 Ok(txn) => return Ok(txn),
                 Err(error::Error::Code(error::MAP_FULL)) => {
                     info!(
                         target: LOG_TARGET,
-                        "Failed to obtain write transaction because the database needs to be resized"
+                        "Database resize required (resized {} time(s) in this transaction)",
+                        i + 1
                     );
                     unsafe {
                         LMDBStore::resize(&self.env, &self.env_config, Some(value.len()))?;
diff --git a/infrastructure/storage/tests/lmdb.rs b/infrastructure/storage/tests/lmdb.rs
index 45740521c7..ce14dd3386 100644
--- a/infrastructure/storage/tests/lmdb.rs
+++ b/infrastructure/storage/tests/lmdb.rs
@@ -222,9 +222,9 @@ fn test_multi_thread_writes() {
 #[test]
 fn test_multi_writes() {
     {
-        let env = init("multi-writes").unwrap();
+        let store = init("multi-writes").unwrap();
         for i in 0..2 {
-            let db = env.get_handle("users").unwrap();
+            let db = store.get_handle("users").unwrap();
             let res = db.with_write_transaction(|mut txn| {
                 for j in 0..1000 {
                     let v = i * 1000 + j;
@@ -235,7 +235,7 @@ fn test_multi_writes() {
             });
             assert!(res.is_ok());
         }
-        env.flush().unwrap();
+        store.flush().unwrap();
     }
     clean_up("multi-writes"); // In Windows file handles must be released before files can be deleted
 }
@@ -277,7 +277,7 @@ fn test_lmdb_resize_on_create() {
     let db_name = "test";
     {
         // Create db with large preset environment size
-        let env = LMDBBuilder::new()
+        let store = LMDBBuilder::new()
            .set_path(&path)
            .set_env_config(LMDBConfig::new(
                 100 * PRESET_SIZE * 1024 * 1024,
@@ -289,17 +289,17 @@ fn test_lmdb_resize_on_create() {
             .build()
             .unwrap();
         // Add some data that is `>= 2 * (PRESET_SIZE * 1024 * 1024)`
-        let db = env.get_handle(db_name).unwrap();
+        let db = store.get_handle(db_name).unwrap();
         let users = load_users();
         for i in 0..100 {
             db.insert(&i, &users).unwrap();
         }
         // Ensure enough data is loaded
-        let env_info = env.env().info().unwrap();
-        let env_stat = env.env().stat().unwrap();
+        let env_info = store.env().info().unwrap();
+        let env_stat = store.env().stat().unwrap();
         size_used_round_1 = env_stat.psize as usize * env_info.last_pgno;
         assert!(size_used_round_1 >= 2 * (PRESET_SIZE * 1024 * 1024));
-        env.flush().unwrap();
+        store.flush().unwrap();
     }
 
     {