Skip to content

Commit

Permalink
rafstore, engine_rocks: periodic full compaction
Browse files Browse the repository at this point in the history
ref tikv#12729

adds the concept of a full compaction: a compaction that compacts
all columns families, ranges, and levels. this has the effect of
deleting all of the tombstone markers.

if ``raftstore.full-compact-tick-interval`` is set, attempt running
full compaction at least frequently.

if ``raftstore.full_compact_restrict_hours_local_tz`` is set, run full
compaction only during the hours specified.

the tikv.yaml segment below will run compaction at 03:00 and 23:00
(3am and 11pm respectively) in the tikv nodes' local timezone.

```
[raftstore]
full-compact-tick-interval = "1h"
full-compact-restrict-hours-local-tz = [3, 23]

```

to address in in follow up PRs:
* integration tests.
* pausing/rate-limiting full compactions to avoid disrupting live
  traffic.

Signed-off-by: Alex Feinberg <alex@strlen.net>
  • Loading branch information
afeinberg authored and ti-chi-bot committed Oct 31, 2023
1 parent 629a0a9 commit 6a04b50
Show file tree
Hide file tree
Showing 9 changed files with 92 additions and 1 deletion.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions components/raftstore/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ bitflags = "1.0.1"
byteorder = "1.2"
bytes = "1.0"
causal_ts = { workspace = true }
chrono = "0.4"
collections = { workspace = true }
concurrency_manager = { workspace = true }
crc32fast = "1.2"
Expand Down
13 changes: 13 additions & 0 deletions components/raftstore/src/store/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,16 @@ pub struct Config {
pub lock_cf_compact_interval: ReadableDuration,
pub lock_cf_compact_bytes_threshold: ReadableSize,

/// The duration of time to attempt scheduling a full compaction. If not
/// set, full compaction is not run automatically.
pub full_compact_tick_interval: ReadableDuration,
/// Hours of the day during which we may execute a full compaction. If not
/// set or empty, full compaction may be started every
/// `full_compact_tick_interval`. This should be a list in of hours (in
/// the local timezone) e.g., ["23", "4"]
#[online_config(skip)]
pub full_compact_restrict_hours_local_tz: Vec<u32>,

#[online_config(skip)]
pub notify_capacity: usize,
pub messages_per_tick: usize,
Expand Down Expand Up @@ -435,6 +445,9 @@ impl Default for Config {
region_compact_redundant_rows_percent: None,
pd_heartbeat_tick_interval: ReadableDuration::minutes(1),
pd_store_heartbeat_tick_interval: ReadableDuration::secs(10),
// Disable full compaction by default
full_compact_tick_interval: ReadableDuration::secs(0),
full_compact_restrict_hours_local_tz: Vec::new(),
notify_capacity: 40960,
snap_mgr_gc_tick_interval: ReadableDuration::minutes(1),
snap_gc_timeout: ReadableDuration::hours(4),
Expand Down
41 changes: 41 additions & 0 deletions components/raftstore/src/store/fsm/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use batch_system::{
HandlerBuilder, PollHandler, Priority,
};
use causal_ts::CausalTsProviderImpl;
use chrono::Timelike;
use collections::{HashMap, HashMapEntry, HashSet};
use concurrency_manager::ConcurrencyManager;
use crossbeam::channel::{TryRecvError, TrySendError};
Expand Down Expand Up @@ -768,6 +769,7 @@ impl<'a, EK: KvEngine + 'static, ER: RaftEngine + 'static, T: Transport>
StoreTick::SnapGc => self.on_snap_mgr_gc(),
StoreTick::CompactLockCf => self.on_compact_lock_cf(),
StoreTick::CompactCheck => self.on_compact_check_tick(),
StoreTick::FullCompact => self.on_full_compact_tick(),
StoreTick::ConsistencyCheck => self.on_consistency_check_tick(),
StoreTick::CleanupImportSst => self.on_cleanup_import_sst_tick(),
}
Expand Down Expand Up @@ -858,6 +860,7 @@ impl<'a, EK: KvEngine + 'static, ER: RaftEngine + 'static, T: Transport>
self.fsm.store.start_time = Some(time::get_time());
self.register_cleanup_import_sst_tick();
self.register_compact_check_tick();
self.register_full_compact_tick();
self.register_pd_store_heartbeat_tick();
self.register_compact_lock_cf_tick();
self.register_snap_mgr_gc_tick();
Expand Down Expand Up @@ -2436,6 +2439,44 @@ impl<'a, EK: KvEngine, ER: RaftEngine, T: Transport> StoreFsmDelegate<'a, EK, ER
}
}

fn register_full_compact_tick(&self) {
self.ctx.schedule_store_tick(
StoreTick::FullCompact,
self.ctx.cfg.full_compact_tick_interval.0,
)
}

