In [None]:
import redis
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
print(r.ping())

True


In [7]:
r.set('foo', 'bar')
# True
r.get('foo')
# bar

'bar'

In [1]:
from dataclasses import dataclass, field
from typing import Any, Optional
from datetime import datetime
import json
from enum import Enum

In [2]:
class WorkingMemorySlot(Enum):
    """
    Cognitive slots in working memory.
    
    Each slot represents a distinct type of information the agent
    might hold in active cognition. The separation allows independent
    access and update‚Äîchanging the current goal doesn't require
    rewriting the entire working memory.
    """
    CURRENT_GOAL = "current_goal"
    ACTIVE_CONTEXT = "active_context"
    SCRATCHPAD = "scratchpad"
    RECENT_OBSERVATIONS = "recent_observations"
    PENDING_ACTIONS = "pending_actions"
    USER_INTENT = "user_intent"
    REASONING_TRACE = "reasoning_trace"

In [3]:
@dataclass
class WorkingMemoryEntry:
    """
    A single entry in working memory.
    
    Each entry has content (what we're remembering), metadata (when, how important),
    and lifecycle information (how long until it expires).
    """
    slot: WorkingMemorySlot
    content: Any  # JSON-serializable content
    timestamp: datetime = field(default_factory=datetime.utcnow)
    ttl_seconds: int = 300  # 5 minute default‚Äîtune based on your application
    importance: float = 0.5  # 0-1 scale; higher = retain longer under pressure

    def to_redis_hash(self) -> dict:
        """
        Convert to Redis hash format.
        
        Redis hashes store field-value pairs. We serialize complex content
        to JSON since Redis values are strings/bytes. This serialization
        is intentionally simple‚Äîif you need more sophisticated serialization,
        consider msgpack or protobuf for better performance.
        """
        return {
            "slot": self.slot.value,
            "content": json.dumps(self.content),
            "timestamp": self.timestamp.isoformat(),
            "importance": str(self.importance)
        }
    
@classmethod
def from_redis_hash(cls, data: dict) -> "WorkingMemoryEntry":
    """
    Reconstruct from Redis hash data.
    
    Note the byte decoding‚ÄîRedis returns bytes by default.
    """
    return cls(
        slot=WorkingMemorySlot(data[b"slot"].decode()),
        content=json.loads(data[b"content"].decode()),
        timestamp=datetime.fromisoformat(data[b"timestamp"].decode()),
        importance=float(data[b"importance"].decode()),
        ttl_seconds=0  # Already in Redis; TTL managed there
    )

In [1]:
pip install redis[hiredis]


Collecting hiredis>=3.2.0 (from redis[hiredis])
  Downloading hiredis-3.3.0-cp311-cp311-win_amd64.whl.metadata (7.7 kB)
Downloading hiredis-3.3.0-cp311-cp311-win_amd64.whl (22 kB)
Installing collected packages: hiredis
Successfully installed hiredis-3.3.0
Note: you may need to restart the kernel to use updated packages.


pip install hiredis

