Skip to content

Commit

Permalink
This is an automated cherry-pick of tikv#14714
Browse files Browse the repository at this point in the history
close tikv#14723

Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
SpadeA-Tang authored and ti-chi-bot committed May 10, 2023
1 parent 9f07ac2 commit a683a26
Show file tree
Hide file tree
Showing 7 changed files with 207 additions and 55 deletions.
3 changes: 3 additions & 0 deletions components/raftstore-v2/src/operation/command/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,7 @@ impl<EK: KvEngine, R: ApplyResReporter> Apply<EK, R> {
dr.start_key,
dr.end_key,
dr.notify_only,
self.use_delete_range(),
);
}
SimpleWrite::Ingest(_) => {
Expand Down Expand Up @@ -594,6 +595,7 @@ impl<EK: KvEngine, R: ApplyResReporter> Apply<EK, R> {
dr.start_key,
dr.end_key,
dr.notify_only,
self.use_delete_range(),
)?;
}
SimpleWrite::Ingest(ssts) => {
Expand Down Expand Up @@ -681,6 +683,7 @@ impl<EK: KvEngine, R: ApplyResReporter> Apply<EK, R> {
dr.get_start_key(),
dr.get_end_key(),
dr.get_notify_only(),
self.use_delete_range(),
)?;
}
_ => unimplemented!(),
Expand Down
101 changes: 93 additions & 8 deletions components/raftstore-v2/src/operation/command/write/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
// Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0.

