diff --git a/README.md b/README.md index a6875a3..2a6cb81 100644 --- a/README.md +++ b/README.md @@ -114,18 +114,29 @@ The functions include a built-in encryption mechanism for sensitive information: ### **2. [N8N Pipeline](./pipelines/n8n/n8n.py)** +> [!TIP] +> **N8N Workflow Automation Integration** +> +> Connect Open WebUI with N8N to leverage powerful workflow automation. Includes configurable AI Agent tool usage display for complete transparency into your agent's actions. + - Integrates **Open WebUI** with **N8N**, an automation and workflow platform. -- Streaming support for real-time data processing. +- **AI Agent Tool Usage Display (v2.2.0)** 🛠️: Shows tool calls from N8N AI Agent workflows with three verbosity levels (minimal, compact, detailed) and customizable length limits (non-streaming mode only). +- Streaming and non-streaming support for real-time and batch data processing. - Sends messages from Open WebUI to an **N8N webhook**. - Supports real-time message processing with dynamic field handling. - Enables automation of AI-generated responses within an **N8N workflow**. - Supports encryption of sensitive information like API keys. - Here is an example [N8N workflow](./pipelines/n8n/Open_WebUI_Test_Agent.json) for [N8N Pipeline](./pipelines/n8n/n8n.py) +> [!IMPORTANT] +> **Tool Usage Display Limitation**: The AI Agent tool call display currently only works in **non-streaming mode** due to N8N's current streaming implementation. The code is future-proof and will automatically work when N8N adds `intermediateSteps` to streaming responses. + 🔗 [N8N Pipeline in Open WebUI](https://openwebui.com/f/owndev/n8n_pipeline) 🔗 [Learn More About N8N](https://n8n.io/) +📖 [N8N Tool Usage Display Documentation](./docs/n8n-tool-usage-display.md) + ### **3. [Infomaniak](./pipelines/infomaniak/infomaniak.py)** - Integrates **Open WebUI** with **Infomaniak**, a Swiss web hosting and cloud services provider. 
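The `intermediateSteps` extraction that the new pipeline version performs on N8N responses can be sketched as follows — a minimal, hypothetical helper (the name `extract_steps` is illustrative, not the pipeline's actual API), assuming the array and object response shapes documented in this changeset:

```python
import json


def extract_steps(response_json):
    """Collect intermediateSteps from an N8N response.

    Handles both shapes the pipeline supports: the array format
    [{"output": ..., "intermediateSteps": [...]}] and the single-object
    format {"output": ..., "intermediateSteps": [...]}.
    """
    steps = []
    if isinstance(response_json, list):
        for item in response_json:
            if isinstance(item, dict):
                steps.extend(item.get("intermediateSteps", []))
    elif isinstance(response_json, dict):
        steps = response_json.get("intermediateSteps", [])
    return steps


# Example payload shaped like the docs' N8N AI Agent response
raw = '[{"output": "done", "intermediateSteps": [{"action": {"tool": "Date_Time"}, "observation": "[]"}]}]'
steps = extract_steps(json.loads(raw))
print(len(steps))  # → 1
```

Either response shape yields the same flat list of steps, which the pipeline then formats into the collapsible tool-calls section; a response without the field simply produces an empty list and no section.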
diff --git a/docs/n8n-integration.md b/docs/n8n-integration.md index d9ff21d..08c6f2e 100644 --- a/docs/n8n-integration.md +++ b/docs/n8n-integration.md @@ -3,7 +3,10 @@ This integration allows Open WebUI to communicate with workflows created in **n8n**, a powerful workflow automation tool. Messages are sent and received via webhook endpoints, making it easy to plug Open WebUI into your existing automation pipelines. > [!NOTE] -> **Recent Improvements (v2.1.0)**: Enhanced streaming support with consistent response handling, automatic systemPrompt deduplication, simplified configuration, and improved error messaging. +> **Recent Improvements (v2.2.0)**: Added AI Agent tool usage display with collapsible details sections. The pipeline now extracts and displays tool calls from `intermediateSteps` in **non-streaming mode**, showing tool names, inputs, and results in a user-friendly format. + +> [!IMPORTANT] +> **Tool Usage Display**: AI Agent tool calls are currently only visible in **non-streaming mode** due to N8N's streaming implementation. The code is future-proof and will automatically work if N8N adds `intermediateSteps` support to streaming responses. 🔗 [Learn More About N8N](https://n8n.io/) @@ -18,6 +21,19 @@ This integration allows Open WebUI to communicate with workflows created in **n8 ## Features +> [!TIP] +> **AI Agent Tool Usage Display (v2.2.0)** 🛠️ +> +> Automatically extracts and displays tool calls from N8N AI Agent workflows in **non-streaming mode**. +> +> - 📊 **Three verbosity levels**: `minimal` (names only), `compact` (names + preview), `detailed` (full info) +> - 📏 **Customizable length limits**: Control input/output text length +> - 🎯 **Flexible configuration**: Adapt display to your needs +> +> Shows tool names, inputs, and results from the `intermediateSteps` array to provide transparency into the AI agent's workflow execution. 
+> +> 📖 [Learn more about Tool Usage Display](./n8n-tool-usage-display.md) + - **Streaming & Non-Streaming Support** Automatic detection and handling of both streaming and non-streaming responses with consistent output formatting. diff --git a/docs/n8n-tool-usage-display.md b/docs/n8n-tool-usage-display.md new file mode 100644 index 0000000..f244210 --- /dev/null +++ b/docs/n8n-tool-usage-display.md @@ -0,0 +1,297 @@ +# N8N AI Agent Tool Usage Display + +## Overview + +Starting with version 2.2.0, the N8N pipeline automatically displays AI Agent tool calls in a user-friendly format. When your N8N workflow includes an AI Agent node that uses tools (like Wikipedia, Date/Time, Calculator, etc.), the pipeline will extract and display detailed information about each tool invocation. + +Version 2.2.0 includes **configurable verbosity levels** and **length limits** for tool display, allowing you to control how detailed the tool information is shown. + +## Important Limitation + +> [!IMPORTANT] +> **⚠️ Non-Streaming Mode Only**: Tool usage display is currently only available in **non-streaming mode**. N8N's AI Agent streaming responses do not include the `intermediateSteps` field, which is required to show tool calls. This is a limitation of N8N's streaming implementation, not the pipeline. +> +> **To see tool calls**: Configure your N8N workflow to use **non-streaming responses** (remove or disable streaming in the "Respond to Webhook" node). + +## Features + +### Automatic Detection + +- Works with **non-streaming** N8N responses +- Automatically extracts `intermediateSteps` from the N8N response payload +- Configurable verbosity levels (v2.2.0+) +- Customizable length limits for inputs and outputs (v2.2.0+) + +### Verbosity Levels (v2.2.0+) + +> [!TIP] +> **Configure via Pipeline Settings**: Set `TOOL_DISPLAY_VERBOSITY` to control the detail level. + +#### 1. 
**Minimal** (`minimal`) + +Shows only tool names in a collapsible list: + +```txt +🛠️ Tool Calls (4 steps) ▶ + 1. Date_Time + 2. Crypto + 3. Wikipedia + 4. Calculator +``` + +**Best for**: Quick overview, minimal UI clutter, collapsible for space-saving + +#### 2. **Compact** (`compact`) + +Shows tool names with short result previews: + +```txt +🛠️ Tool Calls (4 steps) ▶ + 1. Date_Time → [{"currentDate":"2025-10-10T11:22:38.697+01:00"}] + 2. Crypto → [{"Property_Name":"uuid4","uuid4":"c28495ca-1eb8-419f-8941-7a38c753e809"}] + 3. Wikipedia → Page: List of tz database time zones... + 4. Calculator → 128675.5415958103 +``` + +**Best for**: Balance between overview and detail + +#### 3. **Detailed** (`detailed`) - Default + +Shows full collapsible sections with all information: + +```txt +🛠️ Tool Calls (4 steps) ▶ + ├─ Step 1: Date_Time ▶ + │ 🔧 Tool: Date_Time + │ 🆔 Call ID: call_i7JgGhwh7xV0ghqR9mD4qTJ9 + │ 📥 Input: {"Include_Current_Time": true, "Timezone": "Europe/London"} + │ 📤 Result: [{"currentDate":"2025-10-10T11:22:38.697+01:00"}] + │ 📝 Log: Invoking "Date_Time" with... + ├─ Step 2: Crypto ▶ + └─ ... +``` + +**Best for**: Debugging, full transparency + +### Length Limits (v2.2.0+) + +> [!NOTE] +> **Control Output Size**: Set `TOOL_INPUT_MAX_LENGTH` and `TOOL_OUTPUT_MAX_LENGTH` to limit text length. + +- **`TOOL_INPUT_MAX_LENGTH`** (default: 500): Maximum characters for tool input display in `detailed` and `compact` modes +- **`TOOL_OUTPUT_MAX_LENGTH`** (default: 500): Maximum characters for tool output/observation display +- Set to **0** for unlimited length (no truncation) + +> [!IMPORTANT] +> **Behavior with `0` (unlimited)**: +> +> - **Detailed mode**: Shows complete input and output without any truncation +> - **Compact mode**: For inputs, shows full data. 
For outputs, still uses a 100-character preview for UI readability + +**Example**: For very long Wikipedia results, set `TOOL_OUTPUT_MAX_LENGTH` to 200 to show only the first 200 characters, or set to 0 to show everything. + +### Rich Display Format (Detailed Mode) + +Each tool call is displayed with: + +- 🔧 **Tool Name**: The name of the tool that was invoked +- 🆔 **Call ID**: Unique identifier for debugging (e.g., `call_FB0sIgrwuIGJkOaROor7raU2`) +- 📥 **Input**: The parameters passed to the tool (formatted as JSON, respects `TOOL_INPUT_MAX_LENGTH`) +- 📤 **Result**: The tool's response/observation (respects `TOOL_OUTPUT_MAX_LENGTH`) +- 📝 **Log**: Optional log messages from the tool execution (max 200 chars) + +### Collapsible UI + +Uses HTML `
<details>
` tags for a clean, expandable interface: + +Click to expand each step and view full details (in detailed mode). + +## Example + +### N8N Response Format + +Your N8N AI Agent workflow should return data in this format: + +```json +[ + { + "output": "Current time in Europe/London: 2025-10-10 09:46:45 BST (UTC+1)...", + "intermediateSteps": [ + { + "action": { + "tool": "Date_Time", + "toolInput": { + "Include_Current_Time": true, + "Timezone": "Europe/London" + }, + "toolCallId": "call_FB0sIgrwuIGJkOaROor7raU2", + "log": "Calling Date_Time with input: {...}" + }, + "observation": "[{\"currentDate\":\"2025-10-10T09:46:45.754+01:00\"}]" + }, + { + "action": { + "tool": "Wikipedia", + "toolInput": { + "input": "Europe/London time zone Wikipedia" + }, + "toolCallId": "call_QFUtaSdUI2PtgjhkDTmbRknt", + "log": "Calling Wikipedia with input: {...}" + }, + "observation": "Page: Time zone\nSummary: Time zones are regions..." + } + ] + } +] +``` + +### UI Display + +The user will see: + +1. **Main Response**: The agent's text response from the `output` field +2. **Tool Calls Section**: A collapsible section with all tool invocations + +## Implementation Details + +### Streaming Mode ⚠️ + +> **Not Supported**: N8N AI Agent does not include `intermediateSteps` in streaming responses. The streaming mode only sends content chunks, not metadata. This is a limitation of N8N's implementation. + +### Non-Streaming Mode ✅ + +- Tool calls are extracted from the complete response JSON +- Supports both array `[{...}]` and object `{...}` response formats +- Automatically detects and formats all tool calls from `intermediateSteps` + +### Data Structure Support + +The pipeline handles both response formats from N8N: + +**Array Format (typical for streaming):** + +```json +[ + { + "output": "...", + "intermediateSteps": [...] + } +] +``` + +**Object Format (typical for non-streaming):** + +```json +{ + "output": "...", + "intermediateSteps": [...] 
+} +``` + +## N8N Workflow Configuration + +To enable this feature, your N8N workflow must: + +1. **Use AI Agent Node**: Include an AI Agent node with tools +2. **Disable Streaming**: In the "Respond to Webhook" node, ensure streaming is disabled +3. **Return intermediateSteps**: Ensure your workflow returns the `intermediateSteps` array in the response + +### Example N8N Workflow Structure + +```txt +Webhook Trigger + ↓ +AI Agent (with tools: Wikipedia, Date/Time, etc.) + ↓ +Function Node (format response) + ↓ +Respond to Webhook +``` + +**Function Node Code Example:** + +```javascript +// Get the AI Agent output +const agentOutput = $('AI Agent').item.json; + +return { + output: agentOutput.output, + intermediateSteps: agentOutput.intermediateSteps || [] +}; +``` + +## Supported Tools + +The display works with any N8N tool, including: + +- 📅 Date/Time +- 📚 Wikipedia +- 🔍 Search +- 🧮 Calculator +- 🌐 HTTP Request +- 📧 Email +- 💾 Database queries +- And any custom tools you create! + +## Configuration + +### Pipeline Settings + +Configure tool display in the N8N pipeline settings (Admin Panel → Functions → N8N Pipeline): + +| Setting | Default | Description | +|---------|---------|-------------| +| `TOOL_DISPLAY_VERBOSITY` | `detailed` | Display mode: `minimal`, `compact`, or `detailed` | +| `TOOL_INPUT_MAX_LENGTH` | `500` | Maximum characters for tool input. Set to `0` for unlimited (no truncation) | +| `TOOL_OUTPUT_MAX_LENGTH` | `500` | Maximum characters for tool output. 
Set to `0` for unlimited in detailed mode (compact mode uses 100 char preview) | + +### Recommendations + +> [!TIP] +> **For Production Use**: +> +> - Use `compact` mode for cleaner UI with essential info +> - Set `TOOL_OUTPUT_MAX_LENGTH` to 200-300 for long outputs like Wikipedia +> +> **For Development/Debugging**: +> +> - Use `detailed` mode to see all information +> - Set lengths to 0 (unlimited) to see complete data +> +> **For Minimal UI**: +> +> - Use `minimal` mode to just show tool names +> - Perfect for when you only need to know which tools were called + +## Troubleshooting + +### Tool Calls Not Showing? + +Check that: + +1. ✅ Your N8N workflow includes an AI Agent node with tools +2. ✅ The response includes the `intermediateSteps` array +3. ✅ The N8N pipeline version is 2.2.0 or higher +4. ✅ The response structure matches the expected format (see examples above) +5. ✅ You're using non-streaming mode (streaming doesn't support tool display) + +### Debugging + +Enable debug logging in the pipeline to see: + +- Number of intermediate steps found +- Tool call extraction process +- Response parsing details + +The pipeline logs helpful messages like: + +```txt +Found 3 intermediate steps +Added 3 tool calls to response +``` + +## Related Documentation + +- [N8N Integration Overview](./n8n-integration.md) +- [N8N Template Workflows](../pipelines/n8n/) +- [N8N AI Agent Documentation](https://docs.n8n.io/integrations/builtin/cluster-nodes/root-nodes/n8n-nodes-langchain.agent/) diff --git a/pipelines/n8n/Open_WebUI_Test_Agent.json b/pipelines/n8n/Open_WebUI_Test_Agent.json index c4c4741..262bcbb 100644 --- a/pipelines/n8n/Open_WebUI_Test_Agent.json +++ b/pipelines/n8n/Open_WebUI_Test_Agent.json @@ -12,8 +12,8 @@ "name": "OpenAI Chat Model", "type": "@n8n/n8n-nodes-langchain.lmChatOpenAi", "position": [ - 2288, - 1248 + 2240, + 1424 ], "typeVersion": 1 }, @@ -54,14 +54,14 @@ }, { "parameters": { - "respondWith": "text", - "responseBody": "={{ $json.output }}", + 
"respondWith": "json", + "responseBody": "={{ $json }}", "options": {} }, "type": "n8n-nodes-base.respondToWebhook", "typeVersion": 1.1, "position": [ - 3008, + 3136, 848 ], "id": "325974b4-e249-4eb3-81dc-26b392f0894e", @@ -69,13 +69,13 @@ }, { "parameters": { - "model": "gpt-4o-mini", + "model": "gpt-5-mini", "options": {} }, "type": "@n8n/n8n-nodes-langchain.lmChatAzureOpenAi", "typeVersion": 1, "position": [ - 2288, + 2240, 1072 ], "id": "f2e088e0-aa71-4b42-8eaa-e0197150b9ca", @@ -103,6 +103,7 @@ "text": "={{ $json.body.chatInput }}", "options": { "systemMessage": "={{ $json.body.systemPrompt }}", + "returnIntermediateSteps": true, "enableStreaming": false } }, @@ -125,7 +126,7 @@ "type": "n8n-nodes-base.respondToWebhook", "typeVersion": 1.4, "position": [ - 3008, + 3136, 1008 ], "id": "114d81bd-4573-43b4-bbfe-80b784cd467e", @@ -148,6 +149,31 @@ "id": "b5e9d507-92e1-45aa-8f48-3629f802bafc", "name": "Webhook", "webhookId": "d3b1cfa8-8f3c-48ca-b1c8-87c40e3dcf76" + }, + { + "parameters": {}, + "type": "@n8n/n8n-nodes-langchain.toolThink", + "typeVersion": 1.1, + "position": [ + 2928, + 1072 + ], + "id": "233faaf2-dc51-4dbd-9305-20e67a6ae589", + "name": "Think" + }, + { + "parameters": { + "modelName": "models/gemini-2.5-pro", + "options": {} + }, + "type": "@n8n/n8n-nodes-langchain.lmChatGoogleGemini", + "typeVersion": 1, + "position": [ + 2240, + 1248 + ], + "id": "2e0360ac-d1db-45a4-91a5-f9e4a49e9d97", + "name": "Google Gemini Chat Model" } ], "pinData": {}, @@ -240,13 +266,24 @@ } ] ] + }, + "Think": { + "ai_tool": [ + [ + { + "node": "AI Agent", + "type": "ai_tool", + "index": 0 + } + ] + ] } }, "active": true, "settings": { "executionOrder": "v1" }, - "versionId": "72ba3fa2-7d33-475f-bf0d-9dcdc5a44bb7", + "versionId": "cf6cc59d-2f72-4cb3-91a9-aca8affa9094", "meta": { "templateCredsSetupCompleted": true, "instanceId": "6350a4271a2777a60d73e3a3c6a9549015b6bfe8b8f285cb566cd69ef87215da" diff --git a/pipelines/n8n/Open_WebUI_Test_Agent_Streaming.json 
b/pipelines/n8n/Open_WebUI_Test_Agent_Streaming.json index b05d0ec..a59cdfd 100644 --- a/pipelines/n8n/Open_WebUI_Test_Agent_Streaming.json +++ b/pipelines/n8n/Open_WebUI_Test_Agent_Streaming.json @@ -12,21 +12,22 @@ "name": "OpenAI Chat Model", "type": "@n8n/n8n-nodes-langchain.lmChatOpenAi", "position": [ - 32, - 416 + -16, + 624 ], "typeVersion": 1 }, { "parameters": { - "sessionKey": "={{ $('Webhook').item.json.body.user_id }}_{{ $('Webhook').item.json.body.chat_id }}" + "sessionKey": "={{ $('Webhook').item.json.body.user_id }}_{{ $('Webhook').item.json.body.chat_id }}", + "contextWindowLength": 2 }, "id": "5c11a401-5dbb-4edd-bd1e-7df6d0844083", "name": "Window Buffer Memory", "type": "@n8n/n8n-nodes-langchain.memoryBufferWindow", "position": [ 160, - 240 + 272 ], "typeVersion": 1 }, @@ -36,7 +37,7 @@ "typeVersion": 1, "position": [ 288, - 240 + 272 ], "id": "01a2e9b2-0e01-48eb-91ac-628432c5d1a8", "name": "Wikipedia" @@ -47,7 +48,7 @@ "typeVersion": 1, "position": [ 416, - 240 + 272 ], "id": "6ec76a25-8339-4c61-88b1-aab1a861c9e3", "name": "Calculator" @@ -60,8 +61,8 @@ "type": "@n8n/n8n-nodes-langchain.lmChatAzureOpenAi", "typeVersion": 1, "position": [ - 32, - 240 + -16, + 272 ], "id": "18b4d85b-a797-40d2-b3e3-22fe8985a872", "name": "Azure OpenAI Chat Model" @@ -77,7 +78,7 @@ "typeVersion": 2, "position": [ 544, - 240 + 272 ], "id": "5968806c-44bb-42bc-a2d5-f9532954e2e5", "name": "Date & Time" @@ -88,6 +89,9 @@ "text": "={{ $json.body.chatInput }}", "options": { "systemMessage": "={{ $json.body.systemPrompt }}", + "maxIterations": 10, + "returnIntermediateSteps": true, + "passthroughBinaryImages": true, "enableStreaming": true } }, @@ -113,8 +117,8 @@ "type": "n8n-nodes-base.respondToWebhook", "typeVersion": 1.1, "position": [ - 688, - 32 + 656, + 16 ], "id": "184ac106-3444-43e7-a312-c359d6db33a0", "name": "Respond error to Webhook" @@ -136,6 +140,31 @@ "id": "e680d51f-ffb3-459b-b765-0d86881bcc80", "name": "Webhook", "webhookId": 
"852c394d-2b99-4688-9045-87aafbd721d1" + }, + { + "parameters": {}, + "type": "@n8n/n8n-nodes-langchain.toolThink", + "typeVersion": 1.1, + "position": [ + 672, + 272 + ], + "id": "d4956b3f-e35f-4752-9b96-e015a261136c", + "name": "Think" + }, + { + "parameters": { + "modelName": "models/gemini-2.5-pro", + "options": {} + }, + "type": "@n8n/n8n-nodes-langchain.lmChatGoogleGemini", + "typeVersion": 1, + "position": [ + -16, + 448 + ], + "id": "745a19b5-a2ff-4472-a60d-3917e1d08cb6", + "name": "Google Gemini Chat Model" } ], "pinData": {}, @@ -222,13 +251,29 @@ "main": [ [] ] + }, + "Think": { + "ai_tool": [ + [ + { + "node": "AI Agent", + "type": "ai_tool", + "index": 0 + } + ] + ] + }, + "Google Gemini Chat Model": { + "ai_languageModel": [ + [] + ] } }, "active": true, "settings": { "executionOrder": "v1" }, - "versionId": "753266e4-fd13-4b7c-be46-30bf91bd4517", + "versionId": "58d9fa88-0a07-4311-959a-e1d1eb5d4c37", "meta": { "templateCredsSetupCompleted": true, "instanceId": "6350a4271a2777a60d73e3a3c6a9549015b6bfe8b8f285cb566cd69ef87215da" diff --git a/pipelines/n8n/n8n.py b/pipelines/n8n/n8n.py index 04c700c..49ce4d5 100644 --- a/pipelines/n8n/n8n.py +++ b/pipelines/n8n/n8n.py @@ -5,9 +5,9 @@ project_url: https://github.com/owndev/Open-WebUI-Functions funding_url: https://github.com/sponsors/owndev n8n_template: https://github.com/owndev/Open-WebUI-Functions/blob/main/pipelines/n8n/Open_WebUI_Test_Agent_Streaming.json -version: 2.1.1 +version: 2.2.0 license: Apache License 2.0 -description: An optimized streaming-enabled pipeline for interacting with N8N workflows, consistent response handling for both streaming and non-streaming modes, robust error handling, and simplified status management. Supports Server-Sent Events (SSE) streaming and various N8N workflow formats. 
+description: An optimized streaming-enabled pipeline for interacting with N8N workflows, consistent response handling for both streaming and non-streaming modes, robust error handling, and simplified status management. Supports Server-Sent Events (SSE) streaming and various N8N workflow formats. Now includes configurable AI Agent tool usage display with three verbosity levels (minimal, compact, detailed) and customizable length limits for tool inputs/outputs (non-streaming mode only). features: - Integrates with N8N for seamless streaming communication. - Uses FastAPI StreamingResponse for real-time streaming. @@ -17,6 +17,10 @@ - Encrypted storage of sensitive API keys. - Fallback support for non-streaming responses. - Compatible with Open WebUI streaming architecture. + - Displays N8N AI Agent tool usage with configurable verbosity (non-streaming mode only). + - Three display modes: minimal (tool names only), compact (names + preview), detailed (full collapsible sections). + - Customizable length limits for tool inputs and outputs. + - Shows tool calls, inputs, and results from intermediateSteps in non-streaming mode (N8N limitation - streaming responses do not include intermediateSteps). """ from typing import ( @@ -286,6 +290,22 @@ class Valves(BaseModel): default="output", description="Field name for the response message in the N8N payload", ) + SEND_CONVERSATION_HISTORY: bool = Field( + default=False, + description="Whether to include conversation history when sending requests to N8N", + ) + TOOL_DISPLAY_VERBOSITY: str = Field( + default="detailed", + description="Verbosity level for tool usage display: 'minimal' (only tool names), 'compact' (names + short preview), 'detailed' (full info with collapsible sections)", + ) + TOOL_INPUT_MAX_LENGTH: int = Field( + default=500, + description="Maximum length for tool input display (0 = unlimited). 
Longer inputs will be truncated.", + ) + TOOL_OUTPUT_MAX_LENGTH: int = Field( + default=500, + description="Maximum length for tool output/observation display (0 = unlimited). Longer outputs will be truncated.", + ) CF_ACCESS_CLIENT_ID: EncryptedStr = Field( default="", description="Only if behind Cloudflare: https://developers.cloudflare.com/cloudflare-one/identity/service-tokens/", @@ -296,11 +316,226 @@ class Valves(BaseModel): ) def __init__(self): - self.name = "N8N Streaming Agent" + self.name = "N8N Agent" self.valves = self.Valves() self.log = logging.getLogger("n8n_streaming_pipeline") self.log.setLevel(SRC_LOG_LEVELS.get("OPENAI", logging.INFO)) + def _format_tool_calls_section( + self, intermediate_steps: list, for_streaming: bool = False + ) -> str: + """ + Creates a formatted tool calls section using collapsible details elements. + + Args: + intermediate_steps: List of intermediate step objects from N8N response + for_streaming: If True, format for streaming (with escaping), else for regular response + + Returns: + Formatted tool calls section with HTML details elements + """ + if not intermediate_steps: + return "" + + verbosity = self.valves.TOOL_DISPLAY_VERBOSITY.lower() + input_max_len = self.valves.TOOL_INPUT_MAX_LENGTH + output_max_len = self.valves.TOOL_OUTPUT_MAX_LENGTH + + # Helper function to truncate text + def truncate_text(text: str, max_length: int) -> str: + if max_length <= 0 or len(text) <= max_length: + return text + return text[:max_length] + "..." + + # Minimal mode: just list tool names + if verbosity == "minimal": + tool_names = [] + for i, step in enumerate(intermediate_steps, 1): + if isinstance(step, dict): + tool_name = step.get("action", {}).get("tool", "Unknown Tool") + tool_names.append(f"{i}. {tool_name}") + + tool_list = "\\n" if for_streaming else "\n" + tool_list = tool_list.join(tool_names) + + if for_streaming: + return f"\\n\\n
<details>\\n<summary>🛠️ Tool Calls ({len(intermediate_steps)} steps)</summary>\\n\\n{tool_list}\\n\\n</details>
\\n" + else: + return f"\n\n
<details>\n<summary>🛠️ Tool Calls ({len(intermediate_steps)} steps)</summary>\n\n{tool_list}\n\n</details>
\n" + + # Compact mode: tool names with short preview + if verbosity == "compact": + tool_summaries = [] + for i, step in enumerate(intermediate_steps, 1): + if not isinstance(step, dict): + continue + + action = step.get("action", {}) + observation = step.get("observation", "") + tool_name = action.get("tool", "Unknown Tool") + + # Get short preview of output + preview = "" + if observation: + obs_str = str(observation) + # If output_max_len is 0 (unlimited), use a reasonable default preview length for compact mode + # Otherwise, use the configured limit + if output_max_len > 0: + preview_len = min(100, output_max_len) + else: + preview_len = 100 # Default preview length for compact mode when unlimited + preview = truncate_text(obs_str, preview_len) + + summary = f"**{i}. {tool_name}**" + if preview: + summary += f" → {preview}" + tool_summaries.append(summary) + + summary_text = "\\n" if for_streaming else "\n" + summary_text = summary_text.join(tool_summaries) + + if for_streaming: + return f"\\n\\n
<details>\\n<summary>🛠️ Tool Calls ({len(intermediate_steps)} steps)</summary>\\n\\n{summary_text}\\n\\n</details>
\\n" + else: + return f"\n\n
<details>\n<summary>🛠️ Tool Calls ({len(intermediate_steps)} steps)</summary>\n\n{summary_text}\n\n</details>
\n" + + # Detailed mode: full collapsible sections (default) + tool_entries = [] + + for i, step in enumerate(intermediate_steps, 1): + if not isinstance(step, dict): + continue + + action = step.get("action", {}) + observation = step.get("observation", "") + + tool_name = action.get("tool", "Unknown Tool") + tool_input = action.get("toolInput", {}) + tool_call_id = action.get("toolCallId", "") + log_message = action.get("log", "") + + # Build individual tool call details + tool_info = [] + tool_info.append(f"🔧 **Tool:** {tool_name}") + + if tool_call_id: + tool_info.append(f"🆔 **Call ID:** `{tool_call_id}`") + + # Format tool input + if tool_input: + try: + if isinstance(tool_input, dict): + input_json = json.dumps(tool_input, indent=2) + + # Apply max length limit + if input_max_len > 0: + input_json = truncate_text(input_json, input_max_len) + + if for_streaming: + # Escape for streaming + input_json = ( + input_json.replace("\\", "\\\\") + .replace('"', '\\"') + .replace("\n", "\\n") + ) + tool_info.append( + f"📥 **Input:**\\n```json\\n{input_json}\\n```" + ) + else: + tool_info.append( + f"📥 **Input:**\n```json\n{input_json}\n```" + ) + else: + input_str = str(tool_input) + if input_max_len > 0: + input_str = truncate_text(input_str, input_max_len) + tool_info.append(f"📥 **Input:** `{input_str}`") + except Exception: + input_str = str(tool_input) + if input_max_len > 0: + input_str = truncate_text(input_str, input_max_len) + tool_info.append(f"📥 **Input:** `{input_str}`") + + # Format observation/result + if observation: + try: + # Try to parse as JSON for better formatting + if isinstance(observation, str) and ( + observation.startswith("[") or observation.startswith("{") + ): + obs_json = json.loads(observation) + obs_formatted = json.dumps(obs_json, indent=2) + + # Apply max length limit + if output_max_len > 0: + obs_formatted = truncate_text(obs_formatted, output_max_len) + + if for_streaming: + obs_formatted = ( + obs_formatted.replace("\\", "\\\\") + 
.replace('"', '\\"') + .replace("\n", "\\n") + ) + tool_info.append( + f"📤 **Result:**\\n```json\\n{obs_formatted}\\n```" + ) + else: + tool_info.append( + f"📤 **Result:**\n```json\n{obs_formatted}\n```" + ) + else: + # Plain text observation + obs_str = str(observation) + # Apply configured limit (0 = unlimited, don't truncate) + obs_preview = ( + truncate_text(obs_str, output_max_len) + if output_max_len > 0 + else obs_str + ) + + if for_streaming: + obs_preview = ( + obs_preview.replace("\\", "\\\\") + .replace('"', '\\"') + .replace("\n", "\\n") + ) + tool_info.append(f"📤 **Result:** {obs_preview}") + except Exception: + obs_str = str(observation) + # Apply configured limit (0 = unlimited, don't truncate) + obs_preview = ( + truncate_text(obs_str, output_max_len) + if output_max_len > 0 + else obs_str + ) + tool_info.append(f"📤 **Result:** {obs_preview}") + + # Add log if available + if log_message: + log_preview = truncate_text(log_message, 200) + tool_info.append(f"📝 **Log:** {log_preview}") + + # Create collapsible details for individual tool call + tool_info_text = "\\n" if for_streaming else "\n" + tool_info_text = tool_info_text.join(tool_info) + + if for_streaming: + tool_entry = f"
<details>\\n<summary>Step {i}: {tool_name}</summary>\\n\\n{tool_info_text}\\n\\n</details>
" + else: + tool_entry = f"
<details>\n<summary>Step {i}: {tool_name}</summary>\n\n{tool_info_text}\n\n</details>
" + + tool_entries.append(tool_entry) + + # Combine all tool calls into main collapsible section + if for_streaming: + all_tools = "\\n\\n".join(tool_entries) + result = f"\\n\\n
<details>\\n<summary>🛠️ Tool Calls ({len(tool_entries)} steps)</summary>\\n\\n{all_tools}\\n\\n</details>
\\n" + else: + all_tools = "\n\n".join(tool_entries) + result = f"\n\n
<details>\n<summary>🛠️ Tool Calls ({len(tool_entries)} steps)</summary>\n\n{all_tools}\n\n</details>
\n" + + return result + async def emit_simple_status( self, __event_emitter__: Callable[[dict], Awaitable[None]], @@ -365,14 +600,24 @@ def parse_n8n_streaming_chunk(self, chunk_text: str) -> Optional[str]: data = json.loads(chunk_text.strip()) if isinstance(data, dict): - # Skip N8N metadata chunks but be more selective + # Check if this chunk contains intermediateSteps (will be handled separately) + # Note: Don't skip chunks just because they have a type field chunk_type = data.get("type", "") - if chunk_type in ["begin", "end", "error", "metadata"]: + + # Skip only true metadata chunks that have no content or intermediateSteps + if ( + chunk_type in ["begin", "end", "error", "metadata"] + and "intermediateSteps" not in data + ): self.log.debug(f"Skipping N8N metadata chunk: {chunk_type}") return None - # Skip metadata-only chunks - if "metadata" in data and len(data) <= 2: + # Skip metadata-only chunks (but allow intermediateSteps) + if ( + "metadata" in data + and len(data) <= 2 + and "intermediateSteps" not in data + ): return None # Extract content from various possible field names @@ -544,18 +789,24 @@ async def pipe( if messages and messages[0].get("role") == "system": system_prompt = self.dedupe_system_prompt(messages[0]["content"]) - # Include full conversation history like in stream-example.py + # Optionally include full conversation history (controlled by valve) conversation_history = [] - for msg in messages: - if msg.get("role") in ["user", "assistant"]: - conversation_history.append( - {"role": msg["role"], "content": msg["content"]} - ) + if self.valves.SEND_CONVERSATION_HISTORY: + for msg in messages: + if msg.get("role") in ["user", "assistant"]: + conversation_history.append( + {"role": msg["role"], "content": msg["content"]} + ) # Prepare payload for N8N workflow (improved version) payload = { "systemPrompt": system_prompt, - "messages": conversation_history, # Full conversation context + # Include messages only when enabled in valves for 
privacy/control + "messages": ( + conversation_history + if self.valves.SEND_CONVERSATION_HISTORY + else [] + ), "currentMessage": question, # Current user message "user_id": __user__.get("id") if __user__ else None, "user_email": __user__.get("email") if __user__ else None, @@ -603,20 +854,37 @@ async def pipe( if response.status == 200: # Enhanced streaming detection (n8n controls streaming) content_type = response.headers.get("Content-Type", "").lower() + + # Check for explicit streaming indicators + # Note: Don't rely solely on Transfer-Encoding: chunked as regular JSON can also be chunked is_streaming = ( - "stream" in content_type - or "text/plain" in content_type - or "text/event-stream" in content_type + "text/event-stream" in content_type or "application/x-ndjson" in content_type - or response.headers.get("Transfer-Encoding") == "chunked" + or ( + "application/json" in content_type + and response.headers.get("Transfer-Encoding") == "chunked" + and "Cache-Control" in response.headers + and "no-cache" + in response.headers.get("Cache-Control", "").lower() + ) ) + # Additional check: if content-type is text/html or application/json without streaming headers, it's likely not streaming + if "text/html" in content_type: + is_streaming = False + elif ( + "application/json" in content_type + and "Cache-Control" not in response.headers + ): + is_streaming = False + if is_streaming: # Enhanced streaming like in stream-example.py self.log.info("Processing streaming response from N8N") n8n_response = "" buffer = "" completed_thoughts: list[str] = [] + intermediate_steps = [] # Collect tool calls try: async for chunk in response.content.iter_any(): @@ -655,7 +923,27 @@ async def pipe( json_chunk = buffer[start_idx : end_idx + 1] buffer = buffer[end_idx + 1 :] - # Parse N8N streaming chunk + # Try to parse the chunk as JSON to extract intermediateSteps + # This must happen BEFORE parse_n8n_streaming_chunk filters out metadata + # Future-proof: If N8N adds 
intermediateSteps support in streaming, this will work automatically + try: + parsed_chunk = json.loads(json_chunk) + if isinstance(parsed_chunk, dict): + # Extract intermediateSteps if present (future-proof for when N8N supports this) + chunk_steps = parsed_chunk.get( + "intermediateSteps", [] + ) + if chunk_steps: + intermediate_steps.extend( + chunk_steps + ) + self.log.info( + f"✓ Found {len(chunk_steps)} intermediate steps in streaming chunk" + ) + except json.JSONDecodeError: + pass # Continue with content parsing + + # Parse N8N streaming chunk for content content = self.parse_n8n_streaming_chunk( json_chunk ) @@ -785,6 +1073,19 @@ def replace_think_block(match): "Empty response received from N8N, using fallback message" ) + # Add tool calls section if present + if intermediate_steps: + tool_calls_section = ( + self._format_tool_calls_section( + intermediate_steps, for_streaming=False + ) + ) + if tool_calls_section: + n8n_response += tool_calls_section + self.log.info( + f"Added {len(intermediate_steps)} tool calls to response" + ) + await __event_emitter__( { "type": "chat:message", @@ -865,23 +1166,28 @@ async def read_body_safely(): json_body = None lowered = content_type.lower() try: - # Only call .json() if declared JSON, else read text and attempt manual parse - if "application/json" in lowered or "json" in lowered: - try: - json_body = await response.json( - content_type=None - ) - except Exception as je: + # Read as text first (works for all content types) + text_body = await response.text() + + # Try to parse as JSON regardless of content-type + # (N8N might return JSON with text/html content-type) + try: + json_body = json.loads(text_body) + self.log.debug( + f"Successfully parsed response body as JSON (content-type was: {content_type})" + ) + except json.JSONDecodeError: + # If it starts with [{ or { it might be JSON wrapped in something + if text_body.strip().startswith( + "[{" + ) or text_body.strip().startswith("{"): self.log.warning( - 
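The streaming loop above probes every extracted JSON chunk for `intermediateSteps` before the content parser can discard it as metadata. A minimal sketch of that probe, tolerant of non-JSON fragments:

```python
import json

def collect_chunk_steps(json_chunk: str, intermediate_steps: list) -> None:
    """Pull intermediateSteps out of a raw chunk if it parses as a JSON
    object; non-JSON fragments are left for the content parser."""
    try:
        parsed = json.loads(json_chunk)
    except json.JSONDecodeError:
        return  # not JSON; handled by the regular content path
    if isinstance(parsed, dict):
        intermediate_steps.extend(parsed.get("intermediateSteps", []))
```

Running this probe first is what makes the code future-proof: once N8N includes `intermediateSteps` in streaming chunks, they are accumulated without any further changes.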
f"Direct JSON parse failed: {je}; attempting text fallback" + f"Response looks like JSON but failed to parse (content-type: {content_type})" + ) + else: + self.log.debug( + f"Response is not JSON, will use as plain text (content-type: {content_type})" ) - if json_body is None: - text_body = await response.text() - # Try parse anyway - try: - json_body = json.loads(text_body) - except Exception: - pass except Exception as e_inner: self.log.error( f"Error reading response body: {e_inner}" @@ -898,6 +1204,49 @@ async def read_body_safely(): ) self.log.debug(f"Raw text body snippet: {snippet}") + # Extract intermediateSteps from non-streaming response + intermediate_steps = [] + if isinstance(response_json, list): + # Handle array response format + self.log.debug( + f"Response is an array with {len(response_json)} items" + ) + for item in response_json: + if ( + isinstance(item, dict) + and "intermediateSteps" in item + ): + steps = item.get("intermediateSteps", []) + intermediate_steps.extend(steps) + self.log.debug( + f"Found {len(steps)} intermediate steps in array item" + ) + elif isinstance(response_json, dict): + # Handle single object response format + self.log.debug( + f"Response is a dict with keys: {list(response_json.keys())}" + ) + intermediate_steps = response_json.get( + "intermediateSteps", [] + ) + if intermediate_steps: + self.log.debug( + f"Found intermediateSteps field with {len(intermediate_steps)} items" + ) + else: + self.log.debug( + f"Response is not JSON (type: {type(response_json)}), cannot extract intermediateSteps" + ) + + if intermediate_steps: + self.log.info( + f"✓ Found {len(intermediate_steps)} intermediate steps in non-streaming response" + ) + else: + self.log.debug( + "No intermediate steps found in non-streaming response" + ) + def extract_message(data) -> str: if data is None: return "" @@ -998,6 +1347,17 @@ def replace_think_block(match): f"Non-streaming thinking parse failed: {post_e}" ) + # Add tool calls section if present 
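The non-streaming branch above has to handle both response shapes N8N may return: an array of result objects or a single object. A standalone sketch of that extraction:

```python
def extract_steps(response_json):
    """Gather intermediateSteps from either N8N response shape:
    a list of result objects, or a single object. Anything else
    (e.g. a plain-text body) yields no steps."""
    steps = []
    if isinstance(response_json, list):
        for item in response_json:
            if isinstance(item, dict):
                steps.extend(item.get("intermediateSteps", []))
    elif isinstance(response_json, dict):
        steps = response_json.get("intermediateSteps", [])
    return steps
```

Extending rather than replacing in the list branch means steps from several array items are merged in order.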
(non-streaming mode) + if intermediate_steps: + tool_calls_section = self._format_tool_calls_section( + intermediate_steps, for_streaming=False + ) + if tool_calls_section: + n8n_response += tool_calls_section + self.log.info( + f"Added {len(intermediate_steps)} tool calls to non-streaming response" + ) + # Cleanup await cleanup_response(response, session) session = None
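Both branches above end by appending `self._format_tool_calls_section(...)`, whose body is not shown in this diff. A hypothetical minimal formatter sketching the three documented verbosity levels (`minimal`, `compact`, `detailed`), assuming the LangChain-style step shape (`action.tool`, `action.toolInput`, `observation`) that N8N AI Agent nodes commonly emit:

```python
def format_tool_calls(steps, verbosity="compact", max_len=80):
    """Hypothetical stand-in for _format_tool_calls_section: render
    collected steps as a collapsible markdown details block."""
    if not steps:
        return ""
    lines = ["\n\n<details>", "<summary>🛠️ Tool calls</summary>", ""]
    for step in steps:
        action = step.get("action", {}) if isinstance(step, dict) else {}
        name = action.get("tool", "unknown")
        if verbosity == "minimal":
            lines.append(f"- {name}")
            continue
        tool_input = str(action.get("toolInput", ""))[:max_len]
        line = f"- **{name}**: input `{tool_input}`"
        if verbosity == "detailed":
            observation = str(step.get("observation", ""))[:max_len]
            line += f", result `{observation}`"
        lines.append(line)
    lines.append("</details>")
    return "\n".join(lines)
```

The real pipeline additionally takes a `for_streaming` flag and valve-driven length limits; this sketch only fixes the overall structure of the appended section.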