Integrating Cuckoo Hash SST Table format into RocksDB
Summary:
Contains the following changes:
- Implementation of cuckoo_table_factory
- Adding cuckoo table support to AdaptiveTableFactory
- Adding cuckoo_table_db_test, modeled on plain_table_db_test
- Minor fixes to Reader: when a key is found in the table, return the key that was found rather than the search key.
- Minor fixes to Builder: add the table properties that Version::UpdateTemporaryStats() requires during Get operations. Don't hold curr_node as a reference, since tree.push_back() may reallocate the vector's storage and invalidate the reference, leading to invalid memory access (see the sketch after this summary).
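
The reference-invalidation hazard behind that Builder fix, as a standalone sketch (illustrative only; GrowTree, Node, bucket_id, and depth are hypothetical names, not the Builder's actual types):

```cpp
#include <cstddef>
#include <vector>

struct Node {
  int bucket_id;
  int depth;
};

void GrowTree(std::vector<Node>* tree) {
  tree->push_back({0, 0});

  // Unsafe pattern: push_back() may reallocate the vector's storage, after
  // which this reference dangles and any use of it is undefined behavior.
  //   Node& curr_node = tree->back();
  //   tree->push_back({...});
  //   curr_node.depth;   // may read freed memory
  //
  // Safer pattern: remember an index, copy out what is needed, then append.
  size_t curr_pos = tree->size() - 1;
  Node child = {(*tree)[curr_pos].bucket_id + 1, (*tree)[curr_pos].depth + 1};
  tree->push_back(child);
}
```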

Test Plan:
cuckoo_table_reader_test --enable_perf
cuckoo_table_builder_test
cuckoo_table_db_test
make check all
make valgrind_check
make asan_check

Reviewers: sdong, igor, yhchiang, ljin

Reviewed By: ljin

Subscribers: leveldb

Differential Revision: https://reviews.facebook.net/D21219
adyarshyam committed Aug 12, 2014
1 parent 37c6740 commit 9674c11
Showing 11 changed files with 455 additions and 31 deletions.
8 changes: 6 additions & 2 deletions Makefile
@@ -117,9 +117,10 @@ TESTS = \
 	thread_local_test \
 	geodb_test \
 	rate_limiter_test \
-	cuckoo_table_builder_test \
 	options_test \
-	cuckoo_table_reader_test
+	cuckoo_table_builder_test \
+	cuckoo_table_reader_test \
+	cuckoo_table_db_test
 
 TOOLS = \
 	sst_dump \
@@ -430,6 +431,9 @@ cuckoo_table_builder_test: table/cuckoo_table_builder_test.o $(LIBOBJECTS) $(TES
 cuckoo_table_reader_test: table/cuckoo_table_reader_test.o $(LIBOBJECTS) $(TESTHARNESS) $(BENCHHARNESS)
 	$(CXX) table/cuckoo_table_reader_test.o $(LIBOBJECTS) $(TESTHARNESS) $(BENCHHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)
 
+cuckoo_table_db_test: db/cuckoo_table_db_test.o $(LIBOBJECTS) $(TESTHARNESS)
+	$(CXX) db/cuckoo_table_db_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)
+
 options_test: util/options_test.o $(LIBOBJECTS) $(TESTHARNESS)
 	$(CXX) util/options_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)
 
291 changes: 291 additions & 0 deletions db/cuckoo_table_db_test.cc
@@ -0,0 +1,291 @@
// Copyright (c) 2014, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.

#include "db/db_impl.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "table/meta_blocks.h"
#include "table/cuckoo_table_factory.h"
#include "table/cuckoo_table_reader.h"
#include "util/testharness.h"
#include "util/testutil.h"

namespace rocksdb {

class CuckooTableDBTest {
 private:
  std::string dbname_;
  Env* env_;
  DB* db_;

 public:
  CuckooTableDBTest() : env_(Env::Default()) {
    dbname_ = test::TmpDir() + "/cuckoo_table_db_test";
    ASSERT_OK(DestroyDB(dbname_, Options()));
    db_ = nullptr;
    Reopen();
  }

  ~CuckooTableDBTest() {
    delete db_;
    ASSERT_OK(DestroyDB(dbname_, Options()));
  }

  Options CurrentOptions() {
    Options options;
    options.table_factory.reset(NewCuckooTableFactory());
    options.memtable_factory.reset(NewHashLinkListRepFactory(4, 0, 3, true));
    options.allow_mmap_reads = true;
    options.create_if_missing = true;
    options.max_mem_compaction_level = 0;
    return options;
  }

  DBImpl* dbfull() {
    return reinterpret_cast<DBImpl*>(db_);
  }

  // The following util methods are copied from plain_table_db_test.
  void Reopen(Options* options = nullptr) {
    delete db_;
    db_ = nullptr;
    Options opts;
    if (options != nullptr) {
      opts = *options;
    } else {
      opts = CurrentOptions();
      opts.create_if_missing = true;
    }
    ASSERT_OK(DB::Open(opts, dbname_, &db_));
  }

  Status Put(const Slice& k, const Slice& v) {
    return db_->Put(WriteOptions(), k, v);
  }

  Status Delete(const std::string& k) {
    return db_->Delete(WriteOptions(), k);
  }

  std::string Get(const std::string& k) {
    ReadOptions options;
    std::string result;
    Status s = db_->Get(options, k, &result);
    if (s.IsNotFound()) {
      result = "NOT_FOUND";
    } else if (!s.ok()) {
      result = s.ToString();
    }
    return result;
  }

  int NumTableFilesAtLevel(int level) {
    std::string property;
    ASSERT_TRUE(
        db_->GetProperty("rocksdb.num-files-at-level" + NumberToString(level),
                         &property));
    return atoi(property.c_str());
  }

  // Return spread of files per level
  std::string FilesPerLevel() {
    std::string result;
    int last_non_zero_offset = 0;
    for (int level = 0; level < db_->NumberLevels(); level++) {
      int f = NumTableFilesAtLevel(level);
      char buf[100];
      snprintf(buf, sizeof(buf), "%s%d", (level ? "," : ""), f);
      result += buf;
      if (f > 0) {
        last_non_zero_offset = result.size();
      }
    }
    result.resize(last_non_zero_offset);
    return result;
  }
};

TEST(CuckooTableDBTest, Flush) {
  // Try with empty DB first.
  ASSERT_TRUE(dbfull() != nullptr);
  ASSERT_EQ("NOT_FOUND", Get("key2"));

  // Add some values to db.
  Options options = CurrentOptions();
  Reopen(&options);

  ASSERT_OK(Put("key1", "v1"));
  ASSERT_OK(Put("key2", "v2"));
  ASSERT_OK(Put("key3", "v3"));
  dbfull()->TEST_FlushMemTable();

  TablePropertiesCollection ptc;
  reinterpret_cast<DB*>(dbfull())->GetPropertiesOfAllTables(&ptc);
  ASSERT_EQ(1U, ptc.size());
  ASSERT_EQ(3, ptc.begin()->second->num_entries);
  ASSERT_EQ("1", FilesPerLevel());

  ASSERT_EQ("v1", Get("key1"));
  ASSERT_EQ("v2", Get("key2"));
  ASSERT_EQ("v3", Get("key3"));
  ASSERT_EQ("NOT_FOUND", Get("key4"));
  ASSERT_EQ("Invalid argument: Length of key is invalid.", Get("somelongkey"));
  ASSERT_EQ("Invalid argument: Length of key is invalid.", Get("s"));

  // Now add more keys and flush.
  ASSERT_OK(Put("key4", "v4"));
  ASSERT_OK(Put("key5", "v5"));
  ASSERT_OK(Put("key6", "v6"));
  dbfull()->TEST_FlushMemTable();

  reinterpret_cast<DB*>(dbfull())->GetPropertiesOfAllTables(&ptc);
  ASSERT_EQ(2U, ptc.size());
  auto row = ptc.begin();
  ASSERT_EQ(3, row->second->num_entries);
  ASSERT_EQ(3, (++row)->second->num_entries);
  ASSERT_EQ("2", FilesPerLevel());
  ASSERT_EQ("v1", Get("key1"));
  ASSERT_EQ("v2", Get("key2"));
  ASSERT_EQ("v3", Get("key3"));
  ASSERT_EQ("v4", Get("key4"));
  ASSERT_EQ("v5", Get("key5"));
  ASSERT_EQ("v6", Get("key6"));

  ASSERT_OK(Delete("key6"));
  ASSERT_OK(Delete("key5"));
  ASSERT_OK(Delete("key4"));
  dbfull()->TEST_FlushMemTable();
  reinterpret_cast<DB*>(dbfull())->GetPropertiesOfAllTables(&ptc);
  ASSERT_EQ(3U, ptc.size());
  row = ptc.begin();
  ASSERT_EQ(3, row->second->num_entries);
  ASSERT_EQ(3, (++row)->second->num_entries);
  ASSERT_EQ(3, (++row)->second->num_entries);
  ASSERT_EQ("3", FilesPerLevel());
  ASSERT_EQ("v1", Get("key1"));
  ASSERT_EQ("v2", Get("key2"));
  ASSERT_EQ("v3", Get("key3"));
  ASSERT_EQ("NOT_FOUND", Get("key4"));
  ASSERT_EQ("NOT_FOUND", Get("key5"));
  ASSERT_EQ("NOT_FOUND", Get("key6"));
}

TEST(CuckooTableDBTest, FlushWithDuplicateKeys) {
  Options options = CurrentOptions();
  Reopen(&options);
  ASSERT_OK(Put("key1", "v1"));
  ASSERT_OK(Put("key2", "v2"));
  ASSERT_OK(Put("key1", "v3")); // Duplicate
  dbfull()->TEST_FlushMemTable();

  TablePropertiesCollection ptc;
  reinterpret_cast<DB*>(dbfull())->GetPropertiesOfAllTables(&ptc);
  ASSERT_EQ(1U, ptc.size());
  ASSERT_EQ(2, ptc.begin()->second->num_entries);
  ASSERT_EQ("1", FilesPerLevel());
  ASSERT_EQ("v3", Get("key1"));
  ASSERT_EQ("v2", Get("key2"));
}

namespace {
static std::string Key(int i) {
  char buf[100];
  snprintf(buf, sizeof(buf), "key_______%06d", i);
  return std::string(buf);
}
}

TEST(CuckooTableDBTest, CompactionTrigger) {
  Options options = CurrentOptions();
  options.write_buffer_size = 100 << 10; // 100KB
  options.level0_file_num_compaction_trigger = 2;
  Reopen(&options);

  // Write 11 values, each 10016 B
  for (int idx = 0; idx < 11; ++idx) {
    ASSERT_OK(Put(Key(idx), std::string(10000, 'a' + idx)));
  }
  dbfull()->TEST_WaitForFlushMemTable();
  ASSERT_EQ("1", FilesPerLevel());

  // Generate one more file in level-0, and should trigger level-0 compaction
  for (int idx = 11; idx < 22; ++idx) {
    ASSERT_OK(Put(Key(idx), std::string(10000, 'a' + idx)));
  }
  dbfull()->TEST_WaitForFlushMemTable();
  dbfull()->TEST_CompactRange(0, nullptr, nullptr);

  ASSERT_EQ("0,2", FilesPerLevel());
  for (int idx = 0; idx < 22; ++idx) {
    ASSERT_EQ(std::string(10000, 'a' + idx), Get(Key(idx)));
  }
}

TEST(CuckooTableDBTest, SameKeyInsertedInTwoDifferentFilesAndCompacted) {
  // Insert same key twice so that they go to different SST files. Then wait for
  // compaction and check if the latest value is stored and old value removed.
  Options options = CurrentOptions();
  options.write_buffer_size = 100 << 10; // 100KB
  options.level0_file_num_compaction_trigger = 2;
  Reopen(&options);

  // Write 11 values, each 10016 B
  for (int idx = 0; idx < 11; ++idx) {
    ASSERT_OK(Put(Key(idx), std::string(10000, 'a')));
  }
  dbfull()->TEST_WaitForFlushMemTable();
  ASSERT_EQ("1", FilesPerLevel());

  // Generate one more file in level-0, and should trigger level-0 compaction
  for (int idx = 0; idx < 11; ++idx) {
    ASSERT_OK(Put(Key(idx), std::string(10000, 'a' + idx)));
  }
  dbfull()->TEST_WaitForFlushMemTable();
  dbfull()->TEST_CompactRange(0, nullptr, nullptr);

  ASSERT_EQ("0,1", FilesPerLevel());
  for (int idx = 0; idx < 11; ++idx) {
    ASSERT_EQ(std::string(10000, 'a' + idx), Get(Key(idx)));
  }
}

TEST(CuckooTableDBTest, AdaptiveTable) {
  Options options = CurrentOptions();

  // Write some keys using cuckoo table.
  options.table_factory.reset(NewCuckooTableFactory());
  Reopen(&options);

  ASSERT_OK(Put("key1", "v1"));
  ASSERT_OK(Put("key2", "v2"));
  ASSERT_OK(Put("key3", "v3"));
  dbfull()->TEST_FlushMemTable();

  // Write some keys using plain table.
  options.create_if_missing = false;
  options.table_factory.reset(NewPlainTableFactory());
  Reopen(&options);
  ASSERT_OK(Put("key4", "v4"));
  ASSERT_OK(Put("key1", "v5"));
  dbfull()->TEST_FlushMemTable();

  // Write some keys using block based table.
  std::shared_ptr<TableFactory> block_based_factory(
      NewBlockBasedTableFactory());
  options.table_factory.reset(NewAdaptiveTableFactory(block_based_factory));
  Reopen(&options);
  ASSERT_OK(Put("key5", "v6"));
  ASSERT_OK(Put("key2", "v7"));
  dbfull()->TEST_FlushMemTable();

  ASSERT_EQ("v5", Get("key1"));
  ASSERT_EQ("v7", Get("key2"));
  ASSERT_EQ("v3", Get("key3"));
  ASSERT_EQ("v4", Get("key4"));
  ASSERT_EQ("v6", Get("key5"));
}
} // namespace rocksdb

int main(int argc, char** argv) { return rocksdb::test::RunAllTests(); }
6 changes: 5 additions & 1 deletion include/rocksdb/table.h
@@ -192,6 +192,9 @@ struct CuckooTablePropertyNames {
   static const std::string kIsLastLevel;
 };
 
+extern TableFactory* NewCuckooTableFactory(double hash_table_ratio = 0.9,
+    uint32_t max_search_depth = 100);
+
 #endif  // ROCKSDB_LITE
 
 // A base class for table factories.
@@ -263,7 +266,8 @@ class TableFactory {
 extern TableFactory* NewAdaptiveTableFactory(
     std::shared_ptr<TableFactory> table_factory_to_write = nullptr,
     std::shared_ptr<TableFactory> block_based_table_factory = nullptr,
-    std::shared_ptr<TableFactory> plain_table_factory = nullptr);
+    std::shared_ptr<TableFactory> plain_table_factory = nullptr,
+    std::shared_ptr<TableFactory> cuckoo_table_factory = nullptr);
 
 #endif  // ROCKSDB_LITE
 
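To make the new public API concrete, here is a minimal usage sketch (not part of the commit) that wires the cuckoo table factory into Options via the declarations added to include/rocksdb/table.h above. The database path is illustrative, the factory arguments are simply the declared defaults, and allow_mmap_reads mirrors CurrentOptions() in the new test:

```cpp
#include <cassert>
#include <string>

#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/table.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  options.allow_mmap_reads = true;  // mirrors CurrentOptions() in the new test
  // Defaults from the declaration above; tune per workload.
  options.table_factory.reset(
      rocksdb::NewCuckooTableFactory(/*hash_table_ratio=*/0.9,
                                     /*max_search_depth=*/100));

  rocksdb::DB* db = nullptr;
  rocksdb::Status s =
      rocksdb::DB::Open(options, "/tmp/cuckoo_table_example", &db);
  assert(s.ok());

  s = db->Put(rocksdb::WriteOptions(), "key1", "v1");
  assert(s.ok());

  std::string value;
  s = db->Get(rocksdb::ReadOptions(), "key1", &value);
  assert(s.ok() && value == "v1");

  delete db;
  return 0;
}
```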
20 changes: 15 additions & 5 deletions table/adaptive_table_factory.cc
@@ -12,10 +12,12 @@ namespace rocksdb {
 AdaptiveTableFactory::AdaptiveTableFactory(
     std::shared_ptr<TableFactory> table_factory_to_write,
     std::shared_ptr<TableFactory> block_based_table_factory,
-    std::shared_ptr<TableFactory> plain_table_factory)
+    std::shared_ptr<TableFactory> plain_table_factory,
+    std::shared_ptr<TableFactory> cuckoo_table_factory)
     : table_factory_to_write_(table_factory_to_write),
       block_based_table_factory_(block_based_table_factory),
-      plain_table_factory_(plain_table_factory) {
+      plain_table_factory_(plain_table_factory),
+      cuckoo_table_factory_(cuckoo_table_factory) {
   if (!table_factory_to_write_) {
     table_factory_to_write_ = block_based_table_factory_;
   }
@@ -25,12 +27,16 @@ AdaptiveTableFactory::AdaptiveTableFactory(
   if (!block_based_table_factory_) {
     block_based_table_factory_.reset(NewBlockBasedTableFactory());
   }
+  if (!cuckoo_table_factory_) {
+    cuckoo_table_factory_.reset(NewCuckooTableFactory());
+  }
 }
 
 extern const uint64_t kPlainTableMagicNumber;
 extern const uint64_t kLegacyPlainTableMagicNumber;
 extern const uint64_t kBlockBasedTableMagicNumber;
 extern const uint64_t kLegacyBlockBasedTableMagicNumber;
+extern const uint64_t kCuckooTableMagicNumber;
 
 Status AdaptiveTableFactory::NewTableReader(
     const Options& options, const EnvOptions& soptions,
@@ -49,6 +55,9 @@ Status AdaptiveTableFactory::NewTableReader(
       footer.table_magic_number() == kLegacyBlockBasedTableMagicNumber) {
     return block_based_table_factory_->NewTableReader(
         options, soptions, icomp, std::move(file), file_size, table);
+  } else if (footer.table_magic_number() == kCuckooTableMagicNumber) {
+    return cuckoo_table_factory_->NewTableReader(
+        options, soptions, icomp, std::move(file), file_size, table);
   } else {
     return Status::NotSupported("Unidentified table format");
   }
@@ -64,9 +73,10 @@ TableBuilder* AdaptiveTableFactory::NewTableBuilder(
 extern TableFactory* NewAdaptiveTableFactory(
     std::shared_ptr<TableFactory> table_factory_to_write,
     std::shared_ptr<TableFactory> block_based_table_factory,
-    std::shared_ptr<TableFactory> plain_table_factory) {
-  return new AdaptiveTableFactory(
-      table_factory_to_write, block_based_table_factory, plain_table_factory);
+    std::shared_ptr<TableFactory> plain_table_factory,
+    std::shared_ptr<TableFactory> cuckoo_table_factory) {
+  return new AdaptiveTableFactory(table_factory_to_write,
+      block_based_table_factory, plain_table_factory, cuckoo_table_factory);
 }
 
 } // namespace rocksdb
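And a companion sketch (again not part of the commit) of the migration scenario AdaptiveTableFactory targets: new SST files are written in the cuckoo format, while existing block-based, plain-table, and cuckoo files stay readable because NewTableReader dispatches on each file's footer magic number. The path and option choices below are illustrative:

```cpp
#include <memory>

#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/table.h"

int main() {
  // Write new files with the cuckoo format; read everything already on disk
  // through whichever per-format reader its footer magic number selects.
  std::shared_ptr<rocksdb::TableFactory> cuckoo(
      rocksdb::NewCuckooTableFactory());

  rocksdb::Options options;
  options.create_if_missing = true;
  options.allow_mmap_reads = true;
  options.table_factory.reset(
      rocksdb::NewAdaptiveTableFactory(/*table_factory_to_write=*/cuckoo));

  rocksdb::DB* db = nullptr;
  rocksdb::Status s =
      rocksdb::DB::Open(options, "/tmp/adaptive_table_example", &db);
  if (s.ok()) {
    db->Put(rocksdb::WriteOptions(), "key1", "v1");
    delete db;
  }
  return 0;
}
```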
