
Commit

This is an automated cherry-pick of tikv#15853
ref tikv#12729

Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
afeinberg authored and ti-chi-bot committed Oct 31, 2023
1 parent 47d1da1 commit fc8b378
Showing 12 changed files with 638 additions and 1 deletion.
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions components/online_config/Cargo.toml
@@ -5,6 +5,7 @@ edition = "2018"
publish = false

[dependencies]
chrono = "0.4"
online_config_derive = { path = "./online_config_derive" }
serde = { version = "1.0", features = ["derive"] }

7 changes: 7 additions & 0 deletions components/online_config/src/lib.rs
@@ -3,9 +3,12 @@
use std::collections::HashMap;
use std::fmt::{self, Debug, Display, Formatter};

use chrono::{FixedOffset, NaiveTime};
pub use online_config_derive::*;

pub type ConfigChange = HashMap<String, ConfigValue>;
pub type OffsetTime = (NaiveTime, FixedOffset);
pub type Schedule = Vec<OffsetTime>;

#[derive(Clone, PartialEq)]
pub enum ConfigValue {
@@ -22,6 +25,8 @@ pub enum ConfigValue {
IOPriority(String),
OptionSize(Option<u64>),
Module(ConfigChange),
OffsetTime(OffsetTime),
Schedule(Schedule),
Skip,
}

@@ -42,6 +47,8 @@ impl Display for ConfigValue {
ConfigValue::BlobRunMode(v) => write!(f, "{}", v),
ConfigValue::IOPriority(v) => write!(f, "{}", v),
ConfigValue::Module(v) => write!(f, "{:?}", v),
ConfigValue::OffsetTime((t, o)) => write!(f, "{} {}", t, o),
ConfigValue::Schedule(v) => write!(f, "{:?}", v),
ConfigValue::Skip => write!(f, "ConfigValue::Skip"),
}
}
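
For orientation, here is a minimal, self-contained sketch (not part of this commit) of how the new OffsetTime and Schedule aliases can be populated with a recent chrono 0.4 release; the output mirrors the "{} {}" Display arm added above for ConfigValue::OffsetTime, and all names below are illustrative only.

use chrono::{FixedOffset, NaiveTime};

// Mirrors the aliases added to components/online_config/src/lib.rs above.
type OffsetTime = (NaiveTime, FixedOffset);
type Schedule = Vec<OffsetTime>;

fn main() {
    // "23:00 +0000" and "03:00 +0700" expressed as (time, offset) pairs.
    let schedule: Schedule = vec![
        (
            NaiveTime::from_hms_opt(23, 0, 0).unwrap(),
            FixedOffset::east_opt(0).unwrap(),
        ),
        (
            NaiveTime::from_hms_opt(3, 0, 0).unwrap(),
            FixedOffset::east_opt(7 * 3600).unwrap(),
        ),
    ];
    for (t, o) in &schedule {
        // Same "{} {}" form as the Display arm for ConfigValue::OffsetTime.
        println!("{} {}", t, o);
    }
}
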
7 changes: 7 additions & 0 deletions components/raftstore/Cargo.toml
@@ -62,8 +62,15 @@ batch-system = { path = "../batch-system", default-features = false }
bitflags = "1.0.1"
byteorder = "1.2"
bytes = "1.0"
<<<<<<< HEAD
concurrency_manager = { path = "../concurrency_manager", default-features = false }
online_config = { path = "../online_config" }
=======
causal_ts = { workspace = true }
chrono = "0.4"
collections = { workspace = true }
concurrency_manager = { workspace = true }
>>>>>>> 2a24cfc4b2 (rafstore, engine_rocks: periodic full compaction (#12729) (#15853))
crc32fast = "1.2"
crossbeam = "0.8"
derivative = "2"
29 changes: 29 additions & 0 deletions components/raftstore/src/store/config.rs
@@ -14,8 +14,23 @@ use online_config::{ConfigChange, ConfigManager, ConfigValue, OnlineConfig};
use prometheus::register_gauge_vec;
use serde::{Deserialize, Serialize};
use serde_with::with_prefix;
<<<<<<< HEAD
use tikv_util::config::{ReadableDuration, ReadableSize, VersionTrack};
use tikv_util::{box_err, info, warn};
=======
use tikv_util::{
box_err,
config::{ReadableDuration, ReadableSchedule, ReadableSize, VersionTrack},
error, info,
sys::SysQuota,
warn,
worker::Scheduler,
};
use time::Duration as TimeDuration;

use super::worker::{RaftStoreBatchComponent, RefreshConfigTask};
use crate::{coprocessor::config::RAFTSTORE_V2_SPLIT_SIZE, Result};
>>>>>>> 2a24cfc4b2 (rafstore, engine_rocks: periodic full compaction (#12729) (#15853))

lazy_static! {
pub static ref CONFIG_RAFTSTORE_GAUGE: prometheus::GaugeVec = register_gauge_vec!(
@@ -109,6 +124,15 @@ pub struct Config {
pub lock_cf_compact_interval: ReadableDuration,
pub lock_cf_compact_bytes_threshold: ReadableSize,

/// Hours of the day during which we may execute a periodic full compaction.
/// If not set or empty, periodic full compaction will not run. In toml this
/// should be a list of times in "HH:MM" format with an optional timezone
/// offset. If no timezone is specified, the local timezone is used. E.g.,
/// `["23:00 +0000", "03:00 +0700"]` or `["23:00", "03:00"]`.
pub periodic_full_compact_start_times: ReadableSchedule,
/// Do not start a full compaction if cpu utilization exceeds this number.
pub periodic_full_compact_start_max_cpu: f64,

#[online_config(skip)]
pub notify_capacity: usize,
pub messages_per_tick: usize,
@@ -278,6 +302,11 @@ impl Default for Config {
region_compact_tombstones_percent: 30,
pd_heartbeat_tick_interval: ReadableDuration::minutes(1),
pd_store_heartbeat_tick_interval: ReadableDuration::secs(10),
// Disable periodic full compaction by default.
periodic_full_compact_start_times: ReadableSchedule::default(),
// If periodic full compaction is enabled, do not start a full compaction
// if the CPU utilization is over 10%.
periodic_full_compact_start_max_cpu: 0.1,
notify_capacity: 40960,
snap_mgr_gc_tick_interval: ReadableDuration::minutes(1),
snap_gc_timeout: ReadableDuration::hours(4),
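
ReadableSchedule comes from tikv_util and its definition is not part of this diff. As a rough, hypothetical sketch of the contract the raftstore code relies on (is_empty plus an hour-granularity check), a schedule of (NaiveTime, FixedOffset) entries could answer is_scheduled_this_hour by viewing the current time in each entry's own offset and comparing hours:

use chrono::{DateTime, FixedOffset, Local, NaiveTime, Timelike};

// Hypothetical stand-in for tikv_util::config::ReadableSchedule (not in this diff).
pub struct ScheduleSketch(Vec<(NaiveTime, FixedOffset)>);

impl ScheduleSketch {
    pub fn is_empty(&self) -> bool {
        self.0.is_empty()
    }

    // True if `now`, converted to an entry's own offset, falls within that
    // entry's hour (e.g. 23:00-23:59 in UTC for a "23:00 +0000" entry).
    pub fn is_scheduled_this_hour(&self, now: &DateTime<Local>) -> bool {
        self.0
            .iter()
            .any(|(time, offset)| now.with_timezone(offset).hour() == time.hour())
    }
}

In the TOML store configuration these fields presumably surface under raftstore's usual kebab-case renaming, e.g. periodic-full-compact-start-times = ["03:00 +0700"] and periodic-full-compact-start-max-cpu = 0.1.
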
94 changes: 94 additions & 0 deletions components/raftstore/src/store/fsm/store.rs
@@ -32,6 +32,35 @@ use kvproto::raft_serverpb::{ExtraMessageType, PeerState, RaftMessage, RegionLoc
use kvproto::replication_modepb::{ReplicationMode, ReplicationStatus};
use protobuf::Message;
use raft::StateRole;
<<<<<<< HEAD
=======
use resource_control::{channel::unbounded, ResourceGroupManager};
use resource_metering::CollectorRegHandle;
use service::service_manager::GrpcServiceManager;
use sst_importer::SstImporter;
use tikv_alloc::trace::TraceEvent;
use tikv_util::{
box_try,
config::{Tracker, VersionTrack},
debug, defer, error,
future::poll_future_notify,
info, is_zero_duration,
mpsc::{self, LooseBoundedSender, Receiver},
slow_log,
store::{find_peer, region_on_stores},
sys::{
self as sys_util,
cpu_time::ProcessStat,
disk::{get_disk_status, DiskUsage},
},
time::{duration_to_sec, monotonic_raw_now, Instant as TiInstant},
timer::SteadyTimer,
warn,
worker::{LazyWorker, Scheduler, Worker},
yatp_pool::FuturePool,
Either, RingQueue,
};
>>>>>>> 2a24cfc4b2 (rafstore, engine_rocks: periodic full compaction (#12729) (#15853))
use time::{self, Timespec};

use collections::HashMap;
@@ -91,6 +120,10 @@ pub const PENDING_MSG_CAP: usize = 100;
const UNREACHABLE_BACKOFF: Duration = Duration::from_secs(10);
const ENTRY_CACHE_EVICT_TICK_DURATION: Duration = Duration::from_secs(1);

// Every 30 minutes, check if we can run full compaction. This allows the config
// setting `periodic_full_compact_start_max_cpu` to be changed dynamically.
const PERIODIC_FULL_COMPACT_TICK_INTERVAL_DURATION: Duration = Duration::from_secs(30 * 60);

pub struct StoreInfo<EK, ER> {
pub kv_engine: EK,
pub raft_engine: ER,
@@ -571,6 +604,7 @@ impl<'a, EK: KvEngine + 'static, ER: RaftEngine + 'static, T: Transport>
StoreTick::SnapGc => self.on_snap_mgr_gc(),
StoreTick::CompactLockCf => self.on_compact_lock_cf(),
StoreTick::CompactCheck => self.on_compact_check_tick(),
StoreTick::PeriodicFullCompact => self.on_full_compact_tick(),
StoreTick::ConsistencyCheck => self.on_consistency_check_tick(),
StoreTick::CleanupImportSST => self.on_cleanup_import_sst_tick(),
StoreTick::RaftEnginePurge => self.on_raft_engine_purge_tick(),
@@ -636,6 +670,7 @@ impl<'a, EK: KvEngine + 'static, ER: RaftEngine + 'static, T: Transport>
self.fsm.store.start_time = Some(time::get_time());
self.register_cleanup_import_sst_tick();
self.register_compact_check_tick();
self.register_full_compact_tick();
self.register_pd_store_heartbeat_tick();
self.register_compact_lock_cf_tick();
self.register_snap_mgr_gc_tick();
@@ -1962,6 +1997,65 @@ impl<'a, EK: KvEngine, ER: RaftEngine, T: Transport> StoreFsmDelegate<'a, EK, ER
}
}

fn register_full_compact_tick(&self) {
if !self.ctx.cfg.periodic_full_compact_start_times.is_empty() {
self.ctx.schedule_store_tick(
StoreTick::PeriodicFullCompact,
PERIODIC_FULL_COMPACT_TICK_INTERVAL_DURATION,
)
}
}

fn on_full_compact_tick(&mut self) {
self.register_full_compact_tick();

let local_time = chrono::Local::now();
if !self
.ctx
.cfg
.periodic_full_compact_start_times
.is_scheduled_this_hour(&local_time)
{
debug!(
"full compaction may not run at this time";
"local_time" => ?local_time,
"periodic_full_compact_start_times" => ?self.ctx.cfg.periodic_full_compact_start_times,
);
return;
}

if self.ctx.global_stat.stat.is_busy.load(Ordering::SeqCst) {
warn!("full compaction may not run at this time, `is_busy` flag is true",);
return;
}

let mut proc_stats = ProcessStat::cur_proc_stat().unwrap();
let cpu_usage = proc_stats.cpu_usage().unwrap();
let max_start_cpu_usage = self.ctx.cfg.periodic_full_compact_start_max_cpu;
if cpu_usage > max_start_cpu_usage {
warn!(
"full compaction may not run at this time, cpu usage is above max";
"cpu_usage" => cpu_usage,
"threshold" => max_start_cpu_usage,
);
return;
}

// Attempt executing a periodic full compaction.
// Note that full compaction will not run if other compaction tasks are running.
if let Err(e) = self
.ctx
.cleanup_scheduler
.schedule(CleanupTask::Compact(CompactTask::PeriodicFullCompact))
{
error!(
"failed to schedule a periodic full compaction";
"store_id" => self.fsm.store.id,
"err" => ?e
);
}
}

fn register_compact_check_tick(&self) {
self.ctx.schedule_store_tick(
StoreTick::CompactCheck,
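
The tick handler above applies three gates before scheduling CompactTask::PeriodicFullCompact: the configured start hours, the store's is_busy flag, and the current process CPU usage against periodic_full_compact_start_max_cpu. A small illustrative sketch (not from this commit) of that decision factored into a pure predicate, with hypothetical names:

// Hypothetical helper; the real checks live inline in on_full_compact_tick above.
fn should_start_full_compaction(
    scheduled_this_hour: bool, // periodic_full_compact_start_times covers this hour
    store_is_busy: bool,       // global_stat.stat.is_busy
    cpu_usage: f64,            // ProcessStat::cpu_usage()
    max_start_cpu: f64,        // periodic_full_compact_start_max_cpu
) -> bool {
    scheduled_this_hour && !store_is_busy && cpu_usage <= max_start_cpu
}

fn main() {
    // Within the scheduled hour, not busy, 5% CPU against the default 10% ceiling: start.
    assert!(should_start_full_compaction(true, false, 0.05, 0.1));
    // Above the CPU ceiling: skip this tick and try again in 30 minutes.
    assert!(!should_start_full_compaction(true, false, 0.25, 0.1));
}
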
124 changes: 124 additions & 0 deletions components/raftstore/src/store/metrics.rs
@@ -210,6 +210,130 @@ make_auto_flush_static_metric! {
}

make_static_metric! {
<<<<<<< HEAD
=======
pub label_enum RaftReadyType {
message,
commit,
append,
snapshot,
pending_region,
has_ready_region,
}

pub label_enum RaftSentMessageCounterType {
append,
append_resp,
prevote,
prevote_resp,
vote,
vote_resp,
snapshot,
heartbeat,
heartbeat_resp,
transfer_leader,
timeout_now,
read_index,
read_index_resp,
}

pub label_enum SendStatus {
accept,
drop,
}

pub label_enum RaftDroppedMessage {
mismatch_store_id,
mismatch_region_epoch,
mismatch_witness_snapshot,
stale_msg,
region_overlap,
region_no_peer,
region_tombstone_peer,
region_nonexistent,
applying_snap,
disk_full,
non_witness,
recovery,
unsafe_vote,
}

pub label_enum ProposalType {
all,
local_read,
read_index,
unsafe_read_index,
normal,
transfer_leader,
conf_change,
batch,
dropped_read_index,
}

pub label_enum RaftInvalidProposal {
mismatch_store_id,
region_not_found,
not_leader,
mismatch_peer_id,
stale_command,
epoch_not_match,
read_index_no_leader,
region_not_initialized,
is_applying_snapshot,
force_leader,
witness,
flashback_in_progress,
flashback_not_prepared,
non_witness,
}

pub label_enum RaftEventDurationType {
compact_check,
periodic_full_compact,
pd_store_heartbeat,
snap_gc,
compact_lock_cf,
consistency_check,
cleanup_import_sst,
raft_engine_purge,
peer_msg,
store_msg,
}

pub label_enum RaftLogGcSkippedReason {
reserve_log,
compact_idx_too_small,
threshold_limit,
}

pub label_enum LoadBaseSplitEventType {
// Workload fits the QPS threshold or byte threshold.
load_fit,
// Workload fits the CPU threshold.
cpu_load_fit,
// The statistical key is empty.
empty_statistical_key,
// Split info has been collected, ready to split.
ready_to_split,
// Split info has not been collected yet, not ready to split.
not_ready_to_split,
// The number of sampled keys does not meet the threshold.
no_enough_sampled_key,
// The number of sampled keys located on left and right does not meet the threshold.
no_enough_lr_key,
// The number of balanced keys does not meet the score.
no_balance_key,
// The number of contained keys does not meet the score.
no_uncross_key,
// Split info for the top hot CPU region has been collected, ready to split.
ready_to_split_cpu_top,
// Hottest key range for the top hot CPU region could not be found.
empty_hottest_key_range,
// The top hot CPU region could not be split.
unable_to_split_cpu_top,
}

>>>>>>> 2a24cfc4b2 (rafstore, engine_rocks: periodic full compaction (#12729) (#15853))
pub struct HibernatedPeerStateGauge: IntGauge {
"state" => {
awaken,
(Diffs for the remaining changed files are not rendered here.)
