Skip to content

Commit

Permalink
Pipeline: use notify instead of polling for SharedQueue (#8875)
Browse files Browse the repository at this point in the history
ref #8869
  • Loading branch information
SeaRise authored May 20, 2024
1 parent d33545d commit 242f2d9
Show file tree
Hide file tree
Showing 8 changed files with 323 additions and 130 deletions.
229 changes: 152 additions & 77 deletions dbms/src/Common/LooseBoundedMPMCQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
#pragma once

#include <Common/MPMCQueue.h>
#include <Flash/Pipeline/Schedule/Tasks/PipeConditionVariable.h>

#include <condition_variable>
#include <deque>

namespace DB
Expand Down Expand Up @@ -48,46 +50,81 @@ class LooseBoundedMPMCQueue
, push_callback(std::move(push_callback_))
{}

/// blocking function.
/// Just like MPMCQueue::push.
template <typename U>
MPMCQueueResult push(U && data)
void registerPipeReadTask(TaskPtr && task)
{
std::unique_lock lock(mu);
writer_head.wait(lock, [&] { return !isFullWithoutLock() || (unlikely(status != MPMCQueueStatus::NORMAL)); });
{
std::lock_guard lock(mu);
if (queue.empty() && status == MPMCQueueStatus::NORMAL)
{
pipe_reader_cv.registerTask(std::move(task));
return;
}
}
PipeConditionVariable::notifyTaskDirectly(std::move(task));
}

if ((likely(status == MPMCQueueStatus::NORMAL)) && !isFullWithoutLock())
void registerPipeWriteTask(TaskPtr && task)
{
{
pushFront(std::forward<U>(data));
return MPMCQueueResult::OK;
std::lock_guard lock(mu);
if (isFullWithoutLock() && status == MPMCQueueStatus::NORMAL)
{
pipe_writer_cv.registerTask(std::move(task));
return;
}
}
PipeConditionVariable::notifyTaskDirectly(std::move(task));
}

switch (status)
/// blocking function.
/// Just like MPMCQueue::push.
template <typename U>
MPMCQueueResult push(U && data)
{
bool notify_writer{false};
{
case MPMCQueueStatus::NORMAL:
return MPMCQueueResult::FULL;
case MPMCQueueStatus::CANCELLED:
return MPMCQueueResult::CANCELLED;
case MPMCQueueStatus::FINISHED:
return MPMCQueueResult::FINISHED;
std::unique_lock lock(mu);
writer_cv.wait(lock, [&] { return !isFullWithoutLock() || (unlikely(status != MPMCQueueStatus::NORMAL)); });

if unlikely (status == MPMCQueueStatus::CANCELLED)
return MPMCQueueResult::CANCELLED;
else if unlikely (status == MPMCQueueStatus::FINISHED)
return MPMCQueueResult::FINISHED;
else
{
assert(status == MPMCQueueStatus::NORMAL);
assert(!isFullWithoutLock());
notify_writer = pushFront(std::forward<U>(data));
}
}
if (notify_writer)
notifyOneWriter();
notifyOneReader();
return MPMCQueueResult::OK;
}

/// Just like MPMCQueue::tryPush.
template <typename U>
MPMCQueueResult tryPush(U && data)
{
std::lock_guard lock(mu);

if unlikely (status == MPMCQueueStatus::CANCELLED)
return MPMCQueueResult::CANCELLED;
if unlikely (status == MPMCQueueStatus::FINISHED)
return MPMCQueueResult::FINISHED;

if (isFullWithoutLock())
return MPMCQueueResult::FULL;

pushFront(std::forward<U>(data));
bool notify_writer{false};
{
std::lock_guard lock(mu);

if unlikely (status == MPMCQueueStatus::CANCELLED)
return MPMCQueueResult::CANCELLED;
else if unlikely (status == MPMCQueueStatus::FINISHED)
return MPMCQueueResult::FINISHED;
else
{
if (isFullWithoutLock())
return MPMCQueueResult::FULL;
notify_writer = pushFront(std::forward<U>(data));
}
}
if (notify_writer)
notifyOneWriter();
notifyOneReader();
return MPMCQueueResult::OK;
}

Expand All @@ -97,64 +134,74 @@ class LooseBoundedMPMCQueue
template <typename U>
MPMCQueueResult forcePush(U && data)
{
std::lock_guard lock(mu);

if unlikely (status == MPMCQueueStatus::CANCELLED)
return MPMCQueueResult::CANCELLED;
if unlikely (status == MPMCQueueStatus::FINISHED)
return MPMCQueueResult::FINISHED;

pushFront(std::forward<U>(data));
bool notify_writer{false};
{
std::lock_guard lock(mu);

if unlikely (status == MPMCQueueStatus::CANCELLED)
return MPMCQueueResult::CANCELLED;
else if unlikely (status == MPMCQueueStatus::FINISHED)
return MPMCQueueResult::FINISHED;
else
notify_writer = pushFront(std::forward<U>(data));
}
if (notify_writer)
notifyOneWriter();
notifyOneReader();
return MPMCQueueResult::OK;
}

MPMCQueueResult pop(T & data)
{
std::unique_lock lock(mu);
reader_head.wait(lock, [&] { return !queue.empty() || (unlikely(status != MPMCQueueStatus::NORMAL)); });

if ((likely(status != MPMCQueueStatus::CANCELLED)) && !queue.empty())
{
data = popBack();
return MPMCQueueResult::OK;
}
std::unique_lock lock(mu);
reader_cv.wait(lock, [&] { return !queue.empty() || (unlikely(status != MPMCQueueStatus::NORMAL)); });

switch (status)
{
case MPMCQueueStatus::NORMAL:
return MPMCQueueResult::EMPTY;
case MPMCQueueStatus::CANCELLED:
return MPMCQueueResult::CANCELLED;
case MPMCQueueStatus::FINISHED:
return MPMCQueueResult::FINISHED;
if unlikely (status == MPMCQueueStatus::CANCELLED)
return MPMCQueueResult::CANCELLED;

if (queue.empty())
{
assert(status == MPMCQueueStatus::FINISHED);
return MPMCQueueResult::FINISHED;
}

data = popBack();
}
notifyOneWriter();
return MPMCQueueResult::OK;
}

MPMCQueueResult tryPop(T & data)
{
std::lock_guard lock(mu);

if unlikely (status == MPMCQueueStatus::CANCELLED)
return MPMCQueueResult::CANCELLED;
{
std::lock_guard lock(mu);
if unlikely (status == MPMCQueueStatus::CANCELLED)
return MPMCQueueResult::CANCELLED;

if (queue.empty())
return status == MPMCQueueStatus::NORMAL ? MPMCQueueResult::EMPTY : MPMCQueueResult::FINISHED;
if (queue.empty())
return status == MPMCQueueStatus::NORMAL ? MPMCQueueResult::EMPTY : MPMCQueueResult::FINISHED;

data = popBack();
data = popBack();
}
notifyOneWriter();
return MPMCQueueResult::OK;
}

MPMCQueueResult tryDequeue()
{
std::lock_guard lock(mu);
{
std::lock_guard lock(mu);

if unlikely (status == MPMCQueueStatus::CANCELLED)
return MPMCQueueResult::CANCELLED;
if unlikely (status == MPMCQueueStatus::CANCELLED)
return MPMCQueueResult::CANCELLED;

if (queue.empty())
return status == MPMCQueueStatus::NORMAL ? MPMCQueueResult::EMPTY : MPMCQueueResult::FINISHED;
if (queue.empty())
return status == MPMCQueueStatus::NORMAL ? MPMCQueueResult::EMPTY : MPMCQueueResult::FINISHED;

popBack();
popBack();
}
notifyOneWriter();
return MPMCQueueResult::OK;
}

Expand Down Expand Up @@ -208,6 +255,27 @@ class LooseBoundedMPMCQueue
}

