Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

raftstore-v2: reduce file count #14318

Merged
merged 7 commits into from
Mar 2, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 32 additions & 1 deletion components/raftstore-v2/src/fsm/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::borrow::Cow;
use batch_system::{BasicMailbox, Fsm};
use crossbeam::channel::TryRecvError;
use engine_traits::{KvEngine, RaftEngine, TabletRegistry};
use kvproto::{errorpb, raft_cmdpb::RaftCmdResponse};
use raftstore::store::{Config, TabletSnapManager, Transport};
use slog::{debug, error, info, trace, Logger};
use tikv_util::{
Expand All @@ -18,7 +19,7 @@ use tikv_util::{
use crate::{
batch::StoreContext,
raft::{Peer, Storage},
router::{PeerMsg, PeerTick},
router::{PeerMsg, PeerTick, QueryResult},
Result,
};

Expand Down Expand Up @@ -335,3 +336,33 @@ impl<'a, EK: KvEngine, ER: RaftEngine, T: Transport> PeerFsmDelegate<'a, EK, ER,
self.schedule_pending_ticks();
}
}

impl<EK: KvEngine, ER: RaftEngine> Drop for PeerFsm<EK, ER> {
fn drop(&mut self) {
self.peer_mut().pending_reads_mut().clear_all(None);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

self.peer_mut().pending_reads_mut().clear_all(None);

Should it solve the "resource leak" panic found by QA?


let region_id = self.peer().region_id();

let build_resp = || {
let mut err = errorpb::Error::default();
err.set_message("region is not found".to_owned());
err.mut_region_not_found().set_region_id(region_id);
let mut resp = RaftCmdResponse::default();
resp.mut_header().set_error(err);
resp
};
while let Ok(msg) = self.receiver.try_recv() {
match msg {
// Only these messages need to be responded explicitly as they rely on
// deterministic response.
PeerMsg::RaftQuery(query) => {
query.ch.set_result(QueryResult::Response(build_resp()));
}
PeerMsg::SimpleWrite(w) => {
w.ch.set_result(build_resp());
}
_ => continue,
}
}
}
}
5 changes: 4 additions & 1 deletion components/raftstore-v2/src/operation/life.rs
Original file line number Diff line number Diff line change
Expand Up @@ -618,18 +618,21 @@ impl<EK: KvEngine, ER: RaftEngine> Peer<EK, ER> {
pub fn finish_destroy<T>(&mut self, ctx: &mut StoreContext<EK, ER, T>) {
info!(self.logger, "peer destroyed");
let region_id = self.region_id();
ctx.router.close(region_id);
{
let mut meta = ctx.store_meta.lock().unwrap();
meta.remove_region(region_id);
meta.readers.remove(&region_id);
ctx.tablet_registry.remove(region_id);
}
// Remove tablet first, otherwise in extreme cases, a new peer can be created
// and race on tablet record removal and creation.
ctx.router.close(region_id);
if let Some(msg) = self.destroy_progress_mut().finish() {
// The message will be dispatched to store fsm, which will create a
// new peer. Ignore error as it's just a best effort.
let _ = ctx.router.send_raft_message(msg);
}
self.pending_reads_mut().clear_all(Some(region_id));
self.clear_apply_scheduler();
}
}
36 changes: 36 additions & 0 deletions components/test_util/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ mod security;

use std::{
env,
fmt::Debug,
sync::atomic::{AtomicU16, Ordering},
thread,
};
Expand Down Expand Up @@ -118,3 +119,38 @@ pub fn temp_dir(prefix: impl Into<Option<&'static str>>, prefer_mem: bool) -> te
_ => builder.tempdir().unwrap(),
}
}