In [25]:
import redis.asyncio as redis
class WorkingMemoryManager:
    """
    Manages ephemeral cognitive state with automatic decay.
    
    This class is the agent's interface to its working memory. It handles
    all the complexity of Redis operations, key management, and TTL handling,
    exposing a simple API for reading and writing cognitive state.
    
    Design Principles:
    1. Fast access (<1ms) for active reasoning‚Äîno operation should block
    2. Automatic cleanup via TTL‚Äîno manual garbage collection needed
    3. Importance-weighted retention during overflow‚Äîsmart degradation
    4. Session isolation for multi-tenant deployments‚Äîusers don't see each other's state
    """
    def __init__(
        self,
        redis_client: redis.Redis,
        session_id: str,
        max_observations: int = 10,
        default_ttl: int = 300
    ):
        self.redis = redis_client
        self.session_id = session_id
        self.max_observations = max_observations
        self.default_ttl = default_ttl
        self._key_prefix = f"wm:{session_id}"
    async def _slot_key(self, slot: WorkingMemorySlot) -> str:
        """
        Generate Redis key for a memory slot.
        
        Key structure: wm:{session_id}:{slot_name}
        
        This hierarchical naming enables:
        - Easy debugging (keys are human-readable)
        - Pattern-based operations (delete all slots for a session)
        - Clear ownership (each session has its own namespace)
        """
        return f"{self._key_prefix}:{slot.value}"
    async def set_slot(
        self,
        slot: WorkingMemorySlot,
        content: Any,
        ttl_seconds: Optional[int] = None,
        importance: float = 0.5
    ) -> None:
        """
        Store content in a working memory slot.
        
        This is the primary write operation. It overwrites any existing
        content in the slot‚Äîthere's no append or merge, by design.
        Working memory slots hold current state, not history.
        
        Args:
            slot: The cognitive slot to update
            content: Any JSON-serializable content
            ttl_seconds: Override default TTL (useful for high-importance content)
            importance: 0-1 score for retention priority during pressure
        """
        entry = WorkingMemoryEntry(
            slot=slot,
            content=content,
            ttl_seconds=ttl_seconds or self.default_ttl,
            importance=importance
        )
        
        key = self._slot_key(slot)
        
        # Use pipeline for atomic operation
        async with self.redis.pipeline(transaction=True) as pipe:
            await pipe.hset(key, mapping=entry.to_redis_hash())
            await pipe.expire(key, entry.ttl_seconds)
            await pipe.execute()
    def get_slot(self, slot: WorkingMemorySlot) -> Optional[WorkingMemoryEntry]:
        """
        Retrieve content from a working memory slot.
        
        Returns None if the slot is empty or expired. This is intentional‚Äî
        we treat expired content the same as never-existed content.
        The caller doesn't need to distinguish between "was never set"
        and "was set but expired."
        """
        key = self._slot_key(slot)
        data = self.redis.hgetall(key)
        
        if not data:
            return None
        
        return WorkingMemoryEntry.from_redis_hash(data)
    async def append_observation(self, observation: dict) -> None:
        """
        Add an observation to the recent observations list.
        
        Observations are different from slot content. Slots hold single values
        that get overwritten. Observations accumulate‚Äîthe agent notices things
        over time, and we want to keep a sliding window of recent notices.
        
        We use a Redis sorted set with timestamps as scores. This gives us:
        - Automatic ordering by time
        - Efficient retrieval of most recent N items
        - Easy trimming of old items
        
        The timestamp-as-score pattern is powerful: it lets us query time
        ranges efficiently, which we'd need if we wanted "observations from
        the last 5 minutes" rather than just "last N observations."
        """
        key = f"{self._key_prefix}:observations"
        timestamp = datetime.utcnow().timestamp()
        
        async with self.redis.pipeline(self, transaction=True) as pipe:
            # Add new observation with timestamp score
            await pipe.zadd(key, {json.dumps(observation): timestamp})
            
            # Trim to max size, keeping most recent
            # ZREMRANGEBYRANK removes elements by rank (0 = lowest score = oldest)
            # We keep from -(max+1) to -1 (the most recent max_observations)
            await pipe.zremrangebyrank(key, 0, -(self.max_observations + 1))
            
            # Set expiry on the whole set
            # Even if we don't add new observations, old ones eventually vanish
            await pipe.expire(key, self.default_ttl * 2)
            
            await pipe.execute()
    async def get_recent_observations(self, limit: int = 5) -> list[dict]:
        """
        Retrieve most recent observations in chronological order.
        
        Returns observations oldest-first within the result set. This ordering
        matches how humans typically review history‚Äîstart from earlier and
        read toward now.
        """
        key = f"{self._key_prefix}:observations"
        
        # ZRANGE with negative indices gets from the end (most recent)
        # -limit to -1 gives us the last `limit` items, oldest-first
        observations = await self.redis.zrange(key, -limit, -1)
        
        return [json.loads(obs.decode()) for obs in observations]
    async def get_full_context(self) -> dict:
        """
        Retrieve complete working memory state.
        
        This is the primary method called during context assembly. It gathers
        all slots, observations, and scratchpad into a single dictionary
        suitable for inclusion in an LLM prompt.
        
        The returned structure mirrors our cognitive model:
        - Named slots with their current values
        - Recent observations as a list
        - Scratchpad as a key-value store
        
        Empty/expired slots are simply absent from the result‚Äîno need for
        the caller to handle None values.
        """
        context = {}
        
        # Gather all standard slots
        for slot in WorkingMemorySlot:
            entry = await self.get_slot(slot)
            if entry:
                context[slot.value] = entry.content
        
        # Add observations
        context["recent_observations"] = await self.get_recent_observations()
        
        # Add scratchpad
        context["scratchpad"] = await self.get_scratchpad()
        
        return context
    
    async def agent_reasoning_step(wm: WorkingMemoryManager, user_input: str, llm: Any) -> str:
        """
        Execute one step of agent reasoning with working memory.
        
        This function demonstrates the typical flow:
        1. Load existing working memory context
        2. Update working memory with new input
        3. Generate response using full context
        4. Record results back to working memory
        """
    
    # 1. Load current working memory context
    # This gives us everything the agent currently "knows" about the session
        context = await wm.get_full_context()
    
    # 2. Update with new input
    # The user's intent is high-importance‚Äîwe want to retain it
        await wm.set_slot(
            WorkingMemorySlot.USER_INTENT,
            {
                "raw_input": user_input,
                "parsed_at": datetime.utcnow().isoformat()
            },
            importance=0.9  # High importance for user intent
        )
    
    # 3. Record as an observation
    # This goes into the observation stream for temporal tracking
        await wm.append_observation({
            "type": "user_input",
            "content": user_input,
            "timestamp": datetime.utcnow().isoformat()
        })
        
    # 4. Generate response with full context
    # The context dict includes all working memory state
        response = await llm.generate(
            prompt=build_prompt(user_input, context)
        )
        
    # 5. Update reasoning trace
    # Track what we've done for future reference
        await wm.update_scratchpad("last_response", response)
        
        reasoning_steps = context.get("scratchpad", {}).get("reasoning_steps", [])
        reasoning_steps.append({
            "input": user_input,
            "output": response,
            "timestamp": datetime.utcnow().isoformat()
        })
        await wm.update_scratchpad("reasoning_steps", reasoning_steps)
        
        return response
        
        

In [32]:
from uuid import uuid4

In [36]:
import redis



# =============================================================================
# CONFIGURATION
# =============================================================================

REDIS_URL = "redis://localhost:6379"
redis_client = redis.from_url(REDIS_URL, decode_responses=True)
# =============================================================================
# PART 1: WORKING MEMORY (Redis)
# =============================================================================
# Working memory is like your mental scratchpad - it holds what you're 
# currently thinking about and expires automatically.

def working_memory_set(session_id: str, key: str, value: dict, ttl: int = 300):
    """
    Store something in working memory.
    
    Args:
        session_id: Unique session identifier
        key: What we're storing (e.g., "current_goal", "user_intent")
        value: The data to store
        ttl: Time-to-live in seconds (default 5 minutes)
    """
    redis_key = f"wm:{session_id}:{key}"
    redis_client.setex(redis_key, ttl, json.dumps(value))


