*: Add a Raft admin command to put the region into a locking flashback state. #13541

Merged · 32 commits · Oct 12, 2022
Changes shown are from 5 of the 32 commits.

Commits
0c5aaa6
roughly modify flashback
HuSharp Sep 27, 2022
88f6f19
remove redundant code
HuSharp Sep 27, 2022
61c4585
remove redundant code
HuSharp Sep 27, 2022
d5ff1c2
address comment
HuSharp Sep 27, 2022
87f1db0
modify to channel
HuSharp Sep 28, 2022
6674ac9
address comment
HuSharp Sep 29, 2022
c29d604
remove redundant code
HuSharp Sep 29, 2022
34a281d
modify parsist state to bool
HuSharp Sep 29, 2022
486de94
pass test_sync_log
HuSharp Sep 29, 2022
245998d
remove reply for apply
HuSharp Sep 29, 2022
4cccd6e
address comment
HuSharp Sep 29, 2022
378ec7f
Initialize the flashback state when init region
HuSharp Sep 30, 2022
0d9ac36
resolved apply snapshot
HuSharp Sep 30, 2022
3077790
check test
HuSharp Sep 30, 2022
6444293
make format happy
HuSharp Oct 8, 2022
c674265
put flashback state in region meta
HuSharp Oct 8, 2022
a8a4a80
remove redundant code
HuSharp Oct 8, 2022
a511716
add check in apply
HuSharp Oct 9, 2022
8053ccd
address comment
HuSharp Oct 9, 2022
cb94d0b
make format happy
HuSharp Oct 9, 2022
11d30c9
add flag for apply flashback
HuSharp Oct 10, 2022
16014f3
remove significant
HuSharp Oct 10, 2022
5e5a390
merge upstream
HuSharp Oct 10, 2022
2ddde71
modify comment
HuSharp Oct 10, 2022
c22d186
repair comment
HuSharp Oct 11, 2022
c6bc7ea
propagate channel error
HuSharp Oct 11, 2022
da25573
add flag for prepare and finish flashback admin req
HuSharp Oct 11, 2022
fab24bb
address comment
HuSharp Oct 11, 2022
f4656d1
Merge remote-tracking branch 'upstream/master' into flashback_persist_in
HuSharp Oct 12, 2022
e488437
merge upstream
HuSharp Oct 12, 2022
a6f05f0
update kvproto
HuSharp Oct 12, 2022
42ca68e
Make the first letter of the comment capitalized
HuSharp Oct 12, 2022
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -213,6 +213,7 @@ procinfo = { git = "https://github.com/tikv/procinfo-rs", rev = "6599eb9dca74229
# After the PR to kvproto is merged, remember to comment this out and run `cargo update -p kvproto`.
[patch.'https://github.com/pingcap/kvproto']
# kvproto = { git = "https://github.com/your_github_id/kvproto", branch="your_branch" }
kvproto = { git = "https://github.com/HuSharp/kvproto", branch = "modify_flashback_to_2pc" }

[workspace]
# See https://github.com/rust-lang/rfcs/blob/master/text/2957-cargo-features2.md
72 changes: 59 additions & 13 deletions components/raftstore/src/store/fsm/apply.rs
@@ -41,7 +41,9 @@ use kvproto::{
AdminCmdType, AdminRequest, AdminResponse, ChangePeerRequest, CmdType, CommitMergeRequest,
RaftCmdRequest, RaftCmdResponse, Request,
},
raft_serverpb::{MergeState, PeerState, RaftApplyState, RaftTruncatedState, RegionLocalState},
raft_serverpb::{
FlashbackState, MergeState, PeerState, RaftApplyState, RaftTruncatedState, RegionLocalState,
},
};
use pd_client::{new_bucket_stats, BucketMeta, BucketStat};
use prometheus::local::LocalHistogram;
@@ -256,6 +258,9 @@ pub enum ExecResult<S> {
region: Region,
commit: u64,
},
SetFlashbackState {
state: FlashbackState,
},
ComputeHash {
region: Region,
index: u64,
@@ -1413,7 +1418,8 @@ where
| ExecResult::CompactLog { .. }
| ExecResult::DeleteRange { .. }
| ExecResult::IngestSst { .. }
| ExecResult::TransferLeader { .. } => {}
| ExecResult::TransferLeader { .. }
| ExecResult::SetFlashbackState { .. } => {}
ExecResult::SplitRegion { ref derived, .. } => {
self.region = derived.clone();
self.metrics.size_diff_hint = 0;
@@ -1547,6 +1553,10 @@ where
AdminCmdType::PrepareMerge => self.exec_prepare_merge(ctx, request),
AdminCmdType::CommitMerge => self.exec_commit_merge(ctx, request),
AdminCmdType::RollbackMerge => self.exec_rollback_merge(ctx, request),
AdminCmdType::PrepareFlashback => {
self.set_flashback_state(ctx, FlashbackState::Processing)
}
AdminCmdType::FinishFlashback => self.set_flashback_state(ctx, FlashbackState::Finish),
AdminCmdType::InvalidAdmin => Err(box_err!("unsupported admin command type")),
}?;
response.set_cmd_type(cmd_type);
@@ -2053,7 +2063,7 @@ where
} else {
PeerState::Normal
};
if let Err(e) = write_peer_state(ctx.kv_wb_mut(), &region, state, None) {
if let Err(e) = write_peer_state(ctx.kv_wb_mut(), &region, state, None, None) {
panic!("{} failed to update region state: {:?}", self.tag, e);
}

@@ -2098,7 +2108,7 @@ where
PeerState::Normal
};

if let Err(e) = write_peer_state(ctx.kv_wb_mut(), &region, state, None) {
if let Err(e) = write_peer_state(ctx.kv_wb_mut(), &region, state, None, None) {
panic!("{} failed to update region state: {:?}", self.tag, e);
}

@@ -2502,7 +2512,7 @@ where
);
continue;
}
write_peer_state(kv_wb_mut, new_region, PeerState::Normal, None)
write_peer_state(kv_wb_mut, new_region, PeerState::Normal, None, None)
.and_then(|_| write_initial_apply_state(kv_wb_mut, new_region.get_id()))
.unwrap_or_else(|e| {
panic!(
@@ -2511,7 +2521,7 @@
)
});
}
write_peer_state(kv_wb_mut, &derived, PeerState::Normal, None).unwrap_or_else(|e| {
write_peer_state(kv_wb_mut, &derived, PeerState::Normal, None, None).unwrap_or_else(|e| {
panic!("{} fails to update region {:?}: {:?}", self.tag, derived, e)
});
let mut resp = AdminResponse::default();
@@ -2578,6 +2588,7 @@ where
&region,
PeerState::Merging,
Some(merging_state.clone()),
None,
)
.unwrap_or_else(|e| {
panic!(
@@ -2716,7 +2727,7 @@ where
region.set_start_key(source_region.get_start_key().to_vec());
}
let kv_wb_mut = ctx.kv_wb_mut();
write_peer_state(kv_wb_mut, &region, PeerState::Normal, None)
write_peer_state(kv_wb_mut, &region, PeerState::Normal, None, None)
.and_then(|_| {
// TODO: maybe all information needs to be filled?
let mut merging_state = MergeState::default();
@@ -2726,6 +2737,7 @@
source_region,
PeerState::Tombstone,
Some(merging_state),
Some(state.get_flashback_state()),
)
})
.unwrap_or_else(|e| {
@@ -2773,12 +2785,14 @@ where
let version = region.get_region_epoch().get_version();
// Update version to avoid duplicated rollback requests.
region.mut_region_epoch().set_version(version + 1);
write_peer_state(ctx.kv_wb_mut(), &region, PeerState::Normal, None).unwrap_or_else(|e| {
panic!(
"{} failed to rollback merge {:?}: {:?}",
self.tag, rollback, e
)
});
write_peer_state(ctx.kv_wb_mut(), &region, PeerState::Normal, None, None).unwrap_or_else(
|e| {
panic!(
"{} failed to rollback merge {:?}: {:?}",
self.tag, rollback, e
)
},
);

PEER_ADMIN_CMD_COUNTER.rollback_merge.success.inc();
let resp = AdminResponse::default();
@@ -2791,6 +2805,38 @@
))
}

fn set_flashback_state(
&self,
ctx: &mut ApplyContext<EK>,
state: FlashbackState,
) -> Result<(AdminResponse, ApplyResult<EK::Snapshot>)> {
let region_id = self.region_id();
let region_state_key = keys::region_state_key(region_id);
let mut old_state: RegionLocalState = match ctx
.engine
.get_msg_cf::<RegionLocalState>(CF_RAFT, &region_state_key)
{
Ok(Some(s)) => s,
_ => {
return Err(box_err!("failed to get region state of {}", region_id));
}
};
old_state.set_flashback_state(state);
ctx.kv_wb_mut()
.put_msg_cf(CF_RAFT, &keys::region_state_key(region_id), &old_state)
.unwrap_or_else(|e| {
panic!(
"{} failed to exec flashback state {:?} for region {}: {:?}",
self.tag, state, region_id, e
)
});

Ok((
AdminResponse::default(),
ApplyResult::Res(ExecResult::SetFlashbackState { state }),
))
}

fn exec_compact_log(
&mut self,
req: &AdminRequest,
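
As a rough, self-contained illustration of what the new apply-side handler in apply.rs does, here is a sketch of the read-modify-stage pattern behind set_flashback_state. This is not the real TiKV code: FlashbackState, RegionLocalState, ApplyContext, and ExecResult below are minimal local stand-ins for the kvproto/raftstore types, and plain HashMaps replace the RAFT CF and the kv write batch.

use std::collections::HashMap;

// Simplified local stand-ins for the kvproto types; the real ones are protobuf messages.
#[derive(Clone, Copy, Debug)]
enum FlashbackState { NoFlashback, Processing, Finish }

#[derive(Clone, Debug)]
struct RegionLocalState { flashback_state: FlashbackState }

// Toy "kv engine" plus a staged write batch, both keyed by region id.
struct ApplyContext {
    kv: HashMap<u64, RegionLocalState>,
    wb: HashMap<u64, RegionLocalState>,
}

#[derive(Debug)]
enum ExecResult { SetFlashbackState { state: FlashbackState } }

fn set_flashback_state(
    ctx: &mut ApplyContext,
    region_id: u64,
    state: FlashbackState,
) -> Result<ExecResult, String> {
    // Read the currently persisted region state; error out if it is missing,
    // mirroring the box_err! path in the patch.
    let mut old_state = ctx
        .kv
        .get(&region_id)
        .cloned()
        .ok_or_else(|| format!("failed to get region state of {}", region_id))?;
    // Flip the flashback field and stage the new state in the write batch so it
    // is persisted together with the rest of this apply round.
    old_state.flashback_state = state;
    ctx.wb.insert(region_id, old_state);
    Ok(ExecResult::SetFlashbackState { state })
}

fn main() {
    let mut ctx = ApplyContext {
        kv: HashMap::from([(1, RegionLocalState { flashback_state: FlashbackState::NoFlashback })]),
        wb: HashMap::new(),
    };
    let prepared = set_flashback_state(&mut ctx, 1, FlashbackState::Processing).unwrap();
    println!("staged: {:?}, result: {:?}", ctx.wb[&1], prepared);
    // The toy never commits wb back into kv, so this second read still sees the old value.
    let finished = set_flashback_state(&mut ctx, 1, FlashbackState::Finish).unwrap();
    println!("staged: {:?}, result: {:?}", ctx.wb[&1], finished);
}

The real handler differs in that it reads a protobuf RegionLocalState from the RAFT CF via get_msg_cf and stages it with put_msg_cf, but the shape is the same: read the persisted region state, flip its flashback field, and stage the write so it lands with the rest of the apply batch.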
119 changes: 100 additions & 19 deletions components/raftstore/src/store/fsm/peer.rs
@@ -18,10 +18,11 @@ use std::{

use batch_system::{BasicMailbox, Fsm};
use collections::{HashMap, HashSet};
use crossbeam::channel::Sender;
use engine_traits::{Engines, KvEngine, RaftEngine, SstMetaInfo, WriteBatchExt, CF_LOCK, CF_RAFT};
use error_code::ErrorCodeExt;
use fail::fail_point;
use futures::channel::{mpsc::UnboundedSender, oneshot::Sender};
use futures::channel::mpsc::UnboundedSender;
use keys::{self, enc_end_key, enc_start_key};
use kvproto::{
brpb::CheckAdminResponse,
@@ -35,8 +36,8 @@ use kvproto::{
StatusCmdType, StatusResponse,
},
raft_serverpb::{
ExtraMessage, ExtraMessageType, MergeState, PeerState, RaftMessage, RaftSnapshotData,
RaftTruncatedState, RegionLocalState,
ExtraMessage, ExtraMessageType, FlashbackState, MergeState, PeerState, RaftMessage,
RaftSnapshotData, RaftTruncatedState, RegionLocalState,
},
replication_modepb::{DrAutoSyncState, ReplicationMode},
};
@@ -82,7 +83,7 @@ use crate::{
metrics::*,
msg::{Callback, ExtCallback, InspectedRaftMessage},
peer::{
ConsistencyState, FlashbackState, ForceLeaderState, Peer, PersistSnapshotResult,
ConsistencyState, FlashbackMemoryState, ForceLeaderState, Peer, PersistSnapshotResult,
SnapshotRecoveryState, SnapshotRecoveryWaitApplySyncer, StaleState,
UnsafeRecoveryExecutePlanSyncer, UnsafeRecoveryFillOutReportSyncer,
UnsafeRecoveryForceLeaderSyncer, UnsafeRecoveryState, UnsafeRecoveryWaitApplySyncer,
@@ -987,36 +988,95 @@ where
syncer.report_for_self(self_report);
}

// Call msg PrepareFlashback to stop the scheduling and RW tasks.
// Once called, it will wait for the channel's notification in FlashbackState to
// finish. We place a flag in the request, which is checked when
// pre_propose_raft_command is called. Stopping tasks is done by applying
// the flashback-only command this way. For RW local reads, which also need
// to be handled, we set the leader lease to None to ensure that local reads
// are not executed.
// The PrepareFlashback msg checks whether a persistent flashback state is
// present in RegionLocalState. Once called, the persistent state and the
// memory state are checked first; if neither is found, a Raft admin command
// is proposed to persist the state in RegionLocalState and lock the region.
// Once this command is applied, a callback invokes msg PrepareFlashback again
// to wait for the new change to be applied completely, so we can perform the
// flashback safely later.
fn on_prepare_flashback(&mut self, ch: Sender<bool>) {
let region_id = self.region().get_id();
info!(
"prepare flashback";
"region_id" => self.region().get_id(),
"region_id" => region_id,
"peer_id" => self.fsm.peer.peer_id(),
);
if self.fsm.peer.flashback_state.is_some() {
ch.send(false).unwrap();
return;
// Check the persisted state first.
let state_key = keys::region_state_key(region_id);
if let Some(target_state) = self
.ctx
.engines
.kv
.get_msg_cf::<RegionLocalState>(CF_RAFT, &state_key)
[Inline review discussion on this check]
Reviewer: Why check the persisted state here? The persistent state is only used on start-up to recover the memory state.
Reviewer: I think it's because we cannot know whether the current peer is in a flashback state in the PeerFSM before we read the persistent state once?
Reviewer: You can just check the memory state; the memory state is filled on start-up.
Author (HuSharp): Since the notify method is called after apply to send the apply result to the peer FSM, the memory state will only be set in the on_flashback_memory_set function. So here we need to set the meta information of the CF and the current region.

.unwrap()
{
info!(
"prepare flashback: already has a flashback local state {:?} for {:?}",
target_state.get_flashback_state(),
self.region(),
);
if target_state.get_flashback_state() == FlashbackState::Prepare {
// Re-send msg PrepareFlashback to itself via the callback to wait for the new change to be applied completely.
let raft_router_clone = self.ctx.router.clone();
let ch_clone = ch.clone();
let cb = Callback::write(Box::new(move |resp| {
raft_router_clone
.force_send(
region_id,
PeerMsg::SignificantMsg(SignificantMsg::PrepareFlashback(ch_clone)),
)
.unwrap();
if resp.response.get_header().has_error() {
ch.send(false).unwrap();
error!("send flashback prepare msg failed"; "region_id" => region_id);
}
}));
// Persist the state via a Raft admin command.
let req = {
let mut request = new_admin_request(region_id, self.fsm.peer.peer.clone());
let mut admin = AdminRequest::default();
admin.set_cmd_type(AdminCmdType::PrepareFlashback);
request.set_admin_request(admin);
request
};
self.propose_raft_command(req, cb, DiskFullOpt::AllowedOnAlmostFull);
return;
}
// Check the memory state.
if self.fsm.peer.flashback_state.is_none() {
self.fsm.peer.flashback_state = Some(FlashbackMemoryState::new(ch));
}
}
self.fsm.peer.flashback_state = Some(FlashbackState::new(ch));
// Set the leader lease to None to ensure that local reads are not executed.
self.fsm.peer.leader_lease_mut().expire_remote_lease();
self.fsm.peer.maybe_finish_flashback_wait_apply();
}

fn on_finish_flashback(&mut self) {
fn on_finish_flashback(&mut self, ch: Sender<bool>) {
let region_id = self.region().get_id();
info!(
"finish flashback";
"region_id" => self.region().get_id(),
"region_id" => region_id,
"peer_id" => self.fsm.peer.peer_id(),
);
let cb = Callback::write(Box::new(move |resp| {
if resp.response.get_header().has_error() {
ch.send(false).unwrap();
error!("send flashback finish msg failed"; "region_id" => region_id);
return;
}
ch.send(true).unwrap();
}));
self.fsm.peer.flashback_state.take();

let req = {
let mut request = new_admin_request(self.region().get_id(), self.fsm.peer.peer.clone());
let mut admin = AdminRequest::default();
admin.set_cmd_type(AdminCmdType::FinishFlashback);
request.set_admin_request(admin);
request
};
self.propose_raft_command(req, cb, DiskFullOpt::AllowedOnAlmostFull);
}

fn on_check_pending_admin(&mut self, ch: UnboundedSender<CheckAdminResponse>) {
@@ -1465,7 +1525,7 @@
}

SignificantMsg::PrepareFlashback(ch) => self.on_prepare_flashback(ch),
SignificantMsg::FinishFlashback => self.on_finish_flashback(),
SignificantMsg::FinishFlashback(ch) => self.on_finish_flashback(ch),
// for snapshot recovery (safe recovery)
SignificantMsg::SnapshotRecoveryWaitApply(syncer) => {
self.on_snapshot_recovery_wait_apply(syncer)
@@ -4786,6 +4846,9 @@
}
ExecResult::IngestSst { ssts } => self.on_ingest_sst_result(ssts),
ExecResult::TransferLeader { term } => self.on_transfer_leader(term),
ExecResult::SetFlashbackState { state } => {
self.on_ready_check_flashback_state(state)
}
}
}

@@ -6128,6 +6191,24 @@
self.fsm.has_ready = true;
}

fn on_ready_check_flashback_state(&mut self, state: FlashbackState) {
let state_key = keys::region_state_key(self.region().get_id());
if let Some(target_state) = self
.ctx
.engines
.kv
.get_msg_cf::<RegionLocalState>(CF_RAFT, &state_key)
.unwrap()
{
if target_state.get_flashback_state() != state {
panic!(
"the persisted flashback state for region {:?} does not match the applied state",
self.region(),
);
}
}
}

/// Verify and store the hash to state. return true means the hash has been
/// stored successfully.
// TODO: Consider context in the function.
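
The most intricate part of the peer-side change is the two-pass handling of PrepareFlashback: the first pass proposes the admin command with a write callback, and that callback re-sends PrepareFlashback to the same peer, so the caller is only acknowledged once the change has actually been applied. The toy event loop below models just that pattern. Everything here is an invented stand-in (Msg, Router, Peer, and the deliberately simplified locking condition), not the raftstore API, and std::sync::mpsc is used in place of the crossbeam sender from the PR.

use std::collections::VecDeque;
use std::sync::mpsc::{channel, Sender};

enum Msg {
    // A caller asks the peer to enter the flashback state and waits on the channel.
    PrepareFlashback(Sender<bool>),
    // The "apply" result of the proposed admin command, carrying the callback to run.
    Applied(Box<dyn FnOnce(&mut Router)>),
}

#[derive(Default)]
struct Router {
    queue: VecDeque<Msg>,
}

impl Router {
    fn send(&mut self, msg: Msg) {
        self.queue.push_back(msg);
    }
}

#[derive(Default)]
struct Peer {
    persisted_locked: bool, // stand-in for the flashback field in RegionLocalState
    memory_locked: bool,    // stand-in for the in-memory flashback state
}

impl Peer {
    fn on_prepare_flashback(&mut self, ch: Sender<bool>, router: &mut Router) {
        if !self.persisted_locked {
            // First pass: "propose" the admin command. The callback runs only after
            // the command is applied, and it re-sends PrepareFlashback to this peer.
            router.send(Msg::Applied(Box::new(move |r: &mut Router| {
                r.send(Msg::PrepareFlashback(ch));
            })));
            return;
        }
        // Second pass: the lock is durable, so record it in memory and ack the caller.
        self.memory_locked = true;
        let _ = ch.send(true);
    }
}

fn main() {
    let mut router = Router::default();
    let mut peer = Peer::default();
    let (tx, rx) = channel();
    router.send(Msg::PrepareFlashback(tx));

    // Drive the toy event loop until the queue drains.
    while let Some(msg) = router.queue.pop_front() {
        match msg {
            Msg::PrepareFlashback(ch) => peer.on_prepare_flashback(ch, &mut router),
            Msg::Applied(cb) => {
                // "Apply" the admin command: persist the lock, then run the callback.
                peer.persisted_locked = true;
                cb(&mut router);
            }
        }
    }
    println!("caller acked: {}, memory_locked: {}", rx.recv().unwrap(), peer.memory_locked);
}

In the real code the second pass also expires the leader lease and parks the caller's channel in FlashbackMemoryState until the apply catches up; the sketch collapses all of that into a single ack.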
3 changes: 2 additions & 1 deletion components/raftstore/src/store/fsm/store.rs
@@ -2874,7 +2874,8 @@ impl<'a, EK: KvEngine, ER: RaftEngine, T: Transport> StoreFsmDelegate<'a, EK, ER
);
}
let mut kv_wb = self.ctx.engines.kv.write_batch();
if let Err(e) = peer_storage::write_peer_state(&mut kv_wb, &region, PeerState::Normal, None)
if let Err(e) =
peer_storage::write_peer_state(&mut kv_wb, &region, PeerState::Normal, None, None)
{
panic!(
"Unsafe recovery, fail to add peer state for {:?} into write batch, the error is {:?}",
5 changes: 3 additions & 2 deletions components/raftstore/src/store/msg.rs
@@ -6,8 +6,9 @@ use std::sync::Arc;
use std::{borrow::Cow, fmt};

use collections::HashSet;
use crossbeam::channel::Sender;
use engine_traits::{CompactedEvent, KvEngine, Snapshot};
use futures::channel::{mpsc::UnboundedSender, oneshot::Sender};
use futures::channel::mpsc::UnboundedSender;
use kvproto::{
brpb::CheckAdminResponse,
import_sstpb::SstMeta,
@@ -514,7 +515,7 @@
UnsafeRecoveryFillOutReport(UnsafeRecoveryFillOutReportSyncer),
SnapshotRecoveryWaitApply(SnapshotRecoveryWaitApplySyncer),
PrepareFlashback(Sender<bool>),
FinishFlashback,
FinishFlashback(Sender<bool>),
CheckPendingAdmin(UnboundedSender<CheckAdminResponse>),
}

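
Finally, the msg.rs change makes both PrepareFlashback and FinishFlashback carry a crossbeam Sender<bool>, so the caller can block until the peer acknowledges each phase. Below is a minimal, self-contained sketch of that handshake; SignificantMsg and Peer are toy types that only mimic the shape of the real enums, and std::sync::mpsc stands in for crossbeam::channel.

use std::sync::mpsc::{channel, Sender};
use std::thread;

// Illustrative stand-ins shaped like SignificantMsg::{PrepareFlashback, FinishFlashback}.
enum SignificantMsg {
    PrepareFlashback(Sender<bool>),
    FinishFlashback(Sender<bool>),
}

#[derive(Default)]
struct Peer {
    // Stand-in for the in-memory flashback state held by the peer FSM.
    flashback_locked: bool,
}

impl Peer {
    fn handle(&mut self, msg: SignificantMsg) {
        match msg {
            SignificantMsg::PrepareFlashback(ch) => {
                if self.flashback_locked {
                    // Already locked: reject the duplicate request.
                    let _ = ch.send(false);
                } else {
                    // In TiKV this is where the admin command is proposed and applied;
                    // the toy just flips the in-memory flag and acks.
                    self.flashback_locked = true;
                    let _ = ch.send(true);
                }
            }
            SignificantMsg::FinishFlashback(ch) => {
                self.flashback_locked = false;
                let _ = ch.send(true);
            }
        }
    }
}

fn main() {
    let (msg_tx, msg_rx) = channel::<SignificantMsg>();
    let peer_thread = thread::spawn(move || {
        let mut peer = Peer::default();
        // The loop ends once every sender is dropped.
        for msg in msg_rx {
            peer.handle(msg);
        }
    });

    // Caller side: prepare, wait for the ack, then finish the same way.
    let (tx, rx) = channel();
    msg_tx.send(SignificantMsg::PrepareFlashback(tx)).unwrap();
    println!("prepare acked: {}", rx.recv().unwrap());

    let (tx, rx) = channel();
    msg_tx.send(SignificantMsg::FinishFlashback(tx)).unwrap();
    println!("finish acked: {}", rx.recv().unwrap());

    drop(msg_tx);
    peer_thread.join().unwrap();
}

The bool ack gives the caller a way to tell success from rejection (for example, a duplicate prepare), which is presumably why both variants now carry a channel rather than only PrepareFlashback.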