From 71fbb13fed13a35e398b6612f2cfbdefd28a1130 Mon Sep 17 00:00:00 2001 From: Lisen <38773813+yl-lisen@users.noreply.github.com> Date: Mon, 29 Sep 2025 17:01:16 +0800 Subject: [PATCH 1/2] simplify checkpoint settings --- src/Checkpoint/CheckpointCoordinator.cpp | 20 +++++++-------- src/Checkpoint/CheckpointCoordinator.h | 4 +-- src/Checkpoint/CheckpointReplicationType.h | 12 +++++++++ src/Checkpoint/CheckpointSettings.cpp | 25 +++++++++++-------- src/Checkpoint/CheckpointSettings.h | 6 ++--- src/Checkpoint/CheckpointStorage.h | 4 +-- src/Checkpoint/CheckpointStorageFactory.cpp | 2 +- src/Checkpoint/CheckpointStorageFactory.h | 2 +- src/Checkpoint/CheckpointStorageType.h | 12 --------- src/Checkpoint/DiskCheckpointStorage.cpp | 8 +++--- .../LocalFileSystemCheckpointStorage.h | 2 +- .../Streaming/InterpreterUnsubscribeQuery.cpp | 2 +- src/Interpreters/executeQuery.cpp | 2 +- .../Executors/PipelineExecutor_Checkpoint.cpp | 4 +-- .../MatView/StorageMaterializedView.h | 2 +- .../StorageMaterializedView_Checkpoint.cpp | 2 +- 16 files changed, 56 insertions(+), 53 deletions(-) create mode 100644 src/Checkpoint/CheckpointReplicationType.h delete mode 100644 src/Checkpoint/CheckpointStorageType.h diff --git a/src/Checkpoint/CheckpointCoordinator.cpp b/src/Checkpoint/CheckpointCoordinator.cpp index 59af7bfe15..478a4b8079 100644 --- a/src/Checkpoint/CheckpointCoordinator.cpp +++ b/src/Checkpoint/CheckpointCoordinator.cpp @@ -33,7 +33,7 @@ CheckpointCoordinator::CheckpointCoordinator(CheckpointConfig config_) : config(std::move(config_)), logger(getLogger("CheckpointCoordinator")) { local_ckpt_storage - = assert_cast(&getCheckpointStorage(CheckpointStorageType::LocalFileSystem)); + = assert_cast(&getCheckpointStorage(CheckpointReplicationType::LocalFileSystem)); const auto & path_str = config.path; @@ -115,21 +115,21 @@ void CheckpointCoordinator::shutdown() void CheckpointCoordinator::validateCheckpointSettings( const CheckpointSettingsPtr & ckpt_settings, [[maybe_unused]] const CheckpointContextPtr & ckpt_ctx) { - if (ckpt_settings->storage_type != CheckpointStorageType::LocalFileSystem) + if (ckpt_settings->replication_type != CheckpointReplicationType::LocalFileSystem) { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Only LocalFileSystem checkpoint storage is supported"); } } -const CheckpointStorage & CheckpointCoordinator::getCheckpointStorage(CheckpointStorageType ckpt_storage_type) const +const CheckpointStorage & CheckpointCoordinator::getCheckpointStorage(CheckpointReplicationType ckpt_replication_type) const { std::scoped_lock lock(ckpt_storages_mutex); - auto iter = ckpt_storages.find(ckpt_storage_type); + auto iter = ckpt_storages.find(ckpt_replication_type); if (iter != ckpt_storages.end()) return *iter->second; - auto ckpt_storage = CheckpointStorageFactory::create(ckpt_storage_type, config); - auto [it, inserted] = ckpt_storages.emplace(ckpt_storage_type, std::move(ckpt_storage)); + auto ckpt_storage = CheckpointStorageFactory::create(ckpt_replication_type, config); + auto [it, inserted] = ckpt_storages.emplace(ckpt_replication_type, std::move(ckpt_storage)); chassert(inserted); return *it->second; } @@ -143,14 +143,14 @@ void CheckpointCoordinator::registerQuery( { chassert(ckpt_ctx->epoch == 0); const auto & qid = ckpt_ctx->qid; - const auto & ckpt_storage = getCheckpointStorage(ckpt_settings->storage_type); + const auto & ckpt_storage = getCheckpointStorage(ckpt_settings->replication_type); if (!ckpt_storage.isLocal() && !ckpt_ctx->extra_ctx) throw Exception( ErrorCodes::LOGICAL_ERROR, - "Missing extra checkpoint context for non-local checkpoint storage '{}'", - ckpt_settings->storage_type); + "Missing extra checkpoint context for non-local checkpoint replication type '{}'", + ckpt_settings->replication_type); - ckpt_settings->storage_type = ckpt_storage.storageType(); /// Update storage type to the actual storage type (E.g. Auto -> NativeLog) + ckpt_settings->replication_type = ckpt_storage.replicationType(); /// Update replication type (E.g. Auto -> NativeLog) validateCheckpointSettings(ckpt_settings, ckpt_ctx); diff --git a/src/Checkpoint/CheckpointCoordinator.h b/src/Checkpoint/CheckpointCoordinator.h index 1fbad62ca5..0ecfae9489 100644 --- a/src/Checkpoint/CheckpointCoordinator.h +++ b/src/Checkpoint/CheckpointCoordinator.h @@ -93,7 +93,7 @@ class CheckpointCoordinator final /// \brief Trigger checkpoint for a query, and call the callback when the checkpoint is done. TriggerResult triggerCheckpointForQuery(const String & qid, std::function && callback); - const CheckpointStorage & getCheckpointStorage(CheckpointStorageType ckpt_storage_type) const; + const CheckpointStorage & getCheckpointStorage(CheckpointReplicationType ckpt_replication_type) const; UInt64 getStorageSize(CheckpointContextPtr ckpt_ctx) const; @@ -143,7 +143,7 @@ class CheckpointCoordinator final CheckpointConfig config; mutable std::mutex ckpt_storages_mutex; - mutable absl::flat_hash_map> + mutable absl::flat_hash_map> ckpt_storages TSA_GUARDED_BY(ckpt_storages_mutex); const LocalFileSystemCheckpointStorage * local_ckpt_storage; /// Used for async checkpointing diff --git a/src/Checkpoint/CheckpointReplicationType.h b/src/Checkpoint/CheckpointReplicationType.h new file mode 100644 index 0000000000..5253d94f45 --- /dev/null +++ b/src/Checkpoint/CheckpointReplicationType.h @@ -0,0 +1,12 @@ +#pragma once + +#include + +namespace DB +{ +/// Replicating checkpoints via different backends +enum class CheckpointReplicationType : uint8_t +{ + LocalFileSystem = 1 +}; +} diff --git a/src/Checkpoint/CheckpointSettings.cpp b/src/Checkpoint/CheckpointSettings.cpp index 110896240b..546123d324 100644 --- a/src/Checkpoint/CheckpointSettings.cpp +++ b/src/Checkpoint/CheckpointSettings.cpp @@ -60,15 +60,15 @@ void CheckpointSettings::deserialize(VersionType version, ReadBuffer & rb) // Convert any old type values to File type = CheckpointType::File; - storage_type = cluster::deserializeEnum(rb); + replication_type = cluster::deserializeEnum(rb); readVarUInt(interval, rb); // Always use LocalFileSystem - storage_type = CheckpointStorageType::LocalFileSystem; + replication_type = CheckpointReplicationType::LocalFileSystem; /// Repair raw settings - std::string storage_type_str = "local_file_system"; // Always local - raw_settings = fmt::format("type={};storage_type={};interval={};", "file", storage_type_str, interval); + std::string replication_type_str = "local_file_system"; // Always local + raw_settings = fmt::format("type={};replication_type={};interval={};", "file", replication_type_str, interval); } } @@ -88,19 +88,19 @@ CheckpointType CheckpointSettings::parseCheckpointType(std::string_view ckpt_typ return iter->second; } -CheckpointStorageType CheckpointSettings::parseCheckpointStorageType(std::string_view ckpt_storage_type_str) +CheckpointReplicationType CheckpointSettings::parseCheckpointReplicationType(std::string_view ckpt_replication_type_str) { // Accept any valid input but always use LocalFileSystem static const std::unordered_set valid_types{"auto", "local_file_system", "nativelog", "shared"}; - if (!valid_types.contains(ckpt_storage_type_str)) + if (!valid_types.contains(ckpt_replication_type_str)) throw Exception( ErrorCodes::INVALID_SETTING_VALUE, - "Unknown checkpoint storage type '{}', expected one of 'auto', 'local_file_system', 'nativelog', 'shared'", - ckpt_storage_type_str); + "Unknown checkpoint replication type '{}', expected one of 'auto', 'local_file_system', 'nativelog', 'shared'", + ckpt_replication_type_str); // Always return LocalFileSystem - return CheckpointStorageType::LocalFileSystem; + return CheckpointReplicationType::LocalFileSystem; } void CheckpointSettings::parseImpl() @@ -144,8 +144,11 @@ void CheckpointSettings::parseImpl() if (auto iter = settings_map.find("type"); iter != settings_map.end()) type = parseCheckpointType(iter->second); - if (auto iter = settings_map.find("storage_type"); iter != settings_map.end()) - storage_type = parseCheckpointStorageType(iter->second); + /// storage_type is deprecated, use replication_type instead + if (auto iter = settings_map.find("replication_type"); iter != settings_map.end()) + replication_type = parseCheckpointReplicationType(iter->second); + else if (iter = settings_map.find("storage_type"); iter != settings_map.end()) + replication_type = parseCheckpointReplicationType(iter->second); if (auto iter = settings_map.find("interval"); iter != settings_map.end()) interval = std::stoul(iter->second); diff --git a/src/Checkpoint/CheckpointSettings.h b/src/Checkpoint/CheckpointSettings.h index f909191235..e306c87237 100644 --- a/src/Checkpoint/CheckpointSettings.h +++ b/src/Checkpoint/CheckpointSettings.h @@ -1,6 +1,6 @@ #pragma once -#include +#include #include #include @@ -22,7 +22,7 @@ struct CheckpointSettings /// Parsed settings CheckpointType type = CheckpointType::Auto; - CheckpointStorageType storage_type = CheckpointStorageType::LocalFileSystem; + CheckpointReplicationType replication_type = CheckpointReplicationType::LocalFileSystem; CheckpointStrategy strategy{}; UInt64 interval = 0; @@ -34,7 +34,7 @@ struct CheckpointSettings void deserialize(VersionType version, ReadBuffer & rb); static CheckpointType parseCheckpointType(std::string_view ckpt_type_str); - static CheckpointStorageType parseCheckpointStorageType(std::string_view ckpt_storage_type_str); + static CheckpointReplicationType parseCheckpointReplicationType(std::string_view ckpt_replication_type_str); static CheckpointSettingsPtr parse(const std::string & settings_str); static constexpr VersionType VERSION = 2; diff --git a/src/Checkpoint/CheckpointStorage.h b/src/Checkpoint/CheckpointStorage.h index c0f0e7064b..f0ecc2d486 100644 --- a/src/Checkpoint/CheckpointStorage.h +++ b/src/Checkpoint/CheckpointStorage.h @@ -2,7 +2,7 @@ #include #include -#include +#include #include #include @@ -54,7 +54,7 @@ class CheckpointStorage /// \return checkpoint type if exists, otherwise std::nullopt virtual std::optional exists(const std::string & key, CheckpointContextPtr cpt_ctx) const = 0; - virtual CheckpointStorageType storageType() const = 0; + virtual CheckpointReplicationType replicationType() const = 0; virtual uint64_t getStorageSize(CheckpointContextPtr ckpt_ctx) const = 0; virtual PathSizes getStorageStat(CheckpointContextPtr ckpt_ctx) const = 0; diff --git a/src/Checkpoint/CheckpointStorageFactory.cpp b/src/Checkpoint/CheckpointStorageFactory.cpp index cc6b5d6375..0ac3825466 100644 --- a/src/Checkpoint/CheckpointStorageFactory.cpp +++ b/src/Checkpoint/CheckpointStorageFactory.cpp @@ -16,7 +16,7 @@ extern const int NOT_IMPLEMENTED; extern const int INVALID_CONFIG_PARAMETER; } -std::unique_ptr CheckpointStorageFactory::create(CheckpointStorageType type, const CheckpointConfig & config) +std::unique_ptr CheckpointStorageFactory::create(CheckpointReplicationType type, const CheckpointConfig & config) { /// Only support LocalFileSystem storage static auto logger = getLogger("CheckpointStorageFactory"); diff --git a/src/Checkpoint/CheckpointStorageFactory.h b/src/Checkpoint/CheckpointStorageFactory.h index aea9223cd2..e493575603 100644 --- a/src/Checkpoint/CheckpointStorageFactory.h +++ b/src/Checkpoint/CheckpointStorageFactory.h @@ -7,6 +7,6 @@ namespace DB { struct CheckpointStorageFactory final { - static std::unique_ptr create(CheckpointStorageType type, const CheckpointConfig & config); + static std::unique_ptr create(CheckpointReplicationType type, const CheckpointConfig & config); }; } diff --git a/src/Checkpoint/CheckpointStorageType.h b/src/Checkpoint/CheckpointStorageType.h deleted file mode 100644 index 8823f0ad2e..0000000000 --- a/src/Checkpoint/CheckpointStorageType.h +++ /dev/null @@ -1,12 +0,0 @@ -#pragma once - -#include - -namespace DB -{ -/// Where to store checkpoints -enum class CheckpointStorageType : uint8_t -{ - LocalFileSystem = 1 -}; -} diff --git a/src/Checkpoint/DiskCheckpointStorage.cpp b/src/Checkpoint/DiskCheckpointStorage.cpp index 4d4d224e86..e4a7f3f715 100644 --- a/src/Checkpoint/DiskCheckpointStorage.cpp +++ b/src/Checkpoint/DiskCheckpointStorage.cpp @@ -117,7 +117,7 @@ int64_t DiskCheckpointStorage::getLastCommittedEpoch(CheckpointContextPtr ckpt_c return 0; /// We'd like to touch the ckpt dir to extend its TTL - if (storageType() == CheckpointStorageType::LocalFileSystem) + if (replicationType() == CheckpointReplicationType::LocalFileSystem) disk->setLastModified(ckpt_dir, Poco::Timestamp::fromEpochTime(time(nullptr))); /// 1) Loop the directory to figure out largest committed checkpoint epoch @@ -194,12 +194,12 @@ void DiskCheckpointStorage::removeExpired( uint64_t ttl_secs, bool delete_marked, std::function delete_precheck) const { /// FIXME: So far only local file system supports this feature - if (storageType() != CheckpointStorageType::LocalFileSystem) + if (replicationType() != CheckpointReplicationType::LocalFileSystem) return; auto disk = getDisk(/*extra_ckpt_ctx=*/nullptr); - LOG_INFO(logger, "Scanning delete-marked and expired checkpoints in {}, ttl={}s", storageType(), ttl_secs); + LOG_INFO(logger, "Scanning delete-marked and expired checkpoints in {}, ttl={}s", replicationType(), ttl_secs); iterateDirectory(disk, "", [&](const fs::path & path, bool is_dir) { if (!is_dir) @@ -231,7 +231,7 @@ void DiskCheckpointStorage::removeExpired( void DiskCheckpointStorage::removeOldCheckpoints() const { /// FIXME: So far only local file system supports this feature - if (storageType() != CheckpointStorageType::LocalFileSystem) + if (replicationType() != CheckpointReplicationType::LocalFileSystem) return; auto disk = getDisk(/*extra_ckpt_ctx=*/nullptr); diff --git a/src/Checkpoint/LocalFileSystemCheckpointStorage.h b/src/Checkpoint/LocalFileSystemCheckpointStorage.h index 1f1bc75677..8d4f511d5b 100644 --- a/src/Checkpoint/LocalFileSystemCheckpointStorage.h +++ b/src/Checkpoint/LocalFileSystemCheckpointStorage.h @@ -16,7 +16,7 @@ class LocalFileSystemCheckpointStorage final : public DiskCheckpointStorage local_disk->createDirectories(""); } - CheckpointStorageType storageType() const override { return CheckpointStorageType::LocalFileSystem; } + CheckpointReplicationType replicationType() const override { return CheckpointReplicationType::LocalFileSystem; } bool isLocal() const override { return true; } diff --git a/src/Interpreters/Streaming/InterpreterUnsubscribeQuery.cpp b/src/Interpreters/Streaming/InterpreterUnsubscribeQuery.cpp index ffa835f95e..463f81f5e8 100644 --- a/src/Interpreters/Streaming/InterpreterUnsubscribeQuery.cpp +++ b/src/Interpreters/Streaming/InterpreterUnsubscribeQuery.cpp @@ -28,7 +28,7 @@ BlockIO InterpreterUnsubscribeQuery::execute() { /// Unsubscribe query only supports with local filesystem checkpoint storage auto & coordinator = Globals::getCheckpointCoordinator(); - auto ckpt_ctx = std::make_shared(query_id, coordinator.getCheckpointStorage(CheckpointStorageType::LocalFileSystem), &coordinator); + auto ckpt_ctx = std::make_shared(query_id, coordinator.getCheckpointStorage(CheckpointReplicationType::LocalFileSystem), &coordinator); coordinator.removeCheckpoint(std::move(ckpt_ctx)); } diff --git a/src/Interpreters/executeQuery.cpp b/src/Interpreters/executeQuery.cpp index a580ef6057..0c0cfcacf8 100644 --- a/src/Interpreters/executeQuery.cpp +++ b/src/Interpreters/executeQuery.cpp @@ -119,7 +119,7 @@ std::pair handleRecoverQuery(const Streaming::ASTRecoverQuery * auto query_id = literal->value.get(); auto & coordinator = Globals::getCheckpointCoordinator(); /// Subscribe/Recover query only supports with local filesystem checkpoint storage - auto ckpt_ctx = std::make_shared(query_id, coordinator.getCheckpointStorage(CheckpointStorageType::LocalFileSystem), &coordinator); + auto ckpt_ctx = std::make_shared(query_id, coordinator.getCheckpointStorage(CheckpointReplicationType::LocalFileSystem), &coordinator); auto query = coordinator.getQuery(std::move(ckpt_ctx)); const char * begin = query.data(); diff --git a/src/Processors/Executors/PipelineExecutor_Checkpoint.cpp b/src/Processors/Executors/PipelineExecutor_Checkpoint.cpp index 2b0ed0bcaa..fc2c1665ee 100644 --- a/src/Processors/Executors/PipelineExecutor_Checkpoint.cpp +++ b/src/Processors/Executors/PipelineExecutor_Checkpoint.cpp @@ -26,7 +26,7 @@ void PipelineExecutor::registerCheckpoint(ExecuteMode exec_mode_, CheckpointCont auto & ckpt_coordinator = Globals::getCheckpointCoordinator(); ckpt_ctx = std::make_shared( process_list_element->getClientInfo().current_query_id, - ckpt_coordinator.getCheckpointStorage(CheckpointStorageType::LocalFileSystem), + ckpt_coordinator.getCheckpointStorage(CheckpointReplicationType::LocalFileSystem), &ckpt_coordinator); } @@ -136,7 +136,7 @@ std::pair PipelineExecutor::recover(CheckpointCont ckpt_settings = CheckpointSettings::parse(process_list_element->getContext()->getSettingsRef().checkpoint_settings); /// Recover query states from checkpoint - const auto & ckpt_storage = ckpt_coordinator.getCheckpointStorage(ckpt_settings->storage_type); + const auto & ckpt_storage = ckpt_coordinator.getCheckpointStorage(ckpt_settings->replication_type); auto recovered_epoch = ckpt_storage.getLastCommittedEpoch(ckpt_ctx); if (recovered_epoch > 0) graph->recover(ckpt_ctx->cloneWithEpoch(recovered_epoch)); diff --git a/src/Storages/MatView/StorageMaterializedView.h b/src/Storages/MatView/StorageMaterializedView.h index 7158876203..cba085598e 100644 --- a/src/Storages/MatView/StorageMaterializedView.h +++ b/src/Storages/MatView/StorageMaterializedView.h @@ -1,7 +1,7 @@ #pragma once #include -#include +#include #include #include #include diff --git a/src/Storages/MatView/StorageMaterializedView_Checkpoint.cpp b/src/Storages/MatView/StorageMaterializedView_Checkpoint.cpp index e9ad5d77dd..980567a2c1 100644 --- a/src/Storages/MatView/StorageMaterializedView_Checkpoint.cpp +++ b/src/Storages/MatView/StorageMaterializedView_Checkpoint.cpp @@ -34,7 +34,7 @@ void StorageMaterializedView::prepareCheckpoint() if (!curr_ckpt_ctx) { - const auto & ckpt_storage = ckpt_coordinator.getCheckpointStorage(CheckpointStorageType::LocalFileSystem); + const auto & ckpt_storage = ckpt_coordinator.getCheckpointStorage(CheckpointReplicationType::LocalFileSystem); auto ckpt_ctx = std::make_shared(getInnerQueryId(), ckpt_storage, &ckpt_coordinator); curr_ckpt_ctx = std::move(ckpt_ctx); } From 6e684f27643423dd09925fe93e3df10589e32765 Mon Sep 17 00:00:00 2001 From: Lisen Date: Mon, 20 Oct 2025 18:48:59 +0800 Subject: [PATCH 2/2] fixes smoke test --- .../cluster/smoke/0015_changelog_stream_aggr/0_stream_aggr.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/cluster/smoke/0015_changelog_stream_aggr/0_stream_aggr.yaml b/tests/cluster/smoke/0015_changelog_stream_aggr/0_stream_aggr.yaml index 64cddd7476..7694105e57 100644 --- a/tests/cluster/smoke/0015_changelog_stream_aggr/0_stream_aggr.yaml +++ b/tests/cluster/smoke/0015_changelog_stream_aggr/0_stream_aggr.yaml @@ -34,7 +34,7 @@ steps: <> node: [p2] <> - query: select count(), avg(i), sum(i), min(i), max(i), min(s), max(s) from changelog_stream_aggr emit periodic 1s + query: select count(), avg(i), sum(i), min(i), max(i), min(s), max(s) from changelog_stream_aggr emit periodic 1s settings seek_to='earliest' schema: - name: count() type: uint64