From c4809df8a3ef836c608915d5fe87ecce4bff668a Mon Sep 17 00:00:00 2001 From: Siddhant Shah Date: Mon, 16 Mar 2026 18:58:45 +0530 Subject: [PATCH] feat(context fix): fixed sessions and users for langchain callback --- src/openlayer/lib/integrations/langchain_callback.py | 7 +++++-- src/openlayer/lib/tracing/tracer.py | 5 ++++- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/src/openlayer/lib/integrations/langchain_callback.py b/src/openlayer/lib/integrations/langchain_callback.py index b33adb06..2066b84c 100644 --- a/src/openlayer/lib/integrations/langchain_callback.py +++ b/src/openlayer/lib/integrations/langchain_callback.py @@ -1,6 +1,7 @@ """Module with the Openlayer callback handler for LangChain.""" # pylint: disable=unused-argument +import contextvars import time from typing import Any, Callable, Dict, List, Optional, Union from uuid import UUID @@ -190,8 +191,9 @@ def _end_step( ): trace = self._traces_by_root.pop(run_id) if tracer._configured_background_publish_enabled: + ctx = contextvars.copy_context() tracer._get_background_executor().submit( - self._process_and_upload_trace, trace + ctx.run, self._process_and_upload_trace, trace ) else: self._process_and_upload_trace(trace) @@ -1247,8 +1249,9 @@ def _end_step( if is_root_step and has_standalone_trace and not self._has_external_trace: trace = self._traces_by_root.pop(run_id) if tracer._configured_background_publish_enabled: + ctx = contextvars.copy_context() tracer._get_background_executor().submit( - self._process_and_upload_async_trace, trace + ctx.run, self._process_and_upload_async_trace, trace ) else: self._process_and_upload_async_trace(trace) diff --git a/src/openlayer/lib/tracing/tracer.py b/src/openlayer/lib/tracing/tracer.py index c3312ef3..319ebbf3 100644 --- a/src/openlayer/lib/tracing/tracer.py +++ b/src/openlayer/lib/tracing/tracer.py @@ -1631,9 +1631,12 @@ def _handle_trace_completion( if _publish: if _configured_background_publish_enabled: - # Submit to background thread pool + # Submit to background thread pool, copying context so that + # contextvars (user_id, session_id, etc.) are preserved. + ctx = contextvars.copy_context() executor = _get_background_executor() executor.submit( + ctx.run, _upload_and_publish_trace, current_trace, resolved_pipeline_id,