*: make gc not write db directly
We rely on non-concurrent memtable writes for dynamic regions to achieve the
best performance. This PR makes sure the compaction filter's writes are
redirected to the apply thread when dynamic regions are enabled.

The solution may miss data if TiKV crashes before the writes are flushed to
disk. Note that even for v1 it is possible to leave garbage if writes to
RocksDB fail. We need to scan the default CFs and check for orphan versions.
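
As context for the diff below, here is a minimal, self-contained Rust sketch of the redirection idea. It is not TiKV's actual API: `ApplyScheduler`, `redirect_deletes`, and the length-prefixed payload are illustrative stand-ins for the real `PeerMsg::UnsafeWrite` / `ApplyTask::UnsafeWrite` path, which encodes the deletes with `SimpleWriteReqEncoder` and applies them with the sentinel index `u64::MAX`.

use std::convert::TryInto;
use std::sync::mpsc::{channel, Sender};

// Hypothetical stand-in for `ApplyTask::UnsafeWrite(Box<[u8]>)`.
enum ApplyTask {
    UnsafeWrite(Box<[u8]>),
}

// Hypothetical stand-in for the apply scheduler owned by a peer. In TiKV the
// message goes through the peer FSM; here it is just an mpsc channel.
struct ApplyScheduler {
    tx: Sender<ApplyTask>,
}

impl ApplyScheduler {
    fn send(&self, task: ApplyTask) {
        self.tx.send(task).expect("apply thread is gone");
    }
}

// Compaction-filter side: instead of deleting from RocksDB directly, encode
// the filtered default-CF keys into an opaque payload and redirect it.
fn redirect_deletes(keys: Vec<Vec<u8>>, scheduler: &ApplyScheduler) {
    let mut payload = Vec::new();
    for key in keys {
        payload.extend_from_slice(&(key.len() as u32).to_le_bytes());
        payload.extend_from_slice(&key);
    }
    scheduler.send(ApplyTask::UnsafeWrite(payload.into_boxed_slice()));
}

fn main() {
    let (tx, rx) = channel();
    let scheduler = ApplyScheduler { tx };
    redirect_deletes(vec![b"key1".to_vec(), b"key2".to_vec()], &scheduler);
    drop(scheduler);

    // Apply-thread side: decode the payload and perform the deletes. The real
    // `apply_unsafe_write` skips `modifications` bookkeeping for these writes.
    while let Ok(ApplyTask::UnsafeWrite(data)) = rx.recv() {
        let mut rest = &data[..];
        while rest.len() >= 4 {
            let len = u32::from_le_bytes(rest[..4].try_into().unwrap()) as usize;
            println!("apply thread deletes key {:?}", &rest[4..4 + len]);
            rest = &rest[4 + len..];
        }
    }
}

The handoff keeps all memtable writes on the apply thread, so the compaction filter no longer issues concurrent RocksDB writes when dynamic regions are enabled; the trade-off, as noted above, is that unflushed deletes can be lost on a crash and must be reclaimed later by scanning for orphan versions.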

Signed-off-by: Jay Lee <BusyJayLee@gmail.com>
BusyJay committed Dec 22, 2022
1 parent a499caf commit b673f5a
Showing 11 changed files with 277 additions and 77 deletions.
1 change: 1 addition & 0 deletions components/raftstore-v2/src/fsm/apply.rs
@@ -113,6 +113,7 @@ impl<EK: KvEngine, R: ApplyResReporter> ApplyFsm<EK, R> {
// TODO: flush by buffer size.
ApplyTask::CommittedEntries(ce) => self.apply.apply_committed_entries(ce).await,
ApplyTask::Snapshot(snap_task) => self.apply.schedule_gen_snapshot(snap_task),
ApplyTask::UnsafeWrite(raw_write) => self.apply.apply_unsafe_write(raw_write),
}

// TODO: yield after some time.
6 changes: 6 additions & 0 deletions components/raftstore-v2/src/fsm/peer.rs
@@ -243,6 +243,12 @@ impl<'a, EK: KvEngine, ER: RaftEngine, T: Transport> PeerFsmDelegate<'a, EK, ER,
write.ch,
);
}
PeerMsg::UnsafeWrite(write) => {
self.on_receive_command(write.send_time);
self.fsm
.peer_mut()
.on_unsafe_write(self.store_ctx, write.data);
}
PeerMsg::Tick(tick) => self.on_tick(tick),
PeerMsg::ApplyRes(res) => self.fsm.peer.on_apply_res(self.store_ctx, res),
PeerMsg::SplitInit(msg) => self.fsm.peer.on_split_init(self.store_ctx, msg),
26 changes: 26 additions & 0 deletions components/raftstore-v2/src/operation/command/mod.rs
@@ -319,6 +319,32 @@ impl<EK: KvEngine, R> Apply<EK, R> {
}

impl<EK: KvEngine, R: ApplyResReporter> Apply<EK, R> {
pub fn apply_unsafe_write(&mut self, data: Box<[u8]>) {
let decoder = match SimpleWriteReqDecoder::new(&self.logger, &data, u64::MAX, u64::MAX) {
Ok(decoder) => decoder,
Err(req) => unreachable!("unexpected request: {:?}", req),
};
for req in decoder {
match req {
SimpleWrite::Put(put) => {
let _ = self.apply_put(put.cf, u64::MAX, put.key, put.value);
}
SimpleWrite::Delete(delete) => {
let _ = self.apply_delete(delete.cf, u64::MAX, delete.key);
}
SimpleWrite::DeleteRange(dr) => {
let _ = self.apply_delete_range(
dr.cf,
u64::MAX,
dr.start_key,
dr.end_key,
dr.notify_only,
);
}
}
}
}

#[inline]
pub async fn apply_committed_entries(&mut self, ce: CommittedEntries) {
fail::fail_point!("APPLY_COMMITTED_ENTRIES");
33 changes: 30 additions & 3 deletions components/raftstore-v2/src/operation/command/write/mod.rs
@@ -16,7 +16,7 @@ use crate::{
batch::StoreContext,
operation::cf_offset,
raft::{Apply, Peer},
router::CmdResChannel,
router::{ApplyTask, CmdResChannel},
};

mod simple_write;
@@ -71,6 +71,29 @@ impl<EK: KvEngine, ER: RaftEngine> Peer<EK, ER> {
self.simple_write_encoder_mut().replace(encoder);
}

#[inline]
pub fn on_unsafe_write<T>(
&mut self,
ctx: &mut StoreContext<EK, ER, T>,
data: SimpleWriteBinary,
) {
if !self.serving() {
return;
}
let bin = SimpleWriteReqEncoder::new(
Box::<RaftRequestHeader>::default(),
data,
ctx.cfg.raft_entry_max_size.0 as usize,
false,
)
.encode()
.0
.into_boxed_slice();
if let Some(scheduler) = self.apply_scheduler() {
scheduler.send(ApplyTask::UnsafeWrite(bin));
}
}

pub fn propose_pending_writes<T>(&mut self, ctx: &mut StoreContext<EK, ER, T>) {
if let Some(encoder) = self.simple_write_encoder_mut().take() {
let call_proposed_on_success = if encoder.notify_proposed() {
@@ -140,7 +163,9 @@ impl<EK: KvEngine, R> Apply<EK, R> {
"aborted by failpoint".into()
)));
self.metrics.size_diff_hint += (self.key_buffer.len() + value.len()) as i64;
self.modifications_mut()[off] = index;
if index != u64::MAX {
self.modifications_mut()[off] = index;
}
Ok(())
}

@@ -171,7 +196,9 @@ impl<EK: KvEngine, R> Apply<EK, R> {
);
});
self.metrics.size_diff_hint -= self.key_buffer.len() as i64;
self.modifications_mut()[off] = index;
if index != u64::MAX {
self.modifications_mut()[off] = index;
}
Ok(())
}

2 changes: 2 additions & 0 deletions components/raftstore-v2/src/router/internal_message.rs
@@ -8,6 +8,8 @@ use crate::operation::{AdminCmdResult, CommittedEntries, DataTrace, GenSnapTask}
pub enum ApplyTask {
CommittedEntries(CommittedEntries),
Snapshot(GenSnapTask),
/// Writes that don't care about consistency.
UnsafeWrite(Box<[u8]>),
}

#[derive(Debug, Default)]
14 changes: 14 additions & 0 deletions components/raftstore-v2/src/router/message.rs
@@ -119,6 +119,12 @@ pub struct SimpleWrite {
pub ch: CmdResChannel,
}

#[derive(Debug)]
pub struct UnsafeWrite {
pub send_time: Instant,
pub data: SimpleWriteBinary,
}

/// Message that can be sent to a peer.
#[derive(Debug)]
pub enum PeerMsg {
@@ -132,6 +138,7 @@ pub enum PeerMsg {
/// Command changes the internal states. It will be transformed into logs and
/// applied on all replicas.
SimpleWrite(SimpleWrite),
UnsafeWrite(UnsafeWrite),
/// Command that contains admin requests.
AdminCommand(RaftRequest<CmdResChannel>),
/// Tick is a periodic task. If target peer doesn't exist there is a
@@ -206,6 +213,13 @@ impl PeerMsg {
)
}

pub fn unsafe_write(data: SimpleWriteBinary) -> Self {
PeerMsg::UnsafeWrite(UnsafeWrite {
send_time: Instant::now(),
data,
})
}

pub fn request_split(
epoch: metapb::RegionEpoch,
split_keys: Vec<Vec<u8>>,
128 changes: 97 additions & 31 deletions src/server/gc_worker/compaction_filter.rs
@@ -20,16 +20,15 @@ use engine_rocks::{
},
RocksEngine, RocksMvccProperties, RocksWriteBatchVec,
};
use engine_traits::{
KvEngine, MiscExt, Mutable, MvccProperties, WriteBatch, WriteBatchExt, WriteOptions,
};
use engine_traits::{KvEngine, MiscExt, MvccProperties, WriteBatch, WriteOptions};
use file_system::{IoType, WithIoType};
use pd_client::{Feature, FeatureGate};
use prometheus::{local::*, *};
use raftstore::coprocessor::RegionInfoProvider;
use tikv_util::{
time::Instant,
worker::{ScheduleError, Scheduler},
Either,
};
use txn_types::{Key, TimeStamp, WriteRef, WriteType};

@@ -51,7 +50,7 @@ const COMPACTION_FILTER_GC_FEATURE: Feature = Feature::require(5, 0, 0);
// these fields are not available when constructing
// `WriteCompactionFilterFactory`.
pub struct GcContext {
pub(crate) db: RocksEngine,
pub(crate) db: Option<RocksEngine>,
pub(crate) store_id: u64,
pub(crate) safe_point: Arc<AtomicU64>,
pub(crate) cfg_tracker: GcWorkerConfigManager,
@@ -154,7 +153,7 @@
);
}

impl<EK> CompactionFilterInitializer<EK> for EK
impl<EK> CompactionFilterInitializer<EK> for Option<EK>
where
EK: KvEngine,
{
@@ -171,7 +170,7 @@
}
}

impl CompactionFilterInitializer<RocksEngine> for RocksEngine {
impl CompactionFilterInitializer<RocksEngine> for Option<RocksEngine> {
fn init_compaction_filter(
&self,
store_id: u64,
@@ -237,7 +236,10 @@ impl CompactionFilterFactory for WriteCompactionFilterFactory {
"ratio_threshold" => ratio_threshold,
);

if db.is_stalled_or_stopped() {
if db
.as_ref()
.map_or(false, RocksEngine::is_stalled_or_stopped)
{
debug!("skip gc in compaction filter because the DB is stalled");
return std::ptr::null_mut();
}
@@ -277,13 +279,60 @@
}
}

pub struct DeleteBatch<B> {
pub batch: Either<B, Vec<Key>>,
}

impl<B: WriteBatch> DeleteBatch<B> {
fn new<EK>(db: &Option<EK>) -> Self
where
EK: KvEngine<WriteBatch = B>,
{
Self {
batch: match db {
Some(db) => Either::Left(db.write_batch_with_cap(DEFAULT_DELETE_BATCH_SIZE)),
None => Either::Right(Vec::with_capacity(64)),
},
}
}

// `key` has prefix `DATA_KEY`.
fn delete(&mut self, key: &[u8], ts: TimeStamp) -> Result<(), String> {
match &mut self.batch {
Either::Left(batch) => {
let key = Key::from_encoded_slice(key).append_ts(ts);
batch.delete(key.as_encoded())?;
}
Either::Right(keys) => {
let key = Key::from_encoded_slice(keys::origin_key(key)).append_ts(ts);
keys.push(key);
}
}
Ok(())
}

fn is_empty(&self) -> bool {
match &self.batch {
Either::Left(batch) => batch.is_empty(),
Either::Right(keys) => keys.is_empty(),
}
}

pub fn count(&self) -> usize {
match &self.batch {
Either::Left(batch) => batch.count(),
Either::Right(keys) => keys.len(),
}
}
}

struct WriteCompactionFilter {
safe_point: u64,
engine: RocksEngine,
engine: Option<RocksEngine>,
is_bottommost_level: bool,
encountered_errors: bool,

write_batch: RocksWriteBatchVec,
write_batch: DeleteBatch<RocksWriteBatchVec>,
gc_scheduler: Scheduler<GcTask<RocksEngine>>,
// A key batch which is going to be sent to the GC worker.
mvcc_deletions: Vec<Key>,
@@ -312,7 +361,7 @@ struct WriteCompactionFilter {

impl WriteCompactionFilter {
fn new(
engine: RocksEngine,
engine: Option<RocksEngine>,
safe_point: u64,
context: &CompactionFilterContext,
gc_scheduler: Scheduler<GcTask<RocksEngine>>,
@@ -322,7 +371,7 @@ impl WriteCompactionFilter {
assert!(safe_point > 0);
debug!("gc in compaction filter"; "safe_point" => safe_point);

let write_batch = engine.write_batch_with_cap(DEFAULT_DELETE_BATCH_SIZE);
let write_batch = DeleteBatch::new(&engine);
WriteCompactionFilter {
safe_point,
engine,
@@ -469,9 +518,8 @@ impl WriteCompactionFilter {

fn handle_filtered_write(&mut self, write: WriteRef<'_>) -> Result<(), String> {
if write.short_value.is_none() && write.write_type == WriteType::Put {
let prefix = Key::from_encoded_slice(&self.mvcc_key_prefix);
let def_key = prefix.append_ts(write.start_ts).into_encoded();
self.write_batch.delete(&def_key)?;
self.write_batch
.delete(&self.mvcc_key_prefix, write.start_ts)?;
}
Ok(())
}
@@ -499,24 +547,40 @@ impl WriteCompactionFilter {
}

if self.write_batch.count() > DEFAULT_DELETE_BATCH_COUNT || force {
let mut wopts = WriteOptions::default();
wopts.set_no_slowdown(true);
if let Err(e) = do_flush(&mut self.write_batch, &wopts) {
let wb = mem::replace(
&mut self.write_batch,
self.engine.write_batch_with_cap(DEFAULT_DELETE_BATCH_SIZE),
);
self.orphan_versions += wb.count();
let id = ORPHAN_VERSIONS_ID.fetch_add(1, Ordering::Relaxed);
let task = GcTask::OrphanVersions { wb, id };
let err = match &mut self.write_batch.batch {
Either::Left(wb) => {
let mut wopts = WriteOptions::default();
wopts.set_no_slowdown(true);
match do_flush(wb, &wopts) {
Ok(()) => {
wb.clear();
return Ok(());
}
Err(e) => Some(e),
}
}
Either::Right(_) => None,
};

let wb = mem::replace(&mut self.write_batch, DeleteBatch::new(&self.engine));
self.orphan_versions += wb.count();
let id = ORPHAN_VERSIONS_ID.fetch_add(1, Ordering::Relaxed);
let region_info_provider = self.regions_provider.1.clone();
let task = GcTask::OrphanVersions {
wb,
id,
region_info_provider,
};
if let Some(e) = &err {
warn!(
"compaction filter flush fail, dispatch to gc worker";
"task" => %task, "err" => ?e,
"compaction filter flush fail, dispatch to gc worker";
"task" => %task, "err" => ?e,
);
self.schedule_gc_task(task, true);
return Err(e);
}
self.write_batch.clear();
self.schedule_gc_task(task, true);
if let Some(err) = err {
return Err(err);
}
}
Ok(())
}
@@ -607,7 +671,9 @@ impl Drop for WriteCompactionFilter {
if let Err(e) = self.flush_pending_writes_if_need(true) {
error!("compaction filter flush writes fail"; "err" => ?e);
}
self.engine.sync_wal().unwrap();
if let Some(engine) = &self.engine {
engine.sync_wal().unwrap();
}

self.switch_key_metrics();
self.flush_metrics();
@@ -831,7 +897,7 @@ pub mod test_utils {

let mut gc_context_opt = GC_CONTEXT.lock().unwrap();
*gc_context_opt = Some(GcContext {
db: engine.clone(),
db: Some(engine.clone()),
store_id: 1,
safe_point,
cfg_tracker,