Skip to content

Commit

Permalink
add subcompaction listener (#218)
Browse files Browse the repository at this point in the history
Add listener for subcompaction, only called when there are actually multiple subcompactions executed in parallel.

Signed-off-by: tabokie <xy.tao@outlook.com>
  • Loading branch information
tabokie committed Dec 21, 2020
1 parent a487d25 commit 2dfbfa5
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 0 deletions.
24 changes: 24 additions & 0 deletions db/compaction/compaction_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
#include "port/port.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/listener.h"
#include "rocksdb/sst_partitioner.h"
#include "rocksdb/statistics.h"
#include "rocksdb/status.h"
Expand Down Expand Up @@ -233,6 +234,8 @@ struct CompactionJob::SubcompactionState {

return false;
}

bool IsPartialCompaction() { return start || end; }
};

// Maintains state for the entire compaction
Expand Down Expand Up @@ -812,6 +815,19 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {

ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();

#ifndef ROCKSDB_LITE
SubcompactionJobInfo info;
if (sub_compact->IsPartialCompaction()) {
info.cf_name = cfd->GetName();
info.thread_id = env_->GetThreadID();
info.base_input_level = sub_compact->compaction->start_level();
info.output_level = sub_compact->compaction->output_level();
for (auto listener : db_options_.listeners) {
listener->OnSubcompactionBegin(info);
}
}
#endif // !ROCKSDB_LITE

// Create compaction filter and fail the compaction if
// IgnoreSnapshots() = false because it is not supported anymore
const CompactionFilter* compaction_filter =
Expand Down Expand Up @@ -1070,6 +1086,14 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
sub_compact->c_iter.reset();
input.reset();
sub_compact->status = status;
#ifndef ROCKSDB_LITE
info.status = status;
if (sub_compact->IsPartialCompaction()) {
for (auto listener : db_options_.listeners) {
listener->OnSubcompactionCompleted(info);
}
}
#endif // !ROCKSDB_LITE
}

void CompactionJob::RecordDroppedKeys(
Expand Down
53 changes: 53 additions & 0 deletions db/listener_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,59 @@ TEST_F(EventListenerTest, OnSingleDBCompactionTest) {
}
}

class TestSubcompactionListener : public EventListener {
public:
TestSubcompactionListener() : compacted_(0) {}
void OnSubcompactionCompleted(const SubcompactionJobInfo& /*si*/) override {
compacted_.fetch_add(1);
}

std::atomic<int> compacted_;
};

TEST_F(EventListenerTest, OnSingleDBSubcompactionTest) {
const int kNumL0Files = 8;

Options options;
options.env = CurrentOptions().env;
options.create_if_missing = true;
options.max_subcompactions = 2;
options.compaction_style = kCompactionStyleLevel;
options.write_buffer_size = k110KB * 2;
options.target_file_size_base = options.write_buffer_size;
options.compression = kNoCompression;
options.level0_file_num_compaction_trigger = kNumL0Files;
options.table_properties_collector_factories.push_back(
std::make_shared<TestPropertiesCollectorFactory>());

TestSubcompactionListener* listener = new TestSubcompactionListener();
options.listeners.emplace_back(listener);
Reopen(options);
ASSERT_OK(Put("k1", std::string(90000, 'k')));
ASSERT_OK(Put("k4", std::string(90000, 'k')));
ASSERT_OK(dbfull()->CompactRange(
CompactRangeOptions(), db_->DefaultColumnFamily(), nullptr, nullptr));
// Three large files overlapped with L1 will trigger at least two
// subcompactions.
ASSERT_OK(Put("k1", std::string(90000, 'k')));
ASSERT_OK(Put("k2", std::string(90000, 'k')));
ASSERT_OK(Flush());
ASSERT_OK(Put("k2", std::string(90000, 'k')));
ASSERT_OK(Put("k3", std::string(90000, 'k')));
ASSERT_OK(Flush());
ASSERT_OK(Put("k3", std::string(90000, 'k')));
ASSERT_OK(Put("k4", std::string(90000, 'k')));
ASSERT_OK(Flush());
ASSERT_OK(dbfull()->RunManualCompaction(
reinterpret_cast<ColumnFamilyHandleImpl*>(db_->DefaultColumnFamily())
->cfd(),
0 /* input_level */, 1 /* output_level */, CompactRangeOptions(),
nullptr /* begin */, nullptr /* end */, true /* exclusive */,
true /* disallow_trivial_move */,
port::kMaxUint64 /* max_file_num_to_ignore */));
ASSERT_EQ(listener->compacted_.load(), 2);
}

// This simple Listener can only handle one flush at a time.
class TestFlushListener : public EventListener {
public:
Expand Down
27 changes: 27 additions & 0 deletions include/rocksdb/listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,23 @@ struct CompactionJobInfo {
CompactionJobStats stats;
};

struct SubcompactionJobInfo {
SubcompactionJobInfo() = default;

// the id of the column family where the compaction happened.
uint32_t cf_id;
// the name of the column family where the compaction happened.
std::string cf_name;
// the status indicating whether the compaction was successful or not.
Status status;
// the id of the thread that completed this compaction job.
uint64_t thread_id;
// the smallest input level of the compaction.
int base_input_level;
// the output level of the compaction.
int output_level;
};

struct MemTableInfo {
// the name of the column family to which memtable belongs
std::string cf_name;
Expand Down Expand Up @@ -345,6 +362,16 @@ class EventListener {
virtual void OnCompactionCompleted(DB* /*db*/,
const CompactionJobInfo& /*ci*/) {}

// A callback function for RocksDB which will be called each time when
// a registered RocksDB uses multiple subcompactions to compact a file. The
// callback is called by each subcompaction and in the same thread.
virtual void OnSubcompactionBegin(const SubcompactionJobInfo& /*si*/) {}

// A callback function for RocksDB which will be called each time when
// a registered RocksDB uses multiple subcompactions to compact a file. The
// callback is called by each subcompaction and in the same thread.
virtual void OnSubcompactionCompleted(const SubcompactionJobInfo& /*si*/) {}

// A callback function for RocksDB which will be called whenever
// a SST file is created. Different from OnCompactionCompleted and
// OnFlushCompleted, this callback is designed for external logging
Expand Down

0 comments on commit 2dfbfa5

Please sign in to comment.