diff --git a/CHANGELOG.md b/CHANGELOG.md index 1ccccc79370..56db5628ba6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -57,6 +57,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#4458](https://github.com/open-telemetry/opentelemetry-python/pull/4458)) - pylint-ci updated python version to 3.13 ([#4450](https://github.com/open-telemetry/opentelemetry-python/pull/4450)) +- Fix memory leak in Log & Trace exporter + ([#4449](https://github.com/open-telemetry/opentelemetry-python/pull/4449)) ## Version 1.30.0/0.51b0 (2025-02-03) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py index 434dc745ccf..a4eb113c89b 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py @@ -20,6 +20,7 @@ import os import sys import threading +import weakref from os import environ, linesep from time import time_ns from typing import IO, Callable, Deque, List, Optional, Sequence @@ -216,7 +217,8 @@ def __init__( self._log_records = [None] * self._max_export_batch_size self._worker_thread.start() if hasattr(os, "register_at_fork"): - os.register_at_fork(after_in_child=self._at_fork_reinit) # pylint: disable=protected-access + weak_reinit = weakref.WeakMethod(self._at_fork_reinit) + os.register_at_fork(after_in_child=lambda: weak_reinit()()) # pylint: disable=unnecessary-lambda self._pid = os.getpid() def _at_fork_reinit(self): diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py index 52c683343a5..2cb587f2f65 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py @@ -501,7 +501,7 @@ def __init__( weak_at_fork = weakref.WeakMethod(self._at_fork_reinit) os.register_at_fork( - after_in_child=lambda: weak_at_fork()() # pylint: disable=unnecessary-lambda, protected-access + after_in_child=lambda: weak_at_fork()() # pylint: disable=unnecessary-lambda ) elif self._export_interval_millis <= 0: raise ValueError( diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py index 47d1769a418..9e60d6cff9b 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py @@ -19,6 +19,7 @@ import sys import threading import typing +import weakref from enum import Enum from os import environ, linesep from time import time_ns @@ -200,7 +201,8 @@ def __init__( self.spans_list = [None] * self.max_export_batch_size # type: typing.List[typing.Optional[Span]] self.worker_thread.start() if hasattr(os, "register_at_fork"): - os.register_at_fork(after_in_child=self._at_fork_reinit) # pylint: disable=protected-access + weak_reinit = weakref.WeakMethod(self._at_fork_reinit) + os.register_at_fork(after_in_child=lambda: weak_reinit()()) # pylint: disable=unnecessary-lambda self._pid = os.getpid() def on_start( diff --git a/opentelemetry-sdk/tests/logs/test_export.py b/opentelemetry-sdk/tests/logs/test_export.py index 2e00bad6538..b9ec0ac2e7f 100644 --- a/opentelemetry-sdk/tests/logs/test_export.py +++ b/opentelemetry-sdk/tests/logs/test_export.py @@ -13,11 +13,13 @@ # limitations under the License. # pylint: disable=protected-access +import gc import logging import multiprocessing import os import time import unittest +import weakref from concurrent.futures import ThreadPoolExecutor from unittest.mock import Mock, patch @@ -619,6 +621,23 @@ def _target(): log_record_processor.shutdown() + def test_batch_log_record_processor_gc(self): + # Given a BatchLogRecordProcessor + exporter = InMemoryLogExporter() + processor = BatchLogRecordProcessor(exporter) + weak_ref = weakref.ref(processor) + processor.shutdown() + + # When the processor is garbage collected + del processor + gc.collect() + + # Then the reference to the processor should no longer exist + self.assertIsNone( + weak_ref(), + "The BatchLogRecordProcessor object created by this test wasn't garbage collected", + ) + class TestConsoleLogExporter(unittest.TestCase): def test_export(self): # pylint: disable=no-self-use diff --git a/opentelemetry-sdk/tests/metrics/test_periodic_exporting_metric_reader.py b/opentelemetry-sdk/tests/metrics/test_periodic_exporting_metric_reader.py index 3cbc3c9fe60..8aa89e72910 100644 --- a/opentelemetry-sdk/tests/metrics/test_periodic_exporting_metric_reader.py +++ b/opentelemetry-sdk/tests/metrics/test_periodic_exporting_metric_reader.py @@ -14,7 +14,9 @@ # pylint: disable=protected-access,invalid-name,no-self-use +import gc import math +import weakref from logging import WARNING from time import sleep, time_ns from typing import Optional, Sequence @@ -257,3 +259,24 @@ def test_metric_timeout_does_not_kill_worker_thread(self): sleep(0.1) self.assertTrue(pmr._daemon_thread.is_alive()) pmr.shutdown() + + def test_metric_exporer_gc(self): + # Given a PeriodicExportingMetricReader + exporter = FakeMetricsExporter( + preferred_aggregation={ + Counter: LastValueAggregation(), + }, + ) + processor = PeriodicExportingMetricReader(exporter) + weak_ref = weakref.ref(processor) + processor.shutdown() + + # When we garbage collect the reader + del processor + gc.collect() + + # Then the reference to the reader should no longer exist + self.assertIsNone( + weak_ref(), + "The PeriodicExportingMetricReader object created by this test wasn't garbage collected", + ) diff --git a/opentelemetry-sdk/tests/trace/export/test_export.py b/opentelemetry-sdk/tests/trace/export/test_export.py index aa94a514cad..a6d9c36875b 100644 --- a/opentelemetry-sdk/tests/trace/export/test_export.py +++ b/opentelemetry-sdk/tests/trace/export/test_export.py @@ -12,11 +12,13 @@ # See the License for the specific language governing permissions and # limitations under the License. +import gc import multiprocessing import os import threading import time import unittest +import weakref from concurrent.futures import ThreadPoolExecutor from logging import WARNING from platform import python_implementation, system @@ -585,6 +587,23 @@ def test_batch_span_processor_parameters(self): max_export_batch_size=512, ) + def test_batch_span_processor_gc(self): + # Given a BatchSpanProcessor + exporter = MySpanExporter(destination=[]) + processor = export.BatchSpanProcessor(exporter) + weak_ref = weakref.ref(processor) + processor.shutdown() + + # When the processor is garbage collected + del processor + gc.collect() + + # Then the reference to the processor should no longer exist + self.assertIsNone( + weak_ref(), + "The BatchSpanProcessor object created by this test wasn't garbage collected", + ) + class TestConsoleSpanExporter(unittest.TestCase): def test_export(self): # pylint: disable=no-self-use