Skip to content

Commit

Permalink
Merge pull request #10 from technoweenie/iterator
Browse files Browse the repository at this point in the history
Improve DB iteration
  • Loading branch information
wmorgan committed Dec 7, 2011
2 parents f3fe2dd + 6a246b5 commit a37037c
Show file tree
Hide file tree
Showing 6 changed files with 320 additions and 22 deletions.
1 change: 1 addition & 0 deletions .gitignore
Expand Up @@ -7,3 +7,4 @@ ext/leveldb/libleveldb.a
leveldb/libleveldb.a
leveldb/leveldb.gyp
pkg
test/*.db
7 changes: 7 additions & 0 deletions Rakefile
Expand Up @@ -26,4 +26,11 @@ end
Gem::PackageTask.new spec do
end

require 'rake/testtask'
Rake::TestTask.new(:test) do |test|
test.libs << 'lib' << 'test'
test.pattern = 'test/**/*_test.rb'
test.verbose = true
end

# vim: syntax=ruby
199 changes: 177 additions & 22 deletions ext/leveldb/leveldb.cc
Expand Up @@ -2,10 +2,17 @@

#include "leveldb/db.h"
#include "leveldb/slice.h"
#include "leveldb/write_batch.h"

static VALUE m_leveldb;
static VALUE c_db;
static VALUE c_batch;
static VALUE c_error;
static VALUE k_fill;
static VALUE k_verify;
static VALUE k_sync;
static ID to_s;
static leveldb::ReadOptions uncached_read_options;

// support 1.9 and 1.8
#ifndef RSTRING_PTR
Expand Down Expand Up @@ -62,37 +69,69 @@ static VALUE db_close(VALUE self) {
return Qtrue;
}

static leveldb::ReadOptions parse_read_options(VALUE options) {
leveldb::ReadOptions readOptions;

if(!NIL_P(options)) {
Check_Type(options, T_HASH);
if(rb_hash_aref(options, k_fill) == Qfalse)
readOptions.fill_cache = false;
if(rb_hash_aref(options, k_verify) == Qtrue)
readOptions.verify_checksums = true;
}

return readOptions;
}

static leveldb::WriteOptions parse_write_options(VALUE options) {
leveldb::WriteOptions writeOptions;

if(!NIL_P(options)) {
Check_Type(options, T_HASH);
if(rb_hash_aref(options, k_sync) == Qtrue)
writeOptions.sync = true;
}

return writeOptions;
}

