diff --git a/pyproject.toml b/pyproject.toml
index 6f130481..1f7d517b 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -6,7 +6,7 @@
 
 [project]
 name = "lmnr"
-version = "0.7.18"
+version = "0.7.19"
 description = "Python SDK for Laminar"
 authors = [
   { name = "lmnr.ai", email = "founders@lmnr.ai" }
diff --git a/src/lmnr/opentelemetry_lib/tracing/exporter.py b/src/lmnr/opentelemetry_lib/tracing/exporter.py
index 555fdaa3..4966cfe1 100644
--- a/src/lmnr/opentelemetry_lib/tracing/exporter.py
+++ b/src/lmnr/opentelemetry_lib/tracing/exporter.py
@@ -1,5 +1,6 @@
 import grpc
 import re
+import threading
 from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult
 from opentelemetry.sdk.trace import ReadableSpan
 from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (
@@ -22,6 +23,7 @@ class LaminarSpanExporter(SpanExporter):
     headers: dict[str, str]
     timeout: float
     force_http: bool
+    _instance_lock: threading.RLock
 
     def __init__(
         self,
@@ -31,6 +33,7 @@ def __init__(
         timeout_seconds: int = 30,
         force_http: bool = False,
     ):
+        self._instance_lock = threading.RLock()
         url = base_url or from_env("LMNR_BASE_URL") or "https://api.lmnr.ai"
         url = url.rstrip("/")
         if match := re.search(r":(\d{1,5})$", url):
@@ -74,27 +77,45 @@ def __init__(
                 "- set the OTEL_ENDPOINT environment variable\n"
                 "- pass the base_url parameter to Laminar.initialize"
             )
+        self._init_instance()
 
+    def _init_instance(self):
+        # Create new instance first (outside critical section for performance)
         if self.force_http:
-            self.instance = HTTPOTLPSpanExporter(
+            new_instance = HTTPOTLPSpanExporter(
                 endpoint=self.endpoint,
                 headers=self.headers,
                 compression=HTTPCompression.Gzip,
                 timeout=self.timeout,
             )
         else:
-            self.instance = OTLPSpanExporter(
+            new_instance = OTLPSpanExporter(
                 endpoint=self.endpoint,
                 headers=self.headers,
                 timeout=self.timeout,
                 compression=grpc.Compression.Gzip,
             )
 
+        # Atomic swap with proper cleanup
+        with self._instance_lock:
+            old_instance: OTLPSpanExporter | HTTPOTLPSpanExporter | None = getattr(
+                self, "instance", None
+            )
+            if old_instance is not None:
+                try:
+                    old_instance.shutdown()
+                except Exception as e:
+                    logger.warning(f"Error shutting down old exporter instance: {e}")
+            self.instance = new_instance
+
     def export(self, spans: list[ReadableSpan]) -> SpanExportResult:
-        return self.instance.export(spans)
+        with self._instance_lock:
+            return self.instance.export(spans)
 
     def shutdown(self) -> None:
-        return self.instance.shutdown()
+        with self._instance_lock:
+            return self.instance.shutdown()
 
     def force_flush(self, timeout_millis: int = 30000) -> bool:
-        return self.instance.force_flush(timeout_millis)
+        with self._instance_lock:
+            return self.instance.force_flush(timeout_millis)
diff --git a/src/lmnr/opentelemetry_lib/tracing/processor.py b/src/lmnr/opentelemetry_lib/tracing/processor.py
index 58df725e..862790a9 100644
--- a/src/lmnr/opentelemetry_lib/tracing/processor.py
+++ b/src/lmnr/opentelemetry_lib/tracing/processor.py
@@ -1,4 +1,5 @@
 import logging
+import threading
 import uuid
 
 from opentelemetry.sdk.trace.export import (
@@ -30,6 +31,8 @@ class LaminarSpanProcessor(SpanProcessor):
     __span_id_to_path: dict[int, list[str]] = {}
     __span_id_lists: dict[int, list[str]] = {}
     max_export_batch_size: int
+    _instance_lock: threading.RLock
+    _paths_lock: threading.RLock
 
     def __init__(
         self,
@@ -42,6 +45,8 @@ def __init__(
         disable_batch: bool = False,
         exporter: SpanExporter | None = None,
     ):
+        self._instance_lock = threading.RLock()
+        self._paths_lock = threading.RLock()
         self.logger = get_default_logger(__name__)
         self.max_export_batch_size = max_export_batch_size
         self.exporter = exporter or LaminarSpanExporter(
@@ -60,20 +65,25 @@ def __init__(
         )
 
     def on_start(self, span: Span, parent_context: Context | None = None):
-        parent_span_path = list(span.attributes.get(PARENT_SPAN_PATH, tuple())) or (
-            self.__span_id_to_path.get(span.parent.span_id) if span.parent else None
-        )
-        parent_span_ids_path = list(
-            span.attributes.get(PARENT_SPAN_IDS_PATH, tuple())
-        ) or (self.__span_id_lists.get(span.parent.span_id, []) if span.parent else [])
-        span_path = parent_span_path + [span.name] if parent_span_path else [span.name]
-        span_ids_path = parent_span_ids_path + [
-            str(uuid.UUID(int=span.get_span_context().span_id))
-        ]
-        span.set_attribute(SPAN_PATH, span_path)
-        span.set_attribute(SPAN_IDS_PATH, span_ids_path)
-        self.__span_id_to_path[span.get_span_context().span_id] = span_path
-        self.__span_id_lists[span.get_span_context().span_id] = span_ids_path
+        with self._paths_lock:
+            parent_span_path = list(span.attributes.get(PARENT_SPAN_PATH, tuple())) or (
+                self.__span_id_to_path.get(span.parent.span_id) if span.parent else None
+            )
+            parent_span_ids_path = list(
+                span.attributes.get(PARENT_SPAN_IDS_PATH, tuple())
+            ) or (
+                self.__span_id_lists.get(span.parent.span_id, []) if span.parent else []
+            )
+            span_path = (
+                parent_span_path + [span.name] if parent_span_path else [span.name]
+            )
+            span_ids_path = parent_span_ids_path + [
+                str(uuid.UUID(int=span.get_span_context().span_id))
+            ]
+            span.set_attribute(SPAN_PATH, span_path)
+            span.set_attribute(SPAN_IDS_PATH, span_ids_path)
+            self.__span_id_to_path[span.get_span_context().span_id] = span_path
+            self.__span_id_lists[span.get_span_context().span_id] = span_ids_path
 
         span.set_attribute(SPAN_INSTRUMENTATION_SOURCE, "python")
         span.set_attribute(SPAN_SDK_VERSION, __version__)
@@ -84,13 +94,16 @@ def on_start(self, span: Span, parent_context: Context | None = None):
             for key, value in graph_context.items():
                 span.set_attribute(f"lmnr.association.properties.{key}", value)
 
-        self.instance.on_start(span, parent_context)
+        with self._instance_lock:
+            self.instance.on_start(span, parent_context)
 
     def on_end(self, span: Span):
-        self.instance.on_end(span)
+        with self._instance_lock:
+            self.instance.on_end(span)
 
     def force_flush(self, timeout_millis: int = 30000) -> bool:
-        return self.instance.force_flush(timeout_millis)
+        with self._instance_lock:
+            return self.instance.force_flush(timeout_millis)
 
     def force_reinit(self):
         if not isinstance(self.exporter, LaminarSpanExporter):
@@ -98,23 +111,36 @@ def force_reinit(self):
                 "LaminarSpanProcessor is not using LaminarSpanExporter, cannot force reinit"
             )
             return
-        self.instance.shutdown()
-        disable_batch = isinstance(self.instance, SimpleSpanProcessor)
-        del self.exporter.instance
-        del self.instance
 
-        self.exporter._init_instance()
-        self.instance = (
-            SimpleSpanProcessor(self.exporter)
-            if disable_batch
-            else BatchSpanProcessor(
-                self.exporter, max_export_batch_size=self.max_export_batch_size
+        with self._instance_lock:
+            old_instance = self.instance
+            disable_batch = isinstance(old_instance, SimpleSpanProcessor)
+
+            # Shut down the old processor before re-creating the exporter:
+            # processor shutdown also shuts down the wrapped exporter, which
+            # delegates to `self.exporter.instance`. Doing it afterwards would
+            # close the freshly created instance instead of the old one.
+            try:
+                old_instance.shutdown()
+            except Exception as e:
+                self.logger.debug(f"Error shutting down old processor instance: {e}")
+
+            # Reinitialize exporter (thread-safe, handles its own locking)
+            self.exporter._init_instance()
+
+            self.instance = (
+                SimpleSpanProcessor(self.exporter)
+                if disable_batch
+                else BatchSpanProcessor(
+                    self.exporter, max_export_batch_size=self.max_export_batch_size
+                )
             )
-        )
 
     def shutdown(self):
-        self.instance.shutdown()
+        with self._instance_lock:
+            self.instance.shutdown()
 
     def clear(self):
-        self.__span_id_to_path = {}
-        self.__span_id_lists = {}
+        with self._paths_lock:
+            self.__span_id_to_path = {}
+            self.__span_id_lists = {}
diff --git a/src/lmnr/version.py b/src/lmnr/version.py
index 71936f76..077afb75 100644
--- a/src/lmnr/version.py
+++ b/src/lmnr/version.py
@@ -3,7 +3,7 @@
 from packaging import version
 
 
-__version__ = "0.7.18"
+__version__ = "0.7.19"
 PYTHON_VERSION = f"{sys.version_info.major}.{sys.version_info.minor}"
 
 