raftstore-v2: support tracing peer lifetime #14056

Merged (11 commits) on Feb 3, 2023
7 changes: 7 additions & 0 deletions components/raftstore-v2/src/batch/store.rs
@@ -75,6 +75,7 @@ pub struct StoreContext<EK: KvEngine, ER: RaftEngine, T> {
pub schedulers: Schedulers<EK, ER>,
/// store meta
pub store_meta: Arc<Mutex<StoreMeta<EK>>>,
pub shutdown: Arc<AtomicBool>,
Member: Where is this set? And FYI there's already a shutdown in StoreSystem.

Member Author: It's the exact same bool flag; here it is only used for querying.

pub engine: ER,
pub tablet_registry: TabletRegistry<EK>,
pub apply_pool: FuturePool,
@@ -108,6 +109,7 @@ impl<EK: KvEngine, ER: RaftEngine, T> StoreContext<EK, ER, T> {
self.cfg.report_region_buckets_tick_interval.0;
self.tick_batch[PeerTick::CheckLongUncommitted as usize].wait_duration =
self.cfg.check_long_uncommitted_interval.0;
self.tick_batch[PeerTick::GcPeer as usize].wait_duration = Duration::from_secs(60);
}
}

@@ -273,6 +275,7 @@ struct StorePollerBuilder<EK: KvEngine, ER: RaftEngine, T> {
apply_pool: FuturePool,
logger: Logger,
store_meta: Arc<Mutex<StoreMeta<EK>>>,
shutdown: Arc<AtomicBool>,
snap_mgr: TabletSnapManager,
}

@@ -287,6 +290,7 @@ impl<EK: KvEngine, ER: RaftEngine, T> StorePollerBuilder<EK, ER, T> {
schedulers: Schedulers<EK, ER>,
logger: Logger,
store_meta: Arc<Mutex<StoreMeta<EK>>>,
shutdown: Arc<AtomicBool>,
snap_mgr: TabletSnapManager,
coprocessor_host: CoprocessorHost<EK>,
) -> Self {
@@ -312,6 +316,7 @@ impl<EK: KvEngine, ER: RaftEngine, T> StorePollerBuilder<EK, ER, T> {
schedulers,
store_meta,
snap_mgr,
shutdown,
coprocessor_host,
}
}
@@ -418,6 +423,7 @@ where
timer: SteadyTimer::default(),
schedulers: self.schedulers.clone(),
store_meta: self.store_meta.clone(),
shutdown: self.shutdown.clone(),
engine: self.engine.clone(),
tablet_registry: self.tablet_registry.clone(),
apply_pool: self.apply_pool.clone(),
@@ -613,6 +619,7 @@ impl<EK: KvEngine, ER: RaftEngine> StoreSystem<EK, ER> {
schedulers.clone(),
self.logger.clone(),
store_meta.clone(),
self.shutdown.clone(),
snap_mgr,
coprocessor_host,
);
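Regarding the shutdown flag discussed in the thread above: below is a minimal, self-contained sketch of the sharing pattern, assuming simplified stand-in types (System, Context, and the method names are illustrative, not the actual StoreSystem/StoreContext definitions). The system owns the Arc<AtomicBool> and flips it on shutdown; each poller context holds a clone and only ever reads it.

```rust
use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};

// Illustrative stand-ins; the real StoreSystem/StoreContext carry many more fields.
struct System {
    shutdown: Arc<AtomicBool>,
}

struct Context {
    // Clone of the same flag; contexts only read it.
    shutdown: Arc<AtomicBool>,
}

impl System {
    fn new() -> Self {
        System {
            shutdown: Arc::new(AtomicBool::new(false)),
        }
    }

    // Hand every poller context a clone of the same flag.
    fn build_context(&self) -> Context {
        Context {
            shutdown: self.shutdown.clone(),
        }
    }

    // Only the system ever sets the flag.
    fn stop(&self) {
        self.shutdown.store(true, Ordering::SeqCst);
    }
}

impl Context {
    // Query-only view used by pollers, e.g. to decide whether a missing
    // mailbox during shutdown is expected.
    fn is_shutting_down(&self) -> bool {
        self.shutdown.load(Ordering::Relaxed)
    }
}

fn main() {
    let system = System::new();
    let ctx = system.build_context();
    assert!(!ctx.is_shutting_down());
    system.stop();
    assert!(ctx.is_shutting_down());
}
```

Cloning the Arc rather than introducing a second flag keeps the two views trivially consistent, which matches the author's note that the context field is for querying only.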
1 change: 1 addition & 0 deletions components/raftstore-v2/src/fsm/peer.rs
@@ -225,6 +225,7 @@ impl<'a, EK: KvEngine, ER: RaftEngine, T: Transport> PeerFsmDelegate<'a, EK, ER,
}
PeerTick::ReportBuckets => unimplemented!(),
PeerTick::CheckLongUncommitted => self.on_check_long_uncommitted(),
PeerTick::GcPeer => self.fsm.peer_mut().on_gc_peer_tick(self.store_ctx),
}
}

9 changes: 7 additions & 2 deletions components/raftstore-v2/src/fsm/store.rs
@@ -12,7 +12,9 @@ use engine_traits::{KvEngine, RaftEngine};
use futures::{compat::Future01CompatExt, FutureExt};
use keys::{data_end_key, data_key};
use kvproto::metapb::Region;
use raftstore::store::{fsm::store::StoreRegionMeta, Config, RegionReadProgressRegistry};
use raftstore::store::{
fsm::store::StoreRegionMeta, Config, RegionReadProgressRegistry, Transport,
};
use slog::{info, o, Logger};
use tikv_util::{
future::poll_future_notify,
@@ -255,7 +257,10 @@ impl<'a, EK: KvEngine, ER: RaftEngine, T> StoreFsmDelegate<'a, EK, ER, T> {
}
}

pub fn handle_msgs(&mut self, store_msg_buf: &mut Vec<StoreMsg>) {
pub fn handle_msgs(&mut self, store_msg_buf: &mut Vec<StoreMsg>)
where
T: Transport,
{
for msg in store_msg_buf.drain(..) {
match msg {
StoreMsg::Start => self.on_start(),
76 changes: 72 additions & 4 deletions components/raftstore-v2/src/operation/command/admin/conf_change.rs
@@ -49,6 +49,12 @@ pub struct ConfChangeResult {
pub region_state: RegionLocalState,
}

#[derive(Debug)]
pub struct UpdateGcPeersResult {
index: u64,
region_state: RegionLocalState,
}

impl<EK: KvEngine, ER: RaftEngine> Peer<EK, ER> {
#[inline]
pub fn propose_conf_change<T>(
@@ -177,10 +183,13 @@ impl<EK: KvEngine, ER: RaftEngine> Peer<EK, ER> {
}
}
}
if has_new_peer.is_some() {
// Speed up snapshot instead of waiting another heartbeat.
self.raft_group_mut().ping();
self.set_has_ready();
if self.is_leader() {
if has_new_peer.is_some() {
// Speed up snapshot instead of waiting another heartbeat.
self.raft_group_mut().ping();
self.set_has_ready();
}
self.maybe_schedule_gc_peer_tick();
}
}
ctx.coprocessor_host.on_region_changed(
@@ -199,6 +208,15 @@ impl<EK: KvEngine, ER: RaftEngine> Peer<EK, ER> {
self.set_has_extra_write();
}
}

pub fn on_apply_res_update_gc_peers(&mut self, result: UpdateGcPeersResult) {
let region_id = self.region_id();
self.state_changes_mut()
.put_region_state(region_id, result.index, &result.region_state)
.unwrap();
self.set_has_extra_write();
self.storage_mut().set_region_state(result.region_state);
}
}

impl<EK: KvEngine, R> Apply<EK, R> {
@@ -279,7 +297,28 @@ impl<EK: KvEngine, R> Apply<EK, R> {
);
let my_id = self.peer().get_id();
let state = self.region_state_mut();
let mut removed_records: Vec<_> = state.take_removed_records().into();
for p0 in state.get_region().get_peers() {
// No matching store ID means the peer must be removed.
if new_region
.get_peers()
.iter()
.all(|p1| p1.get_store_id() != p0.get_store_id())
{
removed_records.push(p0.clone());
Contributor: Does it need dedup?

Member Author: No, it's impossible for duplicates to occur here.

}
}
// If a peer is replaced on the same store, the leader will keep polling the
// new peer on that store, which implies that the old peer must end up as a
// tombstone.
removed_records.retain(|p0| {
new_region
.get_peers()
.iter()
.all(|p1| p1.get_store_id() != p0.get_store_id())
});
state.set_region(new_region.clone());
state.set_removed_records(removed_records.into());
let new_peer = new_region
.get_peers()
.iter()
@@ -534,4 +573,33 @@ impl<EK: KvEngine, R> Apply<EK, R> {
.inc();
Ok(())
}

pub fn apply_update_gc_peer(
&mut self,
log_index: u64,
admin_req: &AdminRequest,
) -> (AdminResponse, AdminCmdResult) {
let mut removed_records: Vec<_> = self.region_state_mut().take_removed_records().into();
let mut merged_records: Vec<_> = self.region_state_mut().take_merged_records().into();
let updates = admin_req.get_update_gc_peers().get_peer_id();
info!(self.logger, "update gc peer"; "index" => log_index, "updates" => ?updates, "gc_peers" => ?removed_records, "merged_peers" => ?merged_records);
removed_records.retain(|p| !updates.contains(&p.get_id()));
merged_records.retain_mut(|r| {
let mut sources: Vec<_> = r.take_source_peers().into();
sources.retain(|p| !updates.contains(&p.get_id()));
r.set_source_peers(sources.into());
!r.get_source_peers().is_empty()
});
self.region_state_mut()
.set_removed_records(removed_records.into());
self.region_state_mut()
.set_merged_records(merged_records.into());
(
AdminResponse::default(),
AdminCmdResult::UpdateGcPeers(UpdateGcPeersResult {
index: log_index,
region_state: self.region_state().clone(),
}),
)
}
}
7 changes: 6 additions & 1 deletion components/raftstore-v2/src/operation/command/admin/mod.rs
@@ -7,7 +7,7 @@ mod transfer_leader;

pub use compact_log::CompactLogContext;
use compact_log::CompactLogResult;
use conf_change::ConfChangeResult;
use conf_change::{ConfChangeResult, UpdateGcPeersResult};
use engine_traits::{KvEngine, RaftEngine};
use kvproto::raft_cmdpb::{AdminCmdType, RaftCmdRequest};
use protobuf::Message;
@@ -28,6 +28,7 @@ pub enum AdminCmdResult {
ConfChange(ConfChangeResult),
TransferLeader(u64),
CompactLog(CompactLogResult),
UpdateGcPeers(UpdateGcPeersResult),
}

impl<EK: KvEngine, ER: RaftEngine> Peer<EK, ER> {
@@ -110,6 +111,10 @@ impl<EK: KvEngine, ER: RaftEngine> Peer<EK, ER> {
}
}
AdminCmdType::CompactLog => self.propose_compact_log(ctx, req),
AdminCmdType::UpdateGcPeer => {
Contributor: If I understand it correctly, we rely on this cmd to propagate the removed peer list. But what if a conf change happens and a newly added node becomes leader? How does the new node get the removed peer list at that point (this cmd may not have run yet before that node got elected)? My original thought was that this information needs to be transferred via the snapshot.

Member Author: The list is part of the region local state and the snapshot state. Every node should have the exact same peer list once they have applied to the same index.

Member Author: The protocol updates can be found in pingcap/kvproto#1040.

Contributor: I see. So it's part of the snapshot. Thanks.

let data = req.write_to_bytes().unwrap();
self.propose(ctx, data)
}
_ => unimplemented!(),
}
};
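To illustrate the point made in the thread above (the removed peer list rides along with the region local state and snapshots, so replicas converge on it), here is a hedged sketch using simplified stand-in types: LocalState and apply_update_gc_peer below are illustrative, not the kvproto RegionLocalState or the actual apply code. Because the apply step is a pure function of the command and the current state, any replica that applies the same UpdateGcPeer entry at the same index, or restores a snapshot taken at that index, ends up with the same removed_records.

```rust
// Illustrative stand-ins only; not the kvproto RegionLocalState definition.
#[derive(Clone, Debug, PartialEq)]
struct LocalState {
    applied_index: u64,
    // Peer ids that have been removed but not yet confirmed as tombstones.
    removed_records: Vec<u64>,
}

// Simplified apply step for UpdateGcPeer: drop every peer id that has been
// confirmed as gone. Deterministic, so replicas applying the same entry at
// the same index agree on the result.
fn apply_update_gc_peer(state: &mut LocalState, index: u64, gced_peers: &[u64]) {
    state.removed_records.retain(|id| !gced_peers.contains(id));
    state.applied_index = index;
}

fn main() {
    let template = LocalState {
        applied_index: 10,
        removed_records: vec![4, 5, 6],
    };
    let mut replica_a = template.clone();
    // A freshly added replica restored from a snapshot at index 10 starts
    // from the same state, including removed_records.
    let mut replica_b = template.clone();

    apply_update_gc_peer(&mut replica_a, 11, &[4, 6]);
    apply_update_gc_peer(&mut replica_b, 11, &[4, 6]);

    assert_eq!(replica_a, replica_b);
    assert_eq!(replica_a.removed_records, vec![5]);
}
```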
21 changes: 17 additions & 4 deletions components/raftstore-v2/src/operation/command/mod.rs
@@ -16,7 +16,7 @@
//! - Applied result are sent back to peer fsm, and update memory state in
//! `on_apply_res`.

use std::{mem, time::Duration};
use std::{mem, sync::atomic::Ordering, time::Duration};

use engine_traits::{KvEngine, PerfContext, RaftEngine, WriteBatch, WriteOptions};
use kvproto::raft_cmdpb::{
@@ -41,7 +41,9 @@ use raftstore::{
};
use slog::{info, warn};
use tikv_util::{
box_err, slog_panic,
box_err,
log::SlogFormat,
slog_panic,
time::{duration_to_sec, monotonic_raw_now, Instant},
};

@@ -107,7 +109,17 @@ impl<EK: KvEngine, ER: RaftEngine> Peer<EK, ER> {
#[inline]
pub fn schedule_apply_fsm<T>(&mut self, store_ctx: &mut StoreContext<EK, ER, T>) {
let region_state = self.storage().region_state().clone();
let mailbox = store_ctx.router.mailbox(self.region_id()).unwrap();
let mailbox = match store_ctx.router.mailbox(self.region_id()) {
Some(m) => m,
None => {
assert!(
store_ctx.shutdown.load(Ordering::Relaxed),
"failed to load mailbox: {}",
SlogFormat(&self.logger)
);
return;
}
};
let logger = self.logger.clone();
let read_scheduler = self.storage().read_scheduler();
let (apply_scheduler, mut apply_fsm) = ApplyFsm::new(
@@ -334,6 +346,7 @@ impl<EK: KvEngine, ER: RaftEngine> Peer<EK, ER> {
}
AdminCmdResult::TransferLeader(term) => self.on_transfer_leader(ctx, term),
AdminCmdResult::CompactLog(res) => self.on_apply_res_compact_log(ctx, res),
AdminCmdResult::UpdateGcPeers(state) => self.on_apply_res_update_gc_peers(state),
}
}

@@ -587,10 +600,10 @@ impl<EK: KvEngine, R: ApplyResReporter> Apply<EK, R> {
AdminCmdType::PrepareFlashback => unimplemented!(),
AdminCmdType::FinishFlashback => unimplemented!(),
AdminCmdType::BatchSwitchWitness => unimplemented!(),
AdminCmdType::UpdateGcPeer => self.apply_update_gc_peer(log_index, admin_req),
AdminCmdType::InvalidAdmin => {
return Err(box_err!("invalid admin command type"));
}
AdminCmdType::UpdateGcPeer => unimplemented!(),
};

match admin_result {