Install all the necessary packages and dependencies.

Step 1. Search the web for documentations, developer guides, and API references for tools, apps, and services.

Step 2. Extract the information from the web pages, including metadata like the type of the service, the API endpoints, and the parameters. The technical details and examples of how to use the service.

Step 3. Save the information in a vector database for easier retrieval.

In [82]:
from openai import OpenAI
from dotenv import load_dotenv
import os

# Load environment variables from .env file
load_dotenv()

# Get API key from environment variable
api_key = os.getenv('OPENAI_API_KEY')

# Use api_key in your OpenAI client initialization
client = OpenAI(api_key=api_key)


In [83]:
from uuid import uuid4

os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_PROJECT"] = f"AIE5 â€“ Midterm {uuid4().hex[0:8]}"
os.environ["LANGCHAIN_API_KEY"] = os.getenv('LANGSMITH_API_KEY') 

In [84]:
from langchain_community.tools.tavily_search import TavilySearchResults

tavily_api_key = os.getenv('TAVILY_API_KEY')

tavily_tool = TavilySearchResults(max_results=3,tavily_api_key=tavily_api_key, search_depth="advanced")



In [85]:

import json
from typing import TypedDict, List, Dict, Optional
from langchain_core.messages import BaseMessage
from langchain.tools import Tool
from langchain_openai import ChatOpenAI



In [86]:
# Update system prompt to be more explicit about sequence
system_prompt = """You are a software procurement analyst. 

YOU MUST FOLLOW THIS EXACT SEQUENCE:
1. FIRST: vendor_search({"prompt": "Subscription Plan Selection"})
2. WAIT for vendor_search results, then IMMEDIATELY use analyze_vendors with those results
3. AFTER analyze_vendors completes, call retrieve_apis("Subscription Plan Selection")

CRITICAL RULES:
- NO CONVERSATION
- NO QUESTIONS
- FOLLOW THE SEQUENCE EXACTLY
- DO NOT SKIP ANY STEPS
- AFTER vendor_search, YOU MUST USE analyze_vendors
- ONLY USE retrieve_apis AFTER analyze_vendors

SEQUENCE ENFORCEMENT:
- IF vendor_search IS DONE, NEXT MUST BE analyze_vendors
- IF analyze_vendors IS DONE, NEXT MUST BE retrieve_apis
- NEVER REPEAT A TOOL
- NEVER SKIP A TOOL
"""
# Update model configuration
model = ChatOpenAI(model="gpt-4-0125-preview", temperature=0).with_config(
    {"system_prompt": system_prompt}
)

In [87]:
from typing import Dict, List, Annotated, Union, Optional, TypedDict
from pydantic import BaseModel, Field
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage, FunctionMessage, ToolMessage
from langchain.tools import Tool, tool
# Define structured types
# Define structured types
class Vendor(TypedDict):
    name: str
    vendor_url: Optional[str]
    api_url: Optional[str]
    category: str
    capabilities: List[str]
    description: str

# Define input types
class VendorSearchInput(BaseModel):
    prompt: str

class VendorSearchResult(TypedDict):
    vendors: List[Dict]

class RetrieveApisInput(BaseModel):
    capabilities: List[str]

class AnalyzeVendorsInput(BaseModel):
    vendor_results: Union[List[VendorSearchResult], str] = Field(
        description="List of vendor search results or JSON string containing vendor results"
    )

class AgentState(TypedDict):
    messages: List[BaseMessage]
    vendor_results: Optional[Dict]

In [88]:

VENDOR_SEARCH_SYSTEM_PROMPT = """You are a software analyst expert. Your task is to identify vendors that match the requested capability.

YOU MUST RETURN EXACTLY THIS JSON STRUCTURE - NO MODIFICATIONS:
{
    "vendors": [
        {
            "name": "Vendor Name",
            "vendor_url": "Vendor's website URL",
            "api_url": "Vendor's API documentation URL",
            "category": "Primary category (e.g., Payment Processing, Subscription Management)",
            "capabilities": ["List", "of", "specific", "capabilities"],
            "description": "Brief description focusing on the requested capability"
        }
    ]
}

CRITICAL RULES:
- EXACTLY 3 vendors
- ALL fields must be present
- NO additional fields
- NO modifications to the structure
- NO explanatory text
"""


