pd_client: support dynamically modifying min-resolved-ts report interval and reduce retry times (#15837)

ref #15184

- The min-resolved-ts is reported periodically, so there is no need to retry failed reports
- Support dynamically changing the `min-resolved-ts` report interval
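For orientation, a minimal fire-and-forget sketch of the pattern the first bullet describes (illustrative only, not code from this commit; `report_once` stands in for the PD RPC):

```rust
use std::{thread, time::Duration};

// Because a fresher min-resolved-ts is sent on every tick anyway, a failed
// report is logged and dropped instead of being retried.
fn report_min_resolved_ts_loop(
    mut report_once: impl FnMut() -> Result<(), String>,
    interval: Duration,
) {
    loop {
        if let Err(e) = report_once() {
            eprintln!("report min resolved ts failed: {e}");
        }
        thread::sleep(interval);
    }
}
```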

Signed-off-by: husharp <jinhao.hu@pingcap.com>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
HuSharp and ti-chi-bot[bot] committed Nov 24, 2023
1 parent b23787c commit bc1ae30
Showing 14 changed files with 69 additions and 74 deletions.
4 changes: 1 addition & 3 deletions components/pd_client/src/client.rs
@@ -1098,9 +1098,7 @@ impl PdClient for RpcClient {
}) as PdFuture<_>
};

self.pd_client
.request(req, executor, LEADER_CHANGE_RETRY)
.execute()
self.pd_client.request(req, executor, NO_RETRY).execute()
}

fn report_region_buckets(&self, bucket_stat: &BucketStat, period: Duration) -> PdFuture<()> {
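Aside: the constants `LEADER_CHANGE_RETRY` and `NO_RETRY` in the hunk above bound how many times the PD client re-executes a request. The sketch below only illustrates that bounded-retry shape; the numeric values and the helper are assumptions, not the actual `pd_client` implementation.

```rust
// Illustrative stand-in for pd_client's request(req, executor, retries).execute().
const LEADER_CHANGE_RETRY: usize = 10; // assumed value, not taken from this commit
const NO_RETRY: usize = 1; // assumed value, not taken from this commit

async fn execute_with_retry<F, Fut, T, E>(mut body: F, retries: usize) -> Result<T, E>
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = Result<T, E>>,
{
    let mut last_err = None;
    for _ in 0..retries.max(1) {
        match body().await {
            Ok(v) => return Ok(v),
            // On failure (e.g. a PD leader change), try again until the budget runs out.
            Err(e) => last_err = Some(e),
        }
    }
    Err(last_err.expect("at least one attempt was made"))
}
```

Since the min-resolved-ts report is now driven by a store tick, a lost report costs at most one interval, which is why the retry budget can drop from `LEADER_CHANGE_RETRY` to `NO_RETRY`.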
1 change: 0 additions & 1 deletion components/raftstore-v2/src/batch/store.rs
@@ -811,7 +811,6 @@ impl<EK: KvEngine, ER: RaftEngine> StoreSystem<EK, ER> {
causal_ts_provider,
workers.pd.scheduler(),
auto_split_controller,
store_meta.lock().unwrap().region_read_progress.clone(),
collector_reg_handle,
grpc_service_mgr,
self.logger.clone(),
15 changes: 4 additions & 11 deletions components/raftstore-v2/src/worker/pd/mod.rs
@@ -14,9 +14,9 @@ use pd_client::{BucketStat, PdClient};
use raftstore::store::{
metrics::STORE_INSPECT_DURATION_HISTOGRAM,
util::{KeysInfoFormatter, LatencyInspector, RaftstoreDuration},
AutoSplitController, Config, FlowStatsReporter, PdStatsMonitor, ReadStats,
RegionReadProgressRegistry, SplitInfo, StoreStatsReporter, TabletSnapManager, TxnExt,
WriteStats, NUM_COLLECT_STORE_INFOS_PER_HEARTBEAT,
AutoSplitController, Config, FlowStatsReporter, PdStatsMonitor, ReadStats, SplitInfo,
StoreStatsReporter, TabletSnapManager, TxnExt, WriteStats,
NUM_COLLECT_STORE_INFOS_PER_HEARTBEAT,
};
use resource_metering::{Collector, CollectorRegHandle, RawRecords};
use service::service_manager::GrpcServiceManager;
@@ -245,7 +245,6 @@
causal_ts_provider: Option<Arc<CausalTsProviderImpl>>, // used for rawkv apiv2
pd_scheduler: Scheduler<Task>,
auto_split_controller: AutoSplitController,
region_read_progress: RegionReadProgressRegistry,
collector_reg_handle: CollectorRegHandle,
grpc_service_manager: GrpcServiceManager,
logger: Logger,
@@ -255,16 +254,10 @@
let store_heartbeat_interval = cfg.value().pd_store_heartbeat_tick_interval.0;
let mut stats_monitor = PdStatsMonitor::new(
store_heartbeat_interval / NUM_COLLECT_STORE_INFOS_PER_HEARTBEAT,
cfg.value().report_min_resolved_ts_interval.0,
cfg.value().inspect_interval.0,
PdReporter::new(pd_scheduler, logger.clone()),
);
stats_monitor.start(
auto_split_controller,
region_read_progress,
collector_reg_handle,
store_id,
)?;
stats_monitor.start(auto_split_controller, collector_reg_handle)?;
let slowness_stats = slowness::SlownessStatistics::new(&cfg.value());
Ok(Self {
store_id,
2 changes: 1 addition & 1 deletion components/raftstore-v2/tests/integrations/cluster.rs
@@ -515,6 +515,7 @@ pub fn disable_all_auto_ticks(cfg: &mut Config) {
cfg.region_compact_check_interval = ReadableDuration::ZERO;
cfg.pd_heartbeat_tick_interval = ReadableDuration::ZERO;
cfg.pd_store_heartbeat_tick_interval = ReadableDuration::ZERO;
cfg.pd_report_min_resolved_ts_interval = ReadableDuration::ZERO;
cfg.snap_mgr_gc_tick_interval = ReadableDuration::ZERO;
cfg.lock_cf_compact_interval = ReadableDuration::ZERO;
cfg.peer_stale_state_check_interval = ReadableDuration::ZERO;
@@ -524,7 +525,6 @@ pub fn disable_all_auto_ticks(cfg: &mut Config) {
cfg.merge_check_tick_interval = ReadableDuration::ZERO;
cfg.cleanup_import_sst_interval = ReadableDuration::ZERO;
cfg.inspect_interval = ReadableDuration::ZERO;
cfg.report_min_resolved_ts_interval = ReadableDuration::ZERO;
cfg.reactive_memory_lock_tick_interval = ReadableDuration::ZERO;
cfg.report_region_buckets_tick_interval = ReadableDuration::ZERO;
cfg.check_long_uncommitted_interval = ReadableDuration::ZERO;
9 changes: 5 additions & 4 deletions components/raftstore/src/store/config.rs
@@ -142,6 +142,7 @@ pub struct Config {
pub region_compact_redundant_rows_percent: Option<u64>,
pub pd_heartbeat_tick_interval: ReadableDuration,
pub pd_store_heartbeat_tick_interval: ReadableDuration,
pub pd_report_min_resolved_ts_interval: ReadableDuration,
pub snap_mgr_gc_tick_interval: ReadableDuration,
pub snap_gc_timeout: ReadableDuration,
/// The duration of snapshot waits for region split. It prevents leader from
@@ -360,9 +361,6 @@ pub struct Config {
// The sensitiveness of slowness on network-io.
pub slow_trend_network_io_factor: f64,

// Interval to report min resolved ts, if it is zero, it means disabled.
pub report_min_resolved_ts_interval: ReadableDuration,

/// Interval to check whether to reactivate in-memory pessimistic lock after
/// being disabled before transferring leader.
pub reactive_memory_lock_tick_interval: ReadableDuration,
@@ -445,6 +443,7 @@ impl Default for Config {
region_compact_redundant_rows_percent: Some(20),
pd_heartbeat_tick_interval: ReadableDuration::minutes(1),
pd_store_heartbeat_tick_interval: ReadableDuration::secs(10),
pd_report_min_resolved_ts_interval: ReadableDuration::secs(1),
// Disable periodic full compaction by default.
periodic_full_compact_start_times: ReadableSchedule::default(),
// If periodic full compaction is enabled, do not start a full compaction
@@ -524,7 +523,6 @@ impl Default for Config {
slow_trend_unsensitive_cause: 10.0,
slow_trend_unsensitive_result: 0.5,
slow_trend_network_io_factor: 0.0,
report_min_resolved_ts_interval: ReadableDuration::secs(1),
check_leader_lease_interval: ReadableDuration::secs(0),
renew_leader_lease_advance_duration: ReadableDuration::secs(0),
allow_unsafe_vote_after_start: false,
@@ -1042,6 +1040,9 @@ impl Config {
CONFIG_RAFTSTORE_GAUGE
.with_label_values(&["pd_store_heartbeat_tick_interval"])
.set(self.pd_store_heartbeat_tick_interval.as_secs_f64());
CONFIG_RAFTSTORE_GAUGE
.with_label_values(&["pd_report_min_resolved_ts_interval"])
.set(self.pd_report_min_resolved_ts_interval.as_secs_f64());
CONFIG_RAFTSTORE_GAUGE
.with_label_values(&["snap_mgr_gc_tick_interval"])
.set(self.snap_mgr_gc_tick_interval.as_secs_f64());
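For readers wiring this up in tikv.toml: raftstore config fields are exposed with kebab-case keys (compare `pd-store-heartbeat-tick-interval`), so the new field presumably surfaces as `raftstore.pd-report-min-resolved-ts-interval`. The struct below is a stripped-down stand-in for that assumed serde mapping, not the real `Config`:

```rust
// Hypothetical, minimal mirror of the assumed key mapping; requires the serde crate.
#[derive(serde::Deserialize)]
#[serde(rename_all = "kebab-case", default)]
struct RaftstoreConfigSketch {
    // [raftstore] pd-report-min-resolved-ts-interval = "1s"  (assumed default, per the hunk above)
    pd_report_min_resolved_ts_interval: String,
}

impl Default for RaftstoreConfigSketch {
    fn default() -> Self {
        Self {
            pd_report_min_resolved_ts_interval: "1s".to_owned(),
        }
    }
}
```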
37 changes: 33 additions & 4 deletions components/raftstore/src/store/fsm/store.rs
@@ -789,6 +789,7 @@ impl<'a, EK: KvEngine + 'static, ER: RaftEngine + 'static, T: Transport>
StoreTick::LoadMetricsWindow => self.on_load_metrics_window_tick(),
StoreTick::ConsistencyCheck => self.on_consistency_check_tick(),
StoreTick::CleanupImportSst => self.on_cleanup_import_sst_tick(),
StoreTick::PdReportMinResolvedTs => self.on_pd_report_min_resolved_ts_tick(),
}
let elapsed = timer.saturating_elapsed();
self.ctx
@@ -883,6 +884,7 @@ impl<'a, EK: KvEngine + 'static, ER: RaftEngine + 'static, T: Transport>
self.register_full_compact_tick();
self.register_load_metrics_window_tick();
self.register_pd_store_heartbeat_tick();
self.register_pd_report_min_resolved_ts_tick();
self.register_compact_lock_cf_tick();
self.register_snap_mgr_gc_tick();
self.register_consistency_check_tick();
@@ -1702,7 +1704,6 @@ impl<EK: KvEngine, ER: RaftEngine> RaftBatchSystem<EK, ER> {
&cfg,
)?;

let region_read_progress = store_meta.lock().unwrap().region_read_progress.clone();
let mut builder = RaftPollerBuilder {
cfg,
store: meta,
@@ -1739,7 +1740,6 @@ impl<EK: KvEngine, ER: RaftEngine> RaftBatchSystem<EK, ER> {
mgr,
pd_client,
collector_reg_handle,
region_read_progress,
health_service,
causal_ts_provider,
snap_generator_pool,
@@ -1758,7 +1758,6 @@ impl<EK: KvEngine, ER: RaftEngine> RaftBatchSystem<EK, ER> {
snap_mgr: SnapManager,
pd_client: Arc<C>,
collector_reg_handle: CollectorRegHandle,
region_read_progress: RegionReadProgressRegistry,
health_service: Option<HealthService>,
causal_ts_provider: Option<Arc<CausalTsProviderImpl>>, // used for rawkv apiv2
snap_generator_pool: FuturePool,
@@ -1850,7 +1849,6 @@ impl<EK: KvEngine, ER: RaftEngine> RaftBatchSystem<EK, ER> {
snap_mgr,
workers.pd_worker.remote(),
collector_reg_handle,
region_read_progress,
health_service,
coprocessor_host,
causal_ts_provider,
@@ -2678,6 +2676,25 @@ impl<'a, EK: KvEngine, ER: RaftEngine, T: Transport> StoreFsmDelegate<'a, EK, ER
}
}

fn report_min_resolved_ts(&self) {
let read_progress = {
let meta = self.ctx.store_meta.lock().unwrap();
meta.region_read_progress().clone()
};
let min_resolved_ts = read_progress.get_min_resolved_ts();

let task = PdTask::ReportMinResolvedTs {
store_id: self.fsm.store.id,
min_resolved_ts,
};
if let Err(e) = self.ctx.pd_scheduler.schedule(task) {
error!("failed to send min resolved ts to pd worker";
"store_id" => self.fsm.store.id,
"err" => ?e
);
}
}

fn store_heartbeat_pd(&mut self, report: Option<pdpb::StoreReport>) {
let mut stats = StoreStats::default();

@@ -2784,6 +2801,11 @@ impl<'a, EK: KvEngine, ER: RaftEngine, T: Transport> StoreFsmDelegate<'a, EK, ER
self.register_pd_store_heartbeat_tick();
}

fn on_pd_report_min_resolved_ts_tick(&mut self) {
self.report_min_resolved_ts();
self.register_pd_report_min_resolved_ts_tick();
}

fn on_snap_mgr_gc(&mut self) {
// refresh multi_snapshot_files enable flag
self.ctx.snap_mgr.set_enable_multi_snapshot_files(
@@ -2888,6 +2910,13 @@ impl<'a, EK: KvEngine, ER: RaftEngine, T: Transport> StoreFsmDelegate<'a, EK, ER
);
}

fn register_pd_report_min_resolved_ts_tick(&self) {
self.ctx.schedule_store_tick(
StoreTick::PdReportMinResolvedTs,
self.ctx.cfg.pd_report_min_resolved_ts_interval.0,
);
}

fn register_snap_mgr_gc_tick(&self) {
self.ctx
.schedule_store_tick(StoreTick::SnapGc, self.ctx.cfg.snap_mgr_gc_tick_interval.0)
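Design note: `register_pd_report_min_resolved_ts_tick` reads `self.ctx.cfg.pd_report_min_resolved_ts_interval` again every time the next tick is scheduled, so an online change to the interval is picked up on the following tick (assuming the usual raftstore config refresh keeps `ctx.cfg` up to date). A self-contained sketch of that pattern, not this commit's code:

```rust
use std::{
    sync::{Arc, Mutex},
    thread,
    time::Duration,
};

// Re-reading the shared interval at each re-registration means an updated
// value takes effect from the next tick, with no restart of the store loop.
fn run_report_tick(interval: Arc<Mutex<Duration>>, mut report: impl FnMut()) {
    loop {
        let next = *interval.lock().unwrap();
        if next.is_zero() {
            // A zero interval disables the tick, mirroring disable_all_auto_ticks() in the tests.
            return;
        }
        thread::sleep(next);
        report();
    }
}
```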
1 change: 1 addition & 0 deletions components/raftstore/src/store/metrics.rs
@@ -217,6 +217,7 @@ make_static_metric! {
periodic_full_compact,
load_metrics_window,
pd_store_heartbeat,
pd_report_min_resolved_ts,
snap_gc,
compact_lock_cf,
consistency_check,
2 changes: 2 additions & 0 deletions components/raftstore/src/store/msg.rs
@@ -442,6 +442,7 @@ pub enum StoreTick {
CompactLockCf,
ConsistencyCheck,
CleanupImportSst,
PdReportMinResolvedTs,
}

impl StoreTick {
@@ -456,6 +457,7 @@ impl StoreTick {
StoreTick::ConsistencyCheck => RaftEventDurationType::consistency_check,
StoreTick::CleanupImportSst => RaftEventDurationType::cleanup_import_sst,
StoreTick::LoadMetricsWindow => RaftEventDurationType::load_metrics_window,
StoreTick::PdReportMinResolvedTs => RaftEventDurationType::pd_report_min_resolved_ts,
}
}
}
48 changes: 3 additions & 45 deletions components/raftstore/src/store/worker/pd.rs
@@ -68,7 +68,7 @@ use crate::{
AutoSplitController, ReadStats, SplitConfigChange, WriteStats,
},
Callback, CasualMessage, Config, PeerMsg, RaftCmdExtraOpts, RaftCommand, RaftRouter,
RegionReadProgressRegistry, SnapManager, StoreInfo, StoreMsg, TxnExt,
SnapManager, StoreInfo, StoreMsg, TxnExt,
},
};

@@ -450,16 +450,6 @@ fn default_collect_tick_interval() -> Duration {
DEFAULT_COLLECT_TICK_INTERVAL
}

fn config(interval: Duration) -> Duration {
fail_point!("mock_min_resolved_ts_interval", |_| {
Duration::from_millis(50)
});
fail_point!("mock_min_resolved_ts_interval_disable", |_| {
Duration::from_millis(0)
});
interval
}

#[inline]
fn convert_record_pairs(m: HashMap<String, u64>) -> RecordPairVec {
m.into_iter()
@@ -562,20 +552,14 @@ where
collect_store_infos_interval: Duration,
load_base_split_check_interval: Duration,
collect_tick_interval: Duration,
report_min_resolved_ts_interval: Duration,
inspect_latency_interval: Duration,
}

impl<T> StatsMonitor<T>
where
T: StoreStatsReporter,
{
pub fn new(
interval: Duration,
report_min_resolved_ts_interval: Duration,
inspect_latency_interval: Duration,
reporter: T,
) -> Self {
pub fn new(interval: Duration, inspect_latency_interval: Duration, reporter: T) -> Self {
StatsMonitor {
reporter,
handle: None,
Expand All @@ -587,7 +571,6 @@ where
DEFAULT_LOAD_BASE_SPLIT_CHECK_INTERVAL,
interval,
),
report_min_resolved_ts_interval: config(report_min_resolved_ts_interval),
// Use `inspect_latency_interval` as the minimal limitation for collecting tick.
collect_tick_interval: cmp::min(
inspect_latency_interval,
@@ -602,9 +585,7 @@
pub fn start(
&mut self,
mut auto_split_controller: AutoSplitController,
region_read_progress: RegionReadProgressRegistry,
collector_reg_handle: CollectorRegHandle,
store_id: u64,
) -> Result<(), io::Error> {
if self.collect_tick_interval
< cmp::min(
@@ -625,9 +606,6 @@
let load_base_split_check_interval = self
.load_base_split_check_interval
.div_duration_f64(tick_interval) as u64;
let report_min_resolved_ts_interval = self
.report_min_resolved_ts_interval
.div_duration_f64(tick_interval) as u64;
let update_latency_stats_interval = self
.inspect_latency_interval
.div_duration_f64(tick_interval) as u64;
@@ -686,12 +664,6 @@
&mut region_cpu_records_collector,
);
}
if is_enable_tick(timer_cnt, report_min_resolved_ts_interval) {
reporter.report_min_resolved_ts(
store_id,
region_read_progress.get_min_resolved_ts(),
);
}
if is_enable_tick(timer_cnt, update_latency_stats_interval) {
reporter.update_latency_stats(timer_cnt);
}
@@ -1050,7 +1022,6 @@
snap_mgr: SnapManager,
remote: Remote<yatp::task::future::TaskCell>,
collector_reg_handle: CollectorRegHandle,
region_read_progress: RegionReadProgressRegistry,
health_service: Option<HealthService>,
coprocessor_host: CoprocessorHost<EK>,
causal_ts_provider: Option<Arc<CausalTsProviderImpl>>, // used for rawkv apiv2
@@ -1060,16 +1031,10 @@
let interval = store_heartbeat_interval / NUM_COLLECT_STORE_INFOS_PER_HEARTBEAT;
let mut stats_monitor = StatsMonitor::new(
interval,
cfg.report_min_resolved_ts_interval.0,
cfg.inspect_interval.0,
WrappedScheduler(scheduler.clone()),
);
if let Err(e) = stats_monitor.start(
auto_split_controller,
region_read_progress,
collector_reg_handle,
store_id,
) {
if let Err(e) = stats_monitor.start(auto_split_controller, collector_reg_handle) {
error!("failed to start stats collector, error = {:?}", e);
}

@@ -2686,8 +2651,6 @@ mod tests {
use engine_test::{kv::KvTestEngine, raft::RaftTestEngine};
use tikv_util::worker::LazyWorker;

use crate::store::fsm::StoreMeta;

struct RunnerTest {
store_stat: Arc<Mutex<StoreStat>>,
stats_monitor: StatsMonitor<WrappedScheduler<KvTestEngine, RaftTestEngine>>,
@@ -2701,17 +2664,12 @@
) -> RunnerTest {
let mut stats_monitor = StatsMonitor::new(
Duration::from_secs(interval),
Duration::from_secs(0),
Duration::from_secs(interval),
WrappedScheduler(scheduler),
);
let store_meta = Arc::new(Mutex::new(StoreMeta::new(0)));
let region_read_progress = store_meta.lock().unwrap().region_read_progress.clone();
if let Err(e) = stats_monitor.start(
AutoSplitController::default(),
region_read_progress,
CollectorRegHandle::new_for_test(),
1,
) {
error!("failed to start stats collector, error = {:?}", e);
}
