New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
deadlock: more solid role change observer #6415
Changes from all commits
ae9c970
ed09711
e41d3a7
38e1626
68ea573
3682e55
2f3d3dd
62a3572
ed4c203
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,7 +5,10 @@ use super::config::Config; | |
use super::metrics::*; | ||
use super::waiter_manager::Scheduler as WaiterMgrScheduler; | ||
use super::{Error, Result}; | ||
use crate::raftstore::coprocessor::{Coprocessor, ObserverContext, RoleObserver}; | ||
use crate::raftstore::coprocessor::{ | ||
Coprocessor, CoprocessorHost, ObserverContext, RegionChangeEvent, RegionChangeObserver, | ||
RoleObserver, | ||
}; | ||
use crate::server::resolve::StoreAddrResolver; | ||
use crate::storage::lock_manager::Lock; | ||
use futures::{Future, Sink, Stream}; | ||
|
@@ -20,7 +23,7 @@ use raft::StateRole; | |
use std::cell::RefCell; | ||
use std::fmt::{self, Display, Formatter}; | ||
use std::rc::Rc; | ||
use std::sync::Arc; | ||
use std::sync::{Arc, Mutex}; | ||
use tikv_util::collections::{HashMap, HashSet}; | ||
use tikv_util::future::paired_future_callback; | ||
use tikv_util::security::SecurityManager; | ||
|
@@ -236,20 +239,6 @@ impl DetectTable { | |
} | ||
} | ||
|
||
/// The leader of the region containing the LEADER_KEY is the leader of deadlock detector. | ||
const LEADER_KEY: &[u8] = b""; | ||
|
||
/// Returns true if the region containing the LEADER_KEY. | ||
fn is_leader_region(region: &'_ Region) -> bool { | ||
// The key range of a new created region is empty which misleads the leader | ||
// of the deadlock detector stepping down. | ||
// | ||
// If the peers of a region is not empty, the region info is complete. | ||
!region.get_peers().is_empty() | ||
&& region.get_start_key() <= LEADER_KEY | ||
&& (region.get_end_key().is_empty() || LEADER_KEY < region.get_end_key()) | ||
} | ||
|
||
/// The role of the detector. | ||
#[derive(Debug, PartialEq, Clone, Copy)] | ||
pub enum Role { | ||
|
@@ -303,6 +292,8 @@ pub enum Task { | |
/// Task only used for test | ||
#[cfg(test)] | ||
Validate(Box<dyn FnOnce(u64) + Send>), | ||
#[cfg(test)] | ||
GetRole(Box<dyn FnOnce(Role) + Send>), | ||
} | ||
|
||
impl Display for Task { | ||
|
@@ -318,6 +309,8 @@ impl Display for Task { | |
Task::ChangeTTL(ttl) => write!(f, "ChangeTTL {{ ttl: {:?} }}", ttl), | ||
#[cfg(test)] | ||
Task::Validate(_) => write!(f, "Validate dead lock config"), | ||
#[cfg(test)] | ||
Task::GetRole(_) => write!(f, "Get role of the deadlock detector"), | ||
} | ||
} | ||
} | ||
|
@@ -376,20 +369,99 @@ impl Scheduler { | |
pub fn validate(&self, f: Box<dyn FnOnce(u64) + Send>) { | ||
self.notify_scheduler(Task::Validate(f)); | ||
} | ||
|
||
#[cfg(test)] | ||
pub fn get_role(&self, f: Box<dyn FnOnce(Role) + Send>) { | ||
self.notify_scheduler(Task::GetRole(f)); | ||
} | ||
} | ||
|
||
impl Coprocessor for Scheduler {} | ||
/// The leader region is the region containing the LEADER_KEY and the leader of the | ||
/// leader region is also the leader of the deadlock detector. | ||
const LEADER_KEY: &[u8] = b""; | ||
|
||
/// Implements observer traits for `Scheduler`. | ||
/// If the role of the node in the leader region changes, notifys the deadlock detector. | ||
/// | ||
/// If the leader region is merged or splited in the node, the role of the node won't change. | ||
/// If the leader region is removed and the node is the leader, it will change to follower first. | ||
/// So there is no need to observe region change events. | ||
impl RoleObserver for Scheduler { | ||
/// `RoleChangeNotifier` observes region or role change events of raftstore. If the | ||
/// region is the leader region and the role of this node is changed, a `ChangeRole` | ||
/// task will be scheduled to the deadlock detector. It's the only way to change the | ||
/// node from the leader of deadlock detector to follower, and vice versa. | ||
#[derive(Clone)] | ||
pub(crate) struct RoleChangeNotifier { | ||
/// The id of the valid leader region. | ||
// raftstore.coprocessor needs it to be Sync + Send. | ||
leader_region_id: Arc<Mutex<u64>>, | ||
scheduler: Scheduler, | ||
} | ||
|
||
impl RoleChangeNotifier { | ||
fn is_leader_region(region: &Region) -> bool { | ||
// The key range of a new created region is empty which misleads the leader | ||
// of the deadlock detector stepping down. | ||
// | ||
// If the peers of a region is not empty, the region info is complete. | ||
!region.get_peers().is_empty() | ||
&& region.get_start_key() <= LEADER_KEY | ||
&& (region.get_end_key().is_empty() || LEADER_KEY < region.get_end_key()) | ||
} | ||
|
||
pub(crate) fn new(scheduler: Scheduler) -> Self { | ||
Self { | ||
leader_region_id: Arc::new(Mutex::new(INVALID_ID)), | ||
scheduler, | ||
} | ||
} | ||
|
||
pub(crate) fn register(self, host: &mut CoprocessorHost) { | ||
host.registry | ||
.register_role_observer(1, Box::new(self.clone())); | ||
host.registry | ||
.register_region_change_observer(1, Box::new(self)); | ||
} | ||
} | ||
|
||
impl Coprocessor for RoleChangeNotifier {} | ||
|
||
impl RoleObserver for RoleChangeNotifier { | ||
fn on_role_change(&self, ctx: &mut ObserverContext<'_>, role: StateRole) { | ||
if is_leader_region(ctx.region()) { | ||
self.change_role(role.into()); | ||
let region = ctx.region(); | ||
// A region is created first, so the leader region id must be valid. | ||
if Self::is_leader_region(region) | ||
&& *self.leader_region_id.lock().unwrap() == region.get_id() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it possible it is the leader region while the region id is not leader_region_id ? what shall we do if it happens? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If it happens, it must be a stale region and need to do nothing since region id is updated when receiving Create or Update event. |
||
{ | ||
self.scheduler.change_role(role.into()); | ||
} | ||
} | ||
} | ||
|
||
impl RegionChangeObserver for RoleChangeNotifier { | ||
fn on_region_changed( | ||
&self, | ||
ctx: &mut ObserverContext<'_>, | ||
event: RegionChangeEvent, | ||
role: StateRole, | ||
) { | ||
let region = ctx.region(); | ||
if Self::is_leader_region(region) { | ||
match event { | ||
RegionChangeEvent::Create | RegionChangeEvent::Update => { | ||
*self.leader_region_id.lock().unwrap() = region.get_id(); | ||
self.scheduler.change_role(role.into()); | ||
} | ||
RegionChangeEvent::Destroy => { | ||
// When one region is merged to target region, it will be destroyed. | ||
// If the leader region is merged to the target region and the node | ||
// is also the leader of the target region, the RoleChangeNotifier will | ||
// receive one RegionChangeEvent::Update of the target region and one | ||
// RegionChangeEvent::Destroy of the leader region. To prevent the | ||
// destroy event misleading the leader stepping down, it saves the | ||
// valid leader region id and only when the id equals to the destroyed | ||
// region id, it sends a ChangeRole(Follower) task to the deadlock detector. | ||
let mut leader_region_id = self.leader_region_id.lock().unwrap(); | ||
if *leader_region_id == region.get_id() { | ||
*leader_region_id = INVALID_ID; | ||
self.scheduler.change_role(Role::Follower); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it possible that when a merge happens, a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes. The |
||
} | ||
} | ||
} | ||
} | ||
} | ||
} | ||
|
@@ -797,6 +869,8 @@ where | |
Task::ChangeTTL(ttl) => self.handle_change_ttl(ttl), | ||
#[cfg(test)] | ||
Task::Validate(f) => f(self.inner.borrow().detect_table.ttl.as_millis() as u64), | ||
#[cfg(test)] | ||
Task::GetRole(f) => f(self.inner.borrow().role), | ||
} | ||
} | ||
} | ||
|
@@ -866,8 +940,11 @@ impl Deadlock for Service { | |
} | ||
|
||
#[cfg(test)] | ||
mod tests { | ||
pub mod tests { | ||
use super::*; | ||
use crate::server::resolve::Callback; | ||
use tikv_util::security::SecurityConfig; | ||
use tikv_util::worker::FutureWorker; | ||
|
||
#[test] | ||
fn test_detect_table() { | ||
|
@@ -1009,4 +1086,129 @@ mod tests { | |
assert!(detect_table.detect(3.into(), 1.into(), 3).is_none()); | ||
assert_eq!(detect_table.wait_for_map.len(), 1); | ||
} | ||
|
||
pub(crate) struct MockPdClient; | ||
|
||
impl PdClient for MockPdClient {} | ||
|
||
#[derive(Clone)] | ||
pub(crate) struct MockResolver; | ||
|
||
impl StoreAddrResolver for MockResolver { | ||
fn resolve(&self, _store_id: u64, _cb: Callback) -> Result<()> { | ||
Err(Error::Other(box_err!("unimplemented"))) | ||
} | ||
} | ||
|
||
fn start_deadlock_detector(host: &mut CoprocessorHost) -> (FutureWorker<Task>, Scheduler) { | ||
let waiter_mgr_worker = FutureWorker::new("dummy-waiter-mgr"); | ||
let waiter_mgr_scheduler = WaiterMgrScheduler::new(waiter_mgr_worker.scheduler()); | ||
let mut detector_worker = FutureWorker::new("test-deadlock-detector"); | ||
let detector_runner = Detector::new( | ||
1, | ||
Arc::new(MockPdClient {}), | ||
MockResolver {}, | ||
Arc::new(SecurityManager::new(&SecurityConfig::default()).unwrap()), | ||
waiter_mgr_scheduler, | ||
&Config::default(), | ||
); | ||
let detector_scheduler = Scheduler::new(detector_worker.scheduler()); | ||
let role_change_notifier = RoleChangeNotifier::new(detector_scheduler.clone()); | ||
role_change_notifier.register(host); | ||
detector_worker.start(detector_runner).unwrap(); | ||
(detector_worker, detector_scheduler) | ||
} | ||
|
||
// Region with non-empty peers is valid. | ||
fn new_region(id: u64, start_key: &[u8], end_key: &[u8], valid: bool) -> Region { | ||
let mut region = Region::default(); | ||
region.set_id(id); | ||
region.set_start_key(start_key.to_vec()); | ||
region.set_end_key(end_key.to_vec()); | ||
if valid { | ||
region.set_peers(vec![kvproto::metapb::Peer::default()].into()); | ||
} | ||
region | ||
} | ||
|
||
#[test] | ||
fn test_role_change_notifier() { | ||
let mut host = CoprocessorHost::default(); | ||
let (mut worker, scheduler) = start_deadlock_detector(&mut host); | ||
|
||
let mut region = new_region(1, b"", b"", true); | ||
let invalid = new_region(2, b"", b"", false); | ||
let other = new_region(3, b"0", b"", true); | ||
let follower_roles = [ | ||
StateRole::Follower, | ||
StateRole::PreCandidate, | ||
StateRole::Candidate, | ||
]; | ||
let events = [ | ||
RegionChangeEvent::Create, | ||
RegionChangeEvent::Update, | ||
RegionChangeEvent::Destroy, | ||
]; | ||
let check_role = |role| { | ||
let (tx, f) = paired_future_callback(); | ||
scheduler.get_role(tx); | ||
assert_eq!(f.wait().unwrap(), role); | ||
}; | ||
|
||
// Region changed | ||
for &event in &events[..2] { | ||
for &follower_role in &follower_roles { | ||
host.on_region_changed(®ion, event, follower_role); | ||
check_role(Role::Follower); | ||
host.on_region_changed(&invalid, event, StateRole::Leader); | ||
check_role(Role::Follower); | ||
host.on_region_changed(&other, event, StateRole::Leader); | ||
check_role(Role::Follower); | ||
host.on_region_changed(®ion, event, StateRole::Leader); | ||
check_role(Role::Leader); | ||
host.on_region_changed(&invalid, event, follower_role); | ||
check_role(Role::Leader); | ||
host.on_region_changed(&other, event, follower_role); | ||
check_role(Role::Leader); | ||
host.on_region_changed(®ion, event, follower_role); | ||
check_role(Role::Follower); | ||
} | ||
} | ||
host.on_region_changed(®ion, RegionChangeEvent::Create, StateRole::Leader); | ||
host.on_region_changed(&invalid, RegionChangeEvent::Destroy, StateRole::Leader); | ||
host.on_region_changed(&other, RegionChangeEvent::Destroy, StateRole::Leader); | ||
check_role(Role::Leader); | ||
host.on_region_changed(®ion, RegionChangeEvent::Destroy, StateRole::Leader); | ||
check_role(Role::Follower); | ||
// Leader region id is changed. | ||
region.set_id(2); | ||
host.on_region_changed(®ion, RegionChangeEvent::Update, StateRole::Leader); | ||
// Destroy the previous leader region. | ||
region.set_id(1); | ||
host.on_region_changed(®ion, RegionChangeEvent::Destroy, StateRole::Leader); | ||
check_role(Role::Leader); | ||
|
||
// Role changed | ||
let region = new_region(1, b"", b"", true); | ||
host.on_region_changed(®ion, RegionChangeEvent::Create, StateRole::Follower); | ||
check_role(Role::Follower); | ||
for &follower_role in &follower_roles { | ||
host.on_role_change(®ion, follower_role); | ||
check_role(Role::Follower); | ||
host.on_role_change(&invalid, StateRole::Leader); | ||
check_role(Role::Follower); | ||
host.on_role_change(&other, StateRole::Leader); | ||
check_role(Role::Follower); | ||
host.on_role_change(®ion, StateRole::Leader); | ||
check_role(Role::Leader); | ||
host.on_role_change(&invalid, follower_role); | ||
check_role(Role::Leader); | ||
host.on_role_change(&other, follower_role); | ||
check_role(Role::Leader); | ||
host.on_role_change(®ion, follower_role); | ||
check_role(Role::Follower); | ||
} | ||
|
||
worker.stop(); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if
is_leader_region
is false, please return as early as possible.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is.