From 41744071d6f9a95147b59d4e68b7e6131f2aa4ef Mon Sep 17 00:00:00 2001 From: owent Date: Wed, 23 Mar 2022 12:32:42 +0800 Subject: [PATCH] Finish async Export for `OtlpHttpClient`, `OtlpHttpExporter` and `OtlpHttpLogExporter`. Add tests for both sync and async exporting. Signed-off-by: owent --- exporters/otlp/src/otlp_http_exporter.cc | 2 +- exporters/otlp/src/otlp_http_log_exporter.cc | 2 +- .../otlp/test/otlp_http_exporter_test.cc | 363 +++++++++------- .../otlp/test/otlp_http_log_exporter_test.cc | 401 ++++++++++-------- .../sdk/logs/batch_log_processor.h | 37 ++ sdk/src/logs/batch_log_processor.cc | 16 + 6 files changed, 489 insertions(+), 332 deletions(-) diff --git a/exporters/otlp/src/otlp_http_exporter.cc b/exporters/otlp/src/otlp_http_exporter.cc index b302e6ec34..3f38443adc 100644 --- a/exporters/otlp/src/otlp_http_exporter.cc +++ b/exporters/otlp/src/otlp_http_exporter.cc @@ -70,7 +70,7 @@ void OtlpHttpExporter::Export( proto::collector::trace::v1::ExportTraceServiceRequest service_request; OtlpRecordableUtils::PopulateRequest(spans, &service_request); - http_client_->Export(service_request, result_callback); + http_client_->Export(service_request, std::move(result_callback)); } bool OtlpHttpExporter::Shutdown(std::chrono::microseconds timeout) noexcept diff --git a/exporters/otlp/src/otlp_http_log_exporter.cc b/exporters/otlp/src/otlp_http_log_exporter.cc index de89b20b3d..51fb7a96a5 100644 --- a/exporters/otlp/src/otlp_http_log_exporter.cc +++ b/exporters/otlp/src/otlp_http_log_exporter.cc @@ -70,7 +70,7 @@ void OtlpHttpLogExporter::Export( } proto::collector::logs::v1::ExportLogsServiceRequest service_request; OtlpRecordableUtils::PopulateRequest(logs, &service_request); - http_client_->Export(service_request, result_callback); + http_client_->Export(service_request, std::move(result_callback)); } bool OtlpHttpLogExporter::Shutdown(std::chrono::microseconds timeout) noexcept diff --git a/exporters/otlp/test/otlp_http_exporter_test.cc b/exporters/otlp/test/otlp_http_exporter_test.cc index 12c7494b2e..8f38685963 100644 --- a/exporters/otlp/test/otlp_http_exporter_test.cc +++ b/exporters/otlp/test/otlp_http_exporter_test.cc @@ -3,6 +3,9 @@ #ifndef HAVE_CPP_STDLIB +# include +# include + # include "opentelemetry/exporters/otlp/otlp_http_exporter.h" # include "opentelemetry/exporters/otlp/protobuf_include_prefix.h" @@ -81,170 +84,218 @@ class OtlpHttpExporterTestPeer : public ::testing::Test auto http_client = http_client::HttpClientFactory::CreateNoSend(); return {new OtlpHttpClient(MakeOtlpHttpClientOptions(content_type), http_client), http_client}; } + + void ExportJsonIntegrationTest(bool is_async) + { + auto mock_otlp_client = + OtlpHttpExporterTestPeer::GetMockOtlpHttpClient(HttpRequestContentType::kJson); + auto mock_otlp_http_client = mock_otlp_client.first; + auto client = mock_otlp_client.second; + auto exporter = GetExporter(std::unique_ptr{mock_otlp_http_client}); + + resource::ResourceAttributes resource_attributes = {{"service.name", "unit_test_service"}, + {"tenant.id", "test_user"}}; + resource_attributes["bool_value"] = true; + resource_attributes["int32_value"] = static_cast(1); + resource_attributes["uint32_value"] = static_cast(2); + resource_attributes["int64_value"] = static_cast(0x1100000000LL); + resource_attributes["uint64_value"] = static_cast(0x1200000000ULL); + resource_attributes["double_value"] = static_cast(3.1); + resource_attributes["vec_bool_value"] = std::vector{true, false, true}; + resource_attributes["vec_int32_value"] = std::vector{1, 2}; + resource_attributes["vec_uint32_value"] = std::vector{3, 4}; + resource_attributes["vec_int64_value"] = std::vector{5, 6}; + resource_attributes["vec_uint64_value"] = std::vector{7, 8}; + resource_attributes["vec_double_value"] = std::vector{3.2, 3.3}; + resource_attributes["vec_string_value"] = std::vector{"vector", "string"}; + auto resource = resource::Resource::Create(resource_attributes); + + auto processor_opts = sdk::trace::BatchSpanProcessorOptions(); + processor_opts.max_export_batch_size = 5; + processor_opts.max_queue_size = 5; + processor_opts.schedule_delay_millis = std::chrono::milliseconds(256); + processor_opts.is_export_async = is_async; + auto processor = std::unique_ptr( + new sdk::trace::BatchSpanProcessor(std::move(exporter), processor_opts)); + auto provider = nostd::shared_ptr( + new sdk::trace::TracerProvider(std::move(processor), resource)); + + std::string report_trace_id; + + char trace_id_hex[2 * trace_api::TraceId::kSize] = {0}; + auto tracer = provider->GetTracer("test"); + auto parent_span = tracer->StartSpan("Test parent span"); + + trace_api::StartSpanOptions child_span_opts = {}; + child_span_opts.parent = parent_span->GetContext(); + + auto child_span = tracer->StartSpan("Test child span", child_span_opts); + + nostd::get(child_span_opts.parent) + .trace_id() + .ToLowerBase16(MakeSpan(trace_id_hex)); + report_trace_id.assign(trace_id_hex, sizeof(trace_id_hex)); + + auto no_send_client = std::static_pointer_cast(client); + auto mock_session = + std::static_pointer_cast(no_send_client->session_); + EXPECT_CALL(*mock_session, SendRequest) + .WillOnce([&mock_session, report_trace_id, is_async]( + std::shared_ptr callback) { + auto check_json = + nlohmann::json::parse(mock_session->GetRequest()->body_, nullptr, false); + auto resource_span = *check_json["resource_spans"].begin(); + auto instrumentation_library_span = + *resource_span["instrumentation_library_spans"].begin(); + auto span = *instrumentation_library_span["spans"].begin(); + auto received_trace_id = span["trace_id"].get(); + EXPECT_EQ(received_trace_id, report_trace_id); + + auto custom_header = mock_session->GetRequest()->headers_.find("Custom-Header-Key"); + ASSERT_TRUE(custom_header != mock_session->GetRequest()->headers_.end()); + if (custom_header != mock_session->GetRequest()->headers_.end()) + { + EXPECT_EQ("Custom-Header-Value", custom_header->second); + } + + // let the otlp_http_client to continue + if (is_async) + { + std::thread async_finish{[callback]() { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + http_client::nosend::Response response; + response.Finish(*callback.get()); + }}; + async_finish.detach(); + } + else + { + http_client::nosend::Response response; + response.Finish(*callback.get()); + } + }); + + child_span->End(); + parent_span->End(); + + static_cast(provider.get())->ForceFlush(); + } + + void ExportBinaryIntegrationTest(bool is_async) + { + auto mock_otlp_client = + OtlpHttpExporterTestPeer::GetMockOtlpHttpClient(HttpRequestContentType::kBinary); + auto mock_otlp_http_client = mock_otlp_client.first; + auto client = mock_otlp_client.second; + auto exporter = GetExporter(std::unique_ptr{mock_otlp_http_client}); + + resource::ResourceAttributes resource_attributes = {{"service.name", "unit_test_service"}, + {"tenant.id", "test_user"}}; + resource_attributes["bool_value"] = true; + resource_attributes["int32_value"] = static_cast(1); + resource_attributes["uint32_value"] = static_cast(2); + resource_attributes["int64_value"] = static_cast(0x1100000000LL); + resource_attributes["uint64_value"] = static_cast(0x1200000000ULL); + resource_attributes["double_value"] = static_cast(3.1); + resource_attributes["vec_bool_value"] = std::vector{true, false, true}; + resource_attributes["vec_int32_value"] = std::vector{1, 2}; + resource_attributes["vec_uint32_value"] = std::vector{3, 4}; + resource_attributes["vec_int64_value"] = std::vector{5, 6}; + resource_attributes["vec_uint64_value"] = std::vector{7, 8}; + resource_attributes["vec_double_value"] = std::vector{3.2, 3.3}; + resource_attributes["vec_string_value"] = std::vector{"vector", "string"}; + auto resource = resource::Resource::Create(resource_attributes); + + auto processor_opts = sdk::trace::BatchSpanProcessorOptions(); + processor_opts.max_export_batch_size = 5; + processor_opts.max_queue_size = 5; + processor_opts.schedule_delay_millis = std::chrono::milliseconds(256); + processor_opts.is_export_async = is_async; + + auto processor = std::unique_ptr( + new sdk::trace::BatchSpanProcessor(std::move(exporter), processor_opts)); + auto provider = nostd::shared_ptr( + new sdk::trace::TracerProvider(std::move(processor), resource)); + + std::string report_trace_id; + + uint8_t trace_id_binary[trace_api::TraceId::kSize] = {0}; + auto tracer = provider->GetTracer("test"); + auto parent_span = tracer->StartSpan("Test parent span"); + + trace_api::StartSpanOptions child_span_opts = {}; + child_span_opts.parent = parent_span->GetContext(); + + auto child_span = tracer->StartSpan("Test child span", child_span_opts); + nostd::get(child_span_opts.parent) + .trace_id() + .CopyBytesTo(MakeSpan(trace_id_binary)); + report_trace_id.assign(reinterpret_cast(trace_id_binary), sizeof(trace_id_binary)); + + auto no_send_client = std::static_pointer_cast(client); + auto mock_session = + std::static_pointer_cast(no_send_client->session_); + EXPECT_CALL(*mock_session, SendRequest) + .WillOnce([&mock_session, report_trace_id, is_async]( + std::shared_ptr callback) { + opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest request_body; + request_body.ParseFromArray(&mock_session->GetRequest()->body_[0], + static_cast(mock_session->GetRequest()->body_.size())); + auto received_trace_id = + request_body.resource_spans(0).instrumentation_library_spans(0).spans(0).trace_id(); + EXPECT_EQ(received_trace_id, report_trace_id); + + auto custom_header = mock_session->GetRequest()->headers_.find("Custom-Header-Key"); + ASSERT_TRUE(custom_header != mock_session->GetRequest()->headers_.end()); + if (custom_header != mock_session->GetRequest()->headers_.end()) + { + EXPECT_EQ("Custom-Header-Value", custom_header->second); + } + + // let the otlp_http_client to continue + if (is_async) + { + std::thread async_finish{[callback]() { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + http_client::nosend::Response response; + response.Finish(*callback.get()); + }}; + async_finish.detach(); + } + else + { + http_client::nosend::Response response; + response.Finish(*callback.get()); + } + }); + + child_span->End(); + parent_span->End(); + + static_cast(provider.get())->ForceFlush(); + } }; // Create spans, let processor call Export() -TEST_F(OtlpHttpExporterTestPeer, ExportJsonIntegrationTest) +TEST_F(OtlpHttpExporterTestPeer, ExportJsonIntegrationTestSync) { - auto mock_otlp_client = - OtlpHttpExporterTestPeer::GetMockOtlpHttpClient(HttpRequestContentType::kJson); - auto mock_otlp_http_client = mock_otlp_client.first; - auto client = mock_otlp_client.second; - auto exporter = GetExporter(std::unique_ptr{mock_otlp_http_client}); - - resource::ResourceAttributes resource_attributes = {{"service.name", "unit_test_service"}, - {"tenant.id", "test_user"}}; - resource_attributes["bool_value"] = true; - resource_attributes["int32_value"] = static_cast(1); - resource_attributes["uint32_value"] = static_cast(2); - resource_attributes["int64_value"] = static_cast(0x1100000000LL); - resource_attributes["uint64_value"] = static_cast(0x1200000000ULL); - resource_attributes["double_value"] = static_cast(3.1); - resource_attributes["vec_bool_value"] = std::vector{true, false, true}; - resource_attributes["vec_int32_value"] = std::vector{1, 2}; - resource_attributes["vec_uint32_value"] = std::vector{3, 4}; - resource_attributes["vec_int64_value"] = std::vector{5, 6}; - resource_attributes["vec_uint64_value"] = std::vector{7, 8}; - resource_attributes["vec_double_value"] = std::vector{3.2, 3.3}; - resource_attributes["vec_string_value"] = std::vector{"vector", "string"}; - auto resource = resource::Resource::Create(resource_attributes); - - auto processor_opts = sdk::trace::BatchSpanProcessorOptions(); - processor_opts.max_export_batch_size = 5; - processor_opts.max_queue_size = 5; - processor_opts.schedule_delay_millis = std::chrono::milliseconds(256); - auto processor = std::unique_ptr( - new sdk::trace::BatchSpanProcessor(std::move(exporter), processor_opts)); - auto provider = nostd::shared_ptr( - new sdk::trace::TracerProvider(std::move(processor), resource)); - - std::string report_trace_id; - - char trace_id_hex[2 * trace_api::TraceId::kSize] = {0}; - auto tracer = provider->GetTracer("test"); - auto parent_span = tracer->StartSpan("Test parent span"); - - trace_api::StartSpanOptions child_span_opts = {}; - child_span_opts.parent = parent_span->GetContext(); - - auto child_span = tracer->StartSpan("Test child span", child_span_opts); - - nostd::get(child_span_opts.parent) - .trace_id() - .ToLowerBase16(MakeSpan(trace_id_hex)); - report_trace_id.assign(trace_id_hex, sizeof(trace_id_hex)); - - auto no_send_client = std::static_pointer_cast(client); - auto mock_session = - std::static_pointer_cast(no_send_client->session_); - EXPECT_CALL(*mock_session, SendRequest) - .WillOnce([&mock_session, report_trace_id]( - std::shared_ptr callback) { - auto check_json = nlohmann::json::parse(mock_session->GetRequest()->body_, nullptr, false); - auto resource_span = *check_json["resource_spans"].begin(); - auto instrumentation_library_span = *resource_span["instrumentation_library_spans"].begin(); - auto span = *instrumentation_library_span["spans"].begin(); - auto received_trace_id = span["trace_id"].get(); - EXPECT_EQ(received_trace_id, report_trace_id); - - auto custom_header = mock_session->GetRequest()->headers_.find("Custom-Header-Key"); - ASSERT_TRUE(custom_header != mock_session->GetRequest()->headers_.end()); - if (custom_header != mock_session->GetRequest()->headers_.end()) - { - EXPECT_EQ("Custom-Header-Value", custom_header->second); - } - // let the otlp_http_client to continue - http_client::nosend::Response response; - callback->OnResponse(response); - }); - - child_span->End(); - parent_span->End(); - - static_cast(provider.get())->ForceFlush(); + ExportJsonIntegrationTest(false); +} + +TEST_F(OtlpHttpExporterTestPeer, ExportJsonIntegrationTestAsync) +{ + ExportJsonIntegrationTest(true); } // Create spans, let processor call Export() -TEST_F(OtlpHttpExporterTestPeer, ExportBinaryIntegrationTest) +TEST_F(OtlpHttpExporterTestPeer, ExportBinaryIntegrationTestSync) +{ + ExportBinaryIntegrationTest(false); +} + +TEST_F(OtlpHttpExporterTestPeer, ExportBinaryIntegrationTestAsync) { - auto mock_otlp_client = - OtlpHttpExporterTestPeer::GetMockOtlpHttpClient(HttpRequestContentType::kBinary); - auto mock_otlp_http_client = mock_otlp_client.first; - auto client = mock_otlp_client.second; - auto exporter = GetExporter(std::unique_ptr{mock_otlp_http_client}); - - resource::ResourceAttributes resource_attributes = {{"service.name", "unit_test_service"}, - {"tenant.id", "test_user"}}; - resource_attributes["bool_value"] = true; - resource_attributes["int32_value"] = static_cast(1); - resource_attributes["uint32_value"] = static_cast(2); - resource_attributes["int64_value"] = static_cast(0x1100000000LL); - resource_attributes["uint64_value"] = static_cast(0x1200000000ULL); - resource_attributes["double_value"] = static_cast(3.1); - resource_attributes["vec_bool_value"] = std::vector{true, false, true}; - resource_attributes["vec_int32_value"] = std::vector{1, 2}; - resource_attributes["vec_uint32_value"] = std::vector{3, 4}; - resource_attributes["vec_int64_value"] = std::vector{5, 6}; - resource_attributes["vec_uint64_value"] = std::vector{7, 8}; - resource_attributes["vec_double_value"] = std::vector{3.2, 3.3}; - resource_attributes["vec_string_value"] = std::vector{"vector", "string"}; - auto resource = resource::Resource::Create(resource_attributes); - - auto processor_opts = sdk::trace::BatchSpanProcessorOptions(); - processor_opts.max_export_batch_size = 5; - processor_opts.max_queue_size = 5; - processor_opts.schedule_delay_millis = std::chrono::milliseconds(256); - - auto processor = std::unique_ptr( - new sdk::trace::BatchSpanProcessor(std::move(exporter), processor_opts)); - auto provider = nostd::shared_ptr( - new sdk::trace::TracerProvider(std::move(processor), resource)); - - std::string report_trace_id; - - uint8_t trace_id_binary[trace_api::TraceId::kSize] = {0}; - auto tracer = provider->GetTracer("test"); - auto parent_span = tracer->StartSpan("Test parent span"); - - trace_api::StartSpanOptions child_span_opts = {}; - child_span_opts.parent = parent_span->GetContext(); - - auto child_span = tracer->StartSpan("Test child span", child_span_opts); - nostd::get(child_span_opts.parent) - .trace_id() - .CopyBytesTo(MakeSpan(trace_id_binary)); - report_trace_id.assign(reinterpret_cast(trace_id_binary), sizeof(trace_id_binary)); - - auto no_send_client = std::static_pointer_cast(client); - auto mock_session = - std::static_pointer_cast(no_send_client->session_); - EXPECT_CALL(*mock_session, SendRequest) - .WillOnce([&mock_session, report_trace_id]( - std::shared_ptr callback) { - opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest request_body; - request_body.ParseFromArray(&mock_session->GetRequest()->body_[0], - static_cast(mock_session->GetRequest()->body_.size())); - auto received_trace_id = - request_body.resource_spans(0).instrumentation_library_spans(0).spans(0).trace_id(); - EXPECT_EQ(received_trace_id, report_trace_id); - - auto custom_header = mock_session->GetRequest()->headers_.find("Custom-Header-Key"); - ASSERT_TRUE(custom_header != mock_session->GetRequest()->headers_.end()); - if (custom_header != mock_session->GetRequest()->headers_.end()) - { - EXPECT_EQ("Custom-Header-Value", custom_header->second); - } - - // let the otlp_http_client to continue - http_client::nosend::Response response; - - response.Finish(*callback.get()); - }); - - child_span->End(); - parent_span->End(); - - static_cast(provider.get())->ForceFlush(); + ExportBinaryIntegrationTest(true); } // Test exporter configuration options diff --git a/exporters/otlp/test/otlp_http_log_exporter_test.cc b/exporters/otlp/test/otlp_http_log_exporter_test.cc index e49824f0b7..366fea163c 100644 --- a/exporters/otlp/test/otlp_http_log_exporter_test.cc +++ b/exporters/otlp/test/otlp_http_log_exporter_test.cc @@ -82,188 +82,241 @@ class OtlpHttpLogExporterTestPeer : public ::testing::Test auto http_client = http_client::HttpClientFactory::CreateNoSend(); return {new OtlpHttpClient(MakeOtlpHttpClientOptions(content_type), http_client), http_client}; } + + void ExportJsonIntegrationTest(bool is_async) + { + auto mock_otlp_client = + OtlpHttpLogExporterTestPeer::GetMockOtlpHttpClient(HttpRequestContentType::kJson); + auto mock_otlp_http_client = mock_otlp_client.first; + auto client = mock_otlp_client.second; + auto exporter = GetExporter(std::unique_ptr{mock_otlp_http_client}); + + bool attribute_storage_bool_value[] = {true, false, true}; + int32_t attribute_storage_int32_value[] = {1, 2}; + uint32_t attribute_storage_uint32_value[] = {3, 4}; + int64_t attribute_storage_int64_value[] = {5, 6}; + uint64_t attribute_storage_uint64_value[] = {7, 8}; + double attribute_storage_double_value[] = {3.2, 3.3}; + std::string attribute_storage_string_value[] = {"vector", "string"}; + + auto provider = nostd::shared_ptr(new sdk::logs::LoggerProvider()); + provider->AddProcessor( + std::unique_ptr(new sdk::logs::BatchLogProcessor( + std::move(exporter), 5, std::chrono::milliseconds(256), 5, is_async))); + + std::string report_trace_id; + std::string report_span_id; + uint8_t trace_id_bin[opentelemetry::trace::TraceId::kSize] = { + '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'}; + char trace_id_hex[2 * opentelemetry::trace::TraceId::kSize] = {0}; + opentelemetry::trace::TraceId trace_id{trace_id_bin}; + uint8_t span_id_bin[opentelemetry::trace::SpanId::kSize] = {'7', '6', '5', '4', + '3', '2', '1', '0'}; + char span_id_hex[2 * opentelemetry::trace::SpanId::kSize] = {0}; + opentelemetry::trace::SpanId span_id{span_id_bin}; + + const std::string schema_url{"https://opentelemetry.io/schemas/1.2.0"}; + auto logger = provider->GetLogger("test", "", "opentelelemtry_library", "", schema_url); + + trace_id.ToLowerBase16(MakeSpan(trace_id_hex)); + report_trace_id.assign(trace_id_hex, sizeof(trace_id_hex)); + + span_id.ToLowerBase16(MakeSpan(span_id_hex)); + report_span_id.assign(span_id_hex, sizeof(span_id_hex)); + + auto no_send_client = std::static_pointer_cast(client); + auto mock_session = + std::static_pointer_cast(no_send_client->session_); + EXPECT_CALL(*mock_session, SendRequest) + .WillOnce([&mock_session, report_trace_id, report_span_id, is_async]( + std::shared_ptr callback) { + auto check_json = + nlohmann::json::parse(mock_session->GetRequest()->body_, nullptr, false); + auto resource_logs = *check_json["resource_logs"].begin(); + auto instrumentation_library_span = + *resource_logs["instrumentation_library_logs"].begin(); + auto log = *instrumentation_library_span["logs"].begin(); + auto received_trace_id = log["trace_id"].get(); + auto received_span_id = log["span_id"].get(); + EXPECT_EQ(received_trace_id, report_trace_id); + EXPECT_EQ(received_span_id, report_span_id); + EXPECT_EQ("Log name", log["name"].get()); + EXPECT_EQ("Log message", log["body"]["string_value"].get()); + EXPECT_LE(15, log["attributes"].size()); + auto custom_header = mock_session->GetRequest()->headers_.find("Custom-Header-Key"); + ASSERT_TRUE(custom_header != mock_session->GetRequest()->headers_.end()); + if (custom_header != mock_session->GetRequest()->headers_.end()) + { + EXPECT_EQ("Custom-Header-Value", custom_header->second); + } + + // let the otlp_http_client to continue + if (is_async) + { + std::thread async_finish{[callback]() { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + http_client::nosend::Response response; + response.Finish(*callback.get()); + }}; + async_finish.detach(); + } + else + { + http_client::nosend::Response response; + response.Finish(*callback.get()); + } + }); + + logger->Log(opentelemetry::logs::Severity::kInfo, "Log name", "Log message", + {{"service.name", "unit_test_service"}, + {"tenant.id", "test_user"}, + {"bool_value", true}, + {"int32_value", static_cast(1)}, + {"uint32_value", static_cast(2)}, + {"int64_value", static_cast(0x1100000000LL)}, + {"uint64_value", static_cast(0x1200000000ULL)}, + {"double_value", static_cast(3.1)}, + {"vec_bool_value", attribute_storage_bool_value}, + {"vec_int32_value", attribute_storage_int32_value}, + {"vec_uint32_value", attribute_storage_uint32_value}, + {"vec_int64_value", attribute_storage_int64_value}, + {"vec_uint64_value", attribute_storage_uint64_value}, + {"vec_double_value", attribute_storage_double_value}, + {"vec_string_value", attribute_storage_string_value}}, + trace_id, span_id, + opentelemetry::trace::TraceFlags{opentelemetry::trace::TraceFlags::kIsSampled}, + std::chrono::system_clock::now()); + + provider->ForceFlush(); + } + + void ExportBinaryIntegrationTest(bool is_async) + { + auto mock_otlp_client = + OtlpHttpLogExporterTestPeer::GetMockOtlpHttpClient(HttpRequestContentType::kBinary); + auto mock_otlp_http_client = mock_otlp_client.first; + auto client = mock_otlp_client.second; + auto exporter = GetExporter(std::unique_ptr{mock_otlp_http_client}); + + bool attribute_storage_bool_value[] = {true, false, true}; + int32_t attribute_storage_int32_value[] = {1, 2}; + uint32_t attribute_storage_uint32_value[] = {3, 4}; + int64_t attribute_storage_int64_value[] = {5, 6}; + uint64_t attribute_storage_uint64_value[] = {7, 8}; + double attribute_storage_double_value[] = {3.2, 3.3}; + std::string attribute_storage_string_value[] = {"vector", "string"}; + + sdk::logs::BatchLogProcessorOptions processor_options; + processor_options.max_export_batch_size = 5; + processor_options.max_queue_size = 5; + processor_options.schedule_delay_millis = std::chrono::milliseconds(256); + processor_options.is_export_async = is_async; + auto provider = nostd::shared_ptr(new sdk::logs::LoggerProvider()); + provider->AddProcessor(std::unique_ptr( + new sdk::logs::BatchLogProcessor(std::move(exporter), processor_options))); + + std::string report_trace_id; + std::string report_span_id; + uint8_t trace_id_bin[opentelemetry::trace::TraceId::kSize] = { + '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'}; + opentelemetry::trace::TraceId trace_id{trace_id_bin}; + uint8_t span_id_bin[opentelemetry::trace::SpanId::kSize] = {'7', '6', '5', '4', + '3', '2', '1', '0'}; + opentelemetry::trace::SpanId span_id{span_id_bin}; + + const std::string schema_url{"https://opentelemetry.io/schemas/1.2.0"}; + auto logger = provider->GetLogger("test", "", "opentelelemtry_library", "", schema_url); + + report_trace_id.assign(reinterpret_cast(trace_id_bin), sizeof(trace_id_bin)); + report_span_id.assign(reinterpret_cast(span_id_bin), sizeof(span_id_bin)); + + auto no_send_client = std::static_pointer_cast(client); + auto mock_session = + std::static_pointer_cast(no_send_client->session_); + EXPECT_CALL(*mock_session, SendRequest) + .WillOnce([&mock_session, report_trace_id, report_span_id, is_async]( + std::shared_ptr callback) { + opentelemetry::proto::collector::logs::v1::ExportLogsServiceRequest request_body; + request_body.ParseFromArray(&mock_session->GetRequest()->body_[0], + static_cast(mock_session->GetRequest()->body_.size())); + auto received_log = request_body.resource_logs(0).instrumentation_library_logs(0).logs(0); + EXPECT_EQ(received_log.trace_id(), report_trace_id); + EXPECT_EQ(received_log.span_id(), report_span_id); + EXPECT_EQ("Log name", received_log.name()); + EXPECT_EQ("Log message", received_log.body().string_value()); + EXPECT_LE(15, received_log.attributes_size()); + bool check_service_name = false; + for (auto &attribute : received_log.attributes()) + { + if ("service.name" == attribute.key()) + { + check_service_name = true; + EXPECT_EQ("unit_test_service", attribute.value().string_value()); + } + } + ASSERT_TRUE(check_service_name); + + // let the otlp_http_client to continue + if (is_async) + { + std::thread async_finish{[callback]() { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + http_client::nosend::Response response; + response.Finish(*callback.get()); + }}; + async_finish.detach(); + } + else + { + http_client::nosend::Response response; + response.Finish(*callback.get()); + } + }); + + logger->Log(opentelemetry::logs::Severity::kInfo, "Log name", "Log message", + {{"service.name", "unit_test_service"}, + {"tenant.id", "test_user"}, + {"bool_value", true}, + {"int32_value", static_cast(1)}, + {"uint32_value", static_cast(2)}, + {"int64_value", static_cast(0x1100000000LL)}, + {"uint64_value", static_cast(0x1200000000ULL)}, + {"double_value", static_cast(3.1)}, + {"vec_bool_value", attribute_storage_bool_value}, + {"vec_int32_value", attribute_storage_int32_value}, + {"vec_uint32_value", attribute_storage_uint32_value}, + {"vec_int64_value", attribute_storage_int64_value}, + {"vec_uint64_value", attribute_storage_uint64_value}, + {"vec_double_value", attribute_storage_double_value}, + {"vec_string_value", attribute_storage_string_value}}, + trace_id, span_id, + opentelemetry::trace::TraceFlags{opentelemetry::trace::TraceFlags::kIsSampled}, + std::chrono::system_clock::now()); + + provider->ForceFlush(); + } }; // Create log records, let processor call Export() -TEST_F(OtlpHttpLogExporterTestPeer, ExportJsonIntegrationTest) +TEST_F(OtlpHttpLogExporterTestPeer, ExportJsonIntegrationTestSync) +{ + ExportJsonIntegrationTest(false); +} + +TEST_F(OtlpHttpLogExporterTestPeer, ExportJsonIntegrationTestAsync) { - auto mock_otlp_client = - OtlpHttpLogExporterTestPeer::GetMockOtlpHttpClient(HttpRequestContentType::kJson); - auto mock_otlp_http_client = mock_otlp_client.first; - auto client = mock_otlp_client.second; - auto exporter = GetExporter(std::unique_ptr{mock_otlp_http_client}); - - bool attribute_storage_bool_value[] = {true, false, true}; - int32_t attribute_storage_int32_value[] = {1, 2}; - uint32_t attribute_storage_uint32_value[] = {3, 4}; - int64_t attribute_storage_int64_value[] = {5, 6}; - uint64_t attribute_storage_uint64_value[] = {7, 8}; - double attribute_storage_double_value[] = {3.2, 3.3}; - std::string attribute_storage_string_value[] = {"vector", "string"}; - - auto provider = nostd::shared_ptr(new sdk::logs::LoggerProvider()); - provider->AddProcessor(std::unique_ptr( - new sdk::logs::BatchLogProcessor(std::move(exporter), 5, std::chrono::milliseconds(256), 5))); - - std::string report_trace_id; - std::string report_span_id; - uint8_t trace_id_bin[opentelemetry::trace::TraceId::kSize] = { - '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'}; - char trace_id_hex[2 * opentelemetry::trace::TraceId::kSize] = {0}; - opentelemetry::trace::TraceId trace_id{trace_id_bin}; - uint8_t span_id_bin[opentelemetry::trace::SpanId::kSize] = {'7', '6', '5', '4', - '3', '2', '1', '0'}; - char span_id_hex[2 * opentelemetry::trace::SpanId::kSize] = {0}; - opentelemetry::trace::SpanId span_id{span_id_bin}; - - const std::string schema_url{"https://opentelemetry.io/schemas/1.2.0"}; - auto logger = provider->GetLogger("test", "", "opentelelemtry_library", "", schema_url); - - trace_id.ToLowerBase16(MakeSpan(trace_id_hex)); - report_trace_id.assign(trace_id_hex, sizeof(trace_id_hex)); - - span_id.ToLowerBase16(MakeSpan(span_id_hex)); - report_span_id.assign(span_id_hex, sizeof(span_id_hex)); - - auto no_send_client = std::static_pointer_cast(client); - auto mock_session = - std::static_pointer_cast(no_send_client->session_); - EXPECT_CALL(*mock_session, SendRequest) - .WillOnce([&mock_session, report_trace_id, report_span_id]( - std::shared_ptr callback) { - auto check_json = nlohmann::json::parse(mock_session->GetRequest()->body_, nullptr, false); - auto resource_logs = *check_json["resource_logs"].begin(); - auto instrumentation_library_span = *resource_logs["instrumentation_library_logs"].begin(); - auto log = *instrumentation_library_span["logs"].begin(); - auto received_trace_id = log["trace_id"].get(); - auto received_span_id = log["span_id"].get(); - EXPECT_EQ(received_trace_id, report_trace_id); - EXPECT_EQ(received_span_id, report_span_id); - EXPECT_EQ("Log name", log["name"].get()); - EXPECT_EQ("Log message", log["body"]["string_value"].get()); - EXPECT_LE(15, log["attributes"].size()); - auto custom_header = mock_session->GetRequest()->headers_.find("Custom-Header-Key"); - ASSERT_TRUE(custom_header != mock_session->GetRequest()->headers_.end()); - if (custom_header != mock_session->GetRequest()->headers_.end()) - { - EXPECT_EQ("Custom-Header-Value", custom_header->second); - } - - // let the otlp_http_client to continue - http_client::nosend::Response response; - - response.Finish(*callback.get()); - }); - - logger->Log(opentelemetry::logs::Severity::kInfo, "Log name", "Log message", - {{"service.name", "unit_test_service"}, - {"tenant.id", "test_user"}, - {"bool_value", true}, - {"int32_value", static_cast(1)}, - {"uint32_value", static_cast(2)}, - {"int64_value", static_cast(0x1100000000LL)}, - {"uint64_value", static_cast(0x1200000000ULL)}, - {"double_value", static_cast(3.1)}, - {"vec_bool_value", attribute_storage_bool_value}, - {"vec_int32_value", attribute_storage_int32_value}, - {"vec_uint32_value", attribute_storage_uint32_value}, - {"vec_int64_value", attribute_storage_int64_value}, - {"vec_uint64_value", attribute_storage_uint64_value}, - {"vec_double_value", attribute_storage_double_value}, - {"vec_string_value", attribute_storage_string_value}}, - trace_id, span_id, - opentelemetry::trace::TraceFlags{opentelemetry::trace::TraceFlags::kIsSampled}, - std::chrono::system_clock::now()); - - provider->ForceFlush(); + ExportJsonIntegrationTest(true); } // Create log records, let processor call Export() -TEST_F(OtlpHttpLogExporterTestPeer, ExportBinaryIntegrationTest) +TEST_F(OtlpHttpLogExporterTestPeer, ExportBinaryIntegrationTestSync) { - auto mock_otlp_client = - OtlpHttpLogExporterTestPeer::GetMockOtlpHttpClient(HttpRequestContentType::kBinary); - auto mock_otlp_http_client = mock_otlp_client.first; - auto client = mock_otlp_client.second; - auto exporter = GetExporter(std::unique_ptr{mock_otlp_http_client}); - - bool attribute_storage_bool_value[] = {true, false, true}; - int32_t attribute_storage_int32_value[] = {1, 2}; - uint32_t attribute_storage_uint32_value[] = {3, 4}; - int64_t attribute_storage_int64_value[] = {5, 6}; - uint64_t attribute_storage_uint64_value[] = {7, 8}; - double attribute_storage_double_value[] = {3.2, 3.3}; - std::string attribute_storage_string_value[] = {"vector", "string"}; - - auto provider = nostd::shared_ptr(new sdk::logs::LoggerProvider()); - provider->AddProcessor(std::unique_ptr( - new sdk::logs::BatchLogProcessor(std::move(exporter), 5, std::chrono::milliseconds(256), 5))); - - std::string report_trace_id; - std::string report_span_id; - uint8_t trace_id_bin[opentelemetry::trace::TraceId::kSize] = { - '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'}; - opentelemetry::trace::TraceId trace_id{trace_id_bin}; - uint8_t span_id_bin[opentelemetry::trace::SpanId::kSize] = {'7', '6', '5', '4', - '3', '2', '1', '0'}; - opentelemetry::trace::SpanId span_id{span_id_bin}; - - const std::string schema_url{"https://opentelemetry.io/schemas/1.2.0"}; - auto logger = provider->GetLogger("test", "", "opentelelemtry_library", "", schema_url); - - report_trace_id.assign(reinterpret_cast(trace_id_bin), sizeof(trace_id_bin)); - report_span_id.assign(reinterpret_cast(span_id_bin), sizeof(span_id_bin)); - - auto no_send_client = std::static_pointer_cast(client); - auto mock_session = - std::static_pointer_cast(no_send_client->session_); - EXPECT_CALL(*mock_session, SendRequest) - .WillOnce([&mock_session, report_trace_id, report_span_id]( - std::shared_ptr callback) { - opentelemetry::proto::collector::logs::v1::ExportLogsServiceRequest request_body; - request_body.ParseFromArray(&mock_session->GetRequest()->body_[0], - static_cast(mock_session->GetRequest()->body_.size())); - auto received_log = request_body.resource_logs(0).instrumentation_library_logs(0).logs(0); - EXPECT_EQ(received_log.trace_id(), report_trace_id); - EXPECT_EQ(received_log.span_id(), report_span_id); - EXPECT_EQ("Log name", received_log.name()); - EXPECT_EQ("Log message", received_log.body().string_value()); - EXPECT_LE(15, received_log.attributes_size()); - bool check_service_name = false; - for (auto &attribute : received_log.attributes()) - { - if ("service.name" == attribute.key()) - { - check_service_name = true; - EXPECT_EQ("unit_test_service", attribute.value().string_value()); - } - } - ASSERT_TRUE(check_service_name); - http_client::nosend::Response response; - callback->OnResponse(response); - }); - - logger->Log(opentelemetry::logs::Severity::kInfo, "Log name", "Log message", - {{"service.name", "unit_test_service"}, - {"tenant.id", "test_user"}, - {"bool_value", true}, - {"int32_value", static_cast(1)}, - {"uint32_value", static_cast(2)}, - {"int64_value", static_cast(0x1100000000LL)}, - {"uint64_value", static_cast(0x1200000000ULL)}, - {"double_value", static_cast(3.1)}, - {"vec_bool_value", attribute_storage_bool_value}, - {"vec_int32_value", attribute_storage_int32_value}, - {"vec_uint32_value", attribute_storage_uint32_value}, - {"vec_int64_value", attribute_storage_int64_value}, - {"vec_uint64_value", attribute_storage_uint64_value}, - {"vec_double_value", attribute_storage_double_value}, - {"vec_string_value", attribute_storage_string_value}}, - trace_id, span_id, - opentelemetry::trace::TraceFlags{opentelemetry::trace::TraceFlags::kIsSampled}, - std::chrono::system_clock::now()); - - provider->ForceFlush(); + ExportBinaryIntegrationTest(false); +} + +TEST_F(OtlpHttpLogExporterTestPeer, ExportBinaryIntegrationTestAsync) +{ + ExportBinaryIntegrationTest(true); } // Test exporter configuration options diff --git a/sdk/include/opentelemetry/sdk/logs/batch_log_processor.h b/sdk/include/opentelemetry/sdk/logs/batch_log_processor.h index 1387c9c82b..c6f9406148 100644 --- a/sdk/include/opentelemetry/sdk/logs/batch_log_processor.h +++ b/sdk/include/opentelemetry/sdk/logs/batch_log_processor.h @@ -19,6 +19,33 @@ namespace sdk namespace logs { +/** + * Struct to hold batch SpanProcessor options. + */ +struct BatchLogProcessorOptions +{ + /** + * The maximum buffer/queue size. After the size is reached, spans are + * dropped. + */ + size_t max_queue_size = 2048; + + /* The time interval between two consecutive exports. */ + std::chrono::milliseconds schedule_delay_millis = std::chrono::milliseconds(5000); + + /** + * The maximum batch size of every export. It must be smaller or + * equal to max_queue_size. + */ + size_t max_export_batch_size = 512; + + /** + * Determines whether the export happens asynchronously. + * Default implementation is synchronous. + */ + bool is_export_async = false; +}; + /** * This is an implementation of the LogProcessor which creates batches of finished logs and passes * the export-friendly log data representations to the configured LogExporter. @@ -44,6 +71,16 @@ class BatchLogProcessor : public LogProcessor const size_t max_export_batch_size = 512, const bool is_export_async = false); + /** + * Creates a batch log processor by configuring the specified exporter and other parameters + * as per the official, language-agnostic opentelemetry specs. + * + * @param exporter - The backend exporter to pass the logs to + * @param options - The batch SpanProcessor options. + */ + explicit BatchLogProcessor(std::unique_ptr &&exporter, + const BatchLogProcessorOptions &options); + /** Makes a new recordable **/ std::unique_ptr MakeRecordable() noexcept override; diff --git a/sdk/src/logs/batch_log_processor.cc b/sdk/src/logs/batch_log_processor.cc index b8db44f861..37c5f5ffe5 100644 --- a/sdk/src/logs/batch_log_processor.cc +++ b/sdk/src/logs/batch_log_processor.cc @@ -32,6 +32,22 @@ BatchLogProcessor::BatchLogProcessor(std::unique_ptr &&exporter, is_async_shutdown_notified_.store(false); } +BatchLogProcessor::BatchLogProcessor(std::unique_ptr &&exporter, + const BatchLogProcessorOptions &options) + : exporter_(std::move(exporter)), + max_queue_size_(options.max_queue_size), + scheduled_delay_millis_(options.schedule_delay_millis), + max_export_batch_size_(options.max_export_batch_size), + buffer_(options.max_queue_size), + is_export_async_(options.is_export_async), + worker_thread_(&BatchLogProcessor::DoBackgroundWork, this) +{ + is_shutdown_.store(false); + is_force_flush_.store(false); + is_force_flush_notified_.store(false); + is_async_shutdown_notified_.store(false); +} + std::unique_ptr BatchLogProcessor::MakeRecordable() noexcept { return exporter_->MakeRecordable();