Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pipeline: use notify instead of polling for SharedQueue #8875

Merged
merged 50 commits into from
May 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
7036d82
for task
SeaRise Mar 26, 2024
c0321fb
operator
SeaRise Mar 26, 2024
11b364f
for profile
SeaRise Mar 26, 2024
ff3fea4
fix build
SeaRise Mar 26, 2024
9511939
fix notify
SeaRise Mar 26, 2024
c7084d5
minor refactor
SeaRise Mar 26, 2024
2512249
__thread
SeaRise Mar 26, 2024
9914a58
tmp save
SeaRise Mar 26, 2024
71275bf
tmp
SeaRise Mar 26, 2024
afff674
update
SeaRise Mar 26, 2024
2b2b029
fix
SeaRise Mar 26, 2024
c701633
u
SeaRise Mar 26, 2024
ca9d6f3
refactor
SeaRise Mar 26, 2024
f93842e
fiux
SeaRise Mar 26, 2024
b453caa
Merge branch 'master' into notify_instead_of_pollinh
SeaRise Mar 26, 2024
4b6c363
final
SeaRise Mar 26, 2024
f0c6bfd
final final
SeaRise Mar 26, 2024
a05bd14
refine for lock
SeaRise Mar 27, 2024
0ba130f
address comment
SeaRise Mar 27, 2024
20d297d
Update dbms/src/Storages/DeltaMerge/ReadThread/WorkQueue.h
SeaRise Mar 27, 2024
28b1a58
update
SeaRise Mar 28, 2024
2b15547
update
SeaRise Mar 29, 2024
acafb58
Merge branch 'master' into shared_queue_notify
SeaRise Mar 29, 2024
00543bf
u
SeaRise Mar 29, 2024
ba964ab
update
SeaRise Mar 29, 2024
6c33a4a
Merge branch 'master' into shared_queue_notify
SeaRise Mar 29, 2024
9e1270d
Update dbms/src/Flash/Pipeline/Schedule/Tasks/PipeConditionVariable.h
SeaRise Mar 29, 2024
ace773f
fix
SeaRise Mar 29, 2024
8234f45
u
SeaRise Mar 29, 2024
30208c3
fix
SeaRise Apr 1, 2024
a0bffb6
add test case
SeaRise Apr 1, 2024
5b2610b
Update dbms/src/Operators/GetResultSinkOp.cpp
SeaRise Apr 1, 2024
9af4fe3
Update dbms/src/Flash/Executor/PipelineExecutorContext.h
SeaRise Apr 1, 2024
1e63a3b
Update dbms/src/Flash/Executor/PipelineExecutorContext.cpp
SeaRise Apr 1, 2024
72a4c5f
refine
SeaRise Apr 1, 2024
122bcf5
u
SeaRise Apr 1, 2024
05eae12
some refine
SeaRise Apr 1, 2024
81d7936
update
SeaRise Apr 1, 2024
e129791
update
SeaRise Apr 1, 2024
430cce5
Merge branch 'master' into shared_queue_notify
SeaRise Apr 7, 2024
8f57d6f
refine
SeaRise Apr 7, 2024
9747de4
fix
SeaRise Apr 7, 2024
0061987
Merge branch 'master' into shared_queue_notify
SeaRise May 6, 2024
306af86
fix
SeaRise May 6, 2024
41e6d57
Merge branch 'master' into shared_queue_notify
SeaRise May 8, 2024
9049a5b
Update dbms/src/Operators/Operator.cpp
SeaRise May 8, 2024
08ba3a1
Update dbms/src/Operators/Operator.cpp
SeaRise May 8, 2024
3d717c4
Update dbms/src/Operators/Operator.cpp
SeaRise May 8, 2024
1d9563c
Update LooseBoundedMPMCQueue.h
SeaRise May 10, 2024
e533eee
Merge branch 'master' into shared_queue_notify
SeaRise May 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
229 changes: 152 additions & 77 deletions dbms/src/Common/LooseBoundedMPMCQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
#pragma once

#include <Common/MPMCQueue.h>
#include <Flash/Pipeline/Schedule/Tasks/PipeConditionVariable.h>

#include <condition_variable>
#include <deque>

namespace DB
Expand Down Expand Up @@ -48,46 +50,81 @@ class LooseBoundedMPMCQueue
, push_callback(std::move(push_callback_))
{}

/// blocking function.
/// Just like MPMCQueue::push.
template <typename U>
MPMCQueueResult push(U && data)
void registerPipeReadTask(TaskPtr && task)
{
std::unique_lock lock(mu);
writer_head.wait(lock, [&] { return !isFullWithoutLock() || (unlikely(status != MPMCQueueStatus::NORMAL)); });
{
std::lock_guard lock(mu);
if (queue.empty() && status == MPMCQueueStatus::NORMAL)
{
pipe_reader_cv.registerTask(std::move(task));
return;
}
}
PipeConditionVariable::notifyTaskDirectly(std::move(task));
}

