Skip to content

Commit

Permalink
rawkv: fix ttl_checker for RawKV API v2 (#15143) (#15155)
Browse files Browse the repository at this point in the history
close #15142

rawkv: fix ttl_checker for RawKV API v2.

Signed-off-by: Ping Yu <yuping@pingcap.com>

Co-authored-by: Ping Yu <yuping@pingcap.com>
Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
3 people committed Jul 24, 2023
1 parent a9676d1 commit 8577e53
Show file tree
Hide file tree
Showing 10 changed files with 287 additions and 115 deletions.
4 changes: 3 additions & 1 deletion components/engine_rocks/src/mvcc_properties.rs
Expand Up @@ -3,7 +3,7 @@
use engine_traits::{MvccProperties, MvccPropertiesExt, Result};
use txn_types::TimeStamp;

use crate::{decode_properties::DecodeProperties, RocksEngine, UserProperties};
use crate::{decode_properties::DecodeProperties, RocksEngine, RocksTtlProperties, UserProperties};

pub const PROP_NUM_ERRORS: &str = "tikv.num_errors";
pub const PROP_MIN_TS: &str = "tikv.min_ts";
Expand All @@ -28,6 +28,7 @@ impl RocksMvccProperties {
props.encode_u64(PROP_NUM_DELETES, mvcc_props.num_deletes);
props.encode_u64(PROP_NUM_VERSIONS, mvcc_props.num_versions);
props.encode_u64(PROP_MAX_ROW_VERSIONS, mvcc_props.max_row_versions);
RocksTtlProperties::encode_to(&mvcc_props.ttl, &mut props);
props
}

Expand All @@ -43,6 +44,7 @@ impl RocksMvccProperties {
.decode_u64(PROP_NUM_DELETES)
.unwrap_or(res.num_versions - res.num_puts);
res.max_row_versions = props.decode_u64(PROP_MAX_ROW_VERSIONS)?;
RocksTtlProperties::decode_from(&mut res.ttl, props);
Ok(res)
}
}
Expand Down
5 changes: 5 additions & 0 deletions components/engine_rocks/src/properties.rs
Expand Up @@ -461,6 +461,9 @@ impl TablePropertiesCollector for MvccPropertiesCollector {
} else {
self.props.num_deletes += 1;
}
if let Some(expire_ts) = raw_value.expire_ts {
self.props.ttl.add(expire_ts);
}
}
Err(_) => {
self.num_errors += 1;
Expand Down Expand Up @@ -847,6 +850,8 @@ mod tests {
assert_eq!(props.num_puts, 4);
assert_eq!(props.num_versions, 7);
assert_eq!(props.max_row_versions, 3);
assert_eq!(props.ttl.max_expire_ts, Some(u64::MAX));
assert_eq!(props.ttl.min_expire_ts, Some(10));
}

#[bench]
Expand Down
106 changes: 73 additions & 33 deletions components/engine_rocks/src/ttl_properties.rs
Expand Up @@ -15,19 +15,30 @@ const PROP_MIN_EXPIRE_TS: &str = "tikv.min_expire_ts";
pub struct RocksTtlProperties;

impl RocksTtlProperties {
pub fn encode_to(ttl_props: &TtlProperties, user_props: &mut UserProperties) {
if let Some(max_expire_ts) = ttl_props.max_expire_ts {
user_props.encode_u64(PROP_MAX_EXPIRE_TS, max_expire_ts);
}
if let Some(min_expire_ts) = ttl_props.min_expire_ts {
user_props.encode_u64(PROP_MIN_EXPIRE_TS, min_expire_ts);
}
}

pub fn encode(ttl_props: &TtlProperties) -> UserProperties {
let mut props = UserProperties::new();
props.encode_u64(PROP_MAX_EXPIRE_TS, ttl_props.max_expire_ts);
props.encode_u64(PROP_MIN_EXPIRE_TS, ttl_props.min_expire_ts);
Self::encode_to(ttl_props, &mut props);
props
}

pub fn decode<T: DecodeProperties>(props: &T) -> Result<TtlProperties> {
let res = TtlProperties {
max_expire_ts: props.decode_u64(PROP_MAX_EXPIRE_TS)?,
min_expire_ts: props.decode_u64(PROP_MIN_EXPIRE_TS)?,
};
Ok(res)
pub fn decode_from<T: DecodeProperties>(ttl_props: &mut TtlProperties, props: &T) {
ttl_props.max_expire_ts = props.decode_u64(PROP_MAX_EXPIRE_TS).ok();
ttl_props.min_expire_ts = props.decode_u64(PROP_MIN_EXPIRE_TS).ok();
}

pub fn decode<T: DecodeProperties>(props: &T) -> TtlProperties {
let mut res = TtlProperties::default();
Self::decode_from(&mut res, props);
res
}
}

Expand All @@ -46,11 +57,10 @@ impl TtlPropertiesExt for RocksEngine {

let mut res = Vec::new();
for (file_name, v) in collection.iter() {
let prop = match RocksTtlProperties::decode(v.user_collected_properties()) {
Ok(v) => v,
Err(_) => continue,
};
res.push((file_name.to_string(), prop));
let prop = RocksTtlProperties::decode(v.user_collected_properties());
if prop.is_some() {
res.push((file_name.to_string(), prop));
}
}
Ok(res)
}
Expand Down Expand Up @@ -81,12 +91,7 @@ impl<F: KvFormat> TablePropertiesCollector for TtlPropertiesCollector<F> {
expire_ts: Some(expire_ts),
..
}) => {
self.prop.max_expire_ts = std::cmp::max(self.prop.max_expire_ts, expire_ts);
if self.prop.min_expire_ts == 0 {
self.prop.min_expire_ts = expire_ts;
} else {
self.prop.min_expire_ts = std::cmp::min(self.prop.min_expire_ts, expire_ts);
}
self.prop.add(expire_ts);
}
Err(err) => {
error!(
Expand All @@ -101,9 +106,6 @@ impl<F: KvFormat> TablePropertiesCollector for TtlPropertiesCollector<F> {
}

fn finish(&mut self) -> HashMap<Vec<u8>, Vec<u8>> {
if self.prop.max_expire_ts == 0 && self.prop.min_expire_ts == 0 {
return HashMap::default();
}
RocksTtlProperties::encode(&self.prop).0
}
}
Expand Down Expand Up @@ -138,7 +140,7 @@ mod tests {
}

fn test_ttl_properties_impl<F: KvFormat>() {
let get_properties = |case: &[(&'static str, u64)]| -> Result<TtlProperties> {
let get_properties = |case: &[(&'static str, u64)]| -> TtlProperties {
let mut collector = TtlPropertiesCollector::<F> {
prop: Default::default(),
_phantom: PhantomData,
Expand All @@ -165,31 +167,69 @@ mod tests {
RocksTtlProperties::decode(&result)
};

// NOTE: expire_ts=0 is considered as no TTL in `ApiVersion::V1ttl`
let case1 = [
("zr\0a", 0),
("zr\0b", UnixSecs::now().into_inner()),
("zr\0c", 1),
("zr\0d", u64::MAX),
("zr\0e", 0),
];
let props = get_properties(&case1).unwrap();
assert_eq!(props.max_expire_ts, u64::MAX);
let props = get_properties(&case1);
assert_eq!(props.max_expire_ts, Some(u64::MAX));
match F::TAG {
ApiVersion::V1 => unreachable!(),
ApiVersion::V1ttl => assert_eq!(props.min_expire_ts, 1),
// expire_ts = 0 is no longer a special case in API V2
ApiVersion::V2 => assert_eq!(props.min_expire_ts, 0),
ApiVersion::V1ttl => assert_eq!(props.min_expire_ts, Some(1)),
ApiVersion::V2 => assert_eq!(props.min_expire_ts, Some(0)),
}

let case2 = [("zr\0a", 0)];
get_properties(&case2).unwrap_err();
match F::TAG {
ApiVersion::V1 => unreachable!(),
ApiVersion::V1ttl => assert!(get_properties(&case2).is_none()),
ApiVersion::V2 => assert_eq!(props.min_expire_ts, Some(0)),
}

let case3 = [];
get_properties(&case3).unwrap_err();
assert!(get_properties(&case3).is_none());

let case4 = [("zr\0a", 1)];
let props = get_properties(&case4).unwrap();
assert_eq!(props.max_expire_ts, 1);
assert_eq!(props.min_expire_ts, 1);
let props = get_properties(&case4);
assert_eq!(props.max_expire_ts, Some(1));
assert_eq!(props.min_expire_ts, Some(1));
}

#[test]
fn test_ttl_properties_codec() {
let cases: Vec<(Option<u64>, Option<u64>, Vec<(&[u8], u64)>)> = vec![
(
Some(0), // min_expire_ts
Some(1), // max_expire_ts
vec![(b"tikv.min_expire_ts", 0), (b"tikv.max_expire_ts", 1)], // UserProperties
),
(None, None, vec![]),
(Some(0), None, vec![(b"tikv.min_expire_ts", 0)]),
(None, Some(0), vec![(b"tikv.max_expire_ts", 0)]),
];

for (i, (min_expire_ts, max_expire_ts, expect_user_props)) in cases.into_iter().enumerate()
{
let ttl_props = TtlProperties {
min_expire_ts,
max_expire_ts,
};
let user_props = RocksTtlProperties::encode(&ttl_props);
let expect_user_props = UserProperties(
expect_user_props
.into_iter()
.map(|(name, value)| (name.to_vec(), value.to_be_bytes().to_vec()))
.collect::<HashMap<_, _>>(),
);
assert_eq!(user_props.0, expect_user_props.0, "case {}", i);

let decoded = RocksTtlProperties::decode(&user_props);
assert_eq!(decoded.max_expire_ts, ttl_props.max_expire_ts, "case {}", i);
assert_eq!(decoded.min_expire_ts, ttl_props.min_expire_ts, "case {}", i);
}
}
}
5 changes: 5 additions & 0 deletions components/engine_traits/src/mvcc_properties.rs
Expand Up @@ -4,6 +4,8 @@ use std::cmp;

use txn_types::TimeStamp;

use crate::TtlProperties;

#[derive(Clone, Debug)]
pub struct MvccProperties {
pub min_ts: TimeStamp, // The minimal timestamp.
Expand All @@ -13,6 +15,7 @@ pub struct MvccProperties {
pub num_deletes: u64, // The number of MVCC deletes of all rows.
pub num_versions: u64, // The number of MVCC versions of all rows.
pub max_row_versions: u64, // The maximal number of MVCC versions of a single row.
pub ttl: TtlProperties, // The ttl properties of all rows, for RawKV only.
}

impl MvccProperties {
Expand All @@ -25,6 +28,7 @@ impl MvccProperties {
num_deletes: 0,
num_versions: 0,
max_row_versions: 0,
ttl: TtlProperties::default(),
}
}

Expand All @@ -36,6 +40,7 @@ impl MvccProperties {
self.num_deletes += other.num_deletes;
self.num_versions += other.num_versions;
self.max_row_versions = cmp::max(self.max_row_versions, other.max_row_versions);
self.ttl.merge(&other.ttl);
}
}

Expand Down
88 changes: 85 additions & 3 deletions components/engine_traits/src/ttl_properties.rs
Expand Up @@ -2,10 +2,42 @@

use crate::errors::Result;

#[derive(Debug, Default)]
#[derive(Debug, Default, Clone)]
pub struct TtlProperties {
pub max_expire_ts: u64,
pub min_expire_ts: u64,
pub max_expire_ts: Option<u64>,
pub min_expire_ts: Option<u64>,
}

impl TtlProperties {
pub fn add(&mut self, expire_ts: u64) {
self.merge(&TtlProperties {
max_expire_ts: Some(expire_ts),
min_expire_ts: Some(expire_ts),
});
}

pub fn merge(&mut self, other: &TtlProperties) {
if let Some(max_expire_ts) = other.max_expire_ts {
self.max_expire_ts = Some(std::cmp::max(
self.max_expire_ts.unwrap_or(u64::MIN),
max_expire_ts,
));
}
if let Some(min_expire_ts) = other.min_expire_ts {
self.min_expire_ts = Some(std::cmp::min(
self.min_expire_ts.unwrap_or(u64::MAX),
min_expire_ts,
));
}
}

pub fn is_some(&self) -> bool {
self.max_expire_ts.is_some() || self.min_expire_ts.is_some()
}

pub fn is_none(&self) -> bool {
!self.is_some()
}
}

pub trait TtlPropertiesExt {
Expand All @@ -16,3 +48,53 @@ pub trait TtlPropertiesExt {
end_key: &[u8],
) -> Result<Vec<(String, TtlProperties)>>;
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_ttl_properties() {
let verify = |prop: &TtlProperties, min: Option<u64>, max: Option<u64>| {
assert_eq!(prop.min_expire_ts, min);
assert_eq!(prop.max_expire_ts, max);
};

// add
let mut prop1 = TtlProperties::default();
assert!(prop1.is_none());
prop1.add(10);
assert!(prop1.is_some());
verify(&prop1, Some(10), Some(10));

// merge
{
let mut prop2 = TtlProperties::default();
prop2.add(20);
verify(&prop2, Some(20), Some(20));

prop1.merge(&prop2);
verify(&prop1, Some(10), Some(20));
}

// none merge some
let mut prop3 = TtlProperties::default();
prop3.merge(&prop1);
verify(&prop3, Some(10), Some(20));

// some merge none
{
let prop4 = TtlProperties::default();
prop3.merge(&prop4);
verify(&prop3, Some(10), Some(20));
}

// add
{
prop3.add(30);
verify(&prop3, Some(10), Some(30));
prop3.add(0);
verify(&prop3, Some(0), Some(30));
}
}
}
12 changes: 8 additions & 4 deletions src/config/mod.rs
Expand Up @@ -721,10 +721,6 @@ impl DefaultCfConfig {
prop_size_index_distance: self.prop_size_index_distance,
prop_keys_index_distance: self.prop_keys_index_distance,
};
cf_opts.add_table_properties_collector_factory(
"tikv.rawkv-mvcc-properties-collector",
RawMvccPropertiesCollectorFactory::default(),
);
cf_opts.add_table_properties_collector_factory("tikv.range-properties-collector", f);
if let Some(factory) = filter_factory {
match api_version {
Expand All @@ -750,6 +746,10 @@ impl DefaultCfConfig {
.unwrap();
}
ApiVersion::V2 => {
cf_opts.add_table_properties_collector_factory(
"tikv.rawkv-mvcc-properties-collector",
RawMvccPropertiesCollectorFactory::default(),
);
let factory = StackingCompactionFilterFactory::new(
factory.clone(),
RawCompactionFilterFactory,
Expand Down Expand Up @@ -780,6 +780,10 @@ impl DefaultCfConfig {
.unwrap();
}
ApiVersion::V2 => {
cf_opts.add_table_properties_collector_factory(
"tikv.rawkv-mvcc-properties-collector",
RawMvccPropertiesCollectorFactory::default(),
);
cf_opts
.set_compaction_filter_factory(
"apiv2_gc_compaction_filter_factory",
Expand Down
4 changes: 2 additions & 2 deletions src/server/gc_worker/compaction_filter.rs
Expand Up @@ -880,7 +880,7 @@ pub mod test_utils {
self
}

fn prepare_gc(&self, engine: &RocksEngine) {
pub fn prepare_gc(&self, engine: &RocksEngine) {
let safe_point = Arc::new(AtomicU64::new(self.safe_point));
let cfg_tracker = {
let mut cfg = GcConfig::default();
Expand Down Expand Up @@ -909,7 +909,7 @@ pub mod test_utils {
});
}

fn post_gc(&mut self) {
pub fn post_gc(&mut self) {
self.callbacks_on_drop.clear();
let mut gc_context = GC_CONTEXT.lock().unwrap();
let callbacks = &mut gc_context.as_mut().unwrap().callbacks_on_drop;
Expand Down

0 comments on commit 8577e53

Please sign in to comment.