Skip to content

Commit

Permalink
server/status_server: Add dump_prof and dump_prof_to_resp functions
Browse files Browse the repository at this point in the history
1. Modify entry for profile from `/jeprof` to `/pprof/profile`

2. Abstract the dumping process to two functions. Maybe `dump_prof`
function can be added to `tikv_alloc` crates ( but a global timer is
needed )

Signed-off-by: Yang Keao <keao.yang@yahoo.com>
  • Loading branch information
YangKeao committed May 6, 2019
1 parent e56d0c1 commit bd69b21
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 75 deletions.
7 changes: 7 additions & 0 deletions components/tikv_alloc/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
#[derive(Debug)]
pub enum ProfError {
MemProfilingNotEnabled,
IOError(std::io::Error),
JemallocNotEnabled,
JemallocError(i32),
}

pub type ProfResult<T> = std::result::Result<T, ProfError>;

impl From<std::io::Error> for ProfError {
fn from(e: std::io::Error) -> Self {
ProfError::IOError(e)
}
}
8 changes: 4 additions & 4 deletions components/tikv_alloc/src/jemalloc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,19 +97,19 @@ mod profiling {
if let Err(e) = jemallocator::mallctl_set(PROF_ACTIVE, true) {
error!("failed to activate profiling: {}", e);
return Err(ProfError::JemallocError(e));
};
}
}
return Ok(());
Ok(())
}

pub fn deactivate_prof() -> ProfResult<()> {
unsafe {
if let Err(e) = jemallocator::mallctl_set(PROF_ACTIVE, false) {
error!("failed to deactivate profiling: {}", e);
return Err(ProfError::JemallocError(e));
};
}
}
return Ok(());
Ok(())
}

/// Dump the profile to the `path`.
Expand Down
155 changes: 84 additions & 71 deletions src/server/status_server.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0.

use futures::future::ok;
use futures::future::{err, ok};
use futures::sync::oneshot::{Receiver, Sender};
use futures::{self, Future};
use hyper::service::service_fn;
Expand All @@ -13,6 +13,7 @@ use std::str::FromStr;

use super::Result;
use tikv_alloc;
use tikv_alloc::error::ProfError;
use tikv_util::collections::HashMap;
use tikv_util::metrics::dump;
use tikv_util::timer::GLOBAL_TIMER_HANDLE;
Expand Down Expand Up @@ -45,6 +46,86 @@ impl StatusServer {
}
}

pub fn dump_prof(seconds: u64) -> Box<dyn Future<Item = Vec<u8>, Error = ProfError> + Send> {
if let Err(e) = tikv_alloc::activate_prof() {
return Box::new(err(e));
}
info!("Start profiling {} seconds", seconds);

let timer = GLOBAL_TIMER_HANDLE.clone();
Box::new(
timer
.delay(std::time::Instant::now() + std::time::Duration::from_secs(seconds))
.then(|_| {
if let Err(e) = tikv_alloc::deactivate_prof() {
match e {
ProfError::JemallocError(e) => {
error!("jemalloc error {}", e); // TODO: return error through http
}
_ => error!("Unexpected error"),
}
}

let tmp_dir = TempDir::new("").unwrap();
let os_path = tmp_dir.path().join("tikv_dump_profile").into_os_string();
let path = os_path.into_string().unwrap();

tikv_alloc::dump_prof(Some(&path));
tokio_fs::file::File::open(path)
.and_then(|file| {
let buf: Vec<u8> = Vec::new();
tokio_io::io::read_to_end(file, buf)
})
.and_then(move |(_, buf)| {
drop(tmp_dir);
ok(buf)
})
.map_err(|e| -> ProfError { e.into() })
}),
)
}

