In [None]:
from typing import TypedDict, Annotated, Sequence
from langgraph.prebuilt import ToolExecutor
from langgraph.graph import Graph, StateGraph
from operator import itemgetter
import json
from pydantic import BaseModel
from transformers import AutoTokenizer, AutoModelForCausalLM
import torch
from huggingface_hub import login

# Initialize Llama model and tokenizer
model_name = "mistralai/Mistral-7B-v0.3"; model_name_to_load  = "Mistral"
model_name = "meta-llama/Llama-2-7b-chat-hf"; model_name_to_load  = "Llama2"  # or your preferred Llama 3 checkpoint
model_name = "meta-llama/Meta-Llama-3-8B-Instruct"; model_name_to_load  = "Llama3"
if "Llama-3" in model_name:
	acc_token = "${HUGGINGFACE_TOKEN}" # for 1lama3
elif "Llama-2" in model_name:
	acc_token = "${HUGGINGFACE_TOKEN}" # for 11ama2

login (token=acc_token)


In [None]:
import os

tokenizer = AutoTokenizer.from_pretrained(model_name)

model = AutoModelForCausalLM.from_pretrained(
    model_name,
    torch_dtype=torch.float16,
    # device_map="auto"
)

# Save the model and tokenizer
def save_model(model, tokenizer, output_dir: str):
    """Save both model and tokenizer to the specified directory"""
    try:
        # Create output directory if it doesn't exist
        os.makedirs(output_dir, exist_ok=True)
        
        # Save model
        model.save_pretrained(output_dir)
        
        # Save tokenizer
        tokenizer.save_pretrained(output_dir)
        
        print(f"Model and tokenizer saved to {output_dir}")
    except Exception as e:
        print(f"Error saving model: {str(e)}")

# Usage
output_directory = "model/"+model_name_to_load
save_model(model, tokenizer, output_directory)

In [None]:
def load_saved_model(model_path: str):
    """Load a saved model and tokenizer"""
    try:
        # Load model
        model = AutoModelForCausalLM.from_pretrained(
            model_path,
            torch_dtype=torch.float16
        )
        
        # Load tokenizer
        tokenizer = AutoTokenizer.from_pretrained(model_path)
        
        return model, tokenizer
    except Exception as e:
        print(f"Error loading model: {str(e)}")
        return None, None


model, tokenizer = load_saved_model(f"model/{model_name_to_load}")

In [None]:
import transformers

# pipeline = transformers.pipeline(
#     "text-generation",
#     model="microsoft/phi-4",
#     model_kwargs={"torch_dtype": "auto"},
#     device_map="cpu",
# )

messages = [
    {"role": "system", "content": "You are a medieval knight and must provide explanations to modern people."},
    {"role": "user", "content": "How should I explain the Internet?"},
]

outputs = pipeline(messages, max_new_tokens=30)
print(outputs[0]["generated_text"][-1])


In [None]:
from typing import TypedDict, Annotated, Optional, Any, Callable, List, Dict
from functools import wraps
from langgraph.graph import StateGraph, END
import json
import transformers

pipeline = transformers.pipeline(
    "text-generation",
    model="microsoft/phi-4",
    model_kwargs={"torch_dtype": "auto"},
    device_map="cpu",
)

def generate_llm_response(prompt: str, max_length: int = 50) -> str:
    """Generate response using Llama model"""
    messages = [
        {"role": "system", "content": "You are a intellient agent which can help with various tasks."},
        {"role": "user", "content": prompt},
    ]

    outputs = pipeline(messages, max_new_tokens=max_length)
    response = (outputs[0]["generated_text"])

    return response

# Tool Definitions
class ToolResult(TypedDict):
    """Type definition for tool results"""
    error: Optional[str]
    result: Optional[Any]

def tool(func: Callable) -> Callable:
    """Decorator to mark and process tool functions"""
    @wraps(func)
    def wrapper(*args, **kwargs) -> ToolResult:
        try:
            return func(*args, **kwargs)
        except Exception as e:
            return {'result': None, 'error': f"{func.__name__} failed: {str(e)}"}
    wrapper.is_tool = True
    wrapper.name = func.__name__
    wrapper.description = func.__doc__
    return wrapper

@tool
def calculator(input_text: str) -> ToolResult:
    """Calculate the result of a mathematical expression."""
    try:
        # Clean the input
        expression = input_text.strip()
        if not expression:
            return {'result': None, 'error': "No expression provided"}
            
        # Only allow basic arithmetic operations and numbers
        import re
        if not re.match(r'^[\d\s+\-*/.()]+$', expression):
            return {'result': None, 'error': "Invalid characters in expression"}
        
        # Additional safety checks
        if '__' in expression or 'eval' in expression or 'exec' in expression:
            return {'result': None, 'error': "Invalid expression"}
            
        # Evaluate the expression
        result = eval(expression)
        return {'result': str(result), 'error': None}
    except Exception as e:
        return {'result': None, 'error': f"Calculation failed: {str(e)}"}

