Streaming deadlocks when using async tools/callbacks - sync_send_to_stream blocks event loop
🐛 Bug Description
When using DSPy's streaming feature (`dspy.streamify()`) with async tools (such as MCP tools or ReAct tool calls), the stream freezes indefinitely after a tool finishes executing. It never progresses past the "Tool calling finished!" status message.
💥 Error Message
WARNING dspy.utils.callback: Error when applying callback <dspy.streaming.messages.StatusStreamingCallback object>'s end handler on function acall: Not running inside an AnyIO worker thread, and no event loop token was provided.
🔍 Root Cause
The `sync_send_to_stream()` function in `dspy/streaming/messages.py` fails in two ways:
- When an event loop IS running (lines 45-47):
  ```python
  with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
      future = executor.submit(run_in_new_loop)
      return future.result()  # ← BLOCKS the main event loop!
  ```
- When no event loop is running (line 50):
  ```python
  return syncify(_send)()  # ← Raises AnyIO error without proper context
  ```
The blocking `future.result()` call prevents the stream from continuing, causing a deadlock when `StatusStreamingCallback.on_tool_end()` fires after async tool execution.
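The stall can be observed without DSPy. The sketch below is a minimal, self-contained model of the pattern quoted above, not DSPy's actual code: `blocking_send` is a hypothetical stand-in that runs a coroutine on a fresh event loop in a worker thread and then waits on `future.result()`, and a `heartbeat` task stands in for the stream consumer. While the synchronous call waits, no other task on the calling loop can run, so the gap between the first two heartbeat ticks is at least the 0.2 s the worker spends.

```python
import asyncio
import concurrent.futures
import time


def blocking_send(delay: float) -> None:
    """Hypothetical stand-in for the pattern above: run a coroutine on a new
    event loop in a worker thread, then block the caller on future.result()."""

    def run_in_new_loop() -> None:
        # Stands in for `await stream.send(message)` on a separate loop
        asyncio.run(asyncio.sleep(delay))

    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        executor.submit(run_in_new_loop).result()  # blocks the calling thread


async def main() -> list:
    ticks = []

    async def heartbeat() -> None:
        # Another task on the same loop, e.g. the consumer of the stream
        for _ in range(3):
            ticks.append(time.monotonic())
            await asyncio.sleep(0.01)

    hb = asyncio.create_task(heartbeat())
    await asyncio.sleep(0)  # let the heartbeat record its first tick
    blocking_send(0.2)      # a synchronous callback invoked from a coroutine
    await hb
    return ticks


ticks = asyncio.run(main())
print(f"gap between first two ticks: {ticks[1] - ticks[0]:.2f}s")
```

If the send itself can only complete after the consumer on the original loop makes progress (for example, when a bounded buffer is full), this stall becomes a deadlock.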
📋 Reproduction Steps
```python
import asyncio

import dspy
from dspy.streaming import StreamListener

# 1. Set up streaming with an async tool
lm = dspy.LM(model="openai/gpt-4o")


async def my_async_tool(query: str) -> str:
    """An async tool that simulates work."""
    await asyncio.sleep(0.1)  # Simulate async work
    return f"Result for: {query}"


signature = dspy.Signature(
    {"input": (str, dspy.InputField()), "output": (str, dspy.OutputField())}
)

# 2. Create ReAct program with tools
program = dspy.ReAct(signature, tools=[my_async_tool], max_iters=2)

# 3. Wrap with streamify
streaming_task = dspy.streamify(
    program,
    is_async_program=True,
    stream_listeners=[StreamListener(signature_field_name="output")],
)


# 4. Execute - this will freeze after tool execution
async def run():
    with dspy.context(lm=lm):
        stream = streaming_task(input="test query")
        async for value in stream:
            print(value)


asyncio.run(run())
# ⚠️ Stream freezes at "Tool calling finished!" - never completes
```
🔧 Proposed Fix
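Replace the blocking implementation with a non-blocking, fire-and-forget send:
```python
import asyncio

# Strong references to in-flight sends: the event loop keeps only weak
# references to tasks, so an unreferenced task can be garbage-collected.
_background_tasks = set()


def sync_send_to_stream(stream, message):
    """Send message to stream without blocking the event loop."""

    async def _send():
        try:
            await stream.send(message)
        except Exception:
            pass  # Gracefully handle send failures

    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        # No event loop running in this thread - safe to create a new one
        try:
            asyncio.run(_send())
        except Exception:
            pass  # Gracefully handle failures
        return

    # Non-blocking: schedule as a background task (fire-and-forget)
    task = loop.create_task(_send())
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)
```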
Key changes:
- Remove `ThreadPoolExecutor` and the blocking `future.result()` call
- Use `loop.create_task()` for a fire-and-forget pattern
- Never block the calling thread
- Gracefully handle errors
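The scheduling behavior of the fire-and-forget pattern can be checked in isolation. The sketch below runs it against a hypothetical `QueueStream` (an `asyncio.Queue` wrapped with an async `send()`, standing in for DSPy's stream; it is not DSPy's API), and `send_nonblocking` is likewise a hypothetical name. It also keeps a strong reference to each task, as the asyncio documentation recommends for `create_task`. The synchronous callback returns before anything has been sent, and the consumer still receives the message once the coroutine yields to the loop.

```python
import asyncio
from typing import Any, Set, Tuple

_background_tasks: Set["asyncio.Task[None]"] = set()


class QueueStream:
    """Hypothetical stand-in for the stream: exposes an async send()."""

    def __init__(self) -> None:
        self.queue: "asyncio.Queue[Any]" = asyncio.Queue()

    async def send(self, message: Any) -> None:
        await self.queue.put(message)


def send_nonblocking(stream: QueueStream, message: Any) -> None:
    """Schedule stream.send(message) on the running loop and return at once."""
    loop = asyncio.get_running_loop()
    task = loop.create_task(stream.send(message))
    # Keep a strong reference until the task finishes
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)


async def main() -> Tuple[bool, Any]:
    stream = QueueStream()

    def on_tool_end() -> None:
        # A synchronous callback, like a status hook fired after a tool call
        send_nonblocking(stream, "Tool calling finished!")

    on_tool_end()  # returns immediately; the send runs when main() yields
    empty_right_after_call = stream.queue.empty()
    first = await asyncio.wait_for(stream.queue.get(), timeout=1.0)
    return empty_right_after_call, first


print(asyncio.run(main()))  # (True, 'Tool calling finished!')
```

One trade-off of deferring the send: the caller no longer learns whether it succeeded, which is why failures have to be handled inside the scheduled coroutine.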
✅ Testing
We've tested this fix extensively with:
- Multiple concurrent streams
- MCP tool integrations
- WebSocket broadcasting
- Long-running tool executions
In these scenarios, the fix eliminates the deadlock without breaking existing streaming behavior.
🌍 Impact
This bug affects any use case involving:
- Async tools with streaming
- ReAct agents with tool calls
- Real-time status updates during tool execution
- Multi-agent systems with parallel streaming
📦 Version Info
- DSPy Version: 3.0.4b1 (also present in 3.0.0)
- Python: 3.10+
- Environment: Async event loop context
🤝 Contribution
We have a working implementation and can submit a PR with tests if this approach looks good to the maintainers.
Let me know if you need any additional details or reproduction examples!