Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support pause/unpause materialized view #699

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions src/Checkpoint/CheckpointContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,16 @@
#include <cassert>
#include <filesystem>

#include <Checkpoint/CheckpointRequestContext.h>

namespace DB
{
class CheckpointCoordinator;

struct CheckpointContext
{
CheckpointContext(int64_t epoch_, std::string_view qid_, CheckpointCoordinator * coordinator_)
: epoch(epoch_), qid(qid_), coordinator(coordinator_)
CheckpointContext(int64_t epoch_, std::string_view qid_, CheckpointCoordinator * coordinator_, CheckpointRequestContextPtr request_ctx_ = nullptr)
: epoch(epoch_), qid(qid_), coordinator(coordinator_), request_ctx(request_ctx_)
{
assert(epoch >= 0);
assert(!qid.empty());
Expand All @@ -26,6 +28,9 @@ struct CheckpointContext

CheckpointCoordinator * coordinator = nullptr;

/// If not null, it is a checkpoint request, otherwise it is used to register or recover the query
CheckpointRequestContextPtr request_ctx;

std::filesystem::path checkpointDir(const std::filesystem::path & base_dir) const
{
/// processor checkpoint epoch starts with 1
Expand Down
95 changes: 53 additions & 42 deletions src/Checkpoint/CheckpointCoordinator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,19 @@ struct CheckpointableQuery
ack_node_ids = ack_node_ids_readonly;
}

/// Build the checkpoint context used to request the next checkpoint of this query.
///
/// When no epoch is in progress (`current_epoch == 0`), a new epoch `last_epoch + 1` is opened
/// and the query's executor is attached to the request. Otherwise the in-progress epoch is
/// reused and the request carries no executor.
///
/// \param qid       Query id stored in the returned context.
/// \param callback  Optional callable stored in the request context; may be empty.
/// \return A new context for `current_epoch`; its request context is always set.
CheckpointContextPtr prepareNextCheckpointContext(const String & qid, std::function<void(CheckpointContextPtr)> && callback = {})
{
    auto request = std::make_shared<CheckpointRequestContext>();
    request->callback = std::move(callback);

    const bool no_epoch_in_progress = (current_epoch == 0);
    if (no_epoch_in_progress)
    {
        /// Only a request that opens a new epoch gets the executor attached
        current_epoch = last_epoch + 1;
        request->executor = executor;
    }

    auto * coordinator = &CheckpointCoordinator::instance(nullptr);
    return std::make_shared<CheckpointContext>(current_epoch, qid, coordinator, request);
}

String ackNodeDescriptions() const
{
auto exec = executor.lock();
Expand Down Expand Up @@ -248,7 +261,6 @@ void CheckpointCoordinator::removeCheckpoint(const String & qid)

void CheckpointCoordinator::triggerCheckpoint(const String & qid, UInt64 checkpoint_interval)
{
std::weak_ptr<PipelineExecutor> executor;
CheckpointContextPtr ckpt_ctx;
{
std::scoped_lock lock(mutex);
Expand All @@ -258,20 +270,10 @@ void CheckpointCoordinator::triggerCheckpoint(const String & qid, UInt64 checkpo
/// Already canceled the query
return;

if (iter->second->current_epoch != 0)
{
ckpt_ctx = std::make_shared<CheckpointContext>(iter->second->current_epoch, qid, this);
}
else
{
/// Notify the source processor to start checkpoint in a new epoch
executor = iter->second->executor;
ckpt_ctx = std::make_shared<CheckpointContext>(iter->second->last_epoch + 1, qid, this);
iter->second->current_epoch = ckpt_ctx->epoch; /// new epoch in process
}
ckpt_ctx = iter->second->prepareNextCheckpointContext(qid);
}

if (doTriggerCheckpoint(executor, std::move(ckpt_ctx)))
if (doTriggerCheckpoint(std::move(ckpt_ctx)) != TriggeredResult::Failed)
timer_service.runAfter(
checkpoint_interval, [query_id = qid, interval = checkpoint_interval, this]() { triggerCheckpoint(query_id, interval); });
else
Expand Down Expand Up @@ -325,7 +327,12 @@ void CheckpointCoordinator::checkpointed(VersionType /*version*/, UInt32 node_id

/// 3) Delete ckpts for prev epochs
if (ckpt_epoch_done)
ckpt->remove(std::move(ckpt_ctx));
{
ckpt->remove(ckpt_ctx);

if (ckpt_ctx->request_ctx && ckpt_ctx->request_ctx->callback)
ckpt_ctx->request_ctx->callback(ckpt_ctx);
}
}

String CheckpointCoordinator::getQuery(const String & qid)
Expand Down Expand Up @@ -384,13 +391,14 @@ void CheckpointCoordinator::removeExpiredCheckpoints(bool delete_marked)
timer_service.runAfter(last_access_check_interval, [this]() { removeExpiredCheckpoints(false); });
}

bool CheckpointCoordinator::doTriggerCheckpoint(const std::weak_ptr<PipelineExecutor> & executor, CheckpointContextPtr ckpt_ctx)
CheckpointCoordinator::TriggeredResult CheckpointCoordinator::doTriggerCheckpoint(CheckpointContextPtr ckpt_ctx)
{
/// Create the directory beforehand, so that the other processors don't need to
/// check for and create the target epoch ckpt directory.
try
{
auto exec = executor.lock();
assert(ckpt_ctx->request_ctx);
auto exec = ckpt_ctx->request_ctx->executor.lock();
if (!exec)
{
LOG_ERROR(
Expand All @@ -402,7 +410,7 @@ bool CheckpointCoordinator::doTriggerCheckpoint(const std::weak_ptr<PipelineExec

/// Not reset current_epoch here, since the query may be still in progress
/// resetCurrentCheckpointEpoch(ckpt_ctx->qid);
return false;
return TriggeredResult::Failed;
}

if (!exec->hasProcessedNewDataSinceLastCheckpoint())
Expand All @@ -414,15 +422,19 @@ bool CheckpointCoordinator::doTriggerCheckpoint(const std::weak_ptr<PipelineExec
ckpt_ctx->epoch);

resetCurrentCheckpointEpoch(ckpt_ctx->qid);
return false;

if (ckpt_ctx->request_ctx && ckpt_ctx->request_ctx->callback)
ckpt_ctx->request_ctx->callback(ckpt_ctx);

return TriggeredResult::Skipped;
}

preCheckpoint(ckpt_ctx);

exec->triggerCheckpoint(ckpt_ctx);

LOG_INFO(logger, "Triggered checkpointing state for query={} epoch={}", ckpt_ctx->qid, ckpt_ctx->epoch);
return true;
return TriggeredResult::Success;
}
catch (const Exception & e)
{
Expand All @@ -438,49 +450,32 @@ bool CheckpointCoordinator::doTriggerCheckpoint(const std::weak_ptr<PipelineExec
}

resetCurrentCheckpointEpoch(ckpt_ctx->qid);
return false;
return TriggeredResult::Failed;
}

void CheckpointCoordinator::triggerLastCheckpointAndFlush()
{
LOG_INFO(logger, "Trigger last checkpoint and flush begin");
Stopwatch stopwatch;

std::vector<std::weak_ptr<PipelineExecutor>> executors;
std::vector<CheckpointContextPtr> ckpt_ctxes;

{
std::scoped_lock lock(mutex);

executors.reserve(queries.size());
ckpt_ctxes.reserve(queries.size());
for (auto & [qid, query] : queries)
{
if (query->current_epoch != 0)
{
executors.emplace_back();
ckpt_ctxes.emplace_back(std::make_shared<CheckpointContext>(query->current_epoch, qid, this));
}
else
{
/// Notify the source processor to start checkpoint in a new epoch
executors.emplace_back(query->executor);
ckpt_ctxes.emplace_back(std::make_shared<CheckpointContext>(query->last_epoch + 1, qid, this));
query->current_epoch = ckpt_ctxes.back()->epoch; /// new epoch in process
}
}
ckpt_ctxes.emplace_back(query->prepareNextCheckpointContext(qid));
}

assert(executors.size() == ckpt_ctxes.size());

/// <query_id, triggered_epoch>
std::vector<std::pair<std::string_view, Int64>> triggered_queries;
triggered_queries.reserve(executors.size());
for (size_t i = 0; i < executors.size(); ++i)
triggered_queries.reserve(ckpt_ctxes.size());
for (auto & ckpt_ctx : ckpt_ctxes)
{
/// FIXME: So far we've only enforced a simple flush strategy by triggering new checkpoint once (regardless of success)
if (doTriggerCheckpoint(executors[i], ckpt_ctxes[i]))
triggered_queries.emplace_back(ckpt_ctxes[i]->qid, ckpt_ctxes[i]->epoch);
if (doTriggerCheckpoint(ckpt_ctx) == TriggeredResult::Success)
triggered_queries.emplace_back(ckpt_ctx->qid, ckpt_ctx->epoch);
}

// Wait for last checkpoint flush completed
Expand Down Expand Up @@ -523,4 +518,20 @@ void CheckpointCoordinator::resetCurrentCheckpointEpoch(const String & qid)
iter->second->current_epoch = 0;
}

/// Trigger a checkpoint for the query `qid` on demand.
///
/// The request context is prepared under `mutex`; the actual trigger runs after the lock is released.
///
/// \param qid       Id of the query to checkpoint.
/// \param callback  Callable forwarded into the request context of the prepared checkpoint.
/// \return `TriggeredResult::Failed` if `qid` is not registered, otherwise whatever
///         `doTriggerCheckpoint` reports for the prepared context.
CheckpointCoordinator::TriggeredResult CheckpointCoordinator::triggerCheckpointForQuery(const String & qid, std::function<void(CheckpointContextPtr)> && callback)
{
    CheckpointContextPtr request;
    {
        std::scoped_lock guard(mutex);

        if (auto found = queries.find(qid); found != queries.end())
            request = found->second->prepareNextCheckpointContext(qid, std::move(callback));
    }

    /// Already canceled the query
    if (!request)
        return TriggeredResult::Failed;

    return doTriggerCheckpoint(std::move(request));
}
}
12 changes: 11 additions & 1 deletion src/Checkpoint/CheckpointCoordinator.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,21 @@ class CheckpointCoordinator final

void triggerLastCheckpointAndFlush();

/// Outcome of an attempt to trigger a checkpoint. Numeric values are 0, 1, 2 in declaration order.
enum class TriggeredResult : uint8_t
{
    /// The checkpoint was handed to the executor
    Success,
    /// Nothing was triggered because no new data was processed since the last checkpoint
    Skipped,
    /// The checkpoint could not be triggered (e.g. unknown query, expired executor, or an error)
    Failed,
};

/// \brief Trigger checkpoint for a query, and call the callback when the checkpoint is done.
TriggeredResult triggerCheckpointForQuery(const String & qid, std::function<void(CheckpointContextPtr)> && callback);

private:
void triggerCheckpoint(const String & qid, UInt64 checkpoint_interval);
void removeExpiredCheckpoints(bool delete_marked);

bool doTriggerCheckpoint(const std::weak_ptr<PipelineExecutor> & executor, CheckpointContextPtr ckpt_ctx);
TriggeredResult doTriggerCheckpoint(CheckpointContextPtr ckpt_ctx);

void resetCurrentCheckpointEpoch(const String & qid);

Expand Down
18 changes: 18 additions & 0 deletions src/Checkpoint/CheckpointRequestContext.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#pragma once

#include <functional>
#include <memory>

#include <Checkpoint/CheckpointContextFwd.h>

namespace DB
{
class PipelineExecutor;

/// Per-request data carried by a CheckpointContext when that context represents a checkpoint request.
struct CheckpointRequestContext
{
/// Executor to be checkpointed.
/// Held weakly: it must be lock()-ed before use and may be empty or already expired.
std::weak_ptr<PipelineExecutor> executor;

/// Callback to be called after checkpointed.
/// May be empty; callers are expected to test it before invoking.
std::function<void(CheckpointContextPtr)> callback;
};
using CheckpointRequestContextPtr = std::shared_ptr<CheckpointRequestContext>;
}
12 changes: 12 additions & 0 deletions src/Interpreters/InterpreterFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,15 @@
#include <Parsers/ASTShowCreateFormatSchemaQuery.h>
#include <Parsers/ASTShowFormatSchemasQuery.h>
#include <Parsers/Streaming/ASTUnsubscribeQuery.h>
#include <Parsers/Streaming/ASTPauseMaterializedViewQuery.h>
#include <Parsers/Streaming/ASTUnpauseMaterializedViewQuery.h>
#include <Interpreters/InterpreterCreateFormatSchemaQuery.h>
#include <Interpreters/InterpreterDropFormatSchemaQuery.h>
#include <Interpreters/InterpreterShowCreateFormatSchemaQuery.h>
#include <Interpreters/InterpreterShowFormatSchemasQuery.h>
#include <Interpreters/Streaming/InterpreterUnsubscribeQuery.h>
#include <Interpreters/Streaming/InterpreterPauseMaterializedViewQuery.h>
#include <Interpreters/Streaming/InterpreterUnpauseMaterializedViewQuery.h>
/// proton : ends

namespace ProfileEvents
Expand Down Expand Up @@ -330,6 +334,14 @@ std::unique_ptr<IInterpreter> InterpreterFactory::get(ASTPtr & query, ContextMut
{
return std::make_unique<InterpreterShowCreateFormatSchemaQuery>(query, context);
}
else if (query->as<Streaming::ASTPauseMaterializedViewQuery>())
{
return std::make_unique<Streaming::InterpreterPauseMaterializedViewQuery>(query, context);
}
else if (query->as<Streaming::ASTUnpauseMaterializedViewQuery>())
{
return std::make_unique<Streaming::InterpreterUnpauseMaterializedViewQuery>(query, context);
}
// proton: ends
else if (query->as<ASTDropFunctionQuery>())
{
Expand Down
Loading