Skip to content

Commit

Permalink
[Support][Parallel] Add sequential mode to TaskGroup::spawn().
Browse files Browse the repository at this point in the history
This patch allows specifying that some tasks should be
executed in sequential order. It makes it possible to avoid
using a conditional operator to separate sequential tasks:

TaskGroup tg;
for () {
  if(condition)      ==>   tg.spawn([](){fn();}, condition)
    fn();
  else
    tg.spawn([](){fn();});
}

It also prevents execution on the main thread, which allows adding
checks for the getThreadIndex() function discussed in D142318.

The patch also replaces std::stack with std::deque in the
ThreadPoolExecutor to have a natural execution order in the case
where parallel::strategy.ThreadsRequested == 1.

Differential Revision: https://reviews.llvm.org/D148728
  • Loading branch information
avl-llvm committed Apr 26, 2023
1 parent 329bfcc commit fea8c07
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 43 deletions.
2 changes: 1 addition & 1 deletion lld/ELF/OutputSections.cpp
Expand Up @@ -534,7 +534,7 @@ void OutputSection::writeTo(uint8_t *buf, parallel::TaskGroup &tg) {
taskSize += sections[i]->getSize();
bool done = ++i == numSections;
if (done || taskSize >= taskSizeLimit) {
tg.execute([=] { fn(begin, i); });
tg.spawn([=] { fn(begin, i); });
if (done)
break;
begin = i;
Expand Down
7 changes: 2 additions & 5 deletions lld/ELF/Relocations.cpp
Expand Up @@ -1534,16 +1534,13 @@ template <class ELFT> void elf::scanRelocations() {
scanner.template scanSection<ELFT>(*s);
}
};
if (serial)
fn();
else
tg.execute(fn);
tg.spawn(fn, serial);
}