def working_memory_get(session_id: str, key: str) -> dict | None:
    """Retrieve something from working memory."""
    redis_key = f"wm:{session_id}:{key}"
    data = redis_client.get(redis_key)
    return json.loads(data) if data else None


def working_memory_add_observation(session_id: str, observation: dict, max_items: int = 10):
    """
    Add an observation to the observation stream.
    Uses a Redis list to maintain ordered observations.
    """
    redis_key = f"wm:{session_id}:observations"
    observation["timestamp"] = datetime.utcnow().isoformat()
    
    redis_client.lpush(redis_key, json.dumps(observation))
    redis_client.ltrim(redis_key, 0, max_items - 1)  # Keep only recent items
    redis_client.expire(redis_key, 600)  # Expire after 10 minutes


def working_memory_get_observations(session_id: str, limit: int = 5) -> list:
    """Get recent observations."""
    redis_key = f"wm:{session_id}:observations"
    items = redis_client.lrange(redis_key, 0, limit - 1)
    return [json.loads(item) for item in items]


def working_memory_get_full_context(session_id: str) -> dict:
    """Get all working memory for a session."""
    context = {}
    
    # Get all keys for this session
    pattern = f"wm:{session_id}:*"
    for key in redis_client.scan_iter(match=pattern):
        short_key = key.split(":")[-1]
        if short_key == "observations":
            context["observations"] = working_memory_get_observations(session_id)
        else:
            context[short_key] = working_memory_get(session_id, short_key)
    
    return context


def demo_working_memory():
    """Demonstrate working memory operations."""
    print("\n" + "="*60)
    print("üìù WORKING MEMORY (Redis)")
    print("="*60)
    
    session_id = f"demo_{uuid4().hex[:8]}"
    
    # Set current goal
    working_memory_set(session_id, "current_goal", {
        "goal": "Help user analyze sales data",
        "priority": "high"
    })
    print("‚úì Set current goal")
    
    # Set user intent
    working_memory_set(session_id, "user_intent", {
        "intent": "data_analysis",
        "confidence": 0.9
    })
    print("‚úì Set user intent")
    
    # Add some observations
    working_memory_add_observation(session_id, {"type": "user_upload", "file": "sales.csv"})
    working_memory_add_observation(session_id, {"type": "user_question", "text": "Show me trends"})
    print("‚úì Added observations")
    
    # Retrieve everything
    context = working_memory_get_full_context(session_id)
    print(f"\nüìã Full Working Memory Context:")
    print(json.dumps(context, indent=2))
    
    # Show TTL
    ttl = redis_client.ttl(f"wm:{session_id}:current_goal")
    print(f"\n‚è∞ TTL remaining: {ttl} seconds (auto-expires!)")

In [37]:
wmm = WorkingMemoryManager(redis_client=redis.Redis, session_id="session123")

In [38]:
demo_working_memory()


üìù WORKING MEMORY (Redis)
‚úì Set current goal
‚úì Set user intent
‚úì Added observations

üìã Full Working Memory Context:
{
  "user_intent": {
    "intent": "data_analysis",
    "confidence": 0.9
  },
  "observations": [
    {
      "type": "user_question",
      "text": "Show me trends",
      "timestamp": "2025-12-19T15:45:58.873735"
    },
    {
      "type": "user_upload",
      "file": "sales.csv",
      "timestamp": "2025-12-19T15:45:58.867905"
    }
  ],
  "current_goal": {
    "goal": "Help user analyze sales data",
    "priority": "high"
  }
}

‚è∞ TTL remaining: 300 seconds (auto-expires!)


  observation["timestamp"] = datetime.utcnow().isoformat()


In [1]:
pip install pymongo

Collecting pymongo
  Downloading pymongo-4.15.5-cp311-cp311-win_amd64.whl.metadata (22 kB)
Collecting dnspython<3.0.0,>=1.16.0 (from pymongo)
  Downloading dnspython-2.8.0-py3-none-any.whl.metadata (5.7 kB)
Downloading pymongo-4.15.5-cp311-cp311-win_amd64.whl (859 kB)
   ---------------------------------------- 0.0/859.2 kB ? eta -:--:--
   ---------------------------------------- 859.2/859.2 kB 9.6 MB/s  0:00:00
Downloading dnspython-2.8.0-py3-none-any.whl (331 kB)
Installing collected packages: dnspython, pymongo

   ---------------------------------------- 0/2 [dnspython]
   ---------------------------------------- 0/2 [dnspython]
   ---------------------------------------- 0/2 [dnspython]
   ---------------------------------------- 0/2 [dnspython]
   -------------------- ------------------- 1/2 [pymongo]
   -------------------- ------------------- 1/2 [pymongo]
   -------------------- ------------------- 1/2 [pymongo]
   -------------------- ------------------- 1/2 [pymongo]
   ---

# MONGO DB

In [2]:
import pymongo

myclient = pymongo.MongoClient("mongodb://localhost:27017/")

mydb = myclient["mydatabase"]

In [3]:
myclient.list_database_names()

['admin', 'config', 'local']

In [5]:
!pip install qdrant-client pymongo sentence-transformers langgraph torch


^C


Collecting qdrant-client
  Downloading qdrant_client-1.16.2-py3-none-any.whl.metadata (11 kB)
Collecting sentence-transformers
  Downloading sentence_transformers-5.2.0-py3-none-any.whl.metadata (16 kB)
Collecting langgraph
  Downloading langgraph-1.0.5-py3-none-any.whl.metadata (7.4 kB)
Collecting torch
  Downloading torch-2.9.1-cp311-cp311-win_amd64.whl.metadata (30 kB)
Collecting grpcio>=1.41.0 (from qdrant-client)
  Downloading grpcio-1.76.0-cp311-cp311-win_amd64.whl.metadata (3.8 kB)
