Skip to content

Commit

Permalink
raftstore-v2: reduce file count (#14318)
Browse files Browse the repository at this point in the history
close #14306, close #14316, close #14324

Compaction guard is disabled in v2, so 8MiB is used as the file size. We need to
set the multiplier to reduce the SST file count.

This PR also fixes a race between region creation and destroy.

Signed-off-by: Jay Lee <BusyJayLee@gmail.com>

Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
  • Loading branch information
BusyJay and ti-chi-bot committed Mar 2, 2023
1 parent b050f07 commit 69dba51
Show file tree
Hide file tree
Showing 9 changed files with 138 additions and 62 deletions.
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 32 additions & 1 deletion components/raftstore-v2/src/fsm/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::borrow::Cow;
use batch_system::{BasicMailbox, Fsm};
use crossbeam::channel::TryRecvError;
use engine_traits::{KvEngine, RaftEngine, TabletRegistry};
use kvproto::{errorpb, raft_cmdpb::RaftCmdResponse};
use raftstore::store::{Config, TabletSnapManager, Transport};
use slog::{debug, error, info, trace, Logger};
use tikv_util::{
Expand All @@ -18,7 +19,7 @@ use tikv_util::{
use crate::{
batch::StoreContext,
raft::{Peer, Storage},
router::{PeerMsg, PeerTick},
router::{PeerMsg, PeerTick, QueryResult},
Result,
};

Expand Down Expand Up @@ -335,3 +336,33 @@ impl<'a, EK: KvEngine, ER: RaftEngine, T: Transport> PeerFsmDelegate<'a, EK, ER,
self.schedule_pending_ticks();
}
}

impl<EK: KvEngine, ER: RaftEngine> Drop for PeerFsm<EK, ER> {
fn drop(&mut self) {
self.peer_mut().pending_reads_mut().clear_all(None);

let region_id = self.peer().region_id();

let build_resp = || {
let mut err = errorpb::Error::default();
err.set_message("region is not found".to_owned());
err.mut_region_not_found().set_region_id(region_id);
let mut resp = RaftCmdResponse::default();
resp.mut_header().set_error(err);
resp
};
while let Ok(msg) = self.receiver.try_recv() {
match msg {
// Only these messages need to be responded explicitly as they rely on
// deterministic response.
PeerMsg::RaftQuery(query) => {
query.ch.set_result(QueryResult::Response(build_resp()));
}
PeerMsg::SimpleWrite(w) => {
w.ch.set_result(build_resp());
}
_ => continue,
}
}
}
}
12 changes: 6 additions & 6 deletions components/raftstore-v2/src/operation/command/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,13 +300,13 @@ impl<EK: KvEngine, ER: RaftEngine> Peer<EK, ER> {
committed_time: Instant::now(),
};
assert!(
self.apply_scheduler().is_some(),
"apply_scheduler should be something. region_id {}",
self.region_id()
self.apply_scheduler().is_some() || ctx.router.is_shutdown(),
"{} apply_scheduler should not be None",
SlogFormat(&self.logger)
);
self.apply_scheduler()
.unwrap()
.send(ApplyTask::CommittedEntries(apply));
if let Some(scheduler) = self.apply_scheduler() {
scheduler.send(ApplyTask::CommittedEntries(apply));
}
}

#[inline]
Expand Down
5 changes: 4 additions & 1 deletion components/raftstore-v2/src/operation/life.rs
Original file line number Diff line number Diff line change
Expand Up @@ -618,18 +618,21 @@ impl<EK: KvEngine, ER: RaftEngine> Peer<EK, ER> {
/// Completes the destruction of this peer.
///
/// The region is removed from the store meta, the readers map and the tablet
/// registry while the store meta lock is held. Only after that is the region
/// closed in the router, so the router is closed exactly once and never
/// before the tablet record is gone.
pub fn finish_destroy<T>(&mut self, ctx: &mut StoreContext<EK, ER, T>) {
    info!(self.logger, "peer destroyed");
    let region_id = self.region_id();
    {
        let mut meta = ctx.store_meta.lock().unwrap();
        meta.remove_region(region_id);
        meta.readers.remove(&region_id);
        ctx.tablet_registry.remove(region_id);
    }
    // Remove tablet first, otherwise in extreme cases, a new peer can be created
    // and race on tablet record removal and creation.
    ctx.router.close(region_id);
    if let Some(msg) = self.destroy_progress_mut().finish() {
        // The message will be dispatched to store fsm, which will create a
        // new peer. Ignore error as it's just a best effort.
        let _ = ctx.router.send_raft_message(msg);
    }
    self.pending_reads_mut().clear_all(Some(region_id));
    self.clear_apply_scheduler();
}
}
36 changes: 36 additions & 0 deletions components/test_util/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ mod security;

