From 8f0efadb668298d42b128b7e4e430fcccb3ee193 Mon Sep 17 00:00:00 2001 From: Joe McGinley <116890464+jomcgi@users.noreply.github.com> Date: Wed, 26 Feb 2025 23:13:35 +0000 Subject: [PATCH] fix: log and trace processor memory leak Update register_at_fork calls in processors to use weak references Add tests for all processors that us register_at_fork Strong references in register_at_fork persist after the processor objects are deleted. This prevents garbage collection as the reference count for the processor object never drops to 0. --- CHANGELOG.md | 2 ++ .../sdk/_logs/_internal/export/__init__.py | 4 +++- .../sdk/metrics/_internal/export/__init__.py | 2 +- .../sdk/trace/export/__init__.py | 4 +++- opentelemetry-sdk/tests/logs/test_export.py | 19 +++++++++++++++ .../test_periodic_exporting_metric_reader.py | 23 +++++++++++++++++++ .../tests/trace/export/test_export.py | 19 +++++++++++++++ 7 files changed, 70 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f97d53c20da..ededc7734b1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#4444](https://github.com/open-telemetry/opentelemetry-python/pull/4444)) - Updated `tracecontext-integration-test` gitref to `d782773b2cf2fa4afd6a80a93b289d8a74ca894d` ([#4448](https://github.com/open-telemetry/opentelemetry-python/pull/4448)) +- Fix memory leak in Log & Trace exporter + ([#4449](https://github.com/open-telemetry/opentelemetry-python/pull/4449)) ## Version 1.30.0/0.51b0 (2025-02-03) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py index e5669580c4b..ac2da66e60f 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py @@ -19,6 +19,7 @@ import os import sys import threading +import weakref from os import environ, linesep from time import time_ns from typing import IO, Callable, Deque, List, Optional, Sequence @@ -215,7 +216,8 @@ def __init__( self._log_records = [None] * self._max_export_batch_size self._worker_thread.start() if hasattr(os, "register_at_fork"): - os.register_at_fork(after_in_child=self._at_fork_reinit) # pylint: disable=protected-access + weak_reinit = weakref.WeakMethod(self._at_fork_reinit) + os.register_at_fork(after_in_child=lambda: weak_reinit()()) # pylint: disable=unnecessary-lambda self._pid = os.getpid() def _at_fork_reinit(self): diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py index 89bb8f3eb73..3099031b364 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py @@ -492,7 +492,7 @@ def __init__( weak_at_fork = weakref.WeakMethod(self._at_fork_reinit) os.register_at_fork( - after_in_child=lambda: weak_at_fork()() # pylint: disable=unnecessary-lambda, protected-access + after_in_child=lambda: weak_at_fork()() # pylint: disable=unnecessary-lambda ) elif self._export_interval_millis <= 0: raise ValueError( diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py index bdfc348611e..03f64ddc679 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py @@ -18,6 +18,7 @@ import sys import threading import typing +import weakref from enum import Enum from os import environ, linesep from time import time_ns @@ -200,7 +201,8 @@ def __init__( self.spans_list = [None] * self.max_export_batch_size # type: typing.List[typing.Optional[Span]] self.worker_thread.start() if hasattr(os, "register_at_fork"): - os.register_at_fork(after_in_child=self._at_fork_reinit) # pylint: disable=protected-access + weak_reinit = weakref.WeakMethod(self._at_fork_reinit) + os.register_at_fork(after_in_child=lambda: weak_reinit()()) # pylint: disable=unnecessary-lambda self._pid = os.getpid() def on_start( diff --git a/opentelemetry-sdk/tests/logs/test_export.py b/opentelemetry-sdk/tests/logs/test_export.py index 2e00bad6538..b9ec0ac2e7f 100644 --- a/opentelemetry-sdk/tests/logs/test_export.py +++ b/opentelemetry-sdk/tests/logs/test_export.py @@ -13,11 +13,13 @@ # limitations under the License. # pylint: disable=protected-access +import gc import logging import multiprocessing import os import time import unittest +import weakref from concurrent.futures import ThreadPoolExecutor from unittest.mock import Mock, patch @@ -619,6 +621,23 @@ def _target(): log_record_processor.shutdown() + def test_batch_log_record_processor_gc(self): + # Given a BatchLogRecordProcessor + exporter = InMemoryLogExporter() + processor = BatchLogRecordProcessor(exporter) + weak_ref = weakref.ref(processor) + processor.shutdown() + + # When the processor is garbage collected + del processor + gc.collect() + + # Then the reference to the processor should no longer exist + self.assertIsNone( + weak_ref(), + "The BatchLogRecordProcessor object created by this test wasn't garbage collected", + ) + class TestConsoleLogExporter(unittest.TestCase): def test_export(self): # pylint: disable=no-self-use diff --git a/opentelemetry-sdk/tests/metrics/test_periodic_exporting_metric_reader.py b/opentelemetry-sdk/tests/metrics/test_periodic_exporting_metric_reader.py index 3cbc3c9fe60..8aa89e72910 100644 --- a/opentelemetry-sdk/tests/metrics/test_periodic_exporting_metric_reader.py +++ b/opentelemetry-sdk/tests/metrics/test_periodic_exporting_metric_reader.py @@ -14,7 +14,9 @@ # pylint: disable=protected-access,invalid-name,no-self-use +import gc import math +import weakref from logging import WARNING from time import sleep, time_ns from typing import Optional, Sequence @@ -257,3 +259,24 @@ def test_metric_timeout_does_not_kill_worker_thread(self): sleep(0.1) self.assertTrue(pmr._daemon_thread.is_alive()) pmr.shutdown() + + def test_metric_exporer_gc(self): + # Given a PeriodicExportingMetricReader + exporter = FakeMetricsExporter( + preferred_aggregation={ + Counter: LastValueAggregation(), + }, + ) + processor = PeriodicExportingMetricReader(exporter) + weak_ref = weakref.ref(processor) + processor.shutdown() + + # When we garbage collect the reader + del processor + gc.collect() + + # Then the reference to the reader should no longer exist + self.assertIsNone( + weak_ref(), + "The PeriodicExportingMetricReader object created by this test wasn't garbage collected", + ) diff --git a/opentelemetry-sdk/tests/trace/export/test_export.py b/opentelemetry-sdk/tests/trace/export/test_export.py index aa94a514cad..a6d9c36875b 100644 --- a/opentelemetry-sdk/tests/trace/export/test_export.py +++ b/opentelemetry-sdk/tests/trace/export/test_export.py @@ -12,11 +12,13 @@ # See the License for the specific language governing permissions and # limitations under the License. +import gc import multiprocessing import os import threading import time import unittest +import weakref from concurrent.futures import ThreadPoolExecutor from logging import WARNING from platform import python_implementation, system @@ -585,6 +587,23 @@ def test_batch_span_processor_parameters(self): max_export_batch_size=512, ) + def test_batch_span_processor_gc(self): + # Given a BatchSpanProcessor + exporter = MySpanExporter(destination=[]) + processor = export.BatchSpanProcessor(exporter) + weak_ref = weakref.ref(processor) + processor.shutdown() + + # When the processor is garbage collected + del processor + gc.collect() + + # Then the reference to the processor should no longer exist + self.assertIsNone( + weak_ref(), + "The BatchSpanProcessor object created by this test wasn't garbage collected", + ) + class TestConsoleSpanExporter(unittest.TestCase): def test_export(self): # pylint: disable=no-self-use