diff --git a/src/coprocessor/endpoint.rs b/src/coprocessor/endpoint.rs index cb0b4aed878..f3a6a59b7dc 100644 --- a/src/coprocessor/endpoint.rs +++ b/src/coprocessor/endpoint.rs @@ -15,6 +15,7 @@ use std::usize; use std::time::Duration; use std::rc::Rc; use std::fmt::{self, Debug, Display, Formatter}; +use std::mem; use tipb::select::{self, Chunk, DAGRequest, SelectRequest}; use tipb::executor::ExecType; @@ -27,9 +28,9 @@ use kvproto::kvrpcpb::{CommandPri, IsolationLevel}; use util::time::{duration_to_sec, Instant}; use util::worker::{BatchRunnable, Scheduler}; use util::collections::HashMap; -use util::threadpool::{Context, ThreadPool, ThreadPoolBuilder}; +use util::threadpool::{Context, ContextFactory, ThreadPool, ThreadPoolBuilder}; use server::{Config, OnResponse}; -use storage::{self, engine, Engine, Snapshot, Statistics, StatisticsSummary}; +use storage::{self, engine, Engine, FlowStatistics, Snapshot, Statistics, StatisticsSummary}; use storage::engine::Error as EngineError; use super::codec::mysql; @@ -59,24 +60,50 @@ const OUTDATED_ERROR_MSG: &'static str = "request outdated."; const ENDPOINT_IS_BUSY: &'static str = "endpoint is busy"; -pub struct Host { +pub struct Host { engine: Box, sched: Scheduler, reqs: HashMap>, last_req_id: u64, - pool: ThreadPool, - low_priority_pool: ThreadPool, - high_priority_pool: ThreadPool, + pool: ThreadPool>, + low_priority_pool: ThreadPool>, + high_priority_pool: ThreadPool>, max_running_task_count: usize, } +pub type CopRequestStatistics = HashMap; + +pub trait CopSender: Send + Clone { + fn send(&self, CopRequestStatistics) -> Result<()>; +} + +struct CopContextFactory { + sender: R, +} + +impl ContextFactory> for CopContextFactory +where + R: CopSender + 'static, +{ + fn create(&self) -> CopContext { + CopContext { + sender: self.sender.clone(), + select_stats: Default::default(), + index_stats: Default::default(), + request_stats: HashMap::default(), + } + } +} + #[derive(Default)] -struct CopContext { +struct CopContext { select_stats: StatisticsSummary, index_stats: StatisticsSummary, + request_stats: CopRequestStatistics, + sender: R, } -impl CopContext { +impl CopContext { fn add_statistics(&mut self, type_str: &str, stats: &Statistics) { self.get_statistics(type_str).add_statistics(stats); } @@ -91,9 +118,17 @@ impl CopContext { } } } + + fn add_statistics_by_region(&mut self, region_id: u64, stats: &Statistics) { + let flow_stats = self.request_stats + .entry(region_id) + .or_insert_with(FlowStatistics::default); + flow_stats.add(&stats.write.flow_stats); + flow_stats.add(&stats.data.flow_stats); + } } -impl Context for CopContext { +impl Context for CopContext { fn on_tick(&mut self) { for type_str in &[STR_REQ_TYPE_SELECT, STR_REQ_TYPE_INDEX] { let this_statistics = self.get_statistics(type_str); @@ -110,26 +145,38 @@ impl Context for CopContext { } *this_statistics = Default::default(); } + if !self.request_stats.is_empty() { + let mut to_send_stats = HashMap::default(); + mem::swap(&mut to_send_stats, &mut self.request_stats); + if let Err(e) = self.sender.send(to_send_stats) { + error!("send coprocessor statistics: {:?}", e); + }; + } + } } -impl Host { - pub fn new(engine: Box, scheduler: Scheduler, cfg: &Config) -> Host { +impl Host { + pub fn new(engine: Box, scheduler: Scheduler, cfg: &Config, r: R) -> Host { Host { engine: engine, sched: scheduler, reqs: HashMap::default(), last_req_id: 0, max_running_task_count: cfg.end_point_max_tasks, - pool: ThreadPoolBuilder::with_default_factory(thd_name!("endpoint-normal-pool")) - 
.thread_count(cfg.end_point_concurrency) + pool: ThreadPoolBuilder::new( + thd_name!("endpoint-normal-pool"), + CopContextFactory { sender: r.clone() }, + ).thread_count(cfg.end_point_concurrency) .build(), - low_priority_pool: ThreadPoolBuilder::with_default_factory( + low_priority_pool: ThreadPoolBuilder::new( thd_name!("endpoint-low-pool"), + CopContextFactory { sender: r.clone() }, ).thread_count(cfg.end_point_concurrency) .build(), - high_priority_pool: ThreadPoolBuilder::with_default_factory( + high_priority_pool: ThreadPoolBuilder::new( thd_name!("endpoint-high-pool"), + CopContextFactory { sender: r.clone() }, ).thread_count(cfg.end_point_concurrency) .build(), } @@ -170,9 +217,11 @@ impl Host { CommandPri::High => &mut self.high_priority_pool, CommandPri::Normal => &mut self.pool, }; - pool.execute(move |ctx: &mut CopContext| { + pool.execute(move |ctx: &mut CopContext| { + let region_id = req.req.get_context().get_region_id(); let stats = end_point.handle_request(req); ctx.add_statistics(type_str, &stats); + ctx.add_statistics_by_region(region_id, &stats); COPR_PENDING_REQS .with_label_values(&[type_str, pri_str]) .dec(); @@ -367,7 +416,7 @@ impl Display for RequestTask { } } -impl BatchRunnable for Host { +impl BatchRunnable for Host { // TODO: limit pending reqs #[allow(for_kv_map)] fn run_batch(&mut self, tasks: &mut Vec) { @@ -661,17 +710,29 @@ pub fn get_req_pri_str(pri: CommandPri) -> &'static str { #[cfg(test)] mod tests { + use super::*; + use storage::engine::{self, TEMP_DIR}; use std::sync::*; use std::thread; use std::time::Duration; use kvproto::coprocessor::Request; - use storage::engine::{self, TEMP_DIR}; use util::worker::Worker; use util::time::Instant; - use super::*; + #[derive(Clone)] + struct MockCopSender {} + impl MockCopSender { + fn new() -> MockCopSender { + MockCopSender {} + } + } + impl CopSender for MockCopSender { + fn send(&self, _stats: CopRequestStatistics) -> Result<()> { + Ok(()) + } + } #[test] fn test_get_reg_scan_tag() { @@ -692,7 +753,7 @@ mod tests { let engine = engine::new_local_engine(TEMP_DIR, &[]).unwrap(); let mut cfg = Config::default(); cfg.end_point_concurrency = 1; - let end_point = Host::new(engine, worker.scheduler(), &cfg); + let end_point = Host::new(engine, worker.scheduler(), &cfg, MockCopSender::new()); worker.start_batch(end_point, 30).unwrap(); let (tx, rx) = mpsc::channel(); let mut task = RequestTask::new(Request::new(), box move |msg| { tx.send(msg).unwrap(); }); @@ -709,7 +770,7 @@ mod tests { let engine = engine::new_local_engine(TEMP_DIR, &[]).unwrap(); let mut cfg = Config::default(); cfg.end_point_concurrency = 1; - let mut end_point = Host::new(engine, worker.scheduler(), &cfg); + let mut end_point = Host::new(engine, worker.scheduler(), &cfg, MockCopSender::new()); end_point.max_running_task_count = 3; worker.start_batch(end_point, 30).unwrap(); let (tx, rx) = mpsc::channel(); diff --git a/src/coprocessor/mod.rs b/src/coprocessor/mod.rs index f30d3bb2e53..10ebc779732 100644 --- a/src/coprocessor/mod.rs +++ b/src/coprocessor/mod.rs @@ -85,5 +85,6 @@ impl From for Error { } } -pub use self::endpoint::{Host as EndPointHost, RequestTask, Task as EndPointTask, REQ_TYPE_DAG, - REQ_TYPE_INDEX, REQ_TYPE_SELECT, SINGLE_GROUP}; +pub use self::endpoint::{CopRequestStatistics, CopSender, Host as EndPointHost, RequestTask, + Task as EndPointTask, REQ_TYPE_DAG, REQ_TYPE_INDEX, REQ_TYPE_SELECT, + SINGLE_GROUP}; diff --git a/src/pd/client.rs b/src/pd/client.rs index 5cc657a1892..6520dbbd6eb 100644 --- a/src/pd/client.rs +++ 
b/src/pd/client.rs @@ -238,6 +238,8 @@ impl PdClient for RpcClient { req.set_pending_peers(RepeatedField::from_vec(region_stat.pending_peers)); req.set_bytes_written(region_stat.written_bytes); req.set_keys_written(region_stat.written_keys); + req.set_bytes_read(region_stat.read_bytes); + req.set_keys_read(region_stat.read_keys); req.set_approximate_size(region_stat.approximate_size); let executor = |client: &RwLock, req: pdpb::RegionHeartbeatRequest| { diff --git a/src/pd/mod.rs b/src/pd/mod.rs index ad1d9981026..e9470ae1086 100644 --- a/src/pd/mod.rs +++ b/src/pd/mod.rs @@ -34,6 +34,8 @@ pub struct RegionStat { pub pending_peers: Vec, pub written_bytes: u64, pub written_keys: u64, + pub read_bytes: u64, + pub read_keys: u64, pub approximate_size: u64, } @@ -43,6 +45,8 @@ impl RegionStat { pending_peers: Vec, written_bytes: u64, written_keys: u64, + read_bytes: u64, + read_keys: u64, approximate_size: u64, ) -> RegionStat { RegionStat { @@ -50,6 +54,8 @@ impl RegionStat { pending_peers: pending_peers, written_bytes: written_bytes, written_keys: written_keys, + read_bytes: read_bytes, + read_keys: read_keys, approximate_size: approximate_size, } } diff --git a/src/raftstore/store/metrics.rs b/src/raftstore/store/metrics.rs index 163e9721870..2a846206356 100644 --- a/src/raftstore/store/metrics.rs +++ b/src/raftstore/store/metrics.rs @@ -136,6 +136,20 @@ lazy_static! { exponential_buckets(1.0, 2.0, 20).unwrap() ).unwrap(); + pub static ref REGION_READ_KEYS_HISTOGRAM: Histogram = + register_histogram!( + "tikv_region_read_keys", + "Histogram of keys read for regions", + exponential_buckets(1.0, 2.0, 20).unwrap() + ).unwrap(); + + pub static ref REGION_READ_BYTES_HISTOGRAM: Histogram = + register_histogram!( + "tikv_region_read_bytes", + "Histogram of bytes read for regions", + exponential_buckets(256.0, 2.0, 20).unwrap() + ).unwrap(); + pub static ref REQUEST_WAIT_TIME_HISTOGRAM: Histogram = register_histogram!( "tikv_raftstore_request_wait_time_duration_secs", diff --git a/src/raftstore/store/mod.rs b/src/raftstore/store/mod.rs index 16105de9daa..f83c709ff0c 100644 --- a/src/raftstore/store/mod.rs +++ b/src/raftstore/store/mod.rs @@ -29,7 +29,7 @@ mod worker; mod metrics; mod local_metrics; -pub use self::msg::{BatchCallback, Callback, Msg, SnapshotStatusMsg, Tick}; +pub use self::msg::{BatchCallback, Callback, CopFlowStatistics, Msg, SnapshotStatusMsg, Tick}; pub use self::store::{create_event_loop, Engines, Store, StoreChannel}; pub use self::config::Config; pub use self::transport::Transport; diff --git a/src/raftstore/store/msg.rs b/src/raftstore/store/msg.rs index 0f9ea63ad27..0e5b95c3b17 100644 --- a/src/raftstore/store/msg.rs +++ b/src/raftstore/store/msg.rs @@ -19,11 +19,14 @@ use kvproto::raft_serverpb::RaftMessage; use kvproto::raft_cmdpb::{RaftCmdRequest, RaftCmdResponse}; use kvproto::metapb::RegionEpoch; use raft::SnapshotStatus; +use util::collections::HashMap; +use storage::FlowStatistics; use util::escape; pub type Callback = Box; pub type BatchCallback = Box>) + Send>; +pub type CopFlowStatistics = HashMap; #[derive(Debug, Clone, Copy)] pub enum Tick { @@ -36,7 +39,6 @@ pub enum Tick { SnapGc, CompactLockCf, ConsistencyCheck, - ReportRegionFlow, } pub struct SnapshotStatusMsg { @@ -84,6 +86,8 @@ pub enum Msg { // For snapshot stats.
SnapshotStats, + CoprocessorStats { request_stats: CopFlowStatistics }, + // For consistency check ComputeHashResult { region_id: u64, @@ -110,6 +114,7 @@ impl fmt::Debug for Msg { region_id ), Msg::SnapshotStats => write!(fmt, "Snapshot stats"), + Msg::CoprocessorStats { .. } => write!(fmt, "Coprocessor stats"), Msg::ComputeHashResult { region_id, index, diff --git a/src/raftstore/store/peer.rs b/src/raftstore/store/peer.rs index 6f4ce867f6d..b0b5e3957b7 100644 --- a/src/raftstore/store/peer.rs +++ b/src/raftstore/store/peer.rs @@ -44,7 +44,7 @@ use util::collections::{FlatMap, FlatMapValues as Values, HashSet}; use pd::INVALID_ID; -use super::store::Store; +use super::store::{Store, StoreStat}; use super::peer_storage::{write_peer_state, ApplySnapResult, InvokeContext, PeerStorage}; use super::util; use super::msg::Callback; @@ -195,6 +195,10 @@ pub struct PeerStat { pub written_keys: u64, pub last_written_bytes: u64, pub last_written_keys: u64, + pub read_bytes: u64, + pub read_keys: u64, + pub last_read_bytes: u64, + pub last_read_keys: u64, } pub struct Peer { @@ -872,7 +876,12 @@ impl Peer { } } - pub fn post_apply(&mut self, res: &ApplyRes, groups: &mut HashSet) { + pub fn post_apply( + &mut self, + res: &ApplyRes, + groups: &mut HashSet, + store_stat: &mut StoreStat, + ) { if self.is_applying_snapshot() { panic!("{} should not applying snapshot.", self.tag); } @@ -888,6 +897,8 @@ impl Peer { self.mut_store().applied_index_term = res.applied_index_term; self.peer_stat.written_keys += res.metrics.written_keys; self.peer_stat.written_bytes += res.metrics.written_bytes; + store_stat.engine_total_bytes_written += res.metrics.written_bytes; + store_stat.engine_total_keys_written += res.metrics.written_keys; let diff = if has_split { self.delete_keys_hint = res.metrics.delete_keys_hint; @@ -1568,8 +1579,10 @@ impl Peer { peer: self.peer.clone(), down_peers: self.collect_down_peers(self.cfg.max_peer_down_duration.0), pending_peers: self.collect_pending_peers(), - written_bytes: self.peer_stat.last_written_bytes, - written_keys: self.peer_stat.last_written_keys, + written_bytes: self.peer_stat.written_bytes - self.peer_stat.last_written_bytes, + written_keys: self.peer_stat.written_keys - self.peer_stat.last_written_keys, + read_bytes: self.peer_stat.read_bytes - self.peer_stat.last_read_bytes, + read_keys: self.peer_stat.read_keys - self.peer_stat.last_read_keys, }; if let Err(e) = worker.schedule(task) { error!("{} failed to notify pd: {}", self.tag, e); diff --git a/src/raftstore/store/store.rs b/src/raftstore/store/store.rs index acfa0ef5cf8..3d6ec3c822b 100644 --- a/src/raftstore/store/store.rs +++ b/src/raftstore/store/store.rs @@ -58,7 +58,7 @@ use super::engine::{Iterable, Peekable, Snapshot as EngineSnapshot}; use super::config::Config; use super::peer::{self, ConsistencyState, Peer, ReadyContext, StaleState}; use super::peer_storage::{self, ApplySnapResult, CacheQueryStats}; -use super::msg::{BatchCallback, Callback}; +use super::msg::{BatchCallback, Callback, CopFlowStatistics}; use super::cmd_resp::{bind_term, new_error}; use super::transport::Transport; use super::metrics::*; @@ -94,7 +94,19 @@ pub struct StoreChannel { pub struct StoreStat { pub region_bytes_written: LocalHistogram, pub region_keys_written: LocalHistogram, + pub region_bytes_read: LocalHistogram, + pub region_keys_read: LocalHistogram, pub lock_cf_bytes_written: u64, + + pub engine_total_bytes_written: u64, + pub engine_total_keys_written: u64, + pub engine_total_bytes_read: u64, + pub engine_total_keys_read:
u64, + + pub engine_last_total_bytes_written: u64, + pub engine_last_total_keys_written: u64, + pub engine_last_total_bytes_read: u64, + pub engine_last_total_keys_read: u64, } impl Default for StoreStat { @@ -102,7 +114,18 @@ impl Default for StoreStat { StoreStat { region_bytes_written: REGION_WRITTEN_BYTES_HISTOGRAM.local(), region_keys_written: REGION_WRITTEN_KEYS_HISTOGRAM.local(), + region_bytes_read: REGION_READ_BYTES_HISTOGRAM.local(), + region_keys_read: REGION_READ_KEYS_HISTOGRAM.local(), lock_cf_bytes_written: 0, + engine_total_bytes_written: 0, + engine_total_keys_written: 0, + engine_total_bytes_read: 0, + engine_total_keys_read: 0, + + engine_last_total_bytes_written: 0, + engine_last_total_keys_written: 0, + engine_last_total_bytes_read: 0, + engine_last_total_keys_read: 0, } } } @@ -494,12 +517,11 @@ impl Store { self.register_raft_gc_log_tick(event_loop); self.register_split_region_check_tick(event_loop); self.register_compact_check_tick(event_loop); - self.register_pd_heartbeat_tick(event_loop); self.register_pd_store_heartbeat_tick(event_loop); + self.register_pd_heartbeat_tick(event_loop); self.register_snap_mgr_gc_tick(event_loop); self.register_compact_lock_cf_tick(event_loop); self.register_consistency_check_tick(event_loop); - self.register_report_region_flow_tick(event_loop); let split_check_runner = SplitCheckRunner::new( self.kv_engine.clone(), @@ -655,7 +677,7 @@ impl Store { Ok(ApplyTaskRes::Applys(multi_res)) => for res in multi_res { if let Some(p) = self.region_peers.get_mut(&res.region_id) { debug!("{} async apply finish: {:?}", p.tag, res); - p.post_apply(&res, &mut self.pending_raft_groups); + p.post_apply(&res, &mut self.pending_raft_groups, &mut self.store_stat); } self.store_stat.lock_cf_bytes_written += res.metrics.lock_cf_written_bytes; self.on_ready_result(res.region_id, res.exec_res); @@ -1675,25 +1697,12 @@ impl Store { }; } - fn register_report_region_flow_tick(&self, event_loop: &mut EventLoop) { - if let Err(e) = register_timer( - event_loop, - Tick::ReportRegionFlow, - self.cfg.report_region_flow_interval.as_millis(), - ) { - error!("{} register raft gc log tick err: {:?}", self.tag, e); - }; - } - - fn on_report_region_flow(&mut self, event_loop: &mut EventLoop) { + fn on_update_region_flow(&mut self) { for peer in self.region_peers.values_mut() { peer.peer_stat.last_written_bytes = peer.peer_stat.written_bytes; peer.peer_stat.last_written_keys = peer.peer_stat.written_keys; - if !peer.is_leader() { - peer.peer_stat.written_bytes = 0; - peer.peer_stat.written_keys = 0; - continue; - } + peer.peer_stat.last_read_bytes = peer.peer_stat.read_bytes; + peer.peer_stat.last_read_keys = peer.peer_stat.read_keys; self.store_stat .region_bytes_written @@ -1701,13 +1710,25 @@ impl Store { self.store_stat .region_keys_written .observe(peer.peer_stat.written_keys as f64); - peer.peer_stat.written_bytes = 0; - peer.peer_stat.written_keys = 0; + self.store_stat + .region_bytes_read + .observe(peer.peer_stat.read_bytes as f64); + self.store_stat + .region_keys_read + .observe(peer.peer_stat.read_keys as f64); } self.store_stat.region_bytes_written.flush(); self.store_stat.region_keys_written.flush(); + self.store_stat.region_bytes_read.flush(); + self.store_stat.region_keys_read.flush(); + } - self.register_report_region_flow_tick(event_loop); + fn on_update_store_flow(&mut self) { + self.store_stat.engine_last_total_bytes_written = + self.store_stat.engine_total_bytes_written; + self.store_stat.engine_last_total_keys_written = 
self.store_stat.engine_total_keys_written; + self.store_stat.engine_last_total_bytes_read = self.store_stat.engine_total_bytes_read; + self.store_stat.engine_last_total_keys_read = self.store_stat.engine_total_keys_read; } #[allow(if_same_then_else)] @@ -1958,7 +1979,6 @@ impl Store { for peer in self.region_peers.values_mut() { peer.check_peers(); } - let mut leader_count = 0; for peer in self.region_peers.values() { if peer.is_leader() { @@ -1966,7 +1986,7 @@ impl Store { peer.heartbeat_pd(&self.pd_worker); } } - + self.on_update_region_flow(); STORE_PD_HEARTBEAT_GAUGE_VEC .with_label_values(&["leader"]) .set(leader_count as f64); @@ -2020,6 +2040,23 @@ impl Store { stats.set_start_time(self.start_time.sec as u32); + // report store write flow to pd + stats.set_bytes_written( + self.store_stat.engine_total_bytes_written - + self.store_stat.engine_last_total_bytes_written, + ); + stats.set_keys_written( + self.store_stat.engine_total_keys_written - + self.store_stat.engine_last_total_keys_written, + ); + stats.set_bytes_read( + self.store_stat.engine_total_bytes_read - self.store_stat.engine_last_total_bytes_read, + ); + stats.set_keys_read( + self.store_stat.engine_total_keys_read - self.store_stat.engine_last_total_keys_read, + ); + self.on_update_store_flow(); + stats.set_is_busy(self.is_busy); self.is_busy = false; @@ -2168,6 +2205,20 @@ impl Store { peer.raft_group.report_unreachable(to_peer_id); } } + + fn handle_coprocessor_msg(&mut self, request_stats: CopFlowStatistics) { + for (region_id, stats) in &request_stats { + if let Some(peer) = self.region_peers.get_mut(region_id) { + if !peer.is_leader() { + continue; + } + peer.peer_stat.read_bytes += stats.read_bytes as u64; + peer.peer_stat.read_keys += stats.read_keys as u64; + self.store_stat.engine_total_bytes_read += stats.read_bytes as u64; + self.store_stat.engine_total_keys_read += stats.read_keys as u64; + } + } + } } // Consistency Check implementation. 
@@ -2484,6 +2535,7 @@ impl mio::Handler for Store { self.on_unreachable(region_id, to_peer_id); } Msg::SnapshotStats => self.store_heartbeat_pd(), + Msg::CoprocessorStats { request_stats } => self.handle_coprocessor_msg(request_stats), Msg::ComputeHashResult { region_id, index, @@ -2519,7 +2571,6 @@ impl mio::Handler for Store { Tick::SnapGc => self.on_snap_mgr_gc(event_loop), Tick::CompactLockCf => self.on_compact_lock_cf(event_loop), Tick::ConsistencyCheck => self.on_consistency_check_tick(event_loop), - Tick::ReportRegionFlow => self.on_report_region_flow(event_loop), } slow_log!(t, "{} handle timeout {:?}", self.tag, timeout); } diff --git a/src/raftstore/store/worker/pd.rs b/src/raftstore/store/worker/pd.rs index ab990eca822..2fd6db7c145 100644 --- a/src/raftstore/store/worker/pd.rs +++ b/src/raftstore/store/worker/pd.rs @@ -54,6 +54,8 @@ pub enum Task { pending_peers: Vec, written_bytes: u64, written_keys: u64, + read_bytes: u64, + read_keys: u64, }, StoreHeartbeat { stats: pdpb::StoreStats, @@ -418,6 +420,8 @@ impl Runnable for Runner { pending_peers, written_bytes, written_keys, + read_bytes, + read_keys, } => { let approximate_size = get_region_approximate_size(&self.db, ®ion).unwrap_or(0); self.handle_heartbeat( @@ -429,6 +433,8 @@ impl Runnable for Runner { pending_peers, written_bytes, written_keys, + read_bytes, + read_keys, approximate_size, ), ) diff --git a/src/server/server.rs b/src/server/server.rs index bdad587eb44..b030078c675 100644 --- a/src/server/server.rs +++ b/src/server/server.rs @@ -22,10 +22,10 @@ use kvproto::debugpb_grpc::create_debug; use util::worker::Worker; use storage::Storage; -use raftstore::store::{Engines, SnapManager, SnapshotStatusMsg}; +use raftstore::store::{CopFlowStatistics, Engines, Msg, SnapManager, SnapshotStatusMsg}; use super::{Config, Result}; -use coprocessor::{EndPointHost, EndPointTask}; +use coprocessor::{CopRequestStatistics, CopSender, EndPointHost, EndPointTask, Result as CopResult}; use super::service::*; use super::transport::{RaftStoreRouter, ServerTransport}; use super::resolve::StoreAddrResolver; @@ -52,6 +52,39 @@ pub struct Server snap_worker: Worker, } +#[derive(Clone)] +pub struct CopReport { + router: R, +} + +impl CopReport { + pub fn new(r: R) -> CopReport { + CopReport { router: r.clone() } + } +} + +impl CopSender for CopReport { + fn send(&self, stats: CopRequestStatistics) -> CopResult<()> { + box_try!(self.router.try_send(Msg::CoprocessorStats { + request_stats: stats as CopFlowStatistics, + })); + Ok(()) + } +} + +#[derive(Clone)] +struct MockCopSender {} +impl MockCopSender { + fn new() -> MockCopSender { + MockCopSender {} + } +} +impl CopSender for MockCopSender { + fn send(&self, _stats: CopRequestStatistics) -> CopResult<()> { + Ok(()) + } +} + impl Server { #[allow(too_many_arguments)] pub fn new( @@ -137,6 +170,7 @@ impl Server { self.storage.get_engine(), self.end_point_worker.scheduler(), cfg, + MockCopSender::new(), ); box_try!( self.end_point_worker diff --git a/src/storage/engine/mod.rs b/src/storage/engine/mod.rs index 613b2eb1931..be20a614b93 100644 --- a/src/storage/engine/mod.rs +++ b/src/storage/engine/mod.rs @@ -186,9 +186,23 @@ pub struct CFStatistics { pub prev: usize, pub seek: usize, pub seek_for_prev: usize, + pub flow_stats: FlowStatistics, pub over_seek_bound: usize, } +#[derive(Default, Clone)] +pub struct FlowStatistics { + pub read_keys: usize, + pub read_bytes: usize, +} + +impl FlowStatistics { + pub fn add(&mut self, other: &Self) { + self.read_bytes = 
self.read_keys.saturating_add(other.read_bytes); + self.read_keys = self.read_keys.saturating_add(other.read_keys); + } +} + impl CFStatistics { #[inline] pub fn total_op_count(&self) -> usize { @@ -216,6 +230,7 @@ impl CFStatistics { self.seek = self.seek.saturating_add(other.seek); self.seek_for_prev = self.seek_for_prev.saturating_add(other.seek_for_prev); self.over_seek_bound = self.over_seek_bound.saturating_add(other.over_seek_bound); + self.flow_stats.add(&other.flow_stats); } } diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 619c94cc5d4..e49838cf4ed 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -32,7 +32,8 @@ mod metrics; pub use self::config::{Config, DEFAULT_DATA_DIR, DEFAULT_ROCKSDB_SUB_DIR}; pub use self::engine::{new_local_engine, CFStatistics, Cursor, Engine, Error as EngineError, - Modify, ScanMode, Snapshot, Statistics, StatisticsSummary, TEMP_DIR}; + FlowStatistics, Modify, ScanMode, Snapshot, Statistics, StatisticsSummary, + TEMP_DIR}; pub use self::engine::raftkv::RaftKv; pub use self::txn::{Msg, Scheduler, SnapshotStore, StoreScanner}; pub use self::types::{make_key, Key, KvPair, MvccInfo, Value}; diff --git a/src/storage/mvcc/reader.rs b/src/storage/mvcc/reader.rs index 30154d16868..6d7e3570fc0 100644 --- a/src/storage/mvcc/reader.rs +++ b/src/storage/mvcc/reader.rs @@ -94,7 +94,8 @@ impl<'a> MvccReader<'a> { }; self.statistics.data.processed += 1; - + self.statistics.data.flow_stats.read_bytes += res.len(); + self.statistics.data.flow_stats.read_keys += 1; Ok(res) } @@ -185,6 +186,8 @@ impl<'a> MvccReader<'a> { } let write = try!(Write::parse(cursor.value())); self.statistics.write.processed += 1; + self.statistics.write.flow_stats.read_bytes += cursor.value().len(); + self.statistics.write.flow_stats.read_keys += 1; Ok(Some((commit_ts, write))) } @@ -222,7 +225,6 @@ impl<'a> MvccReader<'a> { match try!(self.seek_write(key, ts)) { Some((commit_ts, mut write)) => match write.write_type { WriteType::Put => { - self.statistics.write.processed += 1; if write.short_value.is_some() { if self.key_only { return Ok(Some(vec![])); @@ -232,7 +234,6 @@ impl<'a> MvccReader<'a> { return self.load_data(key, write.start_ts).map(Some); } WriteType::Delete => { - self.statistics.write.processed += 1; return Ok(None); } WriteType::Lock | WriteType::Rollback => ts = commit_ts - 1, @@ -250,7 +251,6 @@ impl<'a> MvccReader<'a> { let mut seek_ts = start_ts; while let Some((commit_ts, write)) = try!(self.reverse_seek_write(key, seek_ts)) { if write.start_ts == start_ts { - self.statistics.write.processed += 1; return Ok(Some((commit_ts, write.write_type))); } seek_ts = commit_ts + 1; diff --git a/tests/coprocessor/test_select.rs b/tests/coprocessor/test_select.rs index eaa33902a88..5d88bd1082c 100644 --- a/tests/coprocessor/test_select.rs +++ b/tests/coprocessor/test_select.rs @@ -40,6 +40,7 @@ use raftstore::util::MAX_LEADER_LEASE; use storage::sync_storage::SyncStorage; use storage::util::new_raft_engine; use tikv::coprocessor::select::xeval::evaluator::FLAG_IGNORE_TRUNCATE; +use tikv::coprocessor::Result as CopResult; static ID_GENERATOR: AtomicUsize = AtomicUsize::new(1); @@ -59,6 +60,19 @@ struct Row { data: Vec, } +#[derive(Clone)] +struct MockCopSender {} +impl MockCopSender { + fn new() -> MockCopSender { + MockCopSender {} + } +} +impl CopSender for MockCopSender { + fn send(&self, _stats: CopRequestStatistics) -> CopResult<()> { + Ok(()) + } +} + #[derive(Debug)] struct ChunkSpliter { chunk: Vec, @@ -670,7 +684,12 @@ fn init_data_with_engine_and_commit( let 
mut end_point = Worker::new("test select worker"); let mut cfg = Config::default(); cfg.end_point_concurrency = 1; - let runner = EndPointHost::new(store.get_engine(), end_point.scheduler(), &cfg); + let runner = EndPointHost::new( + store.get_engine(), + end_point.scheduler(), + &cfg, + MockCopSender::new(), + ); end_point.start_batch(runner, 5).unwrap(); (store, end_point)
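Editor's note, not part of the patch: the sketch below illustrates, in simplified and renamed form, the reporting pattern this diff introduces. Coprocessor worker threads accumulate per-region read flow (a map of region id to FlowStatistics, as in CopRequestStatistics), periodically swap the map out on a tick, and hand the drained batch to a sender (the role played by CopSender / CopReport), whose receiver then folds the flow into the leader peer's read_bytes / read_keys. FlowSender, ChannelSender and FlowContext are hypothetical stand-ins invented for the example; only FlowStatistics::add mirrors the patched code.

use std::collections::HashMap;
use std::mem;
use std::sync::mpsc;

#[derive(Default, Clone, Debug)]
struct FlowStatistics {
    read_keys: usize,
    read_bytes: usize,
}

impl FlowStatistics {
    // Mirrors the patched FlowStatistics::add: both counters saturate.
    fn add(&mut self, other: &FlowStatistics) {
        self.read_keys = self.read_keys.saturating_add(other.read_keys);
        self.read_bytes = self.read_bytes.saturating_add(other.read_bytes);
    }
}

// Region id -> accumulated read flow, playing the role of CopRequestStatistics.
type RegionFlow = HashMap<u64, FlowStatistics>;

// Stand-in for CopSender: ships one drained batch of per-region stats.
trait FlowSender: Send + Clone {
    fn send(&self, stats: RegionFlow) -> Result<(), String>;
}

#[derive(Clone)]
struct ChannelSender(mpsc::Sender<RegionFlow>);

impl FlowSender for ChannelSender {
    fn send(&self, stats: RegionFlow) -> Result<(), String> {
        self.0.send(stats).map_err(|e| e.to_string())
    }
}

// Stand-in for CopContext: one per worker thread.
struct FlowContext<S: FlowSender> {
    sender: S,
    request_stats: RegionFlow,
}

impl<S: FlowSender> FlowContext<S> {
    // Analogue of add_statistics_by_region: fold one request's flow into
    // the entry for the region it touched.
    fn record(&mut self, region_id: u64, stats: &FlowStatistics) {
        self.request_stats
            .entry(region_id)
            .or_insert_with(FlowStatistics::default)
            .add(stats);
    }

    // Analogue of the on_tick path: swap the map out so the hot path keeps
    // an empty one, then send the drained batch to the collector.
    fn flush(&mut self) {
        if self.request_stats.is_empty() {
            return;
        }
        let mut batch = RegionFlow::default();
        mem::swap(&mut batch, &mut self.request_stats);
        if let Err(e) = self.sender.send(batch) {
            eprintln!("send coprocessor statistics: {:?}", e);
        }
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let mut ctx = FlowContext {
        sender: ChannelSender(tx),
        request_stats: RegionFlow::default(),
    };
    // Two requests hitting region 1.
    ctx.record(1, &FlowStatistics { read_keys: 3, read_bytes: 128 });
    ctx.record(1, &FlowStatistics { read_keys: 2, read_bytes: 64 });
    ctx.flush();

    // The receiving side (the raftstore in the patch) gets the summed flow
    // and would add it to the leader peer's read_bytes / read_keys.
    let batch = rx.recv().unwrap();
    assert_eq!(batch[&1].read_keys, 5);
    assert_eq!(batch[&1].read_bytes, 192);
}

Swapping the whole map out with mem::swap, rather than iterating and clearing it, keeps the per-request path allocation-free and matches the mem::swap used in the patched on_tick.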