@tool
def wiki_search(query: str) -> ToolResult:
    """Search for information about a topic."""
    try:
        # For demonstration, we'll use the LLM to generate a response about the topic
        prompt = f"Provide a brief, factual summary about: {query}"
        result = generate_llm_response(prompt)
        return {'result': result, 'error': None}
    except Exception as e:
        return {'result': None, 'error': f"Search failed: {str(e)}"}

# Define available tools
tools = {
    "calculator": calculator,
    "wiki_search": wiki_search
}

# State and Message Types
class MessageBase(TypedDict):
    content: str
    type: str

class HumanMessage(MessageBase):
    pass

class AIMessage(MessageBase):
    pass

class ToolMessage(MessageBase):
    tool: str

class State(TypedDict):
    messages: List[MessageBase]
    next_step: str

def observe(state: dict) -> dict:
    """Process current state and determine next steps using LLM."""
    messages = state.get("messages", [])
    if not messages:
        return state

    # Look at the last message
    last_message = messages[-1]
    if not isinstance(last_message, dict):
        return state

    # Use LLM to determine the next step
    content = last_message.get("content", "")
    prompt = f"""Given the user message: '{content}'
    Determine which tool to use:
    - 'calculator' for mathematical calculations
    - 'wiki_search' for information queries
    - '' if no tool is needed
    Reply with just the tool name."""
    
    next_step = generate_llm_response(prompt).strip().lower()
    
    # Validate the response
    if next_step not in tools and next_step != "":
        next_step = ""
        
    return {
        "messages": messages,
        "next_step": next_step
    }

def think(state: dict) -> dict:
    """Plan the next action based on the current state using LLM."""
    next_step = state.get("next_step", "")
    if not next_step:
        return state

    messages = state.get("messages", [])
    if not messages:
        return state

    # Get the last message content
    last_message = messages[-1]
    if not isinstance(last_message, dict):
        return state

    content = last_message.get("content", "")
    
    # Use LLM to generate a response about the planned action
    prompt = f"""Given the user message: '{content}'
    And the selected tool: '{next_step}'
    Generate a brief, natural response about how you'll help with this task."""
    
    ai_response = generate_llm_response(prompt)
    
    messages.append({
        "content": ai_response,
        "type": "ai"
    })
    
    return {
        "messages": messages,
        "next_step": next_step
    }

def act(state: dict) -> dict:
    """Execute the planned action and return the next steps."""
    messages = state.get("messages", [])
    if not messages:
        return {"messages": messages}

    next_step = state.get("next_step", "")
    if not next_step or next_step not in tools:
        return {"messages": messages}

    # Get the original request from human message
    human_messages = [msg for msg in messages if msg.get("type") == "human"]
    if not human_messages:
        return {"messages": messages}
    
    content = human_messages[-1].get("content", "")
    
    # Extract the actual expression for calculator
    if next_step == "calculator":
        # Use LLM to extract the mathematical expression
        prompt = f"""Extract only the mathematical expression from: '{content}'
        Return only the numbers and operators (+,-,*,/,())."""
        
        expression = generate_llm_response(prompt).strip()
        content = expression
    
    # Execute the tool
    result = tools[next_step](content)
    
    # Add the tool result to messages
    messages.append({
        "content": json.dumps(result),
        "type": "tool",
        "tool": next_step
    })
    
    # Use LLM to generate a natural response about the result
    prompt = f"""Given the tool result: {json.dumps(result)}
    Generate a natural, helpful response explaining the result."""
    
    ai_response = generate_llm_response(prompt)
    
    messages.append({
        "content": ai_response,
        "type": "ai"
    })
    
    return {
        "messages": messages,
        "next_step": ""  # Clear the next step since we've handled it
    }

def should_continue(state: dict) -> bool:
    """Determine if the workflow should continue."""
    return bool(state.get("next_step")) or any(
        msg.get("type") == "human" 
        for msg in state.get("messages", [])[-1:]
    )

# Create the workflow
def build_workflow():
    workflow = StateGraph(State)
    
    workflow.add_node("observe", observe)
    workflow.add_node("think", think)
    workflow.add_node("act", act)
    
    workflow.add_edge("observe", "think")
    workflow.add_edge("think", "act")
    
    workflow.set_entry_point("observe")
    
    workflow.add_conditional_edges(
        "act",
        should_continue,
        {
            True: "observe",
            False: END
        }
    )
    
    return workflow.compile()

