Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

txn: only wake up waiters when locks are indeed released #7379

Merged
merged 6 commits into from
Apr 20, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 10 additions & 12 deletions src/server/lock_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,17 +248,15 @@ impl LockManagerTrait for LockManager {
fn wake_up(
&self,
lock_ts: TimeStamp,
hashes: Option<Vec<u64>>,
hashes: Vec<u64>,
commit_ts: TimeStamp,
is_pessimistic_txn: bool,
) {
// If `hashes` is some, there may be some waiters waiting for these locks.
// Try to wake up them.
if self.has_waiter() {
if let Some(hashes) = hashes {
self.waiter_mgr_scheduler
.wake_up(lock_ts, hashes, commit_ts);
}
if !hashes.is_empty() && self.has_waiter() {
self.waiter_mgr_scheduler
.wake_up(lock_ts, hashes, commit_ts);
}
// If a pessimistic transaction is committed or rolled back and it once sent requests to
// detect deadlock, clean up its wait-for entries in the deadlock detector.
Expand Down Expand Up @@ -363,7 +361,7 @@ mod tests {
Some(WaitTimeout::Default),
);
assert!(lock_mgr.has_waiter());
lock_mgr.wake_up(lock.ts, Some(vec![lock.hash]), 30.into(), false);
lock_mgr.wake_up(lock.ts, vec![lock.hash], 30.into(), false);
assert_elapsed(
|| expect_write_conflict(f.wait().unwrap(), waiter_ts, lock_info, 30.into()),
0,
Expand Down Expand Up @@ -398,7 +396,7 @@ mod tests {
200,
);
// Waiter2 releases its lock.
lock_mgr.wake_up(20.into(), Some(vec![20]), 20.into(), true);
lock_mgr.wake_up(20.into(), vec![20], 20.into(), true);
assert_elapsed(
|| expect_write_conflict(f1.wait().unwrap(), 10.into(), lock_info1, 20.into()),
0,
Expand All @@ -420,24 +418,24 @@ mod tests {
);
assert!(lock_mgr.has_waiter());
assert_eq!(lock_mgr.remove_from_detected(30.into()), !is_first_lock);
lock_mgr.wake_up(40.into(), Some(vec![40]), 40.into(), false);
lock_mgr.wake_up(40.into(), vec![40], 40.into(), false);
f.wait().unwrap().unwrap_err();
}
assert!(!lock_mgr.has_waiter());

// If key_hashes is none, no wake up.
let prev_wake_up = TASK_COUNTER_METRICS.wake_up.get();
lock_mgr.wake_up(10.into(), None, 10.into(), false);
lock_mgr.wake_up(10.into(), vec![], 10.into(), false);
assert_eq!(TASK_COUNTER_METRICS.wake_up.get(), prev_wake_up);

// If it's non-pessimistic-txn, no clean up.
let prev_clean_up = TASK_COUNTER_METRICS.clean_up.get();
lock_mgr.wake_up(10.into(), None, 10.into(), false);
lock_mgr.wake_up(10.into(), vec![], 10.into(), false);
assert_eq!(TASK_COUNTER_METRICS.clean_up.get(), prev_clean_up);

// If the txn doesn't wait for locks, no clean up.
let prev_clean_up = TASK_COUNTER_METRICS.clean_up.get();
lock_mgr.wake_up(10.into(), None, 10.into(), true);
lock_mgr.wake_up(10.into(), vec![], 10.into(), true);
assert_eq!(TASK_COUNTER_METRICS.clean_up.get(), prev_clean_up);

// If timeout is negative, no wait for.
Expand Down
4 changes: 2 additions & 2 deletions src/storage/lock_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ pub trait LockManager: Clone + Send + 'static {
fn wake_up(
&self,
lock_ts: TimeStamp,
hashes: Option<Vec<u64>>,
hashes: Vec<u64>,
commit_ts: TimeStamp,
is_pessimistic_txn: bool,
);
Expand Down Expand Up @@ -100,7 +100,7 @@ impl LockManager for DummyLockManager {
fn wake_up(
&self,
_lock_ts: TimeStamp,
_hashes: Option<Vec<u64>>,
_hashes: Vec<u64>,
_commit_ts: TimeStamp,
_is_pessimistic_txn: bool,
) {
Expand Down
2 changes: 1 addition & 1 deletion src/storage/mvcc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ mod reader;
mod txn;

pub use self::reader::*;
pub use self::txn::{MvccTxn, MAX_TXN_WRITE_SIZE};
pub use self::txn::{MvccTxn, ReleasedLock, MAX_TXN_WRITE_SIZE};
pub use crate::new_txn;
pub use txn_types::{
Key, Lock, LockType, Mutation, TimeStamp, Value, Write, WriteRef, WriteType,
Expand Down
80 changes: 54 additions & 26 deletions src/storage/mvcc/txn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,25 @@ pub struct GcInfo {
pub is_completed: bool,
}

/// `ReleasedLock` contains the information of the lock released by `commit`, `rollback` and so on.
/// It's used by `LockManager` to wake up transactions waiting for locks.
#[derive(Debug)]
pub struct ReleasedLock {
youjiali1995 marked this conversation as resolved.
Show resolved Hide resolved
/// The hash value of the lock.
pub hash: u64,
/// Whether it is a pessimistic lock.
pub pessimistic: bool,
}

impl ReleasedLock {
fn new(key: &Key, pessimistic: bool) -> Self {
Self {
hash: key.gen_hash(),
pessimistic,
}
}
}

pub struct MvccTxn<S: Snapshot> {
reader: MvccReader<S>,
start_ts: TimeStamp,
Expand Down Expand Up @@ -72,6 +91,10 @@ impl<S: Snapshot> MvccTxn<S> {
self.collapse_rollback = collapse;
}

pub fn set_start_ts(&mut self, start_ts: TimeStamp) {
self.start_ts = start_ts;
}

pub fn into_modifies(self) -> Vec<Modify> {
self.writes
}
Expand All @@ -92,10 +115,12 @@ impl<S: Snapshot> MvccTxn<S> {
self.writes.push(write);
}

fn unlock_key(&mut self, key: Key) {
fn unlock_key(&mut self, key: Key, pessimistic: bool) -> Option<ReleasedLock> {
let released = ReleasedLock::new(&key, pessimistic);
let write = Modify::Delete(CF_LOCK, key);
self.write_size += write.size();
self.writes.push(write);
Some(released)
}

fn put_value(&mut self, key: Key, ts: TimeStamp, value: Value) {
Expand Down Expand Up @@ -161,7 +186,12 @@ impl<S: Snapshot> MvccTxn<S> {
self.put_lock(key, &lock);
}

fn rollback_lock(&mut self, key: Key, lock: &Lock, is_pessimistic_txn: bool) -> Result<()> {
fn rollback_lock(
&mut self,
key: Key,
lock: &Lock,
is_pessimistic_txn: bool,
) -> Result<Option<ReleasedLock>> {
// If prewrite type is DEL or LOCK or PESSIMISTIC, it is no need to delete value.
if lock.short_value.is_none() && lock.lock_type == LockType::Put {
self.delete_value(key.clone(), lock.ts);
Expand All @@ -171,11 +201,10 @@ impl<S: Snapshot> MvccTxn<S> {
let protected: bool = is_pessimistic_txn && key.is_encoded_from(&lock.primary);
let write = Write::new_rollback(self.start_ts, protected);
self.put_write(key.clone(), self.start_ts, write.as_ref().to_bytes());
self.unlock_key(key.clone());
if self.collapse_rollback {
self.collapse_prev_rollback(key)?;
self.collapse_prev_rollback(key.clone())?;
}
Ok(())
Ok(self.unlock_key(key, is_pessimistic_txn))
}

/// Checks the existence of the key according to `should_not_exist`.
Expand Down Expand Up @@ -556,7 +585,7 @@ impl<S: Snapshot> MvccTxn<S> {
Ok(())
}

pub fn commit(&mut self, key: Key, commit_ts: TimeStamp) -> Result<bool> {
pub fn commit(&mut self, key: Key, commit_ts: TimeStamp) -> Result<Option<ReleasedLock>> {
fail_point!("commit", |err| Err(make_txn_error(
err,
&key,
Expand Down Expand Up @@ -629,7 +658,7 @@ impl<S: Snapshot> MvccTxn<S> {
| Some((_, WriteType::Delete))
| Some((_, WriteType::Lock)) => {
MVCC_DUPLICATE_CMD_COUNTER_VEC.commit.inc();
Ok(false)
Ok(None)
}
};
}
Expand All @@ -640,11 +669,10 @@ impl<S: Snapshot> MvccTxn<S> {
short_value,
);
self.put_write(key.clone(), commit_ts, write.as_ref().to_bytes());
self.unlock_key(key);
Ok(is_pessimistic_txn)
Ok(self.unlock_key(key, is_pessimistic_txn))
}

pub fn rollback(&mut self, key: Key) -> Result<bool> {
pub fn rollback(&mut self, key: Key) -> Result<Option<ReleasedLock>> {
fail_point!("rollback", |err| Err(make_txn_error(
err,
&key,
Expand Down Expand Up @@ -707,7 +735,7 @@ impl<S: Snapshot> MvccTxn<S> {
///
/// Returns whether the lock is a pessimistic lock. Returns error if the key has already been
/// committed.
pub fn cleanup(&mut self, key: Key, current_ts: TimeStamp) -> Result<bool> {
pub fn cleanup(&mut self, key: Key, current_ts: TimeStamp) -> Result<Option<ReleasedLock>> {
fail_point!("cleanup", |err| Err(make_txn_error(
err,
&key,
Expand All @@ -727,8 +755,7 @@ impl<S: Snapshot> MvccTxn<S> {
}

let is_pessimistic_txn = !lock.for_update_ts.is_zero();
self.rollback_lock(key, lock, is_pessimistic_txn)?;
Ok(is_pessimistic_txn)
self.rollback_lock(key, lock, is_pessimistic_txn)
}
_ => match self.check_txn_status_missing_lock(key, true)? {
TxnStatus::Committed { commit_ts } => {
Expand All @@ -738,16 +765,20 @@ impl<S: Snapshot> MvccTxn<S> {
TxnStatus::RolledBack => {
// Return Ok on Rollback already exist.
MVCC_DUPLICATE_CMD_COUNTER_VEC.rollback.inc();
Ok(false)
Ok(None)
}
TxnStatus::LockNotExist => Ok(false),
TxnStatus::LockNotExist => Ok(None),
_ => unreachable!(),
},
}
}

/// Delete any pessimistic lock with small for_update_ts belongs to this transaction.
pub fn pessimistic_rollback(&mut self, key: Key, for_update_ts: TimeStamp) -> Result<()> {
pub fn pessimistic_rollback(
&mut self,
key: Key,
for_update_ts: TimeStamp,
) -> Result<Option<ReleasedLock>> {
fail_point!("pessimistic_rollback", |err| Err(make_txn_error(
err,
&key,
Expand All @@ -760,10 +791,10 @@ impl<S: Snapshot> MvccTxn<S> {
&& lock.ts == self.start_ts
&& lock.for_update_ts <= for_update_ts
{
self.unlock_key(key);
return Ok(self.unlock_key(key, true));
}
}
Ok(())
Ok(None)
}

fn collapse_prev_rollback(&mut self, key: Key) -> Result<()> {
Expand Down Expand Up @@ -839,7 +870,7 @@ impl<S: Snapshot> MvccTxn<S> {
caller_start_ts: TimeStamp,
current_ts: TimeStamp,
rollback_if_not_exist: bool,
) -> Result<(TxnStatus, bool)> {
) -> Result<(TxnStatus, Option<ReleasedLock>)> {
fail_point!("check_txn_status", |err| Err(make_txn_error(
err,
&primary_key,
Expand All @@ -853,9 +884,9 @@ impl<S: Snapshot> MvccTxn<S> {

if lock.ts.physical() + lock.ttl < current_ts.physical() {
// If the lock is expired, clean it up.
self.rollback_lock(primary_key, lock, is_pessimistic_txn)?;
let released = self.rollback_lock(primary_key, lock, is_pessimistic_txn)?;
MVCC_CHECK_TXN_STATUS_COUNTER_VEC.rollback.inc();
return Ok((TxnStatus::TtlExpire, is_pessimistic_txn));
return Ok((TxnStatus::TtlExpire, released));
}

// If lock.minCommitTS is 0, it's not a large transaction and we can't push forward
Expand All @@ -873,14 +904,11 @@ impl<S: Snapshot> MvccTxn<S> {
MVCC_CHECK_TXN_STATUS_COUNTER_VEC.update_ts.inc();
}

Ok((
TxnStatus::uncommitted(lock.ttl, lock.min_commit_ts),
is_pessimistic_txn,
))
Ok((TxnStatus::uncommitted(lock.ttl, lock.min_commit_ts), None))
}
_ => self
.check_txn_status_missing_lock(primary_key, rollback_if_not_exist)
.map(|s| (s, false)),
.map(|s| (s, None)),
}
}

Expand Down
Loading