Push- instead of pull-model for managing Write stalls
Summary:
Introducing WriteController, which is the source of truth about per-DB write delays. Let's define a DB epoch as a period during which no flush or compaction finishes (i.e. a new epoch starts whenever a flush or compaction finishes). Each epoch can either:
* proceed with all writes without delay
* delay all writes by fixed time
* stop all writes

The three modes are recomputed at each epoch change (flush, compaction), rather than on every write (which is currently the case).

When we have a lot of column families, our current pull behavior adds a big overhead, since we need to loop over every column family for every write. With the new push model, the overhead on the write code path is minimal.
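
The push model described above can be sketched roughly as follows. This is an illustrative sketch only, not the WriteController class added by this commit: the class names SketchToken and SketchWriteController, the accessors IsStopped() and GetDelay(), and the choice to sum delays across column families are all assumptions. Only the names GetStopToken and GetDelayToken appear in the diff. The idea is that a column family acquires a token when an epoch changes, and the write path reads two cached values instead of looping over every column family.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <memory>
#include <utility>

// Hypothetical RAII token: the stall condition it represents stays in
// effect for as long as the token is alive.
class SketchToken {
 public:
  explicit SketchToken(std::function<void()> on_release)
      : on_release_(std::move(on_release)) {}
  ~SketchToken() { on_release_(); }
  SketchToken(const SketchToken&) = delete;
  SketchToken& operator=(const SketchToken&) = delete;

 private:
  std::function<void()> on_release_;
};

// Hypothetical controller. Not thread-safe; a real implementation would
// need external synchronization. Tokens must not outlive the controller.
class SketchWriteController {
 public:
  // Pushed at an epoch change when a column family must stop all writes.
  std::unique_ptr<SketchToken> GetStopToken() {
    ++total_stopped_;
    return std::unique_ptr<SketchToken>(new SketchToken([this] {
      assert(total_stopped_ > 0);
      --total_stopped_;
    }));
  }

  // Pushed at an epoch change when a column family wants writes delayed.
  // Assumption: delays requested by different column families add up.
  std::unique_ptr<SketchToken> GetDelayToken(uint64_t delay_us) {
    total_delay_us_ += delay_us;
    return std::unique_ptr<SketchToken>(new SketchToken([this, delay_us] {
      assert(total_delay_us_ >= delay_us);
      total_delay_us_ -= delay_us;
    }));
  }

  // Read on every write: constant cost, independent of the number of
  // column families.
  bool IsStopped() const { return total_stopped_ > 0; }
  uint64_t GetDelay() const { return total_delay_us_; }

 private:
  int total_stopped_ = 0;
  uint64_t total_delay_us_ = 0;
};
```

In this sketch a column family would keep the returned token in a member and reset it once no stall condition holds, which is how write_controller_token_ is used in the diff.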

This is just the start. The next step is to also take care of stalls introduced by slow memtable flushes. The final goal is to eliminate the function MakeRoomForWrite(), which currently needs to be called for every column family by every write.

Test Plan: make check for now. I'll add some unit tests later. Also, perf test.

Reviewers: dhruba, yhchiang, MarkCallaghan, sdong, ljin

Reviewed By: ljin

Subscribers: leveldb

Differential Revision: https://reviews.facebook.net/D22791
igorcanadi committed Sep 8, 2014
1 parent 0af157f commit a2bb7c3
Showing 15 changed files with 321 additions and 258 deletions.
5 changes: 4 additions & 1 deletion HISTORY.md
@@ -1,6 +1,9 @@
# Rocksdb Change Log

## Unreleased
## Unreleased (will be released with 3.6)

### Behavior changes
* We have refactored our system of stalling writes. Any stall-related statistics' meanings are changed. Instead of per-write stall counts, we now count stalls per-epoch, where epochs are periods between flushes and compactions. You'll find more information in our Tuning Perf Guide once we release RocksDB 3.6.

----- Past Releases -----

6 changes: 5 additions & 1 deletion Makefile
@@ -112,7 +112,8 @@ TESTS = \
version_edit_test \
version_set_test \
file_indexer_test \
write_batch_test\
write_batch_test \
write_controller_test\
deletefile_test \
table_test \
thread_local_test \
@@ -427,6 +428,9 @@ reduce_levels_test: tools/reduce_levels_test.o $(LIBOBJECTS) $(TESTHARNESS)
write_batch_test: db/write_batch_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) db/write_batch_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)

write_controller_test: db/write_controller_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) db/write_controller_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)

merge_test: db/merge_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) db/merge_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)

130 changes: 96 additions & 34 deletions db/column_family.cc
@@ -9,6 +9,11 @@

#include "db/column_family.h"

#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif

#include <inttypes.h>
#include <vector>
#include <string>
#include <algorithm>
@@ -19,11 +24,42 @@
#include "db/internal_stats.h"
#include "db/compaction_picker.h"
#include "db/table_properties_collector.h"
#include "db/write_controller.h"
#include "util/autovector.h"
#include "util/hash_skiplist_rep.h"

namespace rocksdb {

namespace {
// This function computes the amount of time in microseconds by which a write
// should be delayed based on the number of level-0 files according to the
// following formula:
// if n < bottom, return 0;
// if n >= top, return 1000;
// otherwise, let r = (n - bottom) / (top - bottom)
// and return max(r^2 * 1000, 100), i.e. r^2 * 1000 with a 100us floor.
// The goal of this formula is to gradually increase the rate at which writes
// are slowed. We also tried linear delay (r * 1000), but it seemed to do
// slightly worse. There is no other particular reason for choosing quadratic.
uint64_t SlowdownAmount(int n, double bottom, double top) {
uint64_t delay;
if (n >= top) {
delay = 1000;
} else if (n < bottom) {
delay = 0;
} else {
// If we are here, we know that:
// bottom <= n < top
// since the previous two conditions are false.
double how_much = static_cast<double>(n - bottom) / (top - bottom);
delay = std::max(how_much * how_much * 1000, 100.0);
}
assert(delay <= 1000);
return delay;
}
} // namespace

ColumnFamilyHandleImpl::ColumnFamilyHandleImpl(ColumnFamilyData* cfd,
DBImpl* db, port::Mutex* mutex)
: cfd_(cfd), db_(db), mutex_(mutex) {
@@ -197,7 +233,6 @@ ColumnFamilyData::ColumnFamilyData(uint32_t id, const std::string& name,
next_(nullptr),
prev_(nullptr),
log_number_(0),
need_slowdown_for_num_level0_files_(false),
column_family_set_(column_family_set) {
Ref();

@@ -278,44 +313,70 @@ ColumnFamilyData::~ColumnFamilyData() {
}

void ColumnFamilyData::RecalculateWriteStallConditions() {
need_wait_for_num_memtables_ =
(imm()->size() == options()->max_write_buffer_number - 1);

if (current_ != nullptr) {
need_wait_for_num_level0_files_ =
(current_->NumLevelFiles(0) >= options()->level0_stop_writes_trigger);
} else {
need_wait_for_num_level0_files_ = false;
}

RecalculateWriteStallRateLimitsConditions();
}

void ColumnFamilyData::RecalculateWriteStallRateLimitsConditions() {
if (current_ != nullptr) {
exceeds_hard_rate_limit_ =
(options()->hard_rate_limit > 1.0 &&
current_->MaxCompactionScore() > options()->hard_rate_limit);

exceeds_soft_rate_limit_ =
(options()->soft_rate_limit > 0.0 &&
current_->MaxCompactionScore() > options()->soft_rate_limit);
} else {
exceeds_hard_rate_limit_ = false;
exceeds_soft_rate_limit_ = false;
const double score = current_->MaxCompactionScore();
const int max_level = current_->MaxCompactionScoreLevel();

auto write_controller = column_family_set_->write_controller_;

if (imm()->size() == options_.max_write_buffer_number) {
write_controller_token_ = write_controller->GetStopToken();
internal_stats_->AddCFStats(InternalStats::MEMTABLE_COMPACTION, 1);
Log(options_.info_log,
"[%s] Stopping writes because we have %d immutable memtables "
"(waiting for flush)",
name_.c_str(), imm()->size());
} else if (current_->NumLevelFiles(0) >=
options_.level0_stop_writes_trigger) {
// Stop takes priority over slowdown: a level-0 file count at or above the
// stop trigger normally also satisfies the slowdown condition below.
write_controller_token_ = write_controller->GetStopToken();
internal_stats_->AddCFStats(InternalStats::LEVEL0_NUM_FILES, 1);
Log(options_.info_log,
"[%s] Stopping writes because we have %d level-0 files",
name_.c_str(), current_->NumLevelFiles(0));
} else if (options_.level0_slowdown_writes_trigger >= 0 &&
current_->NumLevelFiles(0) >=
options_.level0_slowdown_writes_trigger) {
uint64_t slowdown = SlowdownAmount(
current_->NumLevelFiles(0), options_.level0_slowdown_writes_trigger,
options_.level0_stop_writes_trigger);
write_controller_token_ = write_controller->GetDelayToken(slowdown);
internal_stats_->AddCFStats(InternalStats::LEVEL0_SLOWDOWN, slowdown);
Log(options_.info_log,
"[%s] Stalling writes because we have %d level-0 files (%" PRIu64
"us)",
name_.c_str(), current_->NumLevelFiles(0), slowdown);
} else if (options_.hard_rate_limit > 1.0 &&
score > options_.hard_rate_limit) {
uint64_t kHardLimitSlowdown = 1000;
write_controller_token_ =
write_controller->GetDelayToken(kHardLimitSlowdown);
internal_stats_->RecordLevelNSlowdown(max_level, kHardLimitSlowdown,
false);
Log(options_.info_log,
"[%s] Stalling writes because we hit hard limit on level %d. "
"(%" PRIu64 "us)",
name_.c_str(), max_level, kHardLimitSlowdown);
} else if (options_.soft_rate_limit > 0.0 &&
score > options_.soft_rate_limit) {
uint64_t slowdown = SlowdownAmount(score, options_.soft_rate_limit,
options_.hard_rate_limit);
write_controller_token_ = write_controller->GetDelayToken(slowdown);
internal_stats_->RecordLevelNSlowdown(max_level, slowdown, true);
Log(options_.info_log,
"[%s] Stalling writes because we hit soft limit on level %d (%" PRIu64
"us)",
name_.c_str(), max_level, slowdown);
} else {
write_controller_token_.reset();
}
}
}

const EnvOptions* ColumnFamilyData::soptions() const {
return &(column_family_set_->env_options_);
}

void ColumnFamilyData::SetCurrent(Version* current) {
current_ = current;
need_slowdown_for_num_level0_files_ =
(options_.level0_slowdown_writes_trigger >= 0 &&
current_->NumLevelFiles(0) >= options_.level0_slowdown_writes_trigger);
}
void ColumnFamilyData::SetCurrent(Version* current) { current_ = current; }

void ColumnFamilyData::CreateNewMemtable() {
assert(current_ != nullptr);
@@ -328,7 +389,6 @@ void ColumnFamilyData::CreateNewMemtable() {

Compaction* ColumnFamilyData::PickCompaction(LogBuffer* log_buffer) {
auto result = compaction_picker_->PickCompaction(current_, log_buffer);
RecalculateWriteStallRateLimitsConditions();
return result;
}

@@ -464,16 +524,18 @@ void ColumnFamilyData::ResetThreadLocalSuperVersions() {
ColumnFamilySet::ColumnFamilySet(const std::string& dbname,
const DBOptions* db_options,
const EnvOptions& env_options,
Cache* table_cache)
Cache* table_cache,
WriteController* write_controller)
: max_column_family_(0),
dummy_cfd_(new ColumnFamilyData(0, "", nullptr, nullptr,
ColumnFamilyOptions(), db_options,
env_options_, nullptr)),
env_options, nullptr)),
default_cfd_cache_(nullptr),
db_name_(dbname),
db_options_(db_options),
env_options_(env_options),
table_cache_(table_cache),
write_controller_(write_controller),
spin_lock_(ATOMIC_FLAG_INIT) {
// initialize linked list
dummy_cfd_->prev_ = dummy_cfd_;
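
To make the SlowdownAmount formula from this file concrete, here is a standalone copy of the helper that can be compiled in isolation. The trigger values 8 and 12 are hypothetical and chosen only for illustration; they are not taken from this commit. The explicit static_cast on the result is added here for clarity and does not change the value.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Standalone copy of the helper shown in the diff: quadratic ramp between
// `bottom` and `top`, floored at 100us and capped at 1000us.
uint64_t SlowdownAmount(int n, double bottom, double top) {
  uint64_t delay;
  if (n >= top) {
    delay = 1000;
  } else if (n < bottom) {
    delay = 0;
  } else {
    // Here bottom <= n < top, so 0 <= how_much < 1.
    double how_much = static_cast<double>(n - bottom) / (top - bottom);
    delay = static_cast<uint64_t>(std::max(how_much * how_much * 1000, 100.0));
  }
  assert(delay <= 1000);
  return delay;
}

// Hypothetical triggers, for illustration only.
const double kSketchSlowdownTrigger = 8;
const double kSketchStopTrigger = 12;
```

With these triggers, 7 level-0 files give no delay, 8 and 9 files both give the 100us floor (the quadratic term is 0 and 62.5 respectively), 10 files give 250us, 11 files give 562us (562.5 truncated), and 12 files give the 1000us cap.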
54 changes: 8 additions & 46 deletions db/column_family.h
@@ -19,6 +19,7 @@
#include "rocksdb/env.h"
#include "db/memtable_list.h"
#include "db/write_batch_internal.h"
#include "db/write_controller.h"
#include "db/table_cache.h"
#include "util/thread_local.h"

@@ -156,6 +157,7 @@ class ColumnFamilyData {
// can't drop default CF
assert(id_ != 0);
dropped_ = true;
write_controller_token_.reset();
}
bool IsDropped() const { return dropped_; }

@@ -225,35 +227,12 @@ class ColumnFamilyData {

void ResetThreadLocalSuperVersions();

// A Flag indicating whether write needs to slowdown because of there are
// too many number of level0 files.
bool NeedSlowdownForNumLevel0Files() const {
return need_slowdown_for_num_level0_files_;
}

bool NeedWaitForNumLevel0Files() const {
return need_wait_for_num_level0_files_;
}

bool NeedWaitForNumMemtables() const {
return need_wait_for_num_memtables_;
}

bool ExceedsSoftRateLimit() const {
return exceeds_soft_rate_limit_;
}

bool ExceedsHardRateLimit() const {
return exceeds_hard_rate_limit_;
}

private:
friend class ColumnFamilySet;
ColumnFamilyData(uint32_t id, const std::string& name,
Version* dummy_versions, Cache* table_cache,
const ColumnFamilyOptions& options,
const DBOptions* db_options,
const EnvOptions& env_options,
const DBOptions* db_options, const EnvOptions& env_options,
ColumnFamilySet* column_family_set);

// Recalculate some small conditions, which are changed only during
@@ -262,7 +241,6 @@ class ColumnFamilyData {
// DBImpl::MakeRoomForWrite function to decide, if it need to make
// a write stall
void RecalculateWriteStallConditions();
void RecalculateWriteStallRateLimitsConditions();

uint32_t id_;
const std::string name_;
@@ -304,31 +282,13 @@ class ColumnFamilyData {
// recovered from
uint64_t log_number_;

// A flag indicating whether we should delay writes because
// we have too many level 0 files
bool need_slowdown_for_num_level0_files_;

// These 4 variables are updated only after compaction,
// adding new memtable, flushing memtables to files
// and/or add recalculation of compaction score.
// That's why theirs values are cached in ColumnFamilyData.
// Recalculation is made by RecalculateWriteStallConditions and
// RecalculateWriteStallRateLimitsConditions function. They are used
// in DBImpl::MakeRoomForWrite function to decide, if it need
// to sleep during write operation
bool need_wait_for_num_memtables_;

bool need_wait_for_num_level0_files_;

bool exceeds_hard_rate_limit_;

bool exceeds_soft_rate_limit_;

// An object that keeps all the compaction stats
// and picks the next compaction
std::unique_ptr<CompactionPicker> compaction_picker_;

ColumnFamilySet* column_family_set_;

std::unique_ptr<WriteControllerToken> write_controller_token_;
};

// ColumnFamilySet has interesting thread-safety requirements
@@ -370,7 +330,8 @@ class ColumnFamilySet {
};

ColumnFamilySet(const std::string& dbname, const DBOptions* db_options,
const EnvOptions& env_options, Cache* table_cache);
const EnvOptions& env_options, Cache* table_cache,
WriteController* write_controller);
~ColumnFamilySet();

ColumnFamilyData* GetDefault() const;
@@ -425,6 +386,7 @@ class ColumnFamilySet {
const DBOptions* const db_options_;
const EnvOptions env_options_;
Cache* table_cache_;
WriteController* write_controller_;
std::atomic_flag spin_lock_;
};
