Skip to content

Commit

Permalink
make compaction related options changeable
Browse files Browse the repository at this point in the history
Summary:
make compaction related options changeable. Most of changes are tedious,
following the same convention: grabs MutableCFOptions at the beginning
of compaction under mutex, then pass it throughout the job and register
it in SuperVersion at the end.

Test Plan: make all check

Reviewers: igor, yhchiang, sdong

Reviewed By: sdong

Subscribers: leveldb

Differential Revision: https://reviews.facebook.net/D23349
  • Loading branch information
Lei Jin committed Oct 1, 2014
1 parent d122e7b commit 5ec53f3
Show file tree
Hide file tree
Showing 22 changed files with 686 additions and 399 deletions.
75 changes: 40 additions & 35 deletions db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ ColumnFamilyData::ColumnFamilyData(uint32_t id, const std::string& name,
internal_comparator_(cf_options.comparator),
options_(*db_options, SanitizeOptions(&internal_comparator_, cf_options)),
ioptions_(options_),
mutable_cf_options_(options_),
mutable_cf_options_(options_, ioptions_),
mem_(nullptr),
imm_(options_.min_write_buffer_number_to_merge),
super_version_(nullptr),
Expand All @@ -245,27 +245,27 @@ ColumnFamilyData::ColumnFamilyData(uint32_t id, const std::string& name,
// if dummy_versions is nullptr, then this is a dummy column family.
if (dummy_versions != nullptr) {
internal_stats_.reset(
new InternalStats(options_.num_levels, db_options->env, this));
new InternalStats(ioptions_.num_levels, db_options->env, this));
table_cache_.reset(new TableCache(ioptions_, env_options, table_cache));
if (options_.compaction_style == kCompactionStyleUniversal) {
if (ioptions_.compaction_style == kCompactionStyleUniversal) {
compaction_picker_.reset(
new UniversalCompactionPicker(&options_, &internal_comparator_));
} else if (options_.compaction_style == kCompactionStyleLevel) {
new UniversalCompactionPicker(ioptions_, &internal_comparator_));
} else if (ioptions_.compaction_style == kCompactionStyleLevel) {
compaction_picker_.reset(
new LevelCompactionPicker(&options_, &internal_comparator_));
new LevelCompactionPicker(ioptions_, &internal_comparator_));
} else {
assert(options_.compaction_style == kCompactionStyleFIFO);
assert(ioptions_.compaction_style == kCompactionStyleFIFO);
compaction_picker_.reset(
new FIFOCompactionPicker(&options_, &internal_comparator_));
new FIFOCompactionPicker(ioptions_, &internal_comparator_));
}

Log(options_.info_log, "Options for column family \"%s\":\n",
Log(ioptions_.info_log, "Options for column family \"%s\":\n",
name.c_str());
const ColumnFamilyOptions* cf_options = &options_;
cf_options->Dump(options_.info_log.get());
cf_options->Dump(ioptions_.info_log);
}

RecalculateWriteStallConditions();
RecalculateWriteStallConditions(mutable_cf_options_);
}

// DB mutex held
Expand Down Expand Up @@ -318,7 +318,8 @@ ColumnFamilyData::~ColumnFamilyData() {
}
}

void ColumnFamilyData::RecalculateWriteStallConditions() {
void ColumnFamilyData::RecalculateWriteStallConditions(
const MutableCFOptions& mutable_cf_options) {
if (current_ != nullptr) {
const double score = current_->MaxCompactionScore();
const int max_level = current_->MaxCompactionScoreLevel();
Expand All @@ -328,26 +329,27 @@ void ColumnFamilyData::RecalculateWriteStallConditions() {
if (imm()->size() == options_.max_write_buffer_number) {
write_controller_token_ = write_controller->GetStopToken();
internal_stats_->AddCFStats(InternalStats::MEMTABLE_COMPACTION, 1);
Log(options_.info_log,
Log(ioptions_.info_log,
"[%s] Stopping writes because we have %d immutable memtables "
"(waiting for flush)",
name_.c_str(), imm()->size());
} else if (current_->NumLevelFiles(0) >=
options_.level0_stop_writes_trigger) {
mutable_cf_options.level0_stop_writes_trigger) {
write_controller_token_ = write_controller->GetStopToken();
internal_stats_->AddCFStats(InternalStats::LEVEL0_NUM_FILES, 1);
Log(options_.info_log,
Log(ioptions_.info_log,
"[%s] Stopping writes because we have %d level-0 files",
name_.c_str(), current_->NumLevelFiles(0));
} else if (options_.level0_slowdown_writes_trigger >= 0 &&
} else if (mutable_cf_options.level0_slowdown_writes_trigger >= 0 &&
current_->NumLevelFiles(0) >=
options_.level0_slowdown_writes_trigger) {
mutable_cf_options.level0_slowdown_writes_trigger) {
uint64_t slowdown = SlowdownAmount(
current_->NumLevelFiles(0), options_.level0_slowdown_writes_trigger,
options_.level0_stop_writes_trigger);
current_->NumLevelFiles(0),
mutable_cf_options.level0_slowdown_writes_trigger,
mutable_cf_options.level0_stop_writes_trigger);
write_controller_token_ = write_controller->GetDelayToken(slowdown);
internal_stats_->AddCFStats(InternalStats::LEVEL0_SLOWDOWN, slowdown);
Log(options_.info_log,
Log(ioptions_.info_log,
"[%s] Stalling writes because we have %d level-0 files (%" PRIu64
"us)",
name_.c_str(), current_->NumLevelFiles(0), slowdown);
Expand All @@ -358,7 +360,7 @@ void ColumnFamilyData::RecalculateWriteStallConditions() {
write_controller->GetDelayToken(kHardLimitSlowdown);
internal_stats_->RecordLevelNSlowdown(max_level, kHardLimitSlowdown,
false);
Log(options_.info_log,
Log(ioptions_.info_log,
"[%s] Stalling writes because we hit hard limit on level %d. "
"(%" PRIu64 "us)",
name_.c_str(), max_level, kHardLimitSlowdown);
Expand All @@ -368,7 +370,7 @@ void ColumnFamilyData::RecalculateWriteStallConditions() {
options_.hard_rate_limit);
write_controller_token_ = write_controller->GetDelayToken(slowdown);
internal_stats_->RecordLevelNSlowdown(max_level, slowdown, true);
Log(options_.info_log,
Log(ioptions_.info_log,
"[%s] Stalling writes because we hit soft limit on level %d (%" PRIu64
"us)",
name_.c_str(), max_level, slowdown);
Expand All @@ -393,19 +395,21 @@ void ColumnFamilyData::CreateNewMemtable(const MemTableOptions& moptions) {
mem_->Ref();
}

Compaction* ColumnFamilyData::PickCompaction(LogBuffer* log_buffer) {
auto result = compaction_picker_->PickCompaction(current_, log_buffer);
Compaction* ColumnFamilyData::PickCompaction(
const MutableCFOptions& mutable_options, LogBuffer* log_buffer) {
auto result = compaction_picker_->PickCompaction(
mutable_options, current_, log_buffer);
return result;
}

Compaction* ColumnFamilyData::CompactRange(int input_level, int output_level,
uint32_t output_path_id,
const InternalKey* begin,
const InternalKey* end,
InternalKey** compaction_end) {
return compaction_picker_->CompactRange(current_, input_level, output_level,
output_path_id, begin, end,
compaction_end);
Compaction* ColumnFamilyData::CompactRange(
const MutableCFOptions& mutable_cf_options,
int input_level, int output_level, uint32_t output_path_id,
const InternalKey* begin, const InternalKey* end,
InternalKey** compaction_end) {
return compaction_picker_->CompactRange(
mutable_cf_options, current_, input_level, output_level,
output_path_id, begin, end, compaction_end);
}

SuperVersion* ColumnFamilyData::GetReferencedSuperVersion(
Expand Down Expand Up @@ -443,11 +447,11 @@ SuperVersion* ColumnFamilyData::GetThreadLocalSuperVersion(
sv = static_cast<SuperVersion*>(ptr);
if (sv == SuperVersion::kSVObsolete ||
sv->version_number != super_version_number_.load()) {
RecordTick(options_.statistics.get(), NUMBER_SUPERVERSION_ACQUIRES);
RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_ACQUIRES);
SuperVersion* sv_to_delete = nullptr;

if (sv && sv->Unref()) {
RecordTick(options_.statistics.get(), NUMBER_SUPERVERSION_CLEANUPS);
RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_CLEANUPS);
db_mutex->Lock();
// NOTE: underlying resources held by superversion (sst files) might
// not be released until the next background job.
Expand Down Expand Up @@ -502,7 +506,7 @@ SuperVersion* ColumnFamilyData::InstallSuperVersion(
// Reset SuperVersions cached in thread local storage
ResetThreadLocalSuperVersions();

RecalculateWriteStallConditions();
RecalculateWriteStallConditions(mutable_cf_options);

if (old_superversion != nullptr && old_superversion->Unref()) {
old_superversion->Cleanup();
Expand Down Expand Up @@ -533,6 +537,7 @@ bool ColumnFamilyData::SetOptions(
if (GetMutableOptionsFromStrings(mutable_cf_options_, options_map,
&new_mutable_cf_options)) {
mutable_cf_options_ = new_mutable_cf_options;
mutable_cf_options_.RefreshDerivedOptions(ioptions_);
return true;
}
return false;
Expand Down
16 changes: 10 additions & 6 deletions db/column_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -203,11 +203,14 @@ class ColumnFamilyData {
TableCache* table_cache() const { return table_cache_.get(); }

// See documentation in compaction_picker.h
Compaction* PickCompaction(LogBuffer* log_buffer);
Compaction* CompactRange(int input_level, int output_level,
uint32_t output_path_id, const InternalKey* begin,
const InternalKey* end,
InternalKey** compaction_end);
// REQUIRES: DB mutex held
Compaction* PickCompaction(const MutableCFOptions& mutable_options,
LogBuffer* log_buffer);
Compaction* CompactRange(
const MutableCFOptions& mutable_cf_options,
int input_level, int output_level, uint32_t output_path_id,
const InternalKey* begin, const InternalKey* end,
InternalKey** compaction_end);

CompactionPicker* compaction_picker() { return compaction_picker_.get(); }
// thread-safe
Expand Down Expand Up @@ -260,7 +263,8 @@ class ColumnFamilyData {
// recalculation of compaction score. These values are used in
// DBImpl::MakeRoomForWrite function to decide, if it need to make
// a write stall
void RecalculateWriteStallConditions();
void RecalculateWriteStallConditions(
const MutableCFOptions& mutable_cf_options);

uint32_t id_;
const std::string name_;
Expand Down
7 changes: 3 additions & 4 deletions db/compaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ Compaction::Compaction(Version* input_version, int start_level, int out_level,
is_full_compaction_(false),
is_manual_compaction_(false),
level_ptrs_(std::vector<size_t>(number_levels_)) {

cfd_->Ref();
input_version_->Ref();
edit_ = new VersionEdit();
Expand Down Expand Up @@ -267,12 +266,12 @@ void Compaction::Summary(char* output, int len) {
snprintf(output + write, len - write, "]");
}

uint64_t Compaction::OutputFilePreallocationSize() {
uint64_t Compaction::OutputFilePreallocationSize(
const MutableCFOptions& mutable_options) {
uint64_t preallocation_size = 0;

if (cfd_->ioptions()->compaction_style == kCompactionStyleLevel) {
preallocation_size =
cfd_->compaction_picker()->MaxFileSizeForLevel(output_level());
preallocation_size = mutable_options.MaxFileSizeForLevel(output_level());
} else {
for (int level = 0; level < num_input_levels(); ++level) {
for (const auto& f : inputs_[level].files) {
Expand Down
8 changes: 7 additions & 1 deletion db/compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#pragma once
#include "util/arena.h"
#include "util/autovector.h"
#include "util/mutable_cf_options.h"
#include "db/version_set.h"

namespace rocksdb {
Expand Down Expand Up @@ -151,10 +152,14 @@ class Compaction {
// Was this compaction triggered manually by the client?
bool IsManualCompaction() { return is_manual_compaction_; }

// Return the MutableCFOptions that should be used throughout the compaction
// procedure
const MutableCFOptions* mutable_cf_options() { return &mutable_cf_options_; }

// Returns the size in bytes that the output file should be preallocated to.
// In level compaction, that is max_file_size_. In universal compaction, that
// is the sum of all input file sizes.
uint64_t OutputFilePreallocationSize();
uint64_t OutputFilePreallocationSize(const MutableCFOptions& mutable_options);

private:
friend class CompactionPicker;
Expand All @@ -171,6 +176,7 @@ class Compaction {
const int output_level_; // levels to which output files are stored
uint64_t max_output_file_size_;
uint64_t max_grandparent_overlap_bytes_;
MutableCFOptions mutable_cf_options_;
Version* input_version_;
VersionEdit* edit_;
int number_levels_;
Expand Down

0 comments on commit 5ec53f3

Please sign in to comment.