Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

raftstore-v2: clean up import sst file only if flushed epoch is stale. #15064

Merged
merged 17 commits into from
Jul 11, 2023
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
77 changes: 63 additions & 14 deletions components/engine_traits/src/flush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@
//! be used as the start state.

use std::{
collections::{HashMap, LinkedList},
collections::LinkedList,
sync::{
atomic::{AtomicU64, Ordering},
Arc, Mutex, RwLock,
},
};

use kvproto::import_sstpb::SstMeta;
use slog_global::info;
use tikv_util::set_panic_mark;

Expand Down Expand Up @@ -61,36 +62,62 @@ struct FlushProgress {
/// if the flushed index is greater than it.
#[derive(Debug, Clone)]
pub struct SstApplyState {
sst_map: Arc<RwLock<HashMap<Vec<u8>, u64>>>,
// vec from cf to Vec<SstApplyEntry>.
ssts: Arc<RwLock<[Vec<SstApplyEntry>; DATA_CFS_LEN]>>,
}

impl Default for SstApplyState {
fn default() -> Self {
Self {
sst_map: Arc::new(RwLock::new(HashMap::new())),
ssts: Arc::new(RwLock::new(Default::default())),
}
}
}

/// A single ingested sst file together with the raft log index at which
/// the ingestion was applied.
///
/// Entries are kept per column family in [`SstApplyState`] so that, once a
/// flush covers `applied_index`, the sst file is known to be safely
/// persisted and can be cleaned up.
#[derive(Debug)]
pub struct SstApplyEntry {
    // Raft log index of the command that ingested this sst.
    pub applied_index: u64,
    // Metadata of the ingested sst file.
    pub sst: SstMeta,
}

impl SstApplyEntry {
    /// Wraps `sst` with the apply index it was ingested at.
    pub fn new(applied_index: u64, sst: SstMeta) -> Self {
        Self { applied_index, sst }
    }
}

impl SstApplyState {
#[inline]
pub fn registe_ssts(&self, uuids: Vec<Vec<u8>>, sst_applied_index: u64) {
let mut map = self.sst_map.write().unwrap();
for uuid in uuids {
map.insert(uuid, sst_applied_index);
/// Records every sst in `ssts` as ingested at `applied_index`, filing each
/// one under the list of its own column family.
pub fn register_ssts(&self, applied_index: u64, ssts: Vec<SstMeta>) {
    let mut by_cf = self.ssts.write().unwrap();
    for sst in ssts {
        let idx = data_cf_offset(sst.get_cf_name());
        // Indexing panics on an out-of-range cf offset, same as the
        // previous `get_mut(..).unwrap()`.
        by_cf[idx].push(SstApplyEntry::new(applied_index, sst));
    }
}

/// Collects the ssts of `cf` whose applied index is covered by the flush.
#[inline]
pub fn sst_applied_index(&self, uuid: &Vec<u8>) -> Option<u64> {
self.sst_map.read().unwrap().get(uuid).copied()
/// Returns clones of every registered sst in `cf` whose `applied_index`
/// is no greater than `flushed_index`, i.e. ssts that are already covered
/// by the flush and are safe to clean up.
pub fn stale_ssts(&self, cf: &str, flushed_index: u64) -> Vec<SstMeta> {
    let by_cf = self.ssts.read().unwrap();
    match by_cf.get(data_cf_offset(cf)) {
        Some(entries) => entries
            .iter()
            .filter(|entry| entry.applied_index <= flushed_index)
            .map(|entry| entry.sst.clone())
            .collect(),
        // Unknown cf offset: nothing registered, nothing stale.
        None => vec![],
    }
}

pub fn delete_ssts(&self, uuids: Vec<Vec<u8>>) {
let mut map = self.sst_map.write().unwrap();
for uuid in uuids {
map.remove(&uuid);
/// Removes the given ssts from the tracking lists.
///
/// Entries are matched by uuid inside the sst's own column family; ssts
/// that are not tracked are silently ignored.
///
/// Takes `&[SstMeta]` rather than `&Vec<SstMeta>` (clippy `ptr_arg`);
/// existing `&Vec` callers still compile via deref coercion.
pub fn delete_ssts(&self, ssts: &[SstMeta]) {
    let mut sst_list = self.ssts.write().unwrap();
    for sst in ssts {
        let cf_index = data_cf_offset(sst.get_cf_name());
        if let Some(metas) = sst_list.get_mut(cf_index) {
            // Stable `retain` replaces the unstable `drain_filter`:
            // the drained entries were unused, so keeping everything
            // that does NOT match is behaviorally identical.
            metas.retain(|entry| entry.sst.get_uuid() != sst.get_uuid());
        }
    }
}
}
Expand Down Expand Up @@ -270,3 +297,25 @@ impl<R: RaftEngine> StateStorage for R {
self.consume(&mut batch, true).unwrap();
}
}

#[cfg(test)]
mod test {
    use std::vec;

    use kvproto::import_sstpb::SstMeta;

    use super::SstApplyState;

    /// A registered sst must only be reported stale for its own cf, and
    /// `delete_ssts` must actually drop it from the state.
    #[test]
    pub fn test_sst_apply_state() {
        let stat = SstApplyState::default();
        let mut sst = SstMeta::default();
        sst.set_cf_name("write".to_owned());
        sst.set_uuid(vec![1, 2, 3, 4]);
        stat.register_ssts(10, vec![sst.clone()]);
        // A different cf must not see the entry.
        assert!(stat.stale_ssts("default", 10).is_empty());
        // Same cf with flushed index >= applied index reports it stale.
        let sst = stat.stale_ssts("write", 10);
        assert_eq!(sst[0].get_uuid(), vec![1, 2, 3, 4]);
        stat.delete_ssts(&sst);
        // Previously untested: deletion must remove the tracked entry.
        assert!(stat.stale_ssts("write", 10).is_empty());
    }
}
1 change: 1 addition & 0 deletions components/engine_traits/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@
#![feature(linked_list_cursors)]
#![feature(let_chains)]
#![feature(str_split_as_str)]
#![feature(drain_filter)]

#[macro_use(fail_point)]
extern crate fail;
Expand Down
9 changes: 6 additions & 3 deletions components/raftstore-v2/src/fsm/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,9 +294,12 @@ impl<'a, EK: KvEngine, ER: RaftEngine, T: Transport> PeerFsmDelegate<'a, EK, ER,
tablet_index,
flushed_index,
} => {
self.fsm
.peer_mut()
.on_data_flushed(cf, tablet_index, flushed_index);
self.fsm.peer_mut().on_data_flushed(
self.store_ctx,
cf,
tablet_index,
flushed_index,
);
}
PeerMsg::PeerUnreachable { to_peer_id } => {
self.fsm.peer_mut().on_peer_unreachable(to_peer_id)
Expand Down
46 changes: 16 additions & 30 deletions components/raftstore-v2/src/operation/command/write/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use raftstore::{
store::{check_sst_for_ingestion, metrics::PEER_WRITE_CMD_COUNTER, util},
Result,
};
use slog::error;
use slog::{error, info};
use sst_importer::range_overlaps;
use tikv_util::{box_try, slog_panic};

Expand Down Expand Up @@ -80,38 +80,27 @@ impl<EK: KvEngine, ER: RaftEngine> Peer<EK, ER> {
ctx: &mut StoreContext<EK, ER, T>,
ssts: Box<[SstMeta]>,
) {
let mut stale_ssts = Vec::from(ssts);
let epoch = self.region().get_region_epoch();
stale_ssts.retain(|sst| {
fail::fail_point!("on_cleanup_import_sst", |_| true);
util::is_epoch_stale(sst.get_region_epoch(), epoch)
});

// some sst needs to be kept if the log didn't flush the disk.
let flushed_indexes = self.storage().apply_trace().flushed_indexes();
stale_ssts.retain(|sst| {
let off = data_cf_offset(sst.get_cf_name());
let uuid = sst.get_uuid().to_vec();
let sst_index = self.sst_apply_state().sst_applied_index(&uuid);
if let Some(index) = sst_index {
return flushed_indexes.as_ref()[off] >= index;
}
true
});
let mut stale_ssts: Vec<SstMeta> = Vec::from(ssts);
let flushed_epoch = self.storage().flushed_epoch();
stale_ssts.retain(|sst| util::is_epoch_stale(sst.get_region_epoch(), flushed_epoch));

fail::fail_point!("on_cleanup_import_sst_schedule");
if stale_ssts.is_empty() {
return;
}
let uuids = stale_ssts
.iter()
.map(|sst| sst.get_uuid().to_vec())
.collect();
self.sst_apply_state().delete_ssts(uuids);
info!(
self.logger,
"clean up import sst file by interval task";
bufferflies marked this conversation as resolved.
Show resolved Hide resolved
"flushed_epoch" => ?flushed_epoch,
"stale_ssts" => ?stale_ssts);

self.sst_apply_state().delete_ssts(&stale_ssts);
let _ = ctx
.schedulers
.tablet
.schedule(tablet::Task::CleanupImportSst(stale_ssts.into()));
.schedule(tablet::Task::CleanupImportSst(
stale_ssts.into_boxed_slice(),
));
}
}

Expand Down Expand Up @@ -156,12 +145,9 @@ impl<EK: KvEngine, R: ApplyResReporter> Apply<EK, R> {
if let Err(e) = self.sst_importer().ingest(&infos, self.tablet()) {
slog_panic!(self.logger, "ingest fail"; "ssts" => ?ssts, "error" => ?e);
}
let metas: Vec<SstMeta> = infos.iter().map(|info| info.meta.clone()).collect();
self.sst_apply_state().register_ssts(index, metas);
}
let uuids = infos
.iter()
.map(|info| info.meta.get_uuid().to_vec())
.collect::<Vec<_>>();
self.set_sst_applied_index(uuids, index);

self.metrics.size_diff_hint += size;
self.metrics.written_bytes += size as u64;
Expand Down
43 changes: 40 additions & 3 deletions components/raftstore-v2/src/operation/ready/apply_trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use kvproto::{
raft_serverpb::{PeerState, RaftApplyState, RaftLocalState, RegionLocalState},
};
use raftstore::store::{
ReadTask, TabletSnapManager, WriteTask, RAFT_INIT_LOG_INDEX, RAFT_INIT_LOG_TERM,
util, ReadTask, TabletSnapManager, WriteTask, RAFT_INIT_LOG_INDEX, RAFT_INIT_LOG_TERM,
};
use slog::{info, trace, Logger};
use tikv_util::{box_err, slog_panic, worker::Scheduler};
Expand All @@ -57,6 +57,7 @@ use crate::{
},
raft::{Peer, Storage},
router::PeerMsg,
worker::tablet,
Result, StoreRouter,
};

Expand Down Expand Up @@ -509,6 +510,8 @@ impl<EK: KvEngine, ER: RaftEngine> Storage<EK, ER> {
for cf in ALL_CFS {
lb.put_flushed_index(region_id, cf, 0, 0).unwrap();
}
write_task.flushed_epoch =
Some(self.region_state().get_region().get_region_epoch().clone());
}

pub fn record_apply_trace(&mut self, write_task: &mut WriteTask<EK, ER>) {
Expand All @@ -519,6 +522,17 @@ impl<EK: KvEngine, ER: RaftEngine> Storage<EK, ER> {
}
let region_id = self.region().get_id();
let raft_engine = self.entry_storage().raft_engine();
let epoch = raft_engine
.get_region_state(region_id, trace.admin.flushed)
.unwrap()
.unwrap()
.get_region()
.get_region_epoch()
.clone();
if util::is_epoch_stale(self.flushed_epoch(), &epoch) {
write_task.flushed_epoch = Some(epoch);
}
bufferflies marked this conversation as resolved.
Show resolved Hide resolved

let tablet_index = self.tablet_index();
let lb = write_task
.extra_write
Expand All @@ -533,7 +547,13 @@ impl<EK: KvEngine, ER: RaftEngine> Storage<EK, ER> {
}

impl<EK: KvEngine, ER: RaftEngine> Peer<EK, ER> {
pub fn on_data_flushed(&mut self, cf: &str, tablet_index: u64, index: u64) {
pub fn on_data_flushed<T>(
&mut self,
ctx: &mut StoreContext<EK, ER, T>,
cf: &str,
tablet_index: u64,
index: u64,
) {
trace!(self.logger, "data flushed"; "cf" => cf, "tablet_index" => tablet_index, "index" => index, "trace" => ?self.storage().apply_trace());
if tablet_index < self.storage().tablet_index() {
// Stale tablet.
Expand All @@ -543,6 +563,24 @@ impl<EK: KvEngine, ER: RaftEngine> Peer<EK, ER> {
let apply_trace = self.storage_mut().apply_trace_mut();
apply_trace.on_flush(cf, index);
apply_trace.maybe_advance_admin_flushed(apply_index);
let stale_ssts = self.sst_apply_state().stale_ssts(cf, index);
if stale_ssts.is_empty() {
return;
}
info!(
self.logger,
"schedule delete stale ssts after flush";
"stale_ssts" => ?stale_ssts,
"apply_index" => apply_index,
"cf" => cf,
"flushed_index" => index,
);
let _ = ctx
.schedulers
.tablet
.schedule(tablet::Task::CleanupImportSst(
stale_ssts.into_boxed_slice(),
));
}

pub fn on_data_modified(&mut self, modification: DataTrace) {
Expand Down Expand Up @@ -608,7 +646,6 @@ impl<EK: KvEngine, ER: RaftEngine> Peer<EK, ER> {
let mut lb = ctx.engine.log_batch(1);
lb.put_flushed_index(region_id, CF_RAFT, tablet_index, admin_flush)
.unwrap();
ctx.engine.consume(&mut lb, true).unwrap();
info!(
self.logger,
"flush before close flush admin for region";
Expand Down
16 changes: 12 additions & 4 deletions components/raftstore-v2/src/operation/ready/async_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use std::collections::VecDeque;

use engine_traits::{KvEngine, RaftEngine};
use kvproto::raft_serverpb::RaftMessage;
use kvproto::{metapb::RegionEpoch, raft_serverpb::RaftMessage};
use raftstore::store::{
local_metrics::RaftMetrics, Config, PersistedNotifier, WriteRouter, WriteRouterContext,
WriteSenders, WriteTask,
Expand All @@ -24,6 +24,7 @@ struct UnpersistedReady {
max_empty_number: u64,
raft_msgs: Vec<Vec<RaftMessage>>,
has_snapshot: bool,
flushed_epoch: Option<RegionEpoch>,
}

/// A writer that handles asynchronous writes.
Expand Down Expand Up @@ -73,6 +74,7 @@ impl<EK: KvEngine, ER: RaftEngine> AsyncWriter<EK, ER> {
fn send(&mut self, ctx: &mut impl WriteRouterContext<EK, ER>, task: WriteTask<EK, ER>) {
let ready_number = task.ready_number();
let has_snapshot = task.has_snapshot;
let flushed_epoch = task.flushed_epoch.clone();
self.write_router.send_write_msg(
ctx,
self.unpersisted_readies.back().map(|r| r.number),
Expand All @@ -83,6 +85,7 @@ impl<EK: KvEngine, ER: RaftEngine> AsyncWriter<EK, ER> {
max_empty_number: ready_number,
raft_msgs: vec![],
has_snapshot,
flushed_epoch,
});
}

Expand Down Expand Up @@ -111,9 +114,9 @@ impl<EK: KvEngine, ER: RaftEngine> AsyncWriter<EK, ER> {
ctx: &mut impl WriteRouterContext<EK, ER>,
ready_number: u64,
logger: &Logger,
) -> (Vec<Vec<RaftMessage>>, bool) {
) -> (Vec<Vec<RaftMessage>>, Option<RegionEpoch>, bool) {
if self.persisted_number >= ready_number {
return (vec![], false);
return (vec![], None, false);
}

let last_unpersisted = self.unpersisted_readies.back();
Expand All @@ -128,12 +131,14 @@ impl<EK: KvEngine, ER: RaftEngine> AsyncWriter<EK, ER> {

let mut raft_messages = vec![];
let mut has_snapshot = false;
let mut flushed_epoch = None;
// There must be a match in `self.unpersisted_readies`.
loop {
let Some(v) = self.unpersisted_readies.pop_front() else {
slog_panic!(logger, "ready number not found"; "ready_number" => ready_number);
};
has_snapshot |= v.has_snapshot;

if v.number > ready_number {
slog_panic!(
logger,
Expand All @@ -142,6 +147,9 @@ impl<EK: KvEngine, ER: RaftEngine> AsyncWriter<EK, ER> {
"ready_number" => ready_number
);
}
if let Some(epoch) = v.flushed_epoch {
flushed_epoch = Some(epoch.clone());
}
if raft_messages.is_empty() {
raft_messages = v.raft_msgs;
} else {
Expand All @@ -156,7 +164,7 @@ impl<EK: KvEngine, ER: RaftEngine> AsyncWriter<EK, ER> {
self.write_router
.check_new_persisted(ctx, self.persisted_number);

(raft_messages, has_snapshot)
(raft_messages, flushed_epoch, has_snapshot)
}

pub fn persisted_number(&self) -> u64 {
Expand Down