Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: improve lmdb dynamic growth #6242

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions applications/minotari_console_wallet/src/ui/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ fn crossterm_loop(mut app: App<CrosstermBackend<Stdout>>) -> Result<(), ExitErro
error!(target: LOG_TARGET, "Error drawing interface. {}", e);
ExitCode::InterfaceError
})?;
#[allow(clippy::blocks_in_conditions)]
match events.next().map_err(|e| {
error!(target: LOG_TARGET, "Error reading input event: {}", e);
ExitCode::InterfaceError
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,7 @@ where B: BlockchainBackend + 'static
"A peer has requested a block with hash {}", block_hex
);

#[allow(clippy::blocks_in_conditions)]
let maybe_block = match self
.blockchain_db
.fetch_block_by_hash(hash, true)
Expand Down
7 changes: 7 additions & 0 deletions base_layer/core/src/base_node/sync/rpc/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncRpcService<B> {
#[tari_comms::async_trait]
impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcService<B> {
#[instrument(level = "trace", name = "sync_rpc::sync_blocks", skip(self), err)]
#[allow(clippy::blocks_in_conditions)]
async fn sync_blocks(
&self,
request: Request<SyncBlocksRequest>,
Expand Down Expand Up @@ -273,6 +274,7 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
}

#[instrument(level = "trace", name = "sync_rpc::sync_headers", skip(self), err)]
#[allow(clippy::blocks_in_conditions)]
async fn sync_headers(
&self,
request: Request<SyncHeadersRequest>,
Expand Down Expand Up @@ -373,6 +375,7 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
}

#[instrument(level = "trace", skip(self), err)]
#[allow(clippy::blocks_in_conditions)]
async fn get_header_by_height(
&self,
request: Request<u64>,
Expand All @@ -389,6 +392,7 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
}

#[instrument(level = "debug", skip(self), err)]
#[allow(clippy::blocks_in_conditions)]
async fn find_chain_split(
&self,
request: Request<FindChainSplitRequest>,
Expand Down Expand Up @@ -452,6 +456,7 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
}

#[instrument(level = "trace", skip(self), err)]
#[allow(clippy::blocks_in_conditions)]
async fn get_chain_metadata(&self, _: Request<()>) -> Result<Response<proto::base_node::ChainMetadata>, RpcStatus> {
let chain_metadata = self
.db()
Expand All @@ -462,6 +467,7 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
}

#[instrument(level = "trace", skip(self), err)]
#[allow(clippy::blocks_in_conditions)]
async fn sync_kernels(
&self,
request: Request<SyncKernelsRequest>,
Expand Down Expand Up @@ -588,6 +594,7 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
}

#[instrument(level = "trace", skip(self), err)]
#[allow(clippy::blocks_in_conditions)]
async fn sync_utxos(&self, request: Request<SyncUtxosRequest>) -> Result<Streaming<SyncUtxosResponse>, RpcStatus> {
let req = request.message();
let peer_node_id = request.context().peer_node_id();
Expand Down
38 changes: 34 additions & 4 deletions base_layer/core/src/chain_storage/lmdb_db/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,53 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::time::Instant;

use lmdb_zero::error;
use log::*;
use serde::{de::DeserializeOwned, Serialize};
use tari_storage::lmdb_store::BYTES_PER_MB;

use crate::chain_storage::ChainStorageError;

pub const LOG_TARGET: &str = "c::cs::lmdb_db::lmdb";

pub fn serialize<T>(data: &T) -> Result<Vec<u8>, ChainStorageError>
/// Serialize the given data into a byte vector
/// Note:
/// `size_hint` is given as an option as checking what the serialized would be is expensive
/// for large data structures at ~30% overhead
pub fn serialize<T>(data: &T, size_hint: Option<usize>) -> Result<Vec<u8>, ChainStorageError>
where T: Serialize {
let size = bincode::serialized_size(&data).map_err(|e| ChainStorageError::AccessError(e.to_string()))?;
#[allow(clippy::cast_possible_truncation)]
let mut buf = Vec::with_capacity(size as usize);
let start = Instant::now();
let mut buf = if let Some(size) = size_hint {
Vec::with_capacity(size)
} else {
let size = bincode::serialized_size(&data).map_err(|e| ChainStorageError::AccessError(e.to_string()))?;
#[allow(clippy::cast_possible_truncation)]
Vec::with_capacity(size as usize)
};
let check_time = start.elapsed();
bincode::serialize_into(&mut buf, data).map_err(|e| {
error!(target: LOG_TARGET, "Could not serialize lmdb: {:?}", e);
ChainStorageError::AccessError(e.to_string())
})?;
if buf.len() >= BYTES_PER_MB {
let serialize_time = start.elapsed() - check_time;
trace!(
"lmdb_replace - {} MB, serialize check in {:.2?}, serialize in {:.2?}",
buf.len() / BYTES_PER_MB,
check_time,
serialize_time
);
}
if let Some(size) = size_hint {
if buf.len() > size {
warn!(
target: LOG_TARGET,
"lmdb_replace - Serialized size hint was too small. Expected {}, got {}", size, buf.len()
);
}
}
Ok(buf)
}

Expand Down
33 changes: 25 additions & 8 deletions base_layer/core/src/chain_storage/lmdb_db/lmdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::fmt::Debug;
use std::{fmt::Debug, time::Instant};

use lmdb_zero::{
del,
Expand All @@ -37,6 +37,7 @@ use lmdb_zero::{
};
use log::*;
use serde::{de::DeserializeOwned, Serialize};
use tari_storage::lmdb_store::BYTES_PER_MB;
use tari_utilities::hex::to_hex;

use crate::chain_storage::{
Expand All @@ -62,7 +63,7 @@ where
K: AsLmdbBytes + ?Sized + Debug,
V: Serialize + Debug,
{
let val_buf = serialize(val)?;
let val_buf = serialize(val, None)?;
match txn.access().put(db, key, &val_buf, put::NOOVERWRITE) {
Ok(_) => {
trace!(
Expand Down Expand Up @@ -112,7 +113,7 @@ where
K: AsLmdbBytes + ?Sized,
V: Serialize,
{
let val_buf = serialize(val)?;
let val_buf = serialize(val, None)?;
txn.access().put(db, key, &val_buf, put::Flags::empty()).map_err(|e| {
if let lmdb_zero::Error::Code(code) = &e {
if *code == lmdb_zero::error::MAP_FULL {
Expand All @@ -128,13 +129,20 @@ where
}

/// Inserts or replaces the item at the given key. If the key does not exist, a new entry is created
pub fn lmdb_replace<K, V>(txn: &WriteTransaction<'_>, db: &Database, key: &K, val: &V) -> Result<(), ChainStorageError>
pub fn lmdb_replace<K, V>(
txn: &WriteTransaction<'_>,
db: &Database,
key: &K,
val: &V,
size_hint: Option<usize>,
) -> Result<(), ChainStorageError>
where
K: AsLmdbBytes + ?Sized,
V: Serialize,
{
let val_buf = serialize(val)?;
txn.access().put(db, key, &val_buf, put::Flags::empty()).map_err(|e| {
let val_buf = serialize(val, size_hint)?;
let start = Instant::now();
let res = txn.access().put(db, key, &val_buf, put::Flags::empty()).map_err(|e| {
if let lmdb_zero::Error::Code(code) = &e {
if *code == lmdb_zero::error::MAP_FULL {
return ChainStorageError::DbResizeRequired(Some(val_buf.len()));
Expand All @@ -145,7 +153,16 @@ where
"Could not replace value in lmdb transaction: {:?}", e
);
ChainStorageError::AccessError(e.to_string())
})
});
if val_buf.len() >= BYTES_PER_MB {
let write_time = start.elapsed();
trace!(
"lmdb_replace - {} MB, lmdb write in {:.2?}",
val_buf.len() / BYTES_PER_MB,
write_time
);
}
res
}

/// Deletes the given key. An error is returned if the key does not exist
Expand Down Expand Up @@ -175,7 +192,7 @@ where
K: AsLmdbBytes + ?Sized,
V: Serialize,
{
txn.access().del_item(db, key, &serialize(value)?)?;
txn.access().del_item(db, key, &serialize(value, None)?)?;
Ok(())
}

Expand Down
Loading
Loading