diff --git a/src/lmnr/opentelemetry_lib/tracing/_instrument_initializers.py b/src/lmnr/opentelemetry_lib/tracing/_instrument_initializers.py index 93b93de7..e5dfffcc 100644 --- a/src/lmnr/opentelemetry_lib/tracing/_instrument_initializers.py +++ b/src/lmnr/opentelemetry_lib/tracing/_instrument_initializers.py @@ -80,7 +80,7 @@ def init_instrumentor(self, *args, **kwargs) -> BaseInstrumentor | None: class BrowserUseSessionInstrumentorInitializer(InstrumentorInitializer): def init_instrumentor( - self, client, async_client, *args, **kwargs + self, async_client, *args, **kwargs ) -> BaseInstrumentor | None: if not is_package_installed("browser-use"): return None @@ -344,17 +344,17 @@ def init_instrumentor(self, *args, **kwargs) -> BaseInstrumentor | None: class PatchrightInstrumentorInitializer(InstrumentorInitializer): def init_instrumentor( - self, client, async_client, *args, **kwargs + self, async_client, *args, **kwargs ) -> BaseInstrumentor | None: if not is_package_installed("patchright"): return None from lmnr.sdk.browser.patchright_otel import PatchrightInstrumentor - if client is None and async_client is None: + if async_client is None: return None - return PatchrightInstrumentor(client, async_client) + return PatchrightInstrumentor(async_client) class PineconeInstrumentorInitializer(InstrumentorInitializer): @@ -371,17 +371,17 @@ def init_instrumentor(self, *args, **kwargs) -> BaseInstrumentor | None: class PlaywrightInstrumentorInitializer(InstrumentorInitializer): def init_instrumentor( - self, client, async_client, *args, **kwargs + self, async_client, *args, **kwargs ) -> BaseInstrumentor | None: if not is_package_installed("playwright"): return None from lmnr.sdk.browser.playwright_otel import PlaywrightInstrumentor - if client is None and async_client is None: + if async_client is None: return None - return PlaywrightInstrumentor(client, async_client) + return PlaywrightInstrumentor(async_client) class QdrantInstrumentorInitializer(InstrumentorInitializer): diff --git a/src/lmnr/opentelemetry_lib/tracing/instruments.py b/src/lmnr/opentelemetry_lib/tracing/instruments.py index e3042ff5..ea64fa2e 100644 --- a/src/lmnr/opentelemetry_lib/tracing/instruments.py +++ b/src/lmnr/opentelemetry_lib/tracing/instruments.py @@ -124,7 +124,7 @@ def init_instrumentations( continue try: - instrumentor = initializer.init_instrumentor(client, async_client) + instrumentor = initializer.init_instrumentor(async_client) if instrumentor is None: continue if not instrumentor.is_instrumented_by_opentelemetry: diff --git a/src/lmnr/sdk/browser/background_send_events.py b/src/lmnr/sdk/browser/background_send_events.py new file mode 100644 index 00000000..91ad9c73 --- /dev/null +++ b/src/lmnr/sdk/browser/background_send_events.py @@ -0,0 +1,158 @@ +""" +Background sending for browser events. + +This module provides background execution for HTTP requests that send browser events, +ensuring sends never block the main execution flow while guaranteeing completion at +program exit. + +## Background Event Loop Architecture +Uses a dedicated event loop running in a separate thread to handle async HTTP requests. +This architecture provides: + +1. **Non-blocking execution**: Sends happen in the background, never blocking the main + thread or Playwright's event loop, allowing browser automation to continue smoothly. + +2. **Guaranteed completion**: When the program exits, all pending async sends are + awaited and complete successfully, even if they're slow. No events are dropped. + +3. **Lifecycle independence**: The background loop runs independently of Playwright's + event loop, so it survives when Playwright shuts down its internal loop before + program exit. + +The pattern uses `asyncio.run_coroutine_threadsafe()` to submit async coroutines +from any thread (sync or async) to our background loop, maintaining pure async code +while achieving cross-thread execution. +""" + +import asyncio +import atexit +import threading +from typing import Any + +from lmnr.sdk.log import get_default_logger + +logger = get_default_logger(__name__) + +# Timeout for waiting for each async send operation at exit +ASYNC_SEND_TIMEOUT_SECONDS = 30 + +# Timeout for background loop creation +LOOP_CREATION_TIMEOUT_SECONDS = 5 + +# Timeout for thread join during cleanup +THREAD_JOIN_TIMEOUT_SECONDS = 5 + +# ============================================================================== +# Background event loop for async sends +# ============================================================================== + +# Background event loop state +_background_loop = None +_background_loop_thread = None +_background_loop_lock = threading.Lock() +_background_loop_ready = threading.Event() +_pending_async_futures: set[asyncio.Future[Any]] = set() + + +def get_background_loop() -> asyncio.AbstractEventLoop: + """ + Get or create the background event loop for async sends. + + Creates a dedicated event loop running in a daemon thread on first call. + Subsequent calls return the same loop. Thread-safe. + + Returns: + The background event loop running in a separate thread. + """ + global _background_loop, _background_loop_thread + + with _background_loop_lock: + if _background_loop is None: + # Create a new event loop in a background thread + def run_loop(): + global _background_loop + _background_loop = asyncio.new_event_loop() + asyncio.set_event_loop(_background_loop) + _background_loop_ready.set() + _background_loop.run_forever() + + _background_loop_thread = threading.Thread( + target=run_loop, daemon=True, name="lmnr-async-sends" + ) + _background_loop_thread.start() + + # Register cleanup handler + atexit.register(_cleanup_background_loop) + + # Wait for loop to be created (outside the lock to avoid blocking other threads) + if not _background_loop_ready.wait(timeout=LOOP_CREATION_TIMEOUT_SECONDS): + raise RuntimeError("Background loop creation timed out") + + return _background_loop + + +def track_async_send(future: asyncio.Future) -> None: + """ + Track an async send future for cleanup at exit. + + The future is automatically removed from tracking when it completes, + preventing memory leaks. + + Args: + future: The future returned by asyncio.run_coroutine_threadsafe() + """ + with _background_loop_lock: + _pending_async_futures.add(future) + + def remove_on_done(f): + """Remove the future from tracking when it completes.""" + with _background_loop_lock: + _pending_async_futures.discard(f) + + future.add_done_callback(remove_on_done) + + +def _cleanup_background_loop(): + """ + Shutdown the background event loop and wait for all pending sends to complete. + + Called automatically at program exit via atexit. Waits for each pending send + to complete with a timeout, then stops the background loop gracefully. + """ + global _background_loop + + # Create a snapshot of pending futures to avoid holding the lock during waits + with _background_loop_lock: + futures_to_wait = list(_pending_async_futures) + + pending_count = len(futures_to_wait) + + if pending_count > 0: + logger.info( + f"Finishing sending {pending_count} browser events... " + "Ctrl+C to cancel (may result in incomplete session recording)." + ) + + # Wait for all pending futures to complete + for future in futures_to_wait: + try: + future.result(timeout=ASYNC_SEND_TIMEOUT_SECONDS) + except TimeoutError: + logger.debug("Timeout waiting for async send to complete") + except KeyboardInterrupt: + logger.debug("Interrupted, cancelling pending async sends") + for f in futures_to_wait: + f.cancel() + raise + except Exception as e: + logger.debug(f"Error in async send: {e}") + + # Stop the background loop + if _background_loop is not None and not _background_loop.is_closed(): + try: + _background_loop.call_soon_threadsafe(_background_loop.stop) + # Wait for thread to finish + if _background_loop_thread is not None: + _background_loop_thread.join(timeout=THREAD_JOIN_TIMEOUT_SECONDS) + except Exception as e: + logger.debug(f"Error stopping background loop: {e}") diff --git a/src/lmnr/sdk/browser/cdp_utils.py b/src/lmnr/sdk/browser/cdp_utils.py index eae4e274..9240d27b 100644 --- a/src/lmnr/sdk/browser/cdp_utils.py +++ b/src/lmnr/sdk/browser/cdp_utils.py @@ -1,5 +1,4 @@ import asyncio -import logging import orjson import os import time @@ -8,12 +7,17 @@ from lmnr.sdk.decorators import observe from lmnr.sdk.browser.utils import retry_async +from lmnr.sdk.browser.background_send_events import ( + get_background_loop, + track_async_send, +) from lmnr.sdk.client.asynchronous.async_client import AsyncLaminarClient from lmnr.opentelemetry_lib.tracing.context import get_current_context from lmnr.opentelemetry_lib.tracing import TracerWrapper +from lmnr.sdk.log import get_default_logger from lmnr.sdk.types import MaskInputOptions -logger = logging.getLogger(__name__) +logger = get_default_logger(__name__) OLD_BUFFER_TIMEOUT = 60 CDP_OPERATION_TIMEOUT_SECONDS = 10 @@ -499,12 +503,14 @@ function heartbeat() { // Add heartbeat events - setInterval(() => { - window.lmnrRrweb.record.addCustomEvent('heartbeat', { - title: document.title, + setInterval( + () => { + window.lmnrRrweb.record.addCustomEvent('heartbeat', { + title: document.title, url: document.URL, }) - }, HEARTBEAT_INTERVAL + }, + HEARTBEAT_INTERVAL, ); } @@ -785,10 +791,13 @@ async def start_recording_events( logger.debug("Failed to inject session recorder, not registering bindings") return + # Get the background loop for async sends (independent of CDP's loop) + background_loop = get_background_loop() + # Buffer for reassembling chunks chunk_buffers = {} - async def send_events_from_browser(chunk): + async def send_events_from_browser(chunk: dict): try: # Handle chunked data batch_id = chunk["batchId"] @@ -817,9 +826,13 @@ async def send_events_from_browser(chunk): # Parse the JSON events = orjson.loads(full_data) - # Send to server + # Send to server in background loop (independent of CDP's loop) if events and len(events) > 0: - await client._browser_events.send(lmnr_session_id, trace_id, events) + future = asyncio.run_coroutine_threadsafe( + client._browser_events.send(lmnr_session_id, trace_id, events), + background_loop, + ) + track_async_send(future) # Clean up buffer del chunk_buffers[batch_id] @@ -843,7 +856,7 @@ async def send_events_callback(event, cdp_session_id: str | None = None): return if event["executionContextId"] != isolated_context_id: return - await send_events_from_browser(orjson.loads(event["payload"])) + asyncio.create_task(send_events_from_browser(orjson.loads(event["payload"]))) await cdp_client.send.Runtime.addBinding( { diff --git a/src/lmnr/sdk/browser/patchright_otel.py b/src/lmnr/sdk/browser/patchright_otel.py index ef01feb4..4dca3015 100644 --- a/src/lmnr/sdk/browser/patchright_otel.py +++ b/src/lmnr/sdk/browser/patchright_otel.py @@ -6,7 +6,6 @@ _wrap_new_context_sync, _wrap_new_context_async, ) -from lmnr.sdk.client.synchronous.sync_client import LaminarClient from lmnr.sdk.client.asynchronous.async_client import AsyncLaminarClient from opentelemetry.instrumentation.instrumentor import BaseInstrumentor from opentelemetry.instrumentation.utils import unwrap @@ -97,9 +96,8 @@ class PatchrightInstrumentor(BaseInstrumentor): - def __init__(self, client: LaminarClient, async_client: AsyncLaminarClient): + def __init__(self, async_client: AsyncLaminarClient): super().__init__() - self.client = client self.async_client = async_client def instrumentation_dependencies(self) -> Collection[str]: @@ -109,6 +107,8 @@ def _instrument(self, **kwargs): tracer_provider = kwargs.get("tracer_provider") tracer = get_tracer(__name__, __version__, tracer_provider) + # Both sync and async methods use async_client + # because we are using a background asyncio loop for async sends for wrapped_method in WRAPPED_METHODS: wrap_package = wrapped_method.get("package") wrap_object = wrapped_method.get("object") @@ -119,7 +119,7 @@ def _instrument(self, **kwargs): f"{wrap_object}.{wrap_method}", wrapped_method.get("wrapper")( tracer, - self.client, + self.async_client, wrapped_method, ), ) diff --git a/src/lmnr/sdk/browser/playwright_otel.py b/src/lmnr/sdk/browser/playwright_otel.py index f59eefb4..d89a410d 100644 --- a/src/lmnr/sdk/browser/playwright_otel.py +++ b/src/lmnr/sdk/browser/playwright_otel.py @@ -10,7 +10,6 @@ ) from lmnr.sdk.browser.utils import with_tracer_and_client_wrapper from lmnr.sdk.client.asynchronous.async_client import AsyncLaminarClient -from lmnr.sdk.client.synchronous.sync_client import LaminarClient from lmnr.version import __version__ from opentelemetry.instrumentation.instrumentor import BaseInstrumentor @@ -55,7 +54,7 @@ @with_tracer_and_client_wrapper def _wrap_new_browser_sync( - tracer: Tracer, client: LaminarClient, to_wrap, wrapped, instance, args, kwargs + tracer: Tracer, client: AsyncLaminarClient, to_wrap, wrapped, instance, args, kwargs ): browser: SyncBrowser = wrapped(*args, **kwargs) session_id = str(uuid.uuid4().hex) @@ -98,7 +97,7 @@ async def page_handler(page): @with_tracer_and_client_wrapper def _wrap_new_context_sync( - tracer: Tracer, client: LaminarClient, to_wrap, wrapped, instance, args, kwargs + tracer: Tracer, client: AsyncLaminarClient, to_wrap, wrapped, instance, args, kwargs ): context: SyncBrowserContext = wrapped(*args, **kwargs) session_id = str(uuid.uuid4().hex) @@ -140,7 +139,7 @@ async def page_handler(page): @with_tracer_and_client_wrapper def _wrap_bring_to_front_sync( - tracer: Tracer, client: LaminarClient, to_wrap, wrapped, instance, args, kwargs + tracer: Tracer, client: AsyncLaminarClient, to_wrap, wrapped, instance, args, kwargs ): wrapped(*args, **kwargs) take_full_snapshot(instance) @@ -156,7 +155,7 @@ async def _wrap_bring_to_front_async( @with_tracer_and_client_wrapper def _wrap_browser_new_page_sync( - tracer: Tracer, client: LaminarClient, to_wrap, wrapped, instance, args, kwargs + tracer: Tracer, client: AsyncLaminarClient, to_wrap, wrapped, instance, args, kwargs ): page = wrapped(*args, **kwargs) session_id = str(uuid.uuid4().hex) @@ -266,9 +265,8 @@ async def _wrap_browser_new_page_async( class PlaywrightInstrumentor(BaseInstrumentor): - def __init__(self, client: LaminarClient, async_client: AsyncLaminarClient): + def __init__(self, async_client: AsyncLaminarClient): super().__init__() - self.client = client self.async_client = async_client def instrumentation_dependencies(self) -> Collection[str]: @@ -278,6 +276,8 @@ def _instrument(self, **kwargs): tracer_provider = kwargs.get("tracer_provider") tracer = get_tracer(__name__, __version__, tracer_provider) + # Both sync and async methods use async_client because we are using + # a background asyncio loop for async sends for wrapped_method in WRAPPED_METHODS: wrap_package = wrapped_method.get("package") wrap_object = wrapped_method.get("object") @@ -288,7 +288,7 @@ def _instrument(self, **kwargs): f"{wrap_object}.{wrap_method}", wrapped_method.get("wrapper")( tracer, - self.client, + self.async_client, wrapped_method, ), ) diff --git a/src/lmnr/sdk/browser/pw_utils.py b/src/lmnr/sdk/browser/pw_utils.py index d5b3fe23..2560fbbc 100644 --- a/src/lmnr/sdk/browser/pw_utils.py +++ b/src/lmnr/sdk/browser/pw_utils.py @@ -1,17 +1,22 @@ -import orjson -import logging +import asyncio import os import time +import orjson + from opentelemetry import trace +from lmnr.opentelemetry_lib.tracing.context import get_current_context +from lmnr.opentelemetry_lib.tracing import TracerWrapper from lmnr.opentelemetry_lib.utils.package_check import is_package_installed from lmnr.sdk.decorators import observe from lmnr.sdk.browser.utils import retry_sync, retry_async -from lmnr.sdk.client.synchronous.sync_client import LaminarClient +from lmnr.sdk.browser.background_send_events import ( + get_background_loop, + track_async_send, +) from lmnr.sdk.client.asynchronous.async_client import AsyncLaminarClient -from lmnr.opentelemetry_lib.tracing.context import get_current_context -from lmnr.opentelemetry_lib.tracing import TracerWrapper +from lmnr.sdk.log import get_default_logger from lmnr.sdk.types import MaskInputOptions try: @@ -34,16 +39,98 @@ "or `pip install patchright` to install one of the supported browsers." ) from e -logger = logging.getLogger(__name__) +logger = get_default_logger(__name__) OLD_BUFFER_TIMEOUT = 60 + +def create_send_events_handler( + chunk_buffers: dict, + session_id: str, + trace_id: str, + client: AsyncLaminarClient, + background_loop: asyncio.AbstractEventLoop, +): + """ + Create an async event handler for sending browser events. + + This handler reassembles chunked event data and submits it to the background + loop for async HTTP sending. The handler itself processes chunks synchronously + but delegates the actual HTTP send to the background loop. + + Args: + chunk_buffers: Dictionary to store incomplete chunk batches + session_id: Browser session ID + trace_id: OpenTelemetry trace ID + client: Async Laminar client for HTTP requests + background_loop: Background event loop for async sends + + Returns: + An async function that handles incoming event chunks from the browser + """ + + async def send_events_from_browser(chunk): + try: + # Handle chunked data + batch_id = chunk["batchId"] + chunk_index = chunk["chunkIndex"] + total_chunks = chunk["totalChunks"] + data = chunk["data"] + + # Initialize buffer for this batch if needed + if batch_id not in chunk_buffers: + chunk_buffers[batch_id] = { + "chunks": {}, + "total": total_chunks, + "timestamp": time.time(), + } + + # Store chunk + chunk_buffers[batch_id]["chunks"][chunk_index] = data + + # Check if we have all chunks + if len(chunk_buffers[batch_id]["chunks"]) == total_chunks: + # Reassemble the full message + full_data = "" + for i in range(total_chunks): + full_data += chunk_buffers[batch_id]["chunks"][i] + + # Parse the JSON + events = orjson.loads(full_data) + + # Send to server in background loop (independent of Playwright's loop) + if events and len(events) > 0: + future = asyncio.run_coroutine_threadsafe( + client._browser_events.send(session_id, trace_id, events), + background_loop, + ) + track_async_send(future) + + # Clean up buffer + del chunk_buffers[batch_id] + + # Clean up old incomplete buffers + current_time = time.time() + to_delete = [] + for bid, buffer in chunk_buffers.items(): + if current_time - buffer["timestamp"] > OLD_BUFFER_TIMEOUT: + to_delete.append(bid) + for bid in to_delete: + logger.debug(f"Cleaning up incomplete chunk buffer: {bid}") + del chunk_buffers[bid] + + except Exception as e: + logger.debug(f"Could not send events: {e}") + + return send_events_from_browser + + current_dir = os.path.dirname(os.path.abspath(__file__)) with open(os.path.join(current_dir, "recorder", "record.umd.min.cjs"), "r") as f: RRWEB_CONTENT = f"() => {{ {f.read()} }}" INJECT_PLACEHOLDER = """ -(mask_input_options) => { +(maskInputOptions) => { const BATCH_TIMEOUT = 2000; // Send events after 2 seconds const MAX_WORKER_PROMISES = 50; // Max concurrent worker promises const HEARTBEAT_INTERVAL = 2000; @@ -316,7 +403,7 @@ } // Worker-based compression for large objects - async function compressLargeObject(data, isLarge = true) { + async function compressLargeObject(data) { // Check if workers are supported first - if not, use main thread compression if (!testWorkerSupport()) { return await compressSmallObject(data); @@ -380,7 +467,6 @@ function isLargeEvent(type) { const LARGE_EVENT_TYPES = [ 2, // FullSnapshot - 3, // IncrementalSnapshot ]; if (LARGE_EVENT_TYPES.includes(type)) { @@ -470,8 +556,6 @@ } } - setInterval(sendBatchIfReady, BATCH_TIMEOUT); - async function bufferToBase64(buffer) { const base64url = await new Promise(r => { const reader = new FileReader() @@ -481,51 +565,57 @@ return base64url.slice(base64url.indexOf(',') + 1); } - window.lmnrRrweb.record({ - async emit(event) { - try { - const isLarge = isLargeEvent(event.type); - const compressedResult = isLarge ? - await compressLargeObject(event.data, true) : - await compressSmallObject(event.data); - - const base64Data = await bufferToBase64(compressedResult); - const eventToSend = { - ...event, - data: base64Data, - }; - window.lmnrRrwebEventsBatch.push(eventToSend); - } catch (error) { - console.warn('Failed to push event to batch', error); + if (!window.lmnrStartedRecordingEvents) { + setInterval(sendBatchIfReady, BATCH_TIMEOUT); + + window.lmnrRrweb.record({ + async emit(event) { + try { + const isLarge = isLargeEvent(event.type); + const compressedResult = isLarge ? + await compressLargeObject(event.data) : + await compressSmallObject(event.data); + + const base64Data = await bufferToBase64(compressedResult); + const eventToSend = { + ...event, + data: base64Data, + }; + window.lmnrRrwebEventsBatch.push(eventToSend); + } catch (error) { + console.warn('Failed to push event to batch', error); + } + }, + recordCanvas: true, + collectFonts: true, + recordCrossOriginIframes: true, + maskInputOptions: { + password: true, + textarea: maskInputOptions.textarea || false, + text: maskInputOptions.text || false, + number: maskInputOptions.number || false, + select: maskInputOptions.select || false, + email: maskInputOptions.email || false, + tel: maskInputOptions.tel || false, } - }, - recordCanvas: true, - collectFonts: true, - recordCrossOriginIframes: true, - maskInputOptions: { - password: true, - textarea: mask_input_options.textarea || false, - text: mask_input_options.text || false, - number: mask_input_options.number || false, - select: mask_input_options.select || false, - email: mask_input_options.email || false, - tel: mask_input_options.tel || false, - } - }); - - function heartbeat() { - // Add heartbeat events - setInterval(() => { - window.lmnrRrweb.record.addCustomEvent('heartbeat', { - title: document.title, - url: document.URL, - }) - }, HEARTBEAT_INTERVAL - ); - } + }); - heartbeat(); + function heartbeat() { + // Add heartbeat events + setInterval( + () => { + window.lmnrRrweb.record.addCustomEvent('heartbeat', { + title: document.title, + url: document.URL, + }) + }, + HEARTBEAT_INTERVAL, + ); + } + heartbeat(); + window.lmnrStartedRecordingEvents = true; + } } """ @@ -571,6 +661,8 @@ def inject_session_recorder_sync(page: SyncPage): def load_session_recorder(): try: + if page.is_closed(): + return False page.evaluate(RRWEB_CONTENT) return True except Exception as e: @@ -585,7 +677,8 @@ def load_session_recorder(): return try: - page.evaluate(INJECT_PLACEHOLDER, get_mask_input_setting()) + if not page.is_closed(): + page.evaluate(INJECT_PLACEHOLDER, get_mask_input_setting()) except Exception as e: logger.debug(f"Failed to inject session recorder: {e}") @@ -607,6 +700,8 @@ async def inject_session_recorder_async(page: Page): async def load_session_recorder(): try: + if page.is_closed(): + return False await page.evaluate(RRWEB_CONTENT) return True except Exception as e: @@ -621,7 +716,8 @@ async def load_session_recorder(): return try: - await page.evaluate(INJECT_PLACEHOLDER, get_mask_input_setting()) + if not page.is_closed(): + await page.evaluate(INJECT_PLACEHOLDER, get_mask_input_setting()) except Exception as e: logger.debug(f"Failed to inject session recorder placeholder: {e}") @@ -630,67 +726,39 @@ async def load_session_recorder(): @observe(name="playwright.page", ignore_input=True, ignore_output=True) -def start_recording_events_sync(page: SyncPage, session_id: str, client: LaminarClient): +def start_recording_events_sync( + page: SyncPage, session_id: str, client: AsyncLaminarClient +): ctx = get_current_context() span = trace.get_current_span(ctx) trace_id = format(span.get_span_context().trace_id, "032x") span.set_attribute("lmnr.internal.has_browser_session", True) + # Get the background loop for async sends + background_loop = get_background_loop() + # Buffer for reassembling chunks chunk_buffers = {} - def send_events_from_browser(chunk): - try: - # Handle chunked data - batch_id = chunk["batchId"] - chunk_index = chunk["chunkIndex"] - total_chunks = chunk["totalChunks"] - data = chunk["data"] - - # Initialize buffer for this batch if needed - if batch_id not in chunk_buffers: - chunk_buffers[batch_id] = { - "chunks": {}, - "total": total_chunks, - "timestamp": time.time(), - } - - # Store chunk - chunk_buffers[batch_id]["chunks"][chunk_index] = data - - # Check if we have all chunks - if len(chunk_buffers[batch_id]["chunks"]) == total_chunks: - # Reassemble the full message - full_data = "".join( - chunk_buffers[batch_id]["chunks"][i] for i in range(total_chunks) - ) - - # Parse the JSON - events = orjson.loads(full_data) - - # Send to server - if events and len(events) > 0: - client._browser_events.send(session_id, trace_id, events) - - # Clean up buffer - del chunk_buffers[batch_id] - - # Clean up old incomplete buffers - current_time = time.time() - to_delete = [] - for bid, buffer in chunk_buffers.items(): - if current_time - buffer["timestamp"] > OLD_BUFFER_TIMEOUT: - to_delete.append(bid) - for bid in to_delete: - logger.debug(f"Cleaning up incomplete chunk buffer: {bid}") - del chunk_buffers[bid] + # Create the async event handler (shared implementation) + send_events_from_browser = create_send_events_handler( + chunk_buffers, session_id, trace_id, client, background_loop + ) + def submit_event(chunk): + """Sync wrapper that submits async handler to background loop.""" + try: + # Submit async handler to background loop + asyncio.run_coroutine_threadsafe( + send_events_from_browser(chunk), + background_loop, + ) except Exception as e: - logger.debug(f"Could not send events: {e}") + logger.debug(f"Error submitting event: {e}") try: - page.expose_function("lmnrSendEvents", send_events_from_browser) + page.expose_function("lmnrSendEvents", submit_event) except Exception as e: logger.debug(f"Could not expose function: {e}") @@ -698,9 +766,10 @@ def send_events_from_browser(chunk): def on_load(p): try: - inject_session_recorder_sync(p) + if not p.is_closed(): + inject_session_recorder_sync(p) except Exception as e: - logger.error(f"Error in on_load handler: {e}") + logger.debug(f"Error in on_load handler: {e}") page.on("domcontentloaded", on_load) @@ -714,57 +783,16 @@ async def start_recording_events_async( trace_id = format(span.get_span_context().trace_id, "032x") span.set_attribute("lmnr.internal.has_browser_session", True) + # Get the background loop for async sends (independent of Playwright's loop) + background_loop = get_background_loop() + # Buffer for reassembling chunks chunk_buffers = {} - async def send_events_from_browser(chunk): - try: - # Handle chunked data - batch_id = chunk["batchId"] - chunk_index = chunk["chunkIndex"] - total_chunks = chunk["totalChunks"] - data = chunk["data"] - - # Initialize buffer for this batch if needed - if batch_id not in chunk_buffers: - chunk_buffers[batch_id] = { - "chunks": {}, - "total": total_chunks, - "timestamp": time.time(), - } - - # Store chunk - chunk_buffers[batch_id]["chunks"][chunk_index] = data - - # Check if we have all chunks - if len(chunk_buffers[batch_id]["chunks"]) == total_chunks: - # Reassemble the full message - full_data = "" - for i in range(total_chunks): - full_data += chunk_buffers[batch_id]["chunks"][i] - - # Parse the JSON - events = orjson.loads(full_data) - - # Send to server - if events and len(events) > 0: - await client._browser_events.send(session_id, trace_id, events) - - # Clean up buffer - del chunk_buffers[batch_id] - - # Clean up old incomplete buffers - current_time = time.time() - to_delete = [] - for bid, buffer in chunk_buffers.items(): - if current_time - buffer["timestamp"] > OLD_BUFFER_TIMEOUT: - to_delete.append(bid) - for bid in to_delete: - logger.debug(f"Cleaning up incomplete chunk buffer: {bid}") - del chunk_buffers[bid] - - except Exception as e: - logger.debug(f"Could not send events: {e}") + # Create the async event handler (shared implementation) + send_events_from_browser = create_send_events_handler( + chunk_buffers, session_id, trace_id, client, background_loop + ) try: await page.expose_function("lmnrSendEvents", send_events_from_browser) @@ -775,9 +803,11 @@ async def send_events_from_browser(chunk): async def on_load(p): try: - await inject_session_recorder_async(p) + # Check if page is closed before attempting to inject + if not p.is_closed(): + await inject_session_recorder_async(p) except Exception as e: - logger.error(f"Error in on_load handler: {e}") + logger.debug(f"Error in on_load handler: {e}") page.on("domcontentloaded", on_load) diff --git a/src/lmnr/sdk/browser/utils.py b/src/lmnr/sdk/browser/utils.py index ce22b2dd..d646c46c 100644 --- a/src/lmnr/sdk/browser/utils.py +++ b/src/lmnr/sdk/browser/utils.py @@ -42,7 +42,7 @@ def retry_sync(func, retries=5, delay=0.5, error_message="Operation failed"): if result: # If function returns truthy value, consider it successful return result if attempt == retries - 1: # Last attempt - logger.error(f"{error_message} after all retries") + logger.debug(f"{error_message} after all retries") return None except Exception as e: if attempt == retries - 1: # Last attempt