Raftstore-v2: support split operator #14199

Merged: 4 commits, Feb 14, 2023
4 changes: 4 additions & 0 deletions components/raftstore-v2/src/fsm/peer.rs
@@ -305,6 +305,10 @@ impl<'a, EK: KvEngine, ER: RaftEngine, T: Transport> PeerFsmDelegate<'a, EK, ER,
.peer_mut()
.on_request_split(self.store_ctx, request, ch)
}
PeerMsg::RequestHalfSplit { request, ch } => self
Contributor:
It seems it's used only for test purposes. It would be better to make it test-only.

Member (Author):
I guess it's not for test purposes. The request is launched by pd-ctl.
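For context: PD hands this to TiKV as its split-region scheduling operator (the kind created by pd-ctl's "operator add split-region" command), which comes back in the region heartbeat response and is dispatched by the pd worker change later in this diff. A half split carries no explicit split keys; the split-check worker picks a key near the middle of the region, by scan or by approximate size statistics depending on CheckPolicy. A rough sketch of that idea only, with middle_key as a hypothetical helper rather than TiKV code:

    // Rough illustration of what a half split computes: given a region's keys in
    // sorted order, split at the one closest to the middle. The real split-check
    // worker derives this from scans or size properties, not from a key list.
    fn middle_key(sorted_keys: &[Vec<u8>]) -> Option<&Vec<u8>> {
        if sorted_keys.len() < 2 {
            return None; // nothing to split
        }
        sorted_keys.get(sorted_keys.len() / 2)
    }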

.fsm
.peer_mut()
.on_request_half_split(self.store_ctx, request, ch),
PeerMsg::UpdateRegionSize { size } => {
self.fsm.peer_mut().on_update_region_size(size)
}
4 changes: 2 additions & 2 deletions components/raftstore-v2/src/operation/command/admin/mod.rs
@@ -15,8 +15,8 @@ use raftstore::store::{cmd_resp, fsm::apply, msg::ErrorCallback};
use slog::info;
use split::SplitResult;
pub use split::{
report_split_init_finish, temp_split_path, RequestSplit, SplitFlowControl, SplitInit,
SPLIT_PREFIX,
report_split_init_finish, temp_split_path, RequestHalfSplit, RequestSplit, SplitFlowControl,
SplitInit, SPLIT_PREFIX,
};
use tikv_util::{box_err, log::SlogFormat};
use txn_types::WriteBatchFlags;
60 changes: 59 additions & 1 deletion components/raftstore-v2/src/operation/command/admin/split.rs
@@ -53,7 +53,7 @@ use raftstore::{
},
Result,
};
use slog::{error, info};
use slog::{error, info, warn};
use tikv_util::{log::SlogFormat, slog_panic};

use crate::{
@@ -149,6 +149,15 @@ pub struct RequestSplit {
pub source: Cow<'static, str>,
}

#[derive(Debug)]
pub struct RequestHalfSplit {
Contributor:
If it's only used in tests, it would be better to make it test-only. Check other examples of test-only structures (search testexport).

Member (Author):
Ditto.
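The convention referred to here looks roughly like the following. This is a hypothetical illustration of cfg gating, not code from this PR, and the struct name HalfSplitForTest is made up for the sketch; search the repo for testexport to see the real examples:

    // Hypothetical illustration of test-only gating: the item is compiled only for
    // unit tests or when the testexport feature is enabled, so it never ends up in
    // a normal release build.
    #[cfg(any(test, feature = "testexport"))]
    #[derive(Debug, Default)]
    pub struct HalfSplitForTest {
        pub start_key: Option<Vec<u8>>,
        pub end_key: Option<Vec<u8>>,
    }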

pub epoch: RegionEpoch,
pub start_key: Option<Vec<u8>>,
pub end_key: Option<Vec<u8>>,
pub policy: CheckPolicy,
pub source: Cow<'static, str>,
}

#[derive(Default, Debug)]
pub struct SplitFlowControl {
size_diff_hint: i64,
@@ -280,6 +289,55 @@ impl<EK: KvEngine, ER: RaftEngine> Peer<EK, ER> {
self.ask_batch_split_pd(ctx, rs.split_keys, ch);
}

pub fn on_request_half_split<T>(
&mut self,
ctx: &mut StoreContext<EK, ER, T>,
rhs: RequestHalfSplit,
_ch: CmdResChannel,
) {
let is_key_range = rhs.start_key.is_some() && rhs.end_key.is_some();
info!(
self.logger,
"on half split";
"is_key_range" => is_key_range,
"policy" => ?rhs.policy,
"source" => ?rhs.source,
);
if !self.is_leader() {
// region on this store is no longer leader, skipped.
info!(self.logger, "not leader, skip.");
return;
}

let region = self.region();
if util::is_epoch_stale(&rhs.epoch, region.get_region_epoch()) {
warn!(
self.logger,
"receive a stale halfsplit message";
"is_key_range" => is_key_range,
);
return;
}

let task = SplitCheckTask::split_check_key_range(
region.clone(),
rhs.start_key,
rhs.end_key,
false,
rhs.policy,
// todo: bucket range
None,
);
if let Err(e) = ctx.schedulers.split_check.schedule(task) {
error!(
self.logger,
"failed to schedule split check";
"is_key_range" => is_key_range,
"err" => %e,
);
}
}

pub fn propose_split<T>(
&mut self,
store_ctx: &mut StoreContext<EK, ER, T>,
4 changes: 2 additions & 2 deletions components/raftstore-v2/src/operation/command/mod.rs
@@ -59,8 +59,8 @@ mod control;
mod write;

pub use admin::{
report_split_init_finish, temp_split_path, AdminCmdResult, CompactLogContext, RequestSplit,
SplitFlowControl, SplitInit, SPLIT_PREFIX,
report_split_init_finish, temp_split_path, AdminCmdResult, CompactLogContext, RequestHalfSplit,
RequestSplit, SplitFlowControl, SplitInit, SPLIT_PREFIX,
};
pub use control::ProposalControl;
pub use write::{
2 changes: 1 addition & 1 deletion components/raftstore-v2/src/operation/mod.rs
@@ -9,7 +9,7 @@ mod txn_ext;

pub use command::{
AdminCmdResult, ApplyFlowControl, CommittedEntries, CompactLogContext, ProposalControl,
RequestSplit, SimpleWriteBinary, SimpleWriteEncoder, SimpleWriteReqDecoder,
RequestHalfSplit, RequestSplit, SimpleWriteBinary, SimpleWriteEncoder, SimpleWriteReqDecoder,
SimpleWriteReqEncoder, SplitFlowControl, SPLIT_PREFIX,
};
pub use life::{DestroyProgress, GcPeerContext};
6 changes: 5 additions & 1 deletion components/raftstore-v2/src/router/message.rs
@@ -17,7 +17,7 @@ use super::{
},
ApplyRes,
};
use crate::operation::{RequestSplit, SimpleWriteBinary, SplitInit};
use crate::operation::{RequestHalfSplit, RequestSplit, SimpleWriteBinary, SplitInit};

#[derive(Debug, Clone, Copy, PartialEq, Hash)]
#[repr(u8)]
@@ -185,6 +185,10 @@ pub enum PeerMsg {
request: RequestSplit,
ch: CmdResChannel,
},
RequestHalfSplit {
request: RequestHalfSplit,
ch: CmdResChannel,
},
UpdateRegionSize {
size: u64,
},
49 changes: 46 additions & 3 deletions components/raftstore-v2/src/worker/pd/region.rs
@@ -10,10 +10,14 @@ use pd_client::{
};
use raftstore::store::{ReadStats, WriteStats};
use resource_metering::RawRecords;
use slog::{debug, info};
use slog::{debug, error, info};
use tikv_util::{store::QueryStats, time::UnixSecs};

use super::{requests::*, Runner};
use crate::{
operation::{RequestHalfSplit, RequestSplit},
router::{CmdResChannel, PeerMsg},
};

pub struct RegionHeartbeatTask {
pub term: u64,
@@ -276,8 +280,47 @@ where
);
send_admin_request(&logger, &router, region_id, epoch, peer, req, None);
} else if resp.has_split_region() {
// TODO
info!(logger, "pd asks for split but ignored");
PD_HEARTBEAT_COUNTER_VEC
.with_label_values(&["split region"])
.inc();

let mut split_region = resp.take_split_region();
info!(
logger,
"try to split";
"region_id" => region_id,
"region_epoch" => ?epoch,
);

let (ch, _) = CmdResChannel::pair();
let msg = if split_region.get_policy() == pdpb::CheckPolicy::Usekey {
PeerMsg::RequestSplit {
request: RequestSplit {
epoch,
split_keys: split_region.take_keys().into(),
source: "pd".into(),
},
ch,
}
} else {
PeerMsg::RequestHalfSplit {
request: RequestHalfSplit {
epoch,
start_key: None,
end_key: None,
policy: split_region.get_policy(),
source: "pd".into(),
},
ch,
}
};
if let Err(e) = router.send(region_id, msg) {
error!(logger,
"send split request failed";
"region_id" => region_id,
"err" => ?e
);
}
} else if resp.has_merge() {
// TODO
info!(logger, "pd asks for merge but ignored");
38 changes: 18 additions & 20 deletions tests/integrations/raftstore/test_split_region.rs
@@ -304,7 +304,7 @@ fn check_cluster(cluster: &mut Cluster<impl Simulator>, k: &[u8], v: &[u8], all_
/// sure broadcast commit is disabled when split.
#[test]
fn test_delay_split_region() {
let mut cluster = test_raftstore::new_server_cluster(0, 3);
let mut cluster = new_server_cluster(0, 3);
cluster.cfg.raft_store.raft_log_gc_count_limit = Some(500);
cluster.cfg.raft_store.merge_max_log_gap = 100;
cluster.cfg.raft_store.raft_log_gc_threshold = 500;
Expand Down Expand Up @@ -411,9 +411,7 @@ fn test_node_split_overlap_snapshot() {
must_get_equal(&engine3, b"k3", b"v3");
}

fn test_apply_new_version_snapshot<T: test_raftstore::Simulator>(
cluster: &mut test_raftstore::Cluster<T>,
) {
fn test_apply_new_version_snapshot<T: Simulator>(cluster: &mut Cluster<T>) {
// truncate the log quickly so that we can force sending snapshot.
cluster.cfg.raft_store.raft_log_gc_tick_interval = ReadableDuration::millis(20);
cluster.cfg.raft_store.raft_log_gc_count_limit = Some(5);
Expand Down Expand Up @@ -468,19 +466,19 @@ fn test_apply_new_version_snapshot<T: test_raftstore::Simulator>(

#[test]
fn test_node_apply_new_version_snapshot() {
let mut cluster = test_raftstore::new_node_cluster(0, 3);
let mut cluster = new_node_cluster(0, 3);
test_apply_new_version_snapshot(&mut cluster);
}

#[test]
fn test_server_apply_new_version_snapshot() {
let mut cluster = test_raftstore::new_server_cluster(0, 3);
let mut cluster = new_server_cluster(0, 3);
test_apply_new_version_snapshot(&mut cluster);
}

#[test]
fn test_server_split_with_stale_peer() {
let mut cluster = test_raftstore::new_server_cluster(0, 3);
let mut cluster = new_server_cluster(0, 3);
// disable raft log gc.
cluster.cfg.raft_store.raft_log_gc_tick_interval = ReadableDuration::secs(60);
cluster.cfg.raft_store.peer_stale_state_check_interval = ReadableDuration::millis(500);
Expand Down Expand Up @@ -604,7 +602,7 @@ fn test_split_region_diff_check() {
#[test]
fn test_node_split_region_after_reboot_with_config_change() {
let count = 1;
let mut cluster = test_raftstore::new_server_cluster(0, count);
let mut cluster = new_server_cluster(0, count);
let region_max_size = 2000;
let region_split_size = 2000;
cluster.cfg.raft_store.split_region_check_tick_interval = ReadableDuration::millis(50);
Expand Down Expand Up @@ -645,10 +643,7 @@ fn test_node_split_region_after_reboot_with_config_change() {
}
}

fn test_split_epoch_not_match<T: test_raftstore::Simulator>(
cluster: &mut test_raftstore::Cluster<T>,
right_derive: bool,
) {
fn test_split_epoch_not_match<T: Simulator>(cluster: &mut Cluster<T>, right_derive: bool) {
cluster.cfg.raft_store.right_derive_when_split = right_derive;
cluster.run();
let pd_client = Arc::clone(&cluster.pd_client);
Expand Down Expand Up @@ -720,25 +715,25 @@ fn test_split_epoch_not_match<T: test_raftstore::Simulator>(

#[test]
fn test_server_split_epoch_not_match_left_derive() {
let mut cluster = test_raftstore::new_server_cluster(0, 3);
let mut cluster = new_server_cluster(0, 3);
test_split_epoch_not_match(&mut cluster, false);
}

#[test]
fn test_server_split_epoch_not_match_right_derive() {
let mut cluster = test_raftstore::new_server_cluster(0, 3);
let mut cluster = new_server_cluster(0, 3);
test_split_epoch_not_match(&mut cluster, true);
}

#[test]
fn test_node_split_epoch_not_match_left_derive() {
let mut cluster = test_raftstore::new_node_cluster(0, 3);
let mut cluster = new_node_cluster(0, 3);
test_split_epoch_not_match(&mut cluster, false);
}

#[test]
fn test_node_split_epoch_not_match_right_derive() {
let mut cluster = test_raftstore::new_node_cluster(0, 3);
let mut cluster = new_node_cluster(0, 3);
test_split_epoch_not_match(&mut cluster, true);
}

@@ -780,10 +775,13 @@ fn test_node_quick_election_after_split() {
assert!(new_leader.is_some());
}

#[test]
#[test_case(test_raftstore::new_node_cluster)]
#[test_case(test_raftstore::new_server_cluster)]
#[test_case(test_raftstore_v2::new_node_cluster)]
#[test_case(test_raftstore_v2::new_server_cluster)]
fn test_node_split_region() {
let count = 5;
let mut cluster = test_raftstore::new_node_cluster(0, count);
let mut cluster = new_cluster(0, count);
// length of each key+value
let item_len = 74;
// make bucket's size to item_len, which means one row one bucket
@@ -988,7 +986,7 @@ fn test_split_with_in_memory_pessimistic_locks() {
#[test]
fn test_refresh_region_bucket_keys() {
let count = 5;
let mut cluster = test_raftstore::new_server_cluster(0, count);
let mut cluster = new_server_cluster(0, count);
cluster.run();
let pd_client = Arc::clone(&cluster.pd_client);

@@ -1174,7 +1172,7 @@ fn test_refresh_region_bucket_keys() {
#[test]
fn test_gen_split_check_bucket_ranges() {
let count = 5;
let mut cluster = test_raftstore::new_server_cluster(0, count);
let mut cluster = new_server_cluster(0, count);
cluster.cfg.coprocessor.region_bucket_size = ReadableSize(5);
cluster.cfg.coprocessor.enable_region_bucket = true;
// disable report buckets; as it will reset the user traffic stats to randomize