raftstore-v2: introduce apply trace #13939

Merged · 13 commits · Dec 17, 2022
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -338,7 +338,7 @@ pd_client = { path = "components/pd_client" }
profiler = { path = "components/profiler" }
raft_log_engine = { path = "components/raft_log_engine" }
raftstore = { path = "components/raftstore", default-features = false }
-raftstore_v2 = { path = "components/raftstore-v2", default-features = false }
+raftstore-v2 = { path = "components/raftstore-v2", default-features = false }
resolved_ts = { path = "components/resolved_ts" }
resource_metering = { path = "components/resource_metering" }
security = { path = "components/security" }
2 changes: 1 addition & 1 deletion components/engine_panic/src/misc.rs
@@ -5,7 +5,7 @@ use engine_traits::{DeleteStrategy, MiscExt, Range, Result};
use crate::engine::PanicEngine;

impl MiscExt for PanicEngine {
-    fn flush_cfs(&self, wait: bool) -> Result<()> {
+    fn flush_cfs(&self, cfs: &[&str], wait: bool) -> Result<()> {
panic!()
}

14 changes: 1 addition & 13 deletions components/engine_rocks/src/engine.rs
@@ -2,9 +2,7 @@

use std::{any::Any, sync::Arc};

-use engine_traits::{
-    FlushState, IterOptions, Iterable, KvEngine, Peekable, ReadOptions, Result, SyncMutable,
-};
+use engine_traits::{IterOptions, Iterable, KvEngine, Peekable, ReadOptions, Result, SyncMutable};
use rocksdb::{DBIterator, Writable, DB};

use crate::{
@@ -26,7 +24,6 @@ use crate::{
pub struct RocksEngine {
db: Arc<DB>,
support_multi_batch_write: bool,
-    flush_state: Option<Arc<FlushState>>,
}

impl RocksEngine {
@@ -38,7 +35,6 @@ impl RocksEngine {
RocksEngine {
db: db.clone(),
support_multi_batch_write: db.get_db_options().is_enable_multi_batch_write(),
-            flush_state: None,
}
}

@@ -53,14 +49,6 @@ impl RocksEngine {
pub fn support_multi_batch_write(&self) -> bool {
self.support_multi_batch_write
}

-    pub fn set_flush_state(&mut self, flush_state: Arc<FlushState>) {
-        self.flush_state = Some(flush_state);
-    }
-
-    pub fn flush_state(&self) -> Option<Arc<FlushState>> {
-        self.flush_state.clone()
-    }
}

impl KvEngine for RocksEngine {
160 changes: 153 additions & 7 deletions components/engine_rocks/src/event_listener.rs
@@ -1,6 +1,6 @@
// Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0.

-use engine_traits::{PersistenceListener, RaftEngine};
+use engine_traits::PersistenceListener;
use file_system::{get_io_type, set_io_type, IoType};
use regex::Regex;
use rocksdb::{
@@ -179,34 +179,180 @@ fn resolve_sst_filename_from_err(err: &str) -> Option<String> {
Some(filename)
}

-pub struct RocksPersistenceListener<ER>(PersistenceListener<ER>);
+pub struct RocksPersistenceListener(PersistenceListener);

-impl<ER> RocksPersistenceListener<ER> {
-    pub fn new(listener: PersistenceListener<ER>) -> RocksPersistenceListener<ER> {
+impl RocksPersistenceListener {
+    pub fn new(listener: PersistenceListener) -> RocksPersistenceListener {
RocksPersistenceListener(listener)
}
}

-impl<ER: RaftEngine> rocksdb::EventListener for RocksPersistenceListener<ER> {
+impl rocksdb::EventListener for RocksPersistenceListener {
fn on_memtable_sealed(&self, info: &MemTableInfo) {
self.0
-    .on_memtable_sealed(info.cf_name().to_string(), info.first_seqno());
+    .on_memtable_sealed(info.cf_name().to_string(), info.earliest_seqno());
}

fn on_flush_completed(&self, job: &FlushJobInfo) {
self.0
-    .on_flush_completed(job.cf_name(), job.smallest_seqno());
+    .on_flush_completed(job.cf_name(), job.largest_seqno());
}
}
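These two callbacks are the entire surface the apply trace needs from RocksDB: a seal event carrying the memtable's earliest seqno, and a flush-completed event carrying the flush's largest seqno. Below is a minimal, self-contained model of the bookkeeping they enable; `ApplyTraceModel` and its fields are illustrative stand-ins, not TiKV's actual `PersistenceListener` internals, and the equal-seqno corner case exercised by the test below is ignored.

```rust
// Illustrative model only: at seal time, pair the memtable's earliest
// seqno with the applied index at that moment; at flush completion,
// consume every sealed memtable covered by the flush's largest seqno
// and report the greatest applied index among them as durable.
use std::collections::VecDeque;

#[derive(Default)]
struct ApplyTraceModel {
    // (earliest seqno of sealed memtable, applied index at seal time),
    // in seal order, so seqnos are non-decreasing.
    sealed: VecDeque<(u64, u64)>,
}

impl ApplyTraceModel {
    fn on_memtable_sealed(&mut self, earliest_seqno: u64, applied_index: u64) {
        self.sealed.push_back((earliest_seqno, applied_index));
    }

    fn on_flush_completed(&mut self, largest_seqno: u64) -> Option<u64> {
        // A single flush may merge several sealed memtables
        // (min_write_buffer_number_to_merge > 1 in the test below).
        let mut durable = None;
        while let Some(&(seqno, idx)) = self.sealed.front() {
            if seqno > largest_seqno {
                break;
            }
            self.sealed.pop_front();
            durable = Some(durable.map_or(idx, |d: u64| d.max(idx)));
        }
        durable
    }
}

fn main() {
    let mut trace = ApplyTraceModel::default();
    // Two seals merged into one flush: the durable index is the larger
    // of the two, matching the merged value 5 asserted in the test.
    trace.on_memtable_sealed(10, 4);
    trace.on_memtable_sealed(20, 5);
    assert_eq!(trace.on_flush_completed(25), Some(5));
}
```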

#[cfg(test)]
mod tests {
use std::sync::{
mpsc::{self, Sender},
Arc, Mutex,
};

use engine_traits::{
FlushProgress, FlushState, MiscExt, StateStorage, SyncMutable, CF_DEFAULT, DATA_CFS,
};
use tempfile::Builder;

use super::*;
use crate::{util, RocksCfOptions, RocksDbOptions};

#[test]
fn test_resolve_sst_filename() {
let err = "Corruption: Sst file size mismatch: /qps/data/tikv-10014/db/000398.sst. Size recorded in manifest 6975, actual size 6959";
let filename = resolve_sst_filename_from_err(err).unwrap();
assert_eq!(filename, "/000398.sst");
}

type Record = (u64, u64, FlushProgress);

#[derive(Default)]
struct MemStorage {
records: Mutex<Vec<Record>>,
}

impl StateStorage for MemStorage {
fn persist_progress(&self, region_id: u64, tablet_index: u64, pr: FlushProgress) {
self.records
.lock()
.unwrap()
.push((region_id, tablet_index, pr));
}
}

struct FlushTrack {
sealed: Mutex<Sender<()>>,
block_flush: Arc<Mutex<()>>,
}

impl rocksdb::EventListener for FlushTrack {
fn on_memtable_sealed(&self, _: &MemTableInfo) {
let _ = self.sealed.lock().unwrap().send(());
}

fn on_flush_begin(&self, _: &FlushJobInfo) {
drop(self.block_flush.lock().unwrap())
}
}

#[test]
fn test_persistence_listener() {
let temp_dir = Builder::new()
.prefix("test_persistence_listener")
.tempdir()
.unwrap();
let (region_id, tablet_index) = (2, 3);

let storage = Arc::new(MemStorage::default());
let state = Arc::new(FlushState::default());
let listener =
PersistenceListener::new(region_id, tablet_index, state.clone(), storage.clone());
let mut db_opt = RocksDbOptions::default();
db_opt.add_event_listener(RocksPersistenceListener::new(listener));
let (tx, rx) = mpsc::channel();
let block_flush = Arc::new(Mutex::new(()));
db_opt.add_event_listener(FlushTrack {
sealed: Mutex::new(tx),
block_flush: block_flush.clone(),
});

let mut cf_opts: Vec<_> = DATA_CFS
.iter()
.map(|cf| (*cf, RocksCfOptions::default()))
.collect();
cf_opts[0].1.set_max_write_buffer_number(4);
cf_opts[0].1.set_min_write_buffer_number_to_merge(2);
cf_opts[0].1.set_write_buffer_size(1024);
cf_opts[0].1.set_disable_auto_compactions(true);
let db = util::new_engine_opt(temp_dir.path().to_str().unwrap(), db_opt, cf_opts).unwrap();
db.flush_cf(CF_DEFAULT, true).unwrap();
let sst_count = || {
std::fs::read_dir(temp_dir.path())
.unwrap()
.filter(|p| {
let p = match p {
Ok(p) => p,
Err(_) => return false,
};
p.path().extension().map_or(false, |ext| ext == "sst")
})
.count()
};
// Although a flush is triggered, there is nothing to flush yet.
assert_eq!(sst_count(), 0);
assert_eq!(storage.records.lock().unwrap().len(), 0);

// Flush one key should work.
state.set_applied_index(2);
db.put_cf(CF_DEFAULT, b"k0", b"v0").unwrap();
db.flush_cf(CF_DEFAULT, true).unwrap();
assert_eq!(sst_count(), 1);
let record = storage.records.lock().unwrap().pop().unwrap();
assert_eq!(storage.records.lock().unwrap().len(), 0);
assert_eq!(record.0, region_id);
assert_eq!(record.1, tablet_index);
assert_eq!(record.2.applied_index(), 2);

// When puts and deletes are mixed, the puts may be deleted during flush.
state.set_applied_index(3);
db.put_cf(CF_DEFAULT, b"k0", b"v0").unwrap();
db.delete_cf(CF_DEFAULT, b"k0").unwrap();
db.delete_cf(CF_DEFAULT, b"k1").unwrap();
db.put_cf(CF_DEFAULT, b"k1", b"v1").unwrap();
db.flush_cf(CF_DEFAULT, true).unwrap();
assert_eq!(sst_count(), 2);
let record = storage.records.lock().unwrap().pop().unwrap();
assert_eq!(storage.records.lock().unwrap().len(), 0);
assert_eq!(record.0, region_id);
assert_eq!(record.1, tablet_index);
assert_eq!(record.2.applied_index(), 3);
// Detail check of `FlushProgress` will be done in raftstore-v2 tests.

// Drain all the events.
while rx.try_recv().is_ok() {}
state.set_applied_index(4);
let block = block_flush.lock();
// Seal twice to trigger a flush. Seal a third time to create a seqno conflict,
// in which case the flush's largest seqno equals the seal's earliest seqno.
let mut key_count = 2;
for i in 0..3 {
while rx.try_recv().is_err() {
db.put(format!("k{key_count}").as_bytes(), &[0; 512])
.unwrap();
key_count += 1;
}
state.set_applied_index(5 + i);
}
drop(block);
// The memtable is sealed before the put, so there must still be one KV in the memtable.
db.flush_cf(CF_DEFAULT, true).unwrap();
rx.try_recv().unwrap();
// There are 2 SSTs before this round, and the 4 sealed memtables are merged
// into 2 flushes, so there should be 4 SSTs in total.
assert_eq!(sst_count(), 4);
let records = storage.records.lock().unwrap();
// Although the memtable is sealed 4 times, only 2 SSTs are created, so there are only 2 records.
assert_eq!(records.len(), 2);
// The applied indexes of the two merged flush states are 4 and 5, so the merged value is 5.
assert_eq!(records[0].2.applied_index(), 5);
// The last two flush states are 6 and 7.
assert_eq!(records[1].2.applied_index(), 7);
}
}
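What makes these persisted records an "apply trace": after a restart, the applied index recorded for each flushed CF tells recovery where raft log replay must resume. A rough sketch of that direction, with a hypothetical helper name (the actual recovery path lands in raftstore-v2, not in this file):

```rust
// Hypothetical helper: replay must resume from the smallest flushed
// applied index across CFs, since any CF behind that point still needs
// its writes re-applied from the raft log.
fn replay_start_index(flushed_applied: &[(&str, u64)]) -> u64 {
    flushed_applied
        .iter()
        .map(|&(_, idx)| idx)
        .min()
        .unwrap_or(0)
}

fn main() {
    let flushed = [("default", 7), ("lock", 5), ("write", 6)];
    assert_eq!(replay_start_index(&flushed), 5);
}
```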
4 changes: 2 additions & 2 deletions components/engine_rocks/src/file_system.rs
@@ -82,13 +82,13 @@ mod tests {
db.put(&data_key(b"a1"), &value).unwrap();
db.put(&data_key(b"a2"), &value).unwrap();
assert_eq!(stats.fetch(IoType::Flush, IoOp::Write), 0);
-        db.flush_cfs(true /* wait */).unwrap();
+        db.flush_cfs(&[], true /* wait */).unwrap();
assert!(stats.fetch(IoType::Flush, IoOp::Write) > value_size * 2);
assert!(stats.fetch(IoType::Flush, IoOp::Write) < value_size * 2 + amplification_bytes);
stats.reset();
db.put(&data_key(b"a2"), &value).unwrap();
db.put(&data_key(b"a3"), &value).unwrap();
-        db.flush_cfs(true /* wait */).unwrap();
+        db.flush_cfs(&[], true /* wait */).unwrap();
assert!(stats.fetch(IoType::Flush, IoOp::Write) > value_size * 2);
assert!(stats.fetch(IoType::Flush, IoOp::Write) < value_size * 2 + amplification_bytes);
stats.reset();
9 changes: 7 additions & 2 deletions components/engine_rocks/src/misc.rs
@@ -126,11 +126,16 @@ impl RocksEngine {
}

impl MiscExt for RocksEngine {
-    fn flush_cfs(&self, wait: bool) -> Result<()> {
+    fn flush_cfs(&self, cfs: &[&str], wait: bool) -> Result<()> {
let mut handles = vec![];
-        for cf in self.cf_names() {
+        for cf in cfs {
handles.push(util::get_cf_handle(self.as_inner(), cf)?);
}
if handles.is_empty() {
for cf in self.cf_names() {
handles.push(util::get_cf_handle(self.as_inner(), cf)?);
}
}
Contributor:

minor: maybe it's cleaner to use a temp cfs to cover both cases, something like:

    let cfs = cfs.is_empty() ? &self.cf_names() : cfs;
    for cf in cfs ...

BusyJay (Member, Author):

cf_names will allocate.

Contributor:

> cf_names will allocate.

If !cfs.is_empty(), self.cf_names() won't be called? It's a minor issue anyway.

BusyJay (Member, Author), Dec 16, 2022:

cf_names returns a Vec, so you can't put the call inside the if branch. Instead, it needs to allocate first, and then choose what to borrow.
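For illustration, a runnable sketch of the allocate-first-then-borrow pattern BusyJay describes (standalone stand-in values, not the merged code):

```rust
fn main() {
    let requested: &[&str] = &[]; // caller passed no explicit CFs
    // The Vec must be bound outside the branch so the borrow can
    // outlive it; a branch-local cf_names() result would be dropped
    // too early.
    let all_cfs: Vec<&str>;
    let cfs: &[&str] = if requested.is_empty() {
        all_cfs = vec!["default", "lock", "write"]; // stand-in for self.cf_names()
        &all_cfs
    } else {
        requested
    };
    for cf in cfs {
        println!("would flush {cf}");
    }
}
```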

self.as_inner().flush_cfs(&handles, wait).map_err(r2e)
}
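Call-site semantics of the new signature, as already exercised by the file_system.rs test above: passing explicit CFs flushes only those, while an empty slice preserves the old flush-everything behavior. A minimal usage sketch, assuming `db` and `CF_DEFAULT` as in the tests above:

```rust
// Flush a single column family and wait for it to complete.
db.flush_cfs(&[CF_DEFAULT], true /* wait */).unwrap();
// An empty slice falls back to flushing every column family,
// matching the behavior of the old flush_cfs(wait) signature.
db.flush_cfs(&[], true /* wait */).unwrap();
```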
