Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

txn: avoid useless calculation and wake-up msgs #4813

Merged
merged 20 commits into from Jun 10, 2019
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/storage/lock_manager/mod.rs
Expand Up @@ -12,7 +12,7 @@ pub use self::deadlock::{
};
pub use self::util::{extract_lock_from_result, gen_key_hash, gen_key_hashes};
pub use self::waiter_manager::{
Scheduler as WaiterMgrScheduler, Task as WaiterTask, WaiterManager,
Scheduler as WaiterMgrScheduler, Task as WaiterTask, WaiterManager, WAIT_TABLE_IS_EMPTY,
};
use crate::pd::Error as PdError;
use futures::future::Future;
Expand Down
32 changes: 32 additions & 0 deletions src/storage/lock_manager/waiter_manager.rs
Expand Up @@ -13,10 +13,14 @@ use kvproto::deadlock::WaitForEntry;
use std::cell::RefCell;
use std::fmt::{self, Debug, Display, Formatter};
use std::rc::Rc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::{Duration, Instant};
use tokio_core::reactor::Handle;
use tokio_timer::Delay;

// If it is true, there is no need to calculate keys' hashes or to wake up waiters.
pub static WAIT_TABLE_IS_EMPTY: AtomicBool = AtomicBool::new(true);

pub type Callback = Box<dyn FnOnce(Vec<WaitForEntry>) + Send>;

pub enum Task {
Expand Down Expand Up @@ -90,7 +94,14 @@ impl WaitTable {
self.wait_table.iter().map(|(_, v)| v.len()).sum()
}

fn flip_wait_table_is_empty(&self, is_empty: bool) {
youjiali1995 marked this conversation as resolved.
Show resolved Hide resolved
if self.wait_table.is_empty() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

better to abstract this to a function.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

WAIT_TABLE_IS_EMPTY.store(is_empty, Ordering::Relaxed);
}
}

fn add_waiter(&mut self, ts: u64, waiter: Waiter) -> bool {
self.flip_wait_table_is_empty(false);
self.wait_table.entry(ts).or_insert(vec![]).push(waiter);
true
}
Expand All @@ -112,6 +123,7 @@ impl WaitTable {
if waiters.is_empty() {
self.wait_table.remove(&ts);
}
self.flip_wait_table_is_empty(true);
}
ready_waiters
}
Expand All @@ -126,6 +138,7 @@ impl WaitTable {
if waiters.is_empty() {
self.wait_table.remove(&lock.ts);
}
self.flip_wait_table_is_empty(true);
return Some(waiter);
}
}
Expand Down Expand Up @@ -462,6 +475,25 @@ mod tests {
assert!(wait_for_enties.is_empty());
}

#[test]
fn test_wait_table_is_empty() {
WAIT_TABLE_IS_EMPTY.store(true, Ordering::Relaxed);

let mut wait_table = WaitTable::new();
wait_table.add_waiter(2, dummy_waiter(1, 2, 2));
assert_eq!(WAIT_TABLE_IS_EMPTY.load(Ordering::Relaxed), false);
assert!(wait_table
.remove_waiter(1, Lock { ts: 2, hash: 2 })
.is_some());
assert_eq!(WAIT_TABLE_IS_EMPTY.load(Ordering::Relaxed), true);
wait_table.add_waiter(2, dummy_waiter(1, 2, 2));
wait_table.add_waiter(3, dummy_waiter(2, 3, 3));
wait_table.get_ready_waiters(2, vec![2]);
assert_eq!(WAIT_TABLE_IS_EMPTY.load(Ordering::Relaxed), false);
wait_table.get_ready_waiters(3, vec![3]);
assert_eq!(WAIT_TABLE_IS_EMPTY.load(Ordering::Relaxed), true);
}

#[test]
fn test_waiter_manager() {
use crate::tikv_util::worker::FutureWorker;
Expand Down
107 changes: 64 additions & 43 deletions src/storage/txn/process.rs
@@ -1,6 +1,7 @@
// Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0.

use std::marker::PhantomData;
use std::sync::atomic::Ordering;
use std::time::Duration;
use std::{mem, thread, u64};

Expand All @@ -9,7 +10,9 @@ use kvproto::kvrpcpb::{CommandPri, Context, LockInfo};

use crate::storage::kv::with_tls_engine;
use crate::storage::kv::{CbContext, Modify, Result as EngineResult};
use crate::storage::lock_manager::{self, DetectorScheduler, WaiterMgrScheduler};
use crate::storage::lock_manager::{
self, DetectorScheduler, WaiterMgrScheduler, WAIT_TABLE_IS_EMPTY,
};
use crate::storage::mvcc::{
Error as MvccError, Lock as MvccLock, MvccReader, MvccTxn, Write, MAX_TXN_WRITE_SIZE,
};
Expand Down Expand Up @@ -481,14 +484,28 @@ fn process_read_impl<E: Engine>(
}
}

