Skip to content

Commit

Permalink
ReadOptions.total_order_seek to allow total order seek for block-base…
Browse files Browse the repository at this point in the history
…d table when hash index is enabled

Summary: as title

Test Plan: table_test

Reviewers: igor, yhchiang, sdong

Reviewed By: sdong

Subscribers: leveldb

Differential Revision: https://reviews.facebook.net/D22239
  • Loading branch information
Lei Jin committed Aug 25, 2014
1 parent a98badf commit 2386185
Show file tree
Hide file tree
Showing 12 changed files with 161 additions and 41 deletions.
10 changes: 7 additions & 3 deletions db/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1406,7 +1406,9 @@ Status DBImpl::WriteLevel0TableForRecovery(ColumnFamilyData* cfd, MemTable* mem,
FileMetaData meta;
meta.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0);
pending_outputs_[meta.fd.GetNumber()] = 0; // path 0 for level 0 file.
Iterator* iter = mem->NewIterator(ReadOptions(), true);
ReadOptions ro;
ro.total_order_seek = true;
Iterator* iter = mem->NewIterator(ro);
const SequenceNumber newest_snapshot = snapshots_.GetNewest();
const SequenceNumber earliest_seqno_in_memtable =
mem->GetFirstSequenceNumber();
Expand Down Expand Up @@ -1473,11 +1475,13 @@ Status DBImpl::WriteLevel0Table(ColumnFamilyData* cfd,
mutex_.Unlock();
log_buffer->FlushBufferToLog();
std::vector<Iterator*> memtables;
ReadOptions ro;
ro.total_order_seek = true;
for (MemTable* m : mems) {
Log(options_.info_log,
"[%s] Flushing memtable with next log file: %" PRIu64 "\n",
cfd->GetName().c_str(), m->GetNextLogNumber());
memtables.push_back(m->NewIterator(ReadOptions(), true));
memtables.push_back(m->NewIterator(ro));
}
Iterator* iter = NewMergingIterator(&cfd->internal_comparator(),
&memtables[0], memtables.size());
Expand Down Expand Up @@ -3300,7 +3304,7 @@ Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
MergeIteratorBuilder merge_iter_builder(&cfd->internal_comparator(), arena);
// Collect iterator for mutable mem
merge_iter_builder.AddIterator(
super_version->mem->NewIterator(options, false, arena));
super_version->mem->NewIterator(options, arena));
// Collect all needed child iterators for immutable memtables
super_version->imm->AddIterators(options, &merge_iter_builder);
// Collect iterators for files in L0 - Ln
Expand Down
13 changes: 6 additions & 7 deletions db/memtable.cc
Original file line number Diff line number Diff line change
Expand Up @@ -174,13 +174,13 @@ const char* EncodeKey(std::string* scratch, const Slice& target) {

class MemTableIterator: public Iterator {
public:
MemTableIterator(const MemTable& mem, const ReadOptions& options,
bool enforce_total_order, Arena* arena)
MemTableIterator(
const MemTable& mem, const ReadOptions& options, Arena* arena)
: bloom_(nullptr),
prefix_extractor_(mem.prefix_extractor_),
valid_(false),
arena_mode_(arena != nullptr) {
if (prefix_extractor_ != nullptr && !enforce_total_order) {
if (prefix_extractor_ != nullptr && !options.total_order_seek) {
bloom_ = mem.prefix_bloom_.get();
iter_ = mem.table_->GetDynamicPrefixIterator(arena);
} else {
Expand Down Expand Up @@ -248,14 +248,13 @@ class MemTableIterator: public Iterator {
void operator=(const MemTableIterator&);
};

Iterator* MemTable::NewIterator(const ReadOptions& options,
bool enforce_total_order, Arena* arena) {
Iterator* MemTable::NewIterator(const ReadOptions& options, Arena* arena) {
if (arena == nullptr) {
return new MemTableIterator(*this, options, enforce_total_order, nullptr);
return new MemTableIterator(*this, options, nullptr);
} else {
auto mem = arena->AllocateAligned(sizeof(MemTableIterator));
return new (mem)
MemTableIterator(*this, options, enforce_total_order, arena);
MemTableIterator(*this, options, arena);
}
}

Expand Down
1 change: 0 additions & 1 deletion db/memtable.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ class MemTable {
// Calling ~Iterator of the iterator will destroy all the states but
// those allocated in arena.
Iterator* NewIterator(const ReadOptions& options,
bool enforce_total_order = false,
Arena* arena = nullptr);

// Add an entry into memtable that maps key to value at the
Expand Down
3 changes: 2 additions & 1 deletion db/repair.cc
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,8 @@ class Repairer {
FileMetaData meta;
meta.fd = FileDescriptor(next_file_number_++, 0, 0);
ReadOptions ro;
Iterator* iter = mem->NewIterator(ro, true /* enforce_total_order */);
ro.total_order_seek = true;
Iterator* iter = mem->NewIterator(ro);
status = BuildTable(dbname_, env_, options_, storage_options_, table_cache_,
iter, &meta, icmp_, 0, 0, kNoCompression);
delete iter;
Expand Down
11 changes: 9 additions & 2 deletions include/rocksdb/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -902,18 +902,25 @@ struct ReadOptions {
// Not supported in ROCKSDB_LITE mode!
bool tailing;

// Enable a total order seek regardless of index format (e.g. hash index)
// used in the table. Some table format (e.g. plain table) may not support
// this option.
bool total_order_seek;

ReadOptions()
: verify_checksums(true),
fill_cache(true),
snapshot(nullptr),
read_tier(kReadAllTier),
tailing(false) {}
tailing(false),
total_order_seek(false) {}
ReadOptions(bool cksum, bool cache)
: verify_checksums(cksum),
fill_cache(cache),
snapshot(nullptr),
read_tier(kReadAllTier),
tailing(false) {}
tailing(false),
total_order_seek(false) {}
};

// Options that control write operations
Expand Down
12 changes: 9 additions & 3 deletions table/block.cc
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,8 @@ Block::~Block() {
}
}

Iterator* Block::NewIterator(const Comparator* cmp, BlockIter* iter) {
Iterator* Block::NewIterator(
const Comparator* cmp, BlockIter* iter, bool total_order_seek) {
if (size_ < 2*sizeof(uint32_t)) {
if (iter != nullptr) {
iter->SetStatus(Status::Corruption("bad block contents"));
Expand All @@ -339,12 +340,17 @@ Iterator* Block::NewIterator(const Comparator* cmp, BlockIter* iter) {
return NewEmptyIterator();
}
} else {
BlockHashIndex* hash_index_ptr =
total_order_seek ? nullptr : hash_index_.get();
BlockPrefixIndex* prefix_index_ptr =
total_order_seek ? nullptr : prefix_index_.get();

if (iter != nullptr) {
iter->Initialize(cmp, data_, restart_offset_, num_restarts,
hash_index_.get(), prefix_index_.get());
hash_index_ptr, prefix_index_ptr);
} else {
iter = new BlockIter(cmp, data_, restart_offset_, num_restarts,
hash_index_.get(), prefix_index_.get());
hash_index_ptr, prefix_index_ptr);
}
}

Expand Down
6 changes: 5 additions & 1 deletion table/block.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,12 @@ class Block {
//
// If iter is null, return new Iterator
// If iter is not null, update this one and return it as Iterator*
//
// If total_order_seek is true, hash_index_ and prefix_index_ are ignored.
// This option only applies for index block. For data block, hash_index_
// and prefix_index_ are null, so this option does not matter.
Iterator* NewIterator(const Comparator* comparator,
BlockIter* iter = nullptr);
BlockIter* iter = nullptr, bool total_order_seek = true);
void SetBlockHashIndex(BlockHashIndex* hash_index);
void SetBlockPrefixIndex(BlockPrefixIndex* prefix_index);

Expand Down
24 changes: 15 additions & 9 deletions table/block_based_table_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ class BlockBasedTable::IndexReader {
// Create an iterator for index access.
// An iter is passed in, if it is not null, update this one and return it
// If it is null, create a new Iterator
virtual Iterator* NewIterator(BlockIter* iter = nullptr) = 0;
virtual Iterator* NewIterator(
BlockIter* iter = nullptr, bool total_order_seek = true) = 0;

// The size of the index.
virtual size_t size() const = 0;
Expand Down Expand Up @@ -174,8 +175,9 @@ class BinarySearchIndexReader : public IndexReader {
return s;
}

virtual Iterator* NewIterator(BlockIter* iter = nullptr) override {
return index_block_->NewIterator(comparator_, iter);
virtual Iterator* NewIterator(
BlockIter* iter = nullptr, bool dont_care = true) override {
return index_block_->NewIterator(comparator_, iter, true);
}

virtual size_t size() const override { return index_block_->size(); }
Expand Down Expand Up @@ -295,8 +297,9 @@ class HashIndexReader : public IndexReader {
return Status::OK();
}

virtual Iterator* NewIterator(BlockIter* iter = nullptr) override {
return index_block_->NewIterator(comparator_, iter);
virtual Iterator* NewIterator(
BlockIter* iter = nullptr, bool total_order_seek = true) override {
return index_block_->NewIterator(comparator_, iter, total_order_seek);
}

virtual size_t size() const override { return index_block_->size(); }
Expand Down Expand Up @@ -818,7 +821,8 @@ Iterator* BlockBasedTable::NewIndexIterator(const ReadOptions& read_options,
BlockIter* input_iter) {
// index reader has already been pre-populated.
if (rep_->index_reader) {
return rep_->index_reader->NewIterator(input_iter);
return rep_->index_reader->NewIterator(
input_iter, read_options.total_order_seek);
}

bool no_io = read_options.read_tier == kBlockCacheTier;
Expand Down Expand Up @@ -866,10 +870,9 @@ Iterator* BlockBasedTable::NewIndexIterator(const ReadOptions& read_options,
}

assert(cache_handle);
Iterator* iter;
iter = index_reader->NewIterator(input_iter);
auto* iter = index_reader->NewIterator(
input_iter, read_options.total_order_seek);
iter->RegisterCleanup(&ReleaseCachedEntry, block_cache, cache_handle);

return iter;
}

Expand Down Expand Up @@ -988,6 +991,9 @@ class BlockBasedTable::BlockEntryIteratorState : public TwoLevelIteratorState {
}

bool PrefixMayMatch(const Slice& internal_key) override {
if (read_options_.total_order_seek) {
return true;
}
return table_->PrefixMayMatch(internal_key);
}

Expand Down
2 changes: 1 addition & 1 deletion table/block_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ void CheckBlockContents(BlockContents contents, const int max_key,
}

std::unique_ptr<Iterator> hash_iter(
reader1.NewIterator(BytewiseComparator()));
reader1.NewIterator(BytewiseComparator(), nullptr, false));

std::unique_ptr<Iterator> regular_iter(
reader2.NewIterator(BytewiseComparator()));
Expand Down
11 changes: 9 additions & 2 deletions table/cuckoo_table_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -271,10 +271,17 @@ Slice CuckooTableIterator::value() const {
return curr_value_;
}

Iterator* CuckooTableReader::NewIterator(const ReadOptions&, Arena* arena) {
extern Iterator* NewErrorIterator(const Status& status, Arena* arena);

Iterator* CuckooTableReader::NewIterator(
const ReadOptions& read_options, Arena* arena) {
if (!status().ok()) {
return NewErrorIterator(
Status::Corruption("CuckooTableReader status is not okay."));
Status::Corruption("CuckooTableReader status is not okay."), arena);
}
if (read_options.total_order_seek) {
return NewErrorIterator(
Status::InvalidArgument("total_order_seek is not supported."), arena);
}
CuckooTableIterator* iter;
if (arena == nullptr) {
Expand Down
4 changes: 4 additions & 0 deletions table/plain_table_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,10 @@ void PlainTableReader::SetupForCompaction() {

Iterator* PlainTableReader::NewIterator(const ReadOptions& options,
Arena* arena) {
if (options.total_order_seek && !IsTotalOrderMode()) {
return NewErrorIterator(
Status::InvalidArgument("total_order_seek not supported"), arena);
}
if (arena == nullptr) {
return new PlainTableIterator(this, prefix_extractor_ != nullptr);
} else {
Expand Down

0 comments on commit 2386185

Please sign in to comment.