Skip to content

Commit

Permalink
Merge branch 'tunicate' of https://github.com/longfangsong/tikv into …
Browse files Browse the repository at this point in the history
…tunicate

Signed-off-by: longfangsong <longfangsong@icloud.com>
  • Loading branch information
longfangsong committed Dec 14, 2021
2 parents e1e596e + 01713eb commit d795a19
Show file tree
Hide file tree
Showing 18 changed files with 467 additions and 331 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ procinfo = { git = "https://github.com/tikv/procinfo-rs", rev = "5125fc1a69496b7
# kvproto at the same time.
# After the PR to kvproto is merged, remember to comment this out and run `cargo update -p kvproto`.
[patch.'https://github.com/pingcap/kvproto']
kvproto = {git = "https://github.com/longfangsong/kvproto", rev = "220f1ee7d259e967ecd7c4fb351b67259fdc3aa8"}
kvproto = {git = "https://github.com/longfangsong/kvproto", rev = "577bb449d8ed69ef1b159eedec1a702bc2f6c49e"}

[workspace]
# See https://github.com/rust-lang/rfcs/blob/master/text/2957-cargo-features2.md
Expand Down
13 changes: 13 additions & 0 deletions cmd/tikv-ctl/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -673,6 +673,8 @@ trait DebugExecutor {
fn dump_store_info(&self);

fn dump_cluster_info(&self);

fn return_to_version(&self, version: u64);
}

impl DebugExecutor for DebugClient {
Expand Down Expand Up @@ -880,6 +882,13 @@ impl DebugExecutor for DebugClient {
.unwrap_or_else(|e| perror_and_exit("DebugClient::get_cluster_info", e));
println!("{}", resp.get_cluster_id())
}

fn return_to_version(&self, version: u64) {
let mut req = ReturnToVersionRequest::default();
req.set_ts(version);
self.return_to_version(&ReturnToVersionRequest::default())
.unwrap_or_else(|e| perror_and_exit("DebugClient::return_to_version", e));
}
}

impl<ER: RaftEngine> DebugExecutor for Debugger<ER> {
Expand Down Expand Up @@ -1110,6 +1119,10 @@ impl<ER: RaftEngine> DebugExecutor for Debugger<ER> {
println!("cluster id: {}", ident.get_cluster_id());
}
}

fn return_to_version(&self, version: u64) {
Debugger::return_to_version(self, version);
}
}

fn warning_prompt(message: &str) -> bool {
Expand Down
1 change: 1 addition & 0 deletions components/external_storage/export/src/export.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ fn create_backend_inner(
},
#[cfg(not(any(feature = "cloud-gcp", feature = "cloud-aws")))]
_ => return Err(bad_backend(backend.clone())),
Backend::AzureBlobStorage(_) => todo!(),
};
record_storage_create(start, &*storage);
Ok(storage)
Expand Down
8 changes: 8 additions & 0 deletions src/server/debug.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use tikv_util::worker::Worker;
use txn_types::Key;

use crate::config::ConfigController;
use crate::server::return_to_version::ReturnToVersionManager;
use crate::storage::mvcc::{Lock, LockType, TimeStamp, Write, WriteRef, WriteType};

pub use crate::storage::mvcc::MvccInfoIterator;
Expand Down Expand Up @@ -117,6 +118,7 @@ impl From<BottommostLevelCompaction> for debugpb::BottommostLevelCompaction {
#[derive(Clone)]
pub struct Debugger<ER: RaftEngine> {
engines: Engines<RocksEngine, ER>,
return_to_version_manager: ReturnToVersionManager,
cfg_controller: ConfigController,
}

Expand All @@ -125,8 +127,10 @@ impl<ER: RaftEngine> Debugger<ER> {
engines: Engines<RocksEngine, ER>,
cfg_controller: ConfigController,
) -> Debugger<ER> {
let return_to_version_manager = ReturnToVersionManager::new(engines.kv.clone());
Debugger {
engines,
return_to_version_manager,
cfg_controller,
}
}
Expand Down Expand Up @@ -868,6 +872,10 @@ impl<ER: RaftEngine> Debugger<ER> {
&keys::data_end_key(end),
)
}

