Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PITR: support modifying the config tikv.import.memory-use-ratio online when restore point. #14408

Merged
merged 6 commits into from
Mar 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions components/server/src/server.rs
Expand Up @@ -1250,6 +1250,8 @@ where
LocalTablets::Singleton(engines.engines.kv.clone()),
servers.importer.clone(),
);
let import_cfg_mgr = import_service.get_config_manager();

if servers
.server
.register_service(create_import_sst(import_service))
Expand All @@ -1258,6 +1260,11 @@ where
fatal!("failed to register import service");
}

self.cfg_controller
.as_mut()
.unwrap()
.register(tikv::config::Module::Import, Box::new(import_cfg_mgr));

// Debug service.
let debug_service = DebugService::new(
engines.engines.clone(),
Expand Down
7 changes: 7 additions & 0 deletions components/server/src/server2.rs
Expand Up @@ -989,6 +989,8 @@ where
LocalTablets::Registry(self.tablet_registry.as_ref().unwrap().clone()),
servers.importer.clone(),
);
let import_cfg_mgr = import_service.get_config_manager();

if servers
.server
.register_service(create_import_sst(import_service))
Expand All @@ -997,6 +999,11 @@ where
fatal!("failed to register import service");
}

self.cfg_controller
.as_mut()
.unwrap()
.register(tikv::config::Module::Import, Box::new(import_cfg_mgr));

// Create Diagnostics service
let diag_service = DiagnosticsService::new(
servers.server.get_debug_thread_pool().clone(),
Expand Down
1 change: 1 addition & 0 deletions components/sst_importer/Cargo.toml
Expand Up @@ -29,6 +29,7 @@ keys = { workspace = true }
kvproto = { workspace = true }
lazy_static = "1.3"
log_wrappers = { workspace = true }
online_config = { workspace = true }
openssl = "0.10"
prometheus = { version = "0.13", default-features = false }
rand = "0.8"
Expand Down
53 changes: 47 additions & 6 deletions components/sst_importer/src/config.rs
@@ -1,10 +1,15 @@
// Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0.

use std::{error::Error, result::Result};
use std::{
error::Error,
result::Result,
sync::{Arc, RwLock},
};

use tikv_util::config::ReadableDuration;
use online_config::{self, OnlineConfig};
use tikv_util::{config::ReadableDuration, HandyRwLock};

#[derive(Clone, Serialize, Deserialize, PartialEq, Debug)]
#[derive(Clone, Serialize, Deserialize, PartialEq, Debug, OnlineConfig)]
#[serde(default)]
#[serde(rename_all = "kebab-case")]
pub struct Config {
Expand Down Expand Up @@ -47,12 +52,48 @@ impl Config {
self.stream_channel_window = default_cfg.stream_channel_window;
}
if self.memory_use_ratio > 0.5 || self.memory_use_ratio < 0.0 {
return Err("import.mem_ratio should belong to [0.0, 0.5].".into());
}
Ok(())
}
}

#[derive(Clone)]
pub struct ConfigManager(pub Arc<RwLock<Config>>);

impl ConfigManager {
pub fn new(cfg: Config) -> Self {
ConfigManager(Arc::new(RwLock::new(cfg)))
}
}

impl online_config::ConfigManager for ConfigManager {
fn dispatch(&mut self, change: online_config::ConfigChange) -> online_config::Result<()> {
info!(
"import config changed";
"change" => ?change,
);

let mut cfg = self.rl().clone();
cfg.update(change)?;

if let Err(e) = cfg.validate() {
warn!(
"import.mem_ratio should belong to [0.0, 0.5], change it to {}",
default_cfg.memory_use_ratio,
"import config changed";
"change" => ?cfg,
);
self.memory_use_ratio = default_cfg.memory_use_ratio;
return Err(e);
}

*self.wl() = cfg;
Ok(())
}
}

impl std::ops::Deref for ConfigManager {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's generally not recommended to use Deref as it's not easy to trace the code, per my understanding.
I think in this case just use self.0.as_ref() directly seems more straightforward.

type Target = RwLock<Config>;

fn deref(&self) -> &Self::Target {
self.0.as_ref()
}
}
2 changes: 1 addition & 1 deletion components/sst_importer/src/lib.rs
Expand Up @@ -24,7 +24,7 @@ pub mod metrics;
pub mod sst_importer;

