Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

config: support online change lock manager config #6338

Merged
merged 9 commits into from Dec 27, 2019
3 changes: 2 additions & 1 deletion cmd/src/server.rs
Expand Up @@ -272,12 +272,14 @@ impl TiKVServer {
&mut self,
gc_worker: &GcWorker<RaftKv<ServerRaftStoreRouter>>,
) -> Arc<ServerConfig> {
let mut cfg_controller = ConfigController::new(self.config.clone());
// Create CoprocessorHost.
let mut coprocessor_host =
CoprocessorHost::new(self.config.coprocessor.clone(), self.router.clone());

let lock_mgr = if self.config.pessimistic_txn.enabled {
let lock_mgr = LockManager::new();
cfg_controller.register("pessimistic_txn", Box::new(lock_mgr.config_manager()));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pessimistic_txn or pessimistic-txn?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pessimistic_txn same as the TiKvConfig field name.

lock_mgr.register_detector_role_change_observer(&mut coprocessor_host);
Some(lock_mgr)
} else {
Expand Down Expand Up @@ -351,7 +353,6 @@ impl TiKVServer {
&self.config.raft_store,
self.pd_client.clone(),
);
let cfg_controller = ConfigController::new(self.config.clone());
node.start(
engines.engines.clone(),
server.transport(),
Expand Down
2 changes: 1 addition & 1 deletion src/config.rs
Expand Up @@ -1370,7 +1370,7 @@ pub struct TiKvConfig {
pub security: SecurityConfig,
#[config(skip)]
pub import: ImportConfig,
#[config(skip)]
#[config(submodule)]
pub pessimistic_txn: PessimisticTxnConfig,
#[config(skip)]
pub gc: GcConfig,
Expand Down
166 changes: 165 additions & 1 deletion src/server/lock_manager/config.rs
@@ -1,11 +1,18 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

use super::deadlock::Scheduler as DeadlockScheduler;
use super::waiter_manager::Scheduler as WaiterMgrScheduler;
use crate::config::ConfigManager;

use configuration::{ConfigChange, Configuration};

use std::error::Error;

#[derive(Clone, Serialize, Deserialize, PartialEq, Debug)]
#[derive(Clone, Serialize, Deserialize, PartialEq, Debug, Configuration)]
#[serde(default)]
#[serde(rename_all = "kebab-case")]
pub struct Config {
#[config(skip)]
pub enabled: bool,
pub wait_for_lock_timeout: u64,
pub wake_up_delay_duration: u64,
Expand All @@ -30,9 +37,51 @@ impl Config {
}
}

/// Applies online changes of the `pessimistic-txn` config section by
/// forwarding them to the lock manager's background workers.
pub struct LockManagerConfigManager {
    // Sends config-change tasks to the `WaiterManager` worker.
    waiter_mgr_scheduler: WaiterMgrScheduler,
    // Sends config-change tasks to the deadlock detector worker.
    detector_scheduler: DeadlockScheduler,
}

impl LockManagerConfigManager {
pub fn new(
waiter_mgr_scheduler: WaiterMgrScheduler,
detector_scheduler: DeadlockScheduler,
) -> Self {
LockManagerConfigManager {
waiter_mgr_scheduler,
detector_scheduler,
}
}
}

impl ConfigManager for LockManagerConfigManager {
    /// Forwards updated `pessimistic-txn` config values to the background
    /// workers. A `None` entry means that value did not change.
    fn dispatch(&mut self, mut change: ConfigChange) -> Result<(), Box<dyn Error>> {
        let timeout: Option<u64> = change.remove("wait_for_lock_timeout").map(Into::into);
        let delay: Option<u64> = change.remove("wake_up_delay_duration").map(Into::into);
        // The waiter manager tracks both values, so it must be notified when
        // either of them changes; its handler copes with the `None` fields.
        if timeout.is_some() || delay.is_some() {
            self.waiter_mgr_scheduler.change_config(timeout, delay);
        }
        // Only the lock-wait timeout drives the deadlock detector's entry TTL.
        if let Some(ttl) = timeout {
            self.detector_scheduler.change_ttl(ttl);
        }
        Ok(())
    }
}

#[cfg(test)]
mod tests {
use std::sync::{mpsc, Arc};
use std::time::Duration;

use super::super::LockManager;
use super::*;
use crate::config::*;
use crate::server::resolve;
use pd_client::PdClient;
use tikv_util::security::SecurityManager;

#[test]
fn test_config_validate() {
Expand All @@ -43,4 +92,119 @@ mod tests {
invalid_cfg.wait_for_lock_timeout = 0;
assert!(invalid_cfg.validate().is_err());
}

// Minimal PD client stub for tests; relies entirely on the trait's
// default method implementations.
struct MockPdClient;
impl PdClient for MockPdClient {}

/// Starts a `LockManager` with `cfg` and registers its config manager under
/// the `pessimistic_txn` key of a fresh `ConfigController`. Returns the
/// controller plus the schedulers of the two background workers so tests can
/// inspect the workers' live settings.
fn setup(cfg: TiKvConfig) -> (ConfigController, WaiterMgrScheduler, DeadlockScheduler) {
    let pd_client = Arc::new(MockPdClient);
    let (_, resolver) = resolve::new_resolver(Arc::clone(&pd_client)).unwrap();
    let security_mgr = Arc::new(SecurityManager::new(&cfg.security).unwrap());
    let mut lock_mgr = LockManager::new();
    lock_mgr
        .start(1, pd_client, resolver, security_mgr, &cfg.pessimistic_txn)
        .unwrap();

    let cfg_mgr = lock_mgr.config_manager();
    let waiter_scheduler = cfg_mgr.waiter_mgr_scheduler.clone();
    let detector_scheduler = cfg_mgr.detector_scheduler.clone();
    let mut controller = ConfigController::new(cfg);
    controller.register("pessimistic_txn", Box::new(cfg_mgr));

    (controller, waiter_scheduler, detector_scheduler)
}

/// Asks the waiter manager worker for its current
/// (default_wait_for_lock_timeout, wake_up_delay_duration) pair, runs `f` on
/// the values, and blocks until the worker has responded (3s limit).
fn validate_waiter<F>(router: &WaiterMgrScheduler, f: F)
where
    F: FnOnce(u64, u64) + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    let check = move |timeout: u64, delay: u64| {
        f(timeout, delay);
        // Signal the test thread that the callback actually ran.
        tx.send(()).unwrap();
    };
    router.validate(Box::new(check));
    rx.recv_timeout(Duration::from_secs(3)).unwrap();
}

/// Asks the deadlock detector worker for its current detect-table TTL in
/// milliseconds, runs `f` on it, and blocks until the worker has responded
/// (3s limit).
fn validate_dead_lock<F>(router: &DeadlockScheduler, f: F)
where
    F: FnOnce(u64) + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    let check = move |ttl: u64| {
        f(ttl);
        // Signal the test thread that the callback actually ran.
        tx.send(()).unwrap();
    };
    router.validate(Box::new(check));
    rx.recv_timeout(Duration::from_secs(3)).unwrap();
}

/// Verifies that online updates of the `pessimistic-txn` section reach both
/// the waiter manager and the deadlock detector, and that unrelated config
/// changes leave them untouched.
#[test]
fn test_lock_manager_cfg_update() {
    const DEFAULT_TIMEOUT: u64 = 3000;
    const DEFAULT_DELAY: u64 = 100;
    let mut cfg = TiKvConfig::default();
    cfg.pessimistic_txn.wait_for_lock_timeout = DEFAULT_TIMEOUT;
    cfg.pessimistic_txn.wake_up_delay_duration = DEFAULT_DELAY;
    cfg.validate().unwrap();
    let (mut cfg_controller, waiter, deadlock) = setup(cfg.clone());

    // Updating another module's config must not affect the lock manager.
    let mut new_cfg = cfg.clone();
    new_cfg.raft_store.raft_log_gc_threshold = 2000;
    let rollback = cfg_controller.update_or_rollback(new_cfg).unwrap();
    assert_eq!(rollback.right(), Some(true));
    validate_waiter(&waiter, move |timeout: u64, delay: u64| {
        assert_eq!(timeout, DEFAULT_TIMEOUT);
        assert_eq!(delay, DEFAULT_DELAY);
    });
    validate_dead_lock(&deadlock, move |ttl: u64| {
        assert_eq!(ttl, DEFAULT_TIMEOUT);
    });

    // Update only wake_up_delay_duration.
    let mut new_cfg = cfg.clone();
    new_cfg.pessimistic_txn.wake_up_delay_duration = 500;
    let rollback = cfg_controller.update_or_rollback(new_cfg).unwrap();
    assert_eq!(rollback.right(), Some(true));
    validate_waiter(&waiter, move |timeout: u64, delay: u64| {
        assert_eq!(timeout, DEFAULT_TIMEOUT);
        assert_eq!(delay, 500);
    });
    validate_dead_lock(&deadlock, move |ttl: u64| {
        // The detector TTL follows wait_for_lock_timeout only, so it must
        // not move here.
        assert_eq!(ttl, DEFAULT_TIMEOUT);
    });

    // Update only wait_for_lock_timeout.
    let mut new_cfg = cfg.clone();
    new_cfg.pessimistic_txn.wait_for_lock_timeout = 4000;
    // Keep wake_up_delay_duration identical to the previous update.
    new_cfg.pessimistic_txn.wake_up_delay_duration = 500;
    let rollback = cfg_controller.update_or_rollback(new_cfg).unwrap();
    assert_eq!(rollback.right(), Some(true));
    validate_waiter(&waiter, move |timeout: u64, delay: u64| {
        assert_eq!(timeout, 4000);
        // wake_up_delay_duration stays at the previously-updated value.
        assert_eq!(delay, 500);
    });
    validate_dead_lock(&deadlock, move |ttl: u64| {
        assert_eq!(ttl, 4000);
    });

    // Update both values at once.
    let mut new_cfg = cfg;
    new_cfg.pessimistic_txn.wait_for_lock_timeout = 4321;
    new_cfg.pessimistic_txn.wake_up_delay_duration = 123;
    let rollback = cfg_controller.update_or_rollback(new_cfg).unwrap();
    assert_eq!(rollback.right(), Some(true));
    validate_waiter(&waiter, move |timeout: u64, delay: u64| {
        assert_eq!(timeout, 4321);
        assert_eq!(delay, 123);
    });
    validate_dead_lock(&deadlock, move |ttl: u64| {
        assert_eq!(ttl, 4321);
    });
}
}
31 changes: 31 additions & 0 deletions src/server/lock_manager/deadlock.rs
Expand Up @@ -224,6 +224,11 @@ impl DetectTable {
self.wait_for_map.clear();
}

/// Resets the TTL used to expire stale entries of the detect table.
fn reset_ttl(&mut self, ttl: Duration) {
    self.ttl = ttl;
}

/// The threshold of detect table size to trigger `active_expire`.
const ACTIVE_EXPIRE_THRESHOLD: usize = 100000;
/// The interval between `active_expire`.
Expand Down Expand Up @@ -307,6 +312,11 @@ pub enum Task {
///
/// It's the only way to change the node from leader to follower, and vice versa.
ChangeRole(Role),
/// Change the ttl of DetectTable
ChangeTTL(Duration),
/// Task only used for test
#[cfg(test)]
Validate(Box<dyn FnOnce(u64) + Send>),
}

impl Display for Task {
Expand All @@ -319,6 +329,9 @@ impl Display for Task {
),
Task::DetectRpc { .. } => write!(f, "Detect Rpc"),
Task::ChangeRole(role) => write!(f, "ChangeRole {{ role: {:?} }}", role),
NingLin-P marked this conversation as resolved.
Show resolved Hide resolved
Task::ChangeTTL(ttl) => write!(f, "ChangeTTL {{ ttl: {:?} }}", ttl),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Task::ChangeTTL(ttl) => write!(f, "ChangeTTL {{ ttl: {:?} }}", ttl),
Task::ChangeTTL(ttl) => write!(f, "ChangeTTL { ttl: {:?} }", ttl),

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need `{{` here to print out `{`, so `Task::ChangeTTL` prints something like `ChangeTTL { ttl: 100ms }`.

#[cfg(test)]
Task::Validate(_) => write!(f, "Validate dead lock config"),
}
}
}
Expand Down Expand Up @@ -368,6 +381,15 @@ impl Scheduler {
fn change_role(&self, role: Role) {
self.notify_scheduler(Task::ChangeRole(role));
}

/// Schedules a `ChangeTTL` task for the detect table; `t` is the new TTL
/// in milliseconds.
pub fn change_ttl(&self, t: u64) {
    let ttl = Duration::from_millis(t);
    self.notify_scheduler(Task::ChangeTTL(ttl));
}

// Test-only hook: schedules a `Validate` task, which invokes `f` with the
// detect table's current TTL in milliseconds.
#[cfg(test)]
pub fn validate(&self, f: Box<dyn FnOnce(u64) + Send>) {
    self.notify_scheduler(Task::Validate(f));
}
}

impl Coprocessor for Scheduler {}
Expand Down Expand Up @@ -758,6 +780,12 @@ where
debug!("handle change role"; "role" => ?role);
self.change_role(role);
}

