Skip to content

Commit

Permalink
Merge branch 'master' into patch-1
Browse files Browse the repository at this point in the history
  • Loading branch information
sticnarf committed Sep 10, 2021
2 parents 89a7784 + 12c9d65 commit d843a6a
Show file tree
Hide file tree
Showing 24 changed files with 365 additions and 121 deletions.
22 changes: 11 additions & 11 deletions components/raftstore/src/store/memory.rs
Expand Up @@ -5,12 +5,12 @@ use lazy_static::lazy_static;
use std::sync::Arc;
use tikv_alloc::{
mem_trace,
trace::{Id, MemoryTrace, MemoryTraceNode},
trace::{Id, MemoryTrace},
};
use tikv_util::sys::memory_usage_reaches_high_water;

lazy_static! {
pub static ref MEMTRACE_ROOT: Arc<MemoryTraceNode> = mem_trace!(
pub static ref MEMTRACE_ROOT: Arc<MemoryTrace> = mem_trace!(
raftstore,
[
peers,
Expand All @@ -23,35 +23,35 @@ lazy_static! {
]
);
/// Memory usage for raft peers fsms.
pub static ref MEMTRACE_PEERS: Arc<dyn MemoryTrace + Send + Sync> =
pub static ref MEMTRACE_PEERS: Arc<MemoryTrace> =
MEMTRACE_ROOT.sub_trace(Id::Name("peers"));

/// Memory usage for apply fsms.
pub static ref MEMTRACE_APPLYS: Arc<dyn MemoryTrace + Send + Sync> =
pub static ref MEMTRACE_APPLYS: Arc<MemoryTrace> =
MEMTRACE_ROOT.sub_trace(Id::Name("applys"));

pub static ref MEMTRACE_ENTRY_CACHE: Arc<dyn MemoryTrace + Send + Sync> =
pub static ref MEMTRACE_ENTRY_CACHE: Arc<MemoryTrace> =
MEMTRACE_ROOT.sub_trace(Id::Name("entry_cache"));

pub static ref MEMTRACE_RAFT_ROUTER_ALIVE: Arc<dyn MemoryTrace + Send + Sync> = MEMTRACE_ROOT
pub static ref MEMTRACE_RAFT_ROUTER_ALIVE: Arc<MemoryTrace> = MEMTRACE_ROOT
.sub_trace(Id::Name("raft_router"))
.sub_trace(Id::Name("alive"));
pub static ref MEMTRACE_RAFT_ROUTER_LEAK: Arc<dyn MemoryTrace + Send + Sync> = MEMTRACE_ROOT
pub static ref MEMTRACE_RAFT_ROUTER_LEAK: Arc<MemoryTrace> = MEMTRACE_ROOT
.sub_trace(Id::Name("raft_router"))
.sub_trace(Id::Name("leak"));
pub static ref MEMTRACE_APPLY_ROUTER_ALIVE: Arc<dyn MemoryTrace + Send + Sync> = MEMTRACE_ROOT
pub static ref MEMTRACE_APPLY_ROUTER_ALIVE: Arc<MemoryTrace> = MEMTRACE_ROOT
.sub_trace(Id::Name("apply_router"))
.sub_trace(Id::Name("alive"));
pub static ref MEMTRACE_APPLY_ROUTER_LEAK: Arc<dyn MemoryTrace + Send + Sync> = MEMTRACE_ROOT
pub static ref MEMTRACE_APPLY_ROUTER_LEAK: Arc<MemoryTrace> = MEMTRACE_ROOT
.sub_trace(Id::Name("apply_router"))
.sub_trace(Id::Name("leak"));

/// Heap size trace for received raft messages.
pub static ref MEMTRACE_RAFT_MESSAGES: Arc<dyn MemoryTrace + Send + Sync> =
pub static ref MEMTRACE_RAFT_MESSAGES: Arc<MemoryTrace> =
MEMTRACE_ROOT.sub_trace(Id::Name("raft_messages"));

/// Heap size trace for appended raft entries.
pub static ref MEMTRACE_RAFT_ENTRIES: Arc<dyn MemoryTrace + Send + Sync> =
pub static ref MEMTRACE_RAFT_ENTRIES: Arc<MemoryTrace> =
MEMTRACE_ROOT.sub_trace(Id::Name("raft_entries"));
}

Expand Down
6 changes: 3 additions & 3 deletions components/server/src/memory.rs
Expand Up @@ -3,12 +3,12 @@
use std::sync::Arc;

use tikv::server::MEM_TRACE_SUM_GAUGE;
use tikv_alloc::trace::{MemoryTrace, MemoryTraceNode};
use tikv_alloc::trace::MemoryTrace;
use tikv_util::time::Instant;

#[derive(Default)]
pub struct MemoryTraceManager {
providers: Vec<Arc<MemoryTraceNode>>,
providers: Vec<Arc<MemoryTrace>>,
}

impl MemoryTraceManager {
Expand All @@ -30,7 +30,7 @@ impl MemoryTraceManager {
}
}

pub fn register_provider(&mut self, provider: Arc<MemoryTraceNode>) {
pub fn register_provider(&mut self, provider: Arc<MemoryTrace>) {
let p = &mut self.providers;
p.push(provider);
}
Expand Down
8 changes: 5 additions & 3 deletions components/server/src/server.rs
Expand Up @@ -58,15 +58,16 @@ use raftstore::{
config::RaftstoreConfigManager,
fsm,
fsm::store::{RaftBatchSystem, RaftRouter, StoreMeta, PENDING_MSG_CAP},
memory::MEMTRACE_ROOT,
memory::MEMTRACE_ROOT as MEMTRACE_RAFTSTORE,
AutoSplitController, CheckLeaderRunner, GlobalReplicationState, LocalReader, SnapManager,
SnapManagerBuilder, SplitCheckRunner, SplitConfigManager, StoreMsg,
},
};
use security::SecurityManager;
use tikv::{
config::{ConfigController, DBConfigManger, DBType, TiKvConfig, DEFAULT_ROCKSDB_SUB_DIR},
coprocessor, coprocessor_v2,
coprocessor::{self, MEMTRACE_ROOT as MEMTRACE_COPROCESSOR},
coprocessor_v2,
import::{ImportSSTService, SSTImporter},
read_pool::{build_yatp_read_pool, ReadPool},
server::raftkv::ReplicaReadLockChecker,
Expand Down Expand Up @@ -1069,7 +1070,8 @@ impl<ER: RaftEngine> TiKVServer<ER> {
}

let mut mem_trace_metrics = MemoryTraceManager::default();
mem_trace_metrics.register_provider((&*MEMTRACE_ROOT).to_owned());
mem_trace_metrics.register_provider(MEMTRACE_RAFTSTORE.clone());
mem_trace_metrics.register_provider(MEMTRACE_COPROCESSOR.clone());
self.background_worker
.spawn_interval_task(DEFAULT_MEMTRACE_FLUSH_INTERVAL, move || {
let now = Instant::now();
Expand Down
2 changes: 1 addition & 1 deletion components/test_coprocessor/src/util.rs
Expand Up @@ -23,7 +23,7 @@ pub fn handle_request<E>(copr: &Endpoint<E>, req: Request) -> Response
where
E: Engine,
{
block_on(copr.parse_and_handle_unary_request(req, None))
block_on(copr.parse_and_handle_unary_request(req, None)).consume()
}

pub fn handle_select<E>(copr: &Endpoint<E>, req: Request) -> SelectResponse
Expand Down
1 change: 1 addition & 0 deletions components/tikv_alloc/src/lib.rs
Expand Up @@ -125,6 +125,7 @@ mod imp;
mod imp;

pub use crate::imp::*;
pub use crate::trace::*;

#[global_allocator]
static ALLOC: imp::Allocator = imp::allocator();
126 changes: 94 additions & 32 deletions components/tikv_alloc/src/trace.rs
Expand Up @@ -13,12 +13,12 @@
//! enumerates instead.
//!
//! To define a memory trace tree, we can use the `mem_trace` macro. The `mem_trace`
//! macro constructs every node as a `MemoryTraceNode` which implements `MemoryTrace` trait.
//! macro constructs every node as a `MemoryTrace` which implements `MemoryTrace` trait.
//! We can also define a specified tree node by implementing `MemoryTrace` trait.

use std::fmt::{self, Display};
use std::fmt::{self, Debug, Display, Formatter};
use std::num::NonZeroU64;
use std::ops::Add;
use std::ops::{Add, Deref, DerefMut};
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
Expand Down Expand Up @@ -118,34 +118,22 @@ impl Add for TraceEvent {
}
}

pub trait MemoryTrace {
fn trace(&self, event: TraceEvent);
fn snapshot(&self) -> MemoryTraceSnapshot;
fn sub_trace(&self, id: Id) -> Arc<dyn MemoryTrace + Send + Sync>;
fn add_sub_trace(&mut self, id: Id, trace: Arc<dyn MemoryTrace + Send + Sync>);
fn sum(&self) -> usize;
fn name(&self) -> String;
fn get_children_ids(&self) -> Vec<Id>;
}

pub struct MemoryTraceNode {
pub struct MemoryTrace {
pub id: Id,
trace: AtomicUsize,
children: HashMap<Id, Arc<dyn MemoryTrace + Send + Sync>>,
children: HashMap<Id, Arc<MemoryTrace>>,
}

impl MemoryTraceNode {
pub fn new(id: impl Into<Id>) -> MemoryTraceNode {
MemoryTraceNode {
impl MemoryTrace {
pub fn new(id: impl Into<Id>) -> MemoryTrace {
MemoryTrace {
id: id.into(),
trace: std::sync::atomic::AtomicUsize::default(),
children: HashMap::default(),
}
}
}

impl MemoryTrace for MemoryTraceNode {
fn trace(&self, event: TraceEvent) {
pub fn trace(&self, event: TraceEvent) {
match event {
TraceEvent::Add(val) => {
self.trace.fetch_add(val, Ordering::Relaxed);
Expand All @@ -159,33 +147,43 @@ impl MemoryTrace for MemoryTraceNode {
}
}

fn snapshot(&self) -> MemoryTraceSnapshot {
pub fn trace_guard<T: Default>(
self: &Arc<MemoryTrace>,
item: T,
size: usize,
) -> MemoryTraceGuard<T> {
self.trace(TraceEvent::Add(size));
let node = Some(self.clone());
MemoryTraceGuard { item, size, node }
}

pub fn snapshot(&self) -> MemoryTraceSnapshot {
MemoryTraceSnapshot {
id: self.id,
trace: self.trace.load(Ordering::Relaxed),
children: self.children.values().map(|c| c.snapshot()).collect(),
}
}

fn sub_trace(&self, id: Id) -> Arc<dyn MemoryTrace + Send + Sync> {
pub fn sub_trace(&self, id: Id) -> Arc<MemoryTrace> {
self.children.get(&id).cloned().unwrap()
}

fn add_sub_trace(&mut self, id: Id, trace: Arc<dyn MemoryTrace + Send + Sync>) {
pub fn add_sub_trace(&mut self, id: Id, trace: Arc<MemoryTrace>) {
self.children.insert(id, trace);
}

// TODO: Maybe need a cache to reduce read cost.
fn sum(&self) -> usize {
pub fn sum(&self) -> usize {
let sum: usize = self.children.values().map(|c| c.sum()).sum();
sum + self.trace.load(Ordering::Relaxed)
}

fn name(&self) -> String {
pub fn name(&self) -> String {
self.id.name()
}

fn get_children_ids(&self) -> Vec<Id> {
pub fn get_children_ids(&self) -> Vec<Id> {
let mut ids = vec![];
for id in self.children.keys() {
ids.push(*id);
Expand Down Expand Up @@ -215,15 +213,13 @@ pub struct MemoryTraceSnapshot {
macro_rules! mem_trace {
($name: ident) => {
{
use tikv_alloc::trace::MemoryTraceNode;
use tikv_alloc::trace::MemoryTrace;

std::sync::Arc::new(MemoryTraceNode::new(stringify!($name)))
std::sync::Arc::new(MemoryTrace::new(stringify!($name)))
}
};
($name: ident, [$($child:tt),+]) => {
{
use tikv_alloc::trace::MemoryTrace;

let mut node = mem_trace!($name);
$(
let child = mem_trace!($child);
Expand All @@ -237,11 +233,77 @@ macro_rules! mem_trace {
}
}

pub struct MemoryTraceGuard<T: Default> {
item: T,
size: usize,
node: Option<Arc<MemoryTrace>>,
}

impl<T: Default> MemoryTraceGuard<T> {
pub fn map<F, U: Default>(mut self, f: F) -> MemoryTraceGuard<U>
where
F: FnOnce(T) -> U,
{
let item = std::mem::take(&mut self.item);
MemoryTraceGuard {
item: f(item),
size: self.size,
node: self.node.take(),
}
}

pub fn consume(&mut self) -> T {
if let Some(node) = self.node.take() {
node.trace(TraceEvent::Sub(self.size));
}
std::mem::take(&mut self.item)
}
}

impl<T: Default> Drop for MemoryTraceGuard<T> {
fn drop(&mut self) {
if let Some(node) = self.node.take() {
node.trace(TraceEvent::Sub(self.size));
}
}
}

impl<T: Default> From<T> for MemoryTraceGuard<T> {
fn from(item: T) -> Self {
MemoryTraceGuard {
item,
size: 0,
node: None,
}
}
}

impl<T: Default> Deref for MemoryTraceGuard<T> {
type Target = T;
fn deref(&self) -> &T {
&self.item
}
}

impl<T: Default> DerefMut for MemoryTraceGuard<T> {
fn deref_mut(&mut self) -> &mut T {
&mut self.item
}
}

impl<T: Default> Debug for MemoryTraceGuard<T> {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("MemoryTraceGuard")
.field("size", &self.size)
.finish()
}
}

#[cfg(test)]
mod tests {
use crate::{
self as tikv_alloc,
trace::{Id, MemoryTrace, TraceEvent},
trace::{Id, TraceEvent},
};

#[test]
Expand Down
2 changes: 1 addition & 1 deletion metrics/grafana/tikv_details.json
Expand Up @@ -3686,7 +3686,7 @@
"steppedLine": false,
"targets": [
{
"expr": "tikv_server_mem_trace_sum{tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\", name=~\"raftstore-.*\"}",
"expr": "tikv_server_mem_trace_sum{tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}",
"format": "time_series",
"intervalFactor": 1,
"legendFormat": "{{instance}}-{{name}}",
Expand Down
5 changes: 3 additions & 2 deletions src/coprocessor/cache.rs
Expand Up @@ -2,6 +2,7 @@

use async_trait::async_trait;
use kvproto::coprocessor::Response;
use tikv_alloc::trace::MemoryTraceGuard;

use crate::coprocessor::RequestHandler;
use crate::coprocessor::*;
Expand All @@ -25,12 +26,12 @@ impl CachedRequestHandler {

#[async_trait]
impl RequestHandler for CachedRequestHandler {
async fn handle_request(&mut self) -> Result<Response> {
async fn handle_request(&mut self) -> Result<MemoryTraceGuard<Response>> {
let mut resp = Response::default();
resp.set_is_cache_hit(true);
if let Some(v) = self.data_version {
resp.set_cache_last_version(v);
}
Ok(resp)
Ok(resp.into())
}
}
5 changes: 3 additions & 2 deletions src/coprocessor/checksum.rs
Expand Up @@ -7,6 +7,7 @@ use tidb_query_common::storage::scanner::{RangesScanner, RangesScannerOptions};
use tidb_query_common::storage::Range;
use tidb_query_executors::runner::MAX_TIME_SLICE;
use tidb_query_expr::BATCH_MAX_SIZE;
use tikv_alloc::trace::MemoryTraceGuard;
use tikv_util::time::Instant;
use tipb::{ChecksumAlgorithm, ChecksumRequest, ChecksumResponse};
use yatp::task::future::reschedule;
Expand Down Expand Up @@ -53,7 +54,7 @@ impl<S: Snapshot> ChecksumContext<S> {

#[async_trait]
impl<S: Snapshot> RequestHandler for ChecksumContext<S> {
async fn handle_request(&mut self) -> Result<Response> {
async fn handle_request(&mut self) -> Result<MemoryTraceGuard<Response>> {
let algorithm = self.req.get_algorithm();
if algorithm != ChecksumAlgorithm::Crc64Xor {
return Err(box_err!("unknown checksum algorithm {:?}", algorithm));
Expand Down Expand Up @@ -101,7 +102,7 @@ impl<S: Snapshot> RequestHandler for ChecksumContext<S> {

let mut resp = Response::default();
resp.set_data(data);
Ok(resp)
Ok(resp.into())
}

fn collect_scan_statistics(&mut self, dest: &mut Statistics) {
Expand Down

0 comments on commit d843a6a

Please sign in to comment.