Skip to content

Commit

Permalink
rocksdb: separate out the full and partial merge operators
Browse files Browse the repository at this point in the history
The previous code was extremely confusing and also not particularly
efficient, for it was merging each pair of operators for each
reference-counted value, which for refcount increments would contain the
entire value of the data too.

For now we only partially merge the refcount decrements only, but we
should investigate if use of a partial merge operator is giving us any
perf wins at all, given how we need to allocate 8 byte vectors for the
reference counts all the time?
  • Loading branch information
nagisa committed Apr 16, 2024
1 parent 4d506a7 commit cf0ff20
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 38 deletions.
115 changes: 79 additions & 36 deletions core/store/src/db/refcount.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use crate::DBCol;
/// In builds with debug assertions enabled, panics if `bytes` are non-empty but
/// too short to fit 64-bit reference count.
pub fn decode_value_with_rc(bytes: &[u8]) -> (Option<&[u8]>, i64) {
match bytes.split_last_chunk::<8>() {
match bytes.split_last_chunk() {
None => {
debug_assert!(bytes.is_empty());
return (None, 0);
Expand Down Expand Up @@ -112,27 +112,55 @@ pub(crate) fn encode_negative_refcount(rc: std::num::NonZeroU32) -> [u8; 8] {
/// Assumes that all provided values with positive reference count have the same
/// value so that the function is free to pick any of the values. In build with
/// debug assertions panics if this is not true.
pub(crate) fn refcount_merge<'a>(
pub(crate) fn refcount_full_merge<'a, 'b>(
existing: Option<&'a [u8]>,
operands: impl IntoIterator<Item = &'a [u8]>,
operands: impl IntoIterator<Item = &'b [u8]>,
) -> Vec<u8> {
let (mut payload, mut rc) = existing.map_or((None, 0), decode_value_with_rc);
for (new_payload, delta) in operands.into_iter().map(decode_value_with_rc) {
if payload.is_none() {
payload = new_payload;
} else if new_payload.is_some() {
debug_assert_eq!(payload, new_payload);
}
rc += delta;
if rc <= 0 {
// if the reference count drops to 0 during a merge, "release" the current value.
rc = 0;
payload = None;
continue;
}
match (payload, new_payload) {
(Some(p), Some(n)) => debug_assert_eq!(p, n),
(Some(_), None) => {}
(None, new_payload) => payload = new_payload,
}
}

match rc.cmp(&0) {
Ordering::Less => rc.to_le_bytes().to_vec(),
Ordering::Equal => Vec::new(),
Ordering::Less | Ordering::Equal => Vec::new(), // "free" the data.
Ordering::Greater => [payload.unwrap_or(b""), &rc.to_le_bytes()].concat(),
}
}

/// Merge two operands of a full merge operator.
///
/// Note that the return value of this is not written out into the database! The value returned
/// will eventually be passed to the full merge operator instead.
///
/// FIXME: verify if its actually beneficial to implement this at all -- this is returning
/// newly allocated vectors of data just for
pub(crate) fn refcount_partial_merge<'a>(
operands: impl IntoIterator<Item = &'a [u8]>,
) -> Option<Vec<u8>> {
let mut rc = 0;
for (_, delta) in operands.into_iter().map(decode_value_with_rc) {
rc += delta;
}
match rc.cmp(&0) {
Ordering::Less => Some(rc.to_le_bytes().to_vec()), // reduce the refcount
Ordering::Equal => Some(Vec::new()), // no-op
// refcount increments can only be full-merged. This is because we either need a
// previous value or, if it does not exist, we would have to allocate a new buffer with
// new data here for every pair of operations which is relatively expensive.
Ordering::Greater => None,
}
}

/// Iterator treats empty value as no value and strips refcount
pub(crate) fn iter_with_rc_logic<'a>(
col: DBCol,
Expand All @@ -151,14 +179,24 @@ pub(crate) fn iter_with_rc_logic<'a>(
}

impl RocksDB {
/// Merge adds refcounts, zero refcount becomes empty value.
/// Empty values get filtered by get methods, and removed by compaction.
pub(crate) fn refcount_merge_partial(
_new_key: &[u8],
_existing: Option<&[u8]>, // always None
operands: &rocksdb::MergeOperands,
) -> Option<Vec<u8>> {
refcount_partial_merge(operands)
}

/// Merge adds refcounts, zero refcount becomes empty value.
/// Empty values get filtered by get methods, and removed by compaction.
pub(crate) fn refcount_merge(
_new_key: &[u8],
existing: Option<&[u8]>,
operands: &rocksdb::MergeOperands,
) -> Option<Vec<u8>> {
Some(self::refcount_merge(existing, operands))
Some(self::refcount_full_merge(existing, operands))
}

/// Compaction filter for DBCol::State
Expand Down Expand Up @@ -252,35 +290,40 @@ mod test {

#[test]
fn refcount_merge() {
fn test(want: &[u8], operands: &[&[u8]]) {
let it = operands.into_iter().copied();
let got = super::refcount_merge(None, it);
assert_eq!(want, got.as_slice());

fn test(_want_after_partial: &[u8], want_after_full: &[u8], operands: &[&[u8]]) {
// First test the outcome after a full merge without any partial merges.
if !operands.is_empty() {
let it = operands[1..].into_iter().copied();
let got = super::refcount_merge(Some(operands[0]), it);
assert_eq!(want, got.as_slice());
let got = super::refcount_full_merge(Some(operands[0]), it);
assert_eq!(want_after_full, got.as_slice());
}

// let it = operands.into_iter().copied();
// let got = super::refcount_partial_merge(it).unwrap();
// assert_eq!(want, got.as_slice());
}

test(b"", &[]);
test(b"", &[ZERO]);
test(b"", &[PLUS_ONE, MINUS_ONE]);
test(b"", &[PLUS_TWO, MINUS_ONE, MINUS_ONE]);
test(b"", &[b"foo\x01\0\0\0\0\0\0\0", MINUS_ONE]);
test(b"", &[b"foo\x02\0\0\0\0\0\0\0", MINUS_ONE, MINUS_ONE]);
test(b"", &[b"foo\x02\0\0\0\0\0\0\0", MINUS_TWO]);

test(MINUS_ONE, &[MINUS_ONE]);
test(MINUS_ONE, &[b"", MINUS_ONE]);
test(MINUS_ONE, &[ZERO, MINUS_ONE]);
test(MINUS_ONE, &[b"foo\x01\0\0\0\0\0\0\0", MINUS_TWO]);
test(MINUS_ONE, &[b"foo\x01\0\0\0\0\0\0\0", MINUS_ONE, MINUS_ONE]);

test(b"foo\x02\0\0\0\0\0\0\0", &[b"foo\x01\0\0\0\0\0\0\0", b"foo\x01\0\0\0\0\0\0\0"]);
test(b"foo\x01\0\0\0\0\0\0\0", &[b"foo\x01\0\0\0\0\0\0\0"]);
test(b"foo\x01\0\0\0\0\0\0\0", &[b"foo\x02\0\0\0\0\0\0\0", MINUS_ONE]);
test(b"", b"", &[]);
test(b"", b"", &[ZERO]);
test(b"", b"", &[PLUS_ONE, MINUS_ONE]);
test(b"", b"", &[PLUS_TWO, MINUS_ONE, MINUS_ONE]);
test(b"", b"", &[b"foo\x01\0\0\0\0\0\0\0", MINUS_ONE]);
test(b"", b"", &[b"foo\x02\0\0\0\0\0\0\0", MINUS_ONE, MINUS_ONE]);
test(b"", b"", &[b"foo\x02\0\0\0\0\0\0\0", MINUS_TWO]);

test(MINUS_ONE, b"", &[MINUS_ONE]);
test(MINUS_ONE, b"", &[b"", MINUS_ONE]);
test(MINUS_ONE, b"", &[ZERO, MINUS_ONE]);
test(MINUS_ONE, b"", &[b"foo\x01\0\0\0\0\0\0\0", MINUS_TWO]);
test(MINUS_ONE, b"", &[b"foo\x01\0\0\0\0\0\0\0", MINUS_ONE, MINUS_ONE]);

test(
b"can't merge",
b"foo\x02\0\0\0\0\0\0\0",
&[b"foo\x01\0\0\0\0\0\0\0", b"foo\x01\0\0\0\0\0\0\0"],
);
test(b"can't merge", b"foo\x01\0\0\0\0\0\0\0", &[b"foo\x01\0\0\0\0\0\0\0"]);
test(b"can't merge", b"foo\x01\0\0\0\0\0\0\0", &[b"foo\x02\0\0\0\0\0\0\0", MINUS_ONE]);
}

#[test]
Expand Down
6 changes: 5 additions & 1 deletion core/store/src/db/rocksdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,11 @@ fn rocksdb_column_options(col: DBCol, store_config: &StoreConfig, temp: Temperat

opts.set_target_file_size_base(64 * bytesize::MIB);
if temp == Temperature::Hot && col.is_rc() {
opts.set_merge_operator("refcount merge", RocksDB::refcount_merge, RocksDB::refcount_merge);
opts.set_merge_operator(
"refcount merge",
RocksDB::refcount_merge,
RocksDB::refcount_merge_partial,
);
opts.set_compaction_filter("empty value filter", RocksDB::empty_value_compaction_filter);
}
opts
Expand Down
2 changes: 1 addition & 1 deletion core/store/src/db/testdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ impl Database for TestDB {
DBOp::UpdateRefcount { col, key, value } => {
let existing = db[col].get(&key).map(Vec::as_slice);
let operands = [value.as_slice()];
let merged = refcount::refcount_merge(existing, operands);
let merged = refcount::refcount_full_merge(existing, operands);
if merged.is_empty() {
db[col].remove(&key);
} else {
Expand Down

0 comments on commit cf0ff20

Please sign in to comment.