if ((likely(status == MPMCQueueStatus::NORMAL)) && !isFullWithoutLock())
void registerPipeWriteTask(TaskPtr && task)
{
{
pushFront(std::forward<U>(data));
return MPMCQueueResult::OK;
std::lock_guard lock(mu);
if (isFullWithoutLock() && status == MPMCQueueStatus::NORMAL)
{
pipe_writer_cv.registerTask(std::move(task));
return;
}
}
PipeConditionVariable::notifyTaskDirectly(std::move(task));
}

switch (status)
/// blocking function.
/// Just like MPMCQueue::push.
template <typename U>
MPMCQueueResult push(U && data)
{
bool notify_writer{false};
{
case MPMCQueueStatus::NORMAL:
return MPMCQueueResult::FULL;
case MPMCQueueStatus::CANCELLED:
return MPMCQueueResult::CANCELLED;
case MPMCQueueStatus::FINISHED:
return MPMCQueueResult::FINISHED;
std::unique_lock lock(mu);
writer_cv.wait(lock, [&] { return !isFullWithoutLock() || (unlikely(status != MPMCQueueStatus::NORMAL)); });

if unlikely (status == MPMCQueueStatus::CANCELLED)
return MPMCQueueResult::CANCELLED;
else if unlikely (status == MPMCQueueStatus::FINISHED)
return MPMCQueueResult::FINISHED;
else
{
assert(status == MPMCQueueStatus::NORMAL);
assert(!isFullWithoutLock());
notify_writer = pushFront(std::forward<U>(data));
}
}
if (notify_writer)
notifyOneWriter();
notifyOneReader();
return MPMCQueueResult::OK;
}

/// Just like MPMCQueue::tryPush.
template <typename U>
MPMCQueueResult tryPush(U && data)
{
std::lock_guard lock(mu);

if unlikely (status == MPMCQueueStatus::CANCELLED)
return MPMCQueueResult::CANCELLED;
if unlikely (status == MPMCQueueStatus::FINISHED)
return MPMCQueueResult::FINISHED;

if (isFullWithoutLock())
return MPMCQueueResult::FULL;

pushFront(std::forward<U>(data));
bool notify_writer{false};
{
std::lock_guard lock(mu);

if unlikely (status == MPMCQueueStatus::CANCELLED)
return MPMCQueueResult::CANCELLED;
else if unlikely (status == MPMCQueueStatus::FINISHED)
return MPMCQueueResult::FINISHED;
else
{
if (isFullWithoutLock())
return MPMCQueueResult::FULL;
notify_writer = pushFront(std::forward<U>(data));
}
}
if (notify_writer)
notifyOneWriter();
notifyOneReader();
return MPMCQueueResult::OK;
}

Expand All @@ -97,64 +134,74 @@ class LooseBoundedMPMCQueue
template <typename U>
MPMCQueueResult forcePush(U && data)
{
std::lock_guard lock(mu);

if unlikely (status == MPMCQueueStatus::CANCELLED)
return MPMCQueueResult::CANCELLED;
if unlikely (status == MPMCQueueStatus::FINISHED)
return MPMCQueueResult::FINISHED;

pushFront(std::forward<U>(data));
bool notify_writer{false};
{
std::lock_guard lock(mu);

if unlikely (status == MPMCQueueStatus::CANCELLED)
return MPMCQueueResult::CANCELLED;
else if unlikely (status == MPMCQueueStatus::FINISHED)
return MPMCQueueResult::FINISHED;
else
notify_writer = pushFront(std::forward<U>(data));
}
if (notify_writer)
notifyOneWriter();
notifyOneReader();
return MPMCQueueResult::OK;
}

MPMCQueueResult pop(T & data)
{
std::unique_lock lock(mu);
reader_head.wait(lock, [&] { return !queue.empty() || (unlikely(status != MPMCQueueStatus::NORMAL)); });

if ((likely(status != MPMCQueueStatus::CANCELLED)) && !queue.empty())
{
data = popBack();
return MPMCQueueResult::OK;
}
std::unique_lock lock(mu);
reader_cv.wait(lock, [&] { return !queue.empty() || (unlikely(status != MPMCQueueStatus::NORMAL)); });

switch (status)
{
case MPMCQueueStatus::NORMAL:
return MPMCQueueResult::EMPTY;
case MPMCQueueStatus::CANCELLED:
return MPMCQueueResult::CANCELLED;
case MPMCQueueStatus::FINISHED:
return MPMCQueueResult::FINISHED;
if unlikely (status == MPMCQueueStatus::CANCELLED)
return MPMCQueueResult::CANCELLED;

if (queue.empty())
{
assert(status == MPMCQueueStatus::FINISHED);
return MPMCQueueResult::FINISHED;
}

data = popBack();
}
notifyOneWriter();
return MPMCQueueResult::OK;
}

