Skip to content

Commit

Permalink
Protect Raft Engine Disk Usage (#13633)
Browse files Browse the repository at this point in the history
close #13642

Signed-off-by: Jarvis Zheng <jiayang@hust.edu.cn>
Signed-off-by: Jarvis <jiayang@hust.edu.cn>

Co-authored-by: Xinye Tao <xy.tao@outlook.com>
  • Loading branch information
jiayang-zheng and tabokie committed Nov 16, 2022
1 parent fca5a9e commit 2704588
Show file tree
Hide file tree
Showing 13 changed files with 227 additions and 30 deletions.
14 changes: 12 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions components/engine_panic/src/raft_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,10 @@ impl RaftEngine for PanicEngine {
panic!()
}

fn get_engine_path(&self) -> &str {
panic!()
}

fn put_store_ident(&self, ident: &StoreIdent) -> Result<()> {
panic!()
}
Expand Down
4 changes: 4 additions & 0 deletions components/engine_rocks/src/raft_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,10 @@ impl RaftEngine for RocksEngine {
Ok(used_size)
}

fn get_engine_path(&self) -> &str {
self.as_inner().path()
}

fn put_store_ident(&self, ident: &StoreIdent) -> Result<()> {
self.put_msg(keys::STORE_IDENT_KEY, ident)
}
Expand Down
4 changes: 4 additions & 0 deletions components/engine_traits/src/raft_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ pub struct RaftLogGcTask {
pub to: u64,
}

// TODO: Refactor common methods between Kv and Raft engine into a shared trait.
pub trait RaftEngine: RaftEngineReadOnly + PerfContextExt + Clone + Sync + Send + 'static {
type LogBatch: RaftLogBatch;

Expand Down Expand Up @@ -140,6 +141,9 @@ pub trait RaftEngine: RaftEngineReadOnly + PerfContextExt + Clone + Sync + Send

fn get_engine_size(&self) -> Result<u64>;

/// The path to the directory on the filesystem where the raft log is stored
fn get_engine_path(&self) -> &str;

/// Visit all available raft groups.
///
/// If any error is returned, the iteration will stop.
Expand Down
8 changes: 8 additions & 0 deletions components/raft_log_engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,10 @@ impl RaftLogEngine {
)))
}

pub fn path(&self) -> &str {
self.0.path()
}

