Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions src/Checkpoint/CheckpointCoordinator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ CheckpointCoordinator::CheckpointCoordinator(CheckpointConfig config_)
: config(std::move(config_)), logger(getLogger("CheckpointCoordinator"))
{
local_ckpt_storage
= assert_cast<const LocalFileSystemCheckpointStorage *>(&getCheckpointStorage(CheckpointStorageType::LocalFileSystem));
= assert_cast<const LocalFileSystemCheckpointStorage *>(&getCheckpointStorage(CheckpointReplicationType::LocalFileSystem));

const auto & path_str = config.path;

Expand Down Expand Up @@ -115,21 +115,21 @@ void CheckpointCoordinator::shutdown()
void CheckpointCoordinator::validateCheckpointSettings(
const CheckpointSettingsPtr & ckpt_settings, [[maybe_unused]] const CheckpointContextPtr & ckpt_ctx)
{
if (ckpt_settings->storage_type != CheckpointStorageType::LocalFileSystem)
if (ckpt_settings->replication_type != CheckpointReplicationType::LocalFileSystem)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Only LocalFileSystem checkpoint storage is supported");
}
}

const CheckpointStorage & CheckpointCoordinator::getCheckpointStorage(CheckpointStorageType ckpt_storage_type) const
const CheckpointStorage & CheckpointCoordinator::getCheckpointStorage(CheckpointReplicationType ckpt_replication_type) const
{
std::scoped_lock lock(ckpt_storages_mutex);
auto iter = ckpt_storages.find(ckpt_storage_type);
auto iter = ckpt_storages.find(ckpt_replication_type);
if (iter != ckpt_storages.end())
return *iter->second;

auto ckpt_storage = CheckpointStorageFactory::create(ckpt_storage_type, config);
auto [it, inserted] = ckpt_storages.emplace(ckpt_storage_type, std::move(ckpt_storage));
auto ckpt_storage = CheckpointStorageFactory::create(ckpt_replication_type, config);
auto [it, inserted] = ckpt_storages.emplace(ckpt_replication_type, std::move(ckpt_storage));
chassert(inserted);
return *it->second;
}
Expand All @@ -143,14 +143,14 @@ void CheckpointCoordinator::registerQuery(
{
chassert(ckpt_ctx->epoch == 0);
const auto & qid = ckpt_ctx->qid;
const auto & ckpt_storage = getCheckpointStorage(ckpt_settings->storage_type);
const auto & ckpt_storage = getCheckpointStorage(ckpt_settings->replication_type);
if (!ckpt_storage.isLocal() && !ckpt_ctx->extra_ctx)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Missing extra checkpoint context for non-local checkpoint storage '{}'",
ckpt_settings->storage_type);
"Missing extra checkpoint context for non-local checkpoint replication type '{}'",
ckpt_settings->replication_type);

ckpt_settings->storage_type = ckpt_storage.storageType(); /// Update storage type to the actual storage type (E.g. Auto -> NativeLog)
ckpt_settings->replication_type = ckpt_storage.replicationType(); /// Update replication type (E.g. Auto -> NativeLog)

validateCheckpointSettings(ckpt_settings, ckpt_ctx);

Expand Down
4 changes: 2 additions & 2 deletions src/Checkpoint/CheckpointCoordinator.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ class CheckpointCoordinator final
/// \brief Trigger checkpoint for a query, and call the callback when the checkpoint is done.
TriggerResult triggerCheckpointForQuery(const String & qid, std::function<void(CheckpointContextPtr)> && callback);

const CheckpointStorage & getCheckpointStorage(CheckpointStorageType ckpt_storage_type) const;
const CheckpointStorage & getCheckpointStorage(CheckpointReplicationType ckpt_replication_type) const;


UInt64 getStorageSize(CheckpointContextPtr ckpt_ctx) const;
Expand Down Expand Up @@ -143,7 +143,7 @@ class CheckpointCoordinator final
CheckpointConfig config;

mutable std::mutex ckpt_storages_mutex;
mutable absl::flat_hash_map<CheckpointStorageType, std::unique_ptr<const CheckpointStorage>>
mutable absl::flat_hash_map<CheckpointReplicationType, std::unique_ptr<const CheckpointStorage>>
ckpt_storages TSA_GUARDED_BY(ckpt_storages_mutex);
const LocalFileSystemCheckpointStorage * local_ckpt_storage; /// Used for async checkpointing

Expand Down
12 changes: 12 additions & 0 deletions src/Checkpoint/CheckpointReplicationType.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
#pragma once

