diff --git a/dspy/utils/parallelizer.py b/dspy/utils/parallelizer.py index c32f5e3ebb..5b5544d3d3 100644 --- a/dspy/utils/parallelizer.py +++ b/dspy/utils/parallelizer.py @@ -89,10 +89,11 @@ def worker(parent_overrides, submission_id, index, item): from dspy.dsp.utils.settings import thread_local_overrides original = thread_local_overrides.get() - token = thread_local_overrides.set({**original, **parent_overrides.copy()}) - if parent_overrides.get("usage_tracker"): + new_overrides = {**original, **parent_overrides.copy()} + if new_overrides.get("usage_tracker"): # Usage tracker needs to be deep copied across threads so that each thread tracks its own usage - thread_local_overrides.overrides["usage_tracker"] = copy.deepcopy(parent_overrides["usage_tracker"]) + new_overrides["usage_tracker"] = copy.deepcopy(new_overrides["usage_tracker"]) + token = thread_local_overrides.set(new_overrides) try: return index, function(item) diff --git a/tests/utils/test_parallelizer.py b/tests/utils/test_parallelizer.py index 128614ffc8..a93aeaa78d 100644 --- a/tests/utils/test_parallelizer.py +++ b/tests/utils/test_parallelizer.py @@ -2,7 +2,9 @@ import pytest +import dspy from dspy.utils.parallelizer import ParallelExecutor +from dspy.utils.usage_tracker import UsageTracker def test_worker_threads_independence(): @@ -83,3 +85,54 @@ def task(item): assert str(executor.exceptions_map[2]) == "test error for 3" assert isinstance(executor.exceptions_map[4], RuntimeError) assert str(executor.exceptions_map[4]) == "test error for 5" + + +def test_parallel_executor_with_usage_tracker_in_context(): + """Test that ParallelExecutor works correctly with UsageTracker inside dspy.context block. + + This tests the fix for the AttributeError: '_contextvars.ContextVar' object has no attribute 'overrides' + bug that occurred when using UsageTracker with ParallelExecutor inside a dspy.context block. + """ + tracker = UsageTracker() + + def task(item): + return item * 2 + + data = [1, 2, 3, 4, 5] + + # This should not raise AttributeError + with dspy.context(usage_tracker=tracker): + executor = ParallelExecutor(num_threads=2) + results = executor.execute(task, data) + + assert results == [2, 4, 6, 8, 10] + + +def test_parallel_executor_with_usage_tracker_tracks_independently(): + """Test that each thread gets its own deep copy of the UsageTracker.""" + parent_tracker = UsageTracker() + + # Add some initial usage data to the parent tracker + parent_tracker.add_usage("test-model", {"prompt_tokens": 100, "completion_tokens": 50}) + + def task(item): + from dspy.dsp.utils.settings import settings + + tracker = settings.usage_tracker + # Each thread should have its own tracker (deep copy of parent) + # Add usage specific to this thread + tracker.add_usage("test-model", {"prompt_tokens": item, "completion_tokens": item}) + return (item, len(tracker.usage_data["test-model"])) + + data = [1, 2, 3] + + with dspy.context(usage_tracker=parent_tracker): + executor = ParallelExecutor(num_threads=3) + results = executor.execute(task, data) + + # Each thread should have its own tracker with 2 entries (1 from parent, 1 from thread) + for item, entry_count in results: + assert entry_count == 2, f"Thread processing item {item} should have 2 entries, got {entry_count}" + + # Parent tracker should only have its original entry (not modified by threads) + assert len(parent_tracker.usage_data["test-model"]) == 1