diff --git a/ydb/core/tx/columnshard/normalizer/portion/clean_inserted_portions.cpp b/ydb/core/tx/columnshard/normalizer/portion/clean_inserted_portions.cpp new file mode 100644 index 000000000000..0c3c62f26519 --- /dev/null +++ b/ydb/core/tx/columnshard/normalizer/portion/clean_inserted_portions.cpp @@ -0,0 +1,81 @@ +#include "clean_inserted_portions.h" + +#include +#include +#include +#include +#include +#include + +#include + +namespace NKikimr::NOlap::NNormalizer::NCleanInsertedPortions { + +/* Normalizer changes object that owns a list of portions; applying it writes a copy of each portion back with a remove snapshot set. */ class TCleanInsertedPortionsNormalizer::TNormalizerResult: public INormalizerChanges { + std::vector InsertedPortions; + +public: + TNormalizerResult(std::vector&& portions) + : InsertedPortions(std::move(portions)) { + } + + /* Writes a modified copy of every stored portion through TDbWrapper and, if at least one portion is stored, records a REMOVE_PORTIONS normalizer event carrying DebugString(). Always returns true. */ bool ApplyOnExecute(NTabletFlatExecutor::TTransactionContext& txc, const TNormalizationController& normController) const override { + /* NOTE(review): blobManagerDb is not referenced again in this method - confirm whether it is still needed. */ NOlap::TBlobManagerDb blobManagerDb(txc.DB); + TDbWrapper db(txc.DB, nullptr); + for (auto&& portionInfo : InsertedPortions) { + auto copy = portionInfo.GetPortionInfo().MakeCopy(); + /* The copy, not the stored portion, receives remove snapshot TSnapshot(1, 1) and is then written together with the original blob ids. */ copy->SetRemoveSnapshot(TSnapshot(1, 1)); + db.WritePortion(portionInfo.GetBlobIds(), *copy); + } + if (InsertedPortions.size()) { + /* This local db (NIceDb::TNiceDb) shadows the TDbWrapper db declared above. */ NIceDb::TNiceDb db(txc.DB); + normController.AddNormalizerEvent(db, "REMOVE_PORTIONS", DebugString()); + } + return true; + } + + /* No-op: the body is empty. */ void ApplyOnComplete(const TNormalizationController& /* normController */) const override { + } + + /* Returns the number of stored portions. */ ui64 GetSize() const override { + return InsertedPortions.size(); + } + + /* Formats path_ids=[...];records_count=N;inserted_portions_count=M, where every path id (including the last) is followed by a comma and N is the sum of GetRecordsCount() over all stored portions. */ TString DebugString() const override { + TStringBuilder sb; + ui64 recordsCount = 0; + sb << "path_ids=["; + for (auto&& p : InsertedPortions) { + sb << p.GetPortionInfo().GetPathId() << ","; + recordsCount += p.GetPortionInfo().GetRecordsCount(); + } + sb << "]"; + sb << ";records_count=" << recordsCount; + sb << ";inserted_portions_count=" << InsertedPortions.size(); + return sb; + } +}; + +/* Always returns false and ignores both arguments. NOTE(review): the meaning of false is defined by the base class, which is not visible here - confirm. */ bool TCleanInsertedPortionsNormalizer::CheckPortion(const NColumnShard::TTablesManager& /*tablesManager*/, const 
TPortionDataAccessor& /*portionInfo*/) const { + return false; +} + +/* Moves every portion whose GetProduced() equals NPortion::EProduced::INSERTED into a new list, passes that list to std::make_shared to form the task result, logs its DebugString() and the input count via ACFL_WARN, and returns a task built from the result. The second (unnamed) parameter is not used. */ INormalizerTask::TPtr TCleanInsertedPortionsNormalizer::BuildTask( + std::vector&& portions, std::shared_ptr>) const { + std::vector insertedPortions; + for (auto&& portion : portions) { + if (portion.GetPortionInfo().GetProduced() == NPortion::EProduced::INSERTED) { + insertedPortions.push_back(std::move(portion)); + } + } + auto taskResult = std::make_shared(std::move(insertedPortions)); + ACFL_WARN("normalizer", "TCleanInsertedPortionsNormalizer")("message", taskResult->DebugString()); + /* Individual elements were moved out above, but the portions vector itself was not, so size() still reports the input count. */ ACFL_WARN("normalizer", "TCleanInsertedPortionsNormalizer")("all portions", portions.size()); + return std::make_shared(taskResult); +} + +/* Ignores both arguments and returns true. */ TConclusion TCleanInsertedPortionsNormalizer::DoInitImpl(const TNormalizationController&, NTabletFlatExecutor::TTransactionContext&) { + return true; +} + +} // namespace NKikimr::NOlap::NNormalizer::NCleanInsertedPortions diff --git a/ydb/core/tx/columnshard/normalizer/portion/clean_inserted_portions.h b/ydb/core/tx/columnshard/normalizer/portion/clean_inserted_portions.h new file mode 100644 index 000000000000..647bdd36830f --- /dev/null +++ b/ydb/core/tx/columnshard/normalizer/portion/clean_inserted_portions.h @@ -0,0 +1,50 @@ +#pragma once + +#include "normalizer.h" + +#include +#include + +namespace NKikimr::NColumnShard { +class TTablesManager; +} + +namespace NKikimr::NOlap::NNormalizer::NCleanInsertedPortions { + +/* Normalizer derived from TPortionsNormalizerBase; GetClassName() reports CleanInsertedPortions. */ class TCleanInsertedPortionsNormalizer: public TPortionsNormalizerBase { +public: + static TString GetClassNameStatic() { + return "CleanInsertedPortions"; + } + +private: + /* Constructs a TFactory registrator from the static class name; presumably this registers the class in TFactory under that name - confirm. */ static inline TFactory::TRegistrator Registrator = TFactory::TRegistrator(GetClassNameStatic()); + +public: + class TNormalizerResult; + +public: + /* Returns an empty optional. */ virtual std::optional DoGetEnumSequentialId() const override { + return {}; + } + + virtual TString GetClassName() const override { + return GetClassNameStatic(); + } + + TCleanInsertedPortionsNormalizer(const 
TNormalizationController::TInitContext& info) + : TPortionsNormalizerBase(info) { + } + + /* Returns an empty set regardless of the schema argument. */ virtual std::set GetColumnsFilter(const ISnapshotSchema::TPtr& /*schema*/) const override { + return {}; + } + + virtual INormalizerTask::TPtr BuildTask( + std::vector&& portions, std::shared_ptr> schemas) const override; + virtual TConclusion DoInitImpl(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override; + + virtual bool CheckPortion(const NColumnShard::TTablesManager& tablesManager, const TPortionDataAccessor& portionInfo) const override; +}; + +} // namespace NKikimr::NOlap::NNormalizer::NCleanInsertedPortions diff --git a/ydb/core/tx/columnshard/normalizer/portion/ya.make b/ydb/core/tx/columnshard/normalizer/portion/ya.make index 2c2b9992b4d4..e0af3e025654 100644 --- a/ydb/core/tx/columnshard/normalizer/portion/ya.make +++ b/ydb/core/tx/columnshard/normalizer/portion/ya.make @@ -2,24 +2,25 @@ LIBRARY() SRCS( normalizer.cpp - GLOBAL portion.cpp - GLOBAL chunks.cpp - GLOBAL clean.cpp - GLOBAL clean_empty.cpp GLOBAL broken_blobs.cpp - GLOBAL special_cleaner.cpp + GLOBAL chunks.cpp GLOBAL chunks_actualization.cpp - GLOBAL restore_v1_chunks.cpp - GLOBAL restore_v2_chunks.cpp - GLOBAL leaked_blobs.cpp - GLOBAL clean_deprecated_snapshot.cpp GLOBAL chunks_v0_meta.cpp + GLOBAL clean.cpp + GLOBAL clean_deprecated_snapshot.cpp + GLOBAL clean_empty.cpp GLOBAL clean_index_columns.cpp - GLOBAL clean_unused_tables_template.cpp + GLOBAL clean_inserted_portions.cpp GLOBAL clean_ttl_preset_setting_info.cpp GLOBAL clean_ttl_preset_setting_version_info.cpp + GLOBAL clean_unused_tables_template.cpp GLOBAL copy_blob_ids_to_v2.cpp + GLOBAL leaked_blobs.cpp + GLOBAL portion.cpp GLOBAL restore_appearance_snapshot.cpp + GLOBAL restore_v1_chunks.cpp + GLOBAL restore_v2_chunks.cpp + GLOBAL special_cleaner.cpp ) PEERDIR( diff --git a/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp b/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp index 
fe5cd33f0263..4b40cddbb378 100644 --- a/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp +++ b/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp @@ -100,6 +100,12 @@ class TPortionsCleaner: public NYDBTest::ILocalDBModifier { } }; +/* Local DB modifier whose Apply does nothing. */ class TInsertedPortionsCleaner: public NYDBTest::ILocalDBModifier { +public: + virtual void Apply(NTabletFlatExecutor::TTransactionContext&) const override { + } +}; + class TEmptyPortionsCleaner: public NYDBTest::ILocalDBModifier { public: virtual void Apply(NTabletFlatExecutor::TTransactionContext& txc) const override { @@ -329,6 +335,25 @@ Y_UNIT_TEST_SUITE(Normalizers) { }; TestNormalizerImpl(TLocalNormalizerChecker()); } + + /* Runs TestNormalizerImpl with a checker that returns 0 from RecordsCountAfterReboot regardless of the initial count, leaves feature flags untouched, and adds one repair entry with class name CleanInsertedPortions. */ Y_UNIT_TEST(InsertedPortionsCleanerNormalizer) { + class TLocalNormalizerChecker: public TNormalizerChecker { + public: + virtual ui64 RecordsCountAfterReboot(const ui64 /*initialRecordsCount*/) const override { + return 0; + } + virtual void CorrectFeatureFlagsOnStart(TFeatureFlags& /* featuresFlags */) const override { + } + virtual void CorrectConfigurationOnStart(NKikimrConfig::TColumnShardConfig& columnShardConfig) const override { + { + auto* repair = columnShardConfig.MutableRepairs()->Add(); + repair->SetClassName("CleanInsertedPortions"); + repair->SetDescription("Removing inserted portions"); + } + } + }; + TestNormalizerImpl(TLocalNormalizerChecker()); + } Y_UNIT_TEST(SchemaVersionsNormalizer) { class TLocalNormalizerChecker: public TNormalizerChecker {