Skip to content

Commit

Permalink
storage, lock_manager: Use the new lock waiting queue instead of Wait…
Browse files Browse the repository at this point in the history
…erManager to handle pessimistic lock waking up (#13447)

ref #13298

Updates the write path of acquiring lock and releasing lock to make use of the new `LockWaitQueue`. Some important points are:

1. `WriteResultLockInfo` (returned by `AcquirePessimisticLock::process_write`) carries parameters, which can be used for resuming the request in the future.
2. `WriteResultLockInfo` will be converted into `LockWaitContext` and `LockWaitEntry`, and then sent to both `LockManager` and the new `LockWaitQueues`.
3. When a storage command releases some locks, it will return the released locks to `Scheduler::process_write`, which will then call `on_release_locks` to pop lock-waiting entries from the queues and wake them up asynchronously (to avoid adding too much latency to the current command).
4. The `LockManager` (and its inner module `WaiterManager`) no longer has the responsibility for waking up waiters, but keeps its functionality of handling timeout and performing deadlock detection. Instead, it has a new `remove_lock_wait` method to remove a waiter from it.
5. Waiters in `WaiterManager` can now be uniquely identified by a `LockWaitToken`, and the data structure in `WaiterManager` is therefore changed. Accessing by lock hash and transaction ts is still necessary to handle the result of deadlock detection.

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>

Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
Co-authored-by: Yilin Chen <sticnarf@gmail.com>
  • Loading branch information
3 people committed Oct 26, 2022
1 parent c74c8ca commit a4dc37b
Show file tree
Hide file tree
Showing 45 changed files with 1,552 additions and 1,475 deletions.
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 2 additions & 3 deletions components/resolved_ts/src/cmd.rs
Expand Up @@ -300,10 +300,9 @@ mod tests {
};
use tikv::storage::{
kv::{MockEngineBuilder, TestEngineBuilder},
lock_manager::DummyLockManager,
mvcc::{tests::write, Mutation, MvccTxn, SnapshotReader},
txn::{
commands::one_pc_commit_ts, prewrite, tests::*, CommitKind, TransactionKind,
commands::one_pc_commit, prewrite, tests::*, CommitKind, TransactionKind,
TransactionProperties,
},
Engine,
Expand Down Expand Up @@ -426,7 +425,7 @@ mod tests {
SkipPessimisticCheck,
)
.unwrap();
one_pc_commit_ts(true, &mut txn, 10.into(), &DummyLockManager);
one_pc_commit(true, &mut txn, 10.into());
write(&engine, &Default::default(), txn.into_modifies());
let one_pc_row = engine
.take_last_modifies()
Expand Down
4 changes: 2 additions & 2 deletions components/test_coprocessor/src/fixture.rs
Expand Up @@ -12,7 +12,7 @@ use tikv::{
read_pool::ReadPool,
server::Config,
storage::{
kv::RocksEngine, lock_manager::DummyLockManager, Engine, TestEngineBuilder,
kv::RocksEngine, lock_manager::MockLockManager, Engine, TestEngineBuilder,
TestStorageBuilderApiV1,
},
};
Expand Down Expand Up @@ -79,7 +79,7 @@ pub fn init_data_with_details<E: Engine>(
commit: bool,
cfg: &Config,
) -> (Store<E>, Endpoint<E>, Arc<QuotaLimiter>) {
let storage = TestStorageBuilderApiV1::from_engine_and_lock_mgr(engine, DummyLockManager)
let storage = TestStorageBuilderApiV1::from_engine_and_lock_mgr(engine, MockLockManager::new())
.build()
.unwrap();
let mut store = Store::from_storage(storage);
Expand Down
6 changes: 3 additions & 3 deletions components/test_coprocessor/src/store.rs
Expand Up @@ -13,7 +13,7 @@ use tikv::{
server::gc_worker::GcConfig,
storage::{
kv::{Engine, RocksEngine},
lock_manager::DummyLockManager,
lock_manager::MockLockManager,
txn::FixtureStore,
SnapshotStore, StorageApiV1, TestStorageBuilderApiV1,
},
Expand Down Expand Up @@ -116,7 +116,7 @@ pub struct Store<E: Engine> {

impl Store<RocksEngine> {
pub fn new() -> Self {
let storage = TestStorageBuilderApiV1::new(DummyLockManager)
let storage = TestStorageBuilderApiV1::new(MockLockManager::new())
.build()
.unwrap();
Self::from_storage(storage)
Expand All @@ -130,7 +130,7 @@ impl Default for Store<RocksEngine> {
}

impl<E: Engine> Store<E> {
pub fn from_storage(storage: StorageApiV1<E, DummyLockManager>) -> Self {
pub fn from_storage(storage: StorageApiV1<E, MockLockManager>) -> Self {
Self {
store: SyncTestStorageApiV1::from_storage(0, storage, GcConfig::default()).unwrap(),
current_ts: 1.into(),
Expand Down
10 changes: 5 additions & 5 deletions components/test_storage/src/sync_storage.rs
Expand Up @@ -19,7 +19,7 @@ use raftstore::{
use tikv::{
server::gc_worker::{AutoGcConfig, GcConfig, GcSafePointProvider, GcWorker},
storage::{
config::Config, kv::RocksEngine, lock_manager::DummyLockManager, test_util::GetConsumer,
config::Config, kv::RocksEngine, lock_manager::MockLockManager, test_util::GetConsumer,
txn::commands, Engine, KvGetStatistics, PrewriteResult, Result, Storage, TestEngineBuilder,
TestStorageBuilder, TxnStatus,
},
Expand Down Expand Up @@ -87,7 +87,7 @@ impl<E: Engine, F: KvFormat> SyncTestStorageBuilder<E, F> {
pub fn build(mut self, store_id: u64) -> Result<SyncTestStorage<E, F>> {
let mut builder = TestStorageBuilder::<_, _, F>::from_engine_and_lock_mgr(
self.engine.clone(),
DummyLockManager,
MockLockManager::new(),
);
if let Some(config) = self.config.take() {
builder = builder.config(config);
Expand All @@ -107,7 +107,7 @@ impl<E: Engine, F: KvFormat> SyncTestStorageBuilder<E, F> {
#[derive(Clone)]
pub struct SyncTestStorage<E: Engine, F: KvFormat> {
gc_worker: GcWorker<E, RaftStoreBlackHole>,
store: Storage<E, DummyLockManager, F>,
store: Storage<E, MockLockManager, F>,
}

/// SyncTestStorage for Api V1
Expand All @@ -117,7 +117,7 @@ pub type SyncTestStorageApiV1<E> = SyncTestStorage<E, ApiV1>;
impl<E: Engine, F: KvFormat> SyncTestStorage<E, F> {
pub fn from_storage(
store_id: u64,
storage: Storage<E, DummyLockManager, F>,
storage: Storage<E, MockLockManager, F>,
config: GcConfig,
) -> Result<Self> {
let (tx, _rx) = std::sync::mpsc::channel();
Expand Down Expand Up @@ -145,7 +145,7 @@ impl<E: Engine, F: KvFormat> SyncTestStorage<E, F> {
.unwrap();
}

pub fn get_storage(&self) -> Storage<E, DummyLockManager, F> {
pub fn get_storage(&self) -> Storage<E, MockLockManager, F> {
self.store.clone()
}

Expand Down
9 changes: 8 additions & 1 deletion metrics/grafana/tikv_details.json
Expand Up @@ -35349,13 +35349,20 @@
"legendFormat": "{{type}}",
"refId": "A",
"step": 4
},
{
"expr": "sum(max_over_time(tikv_lock_wait_queue_entries_gauge_vec{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}[15s])) by (type)",
"hide": false,
"intervalFactor": 2,
"legendFormat": "{{type}}",
"refId": "B"
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "Wait table",
"title": "Lock Waiting Queue",
"tooltip": {
"msResolution": false,
"shared": true,
Expand Down
6 changes: 3 additions & 3 deletions src/config.rs
Expand Up @@ -4104,7 +4104,7 @@ mod tests {
server::{config::ServerConfigManager, ttl::TtlCheckerTask},
storage::{
config_manager::StorageConfigManger,
lock_manager::DummyLockManager,
lock_manager::MockLockManager,
txn::flow_controller::{EngineFlowController, FlowController},
Storage, TestStorageBuilder,
},
Expand Down Expand Up @@ -4494,7 +4494,7 @@ mod tests {
fn new_engines<F: KvFormat>(
cfg: TikvConfig,
) -> (
Storage<RocksDBEngine, DummyLockManager, F>,
Storage<RocksDBEngine, MockLockManager, F>,
ConfigController,
ReceiverWrapper<TtlCheckerTask>,
Arc<FlowController>,
Expand All @@ -4513,7 +4513,7 @@ mod tests {
)
.unwrap();
let storage =
TestStorageBuilder::<_, _, F>::from_engine_and_lock_mgr(engine, DummyLockManager)
TestStorageBuilder::<_, _, F>::from_engine_and_lock_mgr(engine, MockLockManager::new())
.config(cfg.storage.clone())
.build()
.unwrap();
Expand Down
6 changes: 3 additions & 3 deletions src/coprocessor_v2/raw_storage_impl.rs
Expand Up @@ -215,11 +215,11 @@ mod test {
use kvproto::kvrpcpb::{ApiVersion, Context};

use super::*;
use crate::storage::{lock_manager::DummyLockManager, TestStorageBuilder};
use crate::storage::{lock_manager::MockLockManager, TestStorageBuilder};

#[tokio::test]
async fn test_storage_api() {
let storage = TestStorageBuilder::<_, _, ApiV2>::new(DummyLockManager)
let storage = TestStorageBuilder::<_, _, ApiV2>::new(MockLockManager::new())
.build()
.unwrap();
let ctx = Context {
Expand Down Expand Up @@ -255,7 +255,7 @@ mod test {

#[tokio::test]
async fn test_storage_api_batch() {
let storage = TestStorageBuilder::<_, _, ApiV2>::new(DummyLockManager)
let storage = TestStorageBuilder::<_, _, ApiV2>::new(MockLockManager::new())
.build()
.unwrap();
let ctx = Context {
Expand Down
8 changes: 4 additions & 4 deletions src/import/duplicate_detect.rs
Expand Up @@ -239,7 +239,7 @@ mod tests {

use super::*;
use crate::storage::{
lock_manager::{DummyLockManager, LockManager},
lock_manager::{LockManager, MockLockManager},
txn::commands,
Storage, TestStorageBuilderApiV1,
};
Expand Down Expand Up @@ -350,7 +350,7 @@ mod tests {

#[test]
fn test_duplicate_detect() {
let mut storage = TestStorageBuilderApiV1::new(DummyLockManager)
let mut storage = TestStorageBuilderApiV1::new(MockLockManager::new())
.build()
.unwrap();
let mut data = vec![];
Expand Down Expand Up @@ -408,7 +408,7 @@ mod tests {
// (108,18) is not repeated with (108,10).
#[test]
fn test_duplicate_detect_incremental() {
let mut storage = TestStorageBuilderApiV1::new(DummyLockManager)
let mut storage = TestStorageBuilderApiV1::new(MockLockManager::new())
.build()
.unwrap();
for &start in &[100, 104, 108, 112] {
Expand Down Expand Up @@ -469,7 +469,7 @@ mod tests {

#[test]
fn test_duplicate_detect_rollback_and_delete() {
let mut storage = TestStorageBuilderApiV1::new(DummyLockManager)
let mut storage = TestStorageBuilderApiV1::new(MockLockManager::new())
.build()
.unwrap();
let data = vec![
Expand Down
16 changes: 9 additions & 7 deletions src/server/gc_worker/gc_worker.rs
Expand Up @@ -1660,7 +1660,7 @@ mod tests {
server::gc_worker::{MockSafePointProvider, PrefixedEngine},
storage::{
kv::{metrics::GcKeyMode, Modify, TestEngineBuilder, WriteData},
lock_manager::DummyLockManager,
lock_manager::MockLockManager,
mvcc::{
tests::{must_get_none, must_get_none_on_region, must_get_on_region},
MAX_TXN_WRITE_SIZE,
Expand Down Expand Up @@ -1738,7 +1738,7 @@ mod tests {
/// Assert the data in `storage` is the same as `expected_data`. Keys in
/// `expected_data` should be encoded form without ts.
fn check_data<E: Engine, F: KvFormat>(
storage: &Storage<E, DummyLockManager, F>,
storage: &Storage<E, MockLockManager, F>,
expected_data: &BTreeMap<Vec<u8>, Vec<u8>>,
) {
let scan_res = block_on(storage.scan(
Expand Down Expand Up @@ -1773,10 +1773,12 @@ mod tests {
let store_id = 1;

let engine = TestEngineBuilder::new().build().unwrap();
let storage =
TestStorageBuilderApiV1::from_engine_and_lock_mgr(engine.clone(), DummyLockManager)
.build()
.unwrap();
let storage = TestStorageBuilderApiV1::from_engine_and_lock_mgr(
engine.clone(),
MockLockManager::new(),
)
.build()
.unwrap();
let gate = FeatureGate::default();
gate.set_version("5.0.0").unwrap();

Expand Down Expand Up @@ -1960,7 +1962,7 @@ mod tests {
let prefixed_engine = PrefixedEngine(engine);
let storage = TestStorageBuilderApiV1::from_engine_and_lock_mgr(
prefixed_engine.clone(),
DummyLockManager,
MockLockManager::new(),
)
.build()
.unwrap();
Expand Down
31 changes: 19 additions & 12 deletions src/server/lock_manager/config.rs
Expand Up @@ -3,7 +3,7 @@
use std::{
error::Error,
sync::{
atomic::{AtomicBool, Ordering},
atomic::{AtomicBool, AtomicU64, Ordering},
Arc,
},
};
Expand Down Expand Up @@ -80,6 +80,7 @@ pub struct LockManagerConfigManager {
pub detector_scheduler: DeadlockScheduler,
pub pipelined: Arc<AtomicBool>,
pub in_memory: Arc<AtomicBool>,
pub wake_up_delay_duration_ms: Arc<AtomicU64>,
}

impl LockManagerConfigManager {
Expand All @@ -88,29 +89,35 @@ impl LockManagerConfigManager {
detector_scheduler: DeadlockScheduler,
pipelined: Arc<AtomicBool>,
in_memory: Arc<AtomicBool>,
wake_up_delay_duration_ms: Arc<AtomicU64>,
) -> Self {
LockManagerConfigManager {
waiter_mgr_scheduler,
detector_scheduler,
pipelined,
in_memory,
wake_up_delay_duration_ms,
}
}
}

impl ConfigManager for LockManagerConfigManager {
fn dispatch(&mut self, mut change: ConfigChange) -> Result<(), Box<dyn Error>> {
match (
change.remove("wait_for_lock_timeout").map(Into::into),
change.remove("wake_up_delay_duration").map(Into::into),
) {
(timeout @ Some(_), delay) => {
self.waiter_mgr_scheduler.change_config(timeout, delay);
self.detector_scheduler.change_ttl(timeout.unwrap().into());
}
(None, delay @ Some(_)) => self.waiter_mgr_scheduler.change_config(None, delay),
(None, None) => {}
};
if let Some(p) = change.remove("wait_for_lock_timeout").map(Into::into) {
self.waiter_mgr_scheduler.change_config(Some(p));
self.detector_scheduler.change_ttl(p.into());
}
if let Some(p) = change
.remove("wake_up_delay_duration")
.map(ReadableDuration::from)
{
info!(
"Waiter manager config changed";
"wake_up_delay_duration" => %p,
);
self.wake_up_delay_duration_ms
.store(p.as_millis(), Ordering::Relaxed);
}
if let Some(p) = change.remove("pipelined").map(Into::into) {
self.pipelined.store(p, Ordering::Relaxed);
}
Expand Down

0 comments on commit a4dc37b

Please sign in to comment.