Collecting httpx>=0.20.0 (from httpx[http2]>=0.20.0->qdrant-client)
  Using cached httpx-0.28.1-py3-none-any.whl.metadata (7.1 kB)
Collecting numpy>=1.21 (from qdrant-client)
  Downloading numpy-2.4.0-cp311-cp311-win_amd64.whl.metadata (6.6 kB)
Collecting portalocker<4.0,>=2.7.0 (from qdrant-client)
  Using cached portalocker-3.2.0-py3-none-any.whl.metadata (8.7 kB)
Collecting protobuf>=3.20.0 (from qdrant-client)
  Downloading protobuf-6.33.2-cp310-abi3-win_amd64.whl.metadata (593 bytes)
Collecting p

In [6]:
import json
import asyncio
from datetime import datetime, timedelta
from uuid import uuid4

# =============================================================================
# IMPORTS
# =============================================================================

import redis
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct, Filter, FieldCondition, MatchValue
from pymongo import MongoClient
from sentence_transformers import SentenceTransformer

# LangGraph imports
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
from operator import add

# CONFIGURATION
# =============================================================================

REDIS_URL = "redis://localhost:6379"
QDRANT_URL = "http://localhost:6333"
MONGODB_URL = "mongodb://localhost:27017"

# Initialize clients
redis_client = redis.from_url(REDIS_URL, decode_responses=True)
qdrant_client = QdrantClient(url=QDRANT_URL)
mongo_client = MongoClient(MONGODB_URL)
mongo_db = mongo_client["agent_memory"]

# Initialize embedding model (small model for demo)
print("Loading embedding model...")
embedder = SentenceTransformer("all-MiniLM-L6-v2")
EMBEDDING_DIM = 384  # Dimension for all-MiniLM-L6-v2

  from .autonotebook import tqdm as notebook_tqdm


Loading embedding model...


In [7]:
def working_memory_set(session_id: str, key: str, value: dict, ttl: int = 300):
    """
    Store something in working memory.
    
    Args:
        session_id: Unique session identifier
        key: What we're storing (e.g., "current_goal", "user_intent")
        value: The data to store
        ttl: Time-to-live in seconds (default 5 minutes)
    """
    redis_key = f"wm:{session_id}:{key}"
    redis_client.setex(redis_key, ttl, json.dumps(value))


def working_memory_get(session_id: str, key: str) -> dict | None:
    """Retrieve something from working memory."""
    redis_key = f"wm:{session_id}:{key}"
    data = redis_client.get(redis_key)
    return json.loads(data) if data else None


def working_memory_add_observation(session_id: str, observation: dict, max_items: int = 10):
    """
    Add an observation to the observation stream.
    Uses a Redis list to maintain ordered observations.
    """
    redis_key = f"wm:{session_id}:observations"
    observation["timestamp"] = datetime.utcnow().isoformat()
    
    redis_client.lpush(redis_key, json.dumps(observation))
    redis_client.ltrim(redis_key, 0, max_items - 1)  # Keep only recent items
    redis_client.expire(redis_key, 600)  # Expire after 10 minutes


def working_memory_get_observations(session_id: str, limit: int = 5) -> list:
    """Get recent observations."""
    redis_key = f"wm:{session_id}:observations"
    items = redis_client.lrange(redis_key, 0, limit - 1)
    return [json.loads(item) for item in items]


def working_memory_get_full_context(session_id: str) -> dict:
    """Get all working memory for a session."""
    context = {}
    
    # Get all keys for this session
    pattern = f"wm:{session_id}:*"
    for key in redis_client.scan_iter(match=pattern):
        short_key = key.split(":")[-1]
        if short_key == "observations":
            context["observations"] = working_memory_get_observations(session_id)
        else:
            context[short_key] = working_memory_get(session_id, short_key)
    
    return context


def demo_working_memory():
    """Demonstrate working memory operations."""
    print("\n" + "="*60)
    print("üìù WORKING MEMORY (Redis)")
    print("="*60)
    
    session_id = f"demo_{uuid4().hex[:8]}"
    
    # Set current goal
    working_memory_set(session_id, "current_goal", {
        "goal": "Help user analyze sales data",
        "priority": "high"
    })
    print("‚úì Set current goal")
    
    # Set user intent
    working_memory_set(session_id, "user_intent", {
        "intent": "data_analysis",
        "confidence": 0.9
    })
    print("‚úì Set user intent")
    
    # Add some observations
    working_memory_add_observation(session_id, {"type": "user_upload", "file": "sales.csv"})
    working_memory_add_observation(session_id, {"type": "user_question", "text": "Show me trends"})
    print("‚úì Added observations")
    
    # Retrieve everything
    context = working_memory_get_full_context(session_id)
    print(f"\nüìã Full Working Memory Context:")
    print(json.dumps(context, indent=2))
    
    # Show TTL
    ttl = redis_client.ttl(f"wm:{session_id}:current_goal")
    print(f"\n‚è∞ TTL remaining: {ttl} seconds (auto-expires!)")


In [8]:
demo_working_memory()


üìù WORKING MEMORY (Redis)
‚úì Set current goal
‚úì Set user intent
‚úì Added observations

üìã Full Working Memory Context:
{
  "observations": [
    {
      "type": "user_question",
      "text": "Show me trends",
      "timestamp": "2025-12-23T00:52:35.329675"
    },
    {
      "type": "user_upload",
      "file": "sales.csv",
      "timestamp": "2025-12-23T00:52:35.318648"
    }
  ],
  "user_intent": {
    "intent": "data_analysis",
    "confidence": 0.9
  },
  "current_goal": {
    "goal": "Help user analyze sales data",
    "priority": "high"
  }
}

