Skip to content

Commit

Permalink
raftstore: remove is_in_flashback field in peer fsm (#13877)
Browse files Browse the repository at this point in the history
close #13868

- Remove `is_in_flashback` field and use the region meta as the only source of truth in `PeerFSM`.
- Add a corresponding test case.
- Some minor refinements to the code and tests.

Signed-off-by: JmPotato <ghzpotato@gmail.com>

Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
  • Loading branch information
2 people authored and dveeden committed Dec 6, 2022
1 parent 9bfe66f commit 029be44
Show file tree
Hide file tree
Showing 6 changed files with 176 additions and 79 deletions.
27 changes: 15 additions & 12 deletions components/raftstore/src/store/fsm/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1334,7 +1334,7 @@ where
new_read_index_request(region_id, region_epoch.clone(), self.fsm.peer.peer.clone());
// Allow capturing changes even when the region is in the flashback state.
// TODO: add a test case for this kind of situation.
if self.fsm.peer.is_in_flashback {
if self.region().is_in_flashback {
let mut flags = WriteBatchFlags::from_bits_check(msg.get_header().get_flags());
flags.insert(WriteBatchFlags::FLASHBACK);
msg.mut_header().set_flags(flags.bits());
Expand Down Expand Up @@ -4894,9 +4894,7 @@ where
}
ExecResult::IngestSst { ssts } => self.on_ingest_sst_result(ssts),
ExecResult::TransferLeader { term } => self.on_transfer_leader(term),
ExecResult::SetFlashbackState { region } => {
self.on_set_flashback_state(region.get_is_in_flashback())
}
ExecResult::SetFlashbackState { region } => self.on_set_flashback_state(region),
}
}

Expand Down Expand Up @@ -5108,11 +5106,11 @@ where
};
// Check whether the region is in the flashback state and the request could be
// proposed. Skip the not prepared error because the
// `self.fsm.peer.is_in_flashback` may not be the latest right after applying
// `self.region().is_in_flashback` may not be the latest right after applying
// the `PrepareFlashback` admin command, we will let it pass here and check in
// the apply phase.
if let Err(e) =
util::check_flashback_state(self.fsm.peer.is_in_flashback, msg, region_id, true)
util::check_flashback_state(self.region().is_in_flashback, msg, region_id, true)
{
match e {
Error::FlashbackInProgress(_) => self
Expand Down Expand Up @@ -6281,12 +6279,17 @@ where
self.fsm.has_ready = true;
}

fn on_set_flashback_state(&mut self, is_in_flashback: bool) {
// Set flashback memory
self.fsm.peer.is_in_flashback = (|| {
fail_point!("keep_peer_fsm_flashback_state_false", |_| false);
is_in_flashback
})();
fn on_set_flashback_state(&mut self, region: metapb::Region) {
// Update the region meta.
self.update_region((|| {
#[cfg(feature = "failpoints")]
fail_point!("keep_peer_fsm_flashback_state_false", |_| {
let mut region = region.clone();
region.is_in_flashback = false;
region
});
region
})());
// Expire the remote leader lease to ensure that local reads are not executed.
self.fsm.peer.leader_lease_mut().expire_remote_lease();
}
Expand Down
5 changes: 1 addition & 4 deletions components/raftstore/src/store/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1030,8 +1030,6 @@ where
/// lead_transferee if this peer(leader) is in a leadership transferring.
pub lead_transferee: u64,
pub unsafe_recovery_state: Option<UnsafeRecoveryState>,
// Used as the memory state for Flashback to reject RW/Schedule before proposing.
pub is_in_flashback: bool,
pub snapshot_recovery_state: Option<SnapshotRecoveryState>,
}

Expand Down Expand Up @@ -1167,7 +1165,6 @@ where
last_region_buckets: None,
lead_transferee: raft::INVALID_ID,
unsafe_recovery_state: None,
is_in_flashback: region.get_is_in_flashback(),
snapshot_recovery_state: None,
};

