<a href="https://colab.research.google.com/github/wjleece/AI-Agents/blob/main/AI_Agents_w_Evals.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
%pip install anthropic
#%pip install openai
%pip install -q -U google-generativeai
%pip install fuzzywuzzy

Collecting anthropic
  Downloading anthropic-0.51.0-py3-none-any.whl.metadata (25 kB)
Downloading anthropic-0.51.0-py3-none-any.whl (263 kB)
Installing collected packages: anthropic
Successfully installed anthropic-0.51.0
Collecting fuzzywuzzy
  Downloading fuzzywuzzy-0.18.0-py2.py3-none-any.whl.metadata (4.9 kB)
Downloading fuzzywuzzy-0.18.0-py2.py3-none-any.whl (18 kB)
Installing collected packages: fuzzywuzzy
Successfully installed fuzzywuzzy-0.18.0


In [2]:
#Setup and Imports
import anthropic
import google.generativeai as gemini
import re
import json
import time
import os
import copy
import glob # For finding files matching a pattern
import uuid # For generating unique learning IDs in RAG
from google.colab import userdata
#from openai import OpenAI
from google.colab import drive # For Google Drive mounting
from datetime import datetime
from typing import Dict, List, Any, Optional, Union, Tuple
from fuzzywuzzy import process, fuzz

# LLM API Keys
ANTHROPIC_API_KEY = userdata.get('ANTHROPIC_API_KEY')
#OPENAI_API_KEY = userdata.get('OPENAI_API_KEY')
GOOGLE_API_KEY = userdata.get('GOOGLE_API_KEY')

anthropic_client = anthropic.Anthropic(api_key=ANTHROPIC_API_KEY)
#openai_client = OpenAI(api_key=OPENAI_API_KEY)
gemini.configure(api_key=GOOGLE_API_KEY)

ANTHROPIC_MODEL_NAME = "claude-3-5-sonnet-latest"
#OPENAI_MODEL_NAME = "gpt-4.1" # Or your preferred GPT-4 class model
EVAL_MODEL_NAME = "gemini-2.5-pro-preview-05-06" # Or your preferred Gemini model


DRIVE_MOUNT_PATH = '/content/drive'

try:
    drive.mount(DRIVE_MOUNT_PATH)
    print(f"Google Drive mounted successfully at {DRIVE_MOUNT_PATH}.")
except Exception as e:
    print(f"Error mounting Google Drive: {e}. RAG features will not work.")

# Set up the default learnings path
DEFAULT_LEARNINGS_DRIVE_SUBPATH = "My Drive/AI/Knowledgebases"  # Your default path
LEARNINGS_DRIVE_BASE_PATH = os.path.join(DRIVE_MOUNT_PATH, DEFAULT_LEARNINGS_DRIVE_SUBPATH)

# Create the directory if it doesn't exist
if not os.path.exists(LEARNINGS_DRIVE_BASE_PATH):
    try:
        os.makedirs(LEARNINGS_DRIVE_BASE_PATH)
        print(f"Created learnings directory: {LEARNINGS_DRIVE_BASE_PATH}")
    except Exception as e:
        print(f"Error creating learnings directory {LEARNINGS_DRIVE_BASE_PATH}: {e}")
else:
    print(f"Using existing learnings directory: {LEARNINGS_DRIVE_BASE_PATH}")

print("Imports and LLM clients initialized. Drive RAG configuration variables set.")



Mounted at /content/drive
Google Drive mounted successfully at /content/drive.
Using existing learnings directory: /content/drive/My Drive/AI/Knowledgebases
Imports and LLM clients initialized. Drive RAG configuration variables set.


In [3]:
# --- Specialized System Prompts ---

# --- Worker AI Prompts ---
worker_base_instructions = """
You are a helpful customer service assistant for an e-commerce system.
Your overriding goal is to be helpful by answering questions and performing actions as requested by a human user.
When responding to the user, use the conversation context to maintain continuity.
- If a user refers to "my order" or similar, use the context to determine which order they're talking about.
- If they mention "that product" or use other references, check the context to determine what they're referring to.
- Always prioritize recent context over older context when resolving references.

The conversation context will be provided to you with each message. This includes:
- Previous questions and answers
- Recently viewed customers, products, and orders
- Recent actions taken (like creating orders, updating products, etc.)
- Relevant Learnings from a knowledge base (if applicable to the current query type).

REQUESTING CLARIFICATION FROM THE USER:
If you determine that you absolutely need more information from the user to accurately and efficiently fulfill their request or use a tool correctly, you MUST:
1. Formulate a clear, concise question for the user.
2. Prefix your entire response with the exact tag: `CLARIFICATION_REQUESTED:`
   Example: `CLARIFICATION_REQUESTED: To update the order, could you please provide the Order ID?`
3. Do NOT use any tools in the same turn you are requesting clarification. Wait for the user's response.

Keep all other responses friendly, concise, and helpful.
"""

worker_operational_system_prompt = f"""
{worker_base_instructions}

Your current task is OPERATIONAL. Focus on understanding user requests related to e-commerce functions (managing orders, products, customers), using the provided tools accurately, and interacting with the data store.
The "Relevant Learnings from Knowledge Base" provided in your context may contain operational guidelines.
"""

worker_metacognitive_learnings_system_prompt = f"""
{worker_base_instructions}

Your current task is METACOGNITIVE: SUMMARIZING LEARNINGS.
If the user asks you to "summarize your learnings," "what have you learned," "why did you," "is there a better way to," or uses similar phrases, your response should be based PRIMARILY on the content provided to you under the heading "Relevant Learnings from Knowledge Base (In-Session Cache)" in your current context.
- List the key principles or pieces of information from these provided learnings.
- Do not confuse these explicit learnings with a general summary of your recent actions or the current state of the data store, unless a learning specifically refers to such an action or state.
- If no specific learnings are provided in your context for this type of query, you can state that no specific new learnings have been highlighted for this interaction.
- Avoid using tools for this type of summarization unless a tool is specifically designed to retrieve or process learnings.
"""

# --- Evaluator AI Prompt (unified but guided by query type information) ---
# This prompt is largely the same as the one from worker_prompt_update_learning_summary,
# but we will emphasize the query_type in the main prompt to the evaluator.

evaluator_system_prompt = """
You are Google Gemini, an impartial evaluator assessing the quality of responses from an AI assistant to customer service queries.

You will be provided with:
- The user's query and the TYPE of query it was classified as (e.g., OPERATIONAL, METACOGNITIVE_LEARNINGS_SUMMARY).
- The conversation context (including RAG learnings) that was available to the AI assistant.
- The AI assistant's final response.
- A snapshot of the 'Data Store State *Before* AI Action'.
- A snapshot of the 'Data Store State *After* AI Action'.
- Details of any clarification questions the AI assistant asked.

Your primary goal is to assess the AI assistant based on the SPECIFIC TASK it was attempting, as indicated by the query type.

For each interaction, evaluate the assistant's response based on:
1.  **Accuracy**:
    * If OPERATIONAL: How correct and factual is the AI's textual response? Did its actions (tool calls) correctly modify the datastore as intended and claimed? Verify against 'Before' and 'After' states.
    * If METACOGNITIVE_LEARNINGS_SUMMARY: Did the AI accurately summarize the "Relevant Learnings from Knowledge Base" provided in its context? Was the summary faithful to these learnings? Avoid confusing this with operational history.
    * Check for new entity IDs and correct updates if applicable to the query type.

2.  **Efficiency**:
    * Did the assistant achieve its goal with minimal clarifying questions?
    * If OPERATIONAL: Were tool calls used appropriately?
    * If METACOGNITIVE_LEARNINGS_SUMMARY: Was the summary direct and to the point based on provided learnings?

3.  **Context Awareness**:
    * Did the assistant correctly use the conversation history and entities?
    * Crucially, did the assistant adhere to the task defined by the query type? (e.g., if asked to summarize learnings, did it do that, or did it perform an operational summary by mistake?).
    * Did it correctly use any "Relevant Learnings from Knowledge Base" that were pertinent to the query type?

4.  **Helpfulness**:
    * How well did the assistant address the user's needs *for the identified query type*?
    * Was the response clear and did it provide relevant information?

Score the response on a scale of 1-10 for each criterion, and provide an overall score. Provide detailed reasoning, EXPLICITLY MENTIONING THE QUERY TYPE you are evaluating against.
- For OPERATIONAL queries, heavily reference the 'Before' and 'After' data store states.
- For METACOGNITIVE_LEARNINGS_SUMMARY, heavily reference the "Relevant Learnings from Knowledge Base" that were provided to the worker.

EVALUATING CLARIFICATION QUESTIONS:
If the worker AI asked for clarification:
- Assess necessity using 'Data Store State *Before* AI Action' and context.
- If necessary and well-phrased, it should NOT negatively impact Efficiency.
- If unnecessary, it SHOULD negatively impact Efficiency.

If you, the evaluator, still have questions, use "CLARIFICATION NEEDED_EVALUATOR:".

DATA STORE CONSISTENCY (Primarily for OPERATIONAL tasks):
When assessing Accuracy for OPERATIONAL tasks, explicitly compare the AI's actions with changes between 'Before' and 'After' states.
"""

In [4]:
# Reset these globals so that values cached from a previous run are not reused

human_feedback_learnings = {}
tools_schemas_list = []
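
The worker prompt above asks the assistant to prefix any clarifying question with the exact tag `CLARIFICATION_REQUESTED:`. A minimal sketch of how calling code might detect that tag follows. The helper name `split_clarification` is hypothetical and does not appear in this notebook.

```python
from typing import Tuple

# Exact tag taken from the worker prompt.
CLARIFICATION_TAG = "CLARIFICATION_REQUESTED:"


def split_clarification(response_text: str) -> Tuple[bool, str]:
    """Return (is_clarification, text_without_tag) for a worker response.

    Hypothetical helper: the prompt requires the tag to prefix the entire
    response, so only the start of the text is checked.
    """
    stripped = response_text.strip()
    if stripped.startswith(CLARIFICATION_TAG):
        return True, stripped[len(CLARIFICATION_TAG):].strip()
    return False, stripped


print(split_clarification(
    "CLARIFICATION_REQUESTED: To update the order, could you please provide the Order ID?"
))
print(split_clarification("Order O1 has shipped."))
```

Checking only the start of the response mirrors the prompt's instruction to prefix the entire response with the tag. A tag that appears mid-sentence is treated as ordinary text.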

In [5]:
# Gemini is structured differently from Anthropic: the model name and system instruction are bound to a
# GenerativeModel instance up front, and that instance then serves generate_content(prompt).
# Anthropic instead takes the model name and system prompt as arguments to messages.create().

eval_model_instance = gemini.GenerativeModel(
    model_name=EVAL_MODEL_NAME,
    system_instruction=evaluator_system_prompt
)
print("Gemini Evaluator model instance initialized.")

# This Gemini instance produces "ground truth" answers by running the same queries in parallel with Anthropic.
# The evaluator instance then evaluates the "ground truth" against Anthropic's response.
gemini_actor_model_instance = gemini.GenerativeModel(
    model_name=EVAL_MODEL_NAME, # Using the same underlying model
    system_instruction=worker_base_instructions # But with the worker's system prompt
)
print("Gemini Actor model instance initialized.")

Gemini Evaluator model instance initialized.
Gemini Actor model instance initialized.
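
The evaluator system prompt lists the inputs the evaluator expects: the query and its type, the conversation context, the assistant's response, the before and after data store snapshots, and any clarification questions. One way those pieces might be assembled into a single prompt string is sketched below. The function name `build_evaluation_prompt` and the `###` section layout are assumptions made for illustration, not something this notebook defines.

```python
import json
from typing import Any, Dict, Optional


def build_evaluation_prompt(
    user_query: str,
    query_type: str,
    context: str,
    worker_response: str,
    store_before: Dict[str, Any],
    store_after: Dict[str, Any],
    clarification: Optional[str] = None,
) -> str:
    """Assemble the evaluator inputs into one text prompt (hypothetical layout)."""
    sections = [
        ("User Query", user_query),
        ("Query Type", query_type),
        ("Conversation Context", context),
        ("AI Assistant Response", worker_response),
        ("Data Store State *Before* AI Action",
         json.dumps(store_before, indent=2, sort_keys=True)),
        ("Data Store State *After* AI Action",
         json.dumps(store_after, indent=2, sort_keys=True)),
        ("Clarification Questions Asked", clarification or "None"),
    ]
    return "\n\n".join(f"### {title}\n{body}" for title, body in sections)


example_prompt = build_evaluation_prompt(
    user_query="What is the status of order O1?",
    query_type="OPERATIONAL",
    context="(no prior turns)",
    worker_response="Order O1 has shipped.",
    store_before={"orders": {"O1": {"status": "Shipped"}}},
    store_after={"orders": {"O1": {"status": "Shipped"}}},
)
print(example_prompt)
```

A string built this way could then be passed to `eval_model_instance.generate_content(...)`, since the system instruction is already attached to that instance.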


In [6]:
# Global Data Stores (Initial data - will be managed by the Storage class instance)
# These are initial values. The Storage class will manage them.
initial_customers = {
    "C1": {"name": "John Doe", "email": "john@example.com", "phone": "123-456-7890"},
    "C2": {"name": "Jane Smith", "email": "jane@example.com", "phone": "987-654-3210"}
}

initial_products = {
    "P1": {"name": "Widget A", "description": "A simple widget. Very compact.", "price": 19.99, "inventory_count": 999},
    "P2": {"name": "Gadget B", "description": "A powerful gadget. It spins.", "price": 49.99, "inventory_count": 200},
    "P3": {"name": "Perplexinator", "description": "A perplexing perfunctator", "price": 79.99, "inventory_count": 1483}
}

initial_orders = {
    "O1": {"id": "O1", "product_id": "P1", "product_name": "Widget A", "quantity": 2, "price": 19.99, "status": "Shipped"},
    "O2": {"id": "O2", "product_id": "P2", "product_name": "Gadget B", "quantity": 1, "price": 49.99, "status": "Processing"}
}
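
Each order record duplicates the product's name alongside its `product_id`, so the two stores can drift apart. A small consistency check over data shaped like the dictionaries above might look as follows. The function `find_order_inconsistencies` is a hypothetical helper, and the sample data is abbreviated from the cell above.

```python
from typing import Any, Dict, List


def find_order_inconsistencies(
    products: Dict[str, Dict[str, Any]],
    orders: Dict[str, Dict[str, Any]],
) -> List[str]:
    """Return human-readable problems found in the orders (hypothetical helper)."""
    problems: List[str] = []
    for order_id, order in orders.items():
        product = products.get(order["product_id"])
        if product is None:
            problems.append(f"{order_id}: unknown product_id {order['product_id']}")
            continue
        if order["product_name"] != product["name"]:
            problems.append(
                f"{order_id}: product_name {order['product_name']!r} "
                f"does not match {product['name']!r}"
            )
        if order["quantity"] <= 0:
            problems.append(f"{order_id}: non-positive quantity {order['quantity']}")
    return problems


# Abbreviated copies of the initial data defined above.
sample_products = {
    "P1": {"name": "Widget A", "price": 19.99, "inventory_count": 999},
    "P2": {"name": "Gadget B", "price": 49.99, "inventory_count": 200},
}
sample_orders = {
    "O1": {"id": "O1", "product_id": "P1", "product_name": "Widget A", "quantity": 2},
    "O2": {"id": "O2", "product_id": "P2", "product_name": "Gadget B", "quantity": 1},
}
print(find_order_inconsistencies(sample_products, sample_orders))
```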


In [7]:
# Standalone Anthropic Completion Function (for basic tests)
def get_completion_anthropic_standalone(prompt: str):
    message = anthropic_client.messages.create(
        model=ANTHROPIC_MODEL_NAME,
        max_tokens=2000,
        temperature=0.0,
        system=worker_base_instructions,
        tools=tools_schemas_list,
        messages=[
          {"role": "user", "content": prompt}
        ]
    )
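    # When tools are supplied, the response may include tool_use blocks, so collect only the text blocks.
    return "".join(block.text for block in message.content if block.type == "text")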

In [8]:
prompt_test_anthropic = "Hey there, which AI model do you use for answering questions?"
print(f"Anthropic Standalone Test: {get_completion_anthropic_standalone(prompt_test_anthropic)}")

Anthropic Standalone Test: I am a customer service assistant designed to help with e-commerce related questions and tasks. I aim to be helpful by answering questions and performing actions related to orders, products, customers, and other e-commerce functions. I don't actually discuss my underlying AI model or technical details - instead, I'd be happy to help you with any questions about orders, products, or other e-commerce related matters. What can I assist you with today?
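
When tools are supplied, the `content` list of an Anthropic response can hold `tool_use` blocks alongside `text` blocks, so the first block is not guaranteed to carry text. The sketch below shows text extraction that tolerates mixed blocks. It uses `SimpleNamespace` stand-ins rather than real SDK objects so that it runs without an API key. The helper name `extract_text` and the block values are illustrative.

```python
from types import SimpleNamespace
from typing import Iterable


def extract_text(content_blocks: Iterable[object]) -> str:
    """Join the text of all blocks whose type is "text" (hypothetical helper)."""
    return "\n".join(
        block.text
        for block in content_blocks
        if getattr(block, "type", None) == "text"
    )


# Stand-ins that mimic the `type` and `text` attributes of response content blocks.
blocks = [
    SimpleNamespace(type="text", text="Let me look that up."),
    SimpleNamespace(
        type="tool_use",
        id="toolu_example",
        name="get_order_details",
        input={"order_id": "O1"},
    ),
]
print(extract_text(blocks))
```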


In [9]:
#def get_completion_openai_standalone(prompt: str):
#    response = openai_client.chat.completions.create(
#        model=OPENAI_MODEL_NAME,
#        max_tokens=2000,
#        temperature=0.0,
#        tools=tools_schemas_list,
#        messages=[
#            {"role": "system", "content": worker_base_instructions},
#            {"role": "user", "content": prompt}
#        ]
#    )
#    return response.choices[0].message.content

In [10]:
#prompt_test_openai = "Hey there, which AI model do you use for answering questions?"
#print(f"OpenAI Standalone Test: {get_completion_openai_standalone(prompt_test_openai)}")

In [11]:
def get_completion_eval_standalone(prompt: str):
    # Uses eval_model_instance, which already carries the evaluator system prompt
    response = eval_model_instance.generate_content(prompt)
    return response.text

In [12]:
prompt_test_eval = "Hey there, can you tell me which AI you are and what your key tasks are?"
print(f"Gemini Eval Standalone Test:\n{get_completion_eval_standalone(prompt_test_eval)}")

Gemini Eval Standalone Test:
## Evaluation of AI Assistant Response

**1. Accuracy:**
   - **Score:** 10/10
   - **Reasoning:** The AI accurately identifies itself as "a large language model, trained by Google." It also correctly describes its key tasks as understanding and responding to a wide range of questions and requests, assisting with information, completing tasks based on instructions, and engaging in conversation. This is a truthful representation of its capabilities in this context.

**2. Efficiency:**
   - **Score:** 10/10
   - **Reasoning:** The AI answered the user's query directly and concisely without asking any clarifying questions. The response was immediate and to the point.

**3. Context Awareness:**
   - **Score:** 10/10
   - **Reasoning:** The query type is METACOGNITIVE_ABOUT_ASSISTANT. The AI correctly understood that the user was asking about its own identity and functions. It did not attempt to perform an operational task or summarize learnings, which would hav
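
The evaluator output above follows a loose Markdown pattern: a numbered criterion heading followed by a `**Score:** N/10` bullet. Under the assumption that this layout holds, the scores could be pulled out with a regular expression as sketched here. The pattern and the function name `parse_scores` are illustrative. Because the text is model-generated, the layout may vary between runs.

```python
import re
from typing import Dict

# Matches "**1. Accuracy:**" followed by "- **Score:** 10/10" on the next line.
SCORE_PATTERN = re.compile(
    r"\*\*\d+\.\s*(?P<criterion>[^:*]+):\*\*\s+-\s*\*\*Score:\*\*\s*(?P<score>\d+)\s*/\s*10"
)


def parse_scores(evaluation_text: str) -> Dict[str, int]:
    """Map each criterion name to its integer score (hypothetical helper)."""
    return {
        match.group("criterion").strip(): int(match.group("score"))
        for match in SCORE_PATTERN.finditer(evaluation_text)
    }


sample = """**1. Accuracy:**
   - **Score:** 10/10
   - **Reasoning:** (reasoning text omitted)

**2. Efficiency:**
   - **Score:** 10/10
   - **Reasoning:** (reasoning text omitted)
"""
print(parse_scores(sample))
```

If the scores need to be consumed programmatically, instructing the evaluator to return JSON would likely be more robust than parsing free-form Markdown.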

In [13]:
# Storage Class
class Storage:
    def __init__(self):
        self.customers = copy.deepcopy(initial_customers)
        self.products = copy.deepcopy(initial_products)
        self.orders = copy.deepcopy(initial_orders)
        self.human_feedback_learnings = human_feedback_learnings

    def get_full_datastore_copy(self) -> Dict[str, Any]:
        """Returns a deep copy of the current datastore."""
        return {
            "customers": copy.deepcopy(self.customers),
            "products": copy.deepcopy(self.products),
            "orders": copy.deepcopy(self.orders)
        }

print("Storage class defined with deepcopy for initial data and get_full_datastore_copy method.")

Storage class defined with deepcopy for initial data and get_full_datastore_copy method.
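
The evaluator prompt asks for a comparison of the data store before and after the assistant acts, and `get_full_datastore_copy` returns snapshots suitable for that. A minimal sketch of a snapshot diff follows, assuming each snapshot is a dict of sections (`customers`, `products`, `orders`) keyed by ID. The function `diff_datastore` is hypothetical, and the sample snapshots are abbreviated.

```python
from typing import Any, Dict


def diff_datastore(
    before: Dict[str, Dict[str, Any]],
    after: Dict[str, Dict[str, Any]],
) -> Dict[str, Dict[str, Any]]:
    """Report added, removed, and modified records per section (hypothetical helper)."""
    changes: Dict[str, Dict[str, Any]] = {}
    for section in sorted(set(before) | set(after)):
        old = before.get(section, {})
        new = after.get(section, {})
        added = {key: new[key] for key in new if key not in old}
        removed = {key: old[key] for key in old if key not in new}
        modified = {
            key: {"before": old[key], "after": new[key]}
            for key in old
            if key in new and old[key] != new[key]
        }
        # Sections with no changes are left out of the report.
        if added or removed or modified:
            changes[section] = {"added": added, "removed": removed, "modified": modified}
    return changes


before = {"products": {"P1": {"inventory_count": 999}}, "orders": {}}
after = {
    "products": {"P1": {"inventory_count": 997}},
    "orders": {"O3": {"product_id": "P1", "quantity": 2, "status": "Shipped"}},
}
print(diff_datastore(before, after))
```

Because the snapshots are deep copies, later tool calls cannot mutate a "before" snapshot after it has been taken, which is what makes a comparison like this trustworthy.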


In [14]:
#Definitive list of tool schemas.
tools_schemas_list = [
    {
        "name": "create_customer",
        "description": "Adds a new customer to the database. Includes customer name, email, and (optional) phone number.",
        "input_schema": {
            "type": "object",
            "properties": {
                "name": {"type": "string", "description": "The name of the customer."},
                "email": {"type": "string", "description": "The email address of the customer."},
                "phone": {"type": "string", "description": "The phone number of the customer (optional)."}
            },
            "required": ["name", "email"]
        }
    },
    {
        "name": "get_customer_info",
        "description": "Retrieves customer information based on their customer ID. Returns the customer's name, email, and (optional) phone number.",
        "input_schema": {
            "type": "object",
            "properties": {
                "customer_id": {"type": "string", "description": "The unique identifier for the customer."}
            },
            "required": ["customer_id"]
        }
    },
    {
        "name": "create_product",
        "description": "Adds a new product to the product database. Includes name, description, price, and initial inventory count.",
        "input_schema": {
            "type": "object",
            "properties": {
                "name": {"type": "string", "description": "The name of the product."},
                "description": {"type": "string", "description": "A description of the product."},
                "price": {"type": "number", "description": "The price of the product."},
                "inventory_count": {"type": "integer", "description": "The amount of the product that is currently in inventory."}
            },
            "required": ["name", "description", "price", "inventory_count"]
        }
    },
    {
        "name": "update_product",
        "description": "Updates an existing product with new information. Only fields that are provided will be updated; other fields remain unchanged.",
        "input_schema": {
            "type": "object",
            "properties": {
                "product_id": {"type": "string", "description": "The unique identifier for the product to update."},
                "name": {"type": "string", "description": "The new name for the product (optional)."},
                "description": {"type": "string", "description": "The new description for the product (optional)."},
                "price": {"type": "number", "description": "The new price for the product (optional)."},
                "inventory_count": {"type": "integer", "description": "The new inventory count for the product (optional)."}
            },
            "required": ["product_id"]
        }
    },
    {
        "name": "get_product_info",
        "description": "Retrieves product information based on product ID or product name (with fuzzy matching for misspellings). Returns product details including name, description, price, and inventory count.",
        "input_schema": {
            "type": "object",
            "properties": {
                "product_id_or_name": {"type": "string", "description": "The product ID or name (can be approximate)."}
            },
            "required": ["product_id_or_name"]
        }
    },
    {
        "name": "list_all_products",
        "description": "Lists all available products in the inventory.",
        "input_schema": { "type": "object", "properties": {}, "required": [] }
    },
    {
        "name": "create_order",
        "description": "Creates an order using the product's current price. If requested quantity exceeds available inventory, no order is created and available quantity is returned. Orders can only be created for products that are in stock. Supports specifying products by either ID or name with fuzzy matching for misspellings.",
        "input_schema": {
            "type": "object",
            "properties": {
                "product_id_or_name": {"type": "string", "description": "The ID or name of the product to order (supports fuzzy matching)."},
                "quantity": {"type": "integer", "description": "The quantity of the product in the order."},
                "status": {"type": "string", "description": "The initial status of the order (e.g., 'Processing', 'Shipped')."}
            },
            "required": ["product_id_or_name", "quantity", "status"]
        }
    },
    {
        "name": "get_order_details",
        "description": "Retrieves the details of a specific order based on the order ID. Returns the order ID, product name, quantity, price, and order status.",
        "input_schema": {
            "type": "object",
            "properties": {
                "order_id": {"type": "string", "description": "The unique identifier for the order."}
            },
            "required": ["order_id"]
        }
    },
    {
        "name": "update_order_status",
        "description": "Updates the status of an order and adjusts inventory accordingly. Changing to \"Shipped\" decreases inventory. Changing to \"Returned\" or \"Canceled\" from \"Shipped\" increases inventory. Status can be \"Processing\", \"Shipped\", \"Delivered\", \"Returned\", or \"Canceled\".",
        "input_schema": {
            "type": "object",
            "properties": {
                "order_id": {"type": "string", "description": "The unique identifier for the order."},
                "new_status": {
                    "type": "string",
                    "description": "The new status to set for the order.",
                    "enum": ["Processing", "Shipped", "Delivered", "Returned", "Canceled"]
                }
            },
            "required": ["order_id", "new_status"]
        }
    }
]
print(f"Defined {len(tools_schemas_list)} tool schemas.")

Defined 9 tool schemas.
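
Each schema above declares its `properties`, its `required` fields, and in one case an `enum` of allowed values. Before a tool call is dispatched, the model-supplied input could be checked against that schema. The sketch below covers only those three aspects and is not a full JSON Schema validator. The function `validate_tool_input` is hypothetical, and the sample schema is an abbreviated copy of `update_order_status`.

```python
from typing import Any, Dict, List


def validate_tool_input(schema: Dict[str, Any], tool_input: Dict[str, Any]) -> List[str]:
    """Return a list of problems with tool_input (hypothetical helper)."""
    input_schema = schema["input_schema"]
    properties = input_schema.get("properties", {})
    errors = [
        f"missing required field: {name}"
        for name in input_schema.get("required", [])
        if name not in tool_input
    ]
    errors += [
        f"unexpected field: {name}" for name in tool_input if name not in properties
    ]
    for name, value in tool_input.items():
        allowed = properties.get(name, {}).get("enum")
        if allowed is not None and value not in allowed:
            errors.append(f"invalid value for {name}: {value!r}")
    return errors


# Abbreviated copy of the update_order_status schema (descriptions omitted).
update_order_status_schema = {
    "name": "update_order_status",
    "input_schema": {
        "type": "object",
        "properties": {
            "order_id": {"type": "string"},
            "new_status": {
                "type": "string",
                "enum": ["Processing", "Shipped", "Delivered", "Returned", "Canceled"],
            },
        },
        "required": ["order_id", "new_status"],
    },
}

print(validate_tool_input(update_order_status_schema, {"order_id": "O1", "new_status": "Shipped"}))
print(validate_tool_input(update_order_status_schema, {"new_status": "Lost"}))
```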


In [15]:
# Tool Function Definitions
# Each tool function takes a 'current_storage' argument so that it operates on a specific Storage instance.

# Customer functions
def create_customer(current_storage: Storage, name: str, email: str, phone: Optional[str] = None) -> Dict[str, Any]:
    """Creates a new customer and adds them to the customer database."""
    new_id = f"C{len(current_storage.customers) + 1}"
    current_storage.customers[new_id] = {"name": name, "email": email, "phone": phone}
    print(f"[Tool Executed] create_customer: ID {new_id}, Name: {name} (in {type(current_storage).__name__})")
    return {"status": "success", "customer_id": new_id, "customer": current_storage.customers[new_id]}

def get_customer_info(current_storage: Storage, customer_id: str) -> Dict[str, Any]:
    """Retrieves information about a customer based on their ID."""
    customer = current_storage.customers.get(customer_id)
    if customer:
        print(f"[Tool Executed] get_customer_info: ID {customer_id} found (in {type(current_storage).__name__}).")
        return {"status": "success", "customer_id": customer_id, "customer": customer}
    print(f"[Tool Executed] get_customer_info: ID {customer_id} not found (in {type(current_storage).__name__}).")
    return {"status": "error", "message": "Customer not found"}

# Product functions
def create_product(current_storage: Storage, name: str, description: str, price: float, inventory_count: int) -> Dict[str, Any]:
    """Creates a new product and adds it to the product database."""
    new_id = f"P{len(current_storage.products) + 1}"
    current_storage.products[new_id] = {
        "name": name,
        "description": description,
        "price": float(price),
        "inventory_count": int(inventory_count)
    }
    print(f"[Tool Executed] create_product: ID {new_id}, Name: {name} (in {type(current_storage).__name__})")
    return {"status": "success", "product_id": new_id, "product": current_storage.products[new_id]}

def update_product(current_storage: Storage, product_id: str, name: Optional[str] = None, description: Optional[str] = None,
                   price: Optional[float] = None, inventory_count: Optional[int] = None) -> Dict[str, Any]:
    """Updates a product with the provided parameters."""
    if product_id not in current_storage.products:
        print(f"[Tool Executed] update_product: ID {product_id} not found (in {type(current_storage).__name__}).")
        return {"status": "error", "message": f"Product {product_id} not found"}

    product = current_storage.products[product_id]
    updated_fields = []

    if name is not None:
        product["name"] = name
        updated_fields.append("name")
    if description is not None:
        product["description"] = description
        updated_fields.append("description")
    if price is not None:
        product["price"] = float(price)
        updated_fields.append("price")
    if inventory_count is not None:
        product["inventory_count"] = int(inventory_count)
        updated_fields.append("inventory_count")

    if not updated_fields:
        print(f"[Tool Executed] update_product: ID {product_id}, no fields updated (in {type(current_storage).__name__}).")
        return {"status": "warning", "message": "No fields were updated.", "product": product}

    print(f"[Tool Executed] update_product: ID {product_id}, Updated fields: {', '.join(updated_fields)} (in {type(current_storage).__name__})")
    return {
        "status": "success",
        "message": f"Product {product_id} updated. Fields: {', '.join(updated_fields)}",
        "product_id": product_id,
        "updated_fields": updated_fields,
        "product": product
    }

def find_product_by_name(current_storage: Storage, product_name: str, min_similarity: int = 70) -> Tuple[Optional[str], Optional[Dict[str, Any]]]:
    """Find a product by name using fuzzy string matching."""
    if not product_name:
        return None, None

    name_id_list = [(p_data["name"], p_id) for p_id, p_data in current_storage.products.items()]
    if not name_id_list:
        return None, None

    best_match_name_score = process.extractOne(
        product_name,
        [item[0] for item in name_id_list],
        scorer=fuzz.token_sort_ratio
    )

    if best_match_name_score and best_match_name_score[1] >= min_similarity:
        matched_name = best_match_name_score[0]
        for name, pid_val in name_id_list:
            if name == matched_name:
                print(f"[Tool Helper] find_product_by_name: Matched '{product_name}' to '{matched_name}' (ID: {pid_val}) with score {best_match_name_score[1]} (in {type(current_storage).__name__})")
                return pid_val, current_storage.products[pid_val]

    print(f"[Tool Helper] find_product_by_name: No good match for '{product_name}' (min_similarity: {min_similarity}, Best match: {best_match_name_score}) (in {type(current_storage).__name__})")
    return None, None


def get_product_id(current_storage: Storage, product_identifier: str) -> Optional[str]:
    """Get product ID either directly or by fuzzy matching the name."""
    if product_identifier in current_storage.products:
        return product_identifier
    product_id, _ = find_product_by_name(current_storage, product_identifier)
    return product_id

def get_product_info(current_storage: Storage, product_id_or_name: str) -> Dict[str, Any]:
    """Get information about a product by its ID or name."""
    if product_id_or_name in current_storage.products:
        product = current_storage.products[product_id_or_name]
        print(f"[Tool Executed] get_product_info: Found by ID '{product_id_or_name}' (in {type(current_storage).__name__}).")
        return {"status": "success", "product_id": product_id_or_name, "product": product}

    # Not an exact ID match, so fall back to fuzzy matching on the product name
    product_id_found, product_data = find_product_by_name(current_storage, product_id_or_name)
    if product_id_found and product_data:
        print(f"[Tool Executed] get_product_info: Found by name (fuzzy) '{product_id_or_name}' as ID '{product_id_found}' (in {type(current_storage).__name__}).")
        return {"status": "success", "message": f"Found product matching '{product_id_or_name}'", "product_id": product_id_found, "product": product_data}

    print(f"[Tool Executed] get_product_info: No product found for '{product_id_or_name}' (in {type(current_storage).__name__}).")
    return {"status": "error", "message": f"No product found matching '{product_id_or_name}'"}


def list_all_products(current_storage: Storage) -> Dict[str, Any]:
    """List all available products in the inventory."""
    print(f"[Tool Executed] list_all_products: Found {len(current_storage.products)} products (in {type(current_storage).__name__}).")
    return {"status": "success", "count": len(current_storage.products), "products": dict(current_storage.products)}

# Order functions
def create_order(current_storage: Storage, product_id_or_name: str, quantity: int, status: str) -> Dict[str, Any]:
    """Creates an order using the product's stored price."""
    actual_product_id = get_product_id(current_storage, product_id_or_name) # Pass current_storage

    if not actual_product_id:
        print(f"[Tool Executed] create_order: Product '{product_id_or_name}' not found (in {type(current_storage).__name__}).")
        return {"status": "error", "message": f"Product '{product_id_or_name}' not found."}

    product = current_storage.products[actual_product_id]
    price = product["price"]

    if product["inventory_count"] == 0:
        print(f"[Tool Executed] create_order: Product ID {actual_product_id} is out of stock (in {type(current_storage).__name__}).")
        return {"status": "error", "message": f"{product['name']} is out of stock."}
    if quantity <= 0:
        print(f"[Tool Executed] create_order: Quantity must be positive. Requested: {quantity} (in {type(current_storage).__name__})")
        return {"status": "error", "message": "Quantity must be a positive number."}
    if quantity > product["inventory_count"]:
        print(f"[Tool Executed] create_order: Insufficient inventory for {product['name']} (ID: {actual_product_id}). Available: {product['inventory_count']}, Requested: {quantity} (in {type(current_storage).__name__})")
        return {
            "status": "partial_availability",
            "message": f"Insufficient inventory. Only {product['inventory_count']} units of {product['name']} are available.",
            "available_quantity": product["inventory_count"],
            "requested_quantity": quantity,
            "product_name": product['name']
        }

    if status == "Shipped":
        product["inventory_count"] -= quantity
        print(f"[Tool Executed] create_order: Inventory for {product['name']} (ID: {actual_product_id}) reduced by {quantity} due to 'Shipped' status on creation (in {type(current_storage).__name__}).")

    new_id = f"O{len(current_storage.orders) + 1}"
    current_storage.orders[new_id] = {
        "id": new_id,
        "product_id": actual_product_id,
        "product_name": product["name"],
        "quantity": quantity,
        "price": price,
        "status": status
    }
    print(f"[Tool Executed] create_order: Order {new_id} created for {quantity} of {product['name']} (ID: {actual_product_id}). Status: {status}. Remaining inv: {product['inventory_count']} (in {type(current_storage).__name__})")
    return {
        "status": "success",
        "order_id": new_id,
        "order_details": current_storage.orders[new_id],
        "remaining_inventory": product["inventory_count"]
    }

def get_order_details(current_storage: Storage, order_id: str) -> Dict[str, Any]:
    """Get details of a specific order."""
    order = current_storage.orders.get(order_id)
    if order:
        print(f"[Tool Executed] get_order_details: Order {order_id} found (in {type(current_storage).__name__}).")
        return {"status": "success", "order_id": order_id, "order_details": dict(order)}
    print(f"[Tool Executed] get_order_details: Order {order_id} not found (in {type(current_storage).__name__}).")
    return {"status": "error", "message": "Order not found"}

def update_order_status(current_storage: Storage, order_id: str, new_status: str) -> Dict[str, Any]:
    """Updates the status of an order and adjusts inventory accordingly."""
    if order_id not in current_storage.orders:
        print(f"[Tool Executed] update_order_status: Order {order_id} not found (in {type(current_storage).__name__}).")
        return {"status": "error", "message": "Order not found"}

    order = current_storage.orders[order_id]
    old_status = order["status"]
    product_id = order["product_id"]
    quantity = order["quantity"]

    if old_status == new_status:
        print(f"[Tool Executed] update_order_status: Order {order_id} status unchanged ({old_status}) (in {type(current_storage).__name__}).")
        return {"status": "unchanged", "message": f"Order {order_id} status is already {old_status}", "order_details": dict(order)}

    inventory_adjusted = False
    current_inventory_val = "unknown" # Default if product not found (should not happen if order is valid)

    if product_id in current_storage.products:
        product = current_storage.products[product_id]
        current_inventory_val = product["inventory_count"]

        if new_status == "Shipped" and old_status not in ["Shipped", "Delivered"]:
            if current_inventory_val < quantity:
                print(f"[Tool Executed] update_order_status: Insufficient inventory to ship order {order_id}. Have {current_inventory_val}, need {quantity} (in {type(current_storage).__name__}).")
                return {"status": "error", "message": f"Insufficient inventory to ship. Available: {current_inventory_val}, Required: {quantity}"}
            product["inventory_count"] -= quantity
            inventory_adjusted = True
            current_inventory_val = product["inventory_count"]
            print(f"[Tool Executed] update_order_status: Order {order_id} Shipped. Inv for {product_id} reduced by {quantity} to {current_inventory_val} (in {type(current_storage).__name__}).")
        elif new_status in ["Returned", "Canceled"] and old_status in ["Shipped", "Delivered"]:
            product["inventory_count"] += quantity
            inventory_adjusted = True
            current_inventory_val = product["inventory_count"]
            print(f"[Tool Executed] update_order_status: Order {order_id} {new_status}. Inv for {product_id} increased by {quantity} to {current_inventory_val} (in {type(current_storage).__name__}).")
    else:
        print(f"[Tool Executed] update_order_status: Product {product_id} for order {order_id} not found for inventory adjustment (in {type(current_storage).__name__}).")

    order["status"] = new_status
    print(f"[Tool Executed] update_order_status: Order {order_id} status updated from {old_status} to {new_status} (in {type(current_storage).__name__}).")
    return {
        "status": "success",
        "message": f"Order {order_id} status updated from {old_status} to {new_status}.",
        "order_id": order_id,
        "product_id": product_id,
        "old_status": old_status,
        "new_status": new_status,
        "inventory_adjusted": inventory_adjusted,
        "current_inventory": current_inventory_val,
        "order_details": dict(order)
    }

print("Tool functions defined.")

Tool functions defined.
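
The inventory rules inside `update_order_status` amount to a small transition table. The sketch below restates them as a standalone function so the effect of each status change is easy to check. `inventory_delta` is a hypothetical helper introduced only for illustration, and the "Processing" status is an assumed name for any state that has not yet shipped.

```python
def inventory_delta(old_status: str, new_status: str, quantity: int) -> int:
    """Return the stock change implied by moving an order between statuses.

    Mirrors the rules in update_order_status: shipping an order that has not
    yet left the warehouse removes stock, returning or canceling an order that
    already shipped restores it, and every other transition changes nothing.
    """
    shipped_states = {"Shipped", "Delivered"}
    if new_status == "Shipped" and old_status not in shipped_states:
        return -quantity
    if new_status in {"Returned", "Canceled"} and old_status in shipped_states:
        return quantity
    return 0


# "Processing" is an assumed pre-shipment status used only for this demo.
transitions = [
    ("Processing", "Shipped", 5),
    ("Shipped", "Delivered", 5),
    ("Delivered", "Returned", 5),
    ("Processing", "Canceled", 5),
]
for old, new, qty in transitions:
    print(f"{old} -> {new}: {inventory_delta(old, new, qty):+d}")
```

Canceling an order that never shipped leaves stock untouched, because nothing was deducted for it in the first place.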


In [16]:
class ConversationContext:
    def __init__(self):
        self.messages: List[Dict[str, Any]] = []
        self.context_data: Dict[str, Any] = {
            "customers": {}, "products": {}, "orders": {}, "last_action": None
        }
        self.session_start_time = datetime.now()

    def add_user_message(self, message: str) -> None:
        self.messages.append({"role": "user", "content": message})

    def add_assistant_message(self, message_content: Union[str, List[Dict[str, Any]]]) -> None:
        self.messages.append({"role": "assistant", "content": message_content})

    def update_entity_in_context(self, entity_type: str, entity_id: str, data: Any) -> None:
        if entity_type in self.context_data:
            self.context_data[entity_type][entity_id] = data # Store the actual data
            print(f"[Context Updated] Entity: {entity_type}, ID: {entity_id}, Data (type): {type(data)}")

    def set_last_action(self, action_type: str, action_details: Any) -> None:
        self.context_data["last_action"] = {
            "type": action_type,
            "details": action_details,
            "timestamp": datetime.now().isoformat()
        }
        print(f"[Context Updated] Last Action: {action_type}, Details: {json.dumps(action_details, default=str)}")


    def get_full_conversation_for_api(self) -> List[Dict[str, Any]]:
        return self.messages.copy()

    def get_context_summary(self) -> str:
        summary_parts = []
        if self.context_data["customers"]:
            customers_str = ", ".join([f"ID: {cid} (Name: {c.get('name', 'N/A') if isinstance(c, dict) else 'N/A'})" for cid, c in self.context_data["customers"].items()])
            summary_parts.append(f"Recent customers: {customers_str}")
        if self.context_data["products"]:
            products_str = ", ".join([f"ID: {pid} (Name: {p.get('name', 'N/A') if isinstance(p, dict) else 'N/A'})" for pid, p in self.context_data["products"].items()])
            summary_parts.append(f"Recent products: {products_str}")
        if self.context_data["orders"]:
            orders_str = ", ".join([f"ID: {oid} (Product: {o.get('product_name', 'N/A') if isinstance(o, dict) else 'N/A'}, Status: {o.get('status', 'N/A') if isinstance(o, dict) else 'N/A'})" for oid, o in self.context_data["orders"].items()])
            summary_parts.append(f"Recent orders: {orders_str}")

        last_action = self.context_data["last_action"]
        if last_action:
            action_type = last_action['type']
            action_details_summary = "..." # Default summary
            if isinstance(last_action.get('details'), dict):
                action_input = last_action['details'].get('input', {})
                action_result_status = last_action['details'].get('result', {}).get('status')
                action_details_summary = f"Input: {action_input}, Result Status: {action_result_status}"
                if action_result_status == "success":
                    if "order_id" in last_action['details'].get('result', {}):
                         action_details_summary += f", OrderID: {last_action['details']['result']['order_id']}"
                    elif "product_id" in last_action['details'].get('result', {}):
                         action_details_summary += f", ProductID: {last_action['details']['result']['product_id']}"


            summary_parts.append(f"Last action: {action_type} at {last_action['timestamp']} ({action_details_summary})")

        if not summary_parts: return "No specific context items set yet."
        return "\n".join(summary_parts)

    def clear(self) -> None:
        self.messages = []
        self.context_data = {"customers": {}, "products": {}, "orders": {}, "last_action": None}
        self.session_start_time = datetime.now()
        print("[Context Cleared]")

print("ConversationContext class defined.")


ConversationContext class defined.
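
To make the output of `get_context_summary` concrete, the sketch below rebuilds just the order portion of the summary from a plain dictionary. `summarize_orders` and the sample order are hypothetical and exist only to show the string format the worker prompt receives.

```python
from typing import Any, Dict


def summarize_orders(orders: Dict[str, Any]) -> str:
    """Format tracked orders the way get_context_summary formats them."""
    if not orders:
        return "No specific context items set yet."
    parts = []
    for order_id, order in orders.items():
        # Non-dict payloads fall back to 'N/A', as in the class above.
        product = order.get("product_name", "N/A") if isinstance(order, dict) else "N/A"
        status = order.get("status", "N/A") if isinstance(order, dict) else "N/A"
        parts.append(f"ID: {order_id} (Product: {product}, Status: {status})")
    return "Recent orders: " + ", ".join(parts)


# Hypothetical order record, shaped like the entries create_order stores.
sample_orders = {"O1": {"product_name": "Widget A", "status": "Shipped"}}
print(summarize_orders(sample_orders))
```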


In [17]:
# --- MODIFIED AgentEvaluator Class ---
class AgentEvaluator:
    def __init__(self):
        if 'tools_schemas_list' not in globals() or not tools_schemas_list:
            raise NameError("ERROR: Global 'tools_schemas_list' not found or empty.")
        self.conversation_context = ConversationContext()
        self.evaluation_results = []
        self.anthropic_storage = Storage()
        self.anthropic_tools_schemas = tools_schemas_list
        self.available_tool_functions = {
            "create_customer": create_customer, "get_customer_info": get_customer_info,
            "create_product": create_product, "update_product": update_product,
            "get_product_info": get_product_info, "list_all_products": list_all_products,
            "create_order": create_order, "get_order_details": get_order_details,
            "update_order_status": update_order_status,
        }
        self.active_learnings_cache: List[Dict] = self._load_initial_learnings_from_drive()
        self.learnings_updated_this_session_flag: bool = False
        print(f"AgentEvaluator initialized. Loaded {len(self.active_learnings_cache)} initial learnings.")

    def _classify_query_type(self, user_message: str) -> str:
        """
        Classifies the user query into predefined types.
        Simple keyword-based classification for now.
        """
        msg_lower = user_message.lower()
        if "learnings" in msg_lower or "what have you learned" in msg_lower or "summarize your learning" in msg_lower:
            return "METACOGNITIVE_LEARNINGS_SUMMARY"
        # Add more classifications here if needed, e.g., for explaining reasoning
        # elif "why did you" in msg_lower:
        #     return "METACOGNITIVE_EXPLAIN_REASONING"
        return "OPERATIONAL" # Default

    # RAG and learning-persistence helpers: mount Drive, resolve the learnings directory,
    # and read or write timestamped learnings snapshots.
    def _mount_drive_if_needed(self):
        if not os.path.exists(DRIVE_MOUNT_PATH) or not os.listdir(DRIVE_MOUNT_PATH):
            try:
                drive.mount(DRIVE_MOUNT_PATH, force_remount=True)
                print("Drive mounted.")
            except Exception as e:
                print(f"Error mounting Drive: {e}.")

    def _initialize_learnings_path(self):
        global LEARNINGS_DRIVE_BASE_PATH
        if not LEARNINGS_DRIVE_BASE_PATH:
            LEARNINGS_DRIVE_BASE_PATH = os.path.join(DRIVE_MOUNT_PATH, DEFAULT_LEARNINGS_DRIVE_SUBPATH)
        if not os.path.exists(LEARNINGS_DRIVE_BASE_PATH):
            try:
                os.makedirs(LEARNINGS_DRIVE_BASE_PATH)
                print(f"Created: {LEARNINGS_DRIVE_BASE_PATH}")
            except Exception as e:
                print(f"Error creating learnings dir: {e}")

    def _get_latest_learnings_filepath_from_drive(self) -> Optional[str]:
        self._mount_drive_if_needed()
        self._initialize_learnings_path()
        if not os.path.isdir(LEARNINGS_DRIVE_BASE_PATH):
            return None
        list_of_files = glob.glob(os.path.join(LEARNINGS_DRIVE_BASE_PATH, 'learnings_*.json'))
        return max(list_of_files, key=os.path.getctime) if list_of_files else None

    def _read_learnings_from_drive_file(self, filepath: str) -> List[Dict]:
        if not filepath or not os.path.exists(filepath):
            return []
        try:
            with open(filepath, 'r') as f:
                learnings_list = json.load(f)
            return learnings_list if isinstance(learnings_list, list) else []
        except Exception as e:
            print(f"Error reading learnings {filepath}: {e}")
            return []

    def _load_initial_learnings_from_drive(self) -> List[Dict]:
        latest_filepath = self._get_latest_learnings_filepath_from_drive()
        if latest_filepath:
            return self._read_learnings_from_drive_file(latest_filepath)
        return []

    def _persist_active_learnings_to_drive(self):
        self._mount_drive_if_needed()
        self._initialize_learnings_path()
        if not os.path.isdir(LEARNINGS_DRIVE_BASE_PATH) or not self.active_learnings_cache:
            return
        ts = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
        new_filepath = os.path.join(LEARNINGS_DRIVE_BASE_PATH, f'learnings_{ts}.json')
        try:
            with open(new_filepath, 'w') as f:
                json.dump(self.active_learnings_cache, f, indent=4)
            print(f"Persisted {len(self.active_learnings_cache)} learnings to {new_filepath}")
            self.learnings_updated_this_session_flag = False
        except Exception as e:
            print(f"Error persisting learnings: {e}")

    def check_relevant_learnings(self, query: str, query_type: str, count: int = 5) -> Optional[str]:
        if not self.active_learnings_cache: return None
        # For learning summaries, we might want to show all or more learnings.
        if query_type == "METACOGNITIVE_LEARNINGS_SUMMARY":
            count = len(self.active_learnings_cache) # Show all for summary

        # Basic keyword matching for operational, more direct presentation for summary
        relevant_learning_objects = []
        if query_type == "OPERATIONAL":
            keywords_from_query = self.extract_keywords(query)
            for entry in self.active_learnings_cache:
                text_to_search = entry.get("final_learning_statement", "") + " " + " ".join(entry.get("keywords", []))
                if any(kw.lower() in text_to_search.lower() for kw in keywords_from_query):
                    relevant_learning_objects.append(entry)
        elif query_type == "METACOGNITIVE_LEARNINGS_SUMMARY":
            relevant_learning_objects = list(self.active_learnings_cache) # Copy all of them so the sort below does not reorder the cache

        relevant_learning_objects.sort(key=lambda x: x.get('timestamp_created', ''), reverse=True)

        # Only include up to 'count' learnings, especially if not summarizing all.
        learnings_to_format = relevant_learning_objects[:count] if query_type != "METACOGNITIVE_LEARNINGS_SUMMARY" else relevant_learning_objects

        formatted_learnings = [
            f"- Learning (ID: {entry.get('learning_id', 'N/A')[:8]}): {entry.get('final_learning_statement', str(entry))}"
            for entry in learnings_to_format
        ]
        return "\nRelevant Learnings from Knowledge Base (In-Session Cache):\n" + "\n".join(formatted_learnings) if formatted_learnings else None

    def _update_context_from_tool_results(self, tool_name: str, tool_input: Dict, tool_result: Dict): # Same as before
        if not isinstance(tool_result, dict): return
        if tool_result.get("status") == "success":
            if "customer_id" in tool_result and "customer" in tool_result: self.conversation_context.update_entity_in_context("customers", tool_result["customer_id"], tool_result["customer"])
            elif "product_id" in tool_result and "product" in tool_result: self.conversation_context.update_entity_in_context("products", tool_result["product_id"], tool_result["product"])
            elif "order_id" in tool_result and "order_details" in tool_result: self.conversation_context.update_entity_in_context("orders", tool_result["order_id"], tool_result["order_details"])
        self.conversation_context.set_last_action(f"tool_{tool_name}_Anthropic", {"input": tool_input, "result": tool_result})

    def process_tool_call(self, tool_name: str, tool_input: Dict[str, Any]) -> Dict[str, Any]: # Same as before
        if tool_name in self.available_tool_functions:
            try:
                result = self.available_tool_functions[tool_name](self.anthropic_storage, **tool_input)
                return result
            except Exception as e: return {"status": "error", "message": str(e)}
        return {"status": "error", "message": f"Tool {tool_name} not found."}

    def get_anthropic_response(self, current_worker_system_prompt: str, conversation_history: List[Dict[str, Any]], query_type: str) -> str:
        messages_for_api = list(conversation_history)
        # For metacognitive tasks like summarizing learnings, we typically don't want tool use.
        # The prompt already guides this, but we can also pass an empty tool list for such tasks.
        tools_for_this_call = self.anthropic_tools_schemas if query_type == "OPERATIONAL" else []

        try:
            for i in range(5 if query_type == "OPERATIONAL" else 1): # Allow tool iterations only for operational
                response = anthropic_client.messages.create(
                    model=ANTHROPIC_MODEL_NAME, max_tokens=4000, temperature=0.0,
                    system=current_worker_system_prompt,
                    tools=tools_for_this_call, # Use conditional tools
                    messages=messages_for_api
                )
                assistant_response_blocks = response.content
                messages_for_api.append({"role": "assistant", "content": assistant_response_blocks})
                text_blocks = [block.text for block in assistant_response_blocks if block.type == "text"]
                final_text_response = " ".join(text_blocks).strip()

                if final_text_response.startswith("CLARIFICATION_REQUESTED:"): return final_text_response

                tool_calls_to_process = [block for block in assistant_response_blocks if block.type == "tool_use"]
                if not tool_calls_to_process or query_type != "OPERATIONAL": # No tools or not an operational task
                    return final_text_response if final_text_response else "No text content."

                tool_results_content = []
                for tool_use_block in tool_calls_to_process:
                    tool_name, tool_input, tool_use_id = tool_use_block.name, tool_use_block.input, tool_use_block.id
                    tool_result_data = self.process_tool_call(tool_name, tool_input)
                    self._update_context_from_tool_results(tool_name, tool_input, tool_result_data)
                    tool_results_content.append({"type": "tool_result", "tool_use_id": tool_use_id, "content": json.dumps(tool_result_data)})
                messages_for_api.append({"role": "user", "content": tool_results_content})
            return "Max tool iterations reached."
        except Exception as e: return f"Error in get_anthropic_response: {str(e)}"

    def _handle_worker_clarification(self, agent_question: str, current_prompt: str,
                                     agent_specific_history: List[Dict], query_type: str, max_attempts: int = 2) -> Tuple[str, List[Dict]]:
        clarification_turn_details = []
        current_history = list(agent_specific_history)
        for attempt in range(max_attempts):
            print(f"--- Anthropic requests clarification: {agent_question} ---")
            user_clarification = input(f"Your response to Anthropic: ").strip() or "(User provided no input)"
            clarification_turn_details.append({"agent_question": agent_question, "user_answer": user_clarification})
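            # Record the agent's question first so the model sees its own request before the user's answer.
            current_history.append({"role": "assistant", "content": f"CLARIFICATION_REQUESTED: {agent_question}"})
            current_history.append({"role": "user", "content": user_clarification})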
            agent_response_text = self.get_anthropic_response(current_prompt, current_history, query_type) # Pass query_type
            if agent_response_text.startswith("CLARIFICATION_REQUESTED:"):
                agent_question = agent_response_text.split("CLARIFICATION_REQUESTED:", 1)[-1].strip()
            else: return agent_response_text, clarification_turn_details
        return agent_response_text, clarification_turn_details # Max attempts reached

    def _get_llm_response_with_clarification_loop(self, system_prompt: str, base_history: List[Dict], query_type: str) -> Tuple[str, List[Dict]]:
        current_history = list(base_history)
        final_agent_response = self.get_anthropic_response(system_prompt, current_history, query_type) # Pass query_type
        clarifications = []
        if final_agent_response.startswith("CLARIFICATION_REQUESTED:"):
            question = final_agent_response.split("CLARIFICATION_REQUESTED:", 1)[-1].strip()
            final_agent_response, clarifications = self._handle_worker_clarification(question, system_prompt, current_history, query_type) # Pass query_type
        return final_agent_response, clarifications

    def process_user_request(self, user_message: str) -> Dict[str, Any]:
        print(f"\n{'='*60}\nUser Message: {user_message}\n{'='*60}")
        self.conversation_context.add_user_message(user_message)

        query_type = self._classify_query_type(user_message)
        print(f"--- Classified Query Type: {query_type} ---")

        # Select worker prompt based on query type
        if query_type == "METACOGNITIVE_LEARNINGS_SUMMARY":
            current_worker_base_prompt = worker_metacognitive_learnings_system_prompt
        else: # Default to OPERATIONAL
            current_worker_base_prompt = worker_operational_system_prompt

        context_summary = self.conversation_context.get_context_summary()
        learnings_for_prompt = self.check_relevant_learnings(user_message, query_type) # Pass query_type

        # Construct the full prompt for the worker
        # Note: The RAG learnings might be more or less relevant depending on the query type.
        # The worker prompts themselves guide how to use these.
        prompt_for_worker_with_context = (
            f"{current_worker_base_prompt}\n\n"
            f"Conversation Context Summary:\n{context_summary}\n\n"
            f"{learnings_for_prompt if learnings_for_prompt else 'No specific learnings from knowledge base provided for this query.'}"
        )

        initial_datastore_state_for_eval = self.anthropic_storage.get_full_datastore_copy()

        anthropic_base_history = self.conversation_context.get_full_conversation_for_api()
        anthropic_final_response, anthropic_clarifications = self._get_llm_response_with_clarification_loop(
            prompt_for_worker_with_context, anthropic_base_history, query_type # Pass query_type
        )
        self.conversation_context.add_assistant_message(f"[Anthropic Final Text ({query_type})]: {anthropic_final_response}")

        if anthropic_clarifications: # Process learnings from clarifications
            for inter in anthropic_clarifications: self.process_and_store_new_learning(f"User clarification: '{inter['user_answer']}' (re: '{inter['agent_question']}')", user_message, context_summary)

        final_datastore_state_for_eval = self.anthropic_storage.get_full_datastore_copy()

        evaluation = self.evaluate_responses(
            user_message, query_type, anthropic_final_response,
            context_summary, learnings_for_prompt or "",
            anthropic_clarifications,
            initial_datastore_state_for_eval,
            final_datastore_state_for_eval
        )
        self.evaluation_results.append(evaluation)

        # Human feedback for general learning
        try:
            human_general_learning = input("General learning from this turn? (or 'skip'): ").strip()
            if human_general_learning.lower() not in ['skip', '']:
                self.process_and_store_new_learning(human_general_learning, user_message, context_summary)
        except EOFError: pass # Non-interactive skip

        if self.learnings_updated_this_session_flag: self._persist_active_learnings_to_drive()

        return {"user_message": user_message, "query_type": query_type, "anthropic_response": anthropic_final_response, "evaluation": evaluation}

    def evaluate_responses(self, user_message: str, query_type: str, anthropic_response: str,
                       context_summary_for_eval: str, learnings_for_eval: str,
                       anthropic_clarifications: Optional[List[Dict]],
                       initial_datastore_state: Dict[str, Any],
                       final_datastore_state: Dict[str, Any]
                       ) -> Dict[str, Any]:
        print(f"\n--- Starting Evaluation by Gemini (Query Type: {query_type}) ---")

        initial_ds_prompt = f"Data Store State *Before* AI Action:\n{json.dumps(initial_datastore_state, indent=2, default=str)}"
        final_ds_prompt = f"Data Store State *After* AI Action:\n{json.dumps(final_datastore_state, indent=2, default=str)}"

        clarification_info_prompt = "No worker AI clarifications."
        if anthropic_clarifications:
            clar_summary = [f"  Q: '{c['agent_question']}' -> User A: '{c['user_answer']}'" for c in anthropic_clarifications]
            clarification_info_prompt = "Worker AI Clarification Interactions:\n" + "\n".join(clar_summary)

        # The evaluator_system_prompt is already set on eval_model_instance.
        # We construct the main content prompt for the evaluator.
        eval_content_prompt = f"""
        User query: {user_message}
        Classified Query Type: {query_type}

        Context provided to assistant:
        {context_summary_for_eval}

        Relevant RAG Learnings provided to assistant:
        {learnings_for_eval if learnings_for_eval else 'None'}

        {initial_ds_prompt}
        {final_ds_prompt}

        {clarification_info_prompt}

        Anthropic Claude final textual response:
        {anthropic_response}

        ---
        INSTRUCTIONS FOR EVALUATOR (You are Gemini):
        Based on your system prompt and the classified query type ({query_type}), please evaluate the AI assistant's response.
        - If OPERATIONAL, focus on tool use accuracy and data store changes (Before vs. After).
        - If METACOGNITIVE_LEARNINGS_SUMMARY, focus on whether the AI accurately summarized the 'Relevant RAG Learnings' it was provided, not its operational history.
        Provide detailed reasoning for scores (Accuracy, Efficiency, Context Awareness, Helpfulness) and an overall score (1-10).
        """
        try:
            gemini_response_obj = eval_model_instance.generate_content(eval_content_prompt)
            evaluation_text = gemini_response_obj.text
            print(f"Gemini Raw Evaluation:\n{evaluation_text}")

            # Handle evaluator clarification (simplified, assumes it doesn't change query_type focus)
            clarif_details = {"used": False, "needed": "", "provided_input": "", "action_summary": ""}
            # ... (evaluator clarification logic can be added here if needed) ...

            final_evaluation_text = evaluation_text
            anthropic_score = self.extract_score(final_evaluation_text)
            return {"anthropic_score": anthropic_score, "full_evaluation": final_evaluation_text,
                    "clarification_details": clarif_details, "query_type_evaluated": query_type}
        except Exception as e:
            return {"error": str(e), "anthropic_score": 0, "full_evaluation": f"Eval failed: {e}",
                    "clarification_details": {}, "query_type_evaluated": query_type}

    def process_and_store_new_learning(self, human_feedback_text: str, user_query_context: str, turn_context_summary: str):
        print(f"--- Processing New Learning: \"{human_feedback_text}\" ---")
        # Ask the evaluator model (eval_model_instance, which carries the general evaluator prompt)
        # to turn the feedback into a learning, or to flag it as conflicting, redundant, or not actionable.
        evaluator_task_prompt_parts = [
            "You are an AI assistant helping to maintain a knowledge base of 'learnings'.",
            f"New Human Feedback: \"{human_feedback_text}\"",
            f"Context of User Query: \"{user_query_context}\"",
            "Existing ACTIVE learnings (sample):\n" +
            "\n".join([f"  - {entry.get('final_learning_statement', '')[:100]}..." for entry in self.active_learnings_cache[-3:]]),
            "Tasks: Analyze feedback. Check for CONFLICT/REDUNDANCY. If new/refining, output `FINALIZED_LEARNING: [statement]`. Else, `CONFLICT_DETECTED:` or `REDUNDANT_LEARNING:` or `NOT_ACTIONABLE:`."
        ]
        synthesis_response_obj = eval_model_instance.generate_content("\n".join(evaluator_task_prompt_parts))
        evaluator_synthesis_text = synthesis_response_obj.text
        # Only a FINALIZED_LEARNING verdict is stored; every other verdict leaves the cache unchanged.
        if "FINALIZED_LEARNING:" in evaluator_synthesis_text:
            final_statement = evaluator_synthesis_text.split("FINALIZED_LEARNING:", 1)[-1].strip()
            if final_statement:
                 self.active_learnings_cache.append({
                     "learning_id": str(uuid.uuid4()), "timestamp_created": datetime.now().isoformat(),
                     "original_human_input": human_feedback_text, "final_learning_statement": final_statement,
                     "keywords": self.extract_keywords(final_statement + " " + human_feedback_text), "status": "active"
                 })
                 self.learnings_updated_this_session_flag = True
                 print(f"Stored new learning. Cache size: {len(self.active_learnings_cache)}")


    def extract_keywords(self, text: str) -> List[str]: # Same as before
        if not text: return ["general"]
        words = re.findall(r'\b\w{4,}\b', text.lower())
        stop_words = {"the", "and", "is", "in", "to", "a", "of", "for", "with", "on", "at", "what", "how", "show", "tell", "please", "user", "query", "this", "that", "context", "claude", "anthropic", "before", "after", "state", "action", "truth", "ground", "learnings"}
        extracted = list(set(word for word in words if word not in stop_words and not word.isdigit()))
        return extracted if extracted else ["generic"]

    def extract_score(self, evaluation_text: str) -> int: # Same as before
        patterns = [r"Overall Score.*?[:\s]*(\d+)/10", r"Overall Score.*?[:\s]*(\d+)"]
        for p_str in patterns:
            match = re.search(p_str, evaluation_text, re.IGNORECASE | re.DOTALL)
            if match and match.group(1):
                try: return int(match.group(1))
                except ValueError: continue
        return 0 # Default if not found

    def process_human_feedback_actions(self, feedback: str, target_storage: Optional[Storage]): # Same as before
        # ... (implementation)
        return "No specific data action taken from evaluator feedback."

print("AgentEvaluator class MODIFIED with query classification and gated prompt logic.")


AgentEvaluator class MODIFIED with query classification and gated prompt logic.
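
`extract_score` and `extract_keywords` both depend on regular expressions. The standalone sketch below shows the matching behavior they aim for, with the patterns written as raw strings using single backslashes, which is what `\d`, `\s`, and `\b` require. The sample evaluation text is made up for illustration, and the keyword function omits the stop-word filter for brevity.

```python
import re
from typing import List


def extract_score(evaluation_text: str) -> int:
    """Return the first integer that follows 'Overall Score', or 0 if none is found."""
    patterns = [r"Overall Score.*?[:\s]*(\d+)/10", r"Overall Score.*?[:\s]*(\d+)"]
    for pattern in patterns:
        match = re.search(pattern, evaluation_text, re.IGNORECASE | re.DOTALL)
        if match:
            return int(match.group(1))
    return 0


def extract_keywords(text: str) -> List[str]:
    """Return the sorted, de-duplicated words of four or more characters."""
    return sorted(set(re.findall(r"\b\w{4,}\b", text.lower())))


# Made-up evaluator output, used only to exercise the patterns.
sample_evaluation = "Accuracy: 9/10\nOverall Score: 8/10"
print(extract_score(sample_evaluation))
print(extract_keywords("Update the Gizmo price"))
```

With doubled backslashes inside a raw string (for example `r"\\d+"`), the pattern would look for a literal backslash followed by the letter `d`, so the score would always fall back to 0.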


In [18]:
def main():
    print("\nStarting Main Execution (Single Agent - Anthropic) with MODIFIED Evaluator...\n")
    try:
        agent = AgentEvaluator()
    except Exception as e:
        print(f"Failed to initialize AgentEvaluator: {e}")
        import traceback; traceback.print_exc()
        return

    results_log = []
    while True:
        try:
            user_query = input("\nEnter your query (or 'quit', 'exit', 'stop', 'q' to end): ")
            if user_query.lower() in ['quit', 'exit', 'stop', 'q']:
                print("Exiting the system. Goodbye!")
                if agent.learnings_updated_this_session_flag:
                    print("Persisting final session learnings to Drive...")
                    agent._persist_active_learnings_to_drive()
                break
            if not user_query.strip(): print("Empty query, please enter something."); continue

            result = agent.process_user_request(user_query)
            results_log.append(result)
        except SystemExit as se:
            print(f"System exit requested: {se}")
            if agent.learnings_updated_this_session_flag:
                print("Persisting session learnings to Drive before exiting...")
                agent._persist_active_learnings_to_drive()
            break
        except Exception as e:
            print(f"CRITICAL ERROR processing query '{user_query}': {e}")
            import traceback; traceback.print_exc()
            results_log.append({"user_message": user_query, "anthropic_response": "ERROR",
                                "evaluation": {"anthropic_score": 0, "full_evaluation": f"Critical error: {e}",
                                               "clarification_details": {"used": False, "action_summary": ""}}})
    print("\n\n===== EVALUATION SUMMARY =====")
    total_anthropic_score, num_q = 0, 0
    for i, res in enumerate(results_log):
        if not res:
            print(f"\nQuery {i+1}: Skipped (empty result).")
            continue
        num_q += 1
        print(f"\nQuery {i+1}: {res.get('user_message', 'N/A')}")
        print(f"  Anthropic Resp: {str(res.get('anthropic_response', 'N/A'))[:250]}...")
        eval_data = res.get('evaluation', {})
        anth_s = eval_data.get('anthropic_score', 0)
        total_anthropic_score += anth_s
        print(f"  Score - Anthropic: {anth_s}")
        # Log initial and final states if available in eval_data for review
        if "initial_datastore_state_provided_to_eval" in eval_data:
            print(f"    Initial DS Orders: {len(eval_data['initial_datastore_state_provided_to_eval'].get('orders', {}))}, Products Inv P3: {eval_data['initial_datastore_state_provided_to_eval'].get('products', {}).get('P3',{}).get('inventory_count', 'N/A')}")
        if "final_datastore_state_provided_to_eval" in eval_data:
            print(f"    Final DS Orders: {len(eval_data['final_datastore_state_provided_to_eval'].get('orders', {}))}, Products Inv P3: {eval_data['final_datastore_state_provided_to_eval'].get('products', {}).get('P3',{}).get('inventory_count', 'N/A')}")

        clarif_details = eval_data.get('clarification_details',{})
        if clarif_details.get('used'):
            print(f"    Evaluator Clarification: Needed='{clarif_details.get('needed', 'N/A')}', Provided='{clarif_details.get('provided_input', 'N/A')}'")
            if clarif_details.get('action_summary'): print(f"    Action from Evaluator Clarification: {clarif_details['action_summary']}")
    print("\n----- Overall Performance -----")
    if num_q > 0: print(f"Avg Anthropic Score: {total_anthropic_score/num_q:.2f}")
    else: print("No queries processed.")
    print(f"Total Anthropic Score: {total_anthropic_score}")
    print(f"\nLearnings are stored in: {LEARNINGS_DRIVE_BASE_PATH}")
    latest_learnings_file = agent._get_latest_learnings_filepath_from_drive()
    if latest_learnings_file: print(f"Most recent learnings file: {os.path.basename(latest_learnings_file)}")
    else: print("No learnings files found.")
    print("\nExecution Finished.")

# To run in a new cell:
# main()
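
The score aggregation at the end of `main()` can also be read as a small pure function, which makes it easier to check in isolation. The sketch below is illustrative only: `summarize_scores` and `sample_log` are hypothetical names that do not appear in the notebook, and the entry shape is assumed from the `results_log` dictionaries built above (an `evaluation` dict holding `anthropic_score`).

```python
from typing import Any, Dict, List, Optional, Tuple


def summarize_scores(
    results_log: List[Optional[Dict[str, Any]]],
) -> Tuple[int, int, float]:
    """Return (total_score, num_queries, average_score) for a results log.

    Hypothetical helper mirroring the summary loop in main(): empty entries
    are skipped, and a missing score counts as 0.
    """
    total, count = 0, 0
    for res in results_log:
        if not res:
            continue  # skipped entries do not count toward the average
        count += 1
        total += res.get("evaluation", {}).get("anthropic_score", 0)
    average = total / count if count else 0.0
    return total, count, average


# Made-up entries shaped like the ones main() appends to results_log.
sample_log = [
    {"user_message": "q1", "evaluation": {"anthropic_score": 8}},
    None,
    {"user_message": "q2", "anthropic_response": "ERROR",
     "evaluation": {"anthropic_score": 0}},
]
total, count, average = summarize_scores(sample_log)
print(f"Total: {total}, queries: {count}, average: {average:.2f}")
```

As in `main()`, a query that hit the critical-error path is logged with a score of 0 and still counts as a processed query, so errors pull the average down rather than being excluded.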

In [19]:
""" Sample queries:
* Show me all the products available
* I'd like to order 25 Perplexinators, please
* Show me the status of my order
* (If the order is not in Shipped state, then) Please ship my order now
* How many Perplexinators are now left in stock?
* Add a new customer: Bill Leece, bill.leece@mail.com, +1.222.333.4444
* Add a new product: Gizmo X, description: A fancy gizmo, price: 29.99, inventory: 50
* Update Gizzmo's price to 99.99 (note: 'Gizmo' is deliberately misspelled here)
* Who won the 2020 US presidential election?
* I need to update our insurance policy, so I need to know the total value of all the products in our inventory. Please tell me this amount.
* Summarize your learnings from our recent interactions.
"""

main()

\nStarting Main Execution (Single Agent - Anthropic) with MODIFIED Evaluator...\n
AgentEvaluator initialized. Loaded 0 initial learnings.
\nEnter your query (or 'quit', 'exit', 'stop', 'q' to end): Show me all the products available
--- Classified Query Type: OPERATIONAL ---
[Tool Executed] list_all_products: Found 3 products (in Storage).
[Context Updated] Last Action: tool_list_all_products_Anthropic, Details: {"input": {}, "result": {"status": "success", "count": 3, "products": {"P1": {"name": "Widget A", "description": "A simple widget. Very compact.", "price": 19.99, "inventory_count": 999}, "P2": {"name": "Gadget B", "description": "A powerful gadget. It spins.", "price": 49.99, "inventory_count": 200}, "P3": {"name": "Perplexinator", "description": "A perplexing perfunctator", "price": 79.99, "inventory_count": 1483}}}}
\n--- Starting Evaluation by Gemini (Query Type: OPERATIONAL) ---
Gemini Raw Evaluation:\nEVALUATION:

1.  **Accuracy**:
    *   **Score**: 10/10
    *   **Reaso