Skip to content

Commit

Permalink
WriteBatchWithIndex to allow different Comparators for different colu…
Browse files Browse the repository at this point in the history
…mn families

Summary:
Previously, one single column family is given to WriteBatchWithIndex to index keys for all column families. An extra map from column family ID to comparator is maintained which can override the default comparator given in the constructor. A WriteBatchWithIndex::SetComparatorForCF() is added for user to add comparators per column family.

Also move more codes into anonymous namespace.

Test Plan: Add a unit test

Reviewers: ljin, igor

Reviewed By: igor

Subscribers: dhruba, leveldb, yhchiang

Differential Revision: https://reviews.facebook.net/D23355
  • Loading branch information
siying committed Sep 22, 2014
1 parent 57a32f1 commit d0de413
Show file tree
Hide file tree
Showing 6 changed files with 189 additions and 41 deletions.
13 changes: 13 additions & 0 deletions db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ ColumnFamilyHandleImpl::~ColumnFamilyHandleImpl() {

uint32_t ColumnFamilyHandleImpl::GetID() const { return cfd()->GetID(); }

const Comparator* ColumnFamilyHandleImpl::user_comparator() const {
return cfd()->user_comparator();
}

ColumnFamilyOptions SanitizeOptions(const InternalKeyComparator* icmp,
const ColumnFamilyOptions& src) {
ColumnFamilyOptions result = src;
Expand Down Expand Up @@ -726,4 +730,13 @@ uint32_t GetColumnFamilyID(ColumnFamilyHandle* column_family) {
return column_family_id;
}

const Comparator* GetColumnFamilyUserComparator(
ColumnFamilyHandle* column_family) {
if (column_family != nullptr) {
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
return cfh->user_comparator();
}
return nullptr;
}

} // namespace rocksdb
4 changes: 4 additions & 0 deletions db/column_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ class ColumnFamilyHandleImpl : public ColumnFamilyHandle {
// destroy without mutex
virtual ~ColumnFamilyHandleImpl();
virtual ColumnFamilyData* cfd() const { return cfd_; }
virtual const Comparator* user_comparator() const;

virtual uint32_t GetID() const;

Expand Down Expand Up @@ -448,4 +449,7 @@ class ColumnFamilyMemTablesImpl : public ColumnFamilyMemTables {

extern uint32_t GetColumnFamilyID(ColumnFamilyHandle* column_family);

extern const Comparator* GetColumnFamilyUserComparator(
ColumnFamilyHandle* column_family);

} // namespace rocksdb
5 changes: 4 additions & 1 deletion db/write_batch_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,9 @@ class ColumnFamilyHandleImplDummy : public ColumnFamilyHandleImpl {
explicit ColumnFamilyHandleImplDummy(int id)
: ColumnFamilyHandleImpl(nullptr, nullptr, nullptr), id_(id) {}
uint32_t GetID() const override { return id_; }
const Comparator* user_comparator() const override {
return BytewiseComparator();
}

private:
uint32_t id_;
Expand Down Expand Up @@ -320,7 +323,7 @@ TEST(WriteBatchTest, ColumnFamiliesBatchTest) {
}

TEST(WriteBatchTest, ColumnFamiliesBatchWithIndexTest) {
WriteBatchWithIndex batch(BytewiseComparator(), 20);
WriteBatchWithIndex batch;
ColumnFamilyHandleImplDummy zero(0), two(2), three(3), eight(8);
batch.Put(&zero, Slice("foo"), Slice("bar"));
batch.Put(&two, Slice("twofoo"), Slice("bar2"));
Expand Down
15 changes: 9 additions & 6 deletions include/rocksdb/utilities/write_batch_with_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@

#pragma once

#include "rocksdb/status.h"
#include "rocksdb/comparator.h"
#include "rocksdb/slice.h"
#include "rocksdb/status.h"
#include "rocksdb/write_batch.h"

namespace rocksdb {
Expand Down Expand Up @@ -56,12 +57,14 @@ class WBWIIterator {
// A user can call NewIterator() to create an iterator.
class WriteBatchWithIndex {
public:
// index_comparator indicates the order when iterating data in the write
// batch. Technically, it doesn't have to be the same as the one used in
// the DB.
// backup_index_comparator: the backup comparator used to compare keys
// within the same column family, if column family is not given in the
// interface, or we can't find a column family from the column family handle
// passed in, backup_index_comparator will be used for the column family.
// reserved_bytes: reserved bytes in underlying WriteBatch
explicit WriteBatchWithIndex(const Comparator* index_comparator,
size_t reserved_bytes = 0);
explicit WriteBatchWithIndex(
const Comparator* backup_index_comparator = BytewiseComparator(),
size_t reserved_bytes = 0);
virtual ~WriteBatchWithIndex();

WriteBatch* GetWriteBatch();
Expand Down
79 changes: 49 additions & 30 deletions utilities/write_batch_with_index/write_batch_with_index.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ class ReadableWriteBatch : public WriteBatch {
Status GetEntryFromDataOffset(size_t data_offset, WriteType* type, Slice* Key,
Slice* value, Slice* blob) const;
};
} // namespace

// Key used by skip list, as the binary searchable index of WriteBatchWithIndex.
struct WriteBatchIndexEntry {
Expand All @@ -38,44 +37,28 @@ struct WriteBatchIndexEntry {

class WriteBatchEntryComparator {
public:
WriteBatchEntryComparator(const Comparator* comparator,
WriteBatchEntryComparator(const Comparator* default_comparator,
const ReadableWriteBatch* write_batch)
: comparator_(comparator), write_batch_(write_batch) {}
: default_comparator_(default_comparator), write_batch_(write_batch) {}
// Compare a and b. Return a negative value if a is less than b, 0 if they
// are equal, and a positive value if a is greater than b
int operator()(const WriteBatchIndexEntry* entry1,
const WriteBatchIndexEntry* entry2) const;

void SetComparatorForCF(uint32_t column_family_id,
const Comparator* comparator) {
cf_comparator_map_[column_family_id] = comparator;
}

private:
const Comparator* comparator_;
const Comparator* default_comparator_;
std::unordered_map<uint32_t, const Comparator*> cf_comparator_map_;
const ReadableWriteBatch* write_batch_;
};

typedef SkipList<WriteBatchIndexEntry*, const WriteBatchEntryComparator&>
WriteBatchEntrySkipList;

struct WriteBatchWithIndex::Rep {
Rep(const Comparator* index_comparator, size_t reserved_bytes = 0)
: write_batch(reserved_bytes),
comparator(index_comparator, &write_batch),
skip_list(comparator, &arena) {}
ReadableWriteBatch write_batch;
WriteBatchEntryComparator comparator;
Arena arena;
WriteBatchEntrySkipList skip_list;

WriteBatchIndexEntry* GetEntry(ColumnFamilyHandle* column_family) {
return GetEntryWithCfId(GetColumnFamilyID(column_family));
}

WriteBatchIndexEntry* GetEntryWithCfId(uint32_t column_family_id) {
auto* mem = arena.Allocate(sizeof(WriteBatchIndexEntry));
auto* index_entry = new (mem)
WriteBatchIndexEntry(write_batch.GetDataSize(), column_family_id);
return index_entry;
}
};

class WBWIIteratorImpl : public WBWIIterator {
public:
WBWIIteratorImpl(uint32_t column_family_id,
Expand Down Expand Up @@ -138,6 +121,35 @@ class WBWIIteratorImpl : public WBWIIterator {
}
}
};
} // namespace

struct WriteBatchWithIndex::Rep {
Rep(const Comparator* index_comparator, size_t reserved_bytes = 0)
: write_batch(reserved_bytes),
comparator(index_comparator, &write_batch),
skip_list(comparator, &arena) {}
ReadableWriteBatch write_batch;
WriteBatchEntryComparator comparator;
Arena arena;
WriteBatchEntrySkipList skip_list;

WriteBatchIndexEntry* GetEntry(ColumnFamilyHandle* column_family) {
uint32_t cf_id = GetColumnFamilyID(column_family);
const auto* cf_cmp = GetColumnFamilyUserComparator(column_family);
if (cf_cmp != nullptr) {
comparator.SetComparatorForCF(cf_id, cf_cmp);
}

return GetEntryWithCfId(cf_id);
}

WriteBatchIndexEntry* GetEntryWithCfId(uint32_t column_family_id) {
auto* mem = arena.Allocate(sizeof(WriteBatchIndexEntry));
auto* index_entry = new (mem)
WriteBatchIndexEntry(write_batch.GetDataSize(), column_family_id);
return index_entry;
}
};

Status ReadableWriteBatch::GetEntryFromDataOffset(size_t data_offset,
WriteType* type, Slice* Key,
Expand Down Expand Up @@ -179,9 +191,9 @@ Status ReadableWriteBatch::GetEntryFromDataOffset(size_t data_offset,
return Status::OK();
}

WriteBatchWithIndex::WriteBatchWithIndex(const Comparator* index_comparator,
size_t reserved_bytes)
: rep(new Rep(index_comparator, reserved_bytes)) {}
WriteBatchWithIndex::WriteBatchWithIndex(
const Comparator* default_index_comparator, size_t reserved_bytes)
: rep(new Rep(default_index_comparator, reserved_bytes)) {}

WriteBatchWithIndex::~WriteBatchWithIndex() { delete rep; }

Expand Down Expand Up @@ -287,7 +299,14 @@ int WriteBatchEntryComparator::operator()(
key2 = *(entry2->search_key);
}

int cmp = comparator_->Compare(key1, key2);
int cmp;
auto comparator_for_cf = cf_comparator_map_.find(entry1->column_family);
if (comparator_for_cf != cf_comparator_map_.end()) {
cmp = comparator_for_cf->second->Compare(key1, key2);
} else {
cmp = default_comparator_->Compare(key1, key2);
}

if (cmp != 0) {
return cmp;
} else if (entry1->offset > entry2->offset) {
Expand Down
114 changes: 110 additions & 4 deletions utilities/write_batch_with_index/write_batch_with_index_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,16 @@ namespace rocksdb {
namespace {
class ColumnFamilyHandleImplDummy : public ColumnFamilyHandleImpl {
public:
explicit ColumnFamilyHandleImplDummy(int id)
: ColumnFamilyHandleImpl(nullptr, nullptr, nullptr), id_(id) {}
explicit ColumnFamilyHandleImplDummy(int id, const Comparator* comparator)
: ColumnFamilyHandleImpl(nullptr, nullptr, nullptr),
id_(id),
comparator_(comparator) {}
uint32_t GetID() const override { return id_; }
const Comparator* user_comparator() const override { return comparator_; }

private:
uint32_t id_;
const Comparator* comparator_;
};

struct Entry {
Expand Down Expand Up @@ -90,8 +94,9 @@ TEST(WriteBatchWithIndexTest, TestValueAsSecondaryIndex) {
index_map[e.value].push_back(&e);
}

WriteBatchWithIndex batch(BytewiseComparator(), 20);
ColumnFamilyHandleImplDummy data(6), index(8);
WriteBatchWithIndex batch(nullptr, 20);
ColumnFamilyHandleImplDummy data(6, BytewiseComparator());
ColumnFamilyHandleImplDummy index(8, BytewiseComparator());
for (auto& e : entries) {
if (e.type == kPutRecord) {
batch.Put(&data, e.key, e.value);
Expand Down Expand Up @@ -230,6 +235,107 @@ TEST(WriteBatchWithIndexTest, TestValueAsSecondaryIndex) {
}
}

class ReverseComparator : public Comparator {
public:
ReverseComparator() {}

virtual const char* Name() const override {
return "rocksdb.ReverseComparator";
}

virtual int Compare(const Slice& a, const Slice& b) const override {
return 0 - BytewiseComparator()->Compare(a, b);
}

virtual void FindShortestSeparator(std::string* start,
const Slice& limit) const {}
virtual void FindShortSuccessor(std::string* key) const {}
};

TEST(WriteBatchWithIndexTest, TestComparatorForCF) {
ReverseComparator reverse_cmp;
ColumnFamilyHandleImplDummy cf1(6, nullptr);
ColumnFamilyHandleImplDummy reverse_cf(66, &reverse_cmp);
ColumnFamilyHandleImplDummy cf2(88, BytewiseComparator());
WriteBatchWithIndex batch(BytewiseComparator(), 20);

batch.Put(&cf1, "ddd", "");
batch.Put(&cf2, "aaa", "");
batch.Put(&cf2, "eee", "");
batch.Put(&cf1, "ccc", "");
batch.Put(&reverse_cf, "a11", "");
batch.Put(&cf1, "bbb", "");
batch.Put(&reverse_cf, "a33", "");
batch.Put(&reverse_cf, "a22", "");

{
std::unique_ptr<WBWIIterator> iter(batch.NewIterator(&cf1));
iter->Seek("");
ASSERT_OK(iter->status());
ASSERT_TRUE(iter->Valid());
ASSERT_EQ("bbb", iter->Entry().key.ToString());
iter->Next();
ASSERT_OK(iter->status());
ASSERT_TRUE(iter->Valid());
ASSERT_EQ("ccc", iter->Entry().key.ToString());
iter->Next();
ASSERT_OK(iter->status());
ASSERT_TRUE(iter->Valid());
ASSERT_EQ("ddd", iter->Entry().key.ToString());
iter->Next();
ASSERT_OK(iter->status());
ASSERT_TRUE(!iter->Valid());
}

{
std::unique_ptr<WBWIIterator> iter(batch.NewIterator(&cf2));
iter->Seek("");
ASSERT_OK(iter->status());
ASSERT_TRUE(iter->Valid());
ASSERT_EQ("aaa", iter->Entry().key.ToString());
iter->Next();
ASSERT_OK(iter->status());
ASSERT_TRUE(iter->Valid());
ASSERT_EQ("eee", iter->Entry().key.ToString());
iter->Next();
ASSERT_OK(iter->status());
ASSERT_TRUE(!iter->Valid());
}

{
std::unique_ptr<WBWIIterator> iter(batch.NewIterator(&reverse_cf));
iter->Seek("");
ASSERT_OK(iter->status());
ASSERT_TRUE(!iter->Valid());

iter->Seek("z");
ASSERT_OK(iter->status());
ASSERT_TRUE(iter->Valid());
ASSERT_EQ("a33", iter->Entry().key.ToString());
iter->Next();
ASSERT_OK(iter->status());
ASSERT_TRUE(iter->Valid());
ASSERT_EQ("a22", iter->Entry().key.ToString());
iter->Next();
ASSERT_OK(iter->status());
ASSERT_TRUE(iter->Valid());
ASSERT_EQ("a11", iter->Entry().key.ToString());
iter->Next();
ASSERT_OK(iter->status());
ASSERT_TRUE(!iter->Valid());

iter->Seek("a22");
ASSERT_OK(iter->status());
ASSERT_TRUE(iter->Valid());
ASSERT_EQ("a22", iter->Entry().key.ToString());

iter->Seek("a13");
ASSERT_OK(iter->status());
ASSERT_TRUE(iter->Valid());
ASSERT_EQ("a11", iter->Entry().key.ToString());
}
}

} // namespace

int main(int argc, char** argv) { return rocksdb::test::RunAllTests(); }

0 comments on commit d0de413

Please sign in to comment.