fn on_full_compact_tick(&mut self) {
self.register_full_compact_tick();
if !self.ctx.cfg.full_compact_restrict_hours_local_tz.is_empty() {
let hour_in_local_tz = chrono::Local::now().time().hour();
if !self
.ctx
.cfg
.full_compact_restrict_hours_local_tz
.contains(&hour_in_local_tz)
{
debug!(
"full compaction may not run at this time";
"hour_in_local_tz" => hour_in_local_tz,
"full_compact_restrict_hours_local_tz" => ?self.ctx.cfg.full_compact_restrict_hours_local_tz,
);
return;
}
}
// full compact
if let Err(e) = self
.ctx
.cleanup_scheduler
.schedule(CleanupTask::Compact(CompactTask::FullCompact))
{
error!("failed to schedule full compaction";
"store_id" => self.fsm.store.id,
"err" => ?e
);
}
}

fn register_compact_check_tick(&self) {
self.ctx.schedule_store_tick(
StoreTick::CompactCheck,
Expand Down
1 change: 1 addition & 0 deletions components/raftstore/src/store/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ make_static_metric! {

pub label_enum RaftEventDurationType {
compact_check,
full_compact,
pd_store_heartbeat,
snap_gc,
compact_lock_cf,
Expand Down
2 changes: 2 additions & 0 deletions components/raftstore/src/store/msg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,7 @@ impl PeerTick {
#[derive(Debug, Clone, Copy)]
pub enum StoreTick {
CompactCheck,
FullCompact,
PdStoreHeartbeat,
SnapGc,
CompactLockCf,
Expand All @@ -447,6 +448,7 @@ impl StoreTick {
pub fn tag(self) -> RaftEventDurationType {
match self {
StoreTick::CompactCheck => RaftEventDurationType::compact_check,
StoreTick::FullCompact => RaftEventDurationType::full_compact,
StoreTick::PdStoreHeartbeat => RaftEventDurationType::pd_store_heartbeat,
StoreTick::SnapGc => RaftEventDurationType::snap_gc,
StoreTick::CompactLockCf => RaftEventDurationType::compact_lock_cf,
Expand Down
27 changes: 26 additions & 1 deletion components/raftstore/src/store/worker/compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@ use fail::fail_point;
use thiserror::Error;
use tikv_util::{box_try, error, info, time::Instant, warn, worker::Runnable};

use super::metrics::COMPACT_RANGE_CF;
use super::metrics::{COMPACT_RANGE_CF, FULL_COMPACT};

type Key = Vec<u8>;

pub enum Task {
FullCompact,

Compact {
cf_name: String,
start_key: Option<Key>, // None means smallest key
Expand Down Expand Up @@ -58,6 +60,7 @@ impl CompactThreshold {
impl Display for Task {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match *self {
Task::FullCompact => f.debug_struct("FullCompact").finish(),
Task::Compact {
ref cf_name,
ref start_key,
Expand Down Expand Up @@ -127,6 +130,23 @@ where
Runner { engine }
}

/// Full compaction.
pub fn full_compact(&mut self) -> Result<(), Error> {
fail_point!("on_full_compact");
let timer = Instant::now();
let full_compact_timer = FULL_COMPACT.start_coarse_timer();
box_try!(
self.engine
.compact_range(None, None, false, 1 /* threads */,)
);
full_compact_timer.observe_duration();
info!(
"full compaction finished";
"time_takes" => ?timer.saturating_elapsed(),
);
Ok(())
}

/// Sends a compact range command to RocksDB to compact the range of the cf.
pub fn compact_range_cf(
&mut self,
Expand Down Expand Up @@ -163,6 +183,11 @@ where

fn run(&mut self, task: Task) {
match task {
Task::FullCompact => {
if let Err(e) = self.full_compact() {
error!("full compaction failed"; "err" => %e);
}
}
Task::Compact {
cf_name,
start_key,
Expand Down
5 changes: 5 additions & 0 deletions components/raftstore/src/store/worker/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,11 @@ lazy_static! {
&["cf"]
)
.unwrap();
pub static ref FULL_COMPACT: Histogram = register_histogram!(
"tikv_raftstore_full_compact_duration_seconds",
"Bucketed histogram of full compaction"
)
.unwrap();
pub static ref REGION_HASH_HISTOGRAM: Histogram = register_histogram!(
"tikv_raftstore_hash_duration_seconds",
"Bucketed histogram of raftstore hash computation duration"
Expand Down
2 changes: 2 additions & 0 deletions tests/integrations/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,8 @@ fn test_serde_custom_tikv_config() {
slow_trend_unsensitive_result: 0.5,
enable_v2_compatible_learner: false,
unsafe_disable_check_quorum: false,
full_compact_tick_interval: ReadableDuration::secs(0),
full_compact_restrict_hours_local_tz: Vec::new(),
};
value.pd = PdConfig::new(vec!["example.com:443".to_owned()]);
let titan_cf_config = TitanCfConfig {
Expand Down

0 comments on commit 6a04b50

Please sign in to comment.