MPMCQueueResult tryPop(T & data)
{
std::lock_guard lock(mu);

if unlikely (status == MPMCQueueStatus::CANCELLED)
return MPMCQueueResult::CANCELLED;
{
std::lock_guard lock(mu);
if unlikely (status == MPMCQueueStatus::CANCELLED)
return MPMCQueueResult::CANCELLED;

if (queue.empty())
return status == MPMCQueueStatus::NORMAL ? MPMCQueueResult::EMPTY : MPMCQueueResult::FINISHED;
if (queue.empty())
return status == MPMCQueueStatus::NORMAL ? MPMCQueueResult::EMPTY : MPMCQueueResult::FINISHED;

data = popBack();
data = popBack();
}
notifyOneWriter();
return MPMCQueueResult::OK;
}

MPMCQueueResult tryDequeue()
{
std::lock_guard lock(mu);
{
std::lock_guard lock(mu);

if unlikely (status == MPMCQueueStatus::CANCELLED)
return MPMCQueueResult::CANCELLED;
if unlikely (status == MPMCQueueStatus::CANCELLED)
return MPMCQueueResult::CANCELLED;

if (queue.empty())
return status == MPMCQueueStatus::NORMAL ? MPMCQueueResult::EMPTY : MPMCQueueResult::FINISHED;
if (queue.empty())
return status == MPMCQueueStatus::NORMAL ? MPMCQueueResult::EMPTY : MPMCQueueResult::FINISHED;

popBack();
popBack();
}
notifyOneWriter();
return MPMCQueueResult::OK;
}

Expand Down Expand Up @@ -208,6 +255,27 @@ class LooseBoundedMPMCQueue
}

private:
/// Signals both reader-side channels after an element has been pushed:
/// `reader_cv` (the condition variable `pop` waits on) and `pipe_reader_cv`
/// (where `registerPipeReadTask` parks pipeline tasks).
/// NOTE(review): the push paths shown here call this after the scope that locks `mu` has closed — confirm before moving the call.
void notifyOneReader()
{
reader_cv.notify_one();
pipe_reader_cv.notifyOne();
}

/// Signals both writer-side channels: `writer_cv` (the condition variable `push`
/// waits on) and `pipe_writer_cv` (where `registerPipeWriteTask` parks pipeline tasks).
/// Used by the pop paths after an element is removed, and by the push paths when
/// `pushFront` returns true.
void notifyOneWriter()
{
writer_cv.notify_one();
pipe_writer_cv.notifyOne();
}

/// Signals every waiter on all four channels (thread and pipeline-task, reader and writer).
/// Called from `changeStatus` once its callback has run, so that blocked readers
/// and writers re-check `status`.
void notifyAll()
{
// Reader side: blocked threads, then registered pipeline tasks.
reader_cv.notify_all();
pipe_reader_cv.notifyAll();

// Writer side: blocked threads, then registered pipeline tasks.
writer_cv.notify_all();
pipe_writer_cv.notifyAll();
}

bool isFullWithoutLock() const
{
assert(current_auxiliary_memory_usage >= 0);
Expand All @@ -218,15 +286,18 @@ class LooseBoundedMPMCQueue
template <typename FF>
ALWAYS_INLINE bool changeStatus(FF && ff)
{
std::lock_guard lock(mu);
if likely (status == MPMCQueueStatus::NORMAL)
bool ret{false};
{
ff();
reader_head.notifyAll();
writer_head.notifyAll();
return true;
std::lock_guard lock(mu);
if likely (status == MPMCQueueStatus::NORMAL)
{
ff();
ret = true;
}
}
return false;
if (ret)
notifyAll();
return ret;
}

ALWAYS_INLINE T popBack()
Expand All @@ -235,12 +306,12 @@ class LooseBoundedMPMCQueue
queue.pop_back();
current_auxiliary_memory_usage -= element.memory_usage;
assert(!queue.empty() || current_auxiliary_memory_usage == 0);
writer_head.notifyNext();
SeaRise marked this conversation as resolved.
Show resolved Hide resolved
return element.data;
}

// If returns true, then notify writers afterward; if false, no need to notify writers.
template <typename U>
ALWAYS_INLINE void pushFront(U && data)
ALWAYS_INLINE bool pushFront(U && data)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about adding comments about the return value?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have added the comment: "If returns true, then notify writers afterward; if false, no need to notify writers."

