Improve DB iteration #10

Merged
merged 18 commits on Dec 7, 2011
9ecb158
Update DB#each to take 3 optional args: key_from, key_to, reversed
technoweenie Nov 28, 2011
dcac0dd
the iterator limit checker works properly with reversed iteration
technoweenie Nov 28, 2011
b80ea34
#reverse_each is more idiomatic ruby
technoweenie Nov 28, 2011
68b1ff2
key_from and key_to should always be a string
technoweenie Nov 28, 2011
1174d14
add iteration test
technoweenie Nov 29, 2011
4e6fd35
change the order of the limit checking so that `key_to` in #each beha…
technoweenie Nov 29, 2011
232d010
don't cache the data for iterations in memory
technoweenie Dec 3, 2011
b976809
add support for synchronous writes
technoweenie Dec 3, 2011
78b7eea
add :fill_cache and :verify_checksums to #get and :sync to #delete
technoweenie Dec 3, 2011
4af9562
don't bother reading values into memory when looping to count the key…
technoweenie Dec 3, 2011
782ae5f
preliminary WriteBatch support. crashes on exit, doh!
technoweenie Dec 3, 2011
e224962
don't need to free batch again, let the finalizer take care of it
technoweenie Dec 3, 2011
028fab1
add a test
technoweenie Dec 3, 2011
4a7bdd1
don't bother trying to instantiate WriteBatch yourself
technoweenie Dec 3, 2011
e22019f
stop interning those strings all the time
technoweenie Dec 3, 2011
40e0037
add sync support to #batch
technoweenie Dec 4, 2011
42b9aff
rename db_readOption to parse_read_options and db_writeOptions to par…
technoweenie Dec 7, 2011
6a246b5
move the custom readoptions to a static var so we don't have to recre…
technoweenie Dec 7, 2011
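
Taken together, these commits rework the Ruby-facing API: #get, #put, and #delete gain an options hash, #each takes optional key bounds, reversed iteration moves into #reverse_each, and #batch wraps leveldb::WriteBatch. A rough usage sketch (illustrative only; the db handle and key names are made up, and in the merged version #each takes two optional positional bounds rather than a third reversed flag):

    db = LevelDB::DB.new('example.db')

    # Full forward scan in key order; each pair is yielded as [key, value].
    db.each { |key, value| puts "#{key} => #{value}" }

    # Bounded scan: seek to 'key:100' and stop once a key sorts after 'key:200'.
    # The upper bound is inclusive -- iteration stops only when a key compares greater.
    db.each('key:100', 'key:200') { |key, value| puts key }

    # Reversed scan: key_from is the (larger) starting key, key_to the lower bound.
    db.reverse_each('key:200', 'key:100') { |key, value| puts key }
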
1 change: 1 addition & 0 deletions .gitignore
@@ -7,3 +7,4 @@ ext/leveldb/libleveldb.a
leveldb/libleveldb.a
leveldb/leveldb.gyp
pkg
test/*.db
7 changes: 7 additions & 0 deletions Rakefile
@@ -26,4 +26,11 @@ end
Gem::PackageTask.new spec do
end

require 'rake/testtask'
Rake::TestTask.new(:test) do |test|
test.libs << 'lib' << 'test'
test.pattern = 'test/**/*_test.rb'
test.verbose = true
end

# vim: syntax=ruby
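
With this task in place the suite can be run via `rake test` (assuming the extension under ext/leveldb has already been built); it picks up every test/**/*_test.rb file.
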
199 changes: 177 additions & 22 deletions ext/leveldb/leveldb.cc
@@ -2,10 +2,17 @@

#include "leveldb/db.h"
#include "leveldb/slice.h"
#include "leveldb/write_batch.h"

static VALUE m_leveldb;
static VALUE c_db;
static VALUE c_batch;
static VALUE c_error;
static VALUE k_fill;
static VALUE k_verify;
static VALUE k_sync;
static ID to_s;
static leveldb::ReadOptions uncached_read_options;

// support 1.9 and 1.8
#ifndef RSTRING_PTR
@@ -62,37 +69,69 @@ static VALUE db_close(VALUE self) {
return Qtrue;
}

static leveldb::ReadOptions parse_read_options(VALUE options) {
leveldb::ReadOptions readOptions;

if(!NIL_P(options)) {
Check_Type(options, T_HASH);
if(rb_hash_aref(options, k_fill) == Qfalse)
readOptions.fill_cache = false;
if(rb_hash_aref(options, k_verify) == Qtrue)
readOptions.verify_checksums = true;
}

return readOptions;
}

static leveldb::WriteOptions parse_write_options(VALUE options) {
leveldb::WriteOptions writeOptions;

if(!NIL_P(options)) {
Check_Type(options, T_HASH);
if(rb_hash_aref(options, k_sync) == Qtrue)
writeOptions.sync = true;
}

return writeOptions;
}

#define RUBY_STRING_TO_SLICE(x) leveldb::Slice(RSTRING_PTR(x), RSTRING_LEN(x))
#define SLICE_TO_RUBY_STRING(x) rb_str_new(x.data(), x.size())
#define STRING_TO_RUBY_STRING(x) rb_str_new(x.data(), x.size())
static VALUE db_get(VALUE self, VALUE v_key) {
static VALUE db_get(int argc, VALUE* argv, VALUE self) {
VALUE v_key, v_options;
rb_scan_args(argc, argv, "11", &v_key, &v_options);
Check_Type(v_key, T_STRING);
leveldb::ReadOptions readOptions = parse_read_options(v_options);

bound_db* db;
Data_Get_Struct(self, bound_db, db);

leveldb::Slice key = RUBY_STRING_TO_SLICE(v_key);
std::string value;
leveldb::Status status = db->db->Get(leveldb::ReadOptions(), key, &value);
leveldb::Status status = db->db->Get(readOptions, key, &value);
if(status.IsNotFound()) return Qnil;

RAISE_ON_ERROR(status);
return STRING_TO_RUBY_STRING(value);
}

static VALUE db_delete(VALUE self, VALUE v_key) {
static VALUE db_delete(int argc, VALUE* argv, VALUE self) {
VALUE v_key, v_options;
rb_scan_args(argc, argv, "11", &v_key, &v_options);
Check_Type(v_key, T_STRING);
leveldb::WriteOptions writeOptions = parse_write_options(v_options);

bound_db* db;
Data_Get_Struct(self, bound_db, db);

leveldb::Slice key = RUBY_STRING_TO_SLICE(v_key);
std::string value;
leveldb::Status status = db->db->Get(leveldb::ReadOptions(), key, &value);
leveldb::Status status = db->db->Get(uncached_read_options, key, &value);

if(status.IsNotFound()) return Qnil;

status = db->db->Delete(leveldb::WriteOptions(), key);
status = db->db->Delete(writeOptions, key);
RAISE_ON_ERROR(status);

return STRING_TO_RUBY_STRING(value);
@@ -112,16 +151,20 @@ static VALUE db_exists(VALUE self, VALUE v_key) {
return Qtrue;
}

static VALUE db_put(VALUE self, VALUE v_key, VALUE v_value) {
static VALUE db_put(int argc, VALUE* argv, VALUE self) {
VALUE v_key, v_value, v_options;

rb_scan_args(argc, argv, "21", &v_key, &v_value, &v_options);
Check_Type(v_key, T_STRING);
Check_Type(v_value, T_STRING);
leveldb::WriteOptions writeOptions = parse_write_options(v_options);

bound_db* db;
Data_Get_Struct(self, bound_db, db);

leveldb::Slice key = RUBY_STRING_TO_SLICE(v_key);
leveldb::Slice value = RUBY_STRING_TO_SLICE(v_value);
leveldb::Status status = db->db->Put(leveldb::WriteOptions(), key, value);
leveldb::Status status = db->db->Put(writeOptions, key, value);

RAISE_ON_ERROR(status);

@@ -133,7 +176,7 @@ static VALUE db_size(VALUE self) {

bound_db* db;
Data_Get_Struct(self, bound_db, db);
leveldb::Iterator* it = db->db->NewIterator(leveldb::ReadOptions());
leveldb::Iterator* it = db->db->NewIterator(uncached_read_options);

// apparently this is how we have to do it. slow and painful!
for (it->SeekToFirst(); it->Valid(); it->Next()) count++;
@@ -142,18 +185,45 @@ static VALUE db_size(VALUE self) {
return INT2NUM(count);
}

static VALUE db_each(VALUE self) {
static VALUE db_iterate(VALUE self, VALUE key_from, VALUE key_to, bool reversed) {
bound_db* db;
Data_Get_Struct(self, bound_db, db);
leveldb::Iterator* it = db->db->NewIterator(leveldb::ReadOptions());

for (it->SeekToFirst(); it->Valid(); it->Next()) {
VALUE key = SLICE_TO_RUBY_STRING(it->key());
VALUE value = SLICE_TO_RUBY_STRING(it->value());
VALUE ary = rb_ary_new2(2);
rb_ary_push(ary, key);
rb_ary_push(ary, value);
rb_yield(ary);
leveldb::Iterator* it = db->db->NewIterator(uncached_read_options);

if(RTEST(key_from)) {
it->Seek(RUBY_STRING_TO_SLICE(rb_funcall(key_from, to_s, 0)));
} else {
if(reversed) {
it->SeekToLast();
} else {
it->SeekToFirst();
}
}

bool passed_limit = false;
bool check_limit = RTEST(key_to);
std::string key_to_str;

if(check_limit)
key_to_str = RUBY_STRING_TO_SLICE(rb_funcall(key_to, to_s, 0)).ToString();

while(!passed_limit && it->Valid()) {
leveldb::Slice key_sl = it->key();

if(check_limit &&
(reversed ?
(key_sl.ToString() < key_to_str) :
(key_sl.ToString() > key_to_str))) {
passed_limit = true;
} else {
VALUE key = SLICE_TO_RUBY_STRING(key_sl);
VALUE value = SLICE_TO_RUBY_STRING(it->value());
VALUE ary = rb_ary_new2(2);
rb_ary_push(ary, key);
rb_ary_push(ary, value);
rb_yield(ary);
reversed ? it->Prev() : it->Next();
}
}

RAISE_ON_ERROR(it->status());
@@ -162,25 +232,110 @@ static VALUE db_each(VALUE self) {
return self;
}

static VALUE db_each(int argc, VALUE* argv, VALUE self) {
VALUE key_from, key_to;
rb_scan_args(argc, argv, "02", &key_from, &key_to);

return db_iterate(self, key_from, key_to, false);
}

static VALUE db_reverse_each(int argc, VALUE* argv, VALUE self) {
VALUE key_from, key_to;
rb_scan_args(argc, argv, "02", &key_from, &key_to);

return db_iterate(self, key_from, key_to, true);
}

static VALUE db_init(VALUE self, VALUE v_pathname) {
rb_iv_set(self, "@pathname", v_pathname);
return self;
}

typedef struct bound_batch {
leveldb::WriteBatch batch;
} bound_batch;

static void batch_free(bound_batch* batch) {
delete batch;
}

static VALUE batch_make(VALUE klass) {
bound_batch* batch = new bound_batch;
batch->batch = leveldb::WriteBatch();

VALUE o_batch = Data_Wrap_Struct(klass, NULL, batch_free, batch);
VALUE argv[0];
rb_obj_call_init(o_batch, 0, argv);

return o_batch;
}

static VALUE batch_put(VALUE self, VALUE v_key, VALUE v_value) {
Check_Type(v_key, T_STRING);
Check_Type(v_value, T_STRING);

bound_batch* batch;
Data_Get_Struct(self, bound_batch, batch);
batch->batch.Put(RUBY_STRING_TO_SLICE(v_key), RUBY_STRING_TO_SLICE(v_value));

return v_value;
}

static VALUE batch_delete(VALUE self, VALUE v_key) {
Check_Type(v_key, T_STRING);
bound_batch* batch;
Data_Get_Struct(self, bound_batch, batch);
batch->batch.Delete(RUBY_STRING_TO_SLICE(v_key));
return Qtrue;
}

static VALUE db_batch(int argc, VALUE* argv, VALUE self) {
VALUE o_batch = batch_make(c_batch);

rb_yield(o_batch);

bound_batch* batch;
bound_db* db;
Data_Get_Struct(o_batch, bound_batch, batch);
Data_Get_Struct(self, bound_db, db);

VALUE v_options;
rb_scan_args(argc, argv, "01", &v_options);
leveldb::WriteOptions writeOptions = parse_write_options(v_options);

leveldb::Status status = db->db->Write(writeOptions, &batch->batch);
RAISE_ON_ERROR(status);
return Qtrue;
}

extern "C" {
void Init_leveldb() {
k_fill = ID2SYM(rb_intern("fill_cache"));
k_verify = ID2SYM(rb_intern("verify_checksums"));
k_sync = ID2SYM(rb_intern("sync"));
to_s = rb_intern("to_s");
uncached_read_options = leveldb::ReadOptions();
uncached_read_options.fill_cache = false;

m_leveldb = rb_define_module("LevelDB");

c_db = rb_define_class_under(m_leveldb, "DB", rb_cObject);
rb_define_singleton_method(c_db, "make", (VALUE (*)(...))db_make, 3);
rb_define_method(c_db, "initialize", (VALUE (*)(...))db_init, 1);
rb_define_method(c_db, "get", (VALUE (*)(...))db_get, 1);
rb_define_method(c_db, "delete", (VALUE (*)(...))db_delete, 1);
rb_define_method(c_db, "put", (VALUE (*)(...))db_put, 2);
rb_define_method(c_db, "get", (VALUE (*)(...))db_get, -1);
rb_define_method(c_db, "delete", (VALUE (*)(...))db_delete, -1);
rb_define_method(c_db, "put", (VALUE (*)(...))db_put, -1);
rb_define_method(c_db, "exists?", (VALUE (*)(...))db_exists, 1);
rb_define_method(c_db, "close", (VALUE (*)(...))db_close, 0);
rb_define_method(c_db, "size", (VALUE (*)(...))db_size, 0);
rb_define_method(c_db, "each", (VALUE (*)(...))db_each, 0);
rb_define_method(c_db, "each", (VALUE (*)(...))db_each, -1);
rb_define_method(c_db, "reverse_each", (VALUE (*)(...))db_reverse_each, -1);
rb_define_method(c_db, "batch", (VALUE (*)(...))db_batch, -1);

c_batch = rb_define_class_under(m_leveldb, "WriteBatch", rb_cObject);
rb_define_singleton_method(c_batch, "make", (VALUE (*)(...))batch_make, 0);
rb_define_method(c_batch, "put", (VALUE (*)(...))batch_put, 2);
rb_define_method(c_batch, "delete", (VALUE (*)(...))batch_delete, 1);

c_error = rb_define_class_under(m_leveldb, "Error", rb_eStandardError);
}
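
The two option parsers above map plain Ruby hashes onto LevelDB's ReadOptions and WriteOptions: :fill_cache and :verify_checksums apply to reads, :sync to writes, and iteration (plus #size and the internal read in #delete) now always uses the static uncached_read_options so scans no longer churn the block cache. From the Ruby side the new options look roughly like this (a sketch; the key names are illustrative):

    # Read without populating the block cache, verifying checksums on the way.
    value = db.get('some:key', :fill_cache => false, :verify_checksums => true)

    # Durable writes: :sync flushes the write to disk before the call returns.
    db.put('some:key', 'value', :sync => true)
    db.delete('some:key', :sync => true)
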
6 changes: 6 additions & 0 deletions lib/leveldb.rb
@@ -41,4 +41,10 @@ def path_string pathname
def keys; map { |k, v| k } end
def values; map { |k, v| v } end
end

class WriteBatch
class << self
private :new
end
end
end
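
Making WriteBatch.new private keeps batch construction inside DB#batch, which builds the batch through the C-level make singleton, yields it to the block, and then writes the whole thing atomically. Typical use (a sketch; the keys are illustrative):

    # All writes in the block are applied in a single atomic WriteBatch.
    db.batch do |b|
      b.put 'a', '1'
      b.delete 'b'
    end

    # A synchronous batch, flushed to disk before #batch returns.
    db.batch(:sync => true) { |b| b.put 'c', '1' }

    LevelDB::WriteBatch.new  # raises NoMethodError -- new is private
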
57 changes: 57 additions & 0 deletions test/db_test.rb
@@ -0,0 +1,57 @@
require 'test/unit'
require File.expand_path("../../lib/leveldb", __FILE__)
require 'fileutils'

class DBTest < Test::Unit::TestCase
path = File.expand_path("../db_test.db", __FILE__)
FileUtils.rm_rf path
DB = LevelDB::DB.new(path)

def test_get
DB.put 'test:read', '1'

assert_equal '1', DB.get('test:read')
assert_equal '1', DB.get('test:read', :fill_cache => false)
assert_equal '1', DB.get('test:read',
:fill_cache => false,
:verify_checksums => true)
end

def test_put
DB.put "test:async", "1"
DB.put "test:sync", "1", :sync => true

assert_equal "1", DB.get("test:async")
assert_equal "1", DB.get("test:sync")
end

def test_delete
DB.put 'test:async', '1'
DB.put 'test:sync', '1'

assert DB.delete("test:async")
assert DB.delete("test:sync", :sync => true)
end

def test_batch
DB.put 'a', '1'
DB.put 'b', '1'

DB.batch do |b|
b.put 'a', 'batch'
b.delete 'b'
end

assert_equal 'batch', DB.get('a')
assert_nil DB.get('b')

DB.batch :sync => true do |b|
b.put 'b', 'batch'
b.delete 'a'
end

assert_equal 'batch', DB.get('b')
assert_nil DB.get('a')
end
end
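
The test file above covers reads, writes, deletes, and batches, but not the new bounded and reversed iteration. A test for it in the same Test::Unit style might look like the following sketch (not part of the diff; it would live inside DBTest):

    def test_bounded_iteration
      DB.put 'iter:1', 'a'
      DB.put 'iter:2', 'b'
      DB.put 'iter:3', 'c'

      keys = []
      DB.each('iter:1', 'iter:3') { |k, v| keys << k }
      assert_equal %w(iter:1 iter:2 iter:3), keys

      keys = []
      DB.reverse_each('iter:3', 'iter:1') { |k, v| keys << k }
      assert_equal %w(iter:3 iter:2 iter:1), keys
    end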