Skip to content

Commit

Permalink
Adds support to DBWithTTL
Browse files Browse the repository at this point in the history
  • Loading branch information
Eduardo Barreto Alexandre committed Oct 11, 2020
1 parent ec888a1 commit 63f7a0f
Show file tree
Hide file tree
Showing 3 changed files with 189 additions and 0 deletions.
59 changes: 59 additions & 0 deletions rocks-sys/rocks/db.cc
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "rocksdb/db.h"
#include "rocksdb/utilities/db_ttl.h"

#include <iostream>
#include <unordered_map>
Expand Down Expand Up @@ -45,6 +46,18 @@ rocks_db_t* rocks_db_open(const rocks_options_t* options, const char* name, rock
}
}

rocks_db_t* rocks_db_open_with_ttl(const rocks_options_t* options, const char* name, int ttl, rocks_status_t** status) {
DBWithTTL* db = nullptr;
Status st = DBWithTTL::Open(options->rep, std::string(name), &db, ttl);
if (SaveError(status, std::move(st))) {
return nullptr;
} else {
rocks_db_t* result = new rocks_db_t;
result->rep = db;
return result;
}
}

void rocks_db_close(rocks_db_t* db, rocks_status_t** status) { SaveError(status, db->rep->Close()); }

void rocks_db_resume(rocks_db_t* db, rocks_status_t** status) { SaveError(status, db->rep->Resume()); }
Expand Down Expand Up @@ -141,6 +154,36 @@ rocks_db_t* rocks_db_open_column_families(const rocks_dboptions_t* db_options, c
return result;
}

rocks_db_t* rocks_db_open_column_families_with_ttl(const rocks_dboptions_t* db_options, const char* name,
int num_column_families, const char* const* column_family_names,
const rocks_cfoptions_t* const* column_family_options,
rocks_column_family_handle_t** column_family_handles, const int* ttls,
rocks_status_t** status) {
std::vector<int32_t> ttls_vec;
std::vector<ColumnFamilyDescriptor> column_families;
for (int i = 0; i < num_column_families; i++) {
ttls_vec.push_back(ttls[i]);

column_families.push_back(ColumnFamilyDescriptor(std::string(column_family_names[i]),
ColumnFamilyOptions(column_family_options[i]->rep)));
}

DBWithTTL* db = nullptr;
std::vector<ColumnFamilyHandle*> handles;
if (SaveError(status, DBWithTTL::Open(db_options->rep, std::string(name), column_families, &handles, &db, ttls_vec))) {
return nullptr;
}

for (size_t i = 0; i < handles.size(); i++) {
rocks_column_family_handle_t* c_handle = new rocks_column_family_handle_t;
c_handle->rep = handles[i];
column_family_handles[i] = c_handle;
}
rocks_db_t* result = new rocks_db_t;
result->rep = db;
return result;
}

rocks_db_t* rocks_db_open_for_read_only_column_families(const rocks_dboptions_t* db_options, const char* name,
int num_column_families, const char* const* column_family_names,
const rocks_cfoptions_t* const* column_family_options,
Expand Down Expand Up @@ -208,6 +251,22 @@ rocks_column_family_handle_t* rocks_db_create_column_family(rocks_db_t* db,
return handle;
}

rocks_column_family_handle_t* rocks_db_create_column_family_with_ttl(rocks_db_t* db,
const rocks_cfoptions_t* column_family_options,
const char* column_family_name, int ttl,
rocks_status_t** status) {
DBWithTTL* db_with_ttl = static_cast<DBWithTTL*>(db->rep);

rocks_column_family_handle_t* handle = new rocks_column_family_handle_t;
auto st = db_with_ttl->CreateColumnFamilyWithTtl(ColumnFamilyOptions(column_family_options->rep),
std::string(column_family_name), &(handle->rep), ttl);
if (SaveError(status, std::move(st))) {
delete handle;
handle = nullptr;
}
return handle;
}

rocks_column_family_handle_t* rocks_db_default_column_family(rocks_db_t* db) {
return new rocks_column_family_handle_t{db->rep->DefaultColumnFamily()};
}
Expand Down
29 changes: 29 additions & 0 deletions rocks-sys/src/c.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1130,6 +1130,14 @@ extern "C" {
status: *mut *mut rocks_status_t,
) -> *mut rocks_db_t;
}
extern "C" {
pub fn rocks_db_open_with_ttl(
options: *const rocks_options_t,
name: *const ::std::os::raw::c_char,
ttl: ::std::os::raw::c_int,
status: *mut *mut rocks_status_t,
) -> *mut rocks_db_t;
}
extern "C" {
pub fn rocks_db_close(db: *mut rocks_db_t, status: *mut *mut rocks_status_t);
}
Expand Down Expand Up @@ -1181,6 +1189,18 @@ extern "C" {
status: *mut *mut rocks_status_t,
) -> *mut rocks_db_t;
}
extern "C" {
pub fn rocks_db_open_column_families_with_ttl(
db_options: *const rocks_dboptions_t,
name: *const ::std::os::raw::c_char,
num_column_families: ::std::os::raw::c_int,
column_family_names: *const *const ::std::os::raw::c_char,
column_family_options: *const *const rocks_cfoptions_t,
column_family_handles: *mut *mut rocks_column_family_handle_t,
ttls: *const ::std::os::raw::c_int,
status: *mut *mut rocks_status_t,
) -> *mut rocks_db_t;
}
extern "C" {
pub fn rocks_db_open_for_read_only_column_families(
db_options: *const rocks_dboptions_t,
Expand Down Expand Up @@ -1212,6 +1232,15 @@ extern "C" {
status: *mut *mut rocks_status_t,
) -> *mut rocks_column_family_handle_t;
}
extern "C" {
pub fn rocks_db_create_column_family_with_ttl(
db: *mut rocks_db_t,
column_family_options: *const rocks_cfoptions_t,
column_family_name: *const ::std::os::raw::c_char,
tll: ::std::os::raw::c_int,
status: *mut *mut rocks_status_t,
) -> *mut rocks_column_family_handle_t;
}
extern "C" {
pub fn rocks_db_default_column_family(db: *mut rocks_db_t) -> *mut rocks_column_family_handle_t;
}
Expand Down
101 changes: 101 additions & 0 deletions src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use std::ptr;
use std::slice;
use std::str;
use std::sync::Arc;
use std::time::Duration;

use rocks_sys as ll;

Expand Down Expand Up @@ -729,6 +730,18 @@ impl DB {
}
}

