Skip to content

Commit

Permalink
Merge branch 'master' into raftstorev2/read-index-check-lock
Browse files Browse the repository at this point in the history
Signed-off-by: Neil Shen <overvenus@gmail.com>
  • Loading branch information
overvenus committed May 15, 2023
2 parents 1926c6a + 761de11 commit d0a1ed9
Show file tree
Hide file tree
Showing 16 changed files with 385 additions and 226 deletions.
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions components/encryption/export/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ use derive_more::Deref;
pub use encryption::KmsBackend;
pub use encryption::{
clean_up_dir, clean_up_trash, from_engine_encryption_method, trash_dir_all, Backend,
DataKeyManager, DataKeyManagerArgs, DecrypterReader, EncryptionConfig, Error, FileConfig, Iv,
KmsConfig, MasterKeyConfig, Result,
DataKeyImporter, DataKeyManager, DataKeyManagerArgs, DecrypterReader, EncryptionConfig, Error,
FileConfig, Iv, KmsConfig, MasterKeyConfig, Result,
};
use encryption::{
DataKeyPair, EncryptedKey, FileBackend, KmsProvider, PlainKey, PlaintextBackend,
Expand Down
4 changes: 4 additions & 0 deletions components/encryption/src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ impl<R> DecrypterReader<R> {
iv,
)?))
}

pub fn inner(&self) -> &R {
&self.0.reader
}
}

