Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Sort operator auto spill when query memory usage exceeds threshold #7900

Merged
merged 18 commits into from
Aug 11, 2023
70 changes: 70 additions & 0 deletions dbms/src/Core/AutoSpillTrigger.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Common/Exception.h>
#include <Common/MemoryTracker.h>
#include <Core/QueryOperatorSpillContexts.h>

namespace DB
{
/// Compares the current usage reported by a memory tracker against a trigger threshold
/// derived from the tracker's limit and, when the usage is above it, asks the query's
/// operator spill contexts to release memory down to a target threshold.
class AutoSpillTrigger
{
public:
    /// @param memory_tracker_ tracker providing the limit and the current usage; its limit must be > 0.
    /// @param query_operator_spill_contexts_ spill contexts that receive the spill request.
    /// @param auto_memory_revoke_trigger_threshold fraction of the limit above which a spill is requested, must be > 0.
    /// @param auto_memory_revoke_target_threshold fraction of the limit the usage should be brought back to, must be >= 0.
    ///
    /// Both RUNTIME_CHECK_MSG conditions must hold. If the trigger fraction is smaller than the
    /// target fraction, a warning is logged and the defaults 0.7 (trigger) / 0.5 (target) are used.
    AutoSpillTrigger(
        const MemoryTrackerPtr & memory_tracker_,
        const std::shared_ptr<QueryOperatorSpillContexts> & query_operator_spill_contexts_,
        float auto_memory_revoke_trigger_threshold,
        float auto_memory_revoke_target_threshold)
        : memory_tracker(memory_tracker_)
        , query_operator_spill_contexts(query_operator_spill_contexts_)
    {
        /// read the limit once so that the check and both thresholds are based on the same value
        const auto memory_limit = memory_tracker->getLimit();
        RUNTIME_CHECK_MSG(memory_limit > 0, "Memory limit must be set for auto spill trigger");
        RUNTIME_CHECK_MSG(
            auto_memory_revoke_target_threshold >= 0 && auto_memory_revoke_trigger_threshold > 0,
            "Invalid auto trigger threshold {} or invalid auto target threshold {}",
            auto_memory_revoke_trigger_threshold,
            auto_memory_revoke_target_threshold);
        if (unlikely(auto_memory_revoke_trigger_threshold < auto_memory_revoke_target_threshold))
        {
            LOG_WARNING(
                query_operator_spill_contexts->getLogger(),
                "Auto trigger threshold {} less than auto target threshold {}, not valid, use default value instead",
                auto_memory_revoke_trigger_threshold,
                auto_memory_revoke_target_threshold);
            /// invalid value, set the value to default value
            auto_memory_revoke_trigger_threshold = 0.7;
            auto_memory_revoke_target_threshold = 0.5;
        }
        /// turn the fractions into absolute values relative to the memory limit
        trigger_threshold = static_cast<Int64>(memory_limit * auto_memory_revoke_trigger_threshold);
        target_threshold = static_cast<Int64>(memory_limit * auto_memory_revoke_target_threshold);
    }

    /// Reads the current usage from the memory tracker; if it is above `trigger_threshold`,
    /// requests the spill contexts to release the amount by which the usage exceeds
    /// `target_threshold`. Does nothing otherwise.
    void triggerAutoSpill()
    {
        auto current_memory_usage = memory_tracker->get();
        if (current_memory_usage > trigger_threshold)
        {
            query_operator_spill_contexts->triggerAutoSpill(current_memory_usage - target_threshold);
        }
    }

private:
    MemoryTrackerPtr memory_tracker;
    std::shared_ptr<QueryOperatorSpillContexts> query_operator_spill_contexts;
    /// usage above this value triggers an auto spill request
    Int64 trigger_threshold;
    /// usage the auto spill request tries to get back to
    Int64 target_threshold;
};
} // namespace DB
30 changes: 21 additions & 9 deletions dbms/src/Core/OperatorSpillContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,34 +20,39 @@