pub use self::{
config::Config,
config::{Config, ConfigManager},
errors::{error_inc, Error, Result},
import_file::sst_meta_to_path,
sst_importer::SstImporter,
Expand Down
85 changes: 77 additions & 8 deletions components/sst_importer/src/sst_importer.rs
Expand Up @@ -36,9 +36,9 @@ use tikv_util::{
bytes::{decode_bytes_in_place, encode_bytes},
stream_event::{EventEncoder, EventIterator, Iterator as EIterator},
},
config::ReadableSize,
sys::{thread::ThreadBuildWrapper, SysQuota},
time::{Instant, Limiter},
HandyRwLock,
};
use tokio::runtime::{Handle, Runtime};
use txn_types::{Key, TimeStamp, WriteRef};
Expand All @@ -49,7 +49,7 @@ use crate::{
import_mode::{ImportModeSwitcher, RocksDbMetricsFn},
metrics::*,
sst_writer::{RawSstWriter, TxnSstWriter},
util, Config, Error, Result,
util, Config, ConfigManager as ImportConfigManager, Error, Result,
};

pub struct LoadedFile {
Expand Down Expand Up @@ -278,7 +278,7 @@ pub struct SstImporter {
download_rt: Runtime,
file_locks: Arc<DashMap<String, (CacheKvFile, Instant)>>,
mem_use: Arc<AtomicU64>,
mem_limit: ReadableSize,
mem_limit: Arc<AtomicU64>,
}

impl SstImporter {
Expand Down Expand Up @@ -308,8 +308,12 @@ impl SstImporter {
.build()?;
download_rt.spawn(cached_storage.gc_loop());

let memory_limit = (SysQuota::memory_limit_in_bytes() as f64) * cfg.memory_use_ratio;
info!("sst importer memory limit when apply"; "size" => ?memory_limit);
let memory_limit = Self::calcualte_usage_mem(cfg.memory_use_ratio);
info!(
"sst importer memory limit when apply";
"ratio" => cfg.memory_use_ratio,
"size" => ?memory_limit,
);

Ok(SstImporter {
dir: ImportDir::new(root)?,
Expand All @@ -321,10 +325,14 @@ impl SstImporter {
cached_storage,
download_rt,
mem_use: Arc::new(AtomicU64::new(0)),
mem_limit: ReadableSize(memory_limit as u64),
mem_limit: Arc::new(AtomicU64::new(memory_limit)),
})
}

fn calcualte_usage_mem(mem_ratio: f64) -> u64 {
((SysQuota::memory_limit_in_bytes() as f64) * mem_ratio) as u64
}

pub fn set_compression_type(
&mut self,
cf_name: CfName,
Expand Down Expand Up @@ -583,6 +591,19 @@ impl SstImporter {
Ok(())
}

pub fn update_config_memory_use_ratio(&self, cfg_mgr: &ImportConfigManager) {
let mem_ratio = cfg_mgr.rl().memory_use_ratio;
let memory_limit = Self::calcualte_usage_mem(mem_ratio);

if self.mem_limit.load(Ordering::SeqCst) != memory_limit {
self.mem_limit.store(memory_limit, Ordering::SeqCst);
info!("update importer config";
"memory-use-ratio" => mem_ratio,
"size" => memory_limit,
)
}
}

pub fn shrink_by_tick(&self) -> usize {
let mut shrink_buff_size: usize = 0;
let mut retain_buff_size: usize = 0;
Expand Down Expand Up @@ -643,15 +664,15 @@ impl SstImporter {
// If mem_limit is 0, which represent download kv-file when import.
// Or read kv-file into buffer directly.
pub fn import_support_download(&self) -> bool {
self.mem_limit == ReadableSize(0)
self.mem_limit.load(Ordering::SeqCst) == 0
}

fn request_memory(&self, meta: &KvMeta) -> Option<MemUsePermit> {
let size = meta.get_length();
let old = self.mem_use.fetch_add(size, Ordering::SeqCst);

// If the memory is limited, roll backup the mem_use and return false.
if old + size > self.mem_limit.0 {
if old + size > self.mem_limit.load(Ordering::SeqCst) {
self.mem_use.fetch_sub(size, Ordering::SeqCst);
CACHE_EVENT.with_label_values(&["out-of-quota"]).inc();
None
Expand Down Expand Up @@ -1449,6 +1470,7 @@ mod tests {
};
use external_storage_export::read_external_storage_info_buff;
use file_system::File;
use online_config::{ConfigManager, OnlineConfig};
use openssl::hash::{Hasher, MessageDigest};
use tempfile::Builder;
use test_sst_importer::*;
Expand Down Expand Up @@ -1958,6 +1980,53 @@ mod tests {
assert_eq!(err.kind(), io::ErrorKind::TimedOut);
}

#[test]
fn test_update_config_memory_use_ratio() {
// create SstImpoter with default.
let cfg = Config {
memory_use_ratio: 0.3,
..Default::default()
};
let import_dir = tempfile::tempdir().unwrap();
let importer = SstImporter::new(&cfg, import_dir, None, ApiVersion::V1).unwrap();
let mem_limit_old = importer.mem_limit.load(Ordering::SeqCst);

// create new config and get the diff config.
let cfg_new = Config {
memory_use_ratio: 0.1,
..Default::default()
};
let change = cfg.diff(&cfg_new);

// create config manager and update config.
let mut cfg_mgr = ImportConfigManager::new(cfg);
cfg_mgr.dispatch(change).unwrap();
importer.update_config_memory_use_ratio(&cfg_mgr);

let mem_limit_new = importer.mem_limit.load(Ordering::SeqCst);
assert!(mem_limit_old > mem_limit_new);
assert_eq!(
mem_limit_old / 3,
mem_limit_new,
"mem_limit_old / 3 = {} mem_limit_new = {}",
mem_limit_old / 3,
mem_limit_new
);
}

#[test]
fn test_update_config_with_invalid_conifg() {
let cfg = Config::default();
let cfg_new = Config {
memory_use_ratio: -0.1,
..Default::default()
};
let change = cfg.diff(&cfg_new);
let mut cfg_mgr = ImportConfigManager::new(cfg);
let r = cfg_mgr.dispatch(change);
assert!(r.is_err());
}

#[test]
fn test_do_read_kv_file() {
// create a sample kv file.
Expand Down
2 changes: 1 addition & 1 deletion src/config/mod.rs
Expand Up @@ -3127,7 +3127,7 @@ pub struct TikvConfig {
#[online_config(skip)]
pub security: SecurityConfig,

#[online_config(skip)]
#[online_config(submodule)]
pub import: ImportConfig,

#[online_config(submodule)]
Expand Down
26 changes: 18 additions & 8 deletions src/import/sst_service.rs
Expand Up @@ -26,15 +26,16 @@ use kvproto::{
kvrpcpb::Context,
};
use sst_importer::{
error_inc, metrics::*, sst_importer::DownloadExt, sst_meta_to_path, Config, Error, Result,
SstImporter,
error_inc, metrics::*, sst_importer::DownloadExt, sst_meta_to_path, Config, ConfigManager,
Error, Result, SstImporter,
};
use tikv_kv::{Engine, Modify, SnapContext, Snapshot, SnapshotExt, WriteData, WriteEvent};
use tikv_util::{
config::ReadableSize,
future::create_stream_with_buffer,
sys::thread::ThreadBuildWrapper,
time::{Instant, Limiter},
HandyRwLock,
};
use tokio::{runtime::Runtime, time::sleep};
use txn_types::{Key, WriteRef, WriteType};
Expand Down Expand Up @@ -85,7 +86,7 @@ async fn wait_write(mut s: impl Stream<Item = WriteEvent> + Send + Unpin) -> sto
/// raftstore to trigger the ingest process.
#[derive(Clone)]
pub struct ImportSstService<E: Engine> {
cfg: Config,
cfg: ConfigManager,
tablets: LocalTablets<E::Local>,
engine: E,
threads: Arc<Runtime>,
Expand Down Expand Up @@ -296,10 +297,12 @@ impl<E: Engine> ImportSstService<E> {
if let LocalTablets::Singleton(tablet) = &tablets {
importer.start_switch_mode_check(threads.handle(), tablet.clone());
}
threads.spawn(Self::tick(importer.clone()));

let cfg_mgr = ConfigManager::new(cfg);
threads.spawn(Self::tick(importer.clone(), cfg_mgr.clone()));

ImportSstService {
cfg,
cfg: cfg_mgr,
tablets,
threads: Arc::new(threads),
block_threads: Arc::new(block_threads),
Expand All @@ -311,9 +314,15 @@ impl<E: Engine> ImportSstService<E> {
}
}

async fn tick(importer: Arc<SstImporter>) {
pub fn get_config_manager(&self) -> ConfigManager {
self.cfg.clone()
}

async fn tick(importer: Arc<SstImporter>, cfg: ConfigManager) {
loop {
sleep(Duration::from_secs(10)).await;

importer.update_config_memory_use_ratio(&cfg);
importer.shrink_by_tick();
}
}
Expand Down Expand Up @@ -544,7 +553,7 @@ macro_rules! impl_write {
let import = self.importer.clone();
let tablets = self.tablets.clone();
let (rx, buf_driver) =
create_stream_with_buffer(stream, self.cfg.stream_channel_window);
create_stream_with_buffer(stream, self.cfg.rl().stream_channel_window);
let mut rx = rx.map_err(Error::from);

let timer = Instant::now_coarse();
Expand Down Expand Up @@ -652,7 +661,8 @@ impl<E: Engine> ImportSst for ImportSstService<E> {
let label = "upload";
let timer = Instant::now_coarse();
let import = self.importer.clone();
let (rx, buf_driver) = create_stream_with_buffer(stream, self.cfg.stream_channel_window);
let (rx, buf_driver) =
create_stream_with_buffer(stream, self.cfg.rl().stream_channel_window);
let mut map_rx = rx.map_err(Error::from);

let handle_task = async move {
Expand Down