impl<R: Read> Read for DecrypterReader<R> {
Expand Down
2 changes: 1 addition & 1 deletion components/encryption/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub use self::{
io::{
create_aes_ctr_crypter, DecrypterReader, DecrypterWriter, EncrypterReader, EncrypterWriter,
},
manager::{DataKeyManager, DataKeyManagerArgs},
manager::{DataKeyImporter, DataKeyManager, DataKeyManagerArgs},
master_key::{
Backend, DataKeyPair, EncryptedKey, FileBackend, KmsBackend, KmsProvider, PlaintextBackend,
},
Expand Down
58 changes: 47 additions & 11 deletions components/encryption/src/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use file_system::File;
use kvproto::encryptionpb::{DataKey, EncryptionMethod, FileDictionary, FileInfo, KeyDictionary};
use protobuf::Message;
use tikv_util::{box_err, debug, error, info, sys::thread::StdThreadBuildWrapper, thd_name, warn};
use tokio::sync::oneshot;

use crate::{
config::EncryptionConfig,
Expand Down Expand Up @@ -397,7 +396,7 @@ fn check_stale_file_exist(

enum RotateTask {
Terminate,
Save(oneshot::Sender<()>),
Save(std::sync::mpsc::Sender<()>),
}

fn run_background_rotate_work(
Expand Down Expand Up @@ -770,6 +769,27 @@ impl DataKeyManager {
Ok(Some(encrypted_file))
}

/// Returns initial vector and data key.
pub fn get_file_internal(&self, fname: &str) -> IoResult<Option<(Vec<u8>, DataKey)>> {
let (key_id, iv) = {
match self.dicts.get_file(fname) {
Some(file) if file.method != EncryptionMethod::Plaintext => (file.key_id, file.iv),
_ => return Ok(None),
}
};
// Fail if key is specified but not found.
let k = match self.dicts.key_dict.lock().unwrap().keys.get(&key_id) {
Some(k) => k.clone(),
None => {
return Err(IoError::new(
ErrorKind::NotFound,
format!("key not found for id {}", key_id),
));
}
};
Ok(Some((iv, k)))
}

/// Removes data keys under the directory `logical`. If `physical` is
/// present, if means the `logical` directory is already physically renamed
/// to `physical`.
Expand Down Expand Up @@ -988,10 +1008,17 @@ impl<'a> DataKeyImporter<'a> {
}

pub fn commit(mut self) -> Result<()> {
let (tx, rx) = oneshot::channel();
if !self.key_additions.is_empty() {
self.manager.rotate_tx.send(RotateTask::Save(tx)).unwrap();
rx.blocking_recv().unwrap();
let (tx, rx) = std::sync::mpsc::channel();
self.manager
.rotate_tx
.send(RotateTask::Save(tx))
.map_err(|_| {
Error::Other(box_err!("Failed to request background key dict rotation"))
})?;
rx.recv().map_err(|_| {
Error::Other(box_err!("Failed to wait for background key dict rotation"))
})?;
}
if !self.file_additions.is_empty() {
self.manager.dicts.file_dict_file.lock().unwrap().sync()?;
Expand All @@ -1006,13 +1033,22 @@ impl<'a> DataKeyImporter<'a> {
while let Some(f) = iter.next() {
self.manager.dicts.delete_file(&f, iter.peek().is_none())?;
}
for key_id in self.key_additions.drain(..) {
let mut key_dict = self.manager.dicts.key_dict.lock().unwrap();
key_dict.keys.remove(&key_id);
if !self.key_additions.is_empty() {
for key_id in self.key_additions.drain(..) {
let mut key_dict = self.manager.dicts.key_dict.lock().unwrap();
key_dict.keys.remove(&key_id);
}
let (tx, rx) = std::sync::mpsc::channel();
self.manager
.rotate_tx
.send(RotateTask::Save(tx))
.map_err(|_| {
Error::Other(box_err!("Failed to request background key dict rotation"))
})?;
rx.recv().map_err(|_| {
Error::Other(box_err!("Failed to wait for background key dict rotation"))
})?;
}
let (tx, rx) = oneshot::channel();
self.manager.rotate_tx.send(RotateTask::Save(tx)).unwrap();
rx.blocking_recv().unwrap();
Ok(())
}
}
Expand Down
1 change: 1 addition & 0 deletions components/raftstore-v2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ slog-global = { workspace = true }
tempfile = "3.0"
test_pd = { workspace = true }
test_util = { workspace = true }
walkdir = "2"

[[test]]
name = "raftstore-v2-failpoints"
Expand Down
1 change: 0 additions & 1 deletion components/raftstore-v2/src/operation/ready/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1053,7 +1053,6 @@ impl<EK: KvEngine, ER: RaftEngine> Storage<EK, ER> {
write_task,
&ctx.snap_mgr,
&ctx.tablet_registry,
ctx.key_manager.as_ref(),
) {
SNAP_COUNTER.apply.fail.inc();
error!(self.logger(),"failed to apply snapshot";"error" => ?e)
Expand Down
7 changes: 4 additions & 3 deletions components/raftstore-v2/src/operation/ready/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -561,7 +561,6 @@ impl<EK: KvEngine, ER: RaftEngine> Storage<EK, ER> {
task: &mut WriteTask<EK, ER>,
snap_mgr: &TabletSnapManager,
reg: &TabletRegistry<EK>,
key_manager: Option<&Arc<DataKeyManager>>,
) -> Result<()> {
let region_id = self.region().get_id();
let peer_id = self.peer().get_id();
Expand Down Expand Up @@ -660,7 +659,7 @@ impl<EK: KvEngine, ER: RaftEngine> Storage<EK, ER> {
// The snapshot require no additional processing such as ingest them to DB, but
// it should load it into the factory after it persisted.
let reg = reg.clone();
let key_manager = key_manager.cloned();
let key_manager = snap_mgr.key_manager().clone();
let hook = move || {
fail::fail_point!("region_apply_snap");
if !install_tablet(&reg, key_manager.as_deref(), &path, region_id, last_index) {
Expand All @@ -673,7 +672,9 @@ impl<EK: KvEngine, ER: RaftEngine> Storage<EK, ER> {
}
if clean_split {
let path = temp_split_path(&reg, region_id);
// TODO(tabokie)
if let Some(m) = key_manager {
let _ = m.remove_dir(&path, None);
}
let _ = fs::remove_dir_all(path);
}
};
Expand Down
3 changes: 1 addition & 2 deletions components/raftstore-v2/src/raft/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -439,8 +439,7 @@ mod tests {
.unwrap();
let snapshot = new_empty_snapshot(region.clone(), snap_index, snap_term, false);
let mut task = WriteTask::new(region.get_id(), 5, 1);
s.apply_snapshot(&snapshot, &mut task, &mgr, &reg, None)
.unwrap();
s.apply_snapshot(&snapshot, &mut task, &mgr, &reg).unwrap();
// Add more entries to check if old entries are cleared. If not, it should panic
// with memtable hole when using raft engine.
let entries = (snap_index + 1..=snap_index + 10)
Expand Down
36 changes: 26 additions & 10 deletions components/raftstore-v2/tests/integrations/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@ use causal_ts::CausalTsProviderImpl;
use collections::HashSet;
use concurrency_manager::ConcurrencyManager;
use crossbeam::channel::{self, Receiver, Sender, TrySendError};
use encryption_export::{data_key_manager_from_config, DataKeyImporter};
use engine_test::{
ctor::{CfOptions, DbOptions},
kv::{KvTestEngine, KvTestSnapshot, TestTabletFactory},
raft::RaftTestEngine,
};
use engine_traits::{TabletContext, TabletRegistry, DATA_CFS};
use engine_traits::{EncryptionKeyManager, TabletContext, TabletRegistry, DATA_CFS};
use futures::executor::block_on;
use kvproto::{
kvrpcpb::ApiVersion,
Expand Down Expand Up @@ -262,14 +263,12 @@ impl RunningState {
causal_ts_provider: Option<Arc<CausalTsProviderImpl>>,
logger: &Logger,
) -> (TestRouter, Self) {
// TODO(tabokie): Enable encryption by default. (after snapshot encryption)
// let encryption_cfg = test_util::new_file_security_config(path);
// let key_manager = Some(Arc::new(
// data_key_manager_from_config(&encryption_cfg, path.to_str().unwrap())
// .unwrap()
// .unwrap(),
// ));
let key_manager = None;
let encryption_cfg = test_util::new_file_security_config(path);
let key_manager = Some(Arc::new(
data_key_manager_from_config(&encryption_cfg, path.to_str().unwrap())
.unwrap()
.unwrap(),
));

let mut opts = engine_test::ctor::RaftDbOptions::default();
opts.set_key_manager(key_manager.clone());
Expand Down Expand Up @@ -633,7 +632,24 @@ impl Cluster {
let gen_path = from_snap_mgr.tablet_gen_path(&key);
let recv_path = to_snap_mgr.final_recv_path(&key);
assert!(gen_path.exists());
std::fs::rename(gen_path, recv_path.clone()).unwrap();
if let Some(m) = from_snap_mgr.key_manager() {
let mut importer =
DataKeyImporter::new(to_snap_mgr.key_manager().as_deref().unwrap());
for e in walkdir::WalkDir::new(&gen_path).into_iter() {
let e = e.unwrap();
let new_path = recv_path.join(e.path().file_name().unwrap());
if let Some((iv, key)) =
m.get_file_internal(e.path().to_str().unwrap()).unwrap()
{
importer.add(new_path.to_str().unwrap(), iv, key).unwrap();
}
}
importer.commit().unwrap();
}
std::fs::rename(&gen_path, &recv_path).unwrap();
if let Some(m) = from_snap_mgr.key_manager() {
m.delete_file(gen_path.to_str().unwrap()).unwrap();
}
assert!(recv_path.exists());
}
regions.insert(msg.get_region_id());
Expand Down
4 changes: 2 additions & 2 deletions components/test_raftstore-v2/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ pub trait Simulator<EK: KvEngine> {
node_id: u64,
cfg: Config,
store_meta: Arc<Mutex<StoreMeta<EK>>>,
key_mgr: Option<Arc<DataKeyManager>>,
key_manager: Option<Arc<DataKeyManager>>,
raft_engine: RaftTestEngine,
tablet_registry: TabletRegistry<EK>,
resource_manager: &Option<Arc<ResourceGroupManager>>,
Expand Down Expand Up @@ -535,6 +535,7 @@ impl<T: Simulator<EK>, EK: KvEngine> Cluster<T, EK> {
debug!("starting node {}", node_id);
let tablet_registry = self.tablet_registries[&node_id].clone();
let raft_engine = self.raft_engines[&node_id].clone();
let key_mgr = self.key_managers_map[&node_id].clone();
let cfg = self.cfg.clone();

// if let Some(labels) = self.labels.get(&node_id) {
Expand All @@ -556,7 +557,6 @@ impl<T: Simulator<EK>, EK: KvEngine> Cluster<T, EK> {
tikv_util::thread_group::set_properties(Some(props));

debug!("calling run node"; "node_id" => node_id);
let key_mgr = self.key_managers_map.get(&node_id).unwrap().clone();
self.sim.wl().run_node(
node_id,
cfg,
Expand Down
13 changes: 12 additions & 1 deletion components/test_raftstore-v2/src/util.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0.

use std::{fmt::Write, sync::Arc, thread, time::Duration};
use std::{fmt::Write, path::Path, sync::Arc, thread, time::Duration};

use encryption_export::{data_key_manager_from_config, DataKeyManager};
use engine_rocks::{RocksEngine, RocksStatistics};
Expand All @@ -9,6 +9,7 @@ use engine_traits::{CfName, KvEngine, TabletRegistry, CF_DEFAULT};
use file_system::IoRateLimiter;
use futures::future::BoxFuture;
use kvproto::{
encryptionpb::EncryptionMethod,
kvrpcpb::Context,
metapb,
raft_cmdpb::{RaftCmdRequest, RaftCmdResponse},
Expand Down Expand Up @@ -131,12 +132,22 @@ pub fn put_cf_till_size<T: Simulator<EK>, EK: KvEngine>(
key.into_bytes()
}

pub fn configure_for_encryption(config: &mut Config) {
let manifest_dir = Path::new(env!("CARGO_MANIFEST_DIR"));

let cfg = &mut config.security.encryption;
cfg.data_encryption_method = EncryptionMethod::Aes128Ctr;
cfg.data_key_rotation_period = ReadableDuration(Duration::from_millis(100));
cfg.master_key = test_util::new_test_file_master_key(manifest_dir);
}

pub fn configure_for_snapshot(config: &mut Config) {
// Truncate the log quickly so that we can force sending snapshot.
config.raft_store.raft_log_gc_tick_interval = ReadableDuration::millis(20);
config.raft_store.raft_log_gc_count_limit = Some(2);
config.raft_store.merge_max_log_gap = 1;
config.raft_store.snap_mgr_gc_tick_interval = ReadableDuration::millis(50);
configure_for_encryption(config);
}

pub fn configure_for_lease_read_v2<T: Simulator<EK>, EK: KvEngine>(
Expand Down
2 changes: 1 addition & 1 deletion components/test_util/src/encryption.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub fn create_test_key_file(path: &str) {
.unwrap();
}

fn new_test_file_master_key(tmp: &Path) -> MasterKeyConfig {
pub fn new_test_file_master_key(tmp: &Path) -> MasterKeyConfig {
let key_path = tmp.join("test_key").to_str().unwrap().to_owned();
create_test_key_file(&key_path);
MasterKeyConfig::File {
Expand Down

0 comments on commit d0a1ed9

Please sign in to comment.