Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage, lock_manager: Use the new lock waiting queue instead of WaiterManager to handle pessimistic lock waking up #13447

Merged
merged 78 commits into from Oct 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
78 commits
Select commit Hold shift + click to select a range
e1c0e5c
add new lock waiting queue
MyonKeminta Sep 5, 2022
89872af
Port new implementation of waiter manager from #12749
MyonKeminta Sep 6, 2022
eb2e51f
Fix some potential concurrency issues
MyonKeminta Sep 6, 2022
3ac944a
Make use of the new lock waiting queue in scheduler
MyonKeminta Sep 9, 2022
a85c333
Add tests for lock waiting queue
MyonKeminta Sep 13, 2022
437ee61
Allow pushing forward the delayed wake up time
MyonKeminta Sep 15, 2022
30fb395
Try not order by legacy wake up cnt
MyonKeminta Sep 16, 2022
7e2b632
Add tests
MyonKeminta Sep 16, 2022
97ce953
Add documents
MyonKeminta Sep 16, 2022
e6fce44
Add more comments
MyonKeminta Sep 19, 2022
af57580
Add lock wait queue and lock wait context; refactor storage::lock_man…
MyonKeminta Sep 19, 2022
f899ee1
Merge branch 'm/new-lock-waiting-queue-part' into m/new-lock-waiting-…
MyonKeminta Sep 19, 2022
e810deb
Remove commented code; remove debug level change in Cargo.toml
MyonKeminta Sep 19, 2022
c36257c
Merge branch 'm/new-lock-waiting-queue-part' into m/new-lock-waiting-…
MyonKeminta Sep 19, 2022
38be4b7
Fix lint
MyonKeminta Sep 19, 2022
bae7c54
Merge branch 'm/new-lock-waiting-queue-part' into m/new-lock-waiting-…
MyonKeminta Sep 19, 2022
8ca8ed1
Fix lint
MyonKeminta Sep 19, 2022
3e43a3f
Fix tests
MyonKeminta Sep 19, 2022
b9fe9c8
Fix tests
MyonKeminta Sep 20, 2022
bdb117d
fix lint
MyonKeminta Sep 20, 2022
efac233
Fix is_first_lock test and remove some commented code
MyonKeminta Sep 20, 2022
ff248e4
Fix tests
MyonKeminta Sep 20, 2022
1685e9d
Add document to explain how `legacy_wake_up_cnt` works
MyonKeminta Sep 21, 2022
df8ce96
Merge branch 'm/new-lock-waiting-queue-part' into m/new-lock-waiting-…
MyonKeminta Sep 21, 2022
a986eb2
Adapt the test for waking up
MyonKeminta Sep 21, 2022
3f9d878
Fix lint
MyonKeminta Sep 21, 2022
4dce9c1
Remove the comparator wrapper
MyonKeminta Sep 22, 2022
44d3eea
Move SharedError to storage/error.rs
MyonKeminta Sep 22, 2022
e18bf41
Remove Clone on PessimisticLockParameters which is not used yet
MyonKeminta Sep 22, 2022
a624ee9
Fix lint
MyonKeminta Sep 22, 2022
821d414
Fix lint
MyonKeminta Sep 22, 2022
a23cab9
Address comments
MyonKeminta Sep 23, 2022
51037df
Add metrics for the new lock waiting queue
MyonKeminta Sep 23, 2022
eae8902
Address comments
MyonKeminta Sep 26, 2022
8826a30
Address comments
MyonKeminta Sep 26, 2022
7338f53
Merge branch 'master' into m/new-lock-waiting-queue-part
ti-chi-bot Sep 28, 2022
9f25907
Merge branch 'master' into m/new-lock-waiting-queue-part
ti-chi-bot Sep 28, 2022
9e79ab2
Merge branch 'master' into m/new-lock-waiting-queue-part
ti-chi-bot Sep 29, 2022
cb6f1ca
Merge branch 'master' into m/new-lock-waiting-queue-part
ti-chi-bot Sep 29, 2022
8e1fb44
Set write conflict reason
MyonKeminta Sep 29, 2022
439fbc8
Merge branch 'm/new-lock-waiting-queue-part' of https://github.com/My…
MyonKeminta Sep 29, 2022
9d3b76f
Merge branch 'm/new-lock-waiting-queue-part' into m/new-lock-waiting-…
MyonKeminta Sep 29, 2022
1258793
Merge branch 'm/new-lock-waiting-queue-part' into m/new-lock-waiting-…
MyonKeminta Sep 29, 2022
c7038ab
Merge commit '9321040' into m/new-lock-waiting-queue
MyonKeminta Sep 29, 2022
4306dac
Merge commit '9321040' into m/new-lock-waiting-queue-metrics
MyonKeminta Sep 29, 2022
02c67e2
Merge branch 'm/new-lock-waiting-queue-metrics' into m/new-lock-waiti…
MyonKeminta Sep 29, 2022
4fc6177
Fix build
MyonKeminta Sep 29, 2022
3f77213
Fix build
MyonKeminta Sep 29, 2022
ecae239
Add self-made heap
MyonKeminta Oct 2, 2022
ffb9785
Use the HashMap in tikv_util
MyonKeminta Oct 10, 2022
a0e44e2
Use the new priority queue to replace the std binary heap in lock wai…
MyonKeminta Oct 10, 2022
b534c19
Merge branch 'm/new-lock-waiting-queue-remove-stale' into m/new-lock-…
MyonKeminta Oct 10, 2022
6e516ba
Fix build and tests
MyonKeminta Oct 11, 2022
b132f81
fix build
MyonKeminta Oct 11, 2022
ae7d572
Add priority queue to tikv_utils
MyonKeminta Oct 11, 2022
6cf249b
Add comments
MyonKeminta Oct 11, 2022
bb9d52c
Merge commit 'ae7d5722f' into m/new-lock-waiting-queue-remove-stale
MyonKeminta Oct 11, 2022
90ab847
Merge branch 'm/priority-queue' into m/new-lock-waiting-queue-remove-…
MyonKeminta Oct 11, 2022
f70811b
Merge branch 'm/new-lock-waiting-queue-remove-stale' into m/new-lock-…
MyonKeminta Oct 11, 2022
a5e6124
Revert "Use the HashMap in tikv_util"
MyonKeminta Oct 11, 2022
f2f2834
Revert "Add self-made heap"
MyonKeminta Oct 11, 2022
1caf48a
Use third-party crate keyed_priority_queue instead of the self-writte…
MyonKeminta Oct 11, 2022
404b59f
Merge remote-tracking branch 'upstream/master' into m/new-lock-waitin…
MyonKeminta Oct 11, 2022
403375b
Merge branch 'm/new-lock-waiting-queue-remove-stale' into m/new-lock-…
MyonKeminta Oct 11, 2022
7d959cb
Remove `finished` flag
MyonKeminta Oct 12, 2022
75199bc
Merge branch 'm/new-lock-waiting-queue-remove-stale' into m/new-lock-…
MyonKeminta Oct 12, 2022
3719265
Remove shared_state from WriteResultLockInfo
MyonKeminta Oct 12, 2022
4f7c58e
Merge branch 'master' into m/new-lock-waiting-queue-remove-stale
sticnarf Oct 13, 2022
b0d77ec
Merge remote-tracking branch 'origin/m/new-lock-waiting-queue-remove-…
MyonKeminta Oct 13, 2022
d8e5dfe
Merge branch 'master' of https://github.com/tikv/tikv into m/new-lock…
MyonKeminta Oct 13, 2022
50e5051
Address comments
MyonKeminta Oct 17, 2022
a928ea2
Address comments
MyonKeminta Oct 18, 2022
effffc7
Remove obsolete metric
MyonKeminta Oct 18, 2022
c0abfdf
Address comments
MyonKeminta Oct 20, 2022
4785619
Update grafana dashboard
MyonKeminta Oct 20, 2022
4005b10
Address comments
MyonKeminta Oct 25, 2022
77caef7
Address comments
MyonKeminta Oct 26, 2022
e7c4c70
Merge branch 'master' into m/new-lock-waiting-queue
ti-chi-bot Oct 26, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 2 additions & 3 deletions components/resolved_ts/src/cmd.rs
Expand Up @@ -300,10 +300,9 @@ mod tests {
};
use tikv::storage::{
kv::{MockEngineBuilder, TestEngineBuilder},
lock_manager::DummyLockManager,
mvcc::{tests::write, Mutation, MvccTxn, SnapshotReader},
txn::{
commands::one_pc_commit_ts, prewrite, tests::*, CommitKind, TransactionKind,
commands::one_pc_commit, prewrite, tests::*, CommitKind, TransactionKind,
TransactionProperties,
},
Engine,
Expand Down Expand Up @@ -426,7 +425,7 @@ mod tests {
SkipPessimisticCheck,
)
.unwrap();
one_pc_commit_ts(true, &mut txn, 10.into(), &DummyLockManager);
one_pc_commit(true, &mut txn, 10.into());
write(&engine, &Default::default(), txn.into_modifies());
let one_pc_row = engine
.take_last_modifies()
Expand Down
4 changes: 2 additions & 2 deletions components/test_coprocessor/src/fixture.rs
Expand Up @@ -12,7 +12,7 @@ use tikv::{
read_pool::ReadPool,
server::Config,
storage::{
kv::RocksEngine, lock_manager::DummyLockManager, Engine, TestEngineBuilder,
kv::RocksEngine, lock_manager::MockLockManager, Engine, TestEngineBuilder,
TestStorageBuilderApiV1,
},
};
Expand Down Expand Up @@ -79,7 +79,7 @@ pub fn init_data_with_details<E: Engine>(
commit: bool,
cfg: &Config,
) -> (Store<E>, Endpoint<E>, Arc<QuotaLimiter>) {
let storage = TestStorageBuilderApiV1::from_engine_and_lock_mgr(engine, DummyLockManager)
let storage = TestStorageBuilderApiV1::from_engine_and_lock_mgr(engine, MockLockManager::new())
.build()
.unwrap();
let mut store = Store::from_storage(storage);
Expand Down
6 changes: 3 additions & 3 deletions components/test_coprocessor/src/store.rs
Expand Up @@ -13,7 +13,7 @@ use tikv::{
server::gc_worker::GcConfig,
storage::{
kv::{Engine, RocksEngine},
lock_manager::DummyLockManager,
lock_manager::MockLockManager,
txn::FixtureStore,
SnapshotStore, StorageApiV1, TestStorageBuilderApiV1,
},
Expand Down Expand Up @@ -116,7 +116,7 @@ pub struct Store<E: Engine> {

impl Store<RocksEngine> {
pub fn new() -> Self {
let storage = TestStorageBuilderApiV1::new(DummyLockManager)
let storage = TestStorageBuilderApiV1::new(MockLockManager::new())
.build()
.unwrap();
Self::from_storage(storage)
Expand All @@ -130,7 +130,7 @@ impl Default for Store<RocksEngine> {
}

impl<E: Engine> Store<E> {
pub fn from_storage(storage: StorageApiV1<E, DummyLockManager>) -> Self {
pub fn from_storage(storage: StorageApiV1<E, MockLockManager>) -> Self {
Self {
store: SyncTestStorageApiV1::from_storage(0, storage, GcConfig::default()).unwrap(),
current_ts: 1.into(),
Expand Down
10 changes: 5 additions & 5 deletions components/test_storage/src/sync_storage.rs
Expand Up @@ -19,7 +19,7 @@ use raftstore::{
use tikv::{
server::gc_worker::{AutoGcConfig, GcConfig, GcSafePointProvider, GcWorker},
storage::{
config::Config, kv::RocksEngine, lock_manager::DummyLockManager, test_util::GetConsumer,
config::Config, kv::RocksEngine, lock_manager::MockLockManager, test_util::GetConsumer,
txn::commands, Engine, KvGetStatistics, PrewriteResult, Result, Storage, TestEngineBuilder,
TestStorageBuilder, TxnStatus,
},
Expand Down Expand Up @@ -87,7 +87,7 @@ impl<E: Engine, F: KvFormat> SyncTestStorageBuilder<E, F> {
pub fn build(mut self, store_id: u64) -> Result<SyncTestStorage<E, F>> {
let mut builder = TestStorageBuilder::<_, _, F>::from_engine_and_lock_mgr(
self.engine.clone(),
DummyLockManager,
MockLockManager::new(),
);
if let Some(config) = self.config.take() {
builder = builder.config(config);
Expand All @@ -107,7 +107,7 @@ impl<E: Engine, F: KvFormat> SyncTestStorageBuilder<E, F> {
#[derive(Clone)]
pub struct SyncTestStorage<E: Engine, F: KvFormat> {
gc_worker: GcWorker<E, RaftStoreBlackHole>,
store: Storage<E, DummyLockManager, F>,
store: Storage<E, MockLockManager, F>,
}

/// SyncTestStorage for Api V1
Expand All @@ -117,7 +117,7 @@ pub type SyncTestStorageApiV1<E> = SyncTestStorage<E, ApiV1>;
impl<E: Engine, F: KvFormat> SyncTestStorage<E, F> {
pub fn from_storage(
store_id: u64,
storage: Storage<E, DummyLockManager, F>,
storage: Storage<E, MockLockManager, F>,
config: GcConfig,
) -> Result<Self> {
let (tx, _rx) = std::sync::mpsc::channel();
Expand Down Expand Up @@ -145,7 +145,7 @@ impl<E: Engine, F: KvFormat> SyncTestStorage<E, F> {
.unwrap();
}

pub fn get_storage(&self) -> Storage<E, DummyLockManager, F> {
pub fn get_storage(&self) -> Storage<E, MockLockManager, F> {
self.store.clone()
}

Expand Down
9 changes: 8 additions & 1 deletion metrics/grafana/tikv_details.json
Expand Up @@ -35349,13 +35349,20 @@
"legendFormat": "{{type}}",
"refId": "A",
"step": 4
},
{
"expr": "sum(max_over_time(tikv_lock_wait_queue_entries_gauge_vec{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}[15s])) by (type)",
"hide": false,
"intervalFactor": 2,
"legendFormat": "{{type}}",
"refId": "B"
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "Wait table",
"title": "Lock Waiting Queue",
"tooltip": {
"msResolution": false,
"shared": true,
Expand Down
6 changes: 3 additions & 3 deletions src/config.rs
Expand Up @@ -4104,7 +4104,7 @@ mod tests {
server::{config::ServerConfigManager, ttl::TtlCheckerTask},
storage::{
config_manager::StorageConfigManger,
lock_manager::DummyLockManager,
lock_manager::MockLockManager,
txn::flow_controller::{EngineFlowController, FlowController},
Storage, TestStorageBuilder,
},
Expand Down Expand Up @@ -4494,7 +4494,7 @@ mod tests {
fn new_engines<F: KvFormat>(
cfg: TikvConfig,
) -> (
Storage<RocksDBEngine, DummyLockManager, F>,
Storage<RocksDBEngine, MockLockManager, F>,
ConfigController,
ReceiverWrapper<TtlCheckerTask>,
Arc<FlowController>,
Expand All @@ -4513,7 +4513,7 @@ mod tests {
)
.unwrap();
let storage =
TestStorageBuilder::<_, _, F>::from_engine_and_lock_mgr(engine, DummyLockManager)
TestStorageBuilder::<_, _, F>::from_engine_and_lock_mgr(engine, MockLockManager::new())
.config(cfg.storage.clone())
.build()
.unwrap();
Expand Down
6 changes: 3 additions & 3 deletions src/coprocessor_v2/raw_storage_impl.rs
Expand Up @@ -215,11 +215,11 @@ mod test {
use kvproto::kvrpcpb::{ApiVersion, Context};

use super::*;
use crate::storage::{lock_manager::DummyLockManager, TestStorageBuilder};
use crate::storage::{lock_manager::MockLockManager, TestStorageBuilder};

#[tokio::test]
async fn test_storage_api() {
let storage = TestStorageBuilder::<_, _, ApiV2>::new(DummyLockManager)
let storage = TestStorageBuilder::<_, _, ApiV2>::new(MockLockManager::new())
.build()
.unwrap();
let ctx = Context {
Expand Down Expand Up @@ -255,7 +255,7 @@ mod test {

#[tokio::test]
async fn test_storage_api_batch() {
let storage = TestStorageBuilder::<_, _, ApiV2>::new(DummyLockManager)
let storage = TestStorageBuilder::<_, _, ApiV2>::new(MockLockManager::new())
.build()
.unwrap();
let ctx = Context {
Expand Down
8 changes: 4 additions & 4 deletions src/import/duplicate_detect.rs
Expand Up @@ -239,7 +239,7 @@ mod tests {

use super::*;
use crate::storage::{
lock_manager::{DummyLockManager, LockManager},
lock_manager::{LockManager, MockLockManager},
txn::commands,
Storage, TestStorageBuilderApiV1,
};
Expand Down Expand Up @@ -350,7 +350,7 @@ mod tests {

#[test]
fn test_duplicate_detect() {
let mut storage = TestStorageBuilderApiV1::new(DummyLockManager)
let mut storage = TestStorageBuilderApiV1::new(MockLockManager::new())
.build()
.unwrap();
let mut data = vec![];
Expand Down Expand Up @@ -408,7 +408,7 @@ mod tests {
// (108,18) is not repeated with (108,10).
#[test]
fn test_duplicate_detect_incremental() {
let mut storage = TestStorageBuilderApiV1::new(DummyLockManager)
let mut storage = TestStorageBuilderApiV1::new(MockLockManager::new())
.build()
.unwrap();
for &start in &[100, 104, 108, 112] {
Expand Down Expand Up @@ -469,7 +469,7 @@ mod tests {

#[test]
fn test_duplicate_detect_rollback_and_delete() {
let mut storage = TestStorageBuilderApiV1::new(DummyLockManager)
let mut storage = TestStorageBuilderApiV1::new(MockLockManager::new())
.build()
.unwrap();
let data = vec![
Expand Down
16 changes: 9 additions & 7 deletions src/server/gc_worker/gc_worker.rs
Expand Up @@ -1660,7 +1660,7 @@ mod tests {
server::gc_worker::{MockSafePointProvider, PrefixedEngine},
storage::{
kv::{metrics::GcKeyMode, Modify, TestEngineBuilder, WriteData},
lock_manager::DummyLockManager,
lock_manager::MockLockManager,
mvcc::{
tests::{must_get_none, must_get_none_on_region, must_get_on_region},
MAX_TXN_WRITE_SIZE,
Expand Down Expand Up @@ -1738,7 +1738,7 @@ mod tests {
/// Assert the data in `storage` is the same as `expected_data`. Keys in
/// `expected_data` should be encoded form without ts.
fn check_data<E: Engine, F: KvFormat>(
storage: &Storage<E, DummyLockManager, F>,
storage: &Storage<E, MockLockManager, F>,
expected_data: &BTreeMap<Vec<u8>, Vec<u8>>,
) {
let scan_res = block_on(storage.scan(
Expand Down Expand Up @@ -1773,10 +1773,12 @@ mod tests {
let store_id = 1;

let engine = TestEngineBuilder::new().build().unwrap();
let storage =
TestStorageBuilderApiV1::from_engine_and_lock_mgr(engine.clone(), DummyLockManager)
.build()
.unwrap();
let storage = TestStorageBuilderApiV1::from_engine_and_lock_mgr(
engine.clone(),
MockLockManager::new(),
)
.build()
.unwrap();
let gate = FeatureGate::default();
gate.set_version("5.0.0").unwrap();

Expand Down Expand Up @@ -1960,7 +1962,7 @@ mod tests {
let prefixed_engine = PrefixedEngine(engine);
let storage = TestStorageBuilderApiV1::from_engine_and_lock_mgr(
prefixed_engine.clone(),
DummyLockManager,
MockLockManager::new(),
)
.build()
.unwrap();
Expand Down
31 changes: 19 additions & 12 deletions src/server/lock_manager/config.rs
Expand Up @@ -3,7 +3,7 @@
use std::{
error::Error,
sync::{
atomic::{AtomicBool, Ordering},
atomic::{AtomicBool, AtomicU64, Ordering},
Arc,
},
};
Expand Down Expand Up @@ -80,6 +80,7 @@ pub struct LockManagerConfigManager {
pub detector_scheduler: DeadlockScheduler,
pub pipelined: Arc<AtomicBool>,
pub in_memory: Arc<AtomicBool>,
pub wake_up_delay_duration_ms: Arc<AtomicU64>,
}

impl LockManagerConfigManager {
Expand All @@ -88,29 +89,35 @@ impl LockManagerConfigManager {
detector_scheduler: DeadlockScheduler,
pipelined: Arc<AtomicBool>,
in_memory: Arc<AtomicBool>,
wake_up_delay_duration_ms: Arc<AtomicU64>,
) -> Self {
LockManagerConfigManager {
waiter_mgr_scheduler,
detector_scheduler,
pipelined,
in_memory,
wake_up_delay_duration_ms,
}
}
}

impl ConfigManager for LockManagerConfigManager {
fn dispatch(&mut self, mut change: ConfigChange) -> Result<(), Box<dyn Error>> {
match (
change.remove("wait_for_lock_timeout").map(Into::into),
change.remove("wake_up_delay_duration").map(Into::into),
) {
(timeout @ Some(_), delay) => {
self.waiter_mgr_scheduler.change_config(timeout, delay);
self.detector_scheduler.change_ttl(timeout.unwrap().into());
}
(None, delay @ Some(_)) => self.waiter_mgr_scheduler.change_config(None, delay),
(None, None) => {}
};
if let Some(p) = change.remove("wait_for_lock_timeout").map(Into::into) {
self.waiter_mgr_scheduler.change_config(Some(p));
self.detector_scheduler.change_ttl(p.into());
}
if let Some(p) = change
.remove("wake_up_delay_duration")
.map(ReadableDuration::from)
{
info!(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why only the config wake_up_delay_duration need to print log?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logs about the two config items mentioned here were previously printed in waiter_manager.rs when handling Task::ChangeConfig message. Now the wake_up_delay_duration is changed to be handled somewhere else, so I printed it here. When changing wait_for_lock_timeout, the log will still be printed at the old place.

"Waiter manager config changed";
"wake_up_delay_duration" => %p,
);
self.wake_up_delay_duration_ms
.store(p.as_millis(), Ordering::Relaxed);
}
if let Some(p) = change.remove("pipelined").map(Into::into) {
self.pipelined.store(p, Ordering::Relaxed);
}
Expand Down