#include <cstdint>

namespace DB
{
/// Replicating checkpoints via different backends
enum class CheckpointReplicationType : uint8_t
{
LocalFileSystem = 1
};
}
25 changes: 14 additions & 11 deletions src/Checkpoint/CheckpointSettings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,15 @@ void CheckpointSettings::deserialize(VersionType version, ReadBuffer & rb)
// Convert any old type values to File
type = CheckpointType::File;

storage_type = cluster::deserializeEnum<CheckpointStorageType>(rb);
replication_type = cluster::deserializeEnum<CheckpointReplicationType>(rb);
readVarUInt(interval, rb);

// Always use LocalFileSystem
storage_type = CheckpointStorageType::LocalFileSystem;
replication_type = CheckpointReplicationType::LocalFileSystem;

/// Repair raw settings
std::string storage_type_str = "local_file_system"; // Always local
raw_settings = fmt::format("type={};storage_type={};interval={};", "file", storage_type_str, interval);
std::string replication_type_str = "local_file_system"; // Always local
raw_settings = fmt::format("type={};replication_type={};interval={};", "file", replication_type_str, interval);
}
}

Expand All @@ -88,19 +88,19 @@ CheckpointType CheckpointSettings::parseCheckpointType(std::string_view ckpt_typ
return iter->second;
}

CheckpointStorageType CheckpointSettings::parseCheckpointStorageType(std::string_view ckpt_storage_type_str)
CheckpointReplicationType CheckpointSettings::parseCheckpointReplicationType(std::string_view ckpt_replication_type_str)
{
// Accept any valid input but always use LocalFileSystem
static const std::unordered_set<std::string_view> valid_types{"auto", "local_file_system", "nativelog", "shared"};

if (!valid_types.contains(ckpt_storage_type_str))
if (!valid_types.contains(ckpt_replication_type_str))
throw Exception(
ErrorCodes::INVALID_SETTING_VALUE,
"Unknown checkpoint storage type '{}', expected one of 'auto', 'local_file_system', 'nativelog', 'shared'",
ckpt_storage_type_str);
"Unknown checkpoint replication type '{}', expected one of 'auto', 'local_file_system', 'nativelog', 'shared'",
ckpt_replication_type_str);

// Always return LocalFileSystem
return CheckpointStorageType::LocalFileSystem;
return CheckpointReplicationType::LocalFileSystem;
}

void CheckpointSettings::parseImpl()
Expand Down Expand Up @@ -144,8 +144,11 @@ void CheckpointSettings::parseImpl()
if (auto iter = settings_map.find("type"); iter != settings_map.end())
type = parseCheckpointType(iter->second);

if (auto iter = settings_map.find("storage_type"); iter != settings_map.end())
storage_type = parseCheckpointStorageType(iter->second);
/// storage_type is deprecated, use replication_type instead
if (auto iter = settings_map.find("replication_type"); iter != settings_map.end())
replication_type = parseCheckpointReplicationType(iter->second);
else if (iter = settings_map.find("storage_type"); iter != settings_map.end())
replication_type = parseCheckpointReplicationType(iter->second);

if (auto iter = settings_map.find("interval"); iter != settings_map.end())
interval = std::stoul(iter->second);
Expand Down
6 changes: 3 additions & 3 deletions src/Checkpoint/CheckpointSettings.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#pragma once

#include <Checkpoint/CheckpointStorageType.h>
#include <Checkpoint/CheckpointReplicationType.h>
#include <Checkpoint/CheckpointStrategy.h>
#include <Checkpoint/CheckpointType.h>

Expand All @@ -22,7 +22,7 @@ struct CheckpointSettings

/// Parsed settings
CheckpointType type = CheckpointType::Auto;
CheckpointStorageType storage_type = CheckpointStorageType::LocalFileSystem;
CheckpointReplicationType replication_type = CheckpointReplicationType::LocalFileSystem;
CheckpointStrategy strategy{};

UInt64 interval = 0;
Expand All @@ -34,7 +34,7 @@ struct CheckpointSettings
void deserialize(VersionType version, ReadBuffer & rb);

static CheckpointType parseCheckpointType(std::string_view ckpt_type_str);
static CheckpointStorageType parseCheckpointStorageType(std::string_view ckpt_storage_type_str);
static CheckpointReplicationType parseCheckpointReplicationType(std::string_view ckpt_replication_type_str);
static CheckpointSettingsPtr parse(const std::string & settings_str);

