Skip to content

Commit

Permalink
deadlock: more solid role change observer (#6415) (#6431)
Browse files Browse the repository at this point in the history
Signed-off-by: youjiali1995 <zlwgx1023@gmail.com>
  • Loading branch information
youjiali1995 authored and sre-bot committed Jan 7, 2020
1 parent 47ff98c commit da69418
Show file tree
Hide file tree
Showing 3 changed files with 400 additions and 99 deletions.
253 changes: 230 additions & 23 deletions src/server/lock_manager/deadlock.rs
Expand Up @@ -6,7 +6,10 @@ use super::metrics::*;
use super::waiter_manager::Scheduler as WaiterMgrScheduler;
use super::{Error, Result};
use crate::pd::{PdClient, INVALID_ID};
use crate::raftstore::coprocessor::{Coprocessor, ObserverContext, RoleObserver};
use crate::raftstore::coprocessor::{
Coprocessor, CoprocessorHost, ObserverContext, RegionChangeEvent, RegionChangeObserver,
RoleObserver,
};
use crate::server::resolve::StoreAddrResolver;
use crate::storage::lock_manager::Lock;
use futures::{Future, Sink, Stream};
Expand All @@ -22,7 +25,7 @@ use raft::StateRole;
use std::cell::RefCell;
use std::fmt::{self, Display, Formatter};
use std::rc::Rc;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use tikv_util::collections::{HashMap, HashSet};
use tikv_util::future::paired_future_callback;
use tikv_util::security::SecurityManager;
Expand Down Expand Up @@ -215,16 +218,6 @@ impl DetectTable {
}
}

/// The leader of the region containing the LEADER_KEY is the leader of deadlock detector.
const LEADER_KEY: &[u8] = b"";

/// Returns true if the region containing the LEADER_KEY.
fn is_leader_region(region: &'_ Region) -> bool {
!region.get_peers().is_empty()
&& region.get_start_key() <= LEADER_KEY
&& (region.get_end_key().is_empty() || LEADER_KEY < region.get_end_key())
}

/// The role of the detector.
#[derive(Debug, PartialEq, Clone, Copy)]
pub enum Role {
Expand Down Expand Up @@ -273,6 +266,9 @@ pub enum Task {
///
/// It's the only way to change the node from leader to follower, and vice versa.
ChangeRole(Role),
/// Task only used for test
#[cfg(test)]
GetRole(Box<dyn FnOnce(Role) + Send>),
}

impl Display for Task {
Expand All @@ -285,6 +281,8 @@ impl Display for Task {
),
Task::DetectRpc { .. } => write!(f, "Detect Rpc"),
Task::ChangeRole(role) => write!(f, "ChangeRole {{ role: {:?} }}", role),
#[cfg(test)]
Task::GetRole(_) => write!(f, "Get role of the deadlock detector"),
}
}
}
Expand Down Expand Up @@ -334,20 +332,99 @@ impl Scheduler {
fn change_role(&self, role: Role) {
self.notify_scheduler(Task::ChangeRole(role));
}

#[cfg(test)]
pub fn get_role(&self, f: Box<dyn FnOnce(Role) + Send>) {
self.notify_scheduler(Task::GetRole(f));
}
}

impl Coprocessor for Scheduler {}
/// The leader region is the region containing the LEADER_KEY and the leader of the
/// leader region is also the leader of the deadlock detector.
const LEADER_KEY: &[u8] = b"";

/// `RoleChangeNotifier` observes region or role change events of raftstore. If the
/// region is the leader region and the role of this node is changed, a `ChangeRole`
/// task will be scheduled to the deadlock detector. It's the only way to change the
/// node from the leader of deadlock detector to follower, and vice versa.
#[derive(Clone)]
pub(crate) struct RoleChangeNotifier {
/// The id of the valid leader region.
// raftstore.coprocessor needs it to be Sync + Send.
leader_region_id: Arc<Mutex<u64>>,
scheduler: Scheduler,
}

