Skip to content

Commit

Permalink
16% end-to-end performance improvement from the skiplist
Browse files Browse the repository at this point in the history
This commit changes each element in the skiplist to avoid an extra pointer
indirection on items with sufficiently dissimilar keys.  Its effect is dependent
upon the comparator implementation.  The skiplist stores a (uint64_t, const
char*) instead of a const char*.  The uint64_t is a number that compares similar
to the key, such that:

    num_a < num_b => key_a < key_b
    num_a > num_b => key_a > key_b

Only if the numbers are equal does the skiplist have to follow the const char*
ptr.
  • Loading branch information
rescrv committed Sep 2, 2013
1 parent 28ffd32 commit f6fa561
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 14 deletions.
42 changes: 31 additions & 11 deletions db/memtable.cc
Expand Up @@ -19,6 +19,10 @@ static Slice GetLengthPrefixedSlice(const char* data) {
return Slice(p, len);
}

static Slice GetLengthPrefixedSlice(std::pair<uint64_t, const char*> tk) {
return GetLengthPrefixedSlice(tk.second);
}

MemTable::MemTable(const InternalKeyComparator& cmp)
: comparator_(cmp),
refs_(0),
Expand All @@ -34,11 +38,16 @@ size_t MemTable::ApproximateMemoryUsage() {
return arena_.MemoryUsage();
}

int MemTable::KeyComparator::operator()(const char* aptr, const char* bptr)
int MemTable::KeyComparator::operator()(TableKey ak, TableKey bk)
const {
if (ak.first < bk.first) {
return -1;
} else if (ak.first > bk.first) {
return 1;
}
// Internal keys are encoded as length-prefixed strings.
Slice a = GetLengthPrefixedSlice(aptr);
Slice b = GetLengthPrefixedSlice(bptr);
Slice a = GetLengthPrefixedSlice(ak);
Slice b = GetLengthPrefixedSlice(bk);
return comparator.Compare(a, b);
}