# Create the agent
agent = build_workflow()

# Example usage
initial_state = {
    "messages": [{
        "content": "What is the square root of 16?",
        "type": "human"
    }],
    "next_step": ""
}

# Execute the agent
result = agent.invoke(initial_state)
print("\nFinal state:")
for msg in result["messages"]:
    print(f"\n{msg['type'].upper()}: {msg['content']}")

In [None]:
from typing import TypedDict, Annotated, Optional, Any, Callable, List, Dict
from functools import wraps
from langgraph.graph import StateGraph, END
import json

# Tool Definitions
class ToolResult(TypedDict):
    """Type definition for tool results"""
    error: Optional[str]
    result: Optional[Any]

def tool(func: Callable) -> Callable:
    """Decorator to mark and process tool functions"""
    @wraps(func)
    def wrapper(*args, **kwargs) -> ToolResult:
        try:
            return func(*args, **kwargs)
        except Exception as e:
            return {'result': None, 'error': f"{func.__name__} failed: {str(e)}"}
    wrapper.is_tool = True
    wrapper.name = func.__name__
    wrapper.description = func.__doc__
    return wrapper

@tool
def calculator(input_text: str) -> ToolResult:
    """Calculate the result of a mathematical expression."""
    try:
        # Clean the input
        expression = input_text.strip()
        if not expression:
            return {'result': None, 'error': "No expression provided"}
            
        # Only allow basic arithmetic operations and numbers
        import re
        if not re.match(r'^[\d\s+\-*/.()]+$', expression):
            return {'result': None, 'error': "Invalid characters in expression"}
        
        # Additional safety checks
        if '__' in expression or 'eval' in expression or 'exec' in expression:
            return {'result': None, 'error': "Invalid expression"}
            
        # Evaluate the expression
        result = eval(expression)
        return {'result': str(result), 'error': None}
    except Exception as e:
        return {'result': None, 'error': f"Calculation failed: {str(e)}"}

@tool
def wiki_search(query: str) -> ToolResult:
    """Search for information about a topic."""
    try:
        # Simulated search result
        result = f"Found information about: {query}"
        return {'result': result, 'error': None}
    except Exception as e:
        return {'result': None, 'error': f"Search failed: {str(e)}"}

# Define available tools
tools = {
    "calculator": calculator,
    "wiki_search": wiki_search
}

# State and Message Types
class MessageBase(TypedDict):
    content: str
    type: str

class HumanMessage(MessageBase):
    pass

class AIMessage(MessageBase):
    pass

class ToolMessage(MessageBase):
    tool: str

class State(TypedDict):
    messages: List[MessageBase]
    next_step: str

def observe(state: dict) -> dict:
    """Process current state and determine next steps."""
    messages = state.get("messages", [])
    if not messages:
        return state

    # Look at the last message
    last_message = messages[-1]
    if not isinstance(last_message, dict):
        return state

    # Determine next step based on message content
    content = last_message.get("content", "").lower()
    next_step = ""
    
    if "calculate" in content:
        next_step = "calculator"
    elif "search" in content or "find" in content:
        next_step = "wiki_search"
        
    return {
        "messages": messages,
        "next_step": next_step
    }

def think(state: dict) -> dict:
    """Plan the next action based on the current state."""
    next_step = state.get("next_step", "")
    if not next_step:
        return state

    messages = state.get("messages", [])
    if not messages:
        return state

    # Get the last message content
    last_message = messages[-1]
    if not isinstance(last_message, dict):
        return state

    content = last_message.get("content", "")
    
    # Add AI message indicating the planned action
    messages.append({
        "content": f"I'll help you with the {next_step} task.",
        "type": "ai"
    })
    
    return {
        "messages": messages,
        "next_step": next_step
    }

