From 7c90cf4052e86e1a77e4a645eb5a902b49e781bf Mon Sep 17 00:00:00 2001 From: Alex Boten Date: Wed, 3 Nov 2021 13:21:15 -0700 Subject: [PATCH] Add logging signal to main (#2251) * Add initial overall structure and classes for logs sdk (#1894) * Add global LogEmitterProvider and convenience function get_log_emitter (#1901) * Add OTLPHandler for standard library logging module (#1903) * Add LogProcessors implementation (#1916) * Fix typos in test_handler.py (#1953) * Add support for OTLP Log exporter (#1943) * Add support for user defined attributes in OTLPHandler (#1952) * use timeout in force_flush (#2118) * use timeout in force_flush * fix lint * Update opentelemetry-sdk/src/opentelemetry/sdk/logs/export/__init__.py Co-authored-by: Srikanth Chekuri * fix lint Co-authored-by: Srikanth Chekuri * add a ConsoleExporter for logging (#2099) Co-authored-by: Srikanth Chekuri * Update SDK docs and Add example with OTEL collector logging (debug) exporter (#2050) * Fix exception in severity number transformation (#2208) * Fix exception with warning message transformation * Fix lint * Fix lint * fstring * Demonstrate how to set the Resource for LogEmitterProvider (#2209) * Demonstrate how to set the Resource for LogEmitterProvider Added a Resource to the logs example to make it more complete. Previously it was using the built-in Resource. Now it adds the service.name and service.instance.id attributes. The resulting emitted log records look like this: ``` Resource labels: -> telemetry.sdk.language: STRING(python) -> telemetry.sdk.name: STRING(opentelemetry) -> telemetry.sdk.version: STRING(1.5.0) -> service.name: STRING(shoppingcart) -> service.instance.id: STRING(instance-12) InstrumentationLibraryLogs #0 InstrumentationLibrary __main__ 0.1 LogRecord #0 Timestamp: 2021-10-14 18:33:43.425820928 +0000 UTC Severity: ERROR ShortName: Body: Hyderabad, we have a major problem. Trace ID: ce1577e4a703f42d569e72593ad71888 Span ID: f8908ac4258ceff6 Flags: 1 ``` * Fix linting * Use batch processor in example (#2225) * move logs to _logs (#2240) * move logs to _logs * fix lint * move log_exporter to _log_exporter as it's still experimental (#2252) Co-authored-by: Srikanth Chekuri Co-authored-by: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Co-authored-by: Leighton Chen Co-authored-by: Tigran Najaryan <4194920+tigrannajaryan@users.noreply.github.com> Co-authored-by: Owais Lone --- CHANGELOG.md | 8 + docs/examples/logs/README.rst | 75 +++ docs/examples/logs/example.py | 62 +++ docs/examples/logs/otel-collector-config.yaml | 10 + docs/sdk/logs.export.rst | 7 + docs/sdk/logs.rst | 22 + docs/sdk/logs.severity.rst | 7 + docs/sdk/sdk.rst | 1 + .../otlp/proto/grpc/_log_exporter/__init__.py | 186 +++++++ .../tests/logs/__init__.py | 0 .../tests/logs/test_otlp_logs_exporter.py | 474 +++++++++++++++++ opentelemetry-sdk/setup.cfg | 2 + .../src/opentelemetry/sdk/_logs/__init__.py | 500 ++++++++++++++++++ .../sdk/_logs/export/__init__.py | 301 +++++++++++ .../_logs/export/in_memory_log_exporter.py | 51 ++ .../src/opentelemetry/sdk/_logs/severity.py | 115 ++++ .../sdk/environment_variables.py | 9 + opentelemetry-sdk/tests/logs/__init__.py | 13 + opentelemetry-sdk/tests/logs/test_export.py | 322 +++++++++++ .../tests/logs/test_global_provider.py | 75 +++ opentelemetry-sdk/tests/logs/test_handler.py | 96 ++++ .../tests/logs/test_multi_log_prcessor.py | 194 +++++++ 22 files changed, 2530 insertions(+) create mode 100644 docs/examples/logs/README.rst create mode 100644 docs/examples/logs/example.py create mode 100644 docs/examples/logs/otel-collector-config.yaml create mode 100644 docs/sdk/logs.export.rst create mode 100644 docs/sdk/logs.rst create mode 100644 docs/sdk/logs.severity.rst create mode 100644 exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_log_exporter/__init__.py create mode 100644 exporter/opentelemetry-exporter-otlp-proto-grpc/tests/logs/__init__.py create mode 100644 exporter/opentelemetry-exporter-otlp-proto-grpc/tests/logs/test_otlp_logs_exporter.py create mode 100644 opentelemetry-sdk/src/opentelemetry/sdk/_logs/__init__.py create mode 100644 opentelemetry-sdk/src/opentelemetry/sdk/_logs/export/__init__.py create mode 100644 opentelemetry-sdk/src/opentelemetry/sdk/_logs/export/in_memory_log_exporter.py create mode 100644 opentelemetry-sdk/src/opentelemetry/sdk/_logs/severity.py create mode 100644 opentelemetry-sdk/tests/logs/__init__.py create mode 100644 opentelemetry-sdk/tests/logs/test_export.py create mode 100644 opentelemetry-sdk/tests/logs/test_global_provider.py create mode 100644 opentelemetry-sdk/tests/logs/test_handler.py create mode 100644 opentelemetry-sdk/tests/logs/test_multi_log_prcessor.py diff --git a/CHANGELOG.md b/CHANGELOG.md index ad92cf02b5..5f4c305ae7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -115,6 +115,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Added dropped count to otlp, jaeger and zipkin exporters. ([#1893](https://github.com/open-telemetry/opentelemetry-python/pull/1893)) +### Added +- Give OTLPHandler the ability to process attributes + ([#1952](https://github.com/open-telemetry/opentelemetry-python/pull/1952)) +- Add global LogEmitterProvider and convenience function get_log_emitter + ([#1901](https://github.com/open-telemetry/opentelemetry-python/pull/1901)) +- Add OTLPHandler for standard library logging module + ([#1903](https://github.com/open-telemetry/opentelemetry-python/pull/1903)) + ### Changed - Updated `opentelemetry-opencensus-exporter` to use `service_name` of spans instead of resource ([#1897](https://github.com/open-telemetry/opentelemetry-python/pull/1897)) diff --git a/docs/examples/logs/README.rst b/docs/examples/logs/README.rst new file mode 100644 index 0000000000..3c19c2eafe --- /dev/null +++ b/docs/examples/logs/README.rst @@ -0,0 +1,75 @@ +OpenTelemetry Logs SDK +====================== + +Start the Collector locally to see data being exported. Write the following file: + +.. code-block:: yaml + + # otel-collector-config.yaml + receivers: + otlp: + protocols: + grpc: + + exporters: + logging: + + processors: + batch: + +Then start the Docker container: + +.. code-block:: sh + + docker run \ + -p 4317:4317 \ + -v $(pwd)/otel-collector-config.yaml:/etc/otel/config.yaml \ + otel/opentelemetry-collector-contrib:latest + +.. code-block:: sh + + $ python example.py + +The resulting logs will appear in the output from the collector and look similar to this: + +.. code-block:: sh + + ResourceLog #0 + Resource labels: + -> telemetry.sdk.language: STRING(python) + -> telemetry.sdk.name: STRING(opentelemetry) + -> telemetry.sdk.version: STRING(1.5.0.dev0) + -> service.name: STRING(unknown_service) + InstrumentationLibraryLogs #0 + InstrumentationLibrary __main__ 0.1 + LogRecord #0 + Timestamp: 2021-08-18 08:26:53.837349888 +0000 UTC + Severity: ERROR + ShortName: + Body: Exception while exporting logs. + ResourceLog #1 + Resource labels: + -> telemetry.sdk.language: STRING(python) + -> telemetry.sdk.name: STRING(opentelemetry) + -> telemetry.sdk.version: STRING(1.5.0.dev0) + -> service.name: STRING(unknown_service) + InstrumentationLibraryLogs #0 + InstrumentationLibrary __main__ 0.1 + LogRecord #0 + Timestamp: 2021-08-18 08:26:53.842546944 +0000 UTC + Severity: ERROR + ShortName: + Body: The five boxing wizards jump quickly. + ResourceLog #2 + Resource labels: + -> telemetry.sdk.language: STRING(python) + -> telemetry.sdk.name: STRING(opentelemetry) + -> telemetry.sdk.version: STRING(1.5.0.dev0) + -> service.name: STRING(unknown_service) + InstrumentationLibraryLogs #0 + InstrumentationLibrary __main__ 0.1 + LogRecord #0 + Timestamp: 2021-08-18 08:26:53.843979008 +0000 UTC + Severity: ERROR + ShortName: + Body: Hyderabad, we have a major problem. \ No newline at end of file diff --git a/docs/examples/logs/example.py b/docs/examples/logs/example.py new file mode 100644 index 0000000000..b34d9a88cc --- /dev/null +++ b/docs/examples/logs/example.py @@ -0,0 +1,62 @@ +import logging + +from opentelemetry import trace +from opentelemetry.exporter.otlp.proto.grpc._log_exporter import ( + OTLPLogExporter, +) +from opentelemetry.sdk._logs import ( + LogEmitterProvider, + OTLPHandler, + set_log_emitter_provider, +) +from opentelemetry.sdk._logs.export import BatchLogProcessor +from opentelemetry.sdk.resources import Resource +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import ( + BatchSpanProcessor, + ConsoleSpanExporter, +) + +trace.set_tracer_provider(TracerProvider()) +trace.get_tracer_provider().add_span_processor( + BatchSpanProcessor(ConsoleSpanExporter()) +) + +log_emitter_provider = LogEmitterProvider( + resource=Resource.create( + { + "service.name": "shoppingcart", + "service.instance.id": "instance-12", + } + ), +) +set_log_emitter_provider(log_emitter_provider) + +exporter = OTLPLogExporter(insecure=True) +log_emitter_provider.add_log_processor(BatchLogProcessor(exporter)) +log_emitter = log_emitter_provider.get_log_emitter(__name__, "0.1") +handler = OTLPHandler(level=logging.NOTSET, log_emitter=log_emitter) + +# Attach OTLP handler to root logger +logging.getLogger("root").addHandler(handler) + +# Log directly +logging.info("Jackdaws love my big sphinx of quartz.") + +# Create different namespaced loggers +logger1 = logging.getLogger("myapp.area1") +logger2 = logging.getLogger("myapp.area2") + +logger1.debug("Quick zephyrs blow, vexing daft Jim.") +logger1.info("How quickly daft jumping zebras vex.") +logger2.warning("Jail zesty vixen who grabbed pay from quack.") +logger2.error("The five boxing wizards jump quickly.") + + +# Trace context correlation +tracer = trace.get_tracer(__name__) +with tracer.start_as_current_span("foo"): + # Do something + logger2.error("Hyderabad, we have a major problem.") + +log_emitter_provider.shutdown() diff --git a/docs/examples/logs/otel-collector-config.yaml b/docs/examples/logs/otel-collector-config.yaml new file mode 100644 index 0000000000..f29ce6476c --- /dev/null +++ b/docs/examples/logs/otel-collector-config.yaml @@ -0,0 +1,10 @@ +receivers: + otlp: + protocols: + grpc: + +exporters: + logging: + +processors: + batch: diff --git a/docs/sdk/logs.export.rst b/docs/sdk/logs.export.rst new file mode 100644 index 0000000000..19a4023742 --- /dev/null +++ b/docs/sdk/logs.export.rst @@ -0,0 +1,7 @@ +opentelemetry.sdk._logs.export +============================== + +.. automodule:: opentelemetry.sdk._logs.export + :members: + :undoc-members: + :show-inheritance: diff --git a/docs/sdk/logs.rst b/docs/sdk/logs.rst new file mode 100644 index 0000000000..6d9f3c2548 --- /dev/null +++ b/docs/sdk/logs.rst @@ -0,0 +1,22 @@ +opentelemetry.sdk._logs package +=============================== + +.. warning:: + OpenTelemetry Python logs are in an experimental state. The APIs within + :mod:`opentelemetry.sdk._logs` are subject to change in minor/patch releases and make no + backward compatability guarantees at this time. + + Once logs become stable, this package will be be renamed to ``opentelemetry.sdk.logs``. + +Submodules +---------- + +.. toctree:: + + logs.export + logs.severity + +.. automodule:: opentelemetry.sdk._logs + :members: + :undoc-members: + :show-inheritance: diff --git a/docs/sdk/logs.severity.rst b/docs/sdk/logs.severity.rst new file mode 100644 index 0000000000..1197e8b44e --- /dev/null +++ b/docs/sdk/logs.severity.rst @@ -0,0 +1,7 @@ +opentelemetry.sdk._logs.severity +================================ + +.. automodule:: opentelemetry.sdk._logs.severity + :members: + :undoc-members: + :show-inheritance: \ No newline at end of file diff --git a/docs/sdk/sdk.rst b/docs/sdk/sdk.rst index 333da1820b..619f3bd8cc 100644 --- a/docs/sdk/sdk.rst +++ b/docs/sdk/sdk.rst @@ -8,5 +8,6 @@ OpenTelemetry Python SDK resources trace + logs error_handler environment_variables diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_log_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_log_exporter/__init__.py new file mode 100644 index 0000000000..211655d93a --- /dev/null +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_log_exporter/__init__.py @@ -0,0 +1,186 @@ +# Copyright The OpenTelemetry Authors +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Optional, Sequence +from grpc import ChannelCredentials, Compression +from opentelemetry.exporter.otlp.proto.grpc.exporter import ( + OTLPExporterMixin, + _translate_key_values, + get_resource_data, + _translate_value, +) +from opentelemetry.proto.collector.logs.v1.logs_service_pb2 import ( + ExportLogsServiceRequest, +) +from opentelemetry.proto.collector.logs.v1.logs_service_pb2_grpc import ( + LogsServiceStub, +) +from opentelemetry.proto.common.v1.common_pb2 import InstrumentationLibrary +from opentelemetry.proto.logs.v1.logs_pb2 import ( + InstrumentationLibraryLogs, + ResourceLogs, +) +from opentelemetry.proto.logs.v1.logs_pb2 import LogRecord as PB2LogRecord +from opentelemetry.sdk._logs import LogRecord as SDKLogRecord +from opentelemetry.sdk._logs import LogData +from opentelemetry.sdk._logs.export import LogExporter, LogExportResult + + +class OTLPLogExporter( + LogExporter, + OTLPExporterMixin[SDKLogRecord, ExportLogsServiceRequest, LogExportResult], +): + + _result = LogExportResult + _stub = LogsServiceStub + + def __init__( + self, + endpoint: Optional[str] = None, + insecure: Optional[bool] = None, + credentials: Optional[ChannelCredentials] = None, + headers: Optional[Sequence] = None, + timeout: Optional[int] = None, + compression: Optional[Compression] = None, + ): + super().__init__( + **{ + "endpoint": endpoint, + "insecure": insecure, + "credentials": credentials, + "headers": headers, + "timeout": timeout, + "compression": compression, + } + ) + + def _translate_name(self, log_data: LogData) -> None: + self._collector_log_kwargs["name"] = log_data.log_record.name + + def _translate_time(self, log_data: LogData) -> None: + self._collector_log_kwargs[ + "time_unix_nano" + ] = log_data.log_record.timestamp + + def _translate_span_id(self, log_data: LogData) -> None: + self._collector_log_kwargs[ + "span_id" + ] = log_data.log_record.span_id.to_bytes(8, "big") + + def _translate_trace_id(self, log_data: LogData) -> None: + self._collector_log_kwargs[ + "trace_id" + ] = log_data.log_record.trace_id.to_bytes(16, "big") + + def _translate_trace_flags(self, log_data: LogData) -> None: + self._collector_log_kwargs["flags"] = int( + log_data.log_record.trace_flags + ) + + def _translate_body(self, log_data: LogData): + self._collector_log_kwargs["body"] = _translate_value( + log_data.log_record.body + ) + + def _translate_severity_text(self, log_data: LogData): + self._collector_log_kwargs[ + "severity_text" + ] = log_data.log_record.severity_text + + def _translate_attributes(self, log_data: LogData) -> None: + if log_data.log_record.attributes: + self._collector_log_kwargs["attributes"] = [] + for key, value in log_data.log_record.attributes.items(): + try: + self._collector_log_kwargs["attributes"].append( + _translate_key_values(key, value) + ) + except Exception: # pylint: disable=broad-except + pass + + def _translate_data( + self, data: Sequence[LogData] + ) -> ExportLogsServiceRequest: + # pylint: disable=attribute-defined-outside-init + + sdk_resource_instrumentation_library_logs = {} + + for log_data in data: + resource = log_data.log_record.resource + + instrumentation_library_logs_map = ( + sdk_resource_instrumentation_library_logs.get(resource, {}) + ) + if not instrumentation_library_logs_map: + sdk_resource_instrumentation_library_logs[ + resource + ] = instrumentation_library_logs_map + + instrumentation_library_logs = ( + instrumentation_library_logs_map.get( + log_data.instrumentation_info + ) + ) + if not instrumentation_library_logs: + if log_data.instrumentation_info is not None: + instrumentation_library_logs_map[ + log_data.instrumentation_info + ] = InstrumentationLibraryLogs( + instrumentation_library=InstrumentationLibrary( + name=log_data.instrumentation_info.name, + version=log_data.instrumentation_info.version, + ) + ) + else: + instrumentation_library_logs_map[ + log_data.instrumentation_info + ] = InstrumentationLibraryLogs() + + instrumentation_library_logs = ( + instrumentation_library_logs_map.get( + log_data.instrumentation_info + ) + ) + + self._collector_log_kwargs = {} + + self._translate_name(log_data) + self._translate_time(log_data) + self._translate_span_id(log_data) + self._translate_trace_id(log_data) + self._translate_trace_flags(log_data) + self._translate_body(log_data) + self._translate_severity_text(log_data) + self._translate_attributes(log_data) + + self._collector_log_kwargs[ + "severity_number" + ] = log_data.log_record.severity_number.value + + instrumentation_library_logs.logs.append( + PB2LogRecord(**self._collector_log_kwargs) + ) + + return ExportLogsServiceRequest( + resource_logs=get_resource_data( + sdk_resource_instrumentation_library_logs, + ResourceLogs, + "logs", + ) + ) + + def export(self, batch: Sequence[LogData]) -> LogExportResult: + return self._export(batch) + + def shutdown(self) -> None: + pass diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/logs/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/logs/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/logs/test_otlp_logs_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/logs/test_otlp_logs_exporter.py new file mode 100644 index 0000000000..b9c33786e3 --- /dev/null +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/logs/test_otlp_logs_exporter.py @@ -0,0 +1,474 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import time +from concurrent.futures import ThreadPoolExecutor +from unittest import TestCase +from unittest.mock import patch + +from google.protobuf.duration_pb2 import Duration +from google.rpc.error_details_pb2 import RetryInfo +from grpc import StatusCode, server + +from opentelemetry.exporter.otlp.proto.grpc._log_exporter import ( + OTLPLogExporter, +) +from opentelemetry.exporter.otlp.proto.grpc.exporter import _translate_value +from opentelemetry.proto.collector.logs.v1.logs_service_pb2 import ( + ExportLogsServiceRequest, + ExportLogsServiceResponse, +) +from opentelemetry.proto.collector.logs.v1.logs_service_pb2_grpc import ( + LogsServiceServicer, + add_LogsServiceServicer_to_server, +) +from opentelemetry.proto.common.v1.common_pb2 import ( + AnyValue, + InstrumentationLibrary, + KeyValue, +) +from opentelemetry.proto.logs.v1.logs_pb2 import InstrumentationLibraryLogs +from opentelemetry.proto.logs.v1.logs_pb2 import LogRecord as PB2LogRecord +from opentelemetry.proto.logs.v1.logs_pb2 import ResourceLogs +from opentelemetry.proto.resource.v1.resource_pb2 import ( + Resource as OTLPResource, +) +from opentelemetry.sdk._logs import LogData, LogRecord +from opentelemetry.sdk._logs.export import LogExportResult +from opentelemetry.sdk._logs.severity import ( + SeverityNumber as SDKSeverityNumber, +) +from opentelemetry.sdk.resources import Resource as SDKResource +from opentelemetry.sdk.util.instrumentation import InstrumentationInfo +from opentelemetry.trace import TraceFlags + + +class LogsServiceServicerUNAVAILABLEDelay(LogsServiceServicer): + # pylint: disable=invalid-name,unused-argument,no-self-use + def Export(self, request, context): + context.set_code(StatusCode.UNAVAILABLE) + + context.send_initial_metadata( + (("google.rpc.retryinfo-bin", RetryInfo().SerializeToString()),) + ) + context.set_trailing_metadata( + ( + ( + "google.rpc.retryinfo-bin", + RetryInfo( + retry_delay=Duration(seconds=4) + ).SerializeToString(), + ), + ) + ) + + return ExportLogsServiceResponse() + + +class LogsServiceServicerUNAVAILABLE(LogsServiceServicer): + # pylint: disable=invalid-name,unused-argument,no-self-use + def Export(self, request, context): + context.set_code(StatusCode.UNAVAILABLE) + + return ExportLogsServiceResponse() + + +class LogsServiceServicerSUCCESS(LogsServiceServicer): + # pylint: disable=invalid-name,unused-argument,no-self-use + def Export(self, request, context): + context.set_code(StatusCode.OK) + + return ExportLogsServiceResponse() + + +class LogsServiceServicerALREADY_EXISTS(LogsServiceServicer): + # pylint: disable=invalid-name,unused-argument,no-self-use + def Export(self, request, context): + context.set_code(StatusCode.ALREADY_EXISTS) + + return ExportLogsServiceResponse() + + +class TestOTLPLogExporter(TestCase): + def setUp(self): + + self.exporter = OTLPLogExporter() + + self.server = server(ThreadPoolExecutor(max_workers=10)) + + self.server.add_insecure_port("[::]:4317") + + self.server.start() + + self.log_data_1 = LogData( + log_record=LogRecord( + timestamp=int(time.time() * 1e9), + trace_id=2604504634922341076776623263868986797, + span_id=5213367945872657620, + trace_flags=TraceFlags(0x01), + severity_text="WARNING", + severity_number=SDKSeverityNumber.WARN, + name="name", + body="Zhengzhou, We have a heaviest rains in 1000 years", + resource=SDKResource({"key": "value"}), + attributes={"a": 1, "b": "c"}, + ), + instrumentation_info=InstrumentationInfo( + "first_name", "first_version" + ), + ) + self.log_data_2 = LogData( + log_record=LogRecord( + timestamp=int(time.time() * 1e9), + trace_id=2604504634922341076776623263868986799, + span_id=5213367945872657623, + trace_flags=TraceFlags(0x01), + severity_text="INFO", + severity_number=SDKSeverityNumber.INFO2, + name="info name", + body="Sydney, Opera House is closed", + resource=SDKResource({"key": "value"}), + attributes={"custom_attr": [1, 2, 3]}, + ), + instrumentation_info=InstrumentationInfo( + "second_name", "second_version" + ), + ) + self.log_data_3 = LogData( + log_record=LogRecord( + timestamp=int(time.time() * 1e9), + trace_id=2604504634922341076776623263868986800, + span_id=5213367945872657628, + trace_flags=TraceFlags(0x01), + severity_text="ERROR", + severity_number=SDKSeverityNumber.WARN, + name="error name", + body="Mumbai, Boil water before drinking", + resource=SDKResource({"service": "myapp"}), + ), + instrumentation_info=InstrumentationInfo( + "third_name", "third_version" + ), + ) + + def tearDown(self): + self.server.stop(None) + + @patch( + "opentelemetry.exporter.otlp.proto.grpc.exporter.ssl_channel_credentials" + ) + @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.secure_channel") + @patch( + "opentelemetry.exporter.otlp.proto.grpc._log_exporter.OTLPLogExporter._stub" + ) + # pylint: disable=unused-argument + def test_no_credentials_error( + self, mock_ssl_channel, mock_secure, mock_stub + ): + OTLPLogExporter(insecure=False) + self.assertTrue(mock_ssl_channel.called) + + # pylint: disable=no-self-use + @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.insecure_channel") + @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.secure_channel") + def test_otlp_exporter_endpoint(self, mock_secure, mock_insecure): + expected_endpoint = "localhost:4317" + endpoints = [ + ( + "http://localhost:4317", + None, + mock_insecure, + ), + ( + "localhost:4317", + None, + mock_insecure, + ), + ( + "localhost:4317", + False, + mock_secure, + ), + ( + "https://localhost:4317", + None, + mock_secure, + ), + ( + "https://localhost:4317", + True, + mock_insecure, + ), + ] + # pylint: disable=C0209 + for endpoint, insecure, mock_method in endpoints: + OTLPLogExporter(endpoint=endpoint, insecure=insecure) + self.assertEqual( + 1, + mock_method.call_count, + "expected {} to be called for {} {}".format( + mock_method, endpoint, insecure + ), + ) + self.assertEqual( + expected_endpoint, + mock_method.call_args[0][0], + "expected {} got {} {}".format( + expected_endpoint, mock_method.call_args[0][0], endpoint + ), + ) + mock_method.reset_mock() + + @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.expo") + @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep") + def test_unavailable(self, mock_sleep, mock_expo): + + mock_expo.configure_mock(**{"return_value": [1]}) + + add_LogsServiceServicer_to_server( + LogsServiceServicerUNAVAILABLE(), self.server + ) + self.assertEqual( + self.exporter.export([self.log_data_1]), LogExportResult.FAILURE + ) + mock_sleep.assert_called_with(1) + + @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.expo") + @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep") + def test_unavailable_delay(self, mock_sleep, mock_expo): + + mock_expo.configure_mock(**{"return_value": [1]}) + + add_LogsServiceServicer_to_server( + LogsServiceServicerUNAVAILABLEDelay(), self.server + ) + self.assertEqual( + self.exporter.export([self.log_data_1]), LogExportResult.FAILURE + ) + mock_sleep.assert_called_with(4) + + def test_success(self): + add_LogsServiceServicer_to_server( + LogsServiceServicerSUCCESS(), self.server + ) + self.assertEqual( + self.exporter.export([self.log_data_1]), LogExportResult.SUCCESS + ) + + def test_failure(self): + add_LogsServiceServicer_to_server( + LogsServiceServicerALREADY_EXISTS(), self.server + ) + self.assertEqual( + self.exporter.export([self.log_data_1]), LogExportResult.FAILURE + ) + + def test_translate_log_data(self): + + expected = ExportLogsServiceRequest( + resource_logs=[ + ResourceLogs( + resource=OTLPResource( + attributes=[ + KeyValue( + key="key", value=AnyValue(string_value="value") + ), + ] + ), + instrumentation_library_logs=[ + InstrumentationLibraryLogs( + instrumentation_library=InstrumentationLibrary( + name="first_name", version="first_version" + ), + logs=[ + PB2LogRecord( + # pylint: disable=no-member + name="name", + time_unix_nano=self.log_data_1.log_record.timestamp, + severity_number=self.log_data_1.log_record.severity_number.value, + severity_text="WARNING", + span_id=int.to_bytes( + 5213367945872657620, 8, "big" + ), + trace_id=int.to_bytes( + 2604504634922341076776623263868986797, + 16, + "big", + ), + body=_translate_value( + "Zhengzhou, We have a heaviest rains in 1000 years" + ), + attributes=[ + KeyValue( + key="a", + value=AnyValue(int_value=1), + ), + KeyValue( + key="b", + value=AnyValue(string_value="c"), + ), + ], + flags=int( + self.log_data_1.log_record.trace_flags + ), + ) + ], + ) + ], + ), + ] + ) + + # pylint: disable=protected-access + self.assertEqual( + expected, self.exporter._translate_data([self.log_data_1]) + ) + + def test_translate_multiple_logs(self): + expected = ExportLogsServiceRequest( + resource_logs=[ + ResourceLogs( + resource=OTLPResource( + attributes=[ + KeyValue( + key="key", value=AnyValue(string_value="value") + ), + ] + ), + instrumentation_library_logs=[ + InstrumentationLibraryLogs( + instrumentation_library=InstrumentationLibrary( + name="first_name", version="first_version" + ), + logs=[ + PB2LogRecord( + # pylint: disable=no-member + name="name", + time_unix_nano=self.log_data_1.log_record.timestamp, + severity_number=self.log_data_1.log_record.severity_number.value, + severity_text="WARNING", + span_id=int.to_bytes( + 5213367945872657620, 8, "big" + ), + trace_id=int.to_bytes( + 2604504634922341076776623263868986797, + 16, + "big", + ), + body=_translate_value( + "Zhengzhou, We have a heaviest rains in 1000 years" + ), + attributes=[ + KeyValue( + key="a", + value=AnyValue(int_value=1), + ), + KeyValue( + key="b", + value=AnyValue(string_value="c"), + ), + ], + flags=int( + self.log_data_1.log_record.trace_flags + ), + ) + ], + ), + InstrumentationLibraryLogs( + instrumentation_library=InstrumentationLibrary( + name="second_name", version="second_version" + ), + logs=[ + PB2LogRecord( + # pylint: disable=no-member + name="info name", + time_unix_nano=self.log_data_2.log_record.timestamp, + severity_number=self.log_data_2.log_record.severity_number.value, + severity_text="INFO", + span_id=int.to_bytes( + 5213367945872657623, 8, "big" + ), + trace_id=int.to_bytes( + 2604504634922341076776623263868986799, + 16, + "big", + ), + body=_translate_value( + "Sydney, Opera House is closed" + ), + attributes=[ + KeyValue( + key="custom_attr", + value=_translate_value([1, 2, 3]), + ), + ], + flags=int( + self.log_data_2.log_record.trace_flags + ), + ) + ], + ), + ], + ), + ResourceLogs( + resource=OTLPResource( + attributes=[ + KeyValue( + key="service", + value=AnyValue(string_value="myapp"), + ), + ] + ), + instrumentation_library_logs=[ + InstrumentationLibraryLogs( + instrumentation_library=InstrumentationLibrary( + name="third_name", version="third_version" + ), + logs=[ + PB2LogRecord( + # pylint: disable=no-member + name="error name", + time_unix_nano=self.log_data_3.log_record.timestamp, + severity_number=self.log_data_3.log_record.severity_number.value, + severity_text="ERROR", + span_id=int.to_bytes( + 5213367945872657628, 8, "big" + ), + trace_id=int.to_bytes( + 2604504634922341076776623263868986800, + 16, + "big", + ), + body=_translate_value( + "Mumbai, Boil water before drinking" + ), + attributes=[], + flags=int( + self.log_data_3.log_record.trace_flags + ), + ) + ], + ) + ], + ), + ] + ) + + # pylint: disable=protected-access + self.assertEqual( + expected, + self.exporter._translate_data( + [self.log_data_1, self.log_data_2, self.log_data_3] + ), + ) diff --git a/opentelemetry-sdk/setup.cfg b/opentelemetry-sdk/setup.cfg index 1996249573..158c8a198c 100644 --- a/opentelemetry-sdk/setup.cfg +++ b/opentelemetry-sdk/setup.cfg @@ -54,6 +54,8 @@ opentelemetry_tracer_provider = sdk_tracer_provider = opentelemetry.sdk.trace:TracerProvider opentelemetry_traces_exporter = console = opentelemetry.sdk.trace.export:ConsoleSpanExporter +opentelemetry_log_emitter_provider = + sdk_log_emitter_provider = opentelemetry.sdk._logs:LogEmitterProvider opentelemetry_id_generator = random = opentelemetry.sdk.trace.id_generator:RandomIdGenerator opentelemetry_environment_variables = diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_logs/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/_logs/__init__.py new file mode 100644 index 0000000000..6da162ed0f --- /dev/null +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_logs/__init__.py @@ -0,0 +1,500 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import abc +import atexit +import concurrent.futures +import json +import logging +import os +import threading +from typing import Any, Callable, Optional, Tuple, Union, cast + +from opentelemetry.sdk._logs.severity import SeverityNumber, std_to_otlp +from opentelemetry.sdk.environment_variables import ( + _OTEL_PYTHON_LOG_EMITTER_PROVIDER, +) +from opentelemetry.sdk.resources import Resource +from opentelemetry.sdk.util import ns_to_iso_str +from opentelemetry.sdk.util.instrumentation import InstrumentationInfo +from opentelemetry.trace import ( + format_span_id, + format_trace_id, + get_current_span, +) +from opentelemetry.trace.span import TraceFlags +from opentelemetry.util._providers import _load_provider +from opentelemetry.util._time import _time_ns +from opentelemetry.util.types import Attributes + +_logger = logging.getLogger(__name__) + + +class LogRecord: + """A LogRecord instance represents an event being logged. + + LogRecord instances are created and emitted via `LogEmitter` + every time something is logged. They contain all the information + pertinent to the event being logged. + """ + + def __init__( + self, + timestamp: Optional[int] = None, + trace_id: Optional[int] = None, + span_id: Optional[int] = None, + trace_flags: Optional[TraceFlags] = None, + severity_text: Optional[str] = None, + severity_number: Optional[SeverityNumber] = None, + name: Optional[str] = None, + body: Optional[Any] = None, + resource: Optional[Resource] = None, + attributes: Optional[Attributes] = None, + ): + self.timestamp = timestamp + self.trace_id = trace_id + self.span_id = span_id + self.trace_flags = trace_flags + self.severity_text = severity_text + self.severity_number = severity_number + self.name = name + self.body = body + self.resource = resource + self.attributes = attributes + + def __eq__(self, other: object) -> bool: + if not isinstance(other, LogRecord): + return NotImplemented + return self.__dict__ == other.__dict__ + + def to_json(self) -> str: + return json.dumps( + { + "body": self.body, + "name": self.name, + "severity_number": repr(self.severity_number), + "severity_text": self.severity_text, + "attributes": self.attributes, + "timestamp": ns_to_iso_str(self.timestamp), + "trace_id": f"0x{format_trace_id(self.trace_id)}", + "span_id": f"0x{format_span_id(self.span_id)}", + "trace_flags": self.trace_flags, + "resource": repr(self.resource.attributes) + if self.resource + else "", + }, + indent=4, + ) + + +class LogData: + """Readable LogRecord data plus associated InstrumentationLibrary.""" + + def __init__( + self, + log_record: LogRecord, + instrumentation_info: InstrumentationInfo, + ): + self.log_record = log_record + self.instrumentation_info = instrumentation_info + + +class LogProcessor(abc.ABC): + """Interface to hook the log record emitting action. + + Log processors can be registered directly using + :func:`LogEmitterProvider.add_log_processor` and they are invoked + in the same order as they were registered. + """ + + @abc.abstractmethod + def emit(self, log_data: LogData): + """Emits the `LogData`""" + + @abc.abstractmethod + def shutdown(self): + """Called when a :class:`opentelemetry.sdk._logs.LogEmitter` is shutdown""" + + @abc.abstractmethod + def force_flush(self, timeout_millis: int = 30000): + """Export all the received logs to the configured Exporter that have not yet + been exported. + + Args: + timeout_millis: The maximum amount of time to wait for logs to be + exported. + + Returns: + False if the timeout is exceeded, True otherwise. + """ + + +# Temporary fix until https://github.com/PyCQA/pylint/issues/4098 is resolved +# pylint:disable=no-member +class SynchronousMultiLogProcessor(LogProcessor): + """Implementation of class:`LogProcessor` that forwards all received + events to a list of log processors sequentially. + + The underlying log processors are called in sequential order as they were + added. + """ + + def __init__(self): + # use a tuple to avoid race conditions when adding a new log and + # iterating through it on "emit". + self._log_processors = () # type: Tuple[LogProcessor, ...] + self._lock = threading.Lock() + + def add_log_processor(self, log_processor: LogProcessor) -> None: + """Adds a Logprocessor to the list of log processors handled by this instance""" + with self._lock: + self._log_processors = self._log_processors + (log_processor,) + + def emit(self, log_data: LogData) -> None: + for lp in self._log_processors: + lp.emit(log_data) + + def shutdown(self) -> None: + """Shutdown the log processors one by one""" + for lp in self._log_processors: + lp.shutdown() + + def force_flush(self, timeout_millis: int = 30000) -> bool: + """Force flush the log processors one by one + + Args: + timeout_millis: The maximum amount of time to wait for logs to be + exported. If the first n log processors exceeded the timeout + then remaining log processors will not be flushed. + + Returns: + True if all the log processors flushes the logs within timeout, + False otherwise. + """ + deadline_ns = _time_ns() + timeout_millis * 1000000 + for lp in self._log_processors: + current_ts = _time_ns() + if current_ts >= deadline_ns: + return False + + if not lp.force_flush((deadline_ns - current_ts) // 1000000): + return False + + return True + + +class ConcurrentMultiLogProcessor(LogProcessor): + """Implementation of :class:`LogProcessor` that forwards all received + events to a list of log processors in parallel. + + Calls to the underlying log processors are forwarded in parallel by + submitting them to a thread pool executor and waiting until each log + processor finished its work. + + Args: + max_workers: The number of threads managed by the thread pool executor + and thus defining how many log processors can work in parallel. + """ + + def __init__(self, max_workers: int = 2): + # use a tuple to avoid race conditions when adding a new log and + # iterating through it on "emit". + self._log_processors = () # type: Tuple[LogProcessor, ...] + self._lock = threading.Lock() + self._executor = concurrent.futures.ThreadPoolExecutor( + max_workers=max_workers + ) + + def add_log_processor(self, log_processor: LogProcessor): + with self._lock: + self._log_processors = self._log_processors + (log_processor,) + + def _submit_and_wait( + self, + func: Callable[[LogProcessor], Callable[..., None]], + *args: Any, + **kwargs: Any, + ): + futures = [] + for lp in self._log_processors: + future = self._executor.submit(func(lp), *args, **kwargs) + futures.append(future) + for future in futures: + future.result() + + def emit(self, log_data: LogData): + self._submit_and_wait(lambda lp: lp.emit, log_data) + + def shutdown(self): + self._submit_and_wait(lambda lp: lp.shutdown) + + def force_flush(self, timeout_millis: int = 30000) -> bool: + """Force flush the log processors in parallel. + + Args: + timeout_millis: The maximum amount of time to wait for logs to be + exported. + + Returns: + True if all the log processors flushes the logs within timeout, + False otherwise. + """ + futures = [] + for lp in self._log_processors: + future = self._executor.submit(lp.force_flush, timeout_millis) + futures.append(future) + + done_futures, not_done_futures = concurrent.futures.wait( + futures, timeout_millis / 1e3 + ) + + if not_done_futures: + return False + + for future in done_futures: + if not future.result(): + return False + + return True + + +# skip natural LogRecord attributes +# http://docs.python.org/library/logging.html#logrecord-attributes +_RESERVED_ATTRS = frozenset( + ( + "asctime", + "args", + "created", + "exc_info", + "exc_text", + "filename", + "funcName", + "getMessage", + "levelname", + "levelno", + "lineno", + "module", + "msecs", + "msg", + "name", + "pathname", + "process", + "processName", + "relativeCreated", + "stack_info", + "thread", + "threadName", + ) +) + + +class OTLPHandler(logging.Handler): + """A handler class which writes logging records, in OTLP format, to + a network destination or file. + """ + + def __init__( + self, + level=logging.NOTSET, + log_emitter=None, + ) -> None: + super().__init__(level=level) + self._log_emitter = log_emitter or get_log_emitter(__name__) + + @staticmethod + def _get_attributes(record: logging.LogRecord) -> Attributes: + return { + k: v for k, v in vars(record).items() if k not in _RESERVED_ATTRS + } + + def _translate(self, record: logging.LogRecord) -> LogRecord: + timestamp = int(record.created * 1e9) + span_context = get_current_span().get_span_context() + attributes = self._get_attributes(record) + severity_number = std_to_otlp(record.levelno) + return LogRecord( + timestamp=timestamp, + trace_id=span_context.trace_id, + span_id=span_context.span_id, + trace_flags=span_context.trace_flags, + severity_text=record.levelname, + severity_number=severity_number, + body=record.getMessage(), + resource=self._log_emitter.resource, + attributes=attributes, + ) + + def emit(self, record: logging.LogRecord) -> None: + """ + Emit a record. + + The record is translated to OTLP format, and then sent across the pipeline. + """ + self._log_emitter.emit(self._translate(record)) + + def flush(self) -> None: + """ + Flushes the logging output. + """ + self._log_emitter.flush() + + +class LogEmitter: + def __init__( + self, + resource: Resource, + multi_log_processor: Union[ + SynchronousMultiLogProcessor, ConcurrentMultiLogProcessor + ], + instrumentation_info: InstrumentationInfo, + ): + self._resource = resource + self._multi_log_processor = multi_log_processor + self._instrumentation_info = instrumentation_info + + @property + def resource(self): + return self._resource + + def emit(self, record: LogRecord): + """Emits the :class:`LogData` by associating :class:`LogRecord` + and instrumentation info. + """ + log_data = LogData(record, self._instrumentation_info) + self._multi_log_processor.emit(log_data) + + # TODO: Should this flush everything in pipeline? + # Prior discussion https://github.com/open-telemetry/opentelemetry-python/pull/1916#discussion_r659945290 + def flush(self): + """Ensure all logging output has been flushed.""" + self._multi_log_processor.force_flush() + + +class LogEmitterProvider: + def __init__( + self, + resource: Resource = Resource.create(), + shutdown_on_exit: bool = True, + multi_log_processor: Union[ + SynchronousMultiLogProcessor, ConcurrentMultiLogProcessor + ] = None, + ): + self._resource = resource + self._multi_log_processor = ( + multi_log_processor or SynchronousMultiLogProcessor() + ) + self._at_exit_handler = None + if shutdown_on_exit: + self._at_exit_handler = atexit.register(self.shutdown) + + @property + def resource(self): + return self._resource + + def get_log_emitter( + self, + instrumenting_module_name: str, + instrumenting_module_verison: str = "", + ) -> LogEmitter: + return LogEmitter( + self._resource, + self._multi_log_processor, + InstrumentationInfo( + instrumenting_module_name, instrumenting_module_verison + ), + ) + + def add_log_processor(self, log_processor: LogProcessor): + """Registers a new :class:`LogProcessor` for this `LogEmitterProvider` instance. + + The log processors are invoked in the same order they are registered. + """ + self._multi_log_processor.add_log_processor(log_processor) + + def shutdown(self): + """Shuts down the log processors.""" + self._multi_log_processor.shutdown() + if self._at_exit_handler is not None: + atexit.unregister(self._at_exit_handler) + self._at_exit_handler = None + + def force_flush(self, timeout_millis: int = 30000) -> bool: + """Force flush the log processors. + + Args: + timeout_millis: The maximum amount of time to wait for logs to be + exported. + + Returns: + True if all the log processors flushes the logs within timeout, + False otherwise. + """ + return self._multi_log_processor.force_flush(timeout_millis) + + +_LOG_EMITTER_PROVIDER = None + + +def get_log_emitter_provider() -> LogEmitterProvider: + """Gets the current global :class:`~.LogEmitterProvider` object.""" + global _LOG_EMITTER_PROVIDER # pylint: disable=global-statement + if _LOG_EMITTER_PROVIDER is None: + if _OTEL_PYTHON_LOG_EMITTER_PROVIDER not in os.environ: + _LOG_EMITTER_PROVIDER = LogEmitterProvider() + return _LOG_EMITTER_PROVIDER + + _LOG_EMITTER_PROVIDER = cast( + "LogEmitterProvider", + _load_provider( + _OTEL_PYTHON_LOG_EMITTER_PROVIDER, "log_emitter_provider" + ), + ) + + return _LOG_EMITTER_PROVIDER + + +def set_log_emitter_provider(log_emitter_provider: LogEmitterProvider) -> None: + """Sets the current global :class:`~.LogEmitterProvider` object. + + This can only be done once, a warning will be logged if any furter attempt + is made. + """ + global _LOG_EMITTER_PROVIDER # pylint: disable=global-statement + + if _LOG_EMITTER_PROVIDER is not None: + _logger.warning( + "Overriding of current LogEmitterProvider is not allowed" + ) + return + + _LOG_EMITTER_PROVIDER = log_emitter_provider + + +def get_log_emitter( + instrumenting_module_name: str, + instrumenting_library_version: str = "", + log_emitter_provider: Optional[LogEmitterProvider] = None, +) -> LogEmitter: + """Returns a `LogEmitter` for use within a python process. + + This function is a convenience wrapper for + opentelemetry.sdk._logs.LogEmitterProvider.get_log_emitter. + + If log_emitter_provider param is omitted the current configured one is used. + """ + if log_emitter_provider is None: + log_emitter_provider = get_log_emitter_provider() + return log_emitter_provider.get_log_emitter( + instrumenting_module_name, instrumenting_library_version + ) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_logs/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/_logs/export/__init__.py new file mode 100644 index 0000000000..f65c967534 --- /dev/null +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_logs/export/__init__.py @@ -0,0 +1,301 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import abc +import collections +import enum +import logging +import sys +import threading +from os import linesep +from typing import IO, Callable, Deque, List, Optional, Sequence + +from opentelemetry.context import attach, detach, set_value +from opentelemetry.sdk._logs import LogData, LogProcessor, LogRecord +from opentelemetry.util._time import _time_ns + +_logger = logging.getLogger(__name__) + + +class LogExportResult(enum.Enum): + SUCCESS = 0 + FAILURE = 1 + + +class LogExporter(abc.ABC): + """Interface for exporting logs. + + Interface to be implemented by services that want to export logs received + in their own format. + + To export data this MUST be registered to the :class`opentelemetry.sdk._logs.LogEmitter` using a + log processor. + """ + + @abc.abstractmethod + def export(self, batch: Sequence[LogData]): + """Exports a batch of logs. + + Args: + batch: The list of `LogData` objects to be exported + + Returns: + The result of the export + """ + + @abc.abstractmethod + def shutdown(self): + """Shuts down the exporter. + + Called when the SDK is shut down. + """ + + +class ConsoleExporter(LogExporter): + """Implementation of :class:`LogExporter` that prints log records to the + console. + + This class can be used for diagnostic purposes. It prints the exported + log records to the console STDOUT. + """ + + def __init__( + self, + out: IO = sys.stdout, + formatter: Callable[[LogRecord], str] = lambda record: record.to_json() + + linesep, + ): + self.out = out + self.formatter = formatter + + def export(self, batch: Sequence[LogData]): + for data in batch: + self.out.write(self.formatter(data.log_record)) + self.out.flush() + return LogExportResult.SUCCESS + + def shutdown(self): + pass + + +class SimpleLogProcessor(LogProcessor): + """This is an implementation of LogProcessor which passes + received logs in the export-friendly LogData representation to the + configured LogExporter, as soon as they are emitted. + """ + + def __init__(self, exporter: LogExporter): + self._exporter = exporter + self._shutdown = False + + def emit(self, log_data: LogData): + if self._shutdown: + _logger.warning("Processor is already shutdown, ignoring call") + return + token = attach(set_value("suppress_instrumentation", True)) + try: + self._exporter.export((log_data,)) + except Exception: # pylint: disable=broad-except + _logger.exception("Exception while exporting logs.") + detach(token) + + def shutdown(self): + self._shutdown = True + self._exporter.shutdown() + + def force_flush( + self, timeout_millis: int = 30000 + ) -> bool: # pylint: disable=no-self-use + return True + + +class _FlushRequest: + __slots__ = ["event", "num_log_records"] + + def __init__(self): + self.event = threading.Event() + self.num_log_records = 0 + + +class BatchLogProcessor(LogProcessor): + """This is an implementation of LogProcessor which creates batches of + received logs in the export-friendly LogData representation and + send to the configured LogExporter, as soon as they are emitted. + """ + + def __init__( + self, + exporter: LogExporter, + schedule_delay_millis: int = 5000, + max_export_batch_size: int = 512, + export_timeout_millis: int = 30000, + ): + self._exporter = exporter + self._schedule_delay_millis = schedule_delay_millis + self._max_export_batch_size = max_export_batch_size + self._export_timeout_millis = export_timeout_millis + self._queue = collections.deque() # type: Deque[LogData] + self._worker_thread = threading.Thread(target=self.worker, daemon=True) + self._condition = threading.Condition(threading.Lock()) + self._shutdown = False + self._flush_request = None # type: Optional[_FlushRequest] + self._log_records = [ + None + ] * self._max_export_batch_size # type: List[Optional[LogData]] + self._worker_thread.start() + + def worker(self): + timeout = self._schedule_delay_millis / 1e3 + flush_request = None # type: Optional[_FlushRequest] + while not self._shutdown: + with self._condition: + if self._shutdown: + # shutdown may have been called, avoid further processing + break + flush_request = self._get_and_unset_flush_request() + if ( + len(self._queue) < self._max_export_batch_size + and self._flush_request is None + ): + self._condition.wait(timeout) + + flush_request = self._get_and_unset_flush_request() + if not self._queue: + timeout = self._schedule_delay_millis / 1e3 + self._notify_flush_request_finished(flush_request) + flush_request = None + continue + if self._shutdown: + break + + start_ns = _time_ns() + self._export(flush_request) + end_ns = _time_ns() + # subtract the duration of this export call to the next timeout + timeout = self._schedule_delay_millis / 1e3 - ( + (end_ns - start_ns) / 1e9 + ) + + self._notify_flush_request_finished(flush_request) + flush_request = None + + # there might have been a new flush request while export was running + # and before the done flag switched to true + with self._condition: + shutdown_flush_request = self._get_and_unset_flush_request() + + # flush the remaining logs + self._drain_queue() + self._notify_flush_request_finished(flush_request) + self._notify_flush_request_finished(shutdown_flush_request) + + def _export(self, flush_request: Optional[_FlushRequest] = None): + """Exports logs considering the given flush_request. + + If flush_request is not None then logs are exported in batches + until the number of exported logs reached or exceeded the num of logs in + flush_request, otherwise exports at max max_export_batch_size logs. + """ + if flush_request is None: + self._export_batch() + return + + num_log_records = flush_request.num_log_records + while self._queue: + exported = self._export_batch() + num_log_records -= exported + + if num_log_records <= 0: + break + + def _export_batch(self) -> int: + """Exports at most max_export_batch_size logs and returns the number of + exported logs. + """ + idx = 0 + while idx < self._max_export_batch_size and self._queue: + record = self._queue.pop() + self._log_records[idx] = record + idx += 1 + token = attach(set_value("suppress_instrumentation", True)) + try: + self._exporter.export(self._log_records[:idx]) # type: ignore + except Exception: # pylint: disable=broad-except + _logger.exception("Exception while exporting logs.") + detach(token) + + for index in range(idx): + self._log_records[index] = None + return idx + + def _drain_queue(self): + """Export all elements until queue is empty. + + Can only be called from the worker thread context because it invokes + `export` that is not thread safe. + """ + while self._queue: + self._export_batch() + + def _get_and_unset_flush_request(self) -> Optional[_FlushRequest]: + flush_request = self._flush_request + self._flush_request = None + if flush_request is not None: + flush_request.num_log_records = len(self._queue) + return flush_request + + @staticmethod + def _notify_flush_request_finished( + flush_request: Optional[_FlushRequest] = None, + ): + if flush_request is not None: + flush_request.event.set() + + def _get_or_create_flush_request(self) -> _FlushRequest: + if self._flush_request is None: + self._flush_request = _FlushRequest() + return self._flush_request + + def emit(self, log_data: LogData) -> None: + """Adds the `LogData` to queue and notifies the waiting threads + when size of queue reaches max_export_batch_size. + """ + if self._shutdown: + return + self._queue.appendleft(log_data) + if len(self._queue) >= self._max_export_batch_size: + with self._condition: + self._condition.notify() + + def shutdown(self): + self._shutdown = True + with self._condition: + self._condition.notify_all() + self._worker_thread.join() + self._exporter.shutdown() + + def force_flush(self, timeout_millis: Optional[int] = None) -> bool: + if timeout_millis is None: + timeout_millis = self._export_timeout_millis + if self._shutdown: + return True + + with self._condition: + flush_request = self._get_or_create_flush_request() + self._condition.notify_all() + + ret = flush_request.event.wait(timeout_millis / 1e3) + if not ret: + _logger.warning("Timeout was exceeded in force_flush().") + return ret diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_logs/export/in_memory_log_exporter.py b/opentelemetry-sdk/src/opentelemetry/sdk/_logs/export/in_memory_log_exporter.py new file mode 100644 index 0000000000..68cb6b7389 --- /dev/null +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_logs/export/in_memory_log_exporter.py @@ -0,0 +1,51 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import threading +import typing + +from opentelemetry.sdk._logs import LogData +from opentelemetry.sdk._logs.export import LogExporter, LogExportResult + + +class InMemoryLogExporter(LogExporter): + """Implementation of :class:`.LogExporter` that stores logs in memory. + + This class can be used for testing purposes. It stores the exported logs + in a list in memory that can be retrieved using the + :func:`.get_finished_logs` method. + """ + + def __init__(self): + self._logs = [] + self._lock = threading.Lock() + self._stopped = False + + def clear(self) -> None: + with self._lock: + self._logs.clear() + + def get_finished_logs(self) -> typing.Tuple[LogData, ...]: + with self._lock: + return tuple(self._logs) + + def export(self, batch: typing.Sequence[LogData]) -> LogExportResult: + if self._stopped: + return LogExportResult.FAILURE + with self._lock: + self._logs.extend(batch) + return LogExportResult.SUCCESS + + def shutdown(self) -> None: + self._stopped = True diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_logs/severity.py b/opentelemetry-sdk/src/opentelemetry/sdk/_logs/severity.py new file mode 100644 index 0000000000..2570375990 --- /dev/null +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_logs/severity.py @@ -0,0 +1,115 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import enum + + +class SeverityNumber(enum.Enum): + """Numerical value of severity. + + Smaller numerical values correspond to less severe events + (such as debug events), larger numerical values correspond + to more severe events (such as errors and critical events). + + See the `Log Data Model`_ spec for more info and how to map the + severity from source format to OTLP Model. + + .. _Log Data Model: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/logs/data-model.md#field-severitynumber + """ + + UNSPECIFIED = 0 + TRACE = 1 + TRACE2 = 2 + TRACE3 = 3 + TRACE4 = 4 + DEBUG = 5 + DEBUG2 = 6 + DEBUG3 = 7 + DEBUG4 = 8 + INFO = 9 + INFO2 = 10 + INFO3 = 11 + INFO4 = 12 + WARN = 13 + WARN2 = 14 + WARN3 = 15 + WARN4 = 16 + ERROR = 17 + ERROR2 = 18 + ERROR3 = 19 + ERROR4 = 20 + FATAL = 21 + FATAL2 = 22 + FATAL3 = 23 + FATAL4 = 24 + + +_STD_TO_OTLP = { + 10: SeverityNumber.DEBUG, + 11: SeverityNumber.DEBUG2, + 12: SeverityNumber.DEBUG3, + 13: SeverityNumber.DEBUG4, + 14: SeverityNumber.DEBUG4, + 15: SeverityNumber.DEBUG4, + 16: SeverityNumber.DEBUG4, + 17: SeverityNumber.DEBUG4, + 18: SeverityNumber.DEBUG4, + 19: SeverityNumber.DEBUG4, + 20: SeverityNumber.INFO, + 21: SeverityNumber.INFO2, + 22: SeverityNumber.INFO3, + 23: SeverityNumber.INFO4, + 24: SeverityNumber.INFO4, + 25: SeverityNumber.INFO4, + 26: SeverityNumber.INFO4, + 27: SeverityNumber.INFO4, + 28: SeverityNumber.INFO4, + 29: SeverityNumber.INFO4, + 30: SeverityNumber.WARN, + 31: SeverityNumber.WARN2, + 32: SeverityNumber.WARN3, + 33: SeverityNumber.WARN4, + 34: SeverityNumber.WARN4, + 35: SeverityNumber.WARN4, + 36: SeverityNumber.WARN4, + 37: SeverityNumber.WARN4, + 38: SeverityNumber.WARN4, + 39: SeverityNumber.WARN4, + 40: SeverityNumber.ERROR, + 41: SeverityNumber.ERROR2, + 42: SeverityNumber.ERROR3, + 43: SeverityNumber.ERROR4, + 44: SeverityNumber.ERROR4, + 45: SeverityNumber.ERROR4, + 46: SeverityNumber.ERROR4, + 47: SeverityNumber.ERROR4, + 48: SeverityNumber.ERROR4, + 49: SeverityNumber.ERROR4, + 50: SeverityNumber.FATAL, + 51: SeverityNumber.FATAL2, + 52: SeverityNumber.FATAL3, + 53: SeverityNumber.FATAL4, +} + + +def std_to_otlp(levelno: int) -> SeverityNumber: + """ + Map python log levelno as defined in https://docs.python.org/3/library/logging.html#logging-levels + to OTLP log severity number. + """ + if levelno < 10: + return SeverityNumber.UNSPECIFIED + if levelno > 53: + return SeverityNumber.FATAL4 + return _STD_TO_OTLP[levelno] diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/environment_variables.py b/opentelemetry-sdk/src/opentelemetry/sdk/environment_variables.py index 8b3d4abbf8..e9d35092a6 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/environment_variables.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/environment_variables.py @@ -369,3 +369,12 @@ If both are set, :envvar:`OTEL_SERVICE_NAME` takes precedence. """ + +_OTEL_PYTHON_LOG_EMITTER_PROVIDER = "OTEL_PYTHON_LOG_EMITTER_PROVIDER" +""" +.. envvar:: OTEL_PYTHON_LOG_EMITTER_PROVIDER + +The :envvar:`OTEL_PYTHON_LOG_EMITTER_PROVIDER` environment variable allows users to +provide the entry point for loading the log emitter provider. If not specified, SDK +LogEmitterProvider is used. +""" diff --git a/opentelemetry-sdk/tests/logs/__init__.py b/opentelemetry-sdk/tests/logs/__init__.py new file mode 100644 index 0000000000..b0a6f42841 --- /dev/null +++ b/opentelemetry-sdk/tests/logs/__init__.py @@ -0,0 +1,13 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/opentelemetry-sdk/tests/logs/test_export.py b/opentelemetry-sdk/tests/logs/test_export.py new file mode 100644 index 0000000000..964b44f769 --- /dev/null +++ b/opentelemetry-sdk/tests/logs/test_export.py @@ -0,0 +1,322 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# pylint: disable=protected-access +import logging +import os +import time +import unittest +from concurrent.futures import ThreadPoolExecutor +from unittest.mock import Mock, patch + +from opentelemetry.sdk import trace +from opentelemetry.sdk._logs import ( + LogData, + LogEmitterProvider, + LogRecord, + OTLPHandler, +) +from opentelemetry.sdk._logs.export import ( + BatchLogProcessor, + ConsoleExporter, + SimpleLogProcessor, +) +from opentelemetry.sdk._logs.export.in_memory_log_exporter import ( + InMemoryLogExporter, +) +from opentelemetry.sdk._logs.severity import SeverityNumber +from opentelemetry.sdk.resources import Resource as SDKResource +from opentelemetry.sdk.util.instrumentation import InstrumentationInfo +from opentelemetry.trace import TraceFlags +from opentelemetry.trace.span import INVALID_SPAN_CONTEXT + + +class TestSimpleLogProcessor(unittest.TestCase): + def test_simple_log_processor_default_level(self): + exporter = InMemoryLogExporter() + log_emitter_provider = LogEmitterProvider() + log_emitter = log_emitter_provider.get_log_emitter(__name__) + + log_emitter_provider.add_log_processor(SimpleLogProcessor(exporter)) + + logger = logging.getLogger("default_level") + logger.addHandler(OTLPHandler(log_emitter=log_emitter)) + + logger.warning("Something is wrong") + finished_logs = exporter.get_finished_logs() + self.assertEqual(len(finished_logs), 1) + warning_log_record = finished_logs[0].log_record + self.assertEqual(warning_log_record.body, "Something is wrong") + self.assertEqual(warning_log_record.severity_text, "WARNING") + self.assertEqual( + warning_log_record.severity_number, SeverityNumber.WARN + ) + + def test_simple_log_processor_custom_level(self): + exporter = InMemoryLogExporter() + log_emitter_provider = LogEmitterProvider() + log_emitter = log_emitter_provider.get_log_emitter(__name__) + + log_emitter_provider.add_log_processor(SimpleLogProcessor(exporter)) + + logger = logging.getLogger("custom_level") + logger.setLevel(logging.ERROR) + logger.addHandler(OTLPHandler(log_emitter=log_emitter)) + + logger.warning("Warning message") + logger.debug("Debug message") + logger.error("Error message") + logger.critical("Critical message") + finished_logs = exporter.get_finished_logs() + # Make sure only level >= logging.CRITICAL logs are recorded + self.assertEqual(len(finished_logs), 2) + critical_log_record = finished_logs[0].log_record + fatal_log_record = finished_logs[1].log_record + self.assertEqual(critical_log_record.body, "Error message") + self.assertEqual(critical_log_record.severity_text, "ERROR") + self.assertEqual( + critical_log_record.severity_number, SeverityNumber.ERROR + ) + self.assertEqual(fatal_log_record.body, "Critical message") + self.assertEqual(fatal_log_record.severity_text, "CRITICAL") + self.assertEqual( + fatal_log_record.severity_number, SeverityNumber.FATAL + ) + + def test_simple_log_processor_trace_correlation(self): + exporter = InMemoryLogExporter() + log_emitter_provider = LogEmitterProvider() + log_emitter = log_emitter_provider.get_log_emitter("name", "version") + + log_emitter_provider.add_log_processor(SimpleLogProcessor(exporter)) + + logger = logging.getLogger("trace_correlation") + logger.addHandler(OTLPHandler(log_emitter=log_emitter)) + + logger.warning("Warning message") + finished_logs = exporter.get_finished_logs() + self.assertEqual(len(finished_logs), 1) + log_record = finished_logs[0].log_record + self.assertEqual(log_record.body, "Warning message") + self.assertEqual(log_record.severity_text, "WARNING") + self.assertEqual(log_record.severity_number, SeverityNumber.WARN) + self.assertEqual(log_record.trace_id, INVALID_SPAN_CONTEXT.trace_id) + self.assertEqual(log_record.span_id, INVALID_SPAN_CONTEXT.span_id) + self.assertEqual( + log_record.trace_flags, INVALID_SPAN_CONTEXT.trace_flags + ) + exporter.clear() + + tracer = trace.TracerProvider().get_tracer(__name__) + with tracer.start_as_current_span("test") as span: + logger.critical("Critical message within span") + + finished_logs = exporter.get_finished_logs() + log_record = finished_logs[0].log_record + self.assertEqual(log_record.body, "Critical message within span") + self.assertEqual(log_record.severity_text, "CRITICAL") + self.assertEqual(log_record.severity_number, SeverityNumber.FATAL) + span_context = span.get_span_context() + self.assertEqual(log_record.trace_id, span_context.trace_id) + self.assertEqual(log_record.span_id, span_context.span_id) + self.assertEqual(log_record.trace_flags, span_context.trace_flags) + + def test_simple_log_processor_shutdown(self): + exporter = InMemoryLogExporter() + log_emitter_provider = LogEmitterProvider() + log_emitter = log_emitter_provider.get_log_emitter(__name__) + + log_emitter_provider.add_log_processor(SimpleLogProcessor(exporter)) + + logger = logging.getLogger("shutdown") + logger.addHandler(OTLPHandler(log_emitter=log_emitter)) + + logger.warning("Something is wrong") + finished_logs = exporter.get_finished_logs() + self.assertEqual(len(finished_logs), 1) + warning_log_record = finished_logs[0].log_record + self.assertEqual(warning_log_record.body, "Something is wrong") + self.assertEqual(warning_log_record.severity_text, "WARNING") + self.assertEqual( + warning_log_record.severity_number, SeverityNumber.WARN + ) + exporter.clear() + log_emitter_provider.shutdown() + logger.warning("Log after shutdown") + finished_logs = exporter.get_finished_logs() + self.assertEqual(len(finished_logs), 0) + + +class TestBatchLogProcessor(unittest.TestCase): + def test_emit_call_log_record(self): + exporter = InMemoryLogExporter() + log_processor = Mock(wraps=BatchLogProcessor(exporter)) + provider = LogEmitterProvider() + provider.add_log_processor(log_processor) + + emitter = provider.get_log_emitter(__name__) + logger = logging.getLogger("emit_call") + logger.addHandler(OTLPHandler(log_emitter=emitter)) + + logger.error("error") + self.assertEqual(log_processor.emit.call_count, 1) + + def test_shutdown(self): + exporter = InMemoryLogExporter() + log_processor = BatchLogProcessor(exporter) + + provider = LogEmitterProvider() + provider.add_log_processor(log_processor) + + emitter = provider.get_log_emitter(__name__) + logger = logging.getLogger("shutdown") + logger.addHandler(OTLPHandler(log_emitter=emitter)) + + logger.warning("warning message: %s", "possible upcoming heatwave") + logger.error("Very high rise in temperatures across the globe") + logger.critical("Temparature hits high 420 C in Hyderabad") + + log_processor.shutdown() + self.assertTrue(exporter._stopped) + + finished_logs = exporter.get_finished_logs() + expected = [ + ("warning message: possible upcoming heatwave", "WARNING"), + ("Very high rise in temperatures across the globe", "ERROR"), + ( + "Temparature hits high 420 C in Hyderabad", + "CRITICAL", + ), + ] + emitted = [ + (item.log_record.body, item.log_record.severity_text) + for item in finished_logs + ] + self.assertEqual(expected, emitted) + + def test_force_flush(self): + exporter = InMemoryLogExporter() + log_processor = BatchLogProcessor(exporter) + + provider = LogEmitterProvider() + provider.add_log_processor(log_processor) + + emitter = provider.get_log_emitter(__name__) + logger = logging.getLogger("force_flush") + logger.addHandler(OTLPHandler(log_emitter=emitter)) + + logger.critical("Earth is burning") + log_processor.force_flush() + finished_logs = exporter.get_finished_logs() + self.assertEqual(len(finished_logs), 1) + log_record = finished_logs[0].log_record + self.assertEqual(log_record.body, "Earth is burning") + self.assertEqual(log_record.severity_number, SeverityNumber.FATAL) + + def test_log_processor_too_many_logs(self): + exporter = InMemoryLogExporter() + log_processor = BatchLogProcessor(exporter) + + provider = LogEmitterProvider() + provider.add_log_processor(log_processor) + + emitter = provider.get_log_emitter(__name__) + logger = logging.getLogger("many_logs") + logger.addHandler(OTLPHandler(log_emitter=emitter)) + + for log_no in range(1000): + logger.critical("Log no: %s", log_no) + + self.assertTrue(log_processor.force_flush()) + finised_logs = exporter.get_finished_logs() + self.assertEqual(len(finised_logs), 1000) + + def test_with_multiple_threads(self): + exporter = InMemoryLogExporter() + log_processor = BatchLogProcessor(exporter) + + provider = LogEmitterProvider() + provider.add_log_processor(log_processor) + + emitter = provider.get_log_emitter(__name__) + logger = logging.getLogger("threads") + logger.addHandler(OTLPHandler(log_emitter=emitter)) + + def bulk_log_and_flush(num_logs): + for _ in range(num_logs): + logger.critical("Critical message") + self.assertTrue(log_processor.force_flush()) + + with ThreadPoolExecutor(max_workers=69) as executor: + futures = [] + for idx in range(69): + future = executor.submit(bulk_log_and_flush, idx + 1) + futures.append(future) + + executor.shutdown() + + finished_logs = exporter.get_finished_logs() + self.assertEqual(len(finished_logs), 2415) + + +class TestConsoleExporter(unittest.TestCase): + def test_export(self): # pylint: disable=no-self-use + """Check that the console exporter prints log records.""" + log_data = LogData( + log_record=LogRecord( + timestamp=int(time.time() * 1e9), + trace_id=2604504634922341076776623263868986797, + span_id=5213367945872657620, + trace_flags=TraceFlags(0x01), + severity_text="WARN", + severity_number=SeverityNumber.WARN, + name="name", + body="Zhengzhou, We have a heaviest rains in 1000 years", + resource=SDKResource({"key": "value"}), + attributes={"a": 1, "b": "c"}, + ), + instrumentation_info=InstrumentationInfo( + "first_name", "first_version" + ), + ) + exporter = ConsoleExporter() + # Mocking stdout interferes with debugging and test reporting, mock on + # the exporter instance instead. + + with patch.object(exporter, "out") as mock_stdout: + exporter.export([log_data]) + mock_stdout.write.assert_called_once_with( + log_data.log_record.to_json() + os.linesep + ) + + self.assertEqual(mock_stdout.write.call_count, 1) + self.assertEqual(mock_stdout.flush.call_count, 1) + + def test_export_custom(self): # pylint: disable=no-self-use + """Check that console exporter uses custom io, formatter.""" + mock_record_str = Mock(str) + + def formatter(record): # pylint: disable=unused-argument + return mock_record_str + + mock_stdout = Mock() + exporter = ConsoleExporter(out=mock_stdout, formatter=formatter) + log_data = LogData( + log_record=LogRecord(), + instrumentation_info=InstrumentationInfo( + "first_name", "first_version" + ), + ) + exporter.export([log_data]) + mock_stdout.write.assert_called_once_with(mock_record_str) diff --git a/opentelemetry-sdk/tests/logs/test_global_provider.py b/opentelemetry-sdk/tests/logs/test_global_provider.py new file mode 100644 index 0000000000..7a249defcf --- /dev/null +++ b/opentelemetry-sdk/tests/logs/test_global_provider.py @@ -0,0 +1,75 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# type:ignore +import unittest +from importlib import reload +from logging import WARNING +from unittest.mock import patch + +from opentelemetry.sdk import _logs +from opentelemetry.sdk._logs import ( + LogEmitterProvider, + get_log_emitter_provider, + set_log_emitter_provider, +) +from opentelemetry.sdk.environment_variables import ( + _OTEL_PYTHON_LOG_EMITTER_PROVIDER, +) + + +class TestGlobals(unittest.TestCase): + def tearDown(self): + reload(_logs) + + def check_override_not_allowed(self): + """set_log_emitter_provider should throw a warning when overridden""" + provider = get_log_emitter_provider() + with self.assertLogs(level=WARNING) as test: + set_log_emitter_provider(LogEmitterProvider()) + self.assertEqual( + test.output, + [ + ( + "WARNING:opentelemetry.sdk._logs:Overriding of current " + "LogEmitterProvider is not allowed" + ) + ], + ) + self.assertIs(provider, get_log_emitter_provider()) + + def test_set_tracer_provider(self): + reload(_logs) + provider = LogEmitterProvider() + set_log_emitter_provider(provider) + retrieved_provider = get_log_emitter_provider() + self.assertEqual(provider, retrieved_provider) + + def test_tracer_provider_override_warning(self): + reload(_logs) + self.check_override_not_allowed() + + @patch.dict( + "os.environ", + {_OTEL_PYTHON_LOG_EMITTER_PROVIDER: "sdk_log_emitter_provider"}, + ) + def test_sdk_log_emitter_provider(self): + reload(_logs) + self.check_override_not_allowed() + + @patch.dict("os.environ", {_OTEL_PYTHON_LOG_EMITTER_PROVIDER: "unknown"}) + def test_unknown_log_emitter_provider(self): + reload(_logs) + with self.assertRaises(Exception): + get_log_emitter_provider() diff --git a/opentelemetry-sdk/tests/logs/test_handler.py b/opentelemetry-sdk/tests/logs/test_handler.py new file mode 100644 index 0000000000..d7942f912b --- /dev/null +++ b/opentelemetry-sdk/tests/logs/test_handler.py @@ -0,0 +1,96 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import unittest +from unittest.mock import Mock + +from opentelemetry.sdk import trace +from opentelemetry.sdk._logs import LogEmitter, OTLPHandler +from opentelemetry.sdk._logs.severity import SeverityNumber +from opentelemetry.trace import INVALID_SPAN_CONTEXT + + +def get_logger(level=logging.NOTSET, log_emitter=None): + logger = logging.getLogger(__name__) + handler = OTLPHandler(level=level, log_emitter=log_emitter) + logger.addHandler(handler) + return logger + + +class TestOTLPHandler(unittest.TestCase): + def test_handler_default_log_level(self): + emitter_mock = Mock(spec=LogEmitter) + logger = get_logger(log_emitter=emitter_mock) + # Make sure debug messages are ignored by default + logger.debug("Debug message") + self.assertEqual(emitter_mock.emit.call_count, 0) + # Assert emit gets called for warning message + logger.warning("Warning message") + self.assertEqual(emitter_mock.emit.call_count, 1) + + def test_handler_custom_log_level(self): + emitter_mock = Mock(spec=LogEmitter) + logger = get_logger(level=logging.ERROR, log_emitter=emitter_mock) + logger.warning("Warning message test custom log level") + # Make sure any log with level < ERROR is ignored + self.assertEqual(emitter_mock.emit.call_count, 0) + logger.error("Mumbai, we have a major problem") + logger.critical("No Time For Caution") + self.assertEqual(emitter_mock.emit.call_count, 2) + + def test_log_record_no_span_context(self): + emitter_mock = Mock(spec=LogEmitter) + logger = get_logger(log_emitter=emitter_mock) + # Assert emit gets called for warning message + logger.warning("Warning message") + args, _ = emitter_mock.emit.call_args_list[0] + log_record = args[0] + + self.assertIsNotNone(log_record) + self.assertEqual(log_record.trace_id, INVALID_SPAN_CONTEXT.trace_id) + self.assertEqual(log_record.span_id, INVALID_SPAN_CONTEXT.span_id) + self.assertEqual( + log_record.trace_flags, INVALID_SPAN_CONTEXT.trace_flags + ) + + def test_log_record_user_attributes(self): + """Attributes can be injected into logs by adding them to the LogRecord""" + emitter_mock = Mock(spec=LogEmitter) + logger = get_logger(log_emitter=emitter_mock) + # Assert emit gets called for warning message + logger.warning("Warning message", extra={"http.status_code": 200}) + args, _ = emitter_mock.emit.call_args_list[0] + log_record = args[0] + + self.assertIsNotNone(log_record) + self.assertEqual(log_record.attributes, {"http.status_code": 200}) + + def test_log_record_trace_correlation(self): + emitter_mock = Mock(spec=LogEmitter) + logger = get_logger(log_emitter=emitter_mock) + + tracer = trace.TracerProvider().get_tracer(__name__) + with tracer.start_as_current_span("test") as span: + logger.critical("Critical message within span") + + args, _ = emitter_mock.emit.call_args_list[0] + log_record = args[0] + self.assertEqual(log_record.body, "Critical message within span") + self.assertEqual(log_record.severity_text, "CRITICAL") + self.assertEqual(log_record.severity_number, SeverityNumber.FATAL) + span_context = span.get_span_context() + self.assertEqual(log_record.trace_id, span_context.trace_id) + self.assertEqual(log_record.span_id, span_context.span_id) + self.assertEqual(log_record.trace_flags, span_context.trace_flags) diff --git a/opentelemetry-sdk/tests/logs/test_multi_log_prcessor.py b/opentelemetry-sdk/tests/logs/test_multi_log_prcessor.py new file mode 100644 index 0000000000..e55124edcc --- /dev/null +++ b/opentelemetry-sdk/tests/logs/test_multi_log_prcessor.py @@ -0,0 +1,194 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# pylint:disable=protected-access,no-self-use,no-member + +import logging +import threading +import time +import unittest +from abc import ABC, abstractmethod +from unittest.mock import Mock + +from opentelemetry.sdk._logs import ( + ConcurrentMultiLogProcessor, + LogEmitterProvider, + LogProcessor, + LogRecord, + OTLPHandler, + SynchronousMultiLogProcessor, +) +from opentelemetry.sdk._logs.severity import SeverityNumber + + +class AnotherLogProcessor(LogProcessor): + def __init__(self, exporter, logs_list): + self._exporter = exporter + self._log_list = logs_list + self._closed = False + + def emit(self, log_data): + if self._closed: + return + self._log_list.append( + (log_data.log_record.body, log_data.log_record.severity_text) + ) + + def shutdown(self): + self._closed = True + self._exporter.shutdown() + + def force_flush(self, timeout_millis=30000): + self._log_list.clear() + return True + + +class TestLogProcessor(unittest.TestCase): + def test_log_processor(self): + provider = LogEmitterProvider() + log_emitter = provider.get_log_emitter(__name__) + handler = OTLPHandler(log_emitter=log_emitter) + + logs_list_1 = [] + processor1 = AnotherLogProcessor(Mock(), logs_list_1) + logs_list_2 = [] + processor2 = AnotherLogProcessor(Mock(), logs_list_2) + + logger = logging.getLogger("test.span.processor") + logger.addHandler(handler) + + # Test no proessor added + logger.critical("Odisha, we have another major cyclone") + + self.assertEqual(len(logs_list_1), 0) + self.assertEqual(len(logs_list_2), 0) + + # Add one processor + provider.add_log_processor(processor1) + logger.warning("Brace yourself") + logger.error("Some error message") + + expected_list_1 = [ + ("Brace yourself", "WARNING"), + ("Some error message", "ERROR"), + ] + self.assertEqual(logs_list_1, expected_list_1) + + # Add another processor + provider.add_log_processor(processor2) + logger.critical("Something disastrous") + expected_list_1.append(("Something disastrous", "CRITICAL")) + + expected_list_2 = [("Something disastrous", "CRITICAL")] + + self.assertEqual(logs_list_1, expected_list_1) + self.assertEqual(logs_list_2, expected_list_2) + + +class MultiLogProcessorTestBase(ABC): + @abstractmethod + def _get_multi_log_processor(self): + pass + + def make_record(self): + return LogRecord( + timestamp=1622300111608942000, + severity_text="WARNING", + severity_number=SeverityNumber.WARN, + body="Warning message", + ) + + def test_on_emit(self): + multi_log_processor = self._get_multi_log_processor() + mocks = [Mock(spec=LogProcessor) for _ in range(5)] + for mock in mocks: + multi_log_processor.add_log_processor(mock) + record = self.make_record() + multi_log_processor.emit(record) + for mock in mocks: + mock.emit.assert_called_with(record) + multi_log_processor.shutdown() + + def test_on_shutdown(self): + multi_log_processor = self._get_multi_log_processor() + mocks = [Mock(spec=LogProcessor) for _ in range(5)] + for mock in mocks: + multi_log_processor.add_log_processor(mock) + multi_log_processor.shutdown() + for mock in mocks: + mock.shutdown.assert_called_once_with() + + def test_on_force_flush(self): + multi_log_processor = self._get_multi_log_processor() + mocks = [Mock(spec=LogProcessor) for _ in range(5)] + for mock in mocks: + multi_log_processor.add_log_processor(mock) + ret_value = multi_log_processor.force_flush(100) + + self.assertTrue(ret_value) + for mock_processor in mocks: + self.assertEqual(1, mock_processor.force_flush.call_count) + + +class TestSynchronousMultiLogProcessor( + MultiLogProcessorTestBase, unittest.TestCase +): + def _get_multi_log_processor(self): + return SynchronousMultiLogProcessor() + + def test_force_flush_delayed(self): + multi_log_processor = SynchronousMultiLogProcessor() + + def delay(_): + time.sleep(0.09) + + mock_processor1 = Mock(spec=LogProcessor) + mock_processor1.force_flush = Mock(side_effect=delay) + multi_log_processor.add_log_processor(mock_processor1) + mock_processor2 = Mock(spec=LogProcessor) + multi_log_processor.add_log_processor(mock_processor2) + + ret_value = multi_log_processor.force_flush(50) + self.assertFalse(ret_value) + self.assertEqual(mock_processor1.force_flush.call_count, 1) + self.assertEqual(mock_processor2.force_flush.call_count, 0) + + +class TestConcurrentMultiLogProcessor( + MultiLogProcessorTestBase, unittest.TestCase +): + def _get_multi_log_processor(self): + return ConcurrentMultiLogProcessor() + + def test_force_flush_delayed(self): + multi_log_processor = ConcurrentMultiLogProcessor() + wait_event = threading.Event() + + def delay(_): + wait_event.wait() + + mock1 = Mock(spec=LogProcessor) + mock1.force_flush = Mock(side_effect=delay) + mocks = [Mock(LogProcessor) for _ in range(5)] + mocks = [mock1] + mocks + for mock_processor in mocks: + multi_log_processor.add_log_processor(mock_processor) + + ret_value = multi_log_processor.force_flush(50) + wait_event.set() + + self.assertFalse(ret_value) + for mock in mocks: + self.assertEqual(1, mock.force_flush.call_count) + multi_log_processor.shutdown()