-
Notifications
You must be signed in to change notification settings - Fork 409
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Pipeline: use notify instead of polling for ExchangeReceiver
#9073
Conversation
ExchangeReceiver
ExchangeReceiver
@@ -374,6 +374,8 @@ class DAGContext | |||
/* const */ bool is_disaggregated_task = false; // a disagg task handling by the write node | |||
// `tunnel_set` is always set by `MPPTask` and is used later. | |||
MPPTunnelSetPtr tunnel_set; | |||
// `mpp_receiver_set` is always set by `MPPTask` and is used later. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why move it to here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It can be solved by adding a public function?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A public function can be added here, but since mpp-tunnel-set is already in the public section, it might be better to maintain consistency.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But it will change the destructor order in DAGContext
, I'm not sure if there is some potential risk of this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok, updated.
@@ -152,6 +150,15 @@ MPMCQueueResult ReceivedMessageQueue::pop(size_t stream_id, ReceivedMessagePtr & | |||
grpc_recv_queue.tryDequeue(); | |||
#endif | |||
} | |||
ExchangeReceiverMetric::subDataSizeMetric(*data_size_in_queue, recv_msg->getPacket().ByteSizeLong()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should be a bug in the previous implementation, this subDataSizeMetric
should only be called if recv_msg->getRemainingConsumers()->fetch_sub(1) == 1
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok, updated.
|
||
extern thread_local NotifyFuturePtr current_notify_future; | ||
extern thread_local NotifyFuture * current_notify_future; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like this change may increase the risk of using a object that have been already released, is it by design that the current_notify_future
will not be released during its lifecycle?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, current_notify_future
is only released when the pipeline task changes from wait-for-notify
to running
: queue(log_, std::forward<Args>(args)...) | ||
{} | ||
|
||
void registerTask(TaskPtr && task) override { queue.registerPipeReadTask(std::move(task)); } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe rename it to registerPipelineReadTask
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because this is an override of NotifyFuture::registerTask, it cannot be renamed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm
private: | ||
using QueueImpl = LooseBoundedMPMCQueue<ReceivedMessagePtr>; | ||
// these are unbounded queues. | ||
std::unique_ptr<QueueImpl> queue = std::make_unique<QueueImpl>(std::numeric_limits<size_t>::max()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why use unique_ptr
? Also, how about letting MSGChannel
inherits LooseBoundedMPMCQueue<ReceivedMessagePtr>
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ref #7963,
Because LooseBoundedMPMCQueue includes locks, it must use unique_ptr or shared_ptr somewhere.
void registerPipeWriteTask(TaskPtr && task) { queue.registerPipeWriteTask(std::move(task)); } | ||
|
||
private: | ||
GRPCRecvQueue<ReceivedMessagePtr> queue; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about directly letting GRPCRecvNotifyQueue
inherit GRPCRecvQueue<ReceivedMessagePtr>
? If so, these functions do not need to be written, only the registerTask
function is required.
fa91f69
to
bb18d78
Compare
Signed-off-by: gengliqi <gengliqiii@gmail.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
[APPROVALNOTIFIER] This PR is APPROVED This pull-request has been approved by: gengliqi, windtalker The full list of commands accepted by this bot can be found here. The pull request process is described here
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing |
[LGTM Timeline notifier]Timeline:
|
/retest |
/retest |
What problem does this PR solve?
Issue Number: ref #8869
Problem Summary:
What is changed and how it works?
The cpu usage of wait reactor is reduced to 0 in tpch50.
Check List
Tests
Tested tpch50, no performance regression
Side effects
Documentation
Release note