def act(state: dict) -> dict:
    """Execute the planned action and return the next steps."""
    messages = state.get("messages", [])
    if not messages:
        return {"messages": messages}

    next_step = state.get("next_step", "")
    if not next_step or next_step not in tools:
        return {"messages": messages}

    # Get the original request from human message
    human_messages = [msg for msg in messages if msg.get("type") == "human"]
    if not human_messages:
        return {"messages": messages}
    
    content = human_messages[-1].get("content", "")
    
    # Extract the actual expression for calculator
    if next_step == "calculator":
        # Extract everything after "calculate" or just numbers and operators
        import re
        if "calculate" in content.lower():
            expression = content.lower().split("calculate")[-1].strip()
        else:
            # Remove everything except numbers, operators, and whitespace
            expression = ' '.join(re.findall(r'[\d+\-*/() ]+', content))
        content = expression.strip()
    
    # Execute the tool
    result = tools[next_step](content)
    
    # Add the tool result to messages
    messages.append({
        "content": json.dumps(result),
        "type": "tool",
        "tool": next_step
    })
    
    # Add an AI message interpreting the result
    if result.get("error") is None:
        ai_response = f"The {next_step} returned: {result.get('result')}"
    else:
        ai_response = f"Sorry, there was an error: {result.get('error')}"
        
    messages.append({
        "content": ai_response,
        "type": "ai"
    })
    
    return {
        "messages": messages,
        "next_step": ""  # Clear the next step since we've handled it
    }

def should_continue(state: dict) -> bool:
    """Determine if the workflow should continue."""
    return bool(state.get("next_step")) or any(
        msg.get("type") == "human" 
        for msg in state.get("messages", [])[-1:]
    )

# Create the workflow
def build_workflow():
    # Create the graph
    workflow = StateGraph(State)
    
    # Add nodes
    workflow.add_node("observe", observe)
    workflow.add_node("think", think)
    workflow.add_node("act", act)
    
    # Define edges
    workflow.add_edge("observe", "think")
    workflow.add_edge("think", "act")
    
    # Set entry point
    workflow.set_entry_point("observe")
    
    # Add conditional edges for continuation or completion
    workflow.add_conditional_edges(
        "act",
        should_continue,
        {
            True: "observe",  # Continue to observe if there's more to do
            False: END   # End the workflow if we're done
        }
    )
    
    return workflow.compile()

@tool
def wiki_search(query: str) -> ToolResult:
    """Search for information about a topic."""
    try:
        # Simulated search result
        result = f"Found information about: {query}"
        return {'result': result, 'error': None}
    except Exception as e:
        return {'result': None, 'error': f"Search failed: {str(e)}"}

# Define available tools
tools = {
    "calculator": calculator,
    "wiki_search": wiki_search
}

# State and Message Types
class MessageBase(TypedDict):
    content: str
    type: str

class HumanMessage(MessageBase):
    pass

class AIMessage(MessageBase):
    pass

class ToolMessage(MessageBase):
    tool: str

class State(TypedDict):
    messages: List[MessageBase]
    next_step: str

def observe(state: dict) -> dict:
    """Process current state and determine next steps."""
    messages = state.get("messages", [])
    if not messages:
        return state

    # Look at the last message
    last_message = messages[-1]
    if not isinstance(last_message, dict):
        return state

    # Determine next step based on message content
    content = last_message.get("content", "").lower()
    next_step = ""
    
    if "calculate" in content:
        next_step = "calculator"
    elif "search" in content or "find" in content:
        next_step = "wiki_search"
        
    return {
        "messages": messages,
        "next_step": next_step
    }

def think(state: dict) -> dict:
    """Plan the next action based on the current state."""
    next_step = state.get("next_step", "")
    if not next_step:
        return state

    messages = state.get("messages", [])
    if not messages:
        return state

    # Get the last message content
    last_message = messages[-1]
    if not isinstance(last_message, dict):
        return state

    content = last_message.get("content", "")
    
    # Add AI message indicating the planned action
    messages.append({
        "content": f"I'll help you with the {next_step} task.",
        "type": "ai"
    })
    
    return {
        "messages": messages,
        "next_step": next_step
    }

def act(state: dict) -> dict:
    """Execute the planned action and return the next steps."""
    messages = state.get("messages", [])
    if not messages:
        return {"messages": messages}

    next_step = state.get("next_step", "")
    if not next_step or next_step not in tools:
        return {"messages": messages}

    # Get the original request from human message
    human_messages = [msg for msg in messages if msg.get("type") == "human"]
    if not human_messages:
        return {"messages": messages}
    
    content = human_messages[-1].get("content", "")
    
    # Extract the actual expression for calculator
    if next_step == "calculator":
        # Extract everything after "calculate" or just numbers and operators
        import re
        if "calculate" in content.lower():
            expression = content.lower().split("calculate")[-1].strip()
        else:
            expression = re.sub(r'[^0-9+\-*/\s()]', '', content)
        content = expression.strip()
    
    # Execute the tool
    result = tools[next_step](content)
    
    # Add the tool result to messages
    messages.append({
        "content": json.dumps(result),
        "type": "tool",
        "tool": next_step
    })
    
    # Add an AI message interpreting the result
    if result.get("error") is None:
        ai_response = f"The {next_step} returned: {result.get('result')}"
    else:
        ai_response = f"Sorry, there was an error: {result.get('error')}"
        
    messages.append({
        "content": ai_response,
        "type": "ai"
    })
    
    return {
        "messages": messages,
        "next_step": ""  # Clear the next step since we've handled it
    }

