From ec584bd462aa477aefdd12729063cf5d235d4a43 Mon Sep 17 00:00:00 2001
From: Nicolas Miller
Date: Fri, 21 Mar 2025 17:29:29 +0000
Subject: [PATCH 1/6] [UR][CUDA][HIP] Unify queue handling between adapters

The CUDA and HIP adapters both use a nearly identical, complicated queue
implementation that handles creating an out-of-order UR queue from
in-order CUDA/HIP streams.

This patch extracts all of the queue logic into a separate templated
class that can be used by both adapters. Beyond removing a lot of
duplicated code, it also makes the queue a lot easier to maintain.

There were a few functional differences between the queues in the two
adapters, mostly due to fixes done in the CUDA adapter that were never
ported to the HIP adapter. There might be more, but I found at least one
race condition (https://github.com/intel/llvm/pull/15100) and one
performance issue (https://github.com/intel/llvm/pull/6333) that weren't
fixed in the HIP adapter.

This patch uses the CUDA version of the queue as a base for the generic
queue, and will thus fix the race condition and performance issue
mentioned above for HIP.

This code is quite complex, so this patch also aims to minimize any
changes beyond the structural ones needed to share the code. However, it
does make the following changes in the two adapters:

CUDA:
* Rename `ur_stream_guard_` to `ur_stream_guard`
* Rename `getNextEventID` to `getNextEventId`
* Remove duplicate `get_device` getter, use `getDevice` instead

HIP:
* Fix queue finish so it doesn't fail when no streams need to be
  synchronized
---
 .../source/adapters/cuda/command_buffer.cpp   |   2 +-
 .../source/adapters/cuda/enqueue.cpp          |   8 +-
 .../source/adapters/cuda/event.cpp            |   8 +-
 .../source/adapters/cuda/queue.cpp            |  90 +----
 .../source/adapters/cuda/queue.hpp            | 251 +------------
 .../source/adapters/cuda/stream_queue.hpp     | 346 ++++++++++++++++++
 unified-runtime/source/adapters/hip/queue.cpp |  97 +----
 unified-runtime/source/adapters/hip/queue.hpp | 245 +------------
 8 files changed, 400 insertions(+), 647 deletions(-)
 create mode 100644 unified-runtime/source/adapters/cuda/stream_queue.hpp

diff --git a/unified-runtime/source/adapters/cuda/command_buffer.cpp b/unified-runtime/source/adapters/cuda/command_buffer.cpp
index f494c7b55753..01dc632fa6fd 100644
--- a/unified-runtime/source/adapters/cuda/command_buffer.cpp
+++ b/unified-runtime/source/adapters/cuda/command_buffer.cpp
@@ -1133,7 +1133,7 @@ UR_APIEXPORT ur_result_t UR_APICALL urEnqueueCommandBufferExp(
   std::unique_ptr<ur_event_handle_t_> RetImplEvent{nullptr};
   ScopedContext Active(hQueue->getDevice());
   uint32_t StreamToken;
-  ur_stream_guard_ Guard;
+  ur_stream_guard Guard;
   CUstream CuStream = hQueue->getNextComputeStream(
       numEventsInWaitList, phEventWaitList, Guard, &StreamToken);
diff --git a/unified-runtime/source/adapters/cuda/enqueue.cpp b/unified-runtime/source/adapters/cuda/enqueue.cpp
index fb3e1952ed1c..e48a1e5ea15f 100644
--- a/unified-runtime/source/adapters/cuda/enqueue.cpp
+++ b/unified-runtime/source/adapters/cuda/enqueue.cpp
@@ -315,7 +315,7 @@ UR_APIEXPORT ur_result_t UR_APICALL urEnqueueEventsWaitWithBarrier(
   try {
     ScopedContext Active(hQueue->getDevice());
     uint32_t StreamToken;
-    ur_stream_guard_ Guard;
+    ur_stream_guard Guard;
     CUstream CuStream = hQueue->getNextComputeStream(
         numEventsInWaitList, phEventWaitList, Guard, &StreamToken);
     {
@@ -440,7 +440,7 @@ enqueueKernelLaunch(ur_queue_handle_t hQueue, ur_kernel_handle_t hKernel,
 
     ScopedContext Active(hQueue->getDevice());
     uint32_t StreamToken;
-    ur_stream_guard_ Guard;
+    ur_stream_guard Guard;
     CUstream CuStream = hQueue->getNextComputeStream(
         numEventsInWaitList, phEventWaitList, Guard, &StreamToken);
@@ -628,7 +628,7 @@ UR_APIEXPORT ur_result_t UR_APICALL urEnqueueKernelLaunchCustomExp(
 
     ScopedContext Active(hQueue->getDevice());
     uint32_t StreamToken;
-    ur_stream_guard_ Guard;
+    ur_stream_guard Guard;
     CUstream CuStream = hQueue->getNextComputeStream(
         numEventsInWaitList, phEventWaitList, Guard, &StreamToken);
@@ -1517,7 +1517,7 @@ UR_APIEXPORT ur_result_t UR_APICALL urEnqueueUSMFill(
   try {
     ScopedContext Active(hQueue->getDevice());
     uint32_t StreamToken;
-    ur_stream_guard_ Guard;
+    ur_stream_guard Guard;
     CUstream CuStream = hQueue->getNextComputeStream(
         numEventsInWaitList, phEventWaitList, Guard, &StreamToken);
     UR_CHECK_ERROR(enqueueEventsWait(hQueue, CuStream, numEventsInWaitList,
diff --git a/unified-runtime/source/adapters/cuda/event.cpp b/unified-runtime/source/adapters/cuda/event.cpp
index d440567ffaac..b8ec9917d396 100644
--- a/unified-runtime/source/adapters/cuda/event.cpp
+++ b/unified-runtime/source/adapters/cuda/event.cpp
@@ -87,17 +87,17 @@ bool ur_event_handle_t_::isCompleted() const noexcept try {
 
 uint64_t ur_event_handle_t_::getQueuedTime() const {
   assert(isStarted());
-  return Queue->get_device()->getElapsedTime(EvQueued);
+  return Queue->getDevice()->getElapsedTime(EvQueued);
 }
 
 uint64_t ur_event_handle_t_::getStartTime() const {
   assert(isStarted());
-  return Queue->get_device()->getElapsedTime(EvStart);
+  return Queue->getDevice()->getElapsedTime(EvStart);
 }
 
 uint64_t ur_event_handle_t_::getEndTime() const {
   assert(isStarted() && isRecorded());
-  return Queue->get_device()->getElapsedTime(EvEnd);
+  return Queue->getDevice()->getElapsedTime(EvEnd);
 }
 
 ur_result_t ur_event_handle_t_::record() {
@@ -111,7 +111,7 @@ ur_result_t ur_event_handle_t_::record() {
   UR_ASSERT(Queue, UR_RESULT_ERROR_INVALID_QUEUE);
 
   try {
-    EventID = Queue->getNextEventID();
+    EventID = Queue->getNextEventId();
    if (EventID == 0) {
       die("Unrecoverable program state reached in event identifier overflow");
     }
diff --git a/unified-runtime/source/adapters/cuda/queue.cpp b/unified-runtime/source/adapters/cuda/queue.cpp
index 115766c7d1c2..47e5d81f1203 100644
--- a/unified-runtime/source/adapters/cuda/queue.cpp
+++ b/unified-runtime/source/adapters/cuda/queue.cpp
@@ -32,93 +32,17 @@ void ur_queue_handle_t_::transferStreamWaitForBarrierIfNeeded(
   }
 }
 
-CUstream ur_queue_handle_t_::getNextComputeStream(uint32_t *StreamToken) {
-  if (getThreadLocalStream() != CUstream{0})
-    return getThreadLocalStream();
-  uint32_t StreamI;
-  uint32_t Token;
-  while (true) {
-    if (NumComputeStreams < ComputeStreams.size()) {
-      // the check above is for performance - so as not to lock mutex every time
-      std::lock_guard<std::mutex> guard(ComputeStreamMutex);
-      // The second check is done after mutex is locked so other threads can not
-      // change NumComputeStreams after that
-      if (NumComputeStreams < ComputeStreams.size()) {
-        UR_CHECK_ERROR(cuStreamCreateWithPriority(
-            &ComputeStreams[NumComputeStreams], Flags, Priority));
-        ++NumComputeStreams;
-      }
-    }
-    Token = ComputeStreamIndex++;
-    StreamI = Token % ComputeStreams.size();
-    // if a stream has been reused before it was next selected round-robin
-    // fashion, we want to delay its next use and instead select another one
-    // that is more likely to have completed all the enqueued work.
-    if (DelayCompute[StreamI]) {
-      DelayCompute[StreamI] = false;
-    } else {
-      break;
-    }
-  }
-  if (StreamToken) {
-    *StreamToken = Token;
-  }
-  CUstream res = ComputeStreams[StreamI];
-  computeStreamWaitForBarrierIfNeeded(res, StreamI);
-  return res;
+ur_queue_handle_t ur_queue_handle_t_::getEventQueue(const ur_event_handle_t e) {
+  return e->getQueue();
 }
 
-CUstream ur_queue_handle_t_::getNextComputeStream(
-    uint32_t NumEventsInWaitList, const ur_event_handle_t *EventWaitList,
-    ur_stream_guard_ &Guard, uint32_t *StreamToken) {
-  if (getThreadLocalStream() != CUstream{0})
-    return getThreadLocalStream();
-  for (uint32_t i = 0; i < NumEventsInWaitList; i++) {
-    uint32_t Token = EventWaitList[i]->getComputeStreamToken();
-    if (reinterpret_cast<ur_queue_handle_t>(EventWaitList[i]->getQueue()) ==
-            this &&
-        canReuseStream(Token)) {
-      std::unique_lock<std::mutex> ComputeSyncGuard(ComputeStreamSyncMutex);
-      // redo the check after lock to avoid data races on
-      // LastSyncComputeStreams
-      if (canReuseStream(Token)) {
-        uint32_t StreamI = Token % DelayCompute.size();
-        DelayCompute[StreamI] = true;
-        if (StreamToken) {
-          *StreamToken = Token;
-        }
-        Guard = ur_stream_guard_{std::move(ComputeSyncGuard)};
-        CUstream Result = EventWaitList[i]->getStream();
-        computeStreamWaitForBarrierIfNeeded(Result, StreamI);
-        return Result;
-      }
-    }
-  }
-  Guard = {};
-  return getNextComputeStream(StreamToken);
+uint32_t
+ur_queue_handle_t_::getEventComputeStreamToken(const ur_event_handle_t e) {
+  return e->getComputeStreamToken();
 }
 
-CUstream ur_queue_handle_t_::getNextTransferStream() {
-  if (getThreadLocalStream() != CUstream{0})
-    return getThreadLocalStream();
-  if (TransferStreams.empty()) { // for example in in-order queue
-    return getNextComputeStream();
-  }
-  if (NumTransferStreams < TransferStreams.size()) {
-    // the check above is for performance - so as not to lock mutex every time
-    std::lock_guard<std::mutex> Guard(TransferStreamMutex);
-    // The second check is done after mutex is locked so other threads can not
-    // change NumTransferStreams after that
-    if (NumTransferStreams < TransferStreams.size()) {
-      UR_CHECK_ERROR(cuStreamCreateWithPriority(
-          &TransferStreams[NumTransferStreams], Flags, Priority));
-      ++NumTransferStreams;
-    }
-  }
-  uint32_t StreamI = TransferStreamIndex++ % TransferStreams.size();
-  CUstream Result = TransferStreams[StreamI];
-  transferStreamWaitForBarrierIfNeeded(Result, StreamI);
-  return Result;
+CUstream ur_queue_handle_t_::getEventStream(const ur_event_handle_t e) {
+  return e->getStream();
 }
 
 /// Creates a `ur_queue_handle_t` object on the CUDA backend.
diff --git a/unified-runtime/source/adapters/cuda/queue.hpp b/unified-runtime/source/adapters/cuda/queue.hpp
index 89132c99ca63..a0d130fa3ac8 100644
--- a/unified-runtime/source/adapters/cuda/queue.hpp
+++ b/unified-runtime/source/adapters/cuda/queue.hpp
@@ -17,100 +17,24 @@
 #include
 #include
 
-using ur_stream_guard_ = std::unique_lock<std::mutex>;
+#include "stream_queue.hpp"
 
 /// UR queue mapping on to CUstream objects.
 ///
-struct ur_queue_handle_t_ {
+struct ur_queue_handle_t_ : stream_queue_t<CUstream, 128, 64> {
+  using stream_queue_t<CUstream, 128, 64>::stream_queue_t;
 
-  using native_type = CUstream;
-  static constexpr int DefaultNumComputeStreams = 128;
-  static constexpr int DefaultNumTransferStreams = 64;
-
-  std::vector<native_type> ComputeStreams;
-  std::vector<native_type> TransferStreams;
-  // Stream used for recording EvQueue, which holds information about when the
-  // command in question is enqueued on host, as opposed to started. It is
-  // created only if profiling is enabled - either for queue or per event.
-  native_type HostSubmitTimeStream{0};
-  // DelayCompute keeps track of which streams have been recently reused and
-  // their next use should be delayed. If a stream has been recently reused it
-  // will be skipped the next time it would be selected round-robin style. When
-  // skipped, its delay flag is cleared.
-  std::vector<bool> DelayCompute;
-  // keep track of which streams have applied barrier
-  std::vector<bool> ComputeAppliedBarrier;
-  std::vector<bool> TransferAppliedBarrier;
-  ur_context_handle_t_ *Context;
-  ur_device_handle_t_ *Device;
   CUevent BarrierEvent = nullptr;
   CUevent BarrierTmpEvent = nullptr;
-  std::atomic_uint32_t RefCount;
-  std::atomic_uint32_t EventCount;
-  std::atomic_uint32_t ComputeStreamIndex;
-  std::atomic_uint32_t TransferStreamIndex;
-  unsigned int NumComputeStreams;
-  unsigned int NumTransferStreams;
-  unsigned int LastSyncComputeStreams;
-  unsigned int LastSyncTransferStreams;
-  unsigned int Flags;
-  ur_queue_flags_t URFlags;
-  int Priority;
-  // When ComputeStreamSyncMutex and ComputeStreamMutex both need to be
-  // locked at the same time, ComputeStreamSyncMutex should be locked first
-  // to avoid deadlocks
-  std::mutex ComputeStreamSyncMutex;
-  std::mutex ComputeStreamMutex;
-  std::mutex TransferStreamMutex;
-  std::mutex BarrierMutex;
-  bool HasOwnership;
-
-  ur_queue_handle_t_(std::vector<CUstream> &&ComputeStreams,
-                     std::vector<CUstream> &&TransferStreams,
-                     ur_context_handle_t_ *Context, ur_device_handle_t_ *Device,
-                     unsigned int Flags, ur_queue_flags_t URFlags, int Priority,
-                     bool BackendOwns = true)
-      : ComputeStreams{std::move(ComputeStreams)},
-        TransferStreams{std::move(TransferStreams)},
-        DelayCompute(this->ComputeStreams.size(), false),
-        ComputeAppliedBarrier(this->ComputeStreams.size()),
-        TransferAppliedBarrier(this->TransferStreams.size()), Context{Context},
-        Device{Device}, RefCount{1}, EventCount{0}, ComputeStreamIndex{0},
-        TransferStreamIndex{0}, NumComputeStreams{0}, NumTransferStreams{0},
-        LastSyncComputeStreams{0}, LastSyncTransferStreams{0}, Flags(Flags),
-        URFlags(URFlags), Priority(Priority), HasOwnership{BackendOwns} {
-    urContextRetain(Context);
-    urDeviceRetain(Device);
-  }
-
-  ~ur_queue_handle_t_() {
-    urContextRelease(Context);
-    urDeviceRelease(Device);
-  }
-
-  void computeStreamWaitForBarrierIfNeeded(CUstream Stream, uint32_t StreamI);
-  void transferStreamWaitForBarrierIfNeeded(CUstream Stream, uint32_t StreamI);
-
-  // get_next_compute/transfer_stream() functions return streams from
-  // appropriate pools in round-robin fashion
-  native_type getNextComputeStream(uint32_t *StreamToken = nullptr);
-  // this overload tries to select a stream that was used by one of the
-  // dependencies. If that is not possible it returns a new stream. If a stream
-  // is reused it returns a lock that needs to remain locked as long as the
-  // stream is in use
-  native_type getNextComputeStream(uint32_t NumEventsInWaitList,
-                                   const ur_event_handle_t *EventWaitList,
-                                   ur_stream_guard_ &Guard,
-                                   uint32_t *StreamToken = nullptr);
-
-  // Thread local stream will be used if ScopedStream is active
-  static CUstream &getThreadLocalStream() {
-    static thread_local CUstream stream{0};
-    return stream;
-  }
-
-  native_type getNextTransferStream();
-  native_type get() { return getNextComputeStream(); };
-  ur_device_handle_t getDevice() const noexcept { return Device; };
+  void computeStreamWaitForBarrierIfNeeded(CUstream Stream,
+                                           uint32_t StreamI) override;
+  void transferStreamWaitForBarrierIfNeeded(CUstream Stream,
+                                            uint32_t StreamI) override;
+  ur_queue_handle_t getEventQueue(const ur_event_handle_t) override;
+  uint32_t getEventComputeStreamToken(const ur_event_handle_t) override;
+  CUstream getEventStream(const ur_event_handle_t) override;
 
   // Function which creates the profiling stream. Called only from makeNative
   // event when profiling is required.
@@ -122,155 +46,10 @@ struct ur_queue_handle_t_ {
     });
   }
 
-  native_type getHostSubmitTimeStream() { return HostSubmitTimeStream; }
-
-  bool hasBeenSynchronized(uint32_t StreamToken) {
-    // stream token not associated with one of the compute streams
-    if (StreamToken == std::numeric_limits<uint32_t>::max()) {
-      return false;
-    }
-    return LastSyncComputeStreams > StreamToken;
-  }
-
-  bool canReuseStream(uint32_t StreamToken) {
-    // stream token not associated with one of the compute streams
-    if (StreamToken == std::numeric_limits<uint32_t>::max()) {
-      return false;
-    }
-    // If the command represented by the stream token was not the last command
-    // enqueued to the stream we can not reuse the stream - we need to allow for
-    // commands enqueued after it and the one we are about to enqueue to run
-    // concurrently
-    bool IsLastCommand =
-        (ComputeStreamIndex - StreamToken) <= ComputeStreams.size();
-    // If there was a barrier enqueued to the queue after the command
-    // represented by the stream token we should not reuse the stream, as we can
-    // not take that stream into account for the bookkeeping for the next
-    // barrier - such a stream would not be synchronized with. Performance-wise
-    // it does not matter that we do not reuse the stream, as the work
-    // represented by the stream token is guaranteed to be complete by the
-    // barrier before any work we are about to enqueue to the stream will start,
-    // so the event does not need to be synchronized with.
-    return IsLastCommand && !hasBeenSynchronized(StreamToken);
+  void createStreamWithPriority(CUstream *Stream, unsigned int Flags,
+                                int Priority) override {
+    UR_CHECK_ERROR(cuStreamCreateWithPriority(Stream, Flags, Priority));
   }
-
-  template <typename T> bool allOf(T &&F) {
-    {
-      std::lock_guard<std::mutex> ComputeGuard(ComputeStreamMutex);
-      unsigned int End = std::min(
-          static_cast<unsigned int>(ComputeStreams.size()), NumComputeStreams);
-      if (!std::all_of(ComputeStreams.begin(), ComputeStreams.begin() + End, F))
-        return false;
-    }
-    {
-      std::lock_guard<std::mutex> TransferGuard(TransferStreamMutex);
-      unsigned int End =
-          std::min(static_cast<unsigned int>(TransferStreams.size()),
-                   NumTransferStreams);
-      if (!std::all_of(TransferStreams.begin(), TransferStreams.begin() + End,
-                       F))
-        return false;
-    }
-    return true;
-  }
-
-  template <typename T> void forEachStream(T &&F) {
-    {
-      std::lock_guard<std::mutex> compute_guard(ComputeStreamMutex);
-      unsigned int End = std::min(
-          static_cast<unsigned int>(ComputeStreams.size()), NumComputeStreams);
-      for (unsigned int i = 0; i < End; i++) {
-        F(ComputeStreams[i]);
-      }
-    }
-    {
-      std::lock_guard<std::mutex> transfer_guard(TransferStreamMutex);
-      unsigned int End =
-          std::min(static_cast<unsigned int>(TransferStreams.size()),
-                   NumTransferStreams);
-      for (unsigned int i = 0; i < End; i++) {
-        F(TransferStreams[i]);
-      }
-    }
-  }
-
-  template <bool ResetUsed = false, typename T> void syncStreams(T &&F) {
-    auto SyncCompute = [&F, &Streams = ComputeStreams, &Delay = DelayCompute](
-                           unsigned int Start, unsigned int Stop) {
-      for (unsigned int i = Start; i < Stop; i++) {
-        F(Streams[i]);
-        Delay[i] = false;
-      }
-    };
-    auto SyncTransfer = [&F, &streams = TransferStreams](unsigned int Start,
-                                                         unsigned int Stop) {
-      for (unsigned int i = Start; i < Stop; i++) {
-        F(streams[i]);
-      }
-    };
-    {
-      unsigned int Size = static_cast<unsigned int>(ComputeStreams.size());
-      std::lock_guard<std::mutex> ComputeSyncGuard(ComputeStreamSyncMutex);
-      std::lock_guard<std::mutex> ComputeGuard(ComputeStreamMutex);
-      unsigned int Start = LastSyncComputeStreams;
-      unsigned int End = NumComputeStreams < Size ? NumComputeStreams
-                                                  : ComputeStreamIndex.load();
-      if (ResetUsed) {
-        LastSyncComputeStreams = End;
-      }
-      if (End - Start >= Size) {
-        SyncCompute(0, Size);
-      } else {
-        Start %= Size;
-        End %= Size;
-        if (Start <= End) {
-          SyncCompute(Start, End);
-        } else {
-          SyncCompute(Start, Size);
-          SyncCompute(0, End);
-        }
-      }
-    }
-    {
-      unsigned int Size = static_cast<unsigned int>(TransferStreams.size());
-      if (!Size) {
-        return;
-      }
-      std::lock_guard<std::mutex> TransferGuard(TransferStreamMutex);
-      unsigned int Start = LastSyncTransferStreams;
-      unsigned int End = NumTransferStreams < Size ? NumTransferStreams
-                                                   : TransferStreamIndex.load();
-      if (ResetUsed) {
-        LastSyncTransferStreams = End;
-      }
-      if (End - Start >= Size) {
-        SyncTransfer(0, Size);
-      } else {
-        Start %= Size;
-        End %= Size;
-        if (Start <= End) {
-          SyncTransfer(Start, End);
-        } else {
-          SyncTransfer(Start, Size);
-          SyncTransfer(0, End);
-        }
-      }
-    }
-  }
-
-  ur_context_handle_t_ *getContext() const { return Context; };
-
-  ur_device_handle_t_ *get_device() const { return Device; };
-
-  uint32_t incrementReferenceCount() noexcept { return ++RefCount; }
-
-  uint32_t decrementReferenceCount() noexcept { return --RefCount; }
-
-  uint32_t getReferenceCount() const noexcept { return RefCount; }
-
-  uint32_t getNextEventID() noexcept { return ++EventCount; }
-
-  bool backendHasOwnership() const noexcept { return HasOwnership; }
 };
 
 // RAII object to make hQueue stream getter methods all return the same stream
@@ -286,7 +65,7 @@ class ScopedStream {
   ScopedStream(ur_queue_handle_t hQueue, uint32_t NumEventsInWaitList,
                const ur_event_handle_t *EventWaitList)
       : hQueue{hQueue} {
-    ur_stream_guard_ Guard;
+    ur_stream_guard Guard;
     hQueue->getThreadLocalStream() =
         hQueue->getNextComputeStream(NumEventsInWaitList, EventWaitList, Guard);
   }
diff --git a/unified-runtime/source/adapters/cuda/stream_queue.hpp b/unified-runtime/source/adapters/cuda/stream_queue.hpp
new file mode 100644
index 000000000000..de0d5b25023c
--- /dev/null
+++ b/unified-runtime/source/adapters/cuda/stream_queue.hpp
@@ -0,0 +1,346 @@
+//===--------- stream_queue.hpp - CUDA Adapter ----------------------------===//
+//
+// Copyright (C) 2025 Intel Corporation
+//
+// Part of the Unified-Runtime Project, under the Apache License v2.0 with LLVM
+// Exceptions. See LICENSE.TXT
+// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
+//
+//===----------------------------------------------------------------------===//
+#pragma once
+
+#include
+
+#include <algorithm>
+#include <mutex>
+#include <vector>
+
+using ur_stream_guard = std::unique_lock<std::mutex>;
+
+/// Generic implementation of out-of-order queue using in-order streams.
+///
+template <typename ST, int CS, int TS> struct stream_queue_t {
+  using native_type = ST;
+  static constexpr int DefaultNumComputeStreams = CS;
+  static constexpr int DefaultNumTransferStreams = TS;
+
+  std::vector<native_type> ComputeStreams;
+  std::vector<native_type> TransferStreams;
+  // Stream used for recording EvQueue, which holds information about when the
+  // command in question is enqueued on host, as opposed to started. It is
+  // created only if profiling is enabled - either for queue or per event.
+  native_type HostSubmitTimeStream{0};
+  // DelayCompute keeps track of which streams have been recently reused and
+  // their next use should be delayed. If a stream has been recently reused it
+  // will be skipped the next time it would be selected round-robin style. When
+  // skipped, its delay flag is cleared.
+  std::vector<bool> DelayCompute;
+  // keep track of which streams have applied barrier
+  std::vector<bool> ComputeAppliedBarrier;
+  std::vector<bool> TransferAppliedBarrier;
+  ur_context_handle_t_ *Context;
+  ur_device_handle_t_ *Device;
+  std::atomic_uint32_t RefCount;
+  std::atomic_uint32_t EventCount;
+  std::atomic_uint32_t ComputeStreamIndex;
+  std::atomic_uint32_t TransferStreamIndex;
+  unsigned int NumComputeStreams;
+  unsigned int NumTransferStreams;
+  unsigned int LastSyncComputeStreams;
+  unsigned int LastSyncTransferStreams;
+  unsigned int Flags;
+  ur_queue_flags_t URFlags;
+  int Priority;
+  // When ComputeStreamSyncMutex and ComputeStreamMutex both need to be
+  // locked at the same time, ComputeStreamSyncMutex should be locked first
+  // to avoid deadlocks
+  std::mutex ComputeStreamSyncMutex;
+  std::mutex ComputeStreamMutex;
+  std::mutex TransferStreamMutex;
+  std::mutex BarrierMutex;
+  bool HasOwnership;
+
+  stream_queue_t(std::vector<native_type> &&ComputeStreams,
+                 std::vector<native_type> &&TransferStreams,
+                 ur_context_handle_t_ *Context, ur_device_handle_t_ *Device,
+                 unsigned int Flags, ur_queue_flags_t URFlags, int Priority,
+                 bool BackendOwns = true)
+      : ComputeStreams{std::move(ComputeStreams)},
+        TransferStreams{std::move(TransferStreams)},
+        DelayCompute(this->ComputeStreams.size(), false),
+        ComputeAppliedBarrier(this->ComputeStreams.size()),
+        TransferAppliedBarrier(this->TransferStreams.size()), Context{Context},
+        Device{Device}, RefCount{1}, EventCount{0}, ComputeStreamIndex{0},
+        TransferStreamIndex{0}, NumComputeStreams{0}, NumTransferStreams{0},
+        LastSyncComputeStreams{0}, LastSyncTransferStreams{0}, Flags(Flags),
+        URFlags(URFlags), Priority(Priority), HasOwnership{BackendOwns} {
+    urContextRetain(Context);
+    urDeviceRetain(Device);
+  }
+
+  ~stream_queue_t() {
+    urContextRelease(Context);
+    urDeviceRelease(Device);
+  }
+
+  virtual void computeStreamWaitForBarrierIfNeeded(native_type Stream,
+                                                   uint32_t StreamI) = 0;
+  virtual void transferStreamWaitForBarrierIfNeeded(native_type Stream,
+                                                    uint32_t StreamI) = 0;
+  virtual void createStreamWithPriority(native_type *Stream, unsigned int Flags,
+                                        int Priority) = 0;
+  virtual ur_queue_handle_t getEventQueue(const ur_event_handle_t) = 0;
+  virtual uint32_t getEventComputeStreamToken(const ur_event_handle_t) = 0;
+  virtual native_type getEventStream(const ur_event_handle_t) = 0;
+
+  // getNextCompute/TransferStream() functions return streams from
+  // appropriate pools in round-robin fashion
+  native_type getNextComputeStream(uint32_t *StreamToken = nullptr) {
+    if (getThreadLocalStream() != native_type{0})
+      return getThreadLocalStream();
+    uint32_t StreamI;
+    uint32_t Token;
+    while (true) {
+      if (NumComputeStreams < ComputeStreams.size()) {
+        // the check above is for performance - so as not to lock mutex every
+        // time
+        std::lock_guard<std::mutex> guard(ComputeStreamMutex);
+        // The second check is done after mutex is locked so other threads can
+        // not change NumComputeStreams after that
+        if (NumComputeStreams < ComputeStreams.size()) {
+          createStreamWithPriority(&ComputeStreams[NumComputeStreams], Flags,
+                                   Priority);
+          ++NumComputeStreams;
+        }
+      }
+      Token = ComputeStreamIndex++;
+      StreamI = Token % ComputeStreams.size();
+      // if a stream has been reused before it was next selected round-robin
+      // fashion, we want to delay its next use and instead select another one
+      // that is more likely to have completed all the enqueued work.
+      if (DelayCompute[StreamI]) {
+        DelayCompute[StreamI] = false;
+      } else {
+        break;
+      }
+    }
+    if (StreamToken) {
+      *StreamToken = Token;
+    }
+    native_type res = ComputeStreams[StreamI];
+    computeStreamWaitForBarrierIfNeeded(res, StreamI);
+    return res;
+  }
+
+  // this overload tries to select a stream that was used by one of the
+  // dependencies. If that is not possible it returns a new stream. If a stream
+  // is reused it returns a lock that needs to remain locked as long as the
+  // stream is in use
+  native_type getNextComputeStream(uint32_t NumEventsInWaitList,
+                                   const ur_event_handle_t *EventWaitList,
+                                   ur_stream_guard &Guard,
+                                   uint32_t *StreamToken = nullptr) {
+    if (getThreadLocalStream() != native_type{0})
+      return getThreadLocalStream();
+    for (uint32_t i = 0; i < NumEventsInWaitList; i++) {
+      uint32_t Token = getEventComputeStreamToken(EventWaitList[i]);
+      if (getEventQueue(EventWaitList[i]) == this && canReuseStream(Token)) {
+        std::unique_lock<std::mutex> ComputeSyncGuard(ComputeStreamSyncMutex);
+        // redo the check after lock to avoid data races on
+        // LastSyncComputeStreams
+        if (canReuseStream(Token)) {
+          uint32_t StreamI = Token % DelayCompute.size();
+          DelayCompute[StreamI] = true;
+          if (StreamToken) {
+            *StreamToken = Token;
+          }
+          Guard = ur_stream_guard{std::move(ComputeSyncGuard)};
+          native_type Result = getEventStream(EventWaitList[i]);
+          computeStreamWaitForBarrierIfNeeded(Result, StreamI);
+          return Result;
+        }
+      }
+    }
+    Guard = {};
+    return getNextComputeStream(StreamToken);
+  }
+
+  // Thread local stream will be used if ScopedStream is active
+  static native_type &getThreadLocalStream() {
+    static thread_local native_type stream{0};
+    return stream;
+  }
+
+  native_type getNextTransferStream() {
+    if (getThreadLocalStream() != native_type{0})
+      return getThreadLocalStream();
+    if (TransferStreams.empty()) { // for example in in-order queue
+      return getNextComputeStream();
+    }
+    if (NumTransferStreams < TransferStreams.size()) {
+      // the check above is for performance - so as not to lock mutex every time
+      std::lock_guard<std::mutex> Guard(TransferStreamMutex);
+      // The second check is done after mutex is locked so other threads can not
+      // change NumTransferStreams after that
+      if (NumTransferStreams < TransferStreams.size()) {
+        createStreamWithPriority(&TransferStreams[NumTransferStreams], Flags,
+                                 Priority);
+        ++NumTransferStreams;
+      }
+    }
+    uint32_t StreamI = TransferStreamIndex++ % TransferStreams.size();
+    native_type Result = TransferStreams[StreamI];
+    transferStreamWaitForBarrierIfNeeded(Result, StreamI);
+    return Result;
+  }
+
+  native_type get() { return getNextComputeStream(); };
+  ur_device_handle_t getDevice() const noexcept { return Device; };
+
+  native_type getHostSubmitTimeStream() { return HostSubmitTimeStream; }
+
+  bool hasBeenSynchronized(uint32_t StreamToken) {
+    // stream token not associated with one of the compute streams
+    if (StreamToken == std::numeric_limits<uint32_t>::max()) {
+      return false;
+    }
+    return LastSyncComputeStreams > StreamToken;
+  }
+
+  bool canReuseStream(uint32_t StreamToken) {
+    // stream token not associated with one of the compute streams
+    if (StreamToken == std::numeric_limits<uint32_t>::max()) {
+      return false;
+    }
+    // If the command represented by the stream token was not the last command
+    // enqueued to the stream we can not reuse the stream - we need to allow
+    // for commands enqueued after it and the one we are about to enqueue to
+    // run concurrently
+    bool IsLastCommand =
+        (ComputeStreamIndex - StreamToken) <= ComputeStreams.size();
+    // If there was a barrier enqueued to the queue after the command
+    // represented by the stream token we should not reuse the stream, as we
+    // can not take that stream into account for the bookkeeping for the next
+    // barrier - such a stream would not be synchronized with. Performance-wise
+    // it does not matter that we do not reuse the stream, as the work
+    // represented by the stream token is guaranteed to be complete by the
+    // barrier before any work we are about to enqueue to the stream will
+    // start, so the event does not need to be synchronized with.
+    return IsLastCommand && !hasBeenSynchronized(StreamToken);
+  }
+
+  template <typename T> bool allOf(T &&F) {
+    {
+      std::lock_guard<std::mutex> ComputeGuard(ComputeStreamMutex);
+      unsigned int End = std::min(
+          static_cast<unsigned int>(ComputeStreams.size()), NumComputeStreams);
+      if (!std::all_of(ComputeStreams.begin(), ComputeStreams.begin() + End, F))
+        return false;
+    }
+    {
+      std::lock_guard<std::mutex> TransferGuard(TransferStreamMutex);
+      unsigned int End =
+          std::min(static_cast<unsigned int>(TransferStreams.size()),
+                   NumTransferStreams);
+      if (!std::all_of(TransferStreams.begin(), TransferStreams.begin() + End,
+                       F))
+        return false;
+    }
+    return true;
+  }
+
+  template <typename T> void forEachStream(T &&F) {
+    {
+      std::lock_guard<std::mutex> compute_guard(ComputeStreamMutex);
+      unsigned int End = std::min(
+          static_cast<unsigned int>(ComputeStreams.size()), NumComputeStreams);
+      for (unsigned int i = 0; i < End; i++) {
+        F(ComputeStreams[i]);
+      }
+    }
+    {
+      std::lock_guard<std::mutex> transfer_guard(TransferStreamMutex);
+      unsigned int End =
+          std::min(static_cast<unsigned int>(TransferStreams.size()),
+                   NumTransferStreams);
+      for (unsigned int i = 0; i < End; i++) {
+        F(TransferStreams[i]);
+      }
+    }
+  }
+
+  template <bool ResetUsed = false, typename T> void syncStreams(T &&F) {
+    auto SyncCompute = [&F, &Streams = ComputeStreams, &Delay = DelayCompute](
+                           unsigned int Start, unsigned int Stop) {
+      for (unsigned int i = Start; i < Stop; i++) {
+        F(Streams[i]);
+        Delay[i] = false;
+      }
+    };
+    auto SyncTransfer = [&F, &streams = TransferStreams](unsigned int Start,
+                                                         unsigned int Stop) {
+      for (unsigned int i = Start; i < Stop; i++) {
+        F(streams[i]);
+      }
+    };
+    {
+      unsigned int Size = static_cast<unsigned int>(ComputeStreams.size());
+      std::lock_guard<std::mutex> ComputeSyncGuard(ComputeStreamSyncMutex);
+      std::lock_guard<std::mutex> ComputeGuard(ComputeStreamMutex);
+      unsigned int Start = LastSyncComputeStreams;
+      unsigned int End = NumComputeStreams < Size ? NumComputeStreams
                                                  : ComputeStreamIndex.load();
+      if (ResetUsed) {
+        LastSyncComputeStreams = End;
+      }
+      if (End - Start >= Size) {
+        SyncCompute(0, Size);
+      } else {
+        Start %= Size;
+        End %= Size;
+        if (Start <= End) {
+          SyncCompute(Start, End);
+        } else {
+          SyncCompute(Start, Size);
+          SyncCompute(0, End);
+        }
+      }
+    }
+    {
+      unsigned int Size = static_cast<unsigned int>(TransferStreams.size());
+      if (!Size) {
+        return;
+      }
+      std::lock_guard<std::mutex> TransferGuard(TransferStreamMutex);
+      unsigned int Start = LastSyncTransferStreams;
+      unsigned int End = NumTransferStreams < Size ? NumTransferStreams
+                                                   : TransferStreamIndex.load();
+      if (ResetUsed) {
+        LastSyncTransferStreams = End;
+      }
+      if (End - Start >= Size) {
+        SyncTransfer(0, Size);
+      } else {
+        Start %= Size;
+        End %= Size;
+        if (Start <= End) {
+          SyncTransfer(Start, End);
+        } else {
+          SyncTransfer(Start, Size);
+          SyncTransfer(0, End);
+        }
+      }
+    }
+  }
+
+  ur_context_handle_t_ *getContext() const { return Context; };
+
+  uint32_t incrementReferenceCount() noexcept { return ++RefCount; }
+
+  uint32_t decrementReferenceCount() noexcept { return --RefCount; }
+
+  uint32_t getReferenceCount() const noexcept { return RefCount; }
+
+  uint32_t getNextEventId() noexcept { return ++EventCount; }
+
+  bool backendHasOwnership() const noexcept { return HasOwnership; }
+};
diff --git a/unified-runtime/source/adapters/hip/queue.cpp b/unified-runtime/source/adapters/hip/queue.cpp
index 12c96baa0b9e..da41d9ab2696 100644
--- a/unified-runtime/source/adapters/hip/queue.cpp
+++ b/unified-runtime/source/adapters/hip/queue.cpp
@@ -28,89 +28,17 @@ void ur_queue_handle_t_::transferStreamWaitForBarrierIfNeeded(
   }
 }
 
-hipStream_t ur_queue_handle_t_::getNextComputeStream(uint32_t *StreamToken) {
-  if (getThreadLocalStream() != hipStream_t{0})
-    return getThreadLocalStream();
-  uint32_t Stream_i;
-  uint32_t Token;
-  while (true) {
-    if (NumComputeStreams < ComputeStreams.size()) {
-      // the check above is for performance - so as not to lock mutex every time
-      std::lock_guard<std::mutex> guard(ComputeStreamMutex);
-      // The second check is done after mutex is locked so other threads can not
-      // change NumComputeStreams after that
-      if (NumComputeStreams < ComputeStreams.size()) {
-        UR_CHECK_ERROR(hipStreamCreateWithPriority(
-            &ComputeStreams[NumComputeStreams++], Flags, Priority));
-      }
-    }
-    Token = ComputeStreamIdx++;
-    Stream_i = Token % ComputeStreams.size();
-    // if a stream has been reused before it was next selected round-robin
-    // fashion, we want to delay its next use and instead select another one
-    // that is more likely to have completed all the enqueued work.
-    if (DelayCompute[Stream_i]) {
-      DelayCompute[Stream_i] = false;
-    } else {
-      break;
-    }
-  }
-  if (StreamToken) {
-    *StreamToken = Token;
-  }
-  hipStream_t Res = ComputeStreams[Stream_i];
-  computeStreamWaitForBarrierIfNeeded(Res, Stream_i);
-  return Res;
+ur_queue_handle_t ur_queue_handle_t_::getEventQueue(const ur_event_handle_t e) {
+  return e->getQueue();
 }
 
-hipStream_t ur_queue_handle_t_::getNextComputeStream(
-    uint32_t NumEventsInWaitList, const ur_event_handle_t *EventWaitList,
-    ur_stream_guard &Guard, uint32_t *StreamToken) {
-  if (getThreadLocalStream() != hipStream_t{0})
-    return getThreadLocalStream();
-  for (uint32_t i = 0; i < NumEventsInWaitList; i++) {
-    uint32_t Token = EventWaitList[i]->getComputeStreamToken();
-    if (EventWaitList[i]->getQueue() == this && canReuseStream(Token)) {
-      std::unique_lock<std::mutex> ComputeSyncGuard(ComputeStreamSyncMutex);
-      // redo the check after lock to avoid data races on
-      // LastSyncComputeStreams
-      if (canReuseStream(Token)) {
-        uint32_t Stream_i = Token % DelayCompute.size();
-        DelayCompute[Stream_i] = true;
-        if (StreamToken) {
-          *StreamToken = Token;
-        }
-        Guard = ur_stream_guard{std::move(ComputeSyncGuard)};
-        hipStream_t Res = EventWaitList[i]->getStream();
-        computeStreamWaitForBarrierIfNeeded(Res, Stream_i);
-        return Res;
-      }
-    }
-  }
-  Guard = {};
-  return getNextComputeStream(StreamToken);
+uint32_t
+ur_queue_handle_t_::getEventComputeStreamToken(const ur_event_handle_t e) {
+  return e->getComputeStreamToken();
 }
 
-hipStream_t ur_queue_handle_t_::getNextTransferStream() {
-  if (getThreadLocalStream() != hipStream_t{0})
-    return getThreadLocalStream();
-  if (TransferStreams.empty()) { // for example in in-order queue
-    return getNextComputeStream();
-  }
-  if (NumTransferStreams < TransferStreams.size()) {
-    // the check above is for performance - so as not to lock mutex every time
-    std::lock_guard<std::mutex> Guard(TransferStreamMutex);
-    // The second check is done after mutex is locked so other threads can not
-    // change NumTransferStreams after that
-    if (NumTransferStreams < TransferStreams.size()) {
-      UR_CHECK_ERROR(hipStreamCreateWithPriority(
-          &TransferStreams[NumTransferStreams++], Flags, Priority));
-    }
-  }
-  uint32_t Stream_i = TransferStreamIdx++ % TransferStreams.size();
-  hipStream_t Res = TransferStreams[Stream_i];
-  transferStreamWaitForBarrierIfNeeded(Res, Stream_i);
-  return Res;
+hipStream_t ur_queue_handle_t_::getEventStream(const ur_event_handle_t e) {
+  return e->getStream();
 }
 
 UR_APIEXPORT ur_result_t UR_APICALL
@@ -246,18 +174,13 @@ UR_APIEXPORT ur_result_t UR_APICALL urQueueRelease(ur_queue_handle_t hQueue) {
 }
 
 UR_APIEXPORT ur_result_t UR_APICALL urQueueFinish(ur_queue_handle_t hQueue) {
-  // set default result to a negative result (avoid false-positive tests)
-  ur_result_t Result = UR_RESULT_ERROR_OUT_OF_RESOURCES;
+  ur_result_t Result = UR_RESULT_SUCCESS;
 
   try {
     ScopedDevice Active(hQueue->getDevice());
-    hQueue->syncStreams([&Result](hipStream_t S) {
-      UR_CHECK_ERROR(hipStreamSynchronize(S));
-      Result = UR_RESULT_SUCCESS;
-    });
-
+    hQueue->syncStreams(
+        [&Result](hipStream_t S) { UR_CHECK_ERROR(hipStreamSynchronize(S)); });
   } catch (ur_result_t Err) {
     Result = Err;
   } catch (...) {
diff --git a/unified-runtime/source/adapters/hip/queue.hpp b/unified-runtime/source/adapters/hip/queue.hpp
index ccd94c141992..0f68b4f8789d 100644
--- a/unified-runtime/source/adapters/hip/queue.hpp
+++ b/unified-runtime/source/adapters/hip/queue.hpp
@@ -14,93 +14,24 @@
 #include
 #include
 
-using ur_stream_guard = std::unique_lock<std::mutex>;
+#include "../cuda/stream_queue.hpp"
 
 /// UR queue mapping on to hipStream_t objects.
 ///
-struct ur_queue_handle_t_ {
-  using native_type = hipStream_t;
-  static constexpr int DefaultNumComputeStreams = 64;
-  static constexpr int DefaultNumTransferStreams = 16;
+struct ur_queue_handle_t_ : stream_queue_t<hipStream_t, 64, 16> {
+  using stream_queue_t<hipStream_t, 64, 16>::stream_queue_t;
 
-  std::vector<native_type> ComputeStreams;
-  std::vector<native_type> TransferStreams;
-  // Stream used for recording EvQueue, which holds information about when the
-  // command in question is enqueued on host, as opposed to started. It is
-  // created only if profiling is enabled - either for queue or per event.
-  native_type HostSubmitTimeStream{0};
-  // DelayCompute keeps track of which streams have been recently reused and
-  // their next use should be delayed. If a stream has been recently reused it
-  // will be skipped the next time it would be selected round-robin style. When
-  // skipped, its delay flag is cleared.
-  std::vector<bool> DelayCompute;
-  // keep track of which streams have applied barrier
-  std::vector<bool> ComputeAppliedBarrier;
-  std::vector<bool> TransferAppliedBarrier;
-  ur_context_handle_t Context;
-  ur_device_handle_t Device;
   hipEvent_t BarrierEvent = nullptr;
   hipEvent_t BarrierTmpEvent = nullptr;
-  std::atomic_uint32_t RefCount;
-  std::atomic_uint32_t EventCount;
-  std::atomic_uint32_t ComputeStreamIdx;
-  std::atomic_uint32_t TransferStreamIdx;
-  unsigned int NumComputeStreams;
-  unsigned int NumTransferStreams;
-  unsigned int LastSyncComputeStreams;
-  unsigned int LastSyncTransferStreams;
-  unsigned int Flags;
-  ur_queue_flags_t URFlags;
-  int Priority;
-  // When ComputeStreamSyncMutex and ComputeStreamMutex both need to be
-  // locked at the same time, ComputeStreamSyncMutex should be locked first
-  // to avoid deadlocks
-  std::mutex ComputeStreamSyncMutex;
-  std::mutex ComputeStreamMutex;
-  std::mutex TransferStreamMutex;
-  std::mutex BarrierMutex;
-  bool HasOwnership;
-
-  ur_queue_handle_t_(std::vector<hipStream_t> &&ComputeStreams,
-                     std::vector<hipStream_t> &&TransferStreams,
-                     ur_context_handle_t Context, ur_device_handle_t Device,
-                     unsigned int Flags, ur_queue_flags_t URFlags, int Priority,
-                     bool BackendOwns = true)
-      : ComputeStreams{std::move(ComputeStreams)},
-        TransferStreams{std::move(TransferStreams)},
-        DelayCompute(this->ComputeStreams.size(), false),
-        ComputeAppliedBarrier(this->ComputeStreams.size()),
-        TransferAppliedBarrier(this->TransferStreams.size()), Context{Context},
-        Device{Device}, RefCount{1}, EventCount{0}, ComputeStreamIdx{0},
-        TransferStreamIdx{0}, NumComputeStreams{0}, NumTransferStreams{0},
-        LastSyncComputeStreams{0}, LastSyncTransferStreams{0}, Flags(Flags),
-        URFlags(URFlags), Priority(Priority), HasOwnership{BackendOwns} {
-    urContextRetain(Context);
-    urDeviceRetain(Device);
-  }
-
-  ~ur_queue_handle_t_() {
-    urContextRelease(Context);
-    urDeviceRelease(Device);
-  }
-
-  void computeStreamWaitForBarrierIfNeeded(hipStream_t Stream,
-                                           uint32_t Stream_i);
+  void computeStreamWaitForBarrierIfNeeded(hipStream_t Stream,
+                                           uint32_t StreamI) override;
   void transferStreamWaitForBarrierIfNeeded(hipStream_t Stream,
-                                            uint32_t Stream_i);
-
-  // getNextCompute/TransferStream() functions return streams from
-  // appropriate pools in round-robin fashion
-  native_type getNextComputeStream(uint32_t *StreamToken = nullptr);
-  // this overload tries to select a stream that was used by one of the
-  // dependencies. If that is not possible it returns a new stream. If a stream
-  // is reused it returns a lock that needs to remain locked as long as the
-  // stream is in use
-  native_type getNextComputeStream(uint32_t NumEventsInWaitList,
-                                   const ur_event_handle_t *EventWaitList,
-                                   ur_stream_guard &Guard,
-                                   uint32_t *StreamToken = nullptr);
-  native_type getNextTransferStream();
-  native_type get() { return getNextComputeStream(); };
+                                            uint32_t StreamI) override;
+  ur_queue_handle_t getEventQueue(const ur_event_handle_t) override;
+  uint32_t getEventComputeStreamToken(const ur_event_handle_t) override;
+  hipStream_t getEventStream(const ur_event_handle_t) override;
 
   // Function which creates the profiling stream. Called only from makeNative
   // event when profiling is required.
@@ -111,161 +42,11 @@ struct ur_queue_handle_t_ {
                                          hipStreamNonBlocking));
     });
   }
-  native_type getHostSubmitTimeStream() { return HostSubmitTimeStream; }
-
-  bool hasBeenSynchronized(uint32_t StreamToken) {
-    // stream token not associated with one of the compute streams
-    if (StreamToken == std::numeric_limits<uint32_t>::max()) {
-      return false;
-    }
-    return LastSyncComputeStreams > StreamToken;
-  }
-
-  bool canReuseStream(uint32_t StreamToken) {
-    // stream token not associated with one of the compute streams
-    if (StreamToken == std::numeric_limits<uint32_t>::max()) {
-      return false;
-    }
-    // If the command represented by the stream token was not the last command
-    // enqueued to the stream we can not reuse the stream - we need to allow for
-    // commands enqueued after it and the one we are about to enqueue to run
-    // concurrently
-    bool IsLastCommand =
-        (ComputeStreamIdx - StreamToken) <= ComputeStreams.size();
-    // If there was a barrier enqueued to the queue after the command
-    // represented by the stream token we should not reuse the stream, as we can
-    // not take that stream into account for the bookkeeping for the next
-    // barrier - such a stream would not be synchronized with. Performance-wise
-    // it does not matter that we do not reuse the stream, as the work
-    // represented by the stream token is guaranteed to be complete by the
-    // barrier before any work we are about to enqueue to the stream will start,
-    // so the event does not need to be synchronized with.
-    return IsLastCommand && !hasBeenSynchronized(StreamToken);
-  }
-
-  template <typename T> bool allOf(T &&F) {
-    {
-      std::lock_guard<std::mutex> ComputeGuard(ComputeStreamMutex);
-      unsigned int End = std::min(
-          static_cast<unsigned int>(ComputeStreams.size()), NumComputeStreams);
-      if (!std::all_of(ComputeStreams.begin(), ComputeStreams.begin() + End, F))
-        return false;
-    }
-    {
-      std::lock_guard<std::mutex> TransferGuard(TransferStreamMutex);
-      unsigned int End =
-          std::min(static_cast<unsigned int>(TransferStreams.size()),
-                   NumTransferStreams);
-      if (!std::all_of(TransferStreams.begin(), TransferStreams.begin() + End,
-                       F))
-        return false;
-    }
-    return true;
-  }
-
-  template <typename T> void forEachStream(T &&F) {
-    {
-      std::lock_guard<std::mutex> ComputeGuard(ComputeStreamMutex);
-      unsigned int End = std::min(
-          static_cast<unsigned int>(ComputeStreams.size()), NumComputeStreams);
-      for (unsigned int i = 0; i < End; i++) {
-        F(ComputeStreams[i]);
-      }
-    }
-    {
-      std::lock_guard<std::mutex> TransferGuard(TransferStreamMutex);
-      unsigned int End =
-          std::min(static_cast<unsigned int>(TransferStreams.size()),
-                   NumTransferStreams);
-      for (unsigned int i = 0; i < End; i++) {
-        F(TransferStreams[i]);
-      }
-    }
+  void createStreamWithPriority(hipStream_t *Stream, unsigned int Flags,
+                                int Priority) override {
+    UR_CHECK_ERROR(hipStreamCreateWithPriority(Stream, Flags, Priority));
   }
-
-  template <bool ResetUsed = false, typename T> void syncStreams(T &&F) {
-    auto SyncCompute = [&F, &Streams = ComputeStreams, &Delay = DelayCompute](
-                           unsigned int Start, unsigned int Stop) {
-      for (unsigned int i = Start; i < Stop; i++) {
-        F(Streams[i]);
-        Delay[i] = false;
-      }
-    };
-    auto SyncTransfer = [&F, &Streams = TransferStreams](unsigned int Start,
-                                                         unsigned int Stop) {
-      for (unsigned int i = Start; i < Stop; i++) {
-        F(Streams[i]);
-      }
-    };
-    {
-      unsigned int Size = static_cast<unsigned int>(ComputeStreams.size());
-      std::lock_guard<std::mutex> ComputeSyncGuard(ComputeStreamSyncMutex);
-      std::lock_guard<std::mutex> ComputeGuard(ComputeStreamMutex);
-      unsigned int Start = LastSyncComputeStreams;
-      unsigned int End = NumComputeStreams < Size ? NumComputeStreams
-                                                  : ComputeStreamIdx.load();
-      if (End - Start >= Size) {
-        SyncCompute(0, Size);
-      } else {
-        Start %= Size;
-        End %= Size;
-        if (Start < End) {
-          SyncCompute(Start, End);
-        } else {
-          SyncCompute(Start, Size);
-          SyncCompute(0, End);
-        }
-      }
-      if (ResetUsed) {
-        LastSyncComputeStreams = End;
-      }
-    }
-    {
-      unsigned int Size = static_cast<unsigned int>(TransferStreams.size());
-      if (!Size) {
-        return;
-      }
-      std::lock_guard<std::mutex> TransferGuard(TransferStreamMutex);
-      unsigned int Start = LastSyncTransferStreams;
-      unsigned int End = NumTransferStreams < Size ? NumTransferStreams
-                                                   : TransferStreamIdx.load();
-      if (End - Start >= Size) {
-        SyncTransfer(0, Size);
-      } else {
-        Start %= Size;
-        End %= Size;
-        if (Start < End) {
-          SyncTransfer(Start, End);
-        } else {
-          SyncTransfer(Start, Size);
-          SyncTransfer(0, End);
-        }
-      }
-      if (ResetUsed) {
-        LastSyncTransferStreams = End;
-      }
-    }
-  }
-
-  // Thread local stream will be used if ScopedStream is active
-  static hipStream_t &getThreadLocalStream() {
-    static thread_local hipStream_t stream{0};
-    return stream;
-  }
-
-  ur_context_handle_t getContext() const { return Context; };
-
-  ur_device_handle_t getDevice() const { return Device; };
-
-  uint32_t incrementReferenceCount() noexcept { return ++RefCount; }
-
-  uint32_t decrementReferenceCount() noexcept { return --RefCount; }
-
-  uint32_t getReferenceCount() const noexcept { return RefCount; }
-
-  uint32_t getNextEventId() noexcept { return ++EventCount; }
-
-  bool backendHasOwnership() const noexcept { return HasOwnership; }
 };
 
 // RAII object to make hQueue stream getter methods all return the same stream

From 1c59e6eadf83daf49c669082618fddd831a03713 Mon Sep 17 00:00:00 2001
From: Nicolas Miller
Date: Wed, 26 Mar 2025 09:38:52 +0000
Subject: [PATCH 2/6] [UR][HIP] Clean up urQueueFinish

Capturing the result is no longer needed.
---
 unified-runtime/source/adapters/hip/queue.cpp | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/unified-runtime/source/adapters/hip/queue.cpp b/unified-runtime/source/adapters/hip/queue.cpp
index da41d9ab2696..d8bf30e9396e 100644
--- a/unified-runtime/source/adapters/hip/queue.cpp
+++ b/unified-runtime/source/adapters/hip/queue.cpp
@@ -179,8 +179,9 @@ UR_APIEXPORT ur_result_t UR_APICALL urQueueFinish(ur_queue_handle_t hQueue) {
 
   try {
     ScopedDevice Active(hQueue->getDevice());
-    hQueue->syncStreams(
-        [&Result](hipStream_t S) { UR_CHECK_ERROR(hipStreamSynchronize(S)); });
+    hQueue->syncStreams(
+        [](hipStream_t S) { UR_CHECK_ERROR(hipStreamSynchronize(S)); });
+
   } catch (ur_result_t Err) {
     Result = Err;
   } catch (...) {

From 8b178a0e217825a125ce52c07bd17f087d229b0e Mon Sep 17 00:00:00 2001
From: Nicolas Miller
Date: Wed, 26 Mar 2025 10:55:23 +0000
Subject: [PATCH 3/6] [UR] Move stream_queue.hpp to common UR directory

---
 .github/CODEOWNERS                            |  1 +
 .../source/adapters/cuda/queue.hpp            |  2 +-
 unified-runtime/source/adapters/hip/queue.hpp |  2 +-
 .../cuda => common/cuda-hip}/stream_queue.hpp | 26 ++++++++++---------
 4 files changed, 17 insertions(+), 14 deletions(-)
 rename unified-runtime/source/{adapters/cuda => common/cuda-hip}/stream_queue.hpp (96%)

diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS
index 6079780044c6..ffe1773edcec 100644
--- a/.github/CODEOWNERS
+++ b/.github/CODEOWNERS
@@ -75,6 +75,7 @@ llvm/include/llvm/SYCLLowerIR/SYCLCreateNVVMAnnotations.h @intel/llvm-reviewers-
 llvm/lib/SYCLLowerIR/SYCLCreateNVVMAnnotations.cpp @intel/llvm-reviewers-cuda
 llvm/lib/Target/NVPTX @intel/llvm-reviewers-cuda
 llvm/lib/Target/AMDGPU @intel/llvm-reviewers-cuda
+unified-runtime/source/common/cuda-hip @intel/llvm-reviewers-cuda
 
 # XPTI instrumentation utilities
 xpti/ @intel/llvm-reviewers-runtime
diff --git a/unified-runtime/source/adapters/cuda/queue.hpp b/unified-runtime/source/adapters/cuda/queue.hpp
index a0d130fa3ac8..f6369996b86a 100644
--- a/unified-runtime/source/adapters/cuda/queue.hpp
+++ b/unified-runtime/source/adapters/cuda/queue.hpp
@@ -17,7 +17,7 @@
 #include
 #include
 
-#include "stream_queue.hpp"
+#include <stream_queue.hpp>
 
 /// UR queue mapping on to CUstream objects.
 ///
diff --git a/unified-runtime/source/adapters/hip/queue.hpp b/unified-runtime/source/adapters/hip/queue.hpp
index 0f68b4f8789d..b06f68195396 100644
--- a/unified-runtime/source/adapters/hip/queue.hpp
+++ b/unified-runtime/source/adapters/hip/queue.hpp
@@ -14,7 +14,7 @@
 #include
 #include
 
-#include "../cuda/stream_queue.hpp"
+#include <stream_queue.hpp>
 
 /// UR queue mapping on to hipStream_t objects.
 ///
diff --git a/unified-runtime/source/adapters/cuda/stream_queue.hpp b/unified-runtime/source/common/cuda-hip/stream_queue.hpp
similarity index 96%
rename from unified-runtime/source/adapters/cuda/stream_queue.hpp
rename to unified-runtime/source/common/cuda-hip/stream_queue.hpp
index de0d5b25023c..9aa7c5d8b135 100644
--- a/unified-runtime/source/adapters/cuda/stream_queue.hpp
+++ b/unified-runtime/source/common/cuda-hip/stream_queue.hpp
@@ -1,15 +1,15 @@
-//===--------- stream_queue.hpp - CUDA Adapter ----------------------------===//
-//
-// Copyright (C) 2025 Intel Corporation
-//
-// Part of the Unified-Runtime Project, under the Apache License v2.0 with LLVM
-// Exceptions. See LICENSE.TXT
-// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
-//
-//===----------------------------------------------------------------------===//
-#pragma once
+/*
+ *
+ * Copyright (C) 2025 Intel Corporation
+ *
+ * Part of the Unified-Runtime Project, under the Apache License v2.0 with LLVM
+ * Exceptions. See LICENSE.TXT
+ *
+ * SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
+ *
+ */
 
-#include
+#pragma once
 
 #include <algorithm>
 #include <mutex>
@@ -17,8 +17,10 @@
 
 using ur_stream_guard = std::unique_lock<std::mutex>;
 
-/// Generic implementation of out-of-order queue using in-order streams.
+/// Generic implementation of an out-of-order UR queue based on in-order
+/// backend 'stream' objects.
 ///
+/// This class is specifically designed for the CUDA and HIP adapters.
 template <typename ST, int CS, int TS> struct stream_queue_t {
   using native_type = ST;
   static constexpr int DefaultNumComputeStreams = CS;

From 924ac2f61a628559310d49f4cc7ec130ff06a025 Mon Sep 17 00:00:00 2001
From: Nicolas Miller
Date: Wed, 26 Mar 2025 12:12:59 +0000
Subject: [PATCH 4/6] [UR][CUDA-HIP] Remove unnecessary device retain/release

Device retain/release is a no-op.
---
 unified-runtime/source/common/cuda-hip/stream_queue.hpp | 6 +-----
 1 file changed, 1 insertion(+), 5 deletions(-)

diff --git a/unified-runtime/source/common/cuda-hip/stream_queue.hpp b/unified-runtime/source/common/cuda-hip/stream_queue.hpp
index 9aa7c5d8b135..c7e23374c08d 100644
--- a/unified-runtime/source/common/cuda-hip/stream_queue.hpp
+++ b/unified-runtime/source/common/cuda-hip/stream_queue.hpp
@@ -77,13 +77,9 @@ template <typename ST, int CS, int TS> struct stream_queue_t {
         LastSyncComputeStreams{0}, LastSyncTransferStreams{0}, Flags(Flags),
         URFlags(URFlags), Priority(Priority), HasOwnership{BackendOwns} {
     urContextRetain(Context);
-    urDeviceRetain(Device);
   }
 
-  ~stream_queue_t() {
-    urContextRelease(Context);
-    urDeviceRelease(Device);
-  }
+  ~stream_queue_t() { urContextRelease(Context); }
 
   virtual void computeStreamWaitForBarrierIfNeeded(native_type Stream,
                                                    uint32_t StreamI) = 0;

From 43f83a4f557d246fbcac58d46e266f77030c34c1 Mon Sep 17 00:00:00 2001
From: Nicolas Miller
Date: Wed, 2 Apr 2025 10:23:19 +0100
Subject: [PATCH 5/6] [UR][CUDA] Fix stream guard in async alloc

---
 unified-runtime/source/adapters/cuda/async_alloc.cpp | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/unified-runtime/source/adapters/cuda/async_alloc.cpp b/unified-runtime/source/adapters/cuda/async_alloc.cpp
index 1826ebe531c5..8f545b5f2d98 100644
--- a/unified-runtime/source/adapters/cuda/async_alloc.cpp
+++ b/unified-runtime/source/adapters/cuda/async_alloc.cpp
@@ -25,7 +25,7 @@ UR_APIEXPORT ur_result_t UR_APICALL urEnqueueUSMDeviceAllocExp(
 
     ScopedContext Active(hQueue->getDevice());
     uint32_t StreamToken;
-    ur_stream_guard_ Guard;
+    ur_stream_guard Guard;
     CUstream CuStream = hQueue->getNextComputeStream(
         numEventsInWaitList, phEventWaitList, Guard, &StreamToken);
 
@@ -83,7 +83,7 @@ UR_APIEXPORT ur_result_t UR_APICALL urEnqueueUSMFreeExp(
 
     ScopedContext Active(hQueue->getDevice());
     uint32_t StreamToken;
-    ur_stream_guard_ Guard;
+    ur_stream_guard Guard;
     CUstream CuStream = hQueue->getNextComputeStream(
         numEventsInWaitList, phEventWaitList, Guard, &StreamToken);

From 52de284bd94d167ecf076b586814b7e253a5a785 Mon Sep 17 00:00:00 2001
From: Nicolas Miller
Date: Wed, 2 Apr 2025 10:43:29 +0100
Subject: [PATCH 6/6] [UR][CUDA-HIP] Mark stream queue destructor as virtual

---
 unified-runtime/source/common/cuda-hip/stream_queue.hpp | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/unified-runtime/source/common/cuda-hip/stream_queue.hpp b/unified-runtime/source/common/cuda-hip/stream_queue.hpp
index c7e23374c08d..8952fd04948f 100644
--- a/unified-runtime/source/common/cuda-hip/stream_queue.hpp
+++ b/unified-runtime/source/common/cuda-hip/stream_queue.hpp
@@ -79,7 +79,7 @@ template <typename ST, int CS, int TS> struct stream_queue_t {
     urContextRetain(Context);
   }
 
-  ~stream_queue_t() { urContextRelease(Context); }
+  virtual ~stream_queue_t() { urContextRelease(Context); }
 
   virtual void computeStreamWaitForBarrierIfNeeded(native_type Stream,
                                                    uint32_t StreamI) = 0;
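
Usage note: after this series an adapter only derives from stream_queue_t and
implements the small backend-specific surface (stream creation, barrier
bookkeeping, and event introspection); all pooling, round-robin selection, and
synchronization logic lives in the shared template. Below is a minimal sketch
of such a specialization, assuming the template signature from
stream_queue.hpp; the my_stream_t type and the commented-out backend call are
hypothetical stand-ins and not part of this series:

    #include <stream_queue.hpp>

    // Hypothetical in-order stream handle; CUDA uses CUstream (128 compute /
    // 64 transfer streams) and HIP uses hipStream_t (64 / 16).
    using my_stream_t = void *;

    struct my_queue_t : stream_queue_t<my_stream_t, 128, 64> {
      using stream_queue_t<my_stream_t, 128, 64>::stream_queue_t;

      // How the shared template asks the backend for one in-order stream.
      void createStreamWithPriority(my_stream_t *Stream, unsigned int Flags,
                                    int Priority) override {
        // backendCreateStreamWithPriority(Stream, Flags, Priority);
        // (hypothetical backend call)
      }
      // Barrier bookkeeping hooks; a trivial backend can leave them empty.
      void computeStreamWaitForBarrierIfNeeded(my_stream_t,
                                               uint32_t) override {}
      void transferStreamWaitForBarrierIfNeeded(my_stream_t,
                                                uint32_t) override {}
      // Map UR events back to their queue, stream token, and native stream
      // so the generic getNextComputeStream overload can attempt stream
      // reuse for wait-list dependencies.
      ur_queue_handle_t getEventQueue(const ur_event_handle_t E) override {
        return E->getQueue();
      }
      uint32_t getEventComputeStreamToken(const ur_event_handle_t E) override {
        return E->getComputeStreamToken();
      }
      my_stream_t getEventStream(const ur_event_handle_t E) override {
        return E->getStream();
      }
    };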
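
For callers, the guard-returning getNextComputeStream overload follows the
pattern visible in the enqueue.cpp hunks above: the returned ur_stream_guard
must stay alive for as long as work is being submitted to the possibly-reused
stream. A sketch using the CUDA types (the enqueueSomething wrapper is
illustrative only, not an API from this series):

    ur_result_t enqueueSomething(ur_queue_handle_t hQueue,
                                 uint32_t numEventsInWaitList,
                                 const ur_event_handle_t *phEventWaitList) {
      uint32_t StreamToken;
      ur_stream_guard Guard;
      // May hand back a stream already used by one of the wait-list events;
      // in that case Guard holds ComputeStreamSyncMutex until it goes out of
      // scope, so the stream cannot be synchronized away underneath us.
      CUstream CuStream = hQueue->getNextComputeStream(
          numEventsInWaitList, phEventWaitList, Guard, &StreamToken);
      // ... wait on phEventWaitList and launch work on CuStream here ...
      return UR_RESULT_SUCCESS;
    }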