Skip to content

Commit

Permalink
introduce ImmutableOptions
Browse files Browse the repository at this point in the history
Summary:
As a preparation to support updating some options dynamically, I'd like
to first introduce ImmutableOptions, which is a subset of Options that
cannot be changed during the course of a DB lifetime without restart.

ColumnFamily will keep both Options and ImmutableOptions. Any component
below ColumnFamily should only take ImmutableOptions in their
constructor. Other options should be taken from APIs, which will be
allowed to adjust dynamically.

I am yet to make changes to memtable and other related classes to take
ImmutableOptions in their ctor. That can be done in a seprate diff as
this one is already pretty big.

Test Plan: make all check

Reviewers: yhchiang, igor, sdong

Reviewed By: sdong

Subscribers: leveldb, dhruba

Differential Revision: https://reviews.facebook.net/D22545
  • Loading branch information
Lei Jin committed Sep 4, 2014
1 parent e0b99d4 commit 5665e5e
Show file tree
Hide file tree
Showing 42 changed files with 554 additions and 368 deletions.
40 changes: 22 additions & 18 deletions db/builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,24 @@ namespace rocksdb {

class TableFactory;

TableBuilder* NewTableBuilder(const Options& options,
TableBuilder* NewTableBuilder(const ImmutableCFOptions& ioptions,
const InternalKeyComparator& internal_comparator,
WritableFile* file,
CompressionType compression_type) {
return options.table_factory->NewTableBuilder(options, internal_comparator,
file, compression_type);
const CompressionType compression_type,
const CompressionOptions& compression_opts) {
return ioptions.table_factory->NewTableBuilder(
ioptions, internal_comparator, file, compression_type, compression_opts);
}

Status BuildTable(const std::string& dbname, Env* env, const Options& options,
const EnvOptions& soptions, TableCache* table_cache,
Status BuildTable(const std::string& dbname, Env* env,
const ImmutableCFOptions& ioptions,
const EnvOptions& env_options, TableCache* table_cache,
Iterator* iter, FileMetaData* meta,
const InternalKeyComparator& internal_comparator,
const SequenceNumber newest_snapshot,
const SequenceNumber earliest_seqno_in_memtable,
const CompressionType compression,
const CompressionOptions& compression_opts,
const Env::IOPriority io_priority) {
Status s;
meta->fd.file_size = 0;
Expand All @@ -50,23 +53,24 @@ Status BuildTable(const std::string& dbname, Env* env, const Options& options,
// If the sequence number of the smallest entry in the memtable is
// smaller than the most recent snapshot, then we do not trigger
// removal of duplicate/deleted keys as part of this builder.
bool purge = options.purge_redundant_kvs_while_flush;
bool purge = ioptions.purge_redundant_kvs_while_flush;
if (earliest_seqno_in_memtable <= newest_snapshot) {
purge = false;
}

std::string fname = TableFileName(options.db_paths, meta->fd.GetNumber(),
std::string fname = TableFileName(ioptions.db_paths, meta->fd.GetNumber(),
meta->fd.GetPathId());
if (iter->Valid()) {
unique_ptr<WritableFile> file;
s = env->NewWritableFile(fname, &file, soptions);
s = env->NewWritableFile(fname, &file, env_options);
if (!s.ok()) {
return s;
}
file->SetIOPriority(io_priority);

TableBuilder* builder =
NewTableBuilder(options, internal_comparator, file.get(), compression);
TableBuilder* builder = NewTableBuilder(
ioptions, internal_comparator, file.get(),
compression, compression_opts);

// the first key is the smallest key
Slice key = iter->key();
Expand All @@ -75,8 +79,8 @@ Status BuildTable(const std::string& dbname, Env* env, const Options& options,
meta->largest_seqno = meta->smallest_seqno;

MergeHelper merge(internal_comparator.user_comparator(),
options.merge_operator.get(), options.info_log.get(),
options.min_partial_merge_operands,
ioptions.merge_operator, ioptions.info_log,
ioptions.min_partial_merge_operands,
true /* internal key corruption is not ok */);

if (purge) {
Expand Down Expand Up @@ -196,12 +200,12 @@ Status BuildTable(const std::string& dbname, Env* env, const Options& options,
delete builder;

// Finish and check for file errors
if (s.ok() && !options.disableDataSync) {
if (options.use_fsync) {
StopWatch sw(env, options.statistics.get(), TABLE_SYNC_MICROS);
if (s.ok() && !ioptions.disable_data_sync) {
if (ioptions.use_fsync) {
StopWatch sw(env, ioptions.statistics, TABLE_SYNC_MICROS);
s = file->Fsync();
} else {
StopWatch sw(env, options.statistics.get(), TABLE_SYNC_MICROS);
StopWatch sw(env, ioptions.statistics, TABLE_SYNC_MICROS);
s = file->Sync();
}
}
Expand All @@ -211,7 +215,7 @@ Status BuildTable(const std::string& dbname, Env* env, const Options& options,

if (s.ok()) {
// Verify that the table is usable
Iterator* it = table_cache->NewIterator(ReadOptions(), soptions,
Iterator* it = table_cache->NewIterator(ReadOptions(), env_options,
internal_comparator, meta->fd);
s = it->status();
delete it;
Expand Down
11 changes: 8 additions & 3 deletions db/builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include "rocksdb/status.h"
#include "rocksdb/types.h"
#include "rocksdb/options.h"
#include "rocksdb/immutable_options.h"

namespace rocksdb {

Expand All @@ -26,22 +27,26 @@ class TableBuilder;
class WritableFile;

extern TableBuilder* NewTableBuilder(
const Options& options, const InternalKeyComparator& internal_comparator,
WritableFile* file, CompressionType compression_type);
const ImmutableCFOptions& options,
const InternalKeyComparator& internal_comparator,
WritableFile* file, const CompressionType compression_type,
const CompressionOptions& compression_opts);

// Build a Table file from the contents of *iter. The generated file
// will be named according to number specified in meta. On success, the rest of
// *meta will be filled with metadata about the generated table.
// If no data is present in *iter, meta->file_size will be set to
// zero, and no Table file will be produced.
extern Status BuildTable(const std::string& dbname, Env* env,
const Options& options, const EnvOptions& soptions,
const ImmutableCFOptions& options,
const EnvOptions& env_options,
TableCache* table_cache, Iterator* iter,
FileMetaData* meta,
const InternalKeyComparator& internal_comparator,
const SequenceNumber newest_snapshot,
const SequenceNumber earliest_seqno_in_memtable,
const CompressionType compression,
const CompressionOptions& compression_opts,
const Env::IOPriority io_priority = Env::IO_HIGH);

} // namespace rocksdb
15 changes: 8 additions & 7 deletions db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ ColumnFamilyData::ColumnFamilyData(uint32_t id, const std::string& name,
Version* dummy_versions, Cache* table_cache,
const ColumnFamilyOptions& options,
const DBOptions* db_options,
const EnvOptions& storage_options,
const EnvOptions& env_options,
ColumnFamilySet* column_family_set)
: id_(id),
name_(name),
Expand All @@ -188,6 +188,7 @@ ColumnFamilyData::ColumnFamilyData(uint32_t id, const std::string& name,
dropped_(false),
internal_comparator_(options.comparator),
options_(*db_options, SanitizeOptions(&internal_comparator_, options)),
ioptions_(options_),
mem_(nullptr),
imm_(options_.min_write_buffer_number_to_merge),
super_version_(nullptr),
Expand All @@ -204,7 +205,7 @@ ColumnFamilyData::ColumnFamilyData(uint32_t id, const std::string& name,
if (dummy_versions != nullptr) {
internal_stats_.reset(
new InternalStats(options_.num_levels, db_options->env, this));
table_cache_.reset(new TableCache(&options_, storage_options, table_cache));
table_cache_.reset(new TableCache(ioptions_, env_options, table_cache));
if (options_.compaction_style == kCompactionStyleUniversal) {
compaction_picker_.reset(
new UniversalCompactionPicker(&options_, &internal_comparator_));
Expand Down Expand Up @@ -306,7 +307,7 @@ void ColumnFamilyData::RecalculateWriteStallRateLimitsConditions() {
}

const EnvOptions* ColumnFamilyData::soptions() const {
return &(column_family_set_->storage_options_);
return &(column_family_set_->env_options_);
}

void ColumnFamilyData::SetCurrent(Version* current) {
Expand Down Expand Up @@ -462,16 +463,16 @@ void ColumnFamilyData::ResetThreadLocalSuperVersions() {

ColumnFamilySet::ColumnFamilySet(const std::string& dbname,
const DBOptions* db_options,
const EnvOptions& storage_options,
const EnvOptions& env_options,
Cache* table_cache)
: max_column_family_(0),
dummy_cfd_(new ColumnFamilyData(0, "", nullptr, nullptr,
ColumnFamilyOptions(), db_options,
storage_options_, nullptr)),
env_options_, nullptr)),
default_cfd_cache_(nullptr),
db_name_(dbname),
db_options_(db_options),
storage_options_(storage_options),
env_options_(env_options),
table_cache_(table_cache),
spin_lock_(ATOMIC_FLAG_INIT) {
// initialize linked list
Expand Down Expand Up @@ -537,7 +538,7 @@ ColumnFamilyData* ColumnFamilySet::CreateColumnFamily(
assert(column_families_.find(name) == column_families_.end());
ColumnFamilyData* new_cfd =
new ColumnFamilyData(id, name, dummy_versions, table_cache_, options,
db_options_, storage_options_, this);
db_options_, env_options_, this);
Lock();
column_families_.insert({name, id});
column_family_data_.insert({id, new_cfd});
Expand Down
13 changes: 8 additions & 5 deletions db/column_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,11 @@ class ColumnFamilyData {
void SetLogNumber(uint64_t log_number) { log_number_ = log_number; }
uint64_t GetLogNumber() const { return log_number_; }

// thread-safe
// TODO(ljin): make this API thread-safe once we allow updating options_
const Options* options() const { return &options_; }
// thread-safe
const EnvOptions* soptions() const;
const ImmutableCFOptions* ioptions() const { return &ioptions_; }

InternalStats* internal_stats() { return internal_stats_.get(); }

Expand Down Expand Up @@ -251,7 +253,7 @@ class ColumnFamilyData {
Version* dummy_versions, Cache* table_cache,
const ColumnFamilyOptions& options,
const DBOptions* db_options,
const EnvOptions& storage_options,
const EnvOptions& env_options,
ColumnFamilySet* column_family_set);

// Recalculate some small conditions, which are changed only during
Expand All @@ -272,7 +274,8 @@ class ColumnFamilyData {

const InternalKeyComparator internal_comparator_;

Options const options_;
const Options options_;
const ImmutableCFOptions ioptions_;

std::unique_ptr<TableCache> table_cache_;

Expand Down Expand Up @@ -367,7 +370,7 @@ class ColumnFamilySet {
};

ColumnFamilySet(const std::string& dbname, const DBOptions* db_options,
const EnvOptions& storage_options, Cache* table_cache);
const EnvOptions& env_options, Cache* table_cache);
~ColumnFamilySet();

ColumnFamilyData* GetDefault() const;
Expand Down Expand Up @@ -420,7 +423,7 @@ class ColumnFamilySet {

const std::string db_name_;
const DBOptions* const db_options_;
const EnvOptions storage_options_;
const EnvOptions env_options_;
Cache* table_cache_;
std::atomic_flag spin_lock_;
};
Expand Down
37 changes: 20 additions & 17 deletions db/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
default_interval_to_delete_obsolete_WAL_(600),
flush_on_destroy_(false),
delayed_writes_(0),
storage_options_(options),
env_options_(options),
bg_work_gate_closed_(false),
refitting_level_(false),
opened_successfully_(false) {
Expand All @@ -372,7 +372,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
options_.table_cache_remove_scan_count_limit);

versions_.reset(
new VersionSet(dbname_, &options_, storage_options_, table_cache_.get()));
new VersionSet(dbname_, &options_, env_options_, table_cache_.get()));
column_family_memtables_.reset(
new ColumnFamilyMemTablesImpl(versions_->GetColumnFamilySet()));

Expand Down Expand Up @@ -453,7 +453,7 @@ Status DBImpl::NewDB() {
const std::string manifest = DescriptorFileName(dbname_, 1);
unique_ptr<WritableFile> file;
Status s = env_->NewWritableFile(
manifest, &file, env_->OptimizeForManifestWrite(storage_options_));
manifest, &file, env_->OptimizeForManifestWrite(env_options_));
if (!s.ok()) {
return s;
}
Expand Down Expand Up @@ -1075,7 +1075,7 @@ Status DBImpl::ReadFirstLine(const std::string& fname,
};

unique_ptr<SequentialFile> file;
Status status = env_->NewSequentialFile(fname, &file, storage_options_);
Status status = env_->NewSequentialFile(fname, &file, env_options_);

if (!status.ok()) {
return status;
Expand Down Expand Up @@ -1275,7 +1275,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence,
// Open the log file
std::string fname = LogFileName(options_.wal_dir, log_number);
unique_ptr<SequentialFile> file;
Status status = env_->NewSequentialFile(fname, &file, storage_options_);
Status status = env_->NewSequentialFile(fname, &file, env_options_);
if (!status.ok()) {
MaybeIgnoreError(&status);
return status;
Expand Down Expand Up @@ -1425,10 +1425,11 @@ Status DBImpl::WriteLevel0TableForRecovery(ColumnFamilyData* cfd, MemTable* mem,
Status s;
{
mutex_.Unlock();
s = BuildTable(dbname_, env_, *cfd->options(), storage_options_,
s = BuildTable(dbname_, env_, *cfd->ioptions(), env_options_,
cfd->table_cache(), iter, &meta, cfd->internal_comparator(),
newest_snapshot, earliest_seqno_in_memtable,
GetCompressionFlush(*cfd->options()), Env::IO_HIGH);
GetCompressionFlush(*cfd->options()),
cfd->options()->compression_opts, Env::IO_HIGH);
LogFlush(options_.info_log);
mutex_.Lock();
}
Expand Down Expand Up @@ -1495,10 +1496,11 @@ Status DBImpl::WriteLevel0Table(ColumnFamilyData* cfd,
Log(options_.info_log, "[%s] Level-0 flush table #%" PRIu64 ": started",
cfd->GetName().c_str(), meta.fd.GetNumber());

s = BuildTable(dbname_, env_, *cfd->options(), storage_options_,
s = BuildTable(dbname_, env_, *cfd->ioptions(), env_options_,
cfd->table_cache(), iter, &meta, cfd->internal_comparator(),
newest_snapshot, earliest_seqno_in_memtable,
GetCompressionFlush(*cfd->options()), Env::IO_HIGH);
GetCompressionFlush(*cfd->options()),
cfd->options()->compression_opts, Env::IO_HIGH);
LogFlush(options_.info_log);
delete iter;
Log(options_.info_log,
Expand Down Expand Up @@ -2447,7 +2449,7 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) {
// Make the output file
std::string fname = TableFileName(options_.db_paths, file_number,
compact->compaction->GetOutputPathId());
Status s = env_->NewWritableFile(fname, &compact->outfile, storage_options_);
Status s = env_->NewWritableFile(fname, &compact->outfile, env_options_);

if (s.ok()) {
compact->outfile->SetIOPriority(Env::IO_LOW);
Expand All @@ -2456,8 +2458,9 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) {

ColumnFamilyData* cfd = compact->compaction->column_family_data();
compact->builder.reset(NewTableBuilder(
*cfd->options(), cfd->internal_comparator(), compact->outfile.get(),
compact->compaction->OutputCompressionType()));
*cfd->ioptions(), cfd->internal_comparator(), compact->outfile.get(),
compact->compaction->OutputCompressionType(),
cfd->options()->compression_opts));
}
LogFlush(options_.info_log);
return s;
Expand Down Expand Up @@ -2506,7 +2509,7 @@ Status DBImpl::FinishCompactionOutputFile(CompactionState* compact,
ColumnFamilyData* cfd = compact->compaction->column_family_data();
FileDescriptor fd(output_number, output_path_id, current_bytes);
Iterator* iter = cfd->table_cache()->NewIterator(
ReadOptions(), storage_options_, cfd->internal_comparator(), fd);
ReadOptions(), env_options_, cfd->internal_comparator(), fd);
s = iter->status();
delete iter;
if (s.ok()) {
Expand Down Expand Up @@ -3355,7 +3358,7 @@ Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
// Collect all needed child iterators for immutable memtables
super_version->imm->AddIterators(options, &merge_iter_builder);
// Collect iterators for files in L0 - Ln
super_version->current->AddIterators(options, storage_options_,
super_version->current->AddIterators(options, env_options_,
&merge_iter_builder);
internal_iter = merge_iter_builder.Finish();
} else {
Expand All @@ -3366,7 +3369,7 @@ Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
// Collect all needed child iterators for immutable memtables
super_version->imm->AddIterators(options, &iterator_list);
// Collect iterators for files in L0 - Ln
super_version->current->AddIterators(options, storage_options_,
super_version->current->AddIterators(options, env_options_,
&iterator_list);
internal_iter = NewMergingIterator(&cfd->internal_comparator(),
&iterator_list[0], iterator_list.size());
Expand Down Expand Up @@ -4377,7 +4380,7 @@ Status DBImpl::SetNewMemtableAndNewLogFile(ColumnFamilyData* cfd,
if (creating_new_log) {
s = env_->NewWritableFile(LogFileName(options_.wal_dir, new_log_number),
&lfile,
env_->OptimizeForLogWrite(storage_options_));
env_->OptimizeForLogWrite(env_options_));
if (s.ok()) {
// Our final size should be less than write_buffer_size
// (compression, etc) but err on the side of caution.
Expand Down Expand Up @@ -4615,7 +4618,7 @@ Status DBImpl::GetUpdatesSince(
return s;
}
iter->reset(new TransactionLogIteratorImpl(options_.wal_dir, &options_,
read_options, storage_options_,
read_options, env_options_,
seq, std::move(wal_files), this));
return (*iter)->status();
}
Expand Down
2 changes: 1 addition & 1 deletion db/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -607,7 +607,7 @@ class DBImpl : public DB {
int delayed_writes_;

// The options to access storage files
const EnvOptions storage_options_;
const EnvOptions env_options_;

// A value of true temporarily disables scheduling of background work
bool bg_work_gate_closed_;
Expand Down

0 comments on commit 5665e5e

Please sign in to comment.