
feat: Enable streaming output #86

@ezynda3


Enable Streaming Output for LLM Responses

Overview

Implement streaming output for LLM responses so generated content is displayed in real time, improving perceived response times and giving MCPHost the feel of a modern AI assistant.

Current State Analysis

Message Display: MCPHost uses a sophisticated UI system with:

  • DisplayAssistantMessage() - waits for the complete response, then displays the entire message at once
  • DisplayStreamingMessage() - exists, but collects all chunks and then displays the complete message (not true streaming)
  • Message rendering through MessageRenderer and MessageContainer with rich styling

Streaming Infrastructure:

  • All providers support Eino's Stream() method, returning *schema.StreamReader[*schema.Message]
  • The agent currently uses only model.Generate() (non-streaming) for tool calling and multi-step reasoning
  • The UI has a streaming method, but it is not truly streaming: it collects all chunks, then displays them

Enhanced Implementation Strategy: Eino ReAct Pattern

Based on the Eino ReAct agent implementation, we should adopt its sophisticated streaming tool-call detection pattern.

Key Insights from Eino ReAct Agent

1. StreamToolCallChecker Pattern
The Eino ReAct agent uses a StreamToolCallChecker function to handle different provider behaviors:

  • OpenAI: Outputs tool calls directly in early chunks
  • Claude/Anthropic: Outputs text content first, then tool calls later
  • Custom Logic: Allows provider-specific tool call detection

2. Provider-Specific Streaming Behavior

// From Eino ReAct agent - handles different streaming patterns
type StreamToolCallChecker func(ctx context.Context, modelOutput *schema.StreamReader[*schema.Message]) (bool, error)

// Default implementation (works for OpenAI)
func firstChunkStreamToolCallChecker(_ context.Context, sr *schema.StreamReader[*schema.Message]) (bool, error) {
    defer sr.Close()
    
    for {
        msg, err := sr.Recv()
        if err == io.EOF {
            return false, nil
        }
        if err != nil {
            return false, err
        }
        
        if len(msg.ToolCalls) > 0 {
            return true, nil
        }
        
        if len(msg.Content) == 0 { // skip empty chunks at the front
            continue
        }
        
        return false, nil
    }
}

3. Stream Processing with Tool Call Detection
The agent processes streams while checking for tool calls, then branches accordingly.

Implementation Strategy: Smart Streaming by Default

Default Behavior: Always attempt streaming first, with intelligent provider-aware tool call detection.

Phase 1: Provider-Aware Stream Tool Call Detection

1. Implement StreamToolCallChecker Interface

// StreamToolCallChecker determines if streaming output contains tool calls
type StreamToolCallChecker func(ctx context.Context, modelOutput *schema.StreamReader[*schema.Message]) (hasToolCalls bool, content string, err error)

// Provider-specific implementations
func anthropicStreamToolCallChecker(ctx context.Context, sr *schema.StreamReader[*schema.Message]) (bool, string, error) {
    defer sr.Close()
    
    var fullContent strings.Builder
    var toolCallDetected bool
    
    for {
        msg, err := sr.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            return false, "", err
        }
        
        fullContent.WriteString(msg.Content)
        
        // Claude typically outputs tool calls after text content
        if len(msg.ToolCalls) > 0 {
            toolCallDetected = true
            break
        }
        
        // Check for Claude-specific tool call patterns in accumulated content
        content := fullContent.String()
        if strings.Contains(content, "<function_calls>") || 
           strings.Contains(content, "I'll use the") ||
           strings.Contains(content, "Let me use") {
            toolCallDetected = true
            break
        }
    }
    
    return toolCallDetected, fullContent.String(), nil
}
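
Note that the substring heuristics above ("I'll use the", "Let me use") can match ordinary prose that never leads to a tool call. A false positive only routes the turn to the non-streaming fallback described below, so over-triggering costs an extra model call rather than correctness.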

func openaiStreamToolCallChecker(ctx context.Context, sr *schema.StreamReader[*schema.Message]) (bool, string, error) {
    defer sr.Close()

    var content strings.Builder

    for {
        msg, err := sr.Recv()
        if err == io.EOF {
            return false, content.String(), nil
        }
        if err != nil {
            return false, "", err
        }

        content.WriteString(msg.Content)

        // OpenAI outputs tool calls in early chunks
        if len(msg.ToolCalls) > 0 {
            return true, content.String(), nil
        }

        // A content chunk with no tool calls means no tools are coming,
        // but keep draining so the caller receives the complete response
        if len(msg.Content) > 0 {
            break
        }
    }

    // Accumulate the remainder of the stream as plain content
    for {
        msg, err := sr.Recv()
        if err == io.EOF {
            return false, content.String(), nil
        }
        if err != nil {
            return false, content.String(), err
        }
        content.WriteString(msg.Content)
    }
}

2. Provider-Aware Agent Streaming (internal/agent/agent.go)

type StreamingConfig struct {
    ToolCallChecker StreamToolCallChecker
    DisplayHandler  func(content string) error
}

func (a *Agent) generateWithStreamingFirst(ctx context.Context, messages []*schema.Message, toolInfos []*schema.ToolInfo) (*schema.Message, error) {
    // Get provider-specific tool call checker
    checker := a.getProviderToolCallChecker()
    
    // Always try streaming first
    reader, err := a.model.Stream(ctx, messages, model.WithTools(toolInfos))
    if err != nil {
        // Fallback to non-streaming if streaming fails
        return a.model.Generate(ctx, messages, model.WithTools(toolInfos))
    }
    
    // Check if stream contains tool calls using provider-specific logic
    hasToolCalls, content, err := checker(ctx, reader)
    if err != nil {
        // Fallback to non-streaming on error
        return a.model.Generate(ctx, messages, model.WithTools(toolInfos))
    }
    
    if hasToolCalls && len(toolInfos) > 0 {
        // Tool calls detected - restart with non-streaming for proper tool handling
        return a.model.Generate(ctx, messages, model.WithTools(toolInfos))
    }
    
    // No tool calls - display the streamed content and return
    if a.cli != nil {
        a.cli.DisplayStreamedContent(content)
    }
    
    return &schema.Message{
        Role:    schema.Assistant,
        Content: content,
    }, nil
}
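
A consequence of this flow: when tool calls are detected, the prompt is re-issued through Generate(), so tool-call turns pay for two model invocations. That is the cost of defaulting to streaming on the (typically more common) plain-text turns.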

func (a *Agent) getProviderToolCallChecker() StreamToolCallChecker {
    // Determine provider type and return appropriate checker
    switch a.getProviderType() {
    case "anthropic":
        return anthropicStreamToolCallChecker
    case "openai", "azure":
        return openaiStreamToolCallChecker
    case "gemini":
        return geminiStreamToolCallChecker
    case "ollama":
        return ollamaStreamToolCallChecker
    default:
        return defaultStreamToolCallChecker
    }
}
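
The geminiStreamToolCallChecker, ollamaStreamToolCallChecker, and defaultStreamToolCallChecker referenced in this switch are not spelled out in this issue. Given the provider notes below (Gemini behaves like OpenAI), a minimal sketch of the Gemini checker can simply delegate; conservative sketches for the Ollama and default checkers follow the Provider-Specific Streaming Patterns section.

// geminiStreamToolCallChecker: sketch only. Per the provider notes below,
// Gemini emits tool calls in early chunks like OpenAI, so delegate.
func geminiStreamToolCallChecker(ctx context.Context, sr *schema.StreamReader[*schema.Message]) (bool, string, error) {
    return openaiStreamToolCallChecker(ctx, sr)
}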

Phase 2: Enhanced UI Streaming Integration

3. Dual-Mode Streaming Display (internal/ui/cli.go)

// DisplayStreamedContent displays pre-collected streaming content
func (c *CLI) DisplayStreamedContent(content string) error {
    return c.DisplayAssistantMessageWithModel(content, "")
}

// DisplayStreamingMessageLive displays streaming content with real-time updates
func (c *CLI) DisplayStreamingMessageLive(reader *schema.StreamReader[*schema.Message], modelName string, checker StreamToolCallChecker) error {
    // Create initial empty assistant message
    msg := c.messageRenderer.RenderAssistantMessage("", time.Now(), modelName)
    c.messageContainer.AddMessage(msg)
    c.displayContainer()
    
    // Use provider-specific checker to handle streaming
    hasToolCalls, finalContent, err := checker(context.Background(), reader)
    if err != nil {
        return err
    }
    
    if hasToolCalls {
        // Tool calls detected - let agent handle with non-streaming
        return fmt.Errorf("tool calls detected in stream")
    }
    
    // Display the final content
    c.messageContainer.UpdateLastMessage(finalContent)
    c.displayContainer()
    
    return nil
}
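
A hypothetical call site in the agent (the a.modelName field is an assumption for illustration): try streaming into the UI, and treat any error, including the tool-calls-detected signal above, as a cue to re-run non-streaming.

// Hypothetical call site: stream model output straight into the UI, and
// fall back to non-streaming generation if tool calls surface mid-stream.
reader, err := a.model.Stream(ctx, messages, model.WithTools(toolInfos))
if err != nil {
    return a.model.Generate(ctx, messages, model.WithTools(toolInfos))
}
if err := a.cli.DisplayStreamingMessageLive(reader, a.modelName, a.getProviderToolCallChecker()); err != nil {
    // Covers the "tool calls detected in stream" error returned above
    return a.model.Generate(ctx, messages, model.WithTools(toolInfos))
}

Note that, as sketched, DisplayStreamingMessageLive does not hand the final content back to the agent for conversation history; that plumbing is omitted here.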

Phase 3: Advanced Streaming Features

4. Streaming Branch Logic (inspired by Eino ReAct)

func (a *Agent) processStreamWithBranching(ctx context.Context, reader *schema.StreamReader[*schema.Message], messages []*schema.Message, toolInfos []*schema.ToolInfo) (*schema.Message, error) {
    checker := a.getProviderToolCallChecker()
    
    // Check stream for tool calls
    hasToolCalls, content, err := checker(ctx, reader)
    if err != nil {
        return nil, err
    }
    
    if hasToolCalls && len(toolInfos) > 0 {
        // Branch to tool execution path
        return a.handleToolCallsFromStream(ctx, content, messages, toolInfos)
    }
    
    // Branch to final response path
    if a.cli != nil {
        a.cli.DisplayStreamedContent(content)
    }
    
    return &schema.Message{
        Role:    schema.Assistant,
        Content: content,
    }, nil
}

Provider-Specific Streaming Patterns

Anthropic Claude

  • Pattern: Text content first, then tool calls
  • Detection: Look for <function_calls>, "I'll use", "Let me use" patterns
  • Strategy: Collect full stream, then check for tool patterns

OpenAI

  • Pattern: Tool calls in early chunks
  • Detection: Check first few chunks for ToolCalls field
  • Strategy: Early detection, quick branching

Google Gemini

  • Pattern: Similar to OpenAI
  • Detection: Check for tool calls in early chunks
  • Strategy: Early detection with Gemini-specific patterns

Ollama

  • Pattern: Varies by model
  • Detection: Conservative approach, check full stream
  • Strategy: Collect more content before deciding
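
The conservative Ollama strategy might look like the following sketch (an illustration of the strategy, not MCPHost code): drain the entire stream, inspecting every chunk for tool calls, and only decide at the end. The default checker for unknown providers can share the implementation.

// ollamaStreamToolCallChecker: conservative sketch. Tool-call support varies
// across local models, so inspect every chunk and only decide once the
// stream is fully drained; content stays complete either way.
func ollamaStreamToolCallChecker(ctx context.Context, sr *schema.StreamReader[*schema.Message]) (bool, string, error) {
    defer sr.Close()

    var content strings.Builder
    hasToolCalls := false

    for {
        msg, err := sr.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            return false, content.String(), err
        }

        content.WriteString(msg.Content)
        if len(msg.ToolCalls) > 0 {
            hasToolCalls = true
        }
    }

    return hasToolCalls, content.String(), nil
}

// defaultStreamToolCallChecker falls back to the same conservative behavior
var defaultStreamToolCallChecker StreamToolCallChecker = ollamaStreamToolCallChecker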

Specific Code Changes

1. internal/agent/agent.go - Provider-aware streaming

// Add provider detection
func (a *Agent) getProviderType() string {
    // Provider type is detected from the model configuration and
    // stored on the Agent at creation time
    return a.providerType
}

// Replace generateWithCancellation to use streaming by default
func (a *Agent) generateWithCancellation(ctx context.Context, messages []*schema.Message, toolInfos []*schema.ToolInfo) (*schema.Message, error) {
    // ... existing cancellation setup ...
    
    go func() {
        // Use provider-aware streaming-first approach
        message, err := a.generateWithStreamingFirst(llmCtx, messages, toolInfos)
        if err != nil {
            err = fmt.Errorf("failed to generate response: %v", err)
        }
        resultChan <- struct {
            message *schema.Message
            err     error
        }{message, err}
    }()
    
    // ... rest of cancellation logic ...
}
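
For context, the elided scaffolding around this goroutine typically races the result channel against context cancellation. A minimal sketch with assumed names (llmCtx, resultChan) matching the snippet above:

// Sketch of the elided scaffolding (names are illustrative): run the LLM
// call in a goroutine and race its result against context cancellation.
llmCtx, cancel := context.WithCancel(ctx)
defer cancel() // stops the in-flight LLM call on early return

resultChan := make(chan struct {
    message *schema.Message
    err     error
}, 1)

go func() {
    message, err := a.generateWithStreamingFirst(llmCtx, messages, toolInfos)
    if err != nil {
        err = fmt.Errorf("failed to generate response: %v", err)
    }
    resultChan <- struct {
        message *schema.Message
        err     error
    }{message, err}
}()

select {
case <-ctx.Done():
    return nil, ctx.Err()
case result := <-resultChan:
    return result.message, result.err
}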

2. internal/ui/cli.go - Enhanced streaming display

// UpdateLastMessage replaces the content of the last message in place
func (c *MessageContainer) UpdateLastMessage(content string) {
    if len(c.messages) == 0 {
        return
    }
    
    lastIdx := len(c.messages) - 1
    lastMsg := &c.messages[lastIdx]
    
    // Only assistant messages are updated; other message types are left as-is
    if lastMsg.Type == AssistantMessage {
        newMsg := c.renderer.RenderAssistantMessage(content, lastMsg.Timestamp, "")
        c.messages[lastIdx] = newMsg
    }
}

3. Provider-specific configurations

// Add to provider creation
type ProviderConfig struct {
    // ... existing fields ...
    StreamingConfig *StreamingConfig
}

type StreamingConfig struct {
    ToolCallChecker StreamToolCallChecker
    BufferSize      int
    UpdateInterval  time.Duration
}
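
BufferSize and UpdateInterval are not used elsewhere in this issue; one plausible use is throttling terminal repaints during live streaming. A sketch under that assumption (onUpdate is a hypothetical callback, e.g. UpdateLastMessage plus a repaint; BufferSize could similarly size an internal chunk channel):

// drainWithThrottle: sketch, not MCPHost code. Accumulate chunks, repaint at
// most once per UpdateInterval tick, and always finish with a final repaint
// of the complete content.
func drainWithThrottle(sr *schema.StreamReader[*schema.Message], cfg *StreamingConfig, onUpdate func(string)) (string, error) {
    defer sr.Close()

    var content strings.Builder
    ticker := time.NewTicker(cfg.UpdateInterval)
    defer ticker.Stop()

    dirty := false
    for {
        msg, err := sr.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            return content.String(), err
        }

        content.WriteString(msg.Content)
        dirty = true

        // Repaint only when a tick has elapsed; otherwise keep accumulating
        select {
        case <-ticker.C:
            if dirty {
                onUpdate(content.String())
                dirty = false
            }
        default:
        }
    }

    onUpdate(content.String()) // final repaint with the complete content
    return content.String(), nil
}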

Benefits

  1. Provider-Aware: Handles different streaming behaviors correctly
  2. Better UX: Users see responses as they're generated
  3. Intelligent Fallback: Automatically switches to non-streaming for tool calls
  4. Anthropic Optimized: Properly handles Claude's text-first, tool-calls-later pattern
  5. Performance: Efficient stream processing with minimal overhead
  6. Robust: Graceful error handling and fallback mechanisms

Implementation Priority

  1. Phase 1: Provider-aware stream tool call detection
  2. Phase 2: Enhanced UI streaming integration
  3. Phase 3: Advanced streaming features and optimizations
  4. Phase 4: Configuration options and fine-tuning

Key Considerations

  1. Anthropic Specifics: Claude outputs text before tool calls - need full stream analysis
  2. OpenAI Efficiency: Tool calls appear early - can branch quickly
  3. Performance: Stream processing should be efficient and non-blocking
  4. Error Handling: Graceful degradation if streaming fails
  5. Tool Call Accuracy: Must not miss tool calls due to streaming optimizations

Files to Modify

  • internal/agent/agent.go - Core streaming logic with provider awareness
  • internal/ui/cli.go - Enhanced streaming display methods
  • internal/ui/messages.go - Efficient message container updates
  • internal/models/providers.go - Provider-specific streaming configurations
  • cmd/root.go - Integration with main loop
  • Configuration files for streaming preferences

Testing Strategy

  • Test with all supported providers (Anthropic, OpenAI, Gemini, Ollama, Azure)
  • Test tool calling scenarios with each provider's streaming pattern
  • Test mixed scenarios (text + tool calls)
  • Test cancellation during streaming
  • Test error handling and fallback scenarios
  • Performance testing with frequent UI updates
  • Test provider-specific tool call detection accuracy
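
For the detection-accuracy tests, synthetic streams can be built without hitting a real provider. A sketch using Eino's schema.Pipe helper (the ToolCall literal is an assumption about the schema shape; adjust to the actual fields):

func TestOpenAIStreamToolCallChecker(t *testing.T) {
    // Build a synthetic stream containing a single tool-call chunk
    sr, sw := schema.Pipe[*schema.Message](1)
    go func() {
        defer sw.Close()
        sw.Send(&schema.Message{
            Role: schema.Assistant,
            ToolCalls: []schema.ToolCall{
                {Function: schema.FunctionCall{Name: "get_weather"}},
            },
        }, nil)
    }()

    hasToolCalls, _, err := openaiStreamToolCallChecker(context.Background(), sr)
    if err != nil {
        t.Fatalf("checker returned error: %v", err)
    }
    if !hasToolCalls {
        t.Fatal("expected tool calls to be detected in the first chunk")
    }
}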
