Skip to content

Commit

Permalink
txn: only wake up waiters when locks are indeed released (tikv#7379)
Browse files Browse the repository at this point in the history
Signed-off-by: youjiali1995 <zlwgx1023@gmail.com>
  • Loading branch information
youjiali1995 committed Apr 21, 2020
1 parent f3d2ba4 commit 29750ab
Show file tree
Hide file tree
Showing 8 changed files with 180 additions and 161 deletions.
31 changes: 31 additions & 0 deletions benches/misc/storage/key.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0.

use rand::{self, Rng, RngCore};
use tikv::coprocessor::codec::{datum, table, Datum};
use tikv::storage::Key;

#[inline]
fn gen_rand_str(len: usize) -> Vec<u8> {
let mut rand_str = vec![0; len];
rand::thread_rng().fill_bytes(&mut rand_str);
rand_str
}

#[bench]
fn bench_row_key_gen_hash(b: &mut test::Bencher) {
let id: i64 = rand::thread_rng().gen();
let row_key = Key::from_raw(&table::encode_row_key(id, id));
b.iter(|| {
test::black_box(row_key.gen_hash());
});
}

#[bench]
fn bench_index_key_gen_hash(b: &mut test::Bencher) {
let id: i64 = rand::thread_rng().gen();
let encoded_index_val = datum::encode_key(&[Datum::Bytes(gen_rand_str(64))]).unwrap();
let index_key = Key::from_raw(&table::encode_index_seek_key(id, id, &encoded_index_val));
b.iter(|| {
test::black_box(index_key.gen_hash());
});
}
1 change: 1 addition & 0 deletions benches/misc/storage/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
// Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0.

mod key;
mod scan;
28 changes: 10 additions & 18 deletions src/server/lock_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,20 +230,12 @@ impl LockMgr for LockManager {
}
}