def should_continue(state: dict) -> bool:
    """Determine if the workflow should continue."""
    return bool(state.get("next_step")) or any(
        msg.get("type") == "human" 
        for msg in state.get("messages", [])[-1:]
    )

# Create the workflow
def build_workflow():
    # Create the graph
    workflow = StateGraph(State)
    
    # Add nodes
    workflow.add_node("observe", observe)
    workflow.add_node("think", think)
    workflow.add_node("act", act)
    
    # Define edges
    workflow.add_edge("observe", "think")
    workflow.add_edge("think", "act")
    
    # Set entry point
    workflow.set_entry_point("observe")
    
    # Add conditional edges for continuation or completion
    workflow.add_conditional_edges(
        "act",
        should_continue,
        {
            True: "observe",  # Continue to observe if there's more to do
            False: END   # End the workflow if we're done
        }
    )
    
    return workflow.compile()

# Create the agent
agent = build_workflow()

# Example usage
initial_state = {
    "messages": [{
        "content": "Please calculate 2 + 2",
        "type": "human"
    }],
    "next_step": ""
}

# Execute the agent
result = agent.invoke(initial_state)
print("\nFinal state:")
for msg in result["messages"]:
    print(f"\n{msg['type'].upper()}: {msg['content']}")

In [1]:
from typing import TypedDict, Annotated, Optional, Any, Callable, List, Dict, Union
from functools import wraps, lru_cache
import time
from langgraph.graph import StateGraph, END
import json
import transformers
import ast
from tenacity import retry, stop_after_attempt, wait_exponential
from dataclasses import dataclass
import logging
import re

# Set up logging with detailed formatting for both file and console
import sys
from logging.handlers import RotatingFileHandler

# First, configure the root logger
logging.getLogger().setLevel(logging.DEBUG)

# Create formatters
detailed_formatter = logging.Formatter(
    '%(asctime)s.%(msecs)03d %(levelname)s [%(filename)s:%(lineno)d] %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)

# Set up file handler
file_handler = RotatingFileHandler('agent_debug.log', maxBytes=10000000, backupCount=5)
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(detailed_formatter)

# Set up console handler
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(logging.DEBUG)  # Explicitly set handler level to DEBUG
console_handler.setFormatter(detailed_formatter)

# Configure logger
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)  # Explicitly set logger level to DEBUG

# Remove any existing handlers
for handler in logger.handlers[:]:
    logger.removeHandler(handler)

# Add our handlers
logger.addHandler(file_handler)
logger.addHandler(console_handler)

# Prevent logging from propagating to the root logger
logger.propagate = False

# Verify logging level
logger.debug("Logging system initialized at DEBUG level")

# Prompt Templates
PROMPTS = {
    "TOOL_SELECTION": """
Given the user message: '{message}'
Select the most appropriate tool:
- calculator: For mathematical calculations
- wiki_search: For information queries
- none: If no tool is needed

Respond with the tool name only.
""",
    "ACTION_PLANNING": """
Given the user message: '{message}'
And the selected tool: '{tool}'
Generate a brief, natural response about how you'll help with this task.
""",
    "RESULT_EXPLANATION": """
Given the tool result: {result}
Generate a natural, helpful response explaining the result.
"""
}

# Model Configuration
@dataclass
class ModelConfig:
    model_name: str = "microsoft/phi-4"
    max_new_tokens: int = 50
    temperature: float = 0.7
    device: str = "cpu"
    torch_dtype: str = "auto"

# Initialize model with configuration
def initialize_model(config: ModelConfig) -> transformers.Pipeline:
    return transformers.pipeline(
        "text-generation",
        model=config.model_name,
        model_kwargs={"torch_dtype": config.torch_dtype}
    )

# Cache LLM responses to avoid redundant computations
@lru_cache(maxsize=1000)
def generate_llm_response(prompt: str, max_length: int = 50) -> str:
    """Generate response using language model with caching"""
    try:
        response = pipeline(prompt, max_new_tokens=max_length)
        if isinstance(response, list):
            return response[0]["generated_text"].strip()
        elif isinstance(response, dict):
            return response["generated_text"].strip()
        elif isinstance(response, str):
            return response.strip()
        else:
            logger.error(f"Unexpected response type from LLM: {type(response)}")
            return ""
    except Exception as e:
        logger.error(f"Error generating LLM response: {e}")
        return ""

