Skip to content

Commit

Permalink
Add some test cases
Browse files Browse the repository at this point in the history
Signed-off-by: JmPotato <ghzpotato@gmail.com>
  • Loading branch information
JmPotato committed Sep 2, 2022
1 parent 89b5f6e commit c9f43f7
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 2 deletions.
1 change: 0 additions & 1 deletion components/test_raftstore/src/util.rs
Expand Up @@ -931,7 +931,6 @@ pub fn must_kv_prewrite_with(
);
}

// Disk full test interface.
pub fn try_kv_prewrite_with(
client: &TikvClient,
ctx: Context,
Expand Down
3 changes: 3 additions & 0 deletions src/server/service/kv.rs
Expand Up @@ -1721,6 +1721,9 @@ fn future_flashback_to_version<
Err(e) => Err(e),
Ok(_) => f.await?,
};
fail_point!("skip_finish_flashback_to_version", |_| {
return Ok(FlashbackToVersionResponse::default());
});
// Send a `SignificantMsg::FinishFlashback` to notify the raftstore that the
// flashback has been finished.
raft_router_clone.significant_send(region_id, SignificantMsg::FinishFlashback)?;
Expand Down
6 changes: 5 additions & 1 deletion src/storage/txn/scheduler.rs
Expand Up @@ -568,7 +568,11 @@ impl<E: Engine, L: LockManager> Scheduler<E, L> {
pb_ctx: task.cmd.ctx(),
..Default::default()
};
if let Command::FlashbackToVersionReadPhase { .. } = task.cmd {
if matches!(
task.cmd,
Command::FlashbackToVersionReadPhase { .. }
| Command::FlashbackToVersion { .. }
) {
snap_ctx.for_flashback = true;
}
// The program is currently in scheduler worker threads.
Expand Down
122 changes: 122 additions & 0 deletions tests/integrations/server/kv_service.rs
Expand Up @@ -596,6 +596,128 @@ fn test_mvcc_resolve_lock_gc_and_delete() {
assert!(del_resp.error.is_empty());
}

#[test]
fn test_mvcc_flashback() {
let (_cluster, client, ctx) = must_new_cluster_and_kv_client();
let mut ts = 0;
let k = b"key".to_vec();
for i in 0..10 {
let v = format!("value@{}", i).into_bytes();
// Prewrite
ts += 1;
let prewrite_start_version = ts;
let mut mutation = Mutation::default();
mutation.set_op(Op::Put);
mutation.set_key(k.clone());
mutation.set_value(v.clone());
must_kv_prewrite(
&client,
ctx.clone(),
vec![mutation],
k.clone(),
prewrite_start_version,
);
// Commit
ts += 1;
let commit_version = ts;
must_kv_commit(
&client,
ctx.clone(),
vec![k.clone()],
prewrite_start_version,
commit_version,
commit_version,
);
// Get
ts += 1;
must_kv_read_equal(&client, ctx.clone(), k.clone(), v.clone(), ts)
}
// Prewrite to leave a lock.
ts += 1;
let prewrite_start_version = ts;
let mut mutation = Mutation::default();
mutation.set_op(Op::Put);
mutation.set_key(k.clone());
mutation.set_value(b"value@latest".to_vec());
must_kv_prewrite(
&client,
ctx.clone(),
vec![mutation],
k.clone(),
prewrite_start_version,
);
ts += 1;
let get_version = ts;
let mut get_req = GetRequest::default();
get_req.set_context(ctx.clone());
get_req.key = k.clone();
get_req.version = get_version;
let get_resp = client.kv_get(&get_req).unwrap();
assert!(!get_resp.has_region_error());
assert!(get_resp.has_error());
assert!(get_resp.value.is_empty());
// Flashback
let mut flashback_to_version_req = FlashbackToVersionRequest::default();
flashback_to_version_req.set_context(ctx.clone());
flashback_to_version_req.version = 5;
flashback_to_version_req.start_key = b"a".to_vec();
flashback_to_version_req.end_key = b"z".to_vec();
let flashback_resp = client
.kv_flashback_to_version(&flashback_to_version_req)
.unwrap();
assert!(!flashback_resp.has_region_error());
assert!(flashback_resp.get_error().is_empty());
// Should not meet the lock and can not get the latest data any more.
must_kv_read_equal(&client, ctx.clone(), k.clone(), b"value@1".to_vec(), ts);
}

#[test]
#[cfg(feature = "failpoints")]
fn test_mvcc_flashback_block_rw() {
let (_cluster, client, ctx) = must_new_cluster_and_kv_client();
fail::cfg("skip_finish_flashback_to_version", "return").unwrap();
// Flashback
let mut flashback_to_version_req = FlashbackToVersionRequest::default();
flashback_to_version_req.set_context(ctx.clone());
flashback_to_version_req.version = 0;
flashback_to_version_req.start_key = b"a".to_vec();
flashback_to_version_req.end_key = b"z".to_vec();
let flashback_resp = client
.kv_flashback_to_version(&flashback_to_version_req)
.unwrap();
assert!(!flashback_resp.has_region_error());
assert!(flashback_resp.get_error().is_empty());
// Try to read.
let (k, v) = (b"key".to_vec(), b"value".to_vec());
// Get
let mut get_req = GetRequest::default();
get_req.set_context(ctx.clone());
get_req.key = k.clone();
get_req.version = 1;
let get_resp = client.kv_get(&get_req).unwrap();
assert!(get_resp.has_region_error());
assert!(!get_resp.has_error());
assert!(get_resp.value.is_empty());
// Scan
let mut scan_req = ScanRequest::default();
scan_req.set_context(ctx.clone());
scan_req.start_key = k.clone();
scan_req.limit = 1;
scan_req.version = 1;
let scan_resp = client.kv_scan(&scan_req).unwrap();
assert!(scan_resp.has_region_error());
assert!(scan_resp.pairs.is_empty());
// Try to write.
// Prewrite
let mut mutation = Mutation::default();
mutation.set_op(Op::Put);
mutation.set_key(k.clone());
mutation.set_value(v.clone());
let prewrite_resp = try_kv_prewrite(&client, ctx.clone(), vec![mutation], k.clone(), 1);
assert!(prewrite_resp.has_region_error());
fail::remove("skip_finish_flashback_to_version");
}

// raft related RPC is tested as parts of test_snapshot.rs, so skip here.

#[test]
Expand Down

0 comments on commit c9f43f7

Please sign in to comment.