Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix merge fn closure from pr385 #481

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,21 @@ jobs:
test:
name: ${{ matrix.build }}
runs-on: ${{ matrix.os }}
continue-on-error: ${{ matrix.experimental }}
strategy:
fail-fast: false
matrix:
build: [Linux, macOS, Windows]
include:
- build: Linux
os: ubuntu-latest
experimental: false
- build: macOS
os: macOS-latest
experimental: false
- build: Windows
os: windows-latest
experimental: true
steps:
- name: Checkout sources
uses: actions/checkout@v2
Expand Down
48 changes: 36 additions & 12 deletions src/db_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -929,26 +929,50 @@ impl Options {
}
}

pub fn set_merge_operator(
pub fn set_merge_operator_associative<F: MergeFn + Clone>(
&mut self,
name: &str,
full_merge_fn: MergeFn,
partial_merge_fn: Option<MergeFn>,
full_merge_fn: F,
) {
let cb = Box::new(MergeOperatorCallback {
name: CString::new(name.as_bytes()).unwrap(),
full_merge_fn: full_merge_fn.clone(),
partial_merge_fn: full_merge_fn,
});

unsafe {
let mo = ffi::rocksdb_mergeoperator_create(
Box::into_raw(cb) as _,
Some(merge_operator::destructor_callback::<F, F>),
Some(full_merge_callback::<F, F>),
Some(partial_merge_callback::<F, F>),
None,
Some(merge_operator::name_callback::<F, F>),
);
ffi::rocksdb_options_set_merge_operator(self.inner, mo);
}
}

pub fn set_merge_operator<F: MergeFn, PF: MergeFn>(
&mut self,
name: &str,
full_merge_fn: F,
partial_merge_fn: PF,
) {
let cb = Box::new(MergeOperatorCallback {
name: CString::new(name.as_bytes()).unwrap(),
full_merge_fn,
partial_merge_fn: partial_merge_fn.unwrap_or(full_merge_fn),
partial_merge_fn,
});

unsafe {
let mo = ffi::rocksdb_mergeoperator_create(
mem::transmute(cb),
Some(merge_operator::destructor_callback),
Some(full_merge_callback),
Some(partial_merge_callback),
Some(merge_operator::delete_callback),
Some(merge_operator::name_callback),
Box::into_raw(cb) as _,
Some(merge_operator::destructor_callback::<F, PF>),
Some(full_merge_callback::<F, PF>),
Some(partial_merge_callback::<F, PF>),
None,
Some(merge_operator::name_callback::<F, PF>),
);
ffi::rocksdb_options_set_merge_operator(self.inner, mo);
}
Expand All @@ -958,8 +982,8 @@ impl Options {
since = "0.5.0",
note = "add_merge_operator has been renamed to set_merge_operator"
)]
pub fn add_merge_operator(&mut self, name: &str, merge_fn: MergeFn) {
self.set_merge_operator(name, merge_fn, None);
pub fn add_merge_operator<F: MergeFn + Clone>(&mut self, name: &str, merge_fn: F) {
self.set_merge_operator_associative(name, merge_fn);
}

/// Sets a compaction filter used to determine if entries should be kept, changed,
Expand Down
38 changes: 25 additions & 13 deletions src/merge_operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@
//!let mut opts = Options::default();
//!
//!opts.create_if_missing(true);
//!opts.set_merge_operator("test operator", concat_merge, None);
//!opts.set_merge_operator_associative("test operator", concat_merge);
//!//opts.set_merge_operator("test operator", concat_merge, partial_concat_merge); // if your merge is not associative
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suppose this comment could be deleted as well ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops…
done !

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks.

//!{
//! let db = DB::open(&opts, path).unwrap();
//! let p = db.put(b"k1", b"a");
Expand All @@ -61,16 +62,25 @@ use std::mem;
use std::ptr;
use std::slice;

pub type MergeFn = fn(&[u8], Option<&[u8]>, &mut MergeOperands) -> Option<Vec<u8>>;
//pub type MergeFn = fn(&[u8], Option<&[u8]>, &mut MergeOperands) -> Option<Vec<u8>>;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this comment really needed ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Obviously not, it can be deleted

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I suspect things may get stuck here. I think this was a suggestion for you to update the PR @BoOTheFurious . But from your comment it seems like you expect someone else to do it 😄

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry… I've done it

pub trait MergeFn:
Fn(&[u8], Option<&[u8]>, &mut MergeOperands) -> Option<Vec<u8>> + Send + Sync + 'static
{
}
impl<F> MergeFn for F where
F: Fn(&[u8], Option<&[u8]>, &mut MergeOperands) -> Option<Vec<u8>> + Send + Sync + 'static
{
}

pub struct MergeOperatorCallback {
pub struct MergeOperatorCallback<F: MergeFn, PF: MergeFn> {
pub name: CString,
pub full_merge_fn: MergeFn,
pub partial_merge_fn: MergeFn,
pub full_merge_fn: F,
pub partial_merge_fn: PF,
}

pub unsafe extern "C" fn destructor_callback(raw_cb: *mut c_void) {
let _: Box<MergeOperatorCallback> = mem::transmute(raw_cb);
pub unsafe extern "C" fn destructor_callback<F: MergeFn, PF: MergeFn>(raw_cb: *mut c_void) {
let _: Box<MergeOperatorCallback<F, PF>> =
Box::from_raw(raw_cb as *mut MergeOperatorCallback<F, PF>);
}

pub unsafe extern "C" fn delete_callback(
Expand All @@ -86,12 +96,14 @@ pub unsafe extern "C" fn delete_callback(
}
}

pub unsafe extern "C" fn name_callback(raw_cb: *mut c_void) -> *const c_char {
let cb = &mut *(raw_cb as *mut MergeOperatorCallback);
pub unsafe extern "C" fn name_callback<F: MergeFn, PF: MergeFn>(
raw_cb: *mut c_void,
) -> *const c_char {
let cb = &mut *(raw_cb as *mut MergeOperatorCallback<F, PF>);
cb.name.as_ptr()
}

pub unsafe extern "C" fn full_merge_callback(
pub unsafe extern "C" fn full_merge_callback<F: MergeFn, PF: MergeFn>(
raw_cb: *mut c_void,
raw_key: *const c_char,
key_len: size_t,
Expand All @@ -103,7 +115,7 @@ pub unsafe extern "C" fn full_merge_callback(
success: *mut u8,
new_value_length: *mut size_t,
) -> *mut c_char {
let cb = &mut *(raw_cb as *mut MergeOperatorCallback);
let cb = &mut *(raw_cb as *mut MergeOperatorCallback<F, PF>);
let operands = &mut MergeOperands::new(operands_list, operands_list_len, num_operands);
let key = slice::from_raw_parts(raw_key as *const u8, key_len as usize);
let oldval = if existing_value.is_null() {
Expand All @@ -125,7 +137,7 @@ pub unsafe extern "C" fn full_merge_callback(
}
}

pub unsafe extern "C" fn partial_merge_callback(
pub unsafe extern "C" fn partial_merge_callback<F: MergeFn, PF: MergeFn>(
raw_cb: *mut c_void,
raw_key: *const c_char,
key_len: size_t,
Expand All @@ -135,7 +147,7 @@ pub unsafe extern "C" fn partial_merge_callback(
success: *mut u8,
new_value_length: *mut size_t,
) -> *mut c_char {
let cb = &mut *(raw_cb as *mut MergeOperatorCallback);
let cb = &mut *(raw_cb as *mut MergeOperatorCallback<F, PF>);
let operands = &mut MergeOperands::new(operands_list, operands_list_len, num_operands);
let key = slice::from_raw_parts(raw_key as *const u8, key_len as usize);
if let Some(result) = (cb.partial_merge_fn)(key, None, operands) {
Expand Down
8 changes: 4 additions & 4 deletions tests/test_column_family.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ fn test_column_family() {
{
let mut opts = Options::default();
opts.create_if_missing(true);
opts.set_merge_operator("test operator", test_provided_merge, None);
opts.set_merge_operator_associative("test operator", test_provided_merge);
let mut db = DB::open(&opts, &n).unwrap();
let opts = Options::default();
match db.create_cf("cf1", &opts) {
Expand All @@ -41,7 +41,7 @@ fn test_column_family() {
// should fail to open db without specifying same column families
{
let mut opts = Options::default();
opts.set_merge_operator("test operator", test_provided_merge, None);
opts.set_merge_operator_associative("test operator", test_provided_merge);
match DB::open(&opts, &n) {
Ok(_db) => panic!(
"should not have opened DB successfully without \
Expand All @@ -58,7 +58,7 @@ fn test_column_family() {
// should properly open db when specyfing all column families
{
let mut opts = Options::default();
opts.set_merge_operator("test operator", test_provided_merge, None);
opts.set_merge_operator_associative("test operator", test_provided_merge);
match DB::open_cf(&opts, &n, &["cf1"]) {
Ok(_db) => println!("successfully opened db with column family"),
Err(e) => panic!("failed to open db with column family: {}", e),
Expand Down Expand Up @@ -137,7 +137,7 @@ fn test_merge_operator() {
// TODO should be able to write, read, merge, batch, and iterate over a cf
{
let mut opts = Options::default();
opts.set_merge_operator("test operator", test_provided_merge, None);
opts.set_merge_operator_associative("test operator", test_provided_merge);
let db = match DB::open_cf(&opts, &n, &["cf1"]) {
Ok(db) => {
println!("successfully opened db with column family");
Expand Down
76 changes: 73 additions & 3 deletions tests/test_merge_operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ mod util;

use pretty_assertions::assert_eq;

use rocksdb::merge_operator::MergeFn;
use rocksdb::{DBCompactionStyle, MergeOperands, Options, DB};
use util::DBPath;

Expand Down Expand Up @@ -46,7 +47,7 @@ fn merge_test() {
let db_path = DBPath::new("_rust_rocksdb_merge_test");
let mut opts = Options::default();
opts.create_if_missing(true);
opts.set_merge_operator("test operator", test_provided_merge, None);
opts.set_merge_operator_associative("test operator", test_provided_merge);

let db = DB::open(&opts, &db_path).unwrap();
let p = db.put(b"k1", b"a");
Expand Down Expand Up @@ -159,7 +160,7 @@ fn counting_merge_test() {
opts.set_merge_operator(
"sort operator",
test_counting_full_merge,
Some(test_counting_partial_merge),
test_counting_partial_merge,
);

let db = Arc::new(DB::open(&opts, &db_path).unwrap());
Expand Down Expand Up @@ -278,7 +279,7 @@ fn failed_merge_test() {
let db_path = DBPath::new("_rust_rocksdb_failed_merge_test");
let mut opts = Options::default();
opts.create_if_missing(true);
opts.set_merge_operator("test operator", test_failing_merge, None);
opts.set_merge_operator_associative("test operator", test_failing_merge);

let db = DB::open(&opts, &db_path).expect("open with a merge operator");
db.put(b"key", b"value").expect("put_ok");
Expand All @@ -290,3 +291,72 @@ fn failed_merge_test() {
}
}
}

fn make_merge_max_with_limit(limit: u64) -> impl MergeFn + Clone {
move |_key: &[u8], first: Option<&[u8]>, rest: &mut MergeOperands| {
let max = first
.into_iter()
.chain(rest)
.map(|slice| {
let mut bytes: [u8; 8] = Default::default();
bytes.clone_from_slice(slice);
u64::from_ne_bytes(bytes)
})
.fold(0, u64::max);
let new_value = max.min(limit);
Some(Vec::from(new_value.to_ne_bytes().as_ref()))
}
}

#[test]
fn test_merge_state() {
use {Options, DB};
let path = "_rust_rocksdb_mergetest_state";
let mut opts = Options::default();
opts.create_if_missing(true);
opts.set_merge_operator_associative("max-limit-12", make_merge_max_with_limit(12));
{
let db = DB::open(&opts, path).unwrap();
let p = db.put(b"k1", 1u64.to_ne_bytes());
assert!(p.is_ok());
let _ = db.merge(b"k1", 7u64.to_ne_bytes());
let m = db.merge(b"k1", 64u64.to_ne_bytes());
assert!(m.is_ok());
match db.get(b"k1") {
Ok(Some(value)) => {
let mut bytes: [u8; 8] = Default::default();
bytes.copy_from_slice(&value);
assert_eq!(u64::from_ne_bytes(bytes), 12);
}
Err(_) => println!("error reading value"),
_ => panic!("value not present"),
}

assert!(db.delete(b"k1").is_ok());
assert!(db.get(b"k1").unwrap().is_none());
}
assert!(DB::destroy(&opts, path).is_ok());

opts.set_merge_operator_associative("max-limit-128", make_merge_max_with_limit(128));
{
let db = DB::open(&opts, path).unwrap();
let p = db.put(b"k1", 1u64.to_ne_bytes());
assert!(p.is_ok());
let _ = db.merge(b"k1", 7u64.to_ne_bytes());
let m = db.merge(b"k1", 64u64.to_ne_bytes());
assert!(m.is_ok());
match db.get(b"k1") {
Ok(Some(value)) => {
let mut bytes: [u8; 8] = Default::default();
bytes.copy_from_slice(&value);
assert_eq!(u64::from_ne_bytes(bytes), 64);
}
Err(_) => println!("error reading value"),
_ => panic!("value not present"),
}

assert!(db.delete(b"k1").is_ok());
assert!(db.get(b"k1").unwrap().is_none());
}
assert!(DB::destroy(&opts, path).is_ok());
}