Skip to content

Commit

Permalink
[OpenMP] Add non-blocking support for target nowait regions
Browse files Browse the repository at this point in the history
This patch better integrates the target nowait functions with the tasking runtime. It splits the nowait execution into two stages: a dispatch stage, which triggers all the necessary asynchronous device operations and stores a set of post-processing procedures that must be executed after said ops; and a synchronization stage, responsible for synchronizing the previous operations in a non-blocking manner and running the appropriate post-processing functions. If the operations have not completed by the time the synchronization stage runs, the attached hidden helper task is re-enqueued to any hidden helper thread to be synchronized later, allowing other target nowait regions to be dispatched concurrently.

Reviewed By: jdoerfert, tianshilei1992

Differential Revision: https://reviews.llvm.org/D132005
  • Loading branch information
GuilhermeValarini committed Dec 14, 2022
1 parent 2afc90a commit 89c82c8
Show file tree
Hide file tree
Showing 19 changed files with 667 additions and 211 deletions.
6 changes: 6 additions & 0 deletions openmp/libomptarget/include/device.h
Expand Up @@ -438,6 +438,12 @@ struct DeviceTy {
/// OFFLOAD_SUCCESS/OFFLOAD_FAIL when succeeds/fails.
int32_t synchronize(AsyncInfoTy &AsyncInfo);

/// Query for device/queue/event based completion on \p AsyncInfo in a
/// non-blocking manner and return OFFLOAD_SUCCESS/OFFLOAD_FAIL when
/// succeeds/fails. Must be called multiple times until AsyncInfo is
/// completed and AsyncInfo.isDone() returns true.
int32_t queryAsync(AsyncInfoTy &AsyncInfo);

/// Calls the corresponding print in the \p RTLDEVID
/// device RTL to obtain the information of the specific device.
bool printDeviceInfo(int32_t RTLDevID);
Expand Down
71 changes: 70 additions & 1 deletion openmp/libomptarget/include/omptarget.h
Expand Up @@ -15,11 +15,15 @@
#define _OMPTARGET_H_

#include <deque>
#include <functional>
#include <stddef.h>
#include <stdint.h>
#include <type_traits>
#include <utility>

#include <SourceInfo.h>

#include "llvm/ADT/SmallVector.h"

#define OFFLOAD_SUCCESS (0)
#define OFFLOAD_FAIL (~0)

Expand Down Expand Up @@ -181,15 +185,29 @@ struct DeviceTy;
/// associated with a libomptarget layer device. RAII semantics to avoid
/// mistakes.
class AsyncInfoTy {
public:
enum class SyncTy { BLOCKING, NON_BLOCKING };

private:
/// Locations we used in (potentially) asynchronous calls which should live
/// as long as this AsyncInfoTy object.
std::deque<void *> BufferLocations;

/// Post-processing operations executed after a successful synchronization.
/// \note the post-processing function should return OFFLOAD_SUCCESS or
/// OFFLOAD_FAIL appropriately.
using PostProcFuncTy = std::function<int()>;
llvm::SmallVector<PostProcFuncTy> PostProcessingFunctions;

__tgt_async_info AsyncInfo;
DeviceTy &Device;

public:
/// Construct the asynchronous info object bound to \p Device.
/// NOTE(review): this looks like the pre-patch constructor that is superseded
/// by the constructor taking a SyncTy further down — confirm it is not kept
/// alongside it.
AsyncInfoTy(DeviceTy &Device) : Device(Device) {}
/// Synchronization method to be used.
SyncTy SyncType;

/// Construct the asynchronous info object bound to \p Device.
///
/// \param[in] Device is the device this object stores a reference to.
/// \param[in] SyncType is the synchronization method stored in SyncType; it
/// defaults to SyncTy::BLOCKING when omitted.
AsyncInfoTy(DeviceTy &Device, SyncTy SyncType = SyncTy::BLOCKING)
: Device(Device), SyncType(SyncType) {}
/// Calls synchronize() when the object is destroyed. The value returned by
/// synchronize() is discarded here.
~AsyncInfoTy() { synchronize(); }

/// Implicit conversion to the __tgt_async_info which is used in the
Expand All @@ -198,12 +216,54 @@ class AsyncInfoTy {

/// Synchronize all pending actions.
///
/// \note synchronization will be performed in a blocking or non-blocking
/// manner, depending on the SyncType.
///
/// \note if the operations are completed, the registered post-processing
/// functions will be executed once and unregistered afterwards.
///
/// \returns OFFLOAD_FAIL or OFFLOAD_SUCCESS appropriately.
int synchronize();

/// Return a void* reference with a lifetime that is at least as long as this
/// AsyncInfoTy object. The location can be used as intermediate buffer.
void *&getVoidPtrLocation();

/// Check if all asynchronous operations are completed.
///
/// \note if the operations are completed, the registered post-processing
/// functions will be executed once and unregistered afterwards.
///
/// \returns true if there are no pending asynchronous operations, false
/// otherwise.
bool isDone();

/// Add a new post-processing function to be executed after synchronization.
///
/// \param[in] Function is a callable (e.g., function pointer, lambda,
/// std::function) that is convertible to a PostProcFuncTy (i.e., it must
/// have int() as its function signature).
template <typename FuncTy> void addPostProcessingFunction(FuncTy &&Function) {
  // Reject callables that cannot be stored as a PostProcFuncTy at compile
  // time, with a readable diagnostic.
  static_assert(std::is_convertible_v<FuncTy, PostProcFuncTy>,
                "Invalid post-processing function type. Please check "
                "function signature!");
  // Function is a forwarding reference: forward it so that temporaries
  // (e.g. lambdas with captured state) are moved into the stored functor
  // instead of being copied, while lvalues are still copied as before.
  PostProcessingFunctions.emplace_back(std::forward<FuncTy>(Function));
}

private:
/// Run all the post-processing functions sequentially.
///
/// \note after a successful execution, all previously registered functions
/// are unregistered.
///
/// \returns OFFLOAD_FAIL if any post-processing function failed,
/// OFFLOAD_SUCCESS otherwise.
int32_t runPostProcessing();

/// Check if the internal asynchronous info queue is empty or not.
///
/// \returns true if empty, false otherwise.
bool isQueueEmpty() const;
};

/// This struct is a record of non-contiguous information
Expand Down Expand Up @@ -347,6 +407,15 @@ int __tgt_target_kernel_nowait(ident_t *Loc, int64_t DeviceId, int32_t NumTeams,
void *DepList, int32_t NoAliasDepNum,
void *NoAliasDepList);

// Non-blocking synchronization for target nowait regions. This function
// acquires the asynchronous context from task data of the current task being
// executed and tries to query for the completion of its operations. If the
// operations are still pending, the function returns immediately. If the
// operations are completed, all the post-processing procedures stored in the
// asynchronous context are executed and the context is removed from the task
// data.
void __tgt_target_nowait_query(void **AsyncHandle);

void __tgt_set_info_flag(uint32_t);

int __tgt_print_device_info(int64_t DeviceId);
Expand Down
10 changes: 10 additions & 0 deletions openmp/libomptarget/include/omptargetplugin.h
Expand Up @@ -156,6 +156,16 @@ int32_t __tgt_rtl_run_target_team_region_async(
// error code.
int32_t __tgt_rtl_synchronize(int32_t ID, __tgt_async_info *AsyncInfo);

// Queries for the completion of asynchronous operations. Instead of blocking
// the calling thread as __tgt_rtl_synchronize, the progress of the operations
// stored in AsyncInfo->Queue is queried in a non-blocking manner, partially
// advancing their execution. If all operations are completed, AsyncInfo->Queue
// is set to nullptr. If there are still pending operations, AsyncInfo->Queue is
// kept as a valid queue. In any case of success (i.e., successful query
// with/without completing all operations), return zero. Otherwise, return an
// error code.
int32_t __tgt_rtl_query_async(int32_t ID, __tgt_async_info *AsyncInfo);

// Set plugin's internal information flag externally.
void __tgt_rtl_set_info_flag(uint32_t);

Expand Down
2 changes: 2 additions & 0 deletions openmp/libomptarget/include/rtl.h
Expand Up @@ -62,6 +62,7 @@ struct RTLInfoTy {
__tgt_async_info *);
typedef int64_t(init_requires_ty)(int64_t);
typedef int32_t(synchronize_ty)(int32_t, __tgt_async_info *);
typedef int32_t(query_async_ty)(int32_t, __tgt_async_info *);
typedef int32_t (*register_lib_ty)(__tgt_bin_desc *);
typedef int32_t(supports_empty_images_ty)();
typedef void(print_device_info_ty)(int32_t);
Expand Down Expand Up @@ -112,6 +113,7 @@ struct RTLInfoTy {
run_team_region_async_ty *run_team_region_async = nullptr;
init_requires_ty *init_requires = nullptr;
synchronize_ty *synchronize = nullptr;
query_async_ty *query_async = nullptr;
register_lib_ty register_lib = nullptr;
register_lib_ty unregister_lib = nullptr;
supports_empty_images_ty *supports_empty_images = nullptr;
Expand Down
Expand Up @@ -354,6 +354,13 @@ Error GenericDeviceTy::synchronize(__tgt_async_info *AsyncInfo) {
return synchronizeImpl(*AsyncInfo);
}

/// Validate \p AsyncInfo and hand the non-blocking completion query over to
/// the device-specific queryAsyncImpl(). A null \p AsyncInfo or a null queue
/// inside it is reported as an error without calling the implementation.
Error GenericDeviceTy::queryAsync(__tgt_async_info *AsyncInfo) {
  const bool HasQueue = AsyncInfo != nullptr && AsyncInfo->Queue != nullptr;
  if (HasQueue)
    return queryAsyncImpl(*AsyncInfo);

  return Plugin::error("Invalid async info queue");
}

Expected<void *> GenericDeviceTy::dataAlloc(int64_t Size, void *HostPtr,
TargetAllocTy Kind) {
void *Alloc = nullptr;
Expand Down Expand Up @@ -791,6 +798,16 @@ int32_t __tgt_rtl_synchronize(int32_t DeviceId,
return (bool)Err;
}

/// Plugin entry point for the non-blocking completion query.
///
/// Forwards to the queryAsync() of the device identified by \p DeviceId and
/// reports any resulting error.
///
/// \returns zero when the query succeeded (whether or not the operations have
/// completed) and a non-zero value when it failed.
int32_t __tgt_rtl_query_async(int32_t DeviceId,
                              __tgt_async_info *AsyncInfoPtr) {
  auto Err = Plugin::get().getDevice(DeviceId).queryAsync(AsyncInfoPtr);

  // Record the outcome before the error is consumed: Err is moved into
  // toString() below, so testing it afterwards would no longer observe the
  // failure and the function would report success.
  const bool Failed = static_cast<bool>(Err);
  if (Failed) {
    // queryAsync() reports an error for a null AsyncInfoPtr, so the pointer
    // must not be dereferenced unconditionally on this path.
    void *Queue = AsyncInfoPtr ? AsyncInfoPtr->Queue : nullptr;
    REPORT("Failure to query stream %p: %s\n", Queue,
           toString(std::move(Err)).data());
  }

  return Failed;
}

int32_t __tgt_rtl_run_target_region(int32_t DeviceId, void *TgtEntryPtr,
void **TgtArgs, ptrdiff_t *TgtOffsets,
int32_t NumArgs) {
Expand Down
Expand Up @@ -290,6 +290,11 @@ struct GenericDeviceTy : public DeviceAllocatorTy {
Error synchronize(__tgt_async_info *AsyncInfo);
virtual Error synchronizeImpl(__tgt_async_info &AsyncInfo) = 0;

/// Query for the completion of the pending operations on the __tgt_async_info
/// structure in a non-blocking manner.
Error queryAsync(__tgt_async_info *AsyncInfo);
virtual Error queryAsyncImpl(__tgt_async_info &AsyncInfo) = 0;

/// Allocate data on the device or involving the device.
Expected<void *> dataAlloc(int64_t Size, void *HostPtr, TargetAllocTy Kind);

Expand Down
18 changes: 18 additions & 0 deletions openmp/libomptarget/plugins-nextgen/cuda/src/rtl.cpp
Expand Up @@ -486,6 +486,24 @@ struct CUDADeviceTy : public GenericDeviceTy {
return Plugin::check(Res, "Error in cuStreamSynchronize: %s");
}

/// Poll the CUDA stream stored in \p AsyncInfo without blocking. A stream
/// that is still busy counts as a successful query and leaves \p AsyncInfo
/// untouched.
Error queryAsyncImpl(__tgt_async_info &AsyncInfo) override {
  CUstream Stream = reinterpret_cast<CUstream>(AsyncInfo.Queue);
  const CUresult QueryStatus = cuStreamQuery(Stream);

  if (QueryStatus != CUDA_ERROR_NOT_READY) {
    // The stream either finished its work or failed. In both cases hand it
    // back to the stream pool and clear the queue so this async info no
    // longer refers to a stream that may be reused by other tasks.
    CUDAStreamManager.returnResource(Stream);
    AsyncInfo.Queue = nullptr;

    return Plugin::check(QueryStatus, "Error in cuStreamQuery: %s");
  }

  // Work is still pending on the stream: not an error.
  return Plugin::success();
}

/// Submit data to the device (host to device transfer).
Error dataSubmitImpl(void *TgtPtr, const void *HstPtr, int64_t Size,
AsyncInfoWrapperTy &AsyncInfoWrapper) override {
Expand Down
Expand Up @@ -245,6 +245,12 @@ struct GenELF64DeviceTy : public GenericDeviceTy {
return Plugin::success();
}

/// Non-blocking completion query. All functions of this plugin are already
/// synchronous, so there is nothing to poll: the query does no work and
/// always reports success. \p AsyncInfo is not read or modified.
/// NOTE(review): unlike the CUDA implementation, AsyncInfo.Queue is not reset
/// to nullptr here — confirm callers do not rely on that to detect completion.
Error queryAsyncImpl(__tgt_async_info &AsyncInfo) override {
return Plugin::success();
}

/// This plugin does not support interoperability
Error initAsyncInfoImpl(AsyncInfoWrapperTy &AsyncInfoWrapper) override {
return Plugin::error("initAsyncInfoImpl not supported");
Expand Down
1 change: 1 addition & 0 deletions openmp/libomptarget/plugins/cuda/dynamic_cuda/cuda.cpp
Expand Up @@ -58,6 +58,7 @@ DLWRAP(cuModuleUnload, 1)
DLWRAP(cuStreamCreate, 2)
DLWRAP(cuStreamDestroy, 1)
DLWRAP(cuStreamSynchronize, 1)
DLWRAP(cuStreamQuery, 1)
DLWRAP(cuCtxSetCurrent, 1)
DLWRAP(cuDevicePrimaryCtxRelease, 1)
DLWRAP(cuDevicePrimaryCtxGetState, 3)
Expand Down
2 changes: 2 additions & 0 deletions openmp/libomptarget/plugins/cuda/dynamic_cuda/cuda.h
Expand Up @@ -31,6 +31,7 @@ typedef enum cudaError_enum {
CUDA_ERROR_INVALID_VALUE = 1,
CUDA_ERROR_NO_DEVICE = 100,
CUDA_ERROR_INVALID_HANDLE = 400,
CUDA_ERROR_NOT_READY = 600,
CUDA_ERROR_TOO_MANY_PEERS = 711,
} CUresult;

Expand Down Expand Up @@ -244,6 +245,7 @@ CUresult cuModuleUnload(CUmodule);
CUresult cuStreamCreate(CUstream *, unsigned);
CUresult cuStreamDestroy(CUstream);
CUresult cuStreamSynchronize(CUstream);
CUresult cuStreamQuery(CUstream);
CUresult cuCtxSetCurrent(CUcontext);
CUresult cuDevicePrimaryCtxRelease(CUdevice);
CUresult cuDevicePrimaryCtxGetState(CUdevice, unsigned *, int *);
Expand Down
32 changes: 32 additions & 0 deletions openmp/libomptarget/plugins/cuda/src/rtl.cpp
Expand Up @@ -1267,6 +1267,29 @@ class DeviceRTLTy {
return (Err == CUDA_SUCCESS) ? OFFLOAD_SUCCESS : OFFLOAD_FAIL;
}

int queryAsync(const int DeviceId, __tgt_async_info *AsyncInfo) const {
CUstream Stream = reinterpret_cast<CUstream>(AsyncInfo->Queue);
CUresult Err = cuStreamQuery(Stream);

// Not ready streams must be considered as successful operations.
if (Err == CUDA_ERROR_NOT_READY)
return OFFLOAD_SUCCESS;

// Once the stream is synchronized or an error occurs, return it to the
// stream pool and reset AsyncInfo. This is to make sure the
// synchronization only works for its own tasks.
StreamPool[DeviceId]->release(Stream);
AsyncInfo->Queue = nullptr;

if (Err != CUDA_SUCCESS) {
DP("Error when querying for stream progress. stream = " DPxMOD
", async info ptr = " DPxMOD "\n",
DPxPTR(Stream), DPxPTR(AsyncInfo));
CUDA_ERR_STRING(Err);
}
return (Err == CUDA_SUCCESS) ? OFFLOAD_SUCCESS : OFFLOAD_FAIL;
}

void printDeviceInfo(int32_t DeviceId) {
char TmpChar[1000];
std::string TmpStr;
Expand Down Expand Up @@ -1780,6 +1803,15 @@ int32_t __tgt_rtl_synchronize(int32_t DeviceId,
return DeviceRTL.synchronize(DeviceId, AsyncInfoPtr);
}

// Plugin entry point for the non-blocking completion query. The arguments are
// validated with asserts only (valid device id, non-null AsyncInfoPtr,
// non-null AsyncInfoPtr->Queue) and the call is then forwarded to
// DeviceRTL.queryAsync(), whose result is returned unchanged.
int32_t __tgt_rtl_query_async(int32_t DeviceId,
__tgt_async_info *AsyncInfoPtr) {
assert(DeviceRTL.isValidDeviceId(DeviceId) && "device_id is invalid");
assert(AsyncInfoPtr && "async_info_ptr is nullptr");
assert(AsyncInfoPtr->Queue && "async_info_ptr->Queue is nullptr");
// NOTE: We don't need to set context for stream query.
return DeviceRTL.queryAsync(DeviceId, AsyncInfoPtr);
}

void __tgt_rtl_set_info_flag(uint32_t NewInfoLevel) {
std::atomic<uint32_t> &InfoLevel = getInfoLevelInternal();
InfoLevel.store(NewInfoLevel);
Expand Down
7 changes: 7 additions & 0 deletions openmp/libomptarget/src/device.cpp
Expand Up @@ -641,6 +641,13 @@ int32_t DeviceTy::synchronize(AsyncInfoTy &AsyncInfo) {
return OFFLOAD_SUCCESS;
}

/// Query the completion of the operations tracked by \p AsyncInfo.
///
/// Uses the plugin's query_async entry point when the plugin provides one;
/// otherwise falls back to synchronize().
int32_t DeviceTy::queryAsync(AsyncInfoTy &AsyncInfo) {
  // No query_async entry point in this plugin: fall back to synchronize().
  if (!RTL->query_async)
    return synchronize(AsyncInfo);

  return RTL->query_async(RTLDeviceID, AsyncInfo);
}

int32_t DeviceTy::createEvent(void **Event) {
if (RTL->create_event)
return RTL->create_event(RTLDeviceID, Event);
Expand Down
2 changes: 1 addition & 1 deletion openmp/libomptarget/src/exports
Expand Up @@ -26,6 +26,7 @@ VERS1.0 {
__tgt_target_teams_nowait_mapper;
__tgt_target_kernel;
__tgt_target_kernel_nowait;
__tgt_target_nowait_query;
__tgt_mapper_num_components;
__tgt_push_mapper_component;
__kmpc_push_target_tripcount;
Expand Down Expand Up @@ -60,4 +61,3 @@ VERS1.0 {
local:
*;
};

0 comments on commit 89c82c8

Please sign in to comment.