Skip to content

Commit

Permalink
[clangd] Add priorities to background index queue, extract to separat…
Browse files Browse the repository at this point in the history
…e class

Reviewers: kadircet

Subscribers: mgorny, ilya-biryukov, MaskRay, jkorous, arphaman, jfb, llvm-commits

Tags: #llvm

Differential Revision: https://reviews.llvm.org/D64560

llvm-svn: 365773
  • Loading branch information
sam-mccall committed Jul 11, 2019
1 parent 9cf1303 commit 7e27d86
Show file tree
Hide file tree
Showing 6 changed files with 234 additions and 139 deletions.
3 changes: 2 additions & 1 deletion clang-tools-extra/clangd/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,9 @@ add_clang_library(clangDaemon
XRefs.cpp

index/Background.cpp
index/BackgroundRebuild.cpp
index/BackgroundIndexStorage.cpp
index/BackgroundQueue.cpp
index/BackgroundRebuild.cpp
index/CanonicalIncludes.cpp
index/FileIndex.cpp
index/Index.cpp
Expand Down
154 changes: 41 additions & 113 deletions clang-tools-extra/clangd/index/Background.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "index/Background.h"
#include "ClangdUnit.h"
#include "Compiler.h"
#include "Context.h"
#include "Headers.h"
#include "Logger.h"
#include "Path.h"
Expand All @@ -33,8 +34,10 @@
#include "llvm/ADT/StringRef.h"
#include "llvm/ADT/StringSet.h"
#include "llvm/Support/Error.h"
#include "llvm/Support/Path.h"
#include "llvm/Support/Threading.h"

#include <algorithm>
#include <atomic>
#include <chrono>
#include <condition_variable>
Expand All @@ -50,8 +53,6 @@ namespace clang {
namespace clangd {
namespace {

static std::atomic<bool> PreventStarvation = {false};

// Resolves URI to file paths with cache.
class URIToFileCache {
public:
Expand Down Expand Up @@ -134,8 +135,10 @@ BackgroundIndex::BackgroundIndex(
assert(ThreadPoolSize > 0 && "Thread pool size can't be zero.");
assert(this->IndexStorageFactory && "Storage factory can not be null!");
for (unsigned I = 0; I < ThreadPoolSize; ++I) {
ThreadPool.runAsync("background-worker-" + llvm::Twine(I + 1),
[this] { run(); });
ThreadPool.runAsync("background-worker-" + llvm::Twine(I + 1), [this] {
WithContext Ctx(this->BackgroundContext.clone());
Queue.work([&] { Rebuilder.idle(); });
});
}
}

Expand All @@ -144,113 +147,42 @@ BackgroundIndex::~BackgroundIndex() {
ThreadPool.wait();
}

void BackgroundIndex::stop() {
Rebuilder.shutdown();
{
std::lock_guard<std::mutex> QueueLock(QueueMu);
std::lock_guard<std::mutex> IndexLock(IndexMu);
ShouldStop = true;
}
QueueCV.notify_all();
IndexCV.notify_all();
BackgroundQueue::Task BackgroundIndex::changedFilesTask(
const std::vector<std::string> &ChangedFiles) {
BackgroundQueue::Task T([this, ChangedFiles] {
trace::Span Tracer("BackgroundIndexEnqueue");
// We're doing this asynchronously, because we'll read shards here too.
log("Enqueueing {0} commands for indexing", ChangedFiles.size());
SPAN_ATTACH(Tracer, "files", int64_t(ChangedFiles.size()));

auto NeedsReIndexing = loadShards(std::move(ChangedFiles));
// Run indexing for files that need to be updated.
std::shuffle(NeedsReIndexing.begin(), NeedsReIndexing.end(),
std::mt19937(std::random_device{}()));
std::vector<BackgroundQueue::Task> Tasks;
Tasks.reserve(NeedsReIndexing.size());
for (auto &Elem : NeedsReIndexing)
Tasks.push_back(indexFileTask(std::move(Elem.first), Elem.second));
Queue.append(std::move(Tasks));
});

T.QueuePri = LoadShards;
T.ThreadPri = llvm::ThreadPriority::Default;
return T;
}

void BackgroundIndex::run() {
WithContext Background(BackgroundContext.clone());
while (true) {
llvm::Optional<Task> Task;
llvm::ThreadPriority Priority;
{
std::unique_lock<std::mutex> Lock(QueueMu);
QueueCV.wait(Lock, [&] { return ShouldStop || !Queue.empty(); });
if (ShouldStop) {
Queue.clear();
QueueCV.notify_all();
return;
}
++NumActiveTasks;
std::tie(Task, Priority) = std::move(Queue.front());
Queue.pop_front();
}

if (Priority != llvm::ThreadPriority::Default && !PreventStarvation.load())
llvm::set_thread_priority(Priority);
(*Task)();
if (Priority != llvm::ThreadPriority::Default)
llvm::set_thread_priority(llvm::ThreadPriority::Default);

{
std::unique_lock<std::mutex> Lock(QueueMu);
if (NumActiveTasks == 1 && Queue.empty()) {
// We just finished the last item, the queue is going idle.
Lock.unlock();
Rebuilder.idle();
Lock.lock();
}
assert(NumActiveTasks > 0 && "before decrementing");
--NumActiveTasks;
}
QueueCV.notify_all();
}
}

bool BackgroundIndex::blockUntilIdleForTest(
llvm::Optional<double> TimeoutSeconds) {
std::unique_lock<std::mutex> Lock(QueueMu);
return wait(Lock, QueueCV, timeoutSeconds(TimeoutSeconds),
[&] { return Queue.empty() && NumActiveTasks == 0; });
}

void BackgroundIndex::enqueue(const std::vector<std::string> &ChangedFiles) {
enqueueTask(
[this, ChangedFiles] {
trace::Span Tracer("BackgroundIndexEnqueue");
// We're doing this asynchronously, because we'll read shards here too.
log("Enqueueing {0} commands for indexing", ChangedFiles.size());
SPAN_ATTACH(Tracer, "files", int64_t(ChangedFiles.size()));

auto NeedsReIndexing = loadShards(std::move(ChangedFiles));
// Run indexing for files that need to be updated.
std::shuffle(NeedsReIndexing.begin(), NeedsReIndexing.end(),
std::mt19937(std::random_device{}()));
for (auto &Elem : NeedsReIndexing)
enqueue(std::move(Elem.first), Elem.second);
},
llvm::ThreadPriority::Default);
}

void BackgroundIndex::enqueue(tooling::CompileCommand Cmd,
BackgroundIndexStorage *Storage) {
enqueueTask(Bind(
[this, Storage](tooling::CompileCommand Cmd) {
// We can't use llvm::StringRef here since we are going to
// move from Cmd during the call below.
const std::string FileName = Cmd.Filename;
if (auto Error = index(std::move(Cmd), Storage))
elog("Indexing {0} failed: {1}", FileName,
std::move(Error));
},
std::move(Cmd)),
llvm::ThreadPriority::Background);
}

void BackgroundIndex::enqueueTask(Task T, llvm::ThreadPriority Priority) {
{
std::lock_guard<std::mutex> Lock(QueueMu);
auto I = Queue.end();
// We first store the tasks with Normal priority in the front of the queue.
// Then we store low priority tasks. Normal priority tasks are pretty rare,
// they should not grow beyond single-digit numbers, so it is OK to do
// linear search and insert after that.
if (Priority == llvm::ThreadPriority::Default) {
I = llvm::find_if(
Queue, [](const std::pair<Task, llvm::ThreadPriority> &Elem) {
return Elem.second == llvm::ThreadPriority::Background;
});
}
Queue.insert(I, {std::move(T), Priority});
}
QueueCV.notify_all();
BackgroundQueue::Task
BackgroundIndex::indexFileTask(tooling::CompileCommand Cmd,
BackgroundIndexStorage *Storage) {
BackgroundQueue::Task T([this, Storage, Cmd] {
// We can't use llvm::StringRef here since we are going to
// move from Cmd during the call below.
const std::string FileName = Cmd.Filename;
if (auto Error = index(std::move(Cmd), Storage))
elog("Indexing {0} failed: {1}", FileName, std::move(Error));
});
T.QueuePri = IndexFile;
return T;
}

/// Given index results from a TU, only update symbols coming from files that
Expand Down Expand Up @@ -649,9 +581,5 @@ BackgroundIndex::loadShards(std::vector<std::string> ChangedFiles) {
return NeedsReIndexing;
}

void BackgroundIndex::preventThreadStarvationInTests() {
PreventStarvation.store(true);
}

} // namespace clangd
} // namespace clang
87 changes: 64 additions & 23 deletions clang-tools-extra/clangd/index/Background.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <condition_variable>
#include <deque>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <vector>
Expand Down Expand Up @@ -59,6 +60,47 @@ class BackgroundIndexStorage {
static Factory createDiskBackedStorageFactory();
};

// A priority queue of tasks which can be run on (external) worker threads.
class BackgroundQueue {
public:
/// A work item on the thread pool's queue.
struct Task {
template <typename Func>
explicit Task(Func &&F) : Run(std::forward<Func>(F)){};

std::function<void()> Run;
llvm::ThreadPriority ThreadPri = llvm::ThreadPriority::Background;
// Higher-priority tasks will run first.
unsigned QueuePri = 0;

bool operator<(const Task &O) const { return QueuePri < O.QueuePri; }
};

// Add tasks to the queue.
void push(Task);
void append(std::vector<Task>);

// Process items on the queue until the queue is stopped.
// If the queue becomes empty, OnIdle will be called (on one worker).
void work(std::function<void()> OnIdle = nullptr);

// Stop processing new tasks, allowing all work() calls to return soon.
void stop();

// Disables thread priority lowering to ensure progress on loaded systems.
// Only affects tasks that run after the call.
static void preventThreadStarvationInTests();
LLVM_NODISCARD bool
blockUntilIdleForTest(llvm::Optional<double> TimeoutSeconds);

private:
std::mutex Mu;
unsigned NumActiveTasks = 0; // Only idle when queue is empty *and* no tasks.
std::condition_variable CV;
bool ShouldStop = false;
std::vector<Task> Queue; // max-heap
};

// Builds an in-memory index by by running the static indexer action over
// all commands in a compilation database. Indexing happens in the background.
// FIXME: it should also persist its state on disk for fast start.
Expand All @@ -78,19 +120,22 @@ class BackgroundIndex : public SwapIndex {
// Enqueue translation units for indexing.
// The indexing happens in a background thread, so the symbols will be
// available sometime later.
void enqueue(const std::vector<std::string> &ChangedFiles);
void enqueue(const std::vector<std::string> &ChangedFiles) {
Queue.push(changedFilesTask(ChangedFiles));
}

// Cause background threads to stop after ther current task, any remaining
// tasks will be discarded.
void stop();
void stop() {
Rebuilder.shutdown();
Queue.stop();
}

// Wait until the queue is empty, to allow deterministic testing.
LLVM_NODISCARD bool
blockUntilIdleForTest(llvm::Optional<double> TimeoutSeconds = 10);

// Disables thread priority lowering in background index to make sure it can
// progress on loaded systems. Only affects tasks that run after the call.
static void preventThreadStarvationInTests();
blockUntilIdleForTest(llvm::Optional<double> TimeoutSeconds = 10) {
return Queue.blockUntilIdleForTest(TimeoutSeconds);
}

private:
/// Represents the state of a single file when indexing was performed.
Expand All @@ -111,11 +156,8 @@ class BackgroundIndex : public SwapIndex {
const GlobalCompilationDatabase &CDB;
Context BackgroundContext;

// index state
llvm::Error index(tooling::CompileCommand,
BackgroundIndexStorage *IndexStorage);
std::mutex IndexMu;
std::condition_variable IndexCV;

FileSymbols IndexedSymbols;
BackgroundIndexRebuilder Rebuilder;
Expand All @@ -137,19 +179,18 @@ class BackgroundIndex : public SwapIndex {
// Tries to load shards for the ChangedFiles.
std::vector<std::pair<tooling::CompileCommand, BackgroundIndexStorage *>>
loadShards(std::vector<std::string> ChangedFiles);
void enqueue(tooling::CompileCommand Cmd, BackgroundIndexStorage *Storage);

// queue management
using Task = std::function<void()>;
void run(); // Main loop executed by Thread. Runs tasks from Queue.
void enqueueTask(Task T, llvm::ThreadPriority Prioirty);
void enqueueLocked(tooling::CompileCommand Cmd,
BackgroundIndexStorage *IndexStorage);
std::mutex QueueMu;
unsigned NumActiveTasks = 0; // Only idle when queue is empty *and* no tasks.
std::condition_variable QueueCV;
bool ShouldStop = false;
std::deque<std::pair<Task, llvm::ThreadPriority>> Queue;

BackgroundQueue::Task
changedFilesTask(const std::vector<std::string> &ChangedFiles);
BackgroundQueue::Task indexFileTask(tooling::CompileCommand Cmd,
BackgroundIndexStorage *Storage);

// from lowest to highest priority
enum QueuePriority {
IndexFile,
LoadShards,
};
BackgroundQueue Queue;
AsyncTaskRunner ThreadPool;
GlobalCompilationDatabase::CommandChanged::Subscription CommandsChanged;
};
Expand Down

0 comments on commit 7e27d86

Please sign in to comment.