Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions agents/otlp/src/otlp_agent.cc
Original file line number Diff line number Diff line change
Expand Up @@ -403,12 +403,9 @@ void OTLPAgent::span_msg_cb_(nsuv::ns_async*, OTLPAgent* agent) {

agent->got_proc_metrics();
for (auto& item : agent->env_metrics_map_) {
int r = item.second.metrics_.Update(thr_metrics_cb_, agent);
// The UV_ESRCH and UV_EBADF errors can happen during the env shutdown
// process. Leaving this assertion for the moment in case another error is
// returned at some point.
// TODO(santi): Remove the assertion
ASSERT(r == 0 || r == UV_ESRCH || r == UV_EBADF);
// Retrieve metrics from the Metrics API. Ignore any return error since
// there's nothing to be done.
item.second.metrics_.Update(thr_metrics_cb_, agent);
}
}

Expand Down
11 changes: 3 additions & 8 deletions agents/statsd/src/statsd_agent.cc
Original file line number Diff line number Diff line change
Expand Up @@ -731,14 +731,9 @@ void StatsDAgent::metrics_timer_cb_(nsuv::ns_timer*, StatsDAgent* agent) {
it != agent->env_metrics_map_.end();
++it) {
ThreadMetrics& e_metrics = std::get<1>(*it);
// Retrieve metrics from the Metrics API.
int r = e_metrics.Update(env_metrics_cb_, agent);
// The UV_ESRCH and UV_EBADF errors can happen during the env shutdown
// process.
// Leaving this assertion for the moment in case another error is returned
// at some point.
// TODO(santi): Remove the assertion.
ASSERT(r == 0 || r == UV_ESRCH || r == UV_EBADF);
// Retrieve metrics from the Metrics API. Ignore any return error since
// there's nothing to be done.
e_metrics.Update(env_metrics_cb_, agent);
}

// Get and send proc metrics
Expand Down
9 changes: 2 additions & 7 deletions agents/zmq/src/zmq_agent.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1638,14 +1638,9 @@ void ZmqAgent::metrics_timer_cb(nsuv::ns_timer*, ZmqAgent* agent) {
EnvMetricsStor& stor = std::get<1>(*it);
// Reset fetching flag.
stor.fetching = false;
// Retrieve metrics from the Metrics API.
// Retrieve metrics from the Metrics API. Ignore any return error since
// there's nothing to be done.
int r = stor.t_metrics.Update(env_metrics_cb, agent);
// The UV_ESRCH and UV_EBADF errors can happen during the env shutdown
// process.
// Leaving this assertion for the moment in case another error is returned
// at some point.
// TODO(santi): Remove the assertion.
ASSERT(r == 0 || r == UV_ESRCH || r == UV_EBADF);
if (r == 0)
stor.fetching = true;
}
Expand Down
5 changes: 5 additions & 0 deletions src/nsolid.cc
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,11 @@ int ThreadMetrics::Update(v8::Isolate* isolate) {
if (envinst == nullptr || envinst->thread_id() != thread_id_) {
return UV_ESRCH;
}
// An async update request is currently in process. Let that complete before
// running Update() again.
if (update_running_) {
return UV_EBUSY;
}

uv_mutex_lock(&stor_lock_);
envinst->GetThreadMetrics(&stor_);
Expand Down
22 changes: 17 additions & 5 deletions src/nsolid.h
Original file line number Diff line number Diff line change
Expand Up @@ -693,6 +693,7 @@ class NODE_EXTERN ThreadMetrics {
void* user_data_ = nullptr;
thread_metrics_proxy_sig proxy_;

std::atomic<bool> update_running_ = {false};
uv_mutex_t stor_lock_;
MetricsStor stor_;
};
Expand Down Expand Up @@ -976,22 +977,32 @@ class NODE_EXTERN Snapshot {
/** @cond DONT_DOCUMENT */
template <typename Cb, typename... Data>
int ThreadMetrics::Update(Cb&& cb, Data&&... data) {
bool expected = false;
// NOLINTNEXTLINE(build/namespaces)
using namespace std::placeholders;
using UserData = decltype(std::bind(
std::forward<Cb>(cb), _1, std::forward<Data>(data)...));

update_running_.compare_exchange_strong(expected, true);
if (expected) {
return UV_EBUSY;
}

// _1 - ThreadMetrics*
std::unique_ptr<UserData> user_data = std::make_unique<UserData>(std::bind(
std::forward<Cb>(cb), _1, std::forward<Data>(data)...));
UserData* user_data = new UserData(
std::bind(std::forward<Cb>(cb), _1, std::forward<Data>(data)...));

user_data_ = static_cast<void*>(user_data.get());
user_data_ = user_data;
proxy_ = thread_metrics_proxy_<UserData>;
stor_.thread_id = thread_id_;

int er = get_thread_metrics_();
if (!er)
user_data.release();
if (er) {
user_data_ = nullptr;
proxy_ = nullptr;
delete user_data;
update_running_ = false;
}
return er;
}

Expand All @@ -1001,6 +1012,7 @@ void ThreadMetrics::thread_metrics_proxy_(ThreadMetrics* tm) {
G* g = static_cast<G*>(tm->user_data_);
tm->user_data_ = nullptr;
tm->proxy_ = nullptr;
tm->update_running_ = false;
(*g)(tm);
delete g;
}
Expand Down