Skip to content

Commit

Permalink
safer profiler thread finalization
Browse files Browse the repository at this point in the history
  • Loading branch information
malytomas committed Jul 20, 2022
1 parent ca483fa commit f161d55
Showing 1 changed file with 152 additions and 132 deletions.
284 changes: 152 additions & 132 deletions sources/libcore/profiling.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,22 +28,31 @@ namespace cage
constexpr String DefaultBrowser = "firefox";
#endif // CAGE_SYSTEM_WINDOWS

constexpr uint64 ThreadNameSpecifier = (uint64)-2;

ConfigBool confEnabled("cage/profiling/enabled", false);
const ConfigBool confAutoStartClient("cage/profiling/autoStartClient", true);
const ConfigString confBrowser("cage/profiling/browser", DefaultBrowser);

uint64 timestamp()
uint64 timestamp() noexcept
{
// ensures that all timestamps are unique
static std::atomic<uint64> atom = 0;
uint64 newv = applicationTime();
uint64 oldv = atom.load();
do
try
{
if (oldv >= newv)
newv = oldv + 1;
} while (!atom.compare_exchange_weak(oldv, newv));
return newv;
// ensures that all timestamps are unique
static std::atomic<uint64> atom = 0;
uint64 newv = applicationTime();
uint64 oldv = atom.load();
do
{
if (oldv >= newv)
newv = oldv + 1;
} while (!atom.compare_exchange_weak(oldv, newv));
return newv;
}
catch (...)
{
return m;
}
}

struct QueueItem
Expand All @@ -66,148 +75,147 @@ namespace cage

struct Dispatcher
{
std::unordered_map<uint64, String> threadNames;
Holder<WebsocketServer> server;
Holder<WebsocketConnection> connection;
Holder<Process> client;
Holder<Thread> thread = newThread(Delegate<void()>().bind<Dispatcher, &Dispatcher::threadEntry>(this), "profiling dispatcher");

void eraseQueue()
struct Runner
{
QueueItem qi;
while (queue().tryPop(qi))
{
if (qi.startTime == m && qi.endTime == m)
threadNames[qi.threadId] = qi.name;
}
}
std::unordered_map<uint64, String> threadNames;
Holder<WebsocketServer> server;
Holder<WebsocketConnection> connection;
Holder<Process> client;

void updateConnected()
{
struct NamesMap
void eraseQueue()
{
std::unordered_map<String, uint32> data;
uint32 next = 0;

uint32 index(const String &name)
QueueItem qi;
while (queue().tryPop(qi))
{
const auto it = data.find(name);
if (it != data.end()) [[likely]]
return it->second;
return data[name] = next++;
if (qi.startTime == ThreadNameSpecifier && qi.endTime == ThreadNameSpecifier)
threadNames[qi.threadId] = qi.name;
}
}

uint32 index(StringLiteral name)
void updateConnected()
{
struct NamesMap
{
return index(String(name));
}
std::unordered_map<String, uint32> data;
uint32 next = 0;

std::string mapping() const
{
std::vector<const String *> v;
v.resize(data.size());
for (const auto &it : data)
v[it.second] = &it.first;

std::string str;
str.reserve(v.size() * 100);
for (const auto &n : v)
str += (Stringizer() + "\"" + *n + "\",\n ").value.c_str();
return str + "\"\"";
}
uint32 index(const String &name)
{
const auto it = data.find(name);
if (it != data.end()) [[likely]]
return it->second;
return data[name] = next++;
}

NamesMap()
uint32 index(StringLiteral name)
{
return index(String(name));
}

std::string mapping() const
{
std::vector<const String *> v;
v.resize(data.size());
for (const auto &it : data)
v[it.second] = &it.first;

std::string str;
str.reserve(v.size() * 100);
for (const auto &n : v)
str += (Stringizer() + "\"" + *n + "\",\n ").value.c_str();
return str + "\"\"";
}

NamesMap()
{
data.reserve(200);
}
} names;

struct ThreadData
{
data.reserve(200);
}
} names;
std::string events;

struct ThreadData
{
std::string events;
ThreadData()
{
events.reserve(50000);
}
};
std::unordered_map<uint64, ThreadData> data;

ThreadData()
QueueItem qi;
while (queue().tryPop(qi))
{
events.reserve(50000);
if (qi.startTime == ThreadNameSpecifier && qi.endTime == ThreadNameSpecifier)
threadNames[qi.threadId] = qi.name;
else
{
const String s = Stringizer() + "[" + names.index(qi.name) + "," + names.index(qi.category) + "," + qi.startTime + "," + (qi.endTime - qi.startTime) + (qi.framing ? ",1" : "") + "], ";
data[qi.threadId].events += s.c_str();
}
}
};
std::unordered_map<uint64, ThreadData> data;

