Skip to content

Commit

Permalink
*: fix thread name truncating issue (#12442)
Browse files Browse the repository at this point in the history
ref #5593, close #12451

Based on #5593, add a global hashmap recording relationship between
thread id and thread name; add some wrappers to spawn threads and
update the hashmap.

It's necessary to use `after_start_wrapper` and `before_stop_wrapper`
together. Otherwise it may cause reporting a wrong thread name if
a thread inserts its name to hashmap and doesn't remove it, while
another thread reuses the same tid and doesn't update the hashmap.

Signed-off-by: GanZiheng <ganziheng98@gmail.com>

Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
  • Loading branch information
GanZiheng and ti-chi-bot committed Jun 20, 2022
1 parent c1a09b8 commit cead3f5
Show file tree
Hide file tree
Showing 37 changed files with 313 additions and 60 deletions.
9 changes: 9 additions & 0 deletions clippy.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
disallowed-methods = [
{ path = "std::thread::Builder::spawn", reason = "Wrapper function `<std::thread::Builder as tikv_util::sys::thread::StdThreadBuildWrapper>::spawn_wrapper` should be used instead, refer to https://github.com/tikv/tikv/pull/12442 for more details." },

{ path = "tokio::runtime::builder::Builder::on_thread_start", reason = "Wrapper function `<tokio::runtime::builder::Builder as tikv_util::sys::thread::ThreadBuildWrapper>::after_start_wrapper` should be used instead, refer to https://github.com/tikv/tikv/pull/12442 for more details." },
{ path = "tokio::runtime::builder::Builder::on_thread_stop", reason = "Wrapper function `<tokio::runtime::builder::Builder as tikv_util::sys::thread::ThreadBuildWrapper>::before_stop_wrapper` should be used instead, refer to https://github.com/tikv/tikv/pull/12442 for more details." },

{ path = "futures_executor::thread_pool::ThreadPoolBuilder::after_start", reason = "Wrapper function `<futures_executor::thread_pool::ThreadPoolBuilder as tikv_util::sys::thread::ThreadBuildWrapper>::after_start_wrapper` should be used instead, refer to https://github.com/tikv/tikv/pull/12442 for more details." },
{ path = "futures_executor::thread_pool::ThreadPoolBuilder::before_stop", reason = "Wrapper function `<futures_executor::thread_pool::ThreadPoolBuilder as tikv_util::sys::thread::ThreadBuildWrapper>::before_stop_wrapper` should be used instead, refer to https://github.com/tikv/tikv/pull/12442 for more details." },
]
4 changes: 2 additions & 2 deletions cmd/tikv-ctl/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use regex::Regex;
use security::{SecurityConfig, SecurityManager};
use structopt::{clap::ErrorKind, StructOpt};
use tikv::{config::TiKvConfig, server::debug::BottommostLevelCompaction};
use tikv_util::{escape, run_and_wait_child_process, unescape};
use tikv_util::{escape, run_and_wait_child_process, sys::thread::StdThreadBuildWrapper, unescape};
use txn_types::Key;

use crate::{cmd::*, executor::*, util::*};
Expand Down Expand Up @@ -604,7 +604,7 @@ fn compact_whole_cluster(
let cfs: Vec<String> = cfs.iter().map(|cf| cf.to_string()).collect();
let h = thread::Builder::new()
.name(format!("compact-{}", addr))
.spawn(move || {
.spawn_wrapper(move || {
tikv_alloc::add_thread_memory_accessor();
let debug_executor = new_debug_executor(&cfg, None, false, Some(&addr), mgr);
for cf in cfs {
Expand Down
5 changes: 3 additions & 2 deletions components/backup-stream/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use tikv_util::{
box_err,
config::ReadableDuration,
debug, defer, info,
sys::thread::ThreadBuildWrapper,
time::Instant,
warn,
worker::{Runnable, Scheduler},
Expand Down Expand Up @@ -1016,10 +1017,10 @@ fn create_tokio_runtime(thread_count: usize, thread_name: &str) -> TokioResult<R
.worker_threads(thread_count)
.enable_io()
.enable_time()
.on_thread_start(|| {
.after_start_wrapper(|| {
tikv_alloc::add_thread_memory_accessor();
})
.on_thread_stop(|| {
.before_stop_wrapper(|| {
tikv_alloc::remove_thread_memory_accessor();
})
.build()
Expand Down
6 changes: 3 additions & 3 deletions components/backup/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use api_version::{dispatch_api_version, ApiV2, KeyMode, KvFormat};
use file_system::IOType;
use futures::Future;
use kvproto::kvrpcpb::ApiVersion;
use tikv_util::error;
use tikv_util::{error, sys::thread::ThreadBuildWrapper};
use tokio::{io::Result as TokioResult, runtime::Runtime};
use txn_types::{Key, TimeStamp};

Expand Down Expand Up @@ -90,11 +90,11 @@ pub fn create_tokio_runtime(thread_count: usize, thread_name: &str) -> TokioResu
.thread_name(thread_name)
.enable_io()
.enable_time()
.on_thread_start(|| {
.after_start_wrapper(|| {
tikv_alloc::add_thread_memory_accessor();
file_system::set_io_type(IOType::Export);
})
.on_thread_stop(|| {
.before_stop_wrapper(|| {
tikv_alloc::remove_thread_memory_accessor();
})
.worker_threads(thread_count)
Expand Down
7 changes: 5 additions & 2 deletions components/batch-system/src/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ use std::{
use crossbeam::channel::{self, SendError};
use fail::fail_point;
use file_system::{set_io_type, IOType};
use tikv_util::{debug, error, info, mpsc, safe_panic, thd_name, time::Instant, warn};
use tikv_util::{
debug, error, info, mpsc, safe_panic, sys::thread::StdThreadBuildWrapper, thd_name,
time::Instant, warn,
};

use crate::{
config::Config,
Expand Down Expand Up @@ -581,7 +584,7 @@ where
let props = tikv_util::thread_group::current_properties();
let t = thread::Builder::new()
.name(name)
.spawn(move || {
.spawn_wrapper(move || {
tikv_util::thread_group::set_properties(props);
set_io_type(IOType::ForegroundWrite);
poller.poll();
Expand Down
5 changes: 5 additions & 0 deletions components/cdc/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use security::SecurityManager;
use tikv::{config::CdcConfig, storage::Statistics};
use tikv_util::{
debug, error, impl_display_as_debug, info,
sys::thread::ThreadBuildWrapper,
time::Limiter,
timer::SteadyTimer,
warn,
Expand Down Expand Up @@ -373,12 +374,16 @@ impl<T: 'static + RaftStoreRouter<E>, E: KvEngine> Endpoint<T, E> {
let workers = Builder::new_multi_thread()
.thread_name("cdcwkr")
.worker_threads(config.incremental_scan_threads)
.after_start_wrapper(|| {})
.before_stop_wrapper(|| {})
.build()
.unwrap();
let tso_worker = Builder::new_multi_thread()
.thread_name("tso")
.worker_threads(config.tso_worker_threads)
.enable_time()
.after_start_wrapper(|| {})
.before_stop_wrapper(|| {})
.build()
.unwrap();

Expand Down
7 changes: 6 additions & 1 deletion components/cdc/src/initializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,10 @@ mod tests {
},
TestEngineBuilder,
};
use tikv_util::worker::{LazyWorker, Runnable};
use tikv_util::{
sys::thread::ThreadBuildWrapper,
worker::{LazyWorker, Runnable},
};
use tokio::runtime::{Builder, Runtime};

use super::*;
Expand Down Expand Up @@ -608,6 +611,8 @@ mod tests {
let pool = Builder::new_multi_thread()
.thread_name("test-initializer-worker")
.worker_threads(4)
.after_start_wrapper(|| {})
.before_stop_wrapper(|| {})
.build()
.unwrap();
let downstream_state = Arc::new(AtomicCell::new(DownstreamState::Initializing));
Expand Down
4 changes: 2 additions & 2 deletions components/encryption/src/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use fail::fail_point;
use file_system::File;
use kvproto::encryptionpb::{DataKey, EncryptionMethod, FileDictionary, FileInfo, KeyDictionary};
use protobuf::Message;
use tikv_util::{box_err, debug, error, info, thd_name, warn};
use tikv_util::{box_err, debug, error, info, sys::thread::StdThreadBuildWrapper, thd_name, warn};

use crate::{
config::EncryptionConfig,
Expand Down Expand Up @@ -557,7 +557,7 @@ impl DataKeyManager {
let (rotate_terminal, rx) = channel::bounded(1);
let background_worker = std::thread::Builder::new()
.name(thd_name!("enc:key"))
.spawn(move || {
.spawn_wrapper(move || {
run_background_rotate_work(dict_clone, method, &*master_key, rx);
})?;

Expand Down
3 changes: 3 additions & 0 deletions components/encryption/src/master_key/kms.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use kvproto::encryptionpb::EncryptedContent;
use tikv_util::{
box_err, error,
stream::{retry, with_timeout},
sys::thread::ThreadBuildWrapper,
};
use tokio::runtime::{Builder, Runtime};

Expand Down Expand Up @@ -81,6 +82,8 @@ impl KmsBackend {
Builder::new_current_thread()
.thread_name("kms-runtime")
.enable_all()
.after_start_wrapper(|| {})
.before_stop_wrapper(|| {})
.build()?,
);

Expand Down
6 changes: 4 additions & 2 deletions components/file_system/src/io_stats/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ pub use proc::*;

#[cfg(test)]
mod tests {
use tikv_util::sys::thread::StdThreadBuildWrapper;

use super::*;
use crate::IOType;

Expand All @@ -54,7 +56,7 @@ mod tests {
let _ths = (0..8)
.map(|_| {
let tx_clone = tx.clone();
std::thread::Builder::new().spawn(move || {
std::thread::Builder::new().spawn_wrapper(move || {
set_io_type(IOType::ForegroundWrite);
tx_clone.send(()).unwrap();
})
Expand All @@ -72,7 +74,7 @@ mod tests {
let _ths = (0..8)
.map(|_| {
let tx_clone = tx.clone();
std::thread::Builder::new().spawn(move || {
std::thread::Builder::new().spawn_wrapper(move || {
set_io_type(IOType::ForegroundWrite);
tx_clone.send(()).unwrap();
})
Expand Down
4 changes: 2 additions & 2 deletions components/pd_client/src/tso.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use futures::{
};
use grpcio::{CallOption, WriteFlags};
use kvproto::pdpb::{PdClient, TsoRequest, TsoResponse};
use tikv_util::{box_err, info};
use tikv_util::{box_err, info, sys::thread::StdThreadBuildWrapper};
use tokio::sync::{mpsc, oneshot, watch};
use txn_types::TimeStamp;

Expand Down Expand Up @@ -61,7 +61,7 @@ impl TimestampOracle {
// Start a background thread to handle TSO requests and responses
thread::Builder::new()
.name("tso-worker".into())
.spawn(move || {
.spawn_wrapper(move || {
block_on(run_tso(
cluster_id,
rpc_sender.sink_err_into(),
Expand Down
12 changes: 8 additions & 4 deletions components/raftstore/src/store/async_io/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ use raft::eraftpb::Entry;
use tikv_util::{
box_err,
config::{Tracker, VersionTrack},
debug, info, slow_log, thd_name,
debug, info, slow_log,
sys::thread::StdThreadBuildWrapper,
thd_name,
time::{duration_to_sec, Instant},
warn,
};
Expand Down Expand Up @@ -692,9 +694,11 @@ where
cfg,
);
info!("starting store writer {}", i);
let t = thread::Builder::new().name(thd_name!(tag)).spawn(move || {
worker.run();
})?;
let t = thread::Builder::new()
.name(thd_name!(tag))
.spawn_wrapper(move || {
worker.run();
})?;
self.writers.push(tx);
self.handlers.push(t);
}
Expand Down
3 changes: 2 additions & 1 deletion components/raftstore/src/store/worker/pd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use resource_metering::{Collector, CollectorGuard, CollectorRegHandle, RawRecord
use tikv_util::{
box_err, debug, error, info,
metrics::ThreadInfoStatistics,
sys::thread::StdThreadBuildWrapper,
thd_name,
time::{Instant as TiInstant, UnixSecs},
timer::GLOBAL_TIMER_HANDLE,
Expand Down Expand Up @@ -537,7 +538,7 @@ where
}
let h = Builder::new()
.name(thd_name!("stats-monitor"))
.spawn(move || {
.spawn_wrapper(move || {
tikv_util::thread_group::set_properties(props);
tikv_alloc::add_thread_memory_accessor();
let mut thread_stats = ThreadInfoStatistics::new();
Expand Down
6 changes: 4 additions & 2 deletions components/raftstore/src/store/worker/refresh_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ use std::{

use batch_system::{BatchRouter, Fsm, FsmTypes, HandlerBuilder, Poller, PoolState, Priority};
use file_system::{set_io_type, IOType};
use tikv_util::{debug, error, info, safe_panic, thd_name, worker::Runnable};
use tikv_util::{
debug, error, info, safe_panic, sys::thread::StdThreadBuildWrapper, thd_name, worker::Runnable,
};

use crate::store::fsm::{
apply::{ApplyFsm, ControlFsm},
Expand Down Expand Up @@ -70,7 +72,7 @@ where
name_prefix,
i + self.state.id_base,
)))
.spawn(move || {
.spawn_wrapper(move || {
tikv_util::thread_group::set_properties(props);
set_io_type(IOType::ForegroundWrite);
poller.poll();
Expand Down
6 changes: 5 additions & 1 deletion components/resolved_ts/src/advance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ use pd_client::PdClient;
use protobuf::Message;
use raftstore::store::{fsm::StoreMeta, util::RegionReadProgressRegistry};
use security::SecurityManager;
use tikv_util::{info, time::Instant, timer::SteadyTimer, worker::Scheduler};
use tikv_util::{
info, sys::thread::ThreadBuildWrapper, time::Instant, timer::SteadyTimer, worker::Scheduler,
};
use tokio::{
runtime::{Builder, Runtime},
sync::Mutex,
Expand Down Expand Up @@ -65,6 +67,8 @@ impl<E: KvEngine> AdvanceTsWorker<E> {
.thread_name("advance-ts")
.worker_threads(1)
.enable_time()
.after_start_wrapper(|| {})
.before_stop_wrapper(|| {})
.build()
.unwrap();
Self {
Expand Down
4 changes: 3 additions & 1 deletion components/resolved_ts/src/scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use tikv::storage::{
mvcc::{DeltaScanner, MvccReader, ScannerBuilder},
txn::{TxnEntry, TxnEntryScanner},
};
use tikv_util::{time::Instant, timer::GLOBAL_TIMER_HANDLE};
use tikv_util::{sys::thread::ThreadBuildWrapper, time::Instant, timer::GLOBAL_TIMER_HANDLE};
use tokio::runtime::{Builder, Runtime};
use txn_types::{Key, Lock, LockType, TimeStamp};

Expand Down Expand Up @@ -74,6 +74,8 @@ impl<T: 'static + RaftStoreRouter<E>, E: KvEngine> ScannerPool<T, E> {
Builder::new_multi_thread()
.thread_name("inc-scan")
.worker_threads(count)
.after_start_wrapper(|| {})
.before_stop_wrapper(|| {})
.build()
.unwrap(),
);
Expand Down
9 changes: 6 additions & 3 deletions components/server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,10 @@ use tikv_util::{
};
use tokio::runtime::Builder;

use crate::{memory::*, raft_engine_switch::*, setup::*, signal_handler};
use crate::{
memory::*, raft_engine_switch::*, setup::*, signal_handler,
tikv_util::sys::thread::ThreadBuildWrapper,
};

#[inline]
fn run_impl<CER: ConfiguredRaftEngine, F: KvFormat>(config: TiKvConfig) {
Expand Down Expand Up @@ -622,11 +625,11 @@ impl<ER: RaftEngine> TiKvServer<ER> {
Builder::new_multi_thread()
.thread_name(thd_name!("debugger"))
.worker_threads(1)
.on_thread_start(move || {
.after_start_wrapper(move || {
tikv_alloc::add_thread_memory_accessor();
tikv_util::thread_group::set_properties(props.clone());
})
.on_thread_stop(tikv_alloc::remove_thread_memory_accessor)
.before_stop_wrapper(tikv_alloc::remove_thread_memory_accessor)
.build()
.unwrap(),
);
Expand Down
3 changes: 3 additions & 0 deletions components/test_raftstore/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ use tikv::{
use tikv_util::{
config::VersionTrack,
quota_limiter::QuotaLimiter,
sys::thread::ThreadBuildWrapper,
time::ThreadReadId,
worker::{Builder as WorkerBuilder, LazyWorker},
HandyRwLock,
Expand Down Expand Up @@ -448,6 +449,8 @@ impl ServerCluster {
TokioBuilder::new_multi_thread()
.thread_name(thd_name!("debugger"))
.worker_threads(1)
.after_start_wrapper(|| {})
.before_stop_wrapper(|| {})
.build()
.unwrap(),
);
Expand Down
3 changes: 2 additions & 1 deletion components/test_util/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::{
};

use rand::Rng;
use tikv_util::sys::thread::StdThreadBuildWrapper;

pub use crate::{
encryption::*,
Expand All @@ -36,7 +37,7 @@ pub fn setup_for_ci() {
// of time to avoid causing timeout.
thread::Builder::new()
.name(tikv_util::thd_name!("backtrace-loader"))
.spawn(::backtrace::Backtrace::new)
.spawn_wrapper(::backtrace::Backtrace::new)
.unwrap();

if env::var("CI").is_ok() {
Expand Down
4 changes: 3 additions & 1 deletion components/tikv_util/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ use nix::{
};
use rand::rngs::ThreadRng;

use crate::sys::thread::StdThreadBuildWrapper;

#[macro_use]
pub mod log;
pub mod buffer_vec;
Expand Down Expand Up @@ -468,7 +470,7 @@ pub fn set_panic_hook(panic_abort: bool, data_dir: &str) {
// Caching is slow, spawn it in another thread to speed up.
thread::Builder::new()
.name(thd_name!("backtrace-loader"))
.spawn(::backtrace::Backtrace::new)
.spawn_wrapper(::backtrace::Backtrace::new)
.unwrap();

let data_dir = data_dir.to_string();
Expand Down

0 comments on commit cead3f5

Please sign in to comment.