Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: Add TruncateManager and TruncateWorker #11553

Merged
merged 15 commits into from
Mar 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 13 additions & 0 deletions cmd/tikv-ctl/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -633,6 +633,8 @@ pub trait DebugExecutor {
fn dump_store_info(&self);

fn dump_cluster_info(&self);

fn reset_to_version(&self, version: u64);
}

impl DebugExecutor for DebugClient {
Expand Down Expand Up @@ -840,6 +842,13 @@ impl DebugExecutor for DebugClient {
.unwrap_or_else(|e| perror_and_exit("DebugClient::get_cluster_info", e));
println!("{}", resp.get_cluster_id())
}

fn reset_to_version(&self, version: u64) {
let mut req = ResetToVersionRequest::default();
req.set_ts(version);
DebugClient::reset_to_version(self, &req)
.unwrap_or_else(|e| perror_and_exit("DebugClient::get_cluster_info", e));
}
}

impl<ER: RaftEngine> DebugExecutor for Debugger<ER> {
Expand Down Expand Up @@ -1070,6 +1079,10 @@ impl<ER: RaftEngine> DebugExecutor for Debugger<ER> {
println!("cluster id: {}", ident.get_cluster_id());
}
}

fn reset_to_version(&self, version: u64) {
Debugger::reset_to_version(self, version);
}
}

fn handle_engine_error(err: EngineError) -> ! {
Expand Down
9 changes: 8 additions & 1 deletion src/server/debug.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use tikv_util::worker::Worker;
use txn_types::Key;

use crate::config::ConfigController;
use crate::server::reset_to_version::ResetToVersionManager;
use crate::storage::mvcc::{Lock, LockType, TimeStamp, Write, WriteRef, WriteType};

pub use crate::storage::mvcc::MvccInfoIterator;
Expand Down Expand Up @@ -117,6 +118,7 @@ impl From<BottommostLevelCompaction> for debugpb::BottommostLevelCompaction {
#[derive(Clone)]
pub struct Debugger<ER: RaftEngine> {
engines: Engines<RocksEngine, ER>,
reset_to_version_manager: ResetToVersionManager,
cfg_controller: ConfigController,
}

Expand All @@ -125,8 +127,10 @@ impl<ER: RaftEngine> Debugger<ER> {
engines: Engines<RocksEngine, ER>,
cfg_controller: ConfigController,
) -> Debugger<ER> {
let reset_to_version_manager = ResetToVersionManager::new(engines.kv.clone());
Debugger {
engines,
reset_to_version_manager,
cfg_controller,
}
}
Expand Down Expand Up @@ -884,6 +888,10 @@ impl<ER: RaftEngine> Debugger<ER> {
props.append(&mut props1);
Ok(props)
}

pub fn reset_to_version(&self, version: u64) {
self.reset_to_version_manager.start(version.into());
}
}

fn dump_default_cf_properties(
Expand All @@ -899,7 +907,6 @@ fn dump_default_cf_properties(
for (_, v) in collection.iter() {
num_entries += v.num_entries();
}

let sst_files = collection
.iter()
.map(|(k, _)| {
Expand Down
1 change: 1 addition & 0 deletions src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ pub mod lock_manager;
pub mod node;
mod proxy;
pub mod raftkv;
mod reset_to_version;
pub mod resolve;
pub mod server;
pub mod service;
Expand Down
Loading