Skip to content

Commit

Permalink
Roctracer: Implement reliable data flushing on stop (#824)
Browse files Browse the repository at this point in the history
Summary:
When stopping and flushing with libroctracer there is no way to guarantee that all completion notifications have been received and logged. Torch.Profiler calls deviceSynchronize() when stopping the tracer, but that does not guarantee the notifications have been delivered; it only adds some delay, which narrows the race.

I augmented the tracer stop procedure to enqueue an extra async op and delay until the notification for it is received.

This will fix many "flaky" unit tests that look for kernel executions occurring immediately before the end of a recording.

Pull Request resolved: #824

Reviewed By: aaronenyeshi, xuzhao9

Differential Revision: D50841060

Pulled By: briancoutinho

fbshipit-source-id: af736989517a1f958bc657d09efbf3c6d4c897ef
  • Loading branch information
mwootton authored and facebook-github-bot committed Nov 3, 2023
1 parent a30ca3f commit 46ede25
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 2 deletions.
1 change: 0 additions & 1 deletion libkineto/src/RoctracerActivityApi.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,6 @@ void RoctracerActivityApi::clearActivities() {
kernelNames_.clear();
}


void RoctracerActivityApi::enableActivities(
const std::set<ActivityType>& selected_activities) {
#ifdef HAS_ROCTRACER
Expand Down
58 changes: 57 additions & 1 deletion libkineto/src/RoctracerLogger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
#include <cstring>
#include <chrono>
#include <time.h>
#include <mutex>
#include <unistd.h>

#include "ThreadUtil.h"

Expand All @@ -24,6 +26,21 @@ using namespace std::chrono;

constexpr size_t kBufSize(2 * 1024 * 1024);

// Shared bookkeeping used to flush outstanding activity records when the
// tracer is stopped.
//
// The API callbacks report every correlation id they log through
// reportCorrelation(). stopLogging() copies the running maximum into
// correlationId_ and raises doFlush_; activity_callback() then scans incoming
// records for that id and zeroes correlationId_ once it has been seen.
class Flush
{
 public:
  // Raised by stopLogging() while it waits for the final record; read by
  // activity_callback() to decide whether incoming records must be scanned.
  std::atomic<bool> doFlush_ {false};

  // Taken by both stopLogging() and activity_callback() around accesses to
  // correlationId_.
  std::mutex mutex_;

  // Largest correlation id reported so far. Explicitly zero-initialized so the
  // value is well defined for every storage duration, not only for statics.
  std::atomic<uint64_t> maxCorrelationId_ {0};

  // Correlation id stopLogging() is currently waiting for; 0 means "nothing
  // pending" (either never set, or already found by activity_callback()).
  uint64_t correlationId_ {0};

  // Raises maxCorrelationId_ to `cid` if `cid` is larger than the current
  // maximum; otherwise leaves it untouched.
  void reportCorrelation(const uint64_t cid) {
    uint64_t prev = maxCorrelationId_;
    // compare_exchange_weak reloads `prev` on failure, so the loop ends as
    // soon as the stored maximum is >= cid or our store succeeds.
    while (prev < cid && !maxCorrelationId_.compare_exchange_weak(prev, cid))
    {}
  }
};
static Flush s_flush;

RoctracerLogger& RoctracerLogger::singleton() {
static RoctracerLogger instance;
return instance;
Expand Down Expand Up @@ -91,6 +108,7 @@ void RoctracerLogger::api_callback(uint32_t domain, uint32_t cid, const void* ca
case HIP_API_ID_hipExtLaunchKernel:
case HIP_API_ID_hipLaunchCooperativeKernel: // Should work here
{
s_flush.reportCorrelation(data->correlation_id);
auto &args = data->args.hipLaunchKernel;
dis->kernelRows_.emplace_back(data->correlation_id,
domain,
Expand All @@ -116,6 +134,7 @@ void RoctracerLogger::api_callback(uint32_t domain, uint32_t cid, const void* ca
case HIP_API_ID_hipModuleLaunchKernel:
case HIP_API_ID_hipExtModuleLaunchKernel:
{
s_flush.reportCorrelation(data->correlation_id);
auto &args = data->args.hipModuleLaunchKernel;
dis->kernelRows_.emplace_back(data->correlation_id,
domain,
Expand Down Expand Up @@ -208,6 +227,7 @@ void RoctracerLogger::api_callback(uint32_t domain, uint32_t cid, const void* ca
case HIP_API_ID_hipMemcpyAsync:
case HIP_API_ID_hipMemcpyWithStream:
{
s_flush.reportCorrelation(data->correlation_id);
auto &args = data->args.hipMemcpyAsync;
dis->copyRows_.emplace_back(data->correlation_id,
domain,
Expand Down Expand Up @@ -252,6 +272,22 @@ void RoctracerLogger::activity_callback(const char* begin, const char* end, void
auto &gpuTraceBuffers = singleton().gpuTraceBuffers_;
memcpy(buffer, begin, size);
gpuTraceBuffers->emplace_back(buffer, size);

// If we are stopping the tracer, implement reliable flushing
if (s_flush.doFlush_) {
std::unique_lock<std::mutex> lock(s_flush.mutex_);
// scan the records looking for the final correlation id
const roctracer_record_t* record = (const roctracer_record_t*)(begin);
const roctracer_record_t* end_record = (const roctracer_record_t*)(end);

while (record < end_record) {
if (record->correlation_id == s_flush.correlationId_) {
s_flush.correlationId_ = 0; // Clear id to signal we found it
break;
}
roctracer_next_record(record, &record);
}
}
}

void RoctracerLogger::startLogging() {
Expand Down Expand Up @@ -312,12 +348,32 @@ void RoctracerLogger::startLogging() {
}

externalCorrelationEnabled_ = true;
logging_ = true;
roctracer_start();
}

// Stops the tracer, first waiting (for a bounded time) until the activity
// record carrying the highest correlation id reported so far has been seen by
// activity_callback(). Does nothing if logging is not currently active.
void RoctracerLogger::stopLogging() {
  if (!logging_)
    return;

  // Upper bound on the number of flush-and-sleep polls below. Without a bound,
  // this call would never return if the awaited record had already been
  // delivered before doFlush_ was raised, because activity_callback() only
  // looks for the id while doFlush_ is set.
  constexpr int kMaxFlushPolls = 50;

  {
    // If we are stopping the tracer, implement reliable flushing
    std::unique_lock<std::mutex> lock(s_flush.mutex_);

    s_flush.doFlush_ = true;
    s_flush.correlationId_ = s_flush.maxCorrelationId_;  // load ending id from the running max

    // Poll on the worker finding the record and clearing s_flush.correlationId_.
    // The lock is dropped around the flush and sleep so activity_callback()
    // can take it and clear the id.
    for (int polls = 0;
         s_flush.correlationId_ != 0 && polls < kMaxFlushPolls;
         ++polls) {
      lock.unlock();
      roctracer_flush_activity_expl(hccPool_);
      usleep(1000);
      lock.lock();
    }

    // On timeout the id is still set; clear it so no stale target survives
    // into a later stopLogging() call.
    s_flush.correlationId_ = 0;
    s_flush.doFlush_ = false;
  }
  // NOTE(review): the mutex is released before the final stop/flush so that an
  // activity_callback() invocation that already observed doFlush_ == true
  // cannot be left blocked on the mutex while this thread is inside the flush.
  // This assumes the flush may wait on callback delivery; confirm against
  // roctracer.

  roctracer_stop();
  roctracer_flush_activity_expl(hccPool_);
  logging_ = false;
}

void RoctracerLogger::endTracing() {
Expand Down
1 change: 1 addition & 0 deletions libkineto/src/RoctracerLogger.h
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ class RoctracerLogger {

std::unique_ptr<std::list<RoctracerActivityBuffer>> gpuTraceBuffers_;
bool externalCorrelationEnabled_{true};
bool logging_{false};

friend class onnxruntime::profiling::RocmProfiler;
friend class libkineto::RoctracerActivityApi;
Expand Down

0 comments on commit 46ede25

Please sign in to comment.