*: make gc not write db directly #13982

Merged · 2 commits · Dec 23, 2022
Changes from all commits
1 change: 1 addition & 0 deletions components/raftstore-v2/src/fsm/apply.rs
@@ -113,6 +113,7 @@ impl<EK: KvEngine, R: ApplyResReporter> ApplyFsm<EK, R> {
// TODO: flush by buffer size.
ApplyTask::CommittedEntries(ce) => self.apply.apply_committed_entries(ce).await,
ApplyTask::Snapshot(snap_task) => self.apply.schedule_gen_snapshot(snap_task),
ApplyTask::UnsafeWrite(raw_write) => self.apply.apply_unsafe_write(raw_write),
}

// TODO: yield after some time.
6 changes: 6 additions & 0 deletions components/raftstore-v2/src/fsm/peer.rs
@@ -243,6 +243,12 @@ impl<'a, EK: KvEngine, ER: RaftEngine, T: Transport> PeerFsmDelegate<'a, EK, ER,
write.ch,
);
}
PeerMsg::UnsafeWrite(write) => {
self.on_receive_command(write.send_time);
self.fsm
.peer_mut()
.on_unsafe_write(self.store_ctx, write.data);
}
PeerMsg::Tick(tick) => self.on_tick(tick),
PeerMsg::ApplyRes(res) => self.fsm.peer.on_apply_res(self.store_ctx, res),
PeerMsg::SplitInit(msg) => self.fsm.peer.on_split_init(self.store_ctx, msg),
26 changes: 26 additions & 0 deletions components/raftstore-v2/src/operation/command/mod.rs
@@ -319,6 +319,32 @@ impl<EK: KvEngine, R> Apply<EK, R> {
}

impl<EK: KvEngine, R: ApplyResReporter> Apply<EK, R> {
pub fn apply_unsafe_write(&mut self, data: Box<[u8]>) {
let decoder = match SimpleWriteReqDecoder::new(&self.logger, &data, u64::MAX, u64::MAX) {
Ok(decoder) => decoder,
Err(req) => unreachable!("unexpected request: {:?}", req),
};
for req in decoder {
match req {
SimpleWrite::Put(put) => {
let _ = self.apply_put(put.cf, u64::MAX, put.key, put.value);
}
SimpleWrite::Delete(delete) => {
let _ = self.apply_delete(delete.cf, u64::MAX, delete.key);
}
SimpleWrite::DeleteRange(dr) => {
let _ = self.apply_delete_range(
dr.cf,
u64::MAX,
dr.start_key,
dr.end_key,
dr.notify_only,
);
}
}
}
}

#[inline]
pub async fn apply_committed_entries(&mut self, ce: CommittedEntries) {
fail::fail_point!("APPLY_COMMITTED_ENTRIES");
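The new `apply_unsafe_write` above decodes a payload that was encoded in the same process and replays it with `u64::MAX` in place of a log index, treating any decode error as `unreachable!`. Below is a minimal standalone sketch of that decode-and-apply shape; the toy wire format, `Applier`, and `NO_INDEX` are illustrative stand-ins, not TiKV's `SimpleWriteReqDecoder` API.

```rust
// Standalone sketch of the decode-and-apply shape of `apply_unsafe_write`.
// The wire format, `Applier`, and `NO_INDEX` are illustrative, not TiKV's.

/// Sentinel meaning "this write is not backed by a raft log entry".
const NO_INDEX: u64 = u64::MAX;

struct Applier {
    applied: Vec<(Vec<u8>, Vec<u8>, u64)>, // (key, value, index)
}

impl Applier {
    fn apply_put(&mut self, key: &[u8], value: &[u8], index: u64) {
        self.applied.push((key.to_vec(), value.to_vec(), index));
    }

    fn apply_unsafe_write(&mut self, data: &[u8]) {
        // The payload was produced by the local encoder, so a decode failure is
        // a logic error, mirroring the `unreachable!` in the hunk above.
        let ops = decode(data).unwrap_or_else(|e| unreachable!("unexpected request: {e}"));
        for (key, value) in ops {
            self.apply_put(&key, &value, NO_INDEX);
        }
    }
}

/// Toy decoder: one `key=value` pair per line.
fn decode(data: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>, String> {
    let text = std::str::from_utf8(data).map_err(|e| e.to_string())?;
    text.lines()
        .map(|line| {
            let (k, v) = line.split_once('=').ok_or_else(|| format!("malformed entry: {line}"))?;
            Ok((k.as_bytes().to_vec(), v.as_bytes().to_vec()))
        })
        .collect()
}

fn main() {
    let mut applier = Applier { applied: Vec::new() };
    applier.apply_unsafe_write(b"k1=v1\nk2=v2");
    assert!(applier.applied.iter().all(|(_, _, index)| *index == NO_INDEX));
}
```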
33 changes: 30 additions & 3 deletions components/raftstore-v2/src/operation/command/write/mod.rs
@@ -16,7 +16,7 @@ use crate::{
batch::StoreContext,
operation::cf_offset,
raft::{Apply, Peer},
router::CmdResChannel,
router::{ApplyTask, CmdResChannel},
};

mod simple_write;
@@ -71,6 +71,29 @@ impl<EK: KvEngine, ER: RaftEngine> Peer<EK, ER> {
self.simple_write_encoder_mut().replace(encoder);
}

#[inline]
pub fn on_unsafe_write<T>(
&mut self,
ctx: &mut StoreContext<EK, ER, T>,
data: SimpleWriteBinary,
) {
if !self.serving() {
return;
}
let bin = SimpleWriteReqEncoder::new(
Box::<RaftRequestHeader>::default(),
data,
ctx.cfg.raft_entry_max_size.0 as usize,
false,
)
.encode()
.0
.into_boxed_slice();
if let Some(scheduler) = self.apply_scheduler() {
scheduler.send(ApplyTask::UnsafeWrite(bin));
}
}

pub fn propose_pending_writes<T>(&mut self, ctx: &mut StoreContext<EK, ER, T>) {
if let Some(encoder) = self.simple_write_encoder_mut().take() {
let call_proposed_on_success = if encoder.notify_proposed() {
@@ -140,7 +163,9 @@ impl<EK: KvEngine, R> Apply<EK, R> {
"aborted by failpoint".into()
)));
self.metrics.size_diff_hint += (self.key_buffer.len() + value.len()) as i64;
self.modifications_mut()[off] = index;
if index != u64::MAX {
self.modifications_mut()[off] = index;
}
Ok(())
}

@@ -171,7 +196,9 @@ impl<EK: KvEngine, R> Apply<EK, R> {
);
});
self.metrics.size_diff_hint -= self.key_buffer.len() as i64;
self.modifications_mut()[off] = index;
if index != u64::MAX {
self.modifications_mut()[off] = index;
}
Ok(())
}

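The `if index != u64::MAX` guards added to `apply_put` and `apply_delete` are what make the shared apply path safe for unsafe writes: they mutate data without advancing the per-CF modification tracking that records apply progress. A minimal sketch of that sentinel pattern, with illustrative names (`Apply`, `NO_INDEX`, a fixed CF count) rather than TiKV's types:

```rust
// Sketch of the sentinel-index guard: writes tagged u64::MAX mutate data but
// never advance the per-CF modification tracking.

use std::collections::BTreeMap;

const NO_INDEX: u64 = u64::MAX;
const CF_COUNT: usize = 3;

#[derive(Default)]
struct Apply {
    data: BTreeMap<Vec<u8>, Vec<u8>>,
    // Highest raft log index that has modified each column family.
    modifications: [u64; CF_COUNT],
}

impl Apply {
    fn apply_put(&mut self, cf_off: usize, index: u64, key: &[u8], value: &[u8]) {
        self.data.insert(key.to_vec(), value.to_vec());
        // Unsafe writes carry no raft log index, so they must not be recorded
        // as apply progress.
        if index != NO_INDEX {
            self.modifications[cf_off] = index;
        }
    }
}

fn main() {
    let mut apply = Apply::default();
    apply.apply_put(0, 7, b"a", b"1"); // replicated write applied at log index 7
    apply.apply_put(0, NO_INDEX, b"b", b"2"); // unsafe write: data changes, progress does not
    assert_eq!(apply.modifications[0], 7);
    assert_eq!(apply.data.len(), 2);
}
```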
2 changes: 2 additions & 0 deletions components/raftstore-v2/src/router/internal_message.rs
@@ -8,6 +8,8 @@ use crate::operation::{AdminCmdResult, CommittedEntries, DataTrace, GenSnapTask}
pub enum ApplyTask {
CommittedEntries(CommittedEntries),
Snapshot(GenSnapTask),
/// Writes that don't care about consistency.
UnsafeWrite(Box<[u8]>),
}

#[derive(Debug, Default)]
14 changes: 14 additions & 0 deletions components/raftstore-v2/src/router/message.rs
@@ -119,6 +119,12 @@ pub struct SimpleWrite {
pub ch: CmdResChannel,
}

#[derive(Debug)]
pub struct UnsafeWrite {
pub send_time: Instant,
pub data: SimpleWriteBinary,
}

/// Message that can be sent to a peer.
#[derive(Debug)]
pub enum PeerMsg {
@@ -132,6 +138,7 @@ pub enum PeerMsg {
/// Command changes the internal states. It will be transformed into logs and
/// applied on all replicas.
SimpleWrite(SimpleWrite),
UnsafeWrite(UnsafeWrite),
/// Command that contains admin requests.
AdminCommand(RaftRequest<CmdResChannel>),
/// Tick is periodical task. If target peer doesn't exist there is a
@@ -206,6 +213,13 @@ impl PeerMsg {
)
}

pub fn unsafe_write(data: SimpleWriteBinary) -> Self {
PeerMsg::UnsafeWrite(UnsafeWrite {
send_time: Instant::now(),
data,
})
}

pub fn request_split(
epoch: metapb::RegionEpoch,
split_keys: Vec<Vec<u8>>,
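`UnsafeWrite` carries only a timestamp and the encoded payload; unlike `SimpleWrite` it has no `CmdResChannel`, so senders get no proposal or apply callback. The sketch below models that fire-and-forget distinction with a plain `mpsc` channel standing in for TiKV's router; all names are illustrative.

```rust
// Sketch of the fire-and-forget nature of `PeerMsg::UnsafeWrite`: it carries a
// timestamp and the encoded payload but no response channel, unlike SimpleWrite.
// std::sync::mpsc stands in for TiKV's router.

use std::sync::mpsc;
use std::time::Instant;

enum PeerMsg {
    // Replicated write: the sender can be notified once it is committed and applied.
    SimpleWrite {
        send_time: Instant,
        data: Vec<u8>,
        ch: mpsc::Sender<Result<(), String>>,
    },
    // Unsafe write: no channel, nobody waits for a result.
    UnsafeWrite { send_time: Instant, data: Vec<u8> },
}

impl PeerMsg {
    fn unsafe_write(data: Vec<u8>) -> Self {
        PeerMsg::UnsafeWrite { send_time: Instant::now(), data }
    }
}

fn main() {
    let (router, peer_inbox) = mpsc::channel::<PeerMsg>();
    let (done_tx, done_rx) = mpsc::channel();

    // A replicated write keeps a channel for the proposal/apply callback...
    router
        .send(PeerMsg::SimpleWrite {
            send_time: Instant::now(),
            data: b"put k v".to_vec(),
            ch: done_tx,
        })
        .unwrap();
    // ...while e.g. GC deletions are dispatched with no callback at all.
    router.send(PeerMsg::unsafe_write(b"encoded deletes".to_vec())).unwrap();

    for msg in peer_inbox.try_iter() {
        match msg {
            PeerMsg::SimpleWrite { send_time, data, ch } => {
                println!("replicated {} bytes queued {:?} ago", data.len(), send_time.elapsed());
                ch.send(Ok(())).unwrap();
            }
            PeerMsg::UnsafeWrite { send_time, data } => {
                println!("unsafe write of {} bytes queued {:?} ago", data.len(), send_time.elapsed());
            }
        }
    }
    assert!(done_rx.recv().unwrap().is_ok()); // only the replicated write produced a reply
}
```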
128 changes: 97 additions & 31 deletions src/server/gc_worker/compaction_filter.rs
@@ -20,16 +20,15 @@ use engine_rocks::{
},
RocksEngine, RocksMvccProperties, RocksWriteBatchVec,
};
use engine_traits::{
KvEngine, MiscExt, Mutable, MvccProperties, WriteBatch, WriteBatchExt, WriteOptions,
};
use engine_traits::{KvEngine, MiscExt, MvccProperties, WriteBatch, WriteOptions};
use file_system::{IoType, WithIoType};
use pd_client::{Feature, FeatureGate};
use prometheus::{local::*, *};
use raftstore::coprocessor::RegionInfoProvider;
use tikv_util::{
time::Instant,
worker::{ScheduleError, Scheduler},
Either,
};
use txn_types::{Key, TimeStamp, WriteRef, WriteType};

@@ -51,7 +50,7 @@ const COMPACTION_FILTER_GC_FEATURE: Feature = Feature::require(5, 0, 0);
// these fields are not available when constructing
// `WriteCompactionFilterFactory`.
pub struct GcContext {
pub(crate) db: RocksEngine,
pub(crate) db: Option<RocksEngine>,
pub(crate) store_id: u64,
pub(crate) safe_point: Arc<AtomicU64>,
pub(crate) cfg_tracker: GcWorkerConfigManager,
@@ -154,7 +153,7 @@
);
}

impl<EK> CompactionFilterInitializer<EK> for EK
impl<EK> CompactionFilterInitializer<EK> for Option<EK>
where
EK: KvEngine,
{
@@ -171,7 +170,7 @@
}
}

impl CompactionFilterInitializer<RocksEngine> for RocksEngine {
impl CompactionFilterInitializer<RocksEngine> for Option<RocksEngine> {
fn init_compaction_filter(
&self,
store_id: u64,
@@ -237,7 +236,10 @@ impl CompactionFilterFactory for WriteCompactionFilterFactory {
"ratio_threshold" => ratio_threshold,
);

if db.is_stalled_or_stopped() {
if db
.as_ref()
.map_or(false, RocksEngine::is_stalled_or_stopped)
{
debug!("skip gc in compaction filter because the DB is stalled");
return std::ptr::null_mut();
}
@@ -277,13 +279,60 @@
}
}

pub struct DeleteBatch<B> {
pub batch: Either<B, Vec<Key>>,
}

impl<B: WriteBatch> DeleteBatch<B> {
fn new<EK>(db: &Option<EK>) -> Self
where
EK: KvEngine<WriteBatch = B>,
{
Self {
batch: match db {
Some(db) => Either::Left(db.write_batch_with_cap(DEFAULT_DELETE_BATCH_SIZE)),
None => Either::Right(Vec::with_capacity(64)),
},
}
}

// `key` has prefix `DATA_KEY`.
fn delete(&mut self, key: &[u8], ts: TimeStamp) -> Result<(), String> {
match &mut self.batch {
Either::Left(batch) => {
let key = Key::from_encoded_slice(key).append_ts(ts);
batch.delete(key.as_encoded())?;
}
Either::Right(keys) => {
let key = Key::from_encoded_slice(keys::origin_key(key)).append_ts(ts);
keys.push(key);
}
}
Ok(())
}

fn is_empty(&self) -> bool {
match &self.batch {
Either::Left(batch) => batch.is_empty(),
Either::Right(keys) => keys.is_empty(),
}
}

pub fn count(&self) -> usize {
match &self.batch {
Either::Left(batch) => batch.count(),
Either::Right(keys) => keys.len(),
}
}
}

struct WriteCompactionFilter {
safe_point: u64,
engine: RocksEngine,
engine: Option<RocksEngine>,
is_bottommost_level: bool,
encountered_errors: bool,

write_batch: RocksWriteBatchVec,
write_batch: DeleteBatch<RocksWriteBatchVec>,
gc_scheduler: Scheduler<GcTask<RocksEngine>>,
// A key batch which is going to be sent to the GC worker.
mvcc_deletions: Vec<Key>,
@@ -312,7 +361,7 @@ struct WriteCompactionFilter {

impl WriteCompactionFilter {
fn new(
engine: RocksEngine,
engine: Option<RocksEngine>,
safe_point: u64,
context: &CompactionFilterContext,
gc_scheduler: Scheduler<GcTask<RocksEngine>>,
@@ -322,7 +371,7 @@ impl WriteCompactionFilter {
assert!(safe_point > 0);
debug!("gc in compaction filter"; "safe_point" => safe_point);

let write_batch = engine.write_batch_with_cap(DEFAULT_DELETE_BATCH_SIZE);
let write_batch = DeleteBatch::new(&engine);
WriteCompactionFilter {
safe_point,
engine,
@@ -469,9 +518,8 @@ impl WriteCompactionFilter {

fn handle_filtered_write(&mut self, write: WriteRef<'_>) -> Result<(), String> {
if write.short_value.is_none() && write.write_type == WriteType::Put {
let prefix = Key::from_encoded_slice(&self.mvcc_key_prefix);
let def_key = prefix.append_ts(write.start_ts).into_encoded();
self.write_batch.delete(&def_key)?;
self.write_batch
.delete(&self.mvcc_key_prefix, write.start_ts)?;
}
Ok(())
}
@@ -499,24 +547,40 @@ impl WriteCompactionFilter {
}

if self.write_batch.count() > DEFAULT_DELETE_BATCH_COUNT || force {
let mut wopts = WriteOptions::default();
wopts.set_no_slowdown(true);
if let Err(e) = do_flush(&mut self.write_batch, &wopts) {
let wb = mem::replace(
&mut self.write_batch,
self.engine.write_batch_with_cap(DEFAULT_DELETE_BATCH_SIZE),
);
self.orphan_versions += wb.count();
let id = ORPHAN_VERSIONS_ID.fetch_add(1, Ordering::Relaxed);
let task = GcTask::OrphanVersions { wb, id };
let err = match &mut self.write_batch.batch {
Either::Left(wb) => {
let mut wopts = WriteOptions::default();
wopts.set_no_slowdown(true);
match do_flush(wb, &wopts) {
Ok(()) => {
wb.clear();
return Ok(());
}
Err(e) => Some(e),
}
}
Either::Right(_) => None,
};

let wb = mem::replace(&mut self.write_batch, DeleteBatch::new(&self.engine));
self.orphan_versions += wb.count();
let id = ORPHAN_VERSIONS_ID.fetch_add(1, Ordering::Relaxed);
let region_info_provider = self.regions_provider.1.clone();
let task = GcTask::OrphanVersions {
wb,
id,
region_info_provider,
};
if let Some(e) = &err {
warn!(
"compaction filter flush fail, dispatch to gc worker";
"task" => %task, "err" => ?e,
"compaction filter flush fail, dispatch to gc worker";
"task" => %task, "err" => ?e,
);
self.schedule_gc_task(task, true);
return Err(e);
}
self.write_batch.clear();
self.schedule_gc_task(task, true);
if let Some(err) = err {
return Err(err);
}
}
Ok(())
}
@@ -607,7 +671,9 @@ impl Drop for WriteCompactionFilter {
if let Err(e) = self.flush_pending_writes_if_need(true) {
error!("compaction filter flush writes fail"; "err" => ?e);
}
self.engine.sync_wal().unwrap();
if let Some(engine) = &self.engine {
engine.sync_wal().unwrap();
}

self.switch_key_metrics();
self.flush_metrics();
@@ -831,7 +897,7 @@ pub mod test_utils {

let mut gc_context_opt = GC_CONTEXT.lock().unwrap();
*gc_context_opt = Some(GcContext {
db: engine.clone(),
db: Some(engine.clone()),
store_id: 1,
safe_point,
cfg_tracker,
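Taken together, `DeleteBatch` is the heart of the change: when a local `RocksEngine` is available (raftstore v1), filtered default-CF versions still go into a write batch, but when it is absent (raftstore v2) the keys are only collected and shipped to the GC worker via `GcTask::OrphanVersions`, so the compaction filter never writes the DB directly. A condensed, self-contained sketch of that `Either`-based fallback follows; `Engine`, `WriteBatch`, and `Either` here are stand-ins, not the engine_traits API.

```rust
// Condensed sketch of DeleteBatch: buffer deletions in a write batch when a
// local engine exists, otherwise collect plain keys for the GC worker.

enum Either<L, R> {
    Left(L),
    Right(R),
}

#[derive(Default)]
struct WriteBatch {
    deletes: Vec<Vec<u8>>, // stand-in for a RocksDB write batch
}

struct Engine; // stand-in for RocksEngine

impl Engine {
    fn write_batch(&self) -> WriteBatch {
        WriteBatch::default()
    }
}

struct DeleteBatch {
    batch: Either<WriteBatch, Vec<Vec<u8>>>,
}

impl DeleteBatch {
    fn new(db: &Option<Engine>) -> Self {
        let batch = match db {
            Some(db) => Either::Left(db.write_batch()), // v1: delete through the local engine
            None => Either::Right(Vec::new()),          // v2: hand the keys to the GC worker
        };
        Self { batch }
    }

    fn delete(&mut self, key: &[u8], start_ts: u64) {
        // The real code appends the start_ts to form the default-CF key.
        let mut versioned = key.to_vec();
        versioned.extend_from_slice(&start_ts.to_be_bytes());
        match &mut self.batch {
            Either::Left(wb) => wb.deletes.push(versioned),
            Either::Right(keys) => keys.push(versioned),
        }
    }

    fn count(&self) -> usize {
        match &self.batch {
            Either::Left(wb) => wb.deletes.len(),
            Either::Right(keys) => keys.len(),
        }
    }
}

fn main() {
    // v1 path: a local engine exists, so deletions go into its write batch.
    let mut with_db = DeleteBatch::new(&Some(Engine));
    with_db.delete(b"zk1", 5);

    // v2 path: no engine; keys are collected for GcTask::OrphanVersions instead.
    let mut without_db = DeleteBatch::new(&None);
    without_db.delete(b"zk1", 5);

    assert_eq!(with_db.count() + without_db.count(), 2);
}
```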