Skip to content

Commit

Permalink
modify persist state to bool
Browse files Browse the repository at this point in the history
Signed-off-by: husharp <jinhao.hu@pingcap.com>
  • Loading branch information
HuSharp committed Sep 29, 2022
1 parent c29d604 commit 34a281d
Show file tree
Hide file tree
Showing 9 changed files with 57 additions and 57 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

48 changes: 24 additions & 24 deletions components/raftstore/src/store/fsm/apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,7 @@ use kvproto::{
AdminCmdType, AdminRequest, AdminResponse, ChangePeerRequest, CmdType, CommitMergeRequest,
RaftCmdRequest, RaftCmdResponse, Request,
},
raft_serverpb::{
FlashbackState, MergeState, PeerState, RaftApplyState, RaftTruncatedState, RegionLocalState,
},
raft_serverpb::{MergeState, PeerState, RaftApplyState, RaftTruncatedState, RegionLocalState},
};
use pd_client::{new_bucket_stats, BucketMeta, BucketStat};
use prometheus::local::LocalHistogram;
Expand Down Expand Up @@ -258,8 +256,8 @@ pub enum ExecResult<S> {
region: Region,
commit: u64,
},
SetFlashbackState {
state: FlashbackState,
IsInFlashback {
is_in_flashback: bool,
},
ComputeHash {
region: Region,
Expand Down Expand Up @@ -1419,7 +1417,7 @@ where
| ExecResult::DeleteRange { .. }
| ExecResult::IngestSst { .. }
| ExecResult::TransferLeader { .. }
| ExecResult::SetFlashbackState { .. } => {}
| ExecResult::IsInFlashback { .. } => {}
ExecResult::SplitRegion { ref derived, .. } => {
self.region = derived.clone();
self.metrics.size_diff_hint = 0;
Expand Down Expand Up @@ -1553,7 +1551,10 @@ where
AdminCmdType::PrepareMerge => self.exec_prepare_merge(ctx, request),
AdminCmdType::CommitMerge => self.exec_commit_merge(ctx, request),
AdminCmdType::RollbackMerge => self.exec_rollback_merge(ctx, request),
AdminCmdType::SetFlashbackState => self.set_flashback_state(ctx, request),
AdminCmdType::PrepareFlashback | AdminCmdType::FinishFlashback => {
self.set_flashback_state_persist(ctx, request)
}

AdminCmdType::InvalidAdmin => Err(box_err!("unsupported admin command type")),
}?;
response.set_cmd_type(cmd_type);
Expand Down Expand Up @@ -2060,7 +2061,7 @@ where
} else {
PeerState::Normal
};
if let Err(e) = write_peer_state(ctx.kv_wb_mut(), &region, state, None, None) {
if let Err(e) = write_peer_state(ctx.kv_wb_mut(), &region, state, None, false) {
panic!("{} failed to update region state: {:?}", self.tag, e);
}

Expand Down Expand Up @@ -2105,7 +2106,7 @@ where
PeerState::Normal
};

if let Err(e) = write_peer_state(ctx.kv_wb_mut(), &region, state, None, None) {
if let Err(e) = write_peer_state(ctx.kv_wb_mut(), &region, state, None, false) {
panic!("{} failed to update region state: {:?}", self.tag, e);
}

Expand Down Expand Up @@ -2509,7 +2510,7 @@ where
);
continue;
}
write_peer_state(kv_wb_mut, new_region, PeerState::Normal, None, None)
write_peer_state(kv_wb_mut, new_region, PeerState::Normal, None, false)
.and_then(|_| write_initial_apply_state(kv_wb_mut, new_region.get_id()))
.unwrap_or_else(|e| {
panic!(
Expand All @@ -2518,7 +2519,7 @@ where
)
});
}
write_peer_state(kv_wb_mut, &derived, PeerState::Normal, None, None).unwrap_or_else(|e| {
write_peer_state(kv_wb_mut, &derived, PeerState::Normal, None, false).unwrap_or_else(|e| {
panic!("{} fails to update region {:?}: {:?}", self.tag, derived, e)
});
let mut resp = AdminResponse::default();
Expand Down Expand Up @@ -2585,7 +2586,7 @@ where
&region,
PeerState::Merging,
Some(merging_state.clone()),
None,
false,
)
.unwrap_or_else(|e| {
panic!(
Expand Down Expand Up @@ -2724,7 +2725,7 @@ where
region.set_start_key(source_region.get_start_key().to_vec());
}
let kv_wb_mut = ctx.kv_wb_mut();
write_peer_state(kv_wb_mut, &region, PeerState::Normal, None, None)
write_peer_state(kv_wb_mut, &region, PeerState::Normal, None, false)
.and_then(|_| {
// TODO: maybe all information needs to be filled?
let mut merging_state = MergeState::default();
Expand All @@ -2734,7 +2735,7 @@ where
source_region,
PeerState::Tombstone,
Some(merging_state),
Some(state.get_flashback_state()),
state.get_is_in_flashback(),
)
})
.unwrap_or_else(|e| {
Expand Down Expand Up @@ -2782,7 +2783,7 @@ where
let version = region.get_region_epoch().get_version();
// Update version to avoid duplicated rollback requests.
region.mut_region_epoch().set_version(version + 1);
write_peer_state(ctx.kv_wb_mut(), &region, PeerState::Normal, None, None).unwrap_or_else(
write_peer_state(ctx.kv_wb_mut(), &region, PeerState::Normal, None, false).unwrap_or_else(
|e| {
panic!(
"{} failed to rollback merge {:?}: {:?}",
Expand All @@ -2802,7 +2803,7 @@ where
))
}

fn set_flashback_state(
fn set_flashback_state_persist(
&self,
ctx: &mut ApplyContext<EK>,
req: &AdminRequest,
Expand All @@ -2818,23 +2819,22 @@ where
return Err(box_err!("failed to get region state of {}", region_id));
}
};
let state = req.get_set_flashback().get_state();
old_state.set_flashback_state(state);
let shouild_in_flashback = req.get_cmd_type() == AdminCmdType::PrepareFlashback;
old_state.set_is_in_flashback(shouild_in_flashback);
ctx.kv_wb_mut()
.put_msg_cf(CF_RAFT, &keys::region_state_key(region_id), &old_state)
.unwrap_or_else(|e| {
error!(
"{} failed to exec flashback state {:?} for region {}: {:?}",
self.tag,
state,
region_id,
e
"{} failed to change flashback state to {:?} for region {}: {:?}",
self.tag, req, region_id, e
)
});

Ok((
AdminResponse::default(),
ApplyResult::Res(ExecResult::SetFlashbackState { state }),
ApplyResult::Res(ExecResult::IsInFlashback {
is_in_flashback: shouild_in_flashback,
}),
))
}

Expand Down
37 changes: 18 additions & 19 deletions components/raftstore/src/store/fsm/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ use kvproto::{
StatusCmdType, StatusResponse,
},
raft_serverpb::{
ExtraMessage, ExtraMessageType, FlashbackState, MergeState, PeerState, RaftMessage,
RaftSnapshotData, RaftTruncatedState, RegionLocalState,
ExtraMessage, ExtraMessageType, MergeState, PeerState, RaftMessage, RaftSnapshotData,
RaftTruncatedState, RegionLocalState,
},
replication_modepb::{DrAutoSyncState, ReplicationMode},
};
Expand Down Expand Up @@ -1012,11 +1012,11 @@ where
.unwrap()
{
info!(
"Prepare Flashback, already has a flashback local state:{:?} for {:?} ",
target_state.get_flashback_state(),
"Prepare Flashback, a flashback persist state is: {} for {:?} ",
target_state.get_is_in_flashback(),
self.region(),
);
if target_state.get_flashback_state() == FlashbackState::NotStart {
if !target_state.get_is_in_flashback() {
// callback itself to wait for the new change to be applied completely.
let raft_router_clone = self.ctx.router.clone();
let ch_clone = ch.clone();
Expand All @@ -1036,8 +1036,7 @@ where
let req = {
let mut request = new_admin_request(region_id, self.fsm.peer.peer.clone());
let mut admin = AdminRequest::default();
admin.set_cmd_type(AdminCmdType::SetFlashbackState);
admin.mut_set_flashback().set_state(FlashbackState::Prepare);
admin.set_cmd_type(AdminCmdType::PrepareFlashback);
request.set_admin_request(admin);
request
};
Expand Down Expand Up @@ -1068,18 +1067,18 @@ where
ch_clone.send(false).unwrap();
error!("send flashback prepare msg failed"; "region_id" => region_id);
}
}),
None,
Some(Box::new(move || {
ch.send(true).unwrap();
})));
}),
None,
Some(Box::new(move || {
ch.send(true).unwrap();
})),
);
self.fsm.peer.flashback_state.take();