{
Int64 memory_usage = get_auxiliary_memory_usage(data);
queue.emplace_front(std::forward<U>(data), memory_usage);
Expand All @@ -249,7 +320,7 @@ class LooseBoundedMPMCQueue
{
push_callback(queue.front().data);
}
reader_head.notifyNext();

/// consider a case that the queue capacity is 2, the max_auxiliary_memory_usage is 100,
/// T1: a writer write an object with size 100
/// T2: two writers(w2, w3) try to write, but all blocked because of the max_auxiliary_memory_usage
Expand All @@ -260,8 +331,9 @@ class LooseBoundedMPMCQueue
/// 1. there is another reader
/// 2. there is another writer
/// if we notify the writer if the queue is not full here, w3 can write immediately
if (capacity_limits.max_bytes != std::numeric_limits<Int64>::max() && !isFullWithoutLock())
writer_head.notifyNext();
///
/// return true means that writer should be notified.
return capacity_limits.max_bytes != std::numeric_limits<Int64>::max() && !isFullWithoutLock();
}

private:
Expand All @@ -286,8 +358,11 @@ class LooseBoundedMPMCQueue
const PushCallback push_callback;
Int64 current_auxiliary_memory_usage = 0;

MPMCQueueDetail::WaitingNode reader_head;
MPMCQueueDetail::WaitingNode writer_head;
std::condition_variable reader_cv;
std::condition_variable writer_cv;

PipeConditionVariable pipe_reader_cv;
PipeConditionVariable pipe_writer_cv;

MPMCQueueStatus status = MPMCQueueStatus::NORMAL;
String cancel_reason;
Expand Down
32 changes: 24 additions & 8 deletions dbms/src/Flash/Coprocessor/InterpreterUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,16 +99,24 @@ void restoreConcurrency(
{
if (concurrency > 1 && group_builder.concurrency() == 1)
{
auto shared_queue = SharedQueue::build(1, concurrency, max_buffered_bytes);
// Doesn't use `auto [shared_queue_sink_holder, shared_queue_source_holder]` just to make c++ compiler happy.
SharedQueueSinkHolderPtr shared_queue_sink_holder;
SharedQueueSourceHolderPtr shared_queue_source_holder;
std::tie(shared_queue_sink_holder, shared_queue_source_holder)
= SharedQueue::build(exec_context, 1, concurrency, max_buffered_bytes);
// sink op of builder must be empty.
group_builder.transform([&](auto & builder) {
builder.setSinkOp(std::make_unique<SharedQueueSinkOp>(exec_context, log->identifier(), shared_queue));
builder.setSinkOp(
std::make_unique<SharedQueueSinkOp>(exec_context, log->identifier(), shared_queue_sink_holder));
});
auto cur_header = group_builder.getCurrentHeader();
group_builder.addGroup();
for (size_t i = 0; i < concurrency; ++i)
group_builder.addConcurrency(
std::make_unique<SharedQueueSourceOp>(exec_context, log->identifier(), cur_header, shared_queue));
group_builder.addConcurrency(std::make_unique<SharedQueueSourceOp>(
exec_context,
log->identifier(),
cur_header,
shared_queue_source_holder));
}
}

Expand All @@ -120,14 +128,22 @@ void executeUnion(
{
if (group_builder.concurrency() > 1)
{
auto shared_queue = SharedQueue::build(group_builder.concurrency(), 1, max_buffered_bytes);
// Doesn't use `auto [shared_queue_sink_holder, shared_queue_source_holder]` just to make c++ compiler happy.
SharedQueueSinkHolderPtr shared_queue_sink_holder;
SharedQueueSourceHolderPtr shared_queue_source_holder;
std::tie(shared_queue_sink_holder, shared_queue_source_holder)
= SharedQueue::build(exec_context, group_builder.concurrency(), 1, max_buffered_bytes);
group_builder.transform([&](auto & builder) {
builder.setSinkOp(std::make_unique<SharedQueueSinkOp>(exec_context, log->identifier(), shared_queue));
builder.setSinkOp(
std::make_unique<SharedQueueSinkOp>(exec_context, log->identifier(), shared_queue_sink_holder));
});
auto cur_header = group_builder.getCurrentHeader();
group_builder.addGroup();
group_builder.addConcurrency(
std::make_unique<SharedQueueSourceOp>(exec_context, log->identifier(), cur_header, shared_queue));
group_builder.addConcurrency(std::make_unique<SharedQueueSourceOp>(
exec_context,
log->identifier(),
cur_header,
shared_queue_source_holder));
}
}

Expand Down
Loading