/// Applies a new TTL to the detect table and logs the change.
fn handle_change_ttl(&mut self, ttl: Duration) {
    self.inner.borrow_mut().detect_table.reset_ttl(ttl);
    info!("Deadlock detector config changed"; "ttl" => ?ttl);
}
}

impl<S, P> FutureRunnable<Task> for Detector<S, P>
Expand All @@ -774,6 +802,9 @@ where
self.handle_detect_rpc(handle, stream, sink);
}
Task::ChangeRole(role) => self.handle_change_role(role),
Task::ChangeTTL(ttl) => self.handle_change_ttl(ttl),
#[cfg(test)]
Task::Validate(f) => f(self.inner.borrow().detect_table.ttl.as_millis() as u64),
}
}
}
Expand Down
9 changes: 8 additions & 1 deletion src/server/lock_manager/mod.rs
Expand Up @@ -6,7 +6,7 @@ pub mod deadlock;
mod metrics;
pub mod waiter_manager;

pub use self::config::Config;
pub use self::config::{Config, LockManagerConfigManager};
pub use self::deadlock::Service as DeadlockService;

use self::deadlock::{Detector, Scheduler as DetectorScheduler};
Expand Down Expand Up @@ -193,6 +193,13 @@ impl LockManager {
)
}

/// Creates a `LockManagerConfigManager` that forwards online config
/// changes to the waiter manager and deadlock detector workers.
pub fn config_manager(&self) -> LockManagerConfigManager {
    LockManagerConfigManager::new(
        self.waiter_mgr_scheduler.clone(),
        self.detector_scheduler.clone(),
    )
}

