Skip to content

Commit

Permalink
Merge branch 'release-7.1' into cherry-pick-14637-to-release-7.1
Browse files Browse the repository at this point in the history
Signed-off-by: zyguan <zhongyangguan@gmail.com>
  • Loading branch information
zyguan committed May 10, 2023
2 parents 73345a2 + 9f07ac2 commit 080a0c3
Show file tree
Hide file tree
Showing 86 changed files with 4,572 additions and 649 deletions.
20 changes: 13 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "tikv"
version = "7.1.0-rc.0"
version = "7.1.0"
authors = ["The TiKV Authors"]
description = "A distributed transactional key-value database powered by Rust and Raft"
license = "Apache-2.0"
Expand Down
167 changes: 159 additions & 8 deletions cmd/tikv-ctl/src/executor.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0.

use std::{
borrow::ToOwned, cmp::Ordering, pin::Pin, str, string::ToString, sync::Arc, time::Duration, u64,
borrow::ToOwned, cmp::Ordering, path::Path, pin::Pin, str, string::ToString, sync::Arc,
time::Duration, u64,
};

use encryption_export::data_key_manager_from_config;
use engine_rocks::util::{db_exist, new_engine_opt};
use engine_traits::{
Engines, Error as EngineError, RaftEngine, ALL_CFS, CF_DEFAULT, CF_LOCK, CF_WRITE, DATA_CFS,
Engines, Error as EngineError, RaftEngine, TabletRegistry, ALL_CFS, CF_DEFAULT, CF_LOCK,
CF_WRITE, DATA_CFS,
};
use futures::{executor::block_on, future, stream, Stream, StreamExt, TryStreamExt};
use grpcio::{ChannelBuilder, Environment};
Expand All @@ -25,12 +27,16 @@ use raft_log_engine::RaftLogEngine;
use raftstore::store::{util::build_key_range, INIT_EPOCH_CONF_VER};
use security::SecurityManager;
use serde_json::json;
use server::fatal;
use slog_global::crit;
use tikv::{
config::{ConfigController, TikvConfig},
server::{
debug::{BottommostLevelCompaction, Debugger, RegionInfo},
debug2::DebuggerV2,
KvEngineFactoryBuilder,
},
storage::config::EngineType,
};
use tikv_util::escape;

Expand Down Expand Up @@ -72,13 +78,10 @@ pub fn new_debug_executor(
let factory = KvEngineFactoryBuilder::new(env.clone(), cfg, cache)
.lite(true)
.build();
let kv_db = match factory.create_shared_db(data_dir) {
Ok(db) => db,
Err(e) => handle_engine_error(e),
};

let cfg_controller = ConfigController::default();
if !cfg.raft_engine.enable {
assert_eq!(EngineType::RaftKv, cfg.storage.engine);
let raft_db_opts = cfg.raftdb.build_opt(env, None);
let raft_db_cf_opts = cfg.raftdb.build_cf_opts(factory.block_cache());
let raft_path = cfg.infer_raft_db_path(Some(data_dir)).unwrap();
Expand All @@ -90,6 +93,12 @@ pub fn new_debug_executor(
Ok(db) => db,
Err(e) => handle_engine_error(e),
};

let kv_db = match factory.create_shared_db(data_dir) {
Ok(db) => db,
Err(e) => handle_engine_error(e),
};

let debugger = Debugger::new(Engines::new(kv_db, raft_db), cfg_controller);
Box::new(debugger) as Box<dyn DebugExecutor>
} else {
Expand All @@ -100,8 +109,24 @@ pub fn new_debug_executor(
tikv_util::logger::exit_process_gracefully(-1);
}
let raft_db = RaftLogEngine::new(config, key_manager, None /* io_rate_limiter */).unwrap();
let debugger = Debugger::new(Engines::new(kv_db, raft_db), cfg_controller);
Box::new(debugger) as Box<dyn DebugExecutor>
match cfg.storage.engine {
EngineType::RaftKv => {
let kv_db = match factory.create_shared_db(data_dir) {
Ok(db) => db,
Err(e) => handle_engine_error(e),
};

let debugger = Debugger::new(Engines::new(kv_db, raft_db), cfg_controller);
Box::new(debugger) as Box<dyn DebugExecutor>
}
EngineType::RaftKv2 => {
let registry =
TabletRegistry::new(Box::new(factory), Path::new(data_dir).join("tablets"))
.unwrap_or_else(|e| fatal!("failed to create tablet registry {:?}", e));
let debugger = DebuggerV2::new(registry, raft_db, cfg_controller);
Box::new(debugger) as Box<dyn DebugExecutor>
}
}
}
}