/// Compare two structs and provide more helpful debug difference.
#[track_caller]
pub fn assert_eq_debug<C: PartialEq + Debug>(lhs: &C, rhs: &C) {
if lhs == rhs {
return;
}
let lhs_str = format!("{:?}", lhs);
let rhs_str = format!("{:?}", rhs);

fn find_index(l: impl Iterator<Item = (u8, u8)>) -> usize {
let it = l
.enumerate()
.take_while(|(_, (l, r))| l == r)
.filter(|(_, (l, _))| *l == b' ');
let mut last = None;
let mut second = None;
for a in it {
second = last;
last = Some(a);
}
second.map_or(0, |(i, _)| i)
}
let cpl = find_index(lhs_str.bytes().zip(rhs_str.bytes()));
let csl = find_index(lhs_str.bytes().rev().zip(rhs_str.bytes().rev()));
if cpl + csl > lhs_str.len() || cpl + csl > rhs_str.len() {
assert_eq!(lhs, rhs);
}
let lhs_diff = String::from_utf8_lossy(&lhs_str.as_bytes()[cpl..lhs_str.len() - csl]);
let rhs_diff = String::from_utf8_lossy(&rhs_str.as_bytes()[cpl..rhs_str.len() - csl]);
panic!(
"config not matched:\nlhs: ...{}...,\nrhs: ...{}...",
lhs_diff, rhs_diff
);
}
7 changes: 6 additions & 1 deletion etc/config-template.toml
Original file line number Diff line number Diff line change
Expand Up @@ -739,8 +739,10 @@
## Target file size for compaction.
## The SST file size of level-0 is influenced by the compaction algorithm of `write-buffer-size`
## and level0. `target-file-size-base` is used to control the size of a single SST file of level1 to
## level6.
## level6. Each level will have `target-file-size-base * (target-file-size-multiplier ^ (level - 1))`.
# target-file-size-base = "8MB"
## In partitioned-raft-kv, the default value of target-file-size-multiplier is 2 for write and default cf.
# target-file-size-multiplier = 1

## Max bytes for `compaction.max_compaction_bytes`.
## If it's necessary to enlarge value of this entry, it's better to also enlarge `reserve-space`
Expand Down Expand Up @@ -925,6 +927,7 @@
## Recommend to set it the same as `rocksdb.defaultcf.max-bytes-for-level-base`.
# max-bytes-for-level-base = "512MB"
# target-file-size-base = "8MB"
# target-file-size-multiplier = 1

# level0-file-num-compaction-trigger = 4
# level0-slowdown-writes-trigger = 20
Expand Down Expand Up @@ -953,6 +956,7 @@
# min-write-buffer-number-to-merge = 1
# max-bytes-for-level-base = "128MB"
# target-file-size-base = "8MB"
# target-file-size-multiplier = 1
# level0-file-num-compaction-trigger = 1
# level0-slowdown-writes-trigger = 20
# level0-stop-writes-trigger = 20
Expand Down Expand Up @@ -1014,6 +1018,7 @@
## Recommend to set it the same as `rocksdb.defaultcf.max-bytes-for-level-base`.
# max-bytes-for-level-base = "512MB"
# target-file-size-base = "8MB"
# target-file-size-multiplier = 1

