diff --git a/guides/features/pipecat-flows.mdx b/guides/features/pipecat-flows.mdx index 11be5d64..b7161506 100644 --- a/guides/features/pipecat-flows.mdx +++ b/guides/features/pipecat-flows.mdx @@ -1,484 +1,306 @@ --- title: "Pipecat Flows" -description: "Learn how to create structured conversations using Pipecat’s flow system" +description: "Learn how to create structured conversations using Pipecat's flow system" --- -Pipecat Flows provides a framework for building structured conversations in your AI applications. It enables you to create both predefined conversation paths and dynamically generated flows while handling the complexities of state management and LLM interactions. +## What is Pipecat Flows? -The framework consists of: +Pipecat Flows is an add-on framework for Pipecat that allows you to build structured conversations in your AI applications. It enables you to define conversation paths while handling the complexities of state management and LLM interactions. -- A Python module for building conversation flows with Pipecat -- A visual editor for designing and exporting flow configurations +Want to dive right in? Check out these examples: -## Key Concepts - -- **Nodes**: Represent conversation states with specific messages and available functions -- **Messages**: Set the role and tasks for each node -- **Functions**: Define actions and transitions (Node functions for operations, Edge functions for transitions) -- **Actions**: Execute operations during state transitions (pre/post actions) -- **State Management**: Handle conversation state and data persistence - -## Example Flows - - + - A static flow demonstrating movie exploration using OpenAI. Shows real API - integration with TMDB, structured data collection, and state management. + A great first Flow to show you the ropes - - A dynamic flow using Google Gemini that adapts policy recommendations based - on user responses. Demonstrates runtime node creation and conditional paths. + A simple, practical Flow example - - These examples are fully functional and can be run locally. Make sure you have - the required dependencies installed and API keys configured. - - -## When to Use Static vs Dynamic Flows - -**Static Flows** are ideal when: - -- Conversation structure is known upfront -- Paths follow predefined patterns -- Flow can be fully configured in advance -- Example: Customer service scripts, intake forms - -**Dynamic Flows** are better when: - -- Paths depend on external data -- Flow structure needs runtime modification -- Complex decision trees are involved -- Example: Personalized recommendations, adaptive workflows - -# Installation - -If you're already using Pipecat: - -```bash -pip install pipecat-ai-flows -``` - -If you're starting fresh: - -```bash -# Basic installation -pip install pipecat-ai-flows - -# Install Pipecat with specific LLM provider options: -pip install "pipecat-ai[daily,openai,deepgram]" # For OpenAI -pip install "pipecat-ai[daily,anthropic,deepgram]" # For Anthropic -pip install "pipecat-ai[daily,google,deepgram]" # For Google -``` - -💡 Want to design your flows visually? Try the [online Flow Editor](https://flows.pipecat.ai) +## How do Pipecat and Pipecat Flows work together? -# Core Concepts +**Pipecat** defines the core capabilities of your bot. This includes the pipeline and processors which, at a minimum, enable your bot to: -## Designing Conversation Flows +- Receive audio from a user +- Transcribe the user's input +- Run an LLM completion +- Convert the LLM response to audio +- Send audio back to the user -Functions in Pipecat Flows serve two key purposes: +**Pipecat Flows** complements Pipecat's core functionality by providing structure to a conversation, managing context and tools as the conversation progresses from one state to another. This is separate from the core pipeline, allowing you to separate conversation logic from core pipeline mechanics. -- Processing data (likely by interfacing with external systems and APIs) -- Advancing the conversation to the next node +## When to Use Pipecat Flows? -Each function can do one or both. +Pipecat Flows is best suited for use cases where: -LLMs decide when to run each function, via their function calling (or tool calling) mechanism. +- **You need precise control** over how a conversation progresses through specific steps +- **Your bot handles complex tasks** that can be broken down into smaller, manageable pieces +- **You want to improve LLM accuracy** by focusing the model on one specific task at a time instead of managing multiple responsibilities simultaneously -### Defining a Function +This approach addresses a common problem: traditional methods often use large, monolithic prompts with many tools available at once, leading to hallucinations and lower accuracy. -A function is expected to return a `(result, next_node)` tuple. More precisely, it’s expected to return: +Pipecat Flows solves this by: -```python -# (result, next_node) -Tuple[Optional[FlowResult], Optional[Union[NodeConfig, str]]] -``` +- **Breaking complex tasks into focused steps** - Each node has a clear, single purpose +- **Providing relevant tools only when needed** - Functions are available only in the appropriate context +- **Giving clear, specific instructions** - Task messages focus the LLM on exactly what to do next -If the function processes data, it should return a non-`None` value for the first element of the tuple. This value should be a `FlowResult` or subclass. +## Selecting a Flows Pattern -If the function advances the conversation to the next node, it should return a non-`None` value for the second element of the tuple. This value can be either: +Pipecat Flows provides two general patterns for how to build your application: Dynamic Flows and Static Flows. -- A `NodeConfig` defining the next node (for dynamic flows) -- A string identifying the next node (for static flows) +- **Dynamic Flows (recommended)**: When conversation paths need to be determined at runtime based on user input, external data, or business logic. +- **Static Flows**: When your conversation structure is known upfront and follows predefined paths. -### Example Function - -```python -from pipecat_flows import FlowArgs, FlowManager, FlowResult, NodeConfig + + Dynamic Flows can handle both simple and complex use cases. Selecting it + provides you with an option that can grow with the complexity of your + application. For these reasons, we strongly recommend it as the API pattern to + follow. + -async def check_availability(args: FlowArgs, flow_manager: FlowManager) -> tuple[FlowResult, NodeConfig]: - # Read arguments - date = args["date"] - time = args["time"] +## Technical Overview - # Read previously-stored data - party_size = flow_manager.state.get("party_size") +Pipecat Flows represents a conversation as a graph, where each step of the conversation is represented by a node. Nodes are of type `NodeConfig`, and may contain the following properties: - # Use flow_manager for immediate user feedback - await flow_manager.task.queue_frame(TTSSpeakFrame("Checking our reservation system...")) +- `name`: The name of the node; used as a reference to transition to the node. +- `role_messages`: A list of message `dicts` defining the bot's role/personality. Typically set once in the initial node. +- `task_messages`: A list of message `dicts` defining the current node's objectives. +- `functions`: A list of function call definitions and their corresponding handlers. +- `pre_actions`: Actions to execute before LLM inference. Actions run once upon transitioning to a node. +- `post_actions`: Actions to execute after LLM inference. Actions run once after the node's initial LLM inference. +- `context_strategy`: Strategy for updating context during transitions. The default behavior is to append messages to the context. +- `respond_immediately`: Whether to run LLM inference as soon as the node is set. The default is True. - # Store data in flow state for later use - flow_manager.state["requested_date"] = date + + The only required field is `task_messages`, as your bot always needs a prompt + to advance the conversation. + - # Interface with reservation system - is_available = await reservation_system.check_availability(date, time, party_size) +Now that we've defined the structure of a node, let's look at the components that make up a node. - # Assemble result - result = { "status": "success", "available": available } +### Messages - # Decide which node to go to next - if is_available: - next_node = create_confirmation_node() - else: - next_node = create_no_availability_node() +Messages define what your bot should do and how it should behave at each node in your conversation flow. - # Return both result and next node - return result, next_node -``` +#### Message Types -## Node Structure +There are two types of messages you can configure: -Each node in your flow represents a conversation state and consists of three main components: +**Role Messages** (Optional) +Define your bot's personality, tone, and overall behavior. These are typically set once at the beginning of your flow and establish the consistent persona your bot maintains throughout the conversation. -### Messages +**Task Messages** (Required) +Define the specific objective your bot should accomplish in the current node. These messages focus the LLM on the immediate task at hand, such as asking a specific question or processing particular information. -Nodes use two types of messages to control the conversation: +#### Message Format -1. **Role Messages**: Define the bot's personality or role (optional) +Messages are specified in OpenAI format as a list of `dicts`: ```python "role_messages": [ { "role": "system", - "content": "You are a friendly pizza ordering assistant. Keep responses casual and upbeat." + "content": "You are an inquisitive child.", } -] -``` - -2. **Task Messages**: Define what the bot should do in the current node - -```python +], "task_messages": [ { "role": "system", - "content": "Ask the customer which pizza size they'd like: small, medium, or large." + "content": "Say 'Hello world' and ask what is the user's favorite color.", } -] +], ``` -Role messages are typically defined in your initial node and inherited by subsequent nodes, while task messages are specific to each node's purpose. +#### Cross-Provider Compatibility -### Functions +Pipecat's default message format uses the OpenAI message spec. With this message format, messages are automatically translated by Pipecat to work with your chosen LLM provider, making it easy to switch between OpenAI, Anthropic, Google, and other providers without changing your code. -Functions in Pipecat Flows can: + + Some LLMs, like Anthropic and Gemini, can only set the system instruction at + initialization time. This means you will only be able to set the + `role_messages` at initialization for those LLMs. + -1. Process data -2. Specify node transitions -3. Do both +### Functions -This leads to two conceptual types of functions: +Functions in Pipecat Flows serve two key purposes: -- **Node functions**, which only process data. -- **Edge functions**, which also (or only) transition to the next node. +1. **Process data** by interfacing with external systems and APIs to read or write information +2. **Progress the conversation** by transitioning between nodes in your flow -The function itself ([which you can read more about here](#defining-a-function)) is usually wrapped in a function configuration, which also contains some metadata about the function. +#### How Functions Work -#### Function Configuration +When designing your nodes, clearly define the task in the `task_messages` and reference the available functions. The LLM will use these functions to complete the task and signal when it's ready to move forward. -Pipecat Flows supports three ways of specifying function configuration: +For example, if your node's job is to collect a user's favorite color: -1. Provider-specific dictionary format +1. The LLM asks the question +2. The user provides their answer +3. The LLM calls the function with the answer +4. The function processes the data and determines the next node -```python -# Dictionary format -{ - "type": "function", - "function": { - "name": "select_size", - "handler": select_size, - "description": "Select pizza size", - "parameters": { - "type": "object", - "properties": { - "size": {"type": "string", "enum": ["small", "medium", "large"]} - } - }, - } -} -``` +#### Function Definition -2. FlowsFunctionSchema +Flows provides a universal `FlowsFunctionSchema` that works across all LLM providers: ```python -# Using FlowsFunctionSchema from pipecat_flows import FlowsFunctionSchema -size_function = FlowsFunctionSchema( - name="select_size", - description="Select pizza size", - properties={ - "size": {"type": "string", "enum": ["small", "medium", "large"]} - }, - required=["size"], - handler=select_size +record_favorite_color_func = FlowsFunctionSchema( + name="record_favorite_color_func", + description="Record the color the user said is their favorite.", + required=["color"], + handler=record_favorite_color_and_set_next_node, + properties={"color": {"type": "string"}}, ) - -# Use in node configuration -node_config = { - "task_messages": [...], - "functions": [size_function] -} ``` -The `FlowsFunctionSchema` approach provides some advantages over the provider-specific dictionary format: +#### Function Handlers -- Consistent structure across LLM providers -- Simplified parameter definition -- Cleaner, more readable code +Each function has a corresponding `handler` where you implement your application logic and specify the next node: - - Both dictionary and `FlowsFunctionSchema` approaches are fully supported. - `FlowsFunctionSchema` is recommended for new projects as it provides better - type checking and a provider-independent format. - +```python +async def record_favorite_color_and_set_next_node( + args: FlowArgs, flow_manager: FlowManager +) -> tuple[str, NodeConfig]: + """Function handler that records the color then sets the next node. -3. Direct function usage (auto-configuration) + Here "record" means print to the console, but any logic could go here: + Write to a database, make an API call, etc. + """ + print(f"Your favorite color is: {args['color']}") + return args["color"], create_end_node() +``` -This approach lets you bypass specifying a standalone function configuration. +#### Handler Return Values -Instead, relevant function metadata is automatically extracted from the function's signature and docstring: +Function handlers must return a tuple containing: -- `name` -- `description` -- `properties` (including individual property `description`s) -- `required` +- **Result**: Data provided to the LLM for context in subsequent completions +- **Next Node**: The `NodeConfig` for Flows to transition to next -Note that the function signature is a bit different when using direct functions. The first parameter is the `FlowManager`, followed by any others necessary for the function. +Some handlers may not want to transition conversational state, in which case you can return `None` for the next node. Other handlers may _only_ want to transition conversational state without doing other work, in which case you can return `None` for the result. -```python -from pipecat_flows import FlowManager, FlowResult +#### Direct Functions + +For more concise code, you can optionally use Direct Functions where the function definition and handler are combined in a single function. The function signature and docstring are automatically used to generate the function schema: -async def select_pizza_order( +```python +async def record_favorite_color( flow_manager: FlowManager, - size: str, - pizza_type: str, - additional_toppings: list[str] = [], -) -> tuple[FlowResult, str]: + color: str +) -> tuple[FlowResult, NodeConfig]: """ - Record the pizza order details. + Record the color the user said is their favorite. Args: - size (str): Size of the pizza. Must be one of "small", "medium", or "large". - pizza_type (str): Type of pizza. Must be one of "pepperoni", "cheese", "supreme", or "vegetarian". - additional_toppings (list[str]): List of additional toppings. Defaults to empty list. + color (str): The user's favorite color """ - ... + print(f"Your favorite color is: {args["color"]}") + return args["color"], create_end_node() -# Use in node configuration +# Use directly in NodeConfig node_config = { - "task_messages": [...], - "functions": [select_pizza_order] + "functions": [record_favorite_color] } ``` -#### Node Functions +This approach eliminates the need for separate `FlowsFunctionSchema` definitions while maintaining the same functionality. -Functions that process data within a single conversational state, without switching nodes. When called, they: +### Actions -- Execute their `handler` to do the data processing (typically by interfacing with an external system or API) -- Trigger an immediate LLM completion with the result +Actions allow you to execute custom functionality at specific points in your conversation flow, giving you precise control over timing and sequencing. -```python -from pipecat_flows import FlowArgs, FlowManager, FlowResult +#### Action Types -async def select_size(args: FlowArgs, flow_manager: FlowManager) -> tuple[FlowResult, None]: - """Process pizza size selection.""" - size = args["size"] - await ordering_system.record_size_selection(size) - return { - "status": "success", - "size": size - }, None - -# Function configuration -{ - "type": "function", - "function": { - "name": "select_size", - "handler": select_size, - "description": "Select pizza size", - "parameters": { - "type": "object", - "properties": { - "size": {"type": "string", "enum": ["small", "medium", "large"]} - } - }, - } -} -``` +- `pre_actions` execute immediately when transitioning to a new node, _before_ the LLM inference begins. +- `post_actions` execute after the LLM inference completes and any TTS has finished speaking. -#### Edge Functions +#### Built-in Actions -Functions that specify a transition between nodes (optionally processing data first). When called, they: +Pipecat Flows includes several ready-to-use actions for common scenarios: -- Execute their `handler` to do any data processing (optional) and determine the next node -- Add the function result to the LLM context -- Trigger LLM completion after both the function result and the next node's messages are in the context +- **`tts_say`**: Speak a phrase immediately (useful for "please wait" messages) ```python -from pipecat_flows import FlowArgs, FlowManager, FlowResult - -async def select_size(args: FlowArgs, flow_manager: FlowManager) -> tuple[FlowResult, NodeConfig]: - """Process pizza size selection.""" - size = args["size"] - await ordering_system.record_size_selection(size) - result = { - "status": "success", - "size": size - } - next_node = create_confirmation_node() - return result, next_node - -# Function configuration -{ - "type": "function", - "function": { - "name": "select_size", - "handler": select_size, - "description": "Select pizza size", - "parameters": { - "type": "object", - "properties": { - "size": {"type": "string", "enum": ["small", "medium", "large"]} - } - }, +"pre_actions": [ + { + "type": "tts_say", + "text": "Please hold while I process your request..." } -} +] ``` -### Actions - -Actions are operations that execute as part of the lifecycle of a node, with two distinct timing options: - -- Pre-actions: execute when entering the node, before the LLM completion -- Post-actions: execute after the LLM completion - -#### Pre-Actions - -Execute when entering the node, before LLM inference. Useful for: - -- Providing immediate feedback while waiting for LLM responses -- Bridging gaps during longer function calls -- Setting up state or context +- **`end_conversation`**: Gracefully terminate the conversation ```python -"pre_actions": [ +"post_actions": [ { - "type": "tts_say", - "text": "Hold on a moment..." # Immediate feedback during processing + "type": "end_conversation", + "text": "Thank you for your time!" } -], +] ``` -Note that when the node is configured with `respond_immediately: False`, the `pre_actions` still run when entering the node, which may be well before LLM inference, depending on how long the user takes to speak first. - - - Avoid mixing `tts_say` actions with chat completions as this may result in a - conversation flow that feels unnatural. `tts_say` are best used as filler - words when the LLM will take time to generate an completion. - - -#### Post-Actions - -Execute after LLM inference completes. Useful for: - -- Cleanup operations -- State finalization -- Ensuring proper sequence of operations +- **`function`**: Execute a custom function at the specified timing ```python "post_actions": [ { - "type": "end_conversation" # Ensures TTS completes before ending + "type": "function", + "handler": end_conversation_handler } ] ``` -Note that when the node is configured with `respond_immediately: False`, the `post_actions` still only run after the first LLM inference, which may be a while depending on how long the user takes to speak first. +#### Custom Actions -#### Timing Considerations +You can define your own actions to handle specific business logic or integrations. In most cases, consider using a **function action** first, as it executes at the expected time in the pipeline. -- **Pre-actions**: Execute immediately, before any LLM processing begins -- **LLM Inference**: Processes the node's messages and functions -- **Post-actions**: Execute after LLM processing and TTS completion +Custom actions give you complete flexibility to execute any functionality your application needs, but require careful timing considerations. -For example, when using `end_conversation` as a post-action, the sequence is: +#### Action Timing -1. LLM generates response -2. TTS speaks the response -3. End conversation action executes +The execution order ensures predictable behavior: -This ordering ensures proper completion of all operations. +1. **Pre-actions** run first upon node entry (in the order they are defined) +2. **LLM inference** processes the node's messages and functions +3. **TTS** speaks the LLM's response +4. **Post-actions** run after TTS completes (in the order they are defined) -#### Action Types +This timing guarantees that actions execute in the correct sequence, such as ensuring the bot finishes speaking before ending the conversation. Note that custom actions may not follow this predictable timing, which is another reason to prefer function actions when possible. -Flows comes equipped with pre-canned actions and you can also define your own action behavior. See the [reference docs](/server/frameworks/flows/pipecat-flows#actions) for more information. +### Context Strategy -## Deciding Who Speaks First +Flows provides three built-in ways to manage conversation context as you move between nodes: -For each node in the conversation, you can decide whether the LLM should respond immediately upon entering the node (the default behavior) or whether the LLM should wait for the user to speak first before responding. You do this using the `respond_immediately` field. +#### Strategy Types - - `respond_immediately=False` may be particularly useful in the very first node, - especially in outbound-calling cases where the user has to first answer the - phone to trigger the conversation. - +1. **APPEND** (Default): New node messages are added to the existing context, preserving the full conversation history. The context grows as the conversation progresses. -```python -NodeConfig( - task_messages=[ - { - "role": "system", - "content": "Warmly greet the customer and ask how many people are in their party. This is your only job for now; if the customer asks for something else, politely remind them you can't do it.", - } - ], - respond_immediately=False, - # ... other fields -) -``` - - - Keep in mind that if you specify `respond_immediately=False`, the user may not - be aware of the conversational task at hand when entering the node (the bot - hasn't told them yet). While it's always important to have guardrails in your - node messages to keep the conversation on topic, letting the user speak first - makes it even more so. - +2. **RESET**: The context is cleared and replaced with only the new node's messages. Useful when previous conversation history is no longer relevant or to reduce context window size. -## Context Management +3. **RESET_WITH_SUMMARY**: The context is cleared but includes an AI-generated summary of the previous conversation along with the new node's messages. Helps reduce context size while preserving key information. -Pipecat Flows provides three strategies for managing conversation context during node transitions: +#### When to Use Each Strategy -### Context Strategies +- Use **APPEND** when full conversation history is important for context +- Use **RESET** when starting a new topic or when previous context might confuse the current task +- Use **RESET_WITH_SUMMARY** for long conversations where you need to preserve key points but reduce context size -- **APPEND** (default): Adds new messages to the existing context, maintaining the full conversation history -- **RESET**: Clears the context and starts fresh with the new node's messages -- **RESET_WITH_SUMMARY**: Resets the context but includes an AI-generated summary of the previous conversation +#### Configuration Examples -### Configuration - -Context strategies can be configured globally or per-node: +Context strategies can be defined globally in the FlowManager constructor: ```python from pipecat_flows import ContextStrategy, ContextStrategyConfig @@ -489,11 +311,14 @@ flow_manager = FlowManager( llm=llm, context_aggregator=context_aggregator, context_strategy=ContextStrategyConfig( - strategy=ContextStrategy.RESET_WITH_SUMMARY, - summary_prompt="Summarize the key points discussed so far, focusing on decisions made and important information collected." + strategy=ContextStrategy.APPEND, ) ) +``` + +Or on a per-node basis: +```python # Per-node strategy configuration node_config = { "task_messages": [...], @@ -505,478 +330,172 @@ node_config = { } ``` -### Strategy Selection - -Choose your strategy based on your conversation needs: +### Respond Immediately -- Use **APPEND** when full conversation history is important -- Use **RESET** when previous context might confuse the current node's purpose -- Use **RESET_WITH_SUMMARY** for long conversations where key points need to be preserved - - - When using RESET_WITH_SUMMARY, if summary generation fails or times out, the - system automatically falls back to RESET strategy for resilience. - - -## State Management - -The `state` variable in FlowManager is a shared dictionary that persists throughout the conversation. Think of it as a conversation memory that lets you: - -- Store user information -- Track conversation progress -- Share data between nodes -- Inform decision-making +For each node in the conversation, you can decide whether the LLM should respond immediately upon entering the node (the default behavior) or whether the LLM should wait for the user to speak first before responding. You do this using the `respond_immediately` field. -Here's a practical example of a pizza ordering flow: + + `respond_immediately=False` may be particularly useful in the very first node, + especially in outbound-calling cases where the user has to first answer the + phone to trigger the conversation. + ```python -# Store user choices as they're made -async def select_size(args: FlowArgs) -> tuple[FlowResult, str]: - """Handle pizza size selection.""" - size = args["size"] - - # Initialize order in state if it doesn't exist - if "order" not in flow_manager.state: - flow_manager.state["order"] = {} - - # Store the selection - flow_manager.state["order"]["size"] = size - - return {"status": "success", "size": size}, "toppings" - -async def select_toppings(args: FlowArgs) -> tuple[FlowResult, str]: - """Handle topping selection.""" - topping = args["topping"] - - # Get existing order and toppings - order = flow_manager.state.get("order", {}) - toppings = order.get("toppings", []) - - # Add new topping - toppings.append(topping) - order["toppings"] = toppings - flow_manager.state["order"] = order - - return {"status": "success", "toppings": toppings}, "finalize" - -async def finalize_order(args: FlowArgs) -> tuple[FlowResult, str]: - """Process the complete order.""" - order = flow_manager.state.get("order", {}) - - # Validate order has required information - if "size" not in order: - return {"status": "error", "error": "No size selected"} - - # Calculate price based on stored selections - size = order["size"] - toppings = order.get("toppings", []) - price = calculate_price(size, len(toppings)) - - return { - "status": "success", - "summary": f"Ordered: {size} pizza with {', '.join(toppings)}", - "price": price - }, "end" +NodeConfig( + task_messages=[ + { + "role": "system", + "content": "Warmly greet the customer and ask how many people are in their party. This is your only job for now; if the customer asks for something else, politely remind them you can't do it.", + } + ], + respond_immediately=False, + # ... other fields +) ``` -In this example: - -1. `select_size` initializes the order and stores the size -2. `select_toppings` builds a list of toppings -3. `finalize_order` uses the stored information to process the complete order - -The state variable makes it easy to: - -- Build up information across multiple interactions -- Access previous choices when needed -- Validate the complete order -- Calculate final results + + Keep in mind that if you specify `respond_immediately=False`, the user may not + be aware of the conversational task at hand when entering the node (the bot + hasn't told them yet). While it's always important to have guardrails in your + node messages to keep the conversation on topic, letting the user speak first + makes it even more so. + -This is particularly useful when information needs to be collected across multiple conversation turns or when later decisions depend on earlier choices. +## Initialization -## LLM Provider Support +Initialize your flow by creating a `FlowManager` instance and calling `initialize()` to start the conversation. -Pipecat Flows automatically handles format differences between LLM providers: +### Dynamic Flow -### OpenAI Format +First, create the FlowManager: ```python -"functions": [{ - "type": "function", - "function": { - "name": "function_name", - "handler": select_size, - "description": "description", - "parameters": {...} - } -}] +flow_manager = FlowManager( + task=task, # PipelineTask + llm=llm, # LLMService + context_aggregator=context_aggregator, # Context aggregator + transport=transport, # Transport +) ``` -### Anthropic Format +Then, initialize by passing the first `NodeConfig` into the `initialize()` method: ```python -"functions": [{ - "name": "function_name", - "handler": select_size, - "description": "description", - "input_schema": {...} -}] +@transport.event_handler("on_client_connected") +async def on_client_connected(transport, client): + logger.info(f"Client connected") + # Kick off the conversation. + await flow_manager.initialize(create_initial_node()) ``` -### Google (Gemini) Format +### Static Flow -```python -"functions": [{ - "function_declarations": [{ - "name": "function_name", - "handler": select_size, - "description": "description", - "parameters": {...} - }] -}] -``` - - - You don't need to handle these differences manually - Pipecat Flows adapts - your configuration to the correct format based on your LLM provider. - -# Implementation Approaches - -## Static Flows - -Static flows use a configuration-driven approach where the entire conversation structure is defined upfront. - -### Basic Setup +First, create the FlowManager: ```python -from pipecat_flows import FlowManager - -# Define flow configuration -flow_config = { - "initial_node": "greeting", - "nodes": { - "greeting": { - "role_messages": [...], - "task_messages": [...], - "functions": [...] - } - } -} - -# Initialize flow manager with static configuration flow_manager = FlowManager( - task=task, - llm=llm, - context_aggregator=context_aggregator, - flow_config=flow_config + task=task, # PipelineTask + llm=llm, # LLMService + context_aggregator=context_aggregator, # Context aggregator + flow_config=flow_config, # FlowConfig ) +``` + +Then initialize: +```python @transport.event_handler("on_first_participant_joined") async def on_first_participant_joined(transport, participant): await transport.capture_participant_transcription(participant["id"]) + logger.debug("Initializing flow") await flow_manager.initialize() ``` -### Example FlowConfig +For Static Flows, `initialize()` does not take any args. -```python -flow_config = { - "initial_node": "start", - "nodes": { - "start": { - "role_messages": [ - { - "role": "system", - "content": "You are an order-taking assistant. You must ALWAYS use the available functions to progress the conversation. This is a phone conversation and your responses will be converted to audio. Keep the conversation friendly, casual, and polite. Avoid outputting special characters and emojis.", - } - ], - "task_messages": [ - { - "role": "system", - "content": "You are an order-taking assistant. Ask if they want pizza or sushi." - } - ], - "functions": [ - { - "type": "function", - "function": { - "name": "choose_pizza", - "handler": choose_pizza, # Returns [None, "pizza_order"] - "description": "User wants pizza", - "parameters": {"type": "object", "properties": {}} - } - } - ] - }, - "pizza_order": { - "task_messages": [...], - "functions": [ - { - "type": "function", - "function": { - "name": "select_size", - "handler": select_size, # Returns [FlowResult, "toppings"] - "description": "Select pizza size", - "parameters": { - "type": "object", - "properties": { - "size": {"type": "string", "enum": ["small", "medium", "large"]} - } - } - } - } - ] - } - } -} -``` +## Cross-Node State -## Dynamic Flows +Pipecat Flows supports cross-node state through the `flow_manager.state` dictionary. This persistent storage lets you share data across nodes throughout the entire conversation. -Dynamic flows create and modify conversation paths at runtime based on data or business logic. - -### Example Implementation - -Here's a complete example of a dynamic insurance quote flow: +**Basic usage** ```python -from pipecat_flows import FlowManager, FlowArgs, FlowResult +async def record_favorite_color_and_set_next_node( + args: FlowArgs, flow_manager: FlowManager +) -> tuple[str, NodeConfig]: + """Function handler that records the color then sets the next node. -# Define handlers and transitions -async def collect_age(args: FlowArgs, flow_manager: FlowManager) -> tuple[AgeResult, NodeConfig]: - """Process age collection.""" - age = args["age"] + Here "record" means print to the console, but any logic could go here; + Write to a database, make an API call, etc. + """ + flow_manager.state["color"] = args["color"] + print(f"Your favorite color is: {args['color']}") + return args["color"], create_end_node() +``` - # Assemble result - result = AgeResult(status="success", age=age) +## Usage Examples: - # Decide which node to go to next - if age < 25: - await flow_manager.set_node_from_config(create_young_adult_node()) - else: - await flow_manager.set_node_from_config(create_standard_node()) +### Dynamic Flow (Recommended) - return result, age +Here's an example that ties together all the concepts we've covered: -# Node creation functions +```python def create_initial_node() -> NodeConfig: - """Create initial age collection node.""" + """Create the initial node of the flow. + + Define the bot's role and task for the node as well as the function for it to call. + The function call includes a handler which provides the function call result to + Pipecat and then transitions to the next node. + """ + record_favorite_color_func = FlowsFunctionSchema( + name="record_favorite_color_func", + description="Record the color the user said is their favorite.", + required=["color"], + handler=record_favorite_color_and_set_next_node, + properties={"color": {"type": "string"}}, + ) + return { "name": "initial", "role_messages": [ { "role": "system", - "content": "You are an insurance quote assistant." - } - ], - "task_messages": [ - { - "role": "system", - "content": "Ask for the customer's age." - } - ], - "functions": [ - { - "type": "function", - "function": { - "name": "collect_age", - "handler": collect_age, - "description": "Collect customer age", - "parameters": { - "type": "object", - "properties": { - "age": {"type": "integer"} - } - } - } - } - ] - } - -def create_young_adult_node() -> Dict[str, Any]: - """Create node for young adult quotes.""" - return { - "name": "young_adult", - "task_messages": [ - { - "role": "system", - "content": "Explain our special young adult coverage options." + "content": "You are an inquisitive child. Use very simple language. Ask simple questions. You must ALWAYS use one of the available functions to progress the conversation. Your responses will be converted to audio. Avoid outputting special characters and emojis.", } ], - "functions": [...] # Additional quote-specific functions - } - -def create_standard_node() -> Dict[str, Any]: - """Create node for standard quotes.""" - return { - "name": "standard", "task_messages": [ { "role": "system", - "content": "Present our standard coverage options." + "content": "Say 'Hello world' and ask what is the user's favorite color.", } ], - "functions": [...] # Additional quote-specific functions + "functions": [record_favorite_color_func], } - -# Initialize flow manager -flow_manager = FlowManager( - task=task, - llm=llm, - context_aggregator=context_aggregator, -) - -@transport.event_handler("on_first_participant_joined") -async def on_first_participant_joined(transport, participant): - await transport.capture_participant_transcription(participant["id"]) - await flow_manager.initialize(create_initial_node()) ``` -### Best Practices - -- Store shared data in flow_manager.state -- Create separate functions for node creation - -# Flow Editor - -The Pipecat Flow Editor provides a visual interface for creating and managing conversation flows. It offers a node-based interface that makes it easier to design, visualize, and modify your flows. - -![Food Ordering Flow](/images/food-ordering-flow.png) - -## Visual Design - -### Node Types +### Static Flow -- **Start Node** (Green): Entry point of your flow +For static flows, you define your entire flow structure upfront using a `FlowConfig`. This follows the same rules as the NodeConfig, but is assembled as a single JSON object that defines the entire state of the program. - ```python - "greeting": { - "role_messages": [...], - "task_messages": [...], - "functions": [...] - } - ``` - -- **Flow Nodes** (Blue): Intermediate states - - ```python - "collect_info": { - "task_messages": [...], - "functions": [...], - "pre_actions": [...] - } - ``` - -- **End Node** (Red): Final state - - ```python - "end": { - "task_messages": [...], - "functions": [], - "post_actions": [{"type": "end_conversation"}] - } - ``` - -- **Function Nodes**: - - Edge Functions (Purple): Create transitions - ```python - { - "name": "next_node", - "description": "Transition to next state" - } - ``` - - Node Functions (Orange): Perform operations - ```python - { - "name": "process_data", - "handler": process_data_handler, - "description": "Process user data" - } - ``` +See the [food_ordering example](https://github.com/pipecat-ai/pipecat-flows/blob/main/examples/static/food_ordering.py) for a complete FlowConfig implementation. -## Naming Conventions +## Next Steps -- **Start Node**: Use descriptive names (e.g., "greeting", "welcome") -- **Flow Nodes**: Name based on purpose (e.g., "collect_info", "verify_data") -- **End Node**: Conventionally named "end" -- **Functions**: Use clear, action-oriented names +Now that you understand the basics of Pipecat Flows, explore the reference docs and more examples: -### Function Configuration - -```python -{ - "type": "function", - "function": { - "name": "process_data", - "handler": process_handler, - "description": "Process user data", - "parameters": {...} - } -} -``` - -When using the Flow Editor, function handlers can be specified using the `__function__:` token: - -```python -{ - "type": "function", - "function": { - "name": "process_data", - "handler": "__function__:process_data", # References function in main script - "description": "Process user data", - "parameters": {...} - } -} -``` - -The handler will be looked up in your main script when the flow is executed. - - - When function handlers are specified in the flow editor, they will be exported - with the `__function__:` token. - - -## Using the Editor - -### Creating a New Flow - -1. Start with a descriptively named Start Node -2. Add Flow Nodes for each conversation state -3. Connect nodes using Edge Functions -4. Add Node Functions for operations -5. Include an End Node - -### Import/Export - -```python -# Export format -{ - "initial_node": "greeting", - "nodes": { - "greeting": { - "role_messages": [...], - "task_messages": [...], - "functions": [...], - "pre_actions": [...] - }, - "process": { - "task_messages": [...], - "functions": [...], - }, - "end": { - "task_messages": [...], - "functions": [], - "post_actions": [...] - } - } -} -``` - -### Tips - -- Use the visual preview to verify flow logic -- Test exported configurations -- Document node purposes and transitions -- Keep flows modular and maintainable - -Try the editor at [flows.pipecat.ai](https://flows.pipecat.ai) + + + Complete API reference and technical details + + + Explore more complex examples and use cases + + diff --git a/server/frameworks/flows/pipecat-flows.mdx b/server/frameworks/flows/pipecat-flows.mdx index a757eae3..2950da91 100644 --- a/server/frameworks/flows/pipecat-flows.mdx +++ b/server/frameworks/flows/pipecat-flows.mdx @@ -1,6 +1,6 @@ --- title: "Pipecat Flows" -description: "Technical reference for Pipecat’s conversation flow system" +description: "Reference docs for Pipecat’s conversation flow system" --- @@ -8,1118 +8,128 @@ description: "Technical reference for Pipecat’s conversation flow system" guide](/guides/features/pipecat-flows) first. +Pipecat Flows is an add-on framework for Pipecat that allows you to build structured conversations in your AI applications. It enables you to define conversation paths while handling the complexities of state management and LLM interactions. + + + + Complete API documentation and method details + + + Source code, examples, and issue tracking + + + Working example with basic conversation flow + + + ## Installation - +### Pipecat Flows -```bash Existing Pipecat installation -pip install pipecat-ai-flows -``` +To use Pipecat Flows, install the required dependency: -```bash Fresh Pipecat installation -# Basic installation +```bash pip install pipecat-ai-flows - -# Install Pipecat with required provider dependencies: -pip install "pipecat-ai[daily,openai,deepgram]" # For OpenAI -pip install "pipecat-ai[daily,anthropic,deepgram]" # For Anthropic -pip install "pipecat-ai[daily,google,deepgram]" # For Google ``` - - -## Core Types - -### FlowArgs - - - Type alias for function handler arguments. - - -### FlowResult - - - Base type for function handler results. Additional fields can be included as needed. - - - - Optional status field - - - Optional error message - - - - -### FlowConfig - - - Configuration for the entire conversation flow. - - - - Starting node identifier - - - Map of node names to configurations - - - - -### NodeConfig - - - Configuration for a single node in the flow. - - - - The name of the node, used in debug logging in dynamic flows. If no name is specified, an - automatically-generated UUID is used. - - ```python - # Example name - "name": "greeting" - ``` - - - Defines the role or persona of the LLM. Required for the initial node and optional for subsequent nodes. - - ```python - # Example role messages - "role_messages": [ - { - "role": "system", - "content": "You are a helpful assistant..." - } - ], - ``` - - - Defines the task for a given node. Required for all nodes. - - ```python - # Example task messages - "task_messages": [ - { - "role": "system", # May be `user` depending on the LLM - "content": "Ask the user for their name..." - } - ], - ``` - - - Strategy for managing context during transitions to this node. - - ```python - # Example context strategy configuration - "context_strategy": ContextStrategyConfig( - strategy=ContextStrategy.RESET_WITH_SUMMARY, - summary_prompt="Summarize the key points discussed so far." - ) - ``` - - - - LLM function / tool call configurations, defined in one of the [supported formats](/guides/features/pipecat-flows#function-configuration). - - ```python - # Using provider-specific dictionary format - "functions": [ - { - "type": "function", - "function": { - "name": "get_current_movies", - "handler": get_movies, - "description": "Fetch movies currently playing", - "parameters": {...} - }, - } - ] - - # Using FlowsFunctionSchema - "functions": [ - FlowsFunctionSchema( - name="get_current_movies", - description="Fetch movies currently playing", - properties={...}, - required=[...], - handler=get_movies - ) - ] - - # Using direct functions (auto-configuration) - "functions": [get_movies] - ``` - - - - Actions that execute before the LLM inference. For example, you can send a message to the TTS to speak a phrase (e.g. "Hold on a moment..."), which may be effective if an LLM function call takes time to execute. - - ```python - # Example pre_actions - "pre_actions": [ - { - "type": "tts_say", - "text": "Hold on a moment..." - } - ], - ``` - - - Actions that execute after the LLM inference. For example, you can end the conversation. - - ```python - # Example post_actions - "post_actions": [ - { - "type": "end_conversation" - } - ] - ``` - +### Pipecat Dependencies - - If set to `False`, the LLM will not respond immediately when the node is set, but will instead wait for the user to speak first before responding. +For fresh installations, you'll need to install Pipecat with depdencies for your Transport, STT, LLM, and TTS providers. - Defaults to `True`. +For example, to use Daily, OpenAI, Deepgram, Cartesia, and Silero: - ```python - # Example usage - "respond_immediately": False - ``` - - - - - -### Function Handler Types - - - Legacy function handler that only receives arguments. - -Returns either: - -- A `FlowResult` (⚠️ deprecated) -- A "consolidated" result tuple (result, next node) where: - - result is an optional `FlowResult` - - next node is an optional `NodeConfig` (for dynamic flows) or string (for static flows) - - - - - Modern function handler that receives both arguments and `FlowManager`. - -Returns either: - -- A `FlowResult` (⚠️ deprecated) -- A "consolidated" result tuple (result, next node) where: - - result is an optional `FlowResult` - - next node is an optional `NodeConfig` (for dynamic flows) or string (for static flows) - - - - - Function that is meant to be [passed directly](/guides/features/pipecat-flows#function-configuration) into a `NodeConfig` rather than into the `handler` field of a function configuration. - -It must be an `async` function with `flow_manager: FlowManager` as its first parameter. - -It must return a `ConsolidatedFunctionResult`, which is a tuple (result, next node) where: - -- result is an optional `FlowResult` -- next node is an optional `NodeConfig` (for dynamic flows) or string (for static flows) - - - -### ContextStrategy - - - Strategy for managing conversation context during node transitions. - - - - Default strategy. Adds new messages to existing context. - - - Clears context and starts fresh with new messages. - - - Resets context but includes an AI-generated summary. - - - - -### ContextStrategyConfig - - - Configuration for context management strategy. - -{" "} - - - - The strategy to use for context management - - - Required when using RESET_WITH_SUMMARY. Prompt text for generating the - conversation summary. - - - -```python -# Example usage -config = ContextStrategyConfig( - strategy=ContextStrategy.RESET_WITH_SUMMARY, - summary_prompt="Summarize the key points discussed so far." -) +```bash +pip install "pipecat-ai[daily,openai,deepgram,cartesia,silero]" ``` - - -### FlowsFunctionSchema +## Flow Types - - A standardized schema for defining functions in Pipecat Flows with flow-specific properties. - - - - Name of the function - - - Description of the function's purpose - - - Dictionary defining properties types and descriptions - - - List of required parameter names - - - Function handler to process the function call - - - Target node to transition to after function execution +### Dynamic Flows (Recommended) - - Deprecated: instead of `transition_to`, use a "consolidated" `handler` that returns a tuple (result, next node). - - - - Callback function for dynamic transitions - - - Deprecated: instead of `transition_callback`, use a "consolidated" `handler` that returns a tuple (result, next node). - - - - +Create conversation paths at runtime based on user input, external data, or business logic. Enables more flexible and adaptive interactions. Additionally, can handle simple scenarios with predefined paths. - You cannot specify both `transition_to` and `transition_callback` in the same - function schema. + Dynamic Flows are the recommended approach for most applications, as they + allow for greater flexibility and adaptability in conversation design. -Example usage: +### Static Flows -```python -from pipecat_flows import FlowsFunctionSchema +Use predefined conversation paths configured upfront. Ideal for predefined interactions where the graph navigation can be predetermined. -# Define a function schema -collect_name_schema = FlowsFunctionSchema( - name="collect_name", - description="Record the user's name", - properties={ - "name": { - "type": "string", - "description": "The user's name" - } - }, - required=["name"], - handler=collect_name_handler -) +## Key Components -# Use in node configuration -node_config = { - "name": "greeting", - "task_messages": [ - {"role": "system", "content": "Ask the user for their name."} - ], - "functions": [collect_name_schema] -} +### Core Classes -# Pass to flow manager -await flow_manager.set_node_from_config(node_config) -``` +- **FlowManager** - Main orchestration class for managing conversation flows +- **FlowsFunctionSchema** - Standardized function call schema that works with any LLM provider - +### Type Definitions -## FlowManager +- **FlowArgs** - Function handler arguments +- **FlowResult** - Function return type with status and optional data +- **NodeConfig** - Complete node configuration including messages, functions, and actions +- **FlowConfig** - Static flow configuration with nodes and initial state - - Main class for managing conversation flows, supporting both static (configuration-driven) and dynamic (runtime-determined) flows. +### Context Management - - - Pipeline task for frame queueing - - - LLM service instance (OpenAI, Anthropic, or Google). Must be initialized with the corresponding pipecat-ai provider dependency installed. - - - Context aggregator used for pushing messages to the LLM service - - - Optional TTS service for voice actions. - - Deprecated: No need to explicitly pass `tts` to `FlowManager` in order to use `tts_say` actions. - - - - Optional static flow configuration - - - Optional configuration for how context should be managed during transitions. - Defaults to APPEND strategy if not specified. - +- **ContextStrategyConfig** - Configuration for context management during transitions +- **ContextStrategy** - Enum for APPEND, RESET, and RESET_WITH_SUMMARY strategies - - +### Actions -### Methods +- **ActionConfig** - Configuration for custom pre- and post-actions during conversation flow - - Initialize the flow with starting messages. +## Function Types - - - The initial conversation node (needed for dynamic flows only). If not - specified, you'll need to call `set_node_from_config()` to kick off the - conversation. - - +### Node Functions - - - If initialization fails - - - +Execute operations within a single conversation state without switching nodes. Return `(FlowResult, None)`. - - Set up a new conversation node programmatically (dynamic flows only). - - - In dynamic flows, the application advances the conversation using `set_node` to set up each next node. - - In static flows, `set_node` is triggered under the hood when a node contains a `transition_to` field. - - Deprecated: use the following patterns instead of `set_node`: - - Prefer "consolidated" function handlers that return a tuple (result, next node), which implicitly sets up the next node - - Prefer passing your initial node to `FlowManager.initialize()` - - If you really need to set a node explicitly, use `set_node_from_config()` (note: its name will be read from its `NodeConfig`) - - - - Identifier for the new node - - - Node configuration including messages, functions, and actions - - +### Edge Functions - - - If node setup fails - - - +Create transitions between conversation states, optionally processing data first. Return `(FlowResult, NodeConfig)` or `(FlowResult, str)`. - - Set up a new conversation node programmatically (dynamic flows only). +### Direct Functions -Note that this method should only be used in rare circumstances. Most often, you should: +Functions passed directly to NodeConfig with automatic metadata extraction from signatures and docstrings. -- Prefer "consolidated" function handlers that return a tuple (result, next node), which implicitly sets up the next node -- Prefer passing your initial node to `FlowManager.initialize()` +## LLM Provider Support - - - Node configuration including messages, functions, and actions - - +Pipecat Flows automatically handles format differences between providers: - - If node setup fails - - +| Provider | Format Support | Installation | +| ------------- | --------------------- | ------------------------------------- | +| OpenAI | Function calling | `pip install "pipecat-ai[openai]"` | +| Anthropic | Native tools | `pip install "pipecat-ai[anthropic]"` | +| Google Gemini | Function declarations | `pip install "pipecat-ai[google]"` | +| AWS Bedrock | Anthropic-compatible | `pip install "pipecat-ai[aws]"` | - - Register a handler for a custom action type. - - - - String identifier for the action - - - Async or sync function that handles the action - - - - - - Get the current conversation context. - -Returns a list of messages in the current context, including system messages, -user messages, and assistant responses. - - - - List of messages in the current context - - - - - - If context aggregator is not available - - - -Example usage: - -```python -# Access current conversation context -context = flow_manager.get_current_context() - -# Use in handlers -async def process_response(args: FlowArgs) -> tuple[FlowResult, str]: - context = flow_manager.get_current_context() - # Process conversation history - return {"status": "success"}, "next" -``` - - - -### State Management - -The FlowManager provides a state dictionary for storing conversation data: - - - -```python Access state -flow_manager.state: Dict[str, Any] - -# Store data -flow_manager.state["user_age"] = 25 -``` - -```python Access in transitions -async def handle_transitions( - function_name: str, - args: Dict[str, Any], - flow_manager: FlowManager -) -> None: - age = flow_manager.state.get("user_age") -``` - - - -### Usage Examples - - - -```python Static Flow -flow_config: FlowConfig = { - "initial_node": "greeting", - "nodes": { - "greeting": { - "role_messages": [ - { - "role": "system", - "content": "You are a helpful assistant. Your responses will be converted to audio." - } - ], - "task_messages": [ - { - "role": "system", - "content": "Start by greeting the user and asking for their name." - } - ], - "functions": [{ - "type": "function", - "function": { - "name": "collect_name", - "handler": collect_name_handler, - "description": "Record user's name", - "parameters": {...} - } - }] - } - } -} - -# Create and initialize the FlowManager -flow_manager = FlowManager( - task=task, - llm=llm, - context_aggregator=context_aggregator, - flow_config=flow_config -) - -# Initialize the flow_manager to start the conversation -await flow_manager.initialize() -``` - -```python Dynamic Flow -def create_initial_node() -> NodeConfig: - return { - "name": "initial", - "role_messages": [ - { - "role": "system", - "content": "You are a helpful assistant." - } - ], - "task_messages": [ - { - "role": "system", - "content": "Ask the user for their age." - } - ], - "functions": [ - { - "type": "function", - "function": { - "name": "collect_age", - "handler": collect_age, - "description": "Record user's age", - "parameters": { - "type": "object", - "properties": { - "age": {"type": "integer"} - }, - "required": ["age"] - } - } - } - ] - } - -# Create the FlowManager -flow_manager = FlowManager( - task=task, - llm=llm, - context_aggregator=context_aggregator, -) - -# Initialize the flow_manager with the initial node -@transport.event_handler("on_first_participant_joined") -async def on_first_participant_joined(transport, participant): - await transport.capture_participant_transcription(participant["id"]) - await flow_manager.initialize(create_initial_node()) -``` - - - - - Functions that execute operations within a single conversational state, without switching nodes. - - - -```python -from pipecat_flows import FlowArgs, FlowResult - -async def process_data(args: FlowArgs) -> tuple[FlowResult, None]: - """Handle data processing within a node.""" - data = args["data"] - result = await process(data) - return { - "status": "success", - "processed_data": result - }, None - -# Function configuration -{ - "type": "function", - "function": { - "name": "process_data", - "handler": process_data, - "description": "Process user data", - "parameters": { - "type": "object", - "properties": { - "data": {"type": "string"} - } - } - } -} -``` - - - - - - - Functions that specify a transition between nodes (optionally processing data first). - - - -```python -from pipecat_flows import FlowArgs, FlowResult - -async def next_step(args: FlowArgs) -> tuple[None, str]: - """Specify the next node to transition to.""" - return None, "target_node" # Return NodeConfig instead of str for dynamic flows - -# Function configuration -{ - "type": "function", - "function": { - "name": "next_step", - "handler": next_step, - "description": "Transition to next node", - "parameters": {"type": "object", "properties": {}} - } -} -``` - - - - - -### Function Properties - - - -Async function that processes data within a node and/or specifies the next node ([more details here](/guides/features/pipecat-flows#defining-a-function)). Can be specified as: - -- Direct function reference -- Either a Callable function or a string with `__function__:` prefix (e.g., `"__function__:process_data"`) to reference a function in the main script - - -```python Direct Reference -{ - "type": "function", - "function": { - "name": "process_data", - "handler": process_data, # Callable function - "parameters": {...} - } -} -``` - -```python Function Token -{ - "type": "function", - "function": { - "name": "process_data", - "handler": "__function__:process_data", # References function in main script - "parameters": {...} - } -} -``` +## Error Handling - +Comprehensive exception hierarchy for flow management: - - - - Handler for dynamic flow transitions. - - - Deprecated: instead of `transition_callback`, use a "consolidated" `handler` - that returns a tuple (result, next node). - - - Must be an async function with one of these signatures: - -```python -# New style (recommended) -async def handle_transition( - args: Dict[str, Any], - result: FlowResult, - flow_manager: FlowManager -) -> None: - """Handle transition to next node.""" - if result.available: # Type-safe access to result - await flow_manager.set_node_from_config(create_confirmation_node()) - else: - await flow_manager.set_node_from_config( - create_no_availability_node(result.alternative_times) - ) - -# Legacy style (supported for backwards compatibility) -async def handle_transition( - args: Dict[str, Any], - flow_manager: FlowManager -) -> None: - """Handle transition to next node.""" - await flow_manager.set_node_from_config(create_next_node()) -``` - -The callback receives: - -- `args`: Arguments from the function call -- `result`: Typed result from the function handler (new style only) -- `flow_manager`: Reference to the FlowManager instance - -Example usage: - -```python -async def handle_availability_check( - args: Dict, - result: TimeResult, # Typed result - flow_manager: FlowManager -): - """Handle availability check and transition based on result.""" - if result.available: - await flow_manager.set_node_from_config(create_confirmation_node()) - else: - await flow_manager.set_node_from_config( - create_no_availability_node(result.alternative_times) - ) - -# Use in function configuration -{ - "type": "function", - "function": { - "name": "check_availability", - "handler": check_availability, - "parameters": {...}, - "transition_callback": handle_availability_check - } -} -``` - -Note: A function cannot have both `transition_to` and `transition_callback`. - - - -### Handler Signatures - -Function handlers passed as a `handler` in a function configuration can be defined with three different signatures: - - - ```python Modern (Args + FlowManager) - async def handler_with_flow_manager(args: FlowArgs, flow_manager: FlowManager) -> tuple[FlowResult, NodeConfig]: - """Modern handler that receives both arguments and FlowManager access.""" - # Access state - previous_data = flow_manager.state.get("stored_data") - - # Access pipeline resources - await flow_manager.task.queue_frame(TTSSpeakFrame("Processing your request...")) - - # Store data in state for later - flow_manager.state["new_data"] = args["input"] - - return { - "status": "success", - "result": "Processed with flow access" - }, create_next_node() - ``` - - ```python Legacy (Args Only) - async def handler_with_args(args: FlowArgs) -> tuple[FlowResult, NodeConfig]: - """Legacy handler that only receives arguments.""" - value = args["some_value"] - return { - "status": "success", - "result": value - }, create_next_node() - ``` - - ```python No Arguments - async def handler_no_args() -> tuple[FlowResult, NodeConfig]: - """Handler that doesn't need arguments.""" - result = await fetch_data() - return { - "status": "success", - "data": result - }, create_next_node() - ``` - - - -The framework automatically detects which signature your handler is using and calls it appropriately. - -If you're [passing your function directly](/guides/features/pipecat-flows#function-configuration) into your `NodeConfig` rather than as a `handler` in a function configuration, you'd use a somewhat different signature: - -```python Direct -async def do_something(flow_manager: FlowManager, foo: int, bar: str = "") -> tuple[FlowResult, NodeConfig]: - """ - Do something interesting. - - Args: - foo (int): The foo to do something interesting with. - bar (string): The bar to do something interesting with. Defaults to empty string. - """ - result = await fetch_data(foo, bar) - next_node = create_end_node() - return result, next_node -``` - -### Return Types - - - ```python Success Response - { - "status": "success", - "data": "some data" # Optional additional data - } - ``` - - ```python Error Response - { - "status": "error", - "error": "Error message" - } - ``` - - - -### Provider-Specific Formats - - - You don't need to handle these format differences manually - use the standard - format and the FlowManager will adapt it for your chosen provider. - - - - ```python OpenAI - { - "type": "function", - "function": { - "name": "function_name", - "handler": handler, - "description": "Description", - "parameters": {...} - } - } - ``` - - ```python Anthropic - { - "name": "function_name", - "handler": handler, - "description": "Description", - "input_schema": {...} - } - ``` - - ```python Google (Gemini) - { - "function_declarations": [ - { - "name": "function_name", - "handler": handler, - "description": "Description", - "parameters": {...} - } - ] - } - ``` - - - -## Actions - -`pre_actions` and `post_actions` are used to manage conversation flow. They are included in the `NodeConfig` and executed before and after the LLM completion, respectively. - -Three kinds of actions are available: - -- Pre-canned actions: These actions perform common tasks and require little configuration. -- Function actions: These actions run developer-defined functions at the appropriate time. -- Custom actions: These are fully developer-defined actions, providing flexibility at the expense of complexity. - -### Pre-canned Actions - -Common actions shipped with Flows for managing conversation flow. To use them, just add them to your `NodeConfig`. - - - Speaks text immediately using the TTS service. - - - -```python -"pre_actions": [ - { - "type": "tts_say", - "text": "Processing your request..." # Required - } -] -``` - - - - - - Ends the conversation and closes the connection. - - - -```python -"post_actions": [ - { - "type": "end_conversation", - "text": "Goodbye!" # Optional farewell message - } -] -``` - - - - - -### Function Actions - -Actions that run developer-defined functions at the appropriate time. For example, if used in `post_actions`, they'll run after the bot has finished talking and after any previous `post_actions` have finished. - - - Calls the developer-defined function at the appropriate time. - - - -```python -"post_actions": [ - { - "type": "function", - "handler": bot_turn_ended # Required - } -] -``` - - - - - -### Custom Actions - -Fully developer-defined actions, providing flexibility at the expense of complexity. - -Here's the complexity: because these actions aren't queued in the Pipecat pipeline, they may execute seemingly early if used in `post_actions`; they'll run immediately after the LLM completion is triggered but won't wait around for the bot to finish talking. - -Why would you want this behavior? You might be writing an action that: - -- Itself just queues another `Frame` into the Pipecat pipeline (meaning there would no benefit to waiting around for sequencing purposes) -- Does work that can be done a bit sooner, like logging that the LLM was updated - -Custom actions are composed of at least: - - - String identifier for the action - - - Async or sync function that handles the action - - -Example: - - - -```python -# Define custom action handler -async def custom_notification(action: dict, flow_manager: FlowManager): - """Custom action handler.""" - message = action.get("message", "") - await notify_user(message) - -# Use in node configuration -"pre_actions": [ - { - "type": "notify", - "handler": send_notification, - "message": "Attention!", - } -] -``` - - - -## Exceptions - - -Base exception for all flow-related errors. - - - -```python -from pipecat_flows import FlowError - -try: - await flow_manager.set_node_from_config(config) -except FlowError as e: - print(f"Flow error: {e}") -``` - - - - - - -Raised when flow initialization fails. - - - -```python -from pipecat_flows import FlowInitializationError - -try: - await flow_manager.initialize() -except FlowInitializationError as e: - print(f"Initialization failed: {e}") - -``` - - - - - - -Raised when a state transition fails. - - - -```python -from pipecat_flows import FlowTransitionError - -try: - await flow_manager.set_node_from_config(node_config) -except FlowTransitionError as e: - print(f"Transition failed: {e}") - -``` - - - - - - -Raised when an invalid or unavailable function is specified. - - - -```python -from pipecat_flows import InvalidFunctionError - -try: -await flow_manager.set_node_from_config({ - "functions": [{ - "type": "function", - "function": { - "name": "invalid_function" - } - }] -}) - -except InvalidFunctionError as e: - print(f"Invalid function: {e}") -``` +- **FlowError** - Base exception for all flow-related errors +- **FlowInitializationError** - Flow manager setup failures +- **FlowTransitionError** - Node transition failures +- **InvalidFunctionError** - Function registration or execution errors +- **ActionError** - Action execution failures - +## Additional Notes - +- **State Management**: Use `flow_manager.state` dictionary for persistent conversation data +- **Automatic Function Call Registration and Validation**: All functions are automatically registered and validated at run-time +- **Provider Compatibility**: Format differences handled automatically via adapter system +- **Deprecation**: Some legacy patterns (`transition_to`, `transition_callback`) are deprecated in favor of consolidated function handlers diff --git a/server/links/server-reference.mdx b/server/links/server-reference.mdx index 1295df49..ad40fa94 100644 --- a/server/links/server-reference.mdx +++ b/server/links/server-reference.mdx @@ -1,5 +1,5 @@ --- title: "Reference docs" -url: "https://pipecat-docs.readthedocs.io/" +url: "https://reference-server.pipecat.ai/" icon: "book" ---