
trace peers' availability info on leader side #13209

Merged
14 commits merged on Oct 12, 2022
2 changes: 1 addition & 1 deletion Cargo.lock


2 changes: 1 addition & 1 deletion Cargo.toml
@@ -212,7 +212,7 @@ procinfo = { git = "https://github.com/tikv/procinfo-rs", rev = "6599eb9dca74229
# kvproto at the same time.
# After the PR to kvproto is merged, remember to comment this out and run `cargo update -p kvproto`.
[patch.'https://github.com/pingcap/kvproto']
# kvproto = { git = "https://github.com/your_github_id/kvproto", branch="your_branch" }
kvproto = { git = "https://github.com/ethercflow/kvproto", branch = "witness" }

[workspace]
# See https://github.com/rust-lang/rfcs/blob/master/text/2957-cargo-features2.md
8 changes: 8 additions & 0 deletions components/raftstore/src/store/config.rs
@@ -303,6 +303,12 @@ pub struct Config {
pub max_snapshot_file_raw_size: ReadableSize,

pub unreachable_backoff: ReadableDuration,

#[doc(hidden)]
#[serde(skip_serializing)]
#[online_config(hidden)]
// Interval to check peers availability info.
pub check_peers_availability_interval: ReadableDuration,
}

impl Default for Config {
@@ -407,6 +413,8 @@ impl Default for Config {
report_region_buckets_tick_interval: ReadableDuration::secs(10),
max_snapshot_file_raw_size: ReadableSize::mb(100),
unreachable_backoff: ReadableDuration::secs(10),
// TODO: make its value reasonable
check_peers_availability_interval: ReadableDuration::secs(30),
}
}
}
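
For illustration, here is a minimal sketch of overriding the new interval programmatically, assuming the raftstore::store::Config and tikv_util::config::ReadableDuration paths used elsewhere in this repository (the helper below is hypothetical and not part of this diff):

use raftstore::store::Config;
use tikv_util::config::ReadableDuration;

// Hypothetical helper: build a raftstore config that probes waiting peers
// more often than the 30s default chosen above.
fn config_with_tighter_availability_checks() -> Config {
    let mut cfg = Config::default();
    cfg.check_peers_availability_interval = ReadableDuration::secs(5);
    cfg
}
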
65 changes: 65 additions & 0 deletions components/raftstore/src/store/fsm/peer.rs
@@ -1217,6 +1217,7 @@ where
PeerTick::ReactivateMemoryLock => self.on_reactivate_memory_lock_tick(),
PeerTick::ReportBuckets => self.on_report_region_buckets_tick(),
PeerTick::CheckLongUncommitted => self.on_check_long_uncommitted_tick(),
PeerTick::CheckPeersAvailability => self.on_check_peers_availability(),
}
}

@@ -2627,6 +2628,36 @@ where
self.fsm.hibernate_state.count_vote(from.get_id());
}

fn on_trace_peer_availability_info(&mut self, from: &metapb::Peer, msg: &ExtraMessage) {
if !self.fsm.peer.is_leader() {
let mut resp = ExtraMessage::default();
resp.set_type(ExtraMessageType::MsgTracePeerAvailabilityInfo);
resp.wait_data = self.fsm.peer.wait_data;
self.fsm
.peer
.send_extra_message(resp, &mut self.ctx.trans, from);
info!(
"peer responses availability info to leader";
"region_id" => self.region().get_id(),
"peer_id" => self.fsm.peer.peer.get_id(),
"leader_id" => from.id,
);
return;
}
if !msg.wait_data {
self.fsm
.peer
.wait_data_peers
.retain(|id| *id != from.get_id());
info!(
"receive peer ready info";
"peer_id" => self.fsm.peer.peer.get_id(),
);
return;
}
self.register_check_peers_availability_tick();
}

fn on_extra_message(&mut self, mut msg: RaftMessage) {
match msg.get_extra_msg().get_type() {
ExtraMessageType::MsgRegionWakeUp | ExtraMessageType::MsgCheckStalePeer => {
@@ -2660,6 +2691,9 @@ where
ExtraMessageType::MsgRejectRaftLogCausedByMemoryUsage => {
unimplemented!()
}
ExtraMessageType::MsgTracePeerAvailabilityInfo => {
self.on_trace_peer_availability_info(msg.get_from_peer(), msg.get_extra_msg());
}
}
}

@@ -3209,6 +3243,7 @@ where
);
} else {
self.fsm.peer.transfer_leader(&from);
self.fsm.peer.wait_data_peers.clear();
}
}
}
@@ -3660,6 +3695,7 @@ where
.peer
.peers_start_pending_time
.retain(|&(p, _)| p != peer_id);
self.fsm.peer.wait_data_peers.retain(|id| *id != peer_id);
}
self.fsm.peer.remove_peer_from_cache(peer_id);
// We only care remove itself now.
@@ -5858,6 +5894,35 @@ where
self.schedule_tick(PeerTick::PdHeartbeat)
}

fn register_check_peers_availability_tick(&mut self) {
fail_point!("ignore schedule check peers availability tick", |_| {});
self.schedule_tick(PeerTick::CheckPeersAvailability)
}

fn on_check_peers_availability(&mut self) {
for peer_id in self.fsm.peer.wait_data_peers.iter() {
if let Some(peer) = self.fsm.peer.get_peer_from_cache(*peer_id) {
let mut msg = ExtraMessage::default();
msg.set_type(ExtraMessageType::MsgTracePeerAvailabilityInfo);
self.fsm
.peer
.send_extra_message(msg, &mut self.ctx.trans, &peer);
info!(
"check peer availability";
"target peer id" => *peer_id,
);
continue;
}
// TODO: check whether this code path is reasonable
warn!(
"peer not found, ignore check availability";
"region_id" => self.region_id(),
"peer_id" => self.fsm.peer_id(),
"to_peer_id" => peer_id,
);
}
}

fn on_check_peer_stale_state_tick(&mut self) {
if self.fsm.peer.pending_remove {
return;
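
To make the new leader-side flow easier to follow, here is a self-contained sketch of the bookkeeping that on_check_peers_availability and on_trace_peer_availability_info implement above; Leader and AvailabilityReply are simplified stand-ins for illustration, not TiKV types:

// Simplified stand-in for the leader's view of peers still waiting for data.
#[derive(Default)]
struct Leader {
    wait_data_peers: Vec<u64>,
}

// Simplified stand-in for the MsgTracePeerAvailabilityInfo reply from a peer.
struct AvailabilityReply {
    from_peer: u64,
    wait_data: bool,
}

impl Leader {
    // CheckPeersAvailability tick: list the peers that should be probed.
    fn peers_to_probe(&self) -> Vec<u64> {
        self.wait_data_peers.clone()
    }

    // Handle a reply: a peer that no longer waits for data is dropped.
    fn on_availability_reply(&mut self, reply: AvailabilityReply) {
        if !reply.wait_data {
            self.wait_data_peers.retain(|id| *id != reply.from_peer);
        }
    }
}

fn main() {
    let mut leader = Leader { wait_data_peers: vec![2, 3] };
    // Peer 2 reports that it has finished catching up its data.
    leader.on_availability_reply(AvailabilityReply { from_peer: 2, wait_data: false });
    assert_eq!(leader.peers_to_probe(), vec![3]);
}
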
2 changes: 2 additions & 0 deletions components/raftstore/src/store/fsm/store.rs
@@ -557,6 +557,8 @@ where
self.cfg.report_region_buckets_tick_interval.0;
self.tick_batch[PeerTick::CheckLongUncommitted as usize].wait_duration =
self.cfg.check_long_uncommitted_interval.0;
self.tick_batch[PeerTick::CheckPeersAvailability as usize].wait_duration =
self.cfg.check_peers_availability_interval.0;
}
}

3 changes: 3 additions & 0 deletions components/raftstore/src/store/msg.rs
@@ -376,6 +376,7 @@ pub enum PeerTick {
ReactivateMemoryLock = 8,
ReportBuckets = 9,
CheckLongUncommitted = 10,
CheckPeersAvailability = 11,
}

impl PeerTick {
@@ -395,6 +396,7 @@ impl PeerTick {
PeerTick::ReactivateMemoryLock => "reactivate_memory_lock",
PeerTick::ReportBuckets => "report_buckets",
PeerTick::CheckLongUncommitted => "check_long_uncommitted",
PeerTick::CheckPeersAvailability => "check_peers_availability",
}
}

@@ -411,6 +413,7 @@
PeerTick::ReactivateMemoryLock,
PeerTick::ReportBuckets,
PeerTick::CheckLongUncommitted,
PeerTick::CheckPeersAvailability,
];
TICKS
}
37 changes: 37 additions & 0 deletions components/raftstore/src/store/peer.rs
@@ -888,6 +888,8 @@ where
peer_cache: RefCell<HashMap<u64, metapb::Peer>>,
/// Record the last instant of each peer's heartbeat response.
pub peer_heartbeats: HashMap<u64, Instant>,
/// Record the follower or learner peers (by ID) that are still waiting for data.
pub wait_data_peers: Vec<u64>,

proposals: ProposalQueue<Callback<EK::Snapshot>>,
leader_missing_time: Option<Instant>,
@@ -910,6 +912,13 @@
/// target peer.
/// - all read requests must be rejected.
pub pending_remove: bool,
/// Currently it's used to indicate whether the witness -> non-witness
/// conversion operation is complete. The conversion is considered
/// complete once this peer has applied the data it was waiting for;
/// only then can PD treat the operation as finished and continue to
/// schedule other operators, which prevents multiple witnesses from
/// existing in the same time period.
pub wait_data: bool,

/// Force leader state is only used in online recovery when the majority of
/// peers are missing. In this state, it forces one peer to become leader
@@ -1112,6 +1121,7 @@ where
long_uncommitted_threshold: cfg.long_uncommitted_base_threshold.0,
peer_cache: RefCell::new(HashMap::default()),
peer_heartbeats: HashMap::default(),
wait_data_peers: Vec::default(),
peers_start_pending_time: vec![],
down_peer_ids: vec![],
size_diff_hint: 0,
@@ -1122,6 +1132,7 @@
compaction_declined_bytes: 0,
leader_unreachable: false,
pending_remove: false,
wait_data: false,
should_wake_up: false,
force_leader: None,
pending_merge_state: None,
@@ -2005,6 +2016,7 @@ where
if !self.is_leader() {
self.peer_heartbeats.clear();
self.peers_start_pending_time.clear();
self.wait_data_peers.clear();
return;
}

@@ -2564,6 +2576,7 @@
// Update apply index to `last_applying_idx`
self.read_progress
.update_applied(self.last_applying_idx, &ctx.coprocessor_host);
self.notify_leader_the_peer_is_available(ctx);
}
CheckApplyingSnapStatus::Idle => {
// FIXME: It's possible that the snapshot applying task is canceled.
@@ -2580,6 +2593,29 @@
true
}

fn notify_leader_the_peer_is_available<T: Transport>(
&mut self,
ctx: &mut PollContext<EK, ER, T>,
) {
if self.wait_data {
self.wait_data = false;
fail_point!("ignore notify leader the peer is available", |_| {});
let leader_id = self.leader_id();
let leader = self.get_peer_from_cache(leader_id);
if let Some(leader) = leader {
let mut msg = ExtraMessage::default();
msg.set_type(ExtraMessageType::MsgTracePeerAvailabilityInfo);
msg.wait_data = false;
self.send_extra_message(msg, &mut ctx.trans, &leader);
info!(
"notify leader the leader is available";
"region id" => self.region().get_id(),
"peer id" => self.peer.id
);
}
}
}

pub fn handle_raft_ready_append<T: Transport>(
&mut self,
ctx: &mut PollContext<EK, ER, T>,
@@ -5221,6 +5257,7 @@ where
approximate_size: self.approximate_size,
approximate_keys: self.approximate_keys,
replication_status: self.region_replication_status(),
wait_data_peers: self.wait_data_peers.clone(),
});
if let Err(e) = ctx.pd_scheduler.schedule(task) {
error!(
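
The follower side of the exchange, reduced to its essentials (again with hypothetical stand-in types; the real notify_leader_the_peer_is_available sends an ExtraMessage through the transport rather than returning a value):

// Simplified stand-in for a peer that was waiting for its snapshot data.
struct Follower {
    wait_data: bool,
}

// Simplified stand-in for the availability report sent to the leader.
#[derive(Debug, PartialEq)]
struct AvailabilityReport {
    to_leader: u64,
    wait_data: bool,
}

impl Follower {
    // Once the snapshot has been applied, clear wait_data and report back.
    fn notify_leader(&mut self, leader_id: u64) -> Option<AvailabilityReport> {
        if !self.wait_data {
            return None;
        }
        self.wait_data = false;
        Some(AvailabilityReport { to_leader: leader_id, wait_data: false })
    }
}

fn main() {
    let mut follower = Follower { wait_data: true };
    assert_eq!(
        follower.notify_leader(1),
        Some(AvailabilityReport { to_leader: 1, wait_data: false })
    );
    // A second call is a no-op because wait_data has already been cleared.
    assert_eq!(follower.notify_leader(1), None);
}
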
1 change: 1 addition & 0 deletions components/raftstore/src/store/worker/pd.rs
@@ -118,6 +118,7 @@ pub struct HeartbeatTask {
pub approximate_size: Option<u64>,
pub approximate_keys: Option<u64>,
pub replication_status: Option<RegionReplicationStatus>,
pub wait_data_peers: Vec<u64>,
}

/// Uses an asynchronous thread to tell PD something.
1 change: 1 addition & 0 deletions tests/integrations/config/mod.rs
@@ -250,6 +250,7 @@ fn test_serde_custom_tikv_config() {
long_uncommitted_base_threshold: ReadableDuration::secs(1),
max_snapshot_file_raw_size: ReadableSize::gb(10),
unreachable_backoff: ReadableDuration::secs(111),
check_peers_availability_interval: ReadableDuration::secs(30),
};
value.pd = PdConfig::new(vec!["example.com:443".to_owned()]);
let titan_cf_config = TitanCfConfig {