diff --git a/include/rocksdb/comparator.h b/include/rocksdb/comparator.h index 4b1b61eb4..58311c0f7 100644 --- a/include/rocksdb/comparator.h +++ b/include/rocksdb/comparator.h @@ -150,4 +150,10 @@ extern const Comparator* BytewiseComparator(); // ordering. extern const Comparator* ReverseBytewiseComparator(); +bool IsForwardBytewiseComparator(const Comparator* cmp); +bool IsForwardBytewiseComparator(const Slice& name); + +bool IsBytewiseComparator(const Comparator* cmp); +bool IsBytewiseComparator(const Slice& name); + } // namespace ROCKSDB_NAMESPACE diff --git a/table/merging_iterator.cc b/table/merging_iterator.cc index 72e667f40..0cc237213 100644 --- a/table/merging_iterator.cc +++ b/table/merging_iterator.cc @@ -26,17 +26,91 @@ #include "util/stop_watch.h" namespace ROCKSDB_NAMESPACE { -// Without anonymous namespace here, we fail the warning -Wmissing-prototypes -namespace { -using MergerMaxIterHeap = BinaryHeap; -using MergerMinIterHeap = BinaryHeap; -} // namespace + +#if defined(_MSC_VER) /* Visual Studio */ +# define FORCE_INLINE __forceinline +#elif defined(__GNUC__) +# define FORCE_INLINE __attribute__((always_inline)) +#else +# define inline +#endif + +static FORCE_INLINE +uint64_t GetUnalignedU64(const void* ptr) noexcept { + uint64_t x; + memcpy(&x, ptr, sizeof(uint64_t)); + return x; +} + +static FORCE_INLINE +bool BytewiseCompareInternalKey(Slice x, Slice y) noexcept { + size_t n = std::min(x.size_, y.size_) - 8; + int cmp = memcmp(x.data_, y.data_, n); + if (0 != cmp) + return cmp < 0; + if (x.size_ != y.size_) + return x.size_ < y.size_; + return GetUnalignedU64(x.data_ + n) > GetUnalignedU64(y.data_ + n); +} + +static FORCE_INLINE +bool RevBytewiseCompareInternalKey(Slice x, Slice y) noexcept { + size_t n = std::min(x.size_, y.size_) - 8; + int cmp = memcmp(x.data_, y.data_, n); + if (0 != cmp) + return cmp > 0; + if (x.size_ != y.size_) + return x.size_ > y.size_; + return GetUnalignedU64(x.data_ + n) > GetUnalignedU64(y.data_ + n); +} + +struct MaxInlineBytewiseComp { + FORCE_INLINE + bool operator()(const IteratorWrapper* a, const IteratorWrapper* b) + const noexcept { + return BytewiseCompareInternalKey(a->key(), b->key()); + } + MaxInlineBytewiseComp(const InternalKeyComparator*) {} +}; +struct MinInlineBytewiseComp { + FORCE_INLINE + bool operator()(const IteratorWrapper* a, const IteratorWrapper* b) + const noexcept { + return BytewiseCompareInternalKey(b->key(), a->key()); + } + MinInlineBytewiseComp(const InternalKeyComparator*) {} +}; + +struct MaxInlineRevBytewiseComp { + FORCE_INLINE + bool operator()(const IteratorWrapper* a, const IteratorWrapper* b) + const noexcept { + return RevBytewiseCompareInternalKey(a->key(), b->key()); + } + MaxInlineRevBytewiseComp(const InternalKeyComparator*) {} +}; +struct MinInlineRevBytewiseComp { + FORCE_INLINE + bool operator()(const IteratorWrapper* a, const IteratorWrapper* b) + const noexcept { + return RevBytewiseCompareInternalKey(b->key(), a->key()); + } + MinInlineRevBytewiseComp(const InternalKeyComparator*) {} +}; const size_t kNumIterReserve = 4; class MergingIterator : public InternalIterator { +public: + virtual void AddIterator(InternalIterator* iter) = 0; +}; + +template +class MergingIterTmpl : public MergingIterator { + using MergerMaxIterHeap = BinaryHeap; + using MergerMinIterHeap = BinaryHeap; public: - MergingIterator(const InternalKeyComparator* comparator, + MergingIterTmpl(const InternalKeyComparator* comparator, InternalIterator** children, int n, bool is_arena_mode, bool prefix_seek_mode) : is_arena_mode_(is_arena_mode), @@ -68,7 +142,7 @@ class MergingIterator : public InternalIterator { current_ = nullptr; } - ~MergingIterator() override { + ~MergingIterTmpl() override { for (auto& child : children_) { child.DeleteIter(is_arena_mode_); } @@ -348,7 +422,9 @@ class MergingIterator : public InternalIterator { } }; -void MergingIterator::AddToMinHeapOrCheckStatus(IteratorWrapper* child) { +template +void MergingIterTmpl +::AddToMinHeapOrCheckStatus(IteratorWrapper* child) { if (child->Valid()) { assert(child->status().ok()); minHeap_.push(child); @@ -357,7 +433,9 @@ void MergingIterator::AddToMinHeapOrCheckStatus(IteratorWrapper* child) { } } -void MergingIterator::AddToMaxHeapOrCheckStatus(IteratorWrapper* child) { +template +void MergingIterTmpl +::MergingIterTmpl::AddToMaxHeapOrCheckStatus(IteratorWrapper* child) { if (child->Valid()) { assert(child->status().ok()); maxHeap_.push(child); @@ -366,7 +444,9 @@ void MergingIterator::AddToMaxHeapOrCheckStatus(IteratorWrapper* child) { } } -void MergingIterator::SwitchToForward() { +template +void MergingIterTmpl +::MergingIterTmpl::SwitchToForward() { // Otherwise, advance the non-current children. We advance current_ // just after the if-block. InitMinHeap(); @@ -403,7 +483,9 @@ void MergingIterator::SwitchToForward() { direction_ = kForward; } -void MergingIterator::SwitchToBackward() { +template +void MergingIterTmpl +::MergingIterTmpl::SwitchToBackward() { InitMaxHeap(); Slice target = key(); for (auto& child : children_) { @@ -428,37 +510,17 @@ void MergingIterator::SwitchToBackward() { assert(current_ == CurrentReverse()); } -void MergingIterator::InitMinHeap() { -#if 0 - // this can be simplified because maxHeap_ and minHeap_ are physical identical, - // the only difference between them are logical(the interpretation of comparator) - if (kReverse == direction_) { - maxHeap_.~MergerMaxIterHeap(); - new(&minHeap_)MergerMinIterHeap(comparator_); - direction_ = kForward; - } - else { - minHeap_.clear(); - } -#else +template +void MergingIterTmpl +::MergingIterTmpl::InitMinHeap() { minHeap_.clear(); -#endif } -void MergingIterator::InitMaxHeap() { -#if 0 - if (kForward == direction_) { - minHeap_.~MergerMinIterHeap(); - new(&maxHeap_)MergerMaxIterHeap(comparator_); - direction_ = kReverse; - } - else { - maxHeap_.clear(); - } -#else +template +void MergingIterTmpl +::MergingIterTmpl::InitMaxHeap() { // use InitMinHeap(), because maxHeap_ and minHeap_ are physical identical InitMinHeap(); -#endif } InternalIterator* NewMergingIterator(const InternalKeyComparator* cmp, @@ -469,12 +531,29 @@ InternalIterator* NewMergingIterator(const InternalKeyComparator* cmp, return NewEmptyInternalIterator(arena); } else if (n == 1) { return list[0]; + } else if (IsForwardBytewiseComparator(cmp->user_comparator())) { + using MergingIterInst = MergingIterTmpl; + if (arena == nullptr) { + return new MergingIterInst(cmp, list, n, false, prefix_seek_mode); + } else { + auto mem = arena->AllocateAligned(sizeof(MergingIterInst)); + return new (mem) MergingIterInst(cmp, list, n, true, prefix_seek_mode); + } + } else if (IsBytewiseComparator(cmp->user_comparator())) { // must is rev bytewise + using MergingIterInst = MergingIterTmpl; + if (arena == nullptr) { + return new MergingIterInst(cmp, list, n, false, prefix_seek_mode); + } else { + auto mem = arena->AllocateAligned(sizeof(MergingIterInst)); + return new (mem) MergingIterInst(cmp, list, n, true, prefix_seek_mode); + } } else { + using MergingIterInst = MergingIterTmpl; if (arena == nullptr) { - return new MergingIterator(cmp, list, n, false, prefix_seek_mode); + return new MergingIterInst(cmp, list, n, false, prefix_seek_mode); } else { - auto mem = arena->AllocateAligned(sizeof(MergingIterator)); - return new (mem) MergingIterator(cmp, list, n, true, prefix_seek_mode); + auto mem = arena->AllocateAligned(sizeof(MergingIterInst)); + return new (mem) MergingIterInst(cmp, list, n, true, prefix_seek_mode); } } } @@ -482,9 +561,24 @@ InternalIterator* NewMergingIterator(const InternalKeyComparator* cmp, MergeIteratorBuilder::MergeIteratorBuilder( const InternalKeyComparator* comparator, Arena* a, bool prefix_seek_mode) : first_iter(nullptr), use_merging_iter(false), arena(a) { - auto mem = arena->AllocateAligned(sizeof(MergingIterator)); - merge_iter = - new (mem) MergingIterator(comparator, nullptr, 0, true, prefix_seek_mode); + if (IsForwardBytewiseComparator(comparator->user_comparator())) { + using MergingIterInst = MergingIterTmpl; + auto mem = arena->AllocateAligned(sizeof(MergingIterInst)); + merge_iter = + new (mem) MergingIterInst(comparator, nullptr, 0, true, prefix_seek_mode); + } else if (IsBytewiseComparator(comparator->user_comparator())) { + // must is rev bytewise + using MergingIterInst = MergingIterTmpl; + auto mem = arena->AllocateAligned(sizeof(MergingIterInst)); + merge_iter = + new (mem) MergingIterInst(comparator, nullptr, 0, true, prefix_seek_mode); + } + else { + using MergingIterInst = MergingIterTmpl; + auto mem = arena->AllocateAligned(sizeof(MergingIterInst)); + merge_iter = + new (mem) MergingIterInst(comparator, nullptr, 0, true, prefix_seek_mode); + } } MergeIteratorBuilder::~MergeIteratorBuilder() { diff --git a/util/comparator.cc b/util/comparator.cc index d04031e39..6a604f0a3 100644 --- a/util/comparator.cc +++ b/util/comparator.cc @@ -378,4 +378,30 @@ Status Comparator::CreateFromString(const ConfigOptions& config_options, } return status; } + +bool IsForwardBytewiseComparator(const Comparator* cmp) { + return IsForwardBytewiseComparator(cmp->Name()); +} +bool IsForwardBytewiseComparator(const Slice& name) { + if (name.starts_with("RocksDB_SE_")) { + return true; + } + return name == "leveldb.BytewiseComparator"; +} + +bool IsBytewiseComparator(const Comparator* cmp) { + return IsBytewiseComparator(cmp->Name()); +} +bool IsBytewiseComparator(const Slice& name) { + if (name.starts_with("RocksDB_SE_")) { + return true; + } + if (name.starts_with("rev:RocksDB_SE_")) { + // reverse bytewise compare, needs reverse in iterator + return true; + } + return name == "leveldb.BytewiseComparator" || + name == "rocksdb.ReverseBytewiseComparator"; +} + } // namespace ROCKSDB_NAMESPACE