Skip to content

Commit

Permalink
[Libomptarget] Begin implementing support for RPC services
Browse files Browse the repository at this point in the history
This patch adds the initial support for running an RPC server in
libomptarget to handle host services. We interface with the library
provided by the `libc` project to stand up a basic server. We introduce
a new type that is controlled by the plugin and has each device
initialize its interface. We then run a basic server to check the RPC
buffer.

This patch does not fully implement the interface. In the future each
plugin will want to define special handlers via the interface to support
things like malloc or H2D copies coming from RPC. We will also want to
allow the plugin to specify the number of ports. This is currently
capped in the implementation but will be adjusted soon.

Right now running the server is handled by whatever thread ends up doing
the waiting. This is probably not a completely sound solution but I am
not overly familiar with the behaviour of OpenMP tasks and what would be
required here. This works okay with synchronous regions, and somewhat
fine with `nowait` regions, but I've observed some weird behavior when
one of those regions calls `exit`.

Reviewed By: jdoerfert

Differential Revision: https://reviews.llvm.org/D154312
  • Loading branch information
jhuber6 committed Jul 7, 2023
1 parent fa78983 commit 691dc2d
Show file tree
Hide file tree
Showing 13 changed files with 491 additions and 14 deletions.
2 changes: 1 addition & 1 deletion libc/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ include(LLVMLibCArchitectures)
if(LIBC_TARGET_ARCHITECTURE_IS_GPU)
set(LIBC_INCLUDE_DIR ${CMAKE_CURRENT_BINARY_DIR}/include)
set(LIBC_INSTALL_INCLUDE_DIR ${CMAKE_INSTALL_INCLUDEDIR}/gpu-none-llvm)
set(LIBC_LIBRARY_DIR ${CMAKE_CURRENT_BINARY_DIR}/lib)
set(LIBC_LIBRARY_DIR ${LLVM_LIBRARY_OUTPUT_INTDIR})
elseif(LLVM_ENABLE_PER_TARGET_RUNTIME_DIR AND LIBC_ENABLE_USE_BY_CLANG)
set(LIBC_INCLUDE_DIR ${LLVM_BINARY_DIR}/include/${LLVM_DEFAULT_TARGET_TRIPLE})
set(LIBC_INSTALL_INCLUDE_DIR ${CMAKE_INSTALL_INCLUDEDIR}/${LLVM_DEFAULT_TARGET_TRIPLE})
Expand Down
39 changes: 33 additions & 6 deletions openmp/libomptarget/plugins-nextgen/amdgpu/src/rtl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -519,17 +519,25 @@ struct AMDGPUSignalTy {
}

/// Wait until the signal gets a zero value.
Error wait(const uint64_t ActiveTimeout = 0) const {
if (ActiveTimeout) {
Error wait(const uint64_t ActiveTimeout = 0,
RPCHandleTy *RPCHandle = nullptr) const {
if (ActiveTimeout && !RPCHandle) {
hsa_signal_value_t Got = 1;
Got = hsa_signal_wait_scacquire(Signal, HSA_SIGNAL_CONDITION_EQ, 0,
ActiveTimeout, HSA_WAIT_STATE_ACTIVE);
if (Got == 0)
return Plugin::success();
}

// If there is an RPC device attached to this stream we run it as a server.
uint64_t Timeout = RPCHandle ? 8192 : UINT64_MAX;
auto WaitState = RPCHandle ? HSA_WAIT_STATE_ACTIVE : HSA_WAIT_STATE_BLOCKED;
while (hsa_signal_wait_scacquire(Signal, HSA_SIGNAL_CONDITION_EQ, 0,
UINT64_MAX, HSA_WAIT_STATE_BLOCKED) != 0)
;
Timeout, WaitState) != 0) {
if (RPCHandle)
if (auto Err = RPCHandle->runServer())
return Err;
}
return Plugin::success();
}

