# NPC Cognitive Pipeline Testing

This notebook provides full observability into the NPC cognitive pipeline using the new LangGraph architecture with proper Godot action schemas.

In [None]:
# Enable IPython's autoreload extension in mode 2, which re-imports modules
# before each cell runs so edits to the project code are picked up without
# restarting the kernel.
%load_ext autoreload
%autoreload 2

In [None]:
import json
import time
import asyncio
from typing import Any
from dataclasses import dataclass
import logging

# Import the new cognitive architecture
from mind.cognitive_architecture.pipeline import CognitivePipeline
from mind.cognitive_architecture.state import PipelineState
from mind.cognitive_architecture.models import (
    Observation,
    StatusObservation,
    NeedsObservation,
    VisionObservation,
    EntityData,
    AvailableAction, 
    Action, 
    ActionType,
    Memory
)
from mind.cognitive_architecture.nodes.cognitive_update.models import WorkingMemory
from mind.cognitive_architecture.memory.vector_db_memory import VectorDBMemory

# Import LangChain LLM wrapper
from mind.apis.langchain_llm import get_llm, LangChainModel

# Set up logging to see all intermediate steps.
# force=True removes any handlers already attached to the root logger before
# configuring it. Without it, basicConfig() silently does nothing when handlers
# already exist (common in Jupyter kernels and when this cell is re-run), and
# the INFO records from the pipeline would never be shown.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    force=True,
)
logger = logging.getLogger(__name__)

## Token Counter

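A simple token-counting utility for measuring usage at each pipeline step. Counts come from a tiktoken encoder and are therefore approximate for non-OpenAI models.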

In [None]:
import tiktoken

@dataclass
class TokenUsage:
    """Token counts and duration recorded for a single LLM call."""
    step: str  # label of the pipeline step that made the call
    input_tokens: int  # tokens counted in the input text
    output_tokens: int  # tokens counted in the output text
    total_tokens: int  # input_tokens + output_tokens
    duration_ms: int  # caller-supplied duration of the call, in milliseconds


class TokenTracker:
    """Accumulate approximate token usage across successive LLM calls.

    Counts come from the tiktoken encoder selected for "gpt-4", so they are an
    approximation for any model that uses a different tokenizer.
    """

    def __init__(self):
        # Approximation only: non-OpenAI models tokenize differently.
        self.encoder = tiktoken.encoding_for_model("gpt-4")
        self.usage_history: list[TokenUsage] = []

    def count_tokens(self, text: str) -> int:
        """Return the number of tokens the encoder produces for ``text``."""
        return len(self.encoder.encode(text))

    def track_call(self, step: str, input_text: str, output_text: str, duration_ms: int) -> TokenUsage:
        """Record one LLM call and return the stored usage entry.

        Args:
            step: Label for the call (for example the pipeline step name).
            input_text: Text sent to the model.
            output_text: Text returned by the model.
            duration_ms: How long the call took, in milliseconds.

        Returns:
            The ``TokenUsage`` that was appended to ``usage_history``.
        """
        # Encode each text exactly once; the total is derived from the two
        # counts instead of tokenizing both strings a second time.
        input_tokens = self.count_tokens(input_text)
        output_tokens = self.count_tokens(output_text)
        usage = TokenUsage(
            step=step,
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            total_tokens=input_tokens + output_tokens,
            duration_ms=duration_ms
        )
        self.usage_history.append(usage)
        return usage

    def summary(self) -> dict[str, Any]:
        """Return aggregate totals plus a per-step breakdown of recorded calls.

        Returns:
            A dict with ``total_input_tokens``, ``total_output_tokens``,
            ``total_tokens``, ``total_duration_ms`` and ``steps`` (one dict per
            recorded call, in recording order).
        """
        total_input = sum(u.input_tokens for u in self.usage_history)
        total_output = sum(u.output_tokens for u in self.usage_history)
        total_time = sum(u.duration_ms for u in self.usage_history)

        return {
            "total_input_tokens": total_input,
            "total_output_tokens": total_output,
            "total_tokens": total_input + total_output,
            "total_duration_ms": total_time,
            "steps": [{
                "step": u.step,
                "tokens": u.total_tokens,
                "duration_ms": u.duration_ms
            } for u in self.usage_history]
        }

## Initialize Pipeline Components

In [None]:
# Build the pipeline's dependencies in order: LLM, memory store, pipeline.
# The LLM is reached via OpenRouter; the model is chosen with a LangChainModel
# constant, so switching models is a one-line change.
llm = get_llm(LangChainModel.GEMINI_FLASH_LITE)

# Memory store used by the pipeline, created with its own collection name.
memory_store = VectorDBMemory(collection_name="test_memories")

# The cognitive pipeline receives both the LLM and the memory store.
pipeline = CognitivePipeline(llm=llm, memory_store=memory_store)

# Announce the node order in a single write instead of one print per line.
print("\n".join((
    "Pipeline initialized with nodes:",
    "  1. Memory Query",
    "  2. Memory Retrieval",
    "  3. Cognitive Update",
    "  4. Action Selection",
)))

## Test Scenario Setup

Create a realistic test scenario using proper Godot action schemas.

In [None]:
# Seed the memory store with initial memories.
# Note: seeded memories use add_memory's default importance (1.0) because they
# are generic background knowledge.
initial_memories = [
    "Yesterday I shaped the blade and got it to the right length",
    "The customer wants a ceremonial sword with intricate engravings",
    "My apprentice has been learning quickly and can now help with basic tasks",
    "The forge needs more coal soon, running low on supplies",
    "Last week I completed a set of horseshoes for the stable",
    "The village festival is coming up in three days",
    "I promised my wife I would come home early today",
    "The apprentice's name is Tom and he's been working with me for 3 months",
    "I have a bad back from years of smithing work",
    "The local inn keeper is my best customer"
]

PREVIEW_CHARS = 50  # longest prefix of a memory echoed while seeding

# Clear and repopulate the memory store.
memory_store.clear()
for memory_content in initial_memories:
    # NOTE(review): add_memory is called without `await` while
    # memory_store.search is awaited elsewhere in this notebook. Confirm that
    # add_memory is synchronous.
    memory_store.add_memory(memory_content)  # Uses default importance
    # Only show an ellipsis when the text was actually cut off.
    preview = memory_content[:PREVIEW_CHARS]
    suffix = "..." if len(memory_content) > PREVIEW_CHARS else ""
    print(f"Added memory: {preview}{suffix}")

print(f"\nTotal memories in store: {memory_store.collection.count()}")

In [None]:
# --- Observation -------------------------------------------------------------
# The observation is assembled from named parts (status, needs, each visible
# entity) so that every piece can be read and tweaked on its own.
blacksmith_status = StatusObservation(position=(5, 10), movement_locked=False)

blacksmith_needs = NeedsObservation(
    needs={"hunger": 75.0, "energy": 60.0, "fun": 40.0, "hygiene": 80.0}
)

tom_entity = EntityData(
    entity_id="tom_001",
    display_name="Tom",
    position=(6, 10),
    interactions={
        "conversation": {
            "name": "conversation",
            "description": "Talk with Tom",
            "needs_filled": ["fun"],
            "needs_drained": ["energy"],
        }
    },
)

blade_entity = EntityData(
    entity_id="sword_blade_001",
    display_name="Sword Blade",
    position=(5, 11),
    interactions={
        "craft": {
            "name": "craft",
            "description": "Continue working on the sword blade",
            "needs_filled": ["fun"],
            "needs_drained": ["energy"],
        }
    },
)

forge_entity = EntityData(
    entity_id="forge_001",
    display_name="Forge",
    position=(4, 10),
    interactions={
        "restart_fire": {
            "name": "restart_fire",
            "description": "Restart the forge fire",
            "needs_filled": [],
            "needs_drained": ["energy"],
        }
    },
)

observation = Observation(
    entity_id="blacksmith_001",
    current_simulation_time=100,  # Game time
    status=blacksmith_status,
    needs=blacksmith_needs,
    vision=VisionObservation(
        visible_entities=[tom_entity, blade_entity, forge_entity]
    ),
)

# --- Available actions -------------------------------------------------------
# Each spec is (name, description, parameters); parameters maps a parameter
# name to its human-readable description.
action_specs = [
    ("MOVE_TO", "Move to a specific cell",
     {"destination": "Target position to move to"}),
    ("INTERACT_WITH", "Interact with an item or NPC",
     {
         "entity_id": "Target entity ID",
         "interaction_name": "Name of interaction to request",
     }),
    ("WANDER", "Move to a random location",
     {"range": "Maximum wander distance from current position"}),
    ("WAIT", "Wait for a specified duration",
     {"duration": "How long to wait in seconds"}),
    ("CONTINUE", "Continue current activity", {}),
]
available_actions = [
    AvailableAction(name=spec_name, description=spec_description, parameters=spec_parameters)
    for spec_name, spec_description, spec_parameters in action_specs
]

# --- Personality and starting working memory ---------------------------------
personality_traits = ["hardworking", "perfectionist", "patient teacher", "caring mentor"]

initial_working_memory = WorkingMemory(
    situation_assessment="I am a blacksmith in the village. I've been working on a sword commission for the past two days.",
    active_goals=["Finish the sword blade today", "Start on the handle tomorrow"],
    emotional_state="Well-rested despite back pain",
)

# --- Scenario summary --------------------------------------------------------
# status and vision are guarded because the code treats them as optional.
position_text = observation.status.position if observation.status else 'Unknown'
visible_count = len(observation.vision.visible_entities) if observation.vision else 0

print("Test scenario created:")
print(f"  Entity: {observation.entity_id}")
print(f"  Position: {position_text}")
print(f"  Visible entities: {visible_count}")
print(f"  Available actions: {len(available_actions)}")
print(f"  Personality traits: {', '.join(personality_traits)}")
print(f"\nObservation (formatted for LLM):")
print(observation)

## Display Available Actions

Show how the actions format themselves for the LLM.

In [None]:
print("Available Actions (as formatted for LLM):")
print("="*60)
# Each action renders itself through str(); the trailing "\n" produces the
# blank separator line between entries.
for i, action in enumerate(available_actions, 1):
    print(f"{i}. {action!s}\n")

## Run Cognitive Pipeline

Execute the pipeline with full observability of each step.

In [None]:
async def run_pipeline_with_observation(pipeline, observation, available_actions, personality_traits, working_memory):
    """Run one decision cycle through the pipeline and print each step's result.

    Args:
        pipeline: Object exposing an awaitable ``process(state)`` method.
        observation: Observation passed to ``PipelineState``.
        available_actions: Actions passed to ``PipelineState``.
        personality_traits: Personality traits passed to ``PipelineState``.
        working_memory: Working memory passed to ``PipelineState``.

    Returns:
        Whatever ``pipeline.process`` returns. This function reads its
        ``time_ms``, ``memory_queries``, ``retrieved_memories``,
        ``cognitive_context``, ``working_memory``, ``chosen_action`` and
        ``tokens_used`` attributes.
    """
    state = PipelineState(
        observation=observation,
        available_actions=available_actions,
        personality_traits=personality_traits,
        working_memory=working_memory
    )

    rule = "=" * 70
    print(rule)
    print("COGNITIVE PIPELINE EXECUTION")
    print(rule)

    # perf_counter() is a monotonic, high-resolution clock meant for measuring
    # elapsed time; time.time() can jump when the system clock is adjusted.
    start_time = time.perf_counter()
    result = await pipeline.process(state)
    total_time = time.perf_counter() - start_time

    # Display step results
    print(f"\n1. MEMORY QUERIES ({result.time_ms.get('memory_query', 0)}ms):")
    for query in result.memory_queries:
        print(f"   • {query}")

    print(f"\n2. MEMORIES RETRIEVED ({result.time_ms.get('memory_retrieval', 0)}ms):")
    for memory in result.retrieved_memories:
        print(f"   {str(memory)}")

    print(f"\n3. COGNITIVE UPDATE ({result.time_ms.get('cognitive_update', 0)}ms):")
    # default=str keeps this diagnostic dump from raising TypeError if the
    # context contains a value json cannot serialise natively.
    print(json.dumps(result.cognitive_context, indent=2, default=str))

    print(f"\n   Working Memory:")
    print(f"   {str(result.working_memory)}")

    print(f"\n4. ACTION SELECTED ({result.time_ms.get('action_selection', 0)}ms):")
    if result.chosen_action:
        print(f"   {str(result.chosen_action)}")
    else:
        # Make a missing action visible instead of leaving the section empty.
        print("   (no action selected)")

    print(f"\n{rule}")
    print(f"Total: {total_time:.2f}s | {sum(result.tokens_used.values())} tokens")
    print(f"{rule}\n")

    return result

# Run the pipeline
result = await run_pipeline_with_observation(
    pipeline,
    observation,
    available_actions,
    personality_traits,
    initial_working_memory
)

In [None]:
print("Daily Memories Formed:")
print("="*60)

if result.daily_memories:
    for mem in result.daily_memories:
        print(f"[{mem.importance:.1f}/10] {mem.content}")
else:
    print("No significant memories formed (routine action)")

# Demonstrate consolidation if needed
if result.daily_memories:
    from mind.cognitive_architecture.nodes.memory_consolidation import MemoryConsolidationNode
    
    print("\nConsolidating to long-term storage...")
    consolidation_node = MemoryConsolidationNode(memory_store)
    await consolidation_node.process(result)
    print(f"✓ Added {len(result.daily_memories)} memories")

## Memory Formation Analysis

The cell above examines what memories were formed during the decision cycle and, if any were formed, consolidates them into long-term storage.

## Format Action for MCP Protocol

Show how the selected action would be sent back to Godot via MCP.

In [None]:
if result.chosen_action:
    chosen = result.chosen_action

    # Format as MCP response
    mcp_response = {
        "action": chosen.action,
        "parameters": chosen.parameters
    }

    print("MCP Response to Godot:")
    print("="*40)
    print(json.dumps(mcp_response, indent=2))

    # The available actions in this notebook are named in upper case
    # ("MOVE_TO", ...) while the branches below compare against lower-case
    # names, so the comparison is made case-insensitive. getattr(..., "value")
    # also covers the case where `action` is an enum member rather than a str.
    # NOTE(review): confirm which form the pipeline actually returns.
    action_key = str(getattr(chosen.action, "value", chosen.action)).lower()
    params = chosen.parameters or {}

    print("\nThis action would cause the NPC to:")
    if action_key == "interact_with":
        entity = params.get("entity_id", "unknown")
        interaction = params.get("interaction_name", "unknown")
        print(f"  → Request a '{interaction}' interaction with entity '{entity}'")
        print(f"  → Transition to NpcBiddingState in the controller")
    elif action_key == "move_to":
        dest = params.get("destination", "unknown")
        print(f"  → Navigate to grid position {dest}")
        print(f"  → Transition to MovingState in the controller")
    elif action_key == "wait":
        duration = params.get("duration", 1.0)
        print(f"  → Pause for {duration} seconds")
        print(f"  → Transition to WaitingState in the controller")
    elif action_key == "wander":
        print(f"  → Randomly explore the environment")
        print(f"  → Transition to WanderingState in the controller")
    elif action_key == "continue":
        print(f"  → Continue current behavior unchanged")
        print(f"  → No state transition")
    else:
        # Previously an unrecognised action left the header above with nothing
        # under it; say so explicitly instead.
        print(f"  → (no description available for action '{action_key}')")
else:
    print("No action was selected by the pipeline")

## Test Different Scenarios

Try the pipeline with different observations to see how it responds.

In [None]:
# Urgent scenario - low on supplies
urgent_observation = Observation(
    entity_id="blacksmith_001",
    current_simulation_time=150,
    status=StatusObservation(
        position=(5, 10),
        movement_locked=False
    ),
    needs=NeedsObservation(
        needs={
            "hunger": 70.0,
            "energy": 55.0,
            "fun": 30.0,
            "hygiene": 75.0
        }
    ),
    vision=VisionObservation(
        visible_entities=[
            EntityData(
                entity_id="tom_001",
                display_name="Tom",
                position=(6, 10),
                interactions={
                    "conversation": {
                        "name": "conversation",
                        "description": "Talk with Tom about getting more coal urgently",
                        "needs_filled": ["fun"],
                        "needs_drained": ["energy"]
                    }
                }
            )
        ]
    )
)

urgent_available_actions = [
    AvailableAction(
        name="INTERACT_WITH",
        description="Interact with an item or NPC",
        parameters={
            "entity_id": "Target entity ID",
            "interaction_name": "Name of interaction to request"
        }
    ),
    AvailableAction(
        name="MOVE_TO",
        description="Move to a specific cell",
        parameters={"destination": "Target position to move to"}
    ),
    AvailableAction(
        name="WAIT",
        description="Wait for a specified duration",
        parameters={"duration": "How long to wait in seconds"}
    ),
    AvailableAction(
        name="CONTINUE",
        description="Continue current activity",
        parameters={}
    )
]

urgent_traits = ["hardworking", "perfectionist", "patient teacher", "resourceful"]

print("Testing URGENT scenario:")
print("="*40)
print("\nObservation context:")
print(str(urgent_observation))

urgent_result = await pipeline.process(PipelineState(
    observation=urgent_observation,
    available_actions=urgent_available_actions,
    personality_traits=urgent_traits,
    working_memory=result.working_memory
))

print(f"\nAction: {str(urgent_result.chosen_action)}")
print(f"\nCognitive Context:")
print(json.dumps(urgent_result.cognitive_context, indent=2))

## Memory Analysis

Query the memory store directly with a few hand-written queries to see which memories each one retrieves.

In [None]:
print("Memory Retrieval Analysis")
print("="*60)

# Import the query model
from mind.cognitive_architecture.memory.vector_db_memory import VectorDBQuery

# Test memory retrieval with different queries
test_queries = [
    "apprentice Tom teaching",
    "sword blade commission",
    "coal forge supplies",
    "wife promise home"
]

for query_text in test_queries:
    query = VectorDBQuery(query=query_text, top_k=2)
    memories = await memory_store.search(query)
    print(f"\nQuery: '{query_text}'")
    print("-"*40)
    for i, memory in enumerate(memories, 1):
        print(f"  {i}. {memory.content}")
        print(f"     Importance: {memory.importance:.1f}")

## Performance Summary

Analyze the performance characteristics of the pipeline.

In [None]:
# Collect timing and token data from multiple runs
timing_results = []

print("Running pipeline 3 times to measure performance...")
print("="*60)

for i in range(3):
    test_state = PipelineState(
        observation=observation,
        available_actions=available_actions,
        personality_traits=personality_traits,
        working_memory=initial_working_memory
    )
    
    start = time.time()
    test_result = await pipeline.process(test_state)
    elapsed = (time.time() - start) * 1000  # Convert to ms
    
    timing_results.append({
        "run": i + 1,
        "total_ms": elapsed,
        "memory_query_ms": test_result.time_ms.get('memory_query', 0),
        "memory_retrieval_ms": test_result.time_ms.get('memory_retrieval', 0),
        "cognitive_update_ms": test_result.time_ms.get('cognitive_update', 0),
        "action_selection_ms": test_result.time_ms.get('action_selection', 0),
        "memory_query_tokens": test_result.tokens_used.get('memory_query', 0),
        "cognitive_update_tokens": test_result.tokens_used.get('cognitive_update', 0),
        "action_selection_tokens": test_result.tokens_used.get('action_selection', 0),
    })
    print(f"Run {i+1}: {elapsed:.0f}ms total, {sum(test_result.tokens_used.values())} tokens")

print("\nPerformance Summary:")
print("="*60)

# Calculate averages
avg_total = sum(r['total_ms'] for r in timing_results) / len(timing_results)
avg_query = sum(r['memory_query_ms'] for r in timing_results) / len(timing_results)
avg_retrieval = sum(r['memory_retrieval_ms'] for r in timing_results) / len(timing_results)
avg_cognitive = sum(r['cognitive_update_ms'] for r in timing_results) / len(timing_results)
avg_action = sum(r['action_selection_ms'] for r in timing_results) / len(timing_results)

avg_query_tokens = sum(r['memory_query_tokens'] for r in timing_results) / len(timing_results)
avg_cognitive_tokens = sum(r['cognitive_update_tokens'] for r in timing_results) / len(timing_results)
avg_action_tokens = sum(r['action_selection_tokens'] for r in timing_results) / len(timing_results)
avg_total_tokens = avg_query_tokens + avg_cognitive_tokens + avg_action_tokens

print(f"Average Total Time: {avg_total:.0f}ms")
print(f"\nBreakdown by Step:")
print(f"  Memory Query:     {avg_query:.0f}ms ({avg_query/avg_total*100:.1f}%) | {avg_query_tokens:.0f} tokens")
print(f"  Memory Retrieval: {avg_retrieval:.0f}ms ({avg_retrieval/avg_total*100:.1f}%) | 0 tokens (vector search)")
print(f"  Cognitive Update: {avg_cognitive:.0f}ms ({avg_cognitive/avg_total*100:.1f}%) | {avg_cognitive_tokens:.0f} tokens")
print(f"  Action Selection: {avg_action:.0f}ms ({avg_action/avg_total*100:.1f}%) | {avg_action_tokens:.0f} tokens")

print(f"\nToken Usage:")
print(f"  Total: {avg_total_tokens:.0f} tokens per decision")
print(f"  Cost estimate (Gemini Flash Lite): ${avg_total_tokens * 0.000000075:.6f} per decision")
print(f"  Cost estimate (Claude Sonnet 4): ${avg_total_tokens * 0.000003:.6f} per decision")

print(f"\nObservations:")
llm_time = avg_query + avg_cognitive + avg_action
print(f"  - LLM calls account for {llm_time/avg_total*100:.1f}% of total time")
print(f"  - Memory retrieval (vector search) is {avg_retrieval/avg_total*100:.1f}% of total time")
if avg_cognitive > avg_action:
    print(f"  - Cognitive update is the slowest LLM step")
else:
    print(f"  - Action selection is the slowest LLM step")
