*: report read statistics to pd #2307

Merged: 26 commits, Sep 27, 2017
103 changes: 82 additions & 21 deletions src/coprocessor/endpoint.rs
@@ -15,6 +15,7 @@ use std::usize;
use std::time::Duration;
use std::rc::Rc;
use std::fmt::{self, Debug, Display, Formatter};
use std::mem;

use tipb::select::{self, Chunk, DAGRequest, SelectRequest};
use tipb::executor::ExecType;
@@ -27,9 +28,9 @@ use kvproto::kvrpcpb::{CommandPri, IsolationLevel};
use util::time::{duration_to_sec, Instant};
use util::worker::{BatchRunnable, Scheduler};
use util::collections::HashMap;
use util::threadpool::{Context, ThreadPool, ThreadPoolBuilder};
use util::threadpool::{Context, ContextFactory, ThreadPool, ThreadPoolBuilder};
use server::{Config, OnResponse};
use storage::{self, engine, Engine, Snapshot, Statistics, StatisticsSummary};
use storage::{self, engine, Engine, FlowStatistics, Snapshot, Statistics, StatisticsSummary};
use storage::engine::Error as EngineError;

use super::codec::mysql;
@@ -59,24 +60,50 @@ const OUTDATED_ERROR_MSG: &'static str = "request outdated.";

const ENDPOINT_IS_BUSY: &'static str = "endpoint is busy";

pub struct Host {
pub struct Host<R: CopSender + 'static> {
engine: Box<Engine>,
sched: Scheduler<Task>,
reqs: HashMap<u64, Vec<RequestTask>>,
last_req_id: u64,
pool: ThreadPool<CopContext>,
low_priority_pool: ThreadPool<CopContext>,
high_priority_pool: ThreadPool<CopContext>,
pool: ThreadPool<CopContext<R>>,
low_priority_pool: ThreadPool<CopContext<R>>,
high_priority_pool: ThreadPool<CopContext<R>>,
max_running_task_count: usize,
}

pub type CopRequestStatistics = HashMap<u64, FlowStatistics>;

pub trait CopSender: Send + Clone {
fn send(&self, CopRequestStatistics) -> Result<()>;
}
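// A minimal illustrative implementation (not part of this diff): any Send +
// Clone carrier can back a CopSender. The mpsc-based type below is an
// assumption for the sketch; the PR itself routes the statistics into
// raftstore (see Msg::CoprocessorStats further down).
#[derive(Clone)]
struct ChannelCopSender {
    tx: ::std::sync::mpsc::Sender<CopRequestStatistics>,
}

impl CopSender for ChannelCopSender {
    fn send(&self, stats: CopRequestStatistics) -> Result<()> {
        // A real implementation would map a SendError into the coprocessor
        // error type; this sketch simply drops stats if the receiver is gone.
        let _ = self.tx.send(stats);
        Ok(())
    }
}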

struct CopContextFactory<R: CopSender + 'static> {
sender: R,
}

impl<R> ContextFactory<CopContext<R>> for CopContextFactory<R>
where
R: CopSender + 'static,
{
fn create(&self) -> CopContext<R> {
CopContext {
sender: self.sender.clone(),
select_stats: Default::default(),
index_stats: Default::default(),
request_stats: HashMap::default(),
}
}
}

#[derive(Default)]
struct CopContext {
struct CopContext<R: CopSender + 'static> {
select_stats: StatisticsSummary,
index_stats: StatisticsSummary,
request_stats: CopRequestStatistics,
sender: R,
}

impl CopContext {
impl<R: CopSender + 'static> CopContext<R> {
fn add_statistics(&mut self, type_str: &str, stats: &Statistics) {
self.get_statistics(type_str).add_statistics(stats);
}
@@ -91,9 +118,17 @@ impl CopContext {
}
}
}

fn add_statistics_by_region(&mut self, region_id: u64, stats: &Statistics) {
let flow_stats = self.request_stats
.entry(region_id)
.or_insert_with(FlowStatistics::default);
flow_stats.add(&stats.write.flow_stats);
flow_stats.add(&stats.data.flow_stats);
}
}
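// For reference, a sketch of the FlowStatistics type used above, assuming it
// simply accumulates read counters (the real definition lives in storage and
// may differ slightly): adding the write- and data-CF flow stats of a request
// yields that request's total read volume for its region.
#[derive(Default, Clone, Debug)]
pub struct FlowStatistics {
    pub read_keys: usize,
    pub read_bytes: usize,
}

impl FlowStatistics {
    pub fn add(&mut self, other: &FlowStatistics) {
        self.read_bytes = self.read_bytes.saturating_add(other.read_bytes);
        self.read_keys = self.read_keys.saturating_add(other.read_keys);
    }
}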

impl Context for CopContext {
impl<R: CopSender + 'static> Context for CopContext<R> {
fn on_tick(&mut self) {
for type_str in &[STR_REQ_TYPE_SELECT, STR_REQ_TYPE_INDEX] {
let this_statistics = self.get_statistics(type_str);
@@ -110,26 +145,38 @@ impl Context for CopContext {
}
*this_statistics = Default::default();
}
if !self.request_stats.is_empty() {
let mut to_send_stats = HashMap::default();
mem::swap(&mut to_send_stats, &mut self.request_stats);
if let Err(e) = self.sender.send(to_send_stats) {
error!("send coprocessor statistics: {:?}", e);
};
}

}
}

impl Host {
pub fn new(engine: Box<Engine>, scheduler: Scheduler<Task>, cfg: &Config) -> Host {
impl<R: CopSender + 'static> Host<R> {
pub fn new(engine: Box<Engine>, scheduler: Scheduler<Task>, cfg: &Config, r: R) -> Host<R> {
Host {
engine: engine,
sched: scheduler,
reqs: HashMap::default(),
last_req_id: 0,
max_running_task_count: cfg.end_point_max_tasks,
pool: ThreadPoolBuilder::with_default_factory(thd_name!("endpoint-normal-pool"))
.thread_count(cfg.end_point_concurrency)
pool: ThreadPoolBuilder::new(
thd_name!("endpoint-normal-pool"),
CopContextFactory { sender: r.clone() },
).thread_count(cfg.end_point_concurrency)
.build(),
low_priority_pool: ThreadPoolBuilder::with_default_factory(
low_priority_pool: ThreadPoolBuilder::new(
thd_name!("endpoint-low-pool"),
CopContextFactory { sender: r.clone() },
).thread_count(cfg.end_point_concurrency)
.build(),
high_priority_pool: ThreadPoolBuilder::with_default_factory(
high_priority_pool: ThreadPoolBuilder::new(
thd_name!("endpoint-high-pool"),
CopContextFactory { sender: r.clone() },
).thread_count(cfg.end_point_concurrency)
.build(),
}
@@ -170,9 +217,11 @@ impl Host {
CommandPri::High => &mut self.high_priority_pool,
CommandPri::Normal => &mut self.pool,
};
pool.execute(move |ctx: &mut CopContext| {
pool.execute(move |ctx: &mut CopContext<R>| {
let region_id = req.req.get_context().get_region_id();
let stats = end_point.handle_request(req);
ctx.add_statistics(type_str, &stats);
ctx.add_statistics_by_region(region_id, &stats);
COPR_PENDING_REQS
.with_label_values(&[type_str, pri_str])
.dec();
@@ -367,7 +416,7 @@ impl Display for RequestTask {
}
}

impl BatchRunnable<Task> for Host {
impl<R: CopSender + 'static> BatchRunnable<Task> for Host<R> {
// TODO: limit pending reqs
#[allow(for_kv_map)]
fn run_batch(&mut self, tasks: &mut Vec<Task>) {
@@ -661,17 +710,29 @@ pub fn get_req_pri_str(pri: CommandPri) -> &'static str {

#[cfg(test)]
mod tests {
use super::*;
use storage::engine::{self, TEMP_DIR};
use std::sync::*;
use std::thread;
use std::time::Duration;

use kvproto::coprocessor::Request;

use storage::engine::{self, TEMP_DIR};
use util::worker::Worker;
use util::time::Instant;

use super::*;
#[derive(Clone)]
struct MockCopSender {}
impl MockCopSender {
fn new() -> MockCopSender {
MockCopSender {}
}
}
impl CopSender for MockCopSender {
fn send(&self, _stats: CopRequestStatistics) -> Result<()> {
Ok(())
}
}

#[test]
fn test_get_reg_scan_tag() {
@@ -692,7 +753,7 @@ mod tests {
let engine = engine::new_local_engine(TEMP_DIR, &[]).unwrap();
let mut cfg = Config::default();
cfg.end_point_concurrency = 1;
let end_point = Host::new(engine, worker.scheduler(), &cfg);
let end_point = Host::new(engine, worker.scheduler(), &cfg, MockCopSender::new());
worker.start_batch(end_point, 30).unwrap();
let (tx, rx) = mpsc::channel();
let mut task = RequestTask::new(Request::new(), box move |msg| { tx.send(msg).unwrap(); });
@@ -709,7 +770,7 @@ mod tests {
let engine = engine::new_local_engine(TEMP_DIR, &[]).unwrap();
let mut cfg = Config::default();
cfg.end_point_concurrency = 1;
let mut end_point = Host::new(engine, worker.scheduler(), &cfg);
let mut end_point = Host::new(engine, worker.scheduler(), &cfg, MockCopSender::new());
end_point.max_running_task_count = 3;
worker.start_batch(end_point, 30).unwrap();
let (tx, rx) = mpsc::channel();
5 changes: 3 additions & 2 deletions src/coprocessor/mod.rs
@@ -85,5 +85,6 @@ impl From<txn::Error> for Error {
}
}

pub use self::endpoint::{Host as EndPointHost, RequestTask, Task as EndPointTask, REQ_TYPE_DAG,
REQ_TYPE_INDEX, REQ_TYPE_SELECT, SINGLE_GROUP};
pub use self::endpoint::{CopRequestStatistics, CopSender, Host as EndPointHost, RequestTask,
Task as EndPointTask, REQ_TYPE_DAG, REQ_TYPE_INDEX, REQ_TYPE_SELECT,
SINGLE_GROUP};
2 changes: 2 additions & 0 deletions src/pd/client.rs
@@ -238,6 +238,8 @@ impl PdClient for RpcClient {
req.set_pending_peers(RepeatedField::from_vec(region_stat.pending_peers));
req.set_bytes_written(region_stat.written_bytes);
req.set_keys_written(region_stat.written_keys);
req.set_bytes_read(region_stat.read_bytes);
req.set_keys_read(region_stat.read_keys);
req.set_approximate_size(region_stat.approximate_size);

let executor = |client: &RwLock<Inner>, req: pdpb::RegionHeartbeatRequest| {
6 changes: 6 additions & 0 deletions src/pd/mod.rs
@@ -34,6 +34,8 @@ pub struct RegionStat {
pub pending_peers: Vec<metapb::Peer>,
pub written_bytes: u64,
pub written_keys: u64,
pub read_bytes: u64,
pub read_keys: u64,
pub approximate_size: u64,
}

@@ -43,13 +45,17 @@ impl RegionStat {
pending_peers: Vec<metapb::Peer>,
written_bytes: u64,
written_keys: u64,
read_bytes: u64,
read_keys: u64,
approximate_size: u64,
) -> RegionStat {
RegionStat {
down_peers: down_peers,
pending_peers: pending_peers,
written_bytes: written_bytes,
written_keys: written_keys,
read_bytes: read_bytes,
read_keys: read_keys,
approximate_size: approximate_size,
}
}
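// A hedged sketch of the caller's side (the heartbeat path in the pd worker,
// which is outside this excerpt): the two new arguments slot in between the
// write counters and approximate_size, and presumably also feed the new
// histograms added in metrics.rs below before the stat is handed to
// PdClient::region_heartbeat. All local names here are assumptions.
let region_stat = RegionStat::new(
    down_peers,
    pending_peers,
    written_bytes,
    written_keys,
    read_bytes,
    read_keys,
    approximate_size,
);
REGION_READ_BYTES_HISTOGRAM.observe(read_bytes as f64);
REGION_READ_KEYS_HISTOGRAM.observe(read_keys as f64);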
14 changes: 14 additions & 0 deletions src/raftstore/store/metrics.rs
@@ -136,6 +136,20 @@ lazy_static! {
exponential_buckets(1.0, 2.0, 20).unwrap()
).unwrap();

pub static ref REGION_READ_KEYS_HISTOGRAM: Histogram =
register_histogram!(
"tikv_region_read_keys",
"Histogram of keys written for regions",
exponential_buckets(1.0, 2.0, 20).unwrap()
).unwrap();

pub static ref REGION_READ_BYTES_HISTOGRAM: Histogram =
register_histogram!(
"tikv_region_read_bytes",
"Histogram of bytes written for regions",
exponential_buckets(256.0, 2.0, 20).unwrap()
).unwrap();

pub static ref REQUEST_WAIT_TIME_HISTOGRAM: Histogram =
register_histogram!(
"tikv_raftstore_request_wait_time_duration_secs",
2 changes: 1 addition & 1 deletion src/raftstore/store/mod.rs
@@ -29,7 +29,7 @@ mod worker;
mod metrics;
mod local_metrics;

pub use self::msg::{BatchCallback, Callback, Msg, SnapshotStatusMsg, Tick};
pub use self::msg::{BatchCallback, Callback, CopFlowStatistics, Msg, SnapshotStatusMsg, Tick};
pub use self::store::{create_event_loop, Engines, Store, StoreChannel};
pub use self::config::Config;
pub use self::transport::Transport;
7 changes: 6 additions & 1 deletion src/raftstore/store/msg.rs
@@ -19,11 +19,14 @@ use kvproto::raft_serverpb::RaftMessage;
use kvproto::raft_cmdpb::{RaftCmdRequest, RaftCmdResponse};
use kvproto::metapb::RegionEpoch;
use raft::SnapshotStatus;
use util::collections::HashMap;
use storage::FlowStatistics;

use util::escape;

pub type Callback = Box<FnBox(RaftCmdResponse) + Send>;
pub type BatchCallback = Box<FnBox(Vec<Option<RaftCmdResponse>>) + Send>;
pub type CopFlowStatistics = HashMap<u64, FlowStatistics>;

#[derive(Debug, Clone, Copy)]
pub enum Tick {
@@ -36,7 +39,6 @@ pub enum Tick {
SnapGc,
CompactLockCf,
ConsistencyCheck,
ReportRegionFlow,
}

pub struct SnapshotStatusMsg {
@@ -84,6 +86,8 @@ pub enum Msg {
// For snapshot stats.
SnapshotStats,

CoprocessorStats { request_stats: CopFlowStatistics },

// For consistency check
ComputeHashResult {
region_id: u64,
@@ -110,6 +114,7 @@ impl fmt::Debug for Msg {
region_id
),
Msg::SnapshotStats => write!(fmt, "Snapshot stats"),
Msg::CoprocessorStats { .. } => write!(fmt, "Coprocessor stats"),
Msg::ComputeHashResult {
region_id,
index,
21 changes: 17 additions & 4 deletions src/raftstore/store/peer.rs
@@ -44,7 +44,7 @@ use util::collections::{FlatMap, FlatMapValues as Values, HashSet};

use pd::INVALID_ID;

use super::store::Store;
use super::store::{Store, StoreStat};
use super::peer_storage::{write_peer_state, ApplySnapResult, InvokeContext, PeerStorage};
use super::util;
use super::msg::Callback;
@@ -195,6 +195,10 @@ pub struct PeerStat {
pub written_keys: u64,
pub last_written_bytes: u64,
pub last_written_keys: u64,
pub read_bytes: u64,
pub read_keys: u64,
pub last_read_bytes: u64,
pub last_read_keys: u64,
}
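// The handling of Msg::CoprocessorStats is outside this excerpt; presumably
// the store folds each region's FlowStatistics into the owning peer's
// PeerStat, roughly like the sketch below (the store-side field names are
// assumptions):
fn on_coprocessor_stats(&mut self, request_stats: CopFlowStatistics) {
    for (region_id, flow) in &request_stats {
        if let Some(peer) = self.region_peers.get_mut(region_id) {
            peer.peer_stat.read_bytes += flow.read_bytes as u64;
            peer.peer_stat.read_keys += flow.read_keys as u64;
        }
    }
}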

pub struct Peer {
@@ -872,7 +876,12 @@ impl Peer {
}
}

pub fn post_apply(&mut self, res: &ApplyRes, groups: &mut HashSet<u64>) {
pub fn post_apply(
&mut self,
res: &ApplyRes,
groups: &mut HashSet<u64>,
store_stat: &mut StoreStat,
) {
if self.is_applying_snapshot() {
panic!("{} should not applying snapshot.", self.tag);
}
@@ -888,6 +897,8 @@ impl Peer {
self.mut_store().applied_index_term = res.applied_index_term;
self.peer_stat.written_keys += res.metrics.written_keys;
self.peer_stat.written_bytes += res.metrics.written_bytes;
store_stat.engine_total_bytes_written += res.metrics.written_bytes;
store_stat.engine_total_keys_written += res.metrics.written_keys;

let diff = if has_split {
self.delete_keys_hint = res.metrics.delete_keys_hint;
@@ -1568,8 +1579,10 @@ impl Peer {
peer: self.peer.clone(),
down_peers: self.collect_down_peers(self.cfg.max_peer_down_duration.0),
pending_peers: self.collect_pending_peers(),
written_bytes: self.peer_stat.last_written_bytes,
written_keys: self.peer_stat.last_written_keys,
written_bytes: self.peer_stat.written_bytes - self.peer_stat.last_written_bytes,
written_keys: self.peer_stat.written_keys - self.peer_stat.last_written_keys,
read_bytes: self.peer_stat.read_bytes - self.peer_stat.last_read_bytes,
read_keys: self.peer_stat.read_keys - self.peer_stat.last_read_keys,
};
if let Err(e) = worker.schedule(task) {
error!("{} failed to notify pd: {}", self.tag, e);
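// Not shown in this excerpt: after the heartbeat task is scheduled, the
// last_* counters are presumably refreshed so the next report is again a
// per-interval delta, roughly:
self.peer_stat.last_written_bytes = self.peer_stat.written_bytes;
self.peer_stat.last_written_keys = self.peer_stat.written_keys;
self.peer_stat.last_read_bytes = self.peer_stat.read_bytes;
self.peer_stat.last_read_keys = self.peer_stat.read_keys;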