/// Open the database with the specified `name` and ttl.
pub fn open_with_ttl<T: AsRef<Options>, P: AsRef<Path>>(options: T, name: P, ttl: Option<Duration>) -> Result<DB> {
let opt = options.as_ref().raw();
let dbname = CString::new(path_to_bytes(name)).unwrap();
let ttl = ttl.map(|ttl| ttl.as_secs() as i32).unwrap_or(0);
let mut status = ptr::null_mut::<ll::rocks_status_t>();
unsafe {
let db_ptr = ll::rocks_db_open_with_ttl(opt, dbname.as_ptr(), ttl, &mut status);
Error::from_ll(status).map(|_| DB::from_ll(db_ptr))
}
}

/// Open DB with column families.
pub fn open_with_column_families<CF: Into<ColumnFamilyDescriptor>, P: AsRef<Path>, I: IntoIterator<Item = CF>>(
options: &DBOptions,
Expand Down Expand Up @@ -783,6 +796,71 @@ impl DB {
}
}

/// Open DB with column families and ttls.
pub fn open_with_column_families_and_ttls<
CF: Into<ColumnFamilyDescriptor>,
P: AsRef<Path>,
I: IntoIterator<Item = CF>,
>(
options: &DBOptions,
name: P,
column_families: I,
ttls: Vec<Option<Duration>>,
) -> Result<(DB, Vec<ColumnFamily>)> {
let opt = options.raw();
let dbname = CString::new(path_to_bytes(name)).unwrap();

let cfs = column_families
.into_iter()
.map(|desc| desc.into())
.collect::<Vec<ColumnFamilyDescriptor>>();

let num_column_families = cfs.len();
// for ffi
let mut cfnames: Vec<*const c_char> = Vec::with_capacity(num_column_families);
let mut cfopts: Vec<*const ll::rocks_cfoptions_t> = Vec::with_capacity(num_column_families);
let mut cfhandles = vec![ptr::null_mut(); num_column_families];

for cf in &cfs {
cfnames.push(cf.name_as_ptr());
cfopts.push(cf.options.raw());
}

let ttls: Vec<i32> = ttls
.into_iter()
.map(|opt| opt.map(|ttl| ttl.as_secs() as i32).unwrap_or(0))
.collect();

let mut status = ptr::null_mut::<ll::rocks_status_t>();
unsafe {
let db_ptr = ll::rocks_db_open_column_families_with_ttl(
options.raw(),
dbname.as_ptr(),
num_column_families as c_int,
cfnames.as_ptr(),
cfopts.as_ptr(),
cfhandles.as_mut_ptr(),
ttls.as_ptr(),
&mut status,
);
Error::from_ll(status).map(|_| {
let db = DB::from_ll(db_ptr);
let db_ref = db.context.clone();
(
db,
cfhandles
.into_iter()
.map(|p| ColumnFamily {
handle: ColumnFamilyHandle { raw: p },
db: db_ref.clone(),
owned: true,
})
.collect(),
)
})
}
}

/// Open the database for read only. All DB interfaces
/// that modify data, like `put/delete`, will return error.
/// If the db is opened in read only mode, then no compactions
Expand Down Expand Up @@ -980,6 +1058,29 @@ impl DB {
})
}
}

/// Create a column_family with ttl and return the handle of column family
/// through the argument handle.
pub fn create_column_family_with_ttl(
&self,
cfopts: &ColumnFamilyOptions,
column_family_name: &str,
ttl: Option<Duration>,
) -> Result<ColumnFamily> {
let dbname = CString::new(column_family_name).unwrap();
let ttl = ttl.map(|ttl| ttl.as_secs() as i32).unwrap_or(0);
let mut status = ptr::null_mut::<ll::rocks_status_t>();
unsafe {
let handle =
ll::rocks_db_create_column_family_with_ttl(self.raw(), cfopts.raw(), dbname.as_ptr(), ttl, &mut status);
Error::from_ll(status).map(|_| ColumnFamily {
handle: ColumnFamilyHandle { raw: handle },
db: self.context.clone(),
owned: true,
})
}
}

/// Drop a column family specified by column_family handle. This call
/// only records a drop record in the manifest and prevents the column
/// family from flushing and compacting.
Expand Down

0 comments on commit 63f7a0f

Please sign in to comment.