Skip to content

Commit

Permalink
This is an automated cherry-pick of tikv#15837
Browse files Browse the repository at this point in the history
ref tikv#15184

Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
HuSharp authored and ti-chi-bot committed Nov 24, 2023
1 parent b37ecec commit 34a22c6
Show file tree
Hide file tree
Showing 14 changed files with 138 additions and 42 deletions.
4 changes: 1 addition & 3 deletions components/pd_client/src/client.rs
Expand Up @@ -1045,9 +1045,7 @@ impl PdClient for RpcClient {
}) as PdFuture<_>
};

self.pd_client
.request(req, executor, LEADER_CHANGE_RETRY)
.execute()
self.pd_client.request(req, executor, NO_RETRY).execute()
}

fn report_region_buckets(&self, bucket_stat: &BucketStat, period: Duration) -> PdFuture<()> {
Expand Down
1 change: 0 additions & 1 deletion components/raftstore-v2/src/batch/store.rs
Expand Up @@ -689,7 +689,6 @@ impl<EK: KvEngine, ER: RaftEngine> StoreSystem<EK, ER> {
causal_ts_provider,
workers.pd.scheduler(),
auto_split_controller,
store_meta.lock().unwrap().region_read_progress.clone(),
collector_reg_handle,
self.logger.clone(),
self.shutdown.clone(),
Expand Down
18 changes: 17 additions & 1 deletion components/raftstore-v2/src/worker/pd/mod.rs
Expand Up @@ -12,9 +12,17 @@ use engine_traits::{KvEngine, RaftEngine, TabletRegistry};
use kvproto::{metapb, pdpb};
use pd_client::{BucketStat, PdClient};
use raftstore::store::{
<<<<<<< HEAD
util::KeysInfoFormatter, AutoSplitController, Config, FlowStatsReporter, PdStatsMonitor,
ReadStats, RegionReadProgressRegistry, SplitInfo, StoreStatsReporter, TabletSnapManager,
TxnExt, WriteStats, NUM_COLLECT_STORE_INFOS_PER_HEARTBEAT,
=======
metrics::STORE_INSPECT_DURATION_HISTOGRAM,
util::{KeysInfoFormatter, LatencyInspector, RaftstoreDuration},
AutoSplitController, Config, FlowStatsReporter, PdStatsMonitor, ReadStats, SplitInfo,
StoreStatsReporter, TabletSnapManager, TxnExt, WriteStats,
NUM_COLLECT_STORE_INFOS_PER_HEARTBEAT,
>>>>>>> bc1ae30437 (pd_client: support dynamically modifying `min-resolved-ts` report interval and reduce retry times (#15837))
};
use resource_metering::{Collector, CollectorRegHandle, RawRecords};
use slog::{error, Logger};
Expand Down Expand Up @@ -207,13 +215,13 @@ where
causal_ts_provider: Option<Arc<CausalTsProviderImpl>>, // used for rawkv apiv2
pd_scheduler: Scheduler<Task>,
auto_split_controller: AutoSplitController,
region_read_progress: RegionReadProgressRegistry,
collector_reg_handle: CollectorRegHandle,
logger: Logger,
shutdown: Arc<AtomicBool>,
cfg: Arc<VersionTrack<Config>>,
) -> Result<Self, std::io::Error> {
let mut stats_monitor = PdStatsMonitor::new(
<<<<<<< HEAD
cfg.value().pd_store_heartbeat_tick_interval.0 / NUM_COLLECT_STORE_INFOS_PER_HEARTBEAT,
cfg.value().report_min_resolved_ts_interval.0,
PdReporter::new(pd_scheduler, logger.clone()),
Expand All @@ -224,6 +232,14 @@ where
collector_reg_handle,
store_id,
)?;
=======
store_heartbeat_interval / NUM_COLLECT_STORE_INFOS_PER_HEARTBEAT,
cfg.value().inspect_interval.0,
PdReporter::new(pd_scheduler, logger.clone()),
);
stats_monitor.start(auto_split_controller, collector_reg_handle)?;
let slowness_stats = slowness::SlownessStatistics::new(&cfg.value());
>>>>>>> bc1ae30437 (pd_client: support dynamically modifying `min-resolved-ts` report interval and reduce retry times (#15837))
Ok(Self {
store_id,
pd_client,
Expand Down
2 changes: 1 addition & 1 deletion components/raftstore-v2/tests/integrations/cluster.rs
Expand Up @@ -503,6 +503,7 @@ pub fn disable_all_auto_ticks(cfg: &mut Config) {
cfg.region_compact_check_interval = ReadableDuration::ZERO;
cfg.pd_heartbeat_tick_interval = ReadableDuration::ZERO;
cfg.pd_store_heartbeat_tick_interval = ReadableDuration::ZERO;
cfg.pd_report_min_resolved_ts_interval = ReadableDuration::ZERO;
cfg.snap_mgr_gc_tick_interval = ReadableDuration::ZERO;
cfg.lock_cf_compact_interval = ReadableDuration::ZERO;
cfg.peer_stale_state_check_interval = ReadableDuration::ZERO;
Expand All @@ -512,7 +513,6 @@ pub fn disable_all_auto_ticks(cfg: &mut Config) {
cfg.merge_check_tick_interval = ReadableDuration::ZERO;
cfg.cleanup_import_sst_interval = ReadableDuration::ZERO;
cfg.inspect_interval = ReadableDuration::ZERO;
cfg.report_min_resolved_ts_interval = ReadableDuration::ZERO;
cfg.reactive_memory_lock_tick_interval = ReadableDuration::ZERO;
cfg.report_region_buckets_tick_interval = ReadableDuration::ZERO;
cfg.check_long_uncommitted_interval = ReadableDuration::ZERO;
Expand Down
20 changes: 17 additions & 3 deletions components/raftstore/src/store/config.rs
Expand Up @@ -126,6 +126,7 @@ pub struct Config {
pub region_compact_redundant_rows_percent: u64,
pub pd_heartbeat_tick_interval: ReadableDuration,
pub pd_store_heartbeat_tick_interval: ReadableDuration,
pub pd_report_min_resolved_ts_interval: ReadableDuration,
pub snap_mgr_gc_tick_interval: ReadableDuration,
pub snap_gc_timeout: ReadableDuration,
pub lock_cf_compact_interval: ReadableDuration,
Expand Down Expand Up @@ -320,9 +321,6 @@ pub struct Config {
// The unsensitive(increase it to reduce sensitiveness) of the result-trend detection
pub slow_trend_unsensitive_result: f64,

// Interval to report min resolved ts, if it is zero, it means disabled.
pub report_min_resolved_ts_interval: ReadableDuration,

/// Interval to check whether to reactivate in-memory pessimistic lock after
/// being disabled before transferring leader.
pub reactive_memory_lock_tick_interval: ReadableDuration,
Expand Down Expand Up @@ -402,6 +400,15 @@ impl Default for Config {
region_compact_redundant_rows_percent: 20,
pd_heartbeat_tick_interval: ReadableDuration::minutes(1),
pd_store_heartbeat_tick_interval: ReadableDuration::secs(10),
<<<<<<< HEAD
=======
pd_report_min_resolved_ts_interval: ReadableDuration::secs(1),
// Disable periodic full compaction by default.
periodic_full_compact_start_times: ReadableSchedule::default(),
// If periodic full compaction is enabled, do not start a full compaction
// if the CPU utilization is over 10%.
periodic_full_compact_start_max_cpu: 0.1,
>>>>>>> bc1ae30437 (pd_client: support dynamically modifying `min-resolved-ts` report interval and reduce retry times (#15837))
notify_capacity: 40960,
snap_mgr_gc_tick_interval: ReadableDuration::minutes(1),
snap_gc_timeout: ReadableDuration::hours(4),
Expand Down Expand Up @@ -473,7 +480,11 @@ impl Default for Config {
// make it `10.0` to reduce a bit sensitiveness because SpikeFilter is disabled
slow_trend_unsensitive_cause: 10.0,
slow_trend_unsensitive_result: 0.5,
<<<<<<< HEAD
report_min_resolved_ts_interval: ReadableDuration::secs(1),
=======
slow_trend_network_io_factor: 0.0,
>>>>>>> bc1ae30437 (pd_client: support dynamically modifying `min-resolved-ts` report interval and reduce retry times (#15837))
check_leader_lease_interval: ReadableDuration::secs(0),
renew_leader_lease_advance_duration: ReadableDuration::secs(0),
allow_unsafe_vote_after_start: false,
Expand Down Expand Up @@ -943,6 +954,9 @@ impl Config {
CONFIG_RAFTSTORE_GAUGE
.with_label_values(&["pd_store_heartbeat_tick_interval"])
.set(self.pd_store_heartbeat_tick_interval.as_secs_f64());
CONFIG_RAFTSTORE_GAUGE
.with_label_values(&["pd_report_min_resolved_ts_interval"])
.set(self.pd_report_min_resolved_ts_interval.as_secs_f64());
CONFIG_RAFTSTORE_GAUGE
.with_label_values(&["snap_mgr_gc_tick_interval"])
.set(self.snap_mgr_gc_tick_interval.as_secs_f64());
Expand Down
37 changes: 33 additions & 4 deletions components/raftstore/src/store/fsm/store.rs
Expand Up @@ -769,6 +769,7 @@ impl<'a, EK: KvEngine + 'static, ER: RaftEngine + 'static, T: Transport>
StoreTick::CompactCheck => self.on_compact_check_tick(),
StoreTick::ConsistencyCheck => self.on_consistency_check_tick(),
StoreTick::CleanupImportSst => self.on_cleanup_import_sst_tick(),
StoreTick::PdReportMinResolvedTs => self.on_pd_report_min_resolved_ts_tick(),
}
let elapsed = timer.saturating_elapsed();
self.ctx
Expand Down Expand Up @@ -861,6 +862,7 @@ impl<'a, EK: KvEngine + 'static, ER: RaftEngine + 'static, T: Transport>
self.register_cleanup_import_sst_tick();
self.register_compact_check_tick();
self.register_pd_store_heartbeat_tick();
self.register_pd_report_min_resolved_ts_tick();
self.register_compact_lock_cf_tick();
self.register_snap_mgr_gc_tick();
self.register_consistency_check_tick();
Expand Down Expand Up @@ -1680,7 +1682,6 @@ impl<EK: KvEngine, ER: RaftEngine> RaftBatchSystem<EK, ER> {
&cfg,
)?;

let region_read_progress = store_meta.lock().unwrap().region_read_progress.clone();
let mut builder = RaftPollerBuilder {
cfg,
store: meta,
Expand Down Expand Up @@ -1717,7 +1718,6 @@ impl<EK: KvEngine, ER: RaftEngine> RaftBatchSystem<EK, ER> {
mgr,
pd_client,
collector_reg_handle,
region_read_progress,
health_service,
causal_ts_provider,
)?;
Expand All @@ -1734,7 +1734,6 @@ impl<EK: KvEngine, ER: RaftEngine> RaftBatchSystem<EK, ER> {
snap_mgr: SnapManager,
pd_client: Arc<C>,
collector_reg_handle: CollectorRegHandle,
region_read_progress: RegionReadProgressRegistry,
health_service: Option<HealthService>,
causal_ts_provider: Option<Arc<CausalTsProviderImpl>>, // used for rawkv apiv2
) -> Result<()> {
Expand Down Expand Up @@ -1823,7 +1822,6 @@ impl<EK: KvEngine, ER: RaftEngine> RaftBatchSystem<EK, ER> {
snap_mgr,
workers.pd_worker.remote(),
collector_reg_handle,
region_read_progress,
health_service,
coprocessor_host,
causal_ts_provider,
Expand Down Expand Up @@ -2524,6 +2522,25 @@ impl<'a, EK: KvEngine, ER: RaftEngine, T: Transport> StoreFsmDelegate<'a, EK, ER
}
}

fn report_min_resolved_ts(&self) {
let read_progress = {
let meta = self.ctx.store_meta.lock().unwrap();
meta.region_read_progress().clone()
};
let min_resolved_ts = read_progress.get_min_resolved_ts();

let task = PdTask::ReportMinResolvedTs {
store_id: self.fsm.store.id,
min_resolved_ts,
};
if let Err(e) = self.ctx.pd_scheduler.schedule(task) {
error!("failed to send min resolved ts to pd worker";
"store_id" => self.fsm.store.id,
"err" => ?e
);
}
}

fn store_heartbeat_pd(&mut self, report: Option<pdpb::StoreReport>) {
let mut stats = StoreStats::default();

Expand Down Expand Up @@ -2630,6 +2647,11 @@ impl<'a, EK: KvEngine, ER: RaftEngine, T: Transport> StoreFsmDelegate<'a, EK, ER
self.register_pd_store_heartbeat_tick();
}

fn on_pd_report_min_resolved_ts_tick(&mut self) {
self.report_min_resolved_ts();
self.register_pd_report_min_resolved_ts_tick();
}

fn on_snap_mgr_gc(&mut self) {
// refresh multi_snapshot_files enable flag
self.ctx.snap_mgr.set_enable_multi_snapshot_files(
Expand Down Expand Up @@ -2733,6 +2755,13 @@ impl<'a, EK: KvEngine, ER: RaftEngine, T: Transport> StoreFsmDelegate<'a, EK, ER
);
}

fn register_pd_report_min_resolved_ts_tick(&self) {
self.ctx.schedule_store_tick(
StoreTick::PdReportMinResolvedTs,
self.ctx.cfg.pd_report_min_resolved_ts_interval.0,
);
}

fn register_snap_mgr_gc_tick(&self) {
self.ctx
.schedule_store_tick(StoreTick::SnapGc, self.ctx.cfg.snap_mgr_gc_tick_interval.0)
Expand Down
1 change: 1 addition & 0 deletions components/raftstore/src/store/metrics.rs
Expand Up @@ -215,6 +215,7 @@ make_static_metric! {
pub label_enum RaftEventDurationType {
compact_check,
pd_store_heartbeat,
pd_report_min_resolved_ts,
snap_gc,
compact_lock_cf,
consistency_check,
Expand Down
6 changes: 6 additions & 0 deletions components/raftstore/src/store/msg.rs
Expand Up @@ -441,6 +441,7 @@ pub enum StoreTick {
CompactLockCf,
ConsistencyCheck,
CleanupImportSst,
PdReportMinResolvedTs,
}

impl StoreTick {
Expand All @@ -453,6 +454,11 @@ impl StoreTick {
StoreTick::CompactLockCf => RaftEventDurationType::compact_lock_cf,
StoreTick::ConsistencyCheck => RaftEventDurationType::consistency_check,
StoreTick::CleanupImportSst => RaftEventDurationType::cleanup_import_sst,
<<<<<<< HEAD
=======
StoreTick::LoadMetricsWindow => RaftEventDurationType::load_metrics_window,
StoreTick::PdReportMinResolvedTs => RaftEventDurationType::pd_report_min_resolved_ts,
>>>>>>> bc1ae30437 (pd_client: support dynamically modifying `min-resolved-ts` report interval and reduce retry times (#15837))
}
}
}
Expand Down

0 comments on commit 34a22c6

Please sign in to comment.