pub fn return_to_version(&self, version: u64) {
self.return_to_version_manager.start(version.into());
}
}

fn dump_mvcc_properties(db: &Arc<DB>, start: &[u8], end: &[u8]) -> Result<Vec<(String, String)>> {
Expand Down
3 changes: 2 additions & 1 deletion src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@ pub mod node;
mod proxy;
pub mod raftkv;
pub mod resolve;
mod return_to_version;
pub mod server;
pub mod service;
pub mod snap;
pub mod status_server;
pub mod transport;
mod truncate;
pub mod ttl;
mod return_to_version;

pub use self::config::{Config, ServerConfigManager, DEFAULT_CLUSTER_ID, DEFAULT_LISTENING_ADDR};
pub use self::errors::{Error, Result};
Expand Down
5 changes: 3 additions & 2 deletions src/server/raft_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,9 @@ impl Buffer for BatchMessageBuffer {
let mut msg_size = msg.start_key.len()
+ msg.end_key.len()
+ msg.get_message().context.len()
+ msg.extra_ctx.len();
+ msg.extra_ctx.len()
// index: 3, term: 2, data tag and size: 3, entry tag and size: 3
+ 11 * msg.get_message().get_entries().len();
for entry in msg.get_message().get_entries() {
msg_size += entry.data.len();
}
Expand Down Expand Up @@ -568,7 +570,6 @@ where

let cb = ChannelBuilder::new(self.builder.env.clone())
.stream_initial_window_size(self.builder.cfg.grpc_stream_initial_window_size.0 as i32)
.max_send_message_len(self.builder.cfg.max_grpc_send_msg_len)
.keepalive_time(self.builder.cfg.grpc_keepalive_time.0)
.keepalive_timeout(self.builder.cfg.grpc_keepalive_timeout.0)
.default_compression_algorithm(self.builder.cfg.grpc_compression_algorithm())
Expand Down
101 changes: 60 additions & 41 deletions src/server/return_to_version.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,24 @@
// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0.

use super::Result;
use engine_rocks::{ RocksEngineIterator, RocksWriteBatch};
use engine_traits::{Iterable, CF_LOCK};
use engine_rocks::RocksEngine;
use engine_rocks::{RocksEngineIterator, RocksWriteBatch};
use engine_traits::WriteBatch;
use engine_traits::WriteBatchExt;
use engine_traits::{IterOptions, Iterator, CF_DEFAULT, CF_WRITE};
use engine_traits::{Iterable, CF_LOCK};
use engine_traits::{Mutable, SeekKey};
use std::cell::RefCell;
use std::sync::{Arc, Mutex};
use txn_types::{Key, TimeStamp, Write, WriteRef};
use engine_traits::WriteBatch;
use engine_rocks::RocksEngine;
use engine_traits::WriteBatchExt;
use std::thread::JoinHandle;
use txn_types::{Key, TimeStamp, Write, WriteRef};

const BATCH_SIZE: usize = 256;

#[derive(Debug, Clone)]
pub enum ReturnToVersionState {
RemoveWrite {
scanned: usize,
},
RemoveLock {
scanned: usize,
},
RemoveWrite { scanned: usize },
RemoveLock { scanned: usize },
Done,
}

Expand Down Expand Up @@ -50,9 +47,10 @@ impl ReturnToVersionWorker {
ts: TimeStamp,
state: Arc<Mutex<ReturnToVersionState>>,
) -> Self {
*state.lock().expect("failed to lock `state` in `ReturnToVersionWorker::new`") = ReturnToVersionState::RemoveWrite {
scanned: 0,
};
*state
.lock()
.expect("failed to lock `state` in `ReturnToVersionWorker::new`") =
ReturnToVersionState::RemoveWrite { scanned: 0 };
write_iter.seek(SeekKey::Start).unwrap();
lock_iter.seek(SeekKey::Start).unwrap();
Self {
Expand All @@ -69,7 +67,10 @@ impl ReturnToVersionWorker {
.state
.lock()
.expect("failed to lock ReturnToVersionWorker::state");
debug_assert!(matches!(*state, ReturnToVersionState::RemoveWrite { scanned: _ }));
debug_assert!(matches!(
*state,
ReturnToVersionState::RemoveWrite { scanned: _ }
));
*state.scanned() += 1;
drop(state);
let write = box_try!(WriteRef::parse(self.write_iter.value())).to_owned();
Expand Down Expand Up @@ -104,7 +105,10 @@ impl ReturnToVersionWorker {
) -> Result<bool> {
let (writes, has_more) = self.scan_next_batch(batch_size)?;
for (key, write) in writes {
let default_key = Key::from_encoded_slice(&key).truncate_ts().unwrap().append_ts(write.start_ts);
let default_key = Key::from_encoded_slice(&key)
.truncate_ts()
.unwrap()
.append_ts(write.start_ts);
box_try!(wb.delete_cf(CF_WRITE, &key));
box_try!(wb.delete_cf(CF_DEFAULT, default_key.as_encoded()));
}
Expand All @@ -125,7 +129,10 @@ impl ReturnToVersionWorker {
.state
.lock()
.expect("failed to lock ReturnToVersionWorker::state");
debug_assert!(matches!(*state, ReturnToVersionState::RemoveLock { scanned: _ }));
debug_assert!(matches!(
*state,
ReturnToVersionState::RemoveLock { scanned: _ }
));
*state.scanned() += 1;
drop(state);

Expand All @@ -144,7 +151,17 @@ impl ReturnToVersionWorker {
pub struct ReturnToVersionManager {
state: Arc<Mutex<ReturnToVersionState>>,
engine: RocksEngine,
worker_handle: Option<JoinHandle<()>>,
worker_handle: RefCell<Option<JoinHandle<()>>>,
}

impl Clone for ReturnToVersionManager {
fn clone(&self) -> Self {
Self {
state: self.state.clone(),
engine: self.engine.clone(),
worker_handle: RefCell::new(None),
}
}
}

#[allow(dead_code)]
Expand All @@ -154,22 +171,21 @@ impl ReturnToVersionManager {
ReturnToVersionManager {
state,
engine,
worker_handle: None,
worker_handle: RefCell::new(None),
}
}

pub fn start(&mut self, ts: TimeStamp) {
pub fn start(&self, ts: TimeStamp) {
let readopts = IterOptions::new(None, None, false);
let write_iter = self.engine.iterator_cf_opt(CF_WRITE, readopts.clone()).unwrap();
let write_iter = self
.engine
.iterator_cf_opt(CF_WRITE, readopts.clone())
.unwrap();
let lock_iter = self.engine.iterator_cf_opt(CF_LOCK, readopts).unwrap();
let mut worker = ReturnToVersionWorker::new(
write_iter,
lock_iter,
ts,
self.state.clone());
let mut worker = ReturnToVersionWorker::new(write_iter, lock_iter, ts, self.state.clone());
let mut wb = self.engine.write_batch();
let props = tikv_util::thread_group::current_properties();
self.worker_handle = Some(std::thread::Builder::new()
*self.worker_handle.borrow_mut() = Some(std::thread::Builder::new()
.name("return_to_version".to_string())
.spawn(move || {
tikv_util::thread_group::set_properties(props);
Expand All @@ -192,7 +208,10 @@ impl ReturnToVersionManager {
}

pub fn state(&self) -> ReturnToVersionState {
self.state.lock().expect("failed to lock `state` in `ReturnToVersionManager::state`").clone()
self.state
.lock()
.expect("failed to lock `state` in `ReturnToVersionManager::state`")
.clone()
}

#[cfg(test)]
Expand All @@ -204,13 +223,13 @@ impl ReturnToVersionManager {
#[cfg(test)]
mod tests {
use super::*;
use engine_traits::{CF_LOCK, CF_RAFT};
use engine_rocks::raw::{ColumnFamilyOptions, DBOptions};
use engine_rocks::raw_util::CFOptions;
use engine_rocks::Compat;
use engine_traits::{WriteBatch, WriteBatchExt};
use engine_traits::{CF_LOCK, CF_RAFT};
use tempfile::Builder;
use txn_types::{WriteType, LockType, Lock};
use txn_types::{Lock, LockType, WriteType};

#[test]
fn test_basic() {
Expand All @@ -219,10 +238,7 @@ mod tests {
Remove,
}

let tmp = Builder::new()
.prefix("test_basic")
.tempdir()
.unwrap();
let tmp = Builder::new().prefix("test_basic").tempdir().unwrap();
let path = tmp.path().to_str().unwrap();
let fake_engine = Arc::new(
engine_rocks::raw_util::new_engine_opt(
Expand Down Expand Up @@ -332,10 +348,7 @@ mod tests {
remaining_defaults.push((key, value));
}

let mut lock_iter = fake_engine
.c()
.iterator_cf_opt(CF_LOCK, readopts)
.unwrap();
let mut lock_iter = fake_engine.c().iterator_cf_opt(CF_LOCK, readopts).unwrap();
lock_iter.seek(SeekKey::Start).unwrap();
let mut remaining_locks = vec![];
while lock_iter.valid().unwrap() {
Expand All @@ -347,10 +360,16 @@ mod tests {

assert_eq!(remaining_writes.len(), 1);
let (key, _) = &remaining_writes[0];
assert_eq!(Key::from_encoded(key.clone()).decode_ts().unwrap(), 99.into());
assert_eq!(
Key::from_encoded(key.clone()).decode_ts().unwrap(),
99.into()
);
assert_eq!(remaining_defaults.len(), 1);
let (key, _) = &remaining_defaults[0];
assert_eq!(Key::from_encoded(key.clone()).decode_ts().unwrap(), 98.into());
assert_eq!(
Key::from_encoded(key.clone()).decode_ts().unwrap(),
98.into()
);
assert!(remaining_locks.is_empty());
}
}
9 changes: 3 additions & 6 deletions src/server/service/debug.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ fn error_to_grpc_error(tag: &'static str, e: Error) -> GrpcError {
pub struct Service<ER: RaftEngine, T: RaftStoreRouter<RocksEngine>> {
pool: Handle,
debugger: Debugger<ER>,
return_to_version_manager: ReturnToVersionManager,
raft_router: T,
}

Expand All @@ -60,12 +59,10 @@ impl<ER: RaftEngine, T: RaftStoreRouter<RocksEngine>> Service<ER, T> {
cfg_controller: ConfigController,
) -> Service<ER, T> {
let debugger = Debugger::new(engines, cfg_controller);
let return_to_version_manager = ReturnToVersionManager::new(engines.kv.clone());
Service {
pool,
debugger,
raft_router,
return_to_version_manager
}
}

Expand Down Expand Up @@ -517,11 +514,11 @@ impl<ER: RaftEngine, T: RaftStoreRouter<RocksEngine> + 'static> debugpb::Debug f

fn return_to_version(
&mut self,
ctx: RpcContext<'_>,
_ctx: RpcContext<'_>,
req: ReturnToVersionRequest,
sink: UnarySink<ReturnToVersionResponse>
sink: UnarySink<ReturnToVersionResponse>,
) {
self.return_to_version_manager.start(req.get_ts());
self.debugger.return_to_version(req.get_ts());
sink.success(ReturnToVersionResponse::default());
}
}
Expand Down
Loading

0 comments on commit d795a19

Please sign in to comment.