Skip to content

Commit

Permalink
Merge branch 'master' into feature/update-codecov-yaml
Browse files Browse the repository at this point in the history
  • Loading branch information
wuhuizuo committed Jun 30, 2023
2 parents 883c220 + 425f6f2 commit 9325047
Show file tree
Hide file tree
Showing 77 changed files with 1,278 additions and 404 deletions.
5 changes: 2 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,8 @@ fs2 = { git = "https://github.com/tabokie/fs2-rs", branch = "tikv" }
# Remove this when a new version is release. We need to solve rust-lang/cmake-rs#143.
cmake = { git = "https://github.com/rust-lang/cmake-rs" }

sysinfo ={ git = "https://github.com/tikv/sysinfo", branch = "0.26-fix-cpu" }

[target.'cfg(target_os = "linux")'.dependencies]
procinfo = { git = "https://github.com/tikv/procinfo-rs", rev = "6599eb9dca74229b2c1fcc44118bef7eff127128" }
# When you modify TiKV cooperatively with kvproto, this will be useful to submit the PR to TiKV and the PR to
Expand Down
8 changes: 4 additions & 4 deletions clippy.toml
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
disallowed-methods = [
{ path = "std::thread::Builder::spawn", reason = "Wrapper function `<std::thread::Builder as tikv_util::sys::thread::StdThreadBuildWrapper>::spawn_wrapper` should be used instead, refer to https://github.com/tikv/tikv/pull/12442 for more details." },

{ path = "tokio::runtime::builder::Builder::on_thread_start", reason = "Wrapper function `<tokio::runtime::builder::Builder as tikv_util::sys::thread::ThreadBuildWrapper>::after_start_wrapper` should be used instead, refer to https://github.com/tikv/tikv/pull/12442 for more details." },
{ path = "tokio::runtime::builder::Builder::on_thread_stop", reason = "Wrapper function `<tokio::runtime::builder::Builder as tikv_util::sys::thread::ThreadBuildWrapper>::before_stop_wrapper` should be used instead, refer to https://github.com/tikv/tikv/pull/12442 for more details." },
{ path = "tokio::runtime::builder::Builder::on_thread_start", reason = "Adding hooks directly will omit system hooks, please use <tokio::runtime::builder::Builder as tikv_util::sys::thread::ThreadBuildWrapper>::with_sys_and_custom_hooks refer to https://github.com/tikv/tikv/pull/12442 and https://github.com/tikv/tikv/pull/15017 for more details." },
{ path = "tokio::runtime::builder::Builder::on_thread_stop", reason = "Adding hooks directly will omit system hooks, please use <tokio::runtime::builder::Builder as tikv_util::sys::thread::ThreadBuildWrapper>::with_sys_and_custom_hooks refer to https://github.com/tikv/tikv/pull/12442 and https://github.com/tikv/tikv/pull/15017 for more details." },

{ path = "futures_executor::thread_pool::ThreadPoolBuilder::after_start", reason = "Wrapper function `<futures_executor::thread_pool::ThreadPoolBuilder as tikv_util::sys::thread::ThreadBuildWrapper>::after_start_wrapper` should be used instead, refer to https://github.com/tikv/tikv/pull/12442 for more details." },
{ path = "futures_executor::thread_pool::ThreadPoolBuilder::before_stop", reason = "Wrapper function `<futures_executor::thread_pool::ThreadPoolBuilder as tikv_util::sys::thread::ThreadBuildWrapper>::before_stop_wrapper` should be used instead, refer to https://github.com/tikv/tikv/pull/12442 for more details." },
{ path = "futures_executor::thread_pool::ThreadPoolBuilder::after_start", reason = "Adding hooks directly will omit system hooks, please use <futures_executor::thread_pool::ThreadPoolBuilder as tikv_util::sys::thread::ThreadBuildWrapper>::with_sys_and_custom_hooks refer to https://github.com/tikv/tikv/pull/12442 and https://github.com/tikv/tikv/pull/15017 for more details." },
{ path = "futures_executor::thread_pool::ThreadPoolBuilder::before_stop", reason = "Adding hooks directly will omit system hooks, please use <futures_executor::thread_pool::ThreadPoolBuilder as tikv_util::sys::thread::ThreadBuildWrapper>::with_sys_and_custom_hooks refer to https://github.com/tikv/tikv/pull/12442 and https://github.com/tikv/tikv/pull/15017 for more details." },
]
avoid-breaking-exported-api = false
upper-case-acronyms-aggressive = true
2 changes: 0 additions & 2 deletions cmd/tikv-ctl/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -779,7 +779,6 @@ fn compact_whole_cluster(
let h = thread::Builder::new()
.name(format!("compact-{}", addr))
.spawn_wrapper(move || {
tikv_alloc::add_thread_memory_accessor();
let debug_executor = new_debug_executor(&cfg, None, Some(&addr), mgr);
for cf in cfs {
debug_executor.compact(
Expand All @@ -792,7 +791,6 @@ fn compact_whole_cluster(
bottommost,
);
}
tikv_alloc::remove_thread_memory_accessor();
})
.unwrap();
handles.push(h);
Expand Down
7 changes: 1 addition & 6 deletions components/backup-stream/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1025,14 +1025,9 @@ fn create_tokio_runtime(thread_count: usize, thread_name: &str) -> TokioResult<R
// (`File` API in `tokio::io` would use this pool.)
.max_blocking_threads(thread_count * 8)
.worker_threads(thread_count)
.with_sys_hooks()
.enable_io()
.enable_time()
.after_start_wrapper(|| {
tikv_alloc::add_thread_memory_accessor();
})
.before_stop_wrapper(|| {
tikv_alloc::remove_thread_memory_accessor();
})
.build()
}

Expand Down
2 changes: 0 additions & 2 deletions components/backup-stream/src/subscription_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,10 +263,8 @@ fn spawn_executors(init: impl InitialScan + Send + 'static, number: usize) -> Sc
let rx = rx.clone();
let stopped = stopped.clone();
pool.spawn(move |_: &mut YatpHandle<'_>| {
tikv_alloc::add_thread_memory_accessor();
let _io_guard = file_system::WithIoType::new(file_system::IoType::Replication);
scan_executor_loop(init, rx, stopped);
tikv_alloc::remove_thread_memory_accessor();
})
}
ScanPoolHandle {
Expand Down
13 changes: 6 additions & 7 deletions components/backup/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,12 @@ pub fn create_tokio_runtime(thread_count: usize, thread_name: &str) -> TokioResu
.thread_name(thread_name)
.enable_io()
.enable_time()
.after_start_wrapper(|| {
tikv_alloc::add_thread_memory_accessor();
file_system::set_io_type(IoType::Export);
})
.before_stop_wrapper(|| {
tikv_alloc::remove_thread_memory_accessor();
})
.with_sys_and_custom_hooks(
|| {
file_system::set_io_type(IoType::Export);
},
|| {},
)
.worker_threads(thread_count)
.build()
}
Expand Down
6 changes: 2 additions & 4 deletions components/cdc/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,16 +380,14 @@ impl<T: 'static + CdcHandle<E>, E: KvEngine, S: StoreRegionMeta> Endpoint<T, E,
let workers = Builder::new_multi_thread()
.thread_name("cdcwkr")
.worker_threads(config.incremental_scan_threads)
.after_start_wrapper(|| {})
.before_stop_wrapper(|| {})
.with_sys_hooks()
.build()
.unwrap();
let tso_worker = Builder::new_multi_thread()
.thread_name("tso")
.worker_threads(config.tso_worker_threads)
.enable_time()
.after_start_wrapper(|| {})
.before_stop_wrapper(|| {})
.with_sys_hooks()
.build()
.unwrap();