‚è∞ TTL remaining: 300 seconds (auto-expires!)


In [9]:
def setup_episodic_collection():
    """Create the episodic memory collection if it doesn't exist."""
    collections = [c.name for c in qdrant_client.get_collections().collections]
    
    if "episodic_memory" not in collections:
        qdrant_client.create_collection(
            collection_name="episodic_memory",
            vectors_config=VectorParams(size=EMBEDDING_DIM, distance=Distance.COSINE)
        )
        print("‚úì Created episodic_memory collection")


def episodic_memory_store(user_id: str, content: str, summary: str, metadata: dict = None):
    """
    Store an episode (experience/interaction).
    
    Args:
        user_id: Who this memory belongs to
        content: Full content of the interaction
        summary: Brief summary for context
        metadata: Additional info (topics, sentiment, etc.)
    """
    # Generate embedding from content
    embedding = embedder.encode(f"{summary} {content}").tolist()
    
    # Create unique ID
    point_id = uuid4().hex
    
    # Build payload
    payload = {
        "user_id": user_id,
        "content": content,
        "summary": summary,
        "timestamp": datetime.utcnow().isoformat(),
        "episode_id": point_id,
        **(metadata or {})
    }
    
    # Store in Qdrant
    qdrant_client.upsert(
        collection_name="episodic_memory",
        points=[PointStruct(id=point_id, vector=embedding, payload=payload)]
    )
    
    return point_id


def episodic_memory_search(query: str, user_id: str = None, limit: int = 5) -> list:
    """
    Search episodic memory by semantic similarity.
    
    Args:
        query: What to search for
        user_id: Optional filter by user
        limit: Max results
    
    Returns:
        List of relevant episodes with similarity scores
    """
    # Generate query embedding
    query_embedding = embedder.encode(query).tolist()
    
    # Build filter if user_id provided
    search_filter = None
    if user_id:
        search_filter = Filter(
            must=[FieldCondition(key="user_id", match=MatchValue(value=user_id))]
        )
    
    # Search
    results = qdrant_client.query_points(
        collection_name="episodic_memory",
        query=query_embedding,
        query_filter=search_filter,
        limit=limit
    )
    l = [i for i in results]
    print('length', len(l))
    for r in results:
        print(len(r[1]))
    return [
        {
            "content": r[1][0].payload["content"],
            "summary": r[1][0].payload["summary"],
            "timestamp": r[1][0].payload["timestamp"],
            "score": r[1][0].score,
            **{k: v for k, v in r[1][0].payload.items() if k not in ["content", "summary", "timestamp", "user_id"]}
        }
        for r in results
    ]


def demo_episodic_memory():
    """Demonstrate episodic memory operations."""
    print("\n" + "="*60)
    print("üé¨ EPISODIC MEMORY (Qdrant)")
    print("="*60)
    
    setup_episodic_collection()
    user_id = f"user_{uuid4().hex[:8]}"
    
    # Store some episodes
    episodes = [
        ("We discussed Python best practices for data processing. User mentioned they prefer pandas over polars.",
         "Python data processing discussion",
         {"topics": ["python", "pandas", "data"]}),
        
        ("User asked about machine learning model deployment. Recommended using FastAPI with Docker.",
         "ML deployment advice",
         {"topics": ["ml", "deployment", "docker"]}),
        
        ("Helped debug a SQL query performance issue. The problem was missing indexes on join columns.",
         "SQL debugging session",
         {"topics": ["sql", "performance", "debugging"]}),
        
        ("User shared their project goals: build a recommendation system for e-commerce.",
         "Project planning discussion",
         {"topics": ["recommendation", "ecommerce", "planning"]}),
        ("User shared their Movie interest: User mentioned they like action and comedy movies.",
         "Movie interest discussion",
         {"topics": ["movies", "action", "comedy"]})
    ]
    
    for content, summary, metadata in episodes:
        episodic_memory_store(user_id, content, summary, metadata)
    print(f"‚úì Stored {len(episodes)} episodes")
    
    # Search for relevant episodes
    print("\nüîç Searching for 'Python best practices for data processing':")
    results = episodic_memory_search("recommend me a movie", user_id, limit=3)
    
    for i, r in enumerate(results, 1):
        print(f"\n   {i}. [{r['score']:.2f}] {r['summary']}")
        print(f"      Topics: {r.get('topics', [])}")
        print(r)

In [10]:
demo_episodic_memory()


üé¨ EPISODIC MEMORY (Qdrant)
‚úì Created episodic_memory collection
‚úì Stored 5 episodes

üîç Searching for 'Python best practices for data processing':
length 1
3

   1. [0.40] Movie interest discussion
      Topics: ['movies', 'action', 'comedy']
{'content': 'User shared their Movie interest: User mentioned they like action and comedy movies.', 'summary': 'Movie interest discussion', 'timestamp': '2025-12-23T00:52:47.984691', 'score': 0.4023379, 'episode_id': '9b5d9c3b8e124517aaa27ae1420ac183', 'topics': ['movies', 'action', 'comedy']}


In [11]:
def setup_semantic_collection():
    """Create the semantic memory collection if it doesn't exist."""
    collections = [c.name for c in qdrant_client.get_collections().collections]
    
    if "semantic_memory" not in collections:
        qdrant_client.create_collection(
            collection_name="semantic_memory",
            vectors_config=VectorParams(size=EMBEDDING_DIM, distance=Distance.COSINE)
        )
        print("‚úì Created semantic_memory collection")


