Skip to content

Commit

Permalink
add trace framework
Browse files Browse the repository at this point in the history
Signed-off-by: zhongzc <zhongzc_arch@outlook.com>

trace raftstore

Signed-off-by: zhongzc <zhongzc_arch@outlook.com>

trace read

Signed-off-by: zhongzc <zhongzc_arch@outlook.com>

config for bench

Signed-off-by: zhongzc <zhongzc_arch@outlook.com>

fix test compilation

Signed-off-by: zhongzc <zhongzc_arch@outlook.com>

fix test compilation

Signed-off-by: zhongzc <zhongzc_arch@outlook.com>

set span id prefix

Signed-off-by: zhongzc <zhongzc_arch@outlook.com>

bump minitrace

Signed-off-by: zhongzc <zhongzc_arch@outlook.com>

optimize enabling multiple scopes

Signed-off-by: zhongzc <zhongzc_arch@outlook.com>

background worker

Signed-off-by: zhongzc <zhongzc_arch@outlook.com>

start scopes

Signed-off-by: zhongzc <zhongzc_arch@outlook.com>

turn off tracing

Signed-off-by: zhongzc <zhongzc_arch@outlook.com>

bench no serialization

Signed-off-by: zhongzc <zhongzc_arch@outlook.com>

bump minitrace

Signed-off-by: zhongzc <zhongzc_arch@outlook.com>

fix conflict

Signed-off-by: zhongzc <zhongzc_arch@outlook.com>

bump minitrace

Signed-off-by: zhongzc <zhongzc_arch@outlook.com>

tmp

Signed-off-by: zhongzc <zhongzc_arch@outlook.com>

pb + jaeger

Signed-off-by: zhongzc <zhongzc_arch@outlook.com>

polish

Signed-off-by: zhongzc <zhongzc_arch@outlook.com>

return on demand

Signed-off-by: zhongzc <zhongzc_arch@outlook.com>

make test compiled

Signed-off-by: zhongzc <zhongzc_arch@outlook.com>

make test compiled

Signed-off-by: zhongzc <zhongzc_arch@outlook.com>
Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

simplify

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
  • Loading branch information
zhongzc committed Jan 25, 2021
1 parent 927e36f commit 9448150
Show file tree
Hide file tree
Showing 46 changed files with 885 additions and 363 deletions.
66 changes: 57 additions & 9 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,6 @@ url = "2"
uuid = { version = "0.8.1", features = ["serde", "v4"] }
vlog = "0.1.4"
walkdir = "2"
minitrace = { git = "https://github.com/pingcap-incubator/minitrace-rust.git", branch = "master" }
yatp = { git = "https://github.com/tikv/yatp.git", branch = "master" }