Expand Down
3 changes: 1 addition & 2 deletions components/cdc/src/initializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -633,8 +633,7 @@ mod tests {
let pool = Builder::new_multi_thread()
.thread_name("test-initializer-worker")
.worker_threads(4)
.after_start_wrapper(|| {})
.before_stop_wrapper(|| {})
.with_sys_hooks()
.build()
.unwrap();
let downstream_state = Arc::new(AtomicCell::new(DownstreamState::Initializing));
Expand Down
3 changes: 1 addition & 2 deletions components/encryption/src/master_key/kms.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,7 @@ impl KmsBackend {
Builder::new_current_thread()
.thread_name("kms-runtime")
.enable_all()
.after_start_wrapper(|| {})
.before_stop_wrapper(|| {})
.with_sys_hooks()
.build()?,
);

Expand Down
7 changes: 5 additions & 2 deletions components/engine_panic/src/misc.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0.

use engine_traits::{DeleteStrategy, MiscExt, Range, RangeStats, Result, StatisticsReporter};
use engine_traits::{
DeleteStrategy, MiscExt, Range, RangeStats, Result, StatisticsReporter, WriteOptions,
};

use crate::engine::PanicEngine;

Expand Down Expand Up @@ -41,10 +43,11 @@ impl MiscExt for PanicEngine {

fn delete_ranges_cf(
&self,
wopts: &WriteOptions,
cf: &str,
strategy: DeleteStrategy,
ranges: &[Range<'_>],
) -> Result<()> {
) -> Result<bool> {
panic!()
}

Expand Down
76 changes: 51 additions & 25 deletions components/engine_rocks/src/misc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use engine_traits::{
CfNamesExt, DeleteStrategy, ImportExt, IterOptions, Iterable, Iterator, MiscExt, Mutable,
Range, RangeStats, Result, SstWriter, SstWriterBuilder, WriteBatch, WriteBatchExt,
WriteOptions,
};
use rocksdb::{FlushOptions, Range as RocksRange};
use tikv_util::{box_try, keybuilder::KeyBuilder};
Expand All @@ -23,10 +24,12 @@ impl RocksEngine {
// of region will never be larger than max-region-size.
fn delete_all_in_range_cf_by_ingest(
&self,
wopts: &WriteOptions,
cf: &str,
sst_path: String,
ranges: &[Range<'_>],
) -> Result<()> {
) -> Result<bool> {
let mut written = false;
let mut ranges = ranges.to_owned();
ranges.sort_by(|a, b| a.start_key.cmp(b.start_key));

Expand All @@ -39,7 +42,7 @@ impl RocksEngine {
.as_ref()
.map_or(false, |key| key.as_slice() > r.start_key)
{
self.delete_all_in_range_cf_by_key(cf, &r)?;
written |= self.delete_all_in_range_cf_by_key(wopts, cf, &r)?;
continue;
}
last_end_key = Some(r.end_key.to_owned());
Expand Down Expand Up @@ -84,20 +87,26 @@ impl RocksEngine {
} else {
let mut wb = self.write_batch();
for key in data.iter() {
wb.delete_cf(cf, key)?;
if wb.count() >= Self::WRITE_BATCH_MAX_KEYS {
wb.write()?;
wb.write_opt(wopts)?;
wb.clear();
}
wb.delete_cf(cf, key)?;
}
if wb.count() > 0 {
wb.write()?;
wb.write_opt(wopts)?;
written = true;
}
}
Ok(())
Ok(written)
}

fn delete_all_in_range_cf_by_key(&self, cf: &str, range: &Range<'_>) -> Result<()> {
fn delete_all_in_range_cf_by_key(
&self,
wopts: &WriteOptions,
cf: &str,
range: &Range<'_>,
) -> Result<bool> {
let start = KeyBuilder::from_slice(range.start_key, 0, 0);
let end = KeyBuilder::from_slice(range.end_key, 0, 0);
let mut opts = IterOptions::new(Some(start), Some(end), false);
Expand All @@ -110,18 +119,22 @@ impl RocksEngine {
let mut it_valid = it.seek(range.start_key)?;
let mut wb = self.write_batch();
while it_valid {
wb.delete_cf(cf, it.key())?;
if wb.count() >= Self::WRITE_BATCH_MAX_KEYS {
wb.write()?;
wb.write_opt(wopts)?;
wb.clear();
}
wb.delete_cf(cf, it.key())?;
it_valid = it.next()?;
}
if wb.count() > 0 {
wb.write()?;
wb.write_opt(wopts)?;
if !wopts.disable_wal() {
self.sync_wal()?;
}
Ok(true)
} else {
Ok(false)
}
self.sync_wal()?;
Ok(())
}
}

Expand Down Expand Up @@ -188,12 +201,14 @@ impl MiscExt for RocksEngine {

fn delete_ranges_cf(
&self,
wopts: &WriteOptions,
cf: &str,
strategy: DeleteStrategy,
ranges: &[Range<'_>],
) -> Result<()> {
) -> Result<bool> {
let mut written = false;
if ranges.is_empty() {
return Ok(());
return Ok(written);
}
match strategy {
DeleteStrategy::DeleteFiles => {
Expand All @@ -209,7 +224,7 @@ impl MiscExt for RocksEngine {
})
.collect();
if rocks_ranges.is_empty() {
return Ok(());
return Ok(written);
}
self.as_inner()
.delete_files_in_ranges_cf(handle, &rocks_ranges, false)
Expand All @@ -229,7 +244,7 @@ impl MiscExt for RocksEngine {
})
.collect();
if rocks_ranges.is_empty() {
return Ok(());
return Ok(written);
}
self.as_inner()
.delete_blob_files_in_ranges_cf(handle, &rocks_ranges, false)
Expand All @@ -241,18 +256,19 @@ impl MiscExt for RocksEngine {
for r in ranges.iter() {
wb.delete_range_cf(cf, r.start_key, r.end_key)?;
}
wb.write()?;
wb.write_opt(wopts)?;
written = true;
}
DeleteStrategy::DeleteByKey => {
for r in ranges {
self.delete_all_in_range_cf_by_key(cf, r)?;
written |= self.delete_all_in_range_cf_by_key(wopts, cf, r)?;
}
}
DeleteStrategy::DeleteByWriter { sst_path } => {
self.delete_all_in_range_cf_by_ingest(cf, sst_path, ranges)?;
written |= self.delete_all_in_range_cf_by_ingest(wopts, cf, sst_path, ranges)?;
}
}
Ok(())
Ok(written)
}

fn get_approximate_memtable_stats_cf(&self, cf: &str, range: &Range<'_>) -> Result<(u64, u64)> {
Expand Down Expand Up @@ -482,7 +498,8 @@ mod tests {
wb.write().unwrap();
check_data(&db, ALL_CFS, kvs.as_slice());

db.delete_ranges_cfs(strategy, ranges).unwrap();
db.delete_ranges_cfs(&WriteOptions::default(), strategy, ranges)
.unwrap();

let mut kvs_left: Vec<_> = kvs;
for r in ranges {
Expand Down Expand Up @@ -620,10 +637,18 @@ mod tests {
}
check_data(&db, ALL_CFS, kvs.as_slice());

db.delete_ranges_cfs(DeleteStrategy::DeleteFiles, &[Range::new(b"k2", b"k4")])
.unwrap();
db.delete_ranges_cfs(DeleteStrategy::DeleteBlobs, &[Range::new(b"k2", b"k4")])
.unwrap();
db.delete_ranges_cfs(
&WriteOptions::default(),
DeleteStrategy::DeleteFiles,
&[Range::new(b"k2", b"k4")],
)
.unwrap();
db.delete_ranges_cfs(
&WriteOptions::default(),
DeleteStrategy::DeleteBlobs,
&[Range::new(b"k2", b"k4")],
)
.unwrap();
check_data(&db, ALL_CFS, kvs_left.as_slice());
}

Expand Down Expand Up @@ -668,6 +693,7 @@ mod tests {

// Delete all in ["k2", "k4").
db.delete_ranges_cfs(
&WriteOptions::default(),
DeleteStrategy::DeleteByRange,
&[Range::new(b"kabcdefg2", b"kabcdefg4")],
)
Expand Down

0 comments on commit 9325047

Please sign in to comment.