diff --git a/dbms/src/Flash/Executor/PipelineExecutor.cpp b/dbms/src/Flash/Executor/PipelineExecutor.cpp index 602531fa49e..a5e1ac7fa41 100644 --- a/dbms/src/Flash/Executor/PipelineExecutor.cpp +++ b/dbms/src/Flash/Executor/PipelineExecutor.cpp @@ -77,16 +77,7 @@ void PipelineExecutor::wait() void PipelineExecutor::consume(ResultHandler & result_handler) { assert(result_handler); - if (unlikely(context.isTest())) - { - // In test mode, a single query should take no more than 5 minutes to execute. - static std::chrono::minutes timeout(5); - exec_context.consumeFor(result_handler, timeout); - } - else - { - exec_context.consume(result_handler); - } + exec_context.consume(result_handler); } ExecutionResult PipelineExecutor::execute(ResultHandler && result_handler) diff --git a/dbms/src/Flash/Executor/PipelineExecutor.h b/dbms/src/Flash/Executor/PipelineExecutor.h index b86b3b09b9a..ebac7fe65dc 100644 --- a/dbms/src/Flash/Executor/PipelineExecutor.h +++ b/dbms/src/Flash/Executor/PipelineExecutor.h @@ -16,7 +16,7 @@ #include #include -#include +#include namespace DB { diff --git a/dbms/src/Flash/Executor/PipelineExecutorContext.cpp b/dbms/src/Flash/Executor/PipelineExecutorContext.cpp index 516fdc36e3c..74b57015598 100644 --- a/dbms/src/Flash/Executor/PipelineExecutorContext.cpp +++ b/dbms/src/Flash/Executor/PipelineExecutorContext.cpp @@ -13,6 +13,7 @@ // limitations under the License. #include +#include #include #include @@ -154,6 +155,7 @@ void PipelineExecutorContext::cancel() if (is_cancelled.compare_exchange_strong(origin_value, true, std::memory_order_release)) { cancelSharedQueues(); + cancelResultQueueIfNeed(); if likely (TaskScheduler::instance && !query_id.empty()) TaskScheduler::instance->cancel(query_id, resource_group_name); } @@ -163,6 +165,7 @@ ResultQueuePtr PipelineExecutorContext::toConsumeMode(size_t queue_size) { std::lock_guard lock(mu); RUNTIME_ASSERT(!result_queue.has_value()); + RUNTIME_CHECK_MSG(!isCancelled(), "query has been cancelled."); result_queue.emplace(std::make_shared(queue_size)); return *result_queue; } @@ -185,4 +188,17 @@ void PipelineExecutorContext::cancelSharedQueues() for (const auto & shared_queue : tmp) shared_queue->cancel(); } + +void PipelineExecutorContext::cancelResultQueueIfNeed() +{ + ResultQueue * tmp{nullptr}; + { + std::lock_guard lock(mu); + if (isWaitMode()) + return; + tmp = (*result_queue).get(); + } + assert(tmp); + tmp->cancel(); +} } // namespace DB diff --git a/dbms/src/Flash/Executor/PipelineExecutorContext.h b/dbms/src/Flash/Executor/PipelineExecutorContext.h index fca5291c4da..a562d580518 100644 --- a/dbms/src/Flash/Executor/PipelineExecutorContext.h +++ b/dbms/src/Flash/Executor/PipelineExecutorContext.h @@ -19,7 +19,7 @@ #include #include #include -#include +#include #include #include @@ -96,54 +96,6 @@ class PipelineExecutorContext : private boost::noncopyable void consume(ResultHandler & result_handler); - template - void consumeFor(ResultHandler & result_handler, const Duration & timeout_duration) - { - RUNTIME_ASSERT(result_handler); - auto consumed_result_queue = getConsumedResultQueue(); - bool is_timeout = false; - try - { - Block ret; - while (true) - { - auto res = consumed_result_queue->popTimeout(ret, timeout_duration); - if (res == MPMCQueueResult::TIMEOUT) - { - is_timeout = true; - break; - } - else if (res == MPMCQueueResult::OK) - { - result_handler(ret); - } - else - { - break; - } - } - } - catch (...) - { - // If result_handler throws an error, here should notify the query to terminate, and wait for the end of the query. - onErrorOccurred(std::current_exception()); - } - if (is_timeout) - { - LOG_WARNING(log, "wait timeout"); - onErrorOccurred(timeout_err_msg); - throw Exception(timeout_err_msg); - } - else - { - // In order to ensure that `decActiveRefCount` has finished calling at this point - // and avoid referencing the already destructed `mu` in `decActiveRefCount`. - std::unique_lock lock(mu); - cv.wait(lock, [&] { return 0 == active_ref_count; }); - } - LOG_DEBUG(log, "query finished and consume done"); - } - void cancel(); ALWAYS_INLINE bool isCancelled() const { return is_cancelled.load(std::memory_order_acquire); } @@ -188,6 +140,7 @@ class PipelineExecutorContext : private boost::noncopyable ResultQueuePtr getConsumedResultQueue(); void cancelSharedQueues(); + void cancelResultQueueIfNeed(); private: const String query_id; diff --git a/dbms/src/Flash/Executor/ResultQueue.h b/dbms/src/Flash/Executor/ResultQueue.h index 14d203a8e3a..cbb1ffb1111 100644 --- a/dbms/src/Flash/Executor/ResultQueue.h +++ b/dbms/src/Flash/Executor/ResultQueue.h @@ -14,14 +14,35 @@ #pragma once -#include -#include +#include #include +#include -#include +#include namespace DB { -using ResultQueue = MPMCQueue; +class ResultQueue : public NotifyFuture +{ +public: + explicit ResultQueue(size_t queue_size) + : queue(queue_size) + {} + + // read + MPMCQueueResult pop(Block & block) { return queue.pop(block); } + + // write + MPMCQueueResult push(Block && block) { return queue.push(block); } + MPMCQueueResult tryPush(Block && block) { return queue.tryPush(block); } + void registerTask(TaskPtr && task) override { queue.registerPipeWriteTask(std::move(task)); } + + // finish/cancel + bool finish() { return queue.finish(); } + bool cancel() { return queue.cancel(); } + +private: + LooseBoundedMPMCQueue queue; +}; using ResultQueuePtr = std::shared_ptr; } // namespace DB diff --git a/dbms/src/Flash/Executor/ResultQueue_fwd.h b/dbms/src/Flash/Executor/ResultQueue_fwd.h new file mode 100644 index 00000000000..45da8e23487 --- /dev/null +++ b/dbms/src/Flash/Executor/ResultQueue_fwd.h @@ -0,0 +1,23 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include + +namespace DB +{ +class ResultQueue; +using ResultQueuePtr = std::shared_ptr; +} // namespace DB diff --git a/dbms/src/Flash/Executor/tests/gtest_pipeline_executor_context.cpp b/dbms/src/Flash/Executor/tests/gtest_pipeline_executor_context.cpp index b5123f7f721..b117ecb881e 100644 --- a/dbms/src/Flash/Executor/tests/gtest_pipeline_executor_context.cpp +++ b/dbms/src/Flash/Executor/tests/gtest_pipeline_executor_context.cpp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include @@ -44,29 +45,6 @@ try } CATCH -TEST_F(PipelineExecutorContextTestRunner, popTimeout) -try -{ - PipelineExecutorContext context; - context.toConsumeMode(1); - try - { - context.incActiveRefCount(); - std::chrono::milliseconds timeout(10); - ResultHandler result_handler{[](const Block &) { - }}; - context.consumeFor(result_handler, timeout); - GTEST_FAIL(); - } - catch (DB::Exception & e) - { - GTEST_ASSERT_EQ(e.message(), PipelineExecutorContext::timeout_err_msg); - auto err_msg = context.getExceptionMsg(); - ASSERT_EQ(err_msg, PipelineExecutorContext::timeout_err_msg); - } -} -CATCH - TEST_F(PipelineExecutorContextTestRunner, run) try { diff --git a/dbms/src/Flash/Pipeline/Pipeline.h b/dbms/src/Flash/Pipeline/Pipeline.h index acd4a0a3f74..ea6c8717496 100644 --- a/dbms/src/Flash/Pipeline/Pipeline.h +++ b/dbms/src/Flash/Pipeline/Pipeline.h @@ -15,7 +15,7 @@ #pragma once #include -#include +#include #include #include diff --git a/dbms/src/Flash/Pipeline/Schedule/Tasks/Impls/PipelineTask.h b/dbms/src/Flash/Pipeline/Schedule/Tasks/Impls/PipelineTask.h index bfb75038abb..d45be192c3d 100644 --- a/dbms/src/Flash/Pipeline/Schedule/Tasks/Impls/PipelineTask.h +++ b/dbms/src/Flash/Pipeline/Schedule/Tasks/Impls/PipelineTask.h @@ -41,7 +41,7 @@ class PipelineTask ExecTaskStatus awaitImpl() override { return runAwait(); } - void notifyImpl() override { runNotify(); } + ExecTaskStatus notifyImpl() override { return runNotify(); } void doFinalizeImpl() override { diff --git a/dbms/src/Flash/Pipeline/Schedule/Tasks/Impls/PipelineTaskBase.h b/dbms/src/Flash/Pipeline/Schedule/Tasks/Impls/PipelineTaskBase.h index 991813dd52d..1187e33155d 100644 --- a/dbms/src/Flash/Pipeline/Schedule/Tasks/Impls/PipelineTaskBase.h +++ b/dbms/src/Flash/Pipeline/Schedule/Tasks/Impls/PipelineTaskBase.h @@ -133,10 +133,11 @@ class PipelineTaskBase pipeline_exec_holder.reset(); } - void runNotify() + ExecTaskStatus runNotify() { assert(pipeline_exec); pipeline_exec->notify(); + return ExecTaskStatus::RUNNING; } private: diff --git a/dbms/src/Flash/Pipeline/Schedule/Tasks/Impls/RFWaitTask.h b/dbms/src/Flash/Pipeline/Schedule/Tasks/Impls/RFWaitTask.h index 245d45f8e30..78885b84abb 100644 --- a/dbms/src/Flash/Pipeline/Schedule/Tasks/Impls/RFWaitTask.h +++ b/dbms/src/Flash/Pipeline/Schedule/Tasks/Impls/RFWaitTask.h @@ -35,7 +35,7 @@ class RFWaitTask : public Task int max_wait_time_ms, RuntimeFilteList && waiting_rf_list_, RuntimeFilteList && ready_rf_list_) - : Task(exec_context_, req_id) + : Task(exec_context_, req_id, ExecTaskStatus::WAITING) , task_pool(task_pool_) , max_wait_time_ns(max_wait_time_ms < 0 ? 0 : 1000000UL * max_wait_time_ms) , waiting_rf_list(std::move(waiting_rf_list_)) @@ -75,7 +75,7 @@ class RFWaitTask : public Task } private: - ExecTaskStatus executeImpl() override { return ExecTaskStatus::WAITING; } + ExecTaskStatus executeImpl() override { throw Exception("unreachable"); } ExecTaskStatus awaitImpl() override { diff --git a/dbms/src/Flash/Pipeline/Schedule/Tasks/Impls/SimplePipelineTask.h b/dbms/src/Flash/Pipeline/Schedule/Tasks/Impls/SimplePipelineTask.h index a15bbd33935..54eb5887fd2 100644 --- a/dbms/src/Flash/Pipeline/Schedule/Tasks/Impls/SimplePipelineTask.h +++ b/dbms/src/Flash/Pipeline/Schedule/Tasks/Impls/SimplePipelineTask.h @@ -40,7 +40,7 @@ class SimplePipelineTask ExecTaskStatus awaitImpl() override { return runAwait(); } - void notifyImpl() override { runNotify(); } + ExecTaskStatus notifyImpl() override { return runNotify(); } void finalizeImpl() override { diff --git a/dbms/src/Flash/Pipeline/Schedule/Tasks/Impls/StreamRestoreTask.cpp b/dbms/src/Flash/Pipeline/Schedule/Tasks/Impls/StreamRestoreTask.cpp index 95773c0823c..a08a1a9aff6 100644 --- a/dbms/src/Flash/Pipeline/Schedule/Tasks/Impls/StreamRestoreTask.cpp +++ b/dbms/src/Flash/Pipeline/Schedule/Tasks/Impls/StreamRestoreTask.cpp @@ -13,79 +13,76 @@ // limitations under the License. #include +#include #include namespace DB { -namespace -{ -ALWAYS_INLINE ExecTaskStatus tryPushBlock(const ResultQueuePtr & result_queue, Block & block) -{ - assert(block); - auto ret = result_queue->tryPush(std::move(block)); - switch (ret) - { - case MPMCQueueResult::OK: - block = {}; - return ExecTaskStatus::IO_IN; - case MPMCQueueResult::FULL: - // If returning Full, the block was not actually moved. - assert(block); // NOLINT(bugprone-use-after-move) - return ExecTaskStatus::WAITING; - default: - throw Exception(fmt::format("Unexpect result: {}", magic_enum::enum_name(ret))); - } -} -} // namespace - StreamRestoreTask::StreamRestoreTask( PipelineExecutorContext & exec_context_, const String & req_id, const BlockInputStreamPtr & stream_, - const ResultQueuePtr & result_queue_) + const SharedQueueSinkHolderPtr & sink_) : Task(exec_context_, req_id, ExecTaskStatus::IO_IN) , stream(stream_) - , result_queue(result_queue_) + , sink(sink_) { + assert(sink); assert(stream); stream->readPrefix(); - assert(result_queue); } ExecTaskStatus StreamRestoreTask::executeImpl() { - return is_done ? ExecTaskStatus::FINISHED : ExecTaskStatus::IO_IN; + throw Exception("unreachable"); } -ExecTaskStatus StreamRestoreTask::awaitImpl() +ExecTaskStatus StreamRestoreTask::tryFlush() { - if (unlikely(is_done)) - return ExecTaskStatus::FINISHED; - if (unlikely(!block)) + assert(t_block); + auto ret = sink->tryPush(std::move(t_block)); + switch (ret) + { + case MPMCQueueResult::OK: + t_block.clear(); return ExecTaskStatus::IO_IN; - - return tryPushBlock(result_queue, block); + case MPMCQueueResult::FULL: + setNotifyFuture(sink); + return ExecTaskStatus::WAIT_FOR_NOTIFY; + case MPMCQueueResult::CANCELLED: + return ExecTaskStatus::CANCELLED; + default: + // queue result can not be finished/empty here. + throw Exception(fmt::format("Unexpect result: {}", magic_enum::enum_name(ret))); + } } ExecTaskStatus StreamRestoreTask::executeIOImpl() { - if (!block) + if unlikely (is_done) + return ExecTaskStatus::FINISHED; + + if (t_block) { - block = stream->read(); - if (unlikely(!block)) - { - is_done = true; - return ExecTaskStatus::FINISHED; - } + auto try_flush_status = tryFlush(); + if (try_flush_status != ExecTaskStatus::IO_IN) + return try_flush_status; } - return tryPushBlock(result_queue, block); + assert(!t_block); + t_block = stream->read(); + if (unlikely(!t_block)) + { + is_done = true; + return ExecTaskStatus::FINISHED; + } + return tryFlush(); } void StreamRestoreTask::finalizeImpl() { - result_queue->finish(); stream->readSuffix(); + sink->finish(); } } // namespace DB diff --git a/dbms/src/Flash/Pipeline/Schedule/Tasks/Impls/StreamRestoreTask.h b/dbms/src/Flash/Pipeline/Schedule/Tasks/Impls/StreamRestoreTask.h index 2d6aa7f08d7..29b8cd44049 100644 --- a/dbms/src/Flash/Pipeline/Schedule/Tasks/Impls/StreamRestoreTask.h +++ b/dbms/src/Flash/Pipeline/Schedule/Tasks/Impls/StreamRestoreTask.h @@ -15,11 +15,13 @@ #pragma once #include -#include #include namespace DB { +class SharedQueueSinkHolder; +using SharedQueueSinkHolderPtr = std::shared_ptr; + /// Used to read block from io-based block input stream like `SpilledFilesInputStream` and write block to result queue. /// /// io_stream (read in io_thread_pool) --> result_queue --> caller. @@ -30,22 +32,25 @@ class StreamRestoreTask : public Task PipelineExecutorContext & exec_context_, const String & req_id, const BlockInputStreamPtr & stream_, - const ResultQueuePtr & result_queue_); + const SharedQueueSinkHolderPtr & sink_); protected: ExecTaskStatus executeImpl() override; - ExecTaskStatus awaitImpl() override; - ExecTaskStatus executeIOImpl() override; void finalizeImpl() override; + ExecTaskStatus notifyImpl() override { return ExecTaskStatus::IO_IN; } + +private: + ExecTaskStatus tryFlush(); + private: BlockInputStreamPtr stream; - ResultQueuePtr result_queue; + SharedQueueSinkHolderPtr sink; + Block t_block; bool is_done = false; - Block block; }; } // namespace DB diff --git a/dbms/src/Flash/Pipeline/Schedule/Tasks/PipeConditionVariable.h b/dbms/src/Flash/Pipeline/Schedule/Tasks/PipeConditionVariable.h index 2f0171b8ba2..196d325462d 100644 --- a/dbms/src/Flash/Pipeline/Schedule/Tasks/PipeConditionVariable.h +++ b/dbms/src/Flash/Pipeline/Schedule/Tasks/PipeConditionVariable.h @@ -77,15 +77,7 @@ class PipeConditionVariable assert(task); task->notify(); assert(TaskScheduler::instance); - if (unlikely(task->getStatus() == ExecTaskStatus::WAITING)) - { - TaskScheduler::instance->submitToWaitReactor(std::move(task)); - } - else - { - assert(task->getStatus() == ExecTaskStatus::RUNNING); - TaskScheduler::instance->submitToCPUTaskThreadPool(std::move(task)); - } + TaskScheduler::instance->submit(std::move(task)); } private: diff --git a/dbms/src/Flash/Pipeline/Schedule/Tasks/Task.cpp b/dbms/src/Flash/Pipeline/Schedule/Tasks/Task.cpp index c4e09c0e864..86ca4b2b0eb 100644 --- a/dbms/src/Flash/Pipeline/Schedule/Tasks/Task.cpp +++ b/dbms/src/Flash/Pipeline/Schedule/Tasks/Task.cpp @@ -151,13 +151,12 @@ ExecTaskStatus Task::await() void Task::notify() { assert(task_status == ExecTaskStatus::WAIT_FOR_NOTIFY); + auto next_status = notifyImpl(); // If the query has been canceled, // move the task to WaitReactor to quickly trigger the cancel process. if (unlikely(exec_context.isCancelled())) - switchStatus(ExecTaskStatus::WAITING); - else - switchStatus(ExecTaskStatus::RUNNING); - notifyImpl(); + next_status = ExecTaskStatus::WAITING; + switchStatus(next_status); profile_info.elapsedWaitForNotifyTime(); } diff --git a/dbms/src/Flash/Pipeline/Schedule/Tasks/Task.h b/dbms/src/Flash/Pipeline/Schedule/Tasks/Task.h index e47ce10f38c..59b74e0ffcb 100644 --- a/dbms/src/Flash/Pipeline/Schedule/Tasks/Task.h +++ b/dbms/src/Flash/Pipeline/Schedule/Tasks/Task.h @@ -103,7 +103,7 @@ class Task // Avoid allocating memory in `await` if possible. virtual ExecTaskStatus awaitImpl() { return ExecTaskStatus::RUNNING; } - virtual void notifyImpl() {} + virtual ExecTaskStatus notifyImpl() { return ExecTaskStatus::RUNNING; } // Used to release held resources, just like `Event::finishImpl`. virtual void finalizeImpl() {} diff --git a/dbms/src/Flash/Planner/Plans/PhysicalGetResultSink.h b/dbms/src/Flash/Planner/Plans/PhysicalGetResultSink.h index 8ff3b598b4d..9691f9971a5 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalGetResultSink.h +++ b/dbms/src/Flash/Planner/Plans/PhysicalGetResultSink.h @@ -14,7 +14,7 @@ #pragma once -#include +#include #include namespace DB diff --git a/dbms/src/Operators/GetResultSinkOp.cpp b/dbms/src/Operators/GetResultSinkOp.cpp index 16e8fc37a9e..bc904676b6b 100644 --- a/dbms/src/Operators/GetResultSinkOp.cpp +++ b/dbms/src/Operators/GetResultSinkOp.cpp @@ -12,51 +12,45 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include namespace DB { OperatorStatus GetResultSinkOp::writeImpl(Block && block) { - if (!block) + if unlikely (!block) return OperatorStatus::FINISHED; assert(!t_block); - auto ret = result_queue->tryPush(std::move(block)); - switch (ret) - { - case MPMCQueueResult::OK: - return OperatorStatus::NEED_INPUT; - case MPMCQueueResult::FULL: - // If returning Full, the block was not actually moved. - assert(block); // NOLINT(bugprone-use-after-move) - t_block.emplace(std::move(block)); // NOLINT(bugprone-use-after-move) - return OperatorStatus::WAITING; - default: - return OperatorStatus::FINISHED; - } + t_block.emplace(std::move(block)); + return tryFlush(); } OperatorStatus GetResultSinkOp::prepareImpl() { - return awaitImpl(); + return t_block ? tryFlush() : OperatorStatus::NEED_INPUT; } -OperatorStatus GetResultSinkOp::awaitImpl() +OperatorStatus GetResultSinkOp::tryFlush() { - if (!t_block) - return OperatorStatus::NEED_INPUT; - - auto ret = result_queue->tryPush(std::move(*t_block)); - switch (ret) + auto queue_result = result_queue->tryPush(std::move(*t_block)); + switch (queue_result) { + case MPMCQueueResult::FULL: + setNotifyFuture(result_queue); + return OperatorStatus::WAIT_FOR_NOTIFY; case MPMCQueueResult::OK: t_block.reset(); return OperatorStatus::NEED_INPUT; - case MPMCQueueResult::FULL: - return OperatorStatus::WAITING; + case MPMCQueueResult::CANCELLED: + return OperatorStatus::CANCELLED; default: - return OperatorStatus::FINISHED; + // queue result can not be finished/empty here. + RUNTIME_CHECK_MSG( + false, + "Unexpected queue result for GetResultSinkOp: {}", + magic_enum::enum_name(queue_result)); } } } // namespace DB diff --git a/dbms/src/Operators/GetResultSinkOp.h b/dbms/src/Operators/GetResultSinkOp.h index dda52eec23c..592c1ad4493 100644 --- a/dbms/src/Operators/GetResultSinkOp.h +++ b/dbms/src/Operators/GetResultSinkOp.h @@ -14,7 +14,7 @@ #pragma once -#include +#include #include namespace DB @@ -40,7 +40,8 @@ class GetResultSinkOp : public SinkOp OperatorStatus prepareImpl() override; - OperatorStatus awaitImpl() override; +private: + OperatorStatus tryFlush(); private: ResultQueuePtr result_queue; diff --git a/dbms/src/Operators/HashJoinProbeTransformOp.cpp b/dbms/src/Operators/HashJoinProbeTransformOp.cpp index ee5de46641e..3349a0cb7fe 100644 --- a/dbms/src/Operators/HashJoinProbeTransformOp.cpp +++ b/dbms/src/Operators/HashJoinProbeTransformOp.cpp @@ -230,7 +230,6 @@ OperatorStatus HashJoinProbeTransformOp::awaitImpl() } return OperatorStatus::WAITING; case ProbeStatus::RESTORE_PROBE: - return probe_transform->prepareProbeRestoredBlock() ? OperatorStatus::HAS_OUTPUT : OperatorStatus::WAITING; case ProbeStatus::READ_SCAN_HASH_MAP_DATA: case ProbeStatus::FINISHED: return OperatorStatus::HAS_OUTPUT; diff --git a/dbms/src/Operators/HashProbeTransformExec.cpp b/dbms/src/Operators/HashProbeTransformExec.cpp index dbed1a6755f..7c0a1e375c7 100644 --- a/dbms/src/Operators/HashProbeTransformExec.cpp +++ b/dbms/src/Operators/HashProbeTransformExec.cpp @@ -21,6 +21,7 @@ #include #include #include +#include #include @@ -100,16 +101,18 @@ void HashProbeTransformExec::startRestoreProbe() /// StreamRestoreTask [probe_restore_stream] /// | read and push /// ▼ - /// probe_result_queue + /// probe_source_holder /// | pop and probe /// ▼ /// HashProbeTransformExec (probe restored hash partition) assert(!is_probe_restore_done && probe_restore_stream); // Use 1 as the queue_size to avoid accumulating too many blocks and causing the memory to exceed the limit. - assert(!probe_result_queue); - probe_result_queue = std::make_shared(1); + assert(!probe_source_holder); + + SharedQueueSinkHolderPtr probe_sink_holder; + std::tie(probe_sink_holder, probe_source_holder) = SharedQueue::build(exec_context, 1, 1, -1, 1); TaskScheduler::instance->submit( - std::make_unique(exec_context, log->identifier(), probe_restore_stream, probe_result_queue)); + std::make_unique(exec_context, log->identifier(), probe_restore_stream, probe_sink_holder)); probe_restore_stream.reset(); } @@ -119,18 +122,21 @@ bool HashProbeTransformExec::prepareProbeRestoredBlock() return true; if (probe_restored_block) return true; - assert(probe_result_queue); - auto ret = probe_result_queue->tryPop(probe_restored_block); + assert(probe_source_holder); + auto ret = probe_source_holder->tryPop(probe_restored_block); switch (ret) { case MPMCQueueResult::OK: return true; case MPMCQueueResult::EMPTY: + setNotifyFuture(probe_source_holder); return false; case MPMCQueueResult::FINISHED: + case MPMCQueueResult::CANCELLED: is_probe_restore_done = true; return true; default: + // queue result can not be full here. throw Exception(fmt::format("Unexpected result: {}", magic_enum::enum_name(ret))); } } @@ -174,7 +180,7 @@ OperatorStatus HashProbeTransformExec::tryFillProcessInfoInRestoreProbeStage(Pro else { if (!prepareProbeRestoredBlock()) - return OperatorStatus::WAITING; + return OperatorStatus::WAIT_FOR_NOTIFY; auto restore_ret = popProbeRestoredBlock(); if (likely(restore_ret)) { diff --git a/dbms/src/Operators/HashProbeTransformExec.h b/dbms/src/Operators/HashProbeTransformExec.h index 55af07b88c4..9cb151d253e 100644 --- a/dbms/src/Operators/HashProbeTransformExec.h +++ b/dbms/src/Operators/HashProbeTransformExec.h @@ -14,7 +14,6 @@ #pragma once -#include #include #include @@ -25,6 +24,9 @@ using HashProbeTransformExecPtr = std::shared_ptr; class PipelineExecutorContext; +class SharedQueueSourceHolder; +using SharedQueueSourceHolderPtr = std::shared_ptr; + class HashProbeTransformExec : public std::enable_shared_from_this { public: @@ -76,7 +78,6 @@ class HashProbeTransformExec : public std::enable_shared_from_thisisSpilled() || join->isRestoreJoin(); } @@ -88,6 +89,9 @@ class HashProbeTransformExec : public std::enable_shared_from_this 0 && consumer > 0); - // The queue size is same as UnionBlockInputStream = concurrency * 5. - CapacityLimits queue_limits(std::max(producer, consumer) * 5, max_buffered_bytes); + // The default queue size is same as UnionBlockInputStream = concurrency * 5. + if (max_queue_size <= 0) + max_queue_size = std::max(producer, consumer) * 5; + CapacityLimits queue_limits(max_queue_size, max_buffered_bytes); return std::make_shared(queue_limits, producer); } @@ -31,9 +37,10 @@ std::pair SharedQueue::bui PipelineExecutorContext & exec_context, size_t producer, size_t consumer, - Int64 max_buffered_bytes) + Int64 max_buffered_bytes, + Int64 max_queue_size) { - auto shared_queue = buildInternal(producer, consumer, max_buffered_bytes); + auto shared_queue = buildInternal(producer, consumer, max_buffered_bytes, max_queue_size); exec_context.addSharedQueue(shared_queue); return { std::make_shared(shared_queue), diff --git a/dbms/src/Operators/SharedQueue.h b/dbms/src/Operators/SharedQueue.h index a75b53f17bd..aa35d1e5feb 100644 --- a/dbms/src/Operators/SharedQueue.h +++ b/dbms/src/Operators/SharedQueue.h @@ -39,13 +39,18 @@ using SharedQueueSourceHolderPtr = std::shared_ptr; class SharedQueue { public: - static SharedQueuePtr buildInternal(size_t producer, size_t consumer, Int64 max_buffered_bytes); + static SharedQueuePtr buildInternal( + size_t producer, + size_t consumer, + Int64 max_buffered_bytes = -1, + Int64 max_queue_size = -1); static std::pair build( PipelineExecutorContext & exec_context, size_t producer, size_t consumer, - Int64 max_buffered_bytes); + Int64 max_buffered_bytes = -1, + Int64 max_queue_size = -1); SharedQueue(CapacityLimits queue_limits, size_t init_producer); @@ -71,7 +76,7 @@ class SharedQueueSinkHolder : public NotifyFuture : queue(queue_) {} MPMCQueueResult tryPush(Block && block) { return queue->tryPush(std::move(block)); } - void producerFinish() { queue->producerFinish(); } + void finish() { queue->producerFinish(); } void registerTask(TaskPtr && task) override { queue->registerWriteTask(std::move(task)); } @@ -90,7 +95,7 @@ class SharedQueueSinkOp : public SinkOp , shared_queue(shared_queue_) {} - ~SharedQueueSinkOp() override { shared_queue->producerFinish(); } + ~SharedQueueSinkOp() override { shared_queue->finish(); } String getName() const override { return "SharedQueueSinkOp"; } @@ -119,7 +124,6 @@ class SharedQueueSourceHolder : public NotifyFuture private: SharedQueuePtr queue; }; -using SharedQueueSourceHolderPtr = std::shared_ptr; class SharedQueueSourceOp : public SourceOp { diff --git a/dbms/src/Operators/UnorderedSourceOp.cpp b/dbms/src/Operators/UnorderedSourceOp.cpp index fb78104efd7..d7bd2d6188d 100644 --- a/dbms/src/Operators/UnorderedSourceOp.cpp +++ b/dbms/src/Operators/UnorderedSourceOp.cpp @@ -86,7 +86,7 @@ void UnorderedSourceOp::operatePrefixImpl() else { // Poll and check if the RuntimeFilters is ready in the WaitReactor. - TaskScheduler::instance->submitToWaitReactor(std::make_unique( + TaskScheduler::instance->submit(std::make_unique( exec_context, log->identifier(), task_pool,