let req = {
let mut request = new_admin_request(self.region().get_id(), self.fsm.peer.peer.clone());
let mut admin = AdminRequest::default();
admin.set_cmd_type(AdminCmdType::SetFlashbackState);
admin.mut_set_flashback().set_state(FlashbackState::Finish);
admin.set_cmd_type(AdminCmdType::FinishFlashback);
request.set_admin_request(admin);
request
};
Expand Down Expand Up @@ -4853,8 +4852,8 @@ where
}
ExecResult::IngestSst { ssts } => self.on_ingest_sst_result(ssts),
ExecResult::TransferLeader { term } => self.on_transfer_leader(term),
ExecResult::SetFlashbackState { state } => {
self.on_ready_check_flashback_state(state)
ExecResult::IsInFlashback { is_in_flashback } => {
self.on_ready_check_flashback_persist(is_in_flashback)
}
}
}
Expand Down Expand Up @@ -6198,7 +6197,7 @@ where
self.fsm.has_ready = true;
}

fn on_ready_check_flashback_state(&mut self, state: FlashbackState) {
fn on_ready_check_flashback_persist(&mut self, should_in_flashback: bool) {
let state_key = keys::region_state_key(self.region().get_id());
if let Some(target_state) = self
.ctx
Expand All @@ -6207,9 +6206,9 @@ where
.get_msg_cf::<RegionLocalState>(CF_RAFT, &state_key)
.unwrap()
{
if target_state.get_flashback_state() != state {
if target_state.get_is_in_flashback() != should_in_flashback {
panic!(
"Prepare Flashback, already has a local state for {:?} but not a finish state",
"Check for Flashback whether in persist for {:?}",
self.region(),
);
}
Expand Down
2 changes: 1 addition & 1 deletion components/raftstore/src/store/fsm/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2875,7 +2875,7 @@ impl<'a, EK: KvEngine, ER: RaftEngine, T: Transport> StoreFsmDelegate<'a, EK, ER
}
let mut kv_wb = self.ctx.engines.kv.write_batch();
if let Err(e) =
peer_storage::write_peer_state(&mut kv_wb, &region, PeerState::Normal, None, None)
peer_storage::write_peer_state(&mut kv_wb, &region, PeerState::Normal, None, false)
{
panic!(
"Unsafe recovery, fail to add peer state for {:?} into write batch, the error is {:?}",
Expand Down
2 changes: 1 addition & 1 deletion components/raftstore/src/store/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1489,7 +1489,7 @@ where
} else {
None
},
None,
false,
)?;

// write kv rocksdb first in case of restart happen between two write
Expand Down
13 changes: 6 additions & 7 deletions components/raftstore/src/store/peer_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ use keys::{self, enc_end_key, enc_start_key};
use kvproto::{
metapb::{self, Region},
raft_serverpb::{
FlashbackState, MergeState, PeerState, RaftApplyState, RaftLocalState, RaftSnapshotData,
RegionLocalState,
MergeState, PeerState, RaftApplyState, RaftLocalState, RaftSnapshotData, RegionLocalState,
},
};
use protobuf::Message;
Expand Down Expand Up @@ -593,9 +592,9 @@ where
}
// Write its source peers' `RegionLocalState` together with itself for atomicity
for r in destroy_regions {
write_peer_state(kv_wb, r, PeerState::Tombstone, None, None)?;
write_peer_state(kv_wb, r, PeerState::Tombstone, None, false)?;
}
write_peer_state(kv_wb, &region, PeerState::Applying, None, None)?;
write_peer_state(kv_wb, &region, PeerState::Applying, None, false)?;

let last_index = snap.get_metadata().get_index();

Expand Down Expand Up @@ -1088,7 +1087,7 @@ pub fn write_peer_state<T: Mutable>(
region: &metapb::Region,
state: PeerState,
merge_state: Option<MergeState>,
flashback_state: Option<FlashbackState>,
is_in_flashback: bool,
) -> Result<()> {
let region_id = region.get_id();
let mut region_state = RegionLocalState::default();
Expand All @@ -1097,8 +1096,8 @@ pub fn write_peer_state<T: Mutable>(
if let Some(state) = merge_state {
region_state.set_merge_state(state);
}
if let Some(state) = flashback_state {
region_state.set_flashback_state(state);
if is_in_flashback {
region_state.set_is_in_flashback(is_in_flashback);
}

debug!(
Expand Down
4 changes: 3 additions & 1 deletion components/raftstore/src/store/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,9 @@ pub fn admin_cmd_epoch_lookup(admin_cmp_type: AdminCmdType) -> AdminCmdEpochStat
AdminCmdType::RollbackMerge => AdminCmdEpochState::new(true, true, true, false),
// Transfer leader
AdminCmdType::TransferLeader => AdminCmdEpochState::new(true, true, false, false),
AdminCmdType::SetFlashbackState => AdminCmdEpochState::new(false, false, false, false)
AdminCmdType::PrepareFlashback | AdminCmdType::FinishFlashback => {
AdminCmdEpochState::new(false, false, false, false)
}
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/server/debug.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ impl<ER: RaftEngine> Debugger<ER> {
continue;
}
let region = &region_state.get_region();
write_peer_state(&mut wb, region, PeerState::Tombstone, None, None).unwrap();
write_peer_state(&mut wb, region, PeerState::Tombstone, None, false).unwrap();
}

let mut write_opts = WriteOptions::new();
Expand Down Expand Up @@ -1369,7 +1369,7 @@ fn set_region_tombstone(
&region,
PeerState::Tombstone,
None,
None
false
));
Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion tests/integrations/raftstore/test_flashback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ fn test_flahsback_for_local_read() {
assert_eq!(state.get_last_index(), last_index + 1);

block_on(cluster.call_finish_flashback(region.get_id(), store_id));

let state = cluster.raft_local_state(region.get_id(), store_id);
assert_eq!(state.get_last_index(), last_index + 2);

Expand Down

0 comments on commit 34a281d

Please sign in to comment.