Skip to content

Commit

Permalink
MergingIterator inline bytewise comparator
Browse files Browse the repository at this point in the history
  • Loading branch information
rockeet committed Jun 12, 2022
1 parent 4ffd72b commit b91733d
Show file tree
Hide file tree
Showing 3 changed files with 169 additions and 43 deletions.
6 changes: 6 additions & 0 deletions include/rocksdb/comparator.h
Expand Up @@ -150,4 +150,10 @@ extern const Comparator* BytewiseComparator();
// ordering.
extern const Comparator* ReverseBytewiseComparator();

bool IsForwardBytewiseComparator(const Comparator* cmp);
bool IsForwardBytewiseComparator(const Slice& name);

bool IsBytewiseComparator(const Comparator* cmp);
bool IsBytewiseComparator(const Slice& name);

} // namespace ROCKSDB_NAMESPACE
180 changes: 137 additions & 43 deletions table/merging_iterator.cc
Expand Up @@ -26,17 +26,91 @@
#include "util/stop_watch.h"

namespace ROCKSDB_NAMESPACE {
// Without anonymous namespace here, we fail the warning -Wmissing-prototypes
namespace {
using MergerMaxIterHeap = BinaryHeap<IteratorWrapper*, MaxIteratorComparator>;
using MergerMinIterHeap = BinaryHeap<IteratorWrapper*, MinIteratorComparator>;
} // namespace

#if defined(_MSC_VER) /* Visual Studio */
# define FORCE_INLINE __forceinline
#elif defined(__GNUC__)
# define FORCE_INLINE __attribute__((always_inline))
#else
# define inline
#endif

static FORCE_INLINE
uint64_t GetUnalignedU64(const void* ptr) noexcept {
uint64_t x;
memcpy(&x, ptr, sizeof(uint64_t));
return x;
}

static FORCE_INLINE
bool BytewiseCompareInternalKey(Slice x, Slice y) noexcept {
size_t n = std::min(x.size_, y.size_) - 8;
int cmp = memcmp(x.data_, y.data_, n);
if (0 != cmp)
return cmp < 0;
if (x.size_ != y.size_)
return x.size_ < y.size_;
return GetUnalignedU64(x.data_ + n) > GetUnalignedU64(y.data_ + n);
}

static FORCE_INLINE
bool RevBytewiseCompareInternalKey(Slice x, Slice y) noexcept {
size_t n = std::min(x.size_, y.size_) - 8;
int cmp = memcmp(x.data_, y.data_, n);
if (0 != cmp)
return cmp > 0;
if (x.size_ != y.size_)
return x.size_ > y.size_;
return GetUnalignedU64(x.data_ + n) > GetUnalignedU64(y.data_ + n);
}

struct MaxInlineBytewiseComp {
FORCE_INLINE
bool operator()(const IteratorWrapper* a, const IteratorWrapper* b)
const noexcept {
return BytewiseCompareInternalKey(a->key(), b->key());
}
MaxInlineBytewiseComp(const InternalKeyComparator*) {}
};
struct MinInlineBytewiseComp {
FORCE_INLINE
bool operator()(const IteratorWrapper* a, const IteratorWrapper* b)
const noexcept {
return BytewiseCompareInternalKey(b->key(), a->key());
}
MinInlineBytewiseComp(const InternalKeyComparator*) {}
};

struct MaxInlineRevBytewiseComp {
FORCE_INLINE
bool operator()(const IteratorWrapper* a, const IteratorWrapper* b)
const noexcept {
return RevBytewiseCompareInternalKey(a->key(), b->key());
}
MaxInlineRevBytewiseComp(const InternalKeyComparator*) {}
};
struct MinInlineRevBytewiseComp {
FORCE_INLINE
bool operator()(const IteratorWrapper* a, const IteratorWrapper* b)
const noexcept {
return RevBytewiseCompareInternalKey(b->key(), a->key());
}
MinInlineRevBytewiseComp(const InternalKeyComparator*) {}
};

const size_t kNumIterReserve = 4;

class MergingIterator : public InternalIterator {
public:
virtual void AddIterator(InternalIterator* iter) = 0;
};

template<class MinHeapComparator, class MaxHeapComparator>
class MergingIterTmpl : public MergingIterator {
using MergerMaxIterHeap = BinaryHeap<IteratorWrapper*, MaxHeapComparator>;
using MergerMinIterHeap = BinaryHeap<IteratorWrapper*, MinHeapComparator>;
public:
MergingIterator(const InternalKeyComparator* comparator,
MergingIterTmpl(const InternalKeyComparator* comparator,
InternalIterator** children, int n, bool is_arena_mode,
bool prefix_seek_mode)
: is_arena_mode_(is_arena_mode),
Expand Down Expand Up @@ -68,7 +142,7 @@ class MergingIterator : public InternalIterator {
current_ = nullptr;
}

~MergingIterator() override {
~MergingIterTmpl() override {
for (auto& child : children_) {
child.DeleteIter(is_arena_mode_);
}
Expand Down Expand Up @@ -348,7 +422,9 @@ class MergingIterator : public InternalIterator {
}
};

void MergingIterator::AddToMinHeapOrCheckStatus(IteratorWrapper* child) {
template<class MinHeapComparator, class MaxHeapComparator>
void MergingIterTmpl<MinHeapComparator, MaxHeapComparator>
::AddToMinHeapOrCheckStatus(IteratorWrapper* child) {
if (child->Valid()) {
assert(child->status().ok());
minHeap_.push(child);
Expand All @@ -357,7 +433,9 @@ void MergingIterator::AddToMinHeapOrCheckStatus(IteratorWrapper* child) {
}
}

void MergingIterator::AddToMaxHeapOrCheckStatus(IteratorWrapper* child) {
template<class MinHeapComparator, class MaxHeapComparator>
void MergingIterTmpl<MinHeapComparator, MaxHeapComparator>
::MergingIterTmpl::AddToMaxHeapOrCheckStatus(IteratorWrapper* child) {
if (child->Valid()) {
assert(child->status().ok());
maxHeap_.push(child);
Expand All @@ -366,7 +444,9 @@ void MergingIterator::AddToMaxHeapOrCheckStatus(IteratorWrapper* child) {
}
}

void MergingIterator::SwitchToForward() {
template<class MinHeapComparator, class MaxHeapComparator>
void MergingIterTmpl<MinHeapComparator, MaxHeapComparator>
::MergingIterTmpl::SwitchToForward() {
// Otherwise, advance the non-current children. We advance current_
// just after the if-block.
InitMinHeap();
Expand Down Expand Up @@ -403,7 +483,9 @@ void MergingIterator::SwitchToForward() {
direction_ = kForward;
}

void MergingIterator::SwitchToBackward() {
template<class MinHeapComparator, class MaxHeapComparator>
void MergingIterTmpl<MinHeapComparator, MaxHeapComparator>
::MergingIterTmpl::SwitchToBackward() {
InitMaxHeap();
Slice target = key();
for (auto& child : children_) {
Expand All @@ -428,37 +510,17 @@ void MergingIterator::SwitchToBackward() {
assert(current_ == CurrentReverse());
}

void MergingIterator::InitMinHeap() {
#if 0
// this can be simplified because maxHeap_ and minHeap_ are physical identical,
// the only difference between them are logical(the interpretation of comparator)
if (kReverse == direction_) {
maxHeap_.~MergerMaxIterHeap();
new(&minHeap_)MergerMinIterHeap(comparator_);
direction_ = kForward;
}
else {
minHeap_.clear();
}
#else
template<class MinHeapComparator, class MaxHeapComparator>
void MergingIterTmpl<MinHeapComparator, MaxHeapComparator>
::MergingIterTmpl::InitMinHeap() {
minHeap_.clear();
#endif
}

void MergingIterator::InitMaxHeap() {
#if 0
if (kForward == direction_) {
minHeap_.~MergerMinIterHeap();
new(&maxHeap_)MergerMaxIterHeap(comparator_);
direction_ = kReverse;
}
else {
maxHeap_.clear();
}
#else
template<class MinHeapComparator, class MaxHeapComparator>
void MergingIterTmpl<MinHeapComparator, MaxHeapComparator>
::MergingIterTmpl::InitMaxHeap() {
// use InitMinHeap(), because maxHeap_ and minHeap_ are physical identical
InitMinHeap();
#endif
}

InternalIterator* NewMergingIterator(const InternalKeyComparator* cmp,
Expand All @@ -469,22 +531,54 @@ InternalIterator* NewMergingIterator(const InternalKeyComparator* cmp,
return NewEmptyInternalIterator<Slice>(arena);
} else if (n == 1) {
return list[0];
} else if (IsForwardBytewiseComparator(cmp->user_comparator())) {
using MergingIterInst = MergingIterTmpl<MinInlineBytewiseComp, MaxInlineBytewiseComp>;
if (arena == nullptr) {
return new MergingIterInst(cmp, list, n, false, prefix_seek_mode);
} else {
auto mem = arena->AllocateAligned(sizeof(MergingIterInst));
return new (mem) MergingIterInst(cmp, list, n, true, prefix_seek_mode);
}
} else if (IsBytewiseComparator(cmp->user_comparator())) { // must is rev bytewise
using MergingIterInst = MergingIterTmpl<MinInlineRevBytewiseComp, MaxInlineRevBytewiseComp>;
if (arena == nullptr) {
return new MergingIterInst(cmp, list, n, false, prefix_seek_mode);
} else {
auto mem = arena->AllocateAligned(sizeof(MergingIterInst));
return new (mem) MergingIterInst(cmp, list, n, true, prefix_seek_mode);
}
} else {
using MergingIterInst = MergingIterTmpl<MinIteratorComparator, MaxIteratorComparator>;
if (arena == nullptr) {
return new MergingIterator(cmp, list, n, false, prefix_seek_mode);
return new MergingIterInst(cmp, list, n, false, prefix_seek_mode);
} else {
auto mem = arena->AllocateAligned(sizeof(MergingIterator));
return new (mem) MergingIterator(cmp, list, n, true, prefix_seek_mode);
auto mem = arena->AllocateAligned(sizeof(MergingIterInst));
return new (mem) MergingIterInst(cmp, list, n, true, prefix_seek_mode);
}
}
}

MergeIteratorBuilder::MergeIteratorBuilder(
const InternalKeyComparator* comparator, Arena* a, bool prefix_seek_mode)
: first_iter(nullptr), use_merging_iter(false), arena(a) {
auto mem = arena->AllocateAligned(sizeof(MergingIterator));
merge_iter =
new (mem) MergingIterator(comparator, nullptr, 0, true, prefix_seek_mode);
if (IsForwardBytewiseComparator(comparator->user_comparator())) {
using MergingIterInst = MergingIterTmpl<MinInlineBytewiseComp, MaxInlineBytewiseComp>;
auto mem = arena->AllocateAligned(sizeof(MergingIterInst));
merge_iter =
new (mem) MergingIterInst(comparator, nullptr, 0, true, prefix_seek_mode);
} else if (IsBytewiseComparator(comparator->user_comparator())) {
// must is rev bytewise
using MergingIterInst = MergingIterTmpl<MinInlineRevBytewiseComp, MaxInlineRevBytewiseComp>;
auto mem = arena->AllocateAligned(sizeof(MergingIterInst));
merge_iter =
new (mem) MergingIterInst(comparator, nullptr, 0, true, prefix_seek_mode);
}
else {
using MergingIterInst = MergingIterTmpl<MinIteratorComparator, MaxIteratorComparator>;
auto mem = arena->AllocateAligned(sizeof(MergingIterInst));
merge_iter =
new (mem) MergingIterInst(comparator, nullptr, 0, true, prefix_seek_mode);
}
}

MergeIteratorBuilder::~MergeIteratorBuilder() {
Expand Down
26 changes: 26 additions & 0 deletions util/comparator.cc
Expand Up @@ -378,4 +378,30 @@ Status Comparator::CreateFromString(const ConfigOptions& config_options,
}
return status;
}

bool IsForwardBytewiseComparator(const Comparator* cmp) {
return IsForwardBytewiseComparator(cmp->Name());
}
bool IsForwardBytewiseComparator(const Slice& name) {
if (name.starts_with("RocksDB_SE_")) {
return true;
}
return name == "leveldb.BytewiseComparator";
}

bool IsBytewiseComparator(const Comparator* cmp) {
return IsBytewiseComparator(cmp->Name());
}
bool IsBytewiseComparator(const Slice& name) {
if (name.starts_with("RocksDB_SE_")) {
return true;
}
if (name.starts_with("rev:RocksDB_SE_")) {
// reverse bytewise compare, needs reverse in iterator
return true;
}
return name == "leveldb.BytewiseComparator" ||
name == "rocksdb.ReverseBytewiseComparator";
}

} // namespace ROCKSDB_NAMESPACE

0 comments on commit b91733d

Please sign in to comment.