Skip to content

Commit

Permalink
[OpenMP] Optimized stream selection by scheduling data mapping for th…
Browse files Browse the repository at this point in the history
…e same target region into a same stream

Summary:
This patch introduces two things for offloading:
1. Asynchronous data transferring: those functions are suffix with `_async`. They have one more argument compared with their synchronous counterparts: `__tgt_async_info*`, which is a new struct that only has one field, `void *Identifier`. This struct is for information exchange between different asynchronous operations. It can be used for stream selection, like in this case, or operation synchronization, which is also used. We may expect more usages in the future.
2. Optimization of stream selection for data mapping. Previous implementation was using asynchronous device memory transfer but synchronizing after each memory transfer. Actually, if we say kernel A needs four memory copy to device and two memory copy back to host, then we can schedule these seven operations (four H2D, two D2H, and one kernel launch) into a same stream and just need synchronization after memory copy from device to host. In this way, we can save a huge overhead compared with synchronization after each operation.

Reviewers: jdoerfert, ye-luo

Reviewed By: jdoerfert

Subscribers: yaxunl, lildmh, guansong, openmp-commits

Tags: #openmp

Differential Revision: https://reviews.llvm.org/D77005
  • Loading branch information
shiltian committed Apr 7, 2020
1 parent fcf7cc2 commit 32ed292
Show file tree
Hide file tree
Showing 12 changed files with 251 additions and 136 deletions.
9 changes: 9 additions & 0 deletions openmp/libomptarget/include/omptarget.h
Expand Up @@ -111,6 +111,15 @@ struct __tgt_target_table {
*EntriesEnd; // End of the table with all the entries (non inclusive)
};

/// This struct contains information exchanged between different asynchronous
/// operations for device-dependent optimization and potential synchronization
struct __tgt_async_info {
// A pointer to a queue-like structure where offloading operations are issued.
// We assume to use this structure to do synchronization. In CUDA backend, it
// is CUstream.
void *Queue = nullptr;
};

#ifdef __cplusplus
extern "C" {
#endif
Expand Down
39 changes: 28 additions & 11 deletions openmp/libomptarget/include/omptargetplugin.h
Expand Up @@ -58,15 +58,21 @@ __tgt_target_table *__tgt_rtl_load_binary(int32_t ID,
// case an error occurred on the target device.
void *__tgt_rtl_data_alloc(int32_t ID, int64_t Size, void *HostPtr);

// Pass the data content to the target device using the target address.
// In case of success, return zero. Otherwise, return an error code.
// Pass the data content to the target device using the target address. If
// AsyncInfoPtr is nullptr, it is synchronous; otherwise it is asynchronous.
// However, AsyncInfoPtr may be ignored on some platforms, like x86_64. In that
// case, it is synchronous. In case of success, return zero. Otherwise, return
// an error code.
int32_t __tgt_rtl_data_submit(int32_t ID, void *TargetPtr, void *HostPtr,
int64_t Size);
int64_t Size, __tgt_async_info *AsyncInfoPtr);

// Retrieve the data content from the target device using its address.
// In case of success, return zero. Otherwise, return an error code.
// Retrieve the data content from the target device using its address. If
// AsyncInfoPtr is nullptr, it is synchronous; otherwise it is asynchronous.
// However, AsyncInfoPtr may be ignored on some platforms, like x86_64. In that
// case, it is synchronous. In case of success, return zero. Otherwise, return
// an error code.
int32_t __tgt_rtl_data_retrieve(int32_t ID, void *HostPtr, void *TargetPtr,
int64_t Size);
int64_t Size, __tgt_async_info *AsyncInfoPtr);

// De-allocate the data referenced by target ptr on the device. In case of
// success, return zero. Otherwise, return an error code.
Expand All @@ -75,17 +81,28 @@ int32_t __tgt_rtl_data_delete(int32_t ID, void *TargetPtr);
// Transfer control to the offloaded entry Entry on the target device.
// Args and Offsets are arrays of NumArgs size of target addresses and
// offsets. An offset should be added to the target address before passing it
// to the outlined function on device side. In case of success, return zero.
// Otherwise, return an error code.
// to the outlined function on device side. If AsyncInfoPtr is nullptr, it is
// synchronous; otherwise it is asynchronous. However, AsyncInfoPtr may be
// ignored on some platforms, like x86_64. In that case, it is synchronous. In
// case of success, return zero. Otherwise, return an error code.
int32_t __tgt_rtl_run_target_region(int32_t ID, void *Entry, void **Args,
ptrdiff_t *Offsets, int32_t NumArgs);
ptrdiff_t *Offsets, int32_t NumArgs,
__tgt_async_info *AsyncInfoPtr);