fn add_to_detected(&self, txn_ts: TimeStamp) {
let mut detected = self.detected[detected_slot_idx(txn_ts)].lock();
detected.insert(txn_ts);
Expand Down
42 changes: 42 additions & 0 deletions src/server/lock_manager/waiter_manager.rs
Expand Up @@ -116,6 +116,12 @@ pub enum Task {
lock: Lock,
deadlock_key_hash: u64,
},
ChangeConfig {
timeout: Option<u64>,
delay: Option<u64>,
},
#[cfg(test)]
Validate(Box<dyn FnOnce(u64, u64) + Send>),
}

/// Debug for task.
Expand All @@ -135,6 +141,13 @@ impl Display for Task {
Task::WakeUp { lock_ts, .. } => write!(f, "waking up txns waiting for {}", lock_ts),
Task::Dump { .. } => write!(f, "dump"),
Task::Deadlock { start_ts, .. } => write!(f, "txn:{} deadlock", start_ts),
Task::ChangeConfig { timeout, delay } => write!(
f,
"change config to default_wait_for_lock_timeout: {:?}, wake_up_delay_duration: {:?}",
timeout, delay
),
#[cfg(test)]
Task::Validate(_) => write!(f, "validate waiter manager config"),
}
}
}
Expand Down Expand Up @@ -407,6 +420,15 @@ impl Scheduler {
deadlock_key_hash,
});
}