fn wake_up(
&self,
lock_ts: u64,
hashes: Option<Vec<u64>>,
commit_ts: u64,
is_pessimistic_txn: bool,
) {
fn wake_up(&self, lock_ts: u64, hashes: Vec<u64>, commit_ts: u64, is_pessimistic_txn: bool) {
// If `hashes` is some, there may be some waiters waiting for these locks.
// Try to wake up them.
if self.has_waiter() {
if let Some(hashes) = hashes {
self.waiter_mgr_scheduler
.wake_up(lock_ts, hashes, commit_ts);
}
if !hashes.is_empty() && self.has_waiter() {
self.waiter_mgr_scheduler
.wake_up(lock_ts, hashes, commit_ts);
}
// If a pessimistic transaction is committed or rolled back and it once sent requests to
// detect deadlock, clean up its wait-for entries in the deadlock detector.
Expand Down Expand Up @@ -326,7 +318,7 @@ mod tests {
let (waiter, lock_info, f) = new_test_waiter(waiter_ts, lock.ts, lock.hash);
lock_mgr.wait_for(waiter.start_ts, waiter.cb, waiter.pr, waiter.lock, true, 0);
assert!(lock_mgr.has_waiter());
lock_mgr.wake_up(lock.ts, Some(vec![lock.hash]), 30, false);
lock_mgr.wake_up(lock.ts, vec![lock.hash], 30, false);
assert_elapsed(
|| expect_write_conflict(f.wait().unwrap(), waiter_ts, lock_info, 30),
0,
Expand Down Expand Up @@ -361,7 +353,7 @@ mod tests {
200,
);
// Waiter2 releases its lock.
lock_mgr.wake_up(20, Some(vec![20]), 20, true);
lock_mgr.wake_up(20, vec![20], 20, true);
assert_elapsed(
|| expect_write_conflict(f1.wait().unwrap(), 10, lock_info1, 20),
0,
Expand All @@ -383,24 +375,24 @@ mod tests {
);
assert!(lock_mgr.has_waiter());
assert_eq!(lock_mgr.remove_from_detected(30), !is_first_lock);
lock_mgr.wake_up(40, Some(vec![40]), 40, false);
lock_mgr.wake_up(40, vec![40], 40, false);
f.wait().unwrap().unwrap_err();
}
assert!(!lock_mgr.has_waiter());

// If key_hashes is none, no wake up.
let prev_wake_up = TASK_COUNTER_VEC.wake_up.get();
lock_mgr.wake_up(10, None, 10, false);
lock_mgr.wake_up(10, vec![], 10, false);
assert_eq!(TASK_COUNTER_VEC.wake_up.get(), prev_wake_up);

// If it's non-pessimistic-txn, no clean up.
let prev_clean_up = TASK_COUNTER_VEC.clean_up.get();
lock_mgr.wake_up(10, None, 10, false);
lock_mgr.wake_up(10, vec![], 10, false);
assert_eq!(TASK_COUNTER_VEC.clean_up.get(), prev_clean_up);

// If the txn doesn't wait for locks, no clean up.
let prev_clean_up = TASK_COUNTER_VEC.clean_up.get();
lock_mgr.wake_up(10, None, 10, true);
lock_mgr.wake_up(10, vec![], 10, true);
assert_eq!(TASK_COUNTER_VEC.clean_up.get(), prev_clean_up);

// If timeout is negative, no wait for.
Expand Down
10 changes: 2 additions & 8 deletions src/storage/lock_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,7 @@ pub trait LockMgr: Clone + Send + 'static {
);

/// The locks with `lock_ts` and `hashes` are released, trys to wake up transactions.
fn wake_up(
&self,
lock_ts: u64,
hashes: Option<Vec<u64>>,
commit_ts: u64,
is_pessimistic_txn: bool,
);
fn wake_up(&self, lock_ts: u64, hashes: Vec<u64>, commit_ts: u64, is_pessimistic_txn: bool);

/// Returns true if there are waiters in the `LockMgr`.
///
Expand Down Expand Up @@ -68,7 +62,7 @@ impl LockMgr for DummyLockMgr {
fn wake_up(
&self,
_lock_ts: u64,
_hashes: Option<Vec<u64>>,
_hashes: Vec<u64>,
_commit_ts: u64,
_is_pessimistic_txn: bool,
) {
Expand Down
2 changes: 1 addition & 1 deletion src/storage/mvcc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ mod write;

pub use self::lock::{Lock, LockType};
pub use self::reader::*;
pub use self::txn::{MvccTxn, MAX_TXN_WRITE_SIZE};
pub use self::txn::{MvccTxn, ReleasedLock, MAX_TXN_WRITE_SIZE};
pub use self::write::{Write, WriteType};

use std::error;
Expand Down
67 changes: 49 additions & 18 deletions src/storage/mvcc/txn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,25 @@ pub struct GcInfo {
pub is_completed: bool,
}

/// `ReleasedLock` contains the information of the lock released by `commit`, `rollback` and so on.
/// It's used by `LockManager` to wake up transactions waiting for locks.
#[derive(Debug)]
pub struct ReleasedLock {
/// The hash value of the lock.
pub hash: u64,
/// Whether it is a pessimistic lock.
pub pessimistic: bool,
}

impl ReleasedLock {
fn new(key: &Key, pessimistic: bool) -> Self {
Self {
hash: key.gen_hash(),
pessimistic,
}
}
}

pub struct MvccTxn<S: Snapshot> {
reader: MvccReader<S>,
gc_reader: MvccReader<S>,
Expand Down Expand Up @@ -72,6 +91,10 @@ impl<S: Snapshot> MvccTxn<S> {
self.collapse_rollback = collapse;
}

pub fn set_start_ts(&mut self, start_ts: u64) {
self.start_ts = start_ts;
}

pub fn into_modifies(self) -> Vec<Modify> {
self.writes
}
Expand Down Expand Up @@ -113,9 +136,11 @@ impl<S: Snapshot> MvccTxn<S> {
self.put_lock(key, &lock);
}

fn unlock_key(&mut self, key: Key) {
fn unlock_key(&mut self, key: Key, pessimistic: bool) -> Option<ReleasedLock> {
let released = ReleasedLock::new(&key, pessimistic);
self.write_size += CF_LOCK.len() + key.as_encoded().len();
self.writes.push(Modify::Delete(CF_LOCK, key));
Some(released)
}

fn put_value(&mut self, key: Key, ts: u64, value: Value) {
Expand Down Expand Up @@ -171,7 +196,12 @@ impl<S: Snapshot> MvccTxn<S> {
}
}

fn rollback_lock(&mut self, key: Key, lock: &Lock, is_pessimistic_txn: bool) -> Result<()> {
fn rollback_lock(
&mut self,
key: Key,
lock: &Lock,
is_pessimistic_txn: bool,
) -> Result<Option<ReleasedLock>> {
// If prewrite type is DEL or LOCK or PESSIMISTIC, it is no need to delete value.
if lock.short_value.is_none() && lock.lock_type == LockType::Put {
self.delete_value(key.clone(), lock.ts);
Expand All @@ -181,11 +211,10 @@ impl<S: Snapshot> MvccTxn<S> {
let protected: bool = is_pessimistic_txn && key.is_encoded_from(&lock.primary);
let write = Write::new_rollback(self.start_ts, protected);
self.put_write(key.clone(), self.start_ts, write.to_bytes());
self.unlock_key(key.clone());
if self.collapse_rollback {
self.collapse_prev_rollback(key)?;
self.collapse_prev_rollback(key.clone())?;
}
Ok(())
Ok(self.unlock_key(key, is_pessimistic_txn))
}

fn check_data_constraint(
Expand Down Expand Up @@ -472,7 +501,7 @@ impl<S: Snapshot> MvccTxn<S> {
Ok(())
}

pub fn commit(&mut self, key: Key, commit_ts: u64) -> Result<bool> {
pub fn commit(&mut self, key: Key, commit_ts: u64) -> Result<Option<ReleasedLock>> {
let (lock_type, short_value, is_pessimistic_txn) = match self.reader.load_lock(&key)? {
Some(ref mut lock) if lock.ts == self.start_ts => {
// It's an abnormal routine since pessimistic locks shouldn't be committed in our
Expand Down Expand Up @@ -519,7 +548,7 @@ impl<S: Snapshot> MvccTxn<S> {
| Some((_, WriteType::Delete))
| Some((_, WriteType::Lock)) => {
MVCC_DUPLICATE_CMD_COUNTER_VEC.commit.inc();
Ok(false)
Ok(None)
}
};
}
Expand All @@ -530,11 +559,10 @@ impl<S: Snapshot> MvccTxn<S> {
short_value,
);
self.put_write(key.clone(), commit_ts, write.to_bytes());
self.unlock_key(key);
Ok(is_pessimistic_txn)
Ok(self.unlock_key(key, is_pessimistic_txn))
}

pub fn rollback(&mut self, key: Key) -> Result<bool> {
pub fn rollback(&mut self, key: Key) -> Result<Option<ReleasedLock>> {
self.cleanup(key, 0)
}

Expand All @@ -544,7 +572,7 @@ impl<S: Snapshot> MvccTxn<S> {
///
/// Returns whether the lock is a pessimistic lock. Returns error if the key has already been
/// committed.
pub fn cleanup(&mut self, key: Key, current_ts: u64) -> Result<bool> {
pub fn cleanup(&mut self, key: Key, current_ts: u64) -> Result<Option<ReleasedLock>> {
match self.reader.load_lock(&key)? {
Some(ref lock) if lock.ts == self.start_ts => {
// If current_ts is not 0, check the Lock's TTL.
Expand All @@ -558,16 +586,15 @@ impl<S: Snapshot> MvccTxn<S> {
}

let is_pessimistic_txn = lock.for_update_ts != 0;
self.rollback_lock(key, lock, is_pessimistic_txn)?;
Ok(is_pessimistic_txn)
self.rollback_lock(key, lock, is_pessimistic_txn)
}
_ => {
match self.reader.get_txn_commit_info(&key, self.start_ts)? {
Some((ts, write_type)) => {
if write_type == WriteType::Rollback {
// return Ok on Rollback already exist
MVCC_DUPLICATE_CMD_COUNTER_VEC.rollback.inc();
Ok(false)
Ok(None)
} else {
MVCC_CONFLICT_COUNTER.rollback_committed.inc();
info!(
Expand All @@ -593,24 +620,28 @@ impl<S: Snapshot> MvccTxn<S> {
// [issue #7364](https://github.com/tikv/tikv/issues/7364)
let write = Write::new_rollback(ts, true);
self.put_write(key, ts, write.to_bytes());
Ok(false)
Ok(None)
}
}
}
}
}

/// Delete any pessimistic lock with small for_update_ts belongs to this transaction.
pub fn pessimistic_rollback(&mut self, key: Key, for_update_ts: u64) -> Result<()> {
pub fn pessimistic_rollback(
&mut self,
key: Key,
for_update_ts: u64,
) -> Result<Option<ReleasedLock>> {
if let Some(lock) = self.reader.load_lock(&key)? {
if lock.lock_type == LockType::Pessimistic
&& lock.ts == self.start_ts
&& lock.for_update_ts <= for_update_ts
{
self.unlock_key(key);
return Ok(self.unlock_key(key, true));
}
}
Ok(())
Ok(None)
}

fn collapse_prev_rollback(&mut self, key: Key) -> Result<()> {
Expand Down
Loading

0 comments on commit 29750ab

Please sign in to comment.