private:
void notifyOneReader()
{
reader_cv.notify_one();
pipe_reader_cv.notifyOne();
}

void notifyOneWriter()
{
writer_cv.notify_one();
pipe_writer_cv.notifyOne();
}

void notifyAll()
{
reader_cv.notify_all();
pipe_reader_cv.notifyAll();

writer_cv.notify_all();
pipe_writer_cv.notifyAll();
}

bool isFullWithoutLock() const
{
assert(current_auxiliary_memory_usage >= 0);
Expand All @@ -218,15 +286,18 @@ class LooseBoundedMPMCQueue
template <typename FF>
ALWAYS_INLINE bool changeStatus(FF && ff)
{
std::lock_guard lock(mu);
if likely (status == MPMCQueueStatus::NORMAL)
bool ret{false};
{
ff();
reader_head.notifyAll();
writer_head.notifyAll();
return true;
std::lock_guard lock(mu);
if likely (status == MPMCQueueStatus::NORMAL)
{
ff();
ret = true;
}
}
return false;
if (ret)
notifyAll();
return ret;
}

ALWAYS_INLINE T popBack()
Expand All @@ -235,12 +306,12 @@ class LooseBoundedMPMCQueue
queue.pop_back();
current_auxiliary_memory_usage -= element.memory_usage;
assert(!queue.empty() || current_auxiliary_memory_usage == 0);
writer_head.notifyNext();
return element.data;
}

