Cherry-pick #7085 #7273 #7541 #7437 (#7776)
Signed-off-by: Fullstop000 <fullstop1005@gmail.com>
Fullstop000 committed May 11, 2020
1 parent 8a4f34f commit 911bb62
Showing 5 changed files with 80 additions and 12 deletions.
1 change: 1 addition & 0 deletions cmd/src/server.rs
@@ -656,6 +656,7 @@ impl TiKVServer {
let diag_service = DiagnosticsService::new(
pool,
self.config.log_file.clone(),
self.config.slow_log_file.clone(),
self.security_mgr.clone(),
);
if servers
5 changes: 4 additions & 1 deletion cmd/src/setup.rs
@@ -32,7 +32,10 @@ macro_rules! fatal {
// number while rotating by size.
fn rename_by_timestamp(path: &Path) -> io::Result<PathBuf> {
let mut new_path = path.to_path_buf().into_os_string();
new_path.push(format!(".{}", Local::now().format("%Y-%m-%d-%H:%M:%S%.f")));
new_path.push(format!(
".{}",
Local::now().format(logger::DATETIME_ROTATE_SUFFIX)
));
Ok(PathBuf::from(new_path))
}

5 changes: 5 additions & 0 deletions components/tikv_util/src/logger/mod.rs
@@ -19,6 +19,11 @@ use crate::config::{ReadableDuration, ReadableSize};

pub use slog::{FilterFn, Level};

// The suffix appended to the end of rotated log files by the datetime log rotator.
// Warning: the diagnostics service parses log files by this file name format.
// Remember to update the corresponding code whenever the suffix layout changes.
pub const DATETIME_ROTATE_SUFFIX: &str = "%Y-%m-%d-%H:%M:%S%.f";

// Default is 128.
// Extended since blocking is set, and we don't want to block very often.
const SLOG_CHANNEL_SIZE: usize = 10240;
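Taken together, rename_by_timestamp in setup.rs and DATETIME_ROTATE_SUFFIX here determine the rotated file names that the diagnostics service must later recognize. A minimal stand-alone sketch of the resulting name, assuming the chrono crate (the source of Local::now() in setup.rs); the "tikv.log" base name is illustrative:

use chrono::Local;

fn main() {
    // DATETIME_ROTATE_SUFFIX from tikv_util::logger, inlined so the sketch
    // runs on its own.
    const SUFFIX: &str = "%Y-%m-%d-%H:%M:%S%.f";
    // rename_by_timestamp pushes format!(".{}", ..), which supplies the dot
    // after "tikv.log"; chrono's %.f prints its own leading dot before the
    // fractional seconds.
    let ts = Local::now().format(SUFFIX);
    // e.g. "tikv.log.2020-05-11-14:30:05.123456789"
    println!("tikv.log.{}", ts);
}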
1 change: 1 addition & 0 deletions src/lib.rs
@@ -26,6 +26,7 @@
#![feature(box_patterns)]
#![feature(shrink_to)]
#![feature(drain_filter)]
#![feature(str_strip)]

#[macro_use(fail_point)]
extern crate fail;
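The str_strip feature gate enables str::strip_prefix, still unstable on the toolchain targeted here and used by the new is_log_file helper below; it was stabilized later, in Rust 1.45.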
80 changes: 69 additions & 11 deletions src/server/service/diagnostics.rs
@@ -6,6 +6,10 @@ use std::time::{Duration, Instant};
use futures::{Future, Sink, Stream};
use futures_cpupool::CpuPool;
use grpcio::{RpcContext, RpcStatus, RpcStatusCode, ServerStreamingSink, UnarySink, WriteFlags};
#[cfg(feature = "prost-codec")]
use kvproto::diagnosticspb::search_log_request::Target as SearchLogRequestTarget;
#[cfg(not(feature = "prost-codec"))]
use kvproto::diagnosticspb::SearchLogRequestTarget;
use kvproto::diagnosticspb::{
Diagnostics, SearchLogRequest, SearchLogResponse, ServerInfoRequest, ServerInfoResponse,
ServerInfoType,
@@ -21,14 +25,21 @@ use crate::server::{Error, Result};
pub struct Service {
pool: CpuPool,
log_file: String,
slow_log_file: String,
security_mgr: Arc<SecurityManager>,
}

impl Service {
pub fn new(pool: CpuPool, log_file: String, security_mgr: Arc<SecurityManager>) -> Self {
pub fn new(
pool: CpuPool,
log_file: String,
slow_log_file: String,
security_mgr: Arc<SecurityManager>,
) -> Self {
Service {
pool,
log_file,
slow_log_file,
security_mgr,
}
}
@@ -44,7 +55,11 @@ impl Diagnostics for Service {
if !check_common_name(self.security_mgr.cert_allowed_cn(), &ctx) {
return;
}
let log_file = self.log_file.to_owned();
let log_file = if req.get_target() == SearchLogRequestTarget::Normal {
self.log_file.to_owned()
} else {
self.slow_log_file.to_owned()
};
let stream = self
.pool
.spawn_fn(move || log::search(log_file, req))
@@ -800,15 +815,18 @@ mod log {
use futures::stream::{iter_ok, Stream};
use itertools::Itertools;
use kvproto::diagnosticspb::{LogLevel, LogMessage, SearchLogRequest, SearchLogResponse};
use lazy_static::lazy_static;
use nom::bytes::complete::{tag, take};
use nom::character::complete::{alpha1, space0, space1};
use nom::sequence::tuple;
use nom::*;
use regex::Regex;
use rev_lines;

const INVALID_TIMESTAMP: i64 = -1;
const TIMESTAMP_LENGTH: usize = 30;

#[derive(Default)]
struct LogIterator {
search_files: Vec<(i64, File)>,
currrent_lines: Option<std::io::Lines<BufReader<File>>>,
@@ -887,7 +905,7 @@ mod log {
None => continue,
};
// Rotated file names have the same prefix as the original
if !file_name.starts_with(log_name) {
if !is_log_file(file_name, log_name) {
continue;
}
// Open the file
@@ -992,6 +1010,26 @@
}
}

lazy_static! {
static ref NUM_REGEX: Regex = Regex::new(r"^\d{4}").unwrap();
}

// Returns true if the target 'filename' is the given 'log_file' or one of its rotations
fn is_log_file(filename: &str, log_file: &str) -> bool {
// for the not-yet-rotated normal file
if filename == log_file {
return true;
}

// for a rotated *.<rotated-datetime> file
if let Some(res) = filename.strip_prefix((log_file.to_owned() + ".").as_str()) {
if NUM_REGEX.is_match(res) {
return true;
}
}
false
}
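// A quick sketch of the resulting matching behavior; the rotated name below
// is assumed, mirroring the test fixtures further down:
//   is_log_file("tikv.log", "tikv.log")                              -> true
//   is_log_file("tikv.log.2019-08-23-18:10:00.387000", "tikv.log")  -> true (rotated)
//   is_log_file("tikv.log.bak", "tikv.log")                         -> false (suffix is not a datetime)
//   is_log_file("tikv-slow.log", "tikv.log")                        -> false (different prefix)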

fn parse_time(input: &str) -> IResult<&str, &str> {
let (input, (_, _, time, _)) =
tuple((space0, tag("["), take(TIMESTAMP_LENGTH), tag("]")))(input)?;
@@ -1071,6 +1109,9 @@
log_file: P,
mut req: SearchLogRequest,
) -> Result<impl Stream<Item = SearchLogResponse, Error = ()>, Error> {
if !log_file.as_ref().exists() {
return Ok(bacth_log_item(LogIterator::default()));
}
let begin_time = req.get_start_time();
let end_time = req.get_end_time();
let levels = req.take_levels();
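Because LogIterator now derives Default, a search against a missing log file (for example, when no slow log is configured) yields an empty response stream instead of an open error.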
@@ -1273,7 +1314,7 @@
)
.unwrap();

let log_file2 = dir.path().join("tikv.log.2");
let log_file2 = dir.path().join("tikv.log.2019-08-23-18:10:00.387000");
let mut file = File::create(&log_file2).unwrap();
write!(
file,
@@ -1445,7 +1486,21 @@
[2019/08/23 18:10:03.387 +08:00] [DEBUG] [foo.rs:100] [some message] [key=val]
[2019/08/23 18:10:04.387 +08:00] [ERROR] [foo.rs:100] [some message] [key=val]
[2019/08/23 18:10:05.387 +08:00] [CRITICAL] [foo.rs:100] [some message] [key=val]
[2019/08/23 18:10:06.387 +08:00] [WARN] [foo.rs:100] [some message] [key=val]"#
[2019/08/23 18:10:06.387 +08:00] [WARN] [foo.rs:100] [some message] [key=val] - test-filter"#
)
.unwrap();

let log_file3 = dir.path().join("tikv.log.2019-08-23-18:11:02.123456789");
let mut file = File::create(&log_file3).unwrap();
write!(
file,
r#"[2019/08/23 18:11:53.387 +08:00] [INFO] [foo.rs:100] [some message] [key=val]
[2019/08/23 18:11:54.387 +08:00] [trace] [foo.rs:100] [some message] [key=val]
[2019/08/23 18:11:55.387 +08:00] [DEBUG] [foo.rs:100] [some message] [key=val]
[2019/08/23 18:11:56.387 +08:00] [ERROR] [foo.rs:100] [some message] [key=val]
[2019/08/23 18:11:57.387 +08:00] [CRITICAL] [foo.rs:100] [some message] [key=val]
[2019/08/23 18:11:58.387 +08:00] [WARN] [foo.rs:100] [some message] [key=val] - test-filter
[2019/08/23 18:11:59.387 +08:00] [warning] [foo.rs:100] [some message] [key=val]"#
)
.unwrap();

Expand All @@ -1454,11 +1509,15 @@ mod log {
req.set_end_time(std::i64::MAX);
req.set_levels(vec![LogLevel::Warn.into()].into());
req.set_patterns(vec![".*test-filter.*".to_string()].into());
let expected = vec!["2019/08/23 18:09:58.387 +08:00"]
.iter()
.map(|s| timestamp(s))
.collect::<Vec<i64>>();
let expected = vec![
"2019/08/23 18:09:58.387 +08:00",
"2019/08/23 18:11:58.387 +08:00",
]
.iter()
.map(|s| timestamp(s))
.collect::<Vec<i64>>();
assert_eq!(
expected,
search(log_file, req)
.unwrap()
.wait()
Expand All @@ -1469,8 +1528,7 @@ mod log {
.into_iter()
.flatten()
.map(|msg| msg.get_time())
.collect::<Vec<i64>>(),
expected
.collect::<Vec<i64>>()
);
}
}
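With the target field wired through, a client can now ask the diagnostics service for slow-log entries explicitly. A minimal sketch of building such a request, assuming the kvproto-generated setters shown in the tests above and the Slow variant of SearchLogRequestTarget (the counterpart of the Normal variant the handler branches on):

use kvproto::diagnosticspb::{SearchLogRequest, SearchLogRequestTarget};

fn build_slow_log_request() -> SearchLogRequest {
    let mut req = SearchLogRequest::default();
    // Any target other than Normal is routed to slow_log_file by the handler.
    req.set_target(SearchLogRequestTarget::Slow);
    // Scan the whole time range; narrow these for real queries.
    req.set_start_time(0);
    req.set_end_time(std::i64::MAX);
    req
}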