Commit

This is an automated cherry-pick of tikv#15837
ref tikv#15184

Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
HuSharp authored and ti-chi-bot committed Nov 24, 2023
1 parent 29c4807 commit 7a81481
Showing 14 changed files with 211 additions and 25 deletions.
4 changes: 1 addition & 3 deletions components/pd_client/src/client.rs
@@ -1063,9 +1063,7 @@ impl PdClient for RpcClient {
}) as PdFuture<_>
};

self.pd_client
.request(req, executor, LEADER_CHANGE_RETRY)
.execute()
self.pd_client.request(req, executor, NO_RETRY).execute()
}

fn report_region_buckets(&self, bucket_stat: &BucketStat, period: Duration) -> PdFuture<()> {
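The hunk above is the client-side half of the patch: reporting min resolved ts now passes `NO_RETRY` instead of `LEADER_CHANGE_RETRY`, because the report is periodic and a failed attempt is simply superseded by the next tick. A minimal sketch of that fire-and-forget idea; the constant values and the helper are assumptions for illustration, not the `pd_client` crate's definitions:

```rust
// Hedged sketch, not the pd_client implementation.
const LEADER_CHANGE_RETRY: usize = 10; // keep retrying across PD leader changes
const NO_RETRY: usize = 1;             // single attempt, drop the report on failure

fn send_with_retry(attempt: impl Fn() -> Result<(), String>, retries: usize) -> Result<(), String> {
    let mut last_err = None;
    for _ in 0..retries {
        match attempt() {
            Ok(()) => return Ok(()),
            Err(e) => last_err = Some(e),
        }
    }
    // For a periodic report this failure is harmless: the next tick sends a
    // fresher min-resolved-ts value anyway.
    Err(last_err.unwrap_or_else(|| "no attempt made".to_string()))
}

fn main() {
    let _ = LEADER_CHANGE_RETRY;
    let report = || Err::<(), _>("pd unreachable".to_string());
    assert!(send_with_retry(report, NO_RETRY).is_err());
}
```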
26 changes: 26 additions & 0 deletions components/raftstore-v2/src/batch/store.rs
@@ -418,13 +418,39 @@ impl<EK: KvEngine, ER: RaftEngine> StoreSystem<EK, ER> {
.async_read_worker
.start("async-read-worker", read_runner);

<<<<<<< HEAD
let pd_scheduler = workers.pd_worker.start(
"pd-worker",
PdRunner::new(
store_id,
pd_client,
raft_engine.clone(),
tablet_factory.clone(),
=======
workers.pd.start(pd::Runner::new(
store_id,
pd_client,
raft_engine.clone(),
tablet_registry.clone(),
snap_mgr.clone(),
router.clone(),
workers.pd.remote(),
concurrency_manager,
causal_ts_provider,
workers.pd.scheduler(),
auto_split_controller,
collector_reg_handle,
grpc_service_mgr,
self.logger.clone(),
self.shutdown.clone(),
cfg.clone(),
)?);

let split_check_scheduler = workers.background.start(
"split-check",
SplitCheckRunner::with_registry(
tablet_registry.clone(),
>>>>>>> bc1ae30437 (pd_client: support dynamically modifying `min-resolved-ts` report interval and reduce retry times (#15837))
router.clone(),
workers.pd_worker.remote(),
concurrency_manager,
39 changes: 39 additions & 0 deletions components/raftstore-v2/src/worker/pd/mod.rs
@@ -13,10 +13,29 @@ use collections::HashMap;
use concurrency_manager::ConcurrencyManager;
use engine_traits::{KvEngine, RaftEngine, TabletFactory};
use kvproto::{metapb, pdpb};
<<<<<<< HEAD
use pd_client::PdClient;
use raftstore::store::{util::KeysInfoFormatter, TxnExt};
use slog::{error, info, Logger};
use tikv_util::{time::UnixSecs, worker::Runnable};
=======
use pd_client::{BucketStat, PdClient};
use raftstore::store::{
metrics::STORE_INSPECT_DURATION_HISTOGRAM,
util::{KeysInfoFormatter, LatencyInspector, RaftstoreDuration},
AutoSplitController, Config, FlowStatsReporter, PdStatsMonitor, ReadStats, SplitInfo,
StoreStatsReporter, TabletSnapManager, TxnExt, WriteStats,
NUM_COLLECT_STORE_INFOS_PER_HEARTBEAT,
};
use resource_metering::{Collector, CollectorRegHandle, RawRecords};
use service::service_manager::GrpcServiceManager;
use slog::{error, warn, Logger};
use tikv_util::{
config::VersionTrack,
time::{Instant as TiInstant, UnixSecs},
worker::{Runnable, Scheduler},
};
>>>>>>> bc1ae30437 (pd_client: support dynamically modifying `min-resolved-ts` report interval and reduce retry times (#15837))
use yatp::{task::future::TaskCell, Remote};

use crate::{batch::StoreRouter, router::PeerMsg};
@@ -135,10 +154,30 @@ where
remote: Remote<TaskCell>,
concurrency_manager: ConcurrencyManager,
causal_ts_provider: Option<Arc<CausalTsProviderImpl>>, // used for rawkv apiv2
<<<<<<< HEAD
logger: Logger,
shutdown: Arc<AtomicBool>,
) -> Self {
Self {
=======
pd_scheduler: Scheduler<Task>,
auto_split_controller: AutoSplitController,
collector_reg_handle: CollectorRegHandle,
grpc_service_manager: GrpcServiceManager,
logger: Logger,
shutdown: Arc<AtomicBool>,
cfg: Arc<VersionTrack<Config>>,
) -> Result<Self, std::io::Error> {
let store_heartbeat_interval = cfg.value().pd_store_heartbeat_tick_interval.0;
let mut stats_monitor = PdStatsMonitor::new(
store_heartbeat_interval / NUM_COLLECT_STORE_INFOS_PER_HEARTBEAT,
cfg.value().inspect_interval.0,
PdReporter::new(pd_scheduler, logger.clone()),
);
stats_monitor.start(auto_split_controller, collector_reg_handle)?;
let slowness_stats = slowness::SlownessStatistics::new(&cfg.value());
Ok(Self {
>>>>>>> bc1ae30437 (pd_client: support dynamically modifying `min-resolved-ts` report interval and reduce retry times (#15837))
store_id,
pd_client,
raft_engine,
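On the raftstore-v2 side, `pd::Runner::new` now builds and starts a `PdStatsMonitor` before constructing the runner, so the constructor becomes fallible (`Result<Self, std::io::Error>`) and the caller in `batch/store.rs` propagates the error with `?`. A hedged sketch of that constructor shape, with simplified stand-in types rather than the real monitor:

```rust
use std::io;

// Hedged stand-in for PdStatsMonitor: starting it can fail, e.g. if the
// underlying worker cannot be spawned.
struct StatsMonitor;

impl StatsMonitor {
    fn start(&mut self) -> io::Result<()> {
        Ok(())
    }
}

struct Runner {
    stats_monitor: StatsMonitor,
}

impl Runner {
    // Fallible constructor: build the monitor, start it, and only then return
    // Self, so a half-initialized runner is never handed to the worker pool.
    fn new() -> Result<Self, io::Error> {
        let mut stats_monitor = StatsMonitor;
        stats_monitor.start()?;
        Ok(Runner { stats_monitor })
    }
}

fn main() -> Result<(), io::Error> {
    // Mirrors `workers.pd.start(pd::Runner::new(...)?)` in batch/store.rs.
    let _runner = Runner::new()?;
    Ok(())
}
```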
2 changes: 1 addition & 1 deletion components/raftstore-v2/tests/integrations/cluster.rs
@@ -401,6 +401,7 @@ pub fn disable_all_auto_ticks(cfg: &mut Config) {
cfg.region_compact_check_interval = ReadableDuration::ZERO;
cfg.pd_heartbeat_tick_interval = ReadableDuration::ZERO;
cfg.pd_store_heartbeat_tick_interval = ReadableDuration::ZERO;
cfg.pd_report_min_resolved_ts_interval = ReadableDuration::ZERO;
cfg.snap_mgr_gc_tick_interval = ReadableDuration::ZERO;
cfg.lock_cf_compact_interval = ReadableDuration::ZERO;
cfg.peer_stale_state_check_interval = ReadableDuration::ZERO;
@@ -410,7 +411,6 @@ pub fn disable_all_auto_ticks(cfg: &mut Config) {
cfg.merge_check_tick_interval = ReadableDuration::ZERO;
cfg.cleanup_import_sst_interval = ReadableDuration::ZERO;
cfg.inspect_interval = ReadableDuration::ZERO;
cfg.report_min_resolved_ts_interval = ReadableDuration::ZERO;
cfg.reactive_memory_lock_tick_interval = ReadableDuration::ZERO;
cfg.report_region_buckets_tick_interval = ReadableDuration::ZERO;
cfg.check_long_uncommitted_interval = ReadableDuration::ZERO;
30 changes: 30 additions & 0 deletions components/raftstore/src/store/config.rs
@@ -118,6 +118,7 @@ pub struct Config {
pub region_compact_tombstones_percent: u64,
pub pd_heartbeat_tick_interval: ReadableDuration,
pub pd_store_heartbeat_tick_interval: ReadableDuration,
pub pd_report_min_resolved_ts_interval: ReadableDuration,
pub snap_mgr_gc_tick_interval: ReadableDuration,
pub snap_gc_timeout: ReadableDuration,
pub lock_cf_compact_interval: ReadableDuration,
@@ -316,8 +317,17 @@ pub struct Config {
// Interval to inspect the latency of raftstore for slow store detection.
pub inspect_interval: ReadableDuration,

<<<<<<< HEAD
// Interval to report min resolved ts, if it is zero, it means disabled.
pub report_min_resolved_ts_interval: ReadableDuration,
=======
// The unsensitive(increase it to reduce sensitiveness) of the cause-trend detection
pub slow_trend_unsensitive_cause: f64,
// The unsensitive(increase it to reduce sensitiveness) of the result-trend detection
pub slow_trend_unsensitive_result: f64,
// The sensitiveness of slowness on network-io.
pub slow_trend_network_io_factor: f64,
>>>>>>> bc1ae30437 (pd_client: support dynamically modifying `min-resolved-ts` report interval and reduce retry times (#15837))

/// Interval to check whether to reactivate in-memory pessimistic lock after
/// being disabled before transferring leader.
@@ -381,6 +391,15 @@ impl Default for Config {
region_compact_tombstones_percent: 30,
pd_heartbeat_tick_interval: ReadableDuration::minutes(1),
pd_store_heartbeat_tick_interval: ReadableDuration::secs(10),
<<<<<<< HEAD
=======
pd_report_min_resolved_ts_interval: ReadableDuration::secs(1),
// Disable periodic full compaction by default.
periodic_full_compact_start_times: ReadableSchedule::default(),
// If periodic full compaction is enabled, do not start a full compaction
// if the CPU utilization is over 10%.
periodic_full_compact_start_max_cpu: 0.1,
>>>>>>> bc1ae30437 (pd_client: support dynamically modifying `min-resolved-ts` report interval and reduce retry times (#15837))
notify_capacity: 40960,
snap_mgr_gc_tick_interval: ReadableDuration::minutes(1),
snap_gc_timeout: ReadableDuration::hours(4),
@@ -449,7 +468,15 @@ impl Default for Config {
region_split_size: ReadableSize(0),
clean_stale_peer_delay: ReadableDuration::minutes(0),
inspect_interval: ReadableDuration::millis(500),
<<<<<<< HEAD
report_min_resolved_ts_interval: ReadableDuration::secs(1),
=======
// The param `slow_trend_unsensitive_cause == 2.0` can yield good results,
// make it `10.0` to reduce a bit sensitiveness because SpikeFilter is disabled
slow_trend_unsensitive_cause: 10.0,
slow_trend_unsensitive_result: 0.5,
slow_trend_network_io_factor: 0.0,
>>>>>>> bc1ae30437 (pd_client: support dynamically modifying `min-resolved-ts` report interval and reduce retry times (#15837))
check_leader_lease_interval: ReadableDuration::secs(0),
renew_leader_lease_advance_duration: ReadableDuration::secs(0),
allow_unsafe_vote_after_start: false,
@@ -892,6 +919,9 @@ impl Config {
CONFIG_RAFTSTORE_GAUGE
.with_label_values(&["pd_store_heartbeat_tick_interval"])
.set(self.pd_store_heartbeat_tick_interval.as_secs_f64());
CONFIG_RAFTSTORE_GAUGE
.with_label_values(&["pd_report_min_resolved_ts_interval"])
.set(self.pd_report_min_resolved_ts_interval.as_secs_f64());
CONFIG_RAFTSTORE_GAUGE
.with_label_values(&["snap_mgr_gc_tick_interval"])
.set(self.snap_mgr_gc_tick_interval.as_secs_f64());
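config.rs is where the knob itself lands: the per-store report interval now lives in `pd_report_min_resolved_ts_interval`, defaults to 1s, and is exported to `CONFIG_RAFTSTORE_GAUGE` alongside the other PD tick intervals, while the test setup drops the old `report_min_resolved_ts_interval`. A hedged, trimmed-down sketch of the field and its default; the `ReadableDuration` stand-in here is an assumption, not tikv_util's actual type:

```rust
use std::time::Duration;

// Hedged sketch: only the fields touched by this patch; the real Config has
// many more.
#[derive(Clone, Copy)]
struct ReadableDuration(Duration);

impl ReadableDuration {
    const ZERO: ReadableDuration = ReadableDuration(Duration::from_secs(0));
    fn secs(s: u64) -> Self {
        ReadableDuration(Duration::from_secs(s))
    }
}

struct Config {
    pd_store_heartbeat_tick_interval: ReadableDuration,
    // New in this patch: how often a store reports its min resolved ts to PD.
    // A zero interval disables the tick (see disable_all_auto_ticks above).
    pd_report_min_resolved_ts_interval: ReadableDuration,
}

impl Default for Config {
    fn default() -> Self {
        Config {
            pd_store_heartbeat_tick_interval: ReadableDuration::secs(10),
            pd_report_min_resolved_ts_interval: ReadableDuration::secs(1),
        }
    }
}

fn main() {
    let mut cfg = Config::default();
    assert_eq!(cfg.pd_report_min_resolved_ts_interval.0, Duration::from_secs(1));
    // A test or tool can turn the report off entirely:
    cfg.pd_report_min_resolved_ts_interval = ReadableDuration::ZERO;
    assert_eq!(cfg.pd_report_min_resolved_ts_interval.0, Duration::from_secs(0));
    let _ = cfg.pd_store_heartbeat_tick_interval;
}
```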
37 changes: 33 additions & 4 deletions components/raftstore/src/store/fsm/store.rs
@@ -726,6 +726,7 @@ impl<'a, EK: KvEngine + 'static, ER: RaftEngine + 'static, T: Transport>
StoreTick::CompactCheck => self.on_compact_check_tick(),
StoreTick::ConsistencyCheck => self.on_consistency_check_tick(),
StoreTick::CleanupImportSst => self.on_cleanup_import_sst_tick(),
StoreTick::PdReportMinResolvedTs => self.on_pd_report_min_resolved_ts_tick(),
}
let elapsed = timer.saturating_elapsed();
self.ctx
@@ -815,6 +816,7 @@ impl<'a, EK: KvEngine + 'static, ER: RaftEngine + 'static, T: Transport>
self.register_cleanup_import_sst_tick();
self.register_compact_check_tick();
self.register_pd_store_heartbeat_tick();
self.register_pd_report_min_resolved_ts_tick();
self.register_compact_lock_cf_tick();
self.register_snap_mgr_gc_tick();
self.register_consistency_check_tick();
@@ -1626,7 +1628,6 @@ impl<EK: KvEngine, ER: RaftEngine> RaftBatchSystem<EK, ER> {
&cfg,
)?;

let region_read_progress = store_meta.lock().unwrap().region_read_progress.clone();
let mut builder = RaftPollerBuilder {
cfg,
store: meta,
@@ -1663,7 +1664,6 @@ impl<EK: KvEngine, ER: RaftEngine> RaftBatchSystem<EK, ER> {
mgr,
pd_client,
collector_reg_handle,
region_read_progress,
health_service,
causal_ts_provider,
)?;
@@ -1680,7 +1680,6 @@ impl<EK: KvEngine, ER: RaftEngine> RaftBatchSystem<EK, ER> {
snap_mgr: SnapManager,
pd_client: Arc<C>,
collector_reg_handle: CollectorRegHandle,
region_read_progress: RegionReadProgressRegistry,
health_service: Option<HealthService>,
causal_ts_provider: Option<Arc<CausalTsProviderImpl>>, // used for rawkv apiv2
) -> Result<()> {
@@ -1761,7 +1760,6 @@ impl<EK: KvEngine, ER: RaftEngine> RaftBatchSystem<EK, ER> {
snap_mgr,
workers.pd_worker.remote(),
collector_reg_handle,
region_read_progress,
health_service,
coprocessor_host,
causal_ts_provider,
@@ -2428,6 +2426,25 @@ impl<'a, EK: KvEngine, ER: RaftEngine, T: Transport> StoreFsmDelegate<'a, EK, ER
}
}

fn report_min_resolved_ts(&self) {
let read_progress = {
let meta = self.ctx.store_meta.lock().unwrap();
meta.region_read_progress().clone()
};
let min_resolved_ts = read_progress.get_min_resolved_ts();

let task = PdTask::ReportMinResolvedTs {
store_id: self.fsm.store.id,
min_resolved_ts,
};
if let Err(e) = self.ctx.pd_scheduler.schedule(task) {
error!("failed to send min resolved ts to pd worker";
"store_id" => self.fsm.store.id,
"err" => ?e
);
}
}

fn store_heartbeat_pd(&mut self, report: Option<pdpb::StoreReport>) {
let mut stats = StoreStats::default();

@@ -2532,6 +2549,11 @@ impl<'a, EK: KvEngine, ER: RaftEngine, T: Transport> StoreFsmDelegate<'a, EK, ER
self.register_pd_store_heartbeat_tick();
}

fn on_pd_report_min_resolved_ts_tick(&mut self) {
self.report_min_resolved_ts();
self.register_pd_report_min_resolved_ts_tick();
}

fn on_snap_mgr_gc(&mut self) {
// refresh multi_snapshot_files enable flag
self.ctx.snap_mgr.set_enable_multi_snapshot_files(
@@ -2635,6 +2657,13 @@ impl<'a, EK: KvEngine, ER: RaftEngine, T: Transport> StoreFsmDelegate<'a, EK, ER
);
}

fn register_pd_report_min_resolved_ts_tick(&self) {
self.ctx.schedule_store_tick(
StoreTick::PdReportMinResolvedTs,
self.ctx.cfg.pd_report_min_resolved_ts_interval.0,
);
}

fn register_snap_mgr_gc_tick(&self) {
self.ctx
.schedule_store_tick(StoreTick::SnapGc, self.ctx.cfg.snap_mgr_gc_tick_interval.0)
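fsm/store.rs wires the report into the usual store-tick cycle: registration schedules `StoreTick::PdReportMinResolvedTs`, the handler reads the minimum resolved ts from `region_read_progress` under the store-meta lock, sends a `PdTask::ReportMinResolvedTs` to the pd worker, and then re-registers the tick. A self-contained sketch of that register/handle/re-register shape, with simplified stand-in types (the real code goes through the raftstore batch system, not direct calls):

```rust
use std::time::Duration;

// Hedged sketch of the store-FSM tick pattern; the types here are stand-ins.
struct StoreFsm {
    store_id: u64,
    report_interval: Duration, // cfg.pd_report_min_resolved_ts_interval
    min_resolved_ts: u64,      // stands in for region_read_progress.get_min_resolved_ts()
}

impl StoreFsm {
    // Tick handler: do the work, then re-arm the tick, mirroring
    // on_pd_report_min_resolved_ts_tick in the diff above.
    fn on_pd_report_min_resolved_ts_tick(&mut self) {
        self.report_min_resolved_ts();
        self.register_pd_report_min_resolved_ts_tick();
    }

    fn report_min_resolved_ts(&self) {
        // In TiKV this becomes PdTask::ReportMinResolvedTs { store_id, min_resolved_ts }
        // scheduled onto the pd worker; a scheduling failure is only logged.
        println!(
            "store {} reports min_resolved_ts={} to the pd worker",
            self.store_id, self.min_resolved_ts
        );
    }

    fn register_pd_report_min_resolved_ts_tick(&self) {
        // The real code schedules StoreTick::PdReportMinResolvedTs after
        // cfg.pd_report_min_resolved_ts_interval; a zero interval disables it.
        println!("next report tick in {:?}", self.report_interval);
    }
}

fn main() {
    let mut fsm = StoreFsm {
        store_id: 1,
        report_interval: Duration::from_secs(1),
        min_resolved_ts: 42,
    };
    fsm.on_pd_report_min_resolved_ts_tick();
}
```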
1 change: 1 addition & 0 deletions components/raftstore/src/store/metrics.rs
@@ -211,6 +211,7 @@ make_static_metric! {
pub label_enum RaftEventDurationType {
compact_check,
pd_store_heartbeat,
pd_report_min_resolved_ts,
snap_gc,
compact_lock_cf,
consistency_check,
6 changes: 6 additions & 0 deletions components/raftstore/src/store/msg.rs
@@ -425,6 +425,7 @@ pub enum StoreTick {
CompactLockCf,
ConsistencyCheck,
CleanupImportSst,
PdReportMinResolvedTs,
}

impl StoreTick {
@@ -437,6 +438,11 @@ impl StoreTick {
StoreTick::CompactLockCf => RaftEventDurationType::compact_lock_cf,
StoreTick::ConsistencyCheck => RaftEventDurationType::consistency_check,
StoreTick::CleanupImportSst => RaftEventDurationType::cleanup_import_sst,
<<<<<<< HEAD
=======
StoreTick::LoadMetricsWindow => RaftEventDurationType::load_metrics_window,
StoreTick::PdReportMinResolvedTs => RaftEventDurationType::pd_report_min_resolved_ts,
>>>>>>> bc1ae30437 (pd_client: support dynamically modifying `min-resolved-ts` report interval and reduce retry times (#15837))
}
}
}