static constexpr VersionType VERSION = 2;
Expand Down
4 changes: 2 additions & 2 deletions src/Checkpoint/CheckpointStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

#include <Checkpoint/Checkpoint.h>
#include <Checkpoint/CheckpointContextFwd.h>
#include <Checkpoint/CheckpointStorageType.h>
#include <Checkpoint/CheckpointReplicationType.h>
#include <Checkpoint/DiskPath.h>

#include <Core/PathSize.h>
Expand Down Expand Up @@ -54,7 +54,7 @@ class CheckpointStorage
/// \return checkpoint type if exists, otherwise std::nullopt
virtual std::optional<CheckpointType> exists(const std::string & key, CheckpointContextPtr cpt_ctx) const = 0;

virtual CheckpointStorageType storageType() const = 0;
virtual CheckpointReplicationType replicationType() const = 0;

virtual uint64_t getStorageSize(CheckpointContextPtr ckpt_ctx) const = 0;
virtual PathSizes getStorageStat(CheckpointContextPtr ckpt_ctx) const = 0;
Expand Down
2 changes: 1 addition & 1 deletion src/Checkpoint/CheckpointStorageFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ extern const int NOT_IMPLEMENTED;
extern const int INVALID_CONFIG_PARAMETER;
}

std::unique_ptr<CheckpointStorage> CheckpointStorageFactory::create(CheckpointStorageType type, const CheckpointConfig & config)
std::unique_ptr<CheckpointStorage> CheckpointStorageFactory::create(CheckpointReplicationType type, const CheckpointConfig & config)
{
/// Only support LocalFileSystem storage
static auto logger = getLogger("CheckpointStorageFactory");
Expand Down
2 changes: 1 addition & 1 deletion src/Checkpoint/CheckpointStorageFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@ namespace DB
{
struct CheckpointStorageFactory final
{
static std::unique_ptr<CheckpointStorage> create(CheckpointStorageType type, const CheckpointConfig & config);
static std::unique_ptr<CheckpointStorage> create(CheckpointReplicationType type, const CheckpointConfig & config);
};
}
12 changes: 0 additions & 12 deletions src/Checkpoint/CheckpointStorageType.h

This file was deleted.

8 changes: 4 additions & 4 deletions src/Checkpoint/DiskCheckpointStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ int64_t DiskCheckpointStorage::getLastCommittedEpoch(CheckpointContextPtr ckpt_c
return 0;

/// We'd like to touch the ckpt dir to extend its TTL
if (storageType() == CheckpointStorageType::LocalFileSystem)
if (replicationType() == CheckpointReplicationType::LocalFileSystem)
disk->setLastModified(ckpt_dir, Poco::Timestamp::fromEpochTime(time(nullptr)));

/// 1) Loop the directory to figure out largest committed checkpoint epoch
Expand Down Expand Up @@ -194,12 +194,12 @@ void DiskCheckpointStorage::removeExpired(
uint64_t ttl_secs, bool delete_marked, std::function<bool(const std::string &)> delete_precheck) const
{
/// FIXME: So far only local file system supports this feature
if (storageType() != CheckpointStorageType::LocalFileSystem)
if (replicationType() != CheckpointReplicationType::LocalFileSystem)
return;

auto disk = getDisk(/*extra_ckpt_ctx=*/nullptr);

LOG_INFO(logger, "Scanning delete-marked and expired checkpoints in {}, ttl={}s", storageType(), ttl_secs);
LOG_INFO(logger, "Scanning delete-marked and expired checkpoints in {}, ttl={}s", replicationType(), ttl_secs);

iterateDirectory(disk, "", [&](const fs::path & path, bool is_dir) {
if (!is_dir)
Expand Down Expand Up @@ -231,7 +231,7 @@ void DiskCheckpointStorage::removeExpired(
void DiskCheckpointStorage::removeOldCheckpoints() const
{
/// FIXME: So far only local file system supports this feature
if (storageType() != CheckpointStorageType::LocalFileSystem)
if (replicationType() != CheckpointReplicationType::LocalFileSystem)
return;

auto disk = getDisk(/*extra_ckpt_ctx=*/nullptr);
Expand Down
2 changes: 1 addition & 1 deletion src/Checkpoint/LocalFileSystemCheckpointStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class LocalFileSystemCheckpointStorage final : public DiskCheckpointStorage
local_disk->createDirectories("");
}

CheckpointStorageType storageType() const override { return CheckpointStorageType::LocalFileSystem; }
CheckpointReplicationType replicationType() const override { return CheckpointReplicationType::LocalFileSystem; }

bool isLocal() const override { return true; }

Expand Down
2 changes: 1 addition & 1 deletion src/Interpreters/Streaming/InterpreterUnsubscribeQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ BlockIO InterpreterUnsubscribeQuery::execute()
{
/// Unsubscribe query only supports with local filesystem checkpoint storage
auto & coordinator = Globals::getCheckpointCoordinator();
auto ckpt_ctx = std::make_shared<CheckpointContext>(query_id, coordinator.getCheckpointStorage(CheckpointStorageType::LocalFileSystem), &coordinator);
auto ckpt_ctx = std::make_shared<CheckpointContext>(query_id, coordinator.getCheckpointStorage(CheckpointReplicationType::LocalFileSystem), &coordinator);
coordinator.removeCheckpoint(std::move(ckpt_ctx));
}

Expand Down
2 changes: 1 addition & 1 deletion src/Interpreters/executeQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ std::pair<String, ASTPtr> handleRecoverQuery(const Streaming::ASTRecoverQuery *
auto query_id = literal->value.get<String>();
auto & coordinator = Globals::getCheckpointCoordinator();
/// Subscribe/Recover query only supports with local filesystem checkpoint storage
auto ckpt_ctx = std::make_shared<CheckpointContext>(query_id, coordinator.getCheckpointStorage(CheckpointStorageType::LocalFileSystem), &coordinator);
auto ckpt_ctx = std::make_shared<CheckpointContext>(query_id, coordinator.getCheckpointStorage(CheckpointReplicationType::LocalFileSystem), &coordinator);
auto query = coordinator.getQuery(std::move(ckpt_ctx));

const char * begin = query.data();
Expand Down
4 changes: 2 additions & 2 deletions src/Processors/Executors/PipelineExecutor_Checkpoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ void PipelineExecutor::registerCheckpoint(ExecuteMode exec_mode_, CheckpointCont
auto & ckpt_coordinator = Globals::getCheckpointCoordinator();
ckpt_ctx = std::make_shared<CheckpointContext>(
process_list_element->getClientInfo().current_query_id,
ckpt_coordinator.getCheckpointStorage(CheckpointStorageType::LocalFileSystem),
ckpt_coordinator.getCheckpointStorage(CheckpointReplicationType::LocalFileSystem),
&ckpt_coordinator);
}

Expand Down Expand Up @@ -136,7 +136,7 @@ std::pair<Int64, CheckpointSettingsPtr> PipelineExecutor::recover(CheckpointCont
ckpt_settings = CheckpointSettings::parse(process_list_element->getContext()->getSettingsRef().checkpoint_settings);

/// Recover query states from checkpoint
const auto & ckpt_storage = ckpt_coordinator.getCheckpointStorage(ckpt_settings->storage_type);
const auto & ckpt_storage = ckpt_coordinator.getCheckpointStorage(ckpt_settings->replication_type);
auto recovered_epoch = ckpt_storage.getLastCommittedEpoch(ckpt_ctx);
if (recovered_epoch > 0)
graph->recover(ckpt_ctx->cloneWithEpoch(recovered_epoch));
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/MatView/StorageMaterializedView.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#pragma once

#include <Checkpoint/CheckpointRequestMetrics.h>
#include <Checkpoint/CheckpointStorageType.h>
#include <Checkpoint/CheckpointReplicationType.h>
#include <Checkpoint/LogStoreCheckpointContext.h>
#include <Cluster/Common/ExponentialBackoff.h>
#include <Cluster/Common/NodeID.h>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ void StorageMaterializedView::prepareCheckpoint()

if (!curr_ckpt_ctx)
{
const auto & ckpt_storage = ckpt_coordinator.getCheckpointStorage(CheckpointStorageType::LocalFileSystem);
const auto & ckpt_storage = ckpt_coordinator.getCheckpointStorage(CheckpointReplicationType::LocalFileSystem);
auto ckpt_ctx = std::make_shared<CheckpointContext>(getInnerQueryId(), ckpt_storage, &ckpt_coordinator);
curr_ckpt_ctx = std::move(ckpt_ctx);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ steps:
<<else>>
node: [p2]
<<end>>
query: select count(), avg(i), sum(i), min(i), max(i), min(s), max(s) from changelog_stream_aggr emit periodic 1s
query: select count(), avg(i), sum(i), min(i), max(i), min(s), max(s) from changelog_stream_aggr emit periodic 1s settings seek_to='earliest'
schema:
- name: count()
type: uint64
Expand Down
Loading