use std::{
env,
fmt::Debug,
sync::atomic::{AtomicU16, Ordering},
thread,
};
Expand Down Expand Up @@ -118,3 +119,38 @@ pub fn temp_dir(prefix: impl Into<Option<&'static str>>, prefer_mem: bool) -> te
_ => builder.tempdir().unwrap(),
}
}

/// Asserts that `lhs` equals `rhs`, panicking with a trimmed view of their
/// `Debug` output when they differ.
///
/// The common prefix and suffix of both `Debug` strings are cut at a shared
/// space character so that the panic message focuses on the differing part.
///
/// # Panics
///
/// Panics whenever `lhs != rhs`.
#[track_caller]
pub fn assert_eq_debug<C: PartialEq + Debug>(lhs: &C, rhs: &C) {
    /// Walks the leading run of identical byte pairs and returns the index of
    /// the second-to-last space seen in that run, or 0 when the run holds
    /// fewer than two spaces.
    fn shared_space_offset(pairs: impl Iterator<Item = (u8, u8)>) -> usize {
        let mut newest: Option<usize> = None;
        let mut previous: Option<usize> = None;
        for (idx, (a, b)) in pairs.enumerate() {
            if a != b {
                break;
            }
            if a == b' ' {
                previous = newest;
                newest = Some(idx);
            }
        }
        previous.unwrap_or(0)
    }

    if lhs == rhs {
        return;
    }

    let left = format!("{:?}", lhs);
    let right = format!("{:?}", rhs);
    let left_bytes = left.as_bytes();
    let right_bytes = right.as_bytes();

    // Number of bytes to trim from the front and from the back respectively.
    let head = shared_space_offset(left_bytes.iter().copied().zip(right_bytes.iter().copied()));
    let tail = shared_space_offset(
        left_bytes
            .iter()
            .rev()
            .copied()
            .zip(right_bytes.iter().rev().copied()),
    );

    // When the trimmed regions would overlap, fall back to the plain assertion,
    // which panics because the values are known to differ at this point.
    let trimmed = head + tail;
    if left_bytes.len() < trimmed || right_bytes.len() < trimmed {
        assert_eq!(lhs, rhs);
    }

    let left_end = left_bytes.len() - tail;
    let right_end = right_bytes.len() - tail;
    panic!(
        "config not matched:\nlhs: ...{}...,\nrhs: ...{}...",
        String::from_utf8_lossy(&left_bytes[head..left_end]),
        String::from_utf8_lossy(&right_bytes[head..right_end])
    );
}
7 changes: 6 additions & 1 deletion etc/config-template.toml
Original file line number Diff line number Diff line change
Expand Up @@ -739,8 +739,10 @@
## Target file size for compaction.
## The SST file size of level-0 is influenced by the compaction algorithm of `write-buffer-size`
## and level0. `target-file-size-base` is used to control the size of a single SST file of level1 to
## level6.
## level6. The target file size of level N (N >= 1) is `target-file-size-base * (target-file-size-multiplier ^ (N - 1))`.
# target-file-size-base = "8MB"
## In partitioned-raft-kv, the default value of target-file-size-multiplier is 2 for write and default cf.
# target-file-size-multiplier = 1

## Max bytes for `compaction.max_compaction_bytes`.
## If it's necessary to enlarge value of this entry, it's better to also enlarge `reserve-space`
Expand Down Expand Up @@ -925,6 +927,7 @@
## Recommend to set it the same as `rocksdb.defaultcf.max-bytes-for-level-base`.
# max-bytes-for-level-base = "512MB"
# target-file-size-base = "8MB"
# target-file-size-multiplier = 1

# level0-file-num-compaction-trigger = 4
# level0-slowdown-writes-trigger = 20
Expand Down Expand Up @@ -953,6 +956,7 @@
# min-write-buffer-number-to-merge = 1
# max-bytes-for-level-base = "128MB"
# target-file-size-base = "8MB"
# target-file-size-multiplier = 1
# level0-file-num-compaction-trigger = 1
# level0-slowdown-writes-trigger = 20
# level0-stop-writes-trigger = 20
Expand Down Expand Up @@ -1014,6 +1018,7 @@
## Recommend to set it the same as `rocksdb.defaultcf.max-bytes-for-level-base`.
# max-bytes-for-level-base = "512MB"
# target-file-size-base = "8MB"
# target-file-size-multiplier = 1

# level0-file-num-compaction-trigger = 4
# level0-slowdown-writes-trigger = 20
Expand Down

0 comments on commit 69dba51

Please sign in to comment.