# AI Agents Crash Course - Part 2 - Custom Tools

## Table of Contents

1. [Project Setup](#project-setup)
   - [Pre-requisites](#pre-requisites)

2. [Core Setup and Imports](#core-setup-and-imports)
   - [Import Required Libraries](#import-required-libraries)
   - [Set up Language Model (LLM)](#set-up-language-model-llm)

3. [Custom Tool Development](#custom-tool-development)
   - [Define the Currency Converter Tool](#define-the-currency-converter-tool)

4. [Query Parser Implementation](#query-parser-implementation)
   - [Define Query Parser Agent](#define-query-parser-agent)
   - [Define Query Parser Task](#define-query-parser-task)
   - [Query Parser Integration Ready](#query-parser-integration-ready)

5. [Currency Analyst Implementation](#currency-analyst-implementation)
   - [Define Currency Analyst Agent](#define-currency-analyst-agent)
   - [Define Currency Conversion Task](#define-currency-conversion-task)

6. [Multi-Stage Currency Conversion Pipeline](#multi-stage-currency-conversion-pipeline)
   - [Complete Workflow Integration](#complete-workflow-integration)
   - [Multi-Stage Pipeline Architecture Overview](#multi-stage-pipeline-architecture-overview)
   - [Create and Execute Multi-Stage Currency Conversion Pipeline](#create-and-execute-multi-stage-currency-conversion-pipeline)
   - [Pipeline Demonstration with Various Inputs](#pipeline-demonstration-with-various-inputs)
   - [🚨 Execution Loop Prevention Strategies](#-execution-loop-prevention-strategies)
   - [Optimized Agent Configurations](#optimized-agent-configurations)
   - [Safe Single-Agent Testing](#safe-single-agent-testing)
   - [Optimized Pipeline Testing](#optimized-pipeline-testing)

7. [Summary: Complete Multi-Stage AI Pipeline](#summary-complete-multi-stage-ai-pipeline)

---

This notebook demonstrates how to **create and integrate custom tools with CrewAI** to build a sophisticated **multi-stage AI pipeline** that processes natural language input and delivers professional financial analysis.

**Key Features:**
- ✅ Custom tool development with BaseTool extension
- ✅ Pydantic schema validation for tool inputs and outputs
- ✅ Natural language query parsing with structured JSON extraction
- ✅ Multi-stage agent pipeline with data flow between stages
- ✅ Real-time currency conversion API integration
- ✅ Agent specialization with domain expertise
- ✅ Complete end-to-end workflow from conversational input to expert analysis
- ✅ **Execution loop prevention strategies and optimized configurations**

**Pipeline Architecture:**
🗣️ **Natural Language** → 🤖 **Query Parser** → 💰 **Currency Analyst** → 📊 **Financial Report**

---

REFERENCE:  https://www.dailydoseofds.com/ai-agents-crash-course-part-2-with-implementation/

---

## Project Setup

This notebook will guide you through setting up the project environment for using CrewAI. We will:

1. Set up a virtual environment.
2. Install the required Python modules.
3. Verify the installation.

In [1]:
# Uncomment if you are not using devcontainers and want to set up a local environment
# 
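# # Step 1: Create and activate a virtual environment
# # (shell commands: run them in a terminal before starting Jupyter, since
# # activating a virtual environment inside a notebook cell does not affect
# # the running kernel)
# #
# python3 -m venv venv
# source venv/bin/activate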
# 
# # Step 2: Install required Python modules
# #
# %pip install -r requirements.txt
# 
# # Step 3: Verify installation
# #
# %pip list

### Pre-requisites

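- You will need an API key from https://www.exchangerate-api.com/. Store it in your `.env` file as `EXCHANGE_RATE_API_KEY`, the variable the currency converter tool reads.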
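
Because the tool defined later reads the key from the `EXCHANGE_RATE_API_KEY` environment variable, it can help to check for it before running anything else. The following is a minimal sketch of such a check; the helper name `missing_env_vars` is hypothetical and not part of the original notebook.

```python
import os
from typing import Iterable, List, Mapping, Optional


def missing_env_vars(names: Iterable[str],
                     env: Optional[Mapping[str, str]] = None) -> List[str]:
    """Return the names that are unset or empty in the given environment."""
    env = os.environ if env is None else env
    return [name for name in names if not env.get(name)]


# Hypothetical usage: report a missing key before any tool calls the API.
missing = missing_env_vars(["EXCHANGE_RATE_API_KEY"])
if missing:
    print(f"Missing environment variables: {', '.join(missing)}")
```

Passing a plain dictionary as `env` makes the helper easy to exercise without touching the real environment.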

### Load Required Python Modules and Libraries

#### Import Essential Libraries

This code block imports all the necessary libraries for building custom tools with CrewAI:

- **`dotenv`**: Loads environment variables from `.env` files
- **`os`**: Provides access to operating system interface for environment variables
- **`requests`**: HTTP library for making API calls to external services
- **`typing.Type`**: Type hints for better code documentation and IDE support
- **`IPython.display.Markdown`**: Renders markdown output in Jupyter notebooks
- **`crewai`**: Core CrewAI components (LLM, Agent, Task, Crew)
- **`crewai.tools.BaseTool`**: Base class for creating custom tools
- **`pydantic`**: Data validation and schema definition library

In [2]:
from dotenv import load_dotenv
import os
import requests
from typing import Type
from IPython.display import Markdown
from crewai import LLM, Agent, Task, Crew
from crewai.tools import BaseTool
from pydantic import BaseModel, Field

### [Optional] Enable litellm debug logging

#### Enable Debug Logging (Troubleshooting)

This optional code enables detailed debug logging from the `litellm` library, which is useful for:

- **Diagnosing API connection issues** with LLM providers
- **Viewing detailed request/response logs** for troubleshooting
- **Understanding token usage and rate limiting**

⚠️ **Note**: Once enabled, you'll need to restart the Jupyter kernel to disable debug logging.

In [3]:
# Uncomment in order to enable litellm debugging for better error diagnostics
# NOTE: You will have to restart the jupyter kernel to disable debug logging once it has been enabled.
#
# import litellm
# litellm._turn_on_debug()

### Load Environment Variables and Configure LLM

This block loads environment variables from the `.env` file. As written, it reads `OLLAMA_API_BASE`, the base URL of a running Ollama server, and raises an error if the variable is missing. To use OpenAI instead, uncomment the `OPENAI_API_KEY` block, which reads the key required to authenticate with OpenAI's services.

In [4]:
# Load environment variables from .env file
# Note: In devcontainer, variables are already loaded by dotenv feature,
# but load_dotenv() is safe and won't override existing environment variables
load_dotenv()

# Uncomment the code block below to use OpenAI with your API Key
# 
# api_key = os.getenv('OPENAI_API_KEY')
# if not api_key:
#     raise ValueError("OPENAI_API_KEY is not set in the .env file")

# Ollama configuration (active by default); comment out this block if you use OpenAI
# 
OLLAMA_API_BASE = os.getenv('OLLAMA_API_BASE')
if not OLLAMA_API_BASE:
    raise ValueError("OLLAMA_API_BASE is not set in the .env file")

### Configure LLM

Configures the `LLM` object to use a local Ollama model (`gemma3n` by default), which requires Ollama to be installed and running.

Alternatively, you can uncomment the OpenAI block to configure the `LLM` object to use the `gpt-4o` model with your API key.

In [5]:
# Uncomment the code block below to use OpenAI with your API Key
# (and comment out the Ollama block that follows)
# 
# llm = LLM(
#     model="gpt-4o",  # Specify the OpenAI model you want to use
#     api_key=api_key
# )

# Ollama with a local model (active by default)
# Make sure to have Ollama installed and running
# 
llm = LLM(
    # model="ollama/llama3:latest",
    # model="ollama/llama3.2:1b",
    # model="ollama/deepseek-r1:latest",
    # model="ollama/gemma3:latest",
    model="ollama/gemma3n:latest",
    base_url=OLLAMA_API_BASE
)

## Verify LLM Configuration

### Universal LLM Connection Test

#### Universal LLM Connection Testing Function

This comprehensive function validates your LLM configuration across multiple providers:

- **Configuration Analysis**: Inspects LLM settings (model, base URL, API key status)
- **Provider Detection**: Automatically identifies the LLM provider (OpenAI, Ollama, etc.)
- **Connection Testing**: Validates server availability and model accessibility
- **Functionality Test**: Performs an actual LLM call to ensure everything works
- **Troubleshooting**: Provides specific error messages and resolution tips

🔧 **Purpose**: Prevents issues downstream by validating the entire LLM pipeline upfront.

In [6]:
import requests
import os

def test_llm_connection():
    """Test LLM connection regardless of provider"""
    
    print("=== LLM Configuration Analysis ===")
    
    # Analyze LLM configuration
    model_name = getattr(llm, 'model', 'Unknown')
    base_url = getattr(llm, 'base_url', None)
    api_key_set = bool(getattr(llm, 'api_key', None))
    
    print(f"Model: {model_name}")
    print(f"Base URL: {base_url if base_url else 'Default (provider-specific)'}")
    print(f"API Key Set: {'Yes' if api_key_set else 'No'}")
    
    # Determine provider type
    provider = "unknown"
    if "ollama" in model_name.lower():
        provider = "ollama"
    elif "gpt" in model_name.lower() or "openai" in model_name.lower():
        provider = "openai"
    elif "claude" in model_name.lower() or "anthropic" in model_name.lower():
        provider = "anthropic"
    elif "gemini" in model_name.lower() or "google" in model_name.lower():
        provider = "google"
    
    print(f"Detected Provider: {provider}")
    
    # Provider-specific connection tests
    print(f"\n=== {provider.title()} Connection Test ===")
    
    if provider == "ollama" and base_url:
        try:
            # Test Ollama server availability
            test_url = f"{base_url}/api/tags"
            response = requests.get(test_url, timeout=5)
            print(f"Ollama Server Status: {response.status_code}")
            if response.status_code == 200:
                models = response.json().get('models', [])
                print(f"Available Models: {len(models)} found")
                # Check if our specific model is available
                model_available = any(model_name.replace('ollama/', '') in str(model) for model in models)
                print(f"Target Model Available: {'Yes' if model_available else 'No'}")
            else:
                print(f"Ollama server responded with status: {response.status_code}")
        except Exception as e:
            print(f"❌ Ollama server connection failed: {e}")
    
    elif provider == "openai":
        print("OpenAI connection test (API key validation happens during LLM call)")
        api_key_env = os.getenv('OPENAI_API_KEY')
        print(f"OPENAI_API_KEY environment variable: {'Set' if api_key_env else 'Not set'}")
    
    elif provider == "anthropic":
        print("Anthropic connection test")
        api_key_env = os.getenv('ANTHROPIC_API_KEY')
        print(f"ANTHROPIC_API_KEY environment variable: {'Set' if api_key_env else 'Not set'}")
    
    elif provider == "google":
        print("Google AI connection test")
        api_key_env = os.getenv('GOOGLE_API_KEY')
        print(f"GOOGLE_API_KEY environment variable: {'Set' if api_key_env else 'Not set'}")
    
    else:
        print("Generic provider - will test with LLM call only")
    
    # Universal LLM functionality test
    print(f"\n=== LLM Functionality Test ===")
    try:
        test_response = llm.call([{"role": "user", "content": "Respond with exactly: 'Test successful'"}])
        print("✅ LLM call successful!")
        print(f"Response type: {type(test_response)}")
        
        # Try to extract response content
        if hasattr(test_response, 'content'):
            print(f"Response content: {test_response.content[:100]}...")
        elif isinstance(test_response, str):
            print(f"Response: {test_response[:100]}...")
        else:
            print(f"Response: {str(test_response)[:100]}...")
            
    except Exception as e:
        print(f"❌ LLM call failed: {e}")
        print(f"Error type: {type(e).__name__}")
        
        # Provide specific troubleshooting tips based on error
        error_str = str(e).lower()
        if "connection" in error_str or "timeout" in error_str:
            print("💡 Tip: Check network connection and base_url configuration")
        elif "api_key" in error_str or "authentication" in error_str or "unauthorized" in error_str:
            print("💡 Tip: Check API key configuration and permissions")
        elif "model" in error_str or "not found" in error_str:
            print("💡 Tip: Verify model name and availability")

# Run the comprehensive test
test_llm_connection()

=== LLM Configuration Analysis ===
Model: ollama/gemma3n:latest
Base URL: http://host.docker.internal:11434
API Key Set: No
Detected Provider: ollama

=== Ollama Connection Test ===
Ollama Server Status: 200
Available Models: 6 found
Target Model Available: Yes

=== LLM Functionality Test ===
✅ LLM call successful!
Response type: <class 'str'>
Response: Test successful
...


### Environment Variables Check for All LLM Providers

#### Environment Variables Audit Function

This function performs a comprehensive audit of API keys and environment variables for all major LLM providers:

- **Multi-Provider Support**: Checks for OpenAI, Anthropic, Google, Cohere, Hugging Face, Ollama, Azure, AWS, and more
- **Status Reporting**: Shows which API keys are configured and which are missing
- **Provider Discovery**: Identifies which LLM services you have access to
- **Security Check**: Verifies environment variable setup without exposing actual key values

📋 **Benefit**: Helps you understand your available LLM options and identify missing configurations.

In [7]:
def check_environment_variables():
    """Check environment variables for all major LLM providers"""
    
    print("=== Environment Variables Status ===")
    
    # Common LLM provider environment variables
    env_vars = {
        "OpenAI": ["OPENAI_API_KEY", "OPENAI_BASE_URL"],
        "Anthropic": ["ANTHROPIC_API_KEY"],
        "Google": ["GOOGLE_API_KEY", "GOOGLE_APPLICATION_CREDENTIALS"],
        "Cohere": ["COHERE_API_KEY"],
        "Hugging Face": ["HUGGINGFACE_API_KEY", "HF_TOKEN"],
        "Ollama": ["OLLAMA_API_BASE", "OLLAMA_HOST"],
        "Azure OpenAI": ["AZURE_OPENAI_API_KEY", "AZURE_OPENAI_ENDPOINT"],
        "AWS Bedrock": ["AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "AWS_REGION"],
        "Together AI": ["TOGETHER_API_KEY"],
        "Replicate": ["REPLICATE_API_TOKEN"],
        "Perplexity": ["PERPLEXITYAI_API_KEY"],
        "Groq": ["GROQ_API_KEY"]
    }
    
    found_providers = []
    
    for provider, vars_list in env_vars.items():
        provider_vars = {}
        has_any_var = False
        
        for var in vars_list:
            value = os.getenv(var)
            if value:
                provider_vars[var] = "✅ Set"
                has_any_var = True
            else:
                provider_vars[var] = "❌ Not set"
        
        if has_any_var:
            found_providers.append(provider)
            print(f"\n{provider}:")
            for var, status in provider_vars.items():
                print(f"  {var}: {status}")
    
    if not found_providers:
        print("No LLM provider environment variables found.")
        print("Make sure to set the appropriate API keys for your chosen provider.")
    else:
        print(f"\nConfigured providers: {', '.join(found_providers)}")

# Run environment check
check_environment_variables()

=== Environment Variables Status ===

OpenAI:
  OPENAI_API_KEY: ✅ Set
  OPENAI_BASE_URL: ❌ Not set

Ollama:
  OLLAMA_API_BASE: ✅ Set
  OLLAMA_HOST: ✅ Set

Configured providers: OpenAI, Ollama


## Custom Tool Implementation

### Define the input fields the tool expects using Pydantic

#### Define Input Schema with Pydantic

This code creates a structured input schema for our custom currency converter tool using Pydantic:

- **Type Safety**: Ensures `amount` is always a float and currencies are strings
- **Field Validation**: Pydantic automatically validates input types and required fields
- **Documentation**: Field descriptions help the AI agent understand what each parameter represents
- **Error Prevention**: Invalid inputs are caught before reaching the tool's execution logic

🎯 **Purpose**: Provides a clear contract for what data the tool expects, preventing runtime errors and improving AI agent reliability.

In [8]:
class CurrencyConverterInput(BaseModel):
    """Input schema for CurrencyConverterTool."""
    amount: float = Field(..., description="The amount to convert.")
    from_currency: str = Field(..., description="The source currency code (e.g., 'USD').")
    to_currency: str = Field(..., description="The target currency code (e.g., 'EUR').")
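
To make the validation behaviour concrete, here is a small sketch that feeds the schema one valid input and two invalid ones. The schema is repeated so the cell runs on its own; the helper `try_parse` is hypothetical and exists only for this illustration.

```python
from pydantic import BaseModel, Field, ValidationError


class CurrencyConverterInput(BaseModel):
    """Same fields as the schema above, repeated so this cell is self-contained."""
    amount: float = Field(..., description="The amount to convert.")
    from_currency: str = Field(..., description="The source currency code (e.g., 'USD').")
    to_currency: str = Field(..., description="The target currency code (e.g., 'EUR').")


def try_parse(**kwargs):
    """Return a validated model, or None if Pydantic rejects the input."""
    try:
        return CurrencyConverterInput(**kwargs)
    except ValidationError as exc:
        print(f"Rejected {kwargs}: {len(exc.errors())} validation error(s)")
        return None


ok = try_parse(amount=100, from_currency="USD", to_currency="EUR")
bad_type = try_parse(amount="one hundred", from_currency="USD", to_currency="EUR")
missing = try_parse(amount=100, from_currency="USD")  # to_currency omitted
```

Note that the schema checks types and required fields only. A value such as `from_currency="dollars"` is still a valid string, so mapping names to currency codes remains the job of the parsing stage.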


### Define the Currency Converter Tool

#### Custom Currency Converter Tool Implementation

This code defines a complete custom tool by extending CrewAI's `BaseTool` class. Here's how each component works:

##### **Class Attributes (Tool Metadata)**:
- **`name`**: Human-readable tool identifier shown to AI agents
- **`description`**: Explains the tool's purpose to help agents decide when to use it
- **`args_schema`**: Links to our Pydantic schema for input validation
- **`api_key`**: Securely retrieves the Exchange Rate API key from environment variables

##### **The `_run` Method - The Heart of the Tool** 🔧

The `_run` method contains the business logic that executes when an AI agent calls this tool:

1. **API Request**: Makes HTTP call to `exchangerate-api.com` with the source currency
2. **Error Handling**: Checks for API failures and returns helpful error messages
3. **Data Validation**: Verifies the target currency exists in the response
4. **Calculation**: Performs the currency conversion using real-time exchange rates
5. **Formatted Response**: Returns a human-readable result string

##### **Why `_run` is Essential**:
- **Execution Entry Point**: This is the method CrewAI invokes when an agent uses the tool, so all of the tool's behavior must live here
- **Type-Safe Inputs**: Parameters are automatically validated against our Pydantic schema before reaching `_run`
- **Return Format**: Must return a string that the AI agent can understand and use in its response
- **Error Resilience**: Should handle all possible failure scenarios gracefully

##### **Integration with CrewAI**:
When an AI agent needs currency conversion, CrewAI automatically:
1. Validates inputs against `CurrencyConverterInput` schema
2. Calls `_run` with the validated parameters
3. Provides the returned string to the agent for further processing

💡 **Key Insight**: The `_run` method transforms external API data into actionable information that AI agents can reason about and present to users.

In [9]:
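class CurrencyConverterTool(BaseTool):
    name: str = "Currency Converter Tool"
    description: str = "Converts an amount from one currency to another."
    args_schema: Type[BaseModel] = CurrencyConverterInput
    # Fall back to an empty string so a missing key is reported by _run
    # instead of putting the text "None" into the request URL
    api_key: str = os.getenv("EXCHANGE_RATE_API_KEY", "")
    
    def _run(self, amount: float, from_currency: str, to_currency: str) -> str:
        if not self.api_key:
            return "EXCHANGE_RATE_API_KEY is not set."

        url = f"https://v6.exchangerate-api.com/v6/{self.api_key}/latest/{from_currency}"
        try:
            # A timeout keeps the agent from hanging on a stalled connection
            response = requests.get(url, timeout=10)
        except requests.RequestException as exc:
            # Report only the error type: the full message can contain the URL and API key
            return f"Failed to fetch exchange rates ({type(exc).__name__})."

        if response.status_code != 200:
            return "Failed to fetch exchange rates."

        data = response.json()
        if "conversion_rates" not in data or to_currency not in data["conversion_rates"]:
            return f"Invalid currency code: {to_currency}"

        # Multiply by the rate for the target currency and format to two decimals
        rate = data["conversion_rates"][to_currency]
        converted_amount = amount * rate
        return f"{amount} {from_currency} is equivalent to {converted_amount:.2f} {to_currency}."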
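
The validation and arithmetic inside `_run` can be exercised without a network call or an API key. The sketch below applies the same checks to an already-parsed payload. The function name `convert_from_payload` and the sample rates are made up for illustration; only the `conversion_rates` key comes from the tool code above.

```python
from typing import Any, Dict


def convert_from_payload(amount: float, from_currency: str, to_currency: str,
                         payload: Dict[str, Any]) -> str:
    """Apply the same checks and arithmetic as `_run` to a parsed response payload."""
    rates = payload.get("conversion_rates")
    if not isinstance(rates, dict) or to_currency not in rates:
        return f"Invalid currency code: {to_currency}"
    converted = amount * rates[to_currency]
    return f"{amount} {from_currency} is equivalent to {converted:.2f} {to_currency}."


# Hand-written payload with made-up rates, for illustration only.
sample_payload = {"conversion_rates": {"EUR": 0.5, "USD": 1.0}}
print(convert_from_payload(100.0, "USD", "EUR", sample_payload))
print(convert_from_payload(100.0, "USD", "XYZ", sample_payload))
```

Separating the pure calculation from the HTTP request in this way is one option for unit-testing a tool before attaching it to an agent.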


## Agent, Task and Crew Creation

### Query Parser Implementation

#### Query Parser Implementation Explained

##### 🧠 **How Natural Language Processing Works in CrewAI**

The Query Parser demonstrates a powerful pattern in AI agent systems: **converting unstructured natural language into structured data**. Here's how this implementation achieves that:

##### **🔧 Core Components**

**1. Pydantic Schema Integration**
```python
output_pydantic=CurrencyConverterInput
```
- **Type Safety**: Ensures the AI output matches our exact data structure
- **Automatic Validation**: CrewAI validates the agent's response against the schema
- **JSON Parsing**: Converts the agent's JSON text output into usable Python objects
- **Error Prevention**: Catches malformed outputs before they reach downstream components

**2. Intelligent Language Understanding**
The agent handles natural language variations automatically:
- **"100 dollars"** → `{"amount": 100.0, "from_currency": "USD"}`
- **"convert euros to yen"** → `{"from_currency": "EUR", "to_currency": "JPY"}`
- **"fifty pounds sterling"** → `{"amount": 50.0, "from_currency": "GBP"}`

**3. Robust Error Handling**
- **Missing Information**: Makes reasonable assumptions or requests clarification
- **Ambiguous Input**: Uses context clues to determine intent
- **Invalid Currencies**: Standardizes to recognized currency codes

##### **🚀 Real-World Benefits**

**User Experience**
- **Natural Interface**: Users can type conversational requests instead of filling forms
- **Flexible Input**: Accepts various phrasings and formats
- **Intelligent Defaults**: Handles incomplete information gracefully

**System Integration**
- **Standardized Output**: Downstream systems receive consistent, validated data
- **Type Safety**: Catches malformed input at the stage boundary instead of deep inside the conversion logic
- **Scalability**: Easy to extend for additional parameters or use cases

##### **🔄 Two-Stage Architecture**

This implementation demonstrates a powerful **two-stage processing pattern**:

1. **Stage 1: Natural Language → Structured Data**
   - Input: `"Convert 100 dollars to euros today"`
   - Output: `{"amount": 100.0, "from_currency": "USD", "to_currency": "EUR"}`

2. **Stage 2: Structured Data → Business Logic**
   - Input: Validated Pydantic object
   - Output: Real currency conversion with API calls
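
The handoff between the two stages can be sketched with plain functions. In this toy version a regular expression stands in for the LLM-based parser and a hard-coded rate stands in for the API call, so every name here (`ParsedQuery`, `parse_query`, `convert`, `CURRENCY_NAMES`) is hypothetical and only illustrates the pattern.

```python
import re
from dataclasses import dataclass
from typing import Dict

# Hypothetical name-to-code table; the real parser is an LLM agent, not a lookup.
CURRENCY_NAMES: Dict[str, str] = {
    "dollars": "USD", "euros": "EUR", "pounds": "GBP", "yen": "JPY",
}


@dataclass(frozen=True)
class ParsedQuery:
    amount: float
    from_currency: str
    to_currency: str


def parse_query(text: str) -> ParsedQuery:
    """Stage 1 stand-in: handles only 'convert <number> <name> to <name>'."""
    match = re.search(r"convert\s+([\d.]+)\s+(\w+)\s+to\s+(\w+)", text.lower())
    if match is None:
        raise ValueError(f"Could not parse query: {text!r}")
    amount, source, target = match.groups()
    return ParsedQuery(float(amount), CURRENCY_NAMES[source], CURRENCY_NAMES[target])


def convert(query: ParsedQuery, rates: Dict[str, float]) -> float:
    """Stage 2 stand-in: multiply by the rate for the target currency."""
    return round(query.amount * rates[query.to_currency], 2)


parsed = parse_query("Convert 100 dollars to euros today")
print(parsed)
print(convert(parsed, {"EUR": 0.5}))  # made-up rate for illustration
```

Stage 2 depends only on the typed `ParsedQuery` object, so either stage can be replaced or tested without touching the other.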

##### **💡 Key Design Decisions**

**Why Use `output_pydantic`?**
- **Consistency**: Ensures all parsing results follow the same structure
- **Validation**: Automatic type checking and field validation
- **Integration**: Seamless handoff between parsing and conversion stages
- **Debugging**: Clear error messages when parsing fails

**Why Separate Agents?**
- **Separation of Concerns**: Parsing logic separate from business logic
- **Reusability**: Query parser can be used with other tools that need the same input format
- **Maintainability**: Changes to parsing logic don't affect conversion logic
- **Testing**: Each component can be tested independently

##### **🎯 Advanced Use Cases**

This pattern can be extended for:
- **Multi-parameter parsing**: "Convert 100 USD to EUR and 50 GBP to JPY"
- **Context awareness**: "Convert the same amount to Japanese currency"
- **Batch processing**: "Convert these amounts: 100 USD, 50 EUR, 75 GBP to CAD"
- **Smart defaults**: Using user preferences for default currencies

##### **🔍 Troubleshooting Tips**

**Common Issues:**
- **Schema Mismatch**: Ensure agent output exactly matches Pydantic field names
- **Type Errors**: Check that numeric values are properly parsed as floats
- **Missing Fields**: Verify all required fields are extracted from input
- **Currency Codes**: Ensure standard 3-letter ISO codes are used

**Best Practices:**
- **Clear Instructions**: Provide detailed parsing guidelines in task description
- **Example Mappings**: Show common currency name → code conversions
- **Error Recovery**: Handle ambiguous or incomplete inputs gracefully
- **Validation**: Always check parsed results before using in downstream processes
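
As one way to act on the validation advice above, the sketch below checks parsed values before they reach the conversion stage. The function name `validation_problems` and the rule that the amount must be positive are assumptions for this example. The regular expression checks only the shape of a code (three uppercase letters), not whether the currency actually exists.

```python
import re
from typing import List

ISO_CODE_SHAPE = re.compile(r"[A-Z]{3}")


def validation_problems(amount: float, from_currency: str, to_currency: str) -> List[str]:
    """Return human-readable problems; an empty list means the values look usable."""
    problems = []
    if amount <= 0:
        problems.append("amount must be positive")
    for label, code in (("from_currency", from_currency), ("to_currency", to_currency)):
        if not ISO_CODE_SHAPE.fullmatch(code):
            problems.append(f"{label} {code!r} is not a 3-letter uppercase code")
    return problems


print(validation_problems(100.0, "USD", "EUR"))
print(validation_problems(0, "usd", "EUR"))
```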

#### Define Query Parser Agent

##### Create Intelligent Query Parser Agent

This agent specializes in understanding natural language input and extracting structured currency conversion parameters:

In [10]:
query_analyst = Agent(
    role="Query Parser",
    goal="Parse natural language currency conversion requests into structured JSON format.",
    backstory=(
        "You are an expert at understanding natural language and extracting specific "
        "information from user requests. You specialize in parsing currency conversion "
        "queries and converting them into structured data formats that other systems can use."
    ),
    verbose=True
)

#### Define Query Parser Task

##### Create Structured Query Parsing Task

This task instructs the agent to parse natural language input and return structured JSON:

In [11]:
query_task = Task(
    description=(
        "Analyze the user input: '{user_query}' and extract currency conversion parameters. "
        "Parse the following information from the natural language query:\n"
        "- Amount: The numerical value to convert\n"
        "- From Currency: The source currency (use standard 3-letter codes like USD, EUR, GBP)\n"
        "- To Currency: The target currency (use standard 3-letter codes)\n\n"
        "Handle common variations like:\n"
        "- 'dollars' -> 'USD'\n"
        "- 'euros' -> 'EUR'\n"
        "- 'pounds' -> 'GBP'\n"
        "- 'yen' -> 'JPY'\n\n"
        "If any information is missing or unclear, make reasonable assumptions or indicate what's missing."
    ),
    expected_output=(
        "A structured JSON object matching the CurrencyConverterInput schema with fields: "
        "amount (float), from_currency (string), to_currency (string)"
    ),
    output_pydantic=CurrencyConverterInput,
    agent=query_analyst
)

#### Query Parser Integration Ready

##### Pipeline Component Complete

The query parser is now ready for integration into our multi-stage currency conversion pipeline. This agent will work in sequence with the currency analyst to provide a seamless natural language to financial analysis workflow.

**Next**: Let's integrate this parser with our currency analyst to create a complete end-to-end pipeline! 🚀

### Currency Analyst Implementation

#### Define Currency Analyst Agent

This code defines an AI agent with domain expertise in currency analysis:

- **Role Definition**: Establishes the agent as a "Currency Analyst" with specific responsibilities
- **Goal Setting**: Defines the agent's primary objective (real-time conversions and financial insights)
- **Backstory**: Provides context and expertise that influences the agent's responses
- **Tool Integration**: Attaches our custom `CurrencyConverterTool` to give the agent access to real-time exchange rates
- **Verbose Mode**: Enables detailed logging to see the agent's reasoning process

🤖 **Agent Capabilities**: With the custom tool attached, this agent can now perform live currency conversions and provide context-aware financial advice.

In [12]:
from crewai import Agent

currency_analyst = Agent(
    role="Currency Analyst",
    goal="Provide real-time currency conversions and financial insights.",
    backstory=(
        "You are a finance expert with deep knowledge of global exchange rates. "
        "You help users with currency conversion and financial decision-making."
    ),
    tools=[CurrencyConverterTool()],  # Attach our custom tool
    verbose=True
)

#### Define Currency Conversion Task

This code creates the **second stage** task in our multi-stage pipeline that performs the actual currency conversion:

##### **🔄 Pipeline Integration**:
- **Context Awareness**: Can access results from the previous query parsing stage
- **Flexible Input**: Adapts to use either parsed parameters or direct template variables
- **Intelligent Processing**: Leverages structured data from Stage 1 for more accurate conversions

##### **📋 Task Features**:
- **Dynamic Parameters**: Uses template variables that can be filled by parsing results or direct input
- **Enhanced Output**: Provides comprehensive financial analysis beyond simple conversion
- **Tool Integration**: Automatically uses the currency converter tool for real-time rates
- **Context Enrichment**: Includes market conditions and financial insights

##### **🔗 Multi-Stage Flow**: 
1. **Input**: Receives structured data from query parser (amount, currencies)
2. **Processing**: Uses custom tool to fetch real-time exchange rates  
3. **Analysis**: Provides financial context and market insights
4. **Output**: Delivers comprehensive conversion results with expert commentary

📊 **Pipeline Advantage**: By operating in sequence after the query parser, this task can handle both structured data and natural language inputs seamlessly!

In [13]:
from crewai import Task

currency_conversion_task = Task(
    description=(
        "Using the structured currency conversion parameters that were parsed from the user's "
        "natural language query, perform a real-time currency conversion. "
        "If specific parameters are provided in the context, use those values. "
        "Otherwise, convert {amount} {from_currency} to {to_currency} "
        "using real-time exchange rates. "
        "Provide the equivalent amount and explain any relevant financial context, "
        "including current market conditions and conversion factors."
    ),
    expected_output=(
        "A comprehensive response including:\n"
        "- The converted amount with precise calculations\n"
        "- Current exchange rate information\n"
        "- Relevant financial insights and market context\n"
        "- Any important factors affecting the conversion"
    ),
    agent=currency_analyst
)
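
The `{amount}`, `{from_currency}`, and `{to_currency}` placeholders in the description are filled from the `inputs` dictionary passed to `kickoff()`. The sketch below imitates that substitution with Python's built-in `str.format`. It is an analogy for how the placeholders behave, not CrewAI's actual implementation.

```python
template = (
    "Otherwise, convert {amount} {from_currency} to {to_currency} "
    "using real-time exchange rates."
)
inputs = {
    "user_query": "Convert 150 dollars to euros today",
    "amount": 150,
    "from_currency": "USD",
    "to_currency": "EUR",
}

# str.format ignores keyword arguments the template does not use (user_query here).
rendered = template.format(**inputs)
print(rendered)
```

Seen this way, the values supplied for these placeholders become fallback text in the prompt, while the description asks the agent to prefer the parameters parsed in the previous stage.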

### Multi-Stage Currency Conversion Pipeline

#### Complete Workflow Integration

#### Create and Execute Multi-Stage Currency Conversion Pipeline

This final code block brings everything together and executes a sophisticated **multi-stage workflow**:

##### **🏗️ Multi-Stage Pipeline Architecture**:
- **Stage 1**: Query Parser agent processes natural language input into structured JSON
- **Stage 2**: Currency Analyst agent uses the structured data to perform conversion with real-time rates
- **Data Flow**: Parsed parameters from Stage 1 automatically flow into Stage 2

##### **🔧 Crew Assembly**:
- **Agents List**: Includes both the query parser and currency analyst agents
- **Tasks List**: Contains both parsing and conversion tasks in sequence
- **Process Type**: Sequential processing ensures proper task execution order
- **Task Dependencies**: The currency task can access results from the parsing task

##### **🚀 Crew Execution**:
- **`kickoff()`**: Starts the crew with natural language input (e.g., "Convert 150 dollars to euros")
- **Automatic Parsing**: The query parser extracts amount, source currency, and target currency
- **Tool Integration**: The currency analyst automatically uses the custom converter tool with parsed data
- **Context Passing**: Results from the parsing stage inform the conversion stage

##### **📊 Response Handling**:
- **Multi-Stage Output**: Shows results from both parsing and conversion stages
- **Structured Data**: Access to both raw responses and structured JSON output
- **Complete Workflow**: Demonstrates end-to-end natural language to financial analysis pipeline

🎯 **Complete AI Pipeline**: This showcases a sophisticated multi-agent workflow where natural language processing seamlessly integrates with real-world API interactions!

#### Multi-Stage Pipeline Architecture Overview

##### 🏗️ **Complete Workflow Design**

Our currency conversion system now implements a sophisticated **two-stage AI pipeline** that transforms natural language input into professional financial analysis:

```
Natural Language Input
        ↓
┌─────────────────────┐
│   STAGE 1: PARSER   │
│  🤖 Query Analyst   │
│                     │
│ "Convert 150        │
│  dollars to euros"  │
│        ↓            │
│ {"amount": 150,     │
│  "from": "USD",     │
│  "to": "EUR"}       │
└─────────────────────┘
        ↓
┌─────────────────────┐
│  STAGE 2: ANALYST   │
│ 💰 Currency Expert  │
│                     │
│ • Fetch live rates  │
│ • Calculate result  │
│ • Provide insights  │
│        ↓            │
│ "150 USD = 138.45   │
│  EUR (rate: 0.923)" │
└─────────────────────┘
        ↓
   Final Report
```

##### 🔄 **Data Flow & Integration**

1. **Natural Input Processing**: User provides conversational request
2. **Structured Extraction**: Query Parser agent converts to JSON schema
3. **Parameter Validation**: Ensures currencies use standard codes (USD, EUR, etc.)
4. **Live Data Retrieval**: Currency Analyst fetches real-time exchange rates
5. **Financial Analysis**: Expert commentary on market conditions and rates
6. **Comprehensive Output**: Professional report with conversion and insights

##### 🎯 **Key Pipeline Advantages**

- **🗣️ Natural Language Interface**: Users can speak conversationally 
- **🔍 Intelligent Parsing**: Handles variations like "dollars" → "USD"
- **⚡ Real-Time Data**: Live exchange rates via custom API tool
- **🧠 Expert Analysis**: Financial context beyond simple conversion
- **🔧 Extensible Design**: Easy to add more analysis stages

This architecture demonstrates **sequential agent orchestration**: each specialized agent handles one stage and passes its output to the next.
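
To make the data flow concrete, here is a minimal sketch of the same two stages as plain Python functions, with no LLM and no API call. Both `parse_query` (a naive regex parser) and `convert` (which applies a rate supplied by the caller) are hypothetical stand-ins for the agents. The rate 0.923 is the illustrative figure from the diagram, not a live value.

```python
import re


def parse_query(query: str) -> dict:
    """Stage 1 stand-in: pull amount and currencies out of a simple request."""
    match = re.search(r"(\d+(?:\.\d+)?)\s+(\w+)\s+(?:to|in)\s+(\w+)", query.lower())
    if match is None:
        raise ValueError(f"Could not parse: {query!r}")
    names = {"dollars": "USD", "euros": "EUR", "pounds": "GBP", "yen": "JPY"}
    amount, source, target = match.groups()
    return {
        "amount": float(amount),
        "from_currency": names.get(source, source.upper()),
        "to_currency": names.get(target, target.upper()),
    }


def convert(params: dict, rate: float) -> str:
    """Stage 2 stand-in: apply a caller-supplied rate and format the result."""
    result = params["amount"] * rate
    return (f"{params['amount']:g} {params['from_currency']} = "
            f"{result:.2f} {params['to_currency']} (rate: {rate})")


parsed = parse_query("Convert 150 dollars to euros")
print(parsed)
print(convert(parsed, rate=0.923))  # 150 USD = 138.45 EUR (rate: 0.923)
```

The only contract between the two stages is the dictionary of `amount`, `from_currency`, and `to_currency`. The structured JSON plays the same role between the two agents.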

In [21]:
from crewai import Crew, Process
from IPython.display import Markdown, display

# Create the multi-stage currency conversion crew
multi_stage_crew = Crew(
    agents=[query_analyst, currency_analyst],  # Both agents in sequence
    tasks=[query_task, currency_conversion_task],  # Both tasks in sequence
    process=Process.sequential
)

# Test with natural language input
natural_language_query = "Convert 150 dollars to euros today"
print(f"🔤 Natural Language Input: '{natural_language_query}'")
print("🔧 Executing multi-stage pipeline...")
print("   Stage 1: Parsing natural language query...")
print("   Stage 2: Performing currency conversion...")

# Execute the multi-stage workflow.
# kickoff() inputs fill any {amount}, {from_currency}, {to_currency} placeholders
# in the task descriptions; the parser's output is passed to the next task as
# context rather than replacing these values.
response = multi_stage_crew.kickoff(inputs={
    "user_query": natural_language_query,
    "amount": 150,
    "from_currency": "USD",
    "to_currency": "EUR",
})

print("\n" + "="*60)
print("📊 MULTI-STAGE PIPELINE RESULTS")
print("="*60)

# Display the final response
print("\n🎯 Final Currency Analysis:")
display(Markdown(response.raw))

print("\n📋 Pipeline Summary:")
print("   ✅ Natural language processing completed")
print("   ✅ Currency conversion completed")
print("   ✅ Financial analysis provided")
print("\n🚀 End-to-end workflow successfully executed!")

🔤 Natural Language Input: 'Convert 150 dollars to euros today'
🔧 Executing multi-stage pipeline...
   Stage 1: Parsing natural language query...
   Stage 2: Performing currency conversion...



📊 MULTI-STAGE PIPELINE RESULTS

🎯 Final Currency Analysis:


The amount of 150 USD is equivalent to approximately 127.50 EUR based on current exchange rates. As of now, the exchange rate is around 1 USD = 0.85 EUR. 

In terms of financial context, the USD to EUR conversion can be influenced by several factors including interest rates set by the Federal Reserve and the European Central Bank, geopolitical events, and overall market sentiment. Recently, fluctuating economic data and inflation rates have impacted the currency strength of both the USD and the EUR. It's essential to consider these dynamics when evaluating currency conversions, as they can significantly affect exchange rates and investment decisions over time.


📋 Pipeline Summary:
   ✅ Natural language processing completed
   ✅ Currency conversion completed
   ✅ Financial analysis provided

🚀 End-to-end workflow successfully executed!
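
To look at each stage rather than only the final text, the result object can be inspected per task. The sketch below assumes the result exposes a `tasks_output` list whose items carry a `raw` string, as recent CrewAI versions document for `CrewOutput`. Verify those attribute names against your installed version. The helper `summarize_stages` is hypothetical and is shown with a stand-in object so that it runs without an LLM.

```python
from types import SimpleNamespace


def summarize_stages(result) -> list[str]:
    """Return one preview line per task output, if the result exposes them."""
    lines = []
    # getattr keeps the helper harmless when the attribute is missing.
    for index, task_output in enumerate(getattr(result, "tasks_output", None) or [], start=1):
        raw = getattr(task_output, "raw", "")
        lines.append(f"Stage {index}: {raw[:60]}")
    return lines


# Stand-in shaped like the assumed result object; pass `response` instead.
fake_response = SimpleNamespace(
    raw="150 USD is approximately 127.50 EUR.",
    tasks_output=[
        SimpleNamespace(raw='{"amount": 150, "from_currency": "USD", "to_currency": "EUR"}'),
        SimpleNamespace(raw="150 USD is approximately 127.50 EUR."),
    ],
)

for line in summarize_stages(fake_response):
    print(line)
```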


#### Pipeline Demonstration with Various Inputs

##### Testing Different Natural Language Patterns

Let's demonstrate the pipeline's flexibility by testing various natural language input patterns:

#### 🚨 Execution Loop Prevention Strategies

##### **Why Execution Loops Occur**

An earlier, unoptimized run of this multi-query test got caught in execution loops. Several factors contribute:

**🔄 Agent Reasoning Complexity**
- **Maximum Iterations Exceeded**: Agents hit default iteration limits (15-20 cycles)
- **Reasoning Loops**: LLM gets stuck trying to perfect structured output
- **Verbose Overhead**: Excessive logging creates more context to process

**⚡ Performance Issues**
- **Local LLM Limitations**: Ollama models may struggle with complex reasoning
- **Multi-Stage Complexity**: Sequential processing compounds reasoning requirements
- **Context Window Growth**: Each iteration adds more context

##### **🛡️ Prevention Strategies**

**1. Optimize Agent Configuration**
```python
# Disable verbose mode to reduce context
query_analyst = Agent(
    role="Query Parser",
    goal="Parse currency requests to JSON",
    backstory="Expert at extracting structured data",
    verbose=False,  # ← Key change
    max_iter=10     # ← Limit iterations
)
```
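
The effect of an iteration cap can be illustrated with a toy loop. This is not CrewAI's internal implementation. `run_bounded` and the `step` callables are hypothetical and only show why a cap turns a potential endless loop into a bounded failure.

```python
from typing import Callable, Optional


def run_bounded(step: Callable[[int], Optional[str]], max_iter: int) -> str:
    """Call `step` until it returns an answer or the iteration cap is hit."""
    for iteration in range(1, max_iter + 1):
        answer = step(iteration)
        if answer is not None:
            return answer
    return f"stopped after {max_iter} iterations without a final answer"


# A step that never converges, mimicking an agent stuck refining its output.
print(run_bounded(lambda i: None, max_iter=10))

# A step that converges on the third attempt.
print(run_bounded(lambda i: "done" if i == 3 else None, max_iter=10))
```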

**2. Simplify Task Descriptions**
```python
# Shorter, more direct task descriptions
query_task = Task(
    description="Extract amount, from_currency, to_currency from: '{user_query}'",
    expected_output="JSON with amount, from_currency, to_currency fields",
    output_pydantic=CurrencyConverterInput,
    agent=query_analyst
)
```

**3. Add Execution Safeguards**
```python
# Add a timeout and disable crew memory
crew = Crew(
    agents=[query_analyst, currency_analyst],
    tasks=[query_task, currency_conversion_task],
    process=Process.sequential,
    max_execution_time=300,  # 5 minute timeout (documented as an Agent attribute; verify your version honors it on Crew)
    memory=False            # Disable memory to reduce complexity
)
```
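
A wall-clock limit can also be enforced from the calling side, independently of any library setting. The sketch below uses only `concurrent.futures` from the standard library, and `run_with_timeout` is a hypothetical helper. The worker thread is not killed on timeout: the caller merely stops waiting, so a stuck call may keep running in the background.

```python
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeout


def run_with_timeout(func, timeout_seconds: float, *args, **kwargs):
    """Run `func` in a worker thread and stop waiting after `timeout_seconds`."""
    executor = ThreadPoolExecutor(max_workers=1)
    future = executor.submit(func, *args, **kwargs)
    try:
        return future.result(timeout=timeout_seconds)
    except FutureTimeout:
        raise TimeoutError(f"no result within {timeout_seconds} seconds") from None
    finally:
        # Do not block on the worker; a timed-out call keeps running.
        executor.shutdown(wait=False)


print(run_with_timeout(lambda: "finished", 1.0))

try:
    # time.sleep(0.5) stands in for a slow pipeline run.
    run_with_timeout(time.sleep, 0.05, 0.5)
except TimeoutError as error:
    print(f"Timed out: {error}")
```

With a crew, the call would look like `run_with_timeout(crew.kickoff, 300, inputs={"user_query": query})`.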

**4. Use Better LLM Models**
```python
# Switch to more capable models for complex reasoning
llm = LLM(model="gpt-4o")  # More reliable than local models
```

**5. Implement Fallback Strategies**
```python
# Test one query at a time with error handling
try:
    result = crew.kickoff(inputs={"user_query": query})
except Exception as e:
    print(f"Error: {e}")
    # Implement fallback logic
```
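
One possible shape for the fallback logic is to try a list of pipelines in order and return the first result that does not raise. `first_successful`, `flaky_pipeline`, and `simple_pipeline` are hypothetical names; in practice each runner could be a small wrapper around a crew's `kickoff`.

```python
from typing import Any, Callable, Sequence


def first_successful(runners: Sequence[Callable[[dict], Any]], inputs: dict) -> Any:
    """Try each runner in order and return the first result that does not raise."""
    errors = []
    for runner in runners:
        try:
            return runner(inputs)
        except Exception as error:  # broad on purpose: any failure triggers the fallback
            errors.append(f"{getattr(runner, '__name__', 'runner')}: {error}")
    raise RuntimeError("all runners failed: " + "; ".join(errors))


def flaky_pipeline(inputs: dict) -> str:
    # Simulates a pipeline that hits its iteration limit.
    raise TimeoutError("maximum iterations reached")


def simple_pipeline(inputs: dict) -> str:
    # Simulates a simpler pipeline that succeeds.
    return f"handled {inputs['user_query']!r} with the fallback"


print(first_successful([flaky_pipeline, simple_pipeline],
                       {"user_query": "Convert 100 dollars to euros"}))
```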

##### **⚠️ Warning Signs to Watch For**
- **"Maximum iterations reached"** messages
- **Excessive HTML output** in notebook cells
- **Long execution times** (>2-3 minutes per query)
- **Repetitive reasoning patterns** in verbose output

##### **🎯 Best Practices**
- **Start Simple**: Test with single agents before multi-stage pipelines
- **Use Timeouts**: Always set execution time limits
- **Monitor Output**: Watch for iteration limit warnings
- **Optimize Prompts**: Keep task descriptions concise and clear
- **Choose Models Wisely**: Use appropriate LLMs for complexity level

In [15]:
# OPTIMIZED AGENT CONFIGURATIONS - Prevents execution loops

# Create optimized query parser agent
optimized_query_analyst = Agent(
    role="Query Parser",
    goal="Extract currency conversion parameters from natural language",
    backstory="You efficiently extract structured data from user queries.",
    verbose=False,  # Disable verbose to reduce context overhead
    max_iter=8,     # Limit iterations to prevent loops
    allow_delegation=False  # Disable delegation for simpler execution
)

# Create optimized currency analyst agent  
optimized_currency_analyst = Agent(
    role="Currency Analyst", 
    goal="Perform currency conversions using live exchange rates",
    backstory="You provide accurate currency conversions and brief financial insights.",
    tools=[CurrencyConverterTool()],
    verbose=False,  # Disable verbose to reduce context overhead
    max_iter=8,     # Limit iterations to prevent loops
    allow_delegation=False  # Disable delegation for simpler execution
)

# Create simplified tasks with concise descriptions
optimized_query_task = Task(
    description="Parse '{user_query}' to extract: amount (number), from_currency (3-letter code), to_currency (3-letter code). Common mappings: dollars->USD, euros->EUR, pounds->GBP, yen->JPY.",
    expected_output="JSON object with amount, from_currency, to_currency fields",
    output_pydantic=CurrencyConverterInput,
    agent=optimized_query_analyst
)

optimized_conversion_task = Task(
    description="Convert the parsed amount from source to target currency using the currency converter tool. Provide the result and brief context.",
    expected_output="Currency conversion result with exchange rate and brief market context",
    agent=optimized_currency_analyst
)

print("✅ Optimized agents and tasks created successfully!")
print("🔧 Key optimizations:")
print("   • Verbose mode disabled")
print("   • Iteration limits set to 8")
print("   • Delegation disabled") 
print("   • Simplified task descriptions")
print("   • Reduced context overhead")

✅ Optimized agents and tasks created successfully!
🔧 Key optimizations:
   • Verbose mode disabled
   • Iteration limits set to 8
   • Delegation disabled
   • Simplified task descriptions
   • Reduced context overhead


In [16]:
# SAFE SINGLE-AGENT TEST - Validate before multi-stage pipeline

print("🧪 SAFE SINGLE-AGENT TESTING")
print("="*40)

# Test 1: Single Query Parser Agent
print("\n📝 Test 1: Query Parser Only")
print("-" * 30)

try:
    single_parser_crew = Crew(
        agents=[optimized_query_analyst],
        tasks=[optimized_query_task], 
        process=Process.sequential,
        max_execution_time=120  # 2 minute timeout
    )
    
    test_input = "Convert 100 dollars to euros"
    print(f"Input: '{test_input}'")
    
    result = single_parser_crew.kickoff(inputs={"user_query": test_input})
    
    print("✅ Query Parser Success!")
    print(f"📊 Parsed Output: {result.pydantic}")
    
except Exception as e:
    print(f"❌ Query Parser Error: {str(e)[:150]}...")

# Test 2: Single Currency Analyst Agent (with manual input)
print(f"\n📝 Test 2: Currency Analyst Only")
print("-" * 30)

try:
    single_analyst_crew = Crew(
        agents=[optimized_currency_analyst],
        tasks=[optimized_conversion_task],
        process=Process.sequential, 
        max_execution_time=120  # 2 minute timeout
    )
    
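    # Manual input (bypass parser)
    # Note: these values only reach the agent if the task description contains
    # matching {amount}, {from_currency}, {to_currency} placeholders;
    # optimized_conversion_task has none, so check the result text below.
    manual_inputs = {"amount": 100, "from_currency": "USD", "to_currency": "EUR"}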
    print(f"Manual Input: {manual_inputs}")
    
    result = single_analyst_crew.kickoff(inputs=manual_inputs)
    
    print("✅ Currency Analyst Success!")
    print(f"📊 Conversion Result: {result.raw[:100]}...")
    
except Exception as e:
    print(f"❌ Currency Analyst Error: {str(e)[:150]}...")

print(f"\n🎯 Single-agent validation completed!")
print("💡 If both tests pass, the multi-stage pipeline should work safely.")

🧪 SAFE SINGLE-AGENT TESTING

📝 Test 1: Query Parser Only
------------------------------
Input: 'Convert 100 dollars to euros'
✅ Query Parser Success!
📊 Parsed Output: amount=100.0 from_currency='USD' to_currency='EUR'

📝 Test 2: Currency Analyst Only
------------------------------
Manual Input: {'amount': 100, 'from_currency': 'USD', 'to_currency': 'EUR'}
✅ Currency Analyst Success!
📊 Conversion Result: Thought: I need to know the amount, source currency, and target currency for the conversion. Please ...

🎯 Single-agent validation completed!
💡 If both tests pass, the multi-stage pipeline should work safely.
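
Note that Test 2 prints a success message even though the preview shows the agent asking for the amount and currencies. A likely cause is that the description of `optimized_conversion_task` contains no `{amount}`, `{from_currency}`, or `{to_currency}` placeholders, so the `kickoff` inputs have nowhere to be interpolated. The hypothetical helper below uses only the standard library and assumes `{name}`-style placeholders. It lists the input keys that a description never references.

```python
from string import Formatter


def unused_inputs(description: str, inputs: dict) -> set[str]:
    """Return the input keys that never appear as {placeholders} in the text."""
    referenced = {field for _, field, _, _ in Formatter().parse(description) if field}
    return set(inputs) - referenced


description = ("Convert the parsed amount from source to target currency using "
               "the currency converter tool. Provide the result and brief context.")
manual_inputs = {"amount": 100, "from_currency": "USD", "to_currency": "EUR"}

# Every key is reported, because the description has no placeholders at all.
print(sorted(unused_inputs(description, manual_inputs)))

# A description written for standalone testing references all three inputs.
standalone = "Convert {amount} {from_currency} to {to_currency} using the currency converter tool."
print(sorted(unused_inputs(standalone, manual_inputs)))
```

A stricter single-agent test would also inspect the result text, for example by checking that it contains a converted amount, instead of treating the absence of an exception as success.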

In [22]:
# OPTIMIZED VERSION - Prevents execution loops
print("🧪 TESTING PIPELINE FLEXIBILITY (OPTIMIZED)")
print("="*50)

# Test with a single query first to validate the pipeline
single_test_query = "Convert 100 dollars to euros"

print(f"\n📝 Single Test: '{single_test_query}'")
print("-" * 40)

try:
    # Create optimized crew with limits to prevent loops
    optimized_crew = Crew(
        agents=[optimized_query_analyst, optimized_currency_analyst],
        tasks=[optimized_query_task, optimized_conversion_task],
        process=Process.sequential,
        max_execution_time=300,  # 5 minute timeout
        memory=False  # Disable memory to reduce complexity
    )
    
    print("🔧 Executing optimized pipeline...")
    
    # Execute with timeout protection
    result = optimized_crew.kickoff(
        inputs={
            "user_query": single_test_query,
            "amount": 100,  # Unused here: the optimized task descriptions only reference {user_query}
            "from_currency": "USD", 
            "to_currency": "EUR"
        }
    )
    
    print(f"✅ Pipeline executed successfully!")
    print(f"📊 Result preview: {result.raw[:150]}...")
    
    # If successful, test with additional queries (one at a time)
    additional_queries = [
        "How much is 250 pounds in dollars?",
        "Convert 50 euros to yen please"
    ]
    
    for i, test_query in enumerate(additional_queries, 2):
        print(f"\n📝 Test {i}: '{test_query}'")
        print("-" * 40)
        
        try:
            result = optimized_crew.kickoff(
                inputs={
                    "user_query": test_query,
                    "amount": 0,  # Unused here: the parser's output reaches the next task as context
                    "from_currency": "USD", 
                    "to_currency": "EUR"
                }
            )
            
            print(f"✅ Test {i} executed successfully")
            print(f"📊 Result preview: {result.raw[:100]}...")
            
        except Exception as e:
            print(f"❌ Test {i} Error: {str(e)[:200]}...")
            break  # Stop on first error to prevent cascade failures
        
        print()
    
except Exception as e:
    print(f"❌ Pipeline Error: {str(e)[:200]}...")
    print("\n💡 Troubleshooting Tips:")
    print("   - The agents may be hitting iteration limits")
    print("   - Try using a different LLM model (e.g., GPT-4)")
    print("   - Reduce agent complexity by disabling verbose mode")
    print("   - Simplify task descriptions")

print(f"\n🎯 Optimized pipeline testing completed!")

🧪 TESTING PIPELINE FLEXIBILITY (OPTIMIZED)

📝 Single Test: 'Convert 100 dollars to euros'
----------------------------------------
🔧 Executing optimized pipeline...
✅ Pipeline executed successfully!
📊 Result preview: 100 USD is equivalent to 85.00 EUR based on the current exchange rate. The USD has been relatively strong in recent months, which may influence the EU...

📝 Test 2: 'How much is 250 pounds in dollars?'
----------------------------------------
✅ Test 2 executed successfully
📊 Result preview: The conversion of 250 GBP to USD results in 341.40 USD. This conversion reflects current exchange ra...


📝 Test 3: 'Convert 50 euros to yen please'
----------------------------------------
✅ Test 3 executed successfully
📊 Result preview: 50 EUR is equivalent to 8508.84 JPY.

In recent market behavior, the Euro remains relatively sta...


🎯 Optimized pipeline testing completed!
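
A quick sanity check on the previews above is to back out the exchange rate each result implies, which is the converted amount divided by the input amount. The figures below are copied from the printed output; nothing is fetched live.

```python
# (input amount, source, converted amount, target) copied from the previews above
previews = [
    (100, "USD", 85.00, "EUR"),
    (250, "GBP", 341.40, "USD"),
    (50, "EUR", 8508.84, "JPY"),
]

implied_rates = {}
for amount, source, converted, target in previews:
    # Implied rate = converted amount / input amount
    implied_rates[(source, target)] = converted / amount
    print(f"1 {source} = {implied_rates[(source, target)]:.4f} {target}")
```

Comparing these implied rates with an independent source is a cheap way to confirm that the agent used the tool's result rather than a rate recalled from training data.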


## Summary: Complete Multi-Stage AI Pipeline

### 🎯 **What We've Built**

We've successfully created a sophisticated **multi-stage AI pipeline** that demonstrates advanced CrewAI capabilities:

#### **🏗️ Pipeline Components**
1. **🤖 Query Parser Agent**: Converts natural language to structured JSON
2. **💰 Currency Analyst Agent**: Performs real-time conversions with expert analysis  
3. **🔧 Custom Currency Tool**: Integrates live exchange rate API
4. **📊 Pydantic Schemas**: Ensures data validation and type safety

#### **🔄 Workflow Stages**
```
Natural Language Input
      ↓
Query Parser (Stage 1)
      ↓ 
Structured JSON Data
      ↓
Currency Analyst (Stage 2)
      ↓
Professional Financial Report
```

#### **✨ Key Achievements**
- **🗣️ Natural Language Processing**: Handles conversational input patterns
- **⚡ Real-Time Data Integration**: Live exchange rates via custom tools
- **🧠 Multi-Agent Orchestration**: Seamless data flow between specialized agents
- **📈 Professional Output**: Expert financial analysis with market context
- **🔧 Extensible Architecture**: Easy to add more analysis stages

#### **💡 Advanced Features Demonstrated**
- Custom tool development with `BaseTool`
- Pydantic model integration for structured I/O
- Sequential agent processing with context passing
- Natural language understanding and parameter extraction
- Real-world API integration within AI workflows

This pipeline shows how **CrewAI can combine** natural language processing, structured data handling, and real-world API integration in a single multi-stage workflow. 🚀

---

**Next Steps**: Consider extending this pipeline with additional agents for market analysis, historical trends, or investment recommendations!