raftstore-v2: introduce apply trace #13939

Merged
merged 13 commits on Dec 17, 2022
2 changes: 1 addition & 1 deletion components/engine_panic/src/misc.rs
@@ -5,7 +5,7 @@ use engine_traits::{DeleteStrategy, MiscExt, Range, Result};
use crate::engine::PanicEngine;

impl MiscExt for PanicEngine {
fn flush_cfs(&self, wait: bool) -> Result<()> {
fn flush_cfs(&self, cfs: &[&str], wait: bool) -> Result<()> {
panic!()
}

160 changes: 153 additions & 7 deletions components/engine_rocks/src/event_listener.rs
@@ -1,6 +1,6 @@
// Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0.

use engine_traits::{PersistenceListener, RaftEngine};
use engine_traits::PersistenceListener;
use file_system::{get_io_type, set_io_type, IoType};
use regex::Regex;
use rocksdb::{
@@ -179,34 +179,180 @@ fn resolve_sst_filename_from_err(err: &str) -> Option<String> {
Some(filename)
}

pub struct RocksPersistenceListener<ER>(PersistenceListener<ER>);
pub struct RocksPersistenceListener(PersistenceListener);

impl<ER> RocksPersistenceListener<ER> {
pub fn new(listener: PersistenceListener<ER>) -> RocksPersistenceListener<ER> {
impl RocksPersistenceListener {
pub fn new(listener: PersistenceListener) -> RocksPersistenceListener {
RocksPersistenceListener(listener)
}
}

impl<ER: RaftEngine> rocksdb::EventListener for RocksPersistenceListener<ER> {
impl rocksdb::EventListener for RocksPersistenceListener {
fn on_memtable_sealed(&self, info: &MemTableInfo) {
self.0
.on_memtable_sealed(info.cf_name().to_string(), info.first_seqno());
.on_memtable_sealed(info.cf_name().to_string(), info.earliest_seqno());
}

fn on_flush_completed(&self, job: &FlushJobInfo) {
self.0
.on_flush_completed(job.cf_name(), job.smallest_seqno());
.on_flush_completed(job.cf_name(), job.largest_seqno());
}
}
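
In broad strokes, the two callbacks pair up by sequence number: sealing a memtable records the earliest seqno it contains, and once a flush completes, any sealed memtable whose earliest seqno is no greater than the flush's largest seqno has been persisted (the test below covers the edge case where the two are equal). A minimal illustrative sketch of that bookkeeping follows; the `SealTracker` type is an assumption for illustration only, not TiKV's `PersistenceListener`, which instead routes these events into `FlushState`/`StateStorage` as the test exercises.

```rust
use std::{collections::HashMap, sync::Mutex};

/// Illustrative only: tracks sealed-but-unflushed memtables per column family,
/// keyed by the earliest seqno each sealed memtable contains.
#[derive(Default)]
struct SealTracker {
    sealed: Mutex<HashMap<String, Vec<u64>>>,
}

impl SealTracker {
    /// Mirrors `on_memtable_sealed`: remember the earliest seqno of the sealed memtable.
    fn on_memtable_sealed(&self, cf: String, earliest_seqno: u64) {
        self.sealed.lock().unwrap().entry(cf).or_default().push(earliest_seqno);
    }

    /// Mirrors `on_flush_completed`: every sealed memtable whose earliest seqno is
    /// <= the flush's largest seqno is now on disk; returns how many were persisted.
    fn on_flush_completed(&self, cf: &str, largest_seqno: u64) -> usize {
        let mut sealed = self.sealed.lock().unwrap();
        let memtables = sealed.entry(cf.to_string()).or_default();
        let before = memtables.len();
        memtables.retain(|&seqno| seqno > largest_seqno);
        before - memtables.len()
    }
}
```
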

#[cfg(test)]
mod tests {
use std::sync::{
mpsc::{self, Sender},
Arc, Mutex,
};

use engine_traits::{
FlushProgress, FlushState, MiscExt, StateStorage, SyncMutable, CF_DEFAULT, DATA_CFS,
};
use tempfile::Builder;

use super::*;
use crate::{util, RocksCfOptions, RocksDbOptions};

#[test]
fn test_resolve_sst_filename() {
let err = "Corruption: Sst file size mismatch: /qps/data/tikv-10014/db/000398.sst. Size recorded in manifest 6975, actual size 6959";
let filename = resolve_sst_filename_from_err(err).unwrap();
assert_eq!(filename, "/000398.sst");
}

type Record = (u64, u64, FlushProgress);

#[derive(Default)]
struct MemStorage {
records: Mutex<Vec<Record>>,
}

impl StateStorage for MemStorage {
fn persist_progress(&self, region_id: u64, tablet_index: u64, pr: FlushProgress) {
self.records
.lock()
.unwrap()
.push((region_id, tablet_index, pr));
}
}

struct FlushTrack {
sealed: Mutex<Sender<()>>,
block_flush: Arc<Mutex<()>>,
}

impl rocksdb::EventListener for FlushTrack {
fn on_memtable_sealed(&self, _: &MemTableInfo) {
let _ = self.sealed.lock().unwrap().send(());
}

fn on_flush_begin(&self, _: &FlushJobInfo) {
drop(self.block_flush.lock().unwrap())
}
}

#[test]
fn test_persistence_listener() {
let temp_dir = Builder::new()
.prefix("test_persistence_listener")
.tempdir()
.unwrap();
let (region_id, tablet_index) = (2, 3);

let storage = Arc::new(MemStorage::default());
let state = Arc::new(FlushState::default());
let listener =
PersistenceListener::new(region_id, tablet_index, state.clone(), storage.clone());
let mut db_opt = RocksDbOptions::default();
db_opt.add_event_listener(RocksPersistenceListener::new(listener));
let (tx, rx) = mpsc::channel();
let block_flush = Arc::new(Mutex::new(()));
db_opt.add_event_listener(FlushTrack {
sealed: Mutex::new(tx),
block_flush: block_flush.clone(),
});

let mut cf_opts: Vec<_> = DATA_CFS
.iter()
.map(|cf| (*cf, RocksCfOptions::default()))
.collect();
cf_opts[0].1.set_max_write_buffer_number(4);
cf_opts[0].1.set_min_write_buffer_number_to_merge(2);
cf_opts[0].1.set_write_buffer_size(1024);
cf_opts[0].1.set_disable_auto_compactions(true);
let db = util::new_engine_opt(temp_dir.path().to_str().unwrap(), db_opt, cf_opts).unwrap();
db.flush_cf(CF_DEFAULT, true).unwrap();
let sst_count = || {
std::fs::read_dir(temp_dir.path())
.unwrap()
.filter(|p| {
let p = match p {
Ok(p) => p,
Err(_) => return false,
};
p.path().extension().map_or(false, |ext| ext == "sst")
})
.count()
};
// Although a flush is triggered, there is nothing to flush.
assert_eq!(sst_count(), 0);
assert_eq!(storage.records.lock().unwrap().len(), 0);

// Flush one key should work.
state.set_applied_index(2);
db.put_cf(CF_DEFAULT, b"k0", b"v0").unwrap();
db.flush_cf(CF_DEFAULT, true).unwrap();
assert_eq!(sst_count(), 1);
let record = storage.records.lock().unwrap().pop().unwrap();
assert_eq!(storage.records.lock().unwrap().len(), 0);
assert_eq!(record.0, region_id);
assert_eq!(record.1, tablet_index);
assert_eq!(record.2.applied_index(), 2);

// When puts and deletes are mixed, the puts may be deleted during flush.
state.set_applied_index(3);
db.put_cf(CF_DEFAULT, b"k0", b"v0").unwrap();
db.delete_cf(CF_DEFAULT, b"k0").unwrap();
db.delete_cf(CF_DEFAULT, b"k1").unwrap();
db.put_cf(CF_DEFAULT, b"k1", b"v1").unwrap();
db.flush_cf(CF_DEFAULT, true).unwrap();
assert_eq!(sst_count(), 2);
let record = storage.records.lock().unwrap().pop().unwrap();
assert_eq!(storage.records.lock().unwrap().len(), 0);
assert_eq!(record.0, region_id);
assert_eq!(record.1, tablet_index);
assert_eq!(record.2.applied_index(), 3);
// Detailed checks of `FlushProgress` will be done in raftstore-v2 tests.

// Drain all the events.
while rx.try_recv().is_ok() {}
state.set_applied_index(4);
let block = block_flush.lock();
// Seal twice to trigger a flush. Seal a third time to create a seqno conflict, in
// which case the flush's largest seqno will equal the seal's earliest seqno.
let mut key_count = 2;
for i in 0..3 {
while rx.try_recv().is_err() {
db.put(format!("k{key_count}").as_bytes(), &[0; 512])
.unwrap();
key_count += 1;
}
state.set_applied_index(5 + i);
}
drop(block);
// The memtable is sealed before the put, so there must still be one KV in the memtable.
db.flush_cf(CF_DEFAULT, true).unwrap();
rx.try_recv().unwrap();
// There are 2 SSTs before this round, and then 4 memtables are merged into 2 more,
// so there should be 4 SSTs.
assert_eq!(sst_count(), 4);
let records = storage.records.lock().unwrap();
// Although it seals 4 times, only 2 SSTs are created, so there are only 2 records.
assert_eq!(records.len(), 2);
// The indexes of the two merged flush states are 4 and 5, so the merged value is 5.
assert_eq!(records[0].2.applied_index(), 5);
// The last two flush states are 6 and 7.
assert_eq!(records[1].2.applied_index(), 7);
}
}
4 changes: 2 additions & 2 deletions components/engine_rocks/src/file_system.rs
@@ -82,13 +82,13 @@ mod tests {
db.put(&data_key(b"a1"), &value).unwrap();
db.put(&data_key(b"a2"), &value).unwrap();
assert_eq!(stats.fetch(IoType::Flush, IoOp::Write), 0);
db.flush_cfs(true /* wait */).unwrap();
db.flush_cfs(&[], true /* wait */).unwrap();
assert!(stats.fetch(IoType::Flush, IoOp::Write) > value_size * 2);
assert!(stats.fetch(IoType::Flush, IoOp::Write) < value_size * 2 + amplification_bytes);
stats.reset();
db.put(&data_key(b"a2"), &value).unwrap();
db.put(&data_key(b"a3"), &value).unwrap();
db.flush_cfs(true /* wait */).unwrap();
db.flush_cfs(&[], true /* wait */).unwrap();
assert!(stats.fetch(IoType::Flush, IoOp::Write) > value_size * 2);
assert!(stats.fetch(IoType::Flush, IoOp::Write) < value_size * 2 + amplification_bytes);
stats.reset();
9 changes: 7 additions & 2 deletions components/engine_rocks/src/misc.rs
@@ -126,11 +126,16 @@ impl RocksEngine {
}

impl MiscExt for RocksEngine {
fn flush_cfs(&self, wait: bool) -> Result<()> {
fn flush_cfs(&self, cfs: &[&str], wait: bool) -> Result<()> {
let mut handles = vec![];
for cf in self.cf_names() {
for cf in cfs {
handles.push(util::get_cf_handle(self.as_inner(), cf)?);
}
if handles.is_empty() {
for cf in self.cf_names() {
handles.push(util::get_cf_handle(self.as_inner(), cf)?);
}
}
Contributor:

minor: maybe it's cleaner to use a temp cfs to cover both cases.
something like:
let cfs = cfs.is_empty()? &self.cf_names() : cfs;
for cf in cfs .....

Member Author:

cf_names will allocate.

Contributor (quoting "cf_names will allocate."):

if !cfs.is_empty(), self.cf_names() won't be called? It's a minor issue anyway.

Member Author (@BusyJay, Dec 16, 2022):

cf_names will return a Vec, so you can't put it inside the if branch. Instead, it needs to allocate first, and then choose what to borrow.
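
To make the borrow issue concrete, here is a small illustrative sketch (not the merged code; `flush_cfs_sketch` and the hard-coded CF list are stand-ins): the `Vec` returned by `cf_names()` has to be bound to its own variable before a slice of it can be chosen, because a temporary created inside the `if` arm is dropped too early.

```rust
fn flush_cfs_sketch(requested: &[&str]) {
    // Stand-in for `self.cf_names()`, which returns an owned Vec and therefore allocates.
    let all_cfs: Vec<&str> = vec!["default", "lock", "write"];
    // Allocate first, then choose what to borrow. Writing
    // `if requested.is_empty() { &self.cf_names() } else { requested }` directly
    // would not compile: the temporary Vec is dropped while still borrowed.
    let cfs: &[&str] = if requested.is_empty() { &all_cfs } else { requested };
    for cf in cfs {
        // ...fetch the handle for `cf` and flush it...
        println!("flush {cf}");
    }
}
```
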

self.as_inner().flush_cfs(&handles, wait).map_err(r2e)
}

65 changes: 60 additions & 5 deletions components/engine_test/src/lib.rs
@@ -119,9 +119,10 @@ pub mod kv {
}

impl TabletFactory<KvTestEngine> for TestTabletFactory {
fn open_tablet(&self, _ctx: TabletContext, path: &Path) -> Result<KvTestEngine> {
KvTestEngine::new_kv_engine_opt(
fn open_tablet(&self, ctx: TabletContext, path: &Path) -> Result<KvTestEngine> {
KvTestEngine::new_tablet(
path.to_str().unwrap(),
ctx,
self.db_opt.clone(),
self.cf_opts.clone(),
)
@@ -155,7 +156,7 @@ pub mod ctor {
use std::sync::Arc;

use encryption::DataKeyManager;
use engine_traits::Result;
use engine_traits::{Result, StateStorage, TabletContext};
use file_system::IoRateLimiter;

/// Kv engine construction
@@ -188,6 +189,14 @@ pub mod ctor {
db_opt: DbOptions,
cf_opts: Vec<(&str, CfOptions)>,
) -> Result<Self>;

/// Create a new engine specific to multi-rocks.
fn new_tablet(
path: &str,
ctx: TabletContext,
db_opt: DbOptions,
cf_opts: Vec<(&str, CfOptions)>,
) -> Result<Self>;
}

/// Raft engine construction
@@ -200,6 +209,7 @@ pub mod ctor {
pub struct DbOptions {
key_manager: Option<Arc<DataKeyManager>>,
rate_limiter: Option<Arc<IoRateLimiter>>,
state_storage: Option<Arc<dyn StateStorage>>,
enable_multi_batch_write: bool,
}

@@ -212,6 +222,10 @@ pub mod ctor {
self.rate_limiter = rate_limiter;
}

pub fn set_state_storage(&mut self, state_storage: Arc<dyn StateStorage>) {
self.state_storage = Some(state_storage);
}

pub fn set_enable_multi_batch_write(&mut self, enable: bool) {
self.enable_multi_batch_write = enable;
}
@@ -329,6 +343,15 @@ pub mod ctor {
) -> Result<Self> {
Ok(PanicEngine)
}

fn new_tablet(
_path: &str,
_ctx: engine_traits::TabletContext,
_db_opt: DbOptions,
_cf_opts: Vec<(&str, CfOptions)>,
) -> Result<Self> {
Ok(PanicEngine)
}
}

impl RaftEngineConstructorExt for engine_panic::PanicEngine {
@@ -343,9 +366,11 @@ pub mod ctor {
get_env,
properties::{MvccPropertiesCollectorFactory, RangePropertiesCollectorFactory},
util::new_engine_opt as rocks_new_engine_opt,
RocksCfOptions, RocksDbOptions,
RocksCfOptions, RocksDbOptions, RocksPersistenceListener,
};
use engine_traits::{
CfOptions as _, PersistenceListener, Result, TabletContext, CF_DEFAULT,
};
use engine_traits::{CfOptions as _, Result, CF_DEFAULT};

use super::{
CfOptions, DbOptions, KvEngineConstructorExt, RaftDbOptions, RaftEngineConstructorExt,
@@ -376,6 +401,36 @@ pub mod ctor {
.collect();
rocks_new_engine_opt(path, rocks_db_opts, rocks_cfs_opts)
}

fn new_tablet(
path: &str,
ctx: TabletContext,
db_opt: DbOptions,
cf_opts: Vec<(&str, CfOptions)>,
) -> Result<Self> {
let mut rocks_db_opts = RocksDbOptions::default();
let env = get_env(db_opt.key_manager.clone(), db_opt.rate_limiter)?;
rocks_db_opts.set_env(env);
rocks_db_opts.enable_unordered_write(false);
rocks_db_opts.enable_pipelined_write(false);
rocks_db_opts.enable_multi_batch_write(false);
rocks_db_opts.allow_concurrent_memtable_write(false);
if let Some(storage) = db_opt.state_storage
&& let Some(flush_state) = ctx.flush_state {
let listener = PersistenceListener::new(
ctx.id,
ctx.suffix.unwrap(),
flush_state,
storage,
);
rocks_db_opts.add_event_listener(RocksPersistenceListener::new(listener));
}
let rocks_cfs_opts = cf_opts
.iter()
.map(|(name, opt)| (*name, get_rocks_cf_opts(opt)))
.collect();
rocks_new_engine_opt(path, rocks_db_opts, rocks_cfs_opts)
}
}

impl RaftEngineConstructorExt for engine_rocks::RocksEngine {
1 change: 1 addition & 0 deletions components/engine_traits/src/cf_defs.rs
@@ -9,6 +9,7 @@ pub const CF_RAFT: CfName = "raft";
pub const LARGE_CFS: &[CfName] = &[CF_DEFAULT, CF_LOCK, CF_WRITE];
pub const ALL_CFS: &[CfName] = &[CF_DEFAULT, CF_LOCK, CF_WRITE, CF_RAFT];
pub const DATA_CFS: &[CfName] = &[CF_DEFAULT, CF_LOCK, CF_WRITE];
pub const DATA_CFS_LEN: usize = DATA_CFS.len();

pub fn name_to_cf(name: &str) -> Option<CfName> {
if name.is_empty() {
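
One plausible reason for exposing a compile-time length is that per-data-CF bookkeeping can then live in a fixed-size array indexed by position in `DATA_CFS`. A hypothetical sketch, with illustrative stand-ins for the `engine_traits` constants and a `FlushedIndexes` type that is not part of this PR:

```rust
// Illustrative stand-ins for the engine_traits constants.
const DATA_CFS: &[&str] = &["default", "lock", "write"];
const DATA_CFS_LEN: usize = DATA_CFS.len();

/// Hypothetical per-CF record, e.g. the last applied index flushed for each data CF.
struct FlushedIndexes {
    indexes: [u64; DATA_CFS_LEN],
}

impl FlushedIndexes {
    fn new() -> Self {
        FlushedIndexes { indexes: [0; DATA_CFS_LEN] }
    }

    fn set(&mut self, cf: &str, index: u64) {
        // Map the CF name to its fixed slot; data CFs are known at compile time.
        let pos = DATA_CFS.iter().position(|c| *c == cf).expect("unknown data cf");
        self.indexes[pos] = index;
    }
}
```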