Skip to content

Commit

Permalink
address comment
Browse files Browse the repository at this point in the history
Signed-off-by: husharp <jinhao.hu@pingcap.com>
  • Loading branch information
HuSharp committed Sep 29, 2022
1 parent 87f1db0 commit 7a62778
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 21 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 8 additions & 7 deletions components/raftstore/src/store/fsm/apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1553,10 +1553,7 @@ where
AdminCmdType::PrepareMerge => self.exec_prepare_merge(ctx, request),
AdminCmdType::CommitMerge => self.exec_commit_merge(ctx, request),
AdminCmdType::RollbackMerge => self.exec_rollback_merge(ctx, request),
AdminCmdType::PrepareFlashback => {
self.set_flashback_state(ctx, FlashbackState::Processing)
}
AdminCmdType::FinishFlashback => self.set_flashback_state(ctx, FlashbackState::Finish),
AdminCmdType::SetFlashbackState => self.set_flashback_state(ctx, request),
AdminCmdType::InvalidAdmin => Err(box_err!("unsupported admin command type")),
}?;
response.set_cmd_type(cmd_type);
Expand Down Expand Up @@ -2808,7 +2805,7 @@ where
fn set_flashback_state(
&self,
ctx: &mut ApplyContext<EK>,
state: FlashbackState,
req: &AdminRequest,
) -> Result<(AdminResponse, ApplyResult<EK::Snapshot>)> {
let region_id = self.region_id();
let region_state_key = keys::region_state_key(region_id);
Expand All @@ -2821,13 +2818,17 @@ where
return Err(box_err!("failed to get region state of {}", region_id));
}
};
let state = req.get_set_flashback().get_state();
old_state.set_flashback_state(state);
ctx.kv_wb_mut()
.put_msg_cf(CF_RAFT, &keys::region_state_key(region_id), &old_state)
.unwrap_or_else(|e| {
panic!(
error!(
"{} failed to exec flashback state {:?} for region {}: {:?}",
self.tag, state, region_id, e
self.tag,
state,
region_id,
e
)
});

Expand Down
25 changes: 16 additions & 9 deletions components/raftstore/src/store/fsm/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1016,7 +1016,7 @@ where
target_state.get_flashback_state(),
self.region(),
);
if target_state.get_flashback_state() == FlashbackState::Prepare {
if target_state.get_flashback_state() == FlashbackState::NotStart {
// callback itself to wait for the new change to be applied completely.
let raft_router_clone = self.ctx.router.clone();
let ch_clone = ch.clone();
Expand All @@ -1036,7 +1036,8 @@ where
let req = {
let mut request = new_admin_request(region_id, self.fsm.peer.peer.clone());
let mut admin = AdminRequest::default();
admin.set_cmd_type(AdminCmdType::PrepareFlashback);
admin.set_cmd_type(AdminCmdType::SetFlashbackState);
admin.mut_set_flashback().set_state(FlashbackState::Prepare);
request.set_admin_request(admin);
request
};
Expand All @@ -1060,19 +1061,25 @@ where
"region_id" => region_id,
"peer_id" => self.fsm.peer.peer_id(),
);
let cb = Callback::write(Box::new(move |resp| {
if resp.response.get_header().has_error() {
ch.send(false).unwrap();
error!("send flashback finish msg failed"; "region_id" => region_id);
}
let ch_clone = ch.clone();
let cb = Callback::write_ext(
Box::new(move |resp| {
if resp.response.get_header().has_error() {
ch_clone.send(false).unwrap();
error!("send flashback prepare msg failed"; "region_id" => region_id);
}
}),
None,
Some(Box::new(move || {
ch.send(true).unwrap();
}));
})));
self.fsm.peer.flashback_state.take();

let req = {
let mut request = new_admin_request(self.region().get_id(), self.fsm.peer.peer.clone());
let mut admin = AdminRequest::default();
admin.set_cmd_type(AdminCmdType::FinishFlashback);
admin.set_cmd_type(AdminCmdType::SetFlashbackState);
admin.mut_set_flashback().set_state(FlashbackState::Finish);
request.set_admin_request(admin);
request
};
Expand Down
3 changes: 1 addition & 2 deletions components/raftstore/src/store/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,7 @@ pub fn admin_cmd_epoch_lookup(admin_cmp_type: AdminCmdType) -> AdminCmdEpochStat
AdminCmdType::RollbackMerge => AdminCmdEpochState::new(true, true, true, false),
// Transfer leader
AdminCmdType::TransferLeader => AdminCmdEpochState::new(true, true, false, false),
AdminCmdType::PrepareFlashback => AdminCmdEpochState::new(false, false, false, false),
AdminCmdType::FinishFlashback => AdminCmdEpochState::new(false, false, false, false),
AdminCmdType::SetFlashbackState => AdminCmdEpochState::new(false, false, false, false)
}
}

Expand Down
7 changes: 5 additions & 2 deletions tests/integrations/raftstore/test_flashback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ fn test_flashback_for_schedule() {
assert!(!resp.get_header().has_error());

block_on(cluster.call_finish_flashback(region.get_id(), 1));
// transfer leader to (1, 1)
cluster.must_transfer_leader(1, new_peer(1, 1));
// transfer leader to (2, 2)
cluster.must_transfer_leader(1, new_peer(2, 2));
}

#[test]
Expand Down Expand Up @@ -201,6 +201,9 @@ fn test_flahsback_for_local_read() {
assert_eq!(state.get_last_index(), last_index + 1);

block_on(cluster.call_finish_flashback(region.get_id(), store_id));

let state = cluster.raft_local_state(region.get_id(), store_id);
assert_eq!(state.get_last_index(), last_index + 2);

// check local read after finish flashback
let state = cluster.raft_local_state(region.get_id(), store_id);
Expand Down

0 comments on commit 7a62778

Please sign in to comment.