Expand All @@ -54,10 +63,15 @@ static const char* EncodeKey(std::string* scratch, const Slice& target) {

class MemTableIterator: public Iterator {
public:
explicit MemTableIterator(MemTable::Table* table) : iter_(table) { }
explicit MemTableIterator(MemTable::Table* table,
MemTable::KeyComparator* cmp)
: iter_(table), comparator_(cmp) { }

virtual bool Valid() const { return iter_.Valid(); }
virtual void Seek(const Slice& k) { iter_.Seek(EncodeKey(&tmp_, k)); }
virtual void Seek(const Slice& k) {
uint64_t keynum = comparator_->comparator.user_comparator()->KeyNum(Slice(k.data(), k.size() - 8));
iter_.Seek(std::make_pair(keynum, EncodeKey(&tmp_, k)));
}
virtual void SeekToFirst() { iter_.SeekToFirst(); }
virtual void SeekToLast() { iter_.SeekToLast(); }
virtual void Next() { iter_.Next(); }
Expand All @@ -72,6 +86,7 @@ class MemTableIterator: public Iterator {

private:
MemTable::Table::Iterator iter_;
MemTable::KeyComparator* comparator_;
std::string tmp_; // For passing to EncodeKey

// No copying allowed
Expand All @@ -80,7 +95,7 @@ class MemTableIterator: public Iterator {
};

Iterator* MemTable::NewIterator() {
return new MemTableIterator(&table_);
return new MemTableIterator(&table_, &comparator_);
}

void MemTable::Add(SequenceNumber s, ValueType type,
Expand Down Expand Up @@ -112,18 +127,22 @@ void MemTable::Add(SequenceNumber s, ValueType type,
p = EncodeVarint32(p, val_size);
memcpy(p, value.data(), val_size);
assert((p + val_size) - buf == encoded_len);
Table::InsertHint ih(&table_, buf);
uint64_t keynum = comparator_.comparator.user_comparator()->KeyNum(key);
TableKey tk(keynum, buf);
Table::InsertHint ih(&table_, tk);

{
MutexLock l(&mtx_);
table_.InsertWithHint(&ih, buf);
table_.InsertWithHint(&ih, tk);
}
}

bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) {
Slice memkey = key.memtable_key();
Table::Iterator iter(&table_);
iter.Seek(memkey.data());
uint64_t keynum = comparator_.comparator.user_comparator()->KeyNum(key.user_key());
TableKey tk(keynum, memkey.data());
iter.Seek(tk);
if (iter.Valid()) {
// entry format is:
// klength varint32
Expand All @@ -134,10 +153,11 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) {
// Check that it belongs to same user key. We do not check the
// sequence number since the Seek() call above should have skipped
// all entries with overly large sequence numbers.
const char* entry = iter.key();
const char* entry = iter.key().second;
uint32_t key_length;
const char* key_ptr = GetVarint32Ptr(entry, entry+5, &key_length);
if (comparator_.comparator.user_comparator()->Compare(
if (iter.key().first == tk.first &&
comparator_.comparator.user_comparator()->Compare(
Slice(key_ptr, key_length - 8),
key.user_key()) == 0) {
// Correct user key
Expand Down
5 changes: 3 additions & 2 deletions db/memtable.h
Expand Up @@ -67,16 +67,17 @@ class MemTable {

private:
~MemTable(); // Private since only Unref() should be used to delete it
typedef std::pair<uint64_t, const char*> TableKey;

struct KeyComparator {
const InternalKeyComparator comparator;
explicit KeyComparator(const InternalKeyComparator& c) : comparator(c) { }
int operator()(const char* a, const char* b) const;
int operator()(TableKey a, TableKey b) const;
};
friend class MemTableIterator;
friend class MemTableBackwardIterator;

typedef SkipList<const char*, KeyComparator> Table;
typedef SkipList<TableKey, KeyComparator> Table;

KeyComparator comparator_;
int refs_;
Expand Down
2 changes: 1 addition & 1 deletion db/skiplist.h
Expand Up @@ -351,7 +351,7 @@ template<typename Key, class Comparator>
SkipList<Key,Comparator>::SkipList(Comparator cmp, Arena* arena)
: compare_(cmp),
arena_(arena),
head_(NewNode(0 /* any key will do */, kMaxHeight)),
head_(NewNode(Key(), kMaxHeight)),
max_height_(reinterpret_cast<void*>(1)),
rnd_(0xdeadbeef) {
for (int i = 0; i < kMaxHeight; i++) {
Expand Down
4 changes: 4 additions & 0 deletions hyperleveldb/comparator.h
Expand Up @@ -5,6 +5,7 @@
#ifndef STORAGE_LEVELDB_INCLUDE_COMPARATOR_H_
#define STORAGE_LEVELDB_INCLUDE_COMPARATOR_H_

#include <stdint.h>
#include <string>

namespace leveldb {
Expand Down Expand Up @@ -51,6 +52,9 @@ class Comparator {
// Simple comparator implementations may return with *key unchanged,
// i.e., an implementation of this method that does nothing is correct.
virtual void FindShortSuccessor(std::string* key) const = 0;

// If unsure, return 0;
virtual uint64_t KeyNum(const Slice& key) const;
};

// Return a builtin comparator that uses lexicographic byte-wise
Expand Down
21 changes: 21 additions & 0 deletions util/comparator.cc
Expand Up @@ -7,12 +7,17 @@
#include "hyperleveldb/comparator.h"
#include "hyperleveldb/slice.h"
#include "port/port.h"
#include "util/coding.h"
#include "util/logging.h"

namespace leveldb {

Comparator::~Comparator() { }

uint64_t Comparator::KeyNum(const Slice& key) const {
return 0;
}

namespace {
class BytewiseComparatorImpl : public Comparator {
public:
Expand Down Expand Up @@ -63,6 +68,22 @@ class BytewiseComparatorImpl : public Comparator {
}
// *key is a run of 0xffs. Leave it alone.
}

virtual uint64_t KeyNum(const Slice& key) const {
unsigned char buf[sizeof(uint64_t)];
memset(buf, 0, sizeof(buf));
memmove(buf, key.data(), std::min(key.size(), sizeof(uint64_t)));
uint64_t number;
number = static_cast<uint64_t>(buf[0]) << 56
| static_cast<uint64_t>(buf[1]) << 48
| static_cast<uint64_t>(buf[2]) << 40
| static_cast<uint64_t>(buf[3]) << 32
| static_cast<uint64_t>(buf[4]) << 24
| static_cast<uint64_t>(buf[5]) << 16
| static_cast<uint64_t>(buf[6]) << 8
| static_cast<uint64_t>(buf[7]);
return number;
}
};
} // namespace

Expand Down

0 comments on commit f6fa561

Please sign in to comment.