From c36d47e17288223397c6835cf06bf5487b6a21b0 Mon Sep 17 00:00:00 2001 From: dshivashankar Date: Thu, 9 Oct 2025 17:35:57 +0530 Subject: [PATCH] feat(metrics): Add fork-safety to SynchronousMeasurementConsumer Implement post-fork reinitialization of threading locks in the metrics measurement consumer to prevent deadlocks and data duplication in forked child processes. --- .../metrics/_internal/measurement_consumer.py | 31 +++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py index c651033051a..049968b6a8e 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py @@ -14,6 +14,7 @@ # pylint: disable=unused-import +import os from abc import ABC, abstractmethod from threading import Lock from time import time_ns @@ -76,8 +77,25 @@ def __init__( self._async_instruments: List[ "opentelemetry.sdk.metrics._internal.instrument._Asynchronous" ] = [] + if hasattr(os, "register_at_fork"): + os.register_at_fork( + after_in_child=self._at_fork_reinit + ) # pylint: disable=protected-access + + def _at_fork_reinit(self): + """Reinitialize lock in child process after fork""" + self._lock._at_fork_reinit() + # Lazy reinitialization of storages on first use post fork. This is + # done to avoid the overhead of reinitializing the storages on + # every fork. + self._needs_storage_reinit = True + self._async_instruments.clear() def consume_measurement(self, measurement: Measurement) -> None: + if getattr(self, '_needs_storage_reinit', False): + self._reinit_storages() + self._needs_storage_reinit = False + should_sample_exemplar = ( self._sdk_config.exemplar_filter.should_sample( measurement.value, @@ -105,6 +123,11 @@ def collect( metric_reader: "opentelemetry.sdk.metrics.MetricReader", timeout_millis: float = 10_000, ) -> Optional[Iterable[Metric]]: + + if getattr(self, '_needs_storage_reinit', False): + self._reinit_storages() + self._needs_storage_reinit = False + with self._lock: metric_reader_storage = self._reader_storages[metric_reader] # for now, just use the defaults @@ -143,3 +166,11 @@ def collect( result = self._reader_storages[metric_reader].collect() return result + + def _reinit_storages(self): + # Reinitialize the storages. Use to reinitialize the storages after a + # fork to avoid duplicate data points. + with self._lock: + for storage in self._reader_storages.values(): + storage._lock._at_fork_reinit() + storage._instrument_view_instrument_matches.clear()