#define RUBY_STRING_TO_SLICE(x) leveldb::Slice(RSTRING_PTR(x), RSTRING_LEN(x))
#define SLICE_TO_RUBY_STRING(x) rb_str_new(x.data(), x.size())
#define STRING_TO_RUBY_STRING(x) rb_str_new(x.data(), x.size())
static VALUE db_get(VALUE self, VALUE v_key) {
static VALUE db_get(int argc, VALUE* argv, VALUE self) {
VALUE v_key, v_options;
rb_scan_args(argc, argv, "11", &v_key, &v_options);
Check_Type(v_key, T_STRING);
leveldb::ReadOptions readOptions = parse_read_options(v_options);

bound_db* db;
Data_Get_Struct(self, bound_db, db);

leveldb::Slice key = RUBY_STRING_TO_SLICE(v_key);
std::string value;
leveldb::Status status = db->db->Get(leveldb::ReadOptions(), key, &value);
leveldb::Status status = db->db->Get(readOptions, key, &value);
if(status.IsNotFound()) return Qnil;

RAISE_ON_ERROR(status);
return STRING_TO_RUBY_STRING(value);
}

static VALUE db_delete(VALUE self, VALUE v_key) {
static VALUE db_delete(int argc, VALUE* argv, VALUE self) {
VALUE v_key, v_options;
rb_scan_args(argc, argv, "11", &v_key, &v_options);
Check_Type(v_key, T_STRING);
leveldb::WriteOptions writeOptions = parse_write_options(v_options);

bound_db* db;
Data_Get_Struct(self, bound_db, db);

leveldb::Slice key = RUBY_STRING_TO_SLICE(v_key);
std::string value;
leveldb::Status status = db->db->Get(leveldb::ReadOptions(), key, &value);
leveldb::Status status = db->db->Get(uncached_read_options, key, &value);

if(status.IsNotFound()) return Qnil;

status = db->db->Delete(leveldb::WriteOptions(), key);
status = db->db->Delete(writeOptions, key);
RAISE_ON_ERROR(status);

return STRING_TO_RUBY_STRING(value);
Expand All @@ -112,16 +151,20 @@ static VALUE db_exists(VALUE self, VALUE v_key) {
return Qtrue;
}

static VALUE db_put(VALUE self, VALUE v_key, VALUE v_value) {
static VALUE db_put(int argc, VALUE* argv, VALUE self) {
VALUE v_key, v_value, v_options;

rb_scan_args(argc, argv, "21", &v_key, &v_value, &v_options);
Check_Type(v_key, T_STRING);
Check_Type(v_value, T_STRING);
leveldb::WriteOptions writeOptions = parse_write_options(v_options);

bound_db* db;
Data_Get_Struct(self, bound_db, db);

leveldb::Slice key = RUBY_STRING_TO_SLICE(v_key);
leveldb::Slice value = RUBY_STRING_TO_SLICE(v_value);
leveldb::Status status = db->db->Put(leveldb::WriteOptions(), key, value);
leveldb::Status status = db->db->Put(writeOptions, key, value);

RAISE_ON_ERROR(status);

Expand All @@ -133,7 +176,7 @@ static VALUE db_size(VALUE self) {

bound_db* db;
Data_Get_Struct(self, bound_db, db);
leveldb::Iterator* it = db->db->NewIterator(leveldb::ReadOptions());
leveldb::Iterator* it = db->db->NewIterator(uncached_read_options);

// apparently this is how we have to do it. slow and painful!
for (it->SeekToFirst(); it->Valid(); it->Next()) count++;
Expand All @@ -142,18 +185,45 @@ static VALUE db_size(VALUE self) {
return INT2NUM(count);
}

static VALUE db_each(VALUE self) {
static VALUE db_iterate(VALUE self, VALUE key_from, VALUE key_to, bool reversed) {
bound_db* db;
Data_Get_Struct(self, bound_db, db);
leveldb::Iterator* it = db->db->NewIterator(leveldb::ReadOptions());

for (it->SeekToFirst(); it->Valid(); it->Next()) {
VALUE key = SLICE_TO_RUBY_STRING(it->key());
VALUE value = SLICE_TO_RUBY_STRING(it->value());
VALUE ary = rb_ary_new2(2);
rb_ary_push(ary, key);
rb_ary_push(ary, value);
rb_yield(ary);
leveldb::Iterator* it = db->db->NewIterator(uncached_read_options);

if(RTEST(key_from)) {
it->Seek(RUBY_STRING_TO_SLICE(rb_funcall(key_from, to_s, 0)));
} else {
if(reversed) {
it->SeekToLast();
} else {
it->SeekToFirst();
}
}

bool passed_limit = false;
bool check_limit = RTEST(key_to);
std::string key_to_str;

if(check_limit)
key_to_str = RUBY_STRING_TO_SLICE(rb_funcall(key_to, to_s, 0)).ToString();

while(!passed_limit && it->Valid()) {
leveldb::Slice key_sl = it->key();

if(check_limit &&
(reversed ?
(key_sl.ToString() < key_to_str) :
(key_sl.ToString() > key_to_str))) {
passed_limit = true;
} else {
VALUE key = SLICE_TO_RUBY_STRING(key_sl);
VALUE value = SLICE_TO_RUBY_STRING(it->value());
VALUE ary = rb_ary_new2(2);
rb_ary_push(ary, key);
rb_ary_push(ary, value);
rb_yield(ary);
reversed ? it->Prev() : it->Next();
}
}

RAISE_ON_ERROR(it->status());
Expand All @@ -162,25 +232,110 @@ static VALUE db_each(VALUE self) {
return self;
}

static VALUE db_each(int argc, VALUE* argv, VALUE self) {
VALUE key_from, key_to;
rb_scan_args(argc, argv, "02", &key_from, &key_to);

return db_iterate(self, key_from, key_to, false);
}

static VALUE db_reverse_each(int argc, VALUE* argv, VALUE self) {
VALUE key_from, key_to;
rb_scan_args(argc, argv, "02", &key_from, &key_to);

return db_iterate(self, key_from, key_to, true);
}

static VALUE db_init(VALUE self, VALUE v_pathname) {
rb_iv_set(self, "@pathname", v_pathname);
return self;
}

typedef struct bound_batch {
leveldb::WriteBatch batch;
} bound_batch;

static void batch_free(bound_batch* batch) {
delete batch;
}

static VALUE batch_make(VALUE klass) {
bound_batch* batch = new bound_batch;
batch->batch = leveldb::WriteBatch();

VALUE o_batch = Data_Wrap_Struct(klass, NULL, batch_free, batch);
VALUE argv[0];
rb_obj_call_init(o_batch, 0, argv);

return o_batch;
}

static VALUE batch_put(VALUE self, VALUE v_key, VALUE v_value) {
Check_Type(v_key, T_STRING);
Check_Type(v_value, T_STRING);

bound_batch* batch;
Data_Get_Struct(self, bound_batch, batch);
batch->batch.Put(RUBY_STRING_TO_SLICE(v_key), RUBY_STRING_TO_SLICE(v_value));

return v_value;
}

static VALUE batch_delete(VALUE self, VALUE v_key) {
Check_Type(v_key, T_STRING);
bound_batch* batch;
Data_Get_Struct(self, bound_batch, batch);
batch->batch.Delete(RUBY_STRING_TO_SLICE(v_key));
return Qtrue;
}

static VALUE db_batch(int argc, VALUE* argv, VALUE self) {
VALUE o_batch = batch_make(c_batch);

rb_yield(o_batch);

bound_batch* batch;
bound_db* db;
Data_Get_Struct(o_batch, bound_batch, batch);
Data_Get_Struct(self, bound_db, db);

VALUE v_options;
rb_scan_args(argc, argv, "01", &v_options);
leveldb::WriteOptions writeOptions = parse_write_options(v_options);

leveldb::Status status = db->db->Write(writeOptions, &batch->batch);
RAISE_ON_ERROR(status);
return Qtrue;
}

extern "C" {
void Init_leveldb() {
k_fill = ID2SYM(rb_intern("fill_cache"));
k_verify = ID2SYM(rb_intern("verify_checksums"));
k_sync = ID2SYM(rb_intern("sync"));
to_s = rb_intern("to_s");
uncached_read_options = leveldb::ReadOptions();
uncached_read_options.fill_cache = false;

m_leveldb = rb_define_module("LevelDB");

c_db = rb_define_class_under(m_leveldb, "DB", rb_cObject);
rb_define_singleton_method(c_db, "make", (VALUE (*)(...))db_make, 3);
rb_define_method(c_db, "initialize", (VALUE (*)(...))db_init, 1);
rb_define_method(c_db, "get", (VALUE (*)(...))db_get, 1);
rb_define_method(c_db, "delete", (VALUE (*)(...))db_delete, 1);
rb_define_method(c_db, "put", (VALUE (*)(...))db_put, 2);
rb_define_method(c_db, "get", (VALUE (*)(...))db_get, -1);
rb_define_method(c_db, "delete", (VALUE (*)(...))db_delete, -1);
rb_define_method(c_db, "put", (VALUE (*)(...))db_put, -1);
rb_define_method(c_db, "exists?", (VALUE (*)(...))db_exists, 1);
rb_define_method(c_db, "close", (VALUE (*)(...))db_close, 0);
rb_define_method(c_db, "size", (VALUE (*)(...))db_size, 0);
rb_define_method(c_db, "each", (VALUE (*)(...))db_each, 0);
rb_define_method(c_db, "each", (VALUE (*)(...))db_each, -1);
rb_define_method(c_db, "reverse_each", (VALUE (*)(...))db_reverse_each, -1);
rb_define_method(c_db, "batch", (VALUE (*)(...))db_batch, -1);

c_batch = rb_define_class_under(m_leveldb, "WriteBatch", rb_cObject);
rb_define_singleton_method(c_batch, "make", (VALUE (*)(...))batch_make, 0);
rb_define_method(c_batch, "put", (VALUE (*)(...))batch_put, 2);
rb_define_method(c_batch, "delete", (VALUE (*)(...))batch_delete, 1);

c_error = rb_define_class_under(m_leveldb, "Error", rb_eStandardError);
}
Expand Down
6 changes: 6 additions & 0 deletions lib/leveldb.rb
Expand Up @@ -41,4 +41,10 @@ def path_string pathname
def keys; map { |k, v| k } end
def values; map { |k, v| v } end
end

class WriteBatch
class << self
private :new
end
end
end
57 changes: 57 additions & 0 deletions test/db_test.rb
@@ -0,0 +1,57 @@
require 'test/unit'
require File.expand_path("../../lib/leveldb", __FILE__)
require 'fileutils'

class DBTest < Test::Unit::TestCase
path = File.expand_path("../db_test.db", __FILE__)
FileUtils.rm_rf path
DB = LevelDB::DB.new(path)

def test_get
DB.put 'test:read', '1'

assert_equal '1', DB.get('test:read')
assert_equal '1', DB.get('test:read', :fill_cache => false)
assert_equal '1', DB.get('test:read',
:fill_cache => false,
:verify_checksums => true)
end

def test_put
DB.put "test:async", "1"
DB.put "test:sync", "1", :sync => true

assert_equal "1", DB.get("test:async")
assert_equal "1", DB.get("test:sync")
end

def test_delete
DB.put 'test:async', '1'
DB.put 'test:sync', '1'

assert DB.delete("test:async")
assert DB.delete("test:sync", :sync => true)
end

def test_batch
DB.put 'a', '1'
DB.put 'b', '1'

DB.batch do |b|
b.put 'a', 'batch'
b.delete 'b'
end

assert_equal 'batch', DB.get('a')
assert_nil DB.get('b')

DB.batch :sync => true do |b|
b.put 'b', 'batch'
b.delete 'a'
end

assert_equal 'batch', DB.get('b')
assert_nil DB.get('a')
end
end

0 comments on commit a37037c

Please sign in to comment.