use engine_traits::{data_cf_offset, KvEngine, Mutable, RaftEngine, CF_DEFAULT};
use engine_traits::{
data_cf_offset, DeleteStrategy, KvEngine, Mutable, RaftEngine, Range as EngineRange, ALL_CFS,
CF_DEFAULT,
};
use kvproto::raft_cmdpb::RaftRequestHeader;
use raftstore::{
store::{
Expand All @@ -12,7 +15,8 @@ use raftstore::{
},
Error, Result,
};
use tikv_util::slog_panic;
use slog::info;
use tikv_util::{box_err, slog_panic};

use crate::{
batch::StoreContext,
Expand Down Expand Up @@ -222,13 +226,94 @@ impl<EK: KvEngine, R: ApplyResReporter> Apply<EK, R> {
#[inline]
pub fn apply_delete_range(
&mut self,
_cf: &str,
_index: u64,
_start_key: &[u8],
_end_key: &[u8],
_notify_only: bool,
mut cf: &str,
index: u64,
start_key: &[u8],
end_key: &[u8],
notify_only: bool,
use_delete_range: bool,
) -> Result<()> {
// TODO: reuse the same delete as split/merge.
PEER_WRITE_CMD_COUNTER.delete_range.inc();
let off = data_cf_offset(cf);
if self.should_skip(off, index) {
return Ok(());
}
if !end_key.is_empty() && start_key >= end_key {
return Err(box_err!(
"invalid delete range command, start_key: {:?}, end_key: {:?}",
start_key,
end_key
));
}
// region key range has no data prefix, so we must use origin key to check.
util::check_key_in_region(start_key, self.region())?;
let end_key = keys::data_end_key(end_key);
let region_end_key = keys::data_end_key(self.region().get_end_key());
if end_key > region_end_key {
return Err(Error::KeyNotInRegion(
end_key.to_vec(),
self.region().clone(),
));
}

if cf.is_empty() {
cf = CF_DEFAULT;
}

if !ALL_CFS.iter().any(|x| *x == cf) {
return Err(box_err!("invalid delete range command, cf: {:?}", cf));
}

let start_key = keys::data_key(start_key);

info!(
self.logger,
"execute delete range";
"range_start" => log_wrappers::Value::key(&start_key),
"range_end" => log_wrappers::Value::key(&end_key),
"notify_only" => notify_only,
"use_delete_range" => use_delete_range,
);

// Use delete_files_in_range to drop as many sst files as possible, this
// is a way to reclaim disk space quickly after drop a table/index.
if !notify_only {
let range = vec![EngineRange::new(&start_key, &end_key)];
let fail_f = |e: engine_traits::Error, strategy: DeleteStrategy| {
slog_panic!(
self.logger,
"failed to delete";
"strategy" => ?strategy,
"range_start" => log_wrappers::Value::key(&start_key),
"range_end" => log_wrappers::Value::key(&end_key),
"error" => ?e,
)
};
let tablet = self.tablet();
tablet
.delete_ranges_cf(cf, DeleteStrategy::DeleteFiles, &range)
.unwrap_or_else(|e| fail_f(e, DeleteStrategy::DeleteFiles));

let strategy = if use_delete_range {
DeleteStrategy::DeleteByRange
} else {
DeleteStrategy::DeleteByKey
};
// Delete all remaining keys.
tablet
.delete_ranges_cf(cf, strategy.clone(), &range)
.unwrap_or_else(move |e| fail_f(e, strategy));

// to do: support titan?
// tablet
// .delete_ranges_cf(cf, DeleteStrategy::DeleteBlobs, &range)
// .unwrap_or_else(move |e| fail_f(e,
// DeleteStrategy::DeleteBlobs));
}
if index != u64::MAX {
self.modifications_mut()[off] = index;
}

Ok(())
}
}
26 changes: 26 additions & 0 deletions components/raftstore-v2/src/raft/apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,14 @@ pub struct Apply<EK: KvEngine, R> {
observe: Observe,
coprocessor_host: CoprocessorHost<EK>,

<<<<<<< HEAD
=======
checkpoint_scheduler: Scheduler<checkpoint::Task>,

// Whether to use the delete range API instead of deleting one by one.
use_delete_range: bool,

>>>>>>> aa82f08c5a (raftstore-v2: implement delete range (#14714))
pub(crate) metrics: ApplyMetrics,
pub(crate) logger: Logger,
pub(crate) buckets: Option<BucketStat>,
Expand Down Expand Up @@ -123,6 +131,11 @@ impl<EK: KvEngine, R> Apply<EK, R> {
metrics: ApplyMetrics::default(),
buckets,
sst_importer,
<<<<<<< HEAD
=======
checkpoint_scheduler,
use_delete_range: cfg.use_delete_range,
>>>>>>> aa82f08c5a (raftstore-v2: implement delete range (#14714))
observe: Observe {
info: CmdObserveInfo::default(),
level: ObserveLevel::None,
Expand Down Expand Up @@ -308,4 +321,17 @@ impl<EK: KvEngine, R> Apply<EK, R> {
pub fn coprocessor_host(&self) -> &CoprocessorHost<EK> {
&self.coprocessor_host
}
<<<<<<< HEAD
=======

#[inline]
pub fn checkpoint_scheduler(&self) -> &Scheduler<checkpoint::Task> {
&self.checkpoint_scheduler
}

#[inline]
pub fn use_delete_range(&self) -> bool {
self.use_delete_range
}
>>>>>>> aa82f08c5a (raftstore-v2: implement delete range (#14714))
}
8 changes: 7 additions & 1 deletion components/test_raftstore-v2/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,13 @@ pub trait Simulator<EK: KvEngine> {
write_encoder.delete(delete.get_cf(), delete.get_key());
}
CmdType::DeleteRange => {
unimplemented!()
let delete_range = req.get_delete_range();
write_encoder.delete_range(
delete_range.get_cf(),
delete_range.get_start_key(),
delete_range.get_end_key(),
delete_range.get_notify_only(),
);
}
_ => unreachable!(),
}
Expand Down
37 changes: 35 additions & 2 deletions components/test_raftstore-v2/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ use std::{fmt::Write, sync::Arc, thread, time::Duration};
use encryption_export::{data_key_manager_from_config, DataKeyManager};
use engine_rocks::{RocksEngine, RocksStatistics};
use engine_test::raft::RaftTestEngine;
use engine_traits::{KvEngine, TabletRegistry, CF_DEFAULT};
use engine_traits::{CfName, KvEngine, TabletRegistry, CF_DEFAULT};
use file_system::IoRateLimiter;
use futures::Future;
use kvproto::{kvrpcpb::Context, metapb, raft_cmdpb::RaftCmdResponse};
use raftstore::Result;
use rand::RngCore;
use rand::{prelude::SliceRandom, RngCore};
use server::common::ConfiguredRaftEngine;
use tempfile::TempDir;
use test_raftstore::{new_get_cmd, new_put_cf_cmd, new_request, Config};
Expand Down Expand Up @@ -233,3 +233,36 @@ pub fn async_read_on_peer<T: Simulator<EK>, EK: KvEngine>(
request.mut_header().set_replica_read(replica_read);
cluster.sim.wl().async_read(request)
}

pub fn test_delete_range<T: Simulator<EK>, EK: KvEngine>(cluster: &mut Cluster<T, EK>, cf: CfName) {
let data_set: Vec<_> = (1..500)
.map(|i| {
(
format!("key{:08}", i).into_bytes(),
format!("value{}", i).into_bytes(),
)
})
.collect();
for kvs in data_set.chunks(50) {
let requests = kvs.iter().map(|(k, v)| new_put_cf_cmd(cf, k, v)).collect();
// key9 is always the last region.
cluster.batch_put(b"key9", requests).unwrap();
}

// delete_range request with notify_only set should not actually delete data.
cluster.must_notify_delete_range_cf(cf, b"", b"");

let mut rng = rand::thread_rng();
for _ in 0..50 {
let (k, v) = data_set.choose(&mut rng).unwrap();
assert_eq!(cluster.get_cf(cf, k).unwrap(), *v);
}

// Empty keys means the whole range.
cluster.must_delete_range_cf(cf, b"", b"");

for _ in 0..50 {
let k = &data_set.choose(&mut rng).unwrap().0;
assert!(cluster.get_cf(cf, k).is_none());
}
}
37 changes: 35 additions & 2 deletions components/test_raftstore/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use encryption_export::{
use engine_rocks::{config::BlobRunMode, RocksEngine, RocksSnapshot, RocksStatistics};
use engine_test::raft::RaftTestEngine;
use engine_traits::{
CfNamesExt, Engines, Iterable, KvEngine, Peekable, RaftEngineDebug, RaftEngineReadOnly,
CfName, CfNamesExt, Engines, Iterable, KvEngine, Peekable, RaftEngineDebug, RaftEngineReadOnly,
CF_DEFAULT, CF_RAFT,
};
use file_system::IoRateLimiter;
Expand All @@ -42,7 +42,7 @@ use raftstore::{
store::{fsm::RaftRouter, *},
RaftRouterCompactedEventSender, Result,
};
use rand::RngCore;
use rand::{seq::SliceRandom, RngCore};
use server::common::ConfiguredRaftEngine;
use tempfile::TempDir;
use test_pd_client::TestPdClient;
Expand Down Expand Up @@ -1430,3 +1430,36 @@ pub fn wait_for_synced(cluster: &mut Cluster<ServerCluster>, node_id: u64, regio
}
assert!(snapshot.ext().is_max_ts_synced());
}

pub fn test_delete_range<T: Simulator>(cluster: &mut Cluster<T>, cf: CfName) {
let data_set: Vec<_> = (1..500)
.map(|i| {
(
format!("key{:08}", i).into_bytes(),
format!("value{}", i).into_bytes(),
)
})
.collect();
for kvs in data_set.chunks(50) {
let requests = kvs.iter().map(|(k, v)| new_put_cf_cmd(cf, k, v)).collect();
// key9 is always the last region.
cluster.batch_put(b"key9", requests).unwrap();
}

// delete_range request with notify_only set should not actually delete data.
cluster.must_notify_delete_range_cf(cf, b"", b"");

let mut rng = rand::thread_rng();
for _ in 0..50 {
let (k, v) = data_set.choose(&mut rng).unwrap();
assert_eq!(cluster.get_cf(cf, k).unwrap(), *v);
}

// Empty keys means the whole range.
cluster.must_delete_range_cf(cf, b"", b"");

for _ in 0..50 {
let k = &data_set.choose(&mut rng).unwrap().0;
assert!(cluster.get_cf(cf, k).is_none());
}
}
50 changes: 8 additions & 42 deletions tests/integrations/raftstore/test_single.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,51 +2,15 @@

use std::time::Duration;

use engine_traits::{CfName, CF_DEFAULT, CF_WRITE};
use engine_traits::{CF_DEFAULT, CF_WRITE};
use raftstore::store::RAFT_INIT_LOG_INDEX;
use rand::prelude::*;
use test_raftstore::{new_put_cf_cmd, new_put_cmd, new_request, sleep_ms};
use test_raftstore::{new_put_cmd, new_request, sleep_ms};
use test_raftstore_macro::test_case;
use tikv_util::{config::*, time::Instant};

// TODO add epoch not match test cases.

fn test_delete_range<T: test_raftstore::Simulator>(
cluster: &mut test_raftstore::Cluster<T>,
cf: CfName,
) {
let data_set: Vec<_> = (1..500)
.map(|i| {
(
format!("key{:08}", i).into_bytes(),
format!("value{}", i).into_bytes(),
)
})
.collect();
for kvs in data_set.chunks(50) {
let requests = kvs.iter().map(|(k, v)| new_put_cf_cmd(cf, k, v)).collect();
// key9 is always the last region.
cluster.batch_put(b"key9", requests).unwrap();
}

// delete_range request with notify_only set should not actually delete data.
cluster.must_notify_delete_range_cf(cf, b"", b"");

let mut rng = rand::thread_rng();
for _ in 0..50 {
let (k, v) = data_set.choose(&mut rng).unwrap();
assert_eq!(cluster.get_cf(cf, k).unwrap(), *v);
}

// Empty keys means the whole range.
cluster.must_delete_range_cf(cf, b"", b"");

for _ in 0..50 {
let k = &data_set.choose(&mut rng).unwrap().0;
assert!(cluster.get_cf(cf, k).is_none());
}
}

#[test_case(test_raftstore::new_node_cluster)]
#[test_case(test_raftstore::new_server_cluster)]
#[test_case(test_raftstore_v2::new_node_cluster)]
Expand Down Expand Up @@ -127,19 +91,21 @@ fn test_delete() {
}
}

#[test]
#[test_case(test_raftstore::new_node_cluster)]
#[test_case(test_raftstore_v2::new_node_cluster)]
fn test_node_use_delete_range() {
let mut cluster = test_raftstore::new_node_cluster(0, 1);
let mut cluster = new_cluster(0, 1);
cluster.cfg.raft_store.use_delete_range = true;
cluster.run();
test_delete_range(&mut cluster, CF_DEFAULT);
// Prefix bloom filter is always enabled in the Write CF.
test_delete_range(&mut cluster, CF_WRITE);
}

#[test]
#[test_case(test_raftstore::new_node_cluster)]
#[test_case(test_raftstore_v2::new_node_cluster)]
fn test_node_not_use_delete_range() {
let mut cluster = test_raftstore::new_node_cluster(0, 1);
let mut cluster = new_cluster(0, 1);
cluster.cfg.raft_store.use_delete_range = false;
cluster.run();
test_delete_range(&mut cluster, CF_DEFAULT);
Expand Down

0 comments on commit a683a26

Please sign in to comment.