Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

raftstore-v2: fix delete range #15019

Merged
merged 6 commits into from
Jun 28, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 5 additions & 2 deletions components/engine_panic/src/misc.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0.

use engine_traits::{DeleteStrategy, MiscExt, Range, RangeStats, Result, StatisticsReporter};
use engine_traits::{
DeleteStrategy, MiscExt, Range, RangeStats, Result, StatisticsReporter, WriteOptions,
};

use crate::engine::PanicEngine;

Expand Down Expand Up @@ -41,10 +43,11 @@ impl MiscExt for PanicEngine {

fn delete_ranges_cf(
&self,
wopts: &WriteOptions,
cf: &str,
strategy: DeleteStrategy,
ranges: &[Range<'_>],
) -> Result<()> {
) -> Result<bool> {
panic!()
}

Expand Down
77 changes: 52 additions & 25 deletions components/engine_rocks/src/misc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use engine_traits::{
CfNamesExt, DeleteStrategy, ImportExt, IterOptions, Iterable, Iterator, MiscExt, Mutable,
Range, RangeStats, Result, SstWriter, SstWriterBuilder, WriteBatch, WriteBatchExt,
WriteOptions,
};
use rocksdb::{FlushOptions, Range as RocksRange};
use tikv_util::{box_try, keybuilder::KeyBuilder};
Expand All @@ -23,10 +24,12 @@ impl RocksEngine {
// of region will never be larger than max-region-size.
fn delete_all_in_range_cf_by_ingest(
&self,
wopts: &WriteOptions,
cf: &str,
sst_path: String,
ranges: &[Range<'_>],
) -> Result<()> {
) -> Result<bool> {
let mut written = false;
let mut ranges = ranges.to_owned();
ranges.sort_by(|a, b| a.start_key.cmp(b.start_key));

Expand All @@ -39,7 +42,8 @@ impl RocksEngine {
.as_ref()
.map_or(false, |key| key.as_slice() > r.start_key)
{
self.delete_all_in_range_cf_by_key(cf, &r)?;
self.delete_all_in_range_cf_by_key(wopts, cf, &r)?;
written = true;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

written |= self.delete_all_in_range_cf_by_key(wopts, cf, &r)?;

continue;
}
last_end_key = Some(r.end_key.to_owned());
Expand Down Expand Up @@ -84,20 +88,26 @@ impl RocksEngine {
} else {
let mut wb = self.write_batch();
for key in data.iter() {
wb.delete_cf(cf, key)?;
if wb.count() >= Self::WRITE_BATCH_MAX_KEYS {
wb.write()?;
wb.write_opt(wopts)?;
wb.clear();
}
wb.delete_cf(cf, key)?;
}
if wb.count() > 0 {
wb.write()?;
wb.write_opt(wopts)?;
written = true;
}
}
Ok(())
Ok(written)
}

fn delete_all_in_range_cf_by_key(&self, cf: &str, range: &Range<'_>) -> Result<()> {
fn delete_all_in_range_cf_by_key(
&self,
wopts: &WriteOptions,
cf: &str,
range: &Range<'_>,
) -> Result<bool> {
let start = KeyBuilder::from_slice(range.start_key, 0, 0);
let end = KeyBuilder::from_slice(range.end_key, 0, 0);
let mut opts = IterOptions::new(Some(start), Some(end), false);
Expand All @@ -110,18 +120,22 @@ impl RocksEngine {
let mut it_valid = it.seek(range.start_key)?;
let mut wb = self.write_batch();
while it_valid {
wb.delete_cf(cf, it.key())?;
if wb.count() >= Self::WRITE_BATCH_MAX_KEYS {
wb.write()?;
wb.write_opt(wopts)?;
wb.clear();
}
wb.delete_cf(cf, it.key())?;
it_valid = it.next()?;
}
if wb.count() > 0 {
wb.write()?;
wb.write_opt(wopts)?;
if !wopts.disable_wal() {
self.sync_wal()?;
}
Ok(true)
} else {
Ok(false)
}
self.sync_wal()?;
Ok(())
}
}

Expand Down Expand Up @@ -188,12 +202,14 @@ impl MiscExt for RocksEngine {

fn delete_ranges_cf(
&self,
wopts: &WriteOptions,
cf: &str,
strategy: DeleteStrategy,
ranges: &[Range<'_>],
) -> Result<()> {
) -> Result<bool> {
let mut written = false;
if ranges.is_empty() {
return Ok(());
return Ok(written);
}
match strategy {
DeleteStrategy::DeleteFiles => {
Expand All @@ -209,7 +225,7 @@ impl MiscExt for RocksEngine {
})
.collect();
if rocks_ranges.is_empty() {
return Ok(());
return Ok(written);
}
self.as_inner()
.delete_files_in_ranges_cf(handle, &rocks_ranges, false)
Expand All @@ -229,7 +245,7 @@ impl MiscExt for RocksEngine {
})
.collect();
if rocks_ranges.is_empty() {
return Ok(());
return Ok(written);
}
self.as_inner()
.delete_blob_files_in_ranges_cf(handle, &rocks_ranges, false)
Expand All @@ -241,18 +257,19 @@ impl MiscExt for RocksEngine {
for r in ranges.iter() {
wb.delete_range_cf(cf, r.start_key, r.end_key)?;
}
wb.write()?;
wb.write_opt(wopts)?;
written = true;
}
DeleteStrategy::DeleteByKey => {
for r in ranges {
self.delete_all_in_range_cf_by_key(cf, r)?;
written |= self.delete_all_in_range_cf_by_key(wopts, cf, r)?;
}
}
DeleteStrategy::DeleteByWriter { sst_path } => {
self.delete_all_in_range_cf_by_ingest(cf, sst_path, ranges)?;
written |= self.delete_all_in_range_cf_by_ingest(wopts, cf, sst_path, ranges)?;
}
}
Ok(())
Ok(written)
}

fn get_approximate_memtable_stats_cf(&self, cf: &str, range: &Range<'_>) -> Result<(u64, u64)> {
Expand Down Expand Up @@ -482,7 +499,8 @@ mod tests {
wb.write().unwrap();
check_data(&db, ALL_CFS, kvs.as_slice());

db.delete_ranges_cfs(strategy, ranges).unwrap();
db.delete_ranges_cfs(&WriteOptions::default(), strategy, ranges)
.unwrap();

let mut kvs_left: Vec<_> = kvs;
for r in ranges {
Expand Down Expand Up @@ -620,10 +638,18 @@ mod tests {
}
check_data(&db, ALL_CFS, kvs.as_slice());

db.delete_ranges_cfs(DeleteStrategy::DeleteFiles, &[Range::new(b"k2", b"k4")])
.unwrap();
db.delete_ranges_cfs(DeleteStrategy::DeleteBlobs, &[Range::new(b"k2", b"k4")])
.unwrap();
db.delete_ranges_cfs(
&WriteOptions::default(),
DeleteStrategy::DeleteFiles,
&[Range::new(b"k2", b"k4")],
)
.unwrap();
db.delete_ranges_cfs(
&WriteOptions::default(),
DeleteStrategy::DeleteBlobs,
&[Range::new(b"k2", b"k4")],
)
.unwrap();
check_data(&db, ALL_CFS, kvs_left.as_slice());
}

Expand Down Expand Up @@ -668,6 +694,7 @@ mod tests {

// Delete all in ["k2", "k4").
db.delete_ranges_cfs(
&WriteOptions::default(),
DeleteStrategy::DeleteByRange,
&[Range::new(b"kabcdefg2", b"kabcdefg4")],
)
Expand Down
22 changes: 16 additions & 6 deletions components/engine_traits/src/misc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
//! FIXME: Things here need to be moved elsewhere.

use crate::{
cf_names::CfNamesExt, errors::Result, flow_control_factors::FlowControlFactorsExt, range::Range,
cf_names::CfNamesExt, errors::Result, flow_control_factors::FlowControlFactorsExt,
range::Range, WriteBatchExt, WriteOptions,
};

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -64,7 +65,7 @@ pub struct RangeStats {
pub num_rows: u64,
}

pub trait MiscExt: CfNamesExt + FlowControlFactorsExt {
pub trait MiscExt: CfNamesExt + FlowControlFactorsExt + WriteBatchExt {
type StatisticsReporter: StatisticsReporter<Self>;

/// Flush all specified column families at once.
Expand All @@ -80,19 +81,28 @@ pub trait MiscExt: CfNamesExt + FlowControlFactorsExt {
age_threshold: Option<std::time::SystemTime>,
) -> Result<()>;

fn delete_ranges_cfs(&self, strategy: DeleteStrategy, ranges: &[Range<'_>]) -> Result<()> {
/// Returns whether there's data written through kv interface.
fn delete_ranges_cfs(
&self,
wopts: &WriteOptions,
strategy: DeleteStrategy,
ranges: &[Range<'_>],
) -> Result<bool> {
let mut written = false;
for cf in self.cf_names() {
self.delete_ranges_cf(cf, strategy.clone(), ranges)?;
written |= self.delete_ranges_cf(wopts, cf, strategy.clone(), ranges)?;
}
Ok(())
Ok(written)
}

/// Returns whether there's data written through kv interface.
fn delete_ranges_cf(
&self,
wopts: &WriteOptions,
cf: &str,
strategy: DeleteStrategy,
ranges: &[Range<'_>],
) -> Result<()>;
) -> Result<bool>;

/// Return the approximate number of records and size in the range of
/// memtables of the cf.
Expand Down
2 changes: 1 addition & 1 deletion components/engine_traits/src/tablet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub struct CachedTablet<EK> {
}

impl<EK> CachedTablet<EK> {
fn release(&mut self) {
pub fn release(&mut self) {
self.cache = None;
self.version = 0;
}
Expand Down
6 changes: 0 additions & 6 deletions components/raftstore-v2/src/operation/command/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -512,15 +512,13 @@ impl<EK: KvEngine, R: ApplyResReporter> Apply<EK, R> {
let _ = self.apply_delete(delete.cf, u64::MAX, delete.key);
}
SimpleWrite::DeleteRange(dr) => {
let use_delete_range = self.use_delete_range();
let _ = self
.apply_delete_range(
dr.cf,
u64::MAX,
dr.start_key,
dr.end_key,
dr.notify_only,
use_delete_range,
)
.await;
}
Expand Down Expand Up @@ -637,14 +635,12 @@ impl<EK: KvEngine, R: ApplyResReporter> Apply<EK, R> {
self.apply_delete(delete.cf, log_index, delete.key)?;
}
SimpleWrite::DeleteRange(dr) => {
let use_delete_range = self.use_delete_range();
self.apply_delete_range(
dr.cf,
log_index,
dr.start_key,
dr.end_key,
dr.notify_only,
use_delete_range,
)
.await?;
}
Expand Down Expand Up @@ -739,15 +735,13 @@ impl<EK: KvEngine, R: ApplyResReporter> Apply<EK, R> {
self.apply_delete(delete.get_cf(), log_index, delete.get_key())?;
}
CmdType::DeleteRange => {
let use_delete_range = self.use_delete_range();
let dr = r.get_delete_range();
self.apply_delete_range(
dr.get_cf(),
log_index,
dr.get_start_key(),
dr.get_end_key(),
dr.get_notify_only(),
use_delete_range,
)
.await?;
}
Expand Down
18 changes: 10 additions & 8 deletions components/raftstore-v2/src/operation/command/write/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,6 @@ impl<EK: KvEngine, R: ApplyResReporter> Apply<EK, R> {
start_key: &[u8],
end_key: &[u8],
notify_only: bool,
use_delete_range: bool,
) -> Result<()> {
PEER_WRITE_CMD_COUNTER.delete_range.inc();
let off = data_cf_offset(cf);
Expand Down Expand Up @@ -273,17 +272,16 @@ impl<EK: KvEngine, R: ApplyResReporter> Apply<EK, R> {
let start = Instant::now_coarse();
// Use delete_files_in_range to drop as many sst files as possible, this
// is a way to reclaim disk space quickly after drop a table/index.
if !notify_only {
let written = if !notify_only {
let (notify, wait) = oneshot::channel();
let delete_range = TabletTask::delete_range(
self.region_id(),
self.tablet().clone(),
name_to_cf(cf).unwrap(),
start_key.clone().into(),
end_key.clone().into(),
use_delete_range,
Box::new(move || {
notify.send(()).unwrap();
Box::new(move |written| {
notify.send(written).unwrap();
}),
);
if let Err(e) = self.tablet_scheduler().schedule_force(delete_range) {
Expand All @@ -295,19 +293,23 @@ impl<EK: KvEngine, R: ApplyResReporter> Apply<EK, R> {
);
}

let _ = wait.await;
}
wait.await.unwrap()
} else {
false
};

info!(
self.logger,
"execute delete range";
"range_start" => log_wrappers::Value::key(&start_key),
"range_end" => log_wrappers::Value::key(&end_key),
"notify_only" => notify_only,
"use_delete_range" => use_delete_range,
"duration" => ?start.saturating_elapsed(),
);

if index != u64::MAX && written {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When is it u64::MAX?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know, it's the old code, shared by all write requests.

self.modifications_mut()[off] = index;
}
// delete range is an unsafe operation and it cannot be rollbacked to replay, so
// we don't update modification index for this operation.

Expand Down
1 change: 1 addition & 0 deletions components/raftstore-v2/src/operation/ready/apply_trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,7 @@ impl ApplyTrace {

#[inline]
pub fn should_persist(&self) -> bool {
fail_point!("should_persist_apply_trace", |_| true);
self.try_persist
}
}
Expand Down