diff --git a/Cargo.lock b/Cargo.lock index 65e2b9a1b91..777bb65c3f2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2111,7 +2111,7 @@ dependencies = [ [[package]] name = "kvproto" version = "0.0.2" -source = "git+https://github.com/pingcap/kvproto.git#317f69fb54b44619271df82ec163764032184a85" +source = "git+https://github.com/pingcap/kvproto.git#bd5706b9d9f291f1f60661d67d17daae8f9dbdaf" dependencies = [ "futures 0.3.15", "grpcio", diff --git a/components/raftstore/src/store/fsm/apply.rs b/components/raftstore/src/store/fsm/apply.rs index 69c983a95db..90c28a57279 100644 --- a/components/raftstore/src/store/fsm/apply.rs +++ b/components/raftstore/src/store/fsm/apply.rs @@ -32,7 +32,6 @@ use fail::fail_point; use kvproto::import_sstpb::SstMeta; use kvproto::kvrpcpb::ExtraOp as TxnExtraOp; use kvproto::metapb::{PeerRole, Region, RegionEpoch}; -use kvproto::pdpb::QueryKind; use kvproto::raft_cmdpb::{ AdminCmdType, AdminRequest, AdminResponse, ChangePeerRequest, CmdType, CommitMergeRequest, RaftCmdRequest, RaftCmdResponse, Request, Response, @@ -75,7 +74,7 @@ use crate::store::util::{ ConfChangeKind, KeysInfoFormatter, LatencyInspector, }; use crate::store::{cmd_resp, util, Config, RegionSnapshot, RegionTask}; -use crate::{bytes_capacity, store::QueryStats, Error, Result}; +use crate::{bytes_capacity, Error, Result}; use super::metrics::*; @@ -1447,22 +1446,9 @@ where for req in requests { let cmd_type = req.get_cmd_type(); let mut resp = match cmd_type { - CmdType::Put => { - self.metrics - .written_query_stats - .add_query_num(QueryKind::Put, 1); - self.handle_put(ctx.kv_wb_mut(), req) - } - CmdType::Delete => { - self.metrics - .written_query_stats - .add_query_num(QueryKind::Delete, 1); - self.handle_delete(ctx.kv_wb_mut(), req) - } + CmdType::Put => self.handle_put(ctx.kv_wb_mut(), req), + CmdType::Delete => self.handle_delete(ctx.kv_wb_mut(), req), CmdType::DeleteRange => { - self.metrics - .written_query_stats - .add_query_num(QueryKind::DeleteRange, 1); self.handle_delete_range(&ctx.engine, req, &mut ranges, ctx.use_delete_range) } CmdType::IngestSst => self.handle_ingest_sst(ctx, req, &mut ssts), @@ -3126,7 +3112,6 @@ pub struct ApplyMetrics { pub written_bytes: u64, pub written_keys: u64, - pub written_query_stats: QueryStats, pub lock_cf_written_bytes: u64, } diff --git a/components/raftstore/src/store/fsm/peer.rs b/components/raftstore/src/store/fsm/peer.rs index 4ef74cc938a..a1f0244ed08 100644 --- a/components/raftstore/src/store/fsm/peer.rs +++ b/components/raftstore/src/store/fsm/peer.rs @@ -3314,10 +3314,6 @@ where self.ctx.store_stat.lock_cf_bytes_written += metrics.lock_cf_written_bytes; self.ctx.store_stat.engine_total_bytes_written += metrics.written_bytes; self.ctx.store_stat.engine_total_keys_written += metrics.written_keys; - self.ctx - .store_stat - .engine_total_query_stats - .add_query_stats(&metrics.written_query_stats.0); } /// Check if a request is valid if it has valid prepare_merge/commit_merge proposal. 
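The Put/Delete/DeleteRange counters dropped from the apply loop above are replaced later in this diff by collection at the transaction layer: the scheduler classifies each command as Prewrite, Commit, Rollback, or AcquirePessimisticLock, accumulates the counts per region in a thread-local WriteStats, and flushes them to the PD worker on pool ticks via the new report_write_stats hook. A minimal standalone sketch of that collect-and-flush pattern, using simplified stand-in types rather than the real kvproto/pdpb and raftstore definitions:

use std::cell::RefCell;
use std::collections::HashMap;

// Simplified stand-ins; the real QueryKind comes from kvproto::pdpb and the real
// WriteStats lives in components/raftstore/src/store/worker/split_controller.rs.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
enum QueryKind {
    Prewrite,
    Commit,
    Rollback,
    AcquirePessimisticLock,
}

#[derive(Default, Debug)]
struct WriteStats {
    // region_id -> per-kind query counts
    region_infos: HashMap<u64, HashMap<QueryKind, u64>>,
}

impl WriteStats {
    fn add_query_num(&mut self, region_id: u64, kind: QueryKind) {
        *self
            .region_infos
            .entry(region_id)
            .or_default()
            .entry(kind)
            .or_default() += 1;
    }

    fn is_empty(&self) -> bool {
        self.region_infos.is_empty()
    }
}

thread_local! {
    // Each scheduler worker keeps its own accumulator, analogous to
    // TLS_SCHED_METRICS.local_write_stats in sched_pool.rs.
    static LOCAL_WRITE_STATS: RefCell<WriteStats> = RefCell::new(WriteStats::default());
}

fn collect_query(region_id: u64, kind: QueryKind) {
    LOCAL_WRITE_STATS.with(|s| s.borrow_mut().add_query_num(region_id, kind));
}

// Called on every pool tick: swap the accumulator out and hand it to a reporter
// (the PD worker task in TiKV); here the reporter is just a closure.
fn flush(report: impl Fn(WriteStats)) {
    LOCAL_WRITE_STATS.with(|s| {
        let stats = std::mem::take(&mut *s.borrow_mut());
        if !stats.is_empty() {
            report(stats);
        }
    });
}

fn main() {
    collect_query(1, QueryKind::Prewrite);
    collect_query(1, QueryKind::Commit);
    flush(|stats| println!("report to PD worker: {:?}", stats));
}

Note that per-key Put/Delete statistics for raw KV writes are no longer counted anywhere after this change; the test updates below acknowledge that by skipping the raw-KV write-query assertions.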
diff --git a/components/raftstore/src/store/mod.rs b/components/raftstore/src/store/mod.rs index 532165309cf..0c01d0163e8 100644 --- a/components/raftstore/src/store/mod.rs +++ b/components/raftstore/src/store/mod.rs @@ -59,7 +59,7 @@ pub use self::transport::{CasualRouter, ProposalRouter, StoreRouter, Transport}; pub use self::util::{RegionReadProgress, RegionReadProgressRegistry}; pub use self::worker::{ AutoSplitController, FlowStatistics, FlowStatsReporter, PdTask, QueryStats, ReadDelegate, - ReadStats, SplitConfig, SplitConfigManager, TrackVer, + ReadStats, SplitConfig, SplitConfigManager, TrackVer, WriteStats, }; pub use self::worker::{CheckLeaderRunner, CheckLeaderTask}; pub use self::worker::{KeyEntry, LocalReader, RegionTask}; diff --git a/components/raftstore/src/store/peer.rs b/components/raftstore/src/store/peer.rs index 0a25163ae10..be9f94ca42f 100644 --- a/components/raftstore/src/store/peer.rs +++ b/components/raftstore/src/store/peer.rs @@ -48,9 +48,7 @@ use crate::store::hibernate_state::GroupState; use crate::store::memory::{needs_evict_entry_cache, MEMTRACE_RAFT_ENTRIES}; use crate::store::msg::RaftCommand; use crate::store::util::{admin_cmd_epoch_lookup, RegionReadProgress}; -use crate::store::worker::{ - HeartbeatTask, QueryStats, ReadDelegate, ReadExecutor, ReadProgress, RegionTask, -}; +use crate::store::worker::{HeartbeatTask, ReadDelegate, ReadExecutor, ReadProgress, RegionTask}; use crate::store::{ Callback, Config, GlobalReplicationState, PdTask, ReadIndexContext, ReadResponse, }; @@ -219,7 +217,6 @@ pub struct ConsistencyState { pub struct PeerStat { pub written_bytes: u64, pub written_keys: u64, - pub written_query_stats: QueryStats, } #[derive(Default, Debug, Clone, Copy)] @@ -2171,9 +2168,6 @@ where self.peer_stat.written_keys += apply_metrics.written_keys; self.peer_stat.written_bytes += apply_metrics.written_bytes; - self.peer_stat - .written_query_stats - .add_query_stats(&apply_metrics.written_query_stats.0); self.delete_keys_hint += apply_metrics.delete_keys_hint; let diff = self.size_diff_hint as i64 + apply_metrics.size_diff_hint; self.size_diff_hint = cmp::max(diff, 0) as u64; @@ -3567,7 +3561,6 @@ where pending_peers: self.collect_pending_peers(ctx), written_bytes: self.peer_stat.written_bytes, written_keys: self.peer_stat.written_keys, - written_query_stats: self.peer_stat.written_query_stats.clone(), approximate_size: self.approximate_size, approximate_keys: self.approximate_keys, replication_status: self.region_replication_status(), diff --git a/components/raftstore/src/store/worker/mod.rs b/components/raftstore/src/store/worker/mod.rs index 0947517a2d2..ee33954ee52 100644 --- a/components/raftstore/src/store/worker/mod.rs +++ b/components/raftstore/src/store/worker/mod.rs @@ -29,4 +29,4 @@ pub use self::read::{LocalReader, Progress as ReadProgress, ReadDelegate, ReadEx pub use self::region::{Runner as RegionRunner, Task as RegionTask}; pub use self::split_check::{KeyEntry, Runner as SplitCheckRunner, Task as SplitCheckTask}; pub use self::split_config::{SplitConfig, SplitConfigManager}; -pub use self::split_controller::{AutoSplitController, ReadStats}; +pub use self::split_controller::{AutoSplitController, ReadStats, WriteStats}; diff --git a/components/raftstore/src/store/worker/pd.rs b/components/raftstore/src/store/worker/pd.rs index ec2e7ef944d..07b100a9c9a 100644 --- a/components/raftstore/src/store/worker/pd.rs +++ b/components/raftstore/src/store/worker/pd.rs @@ -34,7 +34,7 @@ use crate::store::util::{ }; use 
crate::store::worker::query_stats::QueryStats; use crate::store::worker::split_controller::{SplitInfo, TOP_N}; -use crate::store::worker::{AutoSplitController, ReadStats}; +use crate::store::worker::{AutoSplitController, ReadStats, WriteStats}; use crate::store::{ Callback, CasualMessage, Config, PeerMsg, RaftCmdExtraOpts, RaftCommand, RaftRouter, SnapManager, StoreInfo, StoreMsg, @@ -72,6 +72,8 @@ impl FlowStatistics { pub trait FlowStatsReporter: Send + Clone + Sync + 'static { // TODO: maybe we need to return a Result later? fn report_read_stats(&self, read_stats: ReadStats); + + fn report_write_stats(&self, write_stats: WriteStats); } impl FlowStatsReporter for Scheduler> @@ -83,6 +85,12 @@ where error!("Failed to send read flow statistics"; "err" => ?e); } } + + fn report_write_stats(&self, write_stats: WriteStats) { + if let Err(e) = self.schedule(Task::WriteStats { write_stats }) { + error!("Failed to send write flow statistics"; "err" => ?e); + } + } } pub struct HeartbeatTask { @@ -93,7 +101,6 @@ pub struct HeartbeatTask { pub pending_peers: Vec, pub written_bytes: u64, pub written_keys: u64, - pub written_query_stats: QueryStats, pub approximate_size: u64, pub approximate_keys: u64, pub replication_status: Option, @@ -138,6 +145,9 @@ where ReadStats { read_stats: ReadStats, }, + WriteStats { + write_stats: WriteStats, + }, DestroyPeer { region_id: u64, }, @@ -206,14 +216,13 @@ impl Default for StoreStat { pub struct PeerStat { pub read_bytes: u64, pub read_keys: u64, - pub read_query_stats: QueryStats, + pub query_stats: QueryStats, // last_region_report_attributes records the state of the last region heartbeat pub last_region_report_read_bytes: u64, pub last_region_report_read_keys: u64, - pub last_region_report_read_query_stats: QueryStats, + pub last_region_report_query_stats: QueryStats, pub last_region_report_written_bytes: u64, pub last_region_report_written_keys: u64, - pub last_region_report_written_query_stats: QueryStats, pub last_region_report_ts: UnixSecs, // last_store_report_attributes records the state of the last store heartbeat pub last_store_report_read_bytes: u64, @@ -296,6 +305,9 @@ where Task::ReadStats { ref read_stats } => { write!(f, "get the read statistics {:?}", read_stats) } + Task::WriteStats { ref write_stats } => { + write!(f, "get the write statistics {:?}", write_stats) + } Task::DestroyPeer { ref region_id } => { write!(f, "destroy peer of region {}", region_id) } @@ -872,17 +884,17 @@ where for (region_id, region_peer) in &mut self.region_peers { let read_bytes = region_peer.read_bytes - region_peer.last_store_report_read_bytes; let read_keys = region_peer.read_keys - region_peer.last_store_report_read_keys; - let read_query_stats = region_peer - .read_query_stats + let query_stats = region_peer + .query_stats .sub_query_stats(®ion_peer.last_store_report_query_stats); region_peer.last_store_report_read_bytes = region_peer.read_bytes; region_peer.last_store_report_read_keys = region_peer.read_keys; region_peer .last_store_report_query_stats - .fill_query_stats(®ion_peer.read_query_stats); + .fill_query_stats(®ion_peer.query_stats); if read_bytes < hotspot_byte_report_threshold() && read_keys < hotspot_key_report_threshold() - && read_query_stats.get_read_query_num() < hotspot_query_num_report_threshold() + && query_stats.get_read_query_num() < hotspot_query_num_report_threshold() { continue; } @@ -890,7 +902,7 @@ where read_stat.set_region_id(*region_id); read_stat.set_read_keys(read_keys); read_stat.set_read_bytes(read_bytes); - 
read_stat.set_query_stats(read_query_stats.0); + read_stat.set_query_stats(query_stats.0); report_peers.insert(*region_id, read_stat); } @@ -1176,7 +1188,7 @@ where self.store_stat.engine_total_bytes_read += region_info.flow.read_bytes as u64; self.store_stat.engine_total_keys_read += region_info.flow.read_keys as u64; peer_stat - .read_query_stats + .query_stats .add_query_stats(®ion_info.query_stats.0); self.store_stat .engine_total_query_num @@ -1191,6 +1203,19 @@ where } } + fn handle_write_stats(&mut self, mut write_stats: WriteStats) { + for (region_id, region_info) in write_stats.region_infos.iter_mut() { + let peer_stat = self + .region_peers + .entry(*region_id) + .or_insert_with(PeerStat::default); + peer_stat.query_stats.add_query_stats(®ion_info.0); + self.store_stat + .engine_total_query_num + .add_query_stats(®ion_info.0); + } + } + fn handle_destroy_peer(&mut self, region_id: u64) { match self.region_peers.remove(®ion_id) { None => {} @@ -1394,21 +1419,15 @@ where hb_task.written_bytes - peer_stat.last_region_report_written_bytes; let written_keys_delta = hb_task.written_keys - peer_stat.last_region_report_written_keys; - let written_query_stats_delta = hb_task - .written_query_stats - .sub_query_stats(&peer_stat.last_region_report_written_query_stats); - let mut query_stats = peer_stat - .read_query_stats - .sub_query_stats(&peer_stat.last_region_report_read_query_stats); - query_stats.add_query_stats(&written_query_stats_delta.0); // add write info + let query_stats = peer_stat + .query_stats + .sub_query_stats(&peer_stat.last_region_report_query_stats); let mut last_report_ts = peer_stat.last_region_report_ts; peer_stat.last_region_report_written_bytes = hb_task.written_bytes; peer_stat.last_region_report_written_keys = hb_task.written_keys; - peer_stat.last_region_report_written_query_stats = hb_task.written_query_stats; peer_stat.last_region_report_read_bytes = peer_stat.read_bytes; peer_stat.last_region_report_read_keys = peer_stat.read_keys; - peer_stat.last_region_report_read_query_stats = - peer_stat.read_query_stats.clone(); + peer_stat.last_region_report_query_stats = peer_stat.query_stats.clone(); peer_stat.last_region_report_ts = UnixSecs::now(); if last_report_ts.is_zero() { @@ -1448,6 +1467,7 @@ where Task::ReportBatchSplit { regions } => self.handle_report_batch_split(regions), Task::ValidatePeer { region, peer } => self.handle_validate_peer(region, peer), Task::ReadStats { read_stats } => self.handle_read_stats(read_stats), + Task::WriteStats { write_stats } => self.handle_write_stats(write_stats), Task::DestroyPeer { region_id } => self.handle_destroy_peer(region_id), Task::StoreInfos { cpu_usages, diff --git a/components/raftstore/src/store/worker/query_stats.rs b/components/raftstore/src/store/worker/query_stats.rs index 8ae0420cde2..46ea5b37686 100644 --- a/components/raftstore/src/store/worker/query_stats.rs +++ b/components/raftstore/src/store/worker/query_stats.rs @@ -12,6 +12,10 @@ static QUERY_KINDS: &[kvproto::pdpb::QueryKind] = &[ QueryKind::Delete, QueryKind::DeleteRange, QueryKind::Put, + QueryKind::Prewrite, + QueryKind::Commit, + QueryKind::Rollback, + QueryKind::AcquirePessimisticLock, ]; #[derive(Debug, Clone, Default, PartialEq)] pub struct QueryStats(pub pdpb::QueryStats); @@ -26,6 +30,10 @@ impl QueryStats { QueryKind::Delete => self.0.set_delete(query_num), QueryKind::DeleteRange => self.0.set_delete_range(query_num), QueryKind::Put => self.0.set_put(query_num), + QueryKind::Prewrite => self.0.set_prewrite(query_num), + QueryKind::Commit 
=> self.0.set_commit(query_num), + QueryKind::Rollback => self.0.set_rollback(query_num), + QueryKind::AcquirePessimisticLock => self.0.set_acquire_pessimistic_lock(query_num), QueryKind::Others => (), } } @@ -39,6 +47,10 @@ impl QueryStats { QueryKind::Delete => query_stats.get_delete(), QueryKind::DeleteRange => query_stats.get_delete_range(), QueryKind::Put => query_stats.get_put(), + QueryKind::Prewrite => query_stats.get_prewrite(), + QueryKind::Commit => query_stats.get_commit(), + QueryKind::Rollback => query_stats.get_rollback(), + QueryKind::AcquirePessimisticLock => query_stats.get_acquire_pessimistic_lock(), QueryKind::Others => 0, } } diff --git a/components/raftstore/src/store/worker/split_controller.rs b/components/raftstore/src/store/worker/split_controller.rs index 520b40a64b9..a1c18699731 100644 --- a/components/raftstore/src/store/worker/split_controller.rs +++ b/components/raftstore/src/store/worker/split_controller.rs @@ -281,6 +281,33 @@ impl Recorder { } } +#[derive(Clone, Debug)] +pub struct WriteStats { + pub region_infos: HashMap, +} + +impl WriteStats { + pub fn add_query_num(&mut self, region_id: u64, kind: QueryKind) { + let query_stats = self + .region_infos + .entry(region_id) + .or_insert_with(|| QueryStats::default()); + query_stats.add_query_num(kind, 1); + } + + pub fn is_empty(&self) -> bool { + self.region_infos.is_empty() + } +} + +impl Default for WriteStats { + fn default() -> WriteStats { + WriteStats { + region_infos: HashMap::default(), + } + } +} + #[derive(Clone, Debug)] pub struct ReadStats { pub region_infos: HashMap, diff --git a/components/server/src/server.rs b/components/server/src/server.rs index b9bbbbd967d..83a24b1931f 100644 --- a/components/server/src/server.rs +++ b/components/server/src/server.rs @@ -628,6 +628,7 @@ impl TiKVServer { self.concurrency_manager.clone(), lock_mgr.get_pipelined(), flow_controller, + pd_sender.clone(), ) .unwrap_or_else(|e| fatal!("failed to create raft storage: {}", e)); diff --git a/components/test_raftstore/src/server.rs b/components/test_raftstore/src/server.rs index cc38c51f4ed..7a82a0abc1b 100644 --- a/components/test_raftstore/src/server.rs +++ b/components/test_raftstore/src/server.rs @@ -257,7 +257,7 @@ impl Simulator for ServerCluster { let pd_sender = pd_worker.scheduler(); let storage_read_pool = ReadPool::from(storage::build_read_pool( &tikv::config::StorageReadPoolConfig::default_for_test(), - pd_sender, + pd_sender.clone(), raft_engine.clone(), )); @@ -317,6 +317,7 @@ impl Simulator for ServerCluster { concurrency_manager.clone(), lock_mgr.get_pipelined(), Arc::new(FlowController::empty()), + pd_sender, )?; self.storages.insert(node_id, raft_engine); diff --git a/components/tikv_util/src/yatp_pool/mod.rs b/components/tikv_util/src/yatp_pool/mod.rs index 48f5f78eaea..adc5bc4117b 100644 --- a/components/tikv_util/src/yatp_pool/mod.rs +++ b/components/tikv_util/src/yatp_pool/mod.rs @@ -16,7 +16,7 @@ use yatp::ThreadPool; pub(crate) const TICK_INTERVAL: Duration = Duration::from_secs(1); fn tick_interval() -> Duration { - fail_point!("mock_tick_interval", |_| { Duration::from_millis(10) }); + fail_point!("mock_tick_interval", |_| { Duration::from_millis(1) }); TICK_INTERVAL } diff --git a/metrics/alertmanager/tikv.rules.yml b/metrics/alertmanager/tikv.rules.yml index 0b673accc8b..bfbf5b69f0e 100644 --- a/metrics/alertmanager/tikv.rules.yml +++ b/metrics/alertmanager/tikv.rules.yml @@ -301,17 +301,17 @@ groups: value: '{{ $value }}' summary: TiKV pending task too much - - alert: 
TiKV_low_space_and_add_region - expr: count( (sum(tikv_store_size_bytes{type="available"}) by (instance) / sum(tikv_store_size_bytes{type="capacity"}) by (instance) < 0.2) and (sum(tikv_raftstore_snapshot_traffic_total{type="applying"}) by (instance) > 0 ) ) > 0 + - alert: TiKV_low_space + expr: sum(tikv_store_size_bytes{type="available"}) by (instance) / sum(tikv_store_size_bytes{type="capacity"}) by (instance) < 0.2 for: 1m labels: env: ENV_LABELS_ENV level: warning - expr: count( (sum(tikv_store_size_bytes{type="available"}) by (instance) / sum(tikv_store_size_bytes{type="capacity"}) by (instance) < 0.2) and (sum(tikv_raftstore_snapshot_traffic_total{type="applying"}) by (instance) > 0 ) ) > 0 + expr: sum(tikv_store_size_bytes{type="available"}) by (instance) / sum(tikv_store_size_bytes{type="capacity"}) by (instance) < 0.2 annotations: description: 'cluster: ENV_LABELS_ENV, type: {{ $labels.type }}, instance: {{ $labels.instance }}, values: {{ $value }}' value: '{{ $value }}' - summary: TiKV low_space and add_region + summary: TiKV available disk space too low - alert: TiKV_approximate_region_size expr: histogram_quantile(0.99, sum(rate(tikv_raftstore_region_size_bucket[1m])) by (le)) > 1073741824 diff --git a/src/read_pool.rs b/src/read_pool.rs index aa3da42c871..8622dad089d 100644 --- a/src/read_pool.rs +++ b/src/read_pool.rs @@ -290,7 +290,7 @@ mod tests { use super::*; use crate::storage::TestEngineBuilder; use futures::channel::oneshot; - use raftstore::store::ReadStats; + use raftstore::store::{ReadStats, WriteStats}; use std::thread; use std::time::Duration; @@ -299,6 +299,7 @@ mod tests { impl FlowStatsReporter for DummyReporter { fn report_read_stats(&self, _read_stats: ReadStats) {} + fn report_write_stats(&self, _write_stats: WriteStats) {} } #[test] diff --git a/src/server/node.rs b/src/server/node.rs index 5c3cb268ad3..4ecfde42430 100644 --- a/src/server/node.rs +++ b/src/server/node.rs @@ -10,6 +10,7 @@ use crate::import::SSTImporter; use crate::read_pool::ReadPoolHandle; use crate::server::lock_manager::LockManager; use crate::server::Config as ServerConfig; +use crate::storage::kv::FlowStatsReporter; use crate::storage::txn::flow_controller::FlowController; use crate::storage::{config::Config as StorageConfig, Storage}; use concurrency_manager::ConcurrencyManager; @@ -34,7 +35,7 @@ const CHECK_CLUSTER_BOOTSTRAPPED_RETRY_SECONDS: u64 = 3; /// Creates a new storage engine which is backed by the Raft consensus /// protocol. 
-pub fn create_raft_storage( +pub fn create_raft_storage( engine: RaftKv, cfg: &StorageConfig, read_pool: ReadPoolHandle, @@ -42,6 +43,7 @@ pub fn create_raft_storage( concurrency_manager: ConcurrencyManager, pipelined_pessimistic_lock: Arc, flow_controller: Arc, + reporter: R, ) -> Result, LockManager>> where S: RaftStoreRouter + LocalReadRouter + 'static, @@ -55,6 +57,7 @@ where concurrency_manager, pipelined_pessimistic_lock, flow_controller, + reporter, )?; Ok(store) } diff --git a/src/storage/errors.rs b/src/storage/errors.rs index b80f5bb37d3..f0b84c1e670 100644 --- a/src/storage/errors.rs +++ b/src/storage/errors.rs @@ -31,6 +31,7 @@ pub enum ErrorInner { #[error("storage is closed.")] Closed, + #[error("{0}")] Other(#[from] Box), diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 08543b884ba..39842cafef7 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -86,6 +86,7 @@ use kvproto::kvrpcpb::{ }; use kvproto::pdpb::QueryKind; use raftstore::store::util::build_key_range; +use raftstore::store::{ReadStats, WriteStats}; use rand::prelude::*; use resource_metering::{cpu::FutureExt, ResourceMeteringTag}; use std::{ @@ -197,7 +198,7 @@ macro_rules! check_key_size { impl Storage { /// Create a `Storage` from given engine. - pub fn from_engine( + pub fn from_engine( engine: E, config: &Config, read_pool: ReadPoolHandle, @@ -205,6 +206,7 @@ impl Storage { concurrency_manager: ConcurrencyManager, pipelined_pessimistic_lock: Arc, flow_controller: Arc, + reporter: R, ) -> Result { let sched = TxnScheduler::new( engine.clone(), @@ -213,6 +215,7 @@ impl Storage { config, pipelined_pessimistic_lock, flow_controller, + reporter, ); info!("Storage started."); @@ -927,10 +930,26 @@ impl Storage { } } + // The entry point of all transaction commands. It checks transaction-specific constraints. pub fn sched_txn_command( &self, cmd: TypedCommand, callback: Callback, + ) -> Result<()> { + if self.enable_ttl { + return Err(box_err!( + "can't sched txn cmd({}) with TTL enabled", + cmd.cmd.tag() + )); + } + self.sched_command(cmd, callback) + } + + // The entry point of the storage scheduler. Not only transaction commands need to access keys serially. 
+ fn sched_command( + &self, + cmd: TypedCommand, + callback: Callback, ) -> Result<()> { use crate::storage::txn::commands::{ AcquirePessimisticLock, Prewrite, PrewritePessimistic, @@ -1724,7 +1743,7 @@ impl Storage { }; let cmd = RawCompareAndSwap::new(cf, Key::from_encoded(key), previous_value, value, ttl, ctx); - self.sched_txn_command(cmd, cb) + self.sched_command(cmd, cb) } pub fn raw_batch_put_atomic( @@ -1749,7 +1768,7 @@ impl Storage { None }; let cmd = RawAtomicStore::new(cf, muations, ttl, ctx); - self.sched_txn_command(cmd, callback) + self.sched_command(cmd, callback) } pub fn raw_batch_delete_atomic( @@ -1765,7 +1784,7 @@ impl Storage { .map(|k| Mutation::Delete(Key::from_encoded(k))) .collect(); let cmd = RawAtomicStore::new(cf, muations, None, ctx); - self.sched_txn_command(cmd, callback) + self.sched_command(cmd, callback) } pub fn raw_checksum( @@ -1924,6 +1943,14 @@ impl TestStorageBuilder { } } +#[derive(Clone)] +struct DummyReporter; + +impl FlowStatsReporter for DummyReporter { + fn report_read_stats(&self, _read_stats: ReadStats) {} + fn report_write_stats(&self, _write_stats: WriteStats) {} +} + impl TestStorageBuilder { pub fn from_engine_and_lock_mgr(engine: E, lock_mgr: L) -> Self { let config = Config::default(); @@ -1969,6 +1996,7 @@ impl TestStorageBuilder { ConcurrencyManager::new(1.into()), self.pipelined_pessimistic_lock, Arc::new(FlowController::empty()), + DummyReporter, ) } } diff --git a/src/storage/txn/sched_pool.rs b/src/storage/txn/sched_pool.rs index 8caf89cff6e..12937700355 100644 --- a/src/storage/txn/sched_pool.rs +++ b/src/storage/txn/sched_pool.rs @@ -1,15 +1,20 @@ // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. use std::cell::RefCell; +use std::mem; use std::sync::{Arc, Mutex}; use tikv_util::time::Duration; use collections::HashMap; use file_system::{set_io_type, IOType}; +use kvproto::pdpb::QueryKind; use prometheus::local::*; +use raftstore::store::WriteStats; use tikv_util::yatp_pool::{FuturePool, PoolTicker, YatpPoolBuilder}; -use crate::storage::kv::{destroy_tls_engine, set_tls_engine, Engine, Statistics}; +use crate::storage::kv::{ + destroy_tls_engine, set_tls_engine, Engine, FlowStatsReporter, Statistics, +}; use crate::storage::metrics::*; pub struct SchedLocalMetrics { @@ -17,6 +22,7 @@ pub struct SchedLocalMetrics { processing_read_duration: LocalHistogramVec, processing_write_duration: LocalHistogramVec, command_keyread_histogram_vec: LocalHistogramVec, + local_write_stats: WriteStats, } thread_local! { @@ -26,6 +32,7 @@ thread_local! 
{ processing_read_duration: SCHED_PROCESSING_READ_HISTOGRAM_VEC.local(), processing_write_duration: SCHED_PROCESSING_WRITE_HISTOGRAM_VEC.local(), command_keyread_histogram_vec: KV_COMMAND_KEYREAD_HISTOGRAM_VEC.local(), + local_write_stats:WriteStats::default(), } ); } @@ -36,18 +43,25 @@ pub struct SchedPool { } #[derive(Clone)] -pub struct SchedTicker; +pub struct SchedTicker { + reporter: R, +} -impl PoolTicker for SchedTicker { +impl PoolTicker for SchedTicker { fn on_tick(&mut self) { - tls_flush(); + tls_flush(&self.reporter); } } impl SchedPool { - pub fn new(engine: E, pool_size: usize, name_prefix: &str) -> Self { + pub fn new( + engine: E, + pool_size: usize, + reporter: R, + name_prefix: &str, + ) -> Self { let engine = Arc::new(Mutex::new(engine)); - let pool = YatpPoolBuilder::new(SchedTicker {}) + let pool = YatpPoolBuilder::new(SchedTicker {reporter:reporter.clone()}) .thread_count(pool_size, pool_size) .name_prefix(name_prefix) // Safety: by setting `after_start` and `before_stop`, `FuturePool` ensures @@ -59,7 +73,7 @@ impl SchedPool { .before_stop(move || unsafe { // Safety: we ensure the `set_` and `destroy_` calls use the same engine type. destroy_tls_engine::(); - tls_flush(); + tls_flush(&reporter); }) .build_future_pool(); SchedPool { pool } @@ -76,7 +90,7 @@ pub fn tls_collect_scan_details(cmd: &'static str, stats: &Statistics) { }); } -pub fn tls_flush() { +pub fn tls_flush(reporter: &R) { TLS_SCHED_METRICS.with(|m| { let mut m = m.borrow_mut(); for (cmd, stat) in m.local_scan_details.drain() { @@ -91,6 +105,20 @@ pub fn tls_flush() { m.processing_read_duration.flush(); m.processing_write_duration.flush(); m.command_keyread_histogram_vec.flush(); + + // Report PD metrics + if !m.local_write_stats.is_empty() { + let mut write_stats = WriteStats::default(); + mem::swap(&mut write_stats, &mut m.local_write_stats); + reporter.report_write_stats(write_stats); + } + }); +} + +pub fn tls_collect_query(region_id: u64, kind: QueryKind) { + TLS_SCHED_METRICS.with(|m| { + let mut m = m.borrow_mut(); + m.local_write_stats.add_query_num(region_id, kind); }); } diff --git a/src/storage/txn/scheduler.rs b/src/storage/txn/scheduler.rs index e9a00b73416..1944667e03f 100644 --- a/src/storage/txn/scheduler.rs +++ b/src/storage/txn/scheduler.rs @@ -30,6 +30,7 @@ use std::u64; use collections::HashMap; use concurrency_manager::{ConcurrencyManager, KeyHandleGuard}; use kvproto::kvrpcpb::{CommandPri, DiskFullOpt, ExtraOp}; +use kvproto::pdpb::QueryKind; use resource_metering::{cpu::FutureExt, ResourceMeteringTag}; use tikv_util::{callback::must_call, deadline::Deadline, time::Instant}; use txn_types::TimeStamp; @@ -45,6 +46,7 @@ use crate::storage::metrics::{self, *}; use crate::storage::txn::commands::{ ResponsePolicy, WriteContext, WriteResult, WriteResultLockInfo, }; +use crate::storage::txn::sched_pool::tls_collect_query; use crate::storage::txn::{ commands::Command, flow_controller::FlowController, @@ -53,7 +55,7 @@ use crate::storage::txn::{ Error, ProcessResult, }; use crate::storage::{ - get_priority_tag, types::StorageCallback, Error as StorageError, + get_priority_tag, kv::FlowStatsReporter, types::StorageCallback, Error as StorageError, ErrorInner as StorageErrorInner, }; @@ -287,13 +289,14 @@ unsafe impl Send for Scheduler {} impl Scheduler { /// Creates a scheduler. 
- pub(in crate::storage) fn new( + pub(in crate::storage) fn new( engine: E, lock_mgr: L, concurrency_manager: ConcurrencyManager, config: &Config, pipelined_pessimistic_lock: Arc, flow_controller: Arc, + reporter: R, ) -> Self { let t = Instant::now_coarse(); let mut task_slots = Vec::with_capacity(TASKS_SLOTS_NUM); @@ -310,11 +313,13 @@ impl Scheduler { worker_pool: SchedPool::new( engine.clone(), config.scheduler_worker_pool_size, + reporter.clone(), "sched-worker-pool", ), high_priority_pool: SchedPool::new( engine.clone(), std::cmp::max(1, config.scheduler_worker_pool_size / 2), + reporter, "sched-high-pri-pool", ), lock_mgr, @@ -644,6 +649,21 @@ impl Scheduler { let region_id = task.cmd.ctx().get_region_id(); let ts = task.cmd.ts(); let mut statistics = Statistics::default(); + match &task.cmd { + Command::Prewrite(_) | Command::PrewritePessimistic(_) => { + tls_collect_query(region_id, QueryKind::Prewrite); + } + Command::AcquirePessimisticLock(_) => { + tls_collect_query(region_id, QueryKind::AcquirePessimisticLock); + } + Command::Commit(_) => { + tls_collect_query(region_id, QueryKind::Commit); + } + Command::Rollback(_) | Command::PessimisticRollback(_) => { + tls_collect_query(region_id, QueryKind::Rollback); + } + _ => {} + } if task.cmd.readonly() { self.process_read(snapshot, task, &mut statistics); @@ -939,10 +959,19 @@ mod tests { }; use futures_executor::block_on; use kvproto::kvrpcpb::{BatchRollbackRequest, Context}; + use raftstore::store::{ReadStats, WriteStats}; use tikv_util::config::ReadableSize; use tikv_util::future::paired_future_callback; use txn_types::{Key, OldValues}; + #[derive(Clone)] + struct DummyReporter; + + impl FlowStatsReporter for DummyReporter { + fn report_read_stats(&self, _read_stats: ReadStats) {} + fn report_write_stats(&self, _write_stats: WriteStats) {} + } + #[test] fn test_command_latches() { let mut temp_map = HashMap::default(); @@ -1075,6 +1104,7 @@ mod tests { &config, Arc::new(AtomicBool::new(true)), Arc::new(FlowController::empty()), + DummyReporter, ); let mut lock = Lock::new(&[Key::from_raw(b"b")]); @@ -1124,6 +1154,7 @@ mod tests { &config, Arc::new(AtomicBool::new(true)), Arc::new(FlowController::empty()), + DummyReporter, ); // Spawn a task that sleeps for 500ms to occupy the pool. 
The next request @@ -1174,6 +1205,7 @@ mod tests { &config, Arc::new(AtomicBool::new(true)), Arc::new(FlowController::empty()), + DummyReporter, ); let mut lock = Lock::new(&[Key::from_raw(b"b")]); diff --git a/tests/integrations/raftstore/test_stats.rs b/tests/integrations/raftstore/test_stats.rs index 3cbf4672783..1ee771b4380 100644 --- a/tests/integrations/raftstore/test_stats.rs +++ b/tests/integrations/raftstore/test_stats.rs @@ -363,28 +363,30 @@ fn test_query_stats() { fail::cfg("mock_hotspot_threshold", "return(0)").unwrap(); fail::cfg("mock_tick_interval", "return(0)").unwrap(); fail::cfg("mock_collect_interval", "return(0)").unwrap(); - test_query_num(raw_get); - test_query_num(raw_batch_get); - test_query_num(raw_scan); - test_query_num(raw_batch_scan); - test_query_num(get); - test_query_num(batch_get); - test_query_num(scan); - test_query_num(scan_lock); - test_query_num(get_key_ttl); - test_query_num(raw_batch_get_command); - test_query_num(batch_get_command); + test_query_num(raw_get, true); + test_query_num(raw_batch_get, true); + test_query_num(raw_scan, true); + test_query_num(raw_batch_scan, true); + test_query_num(get, false); + test_query_num(batch_get, false); + test_query_num(scan, false); + test_query_num(scan_lock, false); + test_query_num(get_key_ttl, true); + test_query_num(raw_batch_get_command, true); + test_query_num(batch_get_command, false); test_delete_query(); + test_pessimistic_lock(); + test_rollback(); fail::remove("mock_tick_interval"); fail::remove("mock_hotspot_threshold"); fail::remove("mock_collect_interval"); } fn raw_put( - cluster: &Cluster, + _cluster: &Cluster, client: &TikvClient, ctx: &Context, - store_id: u64, + _store_id: u64, key: Vec, ) { let mut put_req = RawPutRequest::default(); @@ -394,7 +396,8 @@ fn raw_put( let put_resp = client.raw_put(&put_req).unwrap(); assert!(!put_resp.has_region_error()); assert!(put_resp.error.is_empty()); - assert!(check_query_num_write(&cluster, store_id, QueryKind::Put, 1)); + // todo support raw kv write query statistic + // skip raw kv write query check } fn put( @@ -429,7 +432,12 @@ fn put( prewrite_resp.get_errors() ); } - assert!(check_query_num_write(&cluster, store_id, QueryKind::Put, 1)); + assert!(check_query_num_write( + &cluster, + store_id, + QueryKind::Prewrite, + 1 + )); // Commit { let commit_ts = block_on(cluster.pd_client.get_tso()).unwrap(); @@ -446,10 +454,87 @@ fn put( ); assert!(!commit_resp.has_error(), "{:?}", commit_resp.get_error()); } - assert!(check_query_num_write(&cluster, store_id, QueryKind::Put, 1)); + assert!(check_query_num_write( + &cluster, + store_id, + QueryKind::Commit, + 1 + )); +} + +fn test_pessimistic_lock() { + let (cluster, client, ctx) = must_new_and_configure_cluster_and_kv_client(|cluster| { + cluster.cfg.raft_store.pd_store_heartbeat_tick_interval = ReadableDuration::millis(50); + }); + + let key = b"key2".to_vec(); + let store_id = 1; + put(&cluster, &client, &ctx, store_id, key.clone()); + + let start_ts = block_on(cluster.pd_client.get_tso()).unwrap(); + let mut mutation = Mutation::default(); + mutation.set_op(Op::PessimisticLock); + mutation.key = key.clone(); + mutation.value = b"v2".to_vec(); + + let mut lock_req = PessimisticLockRequest::default(); + lock_req.set_context(ctx.clone()); + lock_req.set_mutations(vec![mutation].into_iter().collect()); + lock_req.start_version = start_ts.into_inner(); + lock_req.for_update_ts = start_ts.into_inner(); + lock_req.primary_lock = key.clone(); + let lock_resp = client.kv_pessimistic_lock(&lock_req).unwrap(); 
+ assert!( + !lock_resp.has_region_error(), + "{:?}", + lock_resp.get_region_error() + ); + assert!( + lock_resp.get_errors().is_empty(), + "{:?}", + lock_resp.get_errors() + ); + assert!(check_query_num_write( + &cluster, + store_id, + QueryKind::AcquirePessimisticLock, + 1 + )); } -fn test_query_num(query: Box) { +pub fn test_rollback() { + let (cluster, client, ctx) = must_new_and_configure_cluster_and_kv_client(|cluster| { + cluster.cfg.raft_store.pd_store_heartbeat_tick_interval = ReadableDuration::millis(50); + }); + let key = b"key2".to_vec(); + let store_id = 1; + put(&cluster, &client, &ctx, store_id, key.clone()); + let start_ts = block_on(cluster.pd_client.get_tso()).unwrap(); + + let mut rollback_req = BatchRollbackRequest::default(); + rollback_req.set_context(ctx.clone()); + rollback_req.start_version = start_ts.into_inner(); + rollback_req.set_keys(vec![key].into_iter().collect()); + let rollback_resp = client.kv_batch_rollback(&rollback_req).unwrap(); + assert!( + !rollback_resp.has_region_error(), + "{:?}", + rollback_resp.get_region_error() + ); + assert!( + !rollback_resp.has_error(), + "{:?}", + rollback_resp.get_error() + ); + assert!(check_query_num_write( + &cluster, + store_id, + QueryKind::Rollback, + 1 + )); +} + +fn test_query_num(query: Box, enable_ttl: bool) { let (cluster, client, ctx) = must_new_and_configure_cluster_and_kv_client(|cluster| { cluster.cfg.raft_store.pd_store_heartbeat_tick_interval = ReadableDuration::millis(50); cluster.cfg.split.qps_threshold = 0; @@ -457,13 +542,16 @@ fn test_query_num(query: Box) { cluster.cfg.split.split_contained_score = 2.0; cluster.cfg.split.detect_times = 1; cluster.cfg.split.sample_threshold = 0; - cluster.cfg.storage.enable_ttl = true; + cluster.cfg.storage.enable_ttl = enable_ttl; }); let k = b"key".to_vec(); let store_id = 1; - raw_put(&cluster, &client, &ctx, store_id, k.clone()); - put(&cluster, &client, &ctx, store_id, k.clone()); + if enable_ttl { + raw_put(&cluster, &client, &ctx, store_id, k.clone()); + } else { + put(&cluster, &client, &ctx, store_id, k.clone()); + } let region_id = cluster.get_region_id(&k); query( ctx.clone(), @@ -476,55 +564,47 @@ fn test_query_num(query: Box) { } fn test_delete_query() { - let (cluster, client, ctx) = must_new_and_configure_cluster_and_kv_client(|cluster| { - cluster.cfg.raft_store.pd_store_heartbeat_tick_interval = ReadableDuration::millis(50); - cluster.cfg.storage.enable_ttl = true; - }); - let k = b"key".to_vec(); let store_id = 1; - raw_put(&cluster, &client, &ctx, store_id, k.clone()); - put(&cluster, &client, &ctx, store_id, k.clone()); - - // Raw Delete - let mut delete_req = RawDeleteRequest::default(); - delete_req.set_context(ctx.clone()); - delete_req.key = k.clone(); - client.raw_delete(&delete_req).unwrap(); - assert!(check_query_num_write( - &cluster, - store_id, - QueryKind::Delete, - 1 - )); - // DeleteRange - let mut delete_req = DeleteRangeRequest::default(); - delete_req.set_context(ctx.clone()); - delete_req.set_start_key(k.clone()); - delete_req.set_end_key(vec![]); - client.kv_delete_range(&delete_req).unwrap(); - assert!(check_query_num_write( - &cluster, - store_id, - QueryKind::DeleteRange, - 1 - )); + { + let (cluster, client, ctx) = must_new_and_configure_cluster_and_kv_client(|cluster| { + cluster.cfg.raft_store.pd_store_heartbeat_tick_interval = ReadableDuration::millis(50); + cluster.cfg.storage.enable_ttl = true; + }); - raw_put(&cluster, &client, &ctx, store_id, k.clone()); - put(&cluster, &client, &ctx, store_id, k.clone()); - // Raw 
DeleteRange - let mut delete_req = RawDeleteRangeRequest::default(); - delete_req.set_context(ctx.clone()); - delete_req.set_start_key(k.clone()); - delete_req.set_end_key(vec![]); - client.raw_delete_range(&delete_req).unwrap(); - assert!(check_query_num_write( - &cluster, - store_id, - QueryKind::DeleteRange, - 1 - )); + raw_put(&cluster, &client, &ctx, store_id, k.clone()); + // Raw Delete + let mut delete_req = RawDeleteRequest::default(); + delete_req.set_context(ctx.clone()); + delete_req.key = k.clone(); + client.raw_delete(&delete_req).unwrap(); + // skip raw kv write query check + + raw_put(&cluster, &client, &ctx, store_id, k.clone()); + // Raw DeleteRange + let mut delete_req = RawDeleteRangeRequest::default(); + delete_req.set_context(ctx.clone()); + delete_req.set_start_key(k.clone()); + delete_req.set_end_key(vec![]); + client.raw_delete_range(&delete_req).unwrap(); + // skip raw kv write query check + } + + { + let (cluster, client, ctx) = must_new_and_configure_cluster_and_kv_client(|cluster| { + cluster.cfg.raft_store.pd_store_heartbeat_tick_interval = ReadableDuration::millis(50); + }); + + put(&cluster, &client, &ctx, store_id, k.clone()); + // DeleteRange + let mut delete_req = DeleteRangeRequest::default(); + delete_req.set_context(ctx); + delete_req.set_start_key(k); + delete_req.set_end_key(vec![]); + client.kv_delete_range(&delete_req).unwrap(); + // skip kv write query check + } } fn check_query_num_read( diff --git a/tests/integrations/server/kv_service.rs b/tests/integrations/server/kv_service.rs index e7fc6fc8a43..428bfafe751 100644 --- a/tests/integrations/server/kv_service.rs +++ b/tests/integrations/server/kv_service.rs @@ -242,12 +242,18 @@ fn test_rawkv_ttl() { std::thread::sleep(Duration::from_secs(1)); let mut get_req = RawGetRequest::default(); - get_req.set_context(ctx); + get_req.set_context(ctx.clone()); get_req.key = k; let get_resp = client.raw_get(&get_req).unwrap(); assert!(!get_resp.has_region_error()); assert!(get_resp.error.is_empty()); assert!(get_resp.value.is_empty()); + + // Can't run transaction commands with TTL enabled. + let mut prewrite_req = PrewriteRequest::default(); + prewrite_req.set_context(ctx); + let prewrite_resp = client.kv_prewrite(&prewrite_req).unwrap(); + assert!(!prewrite_resp.get_errors().is_empty()); } #[test]
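Because report_write_stats is now part of FlowStatsReporter, anything that builds a Storage or a txn Scheduler has to supply a reporter implementing both methods; the diff threads pd_sender through create_raft_storage for the real server and adds no-op DummyReporter impls to the unit tests. A small sketch of such an implementation, again with simplified stand-in types instead of the real raftstore::store::{ReadStats, WriteStats} structs:

use std::sync::{Arc, Mutex};

// Stand-ins for raftstore::store::ReadStats / WriteStats.
#[derive(Debug, Default, Clone)]
struct ReadStats;
#[derive(Debug, Default, Clone)]
struct WriteStats;

// Mirrors the shape of the extended trait in components/raftstore/src/store/worker/pd.rs.
trait FlowStatsReporter: Send + Sync + Clone + 'static {
    fn report_read_stats(&self, read_stats: ReadStats);
    fn report_write_stats(&self, write_stats: WriteStats);
}

// The diff's tests use a DummyReporter that drops everything; this variant
// records what it receives so a test can assert on the reported flow.
#[derive(Clone, Default)]
struct RecordingReporter {
    reads: Arc<Mutex<Vec<ReadStats>>>,
    writes: Arc<Mutex<Vec<WriteStats>>>,
}

impl FlowStatsReporter for RecordingReporter {
    fn report_read_stats(&self, read_stats: ReadStats) {
        self.reads.lock().unwrap().push(read_stats);
    }

    fn report_write_stats(&self, write_stats: WriteStats) {
        self.writes.lock().unwrap().push(write_stats);
    }
}

fn main() {
    let reporter = RecordingReporter::default();
    reporter.report_write_stats(WriteStats::default());
    assert_eq!(reporter.writes.lock().unwrap().len(), 1);
}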