@tool(args_schema=VendorSearchInput)
def vendor_search(prompt: str) -> VendorSearchResult:
    """Search for vendors/software based on user stories."""
    search_results = tavily_tool.invoke({
        "query": f"software vendors or tools that provide {prompt} capabilities"
    })
    
    response = client.chat.completions.create(
        model="gpt-4",
        messages=[
            {"role": "system", "content": VENDOR_SEARCH_SYSTEM_PROMPT},
            {"role": "user", "content": f"Find vendors that provide this capability: {prompt}\n\nSearch results: {str(search_results)}"}
        ]
    )

    try:
        content = response.choices[0].message.content
        json_start = content.find('{')
        json_end = content.rfind('}') + 1
        if json_start >= 0 and json_end > json_start:
            content = content[json_start:json_end]
        result = json.loads(content)
        
        # Validate the structure
        if not isinstance(result, dict) or 'vendors' not in result:
            raise ValueError("Invalid vendor result structure")
        
        return result
    except Exception as e:
        print(f"Error in vendor_search: {str(e)}")
        return {"vendors": []}

In [89]:
from typing import Dict, List, Annotated, Union
from pydantic import BaseModel, Field  # Add Field import



@tool(args_schema=AnalyzeVendorsInput)
def analyze_vendors(vendor_results: List[VendorSearchResult]) -> Dict:
    """Analyze vendor search results and select top vendors based on capability count."""
    try:
        # Debug incoming data
        print("\nDEBUG: Incoming vendor_results type:", type(vendor_results))
        print("DEBUG: Incoming vendor_results:", vendor_results)
        
        # If we got a string, try to parse it
        if isinstance(vendor_results, str):
            try:
                parsed_data = json.loads(vendor_results)
                vendor_results = parsed_data.get('vendor_results', [])
                print("\nDEBUG: Parsed vendor_results:", vendor_results)
            except json.JSONDecodeError as e:
                print(f"ERROR: Failed to parse JSON: {e}")
                return {"error": "Invalid JSON format"}
        
        vendor_data = {}
        
        # Process all vendor search results
        for result in vendor_results:
            if isinstance(result, dict) and 'vendors' in result:
                for vendor in result['vendors']:
                    name = vendor['name']
                    if name not in vendor_data:
                        vendor_data[name] = {
                            "name": name,
                            "capabilities": set(),
                            "capability_count": 0,
                            "description": vendor['description'],
                            "vendor_url": vendor.get('vendor_url'),
                            "api_url": vendor.get('api_url'),
                            "category": vendor.get('category', '')
                        }
                    # Add capabilities and update count
                    if 'capabilities' in vendor:
                        new_capabilities = set(vendor['capabilities'])
                        vendor_data[name]['capabilities'].update(new_capabilities)
                        vendor_data[name]['capability_count'] = len(vendor_data[name]['capabilities'])
        
        # Convert to list and format for output
        vendor_list = [
            {
                "name": v["name"],
                "capabilities": list(v["capabilities"]),
                "capability_count": v["capability_count"],
                "description": v["description"],
                "vendor_url": v["vendor_url"],
                "api_url": v["api_url"],
                "category": v["category"]
            }
            for v in vendor_data.values()
        ]
        
        # Sort by capability count
        sorted_vendors = sorted(
            vendor_list,
            key=lambda x: x["capability_count"],
            reverse=True
        )
        
        return {
            "vendor_count": len(vendor_list),
            "vendors_analyzed": vendor_list,
            "top_vendors": sorted_vendors[:3] if sorted_vendors else [],
            "analysis_summary": f"Analyzed {len(vendor_list)} vendors." + 
                (f" Top vendor '{sorted_vendors[0]['name']}' supports {sorted_vendors[0]['capability_count']} capabilities." if sorted_vendors else "")
        }
    except Exception as e:
        print(f"\nERROR in analyze_vendors: {str(e)}")
        import traceback
        traceback.print_exc()
        return {
            "vendor_count": 0,
            "vendors_analyzed": [],
            "top_vendors": [],
            "error": str(e)
        }



