*: report read statistics to pd #2307

Merged
merged 26 commits on Sep 27, 2017
Changes from 12 commits
100 changes: 79 additions & 21 deletions src/coprocessor/endpoint.rs
@@ -27,9 +27,9 @@ use kvproto::kvrpcpb::{CommandPri, IsolationLevel};
use util::time::{duration_to_sec, Instant};
use util::worker::{BatchRunnable, Scheduler};
use util::collections::HashMap;
use util::threadpool::{Context, ThreadPool, ThreadPoolBuilder};
use util::threadpool::{Context, ContextFactory, ThreadPool, ThreadPoolBuilder};
use server::{Config, OnResponse};
use storage::{self, engine, Engine, Snapshot, Statistics, StatisticsSummary};
use storage::{self, engine, Engine, FlowStatistics, Snapshot, Statistics, StatisticsSummary};
use storage::engine::Error as EngineError;

use super::codec::mysql;
@@ -59,24 +59,50 @@ const OUTDATED_ERROR_MSG: &'static str = "request outdated.";

const ENDPOINT_IS_BUSY: &'static str = "endpoint is busy";

pub struct Host {
pub struct Host<R: CopSender + 'static> {
engine: Box<Engine>,
sched: Scheduler<Task>,
reqs: HashMap<u64, Vec<RequestTask>>,
last_req_id: u64,
pool: ThreadPool<CopContext>,
low_priority_pool: ThreadPool<CopContext>,
high_priority_pool: ThreadPool<CopContext>,
pool: ThreadPool<CopContext<R>>,
low_priority_pool: ThreadPool<CopContext<R>>,
high_priority_pool: ThreadPool<CopContext<R>>,
max_running_task_count: usize,
}

pub type CopRequestStatistics = HashMap<u64, FlowStatistics>;

pub trait CopSender: Send + Clone {
fn send(&self, CopRequestStatistics) -> Result<()>;
}

struct CopContextFactory<R: CopSender + 'static> {
sender: R,
}

impl<R> ContextFactory<CopContext<R>> for CopContextFactory<R>
where
R: CopSender + 'static,
{
fn create(&self) -> CopContext<R> {
CopContext {
sender: self.sender.clone(),
select_stats: Default::default(),
index_stats: Default::default(),
request_stats: HashMap::default(),
}
}
}

#[derive(Default)]
struct CopContext {
struct CopContext<R: CopSender + 'static> {
select_stats: StatisticsSummary,
index_stats: StatisticsSummary,
request_stats: CopRequestStatistics,
sender: R,
}

impl CopContext {
impl<R: CopSender + 'static> CopContext<R> {
fn add_statistics(&mut self, type_str: &str, stats: &Statistics) {
self.get_statistics(type_str).add_statistics(stats);
}
@@ -91,9 +117,16 @@ impl CopContext {
}
}
}

fn add_statistics_by_request(&mut self, id: u64, stats: &Statistics) {
Review comment (Member): id == region_id ?

Review comment (Member): s/add_statistics_by_request/add_statistics_by_region

let empty_stat = FlowStatistics::default();
let flow_stats = self.request_stats.entry(id).or_insert(empty_stat);
Review comment (Member): s/empty_stat/FlowStatistics::default()

flow_stats.add(&stats.write.flow_stats);
flow_stats.add(&stats.data.flow_stats);
}
}

impl Context for CopContext {
impl<R: CopSender + 'static> Context for CopContext<R> {
fn on_tick(&mut self) {
for type_str in &[STR_REQ_TYPE_SELECT, STR_REQ_TYPE_INDEX] {
let this_statistics = self.get_statistics(type_str);
@@ -110,26 +143,37 @@ impl Context for CopContext {
}
*this_statistics = Default::default();
}
if !self.request_stats.is_empty() {
if let Err(e) = self.sender.send(self.request_stats.clone()) {
error!("send coprocessor statistics: {:?}", e);
};
self.request_stats = HashMap::default();
}

}
}

impl Host {
pub fn new(engine: Box<Engine>, scheduler: Scheduler<Task>, cfg: &Config) -> Host {
impl<R: CopSender + 'static> Host<R> {
pub fn new(engine: Box<Engine>, scheduler: Scheduler<Task>, cfg: &Config, r: R) -> Host<R> {
Host {
engine: engine,
sched: scheduler,
reqs: HashMap::default(),
last_req_id: 0,
max_running_task_count: cfg.end_point_max_tasks,
pool: ThreadPoolBuilder::with_default_factory(thd_name!("endpoint-normal-pool"))
.thread_count(cfg.end_point_concurrency)
pool: ThreadPoolBuilder::new(
thd_name!("endpoint-normal-pool"),
CopContextFactory { sender: r.clone() },
).thread_count(cfg.end_point_concurrency)
.build(),
low_priority_pool: ThreadPoolBuilder::with_default_factory(
low_priority_pool: ThreadPoolBuilder::new(
thd_name!("endpoint-low-pool"),
CopContextFactory { sender: r.clone() },
).thread_count(cfg.end_point_concurrency)
.build(),
high_priority_pool: ThreadPoolBuilder::with_default_factory(
high_priority_pool: ThreadPoolBuilder::new(
thd_name!("endpoint-high-pool"),
CopContextFactory { sender: r.clone() },
).thread_count(cfg.end_point_concurrency)
.build(),
}
@@ -170,9 +214,11 @@ impl Host {
CommandPri::High => &mut self.high_priority_pool,
CommandPri::Normal => &mut self.pool,
};
pool.execute(move |ctx: &mut CopContext| {
pool.execute(move |ctx: &mut CopContext<R>| {
let region_id = req.req.get_context().get_region_id();
let stats = end_point.handle_request(req);
ctx.add_statistics(type_str, &stats);
ctx.add_statistics_by_request(region_id, &stats);
COPR_PENDING_REQS
.with_label_values(&[type_str, pri_str])
.dec();
@@ -367,7 +413,7 @@ impl Display for RequestTask {
}
}

impl BatchRunnable<Task> for Host {
impl<R: CopSender + 'static> BatchRunnable<Task> for Host<R> {
// TODO: limit pending reqs
#[allow(for_kv_map)]
fn run_batch(&mut self, tasks: &mut Vec<Task>) {
@@ -655,17 +701,29 @@ pub fn get_req_pri_str(pri: CommandPri) -> &'static str {

#[cfg(test)]
mod tests {
use super::*;
use storage::engine::{self, TEMP_DIR};
use std::sync::*;
use std::thread;
use std::time::Duration;

use kvproto::coprocessor::Request;

use storage::engine::{self, TEMP_DIR};
use util::worker::Worker;
use util::time::Instant;

use super::*;
#[derive(Clone)]
struct MockCopSender {}
impl MockCopSender {
fn new() -> MockCopSender {
MockCopSender {}
}
}
impl CopSender for MockCopSender {
fn send(&self, _stats: CopRequestStatistics) -> Result<()> {
Ok(())
}
}

#[test]
fn test_get_reg_scan_tag() {
@@ -686,7 +744,7 @@ mod tests {
let engine = engine::new_local_engine(TEMP_DIR, &[]).unwrap();
let mut cfg = Config::default();
cfg.end_point_concurrency = 1;
let end_point = Host::new(engine, worker.scheduler(), &cfg);
let end_point = Host::new(engine, worker.scheduler(), &cfg, MockCopSender::new());
worker.start_batch(end_point, 30).unwrap();
let (tx, rx) = mpsc::channel();
let mut task = RequestTask::new(Request::new(), box move |msg| { tx.send(msg).unwrap(); });
@@ -703,7 +761,7 @@ mod tests {
let engine = engine::new_local_engine(TEMP_DIR, &[]).unwrap();
let mut cfg = Config::default();
cfg.end_point_concurrency = 1;
let mut end_point = Host::new(engine, worker.scheduler(), &cfg);
let mut end_point = Host::new(engine, worker.scheduler(), &cfg, MockCopSender::new());
end_point.max_running_task_count = 3;
worker.start_batch(end_point, 30).unwrap();
let (tx, rx) = mpsc::channel();
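Note on `CopSender`: the trait only requires `send(CopRequestStatistics) -> Result<()>`, and the production implementation is not part of these 12 commits. The following is a minimal sketch, assuming the sender forwards the per-region flow statistics to raftstore through the `Msg::CoprocessorStats` variant added in src/raftstore/store/msg.rs below; the `StoreCopSender` name, its channel field, and the use of `SendCh::try_send` are assumptions, not code from this PR.

// Hypothetical sender that forwards coprocessor flow statistics to the
// raftstore event loop; type and field names are illustrative only.
#[derive(Clone)]
struct StoreCopSender {
    ch: SendCh<Msg>,
}

impl CopSender for StoreCopSender {
    fn send(&self, stats: CopRequestStatistics) -> Result<()> {
        // CopRequestStatistics and CopFlowStatistics are both
        // HashMap<u64, FlowStatistics> keyed by region id, so the map can be
        // handed to raftstore unchanged.
        if let Err(e) = self.ch.try_send(Msg::CoprocessorStats { request_stats: stats }) {
            error!("notify raftstore of coprocessor statistics failed: {:?}", e);
        }
        Ok(())
    }
}
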
5 changes: 3 additions & 2 deletions src/coprocessor/mod.rs
@@ -85,5 +85,6 @@ impl From<txn::Error> for Error {
}
}

pub use self::endpoint::{Host as EndPointHost, RequestTask, Task as EndPointTask, REQ_TYPE_DAG,
REQ_TYPE_INDEX, REQ_TYPE_SELECT, SINGLE_GROUP};
pub use self::endpoint::{CopRequestStatistics, CopSender, Host as EndPointHost, RequestTask,
Task as EndPointTask, REQ_TYPE_DAG, REQ_TYPE_INDEX, REQ_TYPE_SELECT,
SINGLE_GROUP};
2 changes: 2 additions & 0 deletions src/pd/client.rs
@@ -238,6 +238,8 @@ impl PdClient for RpcClient {
req.set_pending_peers(RepeatedField::from_vec(region_stat.pending_peers));
req.set_bytes_written(region_stat.written_bytes);
req.set_keys_written(region_stat.written_keys);
req.set_bytes_read(region_stat.read_bytes);
req.set_keys_read(region_stat.read_keys);
req.set_approximate_size(region_stat.approximate_size);

let executor = |client: &RwLock<Inner>, req: pdpb::RegionHeartbeatRequest| {
6 changes: 6 additions & 0 deletions src/pd/mod.rs
@@ -34,6 +34,8 @@ pub struct RegionStat {
pub pending_peers: Vec<metapb::Peer>,
pub written_bytes: u64,
pub written_keys: u64,
pub read_bytes: u64,
pub read_keys: u64,
pub approximate_size: u64,
}

@@ -43,13 +45,17 @@ impl RegionStat {
pending_peers: Vec<metapb::Peer>,
written_bytes: u64,
written_keys: u64,
read_bytes: u64,
read_keys: u64,
approximate_size: u64,
) -> RegionStat {
RegionStat {
down_peers: down_peers,
pending_peers: pending_peers,
written_bytes: written_bytes,
written_keys: written_keys,
read_bytes: read_bytes,
read_keys: read_keys,
approximate_size: approximate_size,
}
}
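With the two new fields, every `RegionStat::new` call site now has to supply the read counters as well. A minimal usage sketch (the local variable names are placeholders, not code from this PR):

let stat = RegionStat::new(
    down_peers,
    pending_peers,
    written_bytes,    // bytes written since the last report
    written_keys,
    read_bytes,       // new: bytes read since the last report
    read_keys,        // new: keys read since the last report
    approximate_size,
);
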
14 changes: 14 additions & 0 deletions src/raftstore/store/metrics.rs
@@ -129,6 +129,20 @@ lazy_static! {
exponential_buckets(1.0, 2.0, 20).unwrap()
).unwrap();

pub static ref REGION_READ_KEYS_HISTOGRAM: Histogram =
register_histogram!(
"tikv_region_read_keys",
"Histogram of keys written for regions",
exponential_buckets(1.0, 2.0, 20).unwrap()
).unwrap();

pub static ref REGION_READ_BYTES_HISTOGRAM: Histogram =
register_histogram!(
"tikv_region_read_bytes",
"Histogram of bytes written for regions",
exponential_buckets(256.0, 2.0, 20).unwrap()
).unwrap();

pub static ref REQUEST_WAIT_TIME_HISTOGRAM: Histogram =
register_histogram!(
"tikv_raftstore_request_wait_time_duration_secs",
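The two new histograms mirror the existing write-flow histograms. Their call sites are not in this excerpt; presumably each region's read deltas are recorded once per reporting interval, roughly as below (the `delta_*` names are placeholders):

// Record one region's read flow for this reporting interval
// (hypothetical call site).
REGION_READ_BYTES_HISTOGRAM.observe(delta_read_bytes as f64);
REGION_READ_KEYS_HISTOGRAM.observe(delta_read_keys as f64);
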
2 changes: 1 addition & 1 deletion src/raftstore/store/mod.rs
@@ -29,7 +29,7 @@ mod worker;
mod metrics;
mod local_metrics;

pub use self::msg::{BatchCallback, Callback, Msg, SnapshotStatusMsg, Tick};
pub use self::msg::{BatchCallback, Callback, CopFlowStatistics, Msg, SnapshotStatusMsg, Tick};
pub use self::store::{create_event_loop, Engines, Store, StoreChannel};
pub use self::config::Config;
pub use self::transport::Transport;
7 changes: 6 additions & 1 deletion src/raftstore/store/msg.rs
@@ -19,11 +19,14 @@ use kvproto::raft_serverpb::RaftMessage;
use kvproto::raft_cmdpb::{RaftCmdRequest, RaftCmdResponse};
use kvproto::metapb::RegionEpoch;
use raft::SnapshotStatus;
use util::collections::HashMap;
use storage::FlowStatistics;

use util::escape;

pub type Callback = Box<FnBox(RaftCmdResponse) + Send>;
pub type BatchCallback = Box<FnBox(Vec<Option<RaftCmdResponse>>) + Send>;
pub type CopFlowStatistics = HashMap<u64, FlowStatistics>;

#[derive(Debug, Clone, Copy)]
pub enum Tick {
@@ -36,7 +39,6 @@ pub enum Tick {
SnapGc,
CompactLockCf,
ConsistencyCheck,
ReportRegionFlow,
}

pub struct SnapshotStatusMsg {
@@ -84,6 +86,8 @@ pub enum Msg {
// For snapshot stats.
SnapshotStats,

CoprocessorStats { request_stats: CopFlowStatistics },

// For consistency check
ComputeHashResult {
region_id: u64,
@@ -110,6 +114,7 @@ impl fmt::Debug for Msg {
region_id
),
Msg::SnapshotStats => write!(fmt, "Snapshot stats"),
Msg::CoprocessorStats { .. } => write!(fmt, "Coperocessor stats"),
Msg::ComputeHashResult {
region_id,
index,
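`Msg::CoprocessorStats` carries a `CopFlowStatistics`, i.e. a `HashMap<u64, FlowStatistics>` keyed by region id. The store-side handler is not part of these 12 commits; a plausible sketch, assuming `FlowStatistics` exposes `read_bytes`/`read_keys` counters and the store keeps its peers in a `region_peers` map:

// Hypothetical handler inside the store event loop: fold the reported
// per-region read flow into each peer's running statistics.
fn on_coprocessor_stats(&mut self, request_stats: CopFlowStatistics) {
    for (region_id, flow) in &request_stats {
        if let Some(peer) = self.region_peers.get_mut(region_id) {
            peer.peer_stat.read_bytes += flow.read_bytes as u64;
            peer.peer_stat.read_keys += flow.read_keys as u64;
        }
    }
}
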
6 changes: 6 additions & 0 deletions src/raftstore/store/peer.rs
@@ -193,6 +193,10 @@ pub struct PeerStat {
pub written_keys: u64,
pub last_written_bytes: u64,
pub last_written_keys: u64,
pub read_bytes: u64,
pub read_keys: u64,
pub last_read_bytes: u64,
pub last_read_keys: u64,
}

pub struct Peer {
@@ -1567,6 +1571,8 @@ impl Peer {
pending_peers: self.collect_pending_peers(),
written_bytes: self.peer_stat.last_written_bytes,
written_keys: self.peer_stat.last_written_keys,
read_bytes: self.peer_stat.last_read_bytes,
read_keys: self.peer_stat.last_read_keys,
};
if let Err(e) = worker.schedule(task) {
error!("{} failed to notify pd: {}", self.tag, e);
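`last_read_bytes`/`last_read_keys` mirror the write counters: the heartbeat reports the amount accumulated over the previous interval, and the running counters are then rolled over. The tick that does this is not in the excerpt, so the bookkeeping below is only one plausible scheme and should be read as an assumption:

// Hypothetical per-tick bookkeeping: snapshot this interval's accumulation
// for the next PD heartbeat, then reset the running counters.
peer_stat.last_read_bytes = peer_stat.read_bytes;
peer_stat.last_read_keys = peer_stat.read_keys;
peer_stat.read_bytes = 0;
peer_stat.read_keys = 0;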