Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for multiple processors #692

Merged
merged 24 commits into from
Apr 29, 2021
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ add_subdirectory(simple)
add_subdirectory(batch)
add_subdirectory(metrics_simple)
add_subdirectory(multithreaded)
add_subdirectory(multi_processor)
add_subdirectory(http)
6 changes: 4 additions & 2 deletions examples/http/tracer_common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
#include "opentelemetry/sdk/trace/tracer_provider.h"
#include "opentelemetry/trace/provider.h"

#include <iostream>
#include<vector>
lalitb marked this conversation as resolved.
Show resolved Hide resolved

namespace {

Expand All @@ -13,8 +13,10 @@ void initTracer() {
new opentelemetry::exporter::trace::OStreamSpanExporter);
auto processor = std::unique_ptr<sdktrace::SpanProcessor>(
new sdktrace::SimpleSpanProcessor(std::move(exporter)));
std::vector<std::unique_ptr<sdktrace::SpanProcessor>> processors;
processors.push_back(std::move(processor));
// Default is an always-on sampler.
auto context = std::make_shared<sdktrace::TracerContext>(std::move(processor));
auto context = std::make_shared<sdktrace::TracerContext>(std::move(processors));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should there be a convenience constructor that just takes on processor?

Alternatively, a static helper method to make TracerProvider from a processor?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider taking a look at the other language clients that reached 1.0 stable and see what's the best practice.
One more example from Python https://opentelemetry-python.readthedocs.io/en/latest/getting-started.html.

Copy link
Member Author

@lalitb lalitb Apr 24, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This specific example is using TracerContext. This is how it looks like if we directly use TracerProvider to add processors:
( which would be what application developer be doing most of the times ):

  auto provider = nostd::shared_ptr<opentelemetry::trace::TracerProvider>(std:move(processor1), resource, sampler, id_generator);
  provider.AddProcessor(std::move(processor2));
  provider.AddProcessor(std::move(processor3));

