4 changes: 4 additions & 0 deletions README.md
@@ -573,6 +573,7 @@ mcphost -p "Generate a random UUID" --quiet | tr '[:lower:]' '[:upper:]'
- `-m, --model string`: Model to use (format: provider:model) (default "anthropic:claude-sonnet-4-20250514")
- `-p, --prompt string`: **Run in non-interactive mode with the given prompt**
- `--quiet`: **Suppress all output except the AI response (only works with --prompt)**
- `--stream`: Enable streaming responses (default: true, use `--stream=false` to disable)

### Authentication Subcommands
- `mcphost auth login anthropic`: Authenticate with Anthropic using OAuth (alternative to API keys)
@@ -625,6 +626,9 @@ top-p: 0.95
top-k: 40
stop-sequences: ["Human:", "Assistant:"]

# Streaming configuration
stream: false # Disable streaming (default: true)

# API Configuration
provider-api-key: "your-api-key" # For OpenAI, Anthropic, or Google
provider-url: "https://api.openai.com/v1" # Custom base URL
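Taken together, the README hunks above document the new `--stream` flag and its `stream:` config-file equivalent. As an illustration only (not part of this diff), the documented options combine with the existing non-interactive example from the surrounding README section:

```bash
# Streaming is on by default; pass --stream=false to disable it for a one-shot prompt
mcphost -p "Generate a random UUID" --quiet --stream=false
```

Setting `stream: false` in the YAML config shown above has the same effect as passing the flag.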
74 changes: 64 additions & 10 deletions cmd/root.go
@@ -32,6 +32,7 @@ var (
quietFlag bool
noExitFlag bool
maxSteps int
streamFlag bool // Enable streaming output
scriptMCPConfig *config.Config // Used to override config in script mode

// Session management
@@ -163,6 +164,8 @@ func init() {
BoolVar(&noExitFlag, "no-exit", false, "prevent non-interactive mode from exiting, show input prompt instead")
rootCmd.PersistentFlags().
IntVar(&maxSteps, "max-steps", 0, "maximum number of agent steps (0 for unlimited)")
rootCmd.PersistentFlags().
BoolVar(&streamFlag, "stream", true, "enable streaming output for faster response display")

// Session management flags
rootCmd.PersistentFlags().
@@ -192,6 +195,7 @@ func init() {
viper.BindPFlag("debug", rootCmd.PersistentFlags().Lookup("debug"))
viper.BindPFlag("prompt", rootCmd.PersistentFlags().Lookup("prompt"))
viper.BindPFlag("max-steps", rootCmd.PersistentFlags().Lookup("max-steps"))
viper.BindPFlag("stream", rootCmd.PersistentFlags().Lookup("stream"))
viper.BindPFlag("provider-url", rootCmd.PersistentFlags().Lookup("provider-url"))
viper.BindPFlag("provider-api-key", rootCmd.PersistentFlags().Lookup("provider-api-key"))
viper.BindPFlag("max-tokens", rootCmd.PersistentFlags().Lookup("max-tokens"))
@@ -285,10 +289,11 @@ func runNormalMode(ctx context.Context) error {

// Create agent configuration
agentConfig := &agent.AgentConfig{
ModelConfig: modelConfig,
MCPConfig: mcpConfig,
SystemPrompt: systemPrompt,
MaxSteps: viper.GetInt("max-steps"), // Pass 0 for infinite, agent will handle it
ModelConfig: modelConfig,
MCPConfig: mcpConfig,
SystemPrompt: systemPrompt,
MaxSteps: viper.GetInt("max-steps"), // Pass 0 for infinite, agent will handle it
StreamingEnabled: viper.GetBool("stream"),
}

// Create the agent with spinner for Ollama models
@@ -635,7 +640,40 @@ func runAgenticStep(ctx context.Context, mcpAgent *agent.Agent, cli *ui.CLI, mes
currentSpinner.Start()
}

result, err := mcpAgent.GenerateWithLoop(ctx, messages,
// Create streaming callback for real-time display
var streamingCallback agent.StreamingResponseHandler
var responseWasStreamed bool
var lastDisplayedContent string
var streamingContent strings.Builder
var streamingStarted bool
if cli != nil && !config.Quiet {
streamingCallback = func(chunk string) {
// Stop spinner before first chunk if still running
if currentSpinner != nil {
currentSpinner.Stop()
currentSpinner = nil
}
// Mark that this response is being streamed
responseWasStreamed = true

// Start streaming message on first chunk
if !streamingStarted {
cli.StartStreamingMessage(config.ModelName)
streamingStarted = true
}

// Accumulate content and update message
streamingContent.WriteString(chunk)
cli.UpdateStreamingMessage(streamingContent.String())
}
}

// Reset streaming state before agent execution
responseWasStreamed = false
streamingStarted = false
streamingContent.Reset()

result, err := mcpAgent.GenerateWithLoopAndStreaming(ctx, messages,
// Tool call handler - called when a tool is about to be executed
func(toolName, toolArgs string) {
if !config.Quiet && cli != nil {
@@ -713,18 +751,31 @@ func runAgenticStep(ctx context.Context, mcpAgent *agent.Agent, cli *ui.CLI, mes
},
// Tool call content handler - called when content accompanies tool calls
func(content string) {
if !config.Quiet && cli != nil {
if !config.Quiet && cli != nil && !responseWasStreamed {
// Only display if content wasn't already streamed
// Stop spinner before displaying content
if currentSpinner != nil {
currentSpinner.Stop()
currentSpinner = nil
}
cli.DisplayAssistantMessageWithModel(content, config.ModelName)
lastDisplayedContent = content
// Start spinner again for tool calls
currentSpinner = ui.NewSpinner("Thinking...")
currentSpinner.Start()
} else if responseWasStreamed {
// Content was already streamed, just track it and manage spinner
lastDisplayedContent = content
if currentSpinner != nil {
currentSpinner.Stop()
currentSpinner = nil
}
// Start spinner again for tool calls
currentSpinner = ui.NewSpinner("Thinking...")
currentSpinner.Start()
}
},
streamingCallback, // Add streaming callback as the last parameter
)

// Make sure spinner is stopped if still running
@@ -743,14 +794,17 @@ func runAgenticStep(ctx context.Context, mcpAgent *agent.Agent, cli *ui.CLI, mes
response := result.FinalResponse
conversationMessages := result.ConversationMessages

// Display assistant response with model name (skip if quiet)
if !config.Quiet && cli != nil {
// Display assistant response with model name
// Skip if: quiet mode, same content already displayed, or if streaming completed the full response
streamedFullResponse := responseWasStreamed && streamingContent.String() == response.Content
if !config.Quiet && cli != nil && response.Content != lastDisplayedContent && response.Content != "" && !streamedFullResponse {
if err := cli.DisplayAssistantMessageWithModel(response.Content, config.ModelName); err != nil {
cli.DisplayError(fmt.Errorf("display error: %v", err))
return nil, nil, err
}

// Update usage tracking with the last user message and response
} else if streamedFullResponse {
// Streaming was used - the message is already displayed in the message component
// Just update usage tracking with the last user message and response
if len(messages) > 0 {
lastUserMessage := ""
// Find the last user message
139 changes: 119 additions & 20 deletions internal/agent/agent.go
@@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"strings"
"time"

tea "github.com/charmbracelet/bubbletea"
@@ -18,10 +19,11 @@ import (

// AgentConfig is the config for agent.
type AgentConfig struct {
ModelConfig *models.ProviderConfig
MCPConfig *config.Config
SystemPrompt string
MaxSteps int
ModelConfig *models.ProviderConfig
MCPConfig *config.Config
SystemPrompt string
MaxSteps int
StreamingEnabled bool
}

// ToolCallHandler is a function type for handling tool calls as they happen
@@ -36,16 +38,21 @@ type ToolResultHandler func(toolName, toolArgs, result string, isError bool)
// ResponseHandler is a function type for handling LLM responses
type ResponseHandler func(content string)

// StreamingResponseHandler is a function type for handling streaming LLM responses
type StreamingResponseHandler func(content string)

// ToolCallContentHandler is a function type for handling content that accompanies tool calls
type ToolCallContentHandler func(content string)

// Agent is the agent with real-time tool call display.
type Agent struct {
toolManager *tools.MCPToolManager
model model.ToolCallingChatModel
maxSteps int
systemPrompt string
loadingMessage string // Message from provider loading (e.g., GPU fallback info)
toolManager *tools.MCPToolManager
model model.ToolCallingChatModel
maxSteps int
systemPrompt string
loadingMessage string // Message from provider loading (e.g., GPU fallback info)
providerType string // Provider type for streaming behavior
streamingEnabled bool // Whether streaming is enabled
}

// NewAgent creates an agent with MCP tool integration and real-time tool call display
@@ -62,12 +69,23 @@ func NewAgent(ctx context.Context, config *AgentConfig) (*Agent, error) {
return nil, fmt.Errorf("failed to load MCP tools: %v", err)
}

// Determine provider type from model string
providerType := "default"
if config.ModelConfig != nil && config.ModelConfig.ModelString != "" {
parts := strings.SplitN(config.ModelConfig.ModelString, ":", 2)
if len(parts) >= 1 {
providerType = parts[0]
}
}

return &Agent{
toolManager: toolManager,
model: providerResult.Model,
maxSteps: config.MaxSteps, // Keep 0 for infinite, handle in loop
systemPrompt: config.SystemPrompt,
loadingMessage: providerResult.Message,
toolManager: toolManager,
model: providerResult.Model,
maxSteps: config.MaxSteps, // Keep 0 for infinite, handle in loop
systemPrompt: config.SystemPrompt,
loadingMessage: providerResult.Message,
providerType: providerType,
streamingEnabled: config.StreamingEnabled,
}, nil
}

@@ -81,6 +99,13 @@ type GenerateWithLoopResult struct {
func (a *Agent) GenerateWithLoop(ctx context.Context, messages []*schema.Message,
onToolCall ToolCallHandler, onToolExecution ToolExecutionHandler, onToolResult ToolResultHandler, onResponse ResponseHandler, onToolCallContent ToolCallContentHandler) (*GenerateWithLoopResult, error) {

return a.GenerateWithLoopAndStreaming(ctx, messages, onToolCall, onToolExecution, onToolResult, onResponse, onToolCallContent, nil)
}

// GenerateWithLoopAndStreaming processes messages with a custom loop that displays tool calls in real-time and supports streaming callbacks
func (a *Agent) GenerateWithLoopAndStreaming(ctx context.Context, messages []*schema.Message,
onToolCall ToolCallHandler, onToolExecution ToolExecutionHandler, onToolResult ToolResultHandler, onResponse ResponseHandler, onToolCallContent ToolCallContentHandler, onStreamingResponse StreamingResponseHandler) (*GenerateWithLoopResult, error) {

// Create a copy of messages to avoid modifying the original
workingMessages := make([]*schema.Message, len(messages))
copy(workingMessages, messages)
@@ -125,17 +150,16 @@ func (a *Agent) GenerateWithLoop(ctx context.Context, messages []*schema.Message
}

// Call the LLM with cancellation support
response, err := a.generateWithCancellation(ctx, workingMessages, toolInfos)
response, err := a.generateWithCancellationAndStreaming(ctx, workingMessages, toolInfos, onStreamingResponse)
if err != nil {
return nil, err
}

// Add response to working messages
workingMessages = append(workingMessages, response)

// Check if this is a tool call or final response
if len(response.ToolCalls) > 0 {

// Check if this is a tool call or final response
if len(response.ToolCalls) > 0 {
// Display any content that accompanies the tool calls
if response.Content != "" && onToolCallContent != nil {
onToolCallContent(response.Content)
Expand Down Expand Up @@ -227,8 +251,83 @@ func (a *Agent) GetLoadingMessage() string {
return a.loadingMessage
}

// generateWithCancellation calls the LLM with ESC key cancellation support
func (a *Agent) generateWithCancellation(ctx context.Context, messages []*schema.Message, toolInfos []*schema.ToolInfo) (*schema.Message, error) {


// generateWithCancellationAndStreaming calls the LLM with ESC key cancellation support and streaming callbacks
func (a *Agent) generateWithCancellationAndStreaming(ctx context.Context, messages []*schema.Message, toolInfos []*schema.ToolInfo, streamingCallback StreamingResponseHandler) (*schema.Message, error) {
// Check if streaming is enabled
if !a.streamingEnabled {
// Use traditional non-streaming approach
return a.generateWithoutStreaming(ctx, messages, toolInfos)
}

// Try streaming first if no tools are expected or if we can detect tool calls early
if len(toolInfos) == 0 {
// No tools available, use streaming directly
return a.generateWithStreamingAndCallback(ctx, messages, toolInfos, streamingCallback)
}

// Try streaming with tool call detection
return a.generateWithStreamingFirstAndCallback(ctx, messages, toolInfos, streamingCallback)
}





// generateWithStreamingAndCallback uses streaming for responses without tool calls with real-time callbacks
func (a *Agent) generateWithStreamingAndCallback(ctx context.Context, messages []*schema.Message, toolInfos []*schema.ToolInfo, callback StreamingResponseHandler) (*schema.Message, error) {
// Try streaming first
reader, err := a.model.Stream(ctx, messages, model.WithTools(toolInfos))
if err != nil {
// Fallback to non-streaming if streaming fails
return a.model.Generate(ctx, messages, model.WithTools(toolInfos))
}

// Use streaming with callback for real-time display
response, err := StreamWithCallback(ctx, reader, func(chunk string) {
if callback != nil {
callback(chunk)
}
})
if err != nil {
// Fallback to non-streaming on error
return a.model.Generate(ctx, messages, model.WithTools(toolInfos))
}

// Return the complete streamed response (with tool calls if any)
return response, nil
}

// generateWithStreamingFirstAndCallback attempts streaming first with provider-aware tool call detection and callbacks
func (a *Agent) generateWithStreamingFirstAndCallback(ctx context.Context, messages []*schema.Message, toolInfos []*schema.ToolInfo, callback StreamingResponseHandler) (*schema.Message, error) {
// Try streaming first
reader, err := a.model.Stream(ctx, messages, model.WithTools(toolInfos))
if err != nil {
// Fallback to non-streaming if streaming fails
return a.model.Generate(ctx, messages, model.WithTools(toolInfos))
}

// Use streaming with callback for real-time display
response, err := StreamWithCallback(ctx, reader, func(chunk string) {
if callback != nil {
callback(chunk)
}
})
if err != nil {
// Fallback to non-streaming on error
return a.model.Generate(ctx, messages, model.WithTools(toolInfos))
}

// Return the complete streamed response (with tool calls if any)
// No need to restart - we have everything we need!
return response, nil
}



// generateWithoutStreaming uses the traditional non-streaming approach
func (a *Agent) generateWithoutStreaming(ctx context.Context, messages []*schema.Message, toolInfos []*schema.ToolInfo) (*schema.Message, error) {
// Create a cancellable context for just this LLM call
llmCtx, cancel := context.WithCancel(ctx)
defer cancel()
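The streaming paths above hand the opened stream to `StreamWithCallback`, which is referenced but not included in the hunks shown here. For orientation only, a minimal sketch of what such a helper could look like, assuming eino's `StreamReader` API (`Recv`/`Close`) and `schema.ConcatMessages` for merging chunks; this is an illustration under those assumptions, not the PR's actual implementation:

```go
package agent

import (
	"context"
	"io"

	"github.com/cloudwego/eino/schema"
)

// StreamWithCallback drains a streamed model response, forwarding each text
// chunk to onChunk as it arrives and returning the fully concatenated message
// (content, tool calls, usage) once the stream ends. Sketch only; see note above.
func StreamWithCallback(ctx context.Context, reader *schema.StreamReader[*schema.Message], onChunk func(string)) (*schema.Message, error) {
	defer reader.Close()

	var chunks []*schema.Message
	for {
		// Stop early if the surrounding context was cancelled (e.g. ESC).
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		default:
		}

		chunk, err := reader.Recv()
		if err == io.EOF {
			break // stream finished
		}
		if err != nil {
			return nil, err
		}
		chunks = append(chunks, chunk)

		// Forward text content to the caller for real-time display.
		if onChunk != nil && chunk.Content != "" {
			onChunk(chunk.Content)
		}
	}

	// Merge all chunks into a single *schema.Message.
	return schema.ConcatMessages(chunks)
}
```

Whatever its real shape, the callers above fall back to a plain `Generate` call whenever opening or draining the stream fails, so streaming stays an optimization rather than a requirement.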