From 868f1cde4a4aa4059df39a453d1c568a0f1c460b Mon Sep 17 00:00:00 2001
From: Jay Lee
Date: Sun, 15 Jan 2023 18:11:39 +0800
Subject: [PATCH 1/6] raftstore-v2: support tracing peer lifetime

In V1, a peer is responsible for destroying itself. The design lets the
leader do less work and reduces writes. But practice over the past years
has shown that not making destruction a strong guarantee actually makes
the implementation complicated, hard to get right and difficult to
understand.

In V2, we change this so that the leader is the very role that makes
sure all removed peers and merged peers are destroyed in the end. The
push mode is way easier to understand and to implement correctly. The
downside is that extra writes are introduced, but it's worth it.

Signed-off-by: Jay Lee
---
 Cargo.lock                                    |   2 +-
 Cargo.toml                                    |   4 +-
 components/raftstore-v2/src/batch/store.rs    |   7 +
 components/raftstore-v2/src/fsm/peer.rs       |   1 +
 components/raftstore-v2/src/fsm/store.rs      |   9 +-
 .../operation/command/admin/conf_change.rs    |  76 ++++-
 .../src/operation/command/admin/mod.rs        |   7 +-
 .../raftstore-v2/src/operation/command/mod.rs |  20 +-
 components/raftstore-v2/src/operation/life.rs | 292 +++++++++++++++++-
 components/raftstore-v2/src/operation/mod.rs  |   2 +-
 .../raftstore-v2/src/operation/ready/mod.rs   |  54 +++-
 .../src/operation/ready/snapshot.rs           |   4 +
 components/raftstore-v2/src/raft/peer.rs      |  43 ++-
 components/raftstore-v2/src/router/message.rs |   3 +
 .../tests/integrations/cluster.rs             |   4 +
 .../tests/integrations/test_conf_change.rs    |  40 ++-
 .../tests/integrations/test_life.rs           | 214 ++++++++++++-
 .../raftstore/src/store/async_io/read.rs      |   2 +
 components/raftstore/src/store/fsm/apply.rs   |   1 +
 components/raftstore/src/store/fsm/peer.rs    |   6 +-
 components/raftstore/src/store/peer.rs        |   4 +-
 components/raftstore/src/store/util.rs        |   1 +
 22 files changed, 742 insertions(+), 54 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 0b7ca52725c..97a81a60a24 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2726,7 +2726,7 @@ dependencies = [
 [[package]]
 name = "kvproto"
 version = "0.0.2"
-source = "git+https://github.com/pingcap/kvproto.git#a14c44ef44b378d15adb5baad8402b838f031b51"
+source = "git+https://github.com/busyjay/kvproto?branch=introduce-gc-peers#83db2c0ca5c05413b49a5f63db16cfd1a81b213e"
 dependencies = [
  "futures 0.3.15",
  "grpcio",
diff --git a/Cargo.toml b/Cargo.toml
index d76dce26a18..057f035c4bd 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -222,8 +222,8 @@ procinfo = { git = "https://github.com/tikv/procinfo-rs", rev = "6599eb9dca74229
 # When you modify TiKV cooperatively with kvproto, this will be useful to submit the PR to TiKV and the PR to
 # kvproto at the same time.
 # After the PR to kvproto is merged, remember to comment this out and run `cargo update -p kvproto`.
-# [patch.'https://github.com/pingcap/kvproto'] -# kvproto = { git = "https://github.com/your_github_id/kvproto", branch = "your_branch" } +[patch.'https://github.com/pingcap/kvproto'] +kvproto = { git = "https://github.com/busyjay/kvproto", branch = "introduce-gc-peers" } [workspace] # See https://github.com/rust-lang/rfcs/blob/master/text/2957-cargo-features2.md diff --git a/components/raftstore-v2/src/batch/store.rs b/components/raftstore-v2/src/batch/store.rs index ccf3f19f3ea..ded10cf95b5 100644 --- a/components/raftstore-v2/src/batch/store.rs +++ b/components/raftstore-v2/src/batch/store.rs @@ -74,6 +74,7 @@ pub struct StoreContext { pub schedulers: Schedulers, /// store meta pub store_meta: Arc>>, + pub shutdown: Arc, pub engine: ER, pub tablet_registry: TabletRegistry, pub apply_pool: FuturePool, @@ -107,6 +108,7 @@ impl StoreContext { self.cfg.report_region_buckets_tick_interval.0; self.tick_batch[PeerTick::CheckLongUncommitted as usize].wait_duration = self.cfg.check_long_uncommitted_interval.0; + self.tick_batch[PeerTick::GcPeer as usize].wait_duration = Duration::from_secs(60); } } @@ -272,6 +274,7 @@ struct StorePollerBuilder { apply_pool: FuturePool, logger: Logger, store_meta: Arc>>, + shutdown: Arc, snap_mgr: TabletSnapManager, } @@ -286,6 +289,7 @@ impl StorePollerBuilder { schedulers: Schedulers, logger: Logger, store_meta: Arc>>, + shutdown: Arc, snap_mgr: TabletSnapManager, coprocessor_host: CoprocessorHost, ) -> Self { @@ -311,6 +315,7 @@ impl StorePollerBuilder { schedulers, store_meta, snap_mgr, + shutdown, coprocessor_host, } } @@ -417,6 +422,7 @@ where timer: SteadyTimer::default(), schedulers: self.schedulers.clone(), store_meta: self.store_meta.clone(), + shutdown: self.shutdown.clone(), engine: self.engine.clone(), tablet_registry: self.tablet_registry.clone(), apply_pool: self.apply_pool.clone(), @@ -604,6 +610,7 @@ impl StoreSystem { schedulers.clone(), self.logger.clone(), store_meta.clone(), + self.shutdown.clone(), snap_mgr, coprocessor_host, ); diff --git a/components/raftstore-v2/src/fsm/peer.rs b/components/raftstore-v2/src/fsm/peer.rs index 26d5c2a1458..47d23a67d1d 100644 --- a/components/raftstore-v2/src/fsm/peer.rs +++ b/components/raftstore-v2/src/fsm/peer.rs @@ -225,6 +225,7 @@ impl<'a, EK: KvEngine, ER: RaftEngine, T: Transport> PeerFsmDelegate<'a, EK, ER, } PeerTick::ReportBuckets => unimplemented!(), PeerTick::CheckLongUncommitted => self.on_check_long_uncommitted(), + PeerTick::GcPeer => self.fsm.peer_mut().on_gc_peer_tick(self.store_ctx), } } diff --git a/components/raftstore-v2/src/fsm/store.rs b/components/raftstore-v2/src/fsm/store.rs index 86e3540d23c..fb238f5e99d 100644 --- a/components/raftstore-v2/src/fsm/store.rs +++ b/components/raftstore-v2/src/fsm/store.rs @@ -12,7 +12,9 @@ use engine_traits::{KvEngine, RaftEngine}; use futures::{compat::Future01CompatExt, FutureExt}; use keys::{data_end_key, data_key}; use kvproto::metapb::Region; -use raftstore::store::{fsm::store::StoreRegionMeta, Config, RegionReadProgressRegistry}; +use raftstore::store::{ + fsm::store::StoreRegionMeta, Config, RegionReadProgressRegistry, Transport, +}; use slog::{info, o, Logger}; use tikv_util::{ future::poll_future_notify, @@ -255,7 +257,10 @@ impl<'a, EK: KvEngine, ER: RaftEngine, T> StoreFsmDelegate<'a, EK, ER, T> { } } - pub fn handle_msgs(&mut self, store_msg_buf: &mut Vec) { + pub fn handle_msgs(&mut self, store_msg_buf: &mut Vec) + where + T: Transport, + { for msg in store_msg_buf.drain(..) 
{ match msg { StoreMsg::Start => self.on_start(), diff --git a/components/raftstore-v2/src/operation/command/admin/conf_change.rs b/components/raftstore-v2/src/operation/command/admin/conf_change.rs index 42c433584fe..1b8d29a7a54 100644 --- a/components/raftstore-v2/src/operation/command/admin/conf_change.rs +++ b/components/raftstore-v2/src/operation/command/admin/conf_change.rs @@ -49,6 +49,12 @@ pub struct ConfChangeResult { pub region_state: RegionLocalState, } +#[derive(Debug)] +pub struct UpdateGcPeersResult { + index: u64, + region_state: RegionLocalState, +} + impl Peer { #[inline] pub fn propose_conf_change( @@ -177,10 +183,13 @@ impl Peer { } } } - if has_new_peer.is_some() { - // Speed up snapshot instead of waiting another heartbeat. - self.raft_group_mut().ping(); - self.set_has_ready(); + if self.is_leader() { + if has_new_peer.is_some() { + // Speed up snapshot instead of waiting another heartbeat. + self.raft_group_mut().ping(); + self.set_has_ready(); + } + self.maybe_schedule_gc_peer_tick(); } } ctx.coprocessor_host.on_region_changed( @@ -199,6 +208,15 @@ impl Peer { self.set_has_extra_write(); } } + + pub fn on_apply_res_update_gc_peers(&mut self, result: UpdateGcPeersResult) { + let region_id = self.region_id(); + self.state_changes_mut() + .put_region_state(region_id, result.index, &result.region_state) + .unwrap(); + self.set_has_extra_write(); + self.storage_mut().set_region_state(result.region_state); + } } impl Apply { @@ -279,7 +297,28 @@ impl Apply { ); let my_id = self.peer().get_id(); let state = self.region_state_mut(); + let mut removed_records: Vec<_> = state.take_removed_records().into(); + for p0 in state.get_region().get_peers() { + // No matching store ID means the peer must be removed. + if new_region + .get_peers() + .iter() + .all(|p1| p1.get_store_id() != p0.get_store_id()) + { + removed_records.push(p0.clone()); + } + } + // If a peer is replaced in the same store, the leader will keep polling the + // new peer on the same store, which implies that the old peer must be + // tombstone in the end. 
+ removed_records.retain(|p0| { + new_region + .get_peers() + .iter() + .all(|p1| p1.get_store_id() != p0.get_store_id()) + }); state.set_region(new_region.clone()); + state.set_removed_records(removed_records.into()); let new_peer = new_region .get_peers() .iter() @@ -534,4 +573,33 @@ impl Apply { .inc(); Ok(()) } + + pub fn apply_update_gc_peer( + &mut self, + log_index: u64, + admin_req: &AdminRequest, + ) -> (AdminResponse, AdminCmdResult) { + let mut removed_records: Vec<_> = self.region_state_mut().take_removed_records().into(); + let mut merged_records: Vec<_> = self.region_state_mut().take_merged_records().into(); + let updates = admin_req.get_update_gc_peers().get_peer_id(); + info!(self.logger, "update gc peer"; "index" => log_index, "updates" => ?updates, "gc_peers" => ?removed_records, "merged_peers" => ?merged_records); + removed_records.retain(|p| !updates.contains(&p.get_id())); + merged_records.retain_mut(|r| { + let mut sources: Vec<_> = r.take_source_peers().into(); + sources.retain(|p| !updates.contains(&p.get_id())); + r.set_source_peers(sources.into()); + !r.get_source_peers().is_empty() + }); + self.region_state_mut() + .set_removed_records(removed_records.into()); + self.region_state_mut() + .set_merged_records(merged_records.into()); + ( + AdminResponse::default(), + AdminCmdResult::UpdateGcPeers(UpdateGcPeersResult { + index: log_index, + region_state: self.region_state().clone(), + }), + ) + } } diff --git a/components/raftstore-v2/src/operation/command/admin/mod.rs b/components/raftstore-v2/src/operation/command/admin/mod.rs index 52bc5329dd4..1546983645f 100644 --- a/components/raftstore-v2/src/operation/command/admin/mod.rs +++ b/components/raftstore-v2/src/operation/command/admin/mod.rs @@ -7,7 +7,7 @@ mod transfer_leader; pub use compact_log::CompactLogContext; use compact_log::CompactLogResult; -use conf_change::ConfChangeResult; +use conf_change::{ConfChangeResult, UpdateGcPeersResult}; use engine_traits::{KvEngine, RaftEngine}; use kvproto::raft_cmdpb::{AdminCmdType, RaftCmdRequest}; use protobuf::Message; @@ -28,6 +28,7 @@ pub enum AdminCmdResult { ConfChange(ConfChangeResult), TransferLeader(u64), CompactLog(CompactLogResult), + UpdateGcPeers(UpdateGcPeersResult), } impl Peer { @@ -110,6 +111,10 @@ impl Peer { } } AdminCmdType::CompactLog => self.propose_compact_log(ctx, req), + AdminCmdType::UpdateGcPeer => { + let data = req.write_to_bytes().unwrap(); + self.propose(ctx, data) + } _ => unimplemented!(), } }; diff --git a/components/raftstore-v2/src/operation/command/mod.rs b/components/raftstore-v2/src/operation/command/mod.rs index cf29d9ee25a..5434eca6b38 100644 --- a/components/raftstore-v2/src/operation/command/mod.rs +++ b/components/raftstore-v2/src/operation/command/mod.rs @@ -16,7 +16,7 @@ //! - Applied result are sent back to peer fsm, and update memory state in //! `on_apply_res`. 
-use std::{mem, time::Duration}; +use std::{mem, sync::atomic::Ordering, time::Duration}; use engine_traits::{KvEngine, PerfContext, RaftEngine, WriteBatch, WriteOptions}; use kvproto::raft_cmdpb::{ @@ -41,7 +41,9 @@ use raftstore::{ }; use slog::{info, warn}; use tikv_util::{ - box_err, slog_panic, + box_err, + log::SlogFormat, + slog_panic, time::{duration_to_sec, monotonic_raw_now, Instant}, }; @@ -107,7 +109,17 @@ impl Peer { #[inline] pub fn schedule_apply_fsm(&mut self, store_ctx: &mut StoreContext) { let region_state = self.storage().region_state().clone(); - let mailbox = store_ctx.router.mailbox(self.region_id()).unwrap(); + let mailbox = match store_ctx.router.mailbox(self.region_id()) { + Some(m) => m, + None => { + assert!( + store_ctx.shutdown.load(Ordering::Relaxed), + "failed to load mailbox: {}", + SlogFormat(&self.logger) + ); + return; + } + }; let logger = self.logger.clone(); let read_scheduler = self.storage().read_scheduler(); let (apply_scheduler, mut apply_fsm) = ApplyFsm::new( @@ -334,6 +346,7 @@ impl Peer { } AdminCmdResult::TransferLeader(term) => self.on_transfer_leader(ctx, term), AdminCmdResult::CompactLog(res) => self.on_apply_res_compact_log(ctx, res), + AdminCmdResult::UpdateGcPeers(state) => self.on_apply_res_update_gc_peers(state), } } @@ -587,6 +600,7 @@ impl Apply { AdminCmdType::PrepareFlashback => unimplemented!(), AdminCmdType::FinishFlashback => unimplemented!(), AdminCmdType::BatchSwitchWitness => unimplemented!(), + AdminCmdType::UpdateGcPeer => self.apply_update_gc_peer(log_index, admin_req), AdminCmdType::InvalidAdmin => { return Err(box_err!("invalid admin command type")); } diff --git a/components/raftstore-v2/src/operation/life.rs b/components/raftstore-v2/src/operation/life.rs index 88646f06b59..2b1704e2c0e 100644 --- a/components/raftstore-v2/src/operation/life.rs +++ b/components/raftstore-v2/src/operation/life.rs @@ -9,15 +9,34 @@ //! In v1, it can also be created by split. In v2, it's required to create by //! sending a message to store fsm first, and then using split to initialized //! the peer. +//! +//! A peer can only be removed in a raft group by conf change or merge. When +//! applying conf change, removed peer is added to `removed_records`; when +//! applying merge, source peer is added to merged_records. Quorum must agree +//! on the removal, but the removed peer may not necessary be in the quorum. So +//! the peer may not really destroy itself until either: +//! - applying conf change remove; +//! - receiving a RaftMessage with `is_tombstone` set; +//! - receiving a RaftMessage targeting larger ID. +//! +//! Leader is responsible to keep polling all removed peers and guarantee they +//! are really destroyed. A peer is considered destroyed only when a tombstone +//! record with the same ID or larger ID is persisted. For `removed_records`, +//! leader only needs to send a message with `is_tombstone` set. For +//! `merged_records`, to avoid race between destroy and merge, leader needs to +//! ask target peer to destroy source peer. 
+ +use std::{cmp, mem}; use batch_system::BasicMailbox; use crossbeam::channel::{SendError, TrySendError}; use engine_traits::{KvEngine, RaftEngine, RaftLogBatch}; use kvproto::{ - metapb::Region, - raft_serverpb::{PeerState, RaftMessage}, + metapb::{self, Region}, + raft_cmdpb::{AdminCmdType, RaftCmdRequest}, + raft_serverpb::{ExtraMessageType, PeerState, RaftMessage}, }; -use raftstore::store::{util, WriteTask}; +use raftstore::store::{util, Transport, WriteTask}; use slog::{debug, error, info, warn}; use tikv_util::store::find_peer; @@ -26,7 +45,7 @@ use crate::{ batch::StoreContext, fsm::{PeerFsm, Store}, raft::{Peer, Storage}, - router::PeerMsg, + router::{CmdResChannel, PeerMsg, PeerTick}, }; /// When a peer is about to destroy, it becomes `WaitReady` first. If there is @@ -87,6 +106,11 @@ impl DestroyProgress { } } +#[derive(Default)] +pub struct GcPeerContext { + confirmed_ids: Vec, +} + impl Store { /// The method is called during split. /// The creation process is: @@ -100,6 +124,7 @@ impl Store { ) where EK: KvEngine, ER: RaftEngine, + T: Transport, { let region_id = msg.region.id; let mut raft_msg = Box::::default(); @@ -137,10 +162,11 @@ impl Store { ) where EK: KvEngine, ER: RaftEngine, + T: Transport, { let region_id = msg.get_region_id(); // The message can be sent when the peer is being created, so try send it first. - let msg = if let Err(TrySendError::Disconnected(PeerMsg::RaftMessage(m))) = + let mut msg = if let Err(TrySendError::Disconnected(PeerMsg::RaftMessage(m))) = ctx.router.send(region_id, PeerMsg::RaftMessage(msg)) { m @@ -166,13 +192,12 @@ impl Store { ctx.raft_metrics.message_dropped.mismatch_region_epoch.inc(); return; } - // TODO: maybe we need to ack the message to confirm the peer is destroyed. - if msg.get_is_tombstone() || msg.has_merge_target() { + if msg.has_merge_target() { // Target tombstone peer doesn't exist, so ignore it. ctx.raft_metrics.message_dropped.stale_msg.inc(); return; } - let from_epoch = msg.get_region_epoch(); + let mut destroyed = false; let local_state = match ctx.engine.get_region_state(region_id, u64::MAX) { Ok(s) => s, Err(e) => { @@ -192,30 +217,51 @@ impl Store { // skip handling gc for simplicity. let local_epoch = local_state.get_region().get_region_epoch(); // The region in this peer is already destroyed - if util::is_epoch_stale(from_epoch, local_epoch) { - ctx.raft_metrics.message_dropped.region_tombstone_peer.inc(); + if util::is_epoch_stale(msg.get_region_epoch(), local_epoch) { + destroyed = true; + } + if !destroyed && let Some(local_peer) = find_peer(local_state.get_region(), self.store_id()) && to_peer.id <= local_peer.get_id() { + destroyed = true; + } + } + if destroyed { + if msg.get_is_tombstone() { + if let Some(msg) = report_peer_destroyed(&mut msg) { + let _ = ctx.trans.send(msg); + } return; } - if let Some(local_peer) = find_peer(local_state.get_region(), self.store_id()) { - if to_peer.id <= local_peer.get_id() { - ctx.raft_metrics.message_dropped.region_tombstone_peer.inc(); + if msg.has_extra_msg() { + let extra_msg = msg.get_extra_msg(); + if extra_msg.get_type() == ExtraMessageType::MsgGcPeerRequest + && extra_msg.has_check_gc_peer() + { + tell_source_peer_to_destroy(ctx, &msg); return; } } + ctx.raft_metrics.message_dropped.region_tombstone_peer.inc(); + return; } + // If it's not destroyed, and the message is a tombstone message, create the + // peer and destroy immediately to leave a tombstone record. // So the peer must need to be created. 
We don't need to synchronous with split // as split won't create peer in v2. And we don't check for range // conflict as v2 depends on tablet, which allows conflict ranges. let mut region = Region::default(); region.set_id(region_id); - region.set_region_epoch(from_epoch.clone()); + region.set_region_epoch(msg.get_region_epoch().clone()); // Peer list doesn't have to be complete, as it's uninitialized. // // If the id of the from_peer is INVALID_ID, this msg must be sent from parent // peer in the split execution in which case we do not add it into the region. - if from_peer.id != raft::INVALID_ID { + if from_peer.id != raft::INVALID_ID + // Check merge may be sent from different region + && (msg.get_extra_msg().get_type() != ExtraMessageType::MsgGcPeerRequest + || msg.get_extra_msg().get_check_gc_peer().get_from_region_id() == region_id) + { region.mut_peers().push(from_peer.clone()); } region.mut_peers().push(to_peer.clone()); @@ -260,7 +306,223 @@ impl Store { } } +/// Tell leader that target peer is destroyed. +fn report_peer_destroyed(tombstone_msg: &mut RaftMessage) -> Option { + let to_region_id = if tombstone_msg.has_extra_msg() { + assert_eq!( + tombstone_msg.get_extra_msg().get_type(), + ExtraMessageType::MsgGcPeerRequest + ); + tombstone_msg + .get_extra_msg() + .get_check_gc_peer() + .get_from_region_id() + } else { + tombstone_msg.get_region_id() + }; + if to_region_id == 0 || tombstone_msg.get_from_peer().get_id() == 0 { + return None; + } + let mut msg = RaftMessage::default(); + msg.set_region_id(to_region_id); + msg.set_from_peer(tombstone_msg.take_to_peer()); + msg.set_to_peer(tombstone_msg.take_from_peer()); + msg.mut_extra_msg() + .set_type(ExtraMessageType::MsgGcPeerResponse); + Some(msg) +} + +fn tell_source_peer_to_destroy(ctx: &mut StoreContext, msg: &RaftMessage) +where + EK: KvEngine, + ER: RaftEngine, + T: Transport, +{ + let extra_msg = msg.get_extra_msg(); + // Instead of respond leader directly, send a message to target region to + // double check it's really destroyed. + let check_gc_peer = extra_msg.get_check_gc_peer(); + let mut tombstone_msg = Box::::default(); + tombstone_msg.set_region_id(check_gc_peer.get_check_region_id()); + tombstone_msg.set_from_peer(msg.get_from_peer().clone()); + tombstone_msg.set_to_peer(check_gc_peer.get_check_peer().clone()); + tombstone_msg.set_region_epoch(check_gc_peer.get_check_region_epoch().clone()); + tombstone_msg.set_is_tombstone(true); + // No need to set epoch as we don't know what it is. + tombstone_msg + .mut_extra_msg() + .set_type(ExtraMessageType::MsgGcPeerRequest); + tombstone_msg + .mut_extra_msg() + .mut_check_gc_peer() + .set_from_region_id(check_gc_peer.get_from_region_id()); + let _ = ctx.router.send_raft_message(tombstone_msg); +} + impl Peer { + pub fn maybe_schedule_gc_peer_tick(&mut self) { + let region_state = self.storage().region_state(); + if !region_state.get_removed_records().is_empty() + || !region_state.get_merged_records().is_empty() + { + self.add_pending_tick(PeerTick::GcPeer); + } + } + + /// Returns `true` means the sender will be gced. The message is stale. + pub fn maybe_gc_sender(&mut self, msg: &RaftMessage) -> bool { + let removed_peers = self.storage().region_state().get_removed_records(); + // Only removed_records can be determined directly. 
+ if let Some(peer) = removed_peers + .iter() + .find(|p| p.id == msg.get_from_peer().get_id()) + { + let tombstone_msg = self.tombstone_message_for_same_region(peer.clone()); + self.add_message(tombstone_msg); + self.set_has_ready(); + true + } else { + false + } + } + + fn tombstone_message_for_same_region(&self, peer: metapb::Peer) -> RaftMessage { + let region_id = self.region_id(); + let mut tombstone_message = RaftMessage::default(); + tombstone_message.set_region_id(region_id); + tombstone_message.set_from_peer(self.peer().clone()); + tombstone_message.set_to_peer(peer); + tombstone_message.set_region_epoch(self.region().get_region_epoch().clone()); + tombstone_message.set_is_tombstone(true); + tombstone_message + } + + pub fn on_tombstone_message(&mut self, msg: &mut RaftMessage) { + match msg.get_to_peer().get_id().cmp(&self.peer_id()) { + cmp::Ordering::Less => { + if let Some(msg) = report_peer_destroyed(msg) { + self.add_message(msg); + self.set_has_ready(); + } + } + // No matter it's greater or equal, the current peer must be destroyed. + _ => { + self.mark_for_destroy(None); + } + } + } + + /// When leader tries to gc merged source peer, it will send a gc request to + /// target peer. If target peer makes sure the merged is finished, it should + /// send back a gc response to leader. + pub fn on_gc_peer_request( + &mut self, + ctx: &mut StoreContext, + msg: &RaftMessage, + ) { + let extra_msg = msg.get_extra_msg(); + if !extra_msg.has_check_gc_peer() || extra_msg.get_index() == 0 { + // Corrupted message. + return; + } + if self.storage().tablet_index() < extra_msg.get_index() { + // Merge not finish. + return; + } + + tell_source_peer_to_destroy(ctx, msg); + } + + /// A peer confirms it's destroyed. + pub fn on_gc_peer_response(&mut self, msg: &RaftMessage) { + let gc_peer_id = msg.get_from_peer().get_id(); + let state = self.storage().region_state(); + if state + .get_removed_records() + .iter() + .all(|p| p.get_id() != gc_peer_id) + && state.get_merged_records().iter().all(|p| { + p.get_source_peers() + .iter() + .all(|p| p.get_id() != gc_peer_id) + }) + { + return; + } + let ctx = self.gc_peer_context_mut(); + if ctx.confirmed_ids.contains(&gc_peer_id) { + return; + } + ctx.confirmed_ids.push(gc_peer_id); + } + + pub fn on_gc_peer_tick(&mut self, ctx: &mut StoreContext) { + if !self.is_leader() { + return; + } + let state = self.storage().region_state(); + if state.get_removed_records().is_empty() && state.get_merged_records().is_empty() { + return; + } + let mut need_gc_ids = Vec::with_capacity(5); + let gc_context = self.gc_peer_context(); + for peer in state.get_removed_records() { + need_gc_ids.push(peer.get_id()); + if gc_context.confirmed_ids.contains(&peer.get_id()) { + continue; + } + + let msg = self.tombstone_message_for_same_region(peer.clone()); + // For leader, it's OK to send gc message immediately. + let _ = ctx.trans.send(msg); + } + for record in state.get_merged_records() { + // For merge, we ask target to check whether source should be deleted. 
+ for (source, target) in record + .get_source_peers() + .iter() + .zip(record.get_target_peers()) + { + need_gc_ids.push(source.get_id()); + if gc_context.confirmed_ids.contains(&source.get_id()) { + continue; + } + + let mut msg = RaftMessage::default(); + msg.set_region_id(record.get_target_region_id()); + msg.set_from_peer(self.peer().clone()); + msg.set_to_peer(target.clone()); + msg.set_region_epoch(record.get_target_epoch().clone()); + let extra_msg = msg.mut_extra_msg(); + extra_msg.set_type(ExtraMessageType::MsgGcPeerRequest); + extra_msg.set_index(record.get_index()); + let check_peer = extra_msg.mut_check_gc_peer(); + check_peer.set_from_region_id(self.region_id()); + check_peer.set_check_region_id(record.get_source_region_id()); + check_peer.set_check_peer(source.clone()); + check_peer.set_check_region_epoch(record.get_source_epoch().clone()); + let _ = ctx.trans.send(msg); + } + } + let gc_ctx = self.gc_peer_context_mut(); + if !gc_ctx.confirmed_ids.is_empty() { + let mut confirmed_ids = mem::take(&mut gc_ctx.confirmed_ids); + confirmed_ids.retain(|id| need_gc_ids.contains(id)); + let mut req = RaftCmdRequest::default(); + let header = req.mut_header(); + header.set_region_id(self.region_id()); + header.set_peer(self.peer().clone()); + let admin = req.mut_admin_request(); + admin.set_cmd_type(AdminCmdType::UpdateGcPeer); + let gc_peer = admin.mut_update_gc_peers(); + gc_peer.set_peer_id(confirmed_ids); + let (ch, _) = CmdResChannel::pair(); + // It's OK to fail as we will retry by tick. + self.on_admin_command(ctx, req, ch); + } + self.maybe_schedule_gc_peer_tick(); + } + /// A peer can be destroyed in three cases: /// 1. Received a gc message; /// 2. Received a message whose target peer's ID is larger than this; diff --git a/components/raftstore-v2/src/operation/mod.rs b/components/raftstore-v2/src/operation/mod.rs index 76baf31f9c8..492595851e2 100644 --- a/components/raftstore-v2/src/operation/mod.rs +++ b/components/raftstore-v2/src/operation/mod.rs @@ -12,7 +12,7 @@ pub use command::{ RequestSplit, SimpleWriteBinary, SimpleWriteEncoder, SimpleWriteReqDecoder, SimpleWriteReqEncoder, SplitFlowControl, SPLIT_PREFIX, }; -pub use life::DestroyProgress; +pub use life::{DestroyProgress, GcPeerContext}; pub use ready::{ cf_offset, write_initial_states, ApplyTrace, AsyncWriter, DataTrace, GenSnapTask, SnapState, StateStorage, diff --git a/components/raftstore-v2/src/operation/ready/mod.rs b/components/raftstore-v2/src/operation/ready/mod.rs index 87e1c100a87..e2a5f3f7935 100644 --- a/components/raftstore-v2/src/operation/ready/mod.rs +++ b/components/raftstore-v2/src/operation/ready/mod.rs @@ -25,7 +25,10 @@ use std::{cmp, time::Instant}; use engine_traits::{KvEngine, RaftEngine}; use error_code::ErrorCodeExt; -use kvproto::{raft_cmdpb::AdminCmdType, raft_serverpb::RaftMessage}; +use kvproto::{ + raft_cmdpb::AdminCmdType, + raft_serverpb::{ExtraMessageType, RaftMessage}, +}; use protobuf::Message as _; use raft::{eraftpb, prelude::MessageType, Ready, StateRole, INVALID_ID}; use raftstore::{ @@ -155,7 +158,7 @@ impl Peer { } } - pub fn on_raft_message( + pub fn on_raft_message( &mut self, ctx: &mut StoreContext, mut msg: Box, @@ -174,16 +177,34 @@ impl Peer { if !self.serving() { return; } + if util::is_vote_msg(msg.get_message()) && self.maybe_gc_sender(&msg) { + return; + } if msg.get_to_peer().get_store_id() != self.peer().get_store_id() { ctx.raft_metrics.message_dropped.mismatch_store_id.inc(); return; } - if !msg.has_region_epoch() { - 
ctx.raft_metrics.message_dropped.mismatch_region_epoch.inc(); + if msg.get_is_tombstone() { + self.on_tombstone_message(&mut msg); return; } - if msg.get_is_tombstone() { - self.mark_for_destroy(None); + if msg.has_extra_msg() && msg.get_to_peer().get_id() == self.peer_id() { + // GcRequest/GcResponse may be sent from/to different regions, skip further + // checks. + match msg.get_extra_msg().get_type() { + ExtraMessageType::MsgGcPeerResponse => { + self.on_gc_peer_response(&msg); + return; + } + ExtraMessageType::MsgGcPeerRequest => { + self.on_gc_peer_request(ctx, &msg); + return; + } + _ => (), + } + } + if !msg.has_region_epoch() { + ctx.raft_metrics.message_dropped.mismatch_region_epoch.inc(); return; } if msg.has_merge_target() { @@ -207,8 +228,11 @@ impl Peer { } } if msg.has_extra_msg() { - unimplemented!(); - // return; + match msg.get_extra_msg().get_type() { + ExtraMessageType::MsgGcPeerResponse => self.on_gc_peer_response(&msg), + _ => unimplemented!(), + } + return; } // TODO: drop all msg append when the peer is uninitialized and has conflict @@ -452,6 +476,7 @@ impl Peer { ctx.has_ready = true; if !has_extra_write + && !self.has_pending_messages() && !self.raft_group().has_ready() && (self.serving() || self.postponed_destroy()) { @@ -494,6 +519,11 @@ impl Peer { self.send_raft_message_on_leader(ctx, msg); } } + if self.has_pending_messages() { + for msg in self.take_pending_messages() { + self.send_raft_message_on_leader(ctx, msg); + } + } } self.apply_reads(ctx, &ready); @@ -527,6 +557,13 @@ impl Peer { .flat_map(|m| self.build_raft_message(m)) .collect(); } + if self.has_pending_messages() { + if write_task.messages.is_empty() { + write_task.messages = self.take_pending_messages(); + } else { + write_task.messages.extend(self.take_pending_messages()); + } + } if !self.serving() { self.start_destroy(ctx, &mut write_task); ctx.coprocessor_host.on_region_changed( @@ -742,6 +779,7 @@ impl Peer { self.add_pending_tick(PeerTick::CompactLog); self.add_pending_tick(PeerTick::SplitRegionCheck); self.add_pending_tick(PeerTick::CheckLongUncommitted); + self.maybe_schedule_gc_peer_tick(); } StateRole::Follower => { self.leader_lease_mut().expire(); diff --git a/components/raftstore-v2/src/operation/ready/snapshot.rs b/components/raftstore-v2/src/operation/ready/snapshot.rs index 04b6ed7e12b..aa1e95dfaed 100644 --- a/components/raftstore-v2/src/operation/ready/snapshot.rs +++ b/components/raftstore-v2/src/operation/ready/snapshot.rs @@ -529,6 +529,8 @@ impl Storage { let mut snap_data = RaftSnapshotData::default(); snap_data.merge_from_bytes(snap.get_data())?; let region = snap_data.take_region(); + let removed_records = snap_data.take_removed_records(); + let merged_records = snap_data.take_merged_records(); if region.get_id() != region_id { return Err(box_err!( "mismatch region id {}!={}", @@ -569,6 +571,8 @@ impl Storage { let region_state = self.region_state_mut(); region_state.set_state(PeerState::Normal); region_state.set_region(region); + region_state.set_removed_records(removed_records); + region_state.set_merged_records(merged_records); region_state.set_tablet_index(last_index); let entry_storage = self.entry_storage_mut(); entry_storage.raft_state_mut().set_last_index(last_index); diff --git a/components/raftstore-v2/src/raft/peer.rs b/components/raftstore-v2/src/raft/peer.rs index 6cfcda4da25..814dc72e622 100644 --- a/components/raftstore-v2/src/raft/peer.rs +++ b/components/raftstore-v2/src/raft/peer.rs @@ -10,7 +10,10 @@ use collections::{HashMap, HashSet}; use 
engine_traits::{ CachedTablet, FlushState, KvEngine, RaftEngine, TabletContext, TabletRegistry, }; -use kvproto::{metapb, pdpb, raft_serverpb::RegionLocalState}; +use kvproto::{ + metapb, pdpb, + raft_serverpb::{RaftMessage, RegionLocalState}, +}; use pd_client::BucketStat; use raft::{RawNode, StateRole}; use raftstore::{ @@ -28,8 +31,8 @@ use super::storage::Storage; use crate::{ fsm::ApplyScheduler, operation::{ - AsyncWriter, CompactLogContext, DestroyProgress, ProposalControl, SimpleWriteReqEncoder, - SplitFlowControl, TxnContext, + AsyncWriter, CompactLogContext, DestroyProgress, GcPeerContext, ProposalControl, + SimpleWriteReqEncoder, SplitFlowControl, TxnContext, }, router::{CmdResChannel, PeerTick, QueryResChannel}, Result, @@ -103,6 +106,12 @@ pub struct Peer { leader_transferee: u64, long_uncommitted_threshold: u64, + + /// Pending messages to be sent on handle ready. We should avoid sending + /// messages immediately otherwise it may break the persistence assumption. + pending_messages: Vec, + + gc_peer_context: GcPeerContext, } impl Peer { @@ -182,6 +191,8 @@ impl Peer { cfg.long_uncommitted_base_threshold.0.as_secs(), 1, ), + pending_messages: vec![], + gc_peer_context: GcPeerContext::default(), }; // If this region has only one peer and I am the one, campaign directly. @@ -624,6 +635,7 @@ impl Peer { #[inline] pub fn add_pending_tick(&mut self, tick: PeerTick) { + // Msg per batch is 4096/256 by default, the buffer won't grow too large. self.pending_ticks.push(tick); } @@ -755,4 +767,29 @@ impl Peer { pub fn set_long_uncommitted_threshold(&mut self, dur: Duration) { self.long_uncommitted_threshold = cmp::max(dur.as_secs(), 1); } + + #[inline] + pub fn add_message(&mut self, msg: RaftMessage) { + self.pending_messages.push(msg); + } + + #[inline] + pub fn has_pending_messages(&mut self) -> bool { + !self.pending_messages.is_empty() + } + + #[inline] + pub fn take_pending_messages(&mut self) -> Vec { + mem::take(&mut self.pending_messages) + } + + #[inline] + pub fn gc_peer_context(&self) -> &GcPeerContext { + &self.gc_peer_context + } + + #[inline] + pub fn gc_peer_context_mut(&mut self) -> &mut GcPeerContext { + &mut self.gc_peer_context + } } diff --git a/components/raftstore-v2/src/router/message.rs b/components/raftstore-v2/src/router/message.rs index c1e5f0d37dc..d58e75a2a38 100644 --- a/components/raftstore-v2/src/router/message.rs +++ b/components/raftstore-v2/src/router/message.rs @@ -32,6 +32,7 @@ pub enum PeerTick { ReactivateMemoryLock = 8, ReportBuckets = 9, CheckLongUncommitted = 10, + GcPeer = 11, } impl PeerTick { @@ -51,6 +52,7 @@ impl PeerTick { PeerTick::ReactivateMemoryLock => "reactivate_memory_lock", PeerTick::ReportBuckets => "report_buckets", PeerTick::CheckLongUncommitted => "check_long_uncommitted", + PeerTick::GcPeer => "gc_peer", } } @@ -67,6 +69,7 @@ impl PeerTick { PeerTick::ReactivateMemoryLock, PeerTick::ReportBuckets, PeerTick::CheckLongUncommitted, + PeerTick::GcPeer, ]; TICKS } diff --git a/components/raftstore-v2/tests/integrations/cluster.rs b/components/raftstore-v2/tests/integrations/cluster.rs index ce0248130fb..1949e6eaf0f 100644 --- a/components/raftstore-v2/tests/integrations/cluster.rs +++ b/components/raftstore-v2/tests/integrations/cluster.rs @@ -512,6 +512,10 @@ impl Cluster { &self.nodes[offset] } + pub fn receiver(&self, offset: usize) -> &Receiver { + &self.receivers[offset] + } + /// Send messages and wait for side effects are all handled. 
#[allow(clippy::vec_box)] pub fn dispatch(&self, region_id: u64, mut msgs: Vec>) { diff --git a/components/raftstore-v2/tests/integrations/test_conf_change.rs b/components/raftstore-v2/tests/integrations/test_conf_change.rs index 8a075bb9a35..84b3fdbed10 100644 --- a/components/raftstore-v2/tests/integrations/test_conf_change.rs +++ b/components/raftstore-v2/tests/integrations/test_conf_change.rs @@ -2,7 +2,7 @@ use std::{self, time::Duration}; -use engine_traits::{Peekable, CF_DEFAULT}; +use engine_traits::{Peekable, RaftEngineReadOnly, CF_DEFAULT}; use kvproto::raft_cmdpb::AdminCmdType; use raft::prelude::ConfChangeType; use raftstore_v2::{ @@ -37,7 +37,7 @@ fn test_simple_change() { let match_index = meta.raft_apply.applied_index; assert_eq!(meta.region_state.epoch.version, epoch.get_version()); assert_eq!(meta.region_state.epoch.conf_ver, new_conf_ver); - assert_eq!(meta.region_state.peers, vec![leader_peer, new_peer]); + assert_eq!(meta.region_state.peers, vec![leader_peer, new_peer.clone()]); // So heartbeat will create a learner. cluster.dispatch(2, vec![]); @@ -95,6 +95,42 @@ fn test_simple_change() { assert_eq!(meta.region_state.epoch.version, epoch.get_version()); assert_eq!(meta.region_state.epoch.conf_ver, new_conf_ver); assert_eq!(meta.region_state.peers, vec![leader_peer]); + cluster.routers[0].wait_flush(region_id, Duration::from_millis(300)); + let raft_engine = &cluster.node(0).running_state().unwrap().raft_engine; + let region_state = raft_engine + .get_region_state(region_id, u64::MAX) + .unwrap() + .unwrap(); + assert!( + region_state.get_removed_records().contains(&new_peer), + "{:?}", + region_state + ); + + // If adding a peer on the same store, removed_records should be cleaned. + req.mut_header() + .mut_region_epoch() + .set_conf_ver(new_conf_ver); + req.mut_admin_request() + .mut_change_peer() + .set_change_type(ConfChangeType::AddLearnerNode); + req.mut_admin_request() + .mut_change_peer() + .mut_peer() + .set_id(11); + let resp = cluster.routers[0].admin_command(2, req.clone()).unwrap(); + assert!(!resp.get_header().has_error(), "{:?}", resp); + cluster.routers[0].wait_flush(region_id, Duration::from_millis(300)); + let region_state = raft_engine + .get_region_state(region_id, u64::MAX) + .unwrap() + .unwrap(); + assert!( + region_state.get_removed_records().is_empty(), + "{:?}", + region_state + ); + // TODO: check if the peer is removed once life trace is implemented or // snapshot is implemented. 
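
For reference, the removed_records bookkeeping that the test above exercises boils down to one rule in the conf_change.rs hunk earlier: track every peer whose store lost its replica, but drop the record again as soon as a replacement replica lives on the same store, because the leader keeps polling that store and the old peer ends up tombstoned anyway. A minimal standalone sketch of that rule follows; the function name and the plain slices are illustrative, not code from the patch.

use kvproto::metapb::Peer;

// Illustrative distillation of the removed_records update in apply_conf_change.
fn collect_removed_records(
    mut removed: Vec<Peer>,
    old_peers: &[Peer],
    new_peers: &[Peer],
) -> Vec<Peer> {
    // A peer whose store no longer hosts any replica of the region was removed
    // by this conf change, so start tracking it.
    for p0 in old_peers {
        if new_peers
            .iter()
            .all(|p1| p1.get_store_id() != p0.get_store_id())
        {
            removed.push(p0.clone());
        }
    }
    // Drop records covered by a replacement replica on the same store; the old
    // peer there is guaranteed to become tombstone without extra tracking.
    removed.retain(|p0| {
        new_peers
            .iter()
            .all(|p1| p1.get_store_id() != p0.get_store_id())
    });
    removed
}

The second pass is exactly what makes the re-add-learner case in the test above finish with empty removed_records.
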
diff --git a/components/raftstore-v2/tests/integrations/test_life.rs b/components/raftstore-v2/tests/integrations/test_life.rs index a2ae0bbb9f8..86ff211b4f4 100644 --- a/components/raftstore-v2/tests/integrations/test_life.rs +++ b/components/raftstore-v2/tests/integrations/test_life.rs @@ -7,14 +7,19 @@ use std::{ }; use crossbeam::channel::TrySendError; -use engine_traits::{RaftEngine, RaftEngineReadOnly}; +use engine_traits::{RaftEngine, RaftEngineReadOnly, CF_DEFAULT}; use futures::executor::block_on; use kvproto::{ metapb, - raft_serverpb::{PeerState, RaftMessage}, + raft_cmdpb::AdminCmdType, + raft_serverpb::{ExtraMessageType, PeerState, RaftMessage}, }; -use raftstore_v2::router::{DebugInfoChannel, PeerMsg}; -use tikv_util::store::new_peer; +use raft::prelude::{ConfChangeType, MessageType}; +use raftstore_v2::{ + router::{DebugInfoChannel, PeerMsg, PeerTick}, + SimpleWriteEncoder, +}; +use tikv_util::store::{new_learner_peer, new_peer}; use crate::cluster::{Cluster, TestRouter}; @@ -62,6 +67,23 @@ fn assert_tombstone(raft_engine: &impl RaftEngine, region_id: u64, peer: &metapb ); } +#[track_caller] +fn assert_valid_report(report: &RaftMessage, region_id: u64, peer_id: u64) { + assert_eq!( + report.get_extra_msg().get_type(), + ExtraMessageType::MsgGcPeerResponse + ); + assert_eq!(report.get_region_id(), region_id); + assert_eq!(report.get_from_peer().get_id(), peer_id); +} + +#[track_caller] +fn assert_tombstone_msg(msg: &RaftMessage, region_id: u64, peer_id: u64) { + assert_eq!(msg.get_region_id(), region_id); + assert_eq!(msg.get_to_peer().get_id(), peer_id); + assert!(msg.get_is_tombstone()); +} + /// Test a peer can be created by general raft message and destroyed tombstone /// message. #[test] @@ -99,9 +121,6 @@ fn test_life_by_message() { msg.take_region_epoch(); }); - // Check tombstone. - assert_wrong(&|msg| msg.set_is_tombstone(true)); - // Correct message will create a peer, but the peer will not be initialized. router.send_raft_message(msg.clone()).unwrap(); let timeout = Duration::from_secs(3); @@ -156,11 +175,20 @@ fn test_destroy_by_larger_id() { msg.mut_region_epoch().set_conf_ver(1); msg.set_from_peer(new_peer(2, 8)); let raft_message = msg.mut_message(); - raft_message.set_msg_type(raft::prelude::MessageType::MsgHeartbeat); + raft_message.set_msg_type(MessageType::MsgHeartbeat); raft_message.set_from(6); raft_message.set_term(init_term); // Create the peer. router.send_raft_message(msg.clone()).unwrap(); + // There must be heartbeat response. + let hb = cluster + .receiver(0) + .recv_timeout(Duration::from_millis(300)) + .unwrap(); + assert_eq!( + hb.get_message().get_msg_type(), + MessageType::MsgHeartbeatResponse + ); let timeout = Duration::from_secs(3); let meta = router @@ -178,6 +206,20 @@ fn test_destroy_by_larger_id() { .unwrap(); assert_eq!(meta.raft_status.id, test_peer_id); assert_eq!(meta.raft_status.hard_state.term, init_term); + cluster + .receiver(0) + .recv_timeout(Duration::from_millis(300)) + .unwrap_err(); + + // Smaller ID tombstone message should trigger report. + let mut smaller_id_tombstone_msg = smaller_id_msg.clone(); + smaller_id_tombstone_msg.set_is_tombstone(true); + router.send_raft_message(smaller_id_tombstone_msg).unwrap(); + let report = cluster + .receiver(0) + .recv_timeout(Duration::from_millis(300)) + .unwrap(); + assert_valid_report(&report, test_region_id, 8); // Larger ID should trigger destroy. 
let mut larger_id_msg = smaller_id_msg; @@ -199,3 +241,159 @@ fn test_destroy_by_larger_id() { assert_eq!(meta.raft_status.id, test_peer_id + 1); assert_eq!(meta.raft_status.hard_state.term, init_term + 1); } + +#[test] +fn test_gc_peer_request() { + let cluster = Cluster::default(); + let router = &cluster.routers[0]; + let test_region_id = 4; + let test_peer_id = 5; + let test_leader_id = 6; + + let mut msg = Box::::default(); + msg.set_region_id(test_region_id); + msg.set_to_peer(new_peer(1, test_peer_id)); + msg.mut_region_epoch().set_conf_ver(1); + msg.set_from_peer(new_peer(2, test_leader_id)); + let raft_message = msg.mut_message(); + raft_message.set_msg_type(raft::prelude::MessageType::MsgHeartbeat); + raft_message.set_from(6); + raft_message.set_term(5); + + // Tombstone message should create the peer and then destroy it. + let mut tombstone_msg = msg.clone(); + tombstone_msg.set_is_tombstone(true); + router.send_raft_message(tombstone_msg.clone()).unwrap(); + cluster.routers[0].wait_flush(test_region_id, Duration::from_millis(300)); + assert_peer_not_exist(test_region_id, test_peer_id, router); + // Resend a normal message will not create the peer. + router.send_raft_message(msg).unwrap(); + assert_peer_not_exist(test_region_id, test_peer_id, router); + cluster + .receiver(0) + .recv_timeout(Duration::from_millis(300)) + .unwrap_err(); + // Resend tombstone message should trigger report. + router.send_raft_message(tombstone_msg).unwrap(); + assert_peer_not_exist(test_region_id, test_peer_id, router); + let report = cluster + .receiver(0) + .recv_timeout(Duration::from_millis(300)) + .unwrap(); + assert_valid_report(&report, test_region_id, test_leader_id); +} + +#[test] +fn test_gc_peer_response() { + let cluster = Cluster::with_node_count(2, None); + let region_id = 2; + let mut req = cluster.routers[0].new_request_for(region_id); + let admin_req = req.mut_admin_request(); + admin_req.set_cmd_type(AdminCmdType::ChangePeer); + admin_req + .mut_change_peer() + .set_change_type(ConfChangeType::AddLearnerNode); + let store_id = cluster.node(1).id(); + let new_peer = new_learner_peer(store_id, 10); + admin_req.mut_change_peer().set_peer(new_peer.clone()); + let resp = cluster.routers[0].admin_command(2, req.clone()).unwrap(); + assert!(!resp.get_header().has_error(), "{:?}", resp); + let raft_engine = &cluster.node(0).running_state().unwrap().raft_engine; + let region_state = raft_engine + .get_region_state(region_id, u64::MAX) + .unwrap() + .unwrap(); + assert!(region_state.get_removed_records().is_empty()); + + let new_conf_ver = req.get_header().get_region_epoch().get_conf_ver() + 1; + req.mut_header() + .mut_region_epoch() + .set_conf_ver(new_conf_ver); + req.mut_admin_request() + .mut_change_peer() + .set_change_type(ConfChangeType::RemoveNode); + let resp = cluster.routers[0] + .admin_command(region_id, req.clone()) + .unwrap(); + assert!(!resp.get_header().has_error(), "{:?}", resp); + cluster.routers[0].wait_flush(region_id, Duration::from_millis(300)); + // Drain all existing messages. 
+ while cluster.receiver(0).try_recv().is_ok() {} + + let mut msg = Box::::default(); + msg.set_region_id(region_id); + msg.set_to_peer(req.get_header().get_peer().clone()); + msg.set_from_peer(new_peer); + let receiver = &cluster.receiver(0); + for ty in &[MessageType::MsgRequestVote, MessageType::MsgRequestPreVote] { + msg.mut_message().set_msg_type(*ty); + cluster.routers[0].send_raft_message(msg.clone()).unwrap(); + let tombstone_msg = match receiver.recv_timeout(Duration::from_millis(300)) { + Ok(msg) => msg, + Err(e) => panic!("failed to receive tombstone message {:?}: {:?}", ty, e), + }; + assert_tombstone_msg(&tombstone_msg, region_id, 10); + } + // Non-vote message should not trigger tombstone. + msg.mut_message().set_msg_type(MessageType::MsgHeartbeat); + cluster.routers[0].send_raft_message(msg).unwrap(); + cluster + .receiver(0) + .recv_timeout(Duration::from_millis(300)) + .unwrap_err(); + + // GcTick should also trigger tombstone. + cluster.routers[0] + .send(region_id, PeerMsg::Tick(PeerTick::GcPeer)) + .unwrap(); + let tombstone_msg = cluster + .receiver(0) + .recv_timeout(Duration::from_millis(300)) + .unwrap(); + assert_tombstone_msg(&tombstone_msg, region_id, 10); + + // First message to create the peer and destroy. + cluster.routers[1] + .send_raft_message(Box::new(tombstone_msg.clone())) + .unwrap(); + cluster.routers[1].wait_flush(region_id, Duration::from_millis(300)); + cluster + .receiver(1) + .recv_timeout(Duration::from_millis(300)) + .unwrap_err(); + // Send message should trigger tombstone report. + cluster.routers[1] + .send_raft_message(Box::new(tombstone_msg)) + .unwrap(); + let report = cluster + .receiver(1) + .recv_timeout(Duration::from_millis(300)) + .unwrap(); + assert_valid_report(&report, region_id, 10); + cluster.routers[0] + .send_raft_message(Box::new(report)) + .unwrap(); + let raft_engine = &cluster.node(0).running_state().unwrap().raft_engine; + let region_state = raft_engine + .get_region_state(region_id, u64::MAX) + .unwrap() + .unwrap(); + assert_eq!(region_state.get_removed_records().len(), 1); + // Tick should flush records gc. + cluster.routers[0] + .send(region_id, PeerMsg::Tick(PeerTick::GcPeer)) + .unwrap(); + // Trigger a write to make sure records gc is finished. + let header = Box::new(cluster.routers[0].new_request_for(region_id).take_header()); + let mut put = SimpleWriteEncoder::with_capacity(64); + put.put(CF_DEFAULT, b"key", b"value"); + let (msg, sub) = PeerMsg::simple_write(header, put.encode()); + cluster.routers[0].send(region_id, msg).unwrap(); + block_on(sub.result()).unwrap(); + cluster.routers[0].wait_flush(region_id, Duration::from_millis(300)); + let region_state = raft_engine + .get_region_state(region_id, u64::MAX) + .unwrap() + .unwrap(); + assert!(region_state.get_removed_records().is_empty()); +} diff --git a/components/raftstore/src/store/async_io/read.rs b/components/raftstore/src/store/async_io/read.rs index b298ed3529e..f5e2c946398 100644 --- a/components/raftstore/src/store/async_io/read.rs +++ b/components/raftstore/src/store/async_io/read.rs @@ -218,6 +218,8 @@ where snap_data.set_region(region_state.get_region().clone()); snap_data.set_version(TABLET_SNAPSHOT_VERSION); snap_data.mut_meta().set_for_balance(for_balance); + snap_data.set_removed_records(region_state.get_removed_records().into()); + snap_data.set_merged_records(region_state.get_merged_records().into()); snapshot.set_data(snap_data.write_to_bytes().unwrap().into()); // create checkpointer. 
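
For reference, the read.rs hunk above is the sending half of snapshot-based GC-state transfer, and the snapshot.rs hunk earlier is the receiving half; together they let a peer restored from a snapshot keep polling the peers it still owes a tombstone. Below is a minimal sketch of that round trip using only the kvproto accessors that appear in the patch; the function names are made up, and the real code also restores PeerState, the tablet index, and checkpoints the tablet.

use kvproto::raft_serverpb::{RaftSnapshotData, RegionLocalState};
use protobuf::Message;

// Sketch: embed the GC records when generating snapshot data...
fn encode_snap_data(region_state: &RegionLocalState) -> Vec<u8> {
    let mut snap_data = RaftSnapshotData::default();
    snap_data.set_region(region_state.get_region().clone());
    snap_data.set_removed_records(region_state.get_removed_records().into());
    snap_data.set_merged_records(region_state.get_merged_records().into());
    snap_data.write_to_bytes().unwrap()
}

// ...and restore them when the snapshot is applied, so the new incarnation of
// the peer still knows which old peers it must drive to destruction.
fn decode_snap_data(bytes: &[u8], region_state: &mut RegionLocalState) {
    let mut snap_data = RaftSnapshotData::default();
    snap_data.merge_from_bytes(bytes).unwrap();
    region_state.set_region(snap_data.take_region());
    region_state.set_removed_records(snap_data.take_removed_records());
    region_state.set_merged_records(snap_data.take_merged_records());
}
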
diff --git a/components/raftstore/src/store/fsm/apply.rs b/components/raftstore/src/store/fsm/apply.rs index 58df32fd404..33c5a51e9da 100644 --- a/components/raftstore/src/store/fsm/apply.rs +++ b/components/raftstore/src/store/fsm/apply.rs @@ -1664,6 +1664,7 @@ where self.exec_flashback(ctx, request) } AdminCmdType::BatchSwitchWitness => Err(box_err!("unsupported admin command type")), + AdminCmdType::UpdateGcPeer => Err(box_err!("v2 only command and it's safe to skip")), AdminCmdType::InvalidAdmin => Err(box_err!("unsupported admin command type")), }?; response.set_cmd_type(cmd_type); diff --git a/components/raftstore/src/store/fsm/peer.rs b/components/raftstore/src/store/fsm/peer.rs index 7e00798b6df..c80abae5f84 100644 --- a/components/raftstore/src/store/fsm/peer.rs +++ b/components/raftstore/src/store/fsm/peer.rs @@ -2700,7 +2700,7 @@ where } let mut resp = ExtraMessage::default(); resp.set_type(ExtraMessageType::MsgVoterReplicatedIndexResponse); - resp.voter_replicated_index = voter_replicated_idx; + resp.index = voter_replicated_idx; self.fsm .peer .send_extra_message(resp, &mut self.ctx.trans, from); @@ -2717,7 +2717,7 @@ where if self.fsm.peer.is_leader() || !self.fsm.peer.is_witness() { return; } - let voter_replicated_index = msg.voter_replicated_index; + let voter_replicated_index = msg.index; if let Ok(voter_replicated_term) = self.fsm.peer.get_store().term(voter_replicated_index) { self.ctx.apply_router.schedule_task( self.region_id(), @@ -2785,6 +2785,8 @@ where ExtraMessageType::MsgVoterReplicatedIndexResponse => { self.on_voter_replicated_index_response(msg.get_extra_msg()); } + // It's v2 only message and ignore does no harm. + ExtraMessageType::MsgGcPeerRequest | ExtraMessageType::MsgGcPeerResponse => (), } } diff --git a/components/raftstore/src/store/peer.rs b/components/raftstore/src/store/peer.rs index 586ab7ba133..af0cceb0029 100644 --- a/components/raftstore/src/store/peer.rs +++ b/components/raftstore/src/store/peer.rs @@ -4793,7 +4793,7 @@ where return; } if let Some(ref state) = self.pending_merge_state { - if state.get_commit() == extra_msg.get_premerge_commit() { + if state.get_commit() == extra_msg.get_index() { self.add_want_rollback_merge_peer(peer_id); } } @@ -5388,7 +5388,7 @@ where }; let mut extra_msg = ExtraMessage::default(); extra_msg.set_type(ExtraMessageType::MsgWantRollbackMerge); - extra_msg.set_premerge_commit(premerge_commit); + extra_msg.set_index(premerge_commit); self.send_extra_message(extra_msg, &mut ctx.trans, &to_peer); } diff --git a/components/raftstore/src/store/util.rs b/components/raftstore/src/store/util.rs index 2d27b56fda5..3660f2454ca 100644 --- a/components/raftstore/src/store/util.rs +++ b/components/raftstore/src/store/util.rs @@ -229,6 +229,7 @@ pub fn admin_cmd_epoch_lookup(admin_cmp_type: AdminCmdType) -> AdminCmdEpochStat AdminCmdEpochState::new(true, true, false, false) } AdminCmdType::BatchSwitchWitness => unimplemented!(), + AdminCmdType::UpdateGcPeer => AdminCmdEpochState::new(false, false, false, false), } } From ef3a2bb8e960c2fe64445955137beb98ad7272ec Mon Sep 17 00:00:00 2001 From: Jay Lee Date: Mon, 16 Jan 2023 20:48:48 +0800 Subject: [PATCH 2/6] fix copr Signed-off-by: Jay Lee --- components/raftstore-v2/src/operation/ready/mod.rs | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/components/raftstore-v2/src/operation/ready/mod.rs b/components/raftstore-v2/src/operation/ready/mod.rs index e2a5f3f7935..95117ff6b1c 100644 --- a/components/raftstore-v2/src/operation/ready/mod.rs +++ 
b/components/raftstore-v2/src/operation/ready/mod.rs @@ -21,7 +21,7 @@ mod apply_trace; mod async_writer; mod snapshot; -use std::{cmp, time::Instant}; +use std::{borrow::Cow, cmp, time::Instant}; use engine_traits::{KvEngine, RaftEngine}; use error_code::ErrorCodeExt; @@ -566,8 +566,18 @@ impl Peer { } if !self.serving() { self.start_destroy(ctx, &mut write_task); + let region = if self.storage().is_initialized() { + Cow::Borrowed(self.region()) + } else { + // Hack: In v1, it expects epoch empty to work correct. + // TODO: maybe we need to check uninitialized correctly in coprocessor. + let mut region = self.region().clone(); + region.take_peers(); + region.take_region_epoch(); + Cow::Owned(region) + }; ctx.coprocessor_host.on_region_changed( - self.region(), + ®ion, RegionChangeEvent::Destroy, self.raft_group().raft.state, ); From 68290d535d7570a71823cfcfbff0628af725ff89 Mon Sep 17 00:00:00 2001 From: Jay Lee Date: Sun, 29 Jan 2023 18:33:21 +0800 Subject: [PATCH 3/6] fix test Signed-off-by: Jay Lee --- components/raftstore-v2/tests/integrations/test_life.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/components/raftstore-v2/tests/integrations/test_life.rs b/components/raftstore-v2/tests/integrations/test_life.rs index 86ff211b4f4..2a5dfafc509 100644 --- a/components/raftstore-v2/tests/integrations/test_life.rs +++ b/components/raftstore-v2/tests/integrations/test_life.rs @@ -219,7 +219,7 @@ fn test_destroy_by_larger_id() { .receiver(0) .recv_timeout(Duration::from_millis(300)) .unwrap(); - assert_valid_report(&report, test_region_id, 8); + assert_valid_report(&report, test_region_id, test_peer_id - 1); // Larger ID should trigger destroy. let mut larger_id_msg = smaller_id_msg; @@ -280,7 +280,7 @@ fn test_gc_peer_request() { .receiver(0) .recv_timeout(Duration::from_millis(300)) .unwrap(); - assert_valid_report(&report, test_region_id, test_leader_id); + assert_valid_report(&report, test_region_id, test_peer_id); } #[test] From 84f0957b3d2edb73a5e4a2bbe9fe823ca661a6e4 Mon Sep 17 00:00:00 2001 From: Jay Lee Date: Mon, 30 Jan 2023 19:12:41 +0800 Subject: [PATCH 4/6] address comment Signed-off-by: Jay Lee --- components/raftstore-v2/src/operation/life.rs | 2 +- components/raftstore-v2/src/operation/ready/mod.rs | 6 +----- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/components/raftstore-v2/src/operation/life.rs b/components/raftstore-v2/src/operation/life.rs index 2b1704e2c0e..fd9a1c649a3 100644 --- a/components/raftstore-v2/src/operation/life.rs +++ b/components/raftstore-v2/src/operation/life.rs @@ -306,7 +306,7 @@ impl Store { } } -/// Tell leader that target peer is destroyed. +/// Tell leader that `to_peer` is destroyed. 
fn report_peer_destroyed(tombstone_msg: &mut RaftMessage) -> Option { let to_region_id = if tombstone_msg.has_extra_msg() { assert_eq!( diff --git a/components/raftstore-v2/src/operation/ready/mod.rs b/components/raftstore-v2/src/operation/ready/mod.rs index 4e4aef73fcf..b427d071474 100644 --- a/components/raftstore-v2/src/operation/ready/mod.rs +++ b/components/raftstore-v2/src/operation/ready/mod.rs @@ -228,11 +228,7 @@ impl Peer { } } if msg.has_extra_msg() { - match msg.get_extra_msg().get_type() { - ExtraMessageType::MsgGcPeerResponse => self.on_gc_peer_response(&msg), - _ => unimplemented!(), - } - return; + unimplemented!(); } // TODO: drop all msg append when the peer is uninitialized and has conflict From 7c0246c07181318c14b907b6c4e0447c95cedafd Mon Sep 17 00:00:00 2001 From: Jay Lee Date: Tue, 31 Jan 2023 15:16:02 +0800 Subject: [PATCH 5/6] address comment Signed-off-by: Jay Lee --- components/raftstore-v2/src/operation/life.rs | 20 ++++++++++--------- .../raftstore-v2/src/operation/ready/mod.rs | 4 +++- 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/components/raftstore-v2/src/operation/life.rs b/components/raftstore-v2/src/operation/life.rs index fd9a1c649a3..3a79c49c2c2 100644 --- a/components/raftstore-v2/src/operation/life.rs +++ b/components/raftstore-v2/src/operation/life.rs @@ -226,7 +226,7 @@ impl Store { } if destroyed { if msg.get_is_tombstone() { - if let Some(msg) = report_peer_destroyed(&mut msg) { + if let Some(msg) = build_peer_destroyed_report(&mut msg) { let _ = ctx.trans.send(msg); } return; @@ -236,7 +236,7 @@ impl Store { if extra_msg.get_type() == ExtraMessageType::MsgGcPeerRequest && extra_msg.has_check_gc_peer() { - tell_source_peer_to_destroy(ctx, &msg); + forward_destroy_source_peer(ctx, &msg); return; } } @@ -306,8 +306,8 @@ impl Store { } } -/// Tell leader that `to_peer` is destroyed. -fn report_peer_destroyed(tombstone_msg: &mut RaftMessage) -> Option { +/// Tell leader that `to_peer` from `tombstone_msg` is destroyed. +fn build_peer_destroyed_report(tombstone_msg: &mut RaftMessage) -> Option { let to_region_id = if tombstone_msg.has_extra_msg() { assert_eq!( tombstone_msg.get_extra_msg().get_type(), @@ -332,7 +332,8 @@ fn report_peer_destroyed(tombstone_msg: &mut RaftMessage) -> Option Some(msg) } -fn tell_source_peer_to_destroy(ctx: &mut StoreContext, msg: &RaftMessage) +/// Forward the destroy request from target peer to merged source peer. +fn forward_destroy_source_peer(ctx: &mut StoreContext, msg: &RaftMessage) where EK: KvEngine, ER: RaftEngine, @@ -400,7 +401,7 @@ impl Peer { pub fn on_tombstone_message(&mut self, msg: &mut RaftMessage) { match msg.get_to_peer().get_id().cmp(&self.peer_id()) { cmp::Ordering::Less => { - if let Some(msg) = report_peer_destroyed(msg) { + if let Some(msg) = build_peer_destroyed_report(msg) { self.add_message(msg); self.set_has_ready(); } @@ -413,8 +414,9 @@ impl Peer { } /// When leader tries to gc merged source peer, it will send a gc request to - /// target peer. If target peer makes sure the merged is finished, it should - /// send back a gc response to leader. + /// target peer. If target peer makes sure the merged is finished, it + /// forward the message to source peer and let source peer send back a + /// response. pub fn on_gc_peer_request( &mut self, ctx: &mut StoreContext, @@ -430,7 +432,7 @@ impl Peer { return; } - tell_source_peer_to_destroy(ctx, msg); + forward_destroy_source_peer(ctx, msg); } /// A peer confirms it's destroyed. 
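
After this revision the merged-source GC handshake is spread across on_gc_peer_tick, on_gc_peer_request, forward_destroy_source_peer and build_peer_destroyed_report, so a compressed view of the flow may help. The helper function below is illustrative only; it merely mirrors how on_raft_message routes these extra messages and is not part of the patch.

use kvproto::raft_serverpb::{ExtraMessageType, RaftMessage};

// Condensed view of the handshake (fields elided):
// 1. Leader -> merge target: MsgGcPeerRequest with check_gc_peer carrying the
//    source region id/peer/epoch and `index` set to the merge index.
// 2. Target peer: once tablet_index >= index the merge is applied, so it
//    forwards a tombstone MsgGcPeerRequest to the source peer's region.
// 3. Source peer/store: after leaving a tombstone, it answers with
//    MsgGcPeerResponse addressed to check_gc_peer.from_region_id (the leader).
// 4. Leader: records the confirmed id and later proposes UpdateGcPeer to prune
//    removed_records / merged_records.
fn is_gc_handshake_message(msg: &RaftMessage) -> bool {
    msg.has_extra_msg()
        && matches!(
            msg.get_extra_msg().get_type(),
            ExtraMessageType::MsgGcPeerRequest | ExtraMessageType::MsgGcPeerResponse
        )
}
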
diff --git a/components/raftstore-v2/src/operation/ready/mod.rs b/components/raftstore-v2/src/operation/ready/mod.rs index b427d071474..775126f8525 100644 --- a/components/raftstore-v2/src/operation/ready/mod.rs +++ b/components/raftstore-v2/src/operation/ready/mod.rs @@ -557,7 +557,9 @@ impl Peer { if write_task.messages.is_empty() { write_task.messages = self.take_pending_messages(); } else { - write_task.messages.extend(self.take_pending_messages()); + write_task + .messages + .append(&mut self.take_pending_messages()); } } if !self.serving() { From df107c590cc596fda9f2773ef008e173b2ba1168 Mon Sep 17 00:00:00 2001 From: Jay Lee Date: Fri, 3 Feb 2023 14:17:03 +0800 Subject: [PATCH 6/6] fix clippy Signed-off-by: Jay Lee --- components/raftstore-v2/src/operation/ready/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/components/raftstore-v2/src/operation/ready/mod.rs b/components/raftstore-v2/src/operation/ready/mod.rs index 031ffc11b37..3e8d049df74 100644 --- a/components/raftstore-v2/src/operation/ready/mod.rs +++ b/components/raftstore-v2/src/operation/ready/mod.rs @@ -21,7 +21,7 @@ mod apply_trace; mod async_writer; mod snapshot; -use std::{borrow::Cow, cmp, time::Instant}; +use std::{cmp, time::Instant}; use engine_traits::{KvEngine, RaftEngine}; use error_code::ErrorCodeExt;
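
End to end, the series can be exercised the way test_gc_peer_response does: remove a peer, fire the new GcPeer tick by hand, and watch the tombstone handshake on the test transport. Below is a compressed driver sketch; it assumes the Cluster harness from tests/integrations/cluster.rs and a region that already has an entry in removed_records, and is not code from the patch.

use std::time::Duration;

use raftstore_v2::router::{PeerMsg, PeerTick};

use crate::cluster::Cluster; // integration-test harness shown earlier

// Manually trigger one GC round on the leader and check that it emits a
// tombstone message for the removed peer.
fn drive_gc_once(cluster: &Cluster, region_id: u64, removed_peer_id: u64) {
    cluster.routers[0]
        .send(region_id, PeerMsg::Tick(PeerTick::GcPeer))
        .unwrap();
    let msg = cluster
        .receiver(0)
        .recv_timeout(Duration::from_millis(300))
        .unwrap();
    assert!(msg.get_is_tombstone());
    assert_eq!(msg.get_to_peer().get_id(), removed_peer_id);
}
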