Skip to content

Commit

Permalink
Re-apply "[ORC] Unify task dispatch across ExecutionSession..." with …
Browse files Browse the repository at this point in the history
…more fixes.

This re-applies 6094b3b, which was reverted in e7efd37 (and before that
in 1effa19) due to bot failures.

The test failures were fixed by having SelfExecutorProcessControl use an
InPlaceTaskDispatcher by default, rather than a DynamicThreadPoolTaskDispatcher.
This shouldn't be necessary (and indicates a concurrency issue elsewhere), but
InPlaceTaskDispatcher is a less surprising default, and better matches the
existing behavior (compilation on current thread by default), so the change
seems reasonable. I've filed #89870
to investigate the concurrency issue as a follow-up.

Coding my way home: 6.25133S 127.94177W
  • Loading branch information
lhames committed Apr 24, 2024
1 parent ad4a42b commit 7da6342
Show file tree
Hide file tree
Showing 11 changed files with 158 additions and 66 deletions.
14 changes: 1 addition & 13 deletions llvm/include/llvm/ExecutionEngine/Orc/Core.h
Original file line number Diff line number Diff line change
Expand Up @@ -1443,9 +1443,6 @@ class ExecutionSession {
/// Send a result to the remote.
using SendResultFunction = unique_function<void(shared::WrapperFunctionResult)>;

/// For dispatching ORC tasks (typically materialization tasks).
using DispatchTaskFunction = unique_function<void(std::unique_ptr<Task> T)>;

/// An asynchronous wrapper-function callable from the executor via
/// jit-dispatch.
using JITDispatchHandlerFunction = unique_function<void(
Expand Down Expand Up @@ -1568,12 +1565,6 @@ class ExecutionSession {
/// Unhandled errors can be sent here to log them.
void reportError(Error Err) { ReportError(std::move(Err)); }

/// Set the task dispatch function.
ExecutionSession &setDispatchTask(DispatchTaskFunction DispatchTask) {
this->DispatchTask = std::move(DispatchTask);
return *this;
}

/// Search the given JITDylibs to find the flags associated with each of the
/// given symbols.
void lookupFlags(LookupKind K, JITDylibSearchOrder SearchOrder,
Expand Down Expand Up @@ -1648,7 +1639,7 @@ class ExecutionSession {
void dispatchTask(std::unique_ptr<Task> T) {
assert(T && "T must be non-null");
DEBUG_WITH_TYPE("orc", dumpDispatchInfo(*T));
DispatchTask(std::move(T));
EPC->getDispatcher().dispatch(std::move(T));
}

/// Run a wrapper function in the executor.
Expand Down Expand Up @@ -1762,8 +1753,6 @@ class ExecutionSession {
logAllUnhandledErrors(std::move(Err), errs(), "JIT session error: ");
}

static void runOnCurrentThread(std::unique_ptr<Task> T) { T->run(); }

void dispatchOutstandingMUs();

static std::unique_ptr<MaterializationResponsibility>
Expand Down Expand Up @@ -1869,7 +1858,6 @@ class ExecutionSession {
std::unique_ptr<ExecutorProcessControl> EPC;
std::unique_ptr<Platform> P;
ErrorReporter ReportError = logErrorsToStdErr;
DispatchTaskFunction DispatchTask = runOnCurrentThread;

std::vector<ResourceManager *> ResourceManagers;

Expand Down
25 changes: 16 additions & 9 deletions llvm/include/llvm/ExecutionEngine/Orc/LLJIT.h
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,6 @@ class LLJIT {

DataLayout DL;
Triple TT;
std::unique_ptr<DefaultThreadPool> CompileThreads;

std::unique_ptr<ObjectLayer> ObjLinkingLayer;
std::unique_ptr<ObjectTransformLayer> ObjTransformLayer;
Expand Down Expand Up @@ -325,6 +324,7 @@ class LLJITBuilderState {
PlatformSetupFunction SetUpPlatform;
NotifyCreatedFunction NotifyCreated;
unsigned NumCompileThreads = 0;
std::optional<bool> SupportConcurrentCompilation;

/// Called prior to JIT class construction to fix up defaults.
Error prepareForConstruction();
Expand All @@ -333,7 +333,7 @@ class LLJITBuilderState {
template <typename JITType, typename SetterImpl, typename State>
class LLJITBuilderSetters {
public:
/// Set a ExecutorProcessControl for this instance.
/// Set an ExecutorProcessControl for this instance.
/// This should not be called if ExecutionSession has already been set.
SetterImpl &
setExecutorProcessControl(std::unique_ptr<ExecutorProcessControl> EPC) {
Expand Down Expand Up @@ -462,19 +462,26 @@ class LLJITBuilderSetters {
///
/// If this method is not called, behavior will be as if it were called with
/// a zero argument.
///
/// This setting should not be used if a custom ExecutionSession or
/// ExecutorProcessControl object is set: in those cases a custom
/// TaskDispatcher should be used instead.
SetterImpl &setNumCompileThreads(unsigned NumCompileThreads) {
  // Store the requested count on the object returned by impl(), then return
  // that same object so that setter calls can be chained.
  auto &Self = impl();
  Self.NumCompileThreads = NumCompileThreads;
  return Self;
}

/// Set an ExecutorProcessControl object.
/// If set, this forces LLJIT concurrent compilation support to be either on
/// or off. This controls the selection of compile function (concurrent vs
/// single threaded) and whether or not sub-modules are cloned to new
/// contexts for lazy emission.
///
/// If the platform uses ObjectLinkingLayer by default and no
/// ObjectLinkingLayerCreator has been set then the ExecutorProcessControl
/// object will be used to supply the memory manager for the
/// ObjectLinkingLayer.
SetterImpl &setExecutorProcessControl(ExecutorProcessControl &EPC) {
impl().EPC = &EPC;
/// If not explicitly set then concurrency support will be turned on if
/// NumCompileThreads is set to a non-zero value, or if a custom
/// ExecutionSession or ExecutorProcessControl instance is provided.
SetterImpl &setSupportConcurrentCompilation(
    std::optional<bool> SupportConcurrentCompilation) {
  // Store the (possibly unset) preference on the object returned by impl(),
  // then return that same object so that setter calls can be chained.
  auto &Self = impl();
  Self.SupportConcurrentCompilation = SupportConcurrentCompilation;
  return Self;
}

Expand Down
8 changes: 8 additions & 0 deletions llvm/include/llvm/ExecutionEngine/Orc/TaskDispatch.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

#if LLVM_ENABLE_THREADS
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>
#endif
Expand Down Expand Up @@ -114,13 +115,20 @@ class InPlaceTaskDispatcher : public TaskDispatcher {

/// TaskDispatcher that tracks the number of outstanding tasks and can place
/// an upper bound on how many materialization tasks are run at once.
///
/// dispatch() and shutdown() are defined out of line.
class DynamicThreadPoolTaskDispatcher : public TaskDispatcher {
public:
  /// \param MaxMaterializationThreads limit on the number of materialization
  ///        tasks running at the same time; std::nullopt means no limit.
  ///
  /// The constructor is explicit so that a bare size_t, std::optional or
  /// std::nullopt is never silently converted into a dispatcher.
  explicit DynamicThreadPoolTaskDispatcher(
      std::optional<size_t> MaxMaterializationThreads)
      : MaxMaterializationThreads(MaxMaterializationThreads) {}

  /// Dispatch the given task.
  void dispatch(std::unique_ptr<Task> T) override;

  /// Shut the dispatcher down.
  void shutdown() override;

private:
  // Mutex protecting the bookkeeping members below.
  std::mutex DispatchMutex;
  // NOTE(review): presumably cleared by shutdown() to reject further work;
  // confirm against the out-of-line definitions.
  bool Running = true;
  // Number of dispatched tasks that have not yet finished.
  size_t Outstanding = 0;
  // Condition variable associated with changes to Outstanding.
  std::condition_variable OutstandingCV;

  // Optional limit on concurrently running materialization tasks.
  std::optional<size_t> MaxMaterializationThreads;
  // Number of materialization tasks currently counted as running.
  size_t NumMaterializationThreads = 0;
  // Materialization tasks held back because the limit had been reached.
  std::deque<std::unique_ptr<Task>> MaterializationTaskQueue;
};

#endif // LLVM_ENABLE_THREADS
Expand Down
7 changes: 1 addition & 6 deletions llvm/lib/ExecutionEngine/Orc/ExecutorProcessControl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,8 @@ SelfExecutorProcessControl::Create(
if (!SSP)
SSP = std::make_shared<SymbolStringPool>();

if (!D) {
#if LLVM_ENABLE_THREADS
D = std::make_unique<DynamicThreadPoolTaskDispatcher>();
#else
if (!D)
D = std::make_unique<InPlaceTaskDispatcher>();
#endif
}

auto PageSize = sys::Process::getPageSize();
if (!PageSize)
Expand Down
77 changes: 53 additions & 24 deletions llvm/lib/ExecutionEngine/Orc/LLJIT.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -667,6 +667,37 @@ Error LLJITBuilderState::prepareForConstruction() {
return JTMBOrErr.takeError();
}

if ((ES || EPC) && NumCompileThreads)
return make_error<StringError>(
"NumCompileThreads cannot be used with a custom ExecutionSession or "
"ExecutorProcessControl",
inconvertibleErrorCode());

#if !LLVM_ENABLE_THREADS
if (NumCompileThreads)
return make_error<StringError>(
"LLJIT num-compile-threads is " + Twine(NumCompileThreads) +
" but LLVM was compiled with LLVM_ENABLE_THREADS=Off",
inconvertibleErrorCode());
#endif // !LLVM_ENABLE_THREADS

bool ConcurrentCompilationSettingDefaulted = !SupportConcurrentCompilation;
if (!SupportConcurrentCompilation) {
#if LLVM_ENABLE_THREADS
SupportConcurrentCompilation = NumCompileThreads || ES || EPC;
#else
SupportConcurrentCompilation = false;
#endif // LLVM_ENABLE_THREADS
} else {
#if !LLVM_ENABLE_THREADS
if (*SupportConcurrentCompilation)
return make_error<StringError>(
"LLJIT concurrent compilation support requested, but LLVM was built "
"with LLVM_ENABLE_THREADS=Off",
inconvertibleErrorCode());
#endif // !LLVM_ENABLE_THREADS
}

LLVM_DEBUG({
dbgs() << " JITTargetMachineBuilder is "
<< JITTargetMachineBuilderPrinter(*JTMB, " ")
Expand All @@ -684,11 +715,13 @@ Error LLJITBuilderState::prepareForConstruction() {
<< (CreateCompileFunction ? "Yes" : "No") << "\n"
<< " Custom platform-setup function: "
<< (SetUpPlatform ? "Yes" : "No") << "\n"
<< " Number of compile threads: " << NumCompileThreads;
if (!NumCompileThreads)
dbgs() << " (code will be compiled on the execution thread)\n";
<< " Support concurrent compilation: "
<< (*SupportConcurrentCompilation ? "Yes" : "No");
if (ConcurrentCompilationSettingDefaulted)
dbgs() << " (defaulted based on ES / EPC)\n";
else
dbgs() << "\n";
dbgs() << " Number of compile threads: " << NumCompileThreads << "\n";
});

// Create DL if not specified.
Expand All @@ -705,7 +738,19 @@ Error LLJITBuilderState::prepareForConstruction() {
dbgs() << "ExecutorProcessControl not specified, "
"Creating SelfExecutorProcessControl instance\n";
});
if (auto EPCOrErr = SelfExecutorProcessControl::Create())

std::unique_ptr<TaskDispatcher> D = nullptr;
#if LLVM_ENABLE_THREADS
if (*SupportConcurrentCompilation) {
std::optional<size_t> NumThreads = std ::nullopt;
if (NumCompileThreads)
NumThreads = NumCompileThreads;
D = std::make_unique<DynamicThreadPoolTaskDispatcher>(NumThreads);
} else
D = std::make_unique<InPlaceTaskDispatcher>();
#endif // LLVM_ENABLE_THREADS
if (auto EPCOrErr =
SelfExecutorProcessControl::Create(nullptr, std::move(D), nullptr))
EPC = std::move(*EPCOrErr);
else
return EPCOrErr.takeError();
Expand Down Expand Up @@ -790,8 +835,6 @@ Error LLJITBuilderState::prepareForConstruction() {
}

LLJIT::~LLJIT() {
if (CompileThreads)
CompileThreads->wait();
if (auto Err = ES->endSession())
ES->reportError(std::move(Err));
}
Expand Down Expand Up @@ -916,9 +959,8 @@ LLJIT::createCompileFunction(LLJITBuilderState &S,
if (S.CreateCompileFunction)
return S.CreateCompileFunction(std::move(JTMB));

// Otherwise default to creating a SimpleCompiler, or ConcurrentIRCompiler,
// depending on the number of threads requested.
if (S.NumCompileThreads > 0)
// If concurrent compilation is supported then use a ConcurrentIRCompiler by
// default.
if (*S.SupportConcurrentCompilation)
return std::make_unique<ConcurrentIRCompiler>(std::move(JTMB));

auto TM = JTMB.createTargetMachine();
Expand Down Expand Up @@ -970,21 +1012,8 @@ LLJIT::LLJIT(LLJITBuilderState &S, Error &Err)
std::make_unique<IRTransformLayer>(*ES, *TransformLayer);
}

if (S.NumCompileThreads > 0) {
if (*S.SupportConcurrentCompilation)
InitHelperTransformLayer->setCloneToNewContextOnEmit(true);
CompileThreads = std::make_unique<DefaultThreadPool>(
hardware_concurrency(S.NumCompileThreads));
ES->setDispatchTask([this](std::unique_ptr<Task> T) {
// FIXME: We should be able to use move-capture here, but ThreadPool's
// AsyncTaskTys are std::functions rather than unique_functions
// (because MSVC's std::packaged_tasks don't support move-only types).
// Fix this when all the above gets sorted out.
CompileThreads->async([UnownedT = T.release()]() mutable {
std::unique_ptr<Task> T(UnownedT);
T->run();
});
});
}

if (S.SetupProcessSymbolsJITDylib) {
if (auto ProcSymsJD = S.SetupProcessSymbolsJITDylib(*this)) {
Expand Down Expand Up @@ -1240,7 +1269,7 @@ LLLazyJIT::LLLazyJIT(LLLazyJITBuilderState &S, Error &Err) : LLJIT(S, Err) {
CODLayer = std::make_unique<CompileOnDemandLayer>(
*ES, *InitHelperTransformLayer, *LCTMgr, std::move(ISMBuilder));

if (S.NumCompileThreads > 0)
if (*S.SupportConcurrentCompilation)
CODLayer->setCloneToNewContextOnEmit(true);
}

Expand Down
47 changes: 42 additions & 5 deletions llvm/lib/ExecutionEngine/Orc/TaskDispatch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
//===----------------------------------------------------------------------===//

#include "llvm/ExecutionEngine/Orc/TaskDispatch.h"
#include "llvm/ExecutionEngine/Orc/Core.h"

namespace llvm {
namespace orc {
Expand All @@ -24,16 +25,52 @@ void InPlaceTaskDispatcher::shutdown() {}

#if LLVM_ENABLE_THREADS
void DynamicThreadPoolTaskDispatcher::dispatch(std::unique_ptr<Task> T) {
bool IsMaterializationTask = isa<MaterializationTask>(*T);

{
std::lock_guard<std::mutex> Lock(DispatchMutex);

if (IsMaterializationTask) {

// If this is a materialization task and there are too many running
// already then queue this one up and return early.
if (MaxMaterializationThreads &&
NumMaterializationThreads == *MaxMaterializationThreads) {
MaterializationTaskQueue.push_back(std::move(T));
return;
}

// Otherwise record that we have a materialization task running.
++NumMaterializationThreads;
}

++Outstanding;
}

std::thread([this, T = std::move(T)]() mutable {
T->run();
std::lock_guard<std::mutex> Lock(DispatchMutex);
--Outstanding;
OutstandingCV.notify_all();
std::thread([this, T = std::move(T), IsMaterializationTask]() mutable {
while (true) {

// Run the task.
T->run();

std::lock_guard<std::mutex> Lock(DispatchMutex);
if (!MaterializationTaskQueue.empty()) {
// If there are any materialization tasks queued then steal that work.
T = std::move(MaterializationTaskQueue.front());
MaterializationTaskQueue.pop_front();
if (!IsMaterializationTask) {
++NumMaterializationThreads;
IsMaterializationTask = true;
}
} else {
// Otherwise decrement work counters.
if (IsMaterializationTask)
--NumMaterializationThreads;
--Outstanding;
OutstandingCV.notify_all();
return;
}
}
}).detach();
}

Expand Down
6 changes: 3 additions & 3 deletions llvm/tools/llvm-jitlink/llvm-jitlink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -807,8 +807,8 @@ static Expected<std::unique_ptr<ExecutorProcessControl>> launchExecutor() {
S.CreateMemoryManager = createSharedMemoryManager;

return SimpleRemoteEPC::Create<FDSimpleRemoteEPCTransport>(
std::make_unique<DynamicThreadPoolTaskDispatcher>(), std::move(S),
FromExecutor[ReadEnd], ToExecutor[WriteEnd]);
std::make_unique<DynamicThreadPoolTaskDispatcher>(std::nullopt),
std::move(S), FromExecutor[ReadEnd], ToExecutor[WriteEnd]);
#endif
}

Expand Down Expand Up @@ -897,7 +897,7 @@ static Expected<std::unique_ptr<ExecutorProcessControl>> connectToExecutor() {
S.CreateMemoryManager = createSharedMemoryManager;

return SimpleRemoteEPC::Create<FDSimpleRemoteEPCTransport>(
std::make_unique<DynamicThreadPoolTaskDispatcher>(),
std::make_unique<DynamicThreadPoolTaskDispatcher>(std::nullopt),
std::move(S), *SockFD, *SockFD);
#endif
}
Expand Down
8 changes: 4 additions & 4 deletions llvm/unittests/ExecutionEngine/Orc/CoreAPIsTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1005,11 +1005,11 @@ TEST_F(CoreAPIsStandardTest, RedefineBoundWeakSymbol) {

TEST_F(CoreAPIsStandardTest, DefineMaterializingSymbol) {
bool ExpectNoMoreMaterialization = false;
ES.setDispatchTask([&](std::unique_ptr<Task> T) {
DispatchOverride = [&](std::unique_ptr<Task> T) {
if (ExpectNoMoreMaterialization && isa<MaterializationTask>(*T))
ADD_FAILURE() << "Unexpected materialization";
T->run();
});
};

auto MU = std::make_unique<SimpleMaterializationUnit>(
SymbolFlagsMap({{Foo, FooSym.getFlags()}}),
Expand Down Expand Up @@ -1403,7 +1403,7 @@ TEST_F(CoreAPIsStandardTest, TestLookupWithThreadedMaterialization) {

std::mutex WorkThreadsMutex;
std::vector<std::thread> WorkThreads;
ES.setDispatchTask([&](std::unique_ptr<Task> T) {
DispatchOverride = [&](std::unique_ptr<Task> T) {
std::promise<void> WaitP;
std::lock_guard<std::mutex> Lock(WorkThreadsMutex);
WorkThreads.push_back(
Expand All @@ -1412,7 +1412,7 @@ TEST_F(CoreAPIsStandardTest, TestLookupWithThreadedMaterialization) {
T->run();
}));
WaitP.set_value();
});
};

cantFail(JD.define(absoluteSymbols({{Foo, FooSym}})));

Expand Down
Loading

0 comments on commit 7da6342

Please sign in to comment.