  opentelemetry::trace::Provider::SetTracerProvider(provider);

Do we want this for TracerContext too ?

Copy link
Member Author

@lalitb lalitb Apr 26, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have modified the example NOT to use TracerContext, instead directly use TracerProvider to pass processor(s) as an argument to constructor. TracerProvider constructor takes either of single processor, or vector of processors as an argument as shown in example in above comment ( and in PR description ).

auto provider = nostd::shared_ptr<opentelemetry::trace::TracerProvider>(
new sdktrace::TracerProvider(context));
// Set the global trace provider
Expand Down
26 changes: 26 additions & 0 deletions examples/multi_processor/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
cc_library(
name = "foo_multi_library",
srcs = [
"foo_library/foo_library.cc",
],
hdrs = [
"foo_library/foo_library.h",
],
deps = [
"//api",
],
)

cc_binary(
name = "example_multi_processor",
srcs = [
"main.cc",
],
deps = [
":foo_multi_library",
"//api",
"//exporters/memory:in_memory_span_exporter",
"//exporters/ostream:ostream_span_exporter",
"//sdk/src/trace",
],
)
10 changes: 10 additions & 0 deletions examples/multi_processor/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
include_directories(${CMAKE_SOURCE_DIR}/exporters/ostream/include
${CMAKE_SOURCE_DIR}/exporters/memory/include)

add_library(foo_multi_library foo_library/foo_library.cc)
target_link_libraries(foo_multi_library opentelemetry_exporter_ostream_span
${CMAKE_THREAD_LIBS_INIT} opentelemetry_api)

add_executable(example_multi_processor main.cc)
target_link_libraries(example_multi_processor ${CMAKE_THREAD_LIBS_INIT}
foo_multi_library opentelemetry_trace)
12 changes: 12 additions & 0 deletions examples/multi_processor/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@

# Simple Trace Example
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove empty first line and change the title to indicate this is for multiple processors?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, have changed the title, and the description too.


In this example, the application in `main.cc` initializes and registers a tracer
provider from the [OpenTelemetry
SDK](https://github.com/open-telemetry/opentelemetry-cpp). The application then
calls a `foo_library` which has been instrumented using the [OpenTelemetry
API](https://github.com/open-telemetry/opentelemetry-cpp/tree/main/api).
Resulting telemetry is directed to stdout through the StdoutSpanExporter.

See [CONTRIBUTING.md](../../CONTRIBUTING.md) for instructions on building and
running the example.
42 changes: 42 additions & 0 deletions examples/multi_processor/foo_library/foo_library.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#include "opentelemetry/trace/provider.h"

namespace trace = opentelemetry::trace;
namespace nostd = opentelemetry::nostd;

namespace
{
nostd::shared_ptr<trace::Tracer> get_tracer()
{
auto provider = trace::Provider::GetTracerProvider();
return provider->GetTracer("foo_library");
}

void f1()
{
auto span = get_tracer()->StartSpan("f1");
auto scope = get_tracer()->WithActiveSpan(span);

span->End();
}

void f2()
{
auto span = get_tracer()->StartSpan("f2");
auto scope = get_tracer()->WithActiveSpan(span);

f1();
f1();

span->End();
}
} // namespace

void foo_library()
{
auto span = get_tracer()->StartSpan("library");
auto scope = get_tracer()->WithActiveSpan(span);

f2();

span->End();
}
3 changes: 3 additions & 0 deletions examples/multi_processor/foo_library/foo_library.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#pragma once

void foo_library();
85 changes: 85 additions & 0 deletions examples/multi_processor/main.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
#include "opentelemetry/sdk/trace/simple_processor.h"
#include "opentelemetry/sdk/trace/tracer_context.h"
#include "opentelemetry/sdk/trace/tracer_provider.h"
#include "opentelemetry/trace/provider.h"

// Using an exporter that simply dumps span data to stdout.
#include "foo_library/foo_library.h"
#include "opentelemetry/exporters/memory/in_memory_span_exporter.h"
#include "opentelemetry/exporters/ostream/span_exporter.h"

using opentelemetry::exporter::memory::InMemorySpanExporter;

InMemorySpanExporter *memory_span_exporter;

namespace
{
void initTracer()
{
auto exporter1 = std::unique_ptr<sdktrace::SpanExporter>(
new opentelemetry::exporter::trace::OStreamSpanExporter);
auto processor1 = std::unique_ptr<sdktrace::SpanProcessor>(
new sdktrace::SimpleSpanProcessor(std::move(exporter1)));

auto exporter2 = std::unique_ptr<sdktrace::SpanExporter>(new InMemorySpanExporter());

// fetch the exporter for dumping data later
memory_span_exporter = dynamic_cast<InMemorySpanExporter *>(exporter2.get());
lalitb marked this conversation as resolved.
Show resolved Hide resolved

auto processor2 = std::unique_ptr<sdktrace::SpanProcessor>(
new sdktrace::SimpleSpanProcessor(std::move(exporter2)));

std::vector<std::unique_ptr<sdktrace::SpanProcessor>> processors;
processors.push_back(std::move(processor1));
processors.push_back(std::move(processor2));
// Default is an always-on sampler.
auto context = std::make_shared<sdktrace::TracerContext>(std::move(processors));

auto provider = nostd::shared_ptr<opentelemetry::trace::TracerProvider>(
new sdktrace::TracerProvider(context));

// Set the global trace provider
opentelemetry::trace::Provider::SetTracerProvider(provider);
}

void dumpSpans(std::vector<std::unique_ptr<opentelemetry::sdk::trace::SpanData>> &spans)
{
char span_buf[opentelemetry::trace::SpanId::kSize * 2];
char trace_buf[opentelemetry::trace::TraceId::kSize * 2];
char parent_span_buf[opentelemetry::trace::SpanId::kSize * 2];
std::cout << "\nSpans from memory :" << std::endl;

for (auto &span : spans)
{
std::cout << "\n\tSpan: " << std::endl;
std::cout << "\t\tName: " << span->GetName() << std::endl;
span->GetSpanId().ToLowerBase16(span_buf);
span->GetTraceId().ToLowerBase16(trace_buf);
span->GetParentSpanId().ToLowerBase16(parent_span_buf);
std::cout << "\t\tTraceId: " << std::string(trace_buf, sizeof(trace_buf)) << std::endl;
std::cout << "\t\tSpanId: " << std::string(span_buf, sizeof(span_buf)) << std::endl;
std::cout << "\t\tParentSpanId: " << std::string(parent_span_buf, sizeof(parent_span_buf))
<< std::endl;

std::cout << "\t\tDescription: " << span->GetDescription() << std::endl;
std::cout << "\t\tSpan kind:"
<< static_cast<typename std::underlying_type<opentelemetry::trace::SpanKind>::type>(
span->GetSpanKind())
<< std::endl;
std::cout << "\t\tSpan Status: "
<< static_cast<typename std::underlying_type<opentelemetry::trace::StatusCode>::type>(
span->GetStatus())
<< std::endl;
}
}
} // namespace

int main()
{
// Removing this line will leave the default noop TracerProvider in place.
initTracer();

foo_library();
auto memory_spans = memory_span_exporter->GetData()->GetSpans();
dumpSpans(memory_spans);
}
4 changes: 3 additions & 1 deletion ext/test/w3c_tracecontext_test/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ void initTracer()
new opentelemetry::exporter::trace::OStreamSpanExporter);
auto processor = std::unique_ptr<sdktrace::SpanProcessor>(
new sdktrace::SimpleSpanProcessor(std::move(exporter)));
auto context = std::make_shared<sdktrace::TracerContext>(std::move(processor));
std::vector<std::unique_ptr<sdktrace::SpanProcessor>> processors;
processors.push_back(std::move(processor));
auto context = std::make_shared<sdktrace::TracerContext>(std::move(processors));
auto provider = nostd::shared_ptr<opentelemetry::trace::TracerProvider>(
new sdktrace::TracerProvider(context));
// Set the global trace provider
Expand Down
7 changes: 5 additions & 2 deletions ext/test/zpages/tracez_data_aggregator_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,11 @@ class TracezDataAggregatorTest : public ::testing::Test
{
std::shared_ptr<TracezSharedData> shared_data(new TracezSharedData());
auto resource = opentelemetry::sdk::resource::Resource::Create({});
auto context = std::make_shared<TracerContext>(
std::unique_ptr<SpanProcessor>(new TracezSpanProcessor(shared_data)), resource);
std::unique_ptr<SpanProcessor> processor(new TracezSpanProcessor(shared_data));
std::vector<std::unique_ptr<SpanProcessor>> processors;
processors.push_back(std::move(processor));

auto context = std::make_shared<TracerContext>(std::move(processors), resource);
tracer = std::shared_ptr<opentelemetry::trace::Tracer>(new Tracer(context));
tracez_data_aggregator = std::unique_ptr<TracezDataAggregator>(
new TracezDataAggregator(shared_data, milliseconds(10)));
Expand Down
11 changes: 7 additions & 4 deletions ext/test/zpages/tracez_processor_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -175,13 +175,16 @@ class TracezProcessor : public ::testing::Test
protected:
void SetUp() override
{
shared_data = std::shared_ptr<TracezSharedData>(new TracezSharedData());
processor = std::shared_ptr<TracezSpanProcessor>(new TracezSpanProcessor(shared_data));
shared_data = std::shared_ptr<TracezSharedData>(new TracezSharedData());
processor = std::shared_ptr<TracezSpanProcessor>(new TracezSpanProcessor(shared_data));
std::unique_ptr<SpanProcessor> processor2(new TracezSpanProcessor(shared_data));
std::vector<std::unique_ptr<SpanProcessor>> processors;
processors.push_back(std::move(processor2));
auto resource = opentelemetry::sdk::resource::Resource::Create({});

// Note: we make a *different* processor for the tracercontext. THis is because
// all the tests use shared data, and we want to make sure this works correctly.
auto context = std::make_shared<TracerContext>(
std::unique_ptr<SpanProcessor>(new TracezSpanProcessor(shared_data)), resource);
auto context = std::make_shared<TracerContext>(std::move(processors), resource);

tracer = std::shared_ptr<opentelemetry::trace::Tracer>(new Tracer(context));
auto spans = shared_data->GetSpanSnapshot();
Expand Down
141 changes: 141 additions & 0 deletions sdk/include/opentelemetry/sdk/trace/multi_recordable.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
#pragma once

#include "opentelemetry/common/timestamp.h"
#include "opentelemetry/sdk/trace/processor.h"
#include "opentelemetry/sdk/trace/recordable.h"
#include "opentelemetry/version.h"

#include <map>

OPENTELEMETRY_BEGIN_NAMESPACE
namespace sdk
{
namespace trace
{
namespace
{
std::size_t MakeKey(const SpanProcessor &processor)
{
return reinterpret_cast<std::size_t>(&processor);
}

} // namespace

class MultiRecordable : public Recordable
{
public:
void AddRecordable(const SpanProcessor &processor,
std::unique_ptr<Recordable> recordable) noexcept
{
recordables_[MakeKey(processor)] = std::move(recordable);
}

const std::unique_ptr<Recordable> &GetRecordable(const SpanProcessor &processor) const noexcept
{
// TODO - return nullptr ref on failed lookup?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: remove the TODO

static std::unique_ptr<Recordable> empty(nullptr);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this empty could be moved to the line right above return empty? We don't need initialize this value if recordable is found.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or we could make the empty variable as class static as it is also useful in ReleaseRecordable?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this empty could be moved to the line right above return empty? We don't need initialize this value if recordable is found.

done

Or we could make the empty variable as class static as it is also useful in ReleaseRecordable?

That won't help as ReleaseRecordable needs to return unique_ptr, not it's a reference so class static can't be used. But control would never reach both places as we don't support removing processors once configured.

auto i = recordables_.find(MakeKey(processor));
if (i != recordables_.end())
{
return i->second;
}
return empty;
}

std::unique_ptr<Recordable> ReleaseRecordable(const SpanProcessor &processor) noexcept
{
auto i = recordables_.find(MakeKey(processor));
if (i != recordables_.end())
{
std::unique_ptr<Recordable> result(i->second.release());
recordables_.erase(MakeKey(processor));
return result;
}
return std::unique_ptr<Recordable>(nullptr);
}

void SetIdentity(const opentelemetry::trace::SpanContext &span_context,
opentelemetry::trace::SpanId parent_span_id) noexcept override
{
for (auto &recordable : recordables_)
{
recordable.second->SetIdentity(span_context, parent_span_id);
}
}

virtual void SetAttribute(nostd::string_view key,
const opentelemetry::common::AttributeValue &value) noexcept override
{
for (auto &recordable : recordables_)
{
recordable.second->SetAttribute(key, value);
}
}

virtual void AddEvent(nostd::string_view name,
opentelemetry::common::SystemTimestamp timestamp,
const opentelemetry::common::KeyValueIterable &attributes) noexcept override
{

for (auto &recordable : recordables_)
{
recordable.second->AddEvent(name, timestamp, attributes);
}
}

virtual void AddLink(const opentelemetry::trace::SpanContext &span_context,
const opentelemetry::common::KeyValueIterable &attributes) noexcept override
{
for (auto &recordable : recordables_)
{
recordable.second->AddLink(span_context, attributes);
}
}

virtual void SetStatus(opentelemetry::trace::StatusCode code,
nostd::string_view description) noexcept override
{
for (auto &recordable : recordables_)
{
recordable.second->SetStatus(code, description);
}
}

virtual void SetName(nostd::string_view name) noexcept override
{
for (auto &recordable : recordables_)
{
recordable.second->SetName(name);
}
}

virtual void SetSpanKind(opentelemetry::trace::SpanKind span_kind) noexcept override
{
for (auto &recordable : recordables_)
{
recordable.second->SetSpanKind(span_kind);
}
}

virtual void SetStartTime(opentelemetry::common::SystemTimestamp start_time) noexcept override
{
for (auto &recordable : recordables_)
{
recordable.second->SetStartTime(start_time);
}
}

virtual void SetDuration(std::chrono::nanoseconds duration) noexcept override
{
for (auto &recordable : recordables_)
{
recordable.second->SetDuration(duration);
}
}

private:
std::map<std::size_t, std::unique_ptr<Recordable>> recordables_;
};
} // namespace trace
} // namespace sdk
OPENTELEMETRY_END_NAMESPACE
Loading