# Tool Definitions
class ToolResult(TypedDict):
    """Type definition for tool results"""
    error: Optional[str]
    result: Optional[Any]

def tool(func: Callable) -> Callable:
    """Enhanced decorator to mark and process tool functions"""
    @wraps(func)
    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
    def wrapper(*args, **kwargs) -> ToolResult:
        try:
            logger.info(f"Executing tool: {func.__name__}")
            return func(*args, **kwargs)
        except Exception as e:
            logger.error(f"Tool execution failed: {func.__name__}, error: {e}")
            return {'result': None, 'error': f"{func.__name__} failed: {str(e)}"}
    wrapper.is_tool = True
    wrapper.name = func.__name__
    wrapper.description = func.__doc__
    return wrapper

def safe_eval(expression: str) -> Optional[float]:
    """Safely evaluate mathematical expressions"""
    try:
        # Remove any whitespace and validate characters
        expression = ''.join(expression.split())
        if not re.match(r'^[\d+\-*/.()]+$', expression):
            return None
            
        # Parse and validate AST
        tree = ast.parse(expression, mode='eval')
        
        # Only allow basic arithmetic operations
        allowed_nodes = (ast.Expression, ast.BinOp, ast.UnaryOp, ast.Num,
                        ast.Add, ast.Sub, ast.Mult, ast.Div, ast.Pow)
                        
        for node in ast.walk(tree):
            if not isinstance(node, allowed_nodes):
                return None
                
        return eval(compile(tree, '<string>', 'eval'))
    except Exception as e:
        logger.error(f"Safe eval failed: {e}")
        return None

@tool
def calculator(input_text: str) -> ToolResult:
    """Calculate the result of a mathematical expression."""
    try:
        # Clean and validate the input
        expression = input_text.strip()
        if not expression:
            return {'result': None, 'error': "No expression provided"}
            
        # Evaluate safely
        result = safe_eval(expression)
        if result is None:
            return {'result': None, 'error': "Invalid expression"}
            
        return {'result': str(result), 'error': None}
    except Exception as e:
        return {'result': None, 'error': f"Calculation failed: {str(e)}"}

@tool
def wiki_search(query: str) -> ToolResult:
    """Search for information about a topic."""
    try:
        # For demonstration, using LLM. In production, integrate with actual Wikipedia API
        prompt = f"Provide a brief, factual summary about: {query}"
        result = generate_llm_response(prompt)
        return {'result': result, 'error': None}
    except Exception as e:
        return {'result': None, 'error': f"Search failed: {str(e)}"}

# Define available tools
tools = {
    "calculator": calculator,
    "wiki_search": wiki_search
}

# Enhanced State and Message Types
class MessageBase(TypedDict):
    content: str
    type: str
    timestamp: float  # Add timestamp for message ordering

class HumanMessage(MessageBase):
    pass

class AIMessage(MessageBase):
    confidence: float  # Add confidence score

class ToolMessage(MessageBase):
    tool: str
    attempts: int  # Track retry attempts

class State(TypedDict):
    messages: List[MessageBase]
    next_step: str
    context: Dict[str, Any]  # Add context for maintaining state

def parse_tool_selection(llm_response: str) -> Optional[str]:
    """Parse and validate tool selection from LLM response"""
    valid_tools = set(tools.keys())
    tool_name = llm_response.strip().lower()
    return tool_name if tool_name in valid_tools else None

def observe(state: State) -> State:
    """Enhanced observation phase"""
    print("\n=== OBSERVE PHASE ===")
    print(f"Messages count: {len(state.get('messages', []))}")
    print(f"Current next_step: {state.get('next_step', 'None')}")
    logger.debug("=== OBSERVE PHASE ===")
    logger.debug(f"Current state: {json.dumps(state, default=str)}")
    
    messages = state.get("messages", [])
    if not messages:
        logger.debug("No messages in state, returning unchanged")
        return state

    # Get the last message
    last_message = messages[-1]
    content = last_message.get("content", "").lower()
    
    # Simple pattern matching for mathematical operations
    math_patterns = [
        "calculate", "computation", "sum", "add", "subtract", 
        "multiply", "divide", "square root", "sqrt", "root"
    ]
    
    # Determine the tool based on message content
    next_step = ""
    if any(pattern in content.lower() for pattern in math_patterns):
        next_step = "calculator"
        logger.debug(f"Detected mathematical operation, setting next_step to: {next_step}")
    elif "what" in content and "is" in content:
        next_step = "wiki_search"
        logger.debug(f"Detected information query, setting next_step to: {next_step}")
        
    print(f"Selected next_step: {next_step}")
    
    return {
        "messages": messages,
        "next_step": next_step,
        "context": state.get("context", {})
    }

    last_message = messages[-1]
    if not isinstance(last_message, dict):
        return state

    content = last_message.get("content", "")
    prompt = PROMPTS["TOOL_SELECTION"].format(message=content)
    
    next_step = parse_tool_selection(generate_llm_response(prompt))
    
    return {
        "messages": messages,
        "next_step": next_step or "",
        "context": state.get("context", {})
    }

