Skip to content

Commit

Permalink
add unit test
Browse files Browse the repository at this point in the history
Signed-off-by: joccau <zak.zhao@pingcap.com>
  • Loading branch information
joccau committed Mar 16, 2023
1 parent c4a8fa5 commit ee8f9d3
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 12 deletions.
6 changes: 1 addition & 5 deletions components/sst_importer/src/config.rs
Expand Up @@ -52,11 +52,7 @@ impl Config {
self.stream_channel_window = default_cfg.stream_channel_window;
}
if self.memory_use_ratio > 0.5 || self.memory_use_ratio < 0.0 {
warn!(
"import.mem_ratio should belong to [0.0, 0.5], change it to {}",
default_cfg.memory_use_ratio,
);
self.memory_use_ratio = default_cfg.memory_use_ratio;
return Err("import.mem_ratio should belong to [0.0, 0.5].".into());
}
Ok(())
}
Expand Down
48 changes: 46 additions & 2 deletions components/sst_importer/src/sst_importer.rs
Expand Up @@ -48,7 +48,7 @@ use crate::{
import_mode::{ImportModeSwitcher, RocksDbMetricsFn},
metrics::*,
sst_writer::{RawSstWriter, TxnSstWriter},
util, Config, ConfigManager, Error, Result,
util, Config, ConfigManager as ImportConfigManager, Error, Result,
};

pub struct LoadedFile {
Expand Down Expand Up @@ -590,7 +590,7 @@ impl SstImporter {
Ok(())
}

pub fn update_mem_limit(&self, cfg_mgr: ConfigManager) {
pub fn update_config_memory_use_ratio(&self, cfg_mgr: ImportConfigManager) {
let mem_ratio = cfg_mgr.read().unwrap().memory_use_ratio;
let memory_limit = Self::calcualte_usage_mem(mem_ratio);

Expand Down Expand Up @@ -1469,6 +1469,7 @@ mod tests {
};
use external_storage_export::read_external_storage_info_buff;
use file_system::File;
use online_config::{ConfigManager, OnlineConfig};
use openssl::hash::{Hasher, MessageDigest};
use tempfile::Builder;
use test_sst_importer::*;
Expand Down Expand Up @@ -1978,6 +1979,49 @@ mod tests {
assert_eq!(err.kind(), io::ErrorKind::TimedOut);
}

#[test]
fn test_update_config_memory_use_ratio() {
// create SstImpoter with default.
let cfg = Config {
memory_use_ratio: 0.3,
..Default::default()
};
let import_dir = tempfile::tempdir().unwrap();
let importer = SstImporter::new(&cfg, import_dir, None, ApiVersion::V1).unwrap();
let mem_limit_old = importer.mem_limit.load(Ordering::SeqCst);

// create new config and get the diff config.
let cfg_new = Config {
memory_use_ratio: 0.1,
..Default::default()
};
let change = cfg.diff(&cfg_new);

// create config manager and update config.
let mut cfg_mgr = ImportConfigManager::new(cfg);
cfg_mgr.dispatch(change).unwrap();
importer.update_config_memory_use_ratio(cfg_mgr);

let mem_limit_new = importer.mem_limit.load(Ordering::SeqCst);
assert!(mem_limit_old > mem_limit_new);

let delt = (mem_limit_old - mem_limit_new * 3) as f64 / mem_limit_old as f64;
assert!(delt < 0.000001)
}

#[test]
fn test_update_config_with_invalid_conifg() {
let cfg = Config::default();
let cfg_new = Config {
memory_use_ratio: -0.1,
..Default::default()
};
let change = cfg.diff(&cfg_new);
let mut cfg_mgr = ImportConfigManager::new(cfg);
let r = cfg_mgr.dispatch(change);
assert!(r.is_err());
}

#[test]
fn test_do_read_kv_file() {
// create a sample kv file.
Expand Down
8 changes: 3 additions & 5 deletions src/import/sst_service.rs
Expand Up @@ -321,7 +321,7 @@ impl<E: Engine> ImportSstService<E> {
loop {
sleep(Duration::from_secs(10)).await;

importer.update_mem_limit(cfg.clone());
importer.update_config_memory_use_ratio(cfg.clone());
importer.shrink_by_tick();
}
}
Expand Down Expand Up @@ -662,10 +662,8 @@ impl<E: Engine> ImportSst for ImportSstService<E> {
let label = "upload";
let timer = Instant::now_coarse();
let import = self.importer.clone();
let (rx, buf_driver) = create_stream_with_buffer(
stream,
self.cfg.read().unwrap().stream_channel_window,
);
let (rx, buf_driver) =
create_stream_with_buffer(stream, self.cfg.read().unwrap().stream_channel_window);
let mut map_rx = rx.map_err(Error::from);

let handle_task = async move {
Expand Down

0 comments on commit ee8f9d3

Please sign in to comment.