In [63]:
from langchain_community.document_loaders import PyPDFLoader


docs = PyPDFLoader("api_files/Stripe API Reference.pdf").load()



In [64]:
import tiktoken
from langchain.text_splitter import RecursiveCharacterTextSplitter

def tiktoken_len(text):
    tokens = tiktoken.encoding_for_model("gpt-4o-mini").encode(
        text,
    )
    return len(tokens)

text_splitter = RecursiveCharacterTextSplitter(
    chunk_size = 200,
    chunk_overlap = 100,
    length_function = tiktoken_len,
)

split_chunks = text_splitter.split_documents(docs)

In [65]:
len(split_chunks)

207

In [66]:
from langchain_openai.embeddings import OpenAIEmbeddings

embedding_model = OpenAIEmbeddings(model="text-embedding-3-large")

In [67]:
from langchain_community.vectorstores import Qdrant

qdrant_vectorstore = Qdrant.from_documents(
    split_chunks,
    embedding_model,
    location=":memory:",
    collection_name="stripe_api_reference",
)

In [68]:
qdrant_retriever = qdrant_vectorstore.as_retriever()

In [69]:
from langchain_core.prompts import ChatPromptTemplate

RAG_PROMPT = """
CONTEXT:
{context}

QUERY:
{question}

You are a helpful assistant. Use the available context to answer the question. If you can't answer the question, say you don't know.
"""

rag_prompt = ChatPromptTemplate.from_template(RAG_PROMPT)

In [70]:
from langchain_openai import ChatOpenAI

openai_chat_model = ChatOpenAI(model="gpt-4o-mini")

In [71]:
from operator import itemgetter
from langchain.schema.output_parser import StrOutputParser

rag_chain = (
    {"context": itemgetter("question") | qdrant_retriever, "question": itemgetter("question")}
    | rag_prompt | openai_chat_model | StrOutputParser()
)


In [72]:
rag_chain.invoke({"question" : "what are the payments related APIs"})

'The payments-related APIs from the provided context include the following endpoints:\n\n1. **Payment Links**:\n   - `POST /v1/payment_links` - Create a new payment link.\n   - `POST /v1/payment_links/:id` - Update an existing payment link.\n   - `GET /v1/payment_links/:id/line_items` - Retrieve line items for a specific payment link.\n   - `GET /v1/payment_links/:id` - Retrieve details for a specific payment link.\n   - `GET /v1/payment_links` - Retrieve a list of payment links.\n\n2. **Credit Notes**:\n   - `POST /v1/credit_notes` - Create a new credit note.\n   - `POST /v1/credit_notes/:id` - Update an existing credit note.\n   - `GET /v1/credit_notes/:id/lines` - Retrieve line items for a specific credit note.\n   - `GET /v1/credit_notes/preview/lines` - Preview line items for a credit note.\n   - `GET /v1/credit_notes/:id` - Retrieve details for a specific credit note.\n\nAdditionally, there are endpoints related to sending and receiving funds through treasury operations, although

In [73]:
from typing import Annotated
from pydantic import BaseModel

# Update tool definition
class RetrieveApisInput(BaseModel):
    capabilities: List[str] = Field(
        description="List of capabilities to search for",
        example=["Subscription Plan Selection"]
    )
    
