
Commit

Merge branch 'master' into purelind/tools-use-fixed-version
purelind committed Apr 17, 2024
2 parents 5010651 + 2332f9f commit d62c581
Showing 28 changed files with 2,700 additions and 1,546 deletions.
17 changes: 15 additions & 2 deletions components/engine_traits/src/range_cache_engine.rs
@@ -1,6 +1,10 @@
// Copyright 2023 TiKV Project Authors. Licensed under Apache-2.0.

use std::{cmp, fmt::Debug, result};
use std::{
cmp,
fmt::{self, Debug},
result,
};

use keys::{enc_end_key, enc_start_key};
use kvproto::metapb;
@@ -49,12 +53,21 @@ pub trait RangeCacheEngine:
/// as it continues to evolve to handle eviction, using stats.
pub trait RangeHintService: Send + Sync {}

#[derive(Clone, Debug, PartialEq, Eq)]
#[derive(Clone, PartialEq, Eq)]
pub struct CacheRange {
pub start: Vec<u8>,
pub end: Vec<u8>,
}

impl Debug for CacheRange {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("CacheRange")
.field("range_start", &log_wrappers::Value(&self.start))
.field("value", &log_wrappers::Value(&self.end))
.finish()
}
}

impl CacheRange {
pub fn new(start: Vec<u8>, end: Vec<u8>) -> Self {
Self { start, end }
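
Note on the hunk above: deriving Debug would dump the raw key bytes, so the commit switches CacheRange to a manual Debug impl that routes both boundaries through log_wrappers::Value. A minimal, self-contained sketch of the same pattern follows; HexBytes is a hypothetical stand-in for log_wrappers::Value, and the second field is labeled range_end here for clarity even though the committed code labels it "value".

use std::fmt::{self, Debug};

// Hypothetical stand-in for log_wrappers::Value: render key bytes as hex so that
// non-UTF-8 range boundaries stay readable in log output.
struct HexBytes<'a>(&'a [u8]);

impl Debug for HexBytes<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        for b in self.0 {
            write!(f, "{:02X}", b)?;
        }
        Ok(())
    }
}

struct CacheRange {
    start: Vec<u8>,
    end: Vec<u8>,
}

impl Debug for CacheRange {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("CacheRange")
            .field("range_start", &HexBytes(&self.start))
            .field("range_end", &HexBytes(&self.end))
            .finish()
    }
}

fn main() {
    let r = CacheRange { start: vec![0x7A, 0x00], end: vec![0x7A, 0xFF] };
    // Prints: CacheRange { range_start: 7A00, range_end: 7AFF }
    println!("{:?}", r);
}
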
6 changes: 4 additions & 2 deletions components/hybrid_engine/src/engine.rs
@@ -75,13 +75,15 @@ where
fn snapshot(&self, ctx: Option<SnapshotContext>) -> Self::Snapshot {
let disk_snap = self.disk_engine.snapshot(ctx.clone());
let region_cache_snap = if let Some(ctx) = ctx {
SNAPSHOT_TYPE_COUNT_STATIC.range_cache_engine.inc();
match self.region_cache_engine.snapshot(
ctx.range.unwrap(),
ctx.read_ts,
disk_snap.sequence_number(),
) {
Ok(snap) => Some(snap),
Ok(snap) => {
SNAPSHOT_TYPE_COUNT_STATIC.range_cache_engine.inc();
Some(snap)
}
Err(FailedReason::TooOldRead) => {
RANGE_CACHEN_SNAPSHOT_ACQUIRE_FAILED_REASON_COUNT_STAIC
.too_old_read
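
The hunk above moves the SNAPSHOT_TYPE_COUNT_STATIC increment into the Ok arm, so the range-cache snapshot type is counted only when a snapshot is actually acquired rather than on every attempt. Below is a hedged sketch of the same count-on-success pattern using a plain prometheus counter; the metric name and the acquire_snapshot helper are illustrative, not part of the commit.

use lazy_static::lazy_static;
use prometheus::{register_int_counter, IntCounter};

lazy_static! {
    // Illustrative local counter standing in for SNAPSHOT_TYPE_COUNT_STATIC.range_cache_engine.
    static ref CACHE_SNAPSHOT_OK: IntCounter = register_int_counter!(
        "demo_range_cache_snapshot_total",
        "Snapshots successfully acquired from the range cache engine."
    )
    .unwrap();
}

// Toy acquisition that can fail, standing in for RangeCacheEngine::snapshot.
fn acquire_snapshot(available: bool) -> Result<&'static str, &'static str> {
    if available { Ok("cache-snapshot") } else { Err("too old read") }
}

fn main() {
    for available in [true, false, true] {
        // Count only successful acquisitions, mirroring the placement of `.inc()`
        // inside the Ok(..) arm in this commit.
        if let Ok(_snap) = acquire_snapshot(available) {
            CACHE_SNAPSHOT_OK.inc();
        }
    }
    println!("ok snapshots = {}", CACHE_SNAPSHOT_OK.get());
}
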
3 changes: 3 additions & 0 deletions components/raftstore/src/store/fsm/peer.rs
@@ -3791,6 +3791,9 @@ where
// Mark itself as pending_remove
self.fsm.peer.pending_remove = true;

// try to decrease the RAFT_ENABLE_UNPERSISTED_APPLY_GAUGE count.
self.fsm.peer.disable_apply_unpersisted_log(0);

fail_point!("destroy_peer_after_pending_move", |_| { true });

if let Some(reason) = self.maybe_delay_destroy() {
4 changes: 2 additions & 2 deletions components/raftstore/src/store/metrics.rs
@@ -787,8 +787,8 @@ lazy_static! {
exponential_buckets(1.0, 2.0, 20).unwrap()
).unwrap();

pub static ref RAFT_DISABLE_UNPERSISTED_APPLY_GAUGE: IntGauge = register_int_gauge!(
"tikv_raft_disable_unpersisted_apply",
pub static ref RAFT_ENABLE_UNPERSISTED_APPLY_GAUGE: IntGauge = register_int_gauge!(
"tikv_raft_enable_unpersisted_apply_regions",
"The number of regions that disable apply unpersisted raft log."
).unwrap();

12 changes: 7 additions & 5 deletions components/raftstore/src/store/peer.rs
@@ -966,7 +966,9 @@ where
pre_vote: cfg.prevote,
max_committed_size_per_ready: MAX_COMMITTED_SIZE_PER_READY,
priority: if peer.is_witness { -1 } else { 0 },
max_apply_unpersisted_log_limit: cfg.max_apply_unpersisted_log_limit,
// always disable applying unpersisted log at initialization,
// will enable it after applying to the current last_index.
max_apply_unpersisted_log_limit: 0,
..Default::default()
};

@@ -1190,7 +1192,7 @@ where
.max_apply_unpersisted_log_limit
== 0
{
RAFT_DISABLE_UNPERSISTED_APPLY_GAUGE.dec();
RAFT_ENABLE_UNPERSISTED_APPLY_GAUGE.inc();
}
self.raft_group
.raft
@@ -1200,7 +1202,7 @@
}

#[inline]
fn disable_apply_unpersisted_log(&mut self, min_enable_index: u64) {
pub fn disable_apply_unpersisted_log(&mut self, min_enable_index: u64) {
self.min_safe_index_for_unpersisted_apply =
std::cmp::max(self.min_safe_index_for_unpersisted_apply, min_enable_index);
if self
Expand All @@ -1211,7 +1213,7 @@ where
> 0
{
self.raft_group.raft.set_max_apply_unpersisted_log_limit(0);
RAFT_DISABLE_UNPERSISTED_APPLY_GAUGE.inc();
RAFT_ENABLE_UNPERSISTED_APPLY_GAUGE.dec();
}
}

@@ -5755,7 +5757,7 @@ where
!= self.max_apply_unpersisted_log_limit
{
if self.max_apply_unpersisted_log_limit == 0 {
self.raft_group.raft.set_max_apply_unpersisted_log_limit(0);
self.disable_apply_unpersisted_log(0);
} else if self.is_leader() {
// Currently only enable unpersisted apply on leader.
self.maybe_update_apply_unpersisted_log_state(
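
Taken together, the metrics.rs and peer.rs hunks above flip the gauge's meaning: the renamed RAFT_ENABLE_UNPERSISTED_APPLY_GAUGE now counts regions that currently allow applying unpersisted raft log, incremented on the zero to non-zero limit transition and decremented when the limit drops back to zero (including the new destroy-peer path). A small sketch of that transition-guarded bookkeeping follows; the Peer struct and method bodies are simplified stand-ins, not the raftstore implementation.

use lazy_static::lazy_static;
use prometheus::{register_int_gauge, IntGauge};

lazy_static! {
    // Illustrative local gauge standing in for RAFT_ENABLE_UNPERSISTED_APPLY_GAUGE.
    static ref ENABLED_REGIONS: IntGauge = register_int_gauge!(
        "demo_raft_enable_unpersisted_apply_regions",
        "Regions currently allowed to apply unpersisted raft log."
    )
    .unwrap();
}

struct Peer {
    max_apply_unpersisted_log_limit: u64,
}

impl Peer {
    // Enable path: bump the gauge only on the 0 -> non-zero transition.
    fn enable_apply_unpersisted_log(&mut self, limit: u64) {
        if self.max_apply_unpersisted_log_limit == 0 && limit > 0 {
            ENABLED_REGIONS.inc();
        }
        self.max_apply_unpersisted_log_limit = limit;
    }

    // Disable path: drop the gauge only on the non-zero -> 0 transition, which is
    // why destroy_peer can call it unconditionally without double counting.
    fn disable_apply_unpersisted_log(&mut self) {
        if self.max_apply_unpersisted_log_limit > 0 {
            ENABLED_REGIONS.dec();
        }
        self.max_apply_unpersisted_log_limit = 0;
    }
}

fn main() {
    let mut peer = Peer { max_apply_unpersisted_log_limit: 0 };
    peer.enable_apply_unpersisted_log(1024);
    peer.disable_apply_unpersisted_log();
    assert_eq!(ENABLED_REGIONS.get(), 0);
}
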
49 changes: 43 additions & 6 deletions components/region_cache_memory_engine/src/background.rs
@@ -15,8 +15,10 @@ use parking_lot::RwLock;
use pd_client::RpcClient;
use slog_global::{error, info, warn};
use tikv_util::{
config::ReadableSize,
keybuilder::KeyBuilder,
worker::{Builder, Runnable, ScheduleError, Scheduler, Worker},
time::Instant,
worker::{Builder, Runnable, RunnableWithTimer, ScheduleError, Scheduler, Worker},
};
use txn_types::{Key, TimeStamp, WriteRef, WriteType};
use yatp::Remote;
@@ -25,7 +27,10 @@ use crate::{
engine::{RangeCacheMemoryEngineCore, SkiplistHandle},
keys::{decode_key, encode_key, encoding_for_filter, InternalBytes, InternalKey, ValueType},
memory_controller::MemoryController,
metrics::GC_FILTERED_STATIC,
metrics::{
GC_FILTERED_STATIC, RANGE_CACHE_MEMORY_USAGE, RANGE_GC_TIME_HISTOGRAM,
RANGE_LOAD_TIME_HISTOGRAM,
},
range_manager::LoadFailedReason,
region_label::{
LabelRule, RegionLabelAddedCb, RegionLabelRulesManager, RegionLabelServiceBuilder,
@@ -187,7 +192,7 @@ impl BgWorkManager {
) -> Self {
let worker = Worker::new("range-cache-background-worker");
let runner = BackgroundRunner::new(core.clone(), memory_controller);
let scheduler = worker.start("range-cache-engine-background", runner);
let scheduler = worker.start_with_timer("range-cache-engine-background", runner);

let scheduler_clone = scheduler.clone();

@@ -314,6 +319,7 @@ impl BackgroundRunnerCore {
(core.engine(), safe_point)
};

let start = Instant::now();
let write_cf_handle = skiplist_engine.cf_handle(CF_WRITE);
let default_cf_handle = skiplist_engine.cf_handle(CF_DEFAULT);
let mut filter = Filter::new(safe_ts, default_cf_handle, write_cf_handle.clone());
@@ -333,9 +339,12 @@
iter.next(guard);
}

let duration = start.saturating_elapsed();
RANGE_GC_TIME_HISTOGRAM.observe(duration.as_secs_f64());
info!(
"range gc complete";
"range" => ?range,
"gc_duration" => ?duration,
"total_version" => filter.metrics.total,
"filtered_version" => filter.metrics.filtered,
"below_safe_point_unique_keys" => filter.metrics.unique_key,
@@ -364,7 +373,8 @@ impl BackgroundRunnerCore {
.cloned()
}

fn on_snapshot_load_finished(&mut self, range: CacheRange) {
// if `false` is returned, the load is canceled
fn on_snapshot_load_finished(&mut self, range: CacheRange) -> bool {
fail::fail_point!("on_snapshot_load_finished");
loop {
// Consume the cached write batch after the snapshot is acquired.
@@ -386,7 +396,7 @@
drop(core);
// Clear the range directly here to quickly free the memory.
self.delete_ranges(&[r]);
return;
return false;
}

if core.has_cached_write_batch(&range) {
@@ -414,6 +424,7 @@ impl BackgroundRunnerCore {
break;
}
}
true
}

fn on_snapshot_load_canceled(&mut self, range: CacheRange) {
@@ -539,6 +550,7 @@ impl Runnable for BackgroundRunner {
core.on_snapshot_load_canceled(range);
continue;
}
let start = Instant::now();
for &cf in DATA_CFS {
let guard = &epoch::pin();
let handle = skiplist_engine.cf_handle(cf);
@@ -564,13 +576,27 @@
}
}
}
core.on_snapshot_load_finished(range);
if core.on_snapshot_load_finished(range.clone()) {
let duration = start.saturating_elapsed();
RANGE_LOAD_TIME_HISTOGRAM.observe(duration.as_secs_f64());
info!(
"Loading range finished";
"range" => ?range,
"duration(sec)" => ?duration,
);
} else {
info!("Loading range canceled";"range" => ?range);
}
}
};
self.range_load_remote.spawn(f);
}
BackgroundTask::MemoryCheckAndEvict => {
let mem_usage = self.core.memory_controller.mem_usage();
info!(
"start memory usage check and evict";
"mem_usage(MB)" => ReadableSize(mem_usage as u64).as_mb()
);
if mem_usage > self.core.memory_controller.soft_limit_threshold() {
// todo: select ranges to evict
}
@@ -585,6 +611,17 @@
}
}

impl RunnableWithTimer for BackgroundRunner {
fn on_timeout(&mut self) {
let mem_usage = self.core.memory_controller.mem_usage();
RANGE_CACHE_MEMORY_USAGE.set(mem_usage as i64);
}

fn get_interval(&self) -> Duration {
Duration::from_secs(10)
}
}

#[derive(Default)]
struct FilterMetrics {
total: usize,
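
Two timing-related additions run through this file: the GC and snapshot-load paths now measure their duration with Instant and feed the new histograms, and BackgroundRunner gains a RunnableWithTimer impl so the worker started via start_with_timer exports the memory controller's usage into RANGE_CACHE_MEMORY_USAGE every 10 seconds. The sketch below imitates that timer-driven reporting with a local trait and a toy driver; it is an assumption-level stand-in, not the tikv_util::worker implementation.

use std::time::Duration;

// Local sketch of the RunnableWithTimer idea (the real trait lives in tikv_util::worker):
// the worker calls on_timeout every get_interval.
trait RunnableWithTimer {
    fn on_timeout(&mut self);
    fn get_interval(&self) -> Duration;
}

struct MemoryUsageReporter {
    // Hypothetical stand-in for MemoryController::mem_usage().
    mem_usage: fn() -> usize,
}

impl RunnableWithTimer for MemoryUsageReporter {
    fn on_timeout(&mut self) {
        // In the commit this value is written to the RANGE_CACHE_MEMORY_USAGE gauge.
        println!("range cache memory usage = {} bytes", (self.mem_usage)());
    }

    fn get_interval(&self) -> Duration {
        Duration::from_secs(10)
    }
}

// Toy driver standing in for Worker::start_with_timer: tick a fixed number of times
// so the example terminates instead of looping forever.
fn drive<R: RunnableWithTimer>(mut runner: R, ticks: u32) {
    for _ in 0..ticks {
        std::thread::sleep(runner.get_interval());
        runner.on_timeout();
    }
}

fn main() {
    drive(MemoryUsageReporter { mem_usage: || 42 * 1024 * 1024 }, 1);
}
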
2 changes: 2 additions & 0 deletions components/region_cache_memory_engine/src/engine.rs
@@ -16,6 +16,7 @@ use engine_traits::{
use parking_lot::{lock_api::RwLockUpgradableReadGuard, RwLock, RwLockWriteGuard};
use skiplist_rs::{base::OwnedIter, SkipList};
use slog_global::error;
use tikv_util::info;

use crate::{
background::{BackgroundTask, BgWorkManager, PdRangeHintService},
@@ -206,6 +207,7 @@ pub struct RangeCacheMemoryEngine {

impl RangeCacheMemoryEngine {
pub fn new(config: &RangeCacheEngineConfig) -> Self {
info!("init range cache memory engine";);
let core = Arc::new(RwLock::new(RangeCacheMemoryEngineCore::new()));
let skiplist_engine = { core.read().engine().clone() };

27 changes: 25 additions & 2 deletions components/region_cache_memory_engine/src/metrics.rs
@@ -1,8 +1,8 @@
// Copyright 2024 TiKV Project Authors. Licensed under Apache-2.0.

use lazy_static::lazy_static;
use prometheus::{register_int_counter_vec, IntCounterVec};
use prometheus_static_metric::{auto_flush_from, make_auto_flush_static_metric};
use prometheus::*;
use prometheus_static_metric::*;

make_auto_flush_static_metric! {
pub label_enum KeyCountType {
@@ -24,6 +24,29 @@ lazy_static! {
&["type"]
)
.unwrap();
pub static ref RANGE_CACHE_MEMORY_USAGE: IntGauge = register_int_gauge!(
"tikv_range_cache_memory_usage_bytes",
"The memory usage of the range cache engine",
)
.unwrap();
pub static ref RANGE_LOAD_TIME_HISTOGRAM: Histogram = register_histogram!(
"tikv_range_load_duration_secs",
"Bucketed histogram of range load time duration.",
exponential_buckets(0.001, 2.0, 20).unwrap()
)
.unwrap();
pub static ref RANGE_GC_TIME_HISTOGRAM: Histogram = register_histogram!(
"tikv_range_gc_duration_secs",
"Bucketed histogram of range gc time duration.",
exponential_buckets(0.001, 2.0, 20).unwrap()
)
.unwrap();
pub static ref WRITE_DURATION_HISTOGRAM: Histogram = register_histogram!(
"tikv_range_cache_engine_write_duration_seconds",
"Bucketed histogram of write duration in range cache engine.",
exponential_buckets(0.00001, 2.0, 20).unwrap()
)
.unwrap();
}

lazy_static! {
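
The new metrics follow the usual prometheus pattern: register once inside a lazy_static! block, then observe elapsed seconds at the call site. Below is a minimal sketch with the same bucket layout as the histograms added above (1 ms start, doubling, 20 buckets); the metric name here is illustrative rather than one of the names registered in this file.

use std::time::{Duration, Instant};

use lazy_static::lazy_static;
use prometheus::{exponential_buckets, register_histogram, Histogram};

lazy_static! {
    static ref DEMO_LOAD_HISTOGRAM: Histogram = register_histogram!(
        "demo_range_load_duration_secs",
        "Bucketed histogram of range load time duration.",
        exponential_buckets(0.001, 2.0, 20).unwrap()
    )
    .unwrap();
}

fn main() {
    let start = Instant::now();
    // ... load a range here; a short sleep stands in for the real work ...
    std::thread::sleep(Duration::from_millis(5));
    DEMO_LOAD_HISTOGRAM.observe(start.elapsed().as_secs_f64());
    println!("samples recorded = {}", DEMO_LOAD_HISTOGRAM.get_sample_count());
}
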
6 changes: 6 additions & 0 deletions components/region_cache_memory_engine/src/range_manager.rs
@@ -310,6 +310,12 @@ impl RangeManager {
return false;
};

info!(
"evict range in cache range engine";
"range_start" => log_wrappers::Value(&evict_range.start),
"range_end" => log_wrappers::Value(&evict_range.end),
);

let meta = self.ranges.remove(&range_key).unwrap();
let (left_range, right_range) = range_key.split_off(evict_range);
assert!((left_range.is_some() || right_range.is_some()) || &range_key == evict_range);
22 changes: 13 additions & 9 deletions components/region_cache_memory_engine/src/write_batch.rs
@@ -5,13 +5,14 @@ use crossbeam::epoch;
use engine_traits::{
CacheRange, Mutable, Result, WriteBatch, WriteBatchExt, WriteOptions, CF_DEFAULT,
};
use tikv_util::{box_err, config::ReadableSize, error, warn};
use tikv_util::{box_err, config::ReadableSize, error, info, time::Instant, warn};

use crate::{
background::BackgroundTask,
engine::{cf_to_id, SkiplistEngine},
keys::{encode_key, InternalBytes, ValueType, ENC_KEY_SEQ_LENGTH},
memory_controller::{MemoryController, MemoryUsage},
metrics::WRITE_DURATION_HISTOGRAM,
range_manager::{RangeCacheStatus, RangeManager},
RangeCacheMemoryEngine,
};
@@ -109,6 +110,7 @@ impl RangeCacheWriteBatch {
std::mem::take(&mut self.pending_range_in_loading_buffer),
);
let guard = &epoch::pin();
let start = Instant::now();
// Some entries whose ranges may be marked as evicted above, but it does not
// matter, they will be deleted later.
let res = entries_to_write
@@ -117,6 +119,8 @@
.try_for_each(|e| {
e.write_to_memory(seq, &engine, self.memory_controller.clone(), guard)
});
let duration = start.saturating_elapsed_secs();
WRITE_DURATION_HISTOGRAM.observe(duration);

if !ranges_to_delete.is_empty() {
if let Err(e) = self
@@ -181,8 +185,13 @@

let memory_expect = entry_size();
if !self.memory_acquire(memory_expect) {
self.ranges_to_evict
.insert(self.current_range.clone().unwrap());
let range = self.current_range.clone().unwrap();
info!(
"memory acquire failed due to reaching hard limit";
"range_start" => log_wrappers::Value(&range.start),
"range_end" => log_wrappers::Value(&range.end),
);
self.ranges_to_evict.insert(range);
return;
}

@@ -229,12 +238,7 @@ impl RangeCacheWriteBatch {
self.schedule_memory_check();
return false;
}
MemoryUsage::SoftLimitReached(n) => {
warn!(
"the memory usage of in-memory engine reaches to soft limit";
"memory_usage(MB)" => ReadableSize(n as u64).as_mb_f64(),
"memory_acquire(MB)" => ReadableSize(mem_required as u64).as_mb_f64(),
);
MemoryUsage::SoftLimitReached(_) => {
self.schedule_memory_check();
}
_ => {}
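
Besides the new WRITE_DURATION_HISTOGRAM timing, the write-batch hunks adjust the memory-limit handling: the hard-limit path still records the current range for eviction but now also logs which range hit the limit before giving up on the write, while the soft-limit path drops the per-write warning and only schedules a background memory check. The following is a minimal sketch of that decision flow; MemoryController, MemoryUsage, and the thresholds are simplified stand-ins for the real memory_controller module.

// Simplified stand-ins for the engine's memory accounting types.
#[derive(Debug)]
enum MemoryUsage {
    Normal(usize),
    SoftLimitReached(usize),
    HardLimitReached(usize),
}

struct MemoryController {
    usage: usize,
    soft_limit: usize,
    hard_limit: usize,
}

impl MemoryController {
    fn acquire(&mut self, bytes: usize) -> MemoryUsage {
        let new = self.usage + bytes;
        if new > self.hard_limit {
            // Do not account the bytes; the caller must not perform the write.
            return MemoryUsage::HardLimitReached(self.usage);
        }
        self.usage = new;
        if new > self.soft_limit {
            MemoryUsage::SoftLimitReached(new)
        } else {
            MemoryUsage::Normal(new)
        }
    }
}

// Stand-in for scheduling BackgroundTask::MemoryCheckAndEvict.
fn schedule_memory_check() {}

fn memory_acquire(ctl: &mut MemoryController, bytes: usize) -> bool {
    match ctl.acquire(bytes) {
        MemoryUsage::HardLimitReached(_) => {
            // The caller then evicts the current range and logs, as in the hunk above.
            schedule_memory_check();
            false
        }
        MemoryUsage::SoftLimitReached(_) => {
            // Soft limit only triggers a background check; the write still proceeds.
            schedule_memory_check();
            true
        }
        MemoryUsage::Normal(_) => true,
    }
}

fn main() {
    let mut ctl = MemoryController { usage: 0, soft_limit: 8, hard_limit: 16 };
    assert!(memory_acquire(&mut ctl, 6)); // within limits
    assert!(memory_acquire(&mut ctl, 6)); // soft limit exceeded, still succeeds
    assert!(!memory_acquire(&mut ctl, 16)); // hard limit would be exceeded, write gives up
}
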