[dev-dependencies]
Expand All @@ -221,6 +220,9 @@ protobuf-codegen = { git = "https://github.com/pingcap/rust-protobuf", rev = "65
[target.'cfg(target_os = "linux")'.dependencies]
procinfo = { git = "https://github.com/tikv/procinfo-rs", rev = "5125fc1a69496b73b26b3c08b6e8afc3c665a56e" }

[patch.'https://github.com/pingcap/kvproto.git']
kvproto = { git = "https://github.com/zhongzc/kvproto.git", rev = "8902e22f01f5f444a5bf226a5773e49f411b676c", default-features = false }

[workspace]
resolver = "2"
members = [
Expand Down
28 changes: 28 additions & 0 deletions cmd/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ use tikv_util::{
use tokio::runtime::Builder;

use crate::{setup::*, signal_handler};
use tikv::server::trace::{
JaegerReportRunner, JaegerSubscriber, Reporter as TraceReporter,
ReporterBuilder as TraceReporterBuilder,
};
use tikv_util::worker::LazyWorker;

/// Run a TiKV server. Returns when the server is shutdown by the user, in which
Expand Down Expand Up @@ -599,6 +603,29 @@ impl<ER: RaftEngine> TiKVServer<ER> {

let server_config = Arc::new(self.config.server.clone());

// Create trace reporter
let trace_reporter: Arc<TraceReporter> = {
let mut builder = TraceReporterBuilder::new();

let addr = &self.config.trace.jaeger_thrift_compact_agent;
if !addr.is_empty() {
let agent: SocketAddr = addr
.parse()
.unwrap_or_else(|_| fatal!("failed to parse into a socket address: {}", addr));
let jaeger = JaegerSubscriber::new(
self.background_worker
.start("jaeger-reporter", JaegerReportRunner),
agent,
);
builder.register(jaeger);
}

builder.duration_threshold(self.config.trace.duration_threshold.into());
builder.spans_max_length(self.config.trace.max_spans_length);

Arc::new(builder.build())
};

// Create server
let server = Server::new(
&server_config,
Expand All @@ -617,6 +644,7 @@ impl<ER: RaftEngine> TiKVServer<ER> {
self.env.clone(),
unified_read_pool,
debug_thread_pool,
trace_reporter,
)
.unwrap_or_else(|e| fatal!("failed to create server: {}", e));

Expand Down
4 changes: 2 additions & 2 deletions components/cdc/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,7 @@ impl<T: 'static + RaftStoreRouter<RocksEngine>> Endpoint<T> {
SignificantMsg::CaptureChange {
cmd: change_cmd,
region_epoch: request.take_region_epoch(),
callback: Callback::Read(Box::new(move |resp| {
callback: Callback::read(Box::new(move |resp| {
if let Err(e) = scheduler.schedule(Task::InitDownstream {
downstream_id,
downstream_state,
Expand Down Expand Up @@ -793,7 +793,7 @@ impl<T: 'static + RaftStoreRouter<RocksEngine>> Endpoint<T> {
let (tx, rx) = tokio::sync::oneshot::channel();
if let Err(e) = raft_router_clone.significant_send(
region_id,
SignificantMsg::LeaderCallback(Callback::Read(Box::new(move |resp| {
SignificantMsg::LeaderCallback(Callback::read(Box::new(move |resp| {
let resp = if resp.response.get_header().has_error() {
None
} else {
Expand Down
27 changes: 23 additions & 4 deletions components/raftstore/src/store/fsm/apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use sst_importer::SSTImporter;
use tikv_util::config::{Tracker, VersionTrack};
use tikv_util::mpsc::{loose_bounded, LooseBoundedSender, Receiver};
use tikv_util::time::{duration_to_sec, Instant};
use tikv_util::trace::*;
use tikv_util::worker::Scheduler;
use tikv_util::{Either, MustConsumeVec};
use time::Timespec;
Expand All @@ -63,6 +64,7 @@ use crate::store::{cmd_resp, util, Config, RegionSnapshot, RegionTask};
use crate::{Error, Result};

use super::metrics::*;
use crate::store::fsm::tracer::ApplyTracer;

const DEFAULT_APPLY_WB_SIZE: usize = 4 * 1024;
const APPLY_WB_SHRINK_SIZE: usize = 1024 * 1024;
Expand Down Expand Up @@ -301,10 +303,18 @@ where
ApplyCallback { region, cbs }
}

#[trace("ApplyCallback::invoke_all")]
fn invoke_all(self, host: &CoprocessorHost<EK>) {
for (cb, mut cmd) in self.cbs {
host.post_apply(&self.region, &mut cmd);
if let Some(cb) = cb {
cb.access_span(|span| {
ApplyTracer::partial_submit(|s| {
for spans in s {
span.mount_local_spans(spans.clone())
}
})
});
cb.invoke_with_response(cmd.response)
};
}
Expand Down Expand Up @@ -462,6 +472,7 @@ where

/// Writes all the changes into RocksDB.
/// If it returns true, all pending writes are persisted in engines.
#[trace("ApplyContext::write_to_db")]
pub fn write_to_db(&mut self) -> bool {
let need_sync = self.sync_log_hint;
if !self.kv_wb_mut().is_empty() {
Expand Down Expand Up @@ -495,6 +506,7 @@ where
// Call it before invoking callback for preventing Commit is executed before Prewrite is observed.
self.host.on_flush_apply(self.engine.clone());

ApplyTracer::truncate();
for cbs in self.cbs.drain(..) {
cbs.invoke_all(&self.host);
}
Expand Down Expand Up @@ -540,6 +552,7 @@ where

/// Flush all pending writes to engines.
/// If it returns true, all pending writes are persisted in engines.
#[trace("ApplyContext::flush")]
pub fn flush(&mut self) -> bool {
// TODO: this check is too hacky, need to be more verbose and less buggy.
let t = match self.timer.take() {
Expand Down Expand Up @@ -918,6 +931,7 @@ where
});
}

#[trace("ApplyDelegate::handle_raft_entry_normal")]
fn handle_raft_entry_normal<W: WriteBatch<EK>>(
&mut self,
apply_ctx: &mut ApplyContext<EK, W>,
Expand Down Expand Up @@ -968,6 +982,7 @@ where
ApplyResult::None
}

#[trace("ApplyDelegate::handle_raft_entry_conf_change")]
fn handle_raft_entry_conf_change<W: WriteBatch<EK>>(
&mut self,
apply_ctx: &mut ApplyContext<EK, W>,
Expand Down Expand Up @@ -1045,6 +1060,7 @@ where
None
}

#[trace("ApplyDelegate::process_raft_cmd")]
fn process_raft_cmd<W: WriteBatch<EK>>(
&mut self,
apply_ctx: &mut ApplyContext<EK, W>,
Expand Down Expand Up @@ -3023,6 +3039,7 @@ where
}

/// Handles apply tasks, and uses the apply delegate to handle the committed entries.
#[trace("ApplyFsm::handle_apply")]
fn handle_apply<W: WriteBatch<EK>>(
&mut self,
apply_ctx: &mut ApplyContext<EK, W>,
Expand Down Expand Up @@ -3465,6 +3482,7 @@ where
}
}
self.apply_ctx.perf_context.start_observe();
ApplyTracer::begin();
}

/// There is no control fsm in apply poller.
Expand Down Expand Up @@ -3536,6 +3554,7 @@ where
fsm.delegate.last_sync_apply_index = fsm.delegate.apply_state.get_applied_index();
}
}
ApplyTracer::end();
}
}

Expand Down Expand Up @@ -4695,7 +4714,7 @@ mod tests {
region_id: 1,
enabled: enabled.clone(),
},
cb: Callback::Read(Box::new(|resp: ReadResponse<KvTestSnapshot>| {
cb: Callback::read(Box::new(|resp: ReadResponse<KvTestSnapshot>| {
assert!(!resp.response.get_header().has_error());
assert!(resp.snapshot.is_some());
let snap = resp.snapshot.unwrap();
Expand Down Expand Up @@ -4765,7 +4784,7 @@ mod tests {
region_id: 2,
enabled,
},
cb: Callback::Read(Box::new(|resp: ReadResponse<_>| {
cb: Callback::read(Box::new(|resp: ReadResponse<_>| {
assert!(resp
.response
.get_header()
Expand Down Expand Up @@ -4940,7 +4959,7 @@ mod tests {
region_id: 1,
enabled: enabled.clone(),
},
cb: Callback::Read(Box::new(|resp: ReadResponse<_>| {
cb: Callback::read(Box::new(|resp: ReadResponse<_>| {
assert!(!resp.response.get_header().has_error(), "{:?}", resp);
assert!(resp.snapshot.is_some());
})),
Expand Down Expand Up @@ -5099,7 +5118,7 @@ mod tests {
region_id: 1,
enabled: Arc::new(AtomicBool::new(true)),
},
cb: Callback::Read(Box::new(move |resp: ReadResponse<_>| {
cb: Callback::read(Box::new(move |resp: ReadResponse<_>| {
assert!(
resp.response.get_header().get_error().has_epoch_not_match(),
"{:?}",
Expand Down
1 change: 1 addition & 0 deletions components/raftstore/src/store/fsm/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ pub mod apply;
mod metrics;
mod peer;
pub mod store;
mod tracer;

pub use self::apply::{
create_apply_batch_system, Apply, ApplyBatchSystem, ApplyMetrics, ApplyRes, ApplyRouter,
Expand Down
Loading

0 comments on commit 9448150

Please sign in to comment.