Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

raftstore-v2: implement delete range #14714

Merged
merged 4 commits into from
May 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions components/raftstore-v2/src/operation/command/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,7 @@ impl<EK: KvEngine, R: ApplyResReporter> Apply<EK, R> {
dr.start_key,
dr.end_key,
dr.notify_only,
self.use_delete_range(),
);
}
SimpleWrite::Ingest(_) => {
Expand Down Expand Up @@ -598,6 +599,7 @@ impl<EK: KvEngine, R: ApplyResReporter> Apply<EK, R> {
dr.start_key,
dr.end_key,
dr.notify_only,
self.use_delete_range(),
)?;
}
SimpleWrite::Ingest(ssts) => {
Expand Down Expand Up @@ -685,6 +687,7 @@ impl<EK: KvEngine, R: ApplyResReporter> Apply<EK, R> {
dr.get_start_key(),
dr.get_end_key(),
dr.get_notify_only(),
self.use_delete_range(),
)?;
}
_ => unimplemented!(),
Expand Down
101 changes: 93 additions & 8 deletions components/raftstore-v2/src/operation/command/write/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
// Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0.

use engine_traits::{data_cf_offset, KvEngine, Mutable, RaftEngine, CF_DEFAULT};
use engine_traits::{
data_cf_offset, DeleteStrategy, KvEngine, Mutable, RaftEngine, Range as EngineRange, ALL_CFS,
CF_DEFAULT,
};
use kvproto::raft_cmdpb::RaftRequestHeader;
use raftstore::{
store::{
Expand All @@ -12,7 +15,8 @@ use raftstore::{
},
Error, Result,
};
use tikv_util::slog_panic;
use slog::info;
use tikv_util::{box_err, slog_panic};

use crate::{
batch::StoreContext,
Expand Down Expand Up @@ -222,13 +226,94 @@ impl<EK: KvEngine, R: ApplyResReporter> Apply<EK, R> {
#[inline]
pub fn apply_delete_range(
&mut self,
_cf: &str,
_index: u64,
_start_key: &[u8],
_end_key: &[u8],
_notify_only: bool,
mut cf: &str,
index: u64,
start_key: &[u8],
end_key: &[u8],
notify_only: bool,
use_delete_range: bool,
) -> Result<()> {
// TODO: reuse the same delete as split/merge.
PEER_WRITE_CMD_COUNTER.delete_range.inc();
let off = data_cf_offset(cf);
if self.should_skip(off, index) {
return Ok(());
}
if !end_key.is_empty() && start_key >= end_key {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible that end_key is empty?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it is possible: an empty end_key means the range has no upper bound.

return Err(box_err!(
"invalid delete range command, start_key: {:?}, end_key: {:?}",
start_key,
end_key
));
}
// The region's key range has no data prefix, so we must check with the original (unprefixed) key.
util::check_key_in_region(start_key, self.region())?;
let end_key = keys::data_end_key(end_key);
let region_end_key = keys::data_end_key(self.region().get_end_key());
if end_key > region_end_key {
return Err(Error::KeyNotInRegion(
end_key.to_vec(),
self.region().clone(),
));
}

if cf.is_empty() {
cf = CF_DEFAULT;
}

if !ALL_CFS.iter().any(|x| *x == cf) {
return Err(box_err!("invalid delete range command, cf: {:?}", cf));
}

let start_key = keys::data_key(start_key);

info!(
self.logger,
"execute delete range";
"range_start" => log_wrappers::Value::key(&start_key),
"range_end" => log_wrappers::Value::key(&end_key),
"notify_only" => notify_only,
"use_delete_range" => use_delete_range,
);

// Use the DeleteFiles strategy first to drop as many SST files as possible;
// this is a way to reclaim disk space quickly after dropping a table/index.
if !notify_only {
let range = vec![EngineRange::new(&start_key, &end_key)];
let fail_f = |e: engine_traits::Error, strategy: DeleteStrategy| {
slog_panic!(
self.logger,
"failed to delete";
"strategy" => ?strategy,
"range_start" => log_wrappers::Value::key(&start_key),
"range_end" => log_wrappers::Value::key(&end_key),
"error" => ?e,
)
};
let tablet = self.tablet();
tablet
.delete_ranges_cf(cf, DeleteStrategy::DeleteFiles, &range)
.unwrap_or_else(|e| fail_f(e, DeleteStrategy::DeleteFiles));

let strategy = if use_delete_range {
DeleteStrategy::DeleteByRange
} else {
DeleteStrategy::DeleteByKey
};
// Delete all remaining keys.
tablet
.delete_ranges_cf(cf, strategy.clone(), &range)
.unwrap_or_else(move |e| fail_f(e, strategy));

// TODO: support Titan?
// tablet
// .delete_ranges_cf(cf, DeleteStrategy::DeleteBlobs, &range)
// .unwrap_or_else(move |e| fail_f(e,
// DeleteStrategy::DeleteBlobs));
}
if index != u64::MAX {
self.modifications_mut()[off] = index;
}

Ok(())
}
}
9 changes: 9 additions & 0 deletions components/raftstore-v2/src/raft/apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ pub struct Apply<EK: KvEngine, R> {

checkpoint_scheduler: Scheduler<checkpoint::Task>,

// Whether to use the delete range API instead of deleting one by one.
use_delete_range: bool,

pub(crate) metrics: ApplyMetrics,
pub(crate) logger: Logger,
pub(crate) buckets: Option<BucketStat>,
Expand Down Expand Up @@ -128,6 +131,7 @@ impl<EK: KvEngine, R> Apply<EK, R> {
buckets,
sst_importer,
checkpoint_scheduler,
use_delete_range: cfg.use_delete_range,
observe: Observe {
info: CmdObserveInfo::default(),
level: ObserveLevel::None,
Expand Down Expand Up @@ -318,4 +322,9 @@ impl<EK: KvEngine, R> Apply<EK, R> {
/// Returns a shared reference to the scheduler that accepts `checkpoint::Task`s.
pub fn checkpoint_scheduler(&self) -> &Scheduler<checkpoint::Task> {
&self.checkpoint_scheduler
}

/// Returns whether the delete range API should be used instead of deleting
/// keys one by one. The value is copied from `cfg.use_delete_range` when the
/// `Apply` is constructed.
#[inline]
pub fn use_delete_range(&self) -> bool {
self.use_delete_range
}
}
8 changes: 7 additions & 1 deletion components/test_raftstore-v2/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,13 @@ pub trait Simulator<EK: KvEngine> {
write_encoder.delete(delete.get_cf(), delete.get_key());
}
CmdType::DeleteRange => {
unimplemented!()
let delete_range = req.get_delete_range();
write_encoder.delete_range(
delete_range.get_cf(),
delete_range.get_start_key(),
delete_range.get_end_key(),
delete_range.get_notify_only(),
);
}
_ => unreachable!(),
}
Expand Down
37 changes: 35 additions & 2 deletions components/test_raftstore-v2/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ use std::{fmt::Write, sync::Arc, thread, time::Duration};
use encryption_export::{data_key_manager_from_config, DataKeyManager};
use engine_rocks::{RocksEngine, RocksStatistics};
use engine_test::raft::RaftTestEngine;
use engine_traits::{KvEngine, TabletRegistry, CF_DEFAULT};
use engine_traits::{CfName, KvEngine, TabletRegistry, CF_DEFAULT};
use file_system::IoRateLimiter;
use futures::Future;
use kvproto::{kvrpcpb::Context, metapb, raft_cmdpb::RaftCmdResponse};
use raftstore::Result;
use rand::RngCore;
use rand::{prelude::SliceRandom, RngCore};
use server::common::ConfiguredRaftEngine;
use tempfile::TempDir;
use test_raftstore::{new_get_cmd, new_put_cf_cmd, new_request, Config};
Expand Down Expand Up @@ -233,3 +233,36 @@ pub fn async_read_on_peer<T: Simulator<EK>, EK: KvEngine>(
request.mut_header().set_replica_read(replica_read);
cluster.sim.wl().async_read(request)
}

/// Shared delete-range scenario for column family `cf`.
///
/// Writes 499 key/value pairs, checks that a notify-only delete range leaves
/// the data readable, then checks that a real delete range over the whole key
/// space removes it. Both checks sample 50 random entries.
pub fn test_delete_range<T: Simulator<EK>, EK: KvEngine>(cluster: &mut Cluster<T, EK>, cf: CfName) {
    // key00000001..=key00000499 mapped to value1..=value499.
    let mut entries = Vec::new();
    for n in 1..500 {
        let key = format!("key{:08}", n).into_bytes();
        let value = format!("value{}", n).into_bytes();
        entries.push((key, value));
    }

    // Load the data in batches of at most 50 puts.
    for batch in entries.chunks(50) {
        let puts = batch
            .iter()
            .map(|(key, value)| new_put_cf_cmd(cf, key, value))
            .collect();
        // key9 is always the last region.
        cluster.batch_put(b"key9", puts).unwrap();
    }

    // A delete_range request with notify_only set must not actually delete data.
    cluster.must_notify_delete_range_cf(cf, b"", b"");

    let mut rng = rand::thread_rng();
    for _ in 0..50 {
        let (key, expected) = entries.choose(&mut rng).unwrap();
        assert_eq!(cluster.get_cf(cf, key).unwrap(), *expected);
    }

    // Empty start and end keys mean the whole range.
    cluster.must_delete_range_cf(cf, b"", b"");

    for _ in 0..50 {
        let (key, _) = entries.choose(&mut rng).unwrap();
        assert!(cluster.get_cf(cf, key).is_none());
    }
}
37 changes: 35 additions & 2 deletions components/test_raftstore/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use encryption_export::{
use engine_rocks::{config::BlobRunMode, RocksEngine, RocksSnapshot, RocksStatistics};
use engine_test::raft::RaftTestEngine;
use engine_traits::{
CfNamesExt, Engines, Iterable, KvEngine, Peekable, RaftEngineDebug, RaftEngineReadOnly,
CfName, CfNamesExt, Engines, Iterable, KvEngine, Peekable, RaftEngineDebug, RaftEngineReadOnly,
CF_DEFAULT, CF_RAFT,
};
use file_system::IoRateLimiter;
Expand All @@ -42,7 +42,7 @@ use raftstore::{
store::{fsm::RaftRouter, *},
RaftRouterCompactedEventSender, Result,
};
use rand::RngCore;
use rand::{seq::SliceRandom, RngCore};
use server::common::ConfiguredRaftEngine;
use tempfile::TempDir;
use test_pd_client::TestPdClient;
Expand Down Expand Up @@ -1430,3 +1430,36 @@ pub fn wait_for_synced(cluster: &mut Cluster<ServerCluster>, node_id: u64, regio
}
assert!(snapshot.ext().is_max_ts_synced());
}

/// Shared delete-range scenario for column family `cf`.
///
/// Writes 499 key/value pairs, checks that a notify-only delete range leaves
/// the data readable, then checks that a real delete range over the whole key
/// space removes it. Both checks sample 50 random entries.
pub fn test_delete_range<T: Simulator>(cluster: &mut Cluster<T>, cf: CfName) {
    // key00000001..=key00000499 mapped to value1..=value499.
    let mut entries = Vec::new();
    for n in 1..500 {
        let key = format!("key{:08}", n).into_bytes();
        let value = format!("value{}", n).into_bytes();
        entries.push((key, value));
    }

    // Load the data in batches of at most 50 puts.
    for batch in entries.chunks(50) {
        let puts = batch
            .iter()
            .map(|(key, value)| new_put_cf_cmd(cf, key, value))
            .collect();
        // key9 is always the last region.
        cluster.batch_put(b"key9", puts).unwrap();
    }

    // A delete_range request with notify_only set must not actually delete data.
    cluster.must_notify_delete_range_cf(cf, b"", b"");

    let mut rng = rand::thread_rng();
    for _ in 0..50 {
        let (key, expected) = entries.choose(&mut rng).unwrap();
        assert_eq!(cluster.get_cf(cf, key).unwrap(), *expected);
    }

    // Empty start and end keys mean the whole range.
    cluster.must_delete_range_cf(cf, b"", b"");

    for _ in 0..50 {
        let (key, _) = entries.choose(&mut rng).unwrap();
        assert!(cluster.get_cf(cf, key).is_none());
    }
}
50 changes: 8 additions & 42 deletions tests/integrations/raftstore/test_single.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,51 +2,15 @@

use std::time::Duration;

use engine_traits::{CfName, CF_DEFAULT, CF_WRITE};
use engine_traits::{CF_DEFAULT, CF_WRITE};
use raftstore::store::RAFT_INIT_LOG_INDEX;
use rand::prelude::*;
use test_raftstore::{new_put_cf_cmd, new_put_cmd, new_request, sleep_ms};
use test_raftstore::{new_put_cmd, new_request, sleep_ms};
use test_raftstore_macro::test_case;
use tikv_util::{config::*, time::Instant};

// TODO add epoch not match test cases.

/// Delete-range scenario for column family `cf`.
///
/// Writes 499 key/value pairs, checks that a notify-only delete range leaves
/// the data readable, then checks that a real delete range over the whole key
/// space removes it. Both checks sample 50 random entries.
fn test_delete_range<T: test_raftstore::Simulator>(
    cluster: &mut test_raftstore::Cluster<T>,
    cf: CfName,
) {
    // key00000001..=key00000499 mapped to value1..=value499.
    let mut entries = Vec::new();
    for n in 1..500 {
        let key = format!("key{:08}", n).into_bytes();
        let value = format!("value{}", n).into_bytes();
        entries.push((key, value));
    }

    // Load the data in batches of at most 50 puts.
    for batch in entries.chunks(50) {
        let puts = batch
            .iter()
            .map(|(key, value)| new_put_cf_cmd(cf, key, value))
            .collect();
        // key9 is always the last region.
        cluster.batch_put(b"key9", puts).unwrap();
    }

    // A delete_range request with notify_only set must not actually delete data.
    cluster.must_notify_delete_range_cf(cf, b"", b"");

    let mut rng = rand::thread_rng();
    for _ in 0..50 {
        let (key, expected) = entries.choose(&mut rng).unwrap();
        assert_eq!(cluster.get_cf(cf, key).unwrap(), *expected);
    }

    // Empty start and end keys mean the whole range.
    cluster.must_delete_range_cf(cf, b"", b"");

    for _ in 0..50 {
        let (key, _) = entries.choose(&mut rng).unwrap();
        assert!(cluster.get_cf(cf, key).is_none());
    }
}

#[test_case(test_raftstore::new_node_cluster)]
#[test_case(test_raftstore::new_server_cluster)]
#[test_case(test_raftstore_v2::new_node_cluster)]
Expand Down Expand Up @@ -127,19 +91,21 @@ fn test_delete() {
}
}

#[test]
#[test_case(test_raftstore::new_node_cluster)]
#[test_case(test_raftstore_v2::new_node_cluster)]
fn test_node_use_delete_range() {
let mut cluster = test_raftstore::new_node_cluster(0, 1);
let mut cluster = new_cluster(0, 1);
cluster.cfg.raft_store.use_delete_range = true;
cluster.run();
test_delete_range(&mut cluster, CF_DEFAULT);
// Prefix bloom filter is always enabled in the Write CF.
test_delete_range(&mut cluster, CF_WRITE);
}

#[test]
#[test_case(test_raftstore::new_node_cluster)]
#[test_case(test_raftstore_v2::new_node_cluster)]
fn test_node_not_use_delete_range() {
let mut cluster = test_raftstore::new_node_cluster(0, 1);
let mut cluster = new_cluster(0, 1);
cluster.cfg.raft_store.use_delete_range = false;
cluster.run();
test_delete_range(&mut cluster, CF_DEFAULT);
Expand Down