Skip to content

Commit

Permalink
This is an automated cherry-pick of tikv#15853
Browse files Browse the repository at this point in the history
ref tikv#12729

Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
afeinberg authored and ti-chi-bot committed Oct 31, 2023
1 parent 93e4f71 commit 1ac1b0a
Show file tree
Hide file tree
Showing 12 changed files with 454 additions and 3 deletions.
6 changes: 6 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions components/online_config/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ edition = "2018"
publish = false

[dependencies]
chrono = "0.4"
online_config_derive = { path = "./online_config_derive" }
serde = { version = "1.0", features = ["derive"] }

Expand Down
7 changes: 7 additions & 0 deletions components/online_config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@ use std::{
fmt::{self, Debug, Display, Formatter},
};

use chrono::{FixedOffset, NaiveTime};
pub use online_config_derive::*;

/// A set of configuration changes, keyed by a `String` name and carrying the
/// new value for each entry.
pub type ConfigChange = HashMap<String, ConfigValue>;
/// A time of day (`NaiveTime`) paired with the `FixedOffset` it is expressed
/// in.
pub type OffsetTime = (NaiveTime, FixedOffset);
/// A list of `OffsetTime` entries.
pub type Schedule = Vec<OffsetTime>;

#[derive(Clone, PartialEq)]
pub enum ConfigValue {
Expand All @@ -23,6 +26,8 @@ pub enum ConfigValue {
BlobRunMode(String),
IOPriority(String),
Module(ConfigChange),
OffsetTime(OffsetTime),
Schedule(Schedule),
Skip,
None,
}
Expand All @@ -42,6 +47,8 @@ impl Display for ConfigValue {
ConfigValue::BlobRunMode(v) => write!(f, "{}", v),
ConfigValue::IOPriority(v) => write!(f, "{}", v),
ConfigValue::Module(v) => write!(f, "{:?}", v),
ConfigValue::OffsetTime((t, o)) => write!(f, "{} {}", t, o),
ConfigValue::Schedule(v) => write!(f, "{:?}", v),
ConfigValue::Skip => write!(f, "ConfigValue::Skip"),
ConfigValue::None => write!(f, ""),
}
Expand Down
7 changes: 7 additions & 0 deletions components/raftstore/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,15 @@ batch-system = { path = "../batch-system", default-features = false }
bitflags = "1.0.1"
byteorder = "1.2"
bytes = "1.0"
<<<<<<< HEAD
collections = { path = "../collections" }
concurrency_manager = { path = "../concurrency_manager", default-features = false }
=======
causal_ts = { workspace = true }
chrono = "0.4"
collections = { workspace = true }
concurrency_manager = { workspace = true }
>>>>>>> 2a24cfc4b2 (rafstore, engine_rocks: periodic full compaction (#12729) (#15853))
crc32fast = "1.2"
crossbeam = "0.8"
derivative = "2"
Expand Down
16 changes: 15 additions & 1 deletion components/raftstore/src/store/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use serde::{Deserialize, Serialize};
use serde_with::with_prefix;
use tikv_util::{
box_err,
config::{ReadableDuration, ReadableSize, VersionTrack},
config::{ReadableDuration, ReadableSchedule, ReadableSize, VersionTrack},
error, info,
sys::SysQuota,
warn,
Expand Down Expand Up @@ -112,6 +112,15 @@ pub struct Config {
pub lock_cf_compact_interval: ReadableDuration,
pub lock_cf_compact_bytes_threshold: ReadableSize,

/// Hours of the day during which we may execute a periodic full compaction.
/// If not set or empty, periodic full compaction will not run. In toml this
/// should be a list of times in "HH:MM" format with an optional timezone
/// offset. If no timezone is specified, local timezone is used. E.g.,
/// `["23:00 +0000", "03:00 +0700"]` or `["23:00", "03:00"]`.
pub periodic_full_compact_start_times: ReadableSchedule,
/// Do not start a full compaction if cpu utilization exceeds this number.
pub periodic_full_compact_start_max_cpu: f64,

#[online_config(skip)]
pub notify_capacity: usize,
pub messages_per_tick: usize,
Expand Down Expand Up @@ -313,6 +322,11 @@ impl Default for Config {
region_compact_tombstones_percent: 30,
pd_heartbeat_tick_interval: ReadableDuration::minutes(1),
pd_store_heartbeat_tick_interval: ReadableDuration::secs(10),
// Disable periodic full compaction by default.
periodic_full_compact_start_times: ReadableSchedule::default(),
// If periodic full compaction is enabled, do not start a full compaction
// if the CPU utilization is over 10%.
periodic_full_compact_start_max_cpu: 0.1,
notify_capacity: 40960,
snap_mgr_gc_tick_interval: ReadableDuration::minutes(1),
snap_gc_timeout: ReadableDuration::hours(4),
Expand Down
76 changes: 76 additions & 0 deletions components/raftstore/src/store/fsm/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,20 @@ use tikv_util::{
future::poll_future_notify,
info, is_zero_duration,
mpsc::{self, LooseBoundedSender, Receiver},
<<<<<<< HEAD
slow_log, sys as sys_util,
sys::disk::{get_disk_status, DiskUsage},
time::{duration_to_sec, Instant as TiInstant},
=======
slow_log,
store::{find_peer, region_on_stores},
sys::{
self as sys_util,
cpu_time::ProcessStat,
disk::{get_disk_status, DiskUsage},
},
time::{duration_to_sec, monotonic_raw_now, Instant as TiInstant},
>>>>>>> 2a24cfc4b2 (rafstore, engine_rocks: periodic full compaction (#12729) (#15853))
timer::SteadyTimer,
warn,
worker::{LazyWorker, Scheduler, Worker},
Expand Down Expand Up @@ -108,6 +119,10 @@ pub const PENDING_MSG_CAP: usize = 100;
const ENTRY_CACHE_EVICT_TICK_DURATION: Duration = Duration::from_secs(1);
pub const MULTI_FILES_SNAPSHOT_FEATURE: Feature = Feature::require(6, 1, 0); // it only makes sense for large region

// Interval between checks for whether a periodic full compaction may start
// (30 minutes). Each check reads `periodic_full_compact_start_max_cpu` from the
// current config, which allows that setting to be changed dynamically.
const PERIODIC_FULL_COMPACT_TICK_INTERVAL_DURATION: Duration = Duration::from_secs(30 * 60);

pub struct StoreInfo<EK, ER> {
pub kv_engine: EK,
pub raft_engine: ER,
Expand Down Expand Up @@ -661,6 +676,7 @@ impl<'a, EK: KvEngine + 'static, ER: RaftEngine + 'static, T: Transport>
StoreTick::SnapGc => self.on_snap_mgr_gc(),
StoreTick::CompactLockCf => self.on_compact_lock_cf(),
StoreTick::CompactCheck => self.on_compact_check_tick(),
StoreTick::PeriodicFullCompact => self.on_full_compact_tick(),
StoreTick::ConsistencyCheck => self.on_consistency_check_tick(),
StoreTick::CleanupImportSst => self.on_cleanup_import_sst_tick(),
}
Expand Down Expand Up @@ -738,6 +754,7 @@ impl<'a, EK: KvEngine + 'static, ER: RaftEngine + 'static, T: Transport>
self.fsm.store.start_time = Some(time::get_time());
self.register_cleanup_import_sst_tick();
self.register_compact_check_tick();
self.register_full_compact_tick();
self.register_pd_store_heartbeat_tick();
self.register_compact_lock_cf_tick();
self.register_snap_mgr_gc_tick();
Expand Down Expand Up @@ -2190,6 +2207,65 @@ impl<'a, EK: KvEngine, ER: RaftEngine, T: Transport> StoreFsmDelegate<'a, EK, ER
}
}

/// Schedules the next `StoreTick::PeriodicFullCompact` tick, but only when
/// `periodic_full_compact_start_times` is non-empty.
///
/// With an empty schedule no tick is registered at all.
fn register_full_compact_tick(&self) {
    let schedule = &self.ctx.cfg.periodic_full_compact_start_times;
    if schedule.is_empty() {
        // Nothing configured: leave the tick unscheduled.
        return;
    }

    self.ctx.schedule_store_tick(
        StoreTick::PeriodicFullCompact,
        PERIODIC_FULL_COMPACT_TICK_INTERVAL_DURATION,
    );
}

/// Handles a `StoreTick::PeriodicFullCompact` tick.
///
/// Re-registers the tick, then schedules a `CompactTask::PeriodicFullCompact`
/// on the cleanup worker only if all of the following hold:
///
/// * the current local time falls within `periodic_full_compact_start_times`,
/// * the store's `is_busy` flag is not set,
/// * the process CPU usage can be read and does not exceed
///   `periodic_full_compact_start_max_cpu`.
///
/// Any failed check is logged and the tick is skipped. Failing to read the
/// process statistics is treated the same way rather than panicking.
fn on_full_compact_tick(&mut self) {
    // Re-arm first so that every early return below still keeps the periodic
    // check alive.
    self.register_full_compact_tick();

    let local_time = chrono::Local::now();
    if !self
        .ctx
        .cfg
        .periodic_full_compact_start_times
        .is_scheduled_this_hour(&local_time)
    {
        debug!(
            "full compaction may not run at this time";
            "local_time" => ?local_time,
            "periodic_full_compact_start_times" => ?self.ctx.cfg.periodic_full_compact_start_times,
        );
        return;
    }

    if self.ctx.global_stat.stat.is_busy.load(Ordering::SeqCst) {
        warn!("full compaction may not run at this time, `is_busy` flag is true",);
        return;
    }

    // Reading process statistics can fail; an optional maintenance task must
    // not bring the store down, so skip this tick instead of unwrapping.
    let mut proc_stats = match ProcessStat::cur_proc_stat() {
        Ok(stats) => stats,
        Err(e) => {
            warn!(
                "full compaction may not run at this time, failed to read process stats";
                "err" => ?e,
            );
            return;
        }
    };
    // NOTE(review): `proc_stats` is sampled immediately after it is created;
    // confirm that `cpu_usage()` reports a meaningful utilization in that case.
    let cpu_usage = match proc_stats.cpu_usage() {
        Ok(usage) => usage,
        Err(e) => {
            warn!(
                "full compaction may not run at this time, failed to read cpu usage";
                "err" => ?e,
            );
            return;
        }
    };
    let max_start_cpu_usage = self.ctx.cfg.periodic_full_compact_start_max_cpu;
    if cpu_usage > max_start_cpu_usage {
        warn!(
            "full compaction may not run at this time, cpu usage is above max";
            "cpu_usage" => cpu_usage,
            "threshold" => max_start_cpu_usage,
        );
        return;
    }

    // Attempt executing a periodic full compaction.
    // Note that full compaction will not run if other compaction tasks are running.
    if let Err(e) = self
        .ctx
        .cleanup_scheduler
        .schedule(CleanupTask::Compact(CompactTask::PeriodicFullCompact))
    {
        error!(
            "failed to schedule a periodic full compaction";
            "store_id" => self.fsm.store.id,
            "err" => ?e
        );
    }
}

fn register_compact_check_tick(&self) {
self.ctx.schedule_store_tick(
StoreTick::CompactCheck,
Expand Down
1 change: 1 addition & 0 deletions components/raftstore/src/store/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ make_auto_flush_static_metric! {
make_static_metric! {
pub label_enum RaftEventDurationType {
compact_check,
periodic_full_compact,
pd_store_heartbeat,
snap_gc,
compact_lock_cf,
Expand Down
2 changes: 2 additions & 0 deletions components/raftstore/src/store/msg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ impl PeerTick {
#[derive(Debug, Clone, Copy)]
pub enum StoreTick {
CompactCheck,
PeriodicFullCompact,
PdStoreHeartbeat,
SnapGc,
CompactLockCf,
Expand All @@ -272,6 +273,7 @@ impl StoreTick {
pub fn tag(self) -> RaftEventDurationType {
match self {
StoreTick::CompactCheck => RaftEventDurationType::compact_check,
StoreTick::PeriodicFullCompact => RaftEventDurationType::periodic_full_compact,
StoreTick::PdStoreHeartbeat => RaftEventDurationType::pd_store_heartbeat,
StoreTick::SnapGc => RaftEventDurationType::snap_gc,
StoreTick::CompactLockCf => RaftEventDurationType::compact_lock_cf,
Expand Down
75 changes: 74 additions & 1 deletion components/raftstore/src/store/worker/compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@ use fail::fail_point;
use thiserror::Error;
use tikv_util::{box_try, error, info, time::Instant, warn, worker::Runnable};

use super::metrics::COMPACT_RANGE_CF;
use super::metrics::{COMPACT_RANGE_CF, FULL_COMPACT};

type Key = Vec<u8>;

pub enum Task {
PeriodicFullCompact,

Compact {
cf_name: String,
start_key: Option<Key>, // None means smallest key
Expand All @@ -33,6 +35,7 @@ pub enum Task {
impl Display for Task {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match *self {
Task::PeriodicFullCompact => f.debug_struct("FullCompact").finish(),
Task::Compact {
ref cf_name,
ref start_key,
Expand Down Expand Up @@ -92,6 +95,31 @@ where
Runner { engine }
}

/// Runs a periodic full compaction over the engine's entire key range.
///
/// NOTE: this is a highly experimental feature!
///
/// Logs when the compaction starts and finishes. On success, the duration is
/// observed on the `FULL_COMPACT` histogram and the elapsed time is included
/// in the "finished" log line.
///
/// # Errors
///
/// Returns an error if the engine's `compact_range` call fails.
///
/// TODO: Do not start if there is heavy I/O.
/// TODO: Make it possible to rate limit, pause, or abort this by compacting
/// a range at a time.
pub fn full_compact(&mut self) -> Result<(), Error> {
    fail_point!("on_full_compact");
    info!("full compaction started");
    let started_at = Instant::now();
    let histogram_timer = FULL_COMPACT.start_coarse_timer();
    // Arguments, in order:
    //   start = None, end = None: compact the entire key range;
    //   true: no other compaction will run while this one is running;
    //   1: number of threads.
    box_try!(self.engine.compact_range(None, None, true, 1));
    histogram_timer.observe_duration();
    let elapsed = started_at.saturating_elapsed();
    info!(
        "full compaction finished";
        "time_takes" => ?elapsed,
    );
    Ok(())
}

/// Sends a compact range command to RocksDB to compact the range of the cf.
pub fn compact_range_cf(
&mut self,
Expand Down Expand Up @@ -127,6 +155,11 @@ where

fn run(&mut self, task: Task) {
match task {
Task::PeriodicFullCompact => {
if let Err(e) = self.full_compact() {
error!("periodic full compaction failed"; "err" => %e);
}
}
Task::Compact {
cf_name,
start_key,
Expand Down Expand Up @@ -412,4 +445,44 @@ mod tests {
expected_ranges.push_back((s, e));
assert_eq!(ranges_need_to_compact, expected_ranges);
}

/// Checks that `Task::PeriodicFullCompact` brings `num_entries` back in line
/// with `num_versions` in `CF_WRITE` after five keys are written and then
/// deleted.
#[test]
fn test_full_compact_deletes() {
    let tmp_dir = Builder::new().prefix("test").tempdir().unwrap();
    let engine = open_db(tmp_dir.path().to_str().unwrap());
    let mut runner = Runner::new(engine.clone());

    // Range covering every key written below, and a helper to read its stats.
    let (start, end) = (data_key(b"k0"), data_key(b"k5"));
    let write_cf_stats = || {
        engine
            .get_range_stats(CF_WRITE, &start, &end)
            .unwrap()
            .unwrap()
    };
    let keys: Vec<String> = (0..5).map(|i| format!("k{}", i)).collect();

    // mvcc_put 0..5
    for (i, key) in keys.iter().enumerate() {
        let value = format!("value{}", i);
        mvcc_put(&engine, key.as_bytes(), value.as_bytes(), 1.into(), 2.into());
    }
    engine.flush_cf(CF_WRITE, true).unwrap();

    // Right after the puts, the entry and version counts match.
    let after_put = write_cf_stats();
    assert_eq!(after_put.num_entries, after_put.num_versions);

    // Delete every key at a later timestamp.
    for key in &keys {
        delete(&engine, key.as_bytes(), 3.into());
    }
    engine.flush_cf(CF_WRITE, true).unwrap();

    // After the deletes, the two counts differ by five.
    let after_delete = write_cf_stats();
    assert_eq!(after_delete.num_entries - after_delete.num_versions, 5);

    // After a full compaction the two counts match again.
    runner.run(Task::PeriodicFullCompact);
    let after_compact = write_cf_stats();
    assert_eq!(after_compact.num_entries - after_compact.num_versions, 0);
}
}
5 changes: 5 additions & 0 deletions components/raftstore/src/store/worker/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ lazy_static! {
&["cf"]
)
.unwrap();
pub static ref FULL_COMPACT: Histogram = register_histogram!(
"tikv_storage_full_compact_duration_seconds",
"Bucketed histogram of full compaction for the storage."
)
.unwrap();
pub static ref REGION_HASH_HISTOGRAM: Histogram = register_histogram!(
"tikv_raftstore_hash_duration_seconds",
"Bucketed histogram of raftstore hash computation duration"
Expand Down
Loading

0 comments on commit 1ac1b0a

Please sign in to comment.