@tool
def retrieve_apis(prompt: str) -> str:
    """
    Retrieve relevant APIs based on capabilities.
    Input should be a capability string.
    Example: "Subscription Plan Selection"
    """
    try:
        # Convert single capability to list
        capabilities = [prompt]
        print(f"Searching for capabilities: {capabilities}")
        
        # Construct query
        question = f"Find APIs that provide these capabilities: {', '.join(capabilities)}. Return a list of relevant APIs with their endpoints and brief descriptions."
        
        result = rag_chain.invoke({"question": question})
        return result if isinstance(result, str) else str(result)
        
    except Exception as e:
        print(f"Error in retrieve_apis: {str(e)}")
        import traceback
        traceback.print_exc()
        return f"Error retrieving APIs: {str(e)}"

In [74]:
#capabilities = ["payment processing", "subscription management", "refunds"]
#result = retrieve_apis({"capabilities": capabilities})
#print(result)

In [75]:
from langgraph.prebuilt import ToolNode
from langchain_core.tools import Tool, tool


# Update tools list
tools = [
    Tool(
        name="vendor_search",
        description="REQUIRED FIRST STEP: Search for vendors matching the capability.",
        func=vendor_search
    ),
    Tool(
        name="analyze_vendors",
        description="""SECOND STEP: Analyze vendor results. Input must be JSON object with 'vendor_results' array containing vendor objects.
Example: {"vendor_results": [{"vendors": [{"name": "Vendor1", ...}]}]}""",
        func=analyze_vendors
    ),
    Tool(
        name="retrieve_apis",
        description="FINAL STEP: Get API details for the capability. Input should be a capability string.",
        func=retrieve_apis
    )
]

model = model.bind_tools(tools)

In [76]:
from typing import TypedDict, Annotated
from langgraph.graph.message import add_messages
import operator
from langchain_core.messages import BaseMessage
from typing import TypedDict, List, Dict, Optional, Set
from langchain_core.messages import BaseMessage

# Update state type
class AgentState(TypedDict):
    messages: List[BaseMessage]
    vendor_results: Optional[Dict]
    completed_tools: Set[str]  # Add this to track completed tools


In [77]:
from langgraph.prebuilt import ToolNode


tool_node = ToolNode(tools)


In [78]:
from langgraph.graph import StateGraph, END


In [79]:
from enum import Enum
from typing import Tuple, Union

class GraphState(str, Enum):
    AGENT = "agent"
    ACTION = "action"
    END = "end"



def should_continue(state: AgentState) -> str:
    """Determine next state in the workflow."""
    messages = state.get("messages", [])
    completed_tools = state.get("completed_tools", set())  # Get from state
    
    print("\nChecking workflow progress:")
    print(f"Number of messages: {len(messages)}")
    print(f"Previously completed tools: {completed_tools}")
    
    # Required sequence - must follow this order
    required_sequence = ["vendor_search", "analyze_vendors", "retrieve_apis"]
    
    
       # Track the last completed tool and current tool call
    last_completed = None
    current_tool = None
  
    # Process messages in order
    for msg in messages:
        if isinstance(msg, (FunctionMessage, ToolMessage)):
            tool_name = None
            if hasattr(msg, 'name'):
                tool_name = msg.name
            elif hasattr(msg, 'function_call'):
                tool_name = msg.function_call.name
            
            if tool_name:
                completed_tools.add(tool_name)
                last_completed = tool_name
                print(f"Found completed tool: {tool_name}")
        
        elif isinstance(msg, AIMessage) and hasattr(msg, 'tool_calls') and msg.tool_calls:
            tool_call = msg.tool_calls[0]
            if isinstance(tool_call, dict):
                current_tool = tool_call.get('name') or tool_call.get('function', {}).get('name')
            else:
                current_tool = tool_call.function.name if hasattr(tool_call, 'function') else None
                
    # Update state with completed tools
    state["completed_tools"] = completed_tools
    print(f"Last completed tool: {last_completed}")
    print(f"Current tool call: {current_tool}")
    print(f"Updated completed tools: {completed_tools}")
    
    # Validate sequence
    expected_index = len(completed_tools)
    if expected_index < len(required_sequence):
        expected_tool = required_sequence[expected_index]
        print(f"Expected next tool: {expected_tool}")
        
        # If there's a current tool call
        if current_tool:
            if current_tool != expected_tool:
                print(f"ERROR: Expected {expected_tool} but got {current_tool}")
                return "agent"  # Try again with correct tool
            return "action"  # Execute the correct tool
        
        print(f"Need to execute: {expected_tool}")
        return "agent"
    
    # All tools completed
    if len(completed_tools) == len(required_sequence):
        # Verify sequence
        for i, tool in enumerate(required_sequence):
            if tool not in completed_tools:
                print(f"Missing tool in sequence: {tool}")
                return "agent"
        print("All tools completed in correct sequence -> END")
        return "end"
    
    print("ERROR: Unexpected state -> END")
    return "end"

