Skip to content

Commit

Permalink
cherry pick tikv#7379 to release-3.1
Browse files Browse the repository at this point in the history
Signed-off-by: sre-bot <sre-bot@pingcap.com>
  • Loading branch information
youjiali1995 authored and sre-bot committed Apr 20, 2020
1 parent fca9588 commit 9a16fe1
Show file tree
Hide file tree
Showing 7 changed files with 607 additions and 39 deletions.
5 changes: 5 additions & 0 deletions benches/misc/storage/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
// Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0.

<<<<<<< HEAD:benches/misc/storage/mod.rs
=======
mod incremental_get;
mod key;
>>>>>>> b4dd42f... txn: only wake up waiters when locks are indeed released (#7379):tests/benches/misc/storage/mod.rs
mod scan;
43 changes: 38 additions & 5 deletions src/server/lock_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,18 +233,22 @@ impl LockMgr for LockManager {

fn wake_up(
&self,
<<<<<<< HEAD
lock_ts: u64,
hashes: Option<Vec<u64>>,
commit_ts: u64,
=======
lock_ts: TimeStamp,
hashes: Vec<u64>,
commit_ts: TimeStamp,
>>>>>>> b4dd42f... txn: only wake up waiters when locks are indeed released (#7379)
is_pessimistic_txn: bool,
) {
// If `hashes` is some, there may be some waiters waiting for these locks.
// Try to wake them up.
if self.has_waiter() {
if let Some(hashes) = hashes {
self.waiter_mgr_scheduler
.wake_up(lock_ts, hashes, commit_ts);
}
if !hashes.is_empty() && self.has_waiter() {
self.waiter_mgr_scheduler
.wake_up(lock_ts, hashes, commit_ts);
}
// If a pessimistic transaction is committed or rolled back and it once sent requests to
// detect deadlock, clean up its wait-for entries in the deadlock detector.
Expand Down Expand Up @@ -327,7 +331,11 @@ mod tests {
let (waiter, lock_info, f) = new_test_waiter(waiter_ts, lock.ts, lock.hash);
lock_mgr.wait_for(waiter.start_ts, waiter.cb, waiter.pr, waiter.lock, true, 0);
assert!(lock_mgr.has_waiter());
<<<<<<< HEAD
lock_mgr.wake_up(lock.ts, Some(vec![lock.hash]), 30, false);
=======
lock_mgr.wake_up(lock.ts, vec![lock.hash], 30.into(), false);
>>>>>>> b4dd42f... txn: only wake up waiters when locks are indeed released (#7379)
assert_elapsed(
|| expect_write_conflict(f.wait().unwrap(), waiter_ts, lock_info, 30),
0,
Expand Down Expand Up @@ -362,7 +370,11 @@ mod tests {
200,
);
// Waiter2 releases its lock.
<<<<<<< HEAD
lock_mgr.wake_up(20, Some(vec![20]), 20, true);
=======
lock_mgr.wake_up(20.into(), vec![20], 20.into(), true);
>>>>>>> b4dd42f... txn: only wake up waiters when locks are indeed released (#7379)
assert_elapsed(
|| expect_write_conflict(f1.wait().unwrap(), 10, lock_info1, 20),
0,
Expand All @@ -383,13 +395,19 @@ mod tests {
0,
);
assert!(lock_mgr.has_waiter());
<<<<<<< HEAD
assert_eq!(lock_mgr.remove_from_detected(30), !is_first_lock);
lock_mgr.wake_up(40, Some(vec![40]), 40, false);
=======
assert_eq!(lock_mgr.remove_from_detected(30.into()), !is_first_lock);
lock_mgr.wake_up(40.into(), vec![40], 40.into(), false);
>>>>>>> b4dd42f... txn: only wake up waiters when locks are indeed released (#7379)
f.wait().unwrap().unwrap_err();
}
assert!(!lock_mgr.has_waiter());

// If key_hashes is none, no wake up.
<<<<<<< HEAD
let prev_wake_up = TASK_COUNTER_VEC.wake_up.get();
lock_mgr.wake_up(10, None, 10, false);
assert_eq!(TASK_COUNTER_VEC.wake_up.get(), prev_wake_up);
Expand All @@ -403,6 +421,21 @@ mod tests {
let prev_clean_up = TASK_COUNTER_VEC.clean_up.get();
lock_mgr.wake_up(10, None, 10, true);
assert_eq!(TASK_COUNTER_VEC.clean_up.get(), prev_clean_up);
=======
let prev_wake_up = TASK_COUNTER_METRICS.wake_up.get();
lock_mgr.wake_up(10.into(), vec![], 10.into(), false);
assert_eq!(TASK_COUNTER_METRICS.wake_up.get(), prev_wake_up);

// If it's non-pessimistic-txn, no clean up.
let prev_clean_up = TASK_COUNTER_METRICS.clean_up.get();
lock_mgr.wake_up(10.into(), vec![], 10.into(), false);
assert_eq!(TASK_COUNTER_METRICS.clean_up.get(), prev_clean_up);

// If the txn doesn't wait for locks, no clean up.
let prev_clean_up = TASK_COUNTER_METRICS.clean_up.get();
lock_mgr.wake_up(10.into(), vec![], 10.into(), true);
assert_eq!(TASK_COUNTER_METRICS.clean_up.get(), prev_clean_up);
>>>>>>> b4dd42f... txn: only wake up waiters when locks are indeed released (#7379)

// If timeout is negative, no wait for.
let (waiter, lock_info, f) = new_test_waiter(10, 20, 20);
Expand Down
108 changes: 108 additions & 0 deletions src/storage/lock_manager.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

use crate::storage::{txn::ProcessResult, types::StorageCallback};
use std::time::Duration;
use txn_types::TimeStamp;

/// Identifies a lock a transaction may wait for: the lock owner's start
/// timestamp plus a hash (presumably derived from the locked key — confirm
/// against the code that constructs `Lock`; it is not visible here).
#[derive(Clone, Copy, PartialEq, Debug, Default)]
pub struct Lock {
    /// Start timestamp of the transaction holding the lock.
    pub ts: TimeStamp,
    /// Hash identifying the individual lock; matched against the `hashes`
    /// passed to `LockManager::wake_up`.
    pub hash: u64,
}

/// Time to wait for lock released when encountering locks.
#[derive(Clone, Copy, PartialEq, Debug)]
pub enum WaitTimeout {
    Default,
    Millis(u64),
}

impl WaitTimeout {
    /// Converts the timeout into a concrete `Duration`, clamped so it never
    /// exceeds `ceiling` milliseconds. `Default` resolves to the ceiling itself.
    pub fn into_duration_with_ceiling(self, ceiling: u64) -> Duration {
        let millis = match self {
            WaitTimeout::Default => ceiling,
            // An explicit timeout is honored up to the configured ceiling.
            WaitTimeout::Millis(ms) => ms.min(ceiling),
        };
        Duration::from_millis(millis)
    }

    /// Timeouts are encoded as i64s in protobufs where 0 means using default timeout.
    /// Negative means no wait, so `None` is returned.
    pub fn from_encoded(encoded: i64) -> Option<WaitTimeout> {
        if encoded == 0 {
            Some(WaitTimeout::Default)
        } else if encoded > 0 {
            Some(WaitTimeout::Millis(encoded as u64))
        } else {
            None
        }
    }
}

impl From<u64> for WaitTimeout {
    /// A plain millisecond count is always an explicit `Millis` timeout.
    fn from(millis: u64) -> WaitTimeout {
        WaitTimeout::Millis(millis)
    }
}

/// `LockManager` manages transactions waiting for locks held by other transactions.
/// It has responsibility to handle deadlocks between transactions.
pub trait LockManager: Clone + Send + 'static {
    /// Transaction with `start_ts` waits for `lock` released.
    ///
    /// If the lock is released or waiting times out or deadlock occurs, the transaction
    /// should be waken up and call `cb` with `pr` to notify the caller.
    ///
    /// If the lock is the first lock the transaction waits for, it won't result in deadlock.
    ///
    /// NOTE(review): a `timeout` of `None` presumably means "don't wait at all" —
    /// `WaitTimeout::from_encoded` yields `None` for negative encoded timeouts;
    /// confirm how implementations treat it.
    fn wait_for(
        &self,
        start_ts: TimeStamp,
        cb: StorageCallback,
        pr: ProcessResult,
        lock: Lock,
        is_first_lock: bool,
        timeout: Option<WaitTimeout>,
    );

    /// The locks with `lock_ts` and `hashes` are released, tries to wake up transactions.
    ///
    /// `is_pessimistic_txn` lets implementations clean up deadlock-detector
    /// wait-for entries when a pessimistic transaction is committed or rolled back.
    fn wake_up(
        &self,
        lock_ts: TimeStamp,
        hashes: Vec<u64>,
        commit_ts: TimeStamp,
        is_pessimistic_txn: bool,
    );

    /// Returns true if there are waiters in the `LockManager`.
    ///
    /// This function is used to avoid useless calculation and wake-up.
    fn has_waiter(&self) -> bool {
        // Conservative default: always claiming there may be waiters means
        // callers never skip a necessary wake-up.
        true
    }
}

// For test: a `LockManager` whose operations are all no-ops — it never
// registers waiters and wake-ups do nothing.
#[derive(Clone)]
pub struct DummyLockManager;

impl LockManager for DummyLockManager {
    // No-op: the waiter is never recorded; `_cb` and `_pr` are simply dropped,
    // so the callback is never invoked.
    fn wait_for(
        &self,
        _start_ts: TimeStamp,
        _cb: StorageCallback,
        _pr: ProcessResult,
        _lock: Lock,
        _is_first_lock: bool,
        _wait_timeout: Option<WaitTimeout>,
    ) {
    }

    // No-op: nothing to wake up since no waiters are ever stored.
    fn wake_up(
        &self,
        _lock_ts: TimeStamp,
        _hashes: Vec<u64>,
        _commit_ts: TimeStamp,
        _is_pessimistic_txn: bool,
    ) {
    }
}
9 changes: 9 additions & 0 deletions src/storage/mvcc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,17 @@ mod write;

pub use self::lock::{Lock, LockType};
pub use self::reader::*;
<<<<<<< HEAD
pub use self::txn::{MvccTxn, MAX_TXN_WRITE_SIZE};
pub use self::write::{Write, WriteType};
=======
pub use self::txn::{MvccTxn, ReleasedLock, MAX_TXN_WRITE_SIZE};
pub use crate::new_txn;
pub use txn_types::{
Key, Lock, LockType, Mutation, TimeStamp, Value, Write, WriteRef, WriteType,
SHORT_VALUE_MAX_LEN,
};
>>>>>>> b4dd42f... txn: only wake up waiters when locks are indeed released (#7379)

use std::error;
use std::io;
Expand Down
Loading

0 comments on commit 9a16fe1

Please sign in to comment.