Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

[project]
name = "lmnr"
version = "0.7.18"
version = "0.7.19"
description = "Python SDK for Laminar"
authors = [
{ name = "lmnr.ai", email = "founders@lmnr.ai" }
Expand Down
31 changes: 26 additions & 5 deletions src/lmnr/opentelemetry_lib/tracing/exporter.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import grpc
import re
import threading
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult
from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (
Expand All @@ -22,6 +23,7 @@ class LaminarSpanExporter(SpanExporter):
headers: dict[str, str]
timeout: float
force_http: bool
_instance_lock: threading.RLock

def __init__(
self,
Expand All @@ -31,6 +33,7 @@ def __init__(
timeout_seconds: int = 30,
force_http: bool = False,
):
self._instance_lock = threading.RLock()
url = base_url or from_env("LMNR_BASE_URL") or "https://api.lmnr.ai"
url = url.rstrip("/")
if match := re.search(r":(\d{1,5})$", url):
Expand Down Expand Up @@ -74,27 +77,45 @@ def __init__(
"- set the OTEL_ENDPOINT environment variable\n"
"- pass the base_url parameter to Laminar.initialize"
)
self._init_instance()

def _init_instance(self):
# Create new instance first (outside critical section for performance)
if self.force_http:
self.instance = HTTPOTLPSpanExporter(
new_instance = HTTPOTLPSpanExporter(
endpoint=self.endpoint,
headers=self.headers,
compression=HTTPCompression.Gzip,
timeout=self.timeout,
)
else:
self.instance = OTLPSpanExporter(
new_instance = OTLPSpanExporter(
endpoint=self.endpoint,
headers=self.headers,
timeout=self.timeout,
compression=grpc.Compression.Gzip,
)

# Atomic swap with proper cleanup
with self._instance_lock:
old_instance: OTLPSpanExporter | HTTPOTLPSpanExporter | None = getattr(
self, "instance", None
)
if old_instance is not None:
try:
old_instance.shutdown()
except Exception as e:
logger.warning(f"Error shutting down old exporter instance: {e}")
self.instance = new_instance

def export(self, spans: list[ReadableSpan]) -> SpanExportResult:
return self.instance.export(spans)
with self._instance_lock:
return self.instance.export(spans)

def shutdown(self) -> None:
return self.instance.shutdown()
with self._instance_lock:
return self.instance.shutdown()

def force_flush(self, timeout_millis: int = 30000) -> bool:
return self.instance.force_flush(timeout_millis)
with self._instance_lock:
return self.instance.force_flush(timeout_millis)
82 changes: 52 additions & 30 deletions src/lmnr/opentelemetry_lib/tracing/processor.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
import threading
import uuid

from opentelemetry.sdk.trace.export import (
Expand Down Expand Up @@ -30,6 +31,8 @@ class LaminarSpanProcessor(SpanProcessor):
__span_id_to_path: dict[int, list[str]] = {}
__span_id_lists: dict[int, list[str]] = {}
max_export_batch_size: int
_instance_lock: threading.RLock
_paths_lock: threading.RLock

def __init__(
self,
Expand All @@ -42,6 +45,8 @@ def __init__(
disable_batch: bool = False,
exporter: SpanExporter | None = None,
):
self._instance_lock = threading.RLock()
self._paths_lock = threading.RLock()
self.logger = get_default_logger(__name__)
self.max_export_batch_size = max_export_batch_size
self.exporter = exporter or LaminarSpanExporter(
Expand All @@ -60,20 +65,25 @@ def __init__(
)

def on_start(self, span: Span, parent_context: Context | None = None):
parent_span_path = list(span.attributes.get(PARENT_SPAN_PATH, tuple())) or (
self.__span_id_to_path.get(span.parent.span_id) if span.parent else None
)
parent_span_ids_path = list(
span.attributes.get(PARENT_SPAN_IDS_PATH, tuple())
) or (self.__span_id_lists.get(span.parent.span_id, []) if span.parent else [])
span_path = parent_span_path + [span.name] if parent_span_path else [span.name]
span_ids_path = parent_span_ids_path + [
str(uuid.UUID(int=span.get_span_context().span_id))
]
span.set_attribute(SPAN_PATH, span_path)
span.set_attribute(SPAN_IDS_PATH, span_ids_path)
self.__span_id_to_path[span.get_span_context().span_id] = span_path
self.__span_id_lists[span.get_span_context().span_id] = span_ids_path
with self._paths_lock:
parent_span_path = list(span.attributes.get(PARENT_SPAN_PATH, tuple())) or (
self.__span_id_to_path.get(span.parent.span_id) if span.parent else None
)
parent_span_ids_path = list(
span.attributes.get(PARENT_SPAN_IDS_PATH, tuple())
) or (
self.__span_id_lists.get(span.parent.span_id, []) if span.parent else []
)
span_path = (
parent_span_path + [span.name] if parent_span_path else [span.name]
)
span_ids_path = parent_span_ids_path + [
str(uuid.UUID(int=span.get_span_context().span_id))
]
span.set_attribute(SPAN_PATH, span_path)
span.set_attribute(SPAN_IDS_PATH, span_ids_path)
self.__span_id_to_path[span.get_span_context().span_id] = span_path
self.__span_id_lists[span.get_span_context().span_id] = span_ids_path

span.set_attribute(SPAN_INSTRUMENTATION_SOURCE, "python")
span.set_attribute(SPAN_SDK_VERSION, __version__)
Expand All @@ -84,37 +94,49 @@ def on_start(self, span: Span, parent_context: Context | None = None):
for key, value in graph_context.items():
span.set_attribute(f"lmnr.association.properties.{key}", value)

self.instance.on_start(span, parent_context)
with self._instance_lock:
self.instance.on_start(span, parent_context)

def on_end(self, span: Span):
self.instance.on_end(span)
with self._instance_lock:
self.instance.on_end(span)

def force_flush(self, timeout_millis: int = 30000) -> bool:
return self.instance.force_flush(timeout_millis)
with self._instance_lock:
return self.instance.force_flush(timeout_millis)

def force_reinit(self):
if not isinstance(self.exporter, LaminarSpanExporter):
self.logger.warning(
"LaminarSpanProcessor is not using LaminarSpanExporter, cannot force reinit"
)
return
self.instance.shutdown()
disable_batch = isinstance(self.instance, SimpleSpanProcessor)
del self.exporter.instance
del self.instance

# Reinitialize exporter (thread-safe, handles its own locking)
self.exporter._init_instance()
self.instance = (
SimpleSpanProcessor(self.exporter)
if disable_batch
else BatchSpanProcessor(
self.exporter, max_export_batch_size=self.max_export_batch_size

with self._instance_lock:
old_instance = self.instance
disable_batch = isinstance(old_instance, SimpleSpanProcessor)

try:
old_instance.shutdown()
except Exception as e:
self.logger.debug(f"Error shutting down old processor instance: {e}")

self.instance = (
SimpleSpanProcessor(self.exporter)
if disable_batch
else BatchSpanProcessor(
self.exporter, max_export_batch_size=self.max_export_batch_size
)
)
)

def shutdown(self):
self.instance.shutdown()
with self._instance_lock:
self.instance.shutdown()

def clear(self):
self.__span_id_to_path = {}
self.__span_id_lists = {}
with self._paths_lock:
self.__span_id_to_path = {}
self.__span_id_lists = {}
2 changes: 1 addition & 1 deletion src/lmnr/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from packaging import version


__version__ = "0.7.18"
__version__ = "0.7.19"
PYTHON_VERSION = f"{sys.version_info.major}.{sys.version_info.minor}"


Expand Down