Skip to content

Commit

Permalink
mvcc/reader: scan versions from u64::max to start_ts in get_txn… (tik…
Browse files Browse the repository at this point in the history
  • Loading branch information
youjiali1995 committed Jul 11, 2019
1 parent 5e12aa1 commit 46aa12e
Showing 1 changed file with 52 additions and 16 deletions.
68 changes: 52 additions & 16 deletions src/storage/mvcc/reader/reader.rs
Expand Up @@ -256,19 +256,20 @@ impl<S: Snapshot> MvccReader<S> {
key: &Key,
start_ts: u64,
) -> Result<Option<(u64, WriteType)>> {
let mut seek_ts = start_ts;
while let Some((commit_ts, write)) = self.reverse_seek_write(key, seek_ts)? {
// It's possible a txn with a small `start_ts` has a greater `commit_ts` than a txn with
// a greater `start_ts` in pessimistic transaction.
// I.e., txn_1.commit_ts > txn_2.commit_ts > txn_2.start_ts > txn_1.start_ts.
//
// Scan all the versions from `u64::max_value()` to `start_ts`.
let mut seek_ts = u64::max_value();
while let Some((commit_ts, write)) = self.seek_write(key, seek_ts)? {
if write.start_ts == start_ts {
return Ok(Some((commit_ts, write.write_type)));
}

// If we reach a commit version whose type is not Rollback and start ts is
// larger than the given start ts, stop searching.
if write.write_type != WriteType::Rollback && write.start_ts > start_ts {
if commit_ts <= start_ts {
break;
}

seek_ts = commit_ts + 1;
seek_ts = commit_ts - 1;
}
Ok(None)
}
Expand Down Expand Up @@ -811,9 +812,9 @@ mod tests {
let snap = RegionSnapshot::from_raw(Arc::clone(&db), region.clone());
let mut reader = MvccReader::new(snap, None, false, None, None, IsolationLevel::SI);

// Let's assume `45_40 PUT` means a commit version with start ts is 35 and commit ts
// is 40.
// Commit versions: [45_40 PUT, 40_35 PUT, 30_25 PUT, 20_20 Rollback, 10_1 PUT, 5_5 Rollback].
// Let's assume `50_45 PUT` means a commit version with start ts is 45 and commit ts
// is 50.
// Commit versions: [50_45 PUT, 45_40 PUT, 40_35 PUT, 30_25 PUT, 20_20 Rollback, 10_1 PUT, 5_5 Rollback].
let key = Key::from_raw(k);
let (commit_ts, write_type) = reader.get_txn_commit_info(&key, 45).unwrap().unwrap();
assert_eq!(commit_ts, 50);
Expand All @@ -839,12 +840,47 @@ mod tests {
assert_eq!(commit_ts, 5);
assert_eq!(write_type, WriteType::Rollback);

let seek_for_prev_old = reader.get_statistics().write.seek_for_prev;
assert!(reader.get_txn_commit_info(&key, 15).unwrap().is_none());
let seek_for_prev_new = reader.get_statistics().write.seek_for_prev;
let seek_old = reader.get_statistics().write.seek;
assert!(reader.get_txn_commit_info(&key, 30).unwrap().is_none());
let seek_new = reader.get_statistics().write.seek;

// `get_txn_commit_info(&key, 30)` stopped at `30_25 PUT`.
assert_eq!(seek_new - seek_old, 3);
}

#[test]
fn test_get_txn_commit_info_of_pessimistic_txn() {
let path = Builder::new()
.prefix("_test_storage_mvcc_reader_get_txn_commit_info_of_pessimistic_txn")
.tempdir()
.unwrap();
let path = path.path().to_str().unwrap();
let region = make_region(1, vec![], vec![]);
let db = open_db(path, true);
let mut engine = RegionEngine::new(Arc::clone(&db), region.clone());

let (k, v) = (b"k", b"v");
let key = Key::from_raw(k);
let m = Mutation::Put((key.clone(), v.to_vec()));

// txn: start_ts = 2, commit_ts = 3
engine.acquire_pessimistic_lock(key.clone(), k, 2, 2);
engine.prewrite_pessimistic_lock(m.clone(), k, 2);
engine.commit(k, 2, 3);
// txn: start_ts = 1, commit_ts = 4
engine.acquire_pessimistic_lock(key.clone(), k, 1, 3);
engine.prewrite_pessimistic_lock(m, k, 1);
engine.commit(k, 1, 4);

let snap = RegionSnapshot::from_raw(Arc::clone(&db), region.clone());
let mut reader = MvccReader::new(snap, None, false, None, None, IsolationLevel::SI);
let (commit_ts, write_type) = reader.get_txn_commit_info(&key, 2).unwrap().unwrap();
assert_eq!(commit_ts, 3);
assert_eq!(write_type, WriteType::Put);

// `get_txn_commit_info(&key, 15)` stopped at `30_25 PUT`.
assert_eq!(seek_for_prev_new - seek_for_prev_old, 2);
let (commit_ts, write_type) = reader.get_txn_commit_info(&key, 1).unwrap().unwrap();
assert_eq!(commit_ts, 4);
assert_eq!(write_type, WriteType::Put);
}

#[test]
Expand Down

0 comments on commit 46aa12e

Please sign in to comment.