// Similar to __tgt_rtl_run_target_region, but additionally specify the
// number of teams to be created and a number of threads in each team.
// number of teams to be created and a number of threads in each team. If
// AsyncInfoPtr is nullptr, it is synchronous; otherwise it is asynchronous.
// However, AsyncInfoPtr may be ignored on some platforms, like x86_64. In that
// case, it is synchronous.
int32_t __tgt_rtl_run_target_team_region(int32_t ID, void *Entry, void **Args,
ptrdiff_t *Offsets, int32_t NumArgs,
int32_t NumTeams, int32_t ThreadLimit,
uint64_t loop_tripcount);
uint64_t loop_tripcount,
__tgt_async_info *AsyncInfoPtr);

// Device synchronization. In case of success, return zero. Otherwise, return an
// error code.
int32_t __tgt_rtl_synchronize(int32_t ID, __tgt_async_info *AsyncInfoPtr);

#ifdef __cplusplus
}
Expand Down
186 changes: 115 additions & 71 deletions openmp/libomptarget/plugins/cuda/src/rtl.cpp
Expand Up @@ -309,6 +309,68 @@ class RTLDeviceInfoTy {

static RTLDeviceInfoTy DeviceInfo;

namespace {
CUstream selectStream(int32_t Id, __tgt_async_info *AsyncInfo) {
if (!AsyncInfo)
return DeviceInfo.getNextStream(Id);

if (!AsyncInfo->Queue)
AsyncInfo->Queue = DeviceInfo.getNextStream(Id);

return reinterpret_cast<CUstream>(AsyncInfo->Queue);
}

int32_t dataRetrieve(int32_t DeviceId, void *HstPtr, void *TgtPtr, int64_t Size,
__tgt_async_info *AsyncInfoPtr) {
assert(AsyncInfoPtr && "AsyncInfoPtr is nullptr");
// Set the context we are using.
CUresult err = cuCtxSetCurrent(DeviceInfo.Contexts[DeviceId]);
if (err != CUDA_SUCCESS) {
DP("Error when setting CUDA context\n");
CUDA_ERR_STRING(err);
return OFFLOAD_FAIL;
}

CUstream Stream = selectStream(DeviceId, AsyncInfoPtr);

err = cuMemcpyDtoHAsync(HstPtr, (CUdeviceptr)TgtPtr, Size, Stream);
if (err != CUDA_SUCCESS) {
DP("Error when copying data from device to host. Pointers: host = " DPxMOD
", device = " DPxMOD ", size = %" PRId64 "\n",
DPxPTR(HstPtr), DPxPTR(TgtPtr), Size);
CUDA_ERR_STRING(err);
return OFFLOAD_FAIL;
}

return OFFLOAD_SUCCESS;
}

int32_t dataSubmit(int32_t DeviceId, void *TgtPtr, void *HstPtr, int64_t Size,
__tgt_async_info *AsyncInfoPtr) {
assert(AsyncInfoPtr && "AsyncInfoPtr is nullptr");
// Set the context we are using.
CUresult err = cuCtxSetCurrent(DeviceInfo.Contexts[DeviceId]);
if (err != CUDA_SUCCESS) {
DP("Error when setting CUDA context\n");
CUDA_ERR_STRING(err);
return OFFLOAD_FAIL;
}

CUstream Stream = selectStream(DeviceId, AsyncInfoPtr);

err = cuMemcpyHtoDAsync((CUdeviceptr)TgtPtr, HstPtr, Size, Stream);
if (err != CUDA_SUCCESS) {
DP("Error when copying data from host to device. Pointers: host = " DPxMOD
", device = " DPxMOD ", size = %" PRId64 "\n",
DPxPTR(HstPtr), DPxPTR(TgtPtr), Size);
CUDA_ERR_STRING(err);
return OFFLOAD_FAIL;
}

return OFFLOAD_SUCCESS;
}
} // namespace

#ifdef __cplusplus
extern "C" {
#endif
Expand Down Expand Up @@ -663,69 +725,38 @@ void *__tgt_rtl_data_alloc(int32_t device_id, int64_t size, void *hst_ptr) {
}

int32_t __tgt_rtl_data_submit(int32_t device_id, void *tgt_ptr, void *hst_ptr,
int64_t size) {
// Set the context we are using.
CUresult err = cuCtxSetCurrent(DeviceInfo.Contexts[device_id]);
if (err != CUDA_SUCCESS) {
DP("Error when setting CUDA context\n");
CUDA_ERR_STRING(err);
int64_t size, __tgt_async_info *async_info_ptr) {
// The function dataSubmit is always asynchronous. Considering some data
// transfer must be synchronous, we assume if async_info_ptr is nullptr, the
// transfer will be synchronous by creating a temporary async info and then
// synchronizing after call dataSubmit; otherwise, it is asynchronous.
if (async_info_ptr)
return dataSubmit(device_id, tgt_ptr, hst_ptr, size, async_info_ptr);

__tgt_async_info async_info;
int32_t rc = dataSubmit(device_id, tgt_ptr, hst_ptr, size, &async_info);
if (rc != OFFLOAD_SUCCESS)
return OFFLOAD_FAIL;
}

CUstream &Stream = DeviceInfo.getNextStream(device_id);

err = cuMemcpyHtoDAsync((CUdeviceptr)tgt_ptr, hst_ptr, size, Stream);
if (err != CUDA_SUCCESS) {
DP("Error when copying data from host to device. Pointers: host = " DPxMOD
", device = " DPxMOD ", size = %" PRId64 "\n",
DPxPTR(hst_ptr), DPxPTR(tgt_ptr), size);
CUDA_ERR_STRING(err);
return OFFLOAD_FAIL;
}

err = cuStreamSynchronize(Stream);
if (err != CUDA_SUCCESS) {
DP("Error when synchronizing async data transfer from host to device. "
"Pointers: host = " DPxMOD ", device = " DPxMOD ", size = %" PRId64 "\n",
DPxPTR(hst_ptr), DPxPTR(tgt_ptr), size);
CUDA_ERR_STRING(err);
return OFFLOAD_FAIL;
}

return OFFLOAD_SUCCESS;
return __tgt_rtl_synchronize(device_id, &async_info);
}

int32_t __tgt_rtl_data_retrieve(int32_t device_id, void *hst_ptr, void *tgt_ptr,
int64_t size) {
// Set the context we are using.
CUresult err = cuCtxSetCurrent(DeviceInfo.Contexts[device_id]);
if (err != CUDA_SUCCESS) {
DP("Error when setting CUDA context\n");
CUDA_ERR_STRING(err);
int64_t size,
__tgt_async_info *async_info_ptr) {
// The function dataRetrieve is always asynchronous. Considering some data
// transfer must be synchronous, we assume if async_info_ptr is nullptr, the
// transfer will be synchronous by creating a temporary async info and then
// synchronizing after call dataRetrieve; otherwise, it is asynchronous.
if (async_info_ptr)
return dataRetrieve(device_id, hst_ptr, tgt_ptr, size, async_info_ptr);

__tgt_async_info async_info;
int32_t rc = dataRetrieve(device_id, hst_ptr, tgt_ptr, size, &async_info);
if (rc != OFFLOAD_SUCCESS)
return OFFLOAD_FAIL;
}

CUstream &Stream = DeviceInfo.getNextStream(device_id);

err = cuMemcpyDtoHAsync(hst_ptr, (CUdeviceptr)tgt_ptr, size, Stream);
if (err != CUDA_SUCCESS) {
DP("Error when copying data from device to host. Pointers: host = " DPxMOD
", device = " DPxMOD ", size = %" PRId64 "\n",
DPxPTR(hst_ptr), DPxPTR(tgt_ptr), size);
CUDA_ERR_STRING(err);
return OFFLOAD_FAIL;
}

err = cuStreamSynchronize(Stream);
if (err != CUDA_SUCCESS) {
DP("Error when synchronizing async data transfer from device to host. "
"Pointers: host = " DPxMOD ", device = " DPxMOD ", size = %" PRId64 "\n",
DPxPTR(hst_ptr), DPxPTR(tgt_ptr), size);
CUDA_ERR_STRING(err);
return OFFLOAD_FAIL;
}

return OFFLOAD_SUCCESS;
return __tgt_rtl_synchronize(device_id, &async_info);
}

int32_t __tgt_rtl_data_delete(int32_t device_id, void *tgt_ptr) {
Expand All @@ -747,8 +778,12 @@ int32_t __tgt_rtl_data_delete(int32_t device_id, void *tgt_ptr) {
}

int32_t __tgt_rtl_run_target_team_region(int32_t device_id, void *tgt_entry_ptr,
void **tgt_args, ptrdiff_t *tgt_offsets, int32_t arg_num, int32_t team_num,
int32_t thread_limit, uint64_t loop_tripcount) {
void **tgt_args,
ptrdiff_t *tgt_offsets,
int32_t arg_num, int32_t team_num,
int32_t thread_limit,
uint64_t loop_tripcount,
__tgt_async_info *async_info) {
// Set the context we are using.
CUresult err = cuCtxSetCurrent(DeviceInfo.Contexts[device_id]);
if (err != CUDA_SUCCESS) {
Expand Down Expand Up @@ -844,8 +879,7 @@ int32_t __tgt_rtl_run_target_team_region(int32_t device_id, void *tgt_entry_ptr,
DP("Launch kernel with %d blocks and %d threads\n", cudaBlocksPerGrid,
cudaThreadsPerBlock);

CUstream &Stream = DeviceInfo.getNextStream(device_id);

CUstream Stream = selectStream(device_id, async_info);
err = cuLaunchKernel(KernelInfo->Func, cudaBlocksPerGrid, 1, 1,
cudaThreadsPerBlock, 1, 1, 0 /*bytes of shared memory*/,
Stream, &args[0], 0);
Expand All @@ -858,25 +892,35 @@ int32_t __tgt_rtl_run_target_team_region(int32_t device_id, void *tgt_entry_ptr,
DP("Launch of entry point at " DPxMOD " successful!\n",
DPxPTR(tgt_entry_ptr));

CUresult sync_err = cuStreamSynchronize(Stream);
if (sync_err != CUDA_SUCCESS) {
DP("Kernel execution error at " DPxMOD "!\n", DPxPTR(tgt_entry_ptr));
CUDA_ERR_STRING(sync_err);
return OFFLOAD_FAIL;
} else {
DP("Kernel execution at " DPxMOD " successful!\n", DPxPTR(tgt_entry_ptr));
}

return OFFLOAD_SUCCESS;
}

int32_t __tgt_rtl_run_target_region(int32_t device_id, void *tgt_entry_ptr,
void **tgt_args, ptrdiff_t *tgt_offsets, int32_t arg_num) {
void **tgt_args, ptrdiff_t *tgt_offsets,
int32_t arg_num,
__tgt_async_info *async_info) {
// use one team and the default number of threads.
const int32_t team_num = 1;
const int32_t thread_limit = 0;
return __tgt_rtl_run_target_team_region(device_id, tgt_entry_ptr, tgt_args,
tgt_offsets, arg_num, team_num, thread_limit, 0);
tgt_offsets, arg_num, team_num,
thread_limit, 0, async_info);
}

int32_t __tgt_rtl_synchronize(int32_t device_id, __tgt_async_info *async_info) {
assert(async_info && "async_info is nullptr");
assert(async_info->Queue && "async_info->Queue is nullptr");

CUstream Stream = reinterpret_cast<CUstream>(async_info->Queue);
CUresult Err = cuStreamSynchronize(Stream);
if (Err != CUDA_SUCCESS) {
DP("Error when synchronizing stream. stream = " DPxMOD
", async info ptr = " DPxMOD "\n",
DPxPTR(Stream), DPxPTR(async_info));
CUDA_ERR_STRING(Err);
return OFFLOAD_FAIL;
}
return OFFLOAD_SUCCESS;
}

#ifdef __cplusplus
Expand Down
1 change: 1 addition & 0 deletions openmp/libomptarget/plugins/exports
Expand Up @@ -11,6 +11,7 @@ VERS1.0 {
__tgt_rtl_data_delete;
__tgt_rtl_run_target_team_region;
__tgt_rtl_run_target_region;
__tgt_rtl_synchronize;
local:
*;
};
24 changes: 17 additions & 7 deletions openmp/libomptarget/plugins/generic-elf-64bit/src/rtl.cpp
Expand Up @@ -277,13 +277,13 @@ void *__tgt_rtl_data_alloc(int32_t device_id, int64_t size, void *hst_ptr) {
}

int32_t __tgt_rtl_data_submit(int32_t device_id, void *tgt_ptr, void *hst_ptr,
int64_t size) {
int64_t size, __tgt_async_info *) {
memcpy(tgt_ptr, hst_ptr, size);
return OFFLOAD_SUCCESS;
}

int32_t __tgt_rtl_data_retrieve(int32_t device_id, void *hst_ptr, void *tgt_ptr,
int64_t size) {
int64_t size, __tgt_async_info *) {
memcpy(hst_ptr, tgt_ptr, size);
return OFFLOAD_SUCCESS;
}
Expand All @@ -293,9 +293,11 @@ int32_t __tgt_rtl_data_delete(int32_t device_id, void *tgt_ptr) {
return OFFLOAD_SUCCESS;
}

int32_t __tgt_rtl_run_target_team_region(int32_t device_id, void *tgt_entry_ptr,
void **tgt_args, ptrdiff_t *tgt_offsets, int32_t arg_num, int32_t team_num,
int32_t thread_limit, uint64_t loop_tripcount /*not used*/) {
int32_t __tgt_rtl_run_target_team_region(
int32_t device_id, void *tgt_entry_ptr, void **tgt_args,
ptrdiff_t *tgt_offsets, int32_t arg_num, int32_t team_num,
int32_t thread_limit, uint64_t loop_tripcount /*not used*/,
__tgt_async_info *async_info /*not used*/) {
// ignore team num and thread limit.

// Use libffi to launch execution.
Expand Down Expand Up @@ -328,10 +330,18 @@ int32_t __tgt_rtl_run_target_team_region(int32_t device_id, void *tgt_entry_ptr,
}

int32_t __tgt_rtl_run_target_region(int32_t device_id, void *tgt_entry_ptr,
void **tgt_args, ptrdiff_t *tgt_offsets, int32_t arg_num) {
void **tgt_args, ptrdiff_t *tgt_offsets,
int32_t arg_num,
__tgt_async_info *async_info_ptr) {
// use one team and one thread.
return __tgt_rtl_run_target_team_region(device_id, tgt_entry_ptr, tgt_args,
tgt_offsets, arg_num, 1, 1, 0);
tgt_offsets, arg_num, 1, 1, 0,
async_info_ptr);
}

int32_t __tgt_rtl_synchronize(int32_t device_id,
__tgt_async_info *async_info_ptr) {
return OFFLOAD_SUCCESS;
}

#ifdef __cplusplus
Expand Down
8 changes: 4 additions & 4 deletions openmp/libomptarget/src/api.cpp
Expand Up @@ -161,19 +161,19 @@ EXTERN int omp_target_memcpy(void *dst, void *src, size_t length,
} else if (src_device == omp_get_initial_device()) {
DP("copy from host to device\n");
DeviceTy& DstDev = Devices[dst_device];
rc = DstDev.data_submit(dstAddr, srcAddr, length);
rc = DstDev.data_submit(dstAddr, srcAddr, length, nullptr);
} else if (dst_device == omp_get_initial_device()) {
DP("copy from device to host\n");
DeviceTy& SrcDev = Devices[src_device];
rc = SrcDev.data_retrieve(dstAddr, srcAddr, length);
rc = SrcDev.data_retrieve(dstAddr, srcAddr, length, nullptr);
} else {
DP("copy from device to device\n");
void *buffer = malloc(length);
DeviceTy& SrcDev = Devices[src_device];
DeviceTy& DstDev = Devices[dst_device];
rc = SrcDev.data_retrieve(buffer, srcAddr, length);
rc = SrcDev.data_retrieve(buffer, srcAddr, length, nullptr);
if (rc == OFFLOAD_SUCCESS)
rc = DstDev.data_submit(dstAddr, buffer, length);
rc = DstDev.data_submit(dstAddr, buffer, length, nullptr);
free(buffer);
}

Expand Down

0 comments on commit 32ed292

Please sign in to comment.