Merge branch 'master' into fix-51547
breezewish committed Mar 11, 2024
2 parents f3d57bf + a1e6a6a commit 5a5a631
Showing 23 changed files with 547 additions and 108 deletions.
35 changes: 30 additions & 5 deletions components/engine_rocks/src/engine.rs
@@ -278,8 +278,9 @@ mod tests {
use engine_traits::{Iterable, KvEngine, Peekable, SyncMutable, CF_DEFAULT};
use kvproto::metapb::Region;
use proptest::prelude::*;
use rocksdb::{DBOptions, SeekKey, TitanDBOptions, Writable, DB};
use rocksdb::{DBOptions, FlushOptions, SeekKey, TitanDBOptions, Writable, DB};
use tempfile::Builder;
use tikv_util::config::ReadableSize;

use crate::{util, RocksSnapshot};

@@ -419,6 +420,7 @@ mod tests {
Delete(Vec<u8>),
Scan(Vec<u8>, usize),
DeleteRange(Vec<u8>, Vec<u8>),
Sync(),
}

fn gen_operations(value_size: usize) -> impl Strategy<Value = Vec<Operation>> {
@@ -444,6 +446,7 @@ mod tests {
prop::collection::vec(prop::num::u8::ANY, 0..key_size)
)
.prop_map(|(k1, k2)| Operation::DeleteRange(k1, k2)),
Just(Operation::Sync()),
],
0..100,
)
@@ -469,7 +472,11 @@ mod tests {
res
}

fn test_rocks_titan_basic_operations(operations: Vec<Operation>, min_blob_size: u64) {
fn test_rocks_titan_basic_operations(
operations: Vec<Operation>,
min_blob_size: u64,
enable_dict_compress: bool,
) {
let path_rocks = Builder::new()
.prefix("test_rocks_titan_basic_operations_rocks")
.tempdir()
@@ -480,6 +487,15 @@ mod tests {
.unwrap();
let mut tdb_opts = TitanDBOptions::new();
tdb_opts.set_min_blob_size(min_blob_size);
if enable_dict_compress {
tdb_opts.set_compression_options(
-14, // window_bits
32767, // level
0, // strategy
ReadableSize::kb(16).0 as i32, // zstd dict size
ReadableSize::kb(16).0 as i32 * 100, // zstd sample size
);
}
let mut opts = DBOptions::new();
opts.set_titandb_options(&tdb_opts);
opts.create_if_missing(true);
@@ -490,7 +506,7 @@ mod tests {
opts.create_if_missing(true);

let db_rocks = DB::open(opts, path_rocks.path().to_str().unwrap()).unwrap();

let mut flush = false;
for op in operations {
match op {
Operation::Put(k, v) => {
@@ -520,20 +536,29 @@ mod tests {
db_titan.delete_range(&k2, &k1).unwrap();
}
}
Operation::Sync() => {
if !flush {
let mut opts = FlushOptions::default();
opts.set_wait(false);
let _ = db_rocks.flush(&opts);
let _ = db_titan.flush(&opts);
flush = true;
}
}
}
}
}

proptest! {
#[test]
fn test_rocks_titan_basic_ops(operations in gen_operations(1000)) {
test_rocks_titan_basic_operations(operations.clone(), 8);
test_rocks_titan_basic_operations(operations.clone(), 8, true);
}

#[test]
fn test_rocks_titan_basic_ops_large_min_blob_size(operations in gen_operations(1000)) {
// Titan is effectively not enabled: min_blob_size (1024) exceeds any generated value size.
test_rocks_titan_basic_operations(operations, 1024);
test_rocks_titan_basic_operations(operations, 1024, false);
}
}
}
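
For readers skimming the diff: the new Operation::Sync() arm issues at most one manual, non-blocking flush on both engines mid-sequence, so later reads in the generated operation list can race an in-flight flush. Below is a minimal, self-contained sketch of that flush pattern, using only the rust-rocksdb calls that appear in the test above (the path prefix and key are illustrative):

use rocksdb::{DBOptions, FlushOptions, Writable, DB};
use tempfile::Builder;

fn main() {
    // Open a throwaway RocksDB instance, mirroring the test setup above.
    let dir = Builder::new().prefix("flush_sketch").tempdir().unwrap();
    let mut opts = DBOptions::new();
    opts.create_if_missing(true);
    let db = DB::open(opts, dir.path().to_str().unwrap()).unwrap();

    db.put(b"key", b"value").unwrap();

    // Non-blocking flush: ask RocksDB to write out the memtable but do not wait
    // for completion; this is the same FlushOptions usage as the Sync() arm above.
    let mut fopts = FlushOptions::default();
    fopts.set_wait(false);
    let _ = db.flush(&fopts);
}
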
2 changes: 1 addition & 1 deletion components/raftstore/src/store/config.rs
@@ -500,7 +500,7 @@ impl Default for Config {
local_read_batch_size: 1024,
apply_batch_system: BatchSystemConfig::default(),
store_batch_system: BatchSystemConfig::default(),
store_io_pool_size: 0,
store_io_pool_size: 1,
store_io_notify_capacity: 40960,
future_poll_size: 1,
hibernate_regions: true,
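
The only change in this file flips the default store_io_pool_size from 0 to 1, i.e. raftstore now starts one dedicated store-writer (async IO) thread by default instead of performing raft writes inline in the raftstore threads. A minimal sketch of how the new default surfaces through the config struct (the raftstore::store::Config path is assumed from the file location; the override is purely illustrative):

use raftstore::store::Config;

fn main() {
    let cfg = Config::default();
    assert_eq!(cfg.store_io_pool_size, 1); // new default: one async store-writer thread

    // Setting it back to 0 restores the previous behaviour of writing raft state
    // inline in the raftstore threads rather than through a dedicated IO pool.
    let mut legacy = Config::default();
    legacy.store_io_pool_size = 0;
    assert_eq!(legacy.store_io_pool_size, 0);
}
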
19 changes: 13 additions & 6 deletions components/raftstore/src/store/fsm/peer.rs
@@ -699,12 +699,7 @@ where
PeerMsg::UpdateReplicationMode => self.on_update_replication_mode(),
PeerMsg::Destroy(peer_id) => {
if self.fsm.peer.peer_id() == peer_id {
match self.fsm.peer.maybe_destroy(self.ctx) {
None => self.ctx.raft_metrics.message_dropped.applying_snap.inc(),
Some(job) => {
self.handle_destroy_peer(job);
}
}
self.maybe_destroy();
}
}
}
@@ -1246,6 +1241,9 @@ where
}
CasualMessage::SnapshotApplied => {
self.fsm.has_ready = true;
if self.fsm.peer.should_destroy_after_apply_snapshot() {
self.maybe_destroy();
}
}
CasualMessage::Campaign => {
let _ = self.fsm.peer.raft_group.campaign();
@@ -3637,6 +3635,15 @@ where
}
}

fn maybe_destroy(&mut self) {
match self.fsm.peer.maybe_destroy(self.ctx) {
None => self.ctx.raft_metrics.message_dropped.applying_snap.inc(),
Some(job) => {
self.handle_destroy_peer(job);
}
}
}

/// Check if destroy can be executed immediately. If it can't, the reason is
/// returned.
fn maybe_delay_destroy(&mut self) -> Option<DelayReason> {
20 changes: 19 additions & 1 deletion components/raftstore/src/store/peer.rs
@@ -469,6 +469,12 @@ pub struct ApplySnapshotContext {
/// The messages that should be sent after the snapshot is applied.
pub msgs: Vec<eraftpb::Message>,
pub persist_res: Option<PersistSnapshotResult>,
/// Destroy the peer after the apply task is finished or aborted.
/// This flag is set to true when destroying the peer is skipped because a
/// snapshot apply task is still running.
/// It accelerates peer destruction by avoiding the wait for an extra
/// destroy-peer message.
pub destroy_peer_after_apply: bool,
}

#[derive(PartialEq, Debug)]
@@ -1242,13 +1248,14 @@ where
}
}

if let Some(snap_ctx) = self.apply_snap_ctx.as_ref() {
if let Some(snap_ctx) = self.apply_snap_ctx.as_mut() {
if !snap_ctx.scheduled {
info!(
"stale peer is persisting snapshot, will destroy next time";
"region_id" => self.region_id,
"peer_id" => self.peer.get_id(),
);
snap_ctx.destroy_peer_after_apply = true;
return None;
}
}
@@ -1259,6 +1266,9 @@ where
"region_id" => self.region_id,
"peer_id" => self.peer.get_id(),
);
if let Some(snap_ctx) = self.apply_snap_ctx.as_mut() {
snap_ctx.destroy_peer_after_apply = true;
}
return None;
}

@@ -1632,6 +1642,13 @@ where
self.apply_snap_ctx.is_some() || self.get_store().is_applying_snapshot()
}

#[inline]
pub fn should_destroy_after_apply_snapshot(&self) -> bool {
self.apply_snap_ctx
.as_ref()
.map_or(false, |ctx| ctx.destroy_peer_after_apply)
}

/// Returns `true` if the raft group has replicated a snapshot but not
/// committed it yet.
#[inline]
@@ -2861,6 +2878,7 @@ where
destroy_regions,
for_witness,
}),
destroy_peer_after_apply: false,
});
if self.last_compacted_idx == 0 && last_first_index >= RAFT_INIT_LOG_INDEX {
// There may be stale logs in raft engine, so schedule a task to clean it
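
Taken together with the maybe_destroy() helper added to fsm/peer.rs above, destroy_peer_after_apply acts as a deferred-destroy marker: if a destroy request arrives while a snapshot apply task is still pending, the peer records the intent and retries the destroy as soon as CasualMessage::SnapshotApplied is handled, instead of waiting for another destroy-peer message. A simplified, self-contained sketch of that control flow (the structs below are illustrative stand-ins, not the real TiKV types, and the context handling is condensed):

struct ApplySnapshotCtx {
    scheduled: bool,
    destroy_peer_after_apply: bool,
}

struct Peer {
    apply_snap_ctx: Option<ApplySnapshotCtx>,
}

impl Peer {
    /// Loosely mirrors Peer::maybe_destroy: refuse to destroy while a snapshot
    /// apply task is pending, but remember that a destroy was requested.
    fn maybe_destroy(&mut self) -> Option<&'static str> {
        if let Some(ctx) = self.apply_snap_ctx.as_mut() {
            if !ctx.scheduled {
                ctx.destroy_peer_after_apply = true;
                return None; // destroy deferred
            }
        }
        Some("destroy job")
    }

    /// Loosely mirrors Peer::should_destroy_after_apply_snapshot.
    fn should_destroy_after_apply_snapshot(&self) -> bool {
        self.apply_snap_ctx
            .as_ref()
            .map_or(false, |ctx| ctx.destroy_peer_after_apply)
    }

    /// Loosely mirrors the CasualMessage::SnapshotApplied handling: once the
    /// apply task is done, retry the deferred destroy right away.
    fn on_snapshot_applied(&mut self) -> Option<&'static str> {
        let deferred = self.should_destroy_after_apply_snapshot();
        self.apply_snap_ctx = None; // in the real code the context is cleared elsewhere
        if deferred { self.maybe_destroy() } else { None }
    }
}

fn main() {
    let mut peer = Peer {
        apply_snap_ctx: Some(ApplySnapshotCtx {
            scheduled: false,
            destroy_peer_after_apply: false,
        }),
    };
    assert!(peer.maybe_destroy().is_none()); // deferred while the snapshot is pending
    assert!(peer.on_snapshot_applied().is_some()); // retried once the apply finishes
}
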
25 changes: 25 additions & 0 deletions components/server/src/server.rs
@@ -101,6 +101,7 @@ use tikv::{
tablet_snap::NoSnapshotCache,
ttl::TtlChecker,
KvEngineFactoryBuilder, Node, RaftKv, Server, CPU_CORES_QUOTA_GAUGE, GRPC_THREAD_PREFIX,
MEMORY_LIMIT_GAUGE,
},
storage::{
self,
@@ -166,6 +167,7 @@ fn run_impl<EK, CER, F>(
let server_config = tikv.init_servers();
tikv.register_services();
tikv.init_metrics_flusher(fetcher, engines_info);
tikv.init_cgroup_monitor();
tikv.init_storage_stats_task(engines);
tikv.run_server(server_config);
tikv.run_status_server();
@@ -259,6 +261,7 @@ pub fn run_tikv(
const DEFAULT_METRICS_FLUSH_INTERVAL: Duration = Duration::from_millis(10_000);
const DEFAULT_MEMTRACE_FLUSH_INTERVAL: Duration = Duration::from_millis(1_000);
const DEFAULT_STORAGE_STATS_INTERVAL: Duration = Duration::from_secs(1);
const DEFAULT_CGROUP_MONITOR_INTERVAL: Duration = Duration::from_secs(10);

/// A complete TiKV server.
struct TikvServer<EK, ER, F>
@@ -1503,6 +1506,28 @@ where
}
}

fn init_cgroup_monitor(&mut self) {
let mut last_cpu_quota: f64 = 0.0;
let mut last_memory_limit: u64 = 0;
self.core.background_worker.spawn_interval_task(
DEFAULT_CGROUP_MONITOR_INTERVAL,
move || {
let cpu_quota = SysQuota::cpu_cores_quota_current();
if cpu_quota != last_cpu_quota {
info!("cpu quota set to {:?}", cpu_quota);
CPU_CORES_QUOTA_GAUGE.set(cpu_quota);
last_cpu_quota = cpu_quota;
}
let memory_limit = SysQuota::memory_limit_in_bytes_current();
if memory_limit != last_memory_limit {
info!("memory limit set to {:?}", memory_limit);
MEMORY_LIMIT_GAUGE.set(memory_limit as f64);
last_memory_limit = memory_limit;
}
},
);
}

fn run_server(&mut self, server_config: Arc<VersionTrack<ServerConfig>>) {
let server = self.servers.as_mut().unwrap();
server
25 changes: 25 additions & 0 deletions components/server/src/server2.rs
@@ -93,6 +93,7 @@ use tikv::{
service::{DebugService, DiagnosticsService},
status_server::StatusServer,
KvEngineFactoryBuilder, NodeV2, RaftKv2, Server, CPU_CORES_QUOTA_GAUGE, GRPC_THREAD_PREFIX,
MEMORY_LIMIT_GAUGE,
},
storage::{
self,
@@ -151,6 +152,7 @@ fn run_impl<CER: ConfiguredRaftEngine, F: KvFormat>(
let server_config = tikv.init_servers::<F>();
tikv.register_services();
tikv.init_metrics_flusher(fetcher, engines_info);
tikv.init_cgroup_monitor();
tikv.init_storage_stats_task();
tikv.run_server(server_config);
tikv.run_status_server();
@@ -216,6 +218,7 @@ pub fn run_tikv(
const DEFAULT_METRICS_FLUSH_INTERVAL: Duration = Duration::from_millis(10_000);
const DEFAULT_MEMTRACE_FLUSH_INTERVAL: Duration = Duration::from_millis(1_000);
const DEFAULT_STORAGE_STATS_INTERVAL: Duration = Duration::from_secs(1);
const DEFAULT_CGROUP_MONITOR_INTERVAL: Duration = Duration::from_secs(10);

/// A complete TiKV server.
struct TikvServer<ER: RaftEngine> {
@@ -1278,6 +1281,28 @@ where
}
}

fn init_cgroup_monitor(&mut self) {
let mut last_cpu_quota: f64 = 0.0;
let mut last_memory_limit: u64 = 0;
self.core.background_worker.spawn_interval_task(
DEFAULT_CGROUP_MONITOR_INTERVAL,
move || {
let cpu_quota = SysQuota::cpu_cores_quota_current();
if cpu_quota != last_cpu_quota {
info!("cpu quota set to {:?}", cpu_quota);
CPU_CORES_QUOTA_GAUGE.set(cpu_quota);
last_cpu_quota = cpu_quota;
}
let memory_limit = SysQuota::memory_limit_in_bytes_current();
if memory_limit != last_memory_limit {
info!("memory limit set to {:?}", memory_limit);
MEMORY_LIMIT_GAUGE.set(memory_limit as f64);
last_memory_limit = memory_limit;
}
},
);
}

fn run_server(&mut self, server_config: Arc<VersionTrack<ServerConfig>>) {
let server = self.servers.as_mut().unwrap();
server
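
init_cgroup_monitor is added identically to the v1 (server.rs) and v2 (server2.rs) startup paths: a background task polls the current cgroup CPU and memory limits every 10 seconds and updates the Prometheus gauges only when a value actually changes, so containerized deployments pick up limit adjustments without a restart. A minimal sketch of the same poll-and-update-on-change pattern (the gauge is created locally for illustration; the real code updates CPU_CORES_QUOTA_GAUGE and MEMORY_LIMIT_GAUGE from SysQuota readings):

use prometheus::Gauge;

fn main() {
    // Stand-in gauge; assumes the prometheus crate, which TiKV already depends on.
    let memory_limit_gauge = Gauge::new("memory_limit_bytes", "Current memory limit").unwrap();

    // Pretend these are successive SysQuota::memory_limit_in_bytes_current() readings,
    // e.g. before and after a container's memory limit is lowered.
    let readings: [u64; 3] = [8 << 30, 8 << 30, 4 << 30];

    let mut last_memory_limit: u64 = 0;
    for memory_limit in readings {
        // Same shape as the closure passed to spawn_interval_task above: poll,
        // compare with the last observed value, and update the gauge only on change.
        if memory_limit != last_memory_limit {
            println!("memory limit set to {}", memory_limit);
            memory_limit_gauge.set(memory_limit as f64);
            last_memory_limit = memory_limit;
        }
    }
}
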
