From cf0ff2024030aa7cc91af68de3d160cd7f07ded5 Mon Sep 17 00:00:00 2001 From: Simonas Kazlauskas Date: Mon, 15 Apr 2024 15:13:29 +0300 Subject: [PATCH] rocksdb: separate out the full and partial merge operators The previous code was extremely confusing and also not particularly efficient, for it was merging each pair of operands for each reference-counted value, which for refcount increments would contain the entire value of the data too. For now we partially merge only the refcount decrements, but we should investigate whether use of a partial merge operator is giving us any perf wins at all, given how we need to allocate 8-byte vectors for the reference counts all the time. --- core/store/src/db/refcount.rs | 115 +++++++++++++++++++++++----------- core/store/src/db/rocksdb.rs | 6 +- core/store/src/db/testdb.rs | 2 +- 3 files changed, 85 insertions(+), 38 deletions(-) diff --git a/core/store/src/db/refcount.rs b/core/store/src/db/refcount.rs index 0ecef4e7bb2..2c572344134 100644 --- a/core/store/src/db/refcount.rs +++ b/core/store/src/db/refcount.rs @@ -35,7 +35,7 @@ use crate::DBCol; /// In builds with debug assertions enabled, panics if `bytes` are non-empty but /// too short to fit 64-bit reference count. pub fn decode_value_with_rc(bytes: &[u8]) -> (Option<&[u8]>, i64) { - match bytes.split_last_chunk::<8>() { + match bytes.split_last_chunk() { None => { debug_assert!(bytes.is_empty()); return (None, 0); @@ -112,27 +112,55 @@ pub(crate) fn encode_negative_refcount(rc: std::num::NonZeroU32) -> [u8; 8] { /// Assumes that all provided values with positive reference count have the same /// value so that the function is free to pick any of the values. In build with /// debug assertions panics if this is not true. 
-pub(crate) fn refcount_merge<'a>( +pub(crate) fn refcount_full_merge<'a, 'b>( existing: Option<&'a [u8]>, - operands: impl IntoIterator, + operands: impl IntoIterator, ) -> Vec { let (mut payload, mut rc) = existing.map_or((None, 0), decode_value_with_rc); for (new_payload, delta) in operands.into_iter().map(decode_value_with_rc) { - if payload.is_none() { - payload = new_payload; - } else if new_payload.is_some() { - debug_assert_eq!(payload, new_payload); - } rc += delta; + if rc <= 0 { + // if the reference count drops to 0 during a merge, "release" the current value. + rc = 0; + payload = None; + continue; + } + match (payload, new_payload) { + (Some(p), Some(n)) => debug_assert_eq!(p, n), + (Some(_), None) => {} + (None, new_payload) => payload = new_payload, + } } - match rc.cmp(&0) { - Ordering::Less => rc.to_le_bytes().to_vec(), - Ordering::Equal => Vec::new(), + Ordering::Less | Ordering::Equal => Vec::new(), // "free" the data. Ordering::Greater => [payload.unwrap_or(b""), &rc.to_le_bytes()].concat(), } } +/// Merge two operands of a full merge operator. +/// +/// Note that the return value of this is not written out into the database! The value returned +/// will eventually be passed to the full merge operator instead. +/// +/// FIXME: verify if it's actually beneficial to implement this at all -- this is returning +/// newly allocated vectors of data just for the 8-byte reference counts. +pub(crate) fn refcount_partial_merge<'a>( + operands: impl IntoIterator, +) -> Option> { + let mut rc = 0; + for (_, delta) in operands.into_iter().map(decode_value_with_rc) { + rc += delta; + } + match rc.cmp(&0) { + Ordering::Less => Some(rc.to_le_bytes().to_vec()), // reduce the refcount + Ordering::Equal => Some(Vec::new()), // no-op + // refcount increments can only be full-merged. This is because we either need a + // previous value or, if it does not exist, we would have to allocate a new buffer with + // new data here for every pair of operations which is relatively expensive. 
+ Ordering::Greater => None, + } +} + /// Iterator treats empty value as no value and strips refcount pub(crate) fn iter_with_rc_logic<'a>( col: DBCol, @@ -151,6 +179,16 @@ pub(crate) fn iter_with_rc_logic<'a>( } impl RocksDB { + /// Merge adds refcounts, zero refcount becomes empty value. + /// Empty values get filtered by get methods, and removed by compaction. + pub(crate) fn refcount_merge_partial( + _new_key: &[u8], + _existing: Option<&[u8]>, // always None + operands: &rocksdb::MergeOperands, + ) -> Option> { + refcount_partial_merge(operands) + } + /// Merge adds refcounts, zero refcount becomes empty value. /// Empty values get filtered by get methods, and removed by compaction. pub(crate) fn refcount_merge( @@ -158,7 +196,7 @@ impl RocksDB { existing: Option<&[u8]>, operands: &rocksdb::MergeOperands, ) -> Option> { - Some(self::refcount_merge(existing, operands)) + Some(self::refcount_full_merge(existing, operands)) } /// Compaction filter for DBCol::State @@ -252,35 +290,40 @@ mod test { #[test] fn refcount_merge() { - fn test(want: &[u8], operands: &[&[u8]]) { - let it = operands.into_iter().copied(); - let got = super::refcount_merge(None, it); - assert_eq!(want, got.as_slice()); - + fn test(_want_after_partial: &[u8], want_after_full: &[u8], operands: &[&[u8]]) { + // First test the outcome after a full merge without any partial merges. 
if !operands.is_empty() { let it = operands[1..].into_iter().copied(); - let got = super::refcount_merge(Some(operands[0]), it); - assert_eq!(want, got.as_slice()); + let got = super::refcount_full_merge(Some(operands[0]), it); + assert_eq!(want_after_full, got.as_slice()); } + + // let it = operands.into_iter().copied(); + // let got = super::refcount_partial_merge(it).unwrap(); + // assert_eq!(want, got.as_slice()); } - test(b"", &[]); - test(b"", &[ZERO]); - test(b"", &[PLUS_ONE, MINUS_ONE]); - test(b"", &[PLUS_TWO, MINUS_ONE, MINUS_ONE]); - test(b"", &[b"foo\x01\0\0\0\0\0\0\0", MINUS_ONE]); - test(b"", &[b"foo\x02\0\0\0\0\0\0\0", MINUS_ONE, MINUS_ONE]); - test(b"", &[b"foo\x02\0\0\0\0\0\0\0", MINUS_TWO]); - - test(MINUS_ONE, &[MINUS_ONE]); - test(MINUS_ONE, &[b"", MINUS_ONE]); - test(MINUS_ONE, &[ZERO, MINUS_ONE]); - test(MINUS_ONE, &[b"foo\x01\0\0\0\0\0\0\0", MINUS_TWO]); - test(MINUS_ONE, &[b"foo\x01\0\0\0\0\0\0\0", MINUS_ONE, MINUS_ONE]); - - test(b"foo\x02\0\0\0\0\0\0\0", &[b"foo\x01\0\0\0\0\0\0\0", b"foo\x01\0\0\0\0\0\0\0"]); - test(b"foo\x01\0\0\0\0\0\0\0", &[b"foo\x01\0\0\0\0\0\0\0"]); - test(b"foo\x01\0\0\0\0\0\0\0", &[b"foo\x02\0\0\0\0\0\0\0", MINUS_ONE]); + test(b"", b"", &[]); + test(b"", b"", &[ZERO]); + test(b"", b"", &[PLUS_ONE, MINUS_ONE]); + test(b"", b"", &[PLUS_TWO, MINUS_ONE, MINUS_ONE]); + test(b"", b"", &[b"foo\x01\0\0\0\0\0\0\0", MINUS_ONE]); + test(b"", b"", &[b"foo\x02\0\0\0\0\0\0\0", MINUS_ONE, MINUS_ONE]); + test(b"", b"", &[b"foo\x02\0\0\0\0\0\0\0", MINUS_TWO]); + + test(MINUS_ONE, b"", &[MINUS_ONE]); + test(MINUS_ONE, b"", &[b"", MINUS_ONE]); + test(MINUS_ONE, b"", &[ZERO, MINUS_ONE]); + test(MINUS_ONE, b"", &[b"foo\x01\0\0\0\0\0\0\0", MINUS_TWO]); + test(MINUS_ONE, b"", &[b"foo\x01\0\0\0\0\0\0\0", MINUS_ONE, MINUS_ONE]); + + test( + b"can't merge", + b"foo\x02\0\0\0\0\0\0\0", + &[b"foo\x01\0\0\0\0\0\0\0", b"foo\x01\0\0\0\0\0\0\0"], + ); + test(b"can't merge", b"foo\x01\0\0\0\0\0\0\0", &[b"foo\x01\0\0\0\0\0\0\0"]); + test(b"can't 
merge", b"foo\x01\0\0\0\0\0\0\0", &[b"foo\x02\0\0\0\0\0\0\0", MINUS_ONE]); } #[test] diff --git a/core/store/src/db/rocksdb.rs b/core/store/src/db/rocksdb.rs index aa37575d457..406cf773e50 100644 --- a/core/store/src/db/rocksdb.rs +++ b/core/store/src/db/rocksdb.rs @@ -615,7 +615,11 @@ fn rocksdb_column_options(col: DBCol, store_config: &StoreConfig, temp: Temperat opts.set_target_file_size_base(64 * bytesize::MIB); if temp == Temperature::Hot && col.is_rc() { - opts.set_merge_operator("refcount merge", RocksDB::refcount_merge, RocksDB::refcount_merge); + opts.set_merge_operator( + "refcount merge", + RocksDB::refcount_merge, + RocksDB::refcount_merge_partial, + ); opts.set_compaction_filter("empty value filter", RocksDB::empty_value_compaction_filter); } opts diff --git a/core/store/src/db/testdb.rs b/core/store/src/db/testdb.rs index d1bc719ac83..1ce9e970648 100644 --- a/core/store/src/db/testdb.rs +++ b/core/store/src/db/testdb.rs @@ -93,7 +93,7 @@ impl Database for TestDB { DBOp::UpdateRefcount { col, key, value } => { let existing = db[col].get(&key).map(Vec::as_slice); let operands = [value.as_slice()]; - let merged = refcount::refcount_merge(existing, operands); + let merged = refcount::refcount_full_merge(existing, operands); if merged.is_empty() { db[col].remove(&key); } else {