Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions sycl/source/detail/queue_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,10 @@ pi_native_handle queue_impl::getNative(int32_t &NativeHandleDesc) const {
return Handle;
}

void queue_impl::cleanup_fusion_cmd() {
detail::Scheduler::getInstance().cleanUpCmdFusion(this);
}

bool queue_impl::ext_oneapi_empty() const {
// If we have in-order queue where events are not discarded then just check
// the status of the last event.
Expand Down
4 changes: 4 additions & 0 deletions sycl/source/detail/queue_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ class queue_impl {
#endif
throw_asynchronous();
if (!MHostQueue) {
cleanup_fusion_cmd();
getPlugin()->call<PiApiKind::piQueueRelease>(MQueues[0]);
}
}
Expand Down Expand Up @@ -693,6 +694,9 @@ class queue_impl {
}

protected:
// Hook to the scheduler to clean up any fusion command held on destruction.
void cleanup_fusion_cmd();

// template is needed for proper unit testing
template <typename HandlerType = handler>
void finalizeHandler(HandlerType &Handler, const CG::CGTYPE &Type,
Expand Down
11 changes: 11 additions & 0 deletions sycl/source/detail/scheduler/commands.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3156,13 +3156,24 @@ pi_int32 KernelFusionCommand::enqueueImp() {
waitForPreparedHostEvents();
waitForEvents(MQueue, MPreparedDepsEvents, MEvent->getHandleRef());

// We need to release the queue here because KernelFusionCommands are
// held back by the scheduler thus prevent the deallocation of the queue.
resetQueue();
return PI_SUCCESS;
}

void KernelFusionCommand::setFusionStatus(FusionStatus Status) {
MStatus = Status;
}

void KernelFusionCommand::resetQueue() {
assert(MStatus != FusionStatus::ACTIVE &&
"Cannot release the queue attached to the KernelFusionCommand if it "
"is active.");
MQueue.reset();
MWorkerQueue.reset();
}

void KernelFusionCommand::emitInstrumentationData() {
#ifdef XPTI_ENABLE_INSTRUMENTATION
constexpr uint16_t NotificationTraceType = xpti::trace_node_create;
Expand Down
5 changes: 5 additions & 0 deletions sycl/source/detail/scheduler/commands.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -727,6 +727,11 @@ class KernelFusionCommand : public Command {
/// only be called under the protection of the scheduler write-lock.
void setFusionStatus(FusionStatus Status);

/// Reset the queue. This can be required as the command is held in order
/// to maintain events alive, however this prevent the normal destruction of
/// the queue.
void resetQueue();

bool isActive() const { return MStatus == FusionStatus::ACTIVE; }

bool readyForDeletion() const { return MStatus == FusionStatus::DELETED; }
Expand Down
16 changes: 11 additions & 5 deletions sycl/source/detail/scheduler/graph_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -942,7 +942,7 @@ Scheduler::GraphBuildResult Scheduler::GraphBuilder::addCG(
// they create any requirement or event dependency on any of the kernels in
// the fusion list, this will lead to cancellation of the fusion in the
// GraphProcessor.
auto QUniqueID = std::hash<QueueImplPtr>()(Queue);
auto QUniqueID = std::hash<sycl::detail::queue_impl *>()(Queue.get());
if (isInFusionMode(QUniqueID) && !NewCmd->isHostTask()) {
auto *FusionCmd = findFusionList(QUniqueID)->second.get();

Expand Down Expand Up @@ -1349,7 +1349,14 @@ Command *Scheduler::GraphBuilder::connectDepEvent(
}

void Scheduler::GraphBuilder::startFusion(QueueImplPtr Queue) {
auto QUniqueID = std::hash<QueueImplPtr>()(Queue);
cleanUpCmdFusion(Queue.get());
auto QUniqueID = std::hash<sycl::detail::queue_impl *>()(Queue.get());
MFusionMap.emplace(QUniqueID, std::make_unique<KernelFusionCommand>(Queue));
}

void Scheduler::GraphBuilder::cleanUpCmdFusion(
sycl::detail::queue_impl *Queue) {
auto QUniqueID = std::hash<sycl::detail::queue_impl *>()(Queue);
if (isInFusionMode(QUniqueID)) {
throw sycl::exception{sycl::make_error_code(sycl::errc::invalid),
"Queue already in fusion mode"};
Expand All @@ -1365,7 +1372,6 @@ void Scheduler::GraphBuilder::startFusion(QueueImplPtr Queue) {
cleanupCommand(OldFusionCmd->second.release());
MFusionMap.erase(OldFusionCmd);
}
MFusionMap.emplace(QUniqueID, std::make_unique<KernelFusionCommand>(Queue));
}

void Scheduler::GraphBuilder::removeNodeFromGraph(
Expand Down Expand Up @@ -1404,7 +1410,7 @@ void Scheduler::GraphBuilder::removeNodeFromGraph(

void Scheduler::GraphBuilder::cancelFusion(QueueImplPtr Queue,
std::vector<Command *> &ToEnqueue) {
auto QUniqueID = std::hash<QueueImplPtr>()(Queue);
auto QUniqueID = std::hash<sycl::detail::queue_impl *>()(Queue.get());
if (!isInFusionMode(QUniqueID)) {
return;
}
Expand Down Expand Up @@ -1492,7 +1498,7 @@ EventImplPtr
Scheduler::GraphBuilder::completeFusion(QueueImplPtr Queue,
std::vector<Command *> &ToEnqueue,
const property_list &PropList) {
auto QUniqueID = std::hash<QueueImplPtr>()(Queue);
auto QUniqueID = std::hash<sycl::detail::queue_impl *>()(Queue.get());
#if SYCL_EXT_CODEPLAY_KERNEL_FUSION
if (!isInFusionMode(QUniqueID)) {
auto InactiveFusionList = findFusionList(QUniqueID);
Expand Down
12 changes: 11 additions & 1 deletion sycl/source/detail/scheduler/scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -569,13 +569,22 @@ void Scheduler::cleanupAuxiliaryResources(BlockingT Blocking) {

void Scheduler::startFusion(QueueImplPtr Queue) {
WriteLockT Lock = acquireWriteLock();
WriteLockT FusionMapLock = acquireFusionWriteLock();
MGraphBuilder.startFusion(Queue);
}

void Scheduler::cleanUpCmdFusion(sycl::detail::queue_impl *Queue) {
// No graph lock, we might be called because the graph builder is releasing
// resources.
WriteLockT FusionMapLock = acquireFusionWriteLock();
MGraphBuilder.cleanUpCmdFusion(Queue);
}

void Scheduler::cancelFusion(QueueImplPtr Queue) {
std::vector<Command *> ToEnqueue;
{
WriteLockT Lock = acquireWriteLock();
WriteLockT FusionMapLock = acquireFusionWriteLock();
MGraphBuilder.cancelFusion(Queue, ToEnqueue);
}
enqueueCommandForCG(nullptr, ToEnqueue);
Expand All @@ -587,6 +596,7 @@ EventImplPtr Scheduler::completeFusion(QueueImplPtr Queue,
EventImplPtr FusedEvent;
{
WriteLockT Lock = acquireWriteLock();
WriteLockT FusionMapLock = acquireFusionWriteLock();
FusedEvent = MGraphBuilder.completeFusion(Queue, ToEnqueue, PropList);
}
enqueueCommandForCG(nullptr, ToEnqueue);
Expand All @@ -595,7 +605,7 @@ EventImplPtr Scheduler::completeFusion(QueueImplPtr Queue,
}

bool Scheduler::isInFusionMode(QueueIdT queue) {
ReadLockT Lock = acquireReadLock();
ReadLockT Lock = acquireFusionReadLock();
return MGraphBuilder.isInFusionMode(queue);
}

Expand Down
30 changes: 30 additions & 0 deletions sycl/source/detail/scheduler/scheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,8 @@ class Scheduler {

void startFusion(QueueImplPtr Queue);

void cleanUpCmdFusion(sycl::detail::queue_impl *Queue);

void cancelFusion(QueueImplPtr Queue);

EventImplPtr completeFusion(QueueImplPtr Queue, const property_list &);
Expand Down Expand Up @@ -488,10 +490,33 @@ class Scheduler {
return Lock;
}

/// Provides exclusive access to std::shared_timed_mutex object with deadlock
/// avoidance to the Fusion map
WriteLockT acquireFusionWriteLock() {
#ifdef _WIN32
WriteLockT Lock(MFusionMapLock, std::defer_lock);
while (!Lock.try_lock_for(std::chrono::milliseconds(10))) {
// Without yield while loop acts like endless while loop and occupies the
// whole CPU when multiple command groups are created in multiple host
// threads
std::this_thread::yield();
}
#else
WriteLockT Lock(MFusionMapLock);
// It is a deadlock on UNIX in implementation of lock and lock_shared, if
// try_lock in the loop above will be executed, so using a single lock here
#endif // _WIN32
return Lock;
}

/// Provides shared access to std::shared_timed_mutex object with deadlock
/// avoidance
ReadLockT acquireReadLock() { return ReadLockT{MGraphLock}; }

/// Provides shared access to std::shared_timed_mutex object with deadlock
/// avoidance to the Fusion map
ReadLockT acquireFusionReadLock() { return ReadLockT{MFusionMapLock}; }

void cleanupCommands(const std::vector<Command *> &Cmds);

void NotifyHostTaskCompletion(Command *Cmd);
Expand Down Expand Up @@ -627,6 +652,10 @@ class Scheduler {

void startFusion(QueueImplPtr Queue);

/// Clean up the internal fusion commands held for the given queue.
/// @param Queue the queue for which to remove the fusion commands.
void cleanUpCmdFusion(sycl::detail::queue_impl *Queue);

void cancelFusion(QueueImplPtr Queue, std::vector<Command *> &ToEnqueue);

EventImplPtr completeFusion(QueueImplPtr Queue,
Expand Down Expand Up @@ -870,6 +899,7 @@ class Scheduler {

GraphBuilder MGraphBuilder;
RWLockT MGraphLock;
RWLockT MFusionMapLock;

std::vector<Command *> MDeferredCleanupCommands;
std::mutex MDeferredCleanupMutex;
Expand Down