QueueItem qi;
while (queue().tryPop(qi))
{
if (qi.startTime == m && qi.endTime == m)
threadNames[qi.threadId] = qi.name;
else
std::string str = "{\"names\":[";
str += names.mapping();
str += "],\n\"threads\":[\n";
for (const auto &thr : data)
{
const String s = Stringizer() + "[" + names.index(qi.name) + "," + names.index(qi.category) + "," + qi.startTime + "," + (qi.endTime - qi.startTime) + (qi.framing ? ",1" : "") + "], ";
data[qi.threadId].events += s.c_str();
str += "{\"name\":\"";
str += threadNames[thr.first].c_str();
str += "\",\n\"events\":[";
str += thr.second.events;
str += "[]\n]},\n";
}
}
str += "{}\n]}";

std::string str = "{\"names\":[";
str += names.mapping();
str += "],\n\"threads\":[\n";
for (const auto &thr : data)
{
str += "{\"name\":\"";
str += threadNames[thr.first].c_str();
str += "\",\n\"events\":[";
str += thr.second.events;
str += "[]\n]},\n";
connection->write(str);
}
str += "{}\n]}";

connection->write(str);
}

void updateConnecting()
{
if (server)
{
connection = server->accept();
if (connection)
{
CAGE_LOG(SeverityEnum::Info, "profiling", Stringizer() + "profiling client connected: " + connection->address() + ":" + connection->port());
server.clear();
}
}
else
void updateConnecting()
{
server = newWebsocketServer(randomRange(10000u, 65000u));
CAGE_LOG(SeverityEnum::Info, "profiling", Stringizer() + "profiling server listens on port " + server->port());

if (confAutoStartClient)
if (server)
{
try
connection = server->accept();
if (connection)
{
writeFile("profiling.htm")->write(profiling_htm().cast<const char>());
const String baseUrl = pathWorkingDir() + "/profiling.htm";
const String url = Stringizer() + "file://" + baseUrl + "?port=" + server->port();
ProcessCreateConfig cfg(Stringizer() + (String)confBrowser + " " + url);
cfg.discardStdErr = cfg.discardStdIn = cfg.discardStdOut = true;
client = newProcess(cfg);
CAGE_LOG(SeverityEnum::Info, "profiling", Stringizer() + "profiling client connected: " + connection->address() + ":" + connection->port());
server.clear();
}
catch (const cage::Exception &)
}
else
{
server = newWebsocketServer(randomRange(10000u, 65000u));
CAGE_LOG(SeverityEnum::Info, "profiling", Stringizer() + "profiling server listens on port " + server->port());

if (confAutoStartClient)
{
CAGE_LOG(SeverityEnum::Warning, "profiling", "failed to automatically launch profiling client");
try
{
writeFile("profiling.htm")->write(profiling_htm().cast<const char>());
const String baseUrl = pathWorkingDir() + "/profiling.htm";
const String url = Stringizer() + "file://" + baseUrl + "?port=" + server->port();
ProcessCreateConfig cfg(Stringizer() + (String)confBrowser + " " + url);
cfg.discardStdErr = cfg.discardStdIn = cfg.discardStdOut = true;
client = newProcess(cfg);
}
catch (const cage::Exception &)
{
CAGE_LOG(SeverityEnum::Warning, "profiling", "failed to automatically launch profiling client");
}
}
}
eraseQueue();
}
eraseQueue();
}

void updateDisabled()
{
server.clear();
connection.clear();
client.clear();
eraseQueue();
}
void updateDisabled()
{
server.clear();
connection.clear();
client.clear();
eraseQueue();
}

void threadEntry()
{
try
void run()
{
while (!queue().stopped())
{
Expand Down Expand Up @@ -238,6 +246,15 @@ namespace cage
}
}
}
};

void threadEntry()
{
try
{
Runner runner;
runner.run();
}
catch (...)
{
// nothing
Expand All @@ -247,7 +264,10 @@ namespace cage
~Dispatcher()
{
queue().terminate();
thread.clear();
}

Holder<Thread> thread = newThread(Delegate<void()>().bind<Dispatcher, &Dispatcher::threadEntry>(this), "profiling dispatcher");
} dispatcher;
}

Expand All @@ -258,7 +278,7 @@ namespace cage
QueueItem qi;
qi.name = currentThreadName();
qi.threadId = currentThreadId();
qi.startTime = qi.endTime = m;
qi.startTime = qi.endTime = ThreadNameSpecifier;
queue().push(qi);
}
catch (...)
Expand All @@ -281,17 +301,17 @@ namespace cage
{
if (ev.startTime == m)
return;
if (!confEnabled)
return;
QueueItem qi;
qi.name = ev.name;
qi.category = ev.category;
qi.startTime = ev.startTime;
qi.endTime = timestamp();
qi.threadId = currentThreadId();
qi.framing = ev.framing;
try
{
if (!confEnabled)
return;
QueueItem qi;
qi.name = ev.name;
qi.category = ev.category;
qi.startTime = ev.startTime;
qi.endTime = timestamp();
qi.threadId = currentThreadId();
qi.framing = ev.framing;
queue().push(qi);
}
catch (...)
Expand Down

0 comments on commit f161d55

Please sign in to comment.