// Both the main thread and thread pool index 0 use getThreadIndex()==0. Be
careful that they don't concurrently run scanSections. When serial is
true, fn() is spawned as a sequential task rather than run inline —
NOTE(review): the old "fn() has finished at this point" guarantee from
execute() no longer holds; confirm ordering vs. the spawn below.
tg.execute([] {
tg.spawn([] {
RelocationScanner scanner;
for (Partition &part : partitions) {
for (EhInputSection *sec : part.ehFrame->sections)
Expand Down
23 changes: 5 additions & 18 deletions llvm/include/llvm/Support/Parallel.h
Expand Up @@ -84,24 +84,11 @@ class TaskGroup {
~TaskGroup();

// Spawn a task, but does not wait for it to finish.
void spawn(std::function<void()> f);

// Similar to spawn, but execute the task immediately when ThreadsRequested ==
// 1. The difference is to give the following pattern a more intuitive order
// when single threading is requested.
//
// for (size_t begin = 0, i = 0, taskSize = 0;;) {
// taskSize += ...
// bool done = ++i == end;
// if (done || taskSize >= taskSizeLimit) {
// tg.execute([=] { fn(begin, i); });
// if (done)
// break;
// begin = i;
// taskSize = 0;
// }
// }
void execute(std::function<void()> f);
// Tasks marked with \p Sequential will be executed in
// exactly the order in which they were spawned.
// Note: Sequential tasks may be executed on different
// threads, but strictly in sequential order.
void spawn(std::function<void()> f, bool Sequential = false);

void sync() const { L.sync(); }
};
Expand Down
58 changes: 39 additions & 19 deletions llvm/lib/Support/Parallel.cpp
Expand Up @@ -12,8 +12,8 @@
#include "llvm/Support/Threading.h"

#include <atomic>
#include <deque>
#include <future>
#include <stack>
#include <thread>
#include <vector>

Expand All @@ -39,7 +39,7 @@ namespace {
class Executor {
public:
virtual ~Executor() = default;
virtual void add(std::function<void()> func) = 0;
virtual void add(std::function<void()> func, bool Sequential = false) = 0;

static Executor *getDefaultExecutor();
};
Expand Down Expand Up @@ -97,32 +97,56 @@ class ThreadPoolExecutor : public Executor {
static void call(void *Ptr) { ((ThreadPoolExecutor *)Ptr)->stop(); }
};

void add(std::function<void()> F) override {
void add(std::function<void()> F, bool Sequential = false) override {
{
bool UseSequentialQueue =
Sequential || parallel::strategy.ThreadsRequested == 1;
std::lock_guard<std::mutex> Lock(Mutex);
WorkStack.push(std::move(F));
if (UseSequentialQueue)
WorkQueueSequential.emplace_front(std::move(F));
else
WorkQueue.emplace_back(std::move(F));
}
Cond.notify_one();
}

private:
// Returns true when a sequential task is ready to run: the sequential
// queue is non-empty and no worker is currently executing a sequential
// task. SequentialQueueIsLocked is set by work() around the execution of
// each sequential task, enforcing one-at-a-time, in-order execution.
// Caller must hold Mutex (the flag itself is atomic, but the queue is
// guarded by Mutex).
bool hasSequentialTasks() const {
return !WorkQueueSequential.empty() && !SequentialQueueIsLocked;
}

bool hasGeneralTasks() const { return !WorkQueue.empty(); }

void work(ThreadPoolStrategy S, unsigned ThreadID) {
threadIndex = ThreadID;
S.apply_thread_strategy(ThreadID);
while (true) {
std::unique_lock<std::mutex> Lock(Mutex);
Cond.wait(Lock, [&] { return Stop || !WorkStack.empty(); });
Cond.wait(Lock, [&] {
return Stop || hasGeneralTasks() || hasSequentialTasks();
});
if (Stop)
break;
auto Task = std::move(WorkStack.top());
WorkStack.pop();
bool Sequential = hasSequentialTasks();
if (Sequential)
SequentialQueueIsLocked = true;
else
assert(hasGeneralTasks());

auto &Queue = Sequential ? WorkQueueSequential : WorkQueue;
auto Task = std::move(Queue.back());
Queue.pop_back();
Lock.unlock();
Task();
if (Sequential)
SequentialQueueIsLocked = false;
}
}

std::atomic<bool> Stop{false};
std::stack<std::function<void()>> WorkStack;
std::atomic<bool> SequentialQueueIsLocked{false};
std::deque<std::function<void()>> WorkQueue;
std::deque<std::function<void()>> WorkQueueSequential;
std::mutex Mutex;
std::condition_variable Cond;
std::promise<void> ThreadsCreated;
Expand Down Expand Up @@ -172,26 +196,22 @@ TaskGroup::~TaskGroup() {
--TaskGroupInstances;
}

void TaskGroup::spawn(std::function<void()> F) {
void TaskGroup::spawn(std::function<void()> F, bool Sequential) {
#if LLVM_ENABLE_THREADS
if (Parallel) {
L.inc();
detail::Executor::getDefaultExecutor()->add([&, F = std::move(F)] {
F();
L.dec();
});
detail::Executor::getDefaultExecutor()->add(
[&, F = std::move(F)] {
F();
L.dec();
},
Sequential);
return;
}
#endif
F();
}

void TaskGroup::execute(std::function<void()> F) {
if (parallel::strategy.ThreadsRequested == 1)
F();
else
spawn(F);
}
} // namespace parallel
} // namespace llvm

Expand Down
10 changes: 10 additions & 0 deletions llvm/unittests/Support/ParallelTest.cpp
Expand Up @@ -92,4 +92,14 @@ TEST(Parallel, ForEachError) {
EXPECT_EQ(errText, std::string("asdf\nasdf\nasdf"));
}

TEST(Parallel, TaskGroupSequentialFor) {
  // Sequential tasks must observe exactly the order in which they were
  // spawned, regardless of which worker thread runs each one.
  size_t NextExpected = 0;
  {
    parallel::TaskGroup TG;
    for (size_t I = 0; I < 500; ++I)
      TG.spawn([&NextExpected, I]() { EXPECT_EQ(NextExpected++, I); }, true);
    // TaskGroup's destructor syncs, so all tasks finish before the check
    // below.
  }
  EXPECT_EQ(NextExpected, 500ul);
}

#endif

0 comments on commit fea8c07

Please sign in to comment.