Skip to content

Commit

Permalink
Revert "Change the compaction filter logic to let periodic compaction…
Browse files Browse the repository at this point in the history
… go through custom compaction filter, to gc expired data (#5447)" (#5522)

This reverts commit 37a24f1.
  • Loading branch information
Sophie-Xie committed Apr 23, 2023
1 parent 40c4bc7 commit b541615
Show file tree
Hide file tree
Showing 9 changed files with 24 additions and 33 deletions.
1 change: 1 addition & 0 deletions resources/gflags.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
"clean_wal_interval_secs",
"wal_ttl",
"clean_wal_interval_secs",
"custom_filter_interval_secs",
"accept_partial_success",
"system_memory_high_watermark_ratio",
"num_rows_to_check_memory",
Expand Down
1 change: 1 addition & 0 deletions src/common/meta/GflagsManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ std::unordered_map<std::string, std::pair<cpp2::ConfigMode, bool>> GflagsManager
{"meta_client_retry_times", {cpp2::ConfigMode::MUTABLE, false}},
{"wal_ttl", {cpp2::ConfigMode::MUTABLE, false}},
{"clean_wal_interval_secs", {cpp2::ConfigMode::MUTABLE, false}},
{"custom_filter_interval_secs", {cpp2::ConfigMode::MUTABLE, false}},
{"accept_partial_success", {cpp2::ConfigMode::MUTABLE, false}},

{"rocksdb_db_options", {cpp2::ConfigMode::MUTABLE, true}},
Expand Down
4 changes: 1 addition & 3 deletions src/kvstore/Common.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,13 @@ class KVFilter {
/**
* @brief Whether remove the key during compaction
*
* @param level
* @param spaceId
* @param key
* @param val
* @return true Key will not be removed
* @return false Key will be removed
*/
virtual bool filter(int level,
GraphSpaceID spaceId,
virtual bool filter(GraphSpaceID spaceId,
const folly::StringPiece& key,
const folly::StringPiece& val) const = 0;
};
Expand Down
22 changes: 15 additions & 7 deletions src/kvstore/CompactionFilter.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class KVCompactionFilter final : public rocksdb::CompactionFilter {
/**
* @brief whether remove the key during compaction
*
* @param level Levels of key in rocksdb
* @param level Levels of key in rocksdb, not used for now
* @param key Rocksdb key
* @param val Rocksdb val
* @return true Key will not be removed
Expand All @@ -45,8 +45,8 @@ class KVCompactionFilter final : public rocksdb::CompactionFilter {
const rocksdb::Slice& val,
std::string*,
bool*) const override {
return kvFilter_->filter(level,
spaceId_,
UNUSED(level);
return kvFilter_->filter(spaceId_,
folly::StringPiece(key.data(), key.size()),
folly::StringPiece(val.data(), val.size()));
}
Expand Down Expand Up @@ -77,14 +77,21 @@ class KVCompactionFilterFactory : public rocksdb::CompactionFilterFactory {
*/
std::unique_ptr<rocksdb::CompactionFilter> CreateCompactionFilter(
const rocksdb::CompactionFilter::Context& context) override {
auto now = time::WallClock::fastNowInSec();
if (context.is_full_compaction || context.is_manual_compaction) {
LOG(INFO) << "Do full/manual compaction!";
lastRunCustomFilterTimeSec_ = now;
return std::make_unique<KVCompactionFilter>(spaceId_, createKVFilter());
} else {
// No worry, by default flush will not go through the custom compaction filter.
// See CompactionFilterFactory::ShouldFilterTableFileCreation.
LOG(INFO) << "Do automatic or periodic compaction!";
if (FLAGS_custom_filter_interval_secs >= 0 &&
now - lastRunCustomFilterTimeSec_ > FLAGS_custom_filter_interval_secs) {
LOG(INFO) << "Do custom minor compaction!";
lastRunCustomFilterTimeSec_ = now;
return std::make_unique<KVCompactionFilter>(spaceId_, createKVFilter());
}
LOG(INFO) << "Do default minor compaction!";
return std::unique_ptr<rocksdb::CompactionFilter>(nullptr);
}
return std::make_unique<KVCompactionFilter>(spaceId_, createKVFilter());
}

const char* Name() const override {
Expand All @@ -95,6 +102,7 @@ class KVCompactionFilterFactory : public rocksdb::CompactionFilterFactory {

private:
GraphSpaceID spaceId_;
int32_t lastRunCustomFilterTimeSec_ = 0;
};

/**
Expand Down
4 changes: 4 additions & 0 deletions src/kvstore/NebulaStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@
#include "kvstore/listener/elasticsearch/ESListener.h"

DEFINE_string(engine_type, "rocksdb", "rocksdb, memory...");
DEFINE_int32(custom_filter_interval_secs,
24 * 3600,
"interval to trigger custom compaction, < 0 means always do "
"default minor compaction");
DEFINE_int32(num_workers, 4, "Number of worker threads");
DEFINE_int32(clean_wal_interval_secs, 600, "interval to trigger clean expired wal");
DEFINE_bool(auto_remove_invalid_space, true, "whether remove data of invalid space when restart");
Expand Down
12 changes: 1 addition & 11 deletions src/storage/CompactionFilter.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,6 @@
#include "storage/CommonUtils.h"
#include "storage/StorageFlags.h"

DEFINE_int32(min_level_for_custom_filter,
4,
"Minimal level compaction which will go through custom compaction filter");

namespace nebula {
namespace storage {

Expand All @@ -32,15 +28,9 @@ class StorageCompactionFilter final : public kvstore::KVFilter {
CHECK_NOTNULL(schemaMan_);
}

bool filter(int level,
GraphSpaceID spaceId,
bool filter(GraphSpaceID spaceId,
const folly::StringPiece& key,
const folly::StringPiece& val) const override {
if (level < FLAGS_min_level_for_custom_filter) {
// for upper level such as L0/L1, we don't go through the custom
// validation to achieve better performance
return false;
}
if (NebulaKeyUtils::isTag(vIdLen_, key)) {
return !tagValid(spaceId, key, val);
} else if (NebulaKeyUtils::isEdge(vIdLen_, key)) {
Expand Down
8 changes: 0 additions & 8 deletions src/storage/test/CompactionTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
#include "storage/test/QueryTestUtils.h"
#include "storage/test/TestUtils.h"

DECLARE_int32(min_level_for_custom_filter);

namespace nebula {
namespace storage {

Expand Down Expand Up @@ -169,7 +167,6 @@ TEST(CompactionFilterTest, InvalidSchemaFilterTest) {
adhoc->removeTagSchema(spaceId, tagId);

LOG(INFO) << "Do compaction";
FLAGS_min_level_for_custom_filter = -1;
auto* ns = dynamic_cast<kvstore::NebulaStore*>(env->kvstore_);
ns->compact(spaceId);

Expand Down Expand Up @@ -218,7 +215,6 @@ TEST(CompactionFilterTest, TTLFilterDataExpiredTest) {
sleep(FLAGS_mock_ttl_duration + 1);

LOG(INFO) << "Do compaction";
FLAGS_min_level_for_custom_filter = -1;
auto* ns = dynamic_cast<kvstore::NebulaStore*>(env->kvstore_);
ns->compact(spaceId);

Expand Down Expand Up @@ -266,7 +262,6 @@ TEST(CompactionFilterTest, TTLFilterDataNotExpiredTest) {
checkEdgeData(spaceVidLen, spaceId, 102, parts, env, 18);

LOG(INFO) << "Do compaction";
FLAGS_min_level_for_custom_filter = -1;
auto* ns = dynamic_cast<kvstore::NebulaStore*>(env->kvstore_);
ns->compact(spaceId);

Expand Down Expand Up @@ -328,7 +323,6 @@ TEST(CompactionFilterTest, DropIndexTest) {
adIndex->removeTagIndex(spaceId, indexId);

LOG(INFO) << "Do compaction";
FLAGS_min_level_for_custom_filter = -1;
auto* ns = dynamic_cast<kvstore::NebulaStore*>(env->kvstore_);
ns->compact(spaceId);

Expand Down Expand Up @@ -398,7 +392,6 @@ TEST(CompactionFilterTest, TTLFilterDataIndexExpiredTest) {
sleep(FLAGS_mock_ttl_duration + 1);

LOG(INFO) << "Do compaction";
FLAGS_min_level_for_custom_filter = -1;
auto* ns = dynamic_cast<kvstore::NebulaStore*>(env->kvstore_);
ns->compact(spaceId);

Expand Down Expand Up @@ -467,7 +460,6 @@ TEST(CompactionFilterTest, TTLFilterDataIndexNotExpiredTest) {
checkIndexData(spaceId, 102, 6, env, 18);

LOG(INFO) << "Do compaction";
FLAGS_min_level_for_custom_filter = -1;
auto* ns = dynamic_cast<kvstore::NebulaStore*>(env->kvstore_);
ns->compact(spaceId);

Expand Down
4 changes: 0 additions & 4 deletions src/storage/test/IndexWithTTLTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
#include "storage/mutate/UpdateEdgeProcessor.h"
#include "storage/mutate/UpdateVertexProcessor.h"

DECLARE_int32(min_level_for_custom_filter);

namespace nebula {
namespace storage {

Expand Down Expand Up @@ -184,7 +182,6 @@ TEST(IndexWithTTLTest, AddVerticesIndexWithTTL) {
sleep(2);

LOG(INFO) << "Do compaction";
FLAGS_min_level_for_custom_filter = -1;
auto* ns = dynamic_cast<kvstore::NebulaStore*>(env->kvstore_);
ns->compact(1);

Expand Down Expand Up @@ -232,7 +229,6 @@ TEST(IndexWithTTLTest, AddEdgesIndexWithTTL) {
sleep(2);

LOG(INFO) << "Do compaction";
FLAGS_min_level_for_custom_filter = -1;
auto* ns = dynamic_cast<kvstore::NebulaStore*>(env->kvstore_);
ns->compact(1);

Expand Down
1 change: 1 addition & 0 deletions tests/admin/test_configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ def test_configs(self):
['STORAGE', 'v', 'int', 'MUTABLE', 3],
['STORAGE', 'wal_ttl', 'int', 'MUTABLE', 14400],
['STORAGE', 'minloglevel', 'int', 'MUTABLE', 0],
['STORAGE', 'custom_filter_interval_secs', 'int', 'MUTABLE', 86400],
['STORAGE', 'heartbeat_interval_secs', 'int', 'MUTABLE', 1],
['STORAGE', 'meta_client_retry_times', 'int', 'MUTABLE', 3],
['STORAGE', 'rocksdb_db_options', 'map', 'MUTABLE', {}],
Expand Down

0 comments on commit b541615

Please sign in to comment.