
Move telemetry off event loop to dedicated worker thread #6626

Open
masenf wants to merge 3 commits into main from claude/happy-bardeen-7m3x3u

Conversation


@masenf masenf commented Jun 8, 2026

Collaborator

Type of change

  • Bug fix (non-breaking change which fixes an issue)

Description

Fixes #6618: telemetry collection and delivery were blocking the asyncio event loop when send() was called from within a running loop (e.g., backend error telemetry). The blocking syscalls, subprocess calls, and synchronous HTTP requests used to gather and post events would stall the event loop.
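To make the stall concrete, here is a minimal sketch (not the Reflex code) that measures the longest gap between event-loop heartbeats while a blocking call runs. `blocking_collect`, `on_loop`, `off_loop`, and `longest_heartbeat_gap` are hypothetical names, and `time.sleep` stands in for the blocking syscalls, subprocess calls, and synchronous HTTP requests described above.

```python
import asyncio
import contextlib
import time
from concurrent.futures import ThreadPoolExecutor

BLOCK_SECONDS = 0.3  # hypothetical duration of the blocking work


def blocking_collect() -> None:
    """Stand-in for blocking telemetry work (syscalls, subprocess, sync HTTP)."""
    time.sleep(BLOCK_SECONDS)


async def on_loop() -> None:
    """Old shape: a task that never awaits runs the work on the loop thread."""

    async def task() -> None:
        blocking_collect()

    await asyncio.create_task(task())


async def off_loop() -> None:
    """New shape: the work runs on a single worker thread."""
    with ThreadPoolExecutor(max_workers=1) as pool:
        await asyncio.wrap_future(pool.submit(blocking_collect))


async def longest_heartbeat_gap(run_work) -> float:
    """Return the longest gap between heartbeat ticks while run_work() runs."""
    gaps = []

    async def heartbeat() -> None:
        last = time.perf_counter()
        while True:
            await asyncio.sleep(0.01)
            now = time.perf_counter()
            gaps.append(now - last)
            last = now

    ticker = asyncio.create_task(heartbeat())
    await asyncio.sleep(0.05)  # let the heartbeat settle
    await run_work()
    await asyncio.sleep(0.05)  # give the heartbeat a chance to record the gap
    ticker.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await ticker
    return max(gaps)


if __name__ == "__main__":
    print("on loop :", asyncio.run(longest_heartbeat_gap(on_loop)))
    print("off loop:", asyncio.run(longest_heartbeat_gap(off_loop)))
```

With the work on the loop thread, the heartbeat gap is at least as long as the blocking call. With the work on a worker thread, the heartbeat keeps ticking at roughly its normal interval.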

Changes:

  1. Replace asyncio task-based approach with a dedicated worker thread pool:

    • Removed asyncio.create_task() and background task tracking
    • Introduced a single-worker ThreadPoolExecutor (_get_telemetry_executor()) that lazily initializes on first use
    • All telemetry work now runs on this dedicated worker thread, never on the caller's thread or event loop
  2. New helper functions:

    • _get_telemetry_executor(): Returns the process-wide, lazily-created telemetry executor
    • _submit(): Queues work on the executor, swallowing all errors so telemetry never breaks the app
    • _run_suppressed(): Wraps executor work to catch and suppress exceptions at debug level
    • _flush(): Blocks until queued telemetry has been processed (for tests and shutdown)
    • _process_event(): Extracted from send() to run on the worker thread
  3. Simplified send() API:

    • Now a thin wrapper that queues _process_event() on the worker thread
    • Resolves telemetry_enabled from config if unspecified
    • All error handling is suppressed; failures never surface to the caller
  4. Test coverage:

    • Added _drain_telemetry_executor fixture to prevent test leakage
    • New tests verify: lazy initialization, single-worker constraint, off-loop execution, event loop safety, error suppression
    • Updated existing test to flush the executor before assertions

Testing

  • Added comprehensive unit tests covering executor behavior, thread isolation, and error handling
  • Existing telemetry tests updated to flush the executor
  • All tests pass with adequate coverage

Checklist

  • Tests pass with adequate coverage
  • uv run ruff check . and uv run ruff format . clean
  • uv run pyright reflex tests passes
  • No user-facing API changes (internal refactor)

https://claude.ai/code/session_011DzmkWzgwGcJL9rdiBTLVN

Telemetry collection uses blocking syscalls and subprocess calls and a
synchronous httpx.post. The asyncio "send" path ran that work directly
on the event loop thread (a task that never awaited), so it stalled the
loop -- notably when emitting runtime-error telemetry at a high rate --
and the no-event-loop path blocked the caller synchronously.

Run all telemetry collection and delivery on a lazily-initialized,
single-worker ThreadPoolExecutor whose FIFO queue serializes events off
the caller's thread. Delivery is best-effort: every failure is
suppressed so telemetry can never break the app. The pool is created
only when telemetry is enabled and there is something to send, and
concurrent.futures drains its queue at interpreter exit so CLI events
are not lost.

Closes #6618

https://claude.ai/code/session_011DzmkWzgwGcJL9rdiBTLVN
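The FIFO serialization mentioned in the commit message can be illustrated with a small sketch. `deliver_all` and `deliver` are hypothetical names, `time.sleep` stands in for a slow HTTP post, and the explicit `shutdown(wait=True)` is only a stand-in for the drain that concurrent.futures performs at interpreter exit.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import List


def deliver_all(events: List[str]) -> List[str]:
    """Queue events on a single worker and return them in delivery order."""
    delivered: List[str] = []

    def deliver(event: str) -> None:
        time.sleep(0.005)  # stand-in for a slow, blocking HTTP post
        delivered.append(event)

    pool = ThreadPoolExecutor(max_workers=1)
    for event in events:
        pool.submit(deliver, event)  # returns immediately; the work is queued
    # Explicit stand-in for the drain that happens at interpreter exit.
    pool.shutdown(wait=True)
    return delivered


if __name__ == "__main__":
    print(deliver_all(["init", "compile", "run"]))
```

With one worker there is only one consumer of the queue, so events are delivered in the order they were submitted, and none are dropped as long as the queue is drained before the process ends.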
@masenf masenf requested a review from a team as a code owner June 8, 2026 19:14

greptile-apps Bot commented Jun 8, 2026

Contributor

Greptile Summary

This PR moves telemetry collection and delivery off the asyncio event loop onto a dedicated single-worker ThreadPoolExecutor, fixing event-loop stalls caused by blocking syscalls and synchronous HTTP requests that previously ran inside an asyncio task.

  • send() is now a thin wrapper that resolves config and queues _process_event onto the worker thread; all blocking work (subprocess calls, HTTP post) happens only on that thread.
  • New helper functions (_get_telemetry_executor, _submit, _run_suppressed, _flush) form a clean, error-suppressing dispatch layer with a flush primitive for tests and graceful shutdown.
  • Tests cover lazy init, single-worker constraint, off-loop execution, event-loop safety, and error suppression; an autouse fixture flushes the executor between tests to prevent job leakage.

Confidence Score: 4/5

Safe to merge; the core bug fix is correct and all blocking work is now off the event loop.

The executor setup, double-checked locking, error-suppression chain, and the recursive-alias-send path via the single worker are all handled correctly. The one rough edge is that _flush(timeout=...) swallows TimeoutError, making it impossible for callers to confirm the drain completed — currently only tests use _flush without a timeout, so this has no runtime impact today.

No files need special attention; both changed files are straightforward and the logic is sound.

Important Files Changed

| Filename | Overview |
| --- | --- |
| reflex/utils/telemetry.py | Replaces asyncio-task telemetry with a single-worker ThreadPoolExecutor; double-checked locking, error suppression, and flush sentinel are all correct. Minor: _flush() silently discards TimeoutError, so a caller passing timeout= cannot distinguish clean flush from expiry. |
| tests/units/test_telemetry.py | Good new coverage: lazy init, single-worker, off-thread execution, event-loop safety, error suppression. Async test runs correctly under asyncio_mode="auto". _drain_telemetry_executor autouse fixture prevents cross-test leakage. |

Reviews (1): Last reviewed commit: "fix(telemetry): process telemetry events..."

Comment thread reflex/utils/telemetry.py
Comment on lines +535 to +538
    if _executor is None:
        return
    with suppress(Exception):
        _executor.submit(lambda: None).result(timeout)

Contributor


P2 _flush(timeout=...) silently swallows concurrent.futures.TimeoutError, so a caller that passes a finite timeout has no way to tell whether the flush actually completed or just expired. The suppress is appropriate for catastrophic errors, but letting TimeoutError propagate (or returning a bool) would let the test harness or shutdown code detect an incomplete drain.

Suggested change
-    if _executor is None:
-        return
-    with suppress(Exception):
-        _executor.submit(lambda: None).result(timeout)
+    if _executor is None:
+        return
+    with suppress(Exception):
+        future = _executor.submit(lambda: None)
+    try:
+        future.result(timeout)
+    except Exception:
+        pass

Collaborator Author


Good catch on the unobservable drain. Addressed in cc83909, though with a different shape than the suggestion: _flush now returns bool (True when the queue drained or no worker was ever started, False on timeout) and still never raises, which keeps it safe for best-effort shutdown paths.

I avoided the proposed snippet because it would still swallow TimeoutError via except Exception: pass, and future would be unbound (NameError) if submit() raised inside the preceding with suppress(...). Added tests for both the drained and timed-out branches.
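For readers following along, the bool-returning shape described in this reply might look like the sketch below. It is an illustration, not the code from cc83909: `flush` takes the executor as a parameter so the example stays self-contained, and returning False for failures other than a timeout is an assumption.

```python
import concurrent.futures
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Optional


def flush(
    executor: Optional[ThreadPoolExecutor], timeout: Optional[float] = None
) -> bool:
    """Return True if the queue drained (or no worker exists), False otherwise."""
    if executor is None:
        return True  # nothing was ever queued
    try:
        # With a single worker, this sentinel runs only after all earlier jobs.
        executor.submit(lambda: None).result(timeout)
    except concurrent.futures.TimeoutError:
        return False  # the drain did not finish in time
    except Exception:
        return False  # assumption: other failures also report "not drained"
    return True


if __name__ == "__main__":
    pool = ThreadPoolExecutor(max_workers=1)
    pool.submit(time.sleep, 0.3)      # a slow job ahead of the sentinel
    print(flush(pool, timeout=0.05))  # timed-out branch
    print(flush(pool, timeout=5))     # drained branch
    pool.shutdown()
```

Submitting the sentinel and waiting on it happen inside one `try`, so there is no window in which `future` could be unbound, and the function never raises.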


Generated by Claude Code


codspeed-hq Bot commented Jun 8, 2026


Merging this PR will not alter performance

✅ 26 untouched benchmarks
⏩ 8 skipped benchmarks [1]


Comparing claude/happy-bardeen-7m3x3u (e88322e) with main (31f785e)


Footnotes

  1. 8 benchmarks were skipped, so the baseline results were used instead. If they were deleted from the codebase, click here and archive them to remove them from the performance reports.

claude added 2 commits June 8, 2026 19:50
_flush() swallowed concurrent.futures.TimeoutError, so a caller passing
a finite timeout could not distinguish a clean drain from an expiry.

Return a bool instead: True when the queue drained (or no worker was
started), False on timeout. _flush still never raises, preserving its
best-effort contract for shutdown paths. Add tests for both branches.

https://claude.ai/code/session_011DzmkWzgwGcJL9rdiBTLVN

@FarhanAliRaza FarhanAliRaza left a comment

Contributor


We now accept added latency at atexit time, because all remaining events are now sent before the process exits, each subject to httpx's default timeout of 5 seconds.



Development

Successfully merging this pull request may close these issues.

Send telemetry event in thread so as to never block async event loop

3 participants