diff --git a/resources/gflags.json b/resources/gflags.json index 3d6a2b43286..bc928e7b306 100644 --- a/resources/gflags.json +++ b/resources/gflags.json @@ -7,7 +7,6 @@ "clean_wal_interval_secs", "wal_ttl", "clean_wal_interval_secs", - "custom_filter_interval_secs", "accept_partial_success", "system_memory_high_watermark_ratio", "num_rows_to_check_memory", diff --git a/src/common/meta/GflagsManager.cpp b/src/common/meta/GflagsManager.cpp index b1e100aabf0..76c14412211 100644 --- a/src/common/meta/GflagsManager.cpp +++ b/src/common/meta/GflagsManager.cpp @@ -58,7 +58,6 @@ std::unordered_map> GflagsManager {"meta_client_retry_times", {cpp2::ConfigMode::MUTABLE, false}}, {"wal_ttl", {cpp2::ConfigMode::MUTABLE, false}}, {"clean_wal_interval_secs", {cpp2::ConfigMode::MUTABLE, false}}, - {"custom_filter_interval_secs", {cpp2::ConfigMode::MUTABLE, false}}, {"accept_partial_success", {cpp2::ConfigMode::MUTABLE, false}}, {"rocksdb_db_options", {cpp2::ConfigMode::MUTABLE, true}}, diff --git a/src/kvstore/Common.h b/src/kvstore/Common.h index 6e8df267da4..02716567c3b 100644 --- a/src/kvstore/Common.h +++ b/src/kvstore/Common.h @@ -34,13 +34,15 @@ class KVFilter { /** * @brief Whether remove the key during compaction * + * @param level * @param spaceId * @param key * @param val * @return true Key will not be removed * @return false Key will be removed */ - virtual bool filter(GraphSpaceID spaceId, + virtual bool filter(int level, + GraphSpaceID spaceId, const folly::StringPiece& key, const folly::StringPiece& val) const = 0; }; diff --git a/src/kvstore/CompactionFilter.h b/src/kvstore/CompactionFilter.h index bb2830f5282..cb73cacc9b3 100644 --- a/src/kvstore/CompactionFilter.h +++ b/src/kvstore/CompactionFilter.h @@ -34,7 +34,7 @@ class KVCompactionFilter final : public rocksdb::CompactionFilter { /** * @brief whether remove the key during compaction * - * @param level Levels of key in rocksdb, not used for now + * @param level Levels of key in rocksdb * @param key Rocksdb key * @param val Rocksdb val * @return true Key will not be removed @@ -45,8 +45,8 @@ class KVCompactionFilter final : public rocksdb::CompactionFilter { const rocksdb::Slice& val, std::string*, bool*) const override { - UNUSED(level); - return kvFilter_->filter(spaceId_, + return kvFilter_->filter(level, + spaceId_, folly::StringPiece(key.data(), key.size()), folly::StringPiece(val.data(), val.size())); } @@ -77,21 +77,14 @@ class KVCompactionFilterFactory : public rocksdb::CompactionFilterFactory { */ std::unique_ptr CreateCompactionFilter( const rocksdb::CompactionFilter::Context& context) override { - auto now = time::WallClock::fastNowInSec(); if (context.is_full_compaction || context.is_manual_compaction) { LOG(INFO) << "Do full/manual compaction!"; - lastRunCustomFilterTimeSec_ = now; - return std::make_unique(spaceId_, createKVFilter()); } else { - if (FLAGS_custom_filter_interval_secs >= 0 && - now - lastRunCustomFilterTimeSec_ > FLAGS_custom_filter_interval_secs) { - LOG(INFO) << "Do custom minor compaction!"; - lastRunCustomFilterTimeSec_ = now; - return std::make_unique(spaceId_, createKVFilter()); - } - LOG(INFO) << "Do default minor compaction!"; - return std::unique_ptr(nullptr); + // No worry, by default flush will not go through the custom compaction filter. + // See CompactionFilterFactory::ShouldFilterTableFileCreation. + LOG(INFO) << "Do automatic or periodic compaction!"; } + return std::make_unique(spaceId_, createKVFilter()); } const char* Name() const override { @@ -102,7 +95,6 @@ class KVCompactionFilterFactory : public rocksdb::CompactionFilterFactory { private: GraphSpaceID spaceId_; - int32_t lastRunCustomFilterTimeSec_ = 0; }; /** diff --git a/src/kvstore/NebulaStore.cpp b/src/kvstore/NebulaStore.cpp index 7d87695a1c4..58e6ad31be6 100644 --- a/src/kvstore/NebulaStore.cpp +++ b/src/kvstore/NebulaStore.cpp @@ -20,10 +20,6 @@ #include "kvstore/listener/elasticsearch/ESListener.h" DEFINE_string(engine_type, "rocksdb", "rocksdb, memory..."); -DEFINE_int32(custom_filter_interval_secs, - 24 * 3600, - "interval to trigger custom compaction, < 0 means always do " - "default minor compaction"); DEFINE_int32(num_workers, 4, "Number of worker threads"); DEFINE_int32(clean_wal_interval_secs, 600, "interval to trigger clean expired wal"); DEFINE_bool(auto_remove_invalid_space, true, "whether remove data of invalid space when restart"); diff --git a/src/storage/CompactionFilter.h b/src/storage/CompactionFilter.h index 253b82414d4..2c47e1576a9 100644 --- a/src/storage/CompactionFilter.h +++ b/src/storage/CompactionFilter.h @@ -16,6 +16,10 @@ #include "storage/CommonUtils.h" #include "storage/StorageFlags.h" +DEFINE_int32(min_level_for_custom_filter, + 4, + "Minimal level compaction which will go through custom compaction filter"); + namespace nebula { namespace storage { @@ -28,9 +32,15 @@ class StorageCompactionFilter final : public kvstore::KVFilter { CHECK_NOTNULL(schemaMan_); } - bool filter(GraphSpaceID spaceId, + bool filter(int level, + GraphSpaceID spaceId, const folly::StringPiece& key, const folly::StringPiece& val) const override { + if (level < FLAGS_min_level_for_custom_filter) { + // for upper level such as L0/L1, we don't go through the custom + // validation to achieve better performance + return false; + } if (NebulaKeyUtils::isTag(vIdLen_, key)) { return !tagValid(spaceId, key, val); } else if (NebulaKeyUtils::isEdge(vIdLen_, key)) { diff --git a/src/storage/test/CompactionTest.cpp b/src/storage/test/CompactionTest.cpp index 7c4f8cf9079..0130fa2f8d3 100644 --- a/src/storage/test/CompactionTest.cpp +++ b/src/storage/test/CompactionTest.cpp @@ -18,6 +18,8 @@ #include "storage/test/QueryTestUtils.h" #include "storage/test/TestUtils.h" +DECLARE_int32(min_level_for_custom_filter); + namespace nebula { namespace storage { @@ -167,6 +169,7 @@ TEST(CompactionFilterTest, InvalidSchemaFilterTest) { adhoc->removeTagSchema(spaceId, tagId); LOG(INFO) << "Do compaction"; + FLAGS_min_level_for_custom_filter = -1; auto* ns = dynamic_cast(env->kvstore_); ns->compact(spaceId); @@ -215,6 +218,7 @@ TEST(CompactionFilterTest, TTLFilterDataExpiredTest) { sleep(FLAGS_mock_ttl_duration + 1); LOG(INFO) << "Do compaction"; + FLAGS_min_level_for_custom_filter = -1; auto* ns = dynamic_cast(env->kvstore_); ns->compact(spaceId); @@ -262,6 +266,7 @@ TEST(CompactionFilterTest, TTLFilterDataNotExpiredTest) { checkEdgeData(spaceVidLen, spaceId, 102, parts, env, 18); LOG(INFO) << "Do compaction"; + FLAGS_min_level_for_custom_filter = -1; auto* ns = dynamic_cast(env->kvstore_); ns->compact(spaceId); @@ -323,6 +328,7 @@ TEST(CompactionFilterTest, DropIndexTest) { adIndex->removeTagIndex(spaceId, indexId); LOG(INFO) << "Do compaction"; + FLAGS_min_level_for_custom_filter = -1; auto* ns = dynamic_cast(env->kvstore_); ns->compact(spaceId); @@ -392,6 +398,7 @@ TEST(CompactionFilterTest, TTLFilterDataIndexExpiredTest) { sleep(FLAGS_mock_ttl_duration + 1); LOG(INFO) << "Do compaction"; + FLAGS_min_level_for_custom_filter = -1; auto* ns = dynamic_cast(env->kvstore_); ns->compact(spaceId); @@ -460,6 +467,7 @@ TEST(CompactionFilterTest, TTLFilterDataIndexNotExpiredTest) { checkIndexData(spaceId, 102, 6, env, 18); LOG(INFO) << "Do compaction"; + FLAGS_min_level_for_custom_filter = -1; auto* ns = dynamic_cast(env->kvstore_); ns->compact(spaceId); diff --git a/src/storage/test/IndexWithTTLTest.cpp b/src/storage/test/IndexWithTTLTest.cpp index 86f6e9ae7d6..8efe0b0817d 100644 --- a/src/storage/test/IndexWithTTLTest.cpp +++ b/src/storage/test/IndexWithTTLTest.cpp @@ -24,6 +24,8 @@ #include "storage/mutate/UpdateEdgeProcessor.h" #include "storage/mutate/UpdateVertexProcessor.h" +DECLARE_int32(min_level_for_custom_filter); + namespace nebula { namespace storage { @@ -182,6 +184,7 @@ TEST(IndexWithTTLTest, AddVerticesIndexWithTTL) { sleep(2); LOG(INFO) << "Do compaction"; + FLAGS_min_level_for_custom_filter = -1; auto* ns = dynamic_cast(env->kvstore_); ns->compact(1); @@ -229,6 +232,7 @@ TEST(IndexWithTTLTest, AddEdgesIndexWithTTL) { sleep(2); LOG(INFO) << "Do compaction"; + FLAGS_min_level_for_custom_filter = -1; auto* ns = dynamic_cast(env->kvstore_); ns->compact(1); diff --git a/tests/admin/test_configs.py b/tests/admin/test_configs.py index 8a28bd1c79b..ab39049b444 100644 --- a/tests/admin/test_configs.py +++ b/tests/admin/test_configs.py @@ -86,7 +86,6 @@ def test_configs(self): ['STORAGE', 'v', 'int', 'MUTABLE', 3], ['STORAGE', 'wal_ttl', 'int', 'MUTABLE', 14400], ['STORAGE', 'minloglevel', 'int', 'MUTABLE', 0], - ['STORAGE', 'custom_filter_interval_secs', 'int', 'MUTABLE', 86400], ['STORAGE', 'heartbeat_interval_secs', 'int', 'MUTABLE', 1], ['STORAGE', 'meta_client_retry_times', 'int', 'MUTABLE', 3], ['STORAGE', 'rocksdb_db_options', 'map', 'MUTABLE', {}],