Merged
3 changes: 1 addition & 2 deletions dbms/src/Common/TiFlashMetrics.h
@@ -77,8 +77,7 @@ namespace DB
M(tiflash_coprocessor_request_memory_usage, "Bucketed histogram of request memory usage", Histogram, \
F(type_cop, {{"type", "cop"}}, ExpBuckets{1024 * 1024, 2, 16}), \
F(type_batch, {{"type", "batch"}}, ExpBuckets{1024 * 1024, 2, 20}), \
F(type_run_mpp_task, {{"type", "run_mpp_task"}}, ExpBuckets{1024 * 1024, 2, 20}), \
F(type_run_mpp_query, {{"type", "run_mpp_query"}}, ExpBuckets{1024 * 1024, 2, 20})) \
F(type_run_mpp_task, {{"type", "run_mpp_task"}}, ExpBuckets{1024 * 1024, 2, 20})) \
M(tiflash_coprocessor_request_error, "Total number of request error", Counter, F(reason_meet_lock, {"reason", "meet_lock"}), \
F(reason_region_not_found, {"reason", "region_not_found"}), F(reason_epoch_not_match, {"reason", "epoch_not_match"}), \
F(reason_kv_client_error, {"reason", "kv_client_error"}), F(reason_internal_error, {"reason", "internal_error"}), \
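Note: `ExpBuckets{start, base, size}` above reads as exponentially growing histogram bucket bounds starting at 1 MiB and doubling. With the per-query memory tracking removed in MPPTaskManager.cpp below, nothing observes `type_run_mpp_query` anymore, so its bucket definition is dropped here. A toy re-implementation of the implied bucket expansion, for illustration only (not the actual TiFlash `ExpBuckets` code):

```cpp
#include <cstddef>
#include <iostream>
#include <vector>

// Toy re-implementation of the bucket expansion implied by
// ExpBuckets{start, base, size}: start, start*base, start*base^2, ...
std::vector<double> expBuckets(double start, double base, std::size_t size)
{
    std::vector<double> bounds;
    bounds.reserve(size);
    for (double bound = start; bounds.size() < size; bound *= base)
        bounds.push_back(bound);
    return bounds;
}

int main()
{
    // ExpBuckets{1024 * 1024, 2, 20}: upper bounds from 1 MiB up to 512 GiB.
    for (double bound : expBuckets(1024 * 1024, 2, 20))
        std::cout << bound << '\n';
}
```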
12 changes: 6 additions & 6 deletions dbms/src/Debug/dbgQueryExecutor.cpp
@@ -39,7 +39,7 @@ void setTipbRegionInfo(coprocessor::RegionInfo * tipb_region_info, const std::pa
range->set_end(RecordKVFormat::genRawKey(table_id, handle_range.second.handle_id));
}

BlockInputStreamPtr constructRootExchangeReceiverStream(Context & context, tipb::ExchangeReceiver & tipb_exchange_receiver, const DAGProperties & properties, DAGSchema & root_task_schema, const String & root_addr)
BlockInputStreamPtr constructExchangeReceiverStream(Context & context, tipb::ExchangeReceiver & tipb_exchange_receiver, const DAGProperties & properties, DAGSchema & root_task_schema, const String & root_addr, bool enable_local_tunnel)
{
for (auto & field : root_task_schema)
{
@@ -65,7 +65,7 @@ BlockInputStreamPtr constructRootExchangeReceiverStream(Context & context, tipb:
root_tm,
context.getTMTContext().getKVCluster(),
context.getTMTContext().getMPPTaskManager(),
false,
enable_local_tunnel,
context.getSettingsRef().enable_async_grpc_client),
tipb_exchange_receiver.encoded_task_meta_size(),
10,
@@ -77,7 +77,7 @@ BlockInputStreamPtr constructRootExchangeReceiverStream(Context & context, tipb:
return ret;
}

BlockInputStreamPtr prepareRootExchangeReceiver(Context & context, const DAGProperties & properties, std::vector<Int64> & root_task_ids, DAGSchema & root_task_schema)
BlockInputStreamPtr prepareRootExchangeReceiver(Context & context, const DAGProperties & properties, std::vector<Int64> & root_task_ids, DAGSchema & root_task_schema, bool enable_local_tunnel)
{
tipb::ExchangeReceiver tipb_exchange_receiver;
for (const auto root_task_id : root_task_ids)
@@ -93,7 +93,7 @@ BlockInputStreamPtr prepareRootExchangeReceiver(Context & context, const DAGProp
auto * tm_string = tipb_exchange_receiver.add_encoded_task_meta();
tm.AppendToString(tm_string);
}
return constructRootExchangeReceiverStream(context, tipb_exchange_receiver, properties, root_task_schema, Debug::LOCAL_HOST);
return constructExchangeReceiverStream(context, tipb_exchange_receiver, properties, root_task_schema, Debug::LOCAL_HOST, enable_local_tunnel);
}

void prepareExchangeReceiverMetaWithMultipleContext(tipb::ExchangeReceiver & tipb_exchange_receiver, const DAGProperties & properties, Int64 task_id, String & addr)
@@ -116,7 +116,7 @@ BlockInputStreamPtr prepareRootExchangeReceiverWithMultipleContext(Context & con

prepareExchangeReceiverMetaWithMultipleContext(tipb_exchange_receiver, properties, task_id, addr);

return constructRootExchangeReceiverStream(context, tipb_exchange_receiver, properties, root_task_schema, root_addr);
return constructExchangeReceiverStream(context, tipb_exchange_receiver, properties, root_task_schema, root_addr, true);
}

void prepareDispatchTaskRequest(QueryTask & task, std::shared_ptr<mpp::DispatchTaskRequest> req, const DAGProperties & properties, std::vector<Int64> & root_task_ids, DAGSchema & root_task_schema, String & addr)
@@ -222,7 +222,7 @@ BlockInputStreamPtr executeMPPQuery(Context & context, const DAGProperties & pro
if (call.getResp()->has_error())
throw Exception("Meet error while dispatch mpp task: " + call.getResp()->error().msg());
}
return prepareRootExchangeReceiver(context, properties, root_task_ids, root_task_schema);
return prepareRootExchangeReceiver(context, properties, root_task_ids, root_task_schema, context.getSettingsRef().enable_local_tunnel);
}

BlockInputStreamPtr executeNonMPPQuery(Context & context, RegionID region_id, const DAGProperties & properties, QueryTasks & query_tasks, MakeResOutputStream & func_wrap_output_stream)
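The effect of threading `enable_local_tunnel` through here, instead of the old hard-coded `false` for root receivers, is that the root task may now also read over local tunnels; the matching change in `MPPTask::registerTunnels` below removes the `task_id != -1` exclusion. A toy sketch of the resulting decision, with a simplified two-field struct standing in for `mpp::TaskMeta`:

```cpp
#include <cassert>
#include <string>

// Simplified stand-in for mpp::TaskMeta: only the fields the decision uses.
struct TaskMeta
{
    long long task_id;
    std::string address;
};

// Mirrors the new check in MPPTask::registerTunnels: any receiver co-located
// with the sender may use a local tunnel, root task (task_id == -1) included.
bool isLocalTunnel(const TaskMeta & sender, const TaskMeta & receiver, bool enable_local_tunnel)
{
    return enable_local_tunnel && sender.address == receiver.address;
}

int main()
{
    TaskMeta sender{1, "tiflash-0:3930"};
    TaskMeta root{-1, "tiflash-0:3930"};
    // Before this change the root task was excluded; now co-location decides.
    assert(isLocalTunnel(sender, root, /*enable_local_tunnel=*/true));
    assert(!isLocalTunnel(sender, root, /*enable_local_tunnel=*/false));
    return 0;
}
```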
1 change: 1 addition & 0 deletions dbms/src/Flash/Mpp/ExchangeReceiver.cpp
@@ -562,6 +562,7 @@ void ExchangeReceiverBase<RPCContext>::setUpLocalConnections(std::vector<Request
LoggerPtr local_log = Logger::get(fmt::format("{} {}", exc_log->identifier(), req_info));

LocalRequestHandler local_request_handler(
getMemoryTracker(),
[this, log = local_log](bool meet_error, const String & local_err_msg) {
this->connectionDone(meet_error, local_err_msg, log);
},
5 changes: 4 additions & 1 deletion dbms/src/Flash/Mpp/LocalRequestHandler.h
@@ -22,11 +22,13 @@ namespace DB
struct LocalRequestHandler
{
LocalRequestHandler(
MemoryTracker * recv_mem_tracker_,
std::function<void(bool, const String &)> && notify_write_done_,
std::function<void()> && notify_close_,
std::function<void()> && add_local_conn_num_,
ReceiverChannelWriter && channel_writer_)
: notify_write_done(std::move(notify_write_done_))
: recv_mem_tracker(recv_mem_tracker_)
, notify_write_done(std::move(notify_write_done_))
, notify_close(std::move(notify_close_))
, add_local_conn_num(std::move(add_local_conn_num_))
, channel_writer(std::move(channel_writer_))
@@ -73,6 +75,7 @@ struct LocalRequestHandler
return waiting_task_time;
}

MemoryTracker * recv_mem_tracker;
std::function<void(bool, const String &)> notify_write_done;
std::function<void()> notify_close;
std::function<void()> add_local_conn_num;
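Together with the ExchangeReceiver.cpp change above, the handler now carries a raw pointer to the receiver's `MemoryTracker` so that local tunnel senders (see MPPTunnel.h below) can re-account packets to the receiver. A hedged sketch of the wiring, with toy stand-ins for the real types:

```cpp
#include <functional>
#include <string>
#include <utility>

// Toy stand-ins; the real types live in dbms/src/Flash/Mpp.
struct MemoryTracker { };
struct ReceiverChannelWriter { };

struct LocalRequestHandler
{
    MemoryTracker * recv_mem_tracker; // receiver-owned, new in this change
    std::function<void(bool, const std::string &)> notify_write_done;
    ReceiverChannelWriter channel_writer;
};

// Mirrors ExchangeReceiverBase::setUpLocalConnections: the receiver passes its
// own tracker, then hands the handler to the sender side of the local tunnel.
LocalRequestHandler makeLocalRequestHandler(MemoryTracker * receiver_tracker)
{
    return {receiver_tracker,
            [](bool meet_error, const std::string & local_err_msg) { /* connectionDone(...) */ },
            ReceiverChannelWriter{}};
}

int main()
{
    MemoryTracker receiver_tracker;
    LocalRequestHandler handler = makeLocalRequestHandler(&receiver_tracker);
    (void)handler; // the sender side later reads handler.recv_mem_tracker
}
```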
44 changes: 10 additions & 34 deletions dbms/src/Flash/Mpp/MPPTask.cpp
@@ -99,9 +99,7 @@ void MPPTaskMonitorHelper::initAndAddself(MPPTaskManager * manager_, const Strin
MPPTaskMonitorHelper::~MPPTaskMonitorHelper()
{
if (initialized)
{
manager->removeMonitoredTask(task_unique_id);
}
}

MPPTask::MPPTask(const mpp::TaskMeta & meta_, const ContextPtr & context_)
@@ -121,9 +119,8 @@ MPPTask::~MPPTask()
{
/// MPPTask may be destructed by a different thread; set the query memory_tracker
/// to current_memory_tracker in the destructor
auto * query_memory_tracker = getMemoryTracker();
if (query_memory_tracker != nullptr && current_memory_tracker != query_memory_tracker)
current_memory_tracker = query_memory_tracker;
if (process_list_entry != nullptr && current_memory_tracker != process_list_entry->get().getMemoryTrackerPtr().get())
current_memory_tracker = process_list_entry->get().getMemoryTrackerPtr().get();
abortTunnels("", true);
LOG_INFO(log, "finish MPPTask: {}", id.toString());
}
@@ -190,8 +187,7 @@ void MPPTask::registerTunnels(const mpp::DispatchTaskRequest & task_request)
if (unlikely(!task_meta.ParseFromString(exchange_sender.encoded_task_meta(i))))
throw TiFlashException("Failed to decode task meta info in ExchangeSender", Errors::Coprocessor::BadRequest);

/// when the receiver task is root task, it should never be local tunnel
bool is_local = context->getSettingsRef().enable_local_tunnel && task_meta.task_id() != -1 && meta.address() == task_meta.address();
bool is_local = context->getSettingsRef().enable_local_tunnel && meta.address() == task_meta.address();
bool is_async = !is_local && context->getSettingsRef().enable_async_server;
MPPTunnelPtr tunnel = std::make_shared<MPPTunnel>(
task_meta,
@@ -297,13 +293,6 @@ void MPPTask::setErrString(const String & message)
err_string = message;
}

MemoryTracker * MPPTask::getMemoryTracker() const
{
if (process_list_entry_holder.process_list_entry != nullptr)
return process_list_entry_holder.process_list_entry->get().getMemoryTrackerPtr().get();
return nullptr;
}

void MPPTask::unregisterTask()
{
auto [result, reason] = manager->unregisterTask(id);
@@ -313,19 +302,6 @@ void MPPTask::unregisterTask()
LOG_WARNING(log, "task failed to unregister, reason: {}", reason);
}

void MPPTask::initProcessListEntry(MPPTaskManagerPtr & task_manager)
{
/// all the mpp tasks of the same mpp query shares the same process list entry
auto [query_process_list_entry, aborted_reason] = task_manager->getOrCreateQueryProcessListEntry(id.query_id, context);
if (!aborted_reason.empty())
throw TiFlashException(fmt::format("MPP query is already aborted, aborted reason: {}", aborted_reason), Errors::Coprocessor::Internal);
assert(query_process_list_entry != nullptr);
process_list_entry_holder.process_list_entry = query_process_list_entry;
dag_context->setProcessListEntry(query_process_list_entry);
context->setProcessListElement(&query_process_list_entry->get());
current_memory_tracker = getMemoryTracker();
}

void MPPTask::prepare(const mpp::DispatchTaskRequest & task_request)
{
dag_req = getDAGRequestFromStringWithRetry(task_request.encoded_plan());
@@ -382,13 +358,13 @@ void MPPTask::prepare(const mpp::DispatchTaskRequest & task_request)
dag_context->tidb_host = context->getClientInfo().current_address.toString();

context->setDAGContext(dag_context.get());

auto task_manager = tmt_context.getMPPTaskManager();
initProcessListEntry(task_manager);
process_list_entry = setProcessListElement(*context, dag_context->dummy_query_string, dag_context->dummy_ast.get());
dag_context->setProcessListEntry(process_list_entry);

injectFailPointBeforeRegisterTunnel(dag_context->isRootMPPTask());
registerTunnels(task_request);

auto task_manager = tmt_context.getMPPTaskManager();
LOG_DEBUG(log, "begin to register the task {}", id.toString());

injectFailPointBeforeRegisterMPPTask(dag_context->isRootMPPTask());
@@ -429,7 +405,7 @@ void MPPTask::preprocess()
void MPPTask::runImpl()
{
CPUAffinityManager::getInstance().bindSelfQueryThread();
RUNTIME_ASSERT(current_memory_tracker == getMemoryTracker(), log, "The current memory tracker is not set correctly for MPPTask::runImpl");
RUNTIME_ASSERT(current_memory_tracker == process_list_entry->get().getMemoryTrackerPtr().get(), log, "The current memory tracker is not set correctly for MPPTask::runImpl");
if (!switchStatus(INITIALIZING, RUNNING))
{
LOG_WARNING(log, "task not in initializing state, skip running");
@@ -532,9 +508,9 @@ void MPPTask::runImpl()
// todo when error happens, should try to update the metrics if it is available
if (auto throughput = dag_context->getTableScanThroughput(); throughput.first)
GET_METRIC(tiflash_storage_logical_throughput_bytes).Observe(throughput.second);
/// note that memory_tracker is shared by all the mpp tasks, the peak memory usage is not accurate
/// todo log executor level peak memory usage instead
auto peak_memory = getMemoryTracker()->getPeak();
auto process_info = context->getProcessListElement()->getInfo();
auto peak_memory = process_info.peak_memory_usage > 0 ? process_info.peak_memory_usage : 0;
GET_METRIC(tiflash_coprocessor_request_memory_usage, type_run_mpp_task).Observe(peak_memory);
mpp_task_statistics.setMemoryPeak(peak_memory);
}
}
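The net effect in this file: each MPPTask now creates its own process list entry via `setProcessListElement`, instead of sharing one per query through the removed `initProcessListEntry`/`getMemoryTracker` path, so `type_run_mpp_task` records a genuinely per-task peak (clamping non-positive readings to zero). A toy model of why the shared tracker made peaks misleading, for illustration only (not the real `MemoryTracker`):

```cpp
#include <algorithm>
#include <cstdint>
#include <cstdio>

// Toy per-task tracker; the real DB::MemoryTracker is thread-safe and hierarchical.
struct TaskTracker
{
    std::int64_t used = 0;
    std::int64_t peak = 0;
    void alloc(std::int64_t n)
    {
        used += n;
        peak = std::max(peak, used);
    }
    void free(std::int64_t n) { used -= n; }
};

int main()
{
    TaskTracker a, b; // two tasks of the same MPP query, tracked independently
    a.alloc(100);
    b.alloc(300);
    a.free(100);
    b.free(300);
    // A single query-wide tracker would have peaked at 400 and every task
    // would report that value; per-task trackers report 100 and 300, which is
    // what type_run_mpp_task now observes.
    std::printf("task a peak=%lld, task b peak=%lld\n",
                static_cast<long long>(a.peak),
                static_cast<long long>(b.peak));
}
```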
19 changes: 2 additions & 17 deletions dbms/src/Flash/Mpp/MPPTask.h
@@ -37,7 +37,6 @@
namespace DB
{
class MPPTaskManager;
using MPPTaskManagerPtr = std::shared_ptr<MPPTaskManager>;
class DAGContext;
class ProcessListEntry;

@@ -124,27 +123,12 @@ class MPPTask : public std::enable_shared_from_this<MPPTask>

void registerTunnels(const mpp::DispatchTaskRequest & task_request);

void initProcessListEntry(MPPTaskManagerPtr & task_manager);

void initExchangeReceivers();

String getErrString() const;
void setErrString(const String & message);

MemoryTracker * getMemoryTracker() const;

private:
struct ProcessListEntryHolder
{
std::shared_ptr<ProcessListEntry> process_list_entry;
~ProcessListEntryHolder()
{
/// Because MemoryTracker is now saved in `MPPQueryTaskSet` and shared by all the mpp tasks belongs to the same mpp query,
/// it may not be destructed when MPPTask is destructed, so need to manually reset current_memory_tracker to nullptr at the
/// end of the destructor of MPPTask, otherwise, current_memory_tracker may point to a invalid memory tracker
current_memory_tracker = nullptr;
}
};
// We must ensure this member variable is put at this place to be destructed at proper time
MPPTaskMonitorHelper mpp_task_monitor_helper;

@@ -160,11 +144,12 @@ class MPPTask : public std::enable_shared_from_this<MPPTask>

MPPTaskScheduleEntry schedule_entry;

ProcessListEntryHolder process_list_entry_holder;
// `dag_context` holds inputstreams which could hold ref to `context` so it should be destructed
// before `context`.
std::unique_ptr<DAGContext> dag_context;

std::shared_ptr<ProcessListEntry> process_list_entry;

QueryExecutorHolder query_executor_holder;

std::atomic<TaskStatus> status{INITIALIZING};
42 changes: 6 additions & 36 deletions dbms/src/Flash/Mpp/MPPTaskManager.cpp
@@ -15,16 +15,13 @@
#include <Common/FailPoint.h>
#include <Common/FmtUtils.h>
#include <Common/TiFlashMetrics.h>
#include <Flash/Coprocessor/DAGContext.h>
#include <Flash/Mpp/MPPTask.h>
#include <Flash/Mpp/MPPTaskManager.h>
#include <Interpreters/Context.h>
#include <Interpreters/ProcessList.h>
#include <Interpreters/executeQuery.h>
#include <fmt/core.h>

#include <magic_enum.hpp>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include <utility>
@@ -37,15 +34,6 @@
extern const char pause_before_register_non_root_mpp_task[];
} // namespace FailPoints

MPPQueryTaskSet::~MPPQueryTaskSet()
{
if likely (process_list_entry != nullptr)
{
auto peak_memory = process_list_entry->get().getMemoryTrackerPtr()->getPeak();
GET_METRIC(tiflash_coprocessor_request_memory_usage, type_run_mpp_query).Observe(peak_memory);
}
}

MPPTaskManager::MPPTaskManager(MPPTaskSchedulerPtr scheduler_)
: scheduler(std::move(scheduler_))
, aborted_query_gather_cache(ABORTED_MPPGATHER_CACHE_SIZE)
@@ -247,13 +235,14 @@ std::pair<bool, String> MPPTaskManager::registerTask(MPPTaskPtr task)
{
return {false, fmt::format("query is being aborted, error message = {}", error_msg)};
}
/// query_set must not be nullptr if the current query is not aborted since MPPTask::initProcessListEntry
/// will always create the query_set
RUNTIME_CHECK_MSG(query_set != nullptr, "query set must not be null when register task");
if (query_set->task_map.find(task->id) != query_set->task_map.end())
if (query_set != nullptr && query_set->task_map.find(task->id) != query_set->task_map.end())
{
return {false, "task has been registered"};
}
if (query_set == nullptr) /// the first one
{
query_set = addMPPQueryTaskSet(task->id.query_id);
}
query_set->task_map.emplace(task->id, task);
/// cancel all the alarm waiting on this task
auto alarm_it = query_set->alarms.find(task->id.task_id);
Expand Down Expand Up @@ -304,25 +293,6 @@ String MPPTaskManager::toString()
return res + ")";
}

std::pair<std::shared_ptr<ProcessListEntry>, String> MPPTaskManager::getOrCreateQueryProcessListEntry(const MPPQueryId & query_id, const ContextPtr & context)
{
std::lock_guard lock(mu);
auto [query_set, abort_reason] = getQueryTaskSetWithoutLock(query_id);
if (!abort_reason.empty())
return {nullptr, abort_reason};
if (query_set == nullptr)
query_set = addMPPQueryTaskSet(query_id);
if (query_set->process_list_entry == nullptr)
{
query_set->process_list_entry = setProcessListElement(
*context,
context->getDAGContext()->dummy_query_string,
context->getDAGContext()->dummy_ast.get(),
true);
}
return {query_set->process_list_entry, ""};
}

std::pair<MPPQueryTaskSetPtr, String> MPPTaskManager::getQueryTaskSetWithoutLock(const MPPQueryId & query_id)
{
auto it = mpp_query_map.find(query_id);
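Since `getOrCreateQueryProcessListEntry` is gone, nothing creates the `MPPQueryTaskSet` before registration anymore, so `registerTask` now creates it on first use. A toy sketch of the get-or-create shape under the manager lock, with simplified types for illustration:

```cpp
#include <cstdio>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <utility>

// Toy shapes; the real MPPQueryTaskSet/MPPTaskManager carry much more state.
struct QueryTaskSet
{
    std::map<long long, std::string> task_map;
};

std::mutex mu;
std::map<std::string, std::shared_ptr<QueryTaskSet>> mpp_query_map;

// Mirrors the new registerTask flow: the first task of a query creates the
// task set on demand, instead of relying on the removed initProcessListEntry
// to have created it beforehand.
std::pair<bool, std::string> registerTask(const std::string & query_id, long long task_id)
{
    std::lock_guard<std::mutex> lock(mu);
    auto & query_set = mpp_query_map[query_id]; // creates an empty slot if absent
    if (query_set == nullptr)                   // the first one
        query_set = std::make_shared<QueryTaskSet>();
    if (!query_set->task_map.emplace(task_id, "task").second)
        return {false, "task has been registered"};
    return {true, ""};
}

int main()
{
    registerTask("query-1", 1); // creates the query task set
    registerTask("query-1", 2); // reuses it
    auto [ok, msg] = registerTask("query-1", 2);
    std::printf("ok=%d msg=%s\n", ok ? 1 : 0, msg.c_str());
}
```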
4 changes: 0 additions & 4 deletions dbms/src/Flash/Mpp/MPPTaskManager.h
@@ -40,7 +40,6 @@ struct MPPQueryTaskSet
/// task can only be registered state is Normal
State state = Normal;
String error_message;
std::shared_ptr<ProcessListEntry> process_list_entry;
MPPTaskMap task_map;
std::unordered_map<Int64, std::unordered_map<Int64, grpc::Alarm>> alarms;
/// only used in scheduler
@@ -53,7 +52,6 @@
{
return state == Normal || state == Aborted;
}
~MPPQueryTaskSet();
};

/// A simple thread unsafe FIFO cache used to fix the "lost cancel" issues
@@ -195,8 +193,6 @@ class MPPTaskManager : private boost::noncopyable

void abortMPPQuery(const MPPQueryId & query_id, const String & reason, AbortType abort_type);

std::pair<std::shared_ptr<ProcessListEntry>, String> getOrCreateQueryProcessListEntry(const MPPQueryId & query_id, const ContextPtr & context);

String toString();

private:
3 changes: 3 additions & 0 deletions dbms/src/Flash/Mpp/MPPTunnel.cpp
@@ -459,6 +459,9 @@ std::shared_ptr<DB::TrackedMppDataPacket> LocalTunnelSenderV1::readForLocal()
if (result == MPMCQueueResult::OK)
{
MPPTunnelMetric::subDataSizeMetric(*data_size_in_queue, res->getPacket().ByteSizeLong());

// switch tunnel's memory tracker into receiver's
res->switchMemTracker(current_memory_tracker);
return res;
}
else if (result == MPMCQueueResult::CANCELLED)
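`switchMemTracker` moves a packet's byte accounting to whoever now owns the packet: V1 charges the receiver thread's `current_memory_tracker` at read time, while the V2 change below charges the handler's `recv_mem_tracker` at write time. A toy model of the handoff, assuming `switchMemTracker` simply re-accounts the packet's size (the real `TrackedMppDataPacket` does this through `MemTrackerWrapper`):

```cpp
#include <cstddef>
#include <cstdio>

// Toy tracker; the real MemoryTracker is shared, thread-safe and hierarchical.
struct MemoryTracker
{
    const char * name;
    std::ptrdiff_t used = 0;
    void alloc(std::ptrdiff_t n) { used += n; }
    void free(std::ptrdiff_t n) { used -= n; }
};

// Toy packet that re-accounts its bytes when handed across a local tunnel,
// modelling what TrackedMppDataPacket::switchMemTracker is used for here.
struct TrackedPacket
{
    std::ptrdiff_t size;
    MemoryTracker * tracker;

    TrackedPacket(std::ptrdiff_t size_, MemoryTracker * tracker_)
        : size(size_)
        , tracker(tracker_)
    {
        tracker->alloc(size);
    }

    void switchMemTracker(MemoryTracker * next)
    {
        tracker->free(size); // stop charging the sender...
        next->alloc(size);   // ...and start charging the receiver
        tracker = next;
    }

    ~TrackedPacket() { tracker->free(size); }
};

int main()
{
    MemoryTracker sender{"sender"};
    MemoryTracker receiver{"receiver"};
    TrackedPacket packet(1 << 20, &sender); // built on the sender side
    packet.switchMemTracker(&receiver);     // crosses the local tunnel
    std::printf("%s used=%td, %s used=%td\n",
                sender.name, sender.used, receiver.name, receiver.used);
}
```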
5 changes: 5 additions & 0 deletions dbms/src/Flash/Mpp/MPPTunnel.h
@@ -354,6 +354,11 @@ class LocalTunnelSenderV2 : public TunnelSender
if (unlikely(checkPacketErr(data)))
return false;

// The receiver_mem_tracker pointer is always valid here: ExchangeReceiverBase is not destructed
// until all local tunnels are, because the MPPTask that contains ExchangeReceiverBase (and is
// responsible for deleting receiver_mem_tracker) is destroyed after these local tunnels.
data->switchMemTracker(local_request_handler.recv_mem_tracker);

// When ExchangeReceiver receives data from both local and remote tiflash nodes, the number of local
// tunnel threads is very large, which makes transferring data by grpc threads take longer, because
// a grpc thread can hardly get a chance to push data into the MPMCQueue in ExchangeReceiver.
2 changes: 1 addition & 1 deletion dbms/src/Flash/Mpp/TrackedMppDataPacket.h
@@ -110,7 +110,7 @@ struct MemTrackerWrapper

struct TrackedMppDataPacket
{
TrackedMppDataPacket(const mpp::MPPDataPacket & data, MemoryTracker * memory_tracker)
explicit TrackedMppDataPacket(const mpp::MPPDataPacket & data, MemoryTracker * memory_tracker)
: mem_tracker_wrapper(estimateAllocatedSize(data), memory_tracker)
{
packet = data;
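Marking the two-argument constructor `explicit` rules out implicit conversions from a braced initializer list. This illustrates the general C++ rule with toy types, not TiFlash-specific behavior:

```cpp
// Toy types; only the `explicit` behaviour matters here.
struct Packet
{
};
struct MemoryTracker
{
};

struct Tracked
{
    explicit Tracked(const Packet &, MemoryTracker *) {}
};

void consume(const Tracked &) {}

int main()
{
    Packet p;
    MemoryTracker t;
    consume(Tracked{p, &t}); // OK: direct-initialization spells the type out
    // consume({p, &t});     // ill-formed once the constructor is explicit:
    //                       // no implicit conversion from a braced list
    Tracked ok{p, &t};
    (void)ok;
}
```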