Expand Down Expand Up @@ -895,6 +903,11 @@ struct AMDGPUStreamTy {
/// operation that was already finalized in a previous stream synchronize.
uint32_t SyncCycle;

/// A pointer associated with an RPC server running on the given device. If
/// RPC is not being used this will be a null pointer. Otherwise, this
/// indicates that an RPC server is expected to be run on this stream.
RPCHandleTy *RPCHandle;

/// Mutex to protect stream's management.
mutable std::mutex Mutex;

Expand Down Expand Up @@ -1050,6 +1063,9 @@ struct AMDGPUStreamTy {
/// Deinitialize the stream's signals.
Error deinit() { return Plugin::success(); }

/// Store the given RPC handle pointer in this stream's RPCHandle member,
/// replacing whatever pointer was held before.
void setRPCHandle(RPCHandleTy *Handle) {
  RPCHandle = Handle;
}

/// Push a asynchronous kernel to the stream. The kernel arguments must be
/// placed in a special allocation for kernel args and must keep alive until
/// the kernel finalizes. Once the kernel is finished, the stream will release
Expand Down Expand Up @@ -1264,7 +1280,8 @@ struct AMDGPUStreamTy {
return Plugin::success();

// Wait until all previous operations on the stream have completed.
if (auto Err = Slots[last()].Signal->wait(StreamBusyWaitMicroseconds))
if (auto Err =
Slots[last()].Signal->wait(StreamBusyWaitMicroseconds, RPCHandle))
return Err;

// Reset the stream and perform all pending post actions.
Expand Down Expand Up @@ -1786,6 +1803,12 @@ struct AMDGPUDeviceTy : public GenericDeviceTy, AMDGenericDeviceTy {
/// AMDGPU devices do not have the concept of contexts.
Error setContext() override { return Plugin::success(); }

/// Report whether the RPC server for host services should be set up for this
/// device. This forwards the result of libomptargetSupportsRPC().
bool shouldSetupRPCServer() const override {
  const bool HasRPCSupport = libomptargetSupportsRPC();
  return HasRPCSupport;
}

/// Get the stream of the asynchronous info structure or get a new one.
AMDGPUStreamTy &getStream(AsyncInfoWrapperTy &AsyncInfoWrapper) {
AMDGPUStreamTy *&Stream = AsyncInfoWrapper.getQueueAs<AMDGPUStreamTy *>();
Expand Down Expand Up @@ -2507,7 +2530,7 @@ AMDGPUStreamTy::AMDGPUStreamTy(AMDGPUDeviceTy &Device)
: Agent(Device.getAgent()), Queue(Device.getNextQueue()),
SignalManager(Device.getSignalManager()),
// Initialize the std::deque with some empty positions.
Slots(32), NextSlot(0), SyncCycle(0),
Slots(32), NextSlot(0), SyncCycle(0), RPCHandle(nullptr),
StreamBusyWaitMicroseconds(Device.getStreamBusyWaitMicroseconds()) {}

/// Class implementing the AMDGPU-specific functionalities of the global
Expand Down Expand Up @@ -2837,6 +2860,10 @@ Error AMDGPUKernelTy::launchImpl(GenericDeviceTy &GenericDevice,
AMDGPUDeviceTy &AMDGPUDevice = static_cast<AMDGPUDeviceTy &>(GenericDevice);
AMDGPUStreamTy &Stream = AMDGPUDevice.getStream(AsyncInfoWrapper);

// If this kernel requires an RPC server we attach its pointer to the stream.
if (GenericDevice.getRPCHandle())
Stream.setRPCHandle(GenericDevice.getRPCHandle());

// Push the kernel launch into the stream.
return Stream.pushKernelLaunch(*this, AllArgs, NumThreads, NumBlocks,
GroupSize, ArgsMemoryManager);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# NOTE: Don't try to build `PluginInterface` using `add_llvm_library` because we
# don't want to export `PluginInterface` while `add_llvm_library` requires that.
add_library(PluginInterface OBJECT
PluginInterface.cpp GlobalHandler.cpp JIT.cpp)
PluginInterface.cpp GlobalHandler.cpp JIT.cpp RPC.cpp)

# Only enable JIT for those targets that LLVM can support.
string(TOUPPER "${LLVM_TARGETS_TO_BUILD}" TargetsSupported)
Expand Down Expand Up @@ -62,6 +62,25 @@ target_link_libraries(PluginInterface
MemoryManager
)

# Include the RPC server from the `libc` project if available. If a
# `llvmlibc_rpc_server` target exists it is linked directly; otherwise a
# library of that name is searched for in LIBOMPTARGET_LLVM_LIBRARY_DIR only.
# The outcome is recorded in LIBOMPTARGET_GPU_LIBC_SUPPORT.
set(libomptarget_supports_rpc FALSE)
if(TARGET llvmlibc_rpc_server)
  target_link_libraries(PluginInterface PRIVATE llvmlibc_rpc_server)
  target_compile_definitions(PluginInterface PRIVATE LIBOMPTARGET_RPC_SUPPORT)
  set(libomptarget_supports_rpc TRUE)
else()
  find_library(llvmlibc_rpc_server NAMES llvmlibc_rpc_server
               PATHS ${LIBOMPTARGET_LLVM_LIBRARY_DIR} NO_DEFAULT_PATH)
  if(llvmlibc_rpc_server)
    # Finding the library is not a problem, so report it at STATUS level
    # rather than emitting a warning on every successful configure.
    message(STATUS "Found llvmlibc_rpc_server: ${llvmlibc_rpc_server}")
    # No target named `llvmlibc_rpc_server` exists in this branch, so link the
    # full path returned by find_library() instead of the bare name, which
    # would depend on the linker search path containing the library directory.
    target_link_libraries(PluginInterface PRIVATE ${llvmlibc_rpc_server})
    target_compile_definitions(PluginInterface PRIVATE LIBOMPTARGET_RPC_SUPPORT)
    set(libomptarget_supports_rpc TRUE)
  endif()
endif()
set(LIBOMPTARGET_GPU_LIBC_SUPPORT ${libomptarget_supports_rpc} CACHE BOOL
    "Libomptarget support for the GPU libc")

if ((OMPT_TARGET_DEFAULT) AND (LIBOMPTARGET_OMPT_SUPPORT))
target_link_libraries(PluginInterface PUBLIC OMPT)
endif()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ GenericDeviceTy::GenericDeviceTy(int32_t DeviceId, int32_t NumDevices,
OMPX_InitialNumEvents("LIBOMPTARGET_NUM_INITIAL_EVENTS", 32),
DeviceId(DeviceId), GridValues(OMPGridValues),
PeerAccesses(NumDevices, PeerAccessState::PENDING), PeerAccessesLock(),
PinnedAllocs(*this) {}
PinnedAllocs(*this), RPCHandle(nullptr) {}

Error GenericDeviceTy::init(GenericPluginTy &Plugin) {
if (auto Err = initImpl(Plugin))
Expand Down Expand Up @@ -453,6 +453,10 @@ Error GenericDeviceTy::deinit() {
if (RecordReplay.isRecordingOrReplaying())
RecordReplay.deinit();

if (RPCHandle)
if (auto Err = RPCHandle->deinitDevice())
return std::move(Err);

return deinitImpl();
}

Expand Down Expand Up @@ -493,6 +497,9 @@ GenericDeviceTy::loadBinary(GenericPluginTy &Plugin,
if (auto Err = registerOffloadEntries(*Image))
return std::move(Err);

if (auto Err = setupRPCServer(Plugin, *Image))
return std::move(Err);

// Return the pointer to the table of entries.
return Image->getOffloadEntryTable();
}
Expand Down Expand Up @@ -525,6 +532,33 @@ Error GenericDeviceTy::setupDeviceEnvironment(GenericPluginTy &Plugin,
return Plugin::success();
}

/// Set up the RPC server for this device when the device requests it and the
/// given image uses RPC.
///
/// \param Plugin the plugin providing the RPC server and the global handler.
/// \param Image  the device image to inspect and initialize against.
/// \returns success when no server is required or once the device's handle has
/// been stored in RPCHandle; otherwise the first error reported by the server.
Error GenericDeviceTy::setupRPCServer(GenericPluginTy &Plugin,
                                      DeviceImageTy &Image) {
  // The plugin either does not need an RPC server or it is unavailable.
  if (!shouldSetupRPCServer())
    return Plugin::success();

  // Check if this device needs to run an RPC server.
  RPCServerTy &Server = Plugin.getRPCServer();
  auto UsingOrErr =
      Server.isDeviceUsingRPC(*this, Plugin.getGlobalHandler(), Image);
  if (!UsingOrErr)
    return UsingOrErr.takeError();

  // The image does not use RPC, so there is nothing to initialize.
  if (!UsingOrErr.get())
    return Plugin::success();

  // Err is a local of the function's return type, so returning it by name
  // already moves it; wrapping it in std::move would only inhibit elision.
  if (auto Err = Server.initDevice(*this, Plugin.getGlobalHandler(), Image))
    return Err;

  // Remember the handle so it can later be retrieved via getRPCHandle().
  auto DeviceOrErr = Server.getDevice(*this);
  if (!DeviceOrErr)
    return DeviceOrErr.takeError();
  RPCHandle = *DeviceOrErr;
  DP("Running an RPC server on device %d\n", getDeviceId());
  return Plugin::success();
}

Error GenericDeviceTy::registerOffloadEntries(DeviceImageTy &Image) {
const __tgt_offload_entry *Begin = Image.getTgtImage()->EntriesBegin;
const __tgt_offload_entry *End = Image.getTgtImage()->EntriesEnd;
Expand Down Expand Up @@ -1088,6 +1122,9 @@ Error GenericPluginTy::init() {
GlobalHandler = Plugin::createGlobalHandler();
assert(GlobalHandler && "Invalid global handler");

RPCServer = new RPCServerTy(NumDevices);
assert(RPCServer && "Invalid RPC server");

return Plugin::success();
}

Expand All @@ -1105,6 +1142,9 @@ Error GenericPluginTy::deinit() {
assert(!Devices[DeviceId] && "Device was not deinitialized");
}

if (RPCServer)
delete RPCServer;

// Perform last deinitializations on the plugin.
return deinitImpl();
}
Expand Down Expand Up @@ -1139,6 +1179,14 @@ Error GenericPluginTy::deinitDevice(int32_t DeviceId) {
return Plugin::success();
}

/// Report whether RPC support was compiled in, i.e. whether
/// LIBOMPTARGET_RPC_SUPPORT was defined when this definition was built.
const bool llvm::omp::target::plugin::libomptargetSupportsRPC() {
#if defined(LIBOMPTARGET_RPC_SUPPORT)
  constexpr bool HasRPCSupport = true;
#else
  constexpr bool HasRPCSupport = false;
#endif
  return HasRPCSupport;
}

/// Exposed library API function, basically wrappers around the GenericDeviceTy
/// functionality with the same name. All non-async functions are redirected
/// to the async versions right away with a NULL AsyncInfoPtr.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "GlobalHandler.h"
#include "JIT.h"
#include "MemoryManager.h"
#include "RPC.h"
#include "Utilities.h"
#include "omptarget.h"

Expand Down Expand Up @@ -600,6 +601,11 @@ struct GenericDeviceTy : public DeviceAllocatorTy {
/// this behavior by overriding the shouldSetupDeviceEnvironment function.
Error setupDeviceEnvironment(GenericPluginTy &Plugin, DeviceImageTy &Image);

/// Setup the RPC server for this device if needed. This may not run on some
/// plugins like the CPU targets. By default, it will not be executed so it is
/// up to the target to override this using the shouldSetupRPCServer function.
Error setupRPCServer(GenericPluginTy &Plugin, DeviceImageTy &Image);

/// Register the offload entries for a specific image on the device.
Error registerOffloadEntries(DeviceImageTy &Image);

Expand Down Expand Up @@ -751,6 +757,9 @@ struct GenericDeviceTy : public DeviceAllocatorTy {
return OMPX_MinThreadsForLowTripCount;
}

/// Return the RPC handle stored for this device. The pointer is handed back
/// as-is and is null when no handle has been assigned.
RPCHandleTy *getRPCHandle() const {
  return RPCHandle;
}

private:
/// Register offload entry for global variable.
Error registerGlobalOffloadEntry(DeviceImageTy &DeviceImage,
Expand Down Expand Up @@ -780,6 +789,10 @@ struct GenericDeviceTy : public DeviceAllocatorTy {
/// setupDeviceEnvironment() function.
virtual bool shouldSetupDeviceEnvironment() const { return true; }

/// Tell whether the RPC server should be set up for this device. The default
/// answer is no; targets that need one (e.g. unhosted targets like the GPU)
/// override this.
virtual bool shouldSetupRPCServer() const {
  return false;
}

/// Pointer to the memory manager or nullptr if not available.
MemoryManagerTy *MemoryManager;

Expand Down Expand Up @@ -837,6 +850,10 @@ struct GenericDeviceTy : public DeviceAllocatorTy {

/// Map of host pinned allocations used for optimize device transfers.
PinnedAllocationMapTy PinnedAllocs;

/// A pointer to an RPC server instance attached to this device if present.
/// This is used to run the RPC server during task synchronization.
RPCHandleTy *RPCHandle;
};

/// Class implementing common functionalities of offload plugins. Each plugin
Expand Down Expand Up @@ -892,6 +909,12 @@ struct GenericPluginTy {
/// plugin.
JITEngine &getJIT() { return JIT; }

/// Access the RPC server used to provide host services. The server pointer
/// must already be set; this is checked with an assertion.
RPCServerTy &getRPCServer() {
  assert(RPCServer && "RPC server not initialized");
  RPCServerTy &Server = *RPCServer;
  return Server;
}

/// Get the OpenMP requires flags set for this plugin.
int64_t getRequiresFlags() const { return RequiresFlags; }

Expand Down Expand Up @@ -946,6 +969,9 @@ struct GenericPluginTy {

/// The JIT engine shared by all devices connected to this plugin.
JITEngine JIT;

/// The interface between the plugin and the GPU for host services.
RPCServerTy *RPCServer;
};

/// Class for simplifying the getter operation of the plugin. Anywhere on the
Expand Down Expand Up @@ -1209,6 +1235,9 @@ template <typename ResourceRef> class GenericDeviceResourceManagerTy {
std::deque<ResourceRef> ResourcePool;
};

/// A static check on whether or not we support RPC in libomptarget. The
/// definition returns true only when it is compiled with
/// LIBOMPTARGET_RPC_SUPPORT defined, and false otherwise.
const bool libomptargetSupportsRPC();

} // namespace plugin
} // namespace target
} // namespace omp
Expand Down

0 comments on commit 691dc2d

Please sign in to comment.