Expand Down Expand Up @@ -3531,7 +3528,7 @@ where
self.force_leader.is_some(),
) {
None
} else if self.is_in_flashback {
} else if self.region().is_in_flashback {
debug!(
"prevents renew lease while in flashback state";
"region_id" => self.region_id,
Expand Down
109 changes: 68 additions & 41 deletions src/storage/txn/actions/flashback_to_version.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
// Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0.

use std::ops::Bound;

use txn_types::{Key, Lock, LockType, TimeStamp, Write, WriteType};

use crate::storage::{
Expand Down Expand Up @@ -35,11 +33,6 @@ pub fn flashback_to_version_read_write(
flashback_version: TimeStamp,
flashback_commit_ts: TimeStamp,
) -> TxnResult<Vec<Key>> {
// Filter out the SST that does not have a newer version than
// `flashback_version` in `CF_WRITE`, i.e, whose latest `commit_ts` <=
// `flashback_version`. By doing this, we can only flashback those keys that
// have version changed since `flashback_version` as much as possible.
reader.set_hint_min_ts(Some(Bound::Excluded(flashback_version)));
// To flashback the data, we need to get all the latest visible keys first by
// scanning every unique key in `CF_WRITE`.
let keys_result = reader.scan_latest_user_keys(
Expand Down Expand Up @@ -241,15 +234,30 @@ pub fn check_flashback_commit(
if lock.ts == flashback_start_ts {
return Ok(false);
}
error!(
"check flashback commit exception: lock not found";
"key_to_commit" => log_wrappers::Value::key(key_to_commit.as_encoded()),
"flashback_start_ts" => flashback_start_ts,
"flashback_commit_ts" => flashback_commit_ts,
"lock" => ?lock,
);
}
// If the lock doesn't exist and the flashback commit record exists, it means the flashback
// has been finished.
None => {
if let Some(write) = reader.get_write(key_to_commit, flashback_commit_ts, None)? {
if write.start_ts == flashback_start_ts {
let write_res = reader.seek_write(key_to_commit, flashback_commit_ts)?;
if let Some((commit_ts, ref write)) = write_res {
if commit_ts == flashback_commit_ts && write.start_ts == flashback_start_ts {
return Ok(true);
}
}
error!(
"check flashback commit exception: write record mismatched";
"key_to_commit" => log_wrappers::Value::key(key_to_commit.as_encoded()),
"flashback_start_ts" => flashback_start_ts,
"flashback_commit_ts" => flashback_commit_ts,
"write" => ?write_res,
);
}
}
Err(txn::Error::from_mvcc(mvcc::ErrorInner::TxnLockNotFound {
Expand All @@ -263,9 +271,15 @@ pub fn get_first_user_key(
reader: &mut MvccReader<impl Snapshot>,
start_key: &Key,
end_key: Option<&Key>,
flashback_version: TimeStamp,
) -> TxnResult<Option<Key>> {
let (mut keys_result, _) =
reader.scan_latest_user_keys(Some(start_key), end_key, |_, _| true, 1)?;
let (mut keys_result, _) = reader.scan_latest_user_keys(
Some(start_key),
end_key,
// Make sure we will get the same first user key each time.
|_, latest_commit_ts| latest_commit_ts > flashback_version,
1,
)?;
Ok(keys_result.pop())
}

Expand Down Expand Up @@ -326,6 +340,7 @@ pub mod tests {
&mut reader,
&Key::from_raw(key),
Some(Key::from_raw(b"z")).as_ref(),
version,
)
.unwrap()
{
Expand Down Expand Up @@ -375,10 +390,11 @@ pub mod tests {
fn must_commit_flashback_key<E: Engine>(
engine: &mut E,
key: &[u8],
version: impl Into<TimeStamp>,
start_ts: impl Into<TimeStamp>,
commit_ts: impl Into<TimeStamp>,
) -> usize {
let (start_ts, commit_ts) = (start_ts.into(), commit_ts.into());
let (version, start_ts, commit_ts) = (version.into(), start_ts.into(), commit_ts.into());
let cm = ConcurrencyManager::new(TimeStamp::zero());
let mut txn = MvccTxn::new(start_ts, cm);
let snapshot = engine.snapshot(Default::default()).unwrap();
Expand All @@ -388,6 +404,7 @@ pub mod tests {
&mut reader,
&Key::from_raw(key),
Some(Key::from_raw(b"z")).as_ref(),
version,
)
.unwrap()
.unwrap();
Expand Down Expand Up @@ -545,9 +562,11 @@ pub mod tests {
let mut engine = TestEngineBuilder::new().build().unwrap();
let mut ts = TimeStamp::zero();
let (k, v) = (b"k", [u8::MAX; SHORT_VALUE_MAX_LEN + 1]);
must_prewrite_put(&mut engine, k, &v, k, *ts.incr());
must_commit(&mut engine, k, ts, *ts.incr());
must_get(&mut engine, k, ts, &v);
for _ in 0..2 {
must_prewrite_put(&mut engine, k, &v, k, *ts.incr());
must_commit(&mut engine, k, ts, *ts.incr());
must_get(&mut engine, k, ts, &v);
}

let flashback_start_ts = *ts.incr();
// Rollback nothing.
Expand Down Expand Up @@ -579,30 +598,23 @@ pub mod tests {
fn test_prewrite_with_special_key() {
let mut engine = TestEngineBuilder::new().build().unwrap();
let mut ts = TimeStamp::zero();
let (prewrite_key, prewrite_val) = (b"b", b"val");
must_prewrite_put(
&mut engine,
prewrite_key,
prewrite_val,
prewrite_key,
*ts.incr(),
);
must_commit(&mut engine, prewrite_key, ts, *ts.incr());
must_get(&mut engine, prewrite_key, ts, prewrite_val);
let (k, v1, v2) = (b"c", b"v1", b"v2");
must_prewrite_put(&mut engine, k, v1, k, *ts.incr());
must_commit(&mut engine, k, ts, *ts.incr());
must_prewrite_put(&mut engine, k, v2, k, *ts.incr());
must_commit(&mut engine, k, ts, *ts.incr());
must_get(&mut engine, k, ts, v2);
let (prewrite_key, k, v) = (b"b", b"c", b"val");
for k in [prewrite_key, k] {
let (start_ts, commit_ts) = (*ts.incr(), *ts.incr());
must_prewrite_put(&mut engine, k, v, k, start_ts);
must_commit(&mut engine, k, start_ts, commit_ts);
must_get(&mut engine, k, commit_ts, v);
}
// Check for prewrite key b"b".
let ctx = Context::default();
let snapshot = engine.snapshot(Default::default()).unwrap();
let mut reader = MvccReader::new_with_ctx(snapshot, Some(ScanMode::Forward), &ctx);
let flashback_version = TimeStamp::zero();
let first_key = get_first_user_key(
&mut reader,
&Key::from_raw(b""),
Some(Key::from_raw(b"z")).as_ref(),
flashback_version,
)
.unwrap_or_else(|_| Some(Key::from_raw(b"")))
.unwrap();
Expand All @@ -615,15 +627,20 @@ pub mod tests {
assert_eq!(must_rollback_lock(&mut engine, k, flashback_start_ts), 0);
// Prewrite "prewrite_key" not "start_key".
assert_eq!(
must_prewrite_flashback_key(&mut engine, start_key, 4, flashback_start_ts),
must_prewrite_flashback_key(
&mut engine,
start_key,
flashback_version,
flashback_start_ts
),
1
);
// Flashback (b"c", v2) to (b"c", v1).
assert_eq!(
must_flashback_write_to_version(
&mut engine,
k,
4,
flashback_version,
flashback_start_ts,
flashback_commit_ts
),
Expand All @@ -634,27 +651,37 @@ pub mod tests {
must_commit_flashback_key(
&mut engine,
start_key,
flashback_version,
flashback_start_ts,
flashback_commit_ts
),
2
);
must_get(&mut engine, k, ts, v1);
must_get(&mut engine, prewrite_key, ts, prewrite_val);

must_get_none(&mut engine, prewrite_key, ts);
must_get_none(&mut engine, k, ts);
// case 2: start key is after all keys, prewrite will return None.
let start_key = b"d";
let flashback_start_ts = *ts.incr();
// Rollback nothing.
assert_eq!(must_rollback_lock(&mut engine, k, flashback_start_ts), 0);
// Prewrite null.
assert_eq!(
must_prewrite_flashback_key(&mut engine, start_key, 4, flashback_start_ts),
must_prewrite_flashback_key(
&mut engine,
start_key,
flashback_version,
flashback_start_ts
),
0
);
// case 3: for last region, end_key will be None, prewrite key will valid.
let first_key = get_first_user_key(&mut reader, &Key::from_raw(b"a"), None)
.unwrap_or_else(|_| Some(Key::from_raw(b"")));
assert_eq!(first_key, Some(Key::from_raw(prewrite_key)));
must_get_none(&mut engine, prewrite_key, ts);
must_get_none(&mut engine, k, ts);
// case 3: for last region, end_key will be None, prewrite key will be valid.
assert_eq!(
get_first_user_key(&mut reader, &Key::from_raw(b"a"), None, flashback_version)
.unwrap()
.unwrap(),
Key::from_raw(prewrite_key)
);
}
}
35 changes: 24 additions & 11 deletions src/storage/txn/commands/flashback_to_version_read_phase.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
// Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0.

use std::ops::Bound;

// #[PerformanceCriticalPath]
use txn_types::{Key, Lock, TimeStamp};

Expand Down Expand Up @@ -109,20 +111,25 @@ impl CommandExt for FlashbackToVersionReadPhase {
/// - Scan all locks.
/// - Rollback all these locks.
/// 2. [PrepareFlashback] Prewrite phase:
/// - Prewrite the `self.start_key` specifically to prevent the
/// `resolved_ts` from advancing.
/// - Prewrite the first user key after `self.start_key` specifically to
/// prevent the `resolved_ts` from advancing.
/// 3. [FinishFlashback] FlashbackWrite phase:
/// - Scan all the latest writes and their corresponding values at
/// `self.version`.
/// - Write the old MVCC version writes again for all these keys with
/// `self.commit_ts` excluding the `self.start_key`.
/// `self.commit_ts` excluding the first user key after `self.start_key`.
/// 4. [FinishFlashback] Commit phase:
/// - Commit the `self.start_key` we write at the second phase to finish the
/// flashback.
/// - Commit the first user key after `self.start_key` we write at the
/// second phase to finish the flashback.
impl<S: Snapshot> ReadCommand<S> for FlashbackToVersionReadPhase {
fn process_read(self, snapshot: S, statistics: &mut Statistics) -> Result<ProcessResult> {
let tag = self.tag().get_str();
let mut reader = MvccReader::new_with_ctx(snapshot, Some(ScanMode::Forward), &self.ctx);
// Filter out the SSTs that do not have a newer version than `self.version` in
// `CF_WRITE`, i.e., whose latest `commit_ts` <= `self.version`, in the later
// scan. By doing this, we flash back only those keys whose version has
// changed since `self.version`, as far as possible.
reader.set_hint_min_ts(Some(Bound::Excluded(self.version)));
let mut start_key = self.start_key.clone();
let next_state = match self.state {
FlashbackToVersionState::RollbackLock { next_lock_key, .. } => {
Expand All @@ -141,9 +148,12 @@ impl<S: Snapshot> ReadCommand<S> for FlashbackToVersionReadPhase {
// completion of the 2pc.
// - To make sure the key locked in the latch is the same as the actual key
// written, we pass it to the key in `process_write` after getting it.
let key_to_lock = if let Some(first_key) =
get_first_user_key(&mut reader, &self.start_key, self.end_key.as_ref())?
{
let key_to_lock = if let Some(first_key) = get_first_user_key(
&mut reader,
&self.start_key,
self.end_key.as_ref(),
self.version,
)? {
first_key
} else {
// If the key is None return directly
Expand Down Expand Up @@ -180,9 +190,12 @@ impl<S: Snapshot> ReadCommand<S> for FlashbackToVersionReadPhase {
// 2pc. So when overwriting the write, we skip the immediate
// write of this key and instead put it after the completion
// of the 2pc.
next_write_key = if let Some(first_key) =
get_first_user_key(&mut reader, &self.start_key, self.end_key.as_ref())?
{
next_write_key = if let Some(first_key) = get_first_user_key(
&mut reader,
&self.start_key,
self.end_key.as_ref(),
self.version,
)? {
first_key
} else {
// If the key is None return directly
Expand Down

0 comments on commit 029be44

Please sign in to comment.