Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions src/lmnr/opentelemetry_lib/tracing/_instrument_initializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def init_instrumentor(self, *args, **kwargs) -> BaseInstrumentor | None:

class BrowserUseSessionInstrumentorInitializer(InstrumentorInitializer):
def init_instrumentor(
self, client, async_client, *args, **kwargs
self, async_client, *args, **kwargs
) -> BaseInstrumentor | None:
if not is_package_installed("browser-use"):
return None
Expand Down Expand Up @@ -344,17 +344,17 @@ def init_instrumentor(self, *args, **kwargs) -> BaseInstrumentor | None:

class PatchrightInstrumentorInitializer(InstrumentorInitializer):
def init_instrumentor(
self, client, async_client, *args, **kwargs
self, async_client, *args, **kwargs
) -> BaseInstrumentor | None:
if not is_package_installed("patchright"):
return None

from lmnr.sdk.browser.patchright_otel import PatchrightInstrumentor

if client is None and async_client is None:
if async_client is None:
return None

return PatchrightInstrumentor(client, async_client)
return PatchrightInstrumentor(async_client)


class PineconeInstrumentorInitializer(InstrumentorInitializer):
Expand All @@ -371,17 +371,17 @@ def init_instrumentor(self, *args, **kwargs) -> BaseInstrumentor | None:

class PlaywrightInstrumentorInitializer(InstrumentorInitializer):
def init_instrumentor(
self, client, async_client, *args, **kwargs
self, async_client, *args, **kwargs
) -> BaseInstrumentor | None:
if not is_package_installed("playwright"):
return None

from lmnr.sdk.browser.playwright_otel import PlaywrightInstrumentor

if client is None and async_client is None:
if async_client is None:
return None

return PlaywrightInstrumentor(client, async_client)
return PlaywrightInstrumentor(async_client)


