Skip to content

Commit

Permalink
[SYCL] Implement queue::ext_oneapi_empty() API to get queue status (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
againull committed Dec 5, 2022
1 parent 2359d94 commit c493295
Show file tree
Hide file tree
Showing 16 changed files with 256 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,14 @@ SYCL specification refer to that revision.

== Status

This is a proposed extension specification, intended to gather community
feedback. Interfaces defined in this specification may not be implemented yet
or may be in a preliminary state. The specification itself may also change in
incompatible ways before it is finalized. *Shipping software products should
not rely on APIs defined in this specification.*
This extension is supported by {dpcpp} on all backends except OpenCL.
[NOTE]
====
Currently support for OpenCL backend is limited, API introduced by this extension
can be called only for in-order queues which doesn't have `discard_events` property.
Exception is thrown if new API is called on other type of queue. OpenCL currently
doesn't have an API to get queue status.
====


== Overview
Expand Down
9 changes: 7 additions & 2 deletions sycl/include/sycl/detail/pi.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,11 @@
// piDeviceGetInfo.
// 11.17 Added new PI_EXT_ONEAPI_QUEUE_PRIORITY_LOW and
// PI_EXT_ONEAPI_QUEUE_PRIORITY_HIGH queue properties.
// 11.18 Add new parameter name PI_EXT_ONEAPI_QUEUE_INFO_EMPTY to
// _pi_queue_info.

#define _PI_H_VERSION_MAJOR 11
#define _PI_H_VERSION_MINOR 16
#define _PI_H_VERSION_MINOR 18

#define _PI_STRING_HELPER(a) #a
#define _PI_CONCAT(a, b) _PI_STRING_HELPER(a.b)
Expand Down Expand Up @@ -331,7 +333,10 @@ typedef enum {
PI_QUEUE_INFO_DEVICE_DEFAULT = 0x1095,
PI_QUEUE_INFO_PROPERTIES = 0x1093,
PI_QUEUE_INFO_REFERENCE_COUNT = 0x1092,
PI_QUEUE_INFO_SIZE = 0x1094
PI_QUEUE_INFO_SIZE = 0x1094,
// Return 'true' if all commands previously submitted to the queue have
// completed, otherwise return 'false'.
PI_EXT_ONEAPI_QUEUE_INFO_EMPTY = 0x2096
} _pi_queue_info;

typedef enum {
Expand Down
1 change: 1 addition & 0 deletions sycl/include/sycl/feature_test.hpp.in
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ __SYCL_INLINE_VER_NAMESPACE(_V1) {
#define SYCL_EXT_ONEAPI_BACKEND_LEVEL_ZERO 3
#define SYCL_EXT_ONEAPI_USM_DEVICE_READ_ONLY 1
#define SYCL_EXT_ONEAPI_KERNEL_PROPERTIES 1
#define SYCL_EXT_ONEAPI_QUEUE_EMPTY 1
#define SYCL_EXT_ONEAPI_USER_DEFINED_REDUCTIONS 1
#cmakedefine01 SYCL_BUILD_PI_CUDA
#if SYCL_BUILD_PI_CUDA
Expand Down
6 changes: 6 additions & 0 deletions sycl/include/sycl/queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1272,6 +1272,12 @@ class __SYCL_EXPORT queue {
/// \return the backend associated with this queue.
backend get_backend() const noexcept;

/// Allows to check status of the queue (completed vs noncompleted).
///
/// \return returns true if all enqueued commands in the queue have been
/// completed, otherwise returns false.
bool ext_oneapi_empty() const;

private:
pi_native_handle getNative() const;

Expand Down
21 changes: 21 additions & 0 deletions sycl/plugins/cuda/pi_cuda.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2543,6 +2543,27 @@ pi_result cuda_piQueueGetInfo(pi_queue command_queue, pi_queue_info param_name,
case PI_QUEUE_INFO_PROPERTIES:
return getInfo(param_value_size, param_value, param_value_size_ret,
command_queue->properties_);
case PI_EXT_ONEAPI_QUEUE_INFO_EMPTY: {
try {
bool IsReady = command_queue->all_of([](CUstream s) -> bool {
const CUresult ret = cuStreamQuery(s);
if (ret == CUDA_SUCCESS)
return true;

if (ret == CUDA_ERROR_NOT_READY)
return false;

PI_CHECK_ERROR(ret);
return false;
});
return getInfo(param_value_size, param_value, param_value_size_ret,
IsReady);
} catch (pi_result err) {
return err;
} catch (...) {
return PI_ERROR_OUT_OF_RESOURCES;
}
}
default:
__SYCL_PI_HANDLE_UNKNOWN_PARAM_NAME(param_name);
}
Expand Down
22 changes: 22 additions & 0 deletions sycl/plugins/cuda/pi_cuda.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,28 @@ struct _pi_queue {
return is_last_command && !has_been_synchronized(stream_token);
}

template <typename T> bool all_of(T &&f) {
{
std::lock_guard compute_guard(compute_stream_mutex_);
unsigned int end =
std::min(static_cast<unsigned int>(compute_streams_.size()),
num_compute_streams_);
if (!std::all_of(compute_streams_.begin(), compute_streams_.begin() + end,
f))
return false;
}
{
std::lock_guard transfer_guard(transfer_stream_mutex_);
unsigned int end =
std::min(static_cast<unsigned int>(transfer_streams_.size()),
num_transfer_streams_);
if (!std::all_of(transfer_streams_.begin(),
transfer_streams_.begin() + end, f))
return false;
}
return true;
}

template <typename T> void for_each_stream(T &&f) {
{
std::lock_guard<std::mutex> compute_guard(compute_stream_mutex_);
Expand Down
15 changes: 15 additions & 0 deletions sycl/plugins/hip/pi_hip.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2420,6 +2420,21 @@ pi_result hip_piQueueGetInfo(pi_queue command_queue, pi_queue_info param_name,
case PI_QUEUE_INFO_PROPERTIES:
return getInfo(param_value_size, param_value, param_value_size_ret,
command_queue->properties_);
case PI_EXT_ONEAPI_QUEUE_INFO_EMPTY: {
bool IsReady = command_queue->all_of([](hipStream_t s) -> bool {
const hipError_t ret = hipStreamQuery(s);
if (ret == hipSuccess)
return true;

if (ret == hipErrorNotReady)
return false;

PI_CHECK_ERROR(ret);
return false;
});
return getInfo(param_value_size, param_value, param_value_size_ret,
IsReady);
}
default:
__SYCL_PI_HANDLE_UNKNOWN_PARAM_NAME(param_name);
}
Expand Down
22 changes: 22 additions & 0 deletions sycl/plugins/hip/pi_hip.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,28 @@ struct _pi_queue {
return is_last_command && !has_been_synchronized(stream_token);
}

template <typename T> bool all_of(T &&f) {
{
std::lock_guard compute_guard(compute_stream_mutex_);
unsigned int end =
std::min(static_cast<unsigned int>(compute_streams_.size()),
num_compute_streams_);
if (!std::all_of(compute_streams_.begin(), compute_streams_.begin() + end,
f))
return false;
}
{
std::lock_guard transfer_guard(transfer_stream_mutex_);
unsigned int end =
std::min(static_cast<unsigned int>(transfer_streams_.size()),
num_transfer_streams_);
if (!std::all_of(transfer_streams_.begin(),
transfer_streams_.begin() + end, f))
return false;
}
return true;
}

template <typename T> void for_each_stream(T &&f) {
{
std::lock_guard<std::mutex> compute_guard(compute_stream_mutex_);
Expand Down
79 changes: 79 additions & 0 deletions sycl/plugins/level_zero/pi_level_zero.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3660,6 +3660,83 @@ pi_result piQueueGetInfo(pi_queue Queue, pi_queue_info ParamName,
case PI_QUEUE_INFO_DEVICE_DEFAULT:
die("PI_QUEUE_INFO_DEVICE_DEFAULT in piQueueGetInfo not implemented\n");
break;
case PI_EXT_ONEAPI_QUEUE_INFO_EMPTY: {
// We can exit early if we have in-order queue.
if (Queue->isInOrderQueue()) {
if (!Queue->LastCommandEvent)
return ReturnValue(pi_bool{true});

// We can check status of the event only if it isn't discarded otherwise
// it may be reset (because we are free to reuse such events) and
// zeEventQueryStatus will hang.
// TODO: use more robust way to check that ZeEvent is not owned by
// LastCommandEvent.
if (!Queue->LastCommandEvent->IsDiscarded) {
ze_result_t ZeResult = ZE_CALL_NOCHECK(
zeEventQueryStatus, (Queue->LastCommandEvent->ZeEvent));
if (ZeResult == ZE_RESULT_NOT_READY) {
return ReturnValue(pi_bool{false});
} else if (ZeResult != ZE_RESULT_SUCCESS) {
return mapError(ZeResult);
}
return ReturnValue(pi_bool{true});
}
// For immediate command lists we have to check status of the event
// because immediate command lists are not associated with level zero
// queue. Conservatively return false in this case because last event is
// discarded and we can't check its status.
if (Queue->Device->useImmediateCommandLists())
return ReturnValue(pi_bool{false});
}

// If we have any open command list which is not empty then return false
// because it means that there are commands which are not even submitted for
// execution yet.
using IsCopy = bool;
if (Queue->hasOpenCommandList(IsCopy{true}) ||
Queue->hasOpenCommandList(IsCopy{false}))
return ReturnValue(pi_bool{false});

for (const auto &QueueGroup :
{Queue->ComputeQueueGroup, Queue->CopyQueueGroup}) {
if (Queue->Device->useImmediateCommandLists()) {
// Immediate command lists are not associated with any Level Zero queue,
// that's why we have to check status of events in each immediate
// command list. Start checking from the end and exit early if some
// event is not completed.
for (const auto &ImmCmdList : QueueGroup.ImmCmdLists) {
if (ImmCmdList == Queue->CommandListMap.end())
continue;

auto EventList = ImmCmdList->second.EventList;
for (auto It = EventList.crbegin(); It != EventList.crend(); It++) {
ze_result_t ZeResult =
ZE_CALL_NOCHECK(zeEventQueryStatus, ((*It)->ZeEvent));
if (ZeResult == ZE_RESULT_NOT_READY) {
return ReturnValue(pi_bool{false});
} else if (ZeResult != ZE_RESULT_SUCCESS) {
return mapError(ZeResult);
}
}
}
} else {
for (const auto &ZeQueue : QueueGroup.ZeQueues) {
if (!ZeQueue)
continue;
// Provide 0 as the timeout parameter to immediately get the status of
// the Level Zero queue.
ze_result_t ZeResult = ZE_CALL_NOCHECK(zeCommandQueueSynchronize,
(ZeQueue, /* timeout */ 0));
if (ZeResult == ZE_RESULT_NOT_READY) {
return ReturnValue(pi_bool{false});
} else if (ZeResult != ZE_RESULT_SUCCESS) {
return mapError(ZeResult);
}
}
}
}
return ReturnValue(pi_bool{true});
}
default:
zePrint("Unsupported ParamName in piQueueGetInfo: ParamName=%d(0x%x)\n",
ParamName, ParamName);
Expand Down Expand Up @@ -6727,6 +6804,8 @@ pi_result _pi_queue::synchronize() {
ZE_CALL(zeHostSynchronize, (ZeQueue));
}

LastCommandEvent = nullptr;

// With the entire queue synchronized, the active barriers must be done so we
// can remove them.
for (pi_event &BarrierEvent : ActiveBarriers)
Expand Down
24 changes: 23 additions & 1 deletion sycl/plugins/opencl/pi_opencl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,28 @@ pi_result piQueueCreate(pi_context context, pi_device device,
return cast<pi_result>(ret_err);
}

pi_result piQueueGetInfo(pi_queue queue, pi_queue_info param_name,
size_t param_value_size, void *param_value,
size_t *param_value_size_ret) {
if (queue == nullptr) {
return PI_ERROR_INVALID_QUEUE;
}

switch (param_name) {
case PI_EXT_ONEAPI_QUEUE_INFO_EMPTY:
// OpenCL doesn't provide API to check the status of the queue.
return PI_ERROR_INVALID_VALUE;
default:
cl_int CLErr = clGetCommandQueueInfo(
cast<cl_command_queue>(queue), cast<cl_command_queue_info>(param_name),
param_value_size, param_value, param_value_size_ret);
if (CLErr != CL_SUCCESS) {
return cast<pi_result>(CLErr);
}
}
return PI_SUCCESS;
}

pi_result piextQueueCreateWithNativeHandle(pi_native_handle nativeHandle,
pi_context, pi_device,
bool ownNativeHandle,
Expand Down Expand Up @@ -1549,7 +1571,7 @@ pi_result piPluginInit(pi_plugin *PluginInit) {
_PI_CL(piextContextCreateWithNativeHandle, piextContextCreateWithNativeHandle)
// Queue
_PI_CL(piQueueCreate, piQueueCreate)
_PI_CL(piQueueGetInfo, clGetCommandQueueInfo)
_PI_CL(piQueueGetInfo, piQueueGetInfo)
_PI_CL(piQueueFinish, clFinish)
_PI_CL(piQueueFlush, clFlush)
_PI_CL(piQueueRetain, clRetainCommandQueue)
Expand Down
42 changes: 42 additions & 0 deletions sycl/source/detail/queue_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,48 @@ pi_native_handle queue_impl::getNative() const {
return Handle;
}

bool queue_impl::ext_oneapi_empty() const {
// If we have in-order queue where events are not discarded then just check
// the status of the last event.
if (isInOrder() && !MDiscardEvents) {
std::lock_guard Lock(MLastEventMtx);
return MLastEvent.get_info<info::event::command_execution_status>() ==
info::event_command_status::complete;
}

// Check the status of the backend queue if this is not a host queue.
if (!is_host()) {
pi_bool IsReady = false;
getPlugin().call<PiApiKind::piQueueGetInfo>(
MQueues[0], PI_EXT_ONEAPI_QUEUE_INFO_EMPTY, sizeof(pi_bool), &IsReady,
nullptr);
if (!IsReady)
return false;
}

// We may have events like host tasks which are not submitted to the backend
// queue so we need to get their status separately.
std::lock_guard Lock(MMutex);
for (event Event : MEventsShared)
if (Event.get_info<info::event::command_execution_status>() !=
info::event_command_status::complete)
return false;

for (auto EventImplWeakPtrIt = MEventsWeak.begin();
EventImplWeakPtrIt != MEventsWeak.end(); ++EventImplWeakPtrIt)
if (std::shared_ptr<event_impl> EventImplSharedPtr =
EventImplWeakPtrIt->lock())
if (EventImplSharedPtr->is_host() &&
EventImplSharedPtr
->get_info<info::event::command_execution_status>() !=
info::event_command_status::complete)
return false;

// If we didn't exit early above then it means that all events in the queue
// are completed.
return true;
}

} // namespace detail
} // __SYCL_INLINE_VER_NAMESPACE(_V1)
} // namespace sycl
6 changes: 4 additions & 2 deletions sycl/source/detail/queue_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,8 @@ class queue_impl {
MStreamsServiceEvents.push_back(Event);
}

bool ext_oneapi_empty() const;

protected:
// template is needed for proper unit testing
template <typename HandlerType = handler>
Expand Down Expand Up @@ -580,7 +582,7 @@ class queue_impl {
void addEvent(const event &Event);

/// Protects all the fields that can be changed by class' methods.
std::mutex MMutex;
mutable std::mutex MMutex;

DeviceImplPtr MDevice;
const ContextImplPtr MContext;
Expand Down Expand Up @@ -611,7 +613,7 @@ class queue_impl {
// This event is employed for enhanced dependency tracking with in-order queue
// Access to the event should be guarded with MLastEventMtx
event MLastEvent;
std::mutex MLastEventMtx;
mutable std::mutex MLastEventMtx;
// Used for in-order queues in pair with MLastEvent
// Host tasks are explicitly synchronized in RT, pi tasks - implicitly by
// backend. Using type to setup explicit sync between host and pi tasks.
Expand Down
2 changes: 2 additions & 0 deletions sycl/source/queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,8 @@ bool queue::is_in_order() const {

backend queue::get_backend() const noexcept { return getImplBackend(impl); }

bool queue::ext_oneapi_empty() const { return impl->ext_oneapi_empty(); }

pi_native_handle queue::getNative() const { return impl->getNative(); }

buffer<detail::AssertHappened, 1> &queue::getAssertHappenedBuffer() {
Expand Down
Loading

0 comments on commit c493295

Please sign in to comment.