# level0-file-num-compaction-trigger = 4
# level0-slowdown-writes-trigger = 20
Expand Down
50 changes: 37 additions & 13 deletions src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,7 @@ macro_rules! cf_config {
pub min_write_buffer_number_to_merge: i32,
pub max_bytes_for_level_base: ReadableSize,
pub target_file_size_base: ReadableSize,
pub target_file_size_multiplier: i32,
pub level0_file_num_compaction_trigger: i32,
pub level0_slowdown_writes_trigger: Option<i32>,
pub level0_stop_writes_trigger: Option<i32>,
Expand Down Expand Up @@ -572,6 +573,9 @@ macro_rules! build_cf_opt {
cf_opts.set_min_write_buffer_number_to_merge($opt.min_write_buffer_number_to_merge);
cf_opts.set_max_bytes_for_level_base($opt.max_bytes_for_level_base.0);
cf_opts.set_target_file_size_base($opt.target_file_size_base.0);
if $opt.target_file_size_multiplier != 0 {
cf_opts.set_target_file_size_multiplier($opt.target_file_size_multiplier);
}
cf_opts.set_level_zero_file_num_compaction_trigger($opt.level0_file_num_compaction_trigger);
cf_opts.set_level_zero_slowdown_writes_trigger(
$opt.level0_slowdown_writes_trigger.unwrap_or_default(),
Expand Down Expand Up @@ -659,6 +663,7 @@ impl Default for DefaultCfConfig {
min_write_buffer_number_to_merge: 1,
max_bytes_for_level_base: ReadableSize::mb(512),
target_file_size_base: ReadableSize::mb(8),
target_file_size_multiplier: 0,
level0_file_num_compaction_trigger: 4,
level0_slowdown_writes_trigger: None,
level0_stop_writes_trigger: None,
Expand Down Expand Up @@ -784,6 +789,7 @@ impl Default for WriteCfConfig {
min_write_buffer_number_to_merge: 1,
max_bytes_for_level_base: ReadableSize::mb(512),
target_file_size_base: ReadableSize::mb(8),
target_file_size_multiplier: 0,
level0_file_num_compaction_trigger: 4,
level0_slowdown_writes_trigger: None,
level0_stop_writes_trigger: None,
Expand Down Expand Up @@ -890,6 +896,7 @@ impl Default for LockCfConfig {
min_write_buffer_number_to_merge: 1,
max_bytes_for_level_base: ReadableSize::mb(128),
target_file_size_base: ReadableSize::mb(8),
target_file_size_multiplier: 0,
level0_file_num_compaction_trigger: 1,
level0_slowdown_writes_trigger: None,
level0_stop_writes_trigger: None,
Expand Down Expand Up @@ -973,6 +980,7 @@ impl Default for RaftCfConfig {
min_write_buffer_number_to_merge: 1,
max_bytes_for_level_base: ReadableSize::mb(128),
target_file_size_base: ReadableSize::mb(8),
target_file_size_multiplier: 0,
level0_file_num_compaction_trigger: 1,
level0_slowdown_writes_trigger: None,
level0_stop_writes_trigger: None,
Expand Down Expand Up @@ -1233,6 +1241,16 @@ impl DbConfig {
self.write_buffer_limit.get_or_insert(ReadableSize(
(total_mem * WRITE_BUFFER_MEMORY_LIMIT_RATE) as u64,
));
if self.writecf.enable_compaction_guard != Some(true)
&& self.writecf.target_file_size_multiplier == 0
{
self.writecf.target_file_size_multiplier = 2;
}
if self.defaultcf.enable_compaction_guard != Some(true)
&& self.defaultcf.target_file_size_multiplier == 0
{
self.defaultcf.target_file_size_multiplier = 2;
}
self.defaultcf.disable_write_stall = true;
self.writecf.disable_write_stall = true;
self.lockcf.disable_write_stall = true;
Expand Down Expand Up @@ -1475,6 +1493,7 @@ impl Default for RaftDefaultCfConfig {
min_write_buffer_number_to_merge: 1,
max_bytes_for_level_base: ReadableSize::mb(512),
target_file_size_base: ReadableSize::mb(8),
target_file_size_multiplier: 0,
level0_file_num_compaction_trigger: 4,
level0_slowdown_writes_trigger: None,
level0_stop_writes_trigger: None,
Expand Down Expand Up @@ -4182,6 +4201,7 @@ mod tests {
};
use slog::Level;
use tempfile::Builder;
use test_util::assert_eq_debug;
use tikv_kv::RocksEngine as RocksDBEngine;
use tikv_util::{
config::VersionTrack,
Expand Down Expand Up @@ -5001,25 +5021,25 @@ mod tests {
Module::Quota,
Box::new(QuotaLimitConfigManager::new(Arc::clone(&quota_limiter))),
);
assert_eq!(cfg_controller.get_current(), cfg);
assert_eq_debug(&cfg_controller.get_current(), &cfg);

// u64::MAX ns casts to 213503d.
cfg_controller
.update_config("quota.max-delay-duration", "213504d")
.unwrap_err();
assert_eq!(cfg_controller.get_current(), cfg);
assert_eq_debug(&cfg_controller.get_current(), &cfg);

cfg_controller
.update_config("quota.foreground-cpu-time", "2000")
.unwrap();
cfg.quota.foreground_cpu_time = 2000;
assert_eq!(cfg_controller.get_current(), cfg);
assert_eq_debug(&cfg_controller.get_current(), &cfg);

cfg_controller
.update_config("quota.foreground-write-bandwidth", "256MB")
.unwrap();
cfg.quota.foreground_write_bandwidth = ReadableSize::mb(256);
assert_eq!(cfg_controller.get_current(), cfg);
assert_eq_debug(&cfg_controller.get_current(), &cfg);

let mut sample = quota_limiter.new_sample(true);
sample.add_read_bytes(ReadableSize::mb(32).0 as usize);
Expand All @@ -5040,13 +5060,13 @@ mod tests {
.update_config("quota.background-cpu-time", "2000")
.unwrap();
cfg.quota.background_cpu_time = 2000;
assert_eq!(cfg_controller.get_current(), cfg);
assert_eq_debug(&cfg_controller.get_current(), &cfg);

cfg_controller
.update_config("quota.background-write-bandwidth", "256MB")
.unwrap();
cfg.quota.background_write_bandwidth = ReadableSize::mb(256);
assert_eq!(cfg_controller.get_current(), cfg);
assert_eq_debug(&cfg_controller.get_current(), &cfg);

let mut sample = quota_limiter.new_sample(false);
sample.add_read_bytes(ReadableSize::mb(32).0 as usize);
Expand All @@ -5057,7 +5077,7 @@ mod tests {
.update_config("quota.background-read-bandwidth", "512MB")
.unwrap();
cfg.quota.background_read_bandwidth = ReadableSize::mb(512);
assert_eq!(cfg_controller.get_current(), cfg);
assert_eq_debug(&cfg_controller.get_current(), &cfg);
let mut sample = quota_limiter.new_sample(false);
sample.add_write_bytes(ReadableSize::mb(128).0 as usize);
let should_delay = block_on(quota_limiter.consume_sample(sample, false));
Expand All @@ -5067,7 +5087,7 @@ mod tests {
.update_config("quota.max-delay-duration", "50ms")
.unwrap();
cfg.quota.max_delay_duration = ReadableDuration::millis(50);
assert_eq!(cfg_controller.get_current(), cfg);
assert_eq_debug(&cfg_controller.get_current(), &cfg);
let mut sample = quota_limiter.new_sample(true);
sample.add_write_bytes(ReadableSize::mb(128).0 as usize);
let should_delay = block_on(quota_limiter.consume_sample(sample, true));
Expand All @@ -5083,7 +5103,7 @@ mod tests {
.update_config("quota.enable-auto-tune", "true")
.unwrap();
cfg.quota.enable_auto_tune = true;
assert_eq!(cfg_controller.get_current(), cfg);
assert_eq_debug(&cfg_controller.get_current(), &cfg);
}

#[test]
Expand All @@ -5103,7 +5123,7 @@ mod tests {
);

let check_cfg = |cfg: &TikvConfig| {
assert_eq!(&cfg_controller.get_current(), cfg);
assert_eq_debug(&cfg_controller.get_current(), cfg);
assert_eq!(&*version_tracker.value(), &cfg.server);
};

Expand All @@ -5117,7 +5137,7 @@ mod tests {
.update_config("server.raft-msg-max-batch-size", "32")
.unwrap();
cfg.server.raft_msg_max_batch_size = 32;
assert_eq!(cfg_controller.get_current(), cfg);
assert_eq_debug(&cfg_controller.get_current(), &cfg);
check_cfg(&cfg);
}

Expand All @@ -5133,7 +5153,7 @@ mod tests {
for _ in 0..10 {
cfg.compatible_adjust();
cfg.validate().unwrap();
assert_eq!(c, cfg);
assert_eq_debug(&c, &cfg);
}
}

Expand Down Expand Up @@ -5552,6 +5572,10 @@ mod tests {
Some(default_cfg.coprocessor.region_split_size() * 3 / 4 / ReadableSize::kb(1));
default_cfg.raft_store.region_split_check_diff =
Some(default_cfg.coprocessor.region_split_size() / 16);
default_cfg.rocksdb.writecf.target_file_size_multiplier = 1;
default_cfg.rocksdb.defaultcf.target_file_size_multiplier = 1;
default_cfg.rocksdb.lockcf.target_file_size_multiplier = 1;
default_cfg.raftdb.defaultcf.target_file_size_multiplier = 1;

// Other special cases.
cfg.pd.retry_max_count = default_cfg.pd.retry_max_count; // Both -1 and isize::MAX are the same.
Expand Down Expand Up @@ -5588,7 +5612,7 @@ mod tests {
cfg.coprocessor
.optimize_for(default_cfg.storage.engine == EngineType::RaftKv2);

assert_eq!(cfg, default_cfg);
assert_eq_debug(&cfg, &default_cfg);
}

#[test]
Expand Down