// If waiter_mgr_scheduler is some and wait_table is not empty,
// there may be some transactions waiting for these keys,
// so calculate the keys' hashes to wake them up.
fn gen_key_hashes_if_needed(
waiter_mgr_scheduler: &Option<WaiterMgrScheduler>,
keys: &[Key],
) -> Option<Vec<u64>> {
if waiter_mgr_scheduler.is_some() && !WAIT_TABLE_IS_EMPTY.load(Ordering::Relaxed) {
Some(lock_manager::gen_key_hashes(keys))
} else {
None
}
}

// Wake up pessimistic transactions that are waiting for these locks
fn notify_waiter_mgr(
fn notify_waiter_mgr_if_needed(
waiter_mgr_scheduler: &Option<WaiterMgrScheduler>,
lock_ts: u64,
key_hashes: Option<Vec<u64>>,
commit_ts: u64,
) {
if waiter_mgr_scheduler.is_some() {
if waiter_mgr_scheduler.is_some() && key_hashes.is_some() {
waiter_mgr_scheduler
.as_ref()
.unwrap()
Expand All @@ -497,7 +514,7 @@ fn notify_waiter_mgr(
}

// When it is a pessimistic transaction, we need to clean up `wait_for_entries`.
fn notify_deadlock_detector(
fn notify_deadlock_detector_if_needed(
detector_scheduler: &Option<DetectorScheduler>,
is_pessimistic_txn: bool,
txn_ts: u64,
Expand Down Expand Up @@ -631,11 +648,7 @@ fn process_write_impl<S: Snapshot>(
});
}
// Pessimistic txn needs key_hashes to wake up waiters
let key_hashes = if waiter_mgr_scheduler.is_some() {
Some(lock_manager::gen_key_hashes(&keys))
} else {
None
};
let key_hashes = gen_key_hashes_if_needed(&waiter_mgr_scheduler, &keys);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Every commit will load atomic variable, can we avoid it?


let mut txn = MvccTxn::new(snapshot, lock_ts, !ctx.get_not_fill_cache())?;
let mut is_pessimistic_txn = false;
Expand All @@ -644,25 +657,22 @@ fn process_write_impl<S: Snapshot>(
is_pessimistic_txn = txn.commit(k, commit_ts)?;
}

notify_waiter_mgr(&waiter_mgr_scheduler, lock_ts, key_hashes, commit_ts);
notify_deadlock_detector(&detector_scheduler, is_pessimistic_txn, lock_ts);
notify_waiter_mgr_if_needed(&waiter_mgr_scheduler, lock_ts, key_hashes, commit_ts);
notify_deadlock_detector_if_needed(&detector_scheduler, is_pessimistic_txn, lock_ts);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we merge these two functions into one since they are always used together? Maybe define a new function which call these two functions?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The function needs many parameters, and I think it is a bit redundant.

statistics.add(&txn.take_statistics());
(ProcessResult::Res, txn.into_modifies(), rows, ctx, None)
}
Command::Cleanup {
ctx, key, start_ts, ..
} => {
let key_hash = if waiter_mgr_scheduler.is_some() {
Some(vec![lock_manager::gen_key_hash(&key)])
} else {
None
};
let mut keys = vec![key];
let key_hashes = gen_key_hashes_if_needed(&waiter_mgr_scheduler, &keys);

let mut txn = MvccTxn::new(snapshot, start_ts, !ctx.get_not_fill_cache())?;
let is_pessimistic_txn = txn.rollback(key)?;
let is_pessimistic_txn = txn.rollback(keys.pop().unwrap())?;

notify_waiter_mgr(&waiter_mgr_scheduler, start_ts, key_hash, 0);
notify_deadlock_detector(&detector_scheduler, is_pessimistic_txn, start_ts);
notify_waiter_mgr_if_needed(&waiter_mgr_scheduler, start_ts, key_hashes, 0);
notify_deadlock_detector_if_needed(&detector_scheduler, is_pessimistic_txn, start_ts);
statistics.add(&txn.take_statistics());
(ProcessResult::Res, txn.into_modifies(), 1, ctx, None)
}
Expand All @@ -672,11 +682,7 @@ fn process_write_impl<S: Snapshot>(
start_ts,
..
} => {
let key_hashes = if waiter_mgr_scheduler.is_some() {
Some(lock_manager::gen_key_hashes(&keys))
} else {
None
};
let key_hashes = gen_key_hashes_if_needed(&waiter_mgr_scheduler, &keys);

let mut txn = MvccTxn::new(snapshot, start_ts, !ctx.get_not_fill_cache())?;
let mut is_pessimistic_txn = false;
Expand All @@ -685,8 +691,8 @@ fn process_write_impl<S: Snapshot>(
is_pessimistic_txn = txn.rollback(k)?;
}

notify_waiter_mgr(&waiter_mgr_scheduler, start_ts, key_hashes, 0);
notify_deadlock_detector(&detector_scheduler, is_pessimistic_txn, start_ts);
notify_waiter_mgr_if_needed(&waiter_mgr_scheduler, start_ts, key_hashes, 0);
notify_deadlock_detector_if_needed(&detector_scheduler, is_pessimistic_txn, start_ts);
statistics.add(&txn.take_statistics());
(ProcessResult::Res, txn.into_modifies(), rows, ctx, None)
}
Expand All @@ -696,7 +702,8 @@ fn process_write_impl<S: Snapshot>(
mut scan_key,
key_locks,
} => {
// Map (txn's start_ts, is_pessimistic_txn) => keys
let wait_table_is_empty = WAIT_TABLE_IS_EMPTY.load(Ordering::Relaxed);
youjiali1995 marked this conversation as resolved.
Show resolved Hide resolved
// Map (txn's start_ts, is_pessimistic_txn) => Option<key_hashes>
let mut txn_to_keys = if waiter_mgr_scheduler.is_some() {
youjiali1995 marked this conversation as resolved.
Show resolved Hide resolved
Some(HashMap::new())
} else {
Expand All @@ -713,8 +720,21 @@ fn process_write_impl<S: Snapshot>(
.as_mut()
youjiali1995 marked this conversation as resolved.
Show resolved Hide resolved
.unwrap()
.entry((current_lock.ts, current_lock.is_pessimistic_txn))
.or_insert(vec![])
.push(lock_manager::gen_key_hash(&current_key));
.and_modify(|key_hashes: &mut Option<Vec<u64>>| {
if key_hashes.is_some() {
key_hashes
.as_mut()
.unwrap()
.push(lock_manager::gen_key_hash(&current_key));
}
})
.or_insert_with(|| {
if wait_table_is_empty {
None
} else {
Some(vec![])
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should insert current keys into the empty vec here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice catch!

}
});
}

let mut txn =
Expand Down Expand Up @@ -746,13 +766,16 @@ fn process_write_impl<S: Snapshot>(
}
}
if txn_to_keys.is_some() {
txn_to_keys
.unwrap()
.into_iter()
.for_each(|((ts, is_pessimistic_txn), hashes)| {
notify_waiter_mgr(&waiter_mgr_scheduler, ts, Some(hashes), 0);
notify_deadlock_detector(&detector_scheduler, is_pessimistic_txn, ts);
});
txn_to_keys.unwrap().into_iter().for_each(
|((ts, is_pessimistic_txn), key_hashes)| {
notify_waiter_mgr_if_needed(&waiter_mgr_scheduler, ts, key_hashes, 0);
notify_deadlock_detector_if_needed(
&detector_scheduler,
is_pessimistic_txn,
ts,
);
},
);
}

let pr = if scan_key.is_none() {
Expand All @@ -776,11 +799,8 @@ fn process_write_impl<S: Snapshot>(
commit_ts,
resolve_keys,
} => {
let key_hashes = if waiter_mgr_scheduler.is_some() {
Some(lock_manager::gen_key_hashes(&resolve_keys))
} else {
None
};
let key_hashes = gen_key_hashes_if_needed(&waiter_mgr_scheduler, &resolve_keys);

let mut txn = MvccTxn::new(snapshot.clone(), start_ts, !ctx.get_not_fill_cache())?;
let rows = resolve_keys.len();
let mut is_pessimistic_txn = false;
Expand All @@ -793,8 +813,9 @@ fn process_write_impl<S: Snapshot>(
is_pessimistic_txn = txn.rollback(key)?;
}
}
notify_waiter_mgr(&waiter_mgr_scheduler, start_ts, key_hashes, 0);
notify_deadlock_detector(&detector_scheduler, is_pessimistic_txn, start_ts);

notify_waiter_mgr_if_needed(&waiter_mgr_scheduler, start_ts, key_hashes, 0);
notify_deadlock_detector_if_needed(&detector_scheduler, is_pessimistic_txn, start_ts);
statistics.add(&txn.take_statistics());
(ProcessResult::Res, txn.into_modifies(), rows, ctx, None)
}
Expand Down