Skip to content

Commit

Permalink
Auto spill enhance (#8033)
Browse files Browse the repository at this point in the history
ref #7738
  • Loading branch information
windtalker committed Aug 29, 2023
1 parent 0a96293 commit aed7d84
Show file tree
Hide file tree
Showing 22 changed files with 180 additions and 41 deletions.
11 changes: 11 additions & 0 deletions dbms/src/Common/Arena.h
Expand Up @@ -15,6 +15,7 @@
#pragma once

#include <Common/Allocator.h>
#include <Common/Exception.h>
#include <Common/ProfileEvents.h>
#include <Core/Defines.h>
#include <common/likely.h>
Expand All @@ -26,6 +27,7 @@

namespace DB
{
/// Callback consulted by Arena::addChunk() before a new chunk is allocated;
/// when it returns false, addChunk() throws ResizeException instead of growing.
using ResizeCallback = std::function<bool()>;
/** Memory pool to append something. For example, short strings.
* Usage scenario:
* - put a lot of strings inside pool, keep their addresses;
Expand Down Expand Up @@ -71,6 +73,8 @@ class Arena : private boost::noncopyable
Chunk * head;
size_t size_in_bytes;

ResizeCallback resize_callback;

/// Round `s` up to the nearest multiple of 4096.
static size_t roundUpToPageSize(size_t s)
{
    constexpr size_t page_size = 4096;
    /// page_size is a power of two, so masking off the low bits is the same as
    /// dividing by page_size and multiplying back.
    return (s + (page_size - 1)) & ~(page_size - 1);
}

/// If chunks size is less than 'linear_growth_threshold', then use exponential growth, otherwise - linear growth
Expand All @@ -93,6 +97,11 @@ class Arena : private boost::noncopyable
/// Add next contiguous chunk of memory with size not less than specified.
/// If a resize callback is installed it is consulted first; when it returns false
/// a ResizeException is thrown and no chunk is allocated.
void NO_INLINE addChunk(size_t min_size)
{
    /// The callback is only invoked when one has been set.
    const bool growth_rejected = resize_callback != nullptr && !resize_callback();
    if unlikely (growth_rejected)
        throw ResizeException("Error in arena resize");

    /// The new chunk becomes the head and links back to the previous head.
    Chunk * const new_head = new Chunk(nextSize(min_size), head);
    head = new_head;
    size_in_bytes += new_head->size();
}
Expand Down Expand Up @@ -148,6 +157,8 @@ class Arena : private boost::noncopyable
*/
void rollback(size_t size)
{
    /// Move the head chunk's current position back by `size` bytes.
    head->pos -= size;
}

/// Install the callback that addChunk() consults before allocating a new chunk.
void setResizeCallback(const ResizeCallback & resize_callback_)
{
    resize_callback = resize_callback_;
}

/** Begin or expand allocation of contiguous piece of memory.
* 'begin' - current begin of piece of memory, if it needs to be expanded, or nullptr, if it needs to be started.
* If there is no space in chunk to expand current piece of memory - then copy all piece to new chunk and change value of 'begin'.
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Common/ErrorCodes.cpp
Expand Up @@ -395,6 +395,7 @@ extern const int UNKNOWN_WINDOW_FUNCTION = 449;
extern const int UNSUPPORTED_URI_SCHEME = 450;
/// NOTE(review): UNACCEPTABLE_URL and TOO_MANY_REDIRECTS reuse the value 450 of
/// UNSUPPORTED_URI_SCHEME, so the three cannot be told apart by code — confirm this is intentional.
extern const int UNACCEPTABLE_URL = 450;
extern const int TOO_MANY_REDIRECTS = 450;
/// Code carried by ResizeException.
extern const int ERROR_DURING_HASH_TABLE_OR_ARENA_RESIZE = 451;

#if USE_QPL
extern const int QPL_INIT_JOB_FAILED = 453;
Expand Down
10 changes: 9 additions & 1 deletion dbms/src/Common/Exception.h
Expand Up @@ -33,10 +33,10 @@

namespace DB
{

/// Extern declarations of error-code constants; their values are defined elsewhere.
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
/// Passed to the base Exception by ResizeException.
extern const int ERROR_DURING_HASH_TABLE_OR_ARENA_RESIZE;
} // namespace ErrorCodes

class Exception : public Poco::Exception
Expand Down Expand Up @@ -102,6 +102,14 @@ class ErrnoException : public Exception
int saved_errno;
};

/// Exception that always carries the error code
/// ErrorCodes::ERROR_DURING_HASH_TABLE_OR_ARENA_RESIZE.
class ResizeException : public Exception
{
public:
    /// @param msg  human-readable description forwarded to the base Exception.
    explicit ResizeException(const std::string & msg)
        : Exception(msg, ErrorCodes::ERROR_DURING_HASH_TABLE_OR_ARENA_RESIZE)
    {
    }
};

using Exceptions = std::vector<std::exception_ptr>;

[[noreturn]] void throwFromErrno(const std::string & s, int code = 0, int e = errno);
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Common/HashTable/FixedHashTable.h
Expand Up @@ -500,6 +500,8 @@ class FixedHashTable

/// Size in bytes of a buffer holding NUM_CELLS cells.
size_t getBufferSizeInBytes() const
{
    return NUM_CELLS * sizeof(Cell);
}

/// The callback is ignored: the body is empty and nothing is stored.
/// Presumably present only so this table offers the same setResizeCallback
/// interface as the resizable tables — confirm against callers.
void setResizeCallback(const ResizeCallback &) {}

/// Number of cells in the buffer, i.e. NUM_CELLS.
size_t getBufferSizeInCells() const
{
    return NUM_CELLS;
}

/// Return offset for result in internal buffer.
Expand Down
16 changes: 16 additions & 0 deletions dbms/src/Common/HashTable/HashTable.h
Expand Up @@ -162,6 +162,7 @@ struct VoidMapped
}
};

/// Callback consulted at the start of HashTable::resize(); when it returns false,
/// resize() throws DB::ResizeException instead of growing the buffer.
using ResizeCallback = std::function<bool()>;
/** Compile-time interface for cell of the hash table.
* Different cell types are used to implement different hash tables.
* The cell must contain a key.
Expand Down Expand Up @@ -428,6 +429,7 @@ class HashTable
size_t m_size = 0; /// Amount of elements
Cell * buf; /// A piece of memory for all elements except the element with zero key.
Grower grower;
ResizeCallback resize_callback;

#ifdef DBMS_HASH_MAP_COUNT_COLLISIONS
mutable size_t collisions = 0;
Expand Down Expand Up @@ -480,6 +482,11 @@ class HashTable
/// Increase the size of the buffer.
void resize(size_t for_num_elems = 0, size_t for_buf_size = 0)
{
if (resize_callback != nullptr)
{
if unlikely (!resize_callback())
throw DB::ResizeException("Error in hash table resize");
}
#ifdef DBMS_HASH_MAP_DEBUG_RESIZES
Stopwatch watch;
#endif
Expand Down Expand Up @@ -736,6 +743,8 @@ class HashTable
free();
}

/// Install the callback that resize() consults before growing the buffer.
void setResizeCallback(const ResizeCallback & resize_callback_)
{
    resize_callback = resize_callback_;
}

HashTable & operator=(HashTable && rhs)
{
destroyElements();
Expand Down Expand Up @@ -1398,6 +1407,7 @@ class HashTableWithLock
return hash_table.has(x, hash_value);
}
/// Forward to the wrapped hash table.
size_t getBufferSizeInBytes() const
{
    return hash_table.getBufferSizeInBytes();
}
/// Hand the resize callback over to the wrapped hash table.
void setResizeCallback(const ResizeCallback & resize_callback)
{
    hash_table.setResizeCallback(resize_callback);
}
/// Forward to the wrapped hash table.
size_t size() const
{
    return hash_table.size();
}

private:
Expand Down Expand Up @@ -1576,6 +1586,12 @@ class ConcurrentHashTable
return ret;
}

/// Propagate the resize callback to every segment table that currently exists.
void setResizeCallback(const ResizeCallback & resize_callback)
{
    for (auto & segment : segments)
    {
        /// A segment slot can be empty (getSegmentBufferSizeInBytes() guards against
        /// that as well), so skip null entries instead of dereferencing them.
        if (segment)
            segment->setResizeCallback(resize_callback);
    }
}

size_t getSegmentBufferSizeInBytes(size_t segment_index) const
{
return segments[segment_index] ? segments[segment_index]->getBufferSizeInBytes() : 0;
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Common/HashTable/SmallTable.h
Expand Up @@ -372,6 +372,8 @@ class SmallTable
}

/// Size in bytes of the `buf` member.
size_t getBufferSizeInBytes() const
{
    return sizeof(buf);
}

/// The callback is ignored: the body is empty and nothing is stored.
/// Presumably present only to match the setResizeCallback interface of the
/// other hash tables — confirm against callers.
void setResizeCallback(const ResizeCallback &) {}
};


Expand Down
10 changes: 10 additions & 0 deletions dbms/src/Common/HashTable/StringHashTable.h
Expand Up @@ -157,6 +157,7 @@ struct StringHashTableEmpty //-V730
/// 1 when hasZero() is true, otherwise 0.
size_t size() const
{
    if (hasZero())
        return 1;
    return 0;
}
/// True when hasZero() is false.
bool empty() const
{
    return !hasZero();
}
/// Reports the size of a single Cell.
size_t getBufferSizeInBytes() const
{
    return sizeof(Cell);
}
/// The callback is ignored: the body is empty and nothing is stored.
void setResizeCallback(const ResizeCallback &) {}
/// Always reports zero collisions.
size_t getCollisions() const
{
    return 0;
}
};

Expand Down Expand Up @@ -439,6 +440,15 @@ class StringHashTable : private boost::noncopyable
+ m3.getBufferSizeInBytes() + ms.getBufferSizeInBytes();
}

/// Install the same resize callback on each sub-table: m0, m1, m2, m3 and ms, in that order.
void setResizeCallback(const ResizeCallback & resize_callback)
{
    /// The sub-tables are passed to a generic lambda; the comma fold visits them left to right.
    const auto install_on_all = [&resize_callback](auto &... sub_tables) {
        (sub_tables.setResizeCallback(resize_callback), ...);
    };
    install_on_all(m0, m1, m2, m3, ms);
}

void clearAndShrink()
{
m1.clearHasZero();
Expand Down
19 changes: 15 additions & 4 deletions dbms/src/Core/AutoSpillTrigger.h
Expand Up @@ -22,6 +22,9 @@ namespace DB
{
class AutoSpillTrigger
{
private:
/// Upper bound accepted for the configured trigger threshold; also the fraction of the
/// memory limit used to compute force_trigger_threshold.
constexpr static const float MAX_TRIGGER_THRESHOLD = 0.85;

public:
AutoSpillTrigger(
const MemoryTrackerPtr & memory_tracker_,
Expand All @@ -37,27 +40,34 @@ class AutoSpillTrigger
"Invalid auto trigger threshold {} or invalid auto target threshold {}",
auto_memory_revoke_trigger_threshold,
auto_memory_revoke_target_threshold);
if (unlikely(auto_memory_revoke_trigger_threshold < auto_memory_revoke_target_threshold))
if (unlikely(
auto_memory_revoke_trigger_threshold < auto_memory_revoke_target_threshold
|| auto_memory_revoke_trigger_threshold > MAX_TRIGGER_THRESHOLD))
{
LOG_WARNING(
query_operator_spill_contexts->getLogger(),
"Auto trigger threshold {} less than auto target threshold {}, not valid, use default value instead",
"Auto trigger threshold {} less than auto target threshold {} or more than max trigger threshold {}, "
"not valid, use default value instead",
auto_memory_revoke_trigger_threshold,
auto_memory_revoke_target_threshold);
auto_memory_revoke_target_threshold,
MAX_TRIGGER_THRESHOLD);
/// invalid value, set the value to default value
auto_memory_revoke_trigger_threshold = 0.7;
auto_memory_revoke_target_threshold = 0.5;
}
trigger_threshold = static_cast<Int64>(memory_tracker->getLimit() * auto_memory_revoke_trigger_threshold);
target_threshold = static_cast<Int64>(memory_tracker->getLimit() * auto_memory_revoke_target_threshold);
force_trigger_threshold = static_cast<Int64>(memory_tracker->getLimit() * MAX_TRIGGER_THRESHOLD);
}

/// Read the current usage from the memory tracker and, if it is above trigger_threshold,
/// ask the query's spill contexts to release the amount above target_threshold.
void triggerAutoSpill()
{
    const auto current_memory_usage = memory_tracker->get();
    if (current_memory_usage <= trigger_threshold)
        return;

    /// A single request is issued per check (the stale one-argument call was dropped so the
    /// spill is not requested twice). The second argument is true once usage is above
    /// force_trigger_threshold; it maps to the `ignore_cooldown_time_check` parameter.
    query_operator_spill_contexts->triggerAutoSpill(
        current_memory_usage - target_threshold,
        current_memory_usage > force_trigger_threshold);
}

Expand All @@ -66,5 +76,6 @@ class AutoSpillTrigger
std::shared_ptr<QueryOperatorSpillContexts> query_operator_spill_contexts;
Int64 trigger_threshold;
Int64 target_threshold;
Int64 force_trigger_threshold;
};
} // namespace DB
17 changes: 8 additions & 9 deletions dbms/src/Core/QueryOperatorSpillContexts.cpp
Expand Up @@ -16,34 +16,33 @@

namespace DB
{
Int64 QueryOperatorSpillContexts::triggerAutoSpill(Int64 expected_released_memories)
Int64 QueryOperatorSpillContexts::triggerAutoSpill(Int64 expected_released_memories, bool ignore_cooldown_time_check)
{
std::unique_lock lock(mutex, std::try_to_lock);
/// use mutex to avoid concurrent check
if (lock.owns_lock())
{
auto log_level = Poco::Message::PRIO_DEBUG;
bool check_cooldown_time = true;
bool check_cooldown_time = !ignore_cooldown_time_check;
if unlikely (!first_check_done)
{
first_check_done = true;
check_cooldown_time = false;
log_level = Poco::Message::PRIO_INFORMATION;
}

LOG_IMPL(
log,
log_level,
"Query memory usage exceeded threshold, trigger auto spill check, expected released memory: {}",
expected_released_memories);

auto current_time = watch.elapsed();
if (check_cooldown_time && current_time - last_checked_time_ns < auto_spill_check_min_interval_ns)
{
LOG_IMPL(log, log_level, "Auto spill check still in cooldown time, skip this check");
return expected_released_memories;
}

LOG_IMPL(
log,
log_level,
"Query memory usage exceeded threshold, trigger auto spill check, expected released memory: {}",
expected_released_memories);

last_checked_time_ns = current_time;

auto ret = expected_released_memories;
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Core/QueryOperatorSpillContexts.h
Expand Up @@ -30,7 +30,7 @@ class QueryOperatorSpillContexts
watch.start();
}

Int64 triggerAutoSpill(Int64 expected_released_memories);
Int64 triggerAutoSpill(Int64 expected_released_memories, bool ignore_cooldown_time_check = false);

void registerTaskOperatorSpillContexts(
const std::shared_ptr<TaskOperatorSpillContexts> & task_operator_spill_contexts)
Expand Down
4 changes: 4 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGContext.h
Expand Up @@ -309,6 +309,10 @@ class DAGContext
{
auto_spill_trigger = auto_spill_trigger_;
}
/// Non-owning pointer to the auto spill trigger, or nullptr when none has been set.
AutoSpillTrigger * getAutoSpillTrigger()
{
    if (auto_spill_trigger == nullptr)
        return nullptr;
    return auto_spill_trigger.get();
}

/// Append a copy of `lock` to table_locks.
void addTableLock(const TableLockHolder & lock)
{
    table_locks.push_back(lock);
}

Expand Down
1 change: 1 addition & 0 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
Expand Up @@ -345,6 +345,7 @@ void DAGQueryBlockInterpreter::handleJoin(
context.getDAGContext()->registerOperatorSpillContext(operator_spill_context);
}
},
context.getDAGContext() != nullptr ? context.getDAGContext()->getAutoSpillTrigger() : nullptr,
tiflash_join.join_key_collators,
join_non_equal_conditions,
max_block_size,
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Mpp/ExchangeReceiver.cpp
Expand Up @@ -330,7 +330,7 @@ ExchangeReceiverBase<RPCContext>::ExchangeReceiverBase(
, connection_uncreated_num(source_num)
, thread_manager(newThreadManager())
, received_message_queue(
max_buffer_size,
CapacityLimits(max_buffer_size, settings.max_buffered_bytes_in_executor.get()),
exc_log,
&data_size_in_queue,
enable_fine_grained_shuffle_flag,
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Flash/Planner/Plans/PhysicalJoin.cpp
Expand Up @@ -184,6 +184,7 @@ PhysicalPlanNodePtr PhysicalJoin::build(
context.getDAGContext()->registerOperatorSpillContext(operator_spill_context);
}
},
context.getDAGContext() != nullptr ? context.getDAGContext()->getAutoSpillTrigger() : nullptr,
tiflash_join.join_key_collators,
join_non_equal_conditions,
max_block_size,
Expand Down
20 changes: 11 additions & 9 deletions dbms/src/Flash/executeQuery.cpp
Expand Up @@ -99,7 +99,17 @@ QueryExecutorPtr doExecuteAsBlockIO(IQuerySource & dag, Context & context, bool

if (memory_tracker != nullptr && memory_tracker->getLimit() != 0
&& context.getSettingsRef().auto_memory_revoke_trigger_threshold > 0)
{
dag_context.setAutoSpillMode();
auto auto_spill_trigger_threshold = context.getSettingsRef().auto_memory_revoke_trigger_threshold.get();
auto auto_spill_target_threshold = context.getSettingsRef().auto_memory_revoke_target_threshold.get();
auto auto_spill_trigger = std::make_shared<AutoSpillTrigger>(
memory_tracker,
dag_context.getQueryOperatorSpillContexts(),
auto_spill_trigger_threshold,
auto_spill_target_threshold);
dag_context.setAutoSpillTrigger(auto_spill_trigger);
}

FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::random_interpreter_failpoint);
auto interpreter = dag.interpreter(context, QueryProcessingStage::Complete);
Expand All @@ -110,17 +120,9 @@ QueryExecutorPtr doExecuteAsBlockIO(IQuerySource & dag, Context & context, bool
/// if query is in auto spill mode, then setup auto spill trigger
if (dag_context.isInAutoSpillMode())
{
auto auto_spill_trigger_threshold = context.getSettingsRef().auto_memory_revoke_trigger_threshold.get();
auto auto_spill_target_threshold = context.getSettingsRef().auto_memory_revoke_target_threshold.get();
auto auto_spill_trigger = std::make_shared<AutoSpillTrigger>(
memory_tracker,
dag_context.getQueryOperatorSpillContexts(),
auto_spill_trigger_threshold,
auto_spill_target_threshold);
dag_context.setAutoSpillTrigger(auto_spill_trigger);
auto * stream = dynamic_cast<IProfilingBlockInputStream *>(res.in.get());
RUNTIME_ASSERT(stream != nullptr);
stream->setAutoSpillTrigger(auto_spill_trigger.get());
stream->setAutoSpillTrigger(dag_context.getAutoSpillTrigger());
}
if (likely(!internal))
logQueryPipeline(logger, res.in);
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Interpreters/HashJoinSpillContext.h
Expand Up @@ -55,7 +55,7 @@ class HashJoinSpillContext final : public OperatorSpillContext
Int64 triggerSpillImpl(Int64 expected_released_memories) override;
bool supportAutoTriggerSpill() const override { return true; }
void finishOneSpill(size_t partition_id);
bool needFinalSpill(size_t partition_id) const
bool isPartitionMarkedForAutoSpill(size_t partition_id) const
{
    /// Any status other than NO_NEED_AUTO_SPILL counts as marked.
    const auto & status = (*partition_spill_status)[partition_id];
    return status != AutoSpillStatus::NO_NEED_AUTO_SPILL;
}
Expand Down

0 comments on commit aed7d84

Please sign in to comment.