Expand Down Expand Up @@ -1117,3 +1142,129 @@ fn handle_engine_error(err: EngineError) -> ! {

tikv_util::logger::exit_process_gracefully(-1);
}

impl<ER: RaftEngine> DebugExecutor for DebuggerV2<ER> {
fn check_local_mode(&self) {}

fn get_all_regions_in_store(&self) -> Vec<u64> {
self.get_all_regions_in_store()
.unwrap_or_else(|e| perror_and_exit("Debugger::get_all_regions_in_store", e))
}

fn get_value_by_key(&self, cf: &str, key: Vec<u8>) -> Vec<u8> {
self.get(DbType::Kv, cf, &key)
.unwrap_or_else(|e| perror_and_exit("Debugger::get", e))
}

fn get_region_size(&self, region: u64, cfs: Vec<&str>) -> Vec<(String, usize)> {
self.region_size(region, cfs)
.unwrap_or_else(|e| perror_and_exit("Debugger::region_size", e))
.into_iter()
.map(|(cf, size)| (cf.to_owned(), size))
.collect()
}

fn get_region_info(&self, region: u64) -> RegionInfo {
self.region_info(region)
.unwrap_or_else(|e| perror_and_exit("Debugger::region_info", e))
}

fn get_raft_log(&self, region: u64, index: u64) -> Entry {
self.raft_log(region, index)
.unwrap_or_else(|e| perror_and_exit("Debugger::raft_log", e))
}

fn get_mvcc_infos(&self, from: Vec<u8>, to: Vec<u8>, limit: u64) -> MvccInfoStream {
let iter = self
.scan_mvcc(&from, &to, limit)
.unwrap_or_else(|e| perror_and_exit("Debugger::scan_mvcc", e));
let stream = stream::iter(iter).map_err(|e| e.to_string());
Box::pin(stream)
}

fn raw_scan_impl(&self, _from_key: &[u8], _end_key: &[u8], _limit: usize, _cf: &str) {
unimplemented!()
}

fn do_compaction(
&self,
db: DbType,
cf: &str,
from: &[u8],
to: &[u8],
threads: u32,
bottommost: BottommostLevelCompaction,
) {
self.compact(db, cf, from, to, threads, bottommost)
.unwrap_or_else(|e| perror_and_exit("Debugger::compact", e));
}

fn set_region_tombstone(&self, _regions: Vec<Region>) {
unimplemented!()
}

fn set_region_tombstone_by_id(&self, _regions: Vec<u64>) {
unimplemented!()
}

fn recover_regions(&self, _regions: Vec<Region>, _read_only: bool) {
unimplemented!()
}

fn recover_all(&self, _threads: usize, _read_only: bool) {
unimplemented!()
}

fn print_bad_regions(&self) {
unimplemented!()
}

fn remove_fail_stores(
&self,
_store_ids: Vec<u64>,
_region_ids: Option<Vec<u64>>,
_promote_learner: bool,
) {
unimplemented!()
}

fn drop_unapplied_raftlog(&self, _region_ids: Option<Vec<u64>>) {
unimplemented!()
}

fn recreate_region(&self, _sec_mgr: Arc<SecurityManager>, _pd_cfg: &PdConfig, _region_id: u64) {
unimplemented!()
}

fn dump_metrics(&self, _tags: Vec<&str>) {
unimplemented!()
}

fn check_region_consistency(&self, _: u64) {
unimplemented!()
}

fn modify_tikv_config(&self, _config_name: &str, _config_value: &str) {
unimplemented!()
}

fn dump_region_properties(&self, _region_id: u64) {
unimplemented!()
}

fn dump_range_properties(&self, _start: Vec<u8>, _end: Vec<u8>) {
unimplemented!()
}

fn dump_store_info(&self) {
unimplemented!()
}

fn dump_cluster_info(&self) {
unimplemented!()
}

fn reset_to_version(&self, _version: u64) {
unimplemented!()
}
}
Loading

0 comments on commit 080a0c3

Please sign in to comment.