def semantic_memory_store(
    user_id: str,
    knowledge: str,
    knowledge_type: str = "fact",  # fact, preference, skill, relationship
    confidence: float = 0.8,
    metadata: dict = None
):
    """
    Store a piece of knowledge.
    
    Args:
        user_id: Who this knowledge is about
        knowledge: The factual statement
        knowledge_type: Type of knowledge (fact/preference/skill/relationship)
        confidence: How confident we are (0-1)
        metadata: Additional context
    """
    embedding = embedder.encode(knowledge).tolist()
    point_id = uuid4().hex
    
    payload = {
        "user_id": user_id,
        "knowledge": knowledge,
        "knowledge_type": knowledge_type,
        "confidence": confidence,
        "created_at": datetime.utcnow().isoformat(),
        "memory_id": point_id,
        **(metadata or {})
    }
    
    qdrant_client.upsert(
        collection_name="semantic_memory",
        points=[PointStruct(id=point_id, vector=embedding, payload=payload)]
    )
    
    return point_id


def semantic_memory_search(
    query: str,
    user_id: str = None,
    knowledge_type: str = None,
    min_confidence: float = 0.0,
    limit: int = 5
) -> list:
    """
    Search semantic memory.
    
    Can filter by user, knowledge type, and minimum confidence.
    """
    query_embedding = embedder.encode(query).tolist()
    
    # Build filters
    conditions = []
    if user_id:
        conditions.append(FieldCondition(key="user_id", match=MatchValue(value=user_id)))
    if knowledge_type:
        conditions.append(FieldCondition(key="knowledge_type", match=MatchValue(value=knowledge_type)))
    
    search_filter = Filter(must=conditions) if conditions else None

    results = qdrant_client.query_points(
        collection_name="semantic_memory",
        query=query_embedding,
        query_filter=search_filter,
        limit=limit
    )
    
    # Filter by confidence after retrieval
    for r in results:
        print('query_result =', r)
    return [
        {
            "knowledge": r[1][0].payload["knowledge"],
            "type": r[1][0].payload["knowledge_type"],
            "confidence": r[1][0].payload["confidence"],
            "score": r[1][0].score
        }
        for r in results
        if r[1][0].payload["confidence"] >= min_confidence
    ]


def demo_semantic_memory():
    """Demonstrate semantic memory operations."""
    print("\n" + "="*60)
    print("üß† SEMANTIC MEMORY (Qdrant)")
    print("="*60)
    
    setup_semantic_collection()
    user_id = f"user_{uuid4().hex[:8]}"
    
    # Store various types of knowledge
    knowledge_items = [
        ("User prefers Python over JavaScript for backend work", "preference", 0.9),
        ("User works at a fintech startup as a data engineer", "fact", 0.95),
        ("User is experienced with PostgreSQL and Redis", "skill", 0.85),
        ("User's team lead is named Sarah", "relationship", 0.8),
        ("User prefers morning meetings over afternoon ones", "preference", 0.7),
        ("User is learning Rust in their spare time", "fact", 0.75),
    ]
    
    for knowledge, ktype, confidence in knowledge_items:
        semantic_memory_store(user_id, knowledge, ktype, confidence)
    print(f"‚úì Stored {len(knowledge_items)} knowledge items")
    
    # Search for preferences
    print("\nüîç Searching for 'programming preferences':")
    results = semantic_memory_search(
        "programming language preferences",
        user_id=user_id,
        knowledge_type="preference",
        limit=3
    )
    
    for r in results:
        print(f"   [{r['confidence']:.0%}] {r['knowledge']}")
    
    # Search for skills
    print("\nüîç Searching for 'database skills':")
    results = semantic_memory_search(
        "database experience",
        user_id=user_id,
        knowledge_type="skill",
        limit=3
    )
    
    for r in results:
        print(f"   [{r['confidence']:.0%}] {r['knowledge']}")

In [12]:
demo_semantic_memory()


üß† SEMANTIC MEMORY (Qdrant)
‚úì Created semantic_memory collection
‚úì Stored 6 knowledge items

üîç Searching for 'programming preferences':
query_result = ('points', [ScoredPoint(id='db4a83b7-55de-4024-ac18-be4f6328173d', version=1, score=0.39543617, payload={'user_id': 'user_7ff3f921', 'knowledge': 'User prefers Python over JavaScript for backend work', 'knowledge_type': 'preference', 'confidence': 0.9, 'created_at': '2025-12-23T00:53:09.158130', 'memory_id': 'db4a83b755de4024ac18be4f6328173d'}, vector=None, shard_key=None, order_value=None), ScoredPoint(id='c0b28d3f-58f1-49e7-ad6d-cae69511e037', version=5, score=0.13748819, payload={'user_id': 'user_7ff3f921', 'knowledge': 'User prefers morning meetings over afternoon ones', 'knowledge_type': 'preference', 'confidence': 0.7, 'created_at': '2025-12-23T00:53:09.343981', 'memory_id': 'c0b28d3f58f149e7ad6dcae69511e037'}, vector=None, shard_key=None, order_value=None)])
   [90%] User prefers Python over JavaScript for backend work



In [13]:
def procedural_memory_store(
    user_id: str,
    name: str,
    tool_name: str,
    steps: list,
    trigger_patterns: list = None
):
    """
    Store a procedure (learned skill).
    
    Args:
        user_id: Who learned this
        name: Name of the procedure
        tool_name: What tool this is for
        steps: List of steps to execute
        trigger_patterns: When to use this procedure
    """
    procedure = {
        "procedure_id": uuid4().hex,
        "user_id": user_id,
        "name": name,
        "tool_name": tool_name,
        "steps": steps,
        "trigger_patterns": trigger_patterns or [],
        "created_at": datetime.utcnow(),
        "total_executions": 0,
        "successful_executions": 0,
        "success_rate": 0.0
    }
    
    mongo_db.procedures.insert_one(procedure)
    return procedure["procedure_id"]


