Skip to content

Commit

Permalink
Add path to WritableFileWriter. (facebook#4039)
Browse files Browse the repository at this point in the history
Summary:
We want to sample the file I/O issued by RocksDB and report the function calls. This requires us to include the file paths otherwise it's hard to tell what has been going on.
Pull Request resolved: facebook#4039

Differential Revision: D8670178

Pulled By: riversand963

fbshipit-source-id: 97ee806d1c583a2983e28e213ee764dc6ac28f7a
  • Loading branch information
riversand963 authored and rcane committed Sep 13, 2018
1 parent 7a67078 commit aa62fc6
Show file tree
Hide file tree
Showing 50 changed files with 157 additions and 140 deletions.
4 changes: 2 additions & 2 deletions db/builder.cc
Expand Up @@ -121,8 +121,8 @@ Status BuildTable(
file->SetIOPriority(io_priority);
file->SetWriteLifeTimeHint(write_hint);

file_writer.reset(new WritableFileWriter(std::move(file), env_options,
ioptions.statistics));
file_writer.reset(new WritableFileWriter(
std::move(file), fname, env_options, ioptions.statistics));
builder = NewTableBuilder(
ioptions, mutable_cf_options, internal_comparator,
int_tbl_prop_collector_factories, column_family_id,
Expand Down
6 changes: 4 additions & 2 deletions db/c.cc
Expand Up @@ -2433,11 +2433,13 @@ void rocksdb_options_set_max_write_buffer_number_to_maintain(
opt->rep.max_write_buffer_number_to_maintain = n;
}

void rocksdb_options_set_enable_pipelined_write(rocksdb_options_t* opt, unsigned char v) {
void rocksdb_options_set_enable_pipelined_write(rocksdb_options_t* opt,
unsigned char v) {
opt->rep.enable_pipelined_write = v;
}

void rocksdb_options_set_max_subcompactions(rocksdb_options_t* opt, uint32_t n) {
void rocksdb_options_set_max_subcompactions(rocksdb_options_t* opt,
uint32_t n) {
opt->rep.max_subcompactions = n;
}

Expand Down
4 changes: 2 additions & 2 deletions db/compaction_iterator.cc
Expand Up @@ -18,8 +18,8 @@ CompactionIterator::CompactionIterator(
SequenceNumber earliest_write_conflict_snapshot,
const SnapshotChecker* snapshot_checker, Env* env,
bool report_detailed_time, bool expect_valid_internal_key,
RangeDelAggregator* range_del_agg,
const Compaction* compaction, const CompactionFilter* compaction_filter,
RangeDelAggregator* range_del_agg, const Compaction* compaction,
const CompactionFilter* compaction_filter,
const std::atomic<bool>* shutting_down,
const SequenceNumber preserve_deletes_seqnum)
: CompactionIterator(
Expand Down
5 changes: 2 additions & 3 deletions db/compaction_iterator_test.cc
Expand Up @@ -247,9 +247,8 @@ class CompactionIteratorTest : public testing::TestWithParam<bool> {
c_iter_.reset(new CompactionIterator(
iter_.get(), cmp_, merge_helper_.get(), last_sequence, &snapshots_,
earliest_write_conflict_snapshot, snapshot_checker_.get(),
Env::Default(), false /* report_detailed_time */,
false, range_del_agg_.get(), std::move(compaction), filter,
&shutting_down_));
Env::Default(), false /* report_detailed_time */, false,
range_del_agg_.get(), std::move(compaction), filter, &shutting_down_));
}

void AddSnapshot(SequenceNumber snapshot,
Expand Down
5 changes: 3 additions & 2 deletions db/compaction_job.cc
Expand Up @@ -1464,8 +1464,9 @@ Status CompactionJob::OpenCompactionOutputFile(
writable_file->SetWriteLifeTimeHint(write_hint_);
writable_file->SetPreallocationBlockSize(static_cast<size_t>(
sub_compact->compaction->OutputFilePreallocationSize()));
sub_compact->outfile.reset(new WritableFileWriter(
std::move(writable_file), env_options_, db_options_.statistics.get()));
sub_compact->outfile.reset(
new WritableFileWriter(std::move(writable_file), fname, env_options_,
db_options_.statistics.get()));

// If the Column family flag is to only optimize filters for hits,
// we can skip creating filters if this is the bottommost_level where
Expand Down
2 changes: 1 addition & 1 deletion db/compaction_job_test.cc
Expand Up @@ -205,7 +205,7 @@ class CompactionJobTest : public testing::Test {
manifest, &file, env_->OptimizeForManifestWrite(env_options_));
ASSERT_OK(s);
unique_ptr<WritableFileWriter> file_writer(
new WritableFileWriter(std::move(file), env_options_));
new WritableFileWriter(std::move(file), manifest, env_options_));
{
log::Writer log(std::move(file_writer), 0, false);
std::string record;
Expand Down
3 changes: 2 additions & 1 deletion db/db_impl.cc
Expand Up @@ -3143,7 +3143,8 @@ Status DBImpl::TraceIteratorSeek(const uint32_t& cf_id, const Slice& key) {
return s;
}

Status DBImpl::TraceIteratorSeekForPrev(const uint32_t& cf_id, const Slice& key) {
Status DBImpl::TraceIteratorSeekForPrev(const uint32_t& cf_id,
const Slice& key) {
Status s;
if (tracer_) {
InstrumentedMutexLock lock(&trace_mutex_);
Expand Down
2 changes: 1 addition & 1 deletion db/db_impl.h
Expand Up @@ -22,9 +22,9 @@
#include "db/column_family.h"
#include "db/compaction_job.h"
#include "db/dbformat.h"
#include "db/external_sst_file_ingestion_job.h"
#include "db/error_handler.h"
#include "db/event_helpers.h"
#include "db/external_sst_file_ingestion_job.h"
#include "db/flush_job.h"
#include "db/flush_scheduler.h"
#include "db/internal_stats.h"
Expand Down
14 changes: 7 additions & 7 deletions db/db_impl_open.cc
Expand Up @@ -232,7 +232,7 @@ Status DBImpl::NewDB() {
file->SetPreallocationBlockSize(
immutable_db_options_.manifest_preallocation_size);
unique_ptr<WritableFileWriter> file_writer(
new WritableFileWriter(std::move(file), env_options));
new WritableFileWriter(std::move(file), manifest, env_options));
log::Writer log(std::move(file_writer), 0, false);
std::string record;
new_db.EncodeTo(&record);
Expand Down Expand Up @@ -1075,19 +1075,19 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
impl->immutable_db_options_.env->OptimizeForLogWrite(
soptions, BuildDBOptions(impl->immutable_db_options_,
impl->mutable_db_options_));
s = NewWritableFile(
impl->immutable_db_options_.env,
LogFileName(impl->immutable_db_options_.wal_dir, new_log_number),
&lfile, opt_env_options);
std::string log_fname =
LogFileName(impl->immutable_db_options_.wal_dir, new_log_number);
s = NewWritableFile(impl->immutable_db_options_.env, log_fname, &lfile,
opt_env_options);
if (s.ok()) {
lfile->SetWriteLifeTimeHint(write_hint);
lfile->SetPreallocationBlockSize(
impl->GetWalPreallocateBlockSize(max_write_buffer_size));
{
InstrumentedMutexLock wl(&impl->log_write_mutex_);
impl->logfile_number_ = new_log_number;
unique_ptr<WritableFileWriter> file_writer(
new WritableFileWriter(std::move(lfile), opt_env_options));
unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
std::move(lfile), log_fname, opt_env_options));
impl->logs_.emplace_back(
new_log_number,
new log::Writer(
Expand Down
16 changes: 8 additions & 8 deletions db/db_impl_write.cc
Expand Up @@ -1319,21 +1319,21 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context,
auto write_hint = CalculateWALWriteHint();
mutex_.Unlock();
{
std::string log_fname =
LogFileName(immutable_db_options_.wal_dir, new_log_number);
if (creating_new_log) {
EnvOptions opt_env_opt =
env_->OptimizeForLogWrite(env_options_, db_options);
if (recycle_log_number) {
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"reusing log %" PRIu64 " from recycle list\n",
recycle_log_number);
s = env_->ReuseWritableFile(
LogFileName(immutable_db_options_.wal_dir, new_log_number),
LogFileName(immutable_db_options_.wal_dir, recycle_log_number),
&lfile, opt_env_opt);
std::string old_log_fname =
LogFileName(immutable_db_options_.wal_dir, recycle_log_number);
s = env_->ReuseWritableFile(log_fname, old_log_fname, &lfile,
opt_env_opt);
} else {
s = NewWritableFile(
env_, LogFileName(immutable_db_options_.wal_dir, new_log_number),
&lfile, opt_env_opt);
s = NewWritableFile(env_, log_fname, &lfile, opt_env_opt);
}
if (s.ok()) {
// Our final size should be less than write_buffer_size
Expand All @@ -1344,7 +1344,7 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context,
lfile->SetPreallocationBlockSize(preallocate_block_size);
lfile->SetWriteLifeTimeHint(write_hint);
unique_ptr<WritableFileWriter> file_writer(
new WritableFileWriter(std::move(lfile), opt_env_opt));
new WritableFileWriter(std::move(lfile), log_fname, opt_env_opt));
new_log = new log::Writer(
std::move(file_writer), new_log_number,
immutable_db_options_.recycle_log_file_num > 0, manual_wal_flush_);
Expand Down
2 changes: 1 addition & 1 deletion db/db_wal_test.cc
Expand Up @@ -815,7 +815,7 @@ class RecoveryTestHelper {
unique_ptr<WritableFile> file;
ASSERT_OK(db_options.env->NewWritableFile(fname, &file, env_options));
unique_ptr<WritableFileWriter> file_writer(
new WritableFileWriter(std::move(file), env_options));
new WritableFileWriter(std::move(file), fname, env_options));
current_log_writer.reset(
new log::Writer(std::move(file_writer), current_log_number,
db_options.recycle_log_file_num > 0));
Expand Down
2 changes: 1 addition & 1 deletion db/flush_job_test.cc
Expand Up @@ -62,7 +62,7 @@ class FlushJobTest : public testing::Test {
manifest, &file, env_->OptimizeForManifestWrite(env_options_));
ASSERT_OK(s);
unique_ptr<WritableFileWriter> file_writer(
new WritableFileWriter(std::move(file), EnvOptions()));
new WritableFileWriter(std::move(file), manifest, EnvOptions()));
{
log::Writer log(std::move(file_writer), 0, false);
std::string record;
Expand Down
5 changes: 3 additions & 2 deletions db/log_test.cc
Expand Up @@ -159,7 +159,7 @@ class LogTest : public ::testing::TestWithParam<int> {
LogTest()
: reader_contents_(),
dest_holder_(test::GetWritableFileWriter(
new test::StringSink(&reader_contents_))),
new test::StringSink(&reader_contents_), "" /* don't care */)),
source_holder_(test::GetSequentialFileReader(
new StringSource(reader_contents_), "" /* file name */)),
writer_(std::move(dest_holder_), 123, GetParam()),
Expand Down Expand Up @@ -718,7 +718,8 @@ TEST_P(LogTest, Recycle) {
Write("xxxxxxxxxxxxxxxx");
}
unique_ptr<WritableFileWriter> dest_holder(test::GetWritableFileWriter(
new test::OverwritingStringSink(get_reader_contents())));
new test::OverwritingStringSink(get_reader_contents()),
"" /* don't care */));
Writer recycle_writer(std::move(dest_holder), 123, true);
recycle_writer.AddRecord(Slice("foooo"));
recycle_writer.AddRecord(Slice("bar"));
Expand Down
4 changes: 3 additions & 1 deletion db/range_del_aggregator.cc
Expand Up @@ -76,7 +76,9 @@ class UncollapsedRangeDelMap : public RangeDelMap {
return false;
}

void AddTombstone(RangeTombstone tombstone) override { rep_.emplace(tombstone); }
void AddTombstone(RangeTombstone tombstone) override {
rep_.emplace(tombstone);
}

size_t Size() const override { return rep_.size(); }

Expand Down
5 changes: 3 additions & 2 deletions db/snapshot_checker.h
Expand Up @@ -19,8 +19,9 @@ class SnapshotChecker {
class DisableGCSnapshotChecker : public SnapshotChecker {
public:
virtual ~DisableGCSnapshotChecker() {}
virtual bool IsInSnapshot(SequenceNumber /*sequence*/,
SequenceNumber /*snapshot_sequence*/) const override {
virtual bool IsInSnapshot(
SequenceNumber /*sequence*/,
SequenceNumber /*snapshot_sequence*/) const override {
// By returning false, we prevent all the values from being GCed
return false;
}
Expand Down
3 changes: 2 additions & 1 deletion db/table_properties_collector_test.cc
Expand Up @@ -46,7 +46,8 @@ void MakeBuilder(const Options& options, const ImmutableCFOptions& ioptions,
std::unique_ptr<WritableFileWriter>* writable,
std::unique_ptr<TableBuilder>* builder) {
unique_ptr<WritableFile> wf(new test::StringSink);
writable->reset(new WritableFileWriter(std::move(wf), EnvOptions()));
writable->reset(
new WritableFileWriter(std::move(wf), "" /* don't care */, EnvOptions()));
int unknown_level = -1;
builder->reset(NewTableBuilder(
ioptions, moptions, internal_comparator, int_tbl_prop_collector_factories,
Expand Down
11 changes: 6 additions & 5 deletions db/version_set.cc
Expand Up @@ -2901,16 +2901,17 @@ Status VersionSet::ProcessManifestWrites(
// create new manifest file
ROCKS_LOG_INFO(db_options_->info_log, "Creating manifest %" PRIu64 "\n",
pending_manifest_file_number_);
std::string descriptor_fname =
DescriptorFileName(dbname_, pending_manifest_file_number_);
unique_ptr<WritableFile> descriptor_file;
s = NewWritableFile(
env_, DescriptorFileName(dbname_, pending_manifest_file_number_),
&descriptor_file, opt_env_opts);
s = NewWritableFile(env_, descriptor_fname, &descriptor_file,
opt_env_opts);
if (s.ok()) {
descriptor_file->SetPreallocationBlockSize(
db_options_->manifest_preallocation_size);

unique_ptr<WritableFileWriter> file_writer(
new WritableFileWriter(std::move(descriptor_file), opt_env_opts));
unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
std::move(descriptor_file), descriptor_fname, opt_env_opts));
descriptor_log_.reset(
new log::Writer(std::move(file_writer), 0, false));
s = WriteSnapshot(descriptor_log_.get());
Expand Down
2 changes: 1 addition & 1 deletion db/version_set_test.cc
Expand Up @@ -566,7 +566,7 @@ class ManifestWriterTest : public testing::Test {
manifest, &file, env_->OptimizeForManifestWrite(env_options_));
ASSERT_OK(s);
unique_ptr<WritableFileWriter> file_writer(
new WritableFileWriter(std::move(file), env_options_));
new WritableFileWriter(std::move(file), manifest, env_options_));
{
log::Writer log(std::move(file_writer), 0, false);
std::string record;
Expand Down
4 changes: 2 additions & 2 deletions db/wal_manager_test.cc
Expand Up @@ -79,7 +79,7 @@ class WalManagerTest : public testing::Test {
unique_ptr<WritableFile> file;
ASSERT_OK(env_->NewWritableFile(fname, &file, env_options_));
unique_ptr<WritableFileWriter> file_writer(
new WritableFileWriter(std::move(file), env_options_));
new WritableFileWriter(std::move(file), fname, env_options_));
current_log_writer_.reset(new log::Writer(std::move(file_writer), 0, false));
}

Expand Down Expand Up @@ -130,7 +130,7 @@ TEST_F(WalManagerTest, ReadFirstRecordCache) {
ASSERT_EQ(s, 0U);

unique_ptr<WritableFileWriter> file_writer(
new WritableFileWriter(std::move(file), EnvOptions()));
new WritableFileWriter(std::move(file), path, EnvOptions()));
log::Writer writer(std::move(file_writer), 1,
db_options_.recycle_log_file_num > 0);
WriteBatch batch;
Expand Down
2 changes: 1 addition & 1 deletion include/rocksdb/slice.h
Expand Up @@ -48,7 +48,7 @@ class Slice {
#ifdef __cpp_lib_string_view
// Create a slice that refers to the same contents as "sv"
/* implicit */
Slice(std::string_view sv) : data_(sv.data()), size_(sv.size()) { }
Slice(std::string_view sv) : data_(sv.data()), size_(sv.size()) {}
#endif

// Create a slice that refers to s[0,strlen(s)-1]
Expand Down
2 changes: 1 addition & 1 deletion options/options_parser.cc
Expand Up @@ -49,7 +49,7 @@ Status PersistRocksDBOptions(const DBOptions& db_opt,
return s;
}
unique_ptr<WritableFileWriter> writable;
writable.reset(new WritableFileWriter(std::move(wf), EnvOptions(),
writable.reset(new WritableFileWriter(std::move(wf), file_name, EnvOptions(),
nullptr /* statistics */));

std::string options_file_content;
Expand Down
23 changes: 10 additions & 13 deletions port/win/env_win.cc
Expand Up @@ -716,30 +716,27 @@ Status WinEnvIO::LinkFile(const std::string& src,
return result;
}

Status WinEnvIO::NumFileLinks(const std::string& fname,
uint64_t* count) {
Status WinEnvIO::NumFileLinks(const std::string& fname, uint64_t* count) {
Status s;
HANDLE handle = ::CreateFileA(fname.c_str(), 0,
FILE_SHARE_DELETE | FILE_SHARE_READ | FILE_SHARE_WRITE,
NULL,
OPEN_EXISTING,
FILE_FLAG_BACKUP_SEMANTICS,
NULL);
HANDLE handle = ::CreateFileA(
fname.c_str(), 0, FILE_SHARE_DELETE | FILE_SHARE_READ | FILE_SHARE_WRITE,
NULL, OPEN_EXISTING, FILE_FLAG_BACKUP_SEMANTICS, NULL);

if (INVALID_HANDLE_VALUE == handle) {
auto lastError = GetLastError();
s = IOErrorFromWindowsError(
"NumFileLinks: " + fname, lastError);
s = IOErrorFromWindowsError("NumFileLinks: " + fname, lastError);
return s;
}
UniqueCloseHandlePtr handle_guard(handle, CloseHandleFunc);
FILE_STANDARD_INFO standard_info;
if (0 != GetFileInformationByHandleEx(handle, FileStandardInfo,
&standard_info, sizeof(standard_info))) {
if (0 != GetFileInformationByHandleEx(handle, FileStandardInfo,
&standard_info,
sizeof(standard_info))) {
*count = standard_info.NumberOfLinks;
} else {
auto lastError = GetLastError();
s = IOErrorFromWindowsError("GetFileInformationByHandleEx: " + fname, lastError);
s = IOErrorFromWindowsError("GetFileInformationByHandleEx: " + fname,
lastError);
}
return s;
}
Expand Down
5 changes: 2 additions & 3 deletions port/win/env_win.h
Expand Up @@ -145,7 +145,7 @@ class WinEnvIO {
const std::string& target);

virtual Status NumFileLinks(const std::string& /*fname*/,
uint64_t* /*count*/);
uint64_t* /*count*/);

virtual Status AreFilesSame(const std::string& first,
const std::string& second, bool* res);
Expand Down Expand Up @@ -271,8 +271,7 @@ class WinEnv : public Env {
Status LinkFile(const std::string& src,
const std::string& target) override;

Status NumFileLinks(const std::string& fname,
uint64_t* count) override;
Status NumFileLinks(const std::string& fname, uint64_t* count) override;

Status AreFilesSame(const std::string& first,
const std::string& second, bool* res) override;
Expand Down
2 changes: 1 addition & 1 deletion table/block_builder.h
Expand Up @@ -60,7 +60,7 @@ class BlockBuilder {

private:
const int block_restart_interval_;
//TODO(myabandeh): put it into a separate IndexBlockBuilder
// TODO(myabandeh): put it into a separate IndexBlockBuilder
const bool use_delta_encoding_;
// Refer to BlockIter::DecodeCurrentValue for format of delta encoded values
const bool use_value_delta_encoding_;
Expand Down
8 changes: 4 additions & 4 deletions table/block_test.cc
Expand Up @@ -70,10 +70,10 @@ void GenerateRandomKVs(std::vector<std::string> *keys,

// Same as GenerateRandomKVs but the values are BlockHandle
void GenerateRandomKBHs(std::vector<std::string> *keys,
std::vector<BlockHandle> *values, const int from,
const int len, const int step = 1,
const int padding_size = 0,
const int keys_share_prefix = 1) {
std::vector<BlockHandle> *values, const int from,
const int len, const int step = 1,
const int padding_size = 0,
const int keys_share_prefix = 1) {
Random rnd(302);
uint64_t offset = 0;

Expand Down

0 comments on commit aa62fc6

Please sign in to comment.