[raftstore-v2]: check apply_scheduler before using in on_refresh_region_buckets (#14526)

close #14506

Check apply_scheduler before using it in on_refresh_region_buckets. This fixes a race in which a peer has just been created by a split and a refresh-bucket request arrives before the peer's apply scheduler has been initialized.

Signed-off-by: tonyxuqqi <tonyxuqi@outlook.com>

Co-authored-by: buffer <doufuxiaowangzi@gmail.com>
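The shape of the fix, reduced to a standalone sketch: the apply scheduler is installed asynchronously after the peer is created, so any handler that can run in that window has to treat it as optional. The types below (Peer, RefreshBucketStat, a plain mpsc Sender) are hypothetical stand-ins, not the actual raftstore-v2 definitions shown in the diffs further down.

use std::sync::mpsc::Sender;

// Placeholder for the bucket-refresh task sent to the apply system.
struct RefreshBucketStat;

struct Peer {
    // Populated by set_apply_scheduler() some time after the peer is created,
    // so it can still be None when an early refresh-bucket request arrives.
    apply_scheduler: Option<Sender<RefreshBucketStat>>,
}

impl Peer {
    fn on_refresh_region_buckets(&self) {
        // Before the fix this was effectively an unconditional unwrap(), which
        // panicked when the refresh raced with peer creation. Guarding with
        // `if let` simply skips the send; a later refresh can deliver the stats.
        if let Some(scheduler) = &self.apply_scheduler {
            let _ = scheduler.send(RefreshBucketStat);
        }
    }
}

fn main() {
    let peer = Peer { apply_scheduler: None };
    peer.on_refresh_region_buckets(); // no panic even though the scheduler is unset
}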
tonyxuqqi and bufferflies committed Apr 8, 2023
1 parent 4199ed9 commit abb672b
Showing 8 changed files with 132 additions and 10 deletions.
16 changes: 9 additions & 7 deletions components/raftstore-v2/src/operation/bucket.rs
@@ -254,14 +254,16 @@ impl<EK: KvEngine, ER: RaftEngine> Peer<EK, ER> {
         let meta = region_buckets.meta.clone();
         self.region_buckets_info_mut()
             .set_bucket_stat(Some(region_buckets.clone()));
-
-        let mut store_meta = store_ctx.store_meta.lock().unwrap();
-        if let Some(reader) = store_meta.readers.get_mut(&self.region_id()) {
-            reader.0.update(ReadProgress::region_buckets(meta));
+        {
+            let mut store_meta = store_ctx.store_meta.lock().unwrap();
+            if let Some(reader) = store_meta.readers.get_mut(&self.region_id()) {
+                reader.0.update(ReadProgress::region_buckets(meta));
+            }
+        }
+        // it's possible that apply_scheduler is not initialized yet
+        if let Some(apply_scheduler) = self.apply_scheduler() {
+            apply_scheduler.send(ApplyTask::RefreshBucketStat(region_buckets.meta.clone()));
         }
-        self.apply_scheduler()
-            .unwrap()
-            .send(ApplyTask::RefreshBucketStat(region_buckets.meta.clone()));
     }
 
     #[inline]
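Beyond the Option guard, the diff also moves the store_meta update into its own block, presumably so the mutex guard is dropped before the task is handed to the apply scheduler rather than being held across the send. A minimal sketch of that scoping idiom, with hypothetical names rather than the TiKV types:

use std::sync::Mutex;

fn update_then_notify(meta: &Mutex<Vec<u64>>, notify: impl Fn()) {
    {
        // The guard is dropped at the end of this inner block...
        let mut readers = meta.lock().unwrap();
        readers.push(1);
    }
    // ...so the lock is no longer held while other components are notified.
    notify();
}

fn main() {
    let meta = Mutex::new(Vec::new());
    update_then_notify(&meta, || println!("notified"));
}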
1 change: 1 addition & 0 deletions components/raftstore-v2/src/operation/command/mod.rs
@@ -148,6 +148,7 @@ impl<EK: KvEngine, ER: RaftEngine> Peer<EK, ER> {
             .apply_pool
             .spawn(async move { apply_fsm.handle_all_tasks().await })
             .unwrap();
+        fail::fail_point!("delay_set_apply_scheduler", |_| {});
         self.set_apply_scheduler(apply_scheduler);
     }
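The fail_point! added above is a no-op unless a test configures an action for it; the new failpoint test below enables it with fail::cfg. A rough sketch of how the two halves of the fail crate pair up (hypothetical function names; requires the crate's failpoints feature):

fn set_apply_scheduler_slowly() {
    // In the code under test: does nothing unless the failpoint is configured.
    fail::fail_point!("delay_set_apply_scheduler", |_| {});
    // ... install the apply scheduler ...
}

#[test]
fn delayed_scheduler_case() {
    // In the test: make the failpoint sleep for one second, then clean up.
    fail::cfg("delay_set_apply_scheduler", "sleep(1000)").unwrap();
    set_apply_scheduler_slowly(); // now takes roughly a second
    fail::remove("delay_set_apply_scheduler");
}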
3 changes: 3 additions & 0 deletions components/raftstore-v2/src/operation/query/mod.rs
@@ -401,6 +401,9 @@ impl<EK: KvEngine, ER: RaftEngine> Peer<EK, ER> {
             .raft_log
             .term(meta.raft_apply.commit_index)
             .unwrap();
+        if let Some(bucket_stats) = self.region_buckets_info().bucket_stat() {
+            meta.bucket_keys = bucket_stats.meta.keys.clone();
+        }
         debug!(self.logger, "on query debug info";
             "tick" => self.raft_group().raft.election_elapsed,
             "election_timeout" => self.raft_group().raft.randomized_election_timeout(),
1 change: 1 addition & 0 deletions components/raftstore-v2/tests/failpoints/mod.rs
@@ -10,6 +10,7 @@
 mod cluster;
 mod test_basic_write;
 mod test_bootstrap;
+mod test_bucket;
 mod test_life;
 mod test_merge;
 mod test_split;
58 changes: 58 additions & 0 deletions components/raftstore-v2/tests/failpoints/test_bucket.rs
@@ -0,0 +1,58 @@
// Copyright 2023 TiKV Project Authors. Licensed under Apache-2.0.

use std::time::Duration;

use engine_traits::RaftEngineReadOnly;
use raftstore::store::RAFT_INIT_LOG_INDEX;
use tikv_util::store::new_peer;

use crate::cluster::{split_helper::split_region_and_refresh_bucket, Cluster};

/// Test refresh bucket.
#[test]
fn test_refresh_bucket() {
    let mut cluster = Cluster::default();
    let store_id = cluster.node(0).id();
    let raft_engine = cluster.node(0).running_state().unwrap().raft_engine.clone();
    let router = &mut cluster.routers[0];

    let region_2 = 2;
    let region = router.region_detail(region_2);
    let peer = region.get_peers()[0].clone();
    router.wait_applied_to_current_term(region_2, Duration::from_secs(3));

    // Region 2 ["", ""]
    //    -> Region 2 ["", "k22"]
    //       Region 1000 ["k22", ""] peer(1, 10)
    let region_state = raft_engine
        .get_region_state(region_2, u64::MAX)
        .unwrap()
        .unwrap();
    assert_eq!(region_state.get_tablet_index(), RAFT_INIT_LOG_INDEX);

    // to simulate the delay of set_apply_scheduler
    fail::cfg("delay_set_apply_scheduler", "sleep(1000)").unwrap();
    split_region_and_refresh_bucket(
        router,
        region,
        peer,
        1000,
        new_peer(store_id, 10),
        b"k22",
        false,
    );

    for _i in 1..100 {
        std::thread::sleep(Duration::from_millis(50));
        let meta = router
            .must_query_debug_info(1000, Duration::from_secs(1))
            .unwrap();
        if !meta.bucket_keys.is_empty() {
            assert_eq!(meta.bucket_keys.len(), 4); // include region start/end keys
            assert_eq!(meta.bucket_keys[1], b"1".to_vec());
            assert_eq!(meta.bucket_keys[2], b"2".to_vec());
            return;
        }
    }
    panic!("timeout for updating buckets"); // timeout
}
60 changes: 57 additions & 3 deletions components/raftstore-v2/tests/integrations/cluster.rs
@@ -31,11 +31,11 @@ use kvproto::{
 use pd_client::RpcClient;
 use raft::eraftpb::MessageType;
 use raftstore::{
-    coprocessor::{Config as CopConfig, CoprocessorHost},
+    coprocessor::{Config as CopConfig, CoprocessorHost, StoreHandle},
     store::{
         region_meta::{RegionLocalState, RegionMeta},
-        AutoSplitController, Config, RegionSnapshot, TabletSnapKey, TabletSnapManager, Transport,
-        RAFT_INIT_LOG_INDEX,
+        AutoSplitController, Bucket, Config, RegionSnapshot, TabletSnapKey, TabletSnapManager,
+        Transport, RAFT_INIT_LOG_INDEX,
     },
 };
 use raftstore_v2::{
@@ -232,6 +232,11 @@ impl TestRouter {
         }
         region
     }
+
+    pub fn refresh_bucket(&self, region_id: u64, region_epoch: RegionEpoch, buckets: Vec<Bucket>) {
+        self.store_router()
+            .refresh_region_buckets(region_id, region_epoch, buckets, None);
+    }
 }
 
 pub struct RunningState {
@@ -653,6 +658,7 @@ pub mod split_helper {
         metapb, pdpb,
         raft_cmdpb::{AdminCmdType, AdminRequest, RaftCmdRequest, RaftCmdResponse, SplitRequest},
     };
+    use raftstore::store::Bucket;
     use raftstore_v2::{router::PeerMsg, SimpleWriteEncoder};
 
     use super::TestRouter;
@@ -760,6 +766,54 @@ pub mod split_helper {
 
         (left, right)
     }
+
+    // Split the region and refresh bucket immediately
+    // This is to simulate the case when the splitted peer's storage is not
+    // initialized yet when refresh bucket happens
+    pub fn split_region_and_refresh_bucket(
+        router: &mut TestRouter,
+        region: metapb::Region,
+        peer: metapb::Peer,
+        split_region_id: u64,
+        split_peer: metapb::Peer,
+        propose_key: &[u8],
+        right_derive: bool,
+    ) {
+        let region_id = region.id;
+        let mut req = RaftCmdRequest::default();
+        req.mut_header().set_region_id(region_id);
+        req.mut_header()
+            .set_region_epoch(region.get_region_epoch().clone());
+        req.mut_header().set_peer(peer);
+
+        let mut split_id = pdpb::SplitId::new();
+        split_id.new_region_id = split_region_id;
+        split_id.new_peer_ids = vec![split_peer.id];
+        let admin_req = new_batch_split_region_request(
+            vec![propose_key.to_vec()],
+            vec![split_id],
+            right_derive,
+        );
+        req.mut_requests().clear();
+        req.set_admin_request(admin_req);
+
+        let (msg, sub) = PeerMsg::admin_command(req);
+        router.send(region_id, msg).unwrap();
+        block_on(sub.result()).unwrap();
+
+        let meta = router
+            .must_query_debug_info(split_region_id, Duration::from_secs(1))
+            .unwrap();
+        let epoch = &meta.region_state.epoch;
+        let buckets = vec![Bucket {
+            keys: vec![b"1".to_vec(), b"2".to_vec()],
+            size: 100,
+        }];
+        let mut region_epoch = kvproto::metapb::RegionEpoch::default();
+        region_epoch.set_conf_ver(epoch.conf_ver);
+        region_epoch.set_version(epoch.version);
+        router.refresh_bucket(split_region_id, region_epoch, buckets);
+    }
 }
 
 pub mod merge_helper {
1 change: 1 addition & 0 deletions components/raftstore-v2/tests/integrations/mod.rs
@@ -7,6 +7,7 @@
 
 // TODO: test conflict control in integration tests after split is supported.
 
+#[allow(dead_code)]
 mod cluster;
 mod test_basic_write;
 mod test_conf_change;
2 changes: 2 additions & 0 deletions components/raftstore/src/store/region_meta.rs
@@ -246,6 +246,7 @@ pub struct RegionMeta {
     pub raft_status: RaftStatus,
     pub raft_apply: RaftApplyState,
     pub region_state: RegionLocalState,
+    pub bucket_keys: Vec<Vec<u8>>,
 }
 
 impl RegionMeta {
@@ -308,6 +309,7 @@ impl RegionMeta {
                 }),
                 tablet_index: local_state.get_tablet_index(),
             },
+            bucket_keys: vec![],
         }
     }
 }