def procedural_memory_record_execution(
    procedure_id: str,
    success: bool,
    duration_ms: int,
    error: str = None
):
    """
    Record an execution of a procedure.
    Updates success rate automatically.
    """
    # Record the trace
    trace = {
        "trace_id": uuid4().hex,
        "procedure_id": procedure_id,
        "timestamp": datetime.utcnow(),
        "success": success,
        "duration_ms": duration_ms,
        "error": error
    }
    mongo_db.traces.insert_one(trace)
    
    # Update procedure stats
    procedure = mongo_db.procedures.find_one({"procedure_id": procedure_id})
    if procedure:
        total = procedure["total_executions"] + 1
        successful = procedure["successful_executions"] + (1 if success else 0)
        
        mongo_db.procedures.update_one(
            {"procedure_id": procedure_id},
            {"$set": {
                "total_executions": total,
                "successful_executions": successful,
                "success_rate": successful / total
            }}
        )


def procedural_memory_find_best(tool_name: str, user_id: str = None) -> dict | None:
    """
    Find the best procedure for a tool based on success rate.
    """
    query = {"tool_name": tool_name}
    if user_id:
        query["user_id"] = user_id
    
    procedures = list(mongo_db.procedures.find(query).sort("success_rate", -1).limit(1))
    return procedures[0] if procedures else None


def procedural_memory_get_stats(procedure_id: str) -> dict:
    """Get statistics for a procedure."""
    procedure = mongo_db.procedures.find_one({"procedure_id": procedure_id})
    if not procedure:
        return {}
    
    # Get recent traces
    recent_traces = list(
        mongo_db.traces.find({"procedure_id": procedure_id})
        .sort("timestamp", -1)
        .limit(10)
    )
    
    return {
        "name": procedure["name"],
        "total_executions": procedure["total_executions"],
        "success_rate": procedure["success_rate"],
        "recent_outcomes": [t["success"] for t in recent_traces]
    }


def demo_procedural_memory():
    """Demonstrate procedural memory operations."""
    print("\n" + "="*60)
    print("‚öôÔ∏è PROCEDURAL MEMORY (MongoDB)")
    print("="*60)
    
    user_id = f"user_{uuid4().hex[:8]}"
    
    # Create a procedure
    procedure_id = procedural_memory_store(
        user_id=user_id,
        name="CSV Data Analysis",
        tool_name="data_analyzer",
        steps=[
            {"action": "load_csv", "params": {"encoding": "utf-8"}},
            {"action": "check_missing_values", "params": {}},
            {"action": "generate_summary_stats", "params": {"columns": "all"}},
            {"action": "create_visualizations", "params": {"type": "auto"}}
        ],
        trigger_patterns=["analyze csv", "data analysis", "explore data"]
    )
    print(f"‚úì Created procedure: CSV Data Analysis")
    
    # Simulate some executions
    executions = [
        (True, 150),
        (True, 180),
        (True, 165),
        (False, 200),  # One failure
        (True, 145),
        (True, 170),
    ]
    
    for success, duration in executions:
        procedural_memory_record_execution(procedure_id, success, duration)
    print(f"‚úì Recorded {len(executions)} executions")
    
    # Get stats
    stats = procedural_memory_get_stats(procedure_id)
    print(f"\nüìä Procedure Statistics:")
    print(f"   Name: {stats['name']}")
    print(f"   Total Executions: {stats['total_executions']}")
    print(f"   Success Rate: {stats['success_rate']:.0%}")
    print(f"   Recent Outcomes: {['‚úì' if s else '‚úó' for s in stats['recent_outcomes']]}")
    
    # Find best procedure for tool
    best = procedural_memory_find_best("data_analyzer", user_id)
    if best:
        print(f"\nüèÜ Best procedure for 'data_analyzer': {best['name']} ({best['success_rate']:.0%} success)")

In [14]:
demo_procedural_memory()


‚öôÔ∏è PROCEDURAL MEMORY (MongoDB)
‚úì Created procedure: CSV Data Analysis
‚úì Recorded 6 executions

üìä Procedure Statistics:
   Name: CSV Data Analysis
   Total Executions: 6
   Success Rate: 83%
   Recent Outcomes: ['‚úì', '‚úì', '‚úó', '‚úì', '‚úì', '‚úì']

üèÜ Best procedure for 'data_analyzer': CSV Data Analysis (83% success)


In [15]:
class AgentState(TypedDict):
    """State that flows through our agent graph."""
    user_message: str
    user_id: str
    session_id: str
    working_memory: dict
    episodic_context: list
    semantic_context: list
    procedural_context: list
    response: str


def node_load_working_memory(state: AgentState) -> dict:
    """Load current working memory."""
    context = working_memory_get_full_context(state["session_id"])
    return {"working_memory": context}


def node_retrieve_episodic(state: AgentState) -> dict:
    """Retrieve relevant past episodes."""
    results = episodic_memory_search(
        state["user_message"],
        user_id=state["user_id"],
        limit=3
    )
    return {"episodic_context": results}


def node_retrieve_semantic(state: AgentState) -> dict:
    """Retrieve relevant knowledge."""
    results = semantic_memory_search(
        state["user_message"],
        user_id=state["user_id"],
        min_confidence=0.5,
        limit=3
    )
    return {"semantic_context": results}


def node_retrieve_procedural(state: AgentState) -> dict:
    """Find relevant procedures."""
    # Simple keyword matching for demo
    procedures = list(mongo_db.procedures.find({"user_id": state["user_id"]}).limit(2))
    return {"procedural_context": [
        {"name": p["name"], "tool": p["tool_name"], "success_rate": p["success_rate"]}
        for p in procedures
    ]}