def think(state: State) -> State:
    """Enhanced thinking phase"""
    print("\n=== THINK PHASE ===")
    print(f"Next step to consider: {state.get('next_step', 'None')}")
    print(f"Messages count: {len(state.get('messages', []))}")
    
    logger.debug("=== THINK PHASE ===")
    logger.debug(f"Incoming state: {json.dumps(state, default=str)}")
    
    next_step = state.get("next_step", "")
    messages = state.get("messages", [])
    context = state.get("context", {})
    
    # If there's no next step, just return the state
    if not next_step or not messages:
        print("No next step or messages, skipping think phase")
        return state
    
    try:
        # Get the last message content
        last_message = messages[-1]
        content = last_message.get("content", "")
        
        # For calculator, we don't need LLM response
        if next_step == "calculator":
            ai_response = "I'll help you calculate that."
        else:
            # Use LLM to generate a response about the planned action
            prompt = f"""Given the user message: '{content}'
            And the selected tool: '{next_step}'
            Generate a brief, natural response about how you'll help with this task."""
            
            ai_response = generate_llm_response(prompt)
        
        print(f"Generated AI response: {ai_response}")
        
        # Add the AI response to messages
        messages.append({
            "content": ai_response,
            "type": "ai",
            "timestamp": time.time()
        })
        
        return {
            "messages": messages,
            "next_step": next_step,
            "context": context
        }
        
    except Exception as e:
        logger.error(f"Error in think phase: {e}")
        # Return original state on error
        return state

    if not next_step or not messages:
        return state

    last_message = messages[-1]
    content = last_message.get("content", "")
    
    prompt = PROMPTS["ACTION_PLANNING"].format(
        message=content,
        tool=next_step
    )
    
    ai_response = generate_llm_response(prompt)
    
    messages.append({
        "content": ai_response,
        "type": "ai",
        "timestamp": time.time(),
        "confidence": 0.8  # Example confidence score
    })
    
    return {
        "messages": messages,
        "next_step": next_step,
        "context": context
    }

def act(state: State) -> State:
    """Enhanced action phase"""
    print("\n=== ACT PHASE ===")
    print(f"Executing step: {state.get('next_step', 'None')}")
    print(f"Messages count: {len(state.get('messages', []))}")
    print(f"Last message type: {state.get('messages', [{}])[-1].get('type', 'None') if state.get('messages') else 'None'}")
    logger.debug("=== ACT PHASE ===")
    logger.debug(f"Incoming state: {json.dumps(state, default=str)}")
    
    messages = state.get("messages", [])
    next_step = state.get("next_step", "")
    context = state.get("context", {})
    
    logger.debug(f"Executing step: {next_step}")
    logger.debug(f"Message count: {len(messages)}")
    logger.debug(f"Context: {context}")

    if not next_step or next_step not in tools:
        return {"messages": messages, "next_step": "", "context": context}

    human_messages = [msg for msg in messages if msg.get("type") == "human"]
    if not human_messages:
        return {"messages": messages, "next_step": "", "context": context}
    
    content = human_messages[-1].get("content", "")
    
    # Extract expression for calculator
    if next_step == "calculator":
        expression = re.findall(r'[\d+\-*/().]+', content)
        content = expression[0] if expression else content
    
    # Execute tool with retry mechanism
    result = execute_tool(next_step, content)
    
    messages.append({
        "content": json.dumps(result),
        "type": "tool",
        "tool": next_step,
        "timestamp": time.time(),
        "attempts": context.get("retry_count", 1)
    })
    
    # Generate response about result
    prompt = PROMPTS["RESULT_EXPLANATION"].format(result=json.dumps(result))
    ai_response = generate_llm_response(prompt)
    
    messages.append({
        "content": ai_response,
        "type": "ai",
        "timestamp": time.time(),
        "confidence": 0.9
    })
    
    return {
        "messages": messages,
        "next_step": "",
        "context": context
    }

@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
def execute_tool(tool_name: str, content: str) -> ToolResult:
    """Execute tool with retry mechanism"""
    tool = tools.get(tool_name)
    if not tool:
        return {"error": "Tool not found", "result": None}
    return tool(content)

