Skip to content

Commit

Permalink
deal comments in PR
Browse files Browse the repository at this point in the history
Signed-off-by: joccau <zak.zhao@pingcap.com>
  • Loading branch information
joccau committed Mar 17, 2023
1 parent a516a43 commit e0054f1
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 15 deletions.
4 changes: 2 additions & 2 deletions components/sst_importer/src/config.rs
Expand Up @@ -7,7 +7,7 @@ use std::{
};

use online_config::{self, OnlineConfig};
use tikv_util::config::ReadableDuration;
use tikv_util::{config::ReadableDuration, HandyRwLock};

#[derive(Clone, Serialize, Deserialize, PartialEq, Debug, OnlineConfig)]
#[serde(default)]
Expand Down Expand Up @@ -85,7 +85,7 @@ impl online_config::ConfigManager for ConfigManager {
return Err(e);
}

*self.write().unwrap() = cfg;
*self.wl() = cfg;
Ok(())
}
}
Expand Down
19 changes: 12 additions & 7 deletions components/sst_importer/src/sst_importer.rs
Expand Up @@ -38,6 +38,7 @@ use tikv_util::{
},
sys::{thread::ThreadBuildWrapper, SysQuota},
time::{Instant, Limiter},
HandyRwLock,
};
use tokio::runtime::{Handle, Runtime};
use txn_types::{Key, TimeStamp, WriteRef};
Expand Down Expand Up @@ -590,8 +591,8 @@ impl SstImporter {
Ok(())
}

pub fn update_config_memory_use_ratio(&self, cfg_mgr: ImportConfigManager) {
let mem_ratio = cfg_mgr.read().unwrap().memory_use_ratio;
pub fn update_config_memory_use_ratio(&self, cfg_mgr: &ImportConfigManager) {
let mem_ratio = cfg_mgr.rl().memory_use_ratio;
let memory_limit = Self::calcualte_usage_mem(mem_ratio);

if self.mem_limit.load(Ordering::SeqCst) != memory_limit {
Expand Down Expand Up @@ -663,7 +664,7 @@ impl SstImporter {
// If mem_limit is 0, which represent download kv-file when import.
// Or read kv-file into buffer directly.
pub fn import_support_download(&self) -> bool {
self.mem_limit.load(Ordering::SeqCst) == 0
self.mem_limit.load(Ordering::SeqCst) <= 0
}

fn request_memory(&self, meta: &KvMeta) -> Option<MemUsePermit> {
Expand Down Expand Up @@ -2000,13 +2001,17 @@ mod tests {
// create config manager and update config.
let mut cfg_mgr = ImportConfigManager::new(cfg);
cfg_mgr.dispatch(change).unwrap();
importer.update_config_memory_use_ratio(cfg_mgr);
importer.update_config_memory_use_ratio(&cfg_mgr);

let mem_limit_new = importer.mem_limit.load(Ordering::SeqCst);
assert!(mem_limit_old > mem_limit_new);

let delt = (mem_limit_old - mem_limit_new * 3) as f64 / mem_limit_old as f64;
assert!(delt < 0.000001)
assert_eq!(
mem_limit_old / 3,
mem_limit_new,
"mem_limit_old / 3 = {} mem_limit_new = {}",
mem_limit_old / 3,
mem_limit_new
);
}

#[test]
Expand Down
11 changes: 5 additions & 6 deletions src/import/sst_service.rs
Expand Up @@ -35,6 +35,7 @@ use tikv_util::{
future::create_stream_with_buffer,
sys::thread::ThreadBuildWrapper,
time::{Instant, Limiter},
HandyRwLock,
};
use tokio::{runtime::Runtime, time::sleep};
use txn_types::{Key, WriteRef, WriteType};
Expand Down Expand Up @@ -321,7 +322,7 @@ impl<E: Engine> ImportSstService<E> {
loop {
sleep(Duration::from_secs(10)).await;

importer.update_config_memory_use_ratio(cfg.clone());
importer.update_config_memory_use_ratio(&cfg);
importer.shrink_by_tick();
}
}
Expand Down Expand Up @@ -551,10 +552,8 @@ macro_rules! impl_write {
) {
let import = self.importer.clone();
let tablets = self.tablets.clone();
let (rx, buf_driver) = create_stream_with_buffer(
stream,
self.cfg.0.as_ref().read().unwrap().stream_channel_window,
);
let (rx, buf_driver) =
create_stream_with_buffer(stream, self.cfg.rl().stream_channel_window);
let mut rx = rx.map_err(Error::from);

let timer = Instant::now_coarse();
Expand Down Expand Up @@ -663,7 +662,7 @@ impl<E: Engine> ImportSst for ImportSstService<E> {
let timer = Instant::now_coarse();
let import = self.importer.clone();
let (rx, buf_driver) =
create_stream_with_buffer(stream, self.cfg.read().unwrap().stream_channel_window);
create_stream_with_buffer(stream, self.cfg.rl().stream_channel_window);
let mut map_rx = rx.map_err(Error::from);

let handle_task = async move {
Expand Down

0 comments on commit e0054f1

Please sign in to comment.