/// Implements observer traits for `Scheduler`.
/// If the role of the node in the leader region changes, notifys the deadlock detector.
///
/// If the leader region is merged or splited in the node, the role of the node won't change.
/// If the leader region is removed and the node is the leader, it will change to follower first.
/// So there is no need to observe region change events.
impl RoleObserver for Scheduler {
impl RoleChangeNotifier {
fn is_leader_region(region: &Region) -> bool {
// The key range of a new created region is empty which misleads the leader
// of the deadlock detector stepping down.
//
// If the peers of a region is not empty, the region info is complete.
!region.get_peers().is_empty()
&& region.get_start_key() <= LEADER_KEY
&& (region.get_end_key().is_empty() || LEADER_KEY < region.get_end_key())
}

pub(crate) fn new(scheduler: Scheduler) -> Self {
Self {
leader_region_id: Arc::new(Mutex::new(INVALID_ID)),
scheduler,
}
}

pub(crate) fn register(self, host: &mut CoprocessorHost) {
host.registry
.register_role_observer(1, Box::new(self.clone()));
host.registry
.register_region_change_observer(1, Box::new(self));
}
}

impl Coprocessor for RoleChangeNotifier {}

impl RoleObserver for RoleChangeNotifier {
fn on_role_change(&self, ctx: &mut ObserverContext<'_>, role: StateRole) {
if is_leader_region(ctx.region()) {
self.change_role(role.into());
let region = ctx.region();
// A region is created first, so the leader region id must be valid.
if Self::is_leader_region(region)
&& *self.leader_region_id.lock().unwrap() == region.get_id()
{
self.scheduler.change_role(role.into());
}
}
}

impl RegionChangeObserver for RoleChangeNotifier {
fn on_region_changed(
&self,
ctx: &mut ObserverContext<'_>,
event: RegionChangeEvent,
role: StateRole,
) {
let region = ctx.region();
if Self::is_leader_region(region) {
match event {
RegionChangeEvent::Create | RegionChangeEvent::Update => {
*self.leader_region_id.lock().unwrap() = region.get_id();
self.scheduler.change_role(role.into());
}
RegionChangeEvent::Destroy => {
// When one region is merged to target region, it will be destroyed.
// If the leader region is merged to the target region and the node
// is also the leader of the target region, the RoleChangeNotifier will
// receive one RegionChangeEvent::Update of the target region and one
// RegionChangeEvent::Destroy of the leader region. To prevent the
// destroy event misleading the leader stepping down, it saves the
// valid leader region id and only when the id equals to the destroyed
// region id, it sends a ChangeRole(Follower) task to the deadlock detector.
let mut leader_region_id = self.leader_region_id.lock().unwrap();
if *leader_region_id == region.get_id() {
*leader_region_id = INVALID_ID;
self.scheduler.change_role(Role::Follower);
}
}
}
}
}
}
Expand Down Expand Up @@ -732,6 +809,8 @@ where
self.handle_detect_rpc(handle, stream, sink);
}
Task::ChangeRole(role) => self.handle_change_role(role),
#[cfg(test)]
Task::GetRole(f) => f(self.inner.borrow().role),
}
}
}
Expand Down Expand Up @@ -795,8 +874,11 @@ impl deadlock_grpc::Deadlock for Service {
}