/// Schedules a `ChangeConfig` task; a `None` field means the
/// corresponding value is unchanged.
pub fn change_config(&self, timeout: Option<u64>, delay: Option<u64>) {
    self.notify_scheduler(Task::ChangeConfig { timeout, delay });
}

// Test-only hook: schedules a `Validate` task, which invokes `f` with the
// waiter manager's current (default_wait_for_lock_timeout,
// wake_up_delay_duration) values.
#[cfg(test)]
pub fn validate(&self, f: Box<dyn FnOnce(u64, u64) + Send>) {
    self.notify_scheduler(Task::Validate(f));
}
}

/// WaiterManager handles waiting and wake-up of pessimistic lock
Expand Down Expand Up @@ -512,6 +534,20 @@ impl WaiterManager {
Some(())
});
}

/// Applies an online config change. A `None` field means that value was
/// not changed, so the current setting is kept; the log line records the
/// resulting effective values either way.
fn handle_config_change(&mut self, timeout: Option<u64>, delay: Option<u64>) {
    if let Some(timeout) = timeout {
        self.default_wait_for_lock_timeout = timeout;
    }
    if let Some(delay) = delay {
        self.wake_up_delay_duration = delay;
    }
    info!(
        "Waiter manager config changed";
        "default_wait_for_lock_timeout" => self.default_wait_for_lock_timeout,
        "wake_up_delay_duration" => self.wake_up_delay_duration
    );
}
}

impl FutureRunnable<Task> for WaiterManager {
Expand Down Expand Up @@ -556,6 +592,12 @@ impl FutureRunnable<Task> for WaiterManager {
} => {
self.handle_deadlock(start_ts, lock, deadlock_key_hash);
}
Task::ChangeConfig { timeout, delay } => self.handle_config_change(timeout, delay),
#[cfg(test)]
Task::Validate(f) => f(
self.default_wait_for_lock_timeout,
self.wake_up_delay_duration,
),
}
}
}
Expand Down