// If returns true, then notify writers afterward; if false, no need to notify writers.
template <typename U>
ALWAYS_INLINE void pushFront(U && data)
ALWAYS_INLINE bool pushFront(U && data)
{
Int64 memory_usage = get_auxiliary_memory_usage(data);
queue.emplace_front(std::forward<U>(data), memory_usage);
Expand All @@ -249,7 +320,7 @@ class LooseBoundedMPMCQueue
{
push_callback(queue.front().data);
}
reader_head.notifyNext();

/// consider a case that the queue capacity is 2, the max_auxiliary_memory_usage is 100,
/// T1: a writer write an object with size 100
/// T2: two writers(w2, w3) try to write, but all blocked because of the max_auxiliary_memory_usage
Expand All @@ -260,8 +331,9 @@ class LooseBoundedMPMCQueue
/// 1. there is another reader
/// 2. there is another writer
/// if we notify the writer if the queue is not full here, w3 can write immediately
if (capacity_limits.max_bytes != std::numeric_limits<Int64>::max() && !isFullWithoutLock())
writer_head.notifyNext();
///
/// return true means that writer should be notified.
return capacity_limits.max_bytes != std::numeric_limits<Int64>::max() && !isFullWithoutLock();
}

private:
Expand All @@ -286,8 +358,11 @@ class LooseBoundedMPMCQueue
const PushCallback push_callback;
Int64 current_auxiliary_memory_usage = 0;

MPMCQueueDetail::WaitingNode reader_head;
MPMCQueueDetail::WaitingNode writer_head;
std::condition_variable reader_cv;
std::condition_variable writer_cv;

PipeConditionVariable pipe_reader_cv;
PipeConditionVariable pipe_writer_cv;

MPMCQueueStatus status = MPMCQueueStatus::NORMAL;
String cancel_reason;
Expand Down
32 changes: 24 additions & 8 deletions dbms/src/Flash/Coprocessor/InterpreterUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,16 +99,24 @@ void restoreConcurrency(
{
if (concurrency > 1 && group_builder.concurrency() == 1)
{
auto shared_queue = SharedQueue::build(1, concurrency, max_buffered_bytes);
// Doesn't use `auto [shared_queue_sink_holder, shared_queue_source_holder]` just to make c++ compiler happy.
SharedQueueSinkHolderPtr shared_queue_sink_holder;
SharedQueueSourceHolderPtr shared_queue_source_holder;
std::tie(shared_queue_sink_holder, shared_queue_source_holder)
= SharedQueue::build(exec_context, 1, concurrency, max_buffered_bytes);
// sink op of builder must be empty.
group_builder.transform([&](auto & builder) {
builder.setSinkOp(std::make_unique<SharedQueueSinkOp>(exec_context, log->identifier(), shared_queue));
builder.setSinkOp(
std::make_unique<SharedQueueSinkOp>(exec_context, log->identifier(), shared_queue_sink_holder));
});
auto cur_header = group_builder.getCurrentHeader();
group_builder.addGroup();
for (size_t i = 0; i < concurrency; ++i)
group_builder.addConcurrency(
std::make_unique<SharedQueueSourceOp>(exec_context, log->identifier(), cur_header, shared_queue));
group_builder.addConcurrency(std::make_unique<SharedQueueSourceOp>(
exec_context,
log->identifier(),
cur_header,
shared_queue_source_holder));
}
}

Expand All @@ -120,14 +128,22 @@ void executeUnion(
{
if (group_builder.concurrency() > 1)
{
auto shared_queue = SharedQueue::build(group_builder.concurrency(), 1, max_buffered_bytes);
// Doesn't use `auto [shared_queue_sink_holder, shared_queue_source_holder]` just to make c++ compiler happy.
SharedQueueSinkHolderPtr shared_queue_sink_holder;
SharedQueueSourceHolderPtr shared_queue_source_holder;
std::tie(shared_queue_sink_holder, shared_queue_source_holder)
= SharedQueue::build(exec_context, group_builder.concurrency(), 1, max_buffered_bytes);
group_builder.transform([&](auto & builder) {
builder.setSinkOp(std::make_unique<SharedQueueSinkOp>(exec_context, log->identifier(), shared_queue));
builder.setSinkOp(
std::make_unique<SharedQueueSinkOp>(exec_context, log->identifier(), shared_queue_sink_holder));
});
auto cur_header = group_builder.getCurrentHeader();
group_builder.addGroup();
group_builder.addConcurrency(
std::make_unique<SharedQueueSourceOp>(exec_context, log->identifier(), cur_header, shared_queue));
group_builder.addConcurrency(std::make_unique<SharedQueueSourceOp>(
exec_context,
log->identifier(),
cur_header,
shared_queue_source_holder));
}
}

Expand Down
Loading

0 comments on commit 242f2d9

Please sign in to comment.