Skip to content

Commit

Permalink
Bulk loader: reduces default system parameters
Browse files Browse the repository at this point in the history
By default, uses a target of 1GB of RAM and 2 threads.

Data parsing is in most cases slower than ingestion, so no more than 2 threads are used anyway.
  • Loading branch information
Tpt committed Mar 21, 2023
1 parent f29a49b commit 1397601
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 66 deletions.
25 changes: 0 additions & 25 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion lib/Cargo.toml
Expand Up @@ -36,7 +36,6 @@ rio_xml = "0.8"
hex = "0.4"
siphasher = "0.3"
lazy_static = "1"
sysinfo = "0.28"
oxrdf = { version = "0.1.5", path="oxrdf", features = ["rdf-star", "oxsdatatypes"] }
oxsdatatypes = { version = "0.1.1", path="oxsdatatypes" }
spargebra = { version = "0.2.7", path="spargebra", features = ["rdf-star", "sep-0002", "sep-0006"] }
Expand Down
49 changes: 17 additions & 32 deletions lib/src/storage/mod.rs
Expand Up @@ -16,8 +16,6 @@ use crate::storage::numeric_encoder::Decoder;
use crate::storage::numeric_encoder::{insert_term, EncodedQuad, EncodedTerm, StrHash, StrLookup};
use backend::{ColumnFamily, ColumnFamilyDefinition, Db, Iter};
#[cfg(not(target_family = "wasm"))]
use std::cmp::{max, min};
#[cfg(not(target_family = "wasm"))]
use std::collections::VecDeque;
#[cfg(not(target_family = "wasm"))]
use std::collections::{HashMap, HashSet};
Expand All @@ -34,8 +32,6 @@ use std::sync::Arc;
use std::thread::spawn;
#[cfg(not(target_family = "wasm"))]
use std::thread::JoinHandle;
#[cfg(not(target_family = "wasm"))]
use sysinfo::{System, SystemExt};

mod backend;
mod binary_encoder;
Expand All @@ -58,8 +54,6 @@ const GRAPHS_CF: &str = "graphs";
const DEFAULT_CF: &str = "default";
#[cfg(not(target_family = "wasm"))]
const DEFAULT_BULK_LOAD_BATCH_SIZE: usize = 1_000_000;
#[cfg(not(target_family = "wasm"))]
const MAX_BULK_LOAD_BATCH_SIZE: usize = 100_000_000;

/// Low level storage primitives
#[derive(Clone)]
Expand Down Expand Up @@ -1241,32 +1235,23 @@ impl StorageBulkLoader {
&self,
quads: impl IntoIterator<Item = Result<Quad, EI>>,
) -> Result<(), EO> {
let system = System::new_all();
let cpu_count = min(4, system.physical_core_count().unwrap_or(2));
let num_threads = max(
if let Some(num_threads) = self.num_threads {
num_threads
} else if let Some(max_memory_size) = self.max_memory_size {
min(
cpu_count,
max_memory_size * 1000 / DEFAULT_BULK_LOAD_BATCH_SIZE,
)
} else {
cpu_count
},
2,
);
let batch_size = min(
if let Some(max_memory_size) = self.max_memory_size {
max(1000, max_memory_size * 1000 / num_threads)
} else {
max(
usize::try_from(system.free_memory()).unwrap() / 1000 / num_threads,
DEFAULT_BULK_LOAD_BATCH_SIZE,
)
},
MAX_BULK_LOAD_BATCH_SIZE,
);
let num_threads = self.num_threads.unwrap_or(2);
if num_threads < 2 {
return Err(
StorageError::Other("The bulk loader needs at least 2 threads".into()).into(),
);
}
let batch_size = if let Some(max_memory_size) = self.max_memory_size {
max_memory_size * 1000 / num_threads
} else {
DEFAULT_BULK_LOAD_BATCH_SIZE
};
if batch_size < 10_000 {
return Err(StorageError::Other(
"The bulk loader memory bound is too low. It needs at least 100MB".into(),
)
.into());
}
let mut threads = VecDeque::with_capacity(num_threads - 1);
let mut buffer = Vec::with_capacity(batch_size);
let done_counter = Arc::new(AtomicU64::new(0));
Expand Down
14 changes: 6 additions & 8 deletions lib/src/store.rs
Expand Up @@ -1328,7 +1328,7 @@ impl Iterator for GraphNameIter {
/// Memory usage is configurable using [`BulkLoader::set_max_memory_size_in_megabytes`]
/// and the number of used threads with [`BulkLoader::set_num_threads`].
/// By default the memory consumption target (excluding the system and RocksDB internal consumption)
/// is 1GB per thread and the number of threads is set to the number of logical CPU cores provided by the system.
/// is around 2GB per thread, and 2 threads are used.
/// These targets are considered per loaded file.
///
/// Usage example with loading a dataset:
Expand Down Expand Up @@ -1360,23 +1360,21 @@ impl BulkLoader {
///
/// This number must be at least 2 (one for parsing and one for loading).
///
/// By default this is the number of logical CPU cores provided by the system except if
/// [`BulkLoader::set_max_memory_size_in_megabytes`] is set. In this case at least one 1GB is reserved
/// per used thread.
/// The default value is 2.
pub fn set_num_threads(mut self, num_threads: usize) -> Self {
self.storage = self.storage.set_num_threads(num_threads);
self
}

/// Sets the maximal number of memory used by this operation.
/// Sets a rough idea of the maximal amount of memory to be used by this operation.
///
/// This number must be at least a few megabytes per thread.
///
/// Memory used by RocksDB and the system is not taken into account in this limit.
/// Note that depending on the system behavior this amount might never be reached.
/// Note that depending on the system behavior this amount might never be reached or be blown up
/// (for example if the data contains very long IRIs or literals).
///
/// By default, at most 1GB per used thread is used
/// (i.e. at most GBs at the number of available logical CPU cores in total).
/// By default, a target of 2GB per used thread is used.
pub fn set_max_memory_size_in_megabytes(mut self, max_memory_size: usize) -> Self {
self.storage = self
.storage
Expand Down

0 comments on commit 1397601

Please sign in to comment.