/// If path is not an empty directory, we say db exists.
pub fn exists(path: &str) -> bool {
let path = Path::new(path);
Expand Down Expand Up @@ -615,6 +619,10 @@ impl RaftEngine for RaftLogEngine {
Ok(self.0.get_used_size() as u64)
}

fn get_engine_path(&self) -> &str {
self.path()
}

fn for_each_raft_group<E, F>(&self, f: &mut F) -> std::result::Result<(), E>
where
F: FnMut(u64) -> std::result::Result<(), E>,
Expand Down
139 changes: 111 additions & 28 deletions components/server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,10 @@ use tikv_util::{
math::MovingAvgU32,
metrics::INSTANCE_BACKEND_CPU_QUOTA,
quota_limiter::{QuotaLimitConfigManager, QuotaLimiter},
sys::{cpu_time::ProcessStat, disk, register_memory_usage_high_water, SysQuota},
sys::{
cpu_time::ProcessStat, disk, path_in_diff_mount_point, register_memory_usage_high_water,
SysQuota,
},
thread_group::GroupProperties,
time::{Instant, Monitor},
worker::{Builder as WorkerBuilder, LazyWorker, Scheduler, Worker},
Expand Down Expand Up @@ -533,36 +536,66 @@ where
// enough space to do compaction and region migration when TiKV recover.
// This file is created in data_dir rather than db_path, because we must not
// increase store size of db_path.
fn calculate_reserved_space(capacity: u64, reserved_size_from_config: u64) -> u64 {
let mut reserved_size = reserved_size_from_config;
if reserved_size_from_config != 0 {
reserved_size =
cmp::max((capacity as f64 * 0.05) as u64, reserved_size_from_config);
}
reserved_size
}
fn reserve_physical_space(data_dir: &String, available: u64, reserved_size: u64) {
let path = Path::new(data_dir).join(file_system::SPACE_PLACEHOLDER_FILE);
if let Err(e) = file_system::remove_file(path) {
warn!("failed to remove space holder on starting: {}", e);
}

// place holder file size is 20% of total reserved space.
if available > reserved_size {
file_system::reserve_space_for_recover(data_dir, reserved_size / 5)
.map_err(|e| panic!("Failed to reserve space for recovery: {}.", e))
.unwrap();
} else {
warn!("no enough disk space left to create the place holder file");
}
}

let disk_stats = fs2::statvfs(&self.config.storage.data_dir).unwrap();
let mut capacity = disk_stats.total_space();
if self.config.raft_store.capacity.0 > 0 {
capacity = cmp::min(capacity, self.config.raft_store.capacity.0);
}
let mut reserve_space = self.config.storage.reserve_space.0;
if self.config.storage.reserve_space.0 != 0 {
reserve_space = cmp::max(
(capacity as f64 * 0.05) as u64,
self.config.storage.reserve_space.0,
);
}
disk::set_disk_reserved_space(reserve_space);
let path =
Path::new(&self.config.storage.data_dir).join(file_system::SPACE_PLACEHOLDER_FILE);
if let Err(e) = file_system::remove_file(path) {
warn!("failed to remove space holder on starting: {}", e);
}
// reserve space for kv engine
let kv_reserved_size =
calculate_reserved_space(capacity, self.config.storage.reserve_space.0);
disk::set_disk_reserved_space(kv_reserved_size);
reserve_physical_space(
&self.config.storage.data_dir,
disk_stats.available_space(),
kv_reserved_size,
);

let available = disk_stats.available_space();
// place holder file size is 20% of total reserved space.
if available > reserve_space {
file_system::reserve_space_for_recover(
&self.config.storage.data_dir,
reserve_space / 5,
)
.map_err(|e| panic!("Failed to reserve space for recovery: {}.", e))
.unwrap();
let raft_data_dir = if self.config.raft_engine.enable {
self.config.raft_engine.config().dir
} else {
warn!("no enough disk space left to create the place holder file");
self.config.raft_store.raftdb_path.clone()
};

let separated_raft_mount_path =
path_in_diff_mount_point(&self.config.storage.data_dir, &raft_data_dir);
if separated_raft_mount_path {
let raft_disk_stats = fs2::statvfs(&raft_data_dir).unwrap();
// reserve space for raft engine if raft engine is deployed separately
let raft_reserved_size = calculate_reserved_space(
raft_disk_stats.total_space(),
self.config.storage.reserve_raft_space.0,
);
disk::set_raft_disk_reserved_space(raft_reserved_size);
reserve_physical_space(
&raft_data_dir,
raft_disk_stats.available_space(),
raft_reserved_size,
);
}
}

Expand Down Expand Up @@ -1448,13 +1481,28 @@ where
let store_path = self.store_path.clone();
let snap_mgr = self.snap_mgr.clone().unwrap();
let reserve_space = disk::get_disk_reserved_space();
if reserve_space == 0 {
let reserve_raft_space = disk::get_raft_disk_reserved_space();
if reserve_space == 0 && reserve_raft_space == 0 {
info!("disk space checker not enabled");
return;
}
let raft_path = engines.raft.get_engine_path().to_string();
let separated_raft_mount_path =
path_in_diff_mount_point(raft_path.as_str(), engines.kv.path());
let raft_almost_full_threshold = reserve_raft_space;
let raft_already_full_threshold = reserve_raft_space / 2;

let almost_full_threshold = reserve_space;
let already_full_threshold = reserve_space / 2;
fn calculate_disk_usage(a: disk::DiskUsage, b: disk::DiskUsage) -> disk::DiskUsage {
match (a, b) {
(disk::DiskUsage::AlreadyFull, _) => disk::DiskUsage::AlreadyFull,
(_, disk::DiskUsage::AlreadyFull) => disk::DiskUsage::AlreadyFull,
(disk::DiskUsage::AlmostFull, _) => disk::DiskUsage::AlmostFull,
(_, disk::DiskUsage::AlmostFull) => disk::DiskUsage::AlmostFull,
(disk::DiskUsage::Normal, disk::DiskUsage::Normal) => disk::DiskUsage::Normal,
}
}
self.background_worker
.spawn_interval_task(DEFAULT_STORAGE_STATS_INTERVAL, move || {
let disk_stats = match fs2::statvfs(&store_path) {
Expand All @@ -1481,14 +1529,45 @@ where
.get_engine_size()
.expect("get raft engine size");

let mut raft_disk_status = disk::DiskUsage::Normal;
if separated_raft_mount_path && reserve_raft_space != 0 {
let raft_disk_stats = match fs2::statvfs(&raft_path) {
Err(e) => {
error!(
"get disk stat for raft engine failed";
"raft engine path" => raft_path.clone(),
"err" => ?e
);
return;
}
Ok(stats) => stats,
};
let raft_disk_cap = raft_disk_stats.total_space();
let mut raft_disk_available =
raft_disk_cap.checked_sub(raft_size).unwrap_or_default();
raft_disk_available = cmp::min(raft_disk_available, raft_disk_stats.available_space());
raft_disk_status = if raft_disk_available <= raft_already_full_threshold
{
disk::DiskUsage::AlreadyFull
} else if raft_disk_available <= raft_almost_full_threshold
{
disk::DiskUsage::AlmostFull
} else {
disk::DiskUsage::Normal
};
}
let placeholer_file_path = PathBuf::from_str(&data_dir)
.unwrap()
.join(Path::new(file_system::SPACE_PLACEHOLDER_FILE));

let placeholder_size: u64 =
file_system::get_file_size(placeholer_file_path).unwrap_or(0);

let used_size = snap_size + kv_size + raft_size + placeholder_size;
let used_size = if !separated_raft_mount_path {
snap_size + kv_size + raft_size + placeholder_size
} else {
snap_size + kv_size + placeholder_size
};
let capacity = if config_disk_capacity == 0 || disk_cap < config_disk_capacity {
disk_cap
} else {
Expand All @@ -1499,18 +1578,22 @@ where
available = cmp::min(available, disk_stats.available_space());

let prev_disk_status = disk::get_disk_status(0); //0 no need care about failpoint.
let cur_disk_status = if available <= already_full_threshold {
let cur_kv_disk_status = if available <= already_full_threshold {
disk::DiskUsage::AlreadyFull
} else if available <= almost_full_threshold {
disk::DiskUsage::AlmostFull
} else {
disk::DiskUsage::Normal
};
let cur_disk_status = calculate_disk_usage(raft_disk_status, cur_kv_disk_status);
if prev_disk_status != cur_disk_status {
warn!(
"disk usage {:?}->{:?}, available={},snap={},kv={},raft={},capacity={}",
"disk usage {:?}->{:?} (raft engine usage: {:?}, kv engine usage: {:?}), seperated raft mount={}, kv available={}, snap={}, kv={}, raft={}, capacity={}",
prev_disk_status,
cur_disk_status,
raft_disk_status,
cur_kv_disk_status,
separated_raft_mount_path,
available,
snap_size,
kv_size,
Expand Down
1 change: 1 addition & 0 deletions components/tikv_util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ lazy_static = "1.3"
libc = "0.2"
log = { version = "0.4", features = ["max_level_trace", "release_max_level_debug"] }
log_wrappers = { workspace = true }
mnt = "0.3.1"
nix = "0.24"
num-traits = "0.2"
num_cpus = "1"
Expand Down
9 changes: 9 additions & 0 deletions components/tikv_util/src/sys/disk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pub use kvproto::disk_usage::DiskUsage;
// Percent is not configurable, But if you want to change, please make sure
// the percent in both the init fs and store monitor are keep the same.
static DISK_RESERVED_SPACE: AtomicU64 = AtomicU64::new(0);
static RAFT_DISK_RESERVED_SPACE: AtomicU64 = AtomicU64::new(0);
static DISK_STATUS: AtomicI32 = AtomicI32::new(0);

pub fn set_disk_reserved_space(v: u64) {
Expand All @@ -20,6 +21,14 @@ pub fn get_disk_reserved_space() -> u64 {
DISK_RESERVED_SPACE.load(Ordering::Acquire)
}

pub fn set_raft_disk_reserved_space(v: u64) {
RAFT_DISK_RESERVED_SPACE.store(v, Ordering::Release)
}

pub fn get_raft_disk_reserved_space() -> u64 {
RAFT_DISK_RESERVED_SPACE.load(Ordering::Acquire)
}

pub fn set_disk_status(status: DiskUsage) {
let v = match status {
DiskUsage::Normal => 0,
Expand Down

0 comments on commit 2704588

Please sign in to comment.