Skip to content

Commit

Permalink
DB::Flush() Do not wait for background threads when there is nothing …
Browse files Browse the repository at this point in the history
…in mem table

Summary:
When we have multiple column families, users can issue Flush() on every column families to make sure everything is flushes, even if some of them might be empty. By skipping the waiting for empty cases, it can be greatly speed up.

Still wait for people's comments before writing unit tests for it.

Test Plan: Will write a unit test to make sure it is correct.

Reviewers: ljin, yhchiang, igor

Reviewed By: igor

Subscribers: leveldb, dhruba

Differential Revision: https://reviews.facebook.net/D22953
  • Loading branch information
siying committed Sep 8, 2014
1 parent a2bb7c3 commit 011241b
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 2 deletions.
8 changes: 7 additions & 1 deletion db/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ DBImpl::~DBImpl() {
mutex_.Lock();
if (flush_on_destroy_) {
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (cfd->mem()->GetFirstSequenceNumber() != 0) {
if (!cfd->mem()->IsEmpty()) {
cfd->Ref();
mutex_.Unlock();
FlushMemTable(cfd, FlushOptions());
Expand Down Expand Up @@ -1905,6 +1905,12 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
{
WriteContext context;
MutexLock guard_lock(&mutex_);

if (cfd->imm()->size() == 0 && cfd->mem()->IsEmpty()) {
// Nothing to flush
return Status::OK();
}

s = BeginWrite(&w, 0);
assert(s.ok() && !w.done); // No timeout and nobody should do our job

Expand Down
43 changes: 43 additions & 0 deletions db/db_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2535,6 +2535,49 @@ class SleepingBackgroundTask {
bool done_with_sleep_;
};

TEST(DBTest, FlushEmptyColumnFamily) {
// Block flush thread and disable compaction thread
env_->SetBackgroundThreads(1, Env::HIGH);
env_->SetBackgroundThreads(1, Env::LOW);
SleepingBackgroundTask sleeping_task_low;
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
Env::Priority::LOW);
SleepingBackgroundTask sleeping_task_high;
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_high,
Env::Priority::HIGH);

Options options = CurrentOptions();
// disable compaction
options.disable_auto_compactions = true;
WriteOptions writeOpt = WriteOptions();
writeOpt.disableWAL = true;
options.max_write_buffer_number = 2;
options.min_write_buffer_number_to_merge = 1;
CreateAndReopenWithCF({"pikachu"}, &options);

// Compaction can still go through even if no thread can flush the
// mem table.
ASSERT_OK(Flush(0));
ASSERT_OK(Flush(1));

// Insert can go through
ASSERT_OK(dbfull()->Put(writeOpt, handles_[0], "foo", "v1"));
ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v1"));

ASSERT_EQ("v1", Get(0, "foo"));
ASSERT_EQ("v1", Get(1, "bar"));

sleeping_task_high.WakeUp();
sleeping_task_high.WaitUntilDone();

// Flush can still go through.
ASSERT_OK(Flush(0));
ASSERT_OK(Flush(1));

sleeping_task_low.WakeUp();
sleeping_task_low.WaitUntilDone();
}

TEST(DBTest, GetProperty) {
// Set sizes to both background thread pool to be 1 and block them.
env_->SetBackgroundThreads(1, Env::HIGH);
Expand Down
2 changes: 1 addition & 1 deletion db/memtable.cc
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ static bool SaveValue(void* arg, const char* entry) {
bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
MergeContext& merge_context, const Options& options) {
// The sequence number is updated synchronously in version_set.h
if (first_seqno_ == 0) {
if (IsEmpty()) {
// Avoiding recording stats for speed.
return false;
}
Expand Down
3 changes: 3 additions & 0 deletions db/memtable.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ class MemTable {
// Returns the edits area that is needed for flushing the memtable
VersionEdit* GetEdits() { return &edit_; }

// Returns if there is no entry inserted to the mem table.
bool IsEmpty() const { return first_seqno_ == 0; }

// Returns the sequence number of the first element that was inserted
// into the memtable
SequenceNumber GetFirstSequenceNumber() { return first_seqno_; }
Expand Down

0 comments on commit 011241b

Please sign in to comment.