pub fn dump_prof_to_resp(
req: Request<Body>,
) -> Box<dyn Future<Item = Response<Body>, Error = hyper::Error> + Send> {
let url = url::Url::parse(&format!("http://host{}", req.uri().to_string())).unwrap(); // Add scheme and host to parse query
let query_pairs: HashMap<_, _> = url.query_pairs().collect();
let seconds: u64 = match query_pairs.get("seconds") {
Some(val) => match val.parse() {
Ok(val) => val,
Err(_) => {
let response = Response::builder()
.status(StatusCode::BAD_REQUEST)
.body(Body::empty())
.unwrap();
return Box::new(ok(response));
}
},
None => {
let response = Response::builder()
.status(StatusCode::BAD_REQUEST)
.body(Body::empty())
.unwrap();
return Box::new(ok(response));
}
};

Box::new(
Self::dump_prof(seconds)
.and_then(|buf| {
let response = Response::builder().body(buf.into()).unwrap();
ok(response)
})
.or_else(|_| {
let response = Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(Body::empty())
.unwrap();
ok(response)
}),
)
}

pub fn start(&mut self, status_addr: String) -> Result<()> {
let addr = SocketAddr::from_str(&status_addr)?;

Expand All @@ -58,76 +139,8 @@ impl StatusServer {
let response = Response::builder().body(Body::from(dump())).unwrap();
Box::new(ok(response))
}
(&Method::GET, "/jeprof") => {
let url = url::Url::parse(&format!("http://host{}", req.uri().to_string())).unwrap(); // Add scheme and host to parse query
let query_pairs: HashMap<_, _> = url.query_pairs().collect();
let seconds: u64 = match query_pairs.get("seconds") {
Some(val) => {
match val.parse() {
Ok(val) => val,
Err(_) => {
let response = Response::builder().status(StatusCode::BAD_REQUEST).body(Body::empty()).unwrap();
return Box::new(ok(response));
}
}
}
None => {
let response = Response::builder().status(StatusCode::BAD_REQUEST).body(Body::empty()).unwrap();
return Box::new(ok(response));
}
};
if let Err(e) = tikv_alloc::activate_prof() {
match e {
tikv_alloc::error::ProfError::MemProfilingNotEnabled => {
let response = Response::builder().status(StatusCode::NOT_FOUND).body(Body::from("feature mem-profiling is not enabled")).unwrap();
return Box::new(ok(response));
}
tikv_alloc::error::ProfError::JemallocError(e) => {
let response = Response::builder().status(StatusCode::INTERNAL_SERVER_ERROR).body(Body::from(format!("jemalloc error {}", e))).unwrap();
return Box::new(ok(response));
}
tikv_alloc::error::ProfError::JemallocNotEnabled => {
let response = Response::builder().status(StatusCode::NOT_FOUND).body(Body::from("feature jemalloc is not enabled")).unwrap();
return Box::new(ok(response));
}
}
}
info!("Start profiling {} seconds", seconds);

let timer = GLOBAL_TIMER_HANDLE.clone();
Box::new(timer.delay(std::time::Instant::now() + std::time::Duration::from_secs(seconds))
.then(|_| {
if let Err(e) = tikv_alloc::deactivate_prof() {
match e {
tikv_alloc::error::ProfError::JemallocError(e) => {
error!("jemalloc error {}", e); // TODO: return error through http
}
_ => {
unreachable!()
}
}
}

let tmp_dir = TempDir::new("").unwrap();
let os_path = tmp_dir.path().join("tikv_dump_profile").into_os_string();
let path = os_path.into_string().unwrap();

tikv_alloc::dump_prof(Some(&path));
tokio_fs::file::File::open(path)
.and_then(|file| {
let buf: Vec<u8> = Vec::new();
tokio_io::io::read_to_end(file, buf)
})
.and_then(move |(_, buf)| {
let response = Response::builder().body(buf.into()).unwrap();
drop(tmp_dir); // Drop here manually to extend life of tmp_dir.
ok(response)
})
})
.or_else(|_| {
let response = Response::builder().status(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty()).unwrap();
ok(response)
}))
(&Method::GET, "/pprof/profile") => {
Self::dump_prof_to_resp(req)
}
(&Method::GET, "/status") => {
let response = Response::builder().body(Body::empty()).unwrap();
Expand Down

0 comments on commit bd69b21

Please sign in to comment.