def call_model(state: AgentState) -> Dict:
    """Process the current state through the model."""
    try:
        messages = state.get("messages", [])
        completed_tools = state.get("completed_tools", set())
        
        print("\nProcessing messages:")
        print(f"Current message count: {len(messages)}")
        print(f"Starting with completed tools: {completed_tools}")
        
        # Build valid conversation chain
        filtered_messages = []
        tool_call_map = {}  # Track tool_call_id to response mapping
        
        # First pass: build tool call map and track completed tools
        for msg in messages:
            if isinstance(msg, AIMessage) and hasattr(msg, 'tool_calls') and msg.tool_calls:
                for tool_call in msg.tool_calls:
                    if isinstance(tool_call, dict):
                        call_id = tool_call.get('id')
                    else:
                        call_id = tool_call.id
                    tool_call_map[call_id] = None
            
            elif isinstance(msg, (FunctionMessage, ToolMessage)):
                # Track completed tools
                tool_name = None
                if hasattr(msg, 'name'):
                    tool_name = msg.name
                elif hasattr(msg, 'function_call'):
                    tool_name = msg.function_call.name
                
                if tool_name:
                    completed_tools.add(tool_name)
                    print(f"Added {tool_name} to completed tools")
                
                # Map responses to tool calls
                if hasattr(msg, 'tool_call_id'):
                    tool_call_map[msg.tool_call_id] = msg
        
        # Always start with human message if none exists
        if not any(isinstance(msg, HumanMessage) for msg in messages):
            filtered_messages.append(
                HumanMessage(content="Please analyze vendor capabilities for Subscription Plan Selection")
            )
            print("Added initial HumanMessage")
        
        # Second pass: build filtered message chain
        current_tool_calls = None
        for msg in messages:
            if isinstance(msg, HumanMessage):
                filtered_messages.append(msg)
                print("Added HumanMessage")
                
            elif isinstance(msg, AIMessage):
                if hasattr(msg, 'tool_calls') and msg.tool_calls:
                    # Check if we have responses for all tool calls
                    all_responses_found = True
                    current_tool_calls = msg.tool_calls
                    
                    for tool_call in current_tool_calls:
                        call_id = tool_call.id if hasattr(tool_call, 'id') else tool_call.get('id')
                        if not tool_call_map.get(call_id):
                            all_responses_found = False
                            break
                    
                    if all_responses_found:
                        filtered_messages.append(msg)
                        # Add corresponding tool responses
                        for tool_call in current_tool_calls:
                            call_id = tool_call.id if hasattr(tool_call, 'id') else tool_call.get('id')
                            if tool_call_map.get(call_id):
                                filtered_messages.append(tool_call_map[call_id])
                                print(f"Added tool response for {call_id}")
                else:
                    filtered_messages.append(msg)
                    current_tool_calls = None
        
        print(f"\nFiltered message count: {len(filtered_messages)}")
        print(f"Final completed tools: {completed_tools}")
        
        # Get model response
        response = model.invoke(filtered_messages)
        print("Got model response")
        
        # Return updated state with completed tools
        return {
            "messages": [*filtered_messages, response],
            "completed_tools": completed_tools,  # Important: return the updated set
            "vendor_results": state.get("vendor_results")
        }
            
    except Exception as e:
        print(f"Error in call_model: {str(e)}")
        import traceback
        traceback.print_exc()
        return {
            "messages": messages,
            "completed_tools": completed_tools,  # Return what we have
            "vendor_results": state.get("vendor_results")
        }


