Skip to content

Commit

Permalink
cherry pick tikv#7379 to release-4.0
Browse files Browse the repository at this point in the history
Signed-off-by: sre-bot <sre-bot@pingcap.com>
Signed-off-by: youjiali1995 <zlwgx1023@gmail.com>
  • Loading branch information
youjiali1995 committed Apr 21, 2020
1 parent 82f37e5 commit fdd4941
Show file tree
Hide file tree
Showing 7 changed files with 197 additions and 203 deletions.
22 changes: 10 additions & 12 deletions src/server/lock_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,17 +247,15 @@ impl LockManagerTrait for LockManager {
fn wake_up(
&self,
lock_ts: TimeStamp,
hashes: Option<Vec<u64>>,
hashes: Vec<u64>,
commit_ts: TimeStamp,
is_pessimistic_txn: bool,
) {
// If `hashes` is some, there may be some waiters waiting for these locks.
// Try to wake up them.
if self.has_waiter() {
if let Some(hashes) = hashes {
self.waiter_mgr_scheduler
.wake_up(lock_ts, hashes, commit_ts);
}
if !hashes.is_empty() && self.has_waiter() {
self.waiter_mgr_scheduler
.wake_up(lock_ts, hashes, commit_ts);
}
// If a pessimistic transaction is committed or rolled back and it once sent requests to
// detect deadlock, clean up its wait-for entries in the deadlock detector.
Expand Down Expand Up @@ -361,7 +359,7 @@ mod tests {
Some(WaitTimeout::Default),
);
assert!(lock_mgr.has_waiter());
lock_mgr.wake_up(lock.ts, Some(vec![lock.hash]), 30.into(), false);
lock_mgr.wake_up(lock.ts, vec![lock.hash], 30.into(), false);
assert_elapsed(
|| expect_write_conflict(f.wait().unwrap(), waiter_ts, lock_info, 30.into()),
0,
Expand Down Expand Up @@ -396,7 +394,7 @@ mod tests {
200,
);
// Waiter2 releases its lock.
lock_mgr.wake_up(20.into(), Some(vec![20]), 20.into(), true);
lock_mgr.wake_up(20.into(), vec![20], 20.into(), true);
assert_elapsed(
|| expect_write_conflict(f1.wait().unwrap(), 10.into(), lock_info1, 20.into()),
0,
Expand All @@ -418,27 +416,27 @@ mod tests {
);
assert!(lock_mgr.has_waiter());
assert_eq!(lock_mgr.remove_from_detected(30.into()), !is_first_lock);
lock_mgr.wake_up(40.into(), Some(vec![40]), 40.into(), false);
lock_mgr.wake_up(40.into(), vec![40], 40.into(), false);
f.wait().unwrap().unwrap_err();
}
assert!(!lock_mgr.has_waiter());

// If key_hashes is none, no wake up.
let prev_wake_up = TASK_COUNTER_METRICS.with(|m| m.wake_up.get());
lock_mgr.wake_up(10.into(), None, 10.into(), false);
lock_mgr.wake_up(10.into(), vec![], 10.into(), false);
assert_eq!(TASK_COUNTER_METRICS.with(|m| m.wake_up.get()), prev_wake_up);

// If it's non-pessimistic-txn, no clean up.
let prev_clean_up = TASK_COUNTER_METRICS.with(|m| m.clean_up.get());
lock_mgr.wake_up(10.into(), None, 10.into(), false);
lock_mgr.wake_up(10.into(), vec![], 10.into(), false);
assert_eq!(
TASK_COUNTER_METRICS.with(|m| m.clean_up.get()),
prev_clean_up
);

// If the txn doesn't wait for locks, no clean up.
let prev_clean_up = TASK_COUNTER_METRICS.with(|m| m.clean_up.get());
lock_mgr.wake_up(10.into(), None, 10.into(), true);
lock_mgr.wake_up(10.into(), vec![], 10.into(), true);
assert_eq!(
TASK_COUNTER_METRICS.with(|m| m.clean_up.get()),
prev_clean_up
Expand Down
4 changes: 2 additions & 2 deletions src/storage/lock_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ pub trait LockManager: Clone + Send + 'static {
fn wake_up(
&self,
lock_ts: TimeStamp,
hashes: Option<Vec<u64>>,
hashes: Vec<u64>,
commit_ts: TimeStamp,
is_pessimistic_txn: bool,
);
Expand Down Expand Up @@ -100,7 +100,7 @@ impl LockManager for DummyLockManager {
fn wake_up(
&self,
_lock_ts: TimeStamp,
_hashes: Option<Vec<u64>>,
_hashes: Vec<u64>,
_commit_ts: TimeStamp,
_is_pessimistic_txn: bool,
) {
Expand Down
2 changes: 1 addition & 1 deletion src/storage/mvcc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ mod reader;
mod txn;

pub use self::reader::*;
pub use self::txn::{MvccTxn, MAX_TXN_WRITE_SIZE};
pub use self::txn::{MvccTxn, ReleasedLock, MAX_TXN_WRITE_SIZE};
pub use crate::new_txn;
pub use txn_types::{
Key, Lock, LockType, Mutation, TimeStamp, Value, Write, WriteRef, WriteType,
Expand Down
80 changes: 54 additions & 26 deletions src/storage/mvcc/txn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,25 @@ pub struct GcInfo {
pub is_completed: bool,
}

/// `ReleasedLock` contains the information of the lock released by `commit`, `rollback` and so on.
/// It's used by `LockManager` to wake up transactions waiting for locks.
#[derive(Debug)]
pub struct ReleasedLock {
/// The hash value of the lock.
pub hash: u64,
/// Whether it is a pessimistic lock.
pub pessimistic: bool,
}

impl ReleasedLock {
fn new(key: &Key, pessimistic: bool) -> Self {
Self {
hash: key.gen_hash(),
pessimistic,
}
}
}

pub struct MvccTxn<S: Snapshot> {
reader: MvccReader<S>,
start_ts: TimeStamp,
Expand Down Expand Up @@ -72,6 +91,10 @@ impl<S: Snapshot> MvccTxn<S> {
self.collapse_rollback = collapse;
}

pub fn set_start_ts(&mut self, start_ts: TimeStamp) {
self.start_ts = start_ts;
}

pub fn into_modifies(self) -> Vec<Modify> {
self.writes
}
Expand All @@ -92,10 +115,12 @@ impl<S: Snapshot> MvccTxn<S> {
self.writes.push(write);
}

fn unlock_key(&mut self, key: Key) {
fn unlock_key(&mut self, key: Key, pessimistic: bool) -> Option<ReleasedLock> {
let released = ReleasedLock::new(&key, pessimistic);
let write = Modify::Delete(CF_LOCK, key);
self.write_size += write.size();
self.writes.push(write);
Some(released)
}

fn put_value(&mut self, key: Key, ts: TimeStamp, value: Value) {
Expand Down Expand Up @@ -161,7 +186,12 @@ impl<S: Snapshot> MvccTxn<S> {
self.put_lock(key, &lock);
}

fn rollback_lock(&mut self, key: Key, lock: &Lock, is_pessimistic_txn: bool) -> Result<()> {
fn rollback_lock(
&mut self,
key: Key,
lock: &Lock,
is_pessimistic_txn: bool,
) -> Result<Option<ReleasedLock>> {
// If prewrite type is DEL or LOCK or PESSIMISTIC, it is no need to delete value.
if lock.short_value.is_none() && lock.lock_type == LockType::Put {
self.delete_value(key.clone(), lock.ts);
Expand All @@ -171,11 +201,10 @@ impl<S: Snapshot> MvccTxn<S> {
let protected: bool = is_pessimistic_txn && key.is_encoded_from(&lock.primary);
let write = Write::new_rollback(self.start_ts, protected);
self.put_write(key.clone(), self.start_ts, write.as_ref().to_bytes());
self.unlock_key(key.clone());
if self.collapse_rollback {
self.collapse_prev_rollback(key)?;
self.collapse_prev_rollback(key.clone())?;
}
Ok(())
Ok(self.unlock_key(key, is_pessimistic_txn))
}

/// Checks the existence of the key according to `should_not_exist`.
Expand Down Expand Up @@ -556,7 +585,7 @@ impl<S: Snapshot> MvccTxn<S> {
Ok(())
}

pub fn commit(&mut self, key: Key, commit_ts: TimeStamp) -> Result<bool> {
pub fn commit(&mut self, key: Key, commit_ts: TimeStamp) -> Result<Option<ReleasedLock>> {
fail_point!("commit", |err| Err(make_txn_error(
err,
&key,
Expand Down Expand Up @@ -629,7 +658,7 @@ impl<S: Snapshot> MvccTxn<S> {
| Some((_, WriteType::Delete))
| Some((_, WriteType::Lock)) => {
MVCC_DUPLICATE_CMD_COUNTER_VEC.commit.inc();
Ok(false)
Ok(None)
}
};
}
Expand All @@ -640,11 +669,10 @@ impl<S: Snapshot> MvccTxn<S> {
short_value,
);
self.put_write(key.clone(), commit_ts, write.as_ref().to_bytes());
self.unlock_key(key);
Ok(is_pessimistic_txn)
Ok(self.unlock_key(key, is_pessimistic_txn))
}

pub fn rollback(&mut self, key: Key) -> Result<bool> {
pub fn rollback(&mut self, key: Key) -> Result<Option<ReleasedLock>> {
fail_point!("rollback", |err| Err(make_txn_error(
err,
&key,
Expand Down Expand Up @@ -707,7 +735,7 @@ impl<S: Snapshot> MvccTxn<S> {
///
/// Returns whether the lock is a pessimistic lock. Returns error if the key has already been
/// committed.
pub fn cleanup(&mut self, key: Key, current_ts: TimeStamp) -> Result<bool> {
pub fn cleanup(&mut self, key: Key, current_ts: TimeStamp) -> Result<Option<ReleasedLock>> {
fail_point!("cleanup", |err| Err(make_txn_error(
err,
&key,
Expand All @@ -727,8 +755,7 @@ impl<S: Snapshot> MvccTxn<S> {
}

let is_pessimistic_txn = !lock.for_update_ts.is_zero();
self.rollback_lock(key, lock, is_pessimistic_txn)?;
Ok(is_pessimistic_txn)
self.rollback_lock(key, lock, is_pessimistic_txn)
}
_ => match self.check_txn_status_missing_lock(key, true)? {
TxnStatus::Committed { commit_ts } => {
Expand All @@ -738,16 +765,20 @@ impl<S: Snapshot> MvccTxn<S> {
TxnStatus::RolledBack => {
// Return Ok on Rollback already exist.
MVCC_DUPLICATE_CMD_COUNTER_VEC.rollback.inc();
Ok(false)
Ok(None)
}
TxnStatus::LockNotExist => Ok(false),
TxnStatus::LockNotExist => Ok(None),
_ => unreachable!(),
},
}
}

/// Delete any pessimistic lock with small for_update_ts belongs to this transaction.
pub fn pessimistic_rollback(&mut self, key: Key, for_update_ts: TimeStamp) -> Result<()> {
pub fn pessimistic_rollback(
&mut self,
key: Key,
for_update_ts: TimeStamp,
) -> Result<Option<ReleasedLock>> {
fail_point!("pessimistic_rollback", |err| Err(make_txn_error(
err,
&key,
Expand All @@ -760,10 +791,10 @@ impl<S: Snapshot> MvccTxn<S> {
&& lock.ts == self.start_ts
&& lock.for_update_ts <= for_update_ts
{
self.unlock_key(key);
return Ok(self.unlock_key(key, true));
}
}
Ok(())
Ok(None)
}

fn collapse_prev_rollback(&mut self, key: Key) -> Result<()> {
Expand Down Expand Up @@ -839,7 +870,7 @@ impl<S: Snapshot> MvccTxn<S> {
caller_start_ts: TimeStamp,
current_ts: TimeStamp,
rollback_if_not_exist: bool,
) -> Result<(TxnStatus, bool)> {
) -> Result<(TxnStatus, Option<ReleasedLock>)> {
fail_point!("check_txn_status", |err| Err(make_txn_error(
err,
&primary_key,
Expand All @@ -853,9 +884,9 @@ impl<S: Snapshot> MvccTxn<S> {

if lock.ts.physical() + lock.ttl < current_ts.physical() {
// If the lock is expired, clean it up.
self.rollback_lock(primary_key, lock, is_pessimistic_txn)?;
let released = self.rollback_lock(primary_key, lock, is_pessimistic_txn)?;
MVCC_CHECK_TXN_STATUS_COUNTER_VEC.rollback.inc();
return Ok((TxnStatus::TtlExpire, is_pessimistic_txn));
return Ok((TxnStatus::TtlExpire, released));
}

// If lock.minCommitTS is 0, it's not a large transaction and we can't push forward
Expand All @@ -873,14 +904,11 @@ impl<S: Snapshot> MvccTxn<S> {
MVCC_CHECK_TXN_STATUS_COUNTER_VEC.update_ts.inc();
}

Ok((
TxnStatus::uncommitted(lock.ttl, lock.min_commit_ts),
is_pessimistic_txn,
))
Ok((TxnStatus::uncommitted(lock.ttl, lock.min_commit_ts), None))
}
_ => self
.check_txn_status_missing_lock(primary_key, rollback_if_not_exist)
.map(|s| (s, false)),
.map(|s| (s, None)),
}
}

Expand Down
Loading

0 comments on commit fdd4941

Please sign in to comment.