namespace DB
{
enum class SpillStatus
enum class AutoSpillStatus
{
NOT_SPILL,
SPILL,
/// auto spill is not needed or current auto spill already finished
NO_NEED_AUTO_SPILL,
/// auto spill is needed
NEED_AUTO_SPILL,
};

class OperatorSpillContext
{
protected:
UInt64 operator_spill_threshold;
std::atomic<bool> in_spillable_stage{true};
std::atomic<SpillStatus> spill_status{SpillStatus::NOT_SPILL};
std::atomic<bool> is_spilled{false};
bool enable_spill = true;
String op_name;
LoggerPtr log;

virtual Int64 getTotalRevocableMemoryImpl() = 0;

public:
/// minimum revocable operator memories that will trigger a spill
const static Int64 MIN_SPILL_THRESHOLD = 10ULL * 1024 * 1024;
OperatorSpillContext(UInt64 operator_spill_threshold_, const String op_name_, const LoggerPtr & log_)
: operator_spill_threshold(operator_spill_threshold_)
, op_name(op_name_)
, log(log_)
{}
virtual ~OperatorSpillContext() = default;
bool isSpillEnabled() const { return enable_spill && operator_spill_threshold > 0; }
bool isSpillEnabled() const { return enable_spill && (supportAutoTriggerSpill() || operator_spill_threshold > 0); }
void disableSpill() { enable_spill = false; }
void finishSpillableStage() { in_spillable_stage = false; }
bool spillableStageFinished() const { return !in_spillable_stage; }
Int64 getTotalRevocableMemory()
{
if (in_spillable_stage)
Expand All @@ -56,14 +61,21 @@ class OperatorSpillContext
return 0;
}
UInt64 getOperatorSpillThreshold() const { return operator_spill_threshold; }
void markSpill()
void markSpilled()
{
SpillStatus init_value = SpillStatus::NOT_SPILL;
if (spill_status.compare_exchange_strong(init_value, SpillStatus::SPILL, std::memory_order_relaxed))
bool init_value = false;
if (is_spilled.compare_exchange_strong(init_value, true, std::memory_order_relaxed))
{
LOG_INFO(log, "Begin spill in {}", op_name);
}
}
bool isSpilled() const { return spill_status != SpillStatus::NOT_SPILL; }
bool isSpilled() const { return is_spilled; }
/// auto trigger spill means the operator will spill automatically under the constraint of the query/global level memory threshold,
/// so the user does not need to set operator_spill_threshold explicitly
virtual bool supportAutoTriggerSpill() const { return false; }
virtual Int64 triggerSpill(Int64 expected_released_memories) = 0;
};

/// Shared-ownership handle to an OperatorSpillContext.
using OperatorSpillContextPtr = std::shared_ptr<OperatorSpillContext>;
/// Callback that receives an operator spill context handle; judging by the name it is used
/// to register the context somewhere — NOTE(review): confirm against the callers.
using RegisterOperatorSpillContext = std::function<void(const OperatorSpillContextPtr & ptr)>;
} // namespace DB
92 changes: 92 additions & 0 deletions dbms/src/Core/QueryOperatorSpillContexts.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Core/TaskOperatorSpillContexts.h>
#include <Flash/Mpp/MPPTaskId.h>

namespace DB
{
/// Holds the TaskOperatorSpillContexts registered for one query and distributes an
/// auto spill request over them, largest revocable memory first.
class QueryOperatorSpillContexts
{
public:
    /// Builds an empty registry; the logger is obtained from the string form of `query_id`.
    explicit QueryOperatorSpillContexts(const MPPQueryId & query_id)
        : log(Logger::get(query_id.toString()))
    {}

    /// Asks the registered tasks to spill until `expected_released_memories` drops to <= 0.
    ///
    /// Finished tasks are removed from the registry on the way. The remaining tasks are visited
    /// in descending order of their revocable memory, and the visit stops at the first task whose
    /// revocable memory is below OperatorSpillContext::MIN_SPILL_THRESHOLD.
    ///
    /// @return `expected_released_memories` as updated by the visited tasks; the input value is
    /// returned untouched when another caller currently holds the mutex.
    Int64 triggerAutoSpill(Int64 expected_released_memories)
    {
        /// use mutex to avoid concurrent check, todo maybe need add minimum check interval(like 100ms) here?
        std::unique_lock guard(mutex, std::try_to_lock);
        if (!guard.owns_lock())
            return expected_released_memories;

        if (unlikely(!first_check))
        {
            /// log only the first time a check actually runs
            first_check = true;
            LOG_INFO(log, "Query memory usage exceeded threshold, trigger auto spill check");
        }

        /// <revocable_memories, task_operator_spill_contexts>
        using Candidate = std::pair<Int64, TaskOperatorSpillContexts *>;
        std::vector<Candidate> candidates;
        candidates.reserve(task_operator_spill_contexts_list.size());

        auto cursor = task_operator_spill_contexts_list.begin();
        while (cursor != task_operator_spill_contexts_list.end())
        {
            if ((*cursor)->isFinished())
            {
                /// drop finished tasks, erase() yields the next element
                cursor = task_operator_spill_contexts_list.erase(cursor);
                continue;
            }
            candidates.emplace_back((*cursor)->totalRevocableMemories(), cursor->get());
            ++cursor;
        }

        /// biggest revocable memory first
        std::sort(candidates.begin(), candidates.end(), [](const Candidate & lhs, const Candidate & rhs) {
            return lhs.first > rhs.first;
        });

        for (const auto & [revocable_memory, task_contexts] : candidates)
        {
            /// sorted descending, so every following task is below the threshold as well
            if (revocable_memory < OperatorSpillContext::MIN_SPILL_THRESHOLD)
                break;
            expected_released_memories = task_contexts->triggerAutoSpill(expected_released_memories);
            if (expected_released_memories <= 0)
                break;
        }
        return expected_released_memories;
    }

    /// Appends `task_operator_spill_contexts` to the registry (blocks until the mutex is acquired).
    void registerTaskOperatorSpillContexts(
        const std::shared_ptr<TaskOperatorSpillContexts> & task_operator_spill_contexts)
    {
        std::unique_lock guard(mutex);
        task_operator_spill_contexts_list.push_back(task_operator_spill_contexts);
    }