def node_generate_response(state: AgentState) -> dict:
    """Generate response based on all memory context."""
    # In a real system, this would call an LLM
    # For demo, we just summarize what we found
    
    response_parts = [f"Processing: '{state['user_message']}'"]
    
    if state["working_memory"]:
        response_parts.append(f"\nüìù Working Memory: {len(state['working_memory'])} items loaded")
    
    if state["episodic_context"]:
        response_parts.append(f"\nüé¨ Found {len(state['episodic_context'])} relevant past interactions")
        for ep in state["episodic_context"][:2]:
            response_parts.append(f"   - {ep['summary']}")
    
    if state["semantic_context"]:
        response_parts.append(f"\nüß† Found {len(state['semantic_context'])} relevant facts")
        for sm in state["semantic_context"][:2]:
            response_parts.append(f"   - {sm['knowledge']}")
    
    if state["procedural_context"]:
        response_parts.append(f"\n‚öôÔ∏è Found {len(state['procedural_context'])} applicable procedures")
        for pm in state["procedural_context"]:
            response_parts.append(f"   - {pm['name']} ({pm['success_rate']:.0%} success)")
    
    return {"response": "\n".join(response_parts)}


def build_memory_agent():
    """Build a LangGraph agent with memory retrieval."""
    workflow = StateGraph(AgentState)
    
    # Add nodes
    workflow.add_node("load_working_memory", node_load_working_memory)
    workflow.add_node("retrieve_episodic", node_retrieve_episodic)
    workflow.add_node("retrieve_semantic", node_retrieve_semantic)
    workflow.add_node("retrieve_procedural", node_retrieve_procedural)
    workflow.add_node("generate_response", node_generate_response)
    
    # Define the flow
    workflow.set_entry_point("load_working_memory")
    workflow.add_edge("load_working_memory", "retrieve_episodic")
    workflow.add_edge("retrieve_episodic", "retrieve_semantic")
    workflow.add_edge("retrieve_semantic", "retrieve_procedural")
    workflow.add_edge("retrieve_procedural", "generate_response")
    workflow.add_edge("generate_response", END)
    
    return workflow.compile()


def demo_langgraph_integration():
    """Demonstrate LangGraph memory integration."""
    print("\n" + "="*60)
    print("üîó LANGGRAPH INTEGRATION")
    print("="*60)
    
    # Setup - reuse data from previous demos
    #user_id = f"user_{uuid4().hex[:8]}"
    user_id = "user_12345678"  # Using a fixed user ID for demonstration
    #session_id = f"session_{uuid4().hex[:8]}"
    session_id = "session_12345678"  # Using a fixed session ID for demonstration
    
    # Populate some memory first
    print("\nüì¶ Setting up memory...")
    
    # Working memory
    working_memory_set(session_id, "current_goal", {"goal": "Help with data analysis"})
    working_memory_add_observation(session_id, {"type": "greeting", "text": "User said hello"})
    
    # Episodic memory
    setup_episodic_collection()
    episodic_memory_store(user_id, 
        "Helped user set up a Python data pipeline with pandas",
        "Python data pipeline setup",
        {"topics": ["python", "pandas", "pipeline"]}
    )
    
    # Semantic memory
    setup_semantic_collection()
    semantic_memory_store(user_id, "User prefers pandas for data manipulation", "preference", 0.9)
    semantic_memory_store(user_id, "User works with CSV files frequently", "fact", 0.85)
    
    # Procedural memory
    procedural_memory_store(
        user_id, "CSV Processing", "pandas_tool",
        [{"action": "read_csv"}, {"action": "clean_data"}, {"action": "analyze"}],
        ["process csv", "analyze data"]
    )
    procedural_memory_record_execution(
        mongo_db.procedures.find_one({"user_id": user_id})["procedure_id"],
        True, 100
    )
    
    print("‚úì Memory populated")
    
    # Build and run the agent
    agent = build_memory_agent()
    
    initial_state = {
        "user_message": "can you recommend me a movie?",
        "user_id": user_id,
        "session_id": session_id,
        "working_memory": {},
        "episodic_context": [],
        "semantic_context": [],
        "procedural_context": [],
        "response": ""
    }

    print("\nü§ñ Running memory-aware agent...")
    print(f"   User: '{initial_state['user_message']}'")
    
    # Execute the graph
    final_state = agent.invoke(initial_state)
    
    print("\n" + "-"*40)
    print("üì§ Agent Response:")
    print(final_state["response"])

In [16]:
demo_langgraph_integration()


üîó LANGGRAPH INTEGRATION

üì¶ Setting up memory...
‚úì Memory populated

ü§ñ Running memory-aware agent...
   User: 'can you recommend me a movie?'
length 1
1
query_result = ('points', [ScoredPoint(id='d98ee25a-67fb-457b-bb0d-45fbe7b74131', version=7, score=0.06648329, payload={'user_id': 'user_12345678', 'knowledge': 'User prefers pandas for data manipulation', 'knowledge_type': 'preference', 'confidence': 0.9, 'created_at': '2025-12-23T00:53:39.734401', 'memory_id': 'd98ee25a67fb457bbb0d45fbe7b74131'}, vector=None, shard_key=None, order_value=None), ScoredPoint(id='59f91f6e-d097-4d24-8a4b-f85a2e4018cd', version=8, score=-0.002700436, payload={'user_id': 'user_12345678', 'knowledge': 'User works with CSV files frequently', 'knowledge_type': 'fact', 'confidence': 0.85, 'created_at': '2025-12-23T00:53:39.764841', 'memory_id': '59f91f6ed0974d248a4bf85a2e4018cd'}, vector=None, shard_key=None, order_value=None)])

----------------------------------------
üì§ Agent Response:
Processin