<a href="https://colab.research.google.com/github/IAT-ComputationalCreativity-Spring2025/Week4-Cognitive-Agents/blob/main/llm_agent.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Building a Memory-Aware Cognitive Agent

## Pre-downloading the Phi-2 Model

Before we begin the lab exercises, we'll download and cache the Phi-2 model locally. This is important because:
- The model is large (about 2.7 billion parameters, roughly 5-6 GB of weight files in 16-bit precision)
- Downloading at the start prevents delays during exercises
- Local caching means we only need to download once

⚠️ **Requirements**:
- At least 16GB of free disk space
- Stable internet connection
- Hugging Face account and token
- Accepted model license

Please make sure you have:
1. Created a Hugging Face account at https://huggingface.co/join
2. Generated your access token at https://huggingface.co/settings/tokens
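
The requirements above can be checked before starting the download. The following is a minimal sketch rather than part of the original lab: `preflight_check` is a hypothetical helper, and the 16 GB threshold and the `HUGGINGFACE_TOKEN` variable name are simply the values used elsewhere in this notebook.

```python
import os
import shutil

def preflight_check(path=".", min_free_gb=16.0, token_var="HUGGINGFACE_TOKEN"):
    """Hypothetical helper: report free disk space and whether a token is set."""
    # shutil.disk_usage returns total, used, and free space in bytes
    free_gb = shutil.disk_usage(path).free / (1024 ** 3)
    return {
        "free_gb": round(free_gb, 2),
        "enough_disk": free_gb >= min_free_gb,
        "token_set": bool(os.getenv(token_var)),
    }

status = preflight_check()
print(status)
```

If `enough_disk` or `token_set` comes back `False`, it is worth fixing that before running the download cell.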

In [1]:
# Step 1: Install required packages
# Quote the version specifier so the shell does not treat ">" as output redirection
! pip install torch transformers huggingface_hub "accelerate>=0.26.0"

In [None]:
# Step 2: Import necessary libraries
import os
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM
from huggingface_hub import login, snapshot_download
from pathlib import Path

def setup_and_download_model(token=None, model_id='microsoft/phi-2', cache_dir="./model_cache"):
    """
    Set up authentication and pre-download the model files.

    Args:
        token (str): Hugging Face token
        model_id (str): Model identifier on Hugging Face
        cache_dir (str): Directory to store the model files
    """
    if token is None:
        token = os.getenv('HUGGINGFACE_TOKEN')
        if token is None:
            raise ValueError(
                "Please provide your Hugging Face token either as an argument or "
                "set it as HUGGINGFACE_TOKEN environment variable. "
                "You can get your token from https://huggingface.co/settings/tokens"
            )

    # Login to Hugging Face
    login(token=token)
    print("Authenticated with Hugging Face!")

    # Create cache directory if it doesn't exist
    cache_dir = Path(cache_dir)
    cache_dir.mkdir(parents=True, exist_ok=True)

    try:
        print(f"\nDownloading {model_id} to {cache_dir}...")
        print("This may take a while depending on your internet connection...")

        # Download model files
        snapshot_download(
            repo_id=model_id,
            local_dir=cache_dir,
            token=token,
            ignore_patterns=["*.md", "*.h5", "*.ot", "*.pt"],  # Exclude unnecessary files
        )

        print("\nVerifying the download by loading model and tokenizer...")

        # Try loading the model to verify the download
        tokenizer = AutoTokenizer.from_pretrained(cache_dir)
        model = AutoModelForCausalLM.from_pretrained(
            cache_dir,
            torch_dtype=torch.float16,
            device_map="auto",
            low_cpu_mem_usage=True
        )

        print("\nSuccess! Model and tokenizer have been downloaded and verified.")
        print(f"Model files are cached in: {cache_dir}")

        return tokenizer, model

    except Exception as e:
        print(f"\nAn error occurred during download/verification: {str(e)}")
        print("\nPlease ensure:")
        print("1. You have accepted the model's license at:")
        print(f"   https://huggingface.co/{model_id}")
        print("2. You have sufficient disk space")
        print("3. Your internet connection is stable")
        return None, None

def get_model_size(cache_dir):
    """Get the total size of downloaded model files"""
    total_size = 0
    for path in Path(cache_dir).rglob('*'):
        if path.is_file():
            total_size += path.stat().st_size
    return total_size / (1024 * 1024 * 1024)  # Convert to GB

In [None]:
# IMPORTANT: Paste your token below as a string, or leave it as None
# to read it from the HUGGINGFACE_TOKEN environment variable
YOUR_TOKEN = None
CACHE_DIR = "./model_cache"

print("Starting model download process...")
print("Expected download size: roughly 5-6 GB\n")

tokenizer, model = setup_and_download_model(
    token=YOUR_TOKEN,
    cache_dir=CACHE_DIR
)

if tokenizer is not None and model is not None:
    size_gb = get_model_size(CACHE_DIR)
    print(f"\nTotal model size on disk: {size_gb:.2f}GB")

    print("\nTesting with a simple input...")
    inputs = tokenizer("Hello, how are you?", return_tensors="pt").to(model.device)
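    # Pass the attention mask along with the input IDs, and set pad_token_id
    # explicitly to avoid a warning from generate()
    outputs = model.generate(**inputs, max_length=50, pad_token_id=tokenizer.eos_token_id)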
    response = tokenizer.decode(outputs[0], skip_special_tokens=True)
    print(f"Test response: {response}")

## Interactive Lab Session

In this lab, we'll build a simple cognitive agent that can remember and learn from conversations. We'll explore how to implement different types of memory and make our agent more context-aware.

In [None]:
from dataclasses import dataclass
from typing import List, Dict
import time

# Confirm that the imports above ran without errors
print("Setup complete!")

### 1. Understanding Memory Systems

Just like humans, our agent needs different types of memory:
- Short-term memory for recent conversations
- Long-term memory for important information

Let's implement a basic memory structure:

In [None]:
from typing import Any

@dataclass
class Memory:
    """Simple memory structure for our cognitive agent"""
    # Entries hold text plus a float timestamp, so values are typed as Any
    short_term: List[Dict[str, Any]]  # Recent interactions
    long_term: List[Dict[str, Any]]   # Important information
    max_short_term: int = 5           # Maximum number of recent interactions

# Create a new memory instance
memory = Memory(short_term=[], long_term=[])
print("Memory system initialized!")

### Exercise 1: Adding Memories

Let's try adding some memories to our system:

In [None]:
def add_memory(memory: Memory, text: str, is_important: bool = False):
    """Add a new memory to either short-term or long-term storage

    Args:
        memory (Memory): The memory system to update
        text (str): The text to remember
        is_important (bool): Whether this should go to long-term memory
    """

    memory_entry = {
        'content': text,
        'timestamp': time.time()
    }

    # If the memory is important, add to long-term memory
    if is_important:
        memory.long_term.append(memory_entry)

    # Always add to short-term memory
    memory.short_term.append(memory_entry)

    # If we've exceeded the short-term memory capacity,
    # remove the oldest entry (first item in the list)
    if len(memory.short_term) > memory.max_short_term:
        memory.short_term.pop(0)

# Test the implementation
memory = Memory(short_term=[], long_term=[])

# Test 1: Add a regular memory
add_memory(memory, "Hello, my name is Alice")
print("After adding first memory:")
print(f"Short-term memories: {memory.short_term}\n")

# Test 2: Add an important memory
add_memory(memory, "Remember my phone number is 555-0123", is_important=True)
print("After adding important memory:")
print(f"Short-term memories: {memory.short_term}")
print(f"Long-term memories: {memory.long_term}\n")

# Test 3: Add several memories to test capacity
for i in range(5):
    add_memory(memory, f"Test memory {i}")
print("After adding multiple memories:")
print(f"Number of short-term memories: {len(memory.short_term)}")
print(f"Short-term memory capacity: {memory.max_short_term}")
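
The capacity rule in `add_memory` (append, then drop the oldest entry once the list is too long) is the behavior that `collections.deque` provides through its `maxlen` argument. A minimal sketch of that alternative, assuming a plain deque in place of the `Memory` dataclass used in this lab:

```python
from collections import deque
import time

# A deque with maxlen discards the oldest item automatically on append
short_term = deque(maxlen=5)

for i in range(7):
    short_term.append({'content': f"Test memory {i}", 'timestamp': time.time()})

contents = [entry['content'] for entry in short_term]
print(len(short_term))   # 5
print(contents[0])       # Test memory 2
```

The list-based version in the lab keeps the eviction step visible, which is useful for learning; the deque version removes the need for the explicit length check.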

### 2. Implementing Memory Management

Now let's create a system to decide what's important enough for long-term memory:

In [None]:
def is_important(text: str) -> bool:
    """Determine if a piece of information should be stored long-term"""
    important_keywords = ['remember', 'important', 'key', 'crucial']
    return any(keyword in text.lower() for keyword in important_keywords)

# Test the importance detection
test_texts = [
    "Remember to buy milk",
    "The weather is nice",
    "This is important information"
]

for text in test_texts:
    print(f"\"{text}\" is important: {is_important(text)}")
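
Because `is_important` checks for substrings, a keyword such as `'key'` also matches inside longer words like "monkey". One possible refinement, sketched here with a hypothetical `is_important_word` function, matches whole words only by using a regular expression:

```python
import re

IMPORTANT_KEYWORDS = ['remember', 'important', 'key', 'crucial']

# \b marks a word boundary, so "key" no longer matches inside "monkey"
_PATTERN = re.compile(
    r"\b(?:" + "|".join(re.escape(k) for k in IMPORTANT_KEYWORDS) + r")\b",
    re.IGNORECASE,
)

def is_important_word(text: str) -> bool:
    """Hypothetical variant of is_important that matches whole words only."""
    return _PATTERN.search(text) is not None

for text in ["Remember to buy milk", "The monkey ate a banana", "Here is the key"]:
    print(f'"{text}" is important: {is_important_word(text)}')
```

The trade-off is that inflected forms stop matching as well: "remembered" no longer triggers on `'remember'`, so such forms would have to be listed explicitly.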

### Exercise 2: Enhanced Memory Management

Can you improve the importance detection? Consider:
- Additional keywords
- Context analysis
- User instructions
- Sentiment analysis

In [None]:
def enhanced_importance(text: str) -> bool:
    """Your improved importance detection system.

    Consider:
    1. Keywords and phrases
    2. Punctuation emphasis
    3. Personal information markers
    4. Time-related information
    5. Request patterns

    Args:
        text (str): Input text to analyze

    Returns:
        bool: True if the text is considered important
    """
    # Convert to lowercase for consistent matching
    text_lower = text.lower()

    # 1. Extended keyword list
    important_keywords = [
        'remember', 'important', 'key', 'crucial', 'essential',
        "don't forget", 'note', 'reminder', 'critical', 'urgent',
        'must', 'need to', 'save', 'keep'
    ]

    # 2. Personal information markers
    personal_info_patterns = [
        'my name', 'phone', 'email', 'address', 'birthday',
        'password', 'account', 'contact', 'social security',
        'credit card', 'appointment', 'meeting'
    ]

    # 3. Time-related patterns
    time_patterns = [
        'tomorrow', 'next week', 'schedule', 'deadline',
        'due date', 'appointment', 'meeting at'
    ]

    # Check for keyword matches
    has_keyword = any(keyword in text_lower for keyword in important_keywords)

    # Check for personal information
    has_personal_info = any(pattern in text_lower for pattern in personal_info_patterns)

    # Check for time-related information
    has_time_info = any(pattern in text_lower for pattern in time_patterns)

    # Check for emphasis through punctuation
    has_emphasis = text.count('!') > 0 or text.count('?') > 1

    # Immediate markers of importance
    if has_keyword or has_personal_info:
        return True

    # Combined factors for importance
    if (has_time_info and has_emphasis) or \
       (has_time_info and len(text.split()) > 10):  # Longer time-related messages
        return True

    return False

# Test your implementation
test_cases = [
    # Keywords and explicit importance
    "Remember to send the report",           # Should be True
    "This is important information",         # Should be True
    "Just a casual message",                 # Should be False

    # Personal information
    "My phone number is 555-0123",          # Should be True
    "My email is user@example.com",         # Should be True
    "I like blue color",                    # Should be False

    # Time-related with emphasis
    "Meeting tomorrow at 3 PM!",            # Should be True
    "Deadline for the project next week!",   # Should be True
    "I'll be there tomorrow",               # Should be False

    # Complex cases
    "Don't forget about the important meeting with the client tomorrow morning!",  # Should be True
    "Please make sure to keep my contact information saved for future reference",  # Should be True
    "The weather is quite nice today",       # Should be False
]

# Run tests
print("Testing Enhanced Importance Detection:\n")
for test in test_cases:
    result = enhanced_importance(test)
    print(f"Text: \"{test}\"")
    print(f"Important: {result}\n")
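
The function above returns a yes/no answer. An alternative design, sketched here on the assumption that a numeric score is easier to tune, assigns a weight to each signal and compares the total with a threshold. The `importance_score` name, the shortened keyword lists, the weights, and the threshold are all illustrative choices, not part of the lab.

```python
def importance_score(text: str) -> int:
    """Hypothetical scoring variant: add up weights for each signal found."""
    text_lower = text.lower()
    score = 0
    if any(k in text_lower for k in ('remember', 'important', "don't forget")):
        score += 2  # Explicit request to retain the information
    if any(p in text_lower for p in ('my name', 'phone', 'email')):
        score += 2  # Personal information
    if any(t in text_lower for t in ('tomorrow', 'next week', 'deadline')):
        score += 1  # Time-related detail
    if '!' in text:
        score += 1  # Emphasis through punctuation
    return score

THRESHOLD = 2  # Assumed cut-off; tune for your own data

for text in ["Remember to send the report", "Meeting tomorrow at 3 PM!", "I'll be there tomorrow"]:
    score = importance_score(text)
    print(f"{text!r}: score={score}, important={score >= THRESHOLD}")
```

With these weights, a time reference alone scores 1 and stays below the threshold, while a time reference with an exclamation mark reaches 2, which mirrors the combined-factor rule in `enhanced_importance`.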

### 3. Building the Complete Cognitive Agent

Now let's put everything together into a complete agent:

In [None]:
import time
from typing import List, Dict
from threading import Lock

class CognitiveAgent:
    def __init__(self, model_path="./model_cache", device_map="auto"):
        """Initialize the cognitive agent with a local cached Phi-2 model

        Args:
            model_path (str): Path to the cached model files
            device_map (str): Device mapping strategy for model loading
        """
        try:
            self.tokenizer = AutoTokenizer.from_pretrained(model_path)
            self.model = AutoModelForCausalLM.from_pretrained(
                model_path,
                torch_dtype=torch.float16,
                device_map=device_map,
                low_cpu_mem_usage=True
            )
        except Exception as e:
            raise RuntimeError(
                f"Failed to load model from {model_path}. "
                "Please ensure you've run the model download step first."
            ) from e

        self.memory = Memory(
            short_term=[],  # Recent interactions
            long_term=[],   # Important information
            max_short_term=3  # Maximum number of recent interactions to keep
        )
        self.lock = Lock()

        # Simplified system prompt for smaller context window
        self.system_prompt = """You are a helpful cognitive agent. Be clear and concise."""

    def _format_conversation(self) -> str:
        """Format the conversation history with system prompt"""
        conversation = [f"System: {self.system_prompt}"]

        # Add all stored long-term memories as context
        if self.memory.long_term:
            conversation.append("Important Context:")
            for memory in self.memory.long_term:
                conversation.append(f"- {memory['content']}")

        # Add recent conversation history
        for interaction in self.memory.short_term:
            conversation.append(f"Human: {interaction['input']}")
            if 'response' in interaction:
                conversation.append(f"Assistant: {interaction['response']}")

        return "\n".join(conversation)

    def process_input(self, user_input: str, max_new_tokens: int = 512) -> str:
        """Process user input and generate a response

        Args:
            user_input (str): The user's input text
            max_new_tokens (int): Maximum number of new tokens to generate

        Returns:
            str: The generated response
        """
        # Format the conversation with context
        conversation = self._format_conversation()
        conversation += f"\nHuman: {user_input}\nAssistant:"

        # Tokenize input
        inputs = self.tokenizer(conversation, return_tensors="pt", truncation=True)
        inputs = inputs.to(self.model.device)

        # Generate response
        with torch.no_grad():
            outputs = self.model.generate(
                **inputs,  # Pass input_ids together with the attention mask
                max_new_tokens=max_new_tokens,  # Use max_new_tokens instead of max_length
                num_return_sequences=1,
                temperature=0.7,
                top_p=0.9,
                do_sample=True,
                pad_token_id=self.tokenizer.eos_token_id
            )

        # Decode only the newly generated tokens, skipping the prompt
        prompt_length = inputs["input_ids"].shape[1]
        response = self.tokenizer.decode(outputs[0][prompt_length:], skip_special_tokens=True)
        # Cut the response off if the model starts writing the next turn itself
        response = response.split("Human:")[0].strip()

        # Update memory
        self._update_memory(user_input, response)

        return response

    def _update_memory(self, user_input: str, response: str):
        """Update both short-term and long-term memory"""
        with self.lock:
            # Update short-term memory
            self.memory.short_term.append({
                'input': user_input,
                'response': response,
                'timestamp': time.time()  # Wall-clock time; works with or without a GPU
            })

            # Maintain short-term memory size
            # Use the max_short_term from the Memory dataclass
            if len(self.memory.short_term) > self.memory.max_short_term:
                self.memory.short_term.pop(0)

            # Update long-term memory if input seems important
            if self._is_important(user_input):
                self.memory.long_term.append({
                    'content': f"User mentioned: {user_input}",
                    'timestamp': time.time()
                })

    def _is_important(self, text: str) -> bool:
        """
        Determine if information should be stored in long-term memory
        """
        important_keywords = ['remember', 'important', 'key', 'crucial', 'essential']
        return any(keyword in text.lower() for keyword in important_keywords)

    def get_memory_status(self) -> Dict:
        """Return the current state of the agent's memory"""
        return {
            'short_term_count': len(self.memory.short_term),
            'long_term_count': len(self.memory.long_term),
            'recent_interactions': self.memory.short_term[-3:] if self.memory.short_term else []
        }
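
To see what `_format_conversation` hands to the model, the same formatting logic can be run on plain lists without loading Phi-2. The standalone `format_conversation` function below is a sketch that mirrors the method above; the sample memories are made up for illustration.

```python
from typing import Dict, List

def format_conversation(system_prompt: str,
                        long_term: List[Dict[str, str]],
                        short_term: List[Dict[str, str]]) -> str:
    """Standalone sketch of the prompt layout built by _format_conversation."""
    lines = [f"System: {system_prompt}"]
    if long_term:
        lines.append("Important Context:")
        lines.extend(f"- {m['content']}" for m in long_term)
    for turn in short_term:
        lines.append(f"Human: {turn['input']}")
        if 'response' in turn:
            lines.append(f"Assistant: {turn['response']}")
    return "\n".join(lines)

prompt = format_conversation(
    "You are a helpful cognitive agent. Be clear and concise.",
    long_term=[{'content': "User mentioned: Remember that my name is Alice."}],
    short_term=[{'input': "Hello!", 'response': "Hi, how can I help?"}],
)
# process_input appends the new user turn and an open "Assistant:" line
print(prompt + "\nHuman: What's my name?\nAssistant:")
```

Every long-term entry is repeated in each prompt, so the prompt grows as more inputs are marked important.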

### Exercise 3: Testing Your Agent

Create a series of interactions to test your agent's memory capabilities:

In [None]:
import time
from typing import Optional

def test_agent(agent: Optional[CognitiveAgent] = None,
               max_new_tokens: int = 50,
               test_inputs: Optional[list] = None) -> None:
    """
    Test the cognitive agent with timing information and progress updates.

    Args:
        agent: Existing agent instance or None to create new one
        max_new_tokens: Maximum number of tokens to generate per response
        test_inputs: List of test inputs or None to use defaults
    """
    print("Starting agent test...")

    # Create agent if not provided
    if agent is None:
        print("Initializing new agent...")
        start_time = time.time()
        agent = CognitiveAgent(model_path="./model_cache")
        print(f"Agent initialization took {time.time() - start_time:.2f} seconds")

    # Default test cases if none provided
    if test_inputs is None:
        test_inputs = [
            "Hello! How are you?",  # Simple greeting
            "What's 2+2?",  # Short response
            "Remember that my name is Alice.",  # Memory test
            "What's my name?",  # Memory recall test
        ]

    total_time = 0
    print("\nRunning tests...")
    print("Note: First inference will be slower due to CUDA warmup\n")

    for i, user_input in enumerate(test_inputs, 1):
        print(f"\nTest {i}/{len(test_inputs)}")
        print(f"Input: {user_input}")

        # Time the response generation
        start_time = time.time()
        response = agent.process_input(user_input, max_new_tokens=max_new_tokens)
        elapsed = time.time() - start_time

        print(f"Response: {response}")
        print(f"Generation time: {elapsed:.2f} seconds")

        total_time += elapsed

    # Print summary statistics
    print("\nTest Summary:")
    print(f"Total time: {total_time:.2f} seconds")
    if test_inputs:  # Avoid dividing by zero when no inputs were given
        print(f"Average time per response: {total_time/len(test_inputs):.2f} seconds")

    # Print memory status
    print("\nFinal Memory Status:")
    memory_status = agent.get_memory_status()
    print(f"Short-term memories: {memory_status['short_term_count']}")
    print(f"Long-term memories: {memory_status['long_term_count']}")

    if torch.cuda.is_available():
        print(f"\nGPU Memory Usage:")
        print(f"Allocated: {torch.cuda.memory_allocated() / 1e9:.2f} GB")
        print(f"Reserved: {torch.cuda.memory_reserved() / 1e9:.2f} GB")

In [None]:
agent = CognitiveAgent(model_path="./model_cache")
test_agent(
    agent=agent,
    max_new_tokens=20
)
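
The memory counts printed at the end of the test can be predicted without running the model. The sketch below replays the four default inputs through the same bookkeeping rules as `_update_memory` (a short-term capacity of 3 and the keyword check from `_is_important`). The `simulate_memory` helper is hypothetical and uses placeholder responses instead of generated text.

```python
def simulate_memory(inputs, max_short_term=3):
    """Hypothetical replay of the agent's memory bookkeeping, without a model."""
    keywords = ['remember', 'important', 'key', 'crucial', 'essential']
    short_term, long_term = [], []
    for user_input in inputs:
        short_term.append({'input': user_input, 'response': "<placeholder>"})
        if len(short_term) > max_short_term:
            short_term.pop(0)  # Drop the oldest interaction
        if any(k in user_input.lower() for k in keywords):
            long_term.append({'content': f"User mentioned: {user_input}"})
    return short_term, long_term

short_term, long_term = simulate_memory([
    "Hello! How are you?",
    "What's 2+2?",
    "Remember that my name is Alice.",
    "What's my name?",
])
print(len(short_term), len(long_term))  # 3 1
```

After the fourth input the greeting has been evicted from short-term memory, while the statement about the name is kept as the single long-term entry.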