In [80]:
# Create workflow
workflow = StateGraph(AgentState)

# Add nodes
workflow.add_node("agent", call_model)
workflow.add_node("action", tool_node)

# Set entry point
workflow.set_entry_point("agent")

# Add edges
workflow.add_conditional_edges(
    "agent",
    should_continue,
    {
        "action": "action",
        "agent": "agent",
        "end": END
    }
)
workflow.add_edge("action", "agent")

# Compile graph
compiled_graph = workflow.compile()

In [81]:
def test_capabilities():
    """Test the vendor search and analysis workflow."""
    
    # Create initial state with proper structure
    initial_state = AgentState(
        messages=[
            HumanMessage(content="Please analyze vendor capabilities for Subscription Plan Selection")
        ],
        vendor_results=None,
        completed_tools=set()  # Initialize empty set
    )
    
    print("Starting vendor analysis...")
    print(f"Initial completed tools: {initial_state['completed_tools']}")
    
    try:
        for chunk in compiled_graph.stream(initial_state):
            print("\n" + "="*50)
            print("New Update:")
            print("="*50)
            
            for key, value in chunk.items():
                print(f"\nNode: {key}")
                messages = value.get("messages", [])
                completed_tools = value.get("completed_tools", set())
                
                print(f"Completed tools in this update: {completed_tools}")
                
                for msg in messages[-3:]:  # Show last 3 messages for brevity
                    print(f"\nMessage Type: {type(msg).__name__}")
                    
                    if isinstance(msg, AIMessage) and hasattr(msg, 'tool_calls') and msg.tool_calls:
                        print("Tool Calls:")
                        for tool_call in msg.tool_calls:
                            if isinstance(tool_call, dict):
                                print(f"- {tool_call.get('name')}: {tool_call.get('arguments')}")
                            else:
                                print(f"- {tool_call.function.name}: {tool_call.function.arguments}")
                    
                    elif isinstance(msg, (FunctionMessage, ToolMessage)):
                        print("Tool Response:")
                        if hasattr(msg, 'name'):
                            print(f"Tool Name: {msg.name}")
                        print(f"Content: {msg.content[:200]}...")
    
    except Exception as e:
        print(f"Error during execution: {str(e)}")
        import traceback
        traceback.print_exc()

# Run the test
test_capabilities()

Starting vendor analysis...

Processing messages:
Current message count: 1
Starting with completed tools: set()
Added HumanMessage

Filtered message count: 1
Message types: ['HumanMessage']


Got model response

Checking workflow progress:
Number of messages: 2
Previously completed tools: set()
Last completed tool: None
Current tool call: vendor_search
Updated completed tools: set()
Expected next tool: vendor_search

New Update:

Node: agent
Completed tools in this update: set()

Message Type: HumanMessage

Message Type: AIMessage
Tool Calls:
- vendor_search: None
- retrieve_apis: None
Searching for capabilities: ['Subscription Plan Selection']

New Update:

Node: action
Completed tools in this update: set()

Message Type: ToolMessage
Tool Response:
Tool Name: vendor_search
Content: {"vendors": [{"name": "Zoho Subscriptions", "vendor_url": "https://www.zoho.com/subscriptions/", "api_url": "https://www.zoho.com/subscriptions/api/", "category": "Subscription Management", "capabilit...

Message Type: ToolMessage
Tool Response:
Tool Name: retrieve_apis
Content: Here is a list of relevant APIs related to Subscription Plan Selection, along with their endpoints and brief descripti

KeyboardInterrupt: 