    /// used for test: number of task level contexts currently kept in the registry
    size_t getTaskOperatorSpillContextsCount() const
    {
        std::unique_lock guard(mutex);
        return task_operator_spill_contexts_list.size();
    }

    const LoggerPtr & getLogger() const { return log; }

private:
    std::list<std::shared_ptr<TaskOperatorSpillContexts>> task_operator_spill_contexts_list;
    /// becomes true once the first auto spill check has been logged
    bool first_check = false;
    LoggerPtr log;
    mutable std::mutex mutex;
};

} // namespace DB
111 changes: 111 additions & 0 deletions dbms/src/Core/TaskOperatorSpillContexts.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Core/OperatorSpillContext.h>

namespace DB
{
/// Collects the operator spill contexts of one task that support auto triggered spill and
/// forwards spill requests to them.
///
/// Newly registered contexts are first queued in `additional_operator_spill_contexts` (guarded by
/// `mutex`) and are moved into `operator_spill_contexts` the next time the latter is about to be used.
class TaskOperatorSpillContexts
{
public:
    /// Asks the operator spill contexts, in list order, to release memory until
    /// `expected_released_memories` drops to <= 0.
    ///
    /// Contexts whose spillable stage has finished are skipped and, if any was seen,
    /// all such contexts are removed from the list afterwards.
    ///
    /// @return `expected_released_memories` as updated by the contexts' triggerSpill calls;
    /// the input value is returned untouched when the task is already finished.
    Int64 triggerAutoSpill(Int64 expected_released_memories)
    {
        if (isFinished())
            return expected_released_memories;
        appendAdditionalOperatorSpillContexts();
        bool has_finished_operator_spill_contexts = false;
        for (const auto & operator_spill_context : operator_spill_contexts)
        {
            /// registerOperatorSpillContext only accepts contexts that support auto trigger spill
            assert(operator_spill_context->supportAutoTriggerSpill());
            if (operator_spill_context->spillableStageFinished())
            {
                has_finished_operator_spill_contexts = true;
                continue;
            }
            expected_released_memories = operator_spill_context->triggerSpill(expected_released_memories);
            if (expected_released_memories <= 0)
                break;
        }
        if (has_finished_operator_spill_contexts)
        {
            /// clean finished spill context; list::remove_if unlinks the nodes in place
            operator_spill_contexts.remove_if(
                [](const auto & context) { return context->spillableStageFinished(); });
        }
        return expected_released_memories;
    }

    /// Queues `operator_spill_context` for later use if it supports auto trigger spill;
    /// other contexts are ignored.
    void registerOperatorSpillContext(const OperatorSpillContextPtr & operator_spill_context)
    {
        if (operator_spill_context->supportAutoTriggerSpill())
        {
            std::unique_lock lock(mutex);
            additional_operator_spill_contexts.push_back(operator_spill_context);
            has_additional_operator_spill_contexts = true;
        }
    }

    /// for tests: number of active contexts after the queued ones have been moved over
    size_t operatorSpillContextCount()
    {
        appendAdditionalOperatorSpillContexts();
        return operator_spill_contexts.size();
    }

    /// for tests: number of contexts still waiting in the queue
    size_t additionalOperatorSpillContextCount() const
    {
        std::unique_lock lock(mutex);
        return additional_operator_spill_contexts.size();
    }

    /// Sum of getTotalRevocableMemory() over all contexts, or 0 when the task is finished.
    Int64 totalRevocableMemories()
    {
        if (unlikely(isFinished()))
            return 0;
        appendAdditionalOperatorSpillContexts();
        Int64 ret = 0;
        for (const auto & operator_spill_context : operator_spill_contexts)
            ret += operator_spill_context->getTotalRevocableMemory();
        return ret;
    }

    bool isFinished() const { return is_task_finished; }

    void finish() { is_task_finished = true; }

private:
    /// Moves all queued contexts to the end of `operator_spill_contexts`.
    /// The atomic flag is checked first so the mutex is only taken when something was queued.
    void appendAdditionalOperatorSpillContexts()
    {
        if (has_additional_operator_spill_contexts)
        {
            std::unique_lock lock(mutex);
            /// whole-list splice leaves additional_operator_spill_contexts empty
            operator_spill_contexts.splice(operator_spill_contexts.end(), additional_operator_spill_contexts);
            has_additional_operator_spill_contexts = false;
        }
    }

    /// operator_spill_contexts is read and modified without holding `mutex`.
    /// NOTE(review): presumably its users are serialized by the caller (QueryOperatorSpillContexts
    /// calls triggerAutoSpill/totalRevocableMemories while holding its own lock) — confirm for other callers.
    std::list<OperatorSpillContextPtr> operator_spill_contexts;
    mutable std::mutex mutex;
    /// access to additional_operator_spill_contexts need acquire lock first
    std::list<OperatorSpillContextPtr> additional_operator_spill_contexts;
    std::atomic<bool> has_additional_operator_spill_contexts{false};
    std::atomic<bool> is_task_finished{false};
};

} // namespace DB