raftstore: introduce the CPU-based Load Base Split strategy (#12955)
ref #12063, ref #12593, ref #12942

Introduce the CPU-based Load Base Split strategy.

Signed-off-by: JmPotato <ghzpotato@gmail.com>

Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
JmPotato and ti-chi-bot committed Jul 6, 2022
1 parent c762224 · commit 07e7cd4
Showing 5 changed files with 405 additions and 51 deletions.
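
Load Base Split previously relied on read QPS and traffic statistics alone; this commit adds sampled per-region CPU usage as a second split signal, forwarding the `RawRecords` collected by the CPU recorder to the `AutoSplitController` through the PD worker's stats monitor. The sketch below illustrates the core idea only and is not TiKV's implementation; the type names and the `CPU_OVERLOAD_RATIO` threshold are assumptions for the example.

```rust
use std::collections::HashMap;

// Hypothetical threshold: a region is a split candidate once it accounts
// for more than this share of the sampled CPU time.
const CPU_OVERLOAD_RATIO: f64 = 0.25;

#[derive(Default)]
struct RegionCpuStats {
    // region_id -> accumulated CPU time in milliseconds.
    cpu_time_ms: HashMap<u64, u64>,
}

impl RegionCpuStats {
    fn record(&mut self, region_id: u64, cpu_ms: u64) {
        *self.cpu_time_ms.entry(region_id).or_insert(0) += cpu_ms;
    }

    // Regions whose share of the total sampled CPU time exceeds the
    // threshold, i.e. the CPU-based split candidates.
    fn overloaded_regions(&self) -> Vec<u64> {
        let total: u64 = self.cpu_time_ms.values().sum();
        if total == 0 {
            return vec![];
        }
        self.cpu_time_ms
            .iter()
            .filter(|&(_, &ms)| ms as f64 / total as f64 > CPU_OVERLOAD_RATIO)
            .map(|(&id, _)| id)
            .collect()
    }
}
```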
components/raftstore/src/store/worker/pd.rs (106 changes: 84 additions & 22 deletions)
@@ -472,6 +472,7 @@ where
handle: Option<JoinHandle<()>>,
timer: Option<Sender<bool>>,
read_stats_sender: Option<Sender<ReadStats>>,
+cpu_stats_sender: Option<Sender<Arc<RawRecords>>>,
collect_store_infos_interval: Duration,
load_base_split_check_interval: Duration,
collect_tick_interval: Duration,
@@ -493,6 +494,7 @@ where
handle: None,
timer: None,
read_stats_sender: None,
+cpu_stats_sender: None,
collect_store_infos_interval: interval,
load_base_split_check_interval: cmp::min(
DEFAULT_LOAD_BASE_SPLIT_CHECK_INTERVAL,
@@ -537,6 +539,9 @@ where
let (read_stats_sender, read_stats_receiver) = mpsc::channel();
self.read_stats_sender = Some(read_stats_sender);

+let (cpu_stats_sender, cpu_stats_receiver) = mpsc::channel();
+self.cpu_stats_sender = Some(cpu_stats_sender);
+
let scheduler = self.scheduler.clone();
let props = tikv_util::thread_group::current_properties();

@@ -548,17 +553,25 @@
.spawn_wrapper(move || {
tikv_util::thread_group::set_properties(props);
tikv_alloc::add_thread_memory_accessor();
-let mut thread_stats = ThreadInfoStatistics::new();
+// Create different `ThreadInfoStatistics` for different purposes to
+// make sure the record won't be disturbed.
+let mut collect_store_infos_thread_stats = ThreadInfoStatistics::new();
+let mut load_base_split_thread_stats = ThreadInfoStatistics::new();
while let Err(mpsc::RecvTimeoutError::Timeout) =
timer_rx.recv_timeout(tick_interval)
{
if is_enable_tick(timer_cnt, collect_store_infos_interval) {
-StatsMonitor::collect_store_infos(&mut thread_stats, &scheduler);
+StatsMonitor::collect_store_infos(
+&mut collect_store_infos_thread_stats,
+&scheduler,
+);
}
if is_enable_tick(timer_cnt, load_base_split_check_interval) {
StatsMonitor::load_base_split(
&mut auto_split_controller,
&read_stats_receiver,
+&cpu_stats_receiver,
+&mut load_base_split_thread_stats,
&scheduler,
);
}
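
The comment in the hunk above is the key design note: statistics recorders of this kind are delta-based, so each `record()` call reports usage accumulated since that same recorder's previous call. Two consumers ticking at different intervals would therefore corrupt each other's measurement windows if they shared one instance. A minimal sketch of that property (illustrative only, not the `ThreadInfoStatistics` API):

```rust
use std::time::Instant;

// A delta-based sampler: each `record()` reports the time elapsed since
// *this* sampler's previous call. Sharing one instance between two tasks
// polling at different intervals would split each task's window in two,
// which is why the monitor thread owns one recorder per purpose.
struct DeltaSampler {
    last: Instant,
}

impl DeltaSampler {
    fn new() -> Self {
        DeltaSampler { last: Instant::now() }
    }

    fn record(&mut self) -> u128 {
        let now = Instant::now();
        let elapsed_ms = now.duration_since(self.last).as_millis();
        self.last = now;
        elapsed_ms
    }
}
```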
@@ -602,7 +615,9 @@ where

pub fn load_base_split(
auto_split_controller: &mut AutoSplitController,
-receiver: &Receiver<ReadStats>,
+read_stats_receiver: &Receiver<ReadStats>,
+cpu_stats_receiver: &Receiver<Arc<RawRecords>>,
+thread_stats: &mut ThreadInfoStatistics,
scheduler: &Scheduler<Task<EK, ER>>,
) {
let start_time = TiInstant::now();
@@ -618,11 +633,17 @@ where
}
SplitConfigChange::Noop => {}
}
-let mut others = vec![];
-while let Ok(other) = receiver.try_recv() {
-others.push(other);
+let mut read_stats_vec = vec![];
+while let Ok(read_stats) = read_stats_receiver.try_recv() {
+read_stats_vec.push(read_stats);
}
-let (top, split_infos) = auto_split_controller.flush(others);
+let mut cpu_stats_vec = vec![];
+while let Ok(cpu_stats) = cpu_stats_receiver.try_recv() {
+cpu_stats_vec.push(cpu_stats);
+}
+thread_stats.record();
+let (top_qps, split_infos) =
+auto_split_controller.flush(read_stats_vec, cpu_stats_vec, thread_stats);
auto_split_controller.clear();
let task = Task::AutoSplit { split_infos };
if let Err(e) = scheduler.schedule(task) {
@@ -632,10 +653,10 @@ where
);
}
for i in 0..TOP_N {
-if i < top.len() {
+if i < top_qps.len() {
READ_QPS_TOPN
.with_label_values(&[&i.to_string()])
-.set(top[i] as f64);
+.set(top_qps[i] as f64);
} else {
READ_QPS_TOPN.with_label_values(&[&i.to_string()]).set(0.0);
}
@@ -672,15 +693,22 @@ where
if let Some(h) = self.handle.take() {
drop(self.timer.take());
drop(self.read_stats_sender.take());
+drop(self.cpu_stats_sender.take());
if let Err(e) = h.join() {
error!("join stats collector failed"; "err" => ?e);
}
}
}

-pub fn get_read_stats_sender(&self) -> &Option<Sender<ReadStats>> {
+#[inline(always)]
+fn get_read_stats_sender(&self) -> &Option<Sender<ReadStats>> {
&self.read_stats_sender
}

+#[inline(always)]
+fn get_cpu_stats_sender(&self) -> &Option<Sender<Arc<RawRecords>>> {
+&self.cpu_stats_sender
+}
}

const HOTSPOT_KEY_RATE_THRESHOLD: u64 = 128;
@@ -1684,6 +1712,12 @@ where
// which is the read load portion of the write path.
// TODO: more accurate CPU consumption of a specified region.
fn handle_region_cpu_records(&mut self, records: Arc<RawRecords>) {
+// Send Region CPU info to AutoSplitController inside the stats_monitor.
+if let Some(cpu_stats_sender) = self.stats_monitor.get_cpu_stats_sender() {
+if cpu_stats_sender.send(records.clone()).is_err() {
+warn!("send region cpu info failed, are we shutting down?")
+}
+}
calculate_region_cpu_records(self.store_id, records, &mut self.region_cpu_records);
}
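
`handle_region_cpu_records` now does two things: it forwards the raw records to the `AutoSplitController` via the stats monitor's channel, and it still folds them into the worker's local per-region totals through `calculate_region_cpu_records`. The sketch below models that folding step under simplified, assumed types; the real `RawRecords` layout lives in TiKV's `resource_metering` crate.

```rust
use std::collections::HashMap;

// Assumed, simplified record shape for illustration only.
struct RawRecord {
    region_id: u64,
    cpu_time_ms: u32,
}

// Fold a batch of raw CPU records into a running per-region total,
// mirroring what `calculate_region_cpu_records` does for the PD worker.
fn accumulate_region_cpu(
    records: &[RawRecord],
    region_cpu_records: &mut HashMap<u64, u64>,
) {
    for r in records {
        // A record not attached to any region carries no per-region signal.
        if r.region_id == 0 {
            continue;
        }
        *region_cpu_records.entry(r.region_id).or_insert(0) += u64::from(r.cpu_time_ms);
    }
}
```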

@@ -1831,18 +1865,46 @@ where
if let Ok(Some(region)) =
pd_client.get_region_by_id(split_info.region_id).await
{
-Self::handle_ask_batch_split(
-router.clone(),
-scheduler.clone(),
-pd_client.clone(),
-region,
-vec![split_info.split_key],
-split_info.peer,
-true,
-Callback::None,
-String::from("auto_split"),
-remote.clone(),
-);
+// Try to split the region with the given split key.
+if let Some(split_key) = split_info.split_key {
+Self::handle_ask_batch_split(
+router.clone(),
+scheduler.clone(),
+pd_client.clone(),
+region,
+vec![split_key],
+split_info.peer,
+true,
+Callback::None,
+String::from("auto_split"),
+remote.clone(),
+);
+return;
+}
+// Try to split the region in half within the given key range
+// if no `split_key` is given.
+if split_info.start_key.is_some() && split_info.end_key.is_some() {
+let start_key = split_info.start_key.unwrap();
+let end_key = split_info.end_key.unwrap();
+let region_id = region.get_id();
+let msg = CasualMessage::HalfSplitRegion {
+region_epoch: region.get_region_epoch().clone(),
+start_key: Some(start_key.clone()),
+end_key: Some(end_key.clone()),
+policy: pdpb::CheckPolicy::Scan,
+source: "auto_split",
+cb: Callback::None,
+};
+if let Err(e) = router.send(region_id, PeerMsg::CasualMessage(msg))
+{
+error!("send auto half split request failed";
+"region_id" => region_id,
+"start_key" => log_wrappers::Value::key(&start_key),
+"end_key" => log_wrappers::Value::key(&end_key),
+"err" => ?e,
+);
+}
+}
}
}
};
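
The new auto-split branch prefers an exact `split_key` and only falls back to a half-split scan over the reported hot range when the controller could not compute one. Reduced to its shape, with simplified hypothetical types, the dispatch looks like this:

```rust
// Simplified, hypothetical types: the real SplitInfo and messages live in
// raftstore. The point is the precedence: key first, then range.
struct SplitInfo {
    region_id: u64,
    split_key: Option<Vec<u8>>,
    start_key: Option<Vec<u8>>,
    end_key: Option<Vec<u8>>,
}

enum SplitRequest {
    // Split exactly at the key the controller computed.
    AtKey { region_id: u64, key: Vec<u8> },
    // No key available: scan `[start, end)` and split at its midpoint.
    HalfWithin { region_id: u64, start: Vec<u8>, end: Vec<u8> },
}

fn plan_split(info: SplitInfo) -> Option<SplitRequest> {
    if let Some(key) = info.split_key {
        return Some(SplitRequest::AtKey { region_id: info.region_id, key });
    }
    match (info.start_key, info.end_key) {
        (Some(start), Some(end)) => Some(SplitRequest::HalfWithin {
            region_id: info.region_id,
            start,
            end,
        }),
        // Without a key or a complete range there is nothing to split on.
        _ => None,
    }
}
```

Falling back to `CheckPolicy::Scan` trades precision for robustness: when CPU load is spread across a key range rather than concentrated on a single key, splitting at the range's midpoint still halves the hot region.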