def should_continue(state: State) -> bool:
    """Enhanced continuation check with recursion protection"""
    print("\n=== CONTINUE CHECK ===")
    print(f"Recursion count: {state.get('context', {}).get('recursion_count', 0)}")
    print(f"Has next_step: {bool(state.get('next_step'))}")
    print(f"Messages count: {len(state.get('messages', []))}")
    logger.debug("=== CONTINUE CHECK ===")
    logger.debug(f"Checking state: {json.dumps(state, default=str)}")
    messages = state.get("messages", [])
    next_step = state.get("next_step", "")
    context = state.get("context", {})
    
    # Get the recursion count from context
    recursion_count = context.get("recursion_count", 0)
    
    # Update recursion count
    context["recursion_count"] = recursion_count + 1
    
    # Check if we've hit the recursion limit (20 is a safe number)
    if recursion_count >= 20:
        logger.warning("Recursion limit reached, forcing stop")
        return False
    
    # Check for unfinished business
    has_pending_action = bool(next_step)
    has_recent_human_message = any(
        msg.get("type") == "human" 
        for msg in messages[-1:]
    )
    
    # Only consider recent errors to prevent infinite loops
    has_errors = any(
        msg.get("type") == "tool" and msg.get("error")
        for msg in messages[-2:]  # Only look at the last 2 messages
    )
    
    # Don't continue if we have errors but no pending action
    if has_errors and not (has_pending_action or has_recent_human_message):
        return False
    
    return has_pending_action or has_recent_human_message

def build_workflow() -> Callable:
    """Build the enhanced workflow"""
    workflow = StateGraph(State)  # Initialize without recursion_limit
    
    workflow.add_node("observe", observe)
    workflow.add_node("think", think)
    workflow.add_node("act", act)
    
    # Add edges
    workflow.add_edge("observe", "think")
    workflow.add_edge("think", "act")
    
    workflow.set_entry_point("observe")
    
    workflow.add_conditional_edges(
        "act",
        should_continue,
        {
            True: "observe",
            False: END
        }
    )
    
    return workflow.compile()

# Initialize the system
model_config = ModelConfig()
pipeline = initialize_model(model_config)
agent = build_workflow()

def process_message(message: str) -> Dict[str, Any]:
    """Process a single message through the agent"""
    print("\n=== STARTING NEW MESSAGE PROCESSING ===")
    print(f"Input message: {message}")
    initial_state = {
        "messages": [{
            "content": message,
            "type": "human",
            "timestamp": time.time()
        }],
        "next_step": "",
        "context": {}
    }
    
    try:
        result = agent.invoke(initial_state)
        return result
    except Exception as e:
        logger.error(f"Error processing message: {e}")
        return {
            "messages": [
                {
                    "content": "I encountered an error processing your request. Please try again.",
                    "type": "ai",
                    "timestamp": time.time(),
                    "confidence": 0.0
                }
            ],
            "next_step": "",
            "context": {"error": str(e)}
        }

# Example usage
if __name__ == "__main__":
    test_message = "What is the square root of 16?"
    result = process_message(test_message)
    
    print("\nConversation flow:")
    for msg in result["messages"]:
        print(f"\n{msg['type'].upper()}: {msg['content']}")


A module that was compiled using NumPy 1.x cannot be run in
NumPy 2.2.2 as it may crash. To support both 1.x and 2.x
versions of NumPy, modules must be compiled with NumPy 2.0.
Some module may need to rebuild instead e.g. with 'pybind11>=2.12'.

If you are a user of the module, the easiest solution will be to
downgrade to 'numpy<2' or try to upgrade the affected module.
We expect that some modules will need time to support NumPy 2.

Traceback (most recent call last):  File "<frozen runpy>", line 198, in _run_module_as_main
  File "<frozen runpy>", line 88, in _run_code
  File "/usr/local/Caskroom/miniconda/base/lib/python3.12/site-packages/ipykernel_launcher.py", line 18, in <module>
    app.launch_new_instance()
  File "/usr/local/Caskroom/miniconda/base/lib/python3.12/site-packages/traitlets/config/application.py", line 1075, in launch_instance
    app.start()
  File "/usr/local/Caskroom/miniconda/base/lib/python3.12/site-packages/ipykernel/kernelapp.py", line 739, in start
    self

2025-02-24 18:19:30.774 DEBUG [483232748.py:52] Logging system initialized at DEBUG level


Loading checkpoint shards:   0%|          | 0/6 [00:00<?, ?it/s]

Device set to use mps:0


TypeError: BFloat16 is not supported on MPS