#[cfg(test)]
mod tests {
pub mod tests {
use super::*;
use crate::server::resolve::Callback;
use tikv_util::security::SecurityConfig;
use tikv_util::worker::FutureWorker;

#[test]
fn test_detect_table() {
Expand Down Expand Up @@ -938,4 +1020,129 @@ mod tests {
assert!(detect_table.detect(3, 1, 3).is_none());
assert_eq!(detect_table.wait_for_map.len(), 1);
}

pub(crate) struct MockPdClient;

impl PdClient for MockPdClient {}

#[derive(Clone)]
pub(crate) struct MockResolver;

impl StoreAddrResolver for MockResolver {
fn resolve(&self, _store_id: u64, _cb: Callback) -> Result<()> {
Err(Error::Other(box_err!("unimplemented")))
}
}

fn start_deadlock_detector(host: &mut CoprocessorHost) -> (FutureWorker<Task>, Scheduler) {
let waiter_mgr_worker = FutureWorker::new("dummy-waiter-mgr");
let waiter_mgr_scheduler = WaiterMgrScheduler::new(waiter_mgr_worker.scheduler());
let mut detector_worker = FutureWorker::new("test-deadlock-detector");
let detector_runner = Detector::new(
1,
Arc::new(MockPdClient {}),
MockResolver {},
Arc::new(SecurityManager::new(&SecurityConfig::default()).unwrap()),
waiter_mgr_scheduler,
&Config::default(),
);
let detector_scheduler = Scheduler::new(detector_worker.scheduler());
let role_change_notifier = RoleChangeNotifier::new(detector_scheduler.clone());
role_change_notifier.register(host);
detector_worker.start(detector_runner).unwrap();
(detector_worker, detector_scheduler)
}

// Region with non-empty peers is valid.
fn new_region(id: u64, start_key: &[u8], end_key: &[u8], valid: bool) -> Region {
let mut region = Region::default();
region.set_id(id);
region.set_start_key(start_key.to_vec());
region.set_end_key(end_key.to_vec());
if valid {
region.set_peers(vec![kvproto::metapb::Peer::default()].into());
}
region
}

#[test]
fn test_role_change_notifier() {
let mut host = CoprocessorHost::default();
let (mut worker, scheduler) = start_deadlock_detector(&mut host);

let mut region = new_region(1, b"", b"", true);
let invalid = new_region(2, b"", b"", false);
let other = new_region(3, b"0", b"", true);
let follower_roles = [
StateRole::Follower,
StateRole::PreCandidate,
StateRole::Candidate,
];
let events = [
RegionChangeEvent::Create,
RegionChangeEvent::Update,
RegionChangeEvent::Destroy,
];
let check_role = |role| {
let (tx, f) = paired_future_callback();
scheduler.get_role(tx);
assert_eq!(f.wait().unwrap(), role);
};

// Region changed
for &event in &events[..2] {
for &follower_role in &follower_roles {
host.on_region_changed(&region, event, follower_role);
check_role(Role::Follower);
host.on_region_changed(&invalid, event, StateRole::Leader);
check_role(Role::Follower);
host.on_region_changed(&other, event, StateRole::Leader);
check_role(Role::Follower);
host.on_region_changed(&region, event, StateRole::Leader);
check_role(Role::Leader);
host.on_region_changed(&invalid, event, follower_role);
check_role(Role::Leader);
host.on_region_changed(&other, event, follower_role);
check_role(Role::Leader);
host.on_region_changed(&region, event, follower_role);
check_role(Role::Follower);
}
}
host.on_region_changed(&region, RegionChangeEvent::Create, StateRole::Leader);
host.on_region_changed(&invalid, RegionChangeEvent::Destroy, StateRole::Leader);
host.on_region_changed(&other, RegionChangeEvent::Destroy, StateRole::Leader);
check_role(Role::Leader);
host.on_region_changed(&region, RegionChangeEvent::Destroy, StateRole::Leader);
check_role(Role::Follower);
// Leader region id is changed.
region.set_id(2);
host.on_region_changed(&region, RegionChangeEvent::Update, StateRole::Leader);
// Destroy the previous leader region.
region.set_id(1);
host.on_region_changed(&region, RegionChangeEvent::Destroy, StateRole::Leader);
check_role(Role::Leader);

// Role changed
let region = new_region(1, b"", b"", true);
host.on_region_changed(&region, RegionChangeEvent::Create, StateRole::Follower);
check_role(Role::Follower);
for &follower_role in &follower_roles {
host.on_role_change(&region, follower_role);
check_role(Role::Follower);
host.on_role_change(&invalid, StateRole::Leader);
check_role(Role::Follower);
host.on_role_change(&other, StateRole::Leader);
check_role(Role::Follower);
host.on_role_change(&region, StateRole::Leader);
check_role(Role::Leader);
host.on_role_change(&invalid, follower_role);
check_role(Role::Leader);
host.on_role_change(&other, follower_role);
check_role(Role::Leader);
host.on_role_change(&region, follower_role);
check_role(Role::Follower);
}

worker.stop();
}
}

0 comments on commit da69418

Please sign in to comment.