From aed7d8413b324763c177993e7cfde0b69c1300db Mon Sep 17 00:00:00 2001 From: xufei Date: Tue, 29 Aug 2023 17:24:38 +0800 Subject: [PATCH] Auto spill enhance (#8033) ref pingcap/tiflash#7738 --- dbms/src/Common/Arena.h | 11 +++++ dbms/src/Common/ErrorCodes.cpp | 1 + dbms/src/Common/Exception.h | 10 +++- dbms/src/Common/HashTable/FixedHashTable.h | 2 + dbms/src/Common/HashTable/HashTable.h | 16 +++++++ dbms/src/Common/HashTable/SmallTable.h | 2 + dbms/src/Common/HashTable/StringHashTable.h | 10 ++++ dbms/src/Core/AutoSpillTrigger.h | 19 ++++++-- dbms/src/Core/QueryOperatorSpillContexts.cpp | 17 ++++--- dbms/src/Core/QueryOperatorSpillContexts.h | 2 +- dbms/src/Flash/Coprocessor/DAGContext.h | 4 ++ .../Coprocessor/DAGQueryBlockInterpreter.cpp | 1 + dbms/src/Flash/Mpp/ExchangeReceiver.cpp | 2 +- dbms/src/Flash/Planner/Plans/PhysicalJoin.cpp | 1 + dbms/src/Flash/executeQuery.cpp | 20 ++++---- dbms/src/Interpreters/HashJoinSpillContext.h | 2 +- dbms/src/Interpreters/Join.cpp | 46 ++++++++++++++----- dbms/src/Interpreters/Join.h | 5 ++ dbms/src/Interpreters/JoinPartition.cpp | 42 ++++++++++++++++- dbms/src/Interpreters/JoinPartition.h | 1 + dbms/src/Interpreters/Settings.h | 2 +- .../gtest_query_operator_spill_contexts.cpp | 5 ++ 22 files changed, 180 insertions(+), 41 deletions(-) diff --git a/dbms/src/Common/Arena.h b/dbms/src/Common/Arena.h index 449684de7cc..b9999f6b179 100644 --- a/dbms/src/Common/Arena.h +++ b/dbms/src/Common/Arena.h @@ -15,6 +15,7 @@ #pragma once #include +#include #include #include #include @@ -26,6 +27,7 @@ namespace DB { +using ResizeCallback = std::function; /** Memory pool to append something. For example, short strings. * Usage scenario: * - put lot of strings inside pool, keep their addresses; @@ -71,6 +73,8 @@ class Arena : private boost::noncopyable Chunk * head; size_t size_in_bytes; + ResizeCallback resize_callback; + static size_t roundUpToPageSize(size_t s) { return (s + 4096 - 1) / 4096 * 4096; } /// If chunks size is less than 'linear_growth_threshold', then use exponential growth, otherwise - linear growth @@ -93,6 +97,11 @@ class Arena : private boost::noncopyable /// Add next contiguous chunk of memory with size not less than specified. void NO_INLINE addChunk(size_t min_size) { + if (resize_callback != nullptr) + { + if unlikely (!resize_callback()) + throw ResizeException("Error in arena resize"); + } head = new Chunk(nextSize(min_size), head); size_in_bytes += head->size(); } @@ -148,6 +157,8 @@ class Arena : private boost::noncopyable */ void rollback(size_t size) { head->pos -= size; } + void setResizeCallback(const ResizeCallback & resize_callback_) { resize_callback = resize_callback_; } + /** Begin or expand allocation of contiguous piece of memory. * 'begin' - current begin of piece of memory, if it need to be expanded, or nullptr, if it need to be started. * If there is no space in chunk to expand current piece of memory - then copy all piece to new chunk and change value of 'begin'. 
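Note on the Arena hook above: the intent is that a caller installs a callback which returns false once its data should be spilled, so the next chunk allocation fails fast with ResizeException instead of growing memory further. The sketch below is illustrative only and is not part of the patch; it assumes the Arena/ResizeException interfaces introduced here, and the names buildWithSpillGuard, pool and marked_for_spill are invented for the example. The real wiring lives in JoinPartition::setResizeCallbackIfNeeded() later in this patch.

#include <atomic>
#include <Common/Arena.h>
#include <Common/Exception.h>

// Sketch: abort Arena growth once the owning partition has been marked for auto spill.
void buildWithSpillGuard(DB::Arena & pool, const std::atomic<bool> & marked_for_spill)
{
    pool.setResizeCallback([&marked_for_spill] { return !marked_for_spill.load(); });
    try
    {
        pool.alloc(1024 * 1024); // may call addChunk(), which now consults the callback
    }
    catch (const DB::ResizeException &)
    {
        // Expected recovery: spill the partition's data to disk and continue the build,
        // mirroring what Join::insertFromBlock does later in this patch.
    }
}
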
diff --git a/dbms/src/Common/ErrorCodes.cpp b/dbms/src/Common/ErrorCodes.cpp index 5782d74af81..c2746d950df 100644 --- a/dbms/src/Common/ErrorCodes.cpp +++ b/dbms/src/Common/ErrorCodes.cpp @@ -395,6 +395,7 @@ extern const int UNKNOWN_WINDOW_FUNCTION = 449; extern const int UNSUPPORTED_URI_SCHEME = 450; extern const int UNACCEPTABLE_URL = 450; extern const int TOO_MANY_REDIRECTS = 450; +extern const int ERROR_DURING_HASH_TABLE_OR_ARENA_RESIZE = 451; #if USE_QPL extern const int QPL_INIT_JOB_FAILED = 453; diff --git a/dbms/src/Common/Exception.h b/dbms/src/Common/Exception.h index 4566d17f77c..5b7e51f7e45 100644 --- a/dbms/src/Common/Exception.h +++ b/dbms/src/Common/Exception.h @@ -33,10 +33,10 @@ namespace DB { - namespace ErrorCodes { extern const int LOGICAL_ERROR; +extern const int ERROR_DURING_HASH_TABLE_OR_ARENA_RESIZE; } // namespace ErrorCodes class Exception : public Poco::Exception @@ -102,6 +102,14 @@ class ErrnoException : public Exception int saved_errno; }; +class ResizeException : public Exception +{ +public: + explicit ResizeException(const std::string & msg) + : Exception(msg, ErrorCodes::ERROR_DURING_HASH_TABLE_OR_ARENA_RESIZE) + {} +}; + using Exceptions = std::vector; [[noreturn]] void throwFromErrno(const std::string & s, int code = 0, int e = errno); diff --git a/dbms/src/Common/HashTable/FixedHashTable.h b/dbms/src/Common/HashTable/FixedHashTable.h index 232efee9058..259e90684fc 100644 --- a/dbms/src/Common/HashTable/FixedHashTable.h +++ b/dbms/src/Common/HashTable/FixedHashTable.h @@ -500,6 +500,8 @@ class FixedHashTable size_t getBufferSizeInBytes() const { return NUM_CELLS * sizeof(Cell); } + void setResizeCallback(const ResizeCallback &) {} + size_t getBufferSizeInCells() const { return NUM_CELLS; } /// Return offset for result in internal buffer. diff --git a/dbms/src/Common/HashTable/HashTable.h b/dbms/src/Common/HashTable/HashTable.h index f49f452c96d..902b9eb4519 100644 --- a/dbms/src/Common/HashTable/HashTable.h +++ b/dbms/src/Common/HashTable/HashTable.h @@ -162,6 +162,7 @@ struct VoidMapped } }; +using ResizeCallback = std::function; /** Compile-time interface for cell of the hash table. * Different cell types are used to implement different hash tables. * The cell must contain a key. @@ -428,6 +429,7 @@ class HashTable size_t m_size = 0; /// Amount of elements Cell * buf; /// A piece of memory for all elements except the element with zero key. Grower grower; + ResizeCallback resize_callback; #ifdef DBMS_HASH_MAP_COUNT_COLLISIONS mutable size_t collisions = 0; @@ -480,6 +482,11 @@ class HashTable /// Increase the size of the buffer. 
void resize(size_t for_num_elems = 0, size_t for_buf_size = 0) { + if (resize_callback != nullptr) + { + if unlikely (!resize_callback()) + throw DB::ResizeException("Error in hash table resize"); + } #ifdef DBMS_HASH_MAP_DEBUG_RESIZES Stopwatch watch; #endif @@ -736,6 +743,8 @@ class HashTable free(); } + void setResizeCallback(const ResizeCallback & resize_callback_) { resize_callback = resize_callback_; } + HashTable & operator=(HashTable && rhs) { destroyElements(); @@ -1398,6 +1407,7 @@ class HashTableWithLock return hash_table.has(x, hash_value); } size_t getBufferSizeInBytes() const { return hash_table.getBufferSizeInBytes(); } + void setResizeCallback(const ResizeCallback & resize_callback) { hash_table.setResizeCallback(resize_callback); } size_t size() const { return hash_table.size(); } private: @@ -1576,6 +1586,12 @@ class ConcurrentHashTable return ret; } + void setResizeCallback(const ResizeCallback & resize_callback) + { + for (auto & segment : segments) + segment->setResizeCallback(resize_callback); + } + size_t getSegmentBufferSizeInBytes(size_t segment_index) const { return segments[segment_index] ? segments[segment_index]->getBufferSizeInBytes() : 0; diff --git a/dbms/src/Common/HashTable/SmallTable.h b/dbms/src/Common/HashTable/SmallTable.h index 1b89f4ee823..fa40b479430 100644 --- a/dbms/src/Common/HashTable/SmallTable.h +++ b/dbms/src/Common/HashTable/SmallTable.h @@ -372,6 +372,8 @@ class SmallTable } size_t getBufferSizeInBytes() const { return sizeof(buf); } + + void setResizeCallback(const ResizeCallback &) {} }; diff --git a/dbms/src/Common/HashTable/StringHashTable.h b/dbms/src/Common/HashTable/StringHashTable.h index c42aa8cacdf..aa4825f171a 100644 --- a/dbms/src/Common/HashTable/StringHashTable.h +++ b/dbms/src/Common/HashTable/StringHashTable.h @@ -157,6 +157,7 @@ struct StringHashTableEmpty //-V730 size_t size() const { return hasZero() ? 
1 : 0; } bool empty() const { return !hasZero(); } size_t getBufferSizeInBytes() const { return sizeof(Cell); } + void setResizeCallback(const ResizeCallback &) {} size_t getCollisions() const { return 0; } }; @@ -439,6 +440,15 @@ class StringHashTable : private boost::noncopyable + m3.getBufferSizeInBytes() + ms.getBufferSizeInBytes(); } + void setResizeCallback(const ResizeCallback & resize_callback) + { + m0.setResizeCallback(resize_callback); + m1.setResizeCallback(resize_callback); + m2.setResizeCallback(resize_callback); + m3.setResizeCallback(resize_callback); + ms.setResizeCallback(resize_callback); + } + void clearAndShrink() { m1.clearHasZero(); diff --git a/dbms/src/Core/AutoSpillTrigger.h b/dbms/src/Core/AutoSpillTrigger.h index b2597ba3617..435c3ce33b2 100644 --- a/dbms/src/Core/AutoSpillTrigger.h +++ b/dbms/src/Core/AutoSpillTrigger.h @@ -22,6 +22,9 @@ namespace DB { class AutoSpillTrigger { +private: + constexpr static const float MAX_TRIGGER_THRESHOLD = 0.85; + public: AutoSpillTrigger( const MemoryTrackerPtr & memory_tracker_, @@ -37,19 +40,24 @@ class AutoSpillTrigger "Invalid auto trigger threshold {} or invalid auto target threshold {}", auto_memory_revoke_trigger_threshold, auto_memory_revoke_target_threshold); - if (unlikely(auto_memory_revoke_trigger_threshold < auto_memory_revoke_target_threshold)) + if (unlikely( + auto_memory_revoke_trigger_threshold < auto_memory_revoke_target_threshold + || auto_memory_revoke_trigger_threshold > MAX_TRIGGER_THRESHOLD)) { LOG_WARNING( query_operator_spill_contexts->getLogger(), - "Auto trigger threshold {} less than auto target threshold {}, not valid, use default value instead", + "Auto trigger threshold {} less than auto target threshold {} or more than max trigger threshold {}, " + "not valid, use default value instead", auto_memory_revoke_trigger_threshold, - auto_memory_revoke_target_threshold); + auto_memory_revoke_target_threshold, + MAX_TRIGGER_THRESHOLD); /// invalid value, set the value to default value auto_memory_revoke_trigger_threshold = 0.7; auto_memory_revoke_target_threshold = 0.5; } trigger_threshold = static_cast(memory_tracker->getLimit() * auto_memory_revoke_trigger_threshold); target_threshold = static_cast(memory_tracker->getLimit() * auto_memory_revoke_target_threshold); + force_trigger_threshold = static_cast(memory_tracker->getLimit() * MAX_TRIGGER_THRESHOLD); } void triggerAutoSpill() @@ -57,7 +65,9 @@ class AutoSpillTrigger auto current_memory_usage = memory_tracker->get(); if (current_memory_usage > trigger_threshold) { - query_operator_spill_contexts->triggerAutoSpill(current_memory_usage - target_threshold); + query_operator_spill_contexts->triggerAutoSpill( + current_memory_usage - target_threshold, + current_memory_usage > force_trigger_threshold); } } @@ -66,5 +76,6 @@ class AutoSpillTrigger std::shared_ptr query_operator_spill_contexts; Int64 trigger_threshold; Int64 target_threshold; + Int64 force_trigger_threshold; }; } // namespace DB diff --git a/dbms/src/Core/QueryOperatorSpillContexts.cpp b/dbms/src/Core/QueryOperatorSpillContexts.cpp index 35ffd660973..aed814c487f 100644 --- a/dbms/src/Core/QueryOperatorSpillContexts.cpp +++ b/dbms/src/Core/QueryOperatorSpillContexts.cpp @@ -16,14 +16,14 @@ namespace DB { -Int64 QueryOperatorSpillContexts::triggerAutoSpill(Int64 expected_released_memories) +Int64 QueryOperatorSpillContexts::triggerAutoSpill(Int64 expected_released_memories, bool ignore_cooldown_time_check) { std::unique_lock lock(mutex, std::try_to_lock); /// use mutex to avoid concurrent 
check if (lock.owns_lock()) { auto log_level = Poco::Message::PRIO_DEBUG; - bool check_cooldown_time = true; + bool check_cooldown_time = !ignore_cooldown_time_check; if unlikely (!first_check_done) { first_check_done = true; @@ -31,19 +31,18 @@ Int64 QueryOperatorSpillContexts::triggerAutoSpill(Int64 expected_released_memor log_level = Poco::Message::PRIO_INFORMATION; } - LOG_IMPL( - log, - log_level, - "Query memory usage exceeded threshold, trigger auto spill check, expected released memory: {}", - expected_released_memories); - auto current_time = watch.elapsed(); if (check_cooldown_time && current_time - last_checked_time_ns < auto_spill_check_min_interval_ns) { - LOG_IMPL(log, log_level, "Auto spill check still in cooldown time, skip this check"); return expected_released_memories; } + LOG_IMPL( + log, + log_level, + "Query memory usage exceeded threshold, trigger auto spill check, expected released memory: {}", + expected_released_memories); + last_checked_time_ns = current_time; auto ret = expected_released_memories; diff --git a/dbms/src/Core/QueryOperatorSpillContexts.h b/dbms/src/Core/QueryOperatorSpillContexts.h index ee4f50c5077..24811503061 100644 --- a/dbms/src/Core/QueryOperatorSpillContexts.h +++ b/dbms/src/Core/QueryOperatorSpillContexts.h @@ -30,7 +30,7 @@ class QueryOperatorSpillContexts watch.start(); } - Int64 triggerAutoSpill(Int64 expected_released_memories); + Int64 triggerAutoSpill(Int64 expected_released_memories, bool ignore_cooldown_time_check = false); void registerTaskOperatorSpillContexts( const std::shared_ptr & task_operator_spill_contexts) diff --git a/dbms/src/Flash/Coprocessor/DAGContext.h b/dbms/src/Flash/Coprocessor/DAGContext.h index fb7bd1fc46f..b260a0c9265 100644 --- a/dbms/src/Flash/Coprocessor/DAGContext.h +++ b/dbms/src/Flash/Coprocessor/DAGContext.h @@ -309,6 +309,10 @@ class DAGContext { auto_spill_trigger = auto_spill_trigger_; } + AutoSpillTrigger * getAutoSpillTrigger() + { + return auto_spill_trigger == nullptr ? nullptr : auto_spill_trigger.get(); + } void addTableLock(const TableLockHolder & lock) { table_locks.push_back(lock); } diff --git a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp index 8615562921a..0b5056a4d00 100644 --- a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp @@ -345,6 +345,7 @@ void DAGQueryBlockInterpreter::handleJoin( context.getDAGContext()->registerOperatorSpillContext(operator_spill_context); } }, + context.getDAGContext() != nullptr ? 
context.getDAGContext()->getAutoSpillTrigger() : nullptr, tiflash_join.join_key_collators, join_non_equal_conditions, max_block_size, diff --git a/dbms/src/Flash/Mpp/ExchangeReceiver.cpp b/dbms/src/Flash/Mpp/ExchangeReceiver.cpp index 0336fb40bd6..0019732716a 100644 --- a/dbms/src/Flash/Mpp/ExchangeReceiver.cpp +++ b/dbms/src/Flash/Mpp/ExchangeReceiver.cpp @@ -330,7 +330,7 @@ ExchangeReceiverBase::ExchangeReceiverBase( , connection_uncreated_num(source_num) , thread_manager(newThreadManager()) , received_message_queue( - max_buffer_size, + CapacityLimits(max_buffer_size, settings.max_buffered_bytes_in_executor.get()), exc_log, &data_size_in_queue, enable_fine_grained_shuffle_flag, diff --git a/dbms/src/Flash/Planner/Plans/PhysicalJoin.cpp b/dbms/src/Flash/Planner/Plans/PhysicalJoin.cpp index f8c8ac0c43a..3bb5042d6b3 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalJoin.cpp +++ b/dbms/src/Flash/Planner/Plans/PhysicalJoin.cpp @@ -184,6 +184,7 @@ PhysicalPlanNodePtr PhysicalJoin::build( context.getDAGContext()->registerOperatorSpillContext(operator_spill_context); } }, + context.getDAGContext() != nullptr ? context.getDAGContext()->getAutoSpillTrigger() : nullptr, tiflash_join.join_key_collators, join_non_equal_conditions, max_block_size, diff --git a/dbms/src/Flash/executeQuery.cpp b/dbms/src/Flash/executeQuery.cpp index 26074111a1d..0f931602831 100644 --- a/dbms/src/Flash/executeQuery.cpp +++ b/dbms/src/Flash/executeQuery.cpp @@ -99,7 +99,17 @@ QueryExecutorPtr doExecuteAsBlockIO(IQuerySource & dag, Context & context, bool if (memory_tracker != nullptr && memory_tracker->getLimit() != 0 && context.getSettingsRef().auto_memory_revoke_trigger_threshold > 0) + { dag_context.setAutoSpillMode(); + auto auto_spill_trigger_threshold = context.getSettingsRef().auto_memory_revoke_trigger_threshold.get(); + auto auto_spill_target_threshold = context.getSettingsRef().auto_memory_revoke_target_threshold.get(); + auto auto_spill_trigger = std::make_shared( + memory_tracker, + dag_context.getQueryOperatorSpillContexts(), + auto_spill_trigger_threshold, + auto_spill_target_threshold); + dag_context.setAutoSpillTrigger(auto_spill_trigger); + } FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::random_interpreter_failpoint); auto interpreter = dag.interpreter(context, QueryProcessingStage::Complete); @@ -110,17 +120,9 @@ QueryExecutorPtr doExecuteAsBlockIO(IQuerySource & dag, Context & context, bool /// if query is in auto spill mode, then setup auto spill trigger if (dag_context.isInAutoSpillMode()) { - auto auto_spill_trigger_threshold = context.getSettingsRef().auto_memory_revoke_trigger_threshold.get(); - auto auto_spill_target_threshold = context.getSettingsRef().auto_memory_revoke_target_threshold.get(); - auto auto_spill_trigger = std::make_shared( - memory_tracker, - dag_context.getQueryOperatorSpillContexts(), - auto_spill_trigger_threshold, - auto_spill_target_threshold); - dag_context.setAutoSpillTrigger(auto_spill_trigger); auto * stream = dynamic_cast(res.in.get()); RUNTIME_ASSERT(stream != nullptr); - stream->setAutoSpillTrigger(auto_spill_trigger.get()); + stream->setAutoSpillTrigger(dag_context.getAutoSpillTrigger()); } if (likely(!internal)) logQueryPipeline(logger, res.in); diff --git a/dbms/src/Interpreters/HashJoinSpillContext.h b/dbms/src/Interpreters/HashJoinSpillContext.h index 57a6f365b59..a2f92140350 100644 --- a/dbms/src/Interpreters/HashJoinSpillContext.h +++ b/dbms/src/Interpreters/HashJoinSpillContext.h @@ -55,7 +55,7 @@ class HashJoinSpillContext final : public OperatorSpillContext 
Int64 triggerSpillImpl(Int64 expected_released_memories) override; bool supportAutoTriggerSpill() const override { return true; } void finishOneSpill(size_t partition_id); - bool needFinalSpill(size_t partition_id) const + bool isPartitionMarkedForAutoSpill(size_t partition_id) const { return (*partition_spill_status)[partition_id] != AutoSpillStatus::NO_NEED_AUTO_SPILL; } diff --git a/dbms/src/Interpreters/Join.cpp b/dbms/src/Interpreters/Join.cpp index b4543bc523b..fbefd6e4a9c 100644 --- a/dbms/src/Interpreters/Join.cpp +++ b/dbms/src/Interpreters/Join.cpp @@ -17,6 +17,7 @@ #include #include #include +#include #include #include #include @@ -135,6 +136,7 @@ Join::Join( Int64 join_restore_concurrency_, const Names & tidb_output_column_names_, const RegisterOperatorSpillContext & register_operator_spill_context_, + AutoSpillTrigger * auto_spill_trigger_, const TiDB::TiDBCollators & collators_, const JoinNonEqualConditions & non_equal_conditions_, size_t max_block_size_, @@ -165,6 +167,7 @@ Join::Join( , runtime_filter_list(runtime_filter_list_) , join_restore_concurrency(join_restore_concurrency_) , register_operator_spill_context(register_operator_spill_context_) + , auto_spill_trigger(auto_spill_trigger_) , shallow_copy_cross_probe_threshold( shallow_copy_cross_probe_threshold_ > 0 ? shallow_copy_cross_probe_threshold_ : std::max(1, max_block_size / 10)) @@ -385,6 +388,7 @@ std::shared_ptr Join::createRestoreJoin(size_t max_bytes_before_external_j join_restore_concurrency, tidb_output_column_names, register_operator_spill_context, + auto_spill_trigger, collators, non_equal_conditions, max_block_size, @@ -426,6 +430,8 @@ void Join::initBuild(const Block & sample_block, size_t build_concurrency_) { hash_join_spill_context->buildBuildSpiller(build_sample_block); } + for (auto & partition : partitions) + partition->setResizeCallbackIfNeeded(); setSampleBlock(sample_block); build_side_marked_spilled_data.resize(build_concurrency); } @@ -565,20 +571,35 @@ void Join::insertFromBlock(const Block & block, size_t stream_index) { continue; } - const auto & join_partition = partitions[i]; - auto partition_lock = join_partition->lockPartition(); - join_partition->insertBlockForBuild(std::move(dispatch_blocks[i])); - /// to release memory before insert if already marked spill - checkAndMarkPartitionSpilledIfNeeded(*join_partition, partition_lock, i, stream_index); - if (!hash_join_spill_context->isPartitionSpilled(i)) { - // todo add hash table resize callback to check if current partition is already marked to spill - insertFromBlockInternal(join_partition->getLastBuildBlock(), i); - join_partition->updateHashMapAndPoolMemoryUsage(); - /// double check here to release memory + const auto & join_partition = partitions[i]; + auto partition_lock = join_partition->lockPartition(); + join_partition->insertBlockForBuild(std::move(dispatch_blocks[i])); + /// to release memory before insert if already marked spill checkAndMarkPartitionSpilledIfNeeded(*join_partition, partition_lock, i, stream_index); - /// todo check if it is necessary to trigger auto spill check here + if (!hash_join_spill_context->isPartitionSpilled(i)) + { + bool meet_resize_exception = false; + try + { + insertFromBlockInternal(join_partition->getLastBuildBlock(), i); + join_partition->updateHashMapAndPoolMemoryUsage(); + } + catch (ResizeException &) + { + meet_resize_exception = true; + LOG_DEBUG(log, "Meet resize exception when insert into partition {}", i); + } + /// double check here to release memory + 
checkAndMarkPartitionSpilledIfNeeded(*join_partition, partition_lock, i, stream_index); + if (meet_resize_exception) + RUNTIME_CHECK_MSG( + hash_join_spill_context->isPartitionSpilled(i), + "resize exception must trigger partition to spill"); + } } + if (auto_spill_trigger != nullptr) + auto_spill_trigger->triggerAutoSpill(); } if (!hash_join_spill_context->isInAutoSpillMode()) spillMostMemoryUsedPartitionIfNeed(stream_index); @@ -1744,7 +1765,8 @@ void Join::workAfterBuildFinish(size_t stream_index) for (size_t i = 0; i < partitions.size(); ++i) { - if (hash_join_spill_context->needFinalSpill(i) || hash_join_spill_context->isPartitionSpilled(i)) + if (hash_join_spill_context->isPartitionMarkedForAutoSpill(i) + || hash_join_spill_context->isPartitionSpilled(i)) { if (!hash_join_spill_context->isPartitionSpilled(i)) { diff --git a/dbms/src/Interpreters/Join.h b/dbms/src/Interpreters/Join.h index ffc4f596d29..4f25ac0959a 100644 --- a/dbms/src/Interpreters/Join.h +++ b/dbms/src/Interpreters/Join.h @@ -48,6 +48,8 @@ using JoinProfileInfoPtr = std::shared_ptr; class Join; using JoinPtr = std::shared_ptr; +class AutoSpillTrigger; + struct RestoreInfo { JoinPtr join; @@ -166,6 +168,7 @@ class Join Int64 join_restore_concurrency_, const Names & tidb_output_column_names_, const RegisterOperatorSpillContext & register_operator_spill_context_, + AutoSpillTrigger * auto_spill_trigger_, const TiDB::TiDBCollators & collators_ = TiDB::dummy_collators, const JoinNonEqualConditions & non_equal_conditions_ = {}, size_t max_block_size = 0, @@ -379,6 +382,8 @@ class Join RegisterOperatorSpillContext register_operator_spill_context; + AutoSpillTrigger * auto_spill_trigger; + /// Whether to directly check all blocks for row with null key. bool null_key_check_all_blocks_directly = false; diff --git a/dbms/src/Interpreters/JoinPartition.cpp b/dbms/src/Interpreters/JoinPartition.cpp index 4577a3dc096..e5b86697515 100644 --- a/dbms/src/Interpreters/JoinPartition.cpp +++ b/dbms/src/Interpreters/JoinPartition.cpp @@ -155,6 +155,27 @@ static size_t getByteCountImpl(const Maps & maps, JoinMapMethod method) } } +template +static void setResizeCallbackImpl(const Maps & maps, JoinMapMethod method, const ResizeCallback & resize_callback) +{ + switch (method) + { + case JoinMapMethod::EMPTY: + case JoinMapMethod::CROSS: + return; +#define M(NAME) \ + case JoinMapMethod::NAME: \ + if (maps.NAME) \ + maps.NAME->setResizeCallback(resize_callback); \ + return; + APPLY_FOR_JOIN_VARIANTS(M) +#undef M + + default: + throw Exception("Unknown JOIN keys variant.", ErrorCodes::UNKNOWN_SET_DATA_VARIANT); + } +} + template static size_t clearMaps(Maps & maps, JoinMapMethod method) { @@ -210,6 +231,23 @@ size_t JoinPartition::getHashMapAndPoolByteCount() return ret; } +void JoinPartition::setResizeCallbackIfNeeded() +{ + if (hash_join_spill_context->isSpillEnabled() && hash_join_spill_context->isInAutoSpillMode()) + { + auto resize_callback = [this]() { + return !hash_join_spill_context->isPartitionMarkedForAutoSpill(partition_index); + }; + assert(pool != nullptr); + pool->setResizeCallback(resize_callback); + setResizeCallbackImpl(maps_any, join_map_method, resize_callback); + setResizeCallbackImpl(maps_all, join_map_method, resize_callback); + setResizeCallbackImpl(maps_any_full, join_map_method, resize_callback); + setResizeCallbackImpl(maps_all_full, join_map_method, resize_callback); + setResizeCallbackImpl(maps_all_full_with_row_flag, join_map_method, resize_callback); + } +} + void JoinPartition::initMap() { if 
(isCrossJoin(kind)) @@ -574,13 +612,13 @@ void NO_INLINE insertBlockIntoMapsTypeCase( #define INSERT_TO_MAP(join_partition, segment_index) \ auto & current_map = (join_partition)->getHashMap(); \ - for (auto & i : (segment_index)) \ + for (auto & s_i : (segment_index)) \ { \ Inserter::insert( \ current_map, \ key_getter, \ stored_block, \ - i, \ + s_i, \ pool, \ sort_key_containers); \ } diff --git a/dbms/src/Interpreters/JoinPartition.h b/dbms/src/Interpreters/JoinPartition.h index d61c3e1032d..b1040e0b6ed 100644 --- a/dbms/src/Interpreters/JoinPartition.h +++ b/dbms/src/Interpreters/JoinPartition.h @@ -102,6 +102,7 @@ class JoinPartition void insertBlockForProbe(Block && block); size_t getRowCount(); size_t getHashMapAndPoolByteCount(); + void setResizeCallbackIfNeeded(); void updateHashMapAndPoolMemoryUsage(); size_t getHashMapAndPoolMemoryUsage() const { return hash_table_pool_memory_usage; } RowsNotInsertToMap * getRowsNotInsertedToMap() diff --git a/dbms/src/Interpreters/Settings.h b/dbms/src/Interpreters/Settings.h index 40a37ffaf57..63b6758ec95 100644 --- a/dbms/src/Interpreters/Settings.h +++ b/dbms/src/Interpreters/Settings.h @@ -309,7 +309,7 @@ struct Settings M(SettingUInt64, async_recv_version, 2, "1: reactor mode, 2: no additional threads") \ M(SettingUInt64, recv_queue_size, 0, "size of ExchangeReceiver queue, 0 means the size is set to data_source_mpp_task_num * 50") \ M(SettingUInt64, shallow_copy_cross_probe_threshold, 0, "minimum right rows to use shallow copy probe mode for cross join, default is max(1, max_block_size/10)") \ - M(SettingInt64, max_buffered_bytes_in_executor, 200LL * 1024 * 1024, "The max buffered size in each executor, 0 mean unlimited, use 200MB as the default value") \ + M(SettingInt64, max_buffered_bytes_in_executor, 100LL * 1024 * 1024, "The max buffered size in each executor, 0 means unlimited, use 100MB as the default value") \ M(SettingUInt64, ddl_sync_interval_seconds, 60, "The interval of background DDL sync schema in seconds") \ M(SettingUInt64, ddl_restart_wait_seconds, 180, "The wait time for sync schema in seconds when restart") \ M(SettingFloat, auto_memory_revoke_trigger_threshold, 0.7, "Trigger auto memory revocation when the memory usage is above this percentage.") \ diff --git a/dbms/src/Interpreters/tests/gtest_query_operator_spill_contexts.cpp b/dbms/src/Interpreters/tests/gtest_query_operator_spill_contexts.cpp index ffc6aa4ebfa..2498fb45e59 100644 --- a/dbms/src/Interpreters/tests/gtest_query_operator_spill_contexts.cpp +++ b/dbms/src/Interpreters/tests/gtest_query_operator_spill_contexts.cpp @@ -157,6 +157,11 @@ try ASSERT_TRUE(sort_spill_context_2->updateRevocableMemory(OperatorSpillContext::MIN_SPILL_THRESHOLD * 2)); ASSERT_TRUE(sort_spill_context_3->updateRevocableMemory(OperatorSpillContext::MIN_SPILL_THRESHOLD * 2)); /// check interval less than threshold, no spill is triggered + ASSERT_TRUE( + query_operator_spill_contexts.triggerAutoSpill(OperatorSpillContext::MIN_SPILL_THRESHOLD * 3) + == OperatorSpillContext::MIN_SPILL_THRESHOLD * 3); + ASSERT_TRUE( + query_operator_spill_contexts.triggerAutoSpill(OperatorSpillContext::MIN_SPILL_THRESHOLD * 3, true) == 0); ASSERT_TRUE( query_operator_spill_contexts.triggerAutoSpill(OperatorSpillContext::MIN_SPILL_THRESHOLD * 3) == OperatorSpillContext::MIN_SPILL_THRESHOLD * 3);
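
Taken together, the pieces above form the following trigger path: executeQuery.cpp now creates the AutoSpillTrigger up front, Join::insertFromBlock calls triggerAutoSpill() after every inserted block, and once memory usage exceeds MAX_TRIGGER_THRESHOLD (85% of the limit) the cooldown-interval check in QueryOperatorSpillContexts is bypassed. The condensed restatement below is a non-authoritative sketch of that decision; it only uses members shown in this patch, and the free function triggerAutoSpillSketch is invented for illustration (assumed to live inside namespace DB).

// Condensed restatement of AutoSpillTrigger::triggerAutoSpill as changed by this patch.
void triggerAutoSpillSketch(
    MemoryTracker & memory_tracker,
    QueryOperatorSpillContexts & spill_contexts,
    Int64 trigger_threshold,       // limit * auto_memory_revoke_trigger_threshold
    Int64 target_threshold,        // limit * auto_memory_revoke_target_threshold
    Int64 force_trigger_threshold) // limit * MAX_TRIGGER_THRESHOLD (0.85)
{
    auto current_memory_usage = memory_tracker.get();
    if (current_memory_usage > trigger_threshold)
        // Above force_trigger_threshold the cooldown check is ignored, so a spill
        // round is requested even if the previous check happened very recently.
        spill_contexts.triggerAutoSpill(
            current_memory_usage - target_threshold,
            /*ignore_cooldown_time_check=*/current_memory_usage > force_trigger_threshold);
}

In the normal (non-forced) path, QueryOperatorSpillContexts still returns expected_released_memories unchanged while inside the cooldown interval, which is exactly what the new unit-test assertions above verify.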