class QdrantInstrumentorInitializer(InstrumentorInitializer):
Expand Down
2 changes: 1 addition & 1 deletion src/lmnr/opentelemetry_lib/tracing/instruments.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ def init_instrumentations(
continue

try:
instrumentor = initializer.init_instrumentor(client, async_client)
instrumentor = initializer.init_instrumentor(async_client)
if instrumentor is None:
continue
if not instrumentor.is_instrumented_by_opentelemetry:
Expand Down
158 changes: 158 additions & 0 deletions src/lmnr/sdk/browser/background_send_events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
"""
Background sending for browser events.

This module provides background execution for HTTP requests that send browser events,
ensuring sends never block the main execution flow while guaranteeing completion at
program exit.

## Background Event Loop Architecture
Uses a dedicated event loop running in a separate thread to handle async HTTP requests.
This architecture provides:

1. **Non-blocking execution**: Sends happen in the background, never blocking the main
thread or Playwright's event loop, allowing browser automation to continue smoothly.

2. **Guaranteed completion**: When the program exits, all pending async sends are
awaited and complete successfully, even if they're slow. No events are dropped.

3. **Lifecycle independence**: The background loop runs independently of Playwright's
event loop, so it survives when Playwright shuts down its internal loop before
program exit.

The pattern uses `asyncio.run_coroutine_threadsafe()` to submit async coroutines
from any thread (sync or async) to our background loop, maintaining pure async code
while achieving cross-thread execution.
"""

import asyncio
import atexit
import threading
from typing import Any

from lmnr.sdk.log import get_default_logger

logger = get_default_logger(__name__)

# Timeout for waiting for each async send operation at exit
ASYNC_SEND_TIMEOUT_SECONDS = 30

# Timeout for background loop creation
LOOP_CREATION_TIMEOUT_SECONDS = 5

# Timeout for thread join during cleanup
THREAD_JOIN_TIMEOUT_SECONDS = 5

# ==============================================================================
# Background event loop for async sends
# ==============================================================================

# Background event loop state
_background_loop = None
_background_loop_thread = None
_background_loop_lock = threading.Lock()
_background_loop_ready = threading.Event()
_pending_async_futures: set[asyncio.Future[Any]] = set()


def get_background_loop() -> asyncio.AbstractEventLoop:
"""
Get or create the background event loop for async sends.

Creates a dedicated event loop running in a daemon thread on first call.
Subsequent calls return the same loop. Thread-safe.

Returns:
The background event loop running in a separate thread.
"""
global _background_loop, _background_loop_thread

with _background_loop_lock:
if _background_loop is None:
# Create a new event loop in a background thread
def run_loop():
global _background_loop
_background_loop = asyncio.new_event_loop()
asyncio.set_event_loop(_background_loop)
_background_loop_ready.set()
_background_loop.run_forever()

_background_loop_thread = threading.Thread(
target=run_loop, daemon=True, name="lmnr-async-sends"
)
_background_loop_thread.start()

# Register cleanup handler
atexit.register(_cleanup_background_loop)

# Wait for loop to be created (outside the lock to avoid blocking other threads)
if not _background_loop_ready.wait(timeout=LOOP_CREATION_TIMEOUT_SECONDS):
raise RuntimeError("Background loop creation timed out")

return _background_loop


def track_async_send(future: asyncio.Future) -> None:
"""
Track an async send future for cleanup at exit.

The future is automatically removed from tracking when it completes,
preventing memory leaks.

Args:
future: The future returned by asyncio.run_coroutine_threadsafe()
"""
with _background_loop_lock:
_pending_async_futures.add(future)

def remove_on_done(f):
"""Remove the future from tracking when it completes."""
with _background_loop_lock:
_pending_async_futures.discard(f)

future.add_done_callback(remove_on_done)


def _cleanup_background_loop():
"""
Shutdown the background event loop and wait for all pending sends to complete.

Called automatically at program exit via atexit. Waits for each pending send
to complete with a timeout, then stops the background loop gracefully.
"""
global _background_loop

# Create a snapshot of pending futures to avoid holding the lock during waits
with _background_loop_lock:
futures_to_wait = list(_pending_async_futures)

pending_count = len(futures_to_wait)

if pending_count > 0:
logger.info(
f"Finishing sending {pending_count} browser events... "
"Ctrl+C to cancel (may result in incomplete session recording)."
)

# Wait for all pending futures to complete
for future in futures_to_wait:
try:
future.result(timeout=ASYNC_SEND_TIMEOUT_SECONDS)
except TimeoutError:
logger.debug("Timeout waiting for async send to complete")
except KeyboardInterrupt:
logger.debug("Interrupted, cancelling pending async sends")
for f in futures_to_wait:
f.cancel()
raise
except Exception as e:
logger.debug(f"Error in async send: {e}")

# Stop the background loop
if _background_loop is not None and not _background_loop.is_closed():
try:
_background_loop.call_soon_threadsafe(_background_loop.stop)
# Wait for thread to finish
if _background_loop_thread is not None:
_background_loop_thread.join(timeout=THREAD_JOIN_TIMEOUT_SECONDS)
except Exception as e:
logger.debug(f"Error stopping background loop: {e}")
33 changes: 23 additions & 10 deletions src/lmnr/sdk/browser/cdp_utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import asyncio
import logging
import orjson
import os
import time
Expand All @@ -8,12 +7,17 @@

from lmnr.sdk.decorators import observe
from lmnr.sdk.browser.utils import retry_async
from lmnr.sdk.browser.background_send_events import (
get_background_loop,
track_async_send,
)
from lmnr.sdk.client.asynchronous.async_client import AsyncLaminarClient
from lmnr.opentelemetry_lib.tracing.context import get_current_context
from lmnr.opentelemetry_lib.tracing import TracerWrapper
from lmnr.sdk.log import get_default_logger
from lmnr.sdk.types import MaskInputOptions

logger = logging.getLogger(__name__)
logger = get_default_logger(__name__)

OLD_BUFFER_TIMEOUT = 60
CDP_OPERATION_TIMEOUT_SECONDS = 10
Expand Down Expand Up @@ -499,12 +503,14 @@

function heartbeat() {
// Add heartbeat events
setInterval(() => {
window.lmnrRrweb.record.addCustomEvent('heartbeat', {
title: document.title,
setInterval(
() => {
window.lmnrRrweb.record.addCustomEvent('heartbeat', {
title: document.title,
url: document.URL,
})
}, HEARTBEAT_INTERVAL
},
HEARTBEAT_INTERVAL,
);
}

Expand Down Expand Up @@ -785,10 +791,13 @@ async def start_recording_events(
logger.debug("Failed to inject session recorder, not registering bindings")
return

# Get the background loop for async sends (independent of CDP's loop)
background_loop = get_background_loop()

# Buffer for reassembling chunks
chunk_buffers = {}

async def send_events_from_browser(chunk):
async def send_events_from_browser(chunk: dict):
try:
# Handle chunked data
batch_id = chunk["batchId"]
Expand Down Expand Up @@ -817,9 +826,13 @@ async def send_events_from_browser(chunk):
# Parse the JSON
events = orjson.loads(full_data)

# Send to server
# Send to server in background loop (independent of CDP's loop)
if events and len(events) > 0:
await client._browser_events.send(lmnr_session_id, trace_id, events)
future = asyncio.run_coroutine_threadsafe(
client._browser_events.send(lmnr_session_id, trace_id, events),
background_loop,
)
track_async_send(future)

# Clean up buffer
del chunk_buffers[batch_id]
Expand All @@ -843,7 +856,7 @@ async def send_events_callback(event, cdp_session_id: str | None = None):
return
if event["executionContextId"] != isolated_context_id:
return
await send_events_from_browser(orjson.loads(event["payload"]))
asyncio.create_task(send_events_from_browser(orjson.loads(event["payload"])))

await cdp_client.send.Runtime.addBinding(
{
Expand Down
8 changes: 4 additions & 4 deletions src/lmnr/sdk/browser/patchright_otel.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
_wrap_new_context_sync,
_wrap_new_context_async,
)
from lmnr.sdk.client.synchronous.sync_client import LaminarClient
from lmnr.sdk.client.asynchronous.async_client import AsyncLaminarClient
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.utils import unwrap
Expand Down Expand Up @@ -97,9 +96,8 @@


class PatchrightInstrumentor(BaseInstrumentor):
def __init__(self, client: LaminarClient, async_client: AsyncLaminarClient):
def __init__(self, async_client: AsyncLaminarClient):
super().__init__()
self.client = client
self.async_client = async_client

def instrumentation_dependencies(self) -> Collection[str]:
Expand All @@ -109,6 +107,8 @@ def _instrument(self, **kwargs):
tracer_provider = kwargs.get("tracer_provider")
tracer = get_tracer(__name__, __version__, tracer_provider)

# Both sync and async methods use async_client
# because we are using a background asyncio loop for async sends
for wrapped_method in WRAPPED_METHODS:
wrap_package = wrapped_method.get("package")
wrap_object = wrapped_method.get("object")
Expand All @@ -119,7 +119,7 @@ def _instrument(self, **kwargs):
f"{wrap_object}.{wrap_method}",
wrapped_method.get("wrapper")(
tracer,
self.client,
self.async_client,
wrapped_method,
),
)
Expand Down
Loading