Skip to content

Commit

Permalink
agents: refactor out some otlp methods
Browse files Browse the repository at this point in the history
On preparation of the incoming `GRPCAgent` that's going to be using
them.
  • Loading branch information
santigimeno committed Jul 3, 2024
1 parent 8f25e7e commit 184caa9
Show file tree
Hide file tree
Showing 7 changed files with 412 additions and 263 deletions.
94 changes: 13 additions & 81 deletions agents/otlp/src/otlp_agent.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// Refs: https://github.com/open-telemetry/opentelemetry-cpp/blob/32cd66b62333e84aa8e92a4447e0aa667b6735e5/examples/otlp/README.md#additional-notes-regarding-abseil-library
#include "opentelemetry/exporters/otlp/otlp_grpc_exporter.h"
#include "otlp_agent.h"
#include "otlp_common.h"
#include "nsolid/nsolid_api.h"
#include "env-inl.h"
#include "debug_utils-inl.h"
Expand All @@ -21,7 +22,6 @@
using ThreadMetricsStor = node::nsolid::ThreadMetrics::MetricsStor;
using nlohmann::json;

namespace nostd = OPENTELEMETRY_NAMESPACE::nostd;
namespace sdk = OPENTELEMETRY_NAMESPACE::sdk;
namespace exporter = OPENTELEMETRY_NAMESPACE::exporter;
namespace ext = OPENTELEMETRY_NAMESPACE::ext;
Expand Down Expand Up @@ -84,8 +84,7 @@ OTLPAgent* OTLPAgent::Inst() {
OTLPAgent::OTLPAgent(): ready_(false),
hooks_init_(false),
trace_flags_(0),
otlp_http_exporter_(nullptr),
resource_(create_resource()),
otlp_exporter_(nullptr),
metrics_interval_(0),
proc_metrics_(),
proc_prev_stor_(),
Expand All @@ -95,8 +94,6 @@ OTLPAgent::OTLPAgent(): ready_(false),
ASSERT_EQ(0, uv_mutex_init(&start_lock_));
ASSERT_EQ(0, exit_lock_.init(true));
is_running_ = true;
scope_ = instrumentationscope::InstrumentationScope::Create(
"nsolid", NODE_VERSION "+ns" NSOLID_VERSION);
}


Expand Down Expand Up @@ -176,7 +173,7 @@ int OTLPAgent::config(const nlohmann::json& config) {
if (trace_flags_ == 0 ||
utils::find_any_fields_in_diff(diff, tracing_fields)) {
trace_flags_ = 0;
if (otlp_http_exporter_ != nullptr) {
if (otlp_exporter_ != nullptr) {
auto it = config_.find("tracingEnabled");
if (it != config_.end()) {
bool tracing_enabled = *it;
Expand Down Expand Up @@ -375,60 +372,12 @@ void OTLPAgent::span_msg_cb_(nsuv::ns_async*, OTLPAgent* agent) {
std::vector<std::unique_ptr<sdk::trace::Recordable>> recordables;
Tracer::SpanStor s;
while (agent->span_msg_q_.dequeue(s)) {
auto recordable = agent->otlp_http_exporter_->MakeRecordable();
recordable->SetName(s.name);
time_point start{
duration_cast<time_point::duration>(
milliseconds(static_cast<uint64_t>(s.start)))};
recordable->SetStartTime(start);
recordable->SetDuration(
nanoseconds(static_cast<uint64_t>((s.end - s.start) * 1e6)));

uint8_t span_buf[kSpanIdSize / 2];
detail::HexToBinary(s.span_id, span_buf, sizeof(span_buf));

uint8_t parent_buf[kSpanIdSize / 2];
detail::HexToBinary(s.parent_id, parent_buf, sizeof(parent_buf));

uint8_t trace_buf[kTraceIdSize / 2];
detail::HexToBinary(s.trace_id, trace_buf, sizeof(trace_buf));

trace::SpanContext ctx(trace::TraceId(trace_buf),
trace::SpanId(span_buf),
trace::TraceFlags(0),
false);

trace::SpanId parent_id(parent_buf);

recordable->SetIdentity(ctx, parent_id);

recordable->SetSpanKind(static_cast<trace::SpanKind>(s.kind));

json attrs = json::parse(s.attrs);
ASSERT(!attrs.is_discarded());
for (const auto& attr : attrs.items()) {
const json val = attr.value();
if (val.is_boolean())
recordable->SetAttribute(attr.key(), attr.value().get<bool>());
else if (val.is_number_integer())
recordable->SetAttribute(attr.key(), attr.value().get<int64_t>());
else if (val.is_number_unsigned())
recordable->SetAttribute(attr.key(), attr.value().get<uint64_t>());
else if (val.is_number_float())
recordable->SetAttribute(attr.key(), attr.value().get<double>());
else if (val.is_string())
recordable->SetAttribute(attr.key(), attr.value().get<std::string>());
}

recordable->SetAttribute("thread.id", s.thread_id);

recordable->SetResource(agent->resource_);

auto recordable = agent->otlp_exporter_->MakeRecordable();
fill_recordable(recordable.get(), s);
recordables.push_back(std::move(recordable));
}

nostd::span<std::unique_ptr<sdk::trace::Recordable>> batch(recordables);
auto result = agent->otlp_http_exporter_->Export(batch);
auto result = agent->otlp_exporter_->Export(recordables);
Debug("# Spans Exported: %ld. Result: %d\n",
recordables.size(),
static_cast<int>(result));
Expand Down Expand Up @@ -581,7 +530,7 @@ void OTLPAgent::config_otlp_agent(const json& config) {
auto it = config.find("otlp");
if (it != config.end()) {
// Reset the otlp and metrics exporters and reconfigure endpoints.
otlp_http_exporter_.reset(nullptr);
otlp_exporter_.reset(nullptr);
metrics_exporter_.reset(nullptr);
std::string type = *it;
it = config.find("otlpConfig");
Expand All @@ -605,7 +554,8 @@ void OTLPAgent::config_otlp_endpoint(const json& config) {
setup_trace_otlp_exporter(opts);
}

metrics_exporter_.reset(new OTLPMetrics(&loop_, *this));
metrics_exporter_.reset(
new OTLPMetrics(&loop_, *GetResource(), GetScope()));
return;
}

Expand All @@ -628,26 +578,8 @@ void OTLPAgent::config_otlp_endpoint(const json& config) {
setup_trace_grpc_otlp_exporter(opts);
}

metrics_exporter_.reset(new OTLPMetrics(&loop_, url, "", is_http, *this));
}

resource::Resource OTLPAgent::create_resource() const {
json config = json::parse(nsolid::GetConfig(), nullptr, false);
// assert because the runtime should never send me an invalid JSON config
ASSERT(!config.is_discarded());
auto it = config.find("app");
ASSERT(it != config.end());
resource::ResourceAttributes attrs({
{kServiceName, it->get<std::string>()},
{kServiceInstanceId, nsolid::GetAgentId()}
});

it = config.find("appVersion");
if (it != config.end()) {
attrs.SetAttribute(kServiceVersion, it->get<std::string>());
}

return resource::Resource::Create(attrs);
metrics_exporter_.reset(
new OTLPMetrics(&loop_, url, "", is_http, *GetResource(), GetScope()));
}


Expand All @@ -666,13 +598,13 @@ void OTLPAgent::setup_trace_otlp_exporter(
exporter::otlp::OtlpHttpExporterOptions& opts) {
opts.content_type = exporter::otlp::HttpRequestContentType::kBinary;
opts.console_debug = true;
otlp_http_exporter_.reset(new exporter::otlp::OtlpHttpExporter(opts));
otlp_exporter_.reset(new exporter::otlp::OtlpHttpExporter(opts));
}


void OTLPAgent::setup_trace_grpc_otlp_exporter(
exporter::otlp::OtlpGrpcExporterOptions& opts) {
otlp_http_exporter_.reset(new exporter::otlp::OtlpGrpcExporter(opts));
otlp_exporter_.reset(new exporter::otlp::OtlpGrpcExporter(opts));
}


Expand Down
13 changes: 2 additions & 11 deletions agents/otlp/src/otlp_agent.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

#include "asserts-cpp/asserts.h"
#include "metrics_exporter.h"
#include "opentelemetry/sdk/resource/resource.h"
#include "opentelemetry/version.h"


// Class pre-declaration
Expand All @@ -20,9 +20,6 @@ struct OtlpGrpcExporterOptions;
}
}
namespace sdk {
namespace instrumentationscope {
class InstrumentationScope;
}
namespace trace {
class SpanExporter;
}
Expand Down Expand Up @@ -100,8 +97,6 @@ class OTLPAgent {

void config_otlp_endpoint(const nlohmann::json& config);

OPENTELEMETRY_NAMESPACE::sdk::resource::Resource create_resource() const;

void got_proc_metrics();

void setup_trace_otlp_exporter( // NOLINTNEXTLINE(runtime/references)
Expand Down Expand Up @@ -135,11 +130,7 @@ class OTLPAgent {
uint32_t trace_flags_;

std::unique_ptr<OPENTELEMETRY_NAMESPACE::sdk::trace::SpanExporter>
otlp_http_exporter_;
OPENTELEMETRY_NAMESPACE::sdk::resource::Resource resource_;
std::unique_ptr
<OPENTELEMETRY_NAMESPACE::sdk::instrumentationscope::InstrumentationScope>
scope_;
otlp_exporter_;

// For the Metrics API
uint64_t metrics_interval_;
Expand Down
Loading

0 comments on commit 184caa9

Please sign in to comment.