Skip to content

Commit

Permalink
Merge branch 'master' into dr-fix-assign
Browse files Browse the repository at this point in the history
  • Loading branch information
nolouch committed Aug 25, 2021
2 parents 361cc54 + 3b098f6 commit e0c5f53
Show file tree
Hide file tree
Showing 21 changed files with 356 additions and 142 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 3 additions & 18 deletions components/raftstore/src/store/fsm/apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ use fail::fail_point;
use kvproto::import_sstpb::SstMeta;
use kvproto::kvrpcpb::ExtraOp as TxnExtraOp;
use kvproto::metapb::{PeerRole, Region, RegionEpoch};
use kvproto::pdpb::QueryKind;
use kvproto::raft_cmdpb::{
AdminCmdType, AdminRequest, AdminResponse, ChangePeerRequest, CmdType, CommitMergeRequest,
RaftCmdRequest, RaftCmdResponse, Request, Response,
Expand Down Expand Up @@ -75,7 +74,7 @@ use crate::store::util::{
ConfChangeKind, KeysInfoFormatter, LatencyInspector,
};
use crate::store::{cmd_resp, util, Config, RegionSnapshot, RegionTask};
use crate::{bytes_capacity, store::QueryStats, Error, Result};
use crate::{bytes_capacity, Error, Result};

use super::metrics::*;

Expand Down Expand Up @@ -1447,22 +1446,9 @@ where
for req in requests {
let cmd_type = req.get_cmd_type();
let mut resp = match cmd_type {
CmdType::Put => {
self.metrics
.written_query_stats
.add_query_num(QueryKind::Put, 1);
self.handle_put(ctx.kv_wb_mut(), req)
}
CmdType::Delete => {
self.metrics
.written_query_stats
.add_query_num(QueryKind::Delete, 1);
self.handle_delete(ctx.kv_wb_mut(), req)
}
CmdType::Put => self.handle_put(ctx.kv_wb_mut(), req),
CmdType::Delete => self.handle_delete(ctx.kv_wb_mut(), req),
CmdType::DeleteRange => {
self.metrics
.written_query_stats
.add_query_num(QueryKind::DeleteRange, 1);
self.handle_delete_range(&ctx.engine, req, &mut ranges, ctx.use_delete_range)
}
CmdType::IngestSst => self.handle_ingest_sst(ctx, req, &mut ssts),
Expand Down Expand Up @@ -3126,7 +3112,6 @@ pub struct ApplyMetrics {

pub written_bytes: u64,
pub written_keys: u64,
pub written_query_stats: QueryStats,
pub lock_cf_written_bytes: u64,
}

Expand Down
4 changes: 0 additions & 4 deletions components/raftstore/src/store/fsm/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3314,10 +3314,6 @@ where
self.ctx.store_stat.lock_cf_bytes_written += metrics.lock_cf_written_bytes;
self.ctx.store_stat.engine_total_bytes_written += metrics.written_bytes;
self.ctx.store_stat.engine_total_keys_written += metrics.written_keys;
self.ctx
.store_stat
.engine_total_query_stats
.add_query_stats(&metrics.written_query_stats.0);
}

/// Check if a request is valid if it has valid prepare_merge/commit_merge proposal.
Expand Down
2 changes: 1 addition & 1 deletion components/raftstore/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ pub use self::transport::{CasualRouter, ProposalRouter, StoreRouter, Transport};
pub use self::util::{RegionReadProgress, RegionReadProgressRegistry};
pub use self::worker::{
AutoSplitController, FlowStatistics, FlowStatsReporter, PdTask, QueryStats, ReadDelegate,
ReadStats, SplitConfig, SplitConfigManager, TrackVer,
ReadStats, SplitConfig, SplitConfigManager, TrackVer, WriteStats,
};
pub use self::worker::{CheckLeaderRunner, CheckLeaderTask};
pub use self::worker::{KeyEntry, LocalReader, RegionTask};
Expand Down
9 changes: 1 addition & 8 deletions components/raftstore/src/store/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,7 @@ use crate::store::hibernate_state::GroupState;
use crate::store::memory::{needs_evict_entry_cache, MEMTRACE_RAFT_ENTRIES};
use crate::store::msg::RaftCommand;
use crate::store::util::{admin_cmd_epoch_lookup, RegionReadProgress};
use crate::store::worker::{
HeartbeatTask, QueryStats, ReadDelegate, ReadExecutor, ReadProgress, RegionTask,
};
use crate::store::worker::{HeartbeatTask, ReadDelegate, ReadExecutor, ReadProgress, RegionTask};
use crate::store::{
Callback, Config, GlobalReplicationState, PdTask, ReadIndexContext, ReadResponse,
};
Expand Down Expand Up @@ -219,7 +217,6 @@ pub struct ConsistencyState {
pub struct PeerStat {
pub written_bytes: u64,
pub written_keys: u64,
pub written_query_stats: QueryStats,
}

#[derive(Default, Debug, Clone, Copy)]
Expand Down Expand Up @@ -2171,9 +2168,6 @@ where

self.peer_stat.written_keys += apply_metrics.written_keys;
self.peer_stat.written_bytes += apply_metrics.written_bytes;
self.peer_stat
.written_query_stats
.add_query_stats(&apply_metrics.written_query_stats.0);
self.delete_keys_hint += apply_metrics.delete_keys_hint;
let diff = self.size_diff_hint as i64 + apply_metrics.size_diff_hint;
self.size_diff_hint = cmp::max(diff, 0) as u64;
Expand Down Expand Up @@ -3567,7 +3561,6 @@ where
pending_peers: self.collect_pending_peers(ctx),
written_bytes: self.peer_stat.written_bytes,
written_keys: self.peer_stat.written_keys,
written_query_stats: self.peer_stat.written_query_stats.clone(),
approximate_size: self.approximate_size,
approximate_keys: self.approximate_keys,
replication_status: self.region_replication_status(),
Expand Down
2 changes: 1 addition & 1 deletion components/raftstore/src/store/worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,4 @@ pub use self::read::{LocalReader, Progress as ReadProgress, ReadDelegate, ReadEx
pub use self::region::{Runner as RegionRunner, Task as RegionTask};
pub use self::split_check::{KeyEntry, Runner as SplitCheckRunner, Task as SplitCheckTask};
pub use self::split_config::{SplitConfig, SplitConfigManager};
pub use self::split_controller::{AutoSplitController, ReadStats};
pub use self::split_controller::{AutoSplitController, ReadStats, WriteStats};
62 changes: 41 additions & 21 deletions components/raftstore/src/store/worker/pd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use crate::store::util::{
};
use crate::store::worker::query_stats::QueryStats;
use crate::store::worker::split_controller::{SplitInfo, TOP_N};
use crate::store::worker::{AutoSplitController, ReadStats};
use crate::store::worker::{AutoSplitController, ReadStats, WriteStats};
use crate::store::{
Callback, CasualMessage, Config, PeerMsg, RaftCmdExtraOpts, RaftCommand, RaftRouter,
SnapManager, StoreInfo, StoreMsg,
Expand Down Expand Up @@ -72,6 +72,8 @@ impl FlowStatistics {
/// Sink for flow/query statistics collected outside the pd worker.
///
/// Implemented below for the pd worker's `Scheduler<Task<E>>`, which forwards
/// each report as a `Task::ReadStats` / `Task::WriteStats` message; failures
/// to schedule are logged and dropped, not returned.
pub trait FlowStatsReporter: Send + Clone + Sync + 'static {
// TODO: maybe we need to return a Result later?
/// Delivers accumulated read flow statistics to the stats sink.
fn report_read_stats(&self, read_stats: ReadStats);

/// Delivers accumulated write query statistics to the stats sink.
fn report_write_stats(&self, write_stats: WriteStats);
}

impl<E> FlowStatsReporter for Scheduler<Task<E>>
Expand All @@ -83,6 +85,12 @@ where
error!("Failed to send read flow statistics"; "err" => ?e);
}
}

fn report_write_stats(&self, write_stats: WriteStats) {
if let Err(e) = self.schedule(Task::WriteStats { write_stats }) {
error!("Failed to send write flow statistics"; "err" => ?e);
}
}
}

pub struct HeartbeatTask {
Expand All @@ -93,7 +101,6 @@ pub struct HeartbeatTask {
pub pending_peers: Vec<metapb::Peer>,
pub written_bytes: u64,
pub written_keys: u64,
pub written_query_stats: QueryStats,
pub approximate_size: u64,
pub approximate_keys: u64,
pub replication_status: Option<RegionReplicationStatus>,
Expand Down Expand Up @@ -138,6 +145,9 @@ where
ReadStats {
read_stats: ReadStats,
},
WriteStats {
write_stats: WriteStats,
},
DestroyPeer {
region_id: u64,
},
Expand Down Expand Up @@ -206,14 +216,13 @@ impl Default for StoreStat {
pub struct PeerStat {
pub read_bytes: u64,
pub read_keys: u64,
pub read_query_stats: QueryStats,
pub query_stats: QueryStats,
// last_region_report_attributes records the state of the last region heartbeat
pub last_region_report_read_bytes: u64,
pub last_region_report_read_keys: u64,
pub last_region_report_read_query_stats: QueryStats,
pub last_region_report_query_stats: QueryStats,
pub last_region_report_written_bytes: u64,
pub last_region_report_written_keys: u64,
pub last_region_report_written_query_stats: QueryStats,
pub last_region_report_ts: UnixSecs,
// last_store_report_attributes records the state of the last store heartbeat
pub last_store_report_read_bytes: u64,
Expand Down Expand Up @@ -296,6 +305,9 @@ where
Task::ReadStats { ref read_stats } => {
write!(f, "get the read statistics {:?}", read_stats)
}
Task::WriteStats { ref write_stats } => {
write!(f, "get the write statistics {:?}", write_stats)
}
Task::DestroyPeer { ref region_id } => {
write!(f, "destroy peer of region {}", region_id)
}
Expand Down Expand Up @@ -872,25 +884,25 @@ where
for (region_id, region_peer) in &mut self.region_peers {
let read_bytes = region_peer.read_bytes - region_peer.last_store_report_read_bytes;
let read_keys = region_peer.read_keys - region_peer.last_store_report_read_keys;
let read_query_stats = region_peer
.read_query_stats
let query_stats = region_peer
.query_stats
.sub_query_stats(&region_peer.last_store_report_query_stats);
region_peer.last_store_report_read_bytes = region_peer.read_bytes;
region_peer.last_store_report_read_keys = region_peer.read_keys;
region_peer
.last_store_report_query_stats
.fill_query_stats(&region_peer.read_query_stats);
.fill_query_stats(&region_peer.query_stats);
if read_bytes < hotspot_byte_report_threshold()
&& read_keys < hotspot_key_report_threshold()
&& read_query_stats.get_read_query_num() < hotspot_query_num_report_threshold()
&& query_stats.get_read_query_num() < hotspot_query_num_report_threshold()
{
continue;
}
let mut read_stat = pdpb::PeerStat::default();
read_stat.set_region_id(*region_id);
read_stat.set_read_keys(read_keys);
read_stat.set_read_bytes(read_bytes);
read_stat.set_query_stats(read_query_stats.0);
read_stat.set_query_stats(query_stats.0);
report_peers.insert(*region_id, read_stat);
}

Expand Down Expand Up @@ -1176,7 +1188,7 @@ where
self.store_stat.engine_total_bytes_read += region_info.flow.read_bytes as u64;
self.store_stat.engine_total_keys_read += region_info.flow.read_keys as u64;
peer_stat
.read_query_stats
.query_stats
.add_query_stats(&region_info.query_stats.0);
self.store_stat
.engine_total_query_num
Expand All @@ -1191,6 +1203,19 @@ where
}
}

fn handle_write_stats(&mut self, mut write_stats: WriteStats) {
for (region_id, region_info) in write_stats.region_infos.iter_mut() {
let peer_stat = self
.region_peers
.entry(*region_id)
.or_insert_with(PeerStat::default);
peer_stat.query_stats.add_query_stats(&region_info.0);
self.store_stat
.engine_total_query_num
.add_query_stats(&region_info.0);
}
}

fn handle_destroy_peer(&mut self, region_id: u64) {
match self.region_peers.remove(&region_id) {
None => {}
Expand Down Expand Up @@ -1394,21 +1419,15 @@ where
hb_task.written_bytes - peer_stat.last_region_report_written_bytes;
let written_keys_delta =
hb_task.written_keys - peer_stat.last_region_report_written_keys;
let written_query_stats_delta = hb_task
.written_query_stats
.sub_query_stats(&peer_stat.last_region_report_written_query_stats);
let mut query_stats = peer_stat
.read_query_stats
.sub_query_stats(&peer_stat.last_region_report_read_query_stats);
query_stats.add_query_stats(&written_query_stats_delta.0); // add write info
let query_stats = peer_stat
.query_stats
.sub_query_stats(&peer_stat.last_region_report_query_stats);
let mut last_report_ts = peer_stat.last_region_report_ts;
peer_stat.last_region_report_written_bytes = hb_task.written_bytes;
peer_stat.last_region_report_written_keys = hb_task.written_keys;
peer_stat.last_region_report_written_query_stats = hb_task.written_query_stats;
peer_stat.last_region_report_read_bytes = peer_stat.read_bytes;
peer_stat.last_region_report_read_keys = peer_stat.read_keys;
peer_stat.last_region_report_read_query_stats =
peer_stat.read_query_stats.clone();
peer_stat.last_region_report_query_stats = peer_stat.query_stats.clone();
peer_stat.last_region_report_ts = UnixSecs::now();

if last_report_ts.is_zero() {
Expand Down Expand Up @@ -1448,6 +1467,7 @@ where
Task::ReportBatchSplit { regions } => self.handle_report_batch_split(regions),
Task::ValidatePeer { region, peer } => self.handle_validate_peer(region, peer),
Task::ReadStats { read_stats } => self.handle_read_stats(read_stats),
Task::WriteStats { write_stats } => self.handle_write_stats(write_stats),
Task::DestroyPeer { region_id } => self.handle_destroy_peer(region_id),
Task::StoreInfos {
cpu_usages,
Expand Down
12 changes: 12 additions & 0 deletions components/raftstore/src/store/worker/query_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ static QUERY_KINDS: &[kvproto::pdpb::QueryKind] = &[
QueryKind::Delete,
QueryKind::DeleteRange,
QueryKind::Put,
QueryKind::Prewrite,
QueryKind::Commit,
QueryKind::Rollback,
QueryKind::AcquirePessimisticLock,
];
#[derive(Debug, Clone, Default, PartialEq)]
pub struct QueryStats(pub pdpb::QueryStats);
Expand All @@ -26,6 +30,10 @@ impl QueryStats {
QueryKind::Delete => self.0.set_delete(query_num),
QueryKind::DeleteRange => self.0.set_delete_range(query_num),
QueryKind::Put => self.0.set_put(query_num),
QueryKind::Prewrite => self.0.set_prewrite(query_num),
QueryKind::Commit => self.0.set_commit(query_num),
QueryKind::Rollback => self.0.set_rollback(query_num),
QueryKind::AcquirePessimisticLock => self.0.set_acquire_pessimistic_lock(query_num),
QueryKind::Others => (),
}
}
Expand All @@ -39,6 +47,10 @@ impl QueryStats {
QueryKind::Delete => query_stats.get_delete(),
QueryKind::DeleteRange => query_stats.get_delete_range(),
QueryKind::Put => query_stats.get_put(),
QueryKind::Prewrite => query_stats.get_prewrite(),
QueryKind::Commit => query_stats.get_commit(),
QueryKind::Rollback => query_stats.get_rollback(),
QueryKind::AcquirePessimisticLock => query_stats.get_acquire_pessimistic_lock(),
QueryKind::Others => 0,
}
}
Expand Down
27 changes: 27 additions & 0 deletions components/raftstore/src/store/worker/split_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,33 @@ impl Recorder {
}
}

#[derive(Clone, Debug)]
pub struct WriteStats {
pub region_infos: HashMap<u64, QueryStats>,
}

impl WriteStats {
pub fn add_query_num(&mut self, region_id: u64, kind: QueryKind) {
let query_stats = self
.region_infos
.entry(region_id)
.or_insert_with(|| QueryStats::default());
query_stats.add_query_num(kind, 1);
}

pub fn is_empty(&self) -> bool {
self.region_infos.is_empty()
}
}

impl Default for WriteStats {
fn default() -> WriteStats {
WriteStats {
region_infos: HashMap::default(),
}
}
}

#[derive(Clone, Debug)]
pub struct ReadStats {
pub region_infos: HashMap<u64, RegionInfo>,
Expand Down
1 change: 1 addition & 0 deletions components/server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,7 @@ impl<ER: RaftEngine> TiKVServer<ER> {
self.concurrency_manager.clone(),
lock_mgr.get_pipelined(),
flow_controller,
pd_sender.clone(),
)
.unwrap_or_else(|e| fatal!("failed to create raft storage: {}", e));

Expand Down
3 changes: 2 additions & 1 deletion components/test_raftstore/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ impl Simulator for ServerCluster {
let pd_sender = pd_worker.scheduler();
let storage_read_pool = ReadPool::from(storage::build_read_pool(
&tikv::config::StorageReadPoolConfig::default_for_test(),
pd_sender,
pd_sender.clone(),
raft_engine.clone(),
));

Expand Down Expand Up @@ -317,6 +317,7 @@ impl Simulator for ServerCluster {
concurrency_manager.clone(),
lock_mgr.get_pipelined(),
Arc::new(FlowController::empty()),
pd_sender,
)?;
self.storages.insert(node_id, raft_engine);

Expand Down
2 changes: 1 addition & 1 deletion components/tikv_util/src/yatp_pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use yatp::ThreadPool;
pub(crate) const TICK_INTERVAL: Duration = Duration::from_secs(1);

fn tick_interval() -> Duration {
fail_point!("mock_tick_interval", |_| { Duration::from_millis(10) });
fail_point!("mock_tick_interval", |_| { Duration::from_millis(1) });
TICK_INTERVAL
}

Expand Down

0 comments on commit e0c5f53

Please sign in to comment.