Skip to content

Commit

Permalink
Revert "[ORC] Unify task dispatch across ExecutionSession and Executo…
Browse files Browse the repository at this point in the history
…rProcessControl."

This reverts commit 6094b3b.

Multiple bots are broken.
  • Loading branch information
joker-eph committed Apr 22, 2024
1 parent 2e2ac6f commit a28557a
Show file tree
Hide file tree
Showing 11 changed files with 61 additions and 157 deletions.
14 changes: 13 additions & 1 deletion llvm/include/llvm/ExecutionEngine/Orc/Core.h
Original file line number Diff line number Diff line change
Expand Up @@ -1443,6 +1443,9 @@ class ExecutionSession {
/// Send a result to the remote.
using SendResultFunction = unique_function<void(shared::WrapperFunctionResult)>;

/// For dispatching ORC tasks (typically materialization tasks).
using DispatchTaskFunction = unique_function<void(std::unique_ptr<Task> T)>;

/// An asynchronous wrapper-function callable from the executor via
/// jit-dispatch.
using JITDispatchHandlerFunction = unique_function<void(
Expand Down Expand Up @@ -1565,6 +1568,12 @@ class ExecutionSession {
/// Unhandled errors can be sent here to log them.
void reportError(Error Err) { ReportError(std::move(Err)); }

/// Set the task dispatch function.
ExecutionSession &setDispatchTask(DispatchTaskFunction DispatchTask) {
this->DispatchTask = std::move(DispatchTask);
return *this;
}

/// Search the given JITDylibs to find the flags associated with each of the
/// given symbols.
void lookupFlags(LookupKind K, JITDylibSearchOrder SearchOrder,
Expand Down Expand Up @@ -1639,7 +1648,7 @@ class ExecutionSession {
void dispatchTask(std::unique_ptr<Task> T) {
assert(T && "T must be non-null");
DEBUG_WITH_TYPE("orc", dumpDispatchInfo(*T));
EPC->getDispatcher().dispatch(std::move(T));
DispatchTask(std::move(T));
}

/// Run a wrapper function in the executor.
Expand Down Expand Up @@ -1753,6 +1762,8 @@ class ExecutionSession {
logAllUnhandledErrors(std::move(Err), errs(), "JIT session error: ");
}

static void runOnCurrentThread(std::unique_ptr<Task> T) { T->run(); }

void dispatchOutstandingMUs();

static std::unique_ptr<MaterializationResponsibility>
Expand Down Expand Up @@ -1858,6 +1869,7 @@ class ExecutionSession {
std::unique_ptr<ExecutorProcessControl> EPC;
std::unique_ptr<Platform> P;
ErrorReporter ReportError = logErrorsToStdErr;
DispatchTaskFunction DispatchTask = runOnCurrentThread;

std::vector<ResourceManager *> ResourceManagers;

Expand Down
25 changes: 9 additions & 16 deletions llvm/include/llvm/ExecutionEngine/Orc/LLJIT.h
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ class LLJIT {

DataLayout DL;
Triple TT;
std::unique_ptr<DefaultThreadPool> CompileThreads;

std::unique_ptr<ObjectLayer> ObjLinkingLayer;
std::unique_ptr<ObjectTransformLayer> ObjTransformLayer;
Expand Down Expand Up @@ -324,7 +325,6 @@ class LLJITBuilderState {
PlatformSetupFunction SetUpPlatform;
NotifyCreatedFunction NotifyCreated;
unsigned NumCompileThreads = 0;
std::optional<bool> SupportConcurrentCompilation;

/// Called prior to JIT class construcion to fix up defaults.
Error prepareForConstruction();
Expand All @@ -333,7 +333,7 @@ class LLJITBuilderState {
template <typename JITType, typename SetterImpl, typename State>
class LLJITBuilderSetters {
public:
/// Set an ExecutorProcessControl for this instance.
/// Set a ExecutorProcessControl for this instance.
/// This should not be called if ExecutionSession has already been set.
SetterImpl &
setExecutorProcessControl(std::unique_ptr<ExecutorProcessControl> EPC) {
Expand Down Expand Up @@ -462,26 +462,19 @@ class LLJITBuilderSetters {
///
/// If this method is not called, behavior will be as if it were called with
/// a zero argument.
///
/// This setting should not be used if a custom ExecutionSession or
/// ExecutorProcessControl object is set: in those cases a custom
/// TaskDispatcher should be used instead.
SetterImpl &setNumCompileThreads(unsigned NumCompileThreads) {
impl().NumCompileThreads = NumCompileThreads;
return impl();
}

/// If set, this forces LLJIT concurrent compilation support to be either on
/// or off. This controls the selection of compile function (concurrent vs
/// single threaded) and whether or not sub-modules are cloned to new
/// contexts for lazy emission.
/// Set an ExecutorProcessControl object.
///
/// If not explicitly set then concurrency support will be turned on if
/// NumCompileThreads is set to a non-zero value, or if a custom
/// ExecutionSession or ExecutorProcessControl instance is provided.
SetterImpl &setSupportConcurrentCompilation(
std::optional<bool> SupportConcurrentCompilation) {
impl().SupportConcurrentCompilation = SupportConcurrentCompilation;
/// If the platform uses ObjectLinkingLayer by default and no
/// ObjectLinkingLayerCreator has been set then the ExecutorProcessControl
/// object will be used to supply the memory manager for the
/// ObjectLinkingLayer.
SetterImpl &setExecutorProcessControl(ExecutorProcessControl &EPC) {
impl().EPC = &EPC;
return impl();
}

Expand Down
7 changes: 0 additions & 7 deletions llvm/include/llvm/ExecutionEngine/Orc/TaskDispatch.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,20 +114,13 @@ class InPlaceTaskDispatcher : public TaskDispatcher {

class DynamicThreadPoolTaskDispatcher : public TaskDispatcher {
public:
DynamicThreadPoolTaskDispatcher(
std::optional<size_t> MaxMaterializationThreads)
: MaxMaterializationThreads(MaxMaterializationThreads) {}
void dispatch(std::unique_ptr<Task> T) override;
void shutdown() override;
private:
std::mutex DispatchMutex;
bool Running = true;
size_t Outstanding = 0;
std::condition_variable OutstandingCV;

std::optional<size_t> MaxMaterializationThreads;
size_t NumMaterializationThreads = 0;
std::deque<std::unique_ptr<Task>> MaterializationTaskQueue;
};

#endif // LLVM_ENABLE_THREADS
Expand Down
2 changes: 1 addition & 1 deletion llvm/lib/ExecutionEngine/Orc/ExecutorProcessControl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ SelfExecutorProcessControl::Create(

if (!D) {
#if LLVM_ENABLE_THREADS
D = std::make_unique<DynamicThreadPoolTaskDispatcher>(std::nullopt);
D = std::make_unique<DynamicThreadPoolTaskDispatcher>();
#else
D = std::make_unique<InPlaceTaskDispatcher>();
#endif
Expand Down
77 changes: 24 additions & 53 deletions llvm/lib/ExecutionEngine/Orc/LLJIT.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -667,37 +667,6 @@ Error LLJITBuilderState::prepareForConstruction() {
return JTMBOrErr.takeError();
}

if ((ES || EPC) && NumCompileThreads)
return make_error<StringError>(
"NumCompileThreads cannot be used with a custom ExecutionSession or "
"ExecutorProcessControl",
inconvertibleErrorCode());

#if !LLVM_ENABLE_THREADS
if (NumCompileThreads)
return make_error<StringError>(
"LLJIT num-compile-threads is " + Twine(NumCompileThreads) +
" but LLVM was compiled with LLVM_ENABLE_THREADS=Off",
inconvertibleErrorCode());
#endif // !LLVM_ENABLE_THREADS

bool ConcurrentCompilationSettingDefaulted = !SupportConcurrentCompilation;
if (!SupportConcurrentCompilation) {
#if LLVM_ENABLE_THREADS
SupportConcurrentCompilation = NumCompileThreads || ES || EPC;
#else
SupportConcurrentCompilation = false;
#endif // LLVM_ENABLE_THREADS
} else {
#if !LLVM_ENABLE_THREADS
if (*SupportConcurrentCompilation)
return make_error<StringError>(
"LLJIT concurrent compilation support requested, but LLVM was built "
"with LLVM_ENABLE_THREADS=Off",
inconvertibleErrorCode());
#endif // !LLVM_ENABLE_THREADS
}

LLVM_DEBUG({
dbgs() << " JITTargetMachineBuilder is "
<< JITTargetMachineBuilderPrinter(*JTMB, " ")
Expand All @@ -715,13 +684,11 @@ Error LLJITBuilderState::prepareForConstruction() {
<< (CreateCompileFunction ? "Yes" : "No") << "\n"
<< " Custom platform-setup function: "
<< (SetUpPlatform ? "Yes" : "No") << "\n"
<< " Support concurrent compilation: "
<< (*SupportConcurrentCompilation ? "Yes" : "No");
if (ConcurrentCompilationSettingDefaulted)
dbgs() << " (defaulted based on ES / EPC)\n";
<< " Number of compile threads: " << NumCompileThreads;
if (!NumCompileThreads)
dbgs() << " (code will be compiled on the execution thread)\n";
else
dbgs() << "\n";
dbgs() << " Number of compile threads: " << NumCompileThreads << "\n";
});

// Create DL if not specified.
Expand All @@ -738,19 +705,7 @@ Error LLJITBuilderState::prepareForConstruction() {
dbgs() << "ExecutorProcessControl not specified, "
"Creating SelfExecutorProcessControl instance\n";
});

std::unique_ptr<TaskDispatcher> D = nullptr;
#if LLVM_ENABLE_THREADS
if (*SupportConcurrentCompilation) {
std::optional<size_t> NumThreads = std ::nullopt;
if (NumCompileThreads)
NumThreads = NumCompileThreads;
D = std::make_unique<DynamicThreadPoolTaskDispatcher>(NumThreads);
} else
D = std::make_unique<InPlaceTaskDispatcher>();
#endif // LLVM_ENABLE_THREADS
if (auto EPCOrErr =
SelfExecutorProcessControl::Create(nullptr, std::move(D), nullptr))
if (auto EPCOrErr = SelfExecutorProcessControl::Create())
EPC = std::move(*EPCOrErr);
else
return EPCOrErr.takeError();
Expand Down Expand Up @@ -835,6 +790,8 @@ Error LLJITBuilderState::prepareForConstruction() {
}

LLJIT::~LLJIT() {
if (CompileThreads)
CompileThreads->wait();
if (auto Err = ES->endSession())
ES->reportError(std::move(Err));
}
Expand Down Expand Up @@ -959,8 +916,9 @@ LLJIT::createCompileFunction(LLJITBuilderState &S,
if (S.CreateCompileFunction)
return S.CreateCompileFunction(std::move(JTMB));

// If using a custom EPC then use a ConcurrentIRCompiler by default.
if (*S.SupportConcurrentCompilation)
// Otherwise default to creating a SimpleCompiler, or ConcurrentIRCompiler,
// depending on the number of threads requested.
if (S.NumCompileThreads > 0)
return std::make_unique<ConcurrentIRCompiler>(std::move(JTMB));

auto TM = JTMB.createTargetMachine();
Expand Down Expand Up @@ -1012,8 +970,21 @@ LLJIT::LLJIT(LLJITBuilderState &S, Error &Err)
std::make_unique<IRTransformLayer>(*ES, *TransformLayer);
}

if (*S.SupportConcurrentCompilation)
if (S.NumCompileThreads > 0) {
InitHelperTransformLayer->setCloneToNewContextOnEmit(true);
CompileThreads = std::make_unique<DefaultThreadPool>(
hardware_concurrency(S.NumCompileThreads));
ES->setDispatchTask([this](std::unique_ptr<Task> T) {
// FIXME: We should be able to use move-capture here, but ThreadPool's
// AsyncTaskTys are std::functions rather than unique_functions
// (because MSVC's std::packaged_tasks don't support move-only types).
// Fix this when all the above gets sorted out.
CompileThreads->async([UnownedT = T.release()]() mutable {
std::unique_ptr<Task> T(UnownedT);
T->run();
});
});
}

if (S.SetupProcessSymbolsJITDylib) {
if (auto ProcSymsJD = S.SetupProcessSymbolsJITDylib(*this)) {
Expand Down Expand Up @@ -1269,7 +1240,7 @@ LLLazyJIT::LLLazyJIT(LLLazyJITBuilderState &S, Error &Err) : LLJIT(S, Err) {
CODLayer = std::make_unique<CompileOnDemandLayer>(
*ES, *InitHelperTransformLayer, *LCTMgr, std::move(ISMBuilder));

if (*S.SupportConcurrentCompilation)
if (S.NumCompileThreads > 0)
CODLayer->setCloneToNewContextOnEmit(true);
}

Expand Down
47 changes: 5 additions & 42 deletions llvm/lib/ExecutionEngine/Orc/TaskDispatch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
//===----------------------------------------------------------------------===//

#include "llvm/ExecutionEngine/Orc/TaskDispatch.h"
#include "llvm/ExecutionEngine/Orc/Core.h"

namespace llvm {
namespace orc {
Expand All @@ -25,52 +24,16 @@ void InPlaceTaskDispatcher::shutdown() {}

#if LLVM_ENABLE_THREADS
void DynamicThreadPoolTaskDispatcher::dispatch(std::unique_ptr<Task> T) {
bool IsMaterializationTask = isa<MaterializationTask>(*T);

{
std::lock_guard<std::mutex> Lock(DispatchMutex);

if (IsMaterializationTask) {

// If this is a materialization task and there are too many running
// already then queue this one up and return early.
if (MaxMaterializationThreads &&
NumMaterializationThreads == *MaxMaterializationThreads) {
MaterializationTaskQueue.push_back(std::move(T));
return;
}

// Otherwise record that we have a materialization task running.
++NumMaterializationThreads;
}

++Outstanding;
}

std::thread([this, T = std::move(T), IsMaterializationTask]() mutable {
while (true) {

// Run the task.
T->run();

std::lock_guard<std::mutex> Lock(DispatchMutex);
if (!MaterializationTaskQueue.empty()) {
// If there are any materialization tasks running then steal that work.
T = std::move(MaterializationTaskQueue.front());
MaterializationTaskQueue.pop_front();
if (!IsMaterializationTask) {
++NumMaterializationThreads;
IsMaterializationTask = true;
}
} else {
// Otherwise decrement work counters.
if (IsMaterializationTask)
--NumMaterializationThreads;
--Outstanding;
OutstandingCV.notify_all();
return;
}
}
std::thread([this, T = std::move(T)]() mutable {
T->run();
std::lock_guard<std::mutex> Lock(DispatchMutex);
--Outstanding;
OutstandingCV.notify_all();
}).detach();
}

Expand Down
6 changes: 3 additions & 3 deletions llvm/tools/llvm-jitlink/llvm-jitlink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -807,8 +807,8 @@ static Expected<std::unique_ptr<ExecutorProcessControl>> launchExecutor() {
S.CreateMemoryManager = createSharedMemoryManager;

return SimpleRemoteEPC::Create<FDSimpleRemoteEPCTransport>(
std::make_unique<DynamicThreadPoolTaskDispatcher>(std::nullopt),
std::move(S), FromExecutor[ReadEnd], ToExecutor[WriteEnd]);
std::make_unique<DynamicThreadPoolTaskDispatcher>(), std::move(S),
FromExecutor[ReadEnd], ToExecutor[WriteEnd]);
#endif
}

Expand Down Expand Up @@ -897,7 +897,7 @@ static Expected<std::unique_ptr<ExecutorProcessControl>> connectToExecutor() {
S.CreateMemoryManager = createSharedMemoryManager;

return SimpleRemoteEPC::Create<FDSimpleRemoteEPCTransport>(
std::make_unique<DynamicThreadPoolTaskDispatcher>(std::nullopt),
std::make_unique<DynamicThreadPoolTaskDispatcher>(),
std::move(S), *SockFD, *SockFD);
#endif
}
Expand Down
8 changes: 4 additions & 4 deletions llvm/unittests/ExecutionEngine/Orc/CoreAPIsTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1005,11 +1005,11 @@ TEST_F(CoreAPIsStandardTest, RedefineBoundWeakSymbol) {

TEST_F(CoreAPIsStandardTest, DefineMaterializingSymbol) {
bool ExpectNoMoreMaterialization = false;
DispatchOverride = [&](std::unique_ptr<Task> T) {
ES.setDispatchTask([&](std::unique_ptr<Task> T) {
if (ExpectNoMoreMaterialization && isa<MaterializationTask>(*T))
ADD_FAILURE() << "Unexpected materialization";
T->run();
};
});

auto MU = std::make_unique<SimpleMaterializationUnit>(
SymbolFlagsMap({{Foo, FooSym.getFlags()}}),
Expand Down Expand Up @@ -1403,7 +1403,7 @@ TEST_F(CoreAPIsStandardTest, TestLookupWithThreadedMaterialization) {

std::mutex WorkThreadsMutex;
std::vector<std::thread> WorkThreads;
DispatchOverride = [&](std::unique_ptr<Task> T) {
ES.setDispatchTask([&](std::unique_ptr<Task> T) {
std::promise<void> WaitP;
std::lock_guard<std::mutex> Lock(WorkThreadsMutex);
WorkThreads.push_back(
Expand All @@ -1412,7 +1412,7 @@ TEST_F(CoreAPIsStandardTest, TestLookupWithThreadedMaterialization) {
T->run();
}));
WaitP.set_value();
};
});

cantFail(JD.define(absoluteSymbols({{Foo, FooSym}})));

Expand Down
Loading

0 comments on commit a28557a

Please sign in to comment.