Skip to content

Commit

Permalink
engine: provide customized file builder to raft engine (#10937)
Browse files Browse the repository at this point in the history
* open raft engine with customized file builder

ref #11119

Signed-off-by: tabokie <xy.tao@outlook.com>

* overhaul configurations

Signed-off-by: tabokie <xy.tao@outlook.com>

* update default config and address comment

Signed-off-by: tabokie <xy.tao@outlook.com>

Co-authored-by: qupeng <qupeng@pingcap.com>
  • Loading branch information
tabokie and hicqu committed Nov 25, 2021
1 parent 7fbee56 commit 25109a0
Show file tree
Hide file tree
Showing 13 changed files with 364 additions and 117 deletions.
149 changes: 102 additions & 47 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions cmd/tikv-ctl/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ fn new_debug_executor(

let cache = cfg.storage.block_cache.build_shared_cache();
let shared_block_cache = cache.is_some();
let env = get_env(key_manager, None /*io_rate_limiter*/).unwrap();
let env = get_env(key_manager.clone(), None /*io_rate_limiter*/).unwrap();

let mut kv_db_opts = cfg.rocksdb.build_opt();
kv_db_opts.set_env(env.clone());
Expand Down Expand Up @@ -154,7 +154,7 @@ fn new_debug_executor(
error!("raft engine not exists: {}", config.dir);
process::exit(-1);
}
let raft_db = RaftLogEngine::new(config).unwrap();
let raft_db = RaftLogEngine::new(config, key_manager, None /*io_rate_limiter*/).unwrap();
let debugger = Debugger::new(Engines::new(kv_db, raft_db), cfg_controller);
Box::new(debugger) as Box<dyn DebugExecutor>
}
Expand Down
15 changes: 10 additions & 5 deletions components/encryption/src/manager/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0.

use std::io::{self, Error as IoError, ErrorKind, Result as IoResult};
use std::io::{Error as IoError, ErrorKind, Result as IoResult};
use std::path::{Path, PathBuf};
use std::sync::{atomic::AtomicU64, atomic::Ordering, Arc, Mutex};
use std::thread::JoinHandle;
Expand Down Expand Up @@ -567,21 +567,26 @@ impl DataKeyManager {

pub fn create_file_for_write<P: AsRef<Path>>(&self, path: P) -> Result<EncrypterWriter<File>> {
let file_writer = File::create(&path)?;
self.create_file_with_writer(path, file_writer)
self.open_file_with_writer(path, file_writer, true /*create*/)
}

pub fn create_file_with_writer<P: AsRef<Path>, W: std::io::Write>(
pub fn open_file_with_writer<P: AsRef<Path>, W: std::io::Write>(
&self,
path: P,
writer: W,
create: bool,
) -> Result<EncrypterWriter<W>> {
let fname = path.as_ref().to_str().ok_or_else(|| {
Error::Other(box_err!(
"failed to convert path to string {:?}",
path.as_ref()
))
})?;
let file = self.new_file(fname)?;
let file = if create {
self.new_file(fname)?
} else {
self.get_file(fname)?
};
EncrypterWriter::new(
writer,
crypter::encryption_method_from_db_encryption_method(file.method),
Expand All @@ -595,7 +600,7 @@ impl DataKeyManager {
self.open_file_with_reader(path, file_reader)
}

pub fn open_file_with_reader<P: AsRef<Path>, R: io::Read + io::Seek>(
pub fn open_file_with_reader<P: AsRef<Path>, R>(
&self,
path: P,
reader: R,
Expand Down
2 changes: 2 additions & 0 deletions components/raft_log_engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ edition = "2018"
[dependencies]
engine_traits = { path = "../engine_traits", default-features = false }
tikv_util = { path = "../tikv_util", default-features = false }
encryption = { path = "../encryption" }
file_system = { path = "../file_system" }
num_cpus = "1"
slog = { version = "2.3", features = ["max_level_trace", "release_max_level_debug"] }
slog-global = { version = "0.1", git = "https://github.com/breeswish/slog-global.git", rev = "d592f88e4dbba5eb439998463054f1a44fbf17b9" }
Expand Down
170 changes: 151 additions & 19 deletions components/raft_log_engine/src/engine.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

use std::fs;
use std::io::{Read, Result as IoResult, Seek, SeekFrom, Write};
use std::path::Path;
use std::sync::Arc;

use encryption::{DataKeyManager, DecrypterReader, EncrypterWriter};
use engine_traits::{
CacheStats, RaftEngine, RaftEngineReadOnly, RaftLogBatch as RaftLogBatchTrait, Result,
};
use file_system::{IOOp, IORateLimiter, IOType};
use kvproto::raft_serverpb::RaftLocalState;
use raft::eraftpb::Entry;
use raft_engine::{
Command, Error as RaftEngineError, LogBatch, MessageExt, RaftLogEngine as RawRaftEngine,
Command, Engine as RawRaftEngine, Error as RaftEngineError, FileBuilder, LogBatch, MessageExt,
};
use tikv_util::Either;

pub use raft_engine::{Config as RaftEngineConfig, RecoveryMode};

Expand All @@ -25,14 +30,133 @@ impl MessageExt for MessageExtTyped {
}
}

struct ManagedReader<R: Seek + Read> {
inner: Either<R, DecrypterReader<R>>,
rate_limiter: Option<Arc<IORateLimiter>>,
}

impl<R: Seek + Read> Seek for ManagedReader<R> {
fn seek(&mut self, pos: SeekFrom) -> IoResult<u64> {
match self.inner.as_mut() {
Either::Left(reader) => reader.seek(pos),
Either::Right(reader) => reader.seek(pos),
}
}
}

impl<R: Seek + Read> Read for ManagedReader<R> {
fn read(&mut self, buf: &mut [u8]) -> IoResult<usize> {
let mut size = buf.len();
if let Some(ref mut limiter) = self.rate_limiter {
size = limiter.request(IOType::ForegroundRead, IOOp::Read, size);
}
match self.inner.as_mut() {
Either::Left(reader) => reader.read(&mut buf[..size]),
Either::Right(reader) => reader.read(&mut buf[..size]),
}
}
}

struct ManagedWriter<W: Seek + Write> {
inner: Either<W, EncrypterWriter<W>>,
rate_limiter: Option<Arc<IORateLimiter>>,
}

impl<W: Seek + Write> Seek for ManagedWriter<W> {
fn seek(&mut self, pos: SeekFrom) -> IoResult<u64> {
match self.inner.as_mut() {
Either::Left(writer) => writer.seek(pos),
Either::Right(writer) => writer.seek(pos),
}
}
}

impl<W: Seek + Write> Write for ManagedWriter<W> {
fn write(&mut self, buf: &[u8]) -> IoResult<usize> {
let mut size = buf.len();
if let Some(ref mut limiter) = self.rate_limiter {
size = limiter.request(IOType::ForegroundWrite, IOOp::Write, size);
}
match self.inner.as_mut() {
Either::Left(writer) => writer.write(&buf[..size]),
Either::Right(writer) => writer.write(&buf[..size]),
}
}

fn flush(&mut self) -> IoResult<()> {
Ok(())
}
}

struct ManagedFileBuilder {
key_manager: Option<Arc<DataKeyManager>>,
rate_limiter: Option<Arc<IORateLimiter>>,
}

impl ManagedFileBuilder {
fn new(
key_manager: Option<Arc<DataKeyManager>>,
rate_limiter: Option<Arc<IORateLimiter>>,
) -> Self {
Self {
key_manager,
rate_limiter,
}
}
}

impl FileBuilder for ManagedFileBuilder {
type Reader<R: Seek + Read + Send> = ManagedReader<R>;
type Writer<W: Seek + Write + Send> = ManagedWriter<W>;

fn build_reader<R>(&self, path: &Path, reader: R) -> IoResult<Self::Reader<R>>
where
R: Seek + Read + Send,
{
if let Some(ref key_manager) = self.key_manager {
Ok(ManagedReader {
inner: Either::Right(key_manager.open_file_with_reader(path, reader)?),
rate_limiter: self.rate_limiter.clone(),
})
} else {
Ok(ManagedReader {
inner: Either::Left(reader),
rate_limiter: self.rate_limiter.clone(),
})
}
}

fn build_writer<W>(&self, path: &Path, writer: W, create: bool) -> IoResult<Self::Writer<W>>
where
W: Seek + Write + Send,
{
if let Some(ref key_manager) = self.key_manager {
Ok(ManagedWriter {
inner: Either::Right(key_manager.open_file_with_writer(path, writer, create)?),
rate_limiter: self.rate_limiter.clone(),
})
} else {
Ok(ManagedWriter {
inner: Either::Left(writer),
rate_limiter: self.rate_limiter.clone(),
})
}
}
}

#[derive(Clone)]
pub struct RaftLogEngine(RawRaftEngine<MessageExtTyped>);
pub struct RaftLogEngine(Arc<RawRaftEngine<ManagedFileBuilder>>);

impl RaftLogEngine {
pub fn new(config: RaftEngineConfig) -> Result<Self> {
Ok(RaftLogEngine(
RawRaftEngine::open(config).map_err(transfer_error)?,
))
pub fn new(
config: RaftEngineConfig,
key_manager: Option<Arc<DataKeyManager>>,
rate_limiter: Option<Arc<IORateLimiter>>,
) -> Result<Self> {
let file_builder = Arc::new(ManagedFileBuilder::new(key_manager, rate_limiter));
Ok(RaftLogEngine(Arc::new(
RawRaftEngine::open_with_file_builder(config, file_builder).map_err(transfer_error)?,
)))
}

/// If path is not an empty directory, we say db exists.
Expand All @@ -58,14 +182,15 @@ impl RaftLogEngine {
}

#[derive(Default)]
pub struct RaftLogBatch(LogBatch<MessageExtTyped>);
pub struct RaftLogBatch(LogBatch);

const RAFT_LOG_STATE_KEY: &[u8] = b"R";

impl RaftLogBatchTrait for RaftLogBatch {
fn append(&mut self, raft_group_id: u64, entries: Vec<Entry>) -> Result<()> {
self.0.add_entries(raft_group_id, entries);
Ok(())
self.0
.add_entries::<MessageExtTyped>(raft_group_id, &entries)
.map_err(transfer_error)
}

fn cut_logs(&mut self, _: u64, _: u64, _: u64) {
Expand All @@ -86,8 +211,8 @@ impl RaftLogBatchTrait for RaftLogBatch {
self.0.is_empty()
}

fn merge(&mut self, src: Self) {
self.0.merge(src.0);
fn merge(&mut self, mut src: Self) {
self.0.merge(&mut src.0);
}
}

Expand All @@ -100,7 +225,7 @@ impl RaftEngineReadOnly for RaftLogEngine {

fn get_entry(&self, raft_group_id: u64, index: u64) -> Result<Option<Entry>> {
self.0
.get_entry(raft_group_id, index)
.get_entry::<MessageExtTyped>(raft_group_id, index)
.map_err(transfer_error)
}

Expand All @@ -113,7 +238,7 @@ impl RaftEngineReadOnly for RaftLogEngine {
to: &mut Vec<Entry>,
) -> Result<usize> {
self.0
.fetch_entries_to(raft_group_id, begin, end, max_size, to)
.fetch_entries_to::<MessageExtTyped>(raft_group_id, begin, end, max_size, to)
.map_err(transfer_error)
}
}
Expand Down Expand Up @@ -155,14 +280,21 @@ impl RaftEngine for RaftLogEngine {

fn append(&self, raft_group_id: u64, entries: Vec<Entry>) -> Result<usize> {
let mut batch = Self::LogBatch::default();
batch.0.add_entries(raft_group_id, entries);
batch
.0
.add_entries::<MessageExtTyped>(raft_group_id, &entries)
.map_err(transfer_error)?;
self.0.write(&mut batch.0, false).map_err(transfer_error)
}

fn put_raft_state(&self, raft_group_id: u64, state: &RaftLocalState) -> Result<()> {
self.0
.put_message(raft_group_id, RAFT_LOG_STATE_KEY, state)
.map_err(transfer_error)
let mut batch = Self::LogBatch::default();
batch
.0
.put_message(raft_group_id, RAFT_LOG_STATE_KEY.to_vec(), state)
.map_err(transfer_error)?;
self.0.write(&mut batch.0, false).map_err(transfer_error)?;
Ok(())
}

fn gc(&self, raft_group_id: u64, _from: u64, to: u64) -> Result<usize> {
Expand Down Expand Up @@ -199,8 +331,8 @@ impl RaftEngine for RaftLogEngine {

fn transfer_error(e: RaftEngineError) -> engine_traits::Error {
match e {
RaftEngineError::StorageCompacted => engine_traits::Error::EntriesCompacted,
RaftEngineError::StorageUnavailable => engine_traits::Error::EntriesUnavailable,
RaftEngineError::EntryCompacted => engine_traits::Error::EntriesCompacted,
RaftEngineError::EntryNotFound => engine_traits::Error::EntriesUnavailable,
e => {
let e = box_err!(e);
engine_traits::Error::Other(e)
Expand Down
1 change: 1 addition & 0 deletions components/raft_log_engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
//! Please read the engine_trait crate docs before hacking.

#![cfg_attr(test, feature(test))]
#![feature(generic_associated_types)]

#[macro_use]
extern crate tikv_util;
Expand Down
Loading

0 comments on commit 25109a0

Please sign in to comment.