From 26ea9baa694fe0866d33f14661197ed7b7aafe5b Mon Sep 17 00:00:00 2001 From: Din Date: Thu, 23 Oct 2025 20:52:03 +0100 Subject: [PATCH 1/5] wip: fix browser-use-cdp --- src/lmnr/sdk/browser/cdp_utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/lmnr/sdk/browser/cdp_utils.py b/src/lmnr/sdk/browser/cdp_utils.py index eae4e274..6b8b2107 100644 --- a/src/lmnr/sdk/browser/cdp_utils.py +++ b/src/lmnr/sdk/browser/cdp_utils.py @@ -788,7 +788,7 @@ async def start_recording_events( # Buffer for reassembling chunks chunk_buffers = {} - async def send_events_from_browser(chunk): + async def send_events_from_browser(chunk: dict): try: # Handle chunked data batch_id = chunk["batchId"] @@ -843,7 +843,7 @@ async def send_events_callback(event, cdp_session_id: str | None = None): return if event["executionContextId"] != isolated_context_id: return - await send_events_from_browser(orjson.loads(event["payload"])) + asyncio.create_task(send_events_from_browser(orjson.loads(event["payload"]))) await cdp_client.send.Runtime.addBinding( { From 2f9237feab550a7902a1bf7be678a604ba3cf8c3 Mon Sep 17 00:00:00 2001 From: Din Date: Fri, 24 Oct 2025 18:56:47 +0100 Subject: [PATCH 2/5] synchronice pw and cdp, some fixes to background handling in pw --- src/lmnr/sdk/browser/cdp_utils.py | 10 +- src/lmnr/sdk/browser/pw_background_sending.py | 183 ++++++++++++++++++ src/lmnr/sdk/browser/pw_utils.py | 154 +++++++++------ src/lmnr/sdk/browser/utils.py | 2 +- 4 files changed, 283 insertions(+), 66 deletions(-) create mode 100644 src/lmnr/sdk/browser/pw_background_sending.py diff --git a/src/lmnr/sdk/browser/cdp_utils.py b/src/lmnr/sdk/browser/cdp_utils.py index 6b8b2107..47698539 100644 --- a/src/lmnr/sdk/browser/cdp_utils.py +++ b/src/lmnr/sdk/browser/cdp_utils.py @@ -499,12 +499,14 @@ function heartbeat() { // Add heartbeat events - setInterval(() => { - window.lmnrRrweb.record.addCustomEvent('heartbeat', { - title: document.title, + setInterval( + () => { + window.lmnrRrweb.record.addCustomEvent('heartbeat', { + title: document.title, url: document.URL, }) - }, HEARTBEAT_INTERVAL + }, + HEARTBEAT_INTERVAL, ); } diff --git a/src/lmnr/sdk/browser/pw_background_sending.py b/src/lmnr/sdk/browser/pw_background_sending.py new file mode 100644 index 00000000..947e860b --- /dev/null +++ b/src/lmnr/sdk/browser/pw_background_sending.py @@ -0,0 +1,183 @@ +""" +Background sending for browser events (sync and async). + +This module provides background execution for HTTP requests that send browser events, +ensuring sends never block the main execution flow while guaranteeing completion at +program exit. + +## Sync Version (ThreadPoolExecutor) +For synchronous Playwright code, uses a thread pool to handle sends in background +threads. The executor automatically waits for completion on shutdown. + +## Async Version (Background Event Loop) +For async Playwright code, uses a dedicated event loop running in a separate thread +to handle async HTTP requests. This architecture provides: + +1. **Non-blocking execution**: Sends happen in the background, never blocking the main + thread or Playwright's event loop, allowing browser automation to continue smoothly. + +2. **Guaranteed completion**: When the program exits, all pending async sends are + awaited and complete successfully, even if they're slow. No events are dropped. + +3. **Lifecycle independence**: The background loop runs independently of Playwright's + event loop, so it survives when Playwright shuts down its internal loop before + program exit. + +The pattern uses `asyncio.run_coroutine_threadsafe()` to submit async coroutines +from Playwright's loop to our background loop, maintaining pure async code while +achieving cross-loop execution. +""" + +import asyncio +import atexit +import threading +from concurrent.futures import ThreadPoolExecutor +from typing import Any, Callable + +from lmnr.sdk.log import get_default_logger + +logger = get_default_logger(__name__) + +# Timeout for waiting for each async send operation at exit +ASYNC_SEND_TIMEOUT_SECONDS = 30 + +# Timeout for background loop creation +LOOP_CREATION_TIMEOUT_SECONDS = 5 + +# Timeout for thread join during cleanup +THREAD_JOIN_TIMEOUT_SECONDS = 5 + +# ============================================================================== +# Sync version: ThreadPoolExecutor for background sends +# ============================================================================== + +_sync_executor = ThreadPoolExecutor(thread_name_prefix="lmnr-sync-submit") + + +def submit_sync_task(func: Callable, *args) -> None: + """ + Submit a synchronous task to run in the background thread pool. + + The task will execute in a background thread without blocking the main thread. + On program exit, the executor automatically waits for all pending tasks. + + Args: + func: The function to execute + *args: Arguments to pass to the function + """ + _sync_executor.submit(func, *args) + + +# ============================================================================== +# Async version: Background event loop for async sends +# ============================================================================== + +# Background event loop state +_background_loop = None +_background_loop_thread = None +_background_loop_lock = threading.Lock() +_background_loop_ready = threading.Event() +_pending_async_futures: set[asyncio.Future[Any]] = set() + + +def get_background_loop() -> asyncio.AbstractEventLoop: + """ + Get or create the background event loop for async sends. + + Creates a dedicated event loop running in a daemon thread on first call. + Subsequent calls return the same loop. Thread-safe. + + Returns: + The background event loop running in a separate thread. + """ + global _background_loop, _background_loop_thread + + with _background_loop_lock: + if _background_loop is None: + # Create a new event loop in a background thread + def run_loop(): + global _background_loop + _background_loop = asyncio.new_event_loop() + asyncio.set_event_loop(_background_loop) + _background_loop_ready.set() + _background_loop.run_forever() + + _background_loop_thread = threading.Thread( + target=run_loop, daemon=True, name="lmnr-async-sends" + ) + _background_loop_thread.start() + + # Register cleanup handler + atexit.register(_cleanup_background_loop) + + # Wait for loop to be created (outside the lock to avoid blocking other threads) + if not _background_loop_ready.wait(timeout=LOOP_CREATION_TIMEOUT_SECONDS): + raise RuntimeError("Background loop creation timed out") + + return _background_loop + + +def track_async_send(future: asyncio.Future) -> None: + """ + Track an async send future for cleanup at exit. + + The future is automatically removed from tracking when it completes, + preventing memory leaks. + + Args: + future: The future returned by asyncio.run_coroutine_threadsafe() + """ + with _background_loop_lock: + _pending_async_futures.add(future) + + def remove_on_done(f): + """Remove the future from tracking when it completes.""" + with _background_loop_lock: + _pending_async_futures.discard(f) + + future.add_done_callback(remove_on_done) + + +def _cleanup_background_loop(): + """ + Shutdown the background event loop and wait for all pending sends to complete. + + Called automatically at program exit via atexit. Waits for each pending send + to complete with a timeout, then stops the background loop gracefully. + """ + global _background_loop + + # Create a snapshot of pending futures to avoid holding the lock during waits + with _background_loop_lock: + futures_to_wait = list(_pending_async_futures) + + pending_count = len(futures_to_wait) + + if pending_count > 0: + logger.debug( + f"Waiting for {pending_count} async browser event requests to complete..." + ) + + # Wait for all pending futures to complete + for future in futures_to_wait: + try: + future.result(timeout=ASYNC_SEND_TIMEOUT_SECONDS) + except TimeoutError: + logger.debug("Timeout waiting for async send to complete") + except KeyboardInterrupt: + logger.debug("Interrupted, cancelling pending async sends") + for f in futures_to_wait: + f.cancel() + raise + except Exception as e: + logger.debug(f"Error in async send: {e}") + + # Stop the background loop + if _background_loop is not None and not _background_loop.is_closed(): + try: + _background_loop.call_soon_threadsafe(_background_loop.stop) + # Wait for thread to finish + if _background_loop_thread is not None: + _background_loop_thread.join(timeout=THREAD_JOIN_TIMEOUT_SECONDS) + except Exception as e: + logger.debug(f"Error stopping background loop: {e}") diff --git a/src/lmnr/sdk/browser/pw_utils.py b/src/lmnr/sdk/browser/pw_utils.py index d5b3fe23..9bd39f94 100644 --- a/src/lmnr/sdk/browser/pw_utils.py +++ b/src/lmnr/sdk/browser/pw_utils.py @@ -1,17 +1,24 @@ -import orjson -import logging +import asyncio import os import time +import orjson + from opentelemetry import trace +from lmnr.opentelemetry_lib.tracing.context import get_current_context +from lmnr.opentelemetry_lib.tracing import TracerWrapper from lmnr.opentelemetry_lib.utils.package_check import is_package_installed from lmnr.sdk.decorators import observe from lmnr.sdk.browser.utils import retry_sync, retry_async +from lmnr.sdk.browser.pw_background_sending import ( + get_background_loop, + submit_sync_task, + track_async_send, +) from lmnr.sdk.client.synchronous.sync_client import LaminarClient from lmnr.sdk.client.asynchronous.async_client import AsyncLaminarClient -from lmnr.opentelemetry_lib.tracing.context import get_current_context -from lmnr.opentelemetry_lib.tracing import TracerWrapper +from lmnr.sdk.log import get_default_logger from lmnr.sdk.types import MaskInputOptions try: @@ -34,7 +41,7 @@ "or `pip install patchright` to install one of the supported browsers." ) from e -logger = logging.getLogger(__name__) +logger = get_default_logger(__name__) OLD_BUFFER_TIMEOUT = 60 @@ -43,7 +50,7 @@ RRWEB_CONTENT = f"() => {{ {f.read()} }}" INJECT_PLACEHOLDER = """ -(mask_input_options) => { +(maskInputOptions) => { const BATCH_TIMEOUT = 2000; // Send events after 2 seconds const MAX_WORKER_PROMISES = 50; // Max concurrent worker promises const HEARTBEAT_INTERVAL = 2000; @@ -316,7 +323,7 @@ } // Worker-based compression for large objects - async function compressLargeObject(data, isLarge = true) { + async function compressLargeObject(data) { // Check if workers are supported first - if not, use main thread compression if (!testWorkerSupport()) { return await compressSmallObject(data); @@ -380,7 +387,6 @@ function isLargeEvent(type) { const LARGE_EVENT_TYPES = [ 2, // FullSnapshot - 3, // IncrementalSnapshot ]; if (LARGE_EVENT_TYPES.includes(type)) { @@ -470,8 +476,6 @@ } } - setInterval(sendBatchIfReady, BATCH_TIMEOUT); - async function bufferToBase64(buffer) { const base64url = await new Promise(r => { const reader = new FileReader() @@ -481,51 +485,57 @@ return base64url.slice(base64url.indexOf(',') + 1); } - window.lmnrRrweb.record({ - async emit(event) { - try { - const isLarge = isLargeEvent(event.type); - const compressedResult = isLarge ? - await compressLargeObject(event.data, true) : - await compressSmallObject(event.data); - - const base64Data = await bufferToBase64(compressedResult); - const eventToSend = { - ...event, - data: base64Data, - }; - window.lmnrRrwebEventsBatch.push(eventToSend); - } catch (error) { - console.warn('Failed to push event to batch', error); + if (!window.lmnrStartedRecordingEvents) { + setInterval(sendBatchIfReady, BATCH_TIMEOUT); + + window.lmnrRrweb.record({ + async emit(event) { + try { + const isLarge = isLargeEvent(event.type); + const compressedResult = isLarge ? + await compressLargeObject(event.data) : + await compressSmallObject(event.data); + + const base64Data = await bufferToBase64(compressedResult); + const eventToSend = { + ...event, + data: base64Data, + }; + window.lmnrRrwebEventsBatch.push(eventToSend); + } catch (error) { + console.warn('Failed to push event to batch', error); + } + }, + recordCanvas: true, + collectFonts: true, + recordCrossOriginIframes: true, + maskInputOptions: { + password: true, + textarea: maskInputOptions.textarea || false, + text: maskInputOptions.text || false, + number: maskInputOptions.number || false, + select: maskInputOptions.select || false, + email: maskInputOptions.email || false, + tel: maskInputOptions.tel || false, } - }, - recordCanvas: true, - collectFonts: true, - recordCrossOriginIframes: true, - maskInputOptions: { - password: true, - textarea: mask_input_options.textarea || false, - text: mask_input_options.text || false, - number: mask_input_options.number || false, - select: mask_input_options.select || false, - email: mask_input_options.email || false, - tel: mask_input_options.tel || false, - } - }); - - function heartbeat() { - // Add heartbeat events - setInterval(() => { - window.lmnrRrweb.record.addCustomEvent('heartbeat', { - title: document.title, - url: document.URL, - }) - }, HEARTBEAT_INTERVAL - ); - } + }); - heartbeat(); + function heartbeat() { + // Add heartbeat events + setInterval( + () => { + window.lmnrRrweb.record.addCustomEvent('heartbeat', { + title: document.title, + url: document.URL, + }) + }, + HEARTBEAT_INTERVAL, + ); + } + heartbeat(); + window.lmnrStartedRecordingEvents = true; + } } """ @@ -571,6 +581,8 @@ def inject_session_recorder_sync(page: SyncPage): def load_session_recorder(): try: + if page.is_closed(): + return False page.evaluate(RRWEB_CONTENT) return True except Exception as e: @@ -585,7 +597,8 @@ def load_session_recorder(): return try: - page.evaluate(INJECT_PLACEHOLDER, get_mask_input_setting()) + if not page.is_closed(): + page.evaluate(INJECT_PLACEHOLDER, get_mask_input_setting()) except Exception as e: logger.debug(f"Failed to inject session recorder: {e}") @@ -607,6 +620,8 @@ async def inject_session_recorder_async(page: Page): async def load_session_recorder(): try: + if page.is_closed(): + return False await page.evaluate(RRWEB_CONTENT) return True except Exception as e: @@ -621,7 +636,8 @@ async def load_session_recorder(): return try: - await page.evaluate(INJECT_PLACEHOLDER, get_mask_input_setting()) + if not page.is_closed(): + await page.evaluate(INJECT_PLACEHOLDER, get_mask_input_setting()) except Exception as e: logger.debug(f"Failed to inject session recorder placeholder: {e}") @@ -689,8 +705,14 @@ def send_events_from_browser(chunk): except Exception as e: logger.debug(f"Could not send events: {e}") + def submit_event(chunk): + try: + submit_sync_task(send_events_from_browser, chunk) + except Exception as e: + logger.debug(f"Error submitting event: {e}") + try: - page.expose_function("lmnrSendEvents", send_events_from_browser) + page.expose_function("lmnrSendEvents", submit_event) except Exception as e: logger.debug(f"Could not expose function: {e}") @@ -698,9 +720,10 @@ def send_events_from_browser(chunk): def on_load(p): try: - inject_session_recorder_sync(p) + if not p.is_closed(): + inject_session_recorder_sync(p) except Exception as e: - logger.error(f"Error in on_load handler: {e}") + logger.debug(f"Error in on_load handler: {e}") page.on("domcontentloaded", on_load) @@ -714,6 +737,9 @@ async def start_recording_events_async( trace_id = format(span.get_span_context().trace_id, "032x") span.set_attribute("lmnr.internal.has_browser_session", True) + # Get the background loop for async sends (independent of Playwright's loop) + background_loop = get_background_loop() + # Buffer for reassembling chunks chunk_buffers = {} @@ -746,9 +772,13 @@ async def send_events_from_browser(chunk): # Parse the JSON events = orjson.loads(full_data) - # Send to server + # Send to server in background loop (independent of Playwright's loop) if events and len(events) > 0: - await client._browser_events.send(session_id, trace_id, events) + future = asyncio.run_coroutine_threadsafe( + client._browser_events.send(session_id, trace_id, events), + background_loop, + ) + track_async_send(future) # Clean up buffer del chunk_buffers[batch_id] @@ -775,9 +805,11 @@ async def send_events_from_browser(chunk): async def on_load(p): try: - await inject_session_recorder_async(p) + # Check if page is closed before attempting to inject + if not p.is_closed(): + await inject_session_recorder_async(p) except Exception as e: - logger.error(f"Error in on_load handler: {e}") + logger.debug(f"Error in on_load handler: {e}") page.on("domcontentloaded", on_load) diff --git a/src/lmnr/sdk/browser/utils.py b/src/lmnr/sdk/browser/utils.py index ce22b2dd..d646c46c 100644 --- a/src/lmnr/sdk/browser/utils.py +++ b/src/lmnr/sdk/browser/utils.py @@ -42,7 +42,7 @@ def retry_sync(func, retries=5, delay=0.5, error_message="Operation failed"): if result: # If function returns truthy value, consider it successful return result if attempt == retries - 1: # Last attempt - logger.error(f"{error_message} after all retries") + logger.debug(f"{error_message} after all retries") return None except Exception as e: if attempt == retries - 1: # Last attempt From 09b79e0dc8bdbcdc9e577542f684d9f0c8728304 Mon Sep 17 00:00:00 2001 From: Din Date: Sun, 26 Oct 2025 10:19:01 +0000 Subject: [PATCH 3/5] use new constructs in cdp use as well --- ...d_sending.py => background_send_events.py} | 4 +--- src/lmnr/sdk/browser/cdp_utils.py | 19 +++++++++++++++---- src/lmnr/sdk/browser/pw_utils.py | 2 +- 3 files changed, 17 insertions(+), 8 deletions(-) rename src/lmnr/sdk/browser/{pw_background_sending.py => background_send_events.py} (98%) diff --git a/src/lmnr/sdk/browser/pw_background_sending.py b/src/lmnr/sdk/browser/background_send_events.py similarity index 98% rename from src/lmnr/sdk/browser/pw_background_sending.py rename to src/lmnr/sdk/browser/background_send_events.py index 947e860b..b0bf305a 100644 --- a/src/lmnr/sdk/browser/pw_background_sending.py +++ b/src/lmnr/sdk/browser/background_send_events.py @@ -154,9 +154,7 @@ def _cleanup_background_loop(): pending_count = len(futures_to_wait) if pending_count > 0: - logger.debug( - f"Waiting for {pending_count} async browser event requests to complete..." - ) + logger.info(f"Finsihing sending {pending_count} browser events...") # Wait for all pending futures to complete for future in futures_to_wait: diff --git a/src/lmnr/sdk/browser/cdp_utils.py b/src/lmnr/sdk/browser/cdp_utils.py index 47698539..9240d27b 100644 --- a/src/lmnr/sdk/browser/cdp_utils.py +++ b/src/lmnr/sdk/browser/cdp_utils.py @@ -1,5 +1,4 @@ import asyncio -import logging import orjson import os import time @@ -8,12 +7,17 @@ from lmnr.sdk.decorators import observe from lmnr.sdk.browser.utils import retry_async +from lmnr.sdk.browser.background_send_events import ( + get_background_loop, + track_async_send, +) from lmnr.sdk.client.asynchronous.async_client import AsyncLaminarClient from lmnr.opentelemetry_lib.tracing.context import get_current_context from lmnr.opentelemetry_lib.tracing import TracerWrapper +from lmnr.sdk.log import get_default_logger from lmnr.sdk.types import MaskInputOptions -logger = logging.getLogger(__name__) +logger = get_default_logger(__name__) OLD_BUFFER_TIMEOUT = 60 CDP_OPERATION_TIMEOUT_SECONDS = 10 @@ -787,6 +791,9 @@ async def start_recording_events( logger.debug("Failed to inject session recorder, not registering bindings") return + # Get the background loop for async sends (independent of CDP's loop) + background_loop = get_background_loop() + # Buffer for reassembling chunks chunk_buffers = {} @@ -819,9 +826,13 @@ async def send_events_from_browser(chunk: dict): # Parse the JSON events = orjson.loads(full_data) - # Send to server + # Send to server in background loop (independent of CDP's loop) if events and len(events) > 0: - await client._browser_events.send(lmnr_session_id, trace_id, events) + future = asyncio.run_coroutine_threadsafe( + client._browser_events.send(lmnr_session_id, trace_id, events), + background_loop, + ) + track_async_send(future) # Clean up buffer del chunk_buffers[batch_id] diff --git a/src/lmnr/sdk/browser/pw_utils.py b/src/lmnr/sdk/browser/pw_utils.py index 9bd39f94..9c145cb5 100644 --- a/src/lmnr/sdk/browser/pw_utils.py +++ b/src/lmnr/sdk/browser/pw_utils.py @@ -11,7 +11,7 @@ from lmnr.opentelemetry_lib.utils.package_check import is_package_installed from lmnr.sdk.decorators import observe from lmnr.sdk.browser.utils import retry_sync, retry_async -from lmnr.sdk.browser.pw_background_sending import ( +from lmnr.sdk.browser.background_send_events import ( get_background_loop, submit_sync_task, track_async_send, From fe036aef45950ae84cd8f18c29f5ecc9b5fb99bf Mon Sep 17 00:00:00 2001 From: Dinmukhamed Mailibay <47117969+dinmukhamedm@users.noreply.github.com> Date: Sun, 26 Oct 2025 11:21:55 +0000 Subject: [PATCH 4/5] Update src/lmnr/sdk/browser/background_send_events.py Co-authored-by: ellipsis-dev[bot] <65095814+ellipsis-dev[bot]@users.noreply.github.com> --- src/lmnr/sdk/browser/background_send_events.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lmnr/sdk/browser/background_send_events.py b/src/lmnr/sdk/browser/background_send_events.py index b0bf305a..d03994b0 100644 --- a/src/lmnr/sdk/browser/background_send_events.py +++ b/src/lmnr/sdk/browser/background_send_events.py @@ -154,7 +154,7 @@ def _cleanup_background_loop(): pending_count = len(futures_to_wait) if pending_count > 0: - logger.info(f"Finsihing sending {pending_count} browser events...") + logger.info(f"Finishing sending {pending_count} browser events...") # Wait for all pending futures to complete for future in futures_to_wait: From f5408b9b8ba51bd16cfed9e67f421b1c5359cfc1 Mon Sep 17 00:00:00 2001 From: Din Date: Tue, 28 Oct 2025 13:11:44 +0000 Subject: [PATCH 5/5] run the async client in sync tasks as well --- .../tracing/_instrument_initializers.py | 14 +- .../opentelemetry_lib/tracing/instruments.py | 2 +- .../sdk/browser/background_send_events.py | 47 +--- src/lmnr/sdk/browser/patchright_otel.py | 8 +- src/lmnr/sdk/browser/playwright_otel.py | 16 +- src/lmnr/sdk/browser/pw_utils.py | 206 +++++++++--------- 6 files changed, 134 insertions(+), 159 deletions(-) diff --git a/src/lmnr/opentelemetry_lib/tracing/_instrument_initializers.py b/src/lmnr/opentelemetry_lib/tracing/_instrument_initializers.py index 93b93de7..e5dfffcc 100644 --- a/src/lmnr/opentelemetry_lib/tracing/_instrument_initializers.py +++ b/src/lmnr/opentelemetry_lib/tracing/_instrument_initializers.py @@ -80,7 +80,7 @@ def init_instrumentor(self, *args, **kwargs) -> BaseInstrumentor | None: class BrowserUseSessionInstrumentorInitializer(InstrumentorInitializer): def init_instrumentor( - self, client, async_client, *args, **kwargs + self, async_client, *args, **kwargs ) -> BaseInstrumentor | None: if not is_package_installed("browser-use"): return None @@ -344,17 +344,17 @@ def init_instrumentor(self, *args, **kwargs) -> BaseInstrumentor | None: class PatchrightInstrumentorInitializer(InstrumentorInitializer): def init_instrumentor( - self, client, async_client, *args, **kwargs + self, async_client, *args, **kwargs ) -> BaseInstrumentor | None: if not is_package_installed("patchright"): return None from lmnr.sdk.browser.patchright_otel import PatchrightInstrumentor - if client is None and async_client is None: + if async_client is None: return None - return PatchrightInstrumentor(client, async_client) + return PatchrightInstrumentor(async_client) class PineconeInstrumentorInitializer(InstrumentorInitializer): @@ -371,17 +371,17 @@ def init_instrumentor(self, *args, **kwargs) -> BaseInstrumentor | None: class PlaywrightInstrumentorInitializer(InstrumentorInitializer): def init_instrumentor( - self, client, async_client, *args, **kwargs + self, async_client, *args, **kwargs ) -> BaseInstrumentor | None: if not is_package_installed("playwright"): return None from lmnr.sdk.browser.playwright_otel import PlaywrightInstrumentor - if client is None and async_client is None: + if async_client is None: return None - return PlaywrightInstrumentor(client, async_client) + return PlaywrightInstrumentor(async_client) class QdrantInstrumentorInitializer(InstrumentorInitializer): diff --git a/src/lmnr/opentelemetry_lib/tracing/instruments.py b/src/lmnr/opentelemetry_lib/tracing/instruments.py index e3042ff5..ea64fa2e 100644 --- a/src/lmnr/opentelemetry_lib/tracing/instruments.py +++ b/src/lmnr/opentelemetry_lib/tracing/instruments.py @@ -124,7 +124,7 @@ def init_instrumentations( continue try: - instrumentor = initializer.init_instrumentor(client, async_client) + instrumentor = initializer.init_instrumentor(async_client) if instrumentor is None: continue if not instrumentor.is_instrumented_by_opentelemetry: diff --git a/src/lmnr/sdk/browser/background_send_events.py b/src/lmnr/sdk/browser/background_send_events.py index d03994b0..91ad9c73 100644 --- a/src/lmnr/sdk/browser/background_send_events.py +++ b/src/lmnr/sdk/browser/background_send_events.py @@ -1,17 +1,13 @@ """ -Background sending for browser events (sync and async). +Background sending for browser events. This module provides background execution for HTTP requests that send browser events, ensuring sends never block the main execution flow while guaranteeing completion at program exit. -## Sync Version (ThreadPoolExecutor) -For synchronous Playwright code, uses a thread pool to handle sends in background -threads. The executor automatically waits for completion on shutdown. - -## Async Version (Background Event Loop) -For async Playwright code, uses a dedicated event loop running in a separate thread -to handle async HTTP requests. This architecture provides: +## Background Event Loop Architecture +Uses a dedicated event loop running in a separate thread to handle async HTTP requests. +This architecture provides: 1. **Non-blocking execution**: Sends happen in the background, never blocking the main thread or Playwright's event loop, allowing browser automation to continue smoothly. @@ -24,15 +20,14 @@ program exit. The pattern uses `asyncio.run_coroutine_threadsafe()` to submit async coroutines -from Playwright's loop to our background loop, maintaining pure async code while -achieving cross-loop execution. +from any thread (sync or async) to our background loop, maintaining pure async code +while achieving cross-thread execution. """ import asyncio import atexit import threading -from concurrent.futures import ThreadPoolExecutor -from typing import Any, Callable +from typing import Any from lmnr.sdk.log import get_default_logger @@ -48,28 +43,7 @@ THREAD_JOIN_TIMEOUT_SECONDS = 5 # ============================================================================== -# Sync version: ThreadPoolExecutor for background sends -# ============================================================================== - -_sync_executor = ThreadPoolExecutor(thread_name_prefix="lmnr-sync-submit") - - -def submit_sync_task(func: Callable, *args) -> None: - """ - Submit a synchronous task to run in the background thread pool. - - The task will execute in a background thread without blocking the main thread. - On program exit, the executor automatically waits for all pending tasks. - - Args: - func: The function to execute - *args: Arguments to pass to the function - """ - _sync_executor.submit(func, *args) - - -# ============================================================================== -# Async version: Background event loop for async sends +# Background event loop for async sends # ============================================================================== # Background event loop state @@ -154,7 +128,10 @@ def _cleanup_background_loop(): pending_count = len(futures_to_wait) if pending_count > 0: - logger.info(f"Finishing sending {pending_count} browser events...") + logger.info( + f"Finishing sending {pending_count} browser events... " + "Ctrl+C to cancel (may result in incomplete session recording)." + ) # Wait for all pending futures to complete for future in futures_to_wait: diff --git a/src/lmnr/sdk/browser/patchright_otel.py b/src/lmnr/sdk/browser/patchright_otel.py index ef01feb4..4dca3015 100644 --- a/src/lmnr/sdk/browser/patchright_otel.py +++ b/src/lmnr/sdk/browser/patchright_otel.py @@ -6,7 +6,6 @@ _wrap_new_context_sync, _wrap_new_context_async, ) -from lmnr.sdk.client.synchronous.sync_client import LaminarClient from lmnr.sdk.client.asynchronous.async_client import AsyncLaminarClient from opentelemetry.instrumentation.instrumentor import BaseInstrumentor from opentelemetry.instrumentation.utils import unwrap @@ -97,9 +96,8 @@ class PatchrightInstrumentor(BaseInstrumentor): - def __init__(self, client: LaminarClient, async_client: AsyncLaminarClient): + def __init__(self, async_client: AsyncLaminarClient): super().__init__() - self.client = client self.async_client = async_client def instrumentation_dependencies(self) -> Collection[str]: @@ -109,6 +107,8 @@ def _instrument(self, **kwargs): tracer_provider = kwargs.get("tracer_provider") tracer = get_tracer(__name__, __version__, tracer_provider) + # Both sync and async methods use async_client + # because we are using a background asyncio loop for async sends for wrapped_method in WRAPPED_METHODS: wrap_package = wrapped_method.get("package") wrap_object = wrapped_method.get("object") @@ -119,7 +119,7 @@ def _instrument(self, **kwargs): f"{wrap_object}.{wrap_method}", wrapped_method.get("wrapper")( tracer, - self.client, + self.async_client, wrapped_method, ), ) diff --git a/src/lmnr/sdk/browser/playwright_otel.py b/src/lmnr/sdk/browser/playwright_otel.py index f59eefb4..d89a410d 100644 --- a/src/lmnr/sdk/browser/playwright_otel.py +++ b/src/lmnr/sdk/browser/playwright_otel.py @@ -10,7 +10,6 @@ ) from lmnr.sdk.browser.utils import with_tracer_and_client_wrapper from lmnr.sdk.client.asynchronous.async_client import AsyncLaminarClient -from lmnr.sdk.client.synchronous.sync_client import LaminarClient from lmnr.version import __version__ from opentelemetry.instrumentation.instrumentor import BaseInstrumentor @@ -55,7 +54,7 @@ @with_tracer_and_client_wrapper def _wrap_new_browser_sync( - tracer: Tracer, client: LaminarClient, to_wrap, wrapped, instance, args, kwargs + tracer: Tracer, client: AsyncLaminarClient, to_wrap, wrapped, instance, args, kwargs ): browser: SyncBrowser = wrapped(*args, **kwargs) session_id = str(uuid.uuid4().hex) @@ -98,7 +97,7 @@ async def page_handler(page): @with_tracer_and_client_wrapper def _wrap_new_context_sync( - tracer: Tracer, client: LaminarClient, to_wrap, wrapped, instance, args, kwargs + tracer: Tracer, client: AsyncLaminarClient, to_wrap, wrapped, instance, args, kwargs ): context: SyncBrowserContext = wrapped(*args, **kwargs) session_id = str(uuid.uuid4().hex) @@ -140,7 +139,7 @@ async def page_handler(page): @with_tracer_and_client_wrapper def _wrap_bring_to_front_sync( - tracer: Tracer, client: LaminarClient, to_wrap, wrapped, instance, args, kwargs + tracer: Tracer, client: AsyncLaminarClient, to_wrap, wrapped, instance, args, kwargs ): wrapped(*args, **kwargs) take_full_snapshot(instance) @@ -156,7 +155,7 @@ async def _wrap_bring_to_front_async( @with_tracer_and_client_wrapper def _wrap_browser_new_page_sync( - tracer: Tracer, client: LaminarClient, to_wrap, wrapped, instance, args, kwargs + tracer: Tracer, client: AsyncLaminarClient, to_wrap, wrapped, instance, args, kwargs ): page = wrapped(*args, **kwargs) session_id = str(uuid.uuid4().hex) @@ -266,9 +265,8 @@ async def _wrap_browser_new_page_async( class PlaywrightInstrumentor(BaseInstrumentor): - def __init__(self, client: LaminarClient, async_client: AsyncLaminarClient): + def __init__(self, async_client: AsyncLaminarClient): super().__init__() - self.client = client self.async_client = async_client def instrumentation_dependencies(self) -> Collection[str]: @@ -278,6 +276,8 @@ def _instrument(self, **kwargs): tracer_provider = kwargs.get("tracer_provider") tracer = get_tracer(__name__, __version__, tracer_provider) + # Both sync and async methods use async_client because we are using + # a background asyncio loop for async sends for wrapped_method in WRAPPED_METHODS: wrap_package = wrapped_method.get("package") wrap_object = wrapped_method.get("object") @@ -288,7 +288,7 @@ def _instrument(self, **kwargs): f"{wrap_object}.{wrap_method}", wrapped_method.get("wrapper")( tracer, - self.client, + self.async_client, wrapped_method, ), ) diff --git a/src/lmnr/sdk/browser/pw_utils.py b/src/lmnr/sdk/browser/pw_utils.py index 9c145cb5..2560fbbc 100644 --- a/src/lmnr/sdk/browser/pw_utils.py +++ b/src/lmnr/sdk/browser/pw_utils.py @@ -13,10 +13,8 @@ from lmnr.sdk.browser.utils import retry_sync, retry_async from lmnr.sdk.browser.background_send_events import ( get_background_loop, - submit_sync_task, track_async_send, ) -from lmnr.sdk.client.synchronous.sync_client import LaminarClient from lmnr.sdk.client.asynchronous.async_client import AsyncLaminarClient from lmnr.sdk.log import get_default_logger from lmnr.sdk.types import MaskInputOptions @@ -45,6 +43,88 @@ OLD_BUFFER_TIMEOUT = 60 + +def create_send_events_handler( + chunk_buffers: dict, + session_id: str, + trace_id: str, + client: AsyncLaminarClient, + background_loop: asyncio.AbstractEventLoop, +): + """ + Create an async event handler for sending browser events. + + This handler reassembles chunked event data and submits it to the background + loop for async HTTP sending. The handler itself processes chunks synchronously + but delegates the actual HTTP send to the background loop. + + Args: + chunk_buffers: Dictionary to store incomplete chunk batches + session_id: Browser session ID + trace_id: OpenTelemetry trace ID + client: Async Laminar client for HTTP requests + background_loop: Background event loop for async sends + + Returns: + An async function that handles incoming event chunks from the browser + """ + + async def send_events_from_browser(chunk): + try: + # Handle chunked data + batch_id = chunk["batchId"] + chunk_index = chunk["chunkIndex"] + total_chunks = chunk["totalChunks"] + data = chunk["data"] + + # Initialize buffer for this batch if needed + if batch_id not in chunk_buffers: + chunk_buffers[batch_id] = { + "chunks": {}, + "total": total_chunks, + "timestamp": time.time(), + } + + # Store chunk + chunk_buffers[batch_id]["chunks"][chunk_index] = data + + # Check if we have all chunks + if len(chunk_buffers[batch_id]["chunks"]) == total_chunks: + # Reassemble the full message + full_data = "" + for i in range(total_chunks): + full_data += chunk_buffers[batch_id]["chunks"][i] + + # Parse the JSON + events = orjson.loads(full_data) + + # Send to server in background loop (independent of Playwright's loop) + if events and len(events) > 0: + future = asyncio.run_coroutine_threadsafe( + client._browser_events.send(session_id, trace_id, events), + background_loop, + ) + track_async_send(future) + + # Clean up buffer + del chunk_buffers[batch_id] + + # Clean up old incomplete buffers + current_time = time.time() + to_delete = [] + for bid, buffer in chunk_buffers.items(): + if current_time - buffer["timestamp"] > OLD_BUFFER_TIMEOUT: + to_delete.append(bid) + for bid in to_delete: + logger.debug(f"Cleaning up incomplete chunk buffer: {bid}") + del chunk_buffers[bid] + + except Exception as e: + logger.debug(f"Could not send events: {e}") + + return send_events_from_browser + + current_dir = os.path.dirname(os.path.abspath(__file__)) with open(os.path.join(current_dir, "recorder", "record.umd.min.cjs"), "r") as f: RRWEB_CONTENT = f"() => {{ {f.read()} }}" @@ -646,68 +726,34 @@ async def load_session_recorder(): @observe(name="playwright.page", ignore_input=True, ignore_output=True) -def start_recording_events_sync(page: SyncPage, session_id: str, client: LaminarClient): +def start_recording_events_sync( + page: SyncPage, session_id: str, client: AsyncLaminarClient +): ctx = get_current_context() span = trace.get_current_span(ctx) trace_id = format(span.get_span_context().trace_id, "032x") span.set_attribute("lmnr.internal.has_browser_session", True) + # Get the background loop for async sends + background_loop = get_background_loop() + # Buffer for reassembling chunks chunk_buffers = {} - def send_events_from_browser(chunk): - try: - # Handle chunked data - batch_id = chunk["batchId"] - chunk_index = chunk["chunkIndex"] - total_chunks = chunk["totalChunks"] - data = chunk["data"] - - # Initialize buffer for this batch if needed - if batch_id not in chunk_buffers: - chunk_buffers[batch_id] = { - "chunks": {}, - "total": total_chunks, - "timestamp": time.time(), - } - - # Store chunk - chunk_buffers[batch_id]["chunks"][chunk_index] = data - - # Check if we have all chunks - if len(chunk_buffers[batch_id]["chunks"]) == total_chunks: - # Reassemble the full message - full_data = "".join( - chunk_buffers[batch_id]["chunks"][i] for i in range(total_chunks) - ) - - # Parse the JSON - events = orjson.loads(full_data) - - # Send to server - if events and len(events) > 0: - client._browser_events.send(session_id, trace_id, events) - - # Clean up buffer - del chunk_buffers[batch_id] - - # Clean up old incomplete buffers - current_time = time.time() - to_delete = [] - for bid, buffer in chunk_buffers.items(): - if current_time - buffer["timestamp"] > OLD_BUFFER_TIMEOUT: - to_delete.append(bid) - for bid in to_delete: - logger.debug(f"Cleaning up incomplete chunk buffer: {bid}") - del chunk_buffers[bid] - - except Exception as e: - logger.debug(f"Could not send events: {e}") + # Create the async event handler (shared implementation) + send_events_from_browser = create_send_events_handler( + chunk_buffers, session_id, trace_id, client, background_loop + ) def submit_event(chunk): + """Sync wrapper that submits async handler to background loop.""" try: - submit_sync_task(send_events_from_browser, chunk) + # Submit async handler to background loop + asyncio.run_coroutine_threadsafe( + send_events_from_browser(chunk), + background_loop, + ) except Exception as e: logger.debug(f"Error submitting event: {e}") @@ -743,58 +789,10 @@ async def start_recording_events_async( # Buffer for reassembling chunks chunk_buffers = {} - async def send_events_from_browser(chunk): - try: - # Handle chunked data - batch_id = chunk["batchId"] - chunk_index = chunk["chunkIndex"] - total_chunks = chunk["totalChunks"] - data = chunk["data"] - - # Initialize buffer for this batch if needed - if batch_id not in chunk_buffers: - chunk_buffers[batch_id] = { - "chunks": {}, - "total": total_chunks, - "timestamp": time.time(), - } - - # Store chunk - chunk_buffers[batch_id]["chunks"][chunk_index] = data - - # Check if we have all chunks - if len(chunk_buffers[batch_id]["chunks"]) == total_chunks: - # Reassemble the full message - full_data = "" - for i in range(total_chunks): - full_data += chunk_buffers[batch_id]["chunks"][i] - - # Parse the JSON - events = orjson.loads(full_data) - - # Send to server in background loop (independent of Playwright's loop) - if events and len(events) > 0: - future = asyncio.run_coroutine_threadsafe( - client._browser_events.send(session_id, trace_id, events), - background_loop, - ) - track_async_send(future) - - # Clean up buffer - del chunk_buffers[batch_id] - - # Clean up old incomplete buffers - current_time = time.time() - to_delete = [] - for bid, buffer in chunk_buffers.items(): - if current_time - buffer["timestamp"] > OLD_BUFFER_TIMEOUT: - to_delete.append(bid) - for bid in to_delete: - logger.debug(f"Cleaning up incomplete chunk buffer: {bid}") - del chunk_buffers[bid] - - except Exception as e: - logger.debug(f"Could not send events: {e}") + # Create the async event handler (shared implementation) + send_events_from_browser = create_send_events_handler( + chunk_buffers, session_id, trace_id, client, background_loop + ) try: await page.expose_function("lmnrSendEvents", send_events_from_browser)