Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: Introduce tracing framework #8981

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 55 additions & 9 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,6 @@ url = "2"
uuid = { version = "0.8.1", features = ["serde", "v4"] }
vlog = "0.1.4"
walkdir = "2"
minitrace = { git = "https://github.com/pingcap-incubator/minitrace-rust.git", branch = "master" }
yatp = { git = "https://github.com/tikv/yatp.git", branch = "master" }

[dev-dependencies]
Expand All @@ -215,6 +214,9 @@ protobuf-codegen = { git = "https://github.com/pingcap/rust-protobuf", rev = "65
[target.'cfg(target_os = "linux")'.dependencies]
procinfo = { git = "https://github.com/tikv/procinfo-rs", rev = "5125fc1a69496b73b26b3c08b6e8afc3c665a56e" }

[patch.'https://github.com/pingcap/kvproto.git']
kvproto = { git = "https://github.com/zhongzc/kvproto.git", rev = "55582f22f9cff4c193c03d5708795fc623cc507c", default-features = false }

[workspace]
# See https://github.com/rust-lang/rfcs/blob/master/text/2957-cargo-features2.md
# Without resolver = 2, using `cargo build --features x` to build `cmd`
Expand Down
28 changes: 28 additions & 0 deletions cmd/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ use tikv_util::{
use tokio::runtime::Builder;

use crate::{setup::*, signal_handler};
use tikv::server::trace::{
JaegerReportRunner, JaegerSubscriber, Reporter as TraceReporter,
ReporterBuilder as TraceReporterBuilder,
};

/// Run a TiKV server. Returns when the server is shutdown by the user, in which
/// case the server will be properly stopped.
Expand Down Expand Up @@ -605,6 +609,29 @@ impl<ER: RaftEngine> TiKVServer<ER> {

let server_config = Arc::new(self.config.server.clone());

// Create trace reporter
let trace_reporter: Arc<TraceReporter> = {
let mut builder = TraceReporterBuilder::new();

let addr = &self.config.trace.jaeger_thrift_compact_agent;
if !addr.is_empty() {
let agent: SocketAddr = addr
.parse()
.unwrap_or_else(|_| fatal!("failed to parse into a socket address: {}", addr));
let jaeger = JaegerSubscriber::new(
self.background_worker
.start("jaeger-reporter", JaegerReportRunner),
agent,
);
builder.register(jaeger);
}

builder.duration_threshold(self.config.trace.duration_threshold.into());
builder.spans_max_length(self.config.trace.max_spans_length);

Arc::new(builder.build())
};

// Create server
let server = Server::new(
&server_config,
Expand All @@ -623,6 +650,7 @@ impl<ER: RaftEngine> TiKVServer<ER> {
self.env.clone(),
unified_read_pool,
debug_thread_pool,
trace_reporter,
)
.unwrap_or_else(|e| fatal!("failed to create server: {}", e));

Expand Down
4 changes: 2 additions & 2 deletions components/cdc/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,7 @@ impl<T: 'static + RaftStoreRouter<RocksEngine>> Endpoint<T> {
SignificantMsg::CaptureChange {
cmd: change_cmd,
region_epoch: request.take_region_epoch(),
callback: Callback::Read(Box::new(move |resp| {
callback: Callback::read(Box::new(move |resp| {
if let Err(e) = scheduler.schedule(Task::InitDownstream {
downstream_id,
downstream_state,
Expand Down Expand Up @@ -796,7 +796,7 @@ impl<T: 'static + RaftStoreRouter<RocksEngine>> Endpoint<T> {
let (tx, rx) = tokio::sync::oneshot::channel();
if let Err(e) = raft_router_clone.significant_send(
region_id,
SignificantMsg::LeaderCallback(Callback::Read(Box::new(move |resp| {
SignificantMsg::LeaderCallback(Callback::read(Box::new(move |resp| {
let resp = if resp.response.get_header().has_error() {
None
} else {
Expand Down
27 changes: 23 additions & 4 deletions components/raftstore/src/store/fsm/apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use sst_importer::SSTImporter;
use tikv_util::config::{Tracker, VersionTrack};
use tikv_util::mpsc::{loose_bounded, LooseBoundedSender, Receiver};
use tikv_util::time::{duration_to_sec, Instant};
use tikv_util::trace::*;
use tikv_util::worker::Scheduler;
use tikv_util::{Either, MustConsumeVec};
use time::Timespec;
Expand All @@ -63,6 +64,7 @@ use crate::store::{cmd_resp, util, Config, RegionSnapshot, RegionTask};
use crate::{Error, Result};

use super::metrics::*;
use crate::store::fsm::tracer::ApplyTracer;

const DEFAULT_APPLY_WB_SIZE: usize = 4 * 1024;
const APPLY_WB_SHRINK_SIZE: usize = 1024 * 1024;
Expand Down Expand Up @@ -301,10 +303,18 @@ where
ApplyCallback { region, cbs }
}

/// Runs the post-apply hook and fires the callback for every pending command.
///
/// Takes `self` by value and walks each `(cb, cmd)` pair in `self.cbs`.
/// `host.post_apply` is called for the pair first; then, if a callback is
/// present, the tracer's spans are mounted onto the callback's span and the
/// callback is invoked with the command's response.
#[trace("ApplyCallback::invoke_all")]
fn invoke_all(self, host: &CoprocessorHost<EK>) {
for (cb, mut cmd) in self.cbs {
// The hook runs for every command, including those without a callback.
host.post_apply(&self.region, &mut cmd);
if let Some(cb) = cb {
// Before responding, mount a clone of every span collection handed out by
// `ApplyTracer::partial_submit` onto this callback's span.
// NOTE(review): presumably these are the spans recorded by the apply
// poller since `ApplyTracer::begin()` — confirm against `fsm::tracer`.
cb.access_span(|span| {
ApplyTracer::partial_submit(|s| {
for spans in s {
span.mount_local_spans(spans.clone())
}
})
});
// Finally hand the command's response to the callback.
cb.invoke_with_response(cmd.response)
};
}
}
Expand Down Expand Up @@ -462,6 +472,7 @@ where

/// Writes all the changes into RocksDB.
/// If it returns true, all pending writes are persisted in engines.
#[trace("ApplyContext::write_to_db")]
pub fn write_to_db(&mut self) -> bool {
let need_sync = self.sync_log_hint;
if !self.kv_wb_mut().is_empty() {
Expand Down Expand Up @@ -495,6 +506,7 @@ where
// Call it before invoking callbacks to prevent Commit from being executed before Prewrite is observed.
self.host.on_flush_apply(self.engine.clone());

ApplyTracer::truncate();
for cbs in self.cbs.drain(..) {
cbs.invoke_all(&self.host);
}
Expand Down Expand Up @@ -540,6 +552,7 @@ where

/// Flush all pending writes to engines.
/// If it returns true, all pending writes are persisted in engines.
#[trace("ApplyContext::flush")]
pub fn flush(&mut self) -> bool {
// TODO: this check is too hacky, need to be more verbose and less buggy.
let t = match self.timer.take() {
Expand Down Expand Up @@ -918,6 +931,7 @@ where
});
}

#[trace("ApplyDelegate::handle_raft_entry_normal")]
fn handle_raft_entry_normal<W: WriteBatch<EK>>(
&mut self,
apply_ctx: &mut ApplyContext<EK, W>,
Expand Down Expand Up @@ -968,6 +982,7 @@ where
ApplyResult::None
}

#[trace("ApplyDelegate::handle_raft_entry_conf_change")]
fn handle_raft_entry_conf_change<W: WriteBatch<EK>>(
&mut self,
apply_ctx: &mut ApplyContext<EK, W>,
Expand Down Expand Up @@ -1045,6 +1060,7 @@ where
None
}

#[trace("ApplyDelegate::process_raft_cmd")]
fn process_raft_cmd<W: WriteBatch<EK>>(
&mut self,
apply_ctx: &mut ApplyContext<EK, W>,
Expand Down Expand Up @@ -3023,6 +3039,7 @@ where
}

/// Handles apply tasks, and uses the apply delegate to handle the committed entries.
#[trace("ApplyFsm::handle_apply")]
fn handle_apply<W: WriteBatch<EK>>(
&mut self,
apply_ctx: &mut ApplyContext<EK, W>,
Expand Down Expand Up @@ -3462,6 +3479,7 @@ where
}
}
self.apply_ctx.perf_context.start_observe();
ApplyTracer::begin();
}

/// There is no control fsm in apply poller.
Expand Down Expand Up @@ -3533,6 +3551,7 @@ where
fsm.delegate.last_sync_apply_index = fsm.delegate.apply_state.get_applied_index();
}
}
ApplyTracer::end();
}
}

Expand Down Expand Up @@ -4698,7 +4717,7 @@ mod tests {
region_id: 1,
enabled: enabled.clone(),
},
cb: Callback::Read(Box::new(|resp: ReadResponse<KvTestSnapshot>| {
cb: Callback::read(Box::new(|resp: ReadResponse<KvTestSnapshot>| {
assert!(!resp.response.get_header().has_error());
assert!(resp.snapshot.is_some());
let snap = resp.snapshot.unwrap();
Expand Down Expand Up @@ -4768,7 +4787,7 @@ mod tests {
region_id: 2,
enabled,
},
cb: Callback::Read(Box::new(|resp: ReadResponse<_>| {
cb: Callback::read(Box::new(|resp: ReadResponse<_>| {
assert!(resp
.response
.get_header()
Expand Down Expand Up @@ -4945,7 +4964,7 @@ mod tests {
region_id: 1,
enabled: enabled.clone(),
},
cb: Callback::Read(Box::new(|resp: ReadResponse<_>| {
cb: Callback::read(Box::new(|resp: ReadResponse<_>| {
assert!(!resp.response.get_header().has_error(), "{:?}", resp);
assert!(resp.snapshot.is_some());
})),
Expand Down Expand Up @@ -5104,7 +5123,7 @@ mod tests {
region_id: 1,
enabled: Arc::new(AtomicBool::new(true)),
},
cb: Callback::Read(Box::new(move |resp: ReadResponse<_>| {
cb: Callback::read(Box::new(move |resp: ReadResponse<_>| {
assert!(
resp.response.get_header().get_error().has_epoch_not_match(),
"{:?}",
Expand Down
1 change: 1 addition & 0 deletions components/raftstore/src/store/fsm/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ pub mod apply;
mod metrics;
mod peer;
pub mod store;
mod tracer;

pub use self::apply::{
create_apply_batch_system, Apply, ApplyBatchSystem, ApplyMetrics, ApplyRes, ApplyRouter,
Expand Down
Loading