Add option for max file size. The current hard-coded value of 2MB is inefficient in Colossus.

-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=134391640
corrado authored and cmumford committed Sep 28, 2016
1 parent 3080a45 commit a2fb086
Showing 6 changed files with 79 additions and 32 deletions.
38 changes: 29 additions & 9 deletions db/db_bench.cc
@@ -84,6 +84,14 @@ static bool FLAGS_histogram = false;
 // (initialized to default value by "main")
 static int FLAGS_write_buffer_size = 0;
 
+// Number of bytes written to each file.
+// (initialized to default value by "main")
+static int FLAGS_max_file_size = 0;
+
+// Approximate size of user data packed per block (before compression).
+// (initialized to default value by "main")
+static int FLAGS_block_size = 0;
+
 // Number of bytes to use as a cache of uncompressed data.
 // Negative means use default settings.
 static int FLAGS_cache_size = -1;
@@ -109,6 +117,7 @@ static const char* FLAGS_db = NULL;
 namespace leveldb {
 
 namespace {
+leveldb::Env* g_env = NULL;
 
 // Helper for quickly generating random data.
 class RandomGenerator {
@@ -186,7 +195,7 @@ class Stats {
     done_ = 0;
     bytes_ = 0;
     seconds_ = 0;
-    start_ = Env::Default()->NowMicros();
+    start_ = g_env->NowMicros();
     finish_ = start_;
     message_.clear();
   }
@@ -204,7 +213,7 @@ class Stats {
   }
 
   void Stop() {
-    finish_ = Env::Default()->NowMicros();
+    finish_ = g_env->NowMicros();
     seconds_ = (finish_ - start_) * 1e-6;
   }
 
@@ -214,7 +223,7 @@
 
   void FinishedSingleOp() {
     if (FLAGS_histogram) {
-      double now = Env::Default()->NowMicros();
+      double now = g_env->NowMicros();
       double micros = now - last_op_finish_;
       hist_.Add(micros);
       if (micros > 20000) {
@@ -404,10 +413,10 @@ class Benchmark {
         reads_(FLAGS_reads < 0 ? FLAGS_num : FLAGS_reads),
         heap_counter_(0) {
     std::vector<std::string> files;
-    Env::Default()->GetChildren(FLAGS_db, &files);
+    g_env->GetChildren(FLAGS_db, &files);
     for (size_t i = 0; i < files.size(); i++) {
       if (Slice(files[i]).starts_with("heap-")) {
-        Env::Default()->DeleteFile(std::string(FLAGS_db) + "/" + files[i]);
+        g_env->DeleteFile(std::string(FLAGS_db) + "/" + files[i]);
       }
     }
     if (!FLAGS_use_existing_db) {
@@ -589,7 +598,7 @@ class Benchmark {
       arg[i].shared = &shared;
       arg[i].thread = new ThreadState(i);
       arg[i].thread->shared = &shared;
-      Env::Default()->StartThread(ThreadBody, &arg[i]);
+      g_env->StartThread(ThreadBody, &arg[i]);
     }
 
     shared.mu.Lock();
@@ -700,9 +709,12 @@ class Benchmark {
   void Open() {
     assert(db_ == NULL);
     Options options;
+    options.env = g_env;
     options.create_if_missing = !FLAGS_use_existing_db;
     options.block_cache = cache_;
     options.write_buffer_size = FLAGS_write_buffer_size;
+    options.max_file_size = FLAGS_max_file_size;
+    options.block_size = FLAGS_block_size;
     options.max_open_files = FLAGS_open_files;
     options.filter_policy = filter_policy_;
     options.reuse_logs = FLAGS_reuse_logs;
@@ -925,7 +937,7 @@ class Benchmark {
     char fname[100];
     snprintf(fname, sizeof(fname), "%s/heap-%04d", FLAGS_db, ++heap_counter_);
     WritableFile* file;
-    Status s = Env::Default()->NewWritableFile(fname, &file);
+    Status s = g_env->NewWritableFile(fname, &file);
     if (!s.ok()) {
       fprintf(stderr, "%s\n", s.ToString().c_str());
       return;
@@ -934,7 +946,7 @@ class Benchmark {
     delete file;
     if (!ok) {
       fprintf(stderr, "heap profiling not supported\n");
-      Env::Default()->DeleteFile(fname);
+      g_env->DeleteFile(fname);
     }
   }
 };
@@ -943,6 +955,8 @@ class Benchmark {
 
 int main(int argc, char** argv) {
   FLAGS_write_buffer_size = leveldb::Options().write_buffer_size;
+  FLAGS_max_file_size = leveldb::Options().max_file_size;
+  FLAGS_block_size = leveldb::Options().block_size;
   FLAGS_open_files = leveldb::Options().max_open_files;
   std::string default_db_path;
 
@@ -973,6 +987,10 @@ int main(int argc, char** argv) {
       FLAGS_value_size = n;
     } else if (sscanf(argv[i], "--write_buffer_size=%d%c", &n, &junk) == 1) {
       FLAGS_write_buffer_size = n;
+    } else if (sscanf(argv[i], "--max_file_size=%d%c", &n, &junk) == 1) {
+      FLAGS_max_file_size = n;
+    } else if (sscanf(argv[i], "--block_size=%d%c", &n, &junk) == 1) {
+      FLAGS_block_size = n;
     } else if (sscanf(argv[i], "--cache_size=%d%c", &n, &junk) == 1) {
       FLAGS_cache_size = n;
     } else if (sscanf(argv[i], "--bloom_bits=%d%c", &n, &junk) == 1) {
@@ -987,9 +1005,11 @@
     }
   }
 
+  leveldb::g_env = leveldb::Env::Default();
+
   // Choose a location for the test database if none given with --db=<path>
   if (FLAGS_db == NULL) {
-    leveldb::Env::Default()->GetTestDirectory(&default_db_path);
+    leveldb::g_env->GetTestDirectory(&default_db_path);
     default_db_path += "/dbbench";
     FLAGS_db = default_db_path.c_str();
   }
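The two new flags mirror the corresponding leveldb::Options fields and can be exercised directly, e.g. ./db_bench --max_file_size=4194304 --block_size=8192 (illustrative values; omitted flags fall back to the library defaults picked up in main).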
1 change: 1 addition & 0 deletions db/db_impl.cc
@@ -96,6 +96,7 @@ Options SanitizeOptions(const std::string& dbname,
   result.filter_policy = (src.filter_policy != NULL) ? ipolicy : NULL;
   ClipToRange(&result.max_open_files, 64 + kNumNonTableCacheFiles, 50000);
   ClipToRange(&result.write_buffer_size, 64<<10, 1<<30);
+  ClipToRange(&result.max_file_size, 1<<20, 1<<30);
   ClipToRange(&result.block_size, 1<<10, 4<<20);
   if (result.info_log == NULL) {
     // Open a log file in the same directory as the db
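For reference, SanitizeOptions relies on the ClipToRange helper defined earlier in db/db_impl.cc; a sketch of it is below (from memory of the surrounding file, shown to make the new line concrete): a user-supplied max_file_size below 1MB is raised to 1<<20, and anything above 1GB is lowered to 1<<30.

// Sketch of the clamping helper used by SanitizeOptions above.
template <class T, class V>
static void ClipToRange(T* ptr, V minvalue, V maxvalue) {
  // Clamp *ptr into [minvalue, maxvalue].
  if (static_cast<V>(*ptr) > maxvalue) *ptr = maxvalue;
  if (static_cast<V>(*ptr) < minvalue) *ptr = minvalue;
}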
57 changes: 35 additions & 22 deletions db/version_set.cc
@@ -20,30 +20,39 @@
 
 namespace leveldb {
 
-static const int kTargetFileSize = 2 * 1048576;
+static int TargetFileSize(const Options* options) {
+  return options->max_file_size;
+}
 
 // Maximum bytes of overlaps in grandparent (i.e., level+2) before we
 // stop building a single file in a level->level+1 compaction.
-static const int64_t kMaxGrandParentOverlapBytes = 10 * kTargetFileSize;
+static int64_t MaxGrandParentOverlapBytes(const Options* options) {
+  return 10 * TargetFileSize(options);
+}
 
 // Maximum number of bytes in all compacted files. We avoid expanding
 // the lower level file set of a compaction if it would make the
 // total compaction cover more than this many bytes.
-static const int64_t kExpandedCompactionByteSizeLimit = 25 * kTargetFileSize;
+static int64_t ExpandedCompactionByteSizeLimit(const Options* options) {
+  return 25 * TargetFileSize(options);
+}
 
-static double MaxBytesForLevel(int level) {
+static double MaxBytesForLevel(const Options* options, int level) {
   // Note: the result for level zero is not really used since we set
   // the level-0 compaction threshold based on number of files.
-  double result = 10 * 1048576.0;  // Result for both level-0 and level-1
+
+  // Result for both level-0 and level-1
+  double result = 10. * 1048576.0;
   while (level > 1) {
     result *= 10;
     level--;
   }
   return result;
 }
 
-static uint64_t MaxFileSizeForLevel(int level) {
-  return kTargetFileSize;  // We could vary per level to reduce number of files?
+static uint64_t MaxFileSizeForLevel(const Options* options, int level) {
+  // We could vary per level to reduce number of files?
+  return TargetFileSize(options);
 }
 
 static int64_t TotalFileSize(const std::vector<FileMetaData*>& files) {
@@ -508,7 +517,7 @@ int Version::PickLevelForMemTableOutput(
       // Check that file does not overlap too many grandparent bytes.
       GetOverlappingInputs(level + 2, &start, &limit, &overlaps);
       const int64_t sum = TotalFileSize(overlaps);
-      if (sum > kMaxGrandParentOverlapBytes) {
+      if (sum > MaxGrandParentOverlapBytes(vset_->options_)) {
        break;
      }
    }
@@ -1027,7 +1036,7 @@ bool VersionSet::ReuseManifest(const std::string& dscname,
       manifest_type != kDescriptorFile ||
       !env_->GetFileSize(dscname, &manifest_size).ok() ||
       // Make new compacted MANIFEST if old one is too big
-      manifest_size >= kTargetFileSize) {
+      manifest_size >= TargetFileSize(options_)) {
     return false;
   }
 
@@ -1076,7 +1085,8 @@ void VersionSet::Finalize(Version* v) {
     } else {
       // Compute the ratio of current size to size limit.
       const uint64_t level_bytes = TotalFileSize(v->files_[level]);
-      score = static_cast<double>(level_bytes) / MaxBytesForLevel(level);
+      score =
+          static_cast<double>(level_bytes) / MaxBytesForLevel(options_, level);
     }
 
     if (score > best_score) {
@@ -1290,7 +1300,7 @@ Compaction* VersionSet::PickCompaction() {
     level = current_->compaction_level_;
     assert(level >= 0);
     assert(level+1 < config::kNumLevels);
-    c = new Compaction(level);
+    c = new Compaction(options_, level);
 
     // Pick the first file that comes after compact_pointer_[level]
     for (size_t i = 0; i < current_->files_[level].size(); i++) {
@@ -1307,7 +1317,7 @@
     }
   } else if (seek_compaction) {
     level = current_->file_to_compact_level_;
-    c = new Compaction(level);
+    c = new Compaction(options_, level);
     c->inputs_[0].push_back(current_->file_to_compact_);
   } else {
     return NULL;
@@ -1352,7 +1362,8 @@ void VersionSet::SetupOtherInputs(Compaction* c) {
     const int64_t inputs1_size = TotalFileSize(c->inputs_[1]);
     const int64_t expanded0_size = TotalFileSize(expanded0);
     if (expanded0.size() > c->inputs_[0].size() &&
-        inputs1_size + expanded0_size < kExpandedCompactionByteSizeLimit) {
+        inputs1_size + expanded0_size <
+            ExpandedCompactionByteSizeLimit(options_)) {
       InternalKey new_start, new_limit;
       GetRange(expanded0, &new_start, &new_limit);
       std::vector<FileMetaData*> expanded1;
@@ -1414,7 +1425,7 @@ Compaction* VersionSet::CompactRange(
   // and we must not pick one file and drop another older file if the
   // two files overlap.
   if (level > 0) {
-    const uint64_t limit = MaxFileSizeForLevel(level);
+    const uint64_t limit = MaxFileSizeForLevel(options_, level);
     uint64_t total = 0;
     for (size_t i = 0; i < inputs.size(); i++) {
       uint64_t s = inputs[i]->file_size;
@@ -1426,17 +1437,17 @@
     }
   }
 
-  Compaction* c = new Compaction(level);
+  Compaction* c = new Compaction(options_, level);
   c->input_version_ = current_;
   c->input_version_->Ref();
   c->inputs_[0] = inputs;
   SetupOtherInputs(c);
   return c;
 }
 
-Compaction::Compaction(int level)
+Compaction::Compaction(const Options* options, int level)
     : level_(level),
-      max_output_file_size_(MaxFileSizeForLevel(level)),
+      max_output_file_size_(MaxFileSizeForLevel(options, level)),
       input_version_(NULL),
       grandparent_index_(0),
       seen_key_(false),
@@ -1453,12 +1464,13 @@ Compaction::~Compaction() {
 }
 
 bool Compaction::IsTrivialMove() const {
+  const VersionSet* vset = input_version_->vset_;
   // Avoid a move if there is lots of overlapping grandparent data.
   // Otherwise, the move could create a parent file that will require
   // a very expensive merge later on.
-  return (num_input_files(0) == 1 &&
-          num_input_files(1) == 0 &&
-          TotalFileSize(grandparents_) <= kMaxGrandParentOverlapBytes);
+  return (num_input_files(0) == 1 && num_input_files(1) == 0 &&
+          TotalFileSize(grandparents_) <=
+              MaxGrandParentOverlapBytes(vset->options_));
 }
 
 void Compaction::AddInputDeletions(VersionEdit* edit) {
@@ -1491,8 +1503,9 @@ bool Compaction::IsBaseLevelForKey(const Slice& user_key) {
 }
 
 bool Compaction::ShouldStopBefore(const Slice& internal_key) {
+  const VersionSet* vset = input_version_->vset_;
   // Scan to find earliest grandparent file that contains key.
-  const InternalKeyComparator* icmp = &input_version_->vset_->icmp_;
+  const InternalKeyComparator* icmp = &vset->icmp_;
   while (grandparent_index_ < grandparents_.size() &&
       icmp->Compare(internal_key,
                     grandparents_[grandparent_index_]->largest.Encode()) > 0) {
@@ -1503,7 +1516,7 @@
   }
   seen_key_ = true;
 
-  if (overlapped_bytes_ > kMaxGrandParentOverlapBytes) {
+  if (overlapped_bytes_ > MaxGrandParentOverlapBytes(vset->options_)) {
     // Too much overlap for current output; start new output
     overlapped_bytes_ = 0;
     return true;
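To see what the new helpers compute, here is a small standalone sketch (a hypothetical driver, not part of the commit; the Options struct is pared down to the one field used). With the default max_file_size of 2MB it reproduces exactly the constants the diff replaces, and the limits now scale linearly if the option is raised.

#include <cstddef>
#include <cstdint>
#include <cstdio>

// Pared-down stand-in for leveldb::Options (illustration only).
struct Options { size_t max_file_size; };

// Same arithmetic as the new helpers in db/version_set.cc.
static int64_t TargetFileSize(const Options* o) { return o->max_file_size; }
static int64_t MaxGrandParentOverlapBytes(const Options* o) {
  return 10 * TargetFileSize(o);
}
static int64_t ExpandedCompactionByteSizeLimit(const Options* o) {
  return 25 * TargetFileSize(o);
}

int main() {
  Options opts;
  opts.max_file_size = 2 << 20;  // the old hard-coded default: 2MB
  // Prints 2097152 20971520 52428800 -- i.e. kTargetFileSize,
  // kMaxGrandParentOverlapBytes (10x) and
  // kExpandedCompactionByteSizeLimit (25x) from before this commit.
  std::printf("%lld %lld %lld\n",
              static_cast<long long>(TargetFileSize(&opts)),
              static_cast<long long>(MaxGrandParentOverlapBytes(&opts)),
              static_cast<long long>(ExpandedCompactionByteSizeLimit(&opts)));
  return 0;
}

Note that MaxBytesForLevel is deliberately left independent of the option: per-level size targets stay at 10MB for level 1, growing 10x per level; only output file sizing and the overlap/expansion caps follow max_file_size.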
2 changes: 1 addition & 1 deletion db/version_set.h
@@ -366,7 +366,7 @@ class Compaction {
   friend class Version;
   friend class VersionSet;
 
-  explicit Compaction(int level);
+  Compaction(const Options* options, int level);
 
   int level_;
   uint64_t max_output_file_size_;
12 changes: 12 additions & 0 deletions include/leveldb/options.h
@@ -112,6 +112,18 @@ struct Options {
   // Default: 16
   int block_restart_interval;
 
+  // Leveldb will write up to this amount of bytes to a file before
+  // switching to a new one.
+  // Most clients should leave this parameter alone. However if your
+  // filesystem is more efficient with larger files, you could
+  // consider increasing the value. The downside will be longer
+  // compactions and hence longer latency/performance hiccups.
+  // Another reason to increase this parameter might be when you are
+  // initially populating a large database.
+  //
+  // Default: 2MB
+  size_t max_file_size;
+
   // Compress blocks using the specified compression algorithm. This
   // parameter can be changed dynamically.
   //
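As a usage illustration of the comment above (a minimal sketch, not part of the commit; the path and the 64MB figure are made up), a client bulk-loading a large database might raise the limit at open time:

#include <cassert>

#include "leveldb/db.h"
#include "leveldb/options.h"

int main() {
  leveldb::Options options;
  options.create_if_missing = true;
  // Fewer, larger table files: longer compactions in exchange for
  // less file churn, as the options.h comment describes.
  options.max_file_size = 64 * 1024 * 1024;  // illustrative value

  leveldb::DB* db = NULL;
  leveldb::Status s = leveldb::DB::Open(options, "/tmp/bulkdb", &db);
  assert(s.ok());
  // ... bulk writes ...
  delete db;
  return 0;
}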
1 change: 1 addition & 0 deletions util/options.cc
@@ -21,6 +21,7 @@ Options::Options()
       block_cache(NULL),
       block_size(4096),
       block_restart_interval(16),
+      max_file_size(2<<20),
       compression(kSnappyCompression),
       reuse_logs(false),
       filter_policy(NULL) {
