Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry-pick #7085 #7273 #7541 #7437 #7776

Merged
merged 4 commits into from May 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/src/server.rs
Expand Up @@ -656,6 +656,7 @@ impl TiKVServer {
let diag_service = DiagnosticsService::new(
pool,
self.config.log_file.clone(),
self.config.slow_log_file.clone(),
self.security_mgr.clone(),
);
if servers
Expand Down
5 changes: 4 additions & 1 deletion cmd/src/setup.rs
Expand Up @@ -32,7 +32,10 @@ macro_rules! fatal {
// number while rotate by size.
fn rename_by_timestamp(path: &Path) -> io::Result<PathBuf> {
let mut new_path = path.to_path_buf().into_os_string();
new_path.push(format!(".{}", Local::now().format("%Y-%m-%d-%H:%M:%S%.f")));
new_path.push(format!(
".{}",
Local::now().format(logger::DATETIME_ROTATE_SUFFIX)
));
Ok(PathBuf::from(new_path))
}

Expand Down
5 changes: 5 additions & 0 deletions components/tikv_util/src/logger/mod.rs
Expand Up @@ -19,6 +19,11 @@ use crate::config::{ReadableDuration, ReadableSize};

pub use slog::{FilterFn, Level};

// The suffix appended to the end of rotated log files by datetime log rotator
// Warning: Diagnostics service parses log files by file name format.
// Remember to update the corresponding code when suffix layout is changed.
pub const DATETIME_ROTATE_SUFFIX: &str = "%Y-%m-%d-%H:%M:%S%.f";

// Default is 128.
// Extended since blocking is set, and we don't want to block very often.
const SLOG_CHANNEL_SIZE: usize = 10240;
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Expand Up @@ -26,6 +26,7 @@
#![feature(box_patterns)]
#![feature(shrink_to)]
#![feature(drain_filter)]
#![feature(str_strip)]

#[macro_use(fail_point)]
extern crate fail;
Expand Down
80 changes: 69 additions & 11 deletions src/server/service/diagnostics.rs
Expand Up @@ -6,6 +6,10 @@ use std::time::{Duration, Instant};
use futures::{Future, Sink, Stream};
use futures_cpupool::CpuPool;
use grpcio::{RpcContext, RpcStatus, RpcStatusCode, ServerStreamingSink, UnarySink, WriteFlags};
#[cfg(feature = "prost-codec")]
use kvproto::diagnosticspb::search_log_request::Target as SearchLogRequestTarget;
#[cfg(not(feature = "prost-codec"))]
use kvproto::diagnosticspb::SearchLogRequestTarget;
use kvproto::diagnosticspb::{
Diagnostics, SearchLogRequest, SearchLogResponse, ServerInfoRequest, ServerInfoResponse,
ServerInfoType,
Expand All @@ -21,14 +25,21 @@ use crate::server::{Error, Result};
pub struct Service {
pool: CpuPool,
log_file: String,
slow_log_file: String,
security_mgr: Arc<SecurityManager>,
}

impl Service {
pub fn new(pool: CpuPool, log_file: String, security_mgr: Arc<SecurityManager>) -> Self {
pub fn new(
pool: CpuPool,
log_file: String,
slow_log_file: String,
security_mgr: Arc<SecurityManager>,
) -> Self {
Service {
pool,
log_file,
slow_log_file,
security_mgr,
}
}
Expand All @@ -44,7 +55,11 @@ impl Diagnostics for Service {
if !check_common_name(self.security_mgr.cert_allowed_cn(), &ctx) {
return;
}
let log_file = self.log_file.to_owned();
let log_file = if req.get_target() == SearchLogRequestTarget::Normal {
self.log_file.to_owned()
} else {
self.slow_log_file.to_owned()
};
let stream = self
.pool
.spawn_fn(move || log::search(log_file, req))
Expand Down Expand Up @@ -800,15 +815,18 @@ mod log {
use futures::stream::{iter_ok, Stream};
use itertools::Itertools;
use kvproto::diagnosticspb::{LogLevel, LogMessage, SearchLogRequest, SearchLogResponse};
use lazy_static::lazy_static;
use nom::bytes::complete::{tag, take};
use nom::character::complete::{alpha1, space0, space1};
use nom::sequence::tuple;
use nom::*;
use regex::Regex;
use rev_lines;

const INVALID_TIMESTAMP: i64 = -1;
const TIMESTAMP_LENGTH: usize = 30;

#[derive(Default)]
struct LogIterator {
search_files: Vec<(i64, File)>,
currrent_lines: Option<std::io::Lines<BufReader<File>>>,
Expand Down Expand Up @@ -887,7 +905,7 @@ mod log {
None => continue,
};
// Rotated file name have the same prefix with the original
if !file_name.starts_with(log_name) {
if !is_log_file(file_name, log_name) {
continue;
}
// Open the file
Expand Down Expand Up @@ -992,6 +1010,26 @@ mod log {
}
}

lazy_static! {
static ref NUM_REGEX: Regex = Regex::new(r"^\d{4}").unwrap();
}

// Returns true if target 'filename' is part of given 'log_file'
fn is_log_file(filename: &str, log_file: &str) -> bool {
// for not rotated nomral file
if filename == log_file {
return true;
}

// for rotated *.<rotated-datetime> file
if let Some(res) = filename.strip_prefix((log_file.to_owned() + ".").as_str()) {
if NUM_REGEX.is_match(res) {
return true;
}
}
false
}

fn parse_time(input: &str) -> IResult<&str, &str> {
let (input, (_, _, time, _)) =
tuple((space0, tag("["), take(TIMESTAMP_LENGTH), tag("]")))(input)?;
Expand Down Expand Up @@ -1071,6 +1109,9 @@ mod log {
log_file: P,
mut req: SearchLogRequest,
) -> Result<impl Stream<Item = SearchLogResponse, Error = ()>, Error> {
if !log_file.as_ref().exists() {
return Ok(bacth_log_item(LogIterator::default()));
}
let begin_time = req.get_start_time();
let end_time = req.get_end_time();
let levels = req.take_levels();
Expand Down Expand Up @@ -1273,7 +1314,7 @@ mod log {
)
.unwrap();

let log_file2 = dir.path().join("tikv.log.2");
let log_file2 = dir.path().join("tikv.log.2019-08-23-18:10:00.387000");
let mut file = File::create(&log_file2).unwrap();
write!(
file,
Expand Down Expand Up @@ -1445,7 +1486,21 @@ mod log {
[2019/08/23 18:10:03.387 +08:00] [DEBUG] [foo.rs:100] [some message] [key=val]
[2019/08/23 18:10:04.387 +08:00] [ERROR] [foo.rs:100] [some message] [key=val]
[2019/08/23 18:10:05.387 +08:00] [CRITICAL] [foo.rs:100] [some message] [key=val]
[2019/08/23 18:10:06.387 +08:00] [WARN] [foo.rs:100] [some message] [key=val]"#
[2019/08/23 18:10:06.387 +08:00] [WARN] [foo.rs:100] [some message] [key=val] - test-filter"#
)
.unwrap();

let log_file3 = dir.path().join("tikv.log.2019-08-23-18:11:02.123456789");
let mut file = File::create(&log_file3).unwrap();
write!(
file,
r#"[2019/08/23 18:11:53.387 +08:00] [INFO] [foo.rs:100] [some message] [key=val]
[2019/08/23 18:11:54.387 +08:00] [trace] [foo.rs:100] [some message] [key=val]
[2019/08/23 18:11:55.387 +08:00] [DEBUG] [foo.rs:100] [some message] [key=val]
[2019/08/23 18:11:56.387 +08:00] [ERROR] [foo.rs:100] [some message] [key=val]
[2019/08/23 18:11:57.387 +08:00] [CRITICAL] [foo.rs:100] [some message] [key=val]
[2019/08/23 18:11:58.387 +08:00] [WARN] [foo.rs:100] [some message] [key=val] - test-filter
[2019/08/23 18:11:59.387 +08:00] [warning] [foo.rs:100] [some message] [key=val]"#
)
.unwrap();

Expand All @@ -1454,11 +1509,15 @@ mod log {
req.set_end_time(std::i64::MAX);
req.set_levels(vec![LogLevel::Warn.into()].into());
req.set_patterns(vec![".*test-filter.*".to_string()].into());
let expected = vec!["2019/08/23 18:09:58.387 +08:00"]
.iter()
.map(|s| timestamp(s))
.collect::<Vec<i64>>();
let expected = vec![
"2019/08/23 18:09:58.387 +08:00",
"2019/08/23 18:11:58.387 +08:00",
]
.iter()
.map(|s| timestamp(s))
.collect::<Vec<i64>>();
assert_eq!(
expected,
search(log_file, req)
.unwrap()
.wait()
Expand All @@ -1469,8 +1528,7 @@ mod log {
.into_iter()
.flatten()
.map(|msg| msg.get_time())
.collect::<Vec<i64>>(),
expected
.collect::<Vec<i64>>()
);
}
}
Expand Down