# Memory Consolidation

Long-running AI agents accumulate numerous memory entries over time, storing facts, preferences, past interactions and learned information about users and contexts. As agents interact across multiple sessions and handle diverse tasks, their memory systems can become fragmented with redundant, overlapping, or closely related information stored as separate entries. A customer service agent might store "User prefers email communication" in one memory, "Customer likes email updates" in another, and "Contact via email requested" in a third, consuming three times the necessary context space with semantically identical information.

Memory consolidation addresses this fragmentation by identifying related memories, merging redundant or overlapping information, and creating unified memory entries that capture the same knowledge in less space. Rather than maintaining dozens of fragmented facts about a user or context, the system detects semantic similarity between memories, groups related information, and consolidates it into comprehensive but concise memory records. This reduces memory storage requirements, improves retrieval relevance by avoiding duplicate hits, and makes the agent's knowledge more coherent and maintainable.

This notebook demonstrates how to implement memory consolidation, from basic duplicate detection to production-ready systems. We will explore semantic similarity detection for finding related memories, deduplication strategies that merge identical or near-identical information, clustering techniques that group thematically related memories, and complete consolidation managers that automatically maintain clean, efficient memory stores. These techniques are essential for building agents that learn and remember effectively over extended periods without unbounded memory growth.

In [1]:
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_core.messages import HumanMessage, AIMessage
from langchain_community.vectorstores import FAISS
from langchain_core.documents import Document
from pydantic import BaseModel, Field
from typing import List, Optional, Dict, Any, Tuple, Set, Union
from datetime import datetime
from sklearn.cluster import DBSCAN
import numpy as np
import os

### Initialize the language and embedding models

In [2]:
# Initialize language model and embeddings
llm = ChatOpenAI(model="gpt-4o-mini", api_key=os.getenv("OPENAI_API_KEY", "").strip(), temperature=0)
embeddings = OpenAIEmbeddings(model="text-embedding-ada-002", api_key=os.getenv("OPENAI_API_KEY", "").strip())

## Memory fragmentation problem
Before implementing consolidation, we need to understand how memory fragmentation occurs in real agent systems. As agents interact with users, call tools and learn from experiences, they continuously add new memory entries. Without consolidation, semantically related or redundant information accumulates as separate entries, each consuming context tokens and potentially causing confusion when similar but slightly different memories are retrieved together.

We will simulate a realistic fragmented memory store from a customer service agent that has accumulated redundant information about user preferences, past issues and account details across multiple interactions. This baseline demonstrates both the token cost and the semantic confusion caused by fragmentation, motivating the need for intelligent consolidation.

In [3]:
class MemoryEntry(BaseModel):
    """
    Represents a single memory entry in the agent's memory system.
    Each memory has content, metadata, and tracking information.
    """
    id: str = Field(description="Unique memory identifier")
    content: str = Field(description="Memory content")
    timestamp: str = Field(description="When memory was created")
    category: Optional[str] = Field(default=None, description="Memory category")
    importance: float = Field(default=0.5, description="Importance score 0-1")

# Display the model structure
print("Memory Entry Model:")
print("="*80)
print(MemoryEntry.model_json_schema())

Memory Entry Model:
{'description': "Represents a single memory entry in the agent's memory system.\nEach memory has content, metadata, and tracking information.", 'properties': {'id': {'description': 'Unique memory identifier', 'title': 'Id', 'type': 'string'}, 'content': {'description': 'Memory content', 'title': 'Content', 'type': 'string'}, 'timestamp': {'description': 'When memory was created', 'title': 'Timestamp', 'type': 'string'}, 'category': {'anyOf': [{'type': 'string'}, {'type': 'null'}], 'default': None, 'description': 'Memory category', 'title': 'Category'}, 'importance': {'default': 0.5, 'description': 'Importance score 0-1', 'title': 'Importance', 'type': 'number'}}, 'required': ['id', 'content', 'timestamp'], 'title': 'MemoryEntry', 'type': 'object'}


Now we will simulate a realistic scenario where a customer service agent has accumulated fragmented memories over several days of interactions. Notice how multiple memories contain essentially the same information, worded differently: this is the fragmentation problem we are solving. In a real system these entries would accumulate gradually as the agent interacts with users; without consolidation, the memory store grows unnecessarily large and retrieval becomes less effective because near-identical memories compete for the same retrieval slots.

In [4]:
# Create fragmented memories simulating a customer service agent
# These memories were accumulated over several days of interactions
fragmented_memories = [
    MemoryEntry(
        id="mem_001",
        content="User prefers email communication over phone calls",
        timestamp="2024-11-01T10:00:00Z",
        category="preference",
        importance=0.7
    ),
    MemoryEntry(
        id="mem_002",
        content="Customer likes to receive updates via email",
        timestamp="2024-11-03T14:30:00Z",
        category="preference",
        importance=0.6
    ),
    MemoryEntry(
        id="mem_003",
        content="User requested email as primary contact method",
        timestamp="2024-11-05T09:15:00Z",
        category="preference",
        importance=0.8
    ),
    MemoryEntry(
        id="mem_004",
        content="User reported login issues on mobile app",
        timestamp="2024-11-02T11:20:00Z",
        category="issue",
        importance=0.9
    ),
    MemoryEntry(
        id="mem_005",
        content="Customer had trouble signing in to the mobile application",
        timestamp="2024-11-02T11:45:00Z",
        category="issue",
        importance=0.9
    ),
    MemoryEntry(
        id="mem_006",
        content="User's account is premium tier with annual subscription",
        timestamp="2024-11-01T09:00:00Z",
        category="account",
        importance=0.8
    ),
    MemoryEntry(
        id="mem_007",
        content="Customer has premium membership, annual billing",
        timestamp="2024-11-04T16:00:00Z",
        category="account",
        importance=0.7
    ),
    MemoryEntry(
        id="mem_008",
        content="User lives in California, PST timezone",
        timestamp="2024-11-01T10:30:00Z",
        category="personal",
        importance=0.5
    ),
    MemoryEntry(
        id="mem_009",
        content="Customer is located in California",
        timestamp="2024-11-06T13:00:00Z",
        category="personal",
        importance=0.4
    ),
]

print(f"Created {len(fragmented_memories)} memory entries")
print("\nSample memories:")
for mem in fragmented_memories[:3]:
    print(f"  [{mem.id}] ({mem.category}) {mem.content}")

Created 9 memory entries

Sample memories:
  [mem_001] (preference) User prefers email communication over phone calls
  [mem_002] (preference) Customer likes to receive updates via email
  [mem_003] (preference) User requested email as primary contact method


Let's analyze the fragmentation by grouping memories by category and calculating the token usage. This will help us understand the extent of redundancy and the potential for consolidation.

In [5]:
# Import defaultdict for easy grouping
from collections import defaultdict

# Group memories by category to see patterns of redundancy
by_category = defaultdict(list)
for mem in fragmented_memories:
    by_category[mem.category].append(mem)

print("Fragmented Memory Store Analysis:")
print("="*80)
print(f"\nTotal memories: {len(fragmented_memories)}")
print("\nMemories by category:")

# Display each category and its memories
for category, mems in by_category.items():
    print(f"\n{category.upper()} ({len(mems)} memories):")
    for mem in mems:
        # Show memory ID and content for each entry
        print(f"  [{mem.id}] {mem.content}")

# Calculate estimated token usage
# Rule of thumb: 1 word ≈ 1.3 tokens on average
total_tokens = sum(len(mem.content.split()) * 1.3 for mem in fragmented_memories)

print(f"{'='*80}")
print(f"\nEstimated token usage: ~{int(total_tokens)} tokens")

# Calculate potential savings after consolidation
print(f"\nPotential after consolidation:")
print(f"  Current: {len(fragmented_memories)} memories (~{int(total_tokens)} tokens)")
print(f"  After: ~4-5 memories (~{int(total_tokens * 0.5)}-{int(total_tokens * 0.6)} tokens)")
print(f"  Expected reduction: ~40-50%")

Fragmented Memory Store Analysis:

Total memories: 9

Memories by category:

PREFERENCE (3 memories):
  [mem_001] User prefers email communication over phone calls
  [mem_002] Customer likes to receive updates via email
  [mem_003] User requested email as primary contact method

ISSUE (2 memories):
  [mem_004] User reported login issues on mobile app
  [mem_005] Customer had trouble signing in to the mobile application

ACCOUNT (2 memories):
  [mem_006] User's account is premium tier with annual subscription
  [mem_007] Customer has premium membership, annual billing

PERSONAL (2 memories):
  [mem_008] User lives in California, PST timezone
  [mem_009] Customer is located in California

Estimated token usage: ~80 tokens

Potential after consolidation:
  Current: 9 memories (~80 tokens)
  After: ~4-5 memories (~40-48 tokens)
  Expected reduction: ~40-50%


The fragmented memory simulation demonstrates realistic accumulation patterns in long-running agents.
- The `MemoryEntry` model captures both content and metadata including timestamps, categories, and importance scores.
- The example shows several forms of fragmentation: paraphrases that carry the same facts (the mobile login issue and premium account memories), near-duplicates where one entry adds a detail the other lacks (the California memories, where only `mem_008` records the PST timezone), and overlapping statements of the same preference (the three email memories).
- The category-based analysis shows related memories clustering within each category, and the token estimate puts the 9 memories at roughly 80 tokens, which could likely be consolidated into 4-5 memories carrying the same information. This kind of fragmentation arises whenever memories are added incrementally without consolidation, wasting tokens and producing confusing retrievals when several near-identical memories are returned for the same query.

## Detecting related memories
The first step in consolidation is identifying which memories are related, redundant or overlapping. Semantic similarity using embeddings is a practical approach, as it detects memories that convey similar meaning even when worded differently. By embedding each memory and computing similarity scores between pairs, we can identify candidates for consolidation without relying on exact text matching or keyword overlap.

We will implement a similarity-based detection system that embeds all memories, computes pairwise similarity scores, and identifies clusters of related memories that should be considered for merging. This provides the foundation for automated consolidation by surfacing memories that likely contain redundant or overlapping information.

We will start by creating a function that computes semantic similarity between all memory pairs using embeddings. The function embeds each memory's content as a high-dimensional vector, then calculates the cosine similarity between every pair of vectors. Cosine similarity ranges from -1 to 1: values near 1 mean the memories are semantically very close, and values near 0 mean they are unrelated. With `text-embedding-ada-002`, scores in practice fall in a narrow positive band, so even unrelated memories can score above 0.8. Because this approach captures semantic meaning rather than lexical overlap, "User prefers email" and "Customer likes email updates" show high similarity despite different wording.
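
The next cell converts each embedding to a NumPy array inside a Python double loop. As a sketch of an alternative (not the notebook's method), the whole similarity matrix can be computed with a single matrix product after normalizing each row to unit length. The helper names `cosine_similarity_matrix` and `to_pair_scores` are hypothetical; the second returns the same `{(id_i, id_j): score}` shape that `compute_memory_similarity` builds, keeping only pairs with `i < j`.

```python
import numpy as np
from typing import Dict, List, Sequence, Tuple


def cosine_similarity_matrix(vectors: Sequence[Sequence[float]]) -> np.ndarray:
    """Return an (n, n) matrix of pairwise cosine similarities, each in [-1, 1]."""
    X = np.asarray(vectors, dtype=float)
    norms = np.linalg.norm(X, axis=1, keepdims=True)
    # Avoid division by zero: a zero vector ends up with similarity 0 to everything
    norms[norms == 0] = 1.0
    unit = X / norms
    return unit @ unit.T


def to_pair_scores(ids: List[str], sim: np.ndarray) -> Dict[Tuple[str, str], float]:
    """Keep only the upper triangle (i < j), matching compute_memory_similarity's keys."""
    rows, cols = np.triu_indices(len(ids), k=1)
    return {(ids[i], ids[j]): float(sim[i, j]) for i, j in zip(rows, cols)}


# Toy 2-D vectors: a reference, an orthogonal one, an opposite one, and a scaled copy
ids = ["a", "b", "c", "d"]
sim = cosine_similarity_matrix([[1, 0], [0, 1], [-1, 0], [2, 0]])
scores = to_pair_scores(ids, sim)
print(len(scores))                    # → 6 (4 choose 2 pairs)
print(round(scores[("a", "c")], 3))   # → -1.0 (opposite directions)
print(round(scores[("a", "d")], 3))   # → 1.0 (scaling does not change cosine)
```

With the notebook's objects, the equivalent call would be `to_pair_scores([m.id for m in memories], cosine_similarity_matrix(embeddings_model.embed_documents([m.content for m in memories])))`. Memory use grows with n², so very large stores would need an approximate nearest-neighbour index instead.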

In [6]:
def compute_memory_similarity(
    memories: List[MemoryEntry],
    embeddings_model: OpenAIEmbeddings
) -> Dict[Tuple[str, str], float]:
    """
    Compute pairwise similarity scores between all memories using embeddings.
    This function creates semantic embeddings for each memory and calculates how similar each pair of memories is using cosine similarity.
    
    Args:
        memories: List of memory entries to compare
        embeddings_model: Embedding model for generating semantic vectors
        
    Returns:
        Dictionary mapping memory ID pairs (tuples) to similarity scores (0-1)
    """
    # Step 1: Extract text content from all memories
    memory_texts = [mem.content for mem in memories]
    
    # Step 2: Generate embeddings for all memory texts in one batch - This converts each text into a high-dimensional vector
    memory_embeddings = embeddings_model.embed_documents(memory_texts)
    
    # Step 3: Compute pairwise similarities using cosine similarity
    similarity_scores = {}
    
    # Compare each pair of memories (avoiding duplicate comparisons)
    for i in range(len(memories)):
        for j in range(i + 1, len(memories)):  # j > i to avoid duplicates
            # Convert embeddings to numpy arrays for mathematical operations
            emb_i = np.array(memory_embeddings[i])
            emb_j = np.array(memory_embeddings[j])
            
            # Calculate cosine similarity: the cosine of the angle between vectors
            # Formula: dot(A, B) / (||A|| * ||B||)
            # Result ranges from -1 (opposite) through 0 (unrelated) to 1 (same direction)
            similarity = np.dot(emb_i, emb_j) / (np.linalg.norm(emb_i) * np.linalg.norm(emb_j))
            
            # Store similarity score with memory ID pair as key
            pair_key = (memories[i].id, memories[j].id)
            similarity_scores[pair_key] = float(similarity)
    
    return similarity_scores

# Test the function with our fragmented memories
print("Computing pairwise similarity scores...")
similarity_scores = compute_memory_similarity(fragmented_memories, embeddings)
print(f"✓ Computed {len(similarity_scores)} pairwise similarities for {len(fragmented_memories)} memories")

Computing pairwise similarity scores...
✓ Computed 36 pairwise similarities for 9 memories


Let's examine the top 10 most similar memory pairs to understand what patterns the similarity detection is finding. High similarity scores (above 0.85) indicate memories that are likely redundant or overlapping and should be candidates for consolidation.
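
Sorting every pair is fine for 36 scores, but the number of pairs grows quadratically with the store size (n(n-1)/2). A sketch using the standard library's `heapq.nlargest` selects the top k pairs without a full sort and can optionally drop pairs below a floor first. `top_similar_pairs` is a hypothetical helper, and the sample scores are the seven pairs visible in the next cell's output.

```python
import heapq
from typing import Dict, List, Optional, Tuple

Pair = Tuple[str, str]


def top_similar_pairs(
    scores: Dict[Pair, float], k: int = 10, min_score: Optional[float] = None
) -> List[Tuple[Pair, float]]:
    """Return up to k (pair, score) items with the highest scores, best first."""
    items = scores.items()
    if min_score is not None:
        # Drop pairs below the floor before selecting
        items = [(pair, s) for pair, s in items if s >= min_score]
    # nlargest maintains a heap of size k: O(n log k) instead of O(n log n)
    return heapq.nlargest(k, items, key=lambda item: item[1])


sample = {
    ("mem_006", "mem_007"): 0.918,
    ("mem_004", "mem_005"): 0.914,
    ("mem_001", "mem_002"): 0.888,
    ("mem_001", "mem_003"): 0.884,
    ("mem_002", "mem_003"): 0.863,
    ("mem_008", "mem_009"): 0.860,
    ("mem_002", "mem_007"): 0.829,
}
for pair, score in top_similar_pairs(sample, k=3):
    print(pair, score)
print(len(top_similar_pairs(sample, min_score=0.85)))  # → 6
```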

In [7]:
# Sort similarity scores from highest to lowest
sorted_pairs = sorted(similarity_scores.items(), key=lambda x: x[1], reverse=True)[:10]

# Create a lookup dictionary for easy memory access by ID
memory_lookup = {mem.id: mem for mem in fragmented_memories}

print("Top 10 Most Similar Memory Pairs:")
print("="*80)

# Display each similar pair with its score and content
for (id1, id2), score in sorted_pairs:
    mem1 = memory_lookup[id1]
    mem2 = memory_lookup[id2]
    
    print(f"\nSimilarity Score: {score:.3f}")
    print(f"  [{id1}] {mem1.content}")
    print(f"  [{id2}] {mem2.content}")

Top 10 Most Similar Memory Pairs:

Similarity Score: 0.918
  [mem_006] User's account is premium tier with annual subscription
  [mem_007] Customer has premium membership, annual billing

Similarity Score: 0.914
  [mem_004] User reported login issues on mobile app
  [mem_005] Customer had trouble signing in to the mobile application

Similarity Score: 0.888
  [mem_001] User prefers email communication over phone calls
  [mem_002] Customer likes to receive updates via email

Similarity Score: 0.884
  [mem_001] User prefers email communication over phone calls
  [mem_003] User requested email as primary contact method

Similarity Score: 0.863
  [mem_002] Customer likes to receive updates via email
  [mem_003] User requested email as primary contact method

Similarity Score: 0.860
  [mem_008] User lives in California, PST timezone
  [mem_009] Customer is located in California

Similarity Score: 0.829
  [mem_002] Customer likes to receive updates via email
  [mem_007] Customer has premium membership, annual billing

Now we need to group related memories. Pairwise similarities alone are not enough: if memory A is similar to memory B, and memory B is similar to memory C, then A, B and C belong in the same group even if A and C fall below the threshold. We will use graph-based connected components to capture these transitive relationships. The function builds a graph where memories are nodes and edges connect pairs whose similarity meets the threshold, then finds all connected groups using depth-first search. This is equivalent to single-linkage clustering, so a chain of moderately similar memories can pull loosely related entries into one group; the threshold controls how aggressive that chaining is.
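
The recursive DFS in the next cell works well at this scale, but Python's default recursion limit (about 1000 frames) could be reached by a very long chain of related memories. Below is a sketch of the same connected-components grouping using an iterative union-find (disjoint-set) structure; `group_by_threshold` is a hypothetical name, and the scores are the seven pairs visible in the earlier similarity output. It also illustrates the chaining effect: lowering the threshold to 0.82 lets the 0.829 email/premium pair join two unrelated groups.

```python
from typing import Dict, Iterable, List, Set, Tuple


def group_by_threshold(
    ids: Iterable[str], scores: Dict[Tuple[str, str], float], threshold: float
) -> List[Set[str]]:
    """Connected components of the graph whose edges are pairs with score >= threshold."""
    parent = {i: i for i in ids}

    def find(x: str) -> str:
        # Iterative find with path halving: no recursion, shallow trees
        while parent[x] != x:
            parent[x] = parent[parent[x]]
            x = parent[x]
        return x

    for (a, b), score in scores.items():
        if score >= threshold:
            root_a, root_b = find(a), find(b)
            if root_a != root_b:
                parent[root_b] = root_a  # union the two components

    components: Dict[str, Set[str]] = {}
    for i in parent:
        components.setdefault(find(i), set()).add(i)
    # Like find_related_memories, drop singletons (nothing to merge)
    return [group for group in components.values() if len(group) > 1]


ids = [f"mem_{n:03d}" for n in range(1, 10)]
scores = {
    ("mem_006", "mem_007"): 0.918, ("mem_004", "mem_005"): 0.914,
    ("mem_001", "mem_002"): 0.888, ("mem_001", "mem_003"): 0.884,
    ("mem_002", "mem_003"): 0.863, ("mem_008", "mem_009"): 0.860,
    ("mem_002", "mem_007"): 0.829,
}
print(sorted(sorted(g) for g in group_by_threshold(ids, scores, 0.85)))
print(sorted(sorted(g) for g in group_by_threshold(ids, scores, 0.82)))
```

On just these seven pairs, 0.85 reproduces the notebook's four groups, while 0.82 merges the email-preference group with the premium-account group through the single 0.829 edge. The full score table may contain further pairs between 0.82 and 0.829 that this toy input omits.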

In [8]:
def find_related_memories(
    memories: List[MemoryEntry],
    similarity_scores: Dict[Tuple[str, str], float],
    similarity_threshold: float = 0.85
) -> List[Set[str]]:
    """
    Group related memories based on similarity threshold using graph connectivity.
    
    Builds a graph where memories are nodes and edges exist between similar
    memories (similarity >= threshold). Then finds all connected components,
    which represent groups of transitively related memories.
    
    Args:
        memories: List of memory entries
        similarity_scores: Pairwise similarity scores from compute_memory_similarity
        similarity_threshold: Minimum similarity to consider memories related (default: 0.85)
        
    Returns:
        List of sets, where each set contains IDs of memories that should be merged
    """
    # Step 1: Build an adjacency list representing the similarity graph
    related_graph = defaultdict(set)
    
    for (mem_id1, mem_id2), score in similarity_scores.items():
        if score >= similarity_threshold:
            # These memories are similar enough to be related
            # Add bidirectional edges in the graph
            related_graph[mem_id1].add(mem_id2)
            related_graph[mem_id2].add(mem_id1)
    
    # Step 2: Find connected components using depth-first search
    visited = set()  # Track which memories we've already processed
    memory_groups = []  # Store the resulting groups
    
    def dfs(mem_id: str, current_group: Set[str]):
        """Recursively visit all memories connected to mem_id."""
        if mem_id in visited:
            return  # Already processed this memory
        
        # Mark as visited and add to current group
        visited.add(mem_id)
        current_group.add(mem_id)
        
        # Recursively visit all directly connected memories
        for related_id in related_graph[mem_id]:
            dfs(related_id, current_group)
    
    # Step 3: Find all connected components in the graph
    for memory in memories:
        # Only start DFS from memories that haven't been visited
        # and that have at least one connection
        if memory.id not in visited and memory.id in related_graph:
            group = set()
            dfs(memory.id, group)
            
            # Only include groups with multiple memories (singletons don't need merging)
            if len(group) > 1:
                memory_groups.append(group)
    
    return memory_groups

# Find groups of related memories
print("Finding groups of related memories...")
related_groups = find_related_memories(
    fragmented_memories, 
    similarity_scores, 
    similarity_threshold=0.85  # Link pairs whose cosine similarity is >= 0.85
)

print(f"✓ Found {len(related_groups)} groups of related memories")

Finding groups of related memories...
✓ Found 4 groups of related memories


In [9]:
# Display the groups and calculate consolidation potential
print("Related Memory Groups:")
print("="*80)

for i, group in enumerate(related_groups, 1):
    print(f"\nGroup {i}: {len(group)} memories that should be merged")
    for mem_id in group:
        mem = memory_lookup[mem_id]
        print(f"  [{mem_id}] ({mem.category}) {mem.content}")

# Calculate consolidation potential
total_in_groups = sum(len(g) for g in related_groups)
groups_will_become = len(related_groups)  # Each group merges into 1 memory
savings = total_in_groups - groups_will_become

print(f"\n{'='*80}")
print(f"\nConsolidation Potential:")
print(f"  Current memories: {len(fragmented_memories)}")
print(f"  Memories in related groups: {total_in_groups}")
print(f"  After merging groups: {groups_will_become} consolidated memories")
print(f"  Standalone memories: {len(fragmented_memories) - total_in_groups}")
print(f"  Total after consolidation: {len(fragmented_memories) - savings}")
print(f"  Memory reduction: {savings} memories saved")

Related Memory Groups:

Group 1: 3 memories that should be merged
  [mem_001] (preference) User prefers email communication over phone calls
  [mem_003] (preference) User requested email as primary contact method
  [mem_002] (preference) Customer likes to receive updates via email

Group 2: 2 memories that should be merged
  [mem_004] (issue) User reported login issues on mobile app
  [mem_005] (issue) Customer had trouble signing in to the mobile application

Group 3: 2 memories that should be merged
  [mem_006] (account) User's account is premium tier with annual subscription
  [mem_007] (account) Customer has premium membership, annual billing

Group 4: 2 memories that should be merged
  [mem_009] (personal) Customer is located in California
  [mem_008] (personal) User lives in California, PST timezone


Consolidation Potential:
  Current memories: 9
  Memories in related groups: 9
  After merging groups: 4 consolidated memories
  Standalone memories: 0
  Total after consolidation: 4
  Memory reduction: 5 memories saved

The similarity detection system uses embedding-based semantic comparison to identify related memories regardless of wording differences.
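- The `compute_memory_similarity` function generates embeddings for all memory contents and computes pairwise cosine similarity, the cosine of the angle between two vectors in embedding space. Scores range from -1 to 1: 1.0 means the vectors point in the same direction (near-identical meaning) and 0.0 means they are orthogonal (unrelated), though in practice this embedding model rarely scores near zero even for unrelated sentences.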
- The `find_related_memories` function builds a graph where memories are connected if their similarity exceeds the threshold, then uses depth-first search to identify connected components representing groups of related memories.
- In the example, the three email-preference memories score between 0.863 and 0.888 despite different wording, the mobile login memories score 0.914, the premium account memories 0.918, and the California memories 0.860. At the 0.85 threshold this yields 4 consolidation groups that together cover all 9 memories, so the store can shrink to 4 consolidated entries. The unrelated email/premium pair at 0.829 falls just below the threshold, which shows how narrow the margin between related and unrelated memories can be with this embedding model.

## Merging similar memories with deduplication
Once related memories are identified, we need to merge them into consolidated entries that preserve all unique information while eliminating redundancy. Simple concatenation would create verbose merged memories, so we use LLM-based intelligent merging that identifies common information, resolves conflicts, and generates concise unified representations. The LLM can understand semantic equivalence, preserve the most recent or most important details when facts conflict, and create natural language consolidated memories.
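
Before spending an LLM call on a group, it can help to drop entries that are verbatim repeats once case, punctuation and whitespace are ignored. A minimal sketch of such a pre-pass follows, assuming a lightweight `Memory` tuple as a hypothetical stand-in for `MemoryEntry`; it keeps the highest-importance copy of each normalized text. It does not catch paraphrases such as the email-preference memories, which still need the embedding and LLM steps.

```python
import re
from typing import Dict, List, NamedTuple


class Memory(NamedTuple):
    """Hypothetical stand-in for MemoryEntry with only the fields used here."""
    id: str
    content: str
    importance: float


def normalize(text: str) -> str:
    # Lowercase, then turn every run of non-alphanumeric characters into one space
    return re.sub(r"[^a-z0-9]+", " ", text.lower()).strip()


def drop_exact_duplicates(memories: List[Memory]) -> List[Memory]:
    """Keep one memory per normalized text, preferring the higher importance."""
    best: Dict[str, Memory] = {}
    for mem in memories:
        key = normalize(mem.content)
        kept = best.get(key)
        if kept is None or mem.importance > kept.importance:
            best[key] = mem
    return list(best.values())


memories = [
    Memory("a", "User prefers email.", 0.5),
    Memory("b", "user prefers   EMAIL", 0.7),
    Memory("c", "Customer likes to receive updates via email", 0.6),
]
print([m.id for m in drop_exact_duplicates(memories)])  # → ['b', 'c']
```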

We will implement an LLM-powered memory merging system that takes groups of related memories and produces consolidated versions. The system considers timestamps, importance scores, and semantic content to produce merged memories that keep every unique fact while removing repetition.

First, we will define a data model for consolidated memories. It is a separate Pydantic model rather than a subclass of `MemoryEntry`: it repeats `MemoryEntry`'s fields and additionally records which source memories were combined (for audit trails), the most recent timestamp (to prioritize current information), and the highest importance score (to maintain priority). This metadata is crucial for understanding consolidation decisions and debugging issues in production.
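
`merge_memories` below picks the most recent memory by sorting the ISO 8601 timestamp strings. That works here because every timestamp uses the same `...Z` format, but string order can disagree with time order once UTC offsets differ. The following sketch applies the metadata rules from this paragraph (newest timestamp, highest importance, all source IDs) after parsing the timestamps; the `Memory` dataclass and `aggregate_metadata` are hypothetical, and it assumes every timestamp carries a UTC offset or a trailing `Z`.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional


@dataclass
class Memory:
    """Hypothetical stand-in for MemoryEntry with the metadata fields only."""
    id: str
    timestamp: str
    importance: float
    category: Optional[str] = None


def parse_ts(ts: str) -> datetime:
    # fromisoformat accepts a trailing "Z" only from Python 3.11, so map it to +00:00
    return datetime.fromisoformat(ts.replace("Z", "+00:00"))


def aggregate_metadata(group: List[Memory]) -> dict:
    """Consolidated metadata: newest timestamp and category, max importance, all sources."""
    newest = max(group, key=lambda m: parse_ts(m.timestamp))
    return {
        "source_ids": [m.id for m in group],
        "timestamp": newest.timestamp,
        "category": newest.category,
        "importance": max(m.importance for m in group),
    }


email_group = [
    Memory("mem_001", "2024-11-01T10:00:00Z", 0.7, "preference"),
    Memory("mem_002", "2024-11-03T14:30:00Z", 0.6, "preference"),
    Memory("mem_003", "2024-11-05T09:15:00Z", 0.8, "preference"),
]
print(aggregate_metadata(email_group))

# 09:15+02:00 is 07:15 UTC, earlier than 08:00 UTC, although its string sorts later
mixed = [
    Memory("x", "2024-11-05T09:15:00+02:00", 0.5),
    Memory("y", "2024-11-05T08:00:00Z", 0.4),
]
print(aggregate_metadata(mixed)["timestamp"])  # → 2024-11-05T08:00:00Z
```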

In [10]:
class ConsolidatedMemory(BaseModel):
    """
    Represents a consolidated memory created by merging multiple related entries.
    This model extends the basic MemoryEntry by tracking which source memories were merged and preserving the most important metadata from sources.
    """
    id: str = Field(description="New consolidated memory ID")
    content: str = Field(description="Merged memory content created by LLM")
    source_ids: List[str] = Field(description="IDs of all source memories that were merged")
    timestamp: str = Field(description="Most recent timestamp from source memories")
    category: Optional[str] = Field(default=None, description="Memory category")
    importance: float = Field(description="Maximum importance score from source memories")

# Display the consolidated memory model structure
print("Consolidated Memory Model:")
print("="*80)
print(ConsolidatedMemory.model_json_schema())

Consolidated Memory Model:
{'description': 'Represents a consolidated memory created by merging multiple related entries.\nThis model extends the basic MemoryEntry by tracking which source memories were merged and preserving the most important metadata from sources.', 'properties': {'id': {'description': 'New consolidated memory ID', 'title': 'Id', 'type': 'string'}, 'content': {'description': 'Merged memory content created by LLM', 'title': 'Content', 'type': 'string'}, 'source_ids': {'description': 'IDs of all source memories that were merged', 'items': {'type': 'string'}, 'title': 'Source Ids', 'type': 'array'}, 'timestamp': {'description': 'Most recent timestamp from source memories', 'title': 'Timestamp', 'type': 'string'}, 'category': {'anyOf': [{'type': 'string'}, {'type': 'null'}], 'default': None, 'description': 'Memory category', 'title': 'Category'}, 'importance': {'description': 'Maximum importance score from source memories', 'title': 'Importance', 'type': 'number'}}, 'req

Now we will create the intelligent merging function that uses an LLM to consolidate related memories. The function sorts memories by timestamp to prioritize recent information, formats them with metadata for context, and constructs a detailed prompt that instructs the LLM to preserve all unique facts while eliminating redundancy. The LLM understands semantic equivalence better than simple string matching, so it can recognize that "User prefers email" and "Customer likes email updates" convey the same core fact and can be combined into a single clear statement.
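
Because the merged text comes straight from the model, a light sanity check before storing it can be worthwhile. The sketch below rests on stated assumptions: it strips the prompt's closing `Consolidated Memory:` label in case the model echoes it, and it rejects output that is empty or no shorter than the source memories combined, falling back to a caller-chosen source memory (for example the highest-importance one). `clean_merged_text` and this fallback policy are hypothetical and not part of the notebook.

```python
from typing import Sequence

ECHO_LABEL = "Consolidated Memory:"  # the last line of the merge prompt


def clean_merged_text(raw: str, sources: Sequence[str], fallback: str) -> str:
    """Return usable merged text, or `fallback` if the model output looks wrong."""
    text = raw.strip()
    # Remove the prompt's label if the model repeated it at the start
    if text.lower().startswith(ECHO_LABEL.lower()):
        text = text[len(ECHO_LABEL):].strip()
    # Consolidation should compress: reject empty or non-shrinking output
    if not text or len(text) >= sum(len(s) for s in sources):
        return fallback
    return text


sources = [
    "User prefers email communication over phone calls",
    "Customer likes to receive updates via email",
    "User requested email as primary contact method",
]
fallback = sources[2]  # mem_003 has the highest importance (0.8) in this group
print(clean_merged_text("Consolidated Memory: User prefers email for all contact.", sources, fallback))
print(clean_merged_text("   ", sources, fallback))  # empty output, so the fallback is returned
```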

In [11]:
def merge_memories(
    memory_group: List[MemoryEntry],
    llm: ChatOpenAI
) -> ConsolidatedMemory:
    """
    Intelligently merge a group of related memories using an LLM.
    The LLM analyzes the semantic content, identifies redundancy, resolves conflicts by favoring recent information, and generates a concise consolidated memory that preserves all unique facts.
    
    Args:
        memory_group: List of related memories that should be merged
        llm: Language model for intelligent content consolidation
        
    Returns:
        ConsolidatedMemory containing the merged content and metadata
    """
    # Step 1: Sort memories by timestamp (most recent first)
    # This helps prioritize recent information when resolving conflicts
    sorted_memories = sorted(memory_group, key=lambda m: m.timestamp, reverse=True)
    
    # Step 2: Format each memory with its metadata for LLM analysis
    memory_descriptions = []
    for mem in sorted_memories:
        # Include ID, importance, timestamp, and content
        memory_descriptions.append(
            f"[{mem.id}] (importance: {mem.importance}, {mem.timestamp}): {mem.content}"
        )
    
    # Step 3: Construct a detailed prompt with explicit consolidation instructions
    merge_prompt = f"""You are consolidating related memory entries for an AI agent.

Related Memories to Merge:
{chr(10).join(memory_descriptions)}

Create a single consolidated memory that:
1. Preserves all unique factual information
2. Eliminates redundancy and repetition
3. Uses the most recent/accurate version when details differ
4. Remains concise and clear
5. Maintains the same category and context

Consolidated Memory:"""
    
    # Step 4: Invoke the LLM to generate consolidated content
    response = llm.invoke([HumanMessage(content=merge_prompt)])
    consolidated_content = response.content.strip()
    
    # Step 5: Create the ConsolidatedMemory object with merged content - Use most recent timestamp, highest importance and track all source IDs
    consolidated = ConsolidatedMemory(
        id=f"consolidated_{sorted_memories[0].id}",  # Base ID on most recent memory
        content=consolidated_content,  # LLM-generated merged content
        source_ids=[mem.id for mem in memory_group],  # Track all source memories
        timestamp=sorted_memories[0].timestamp,  # Use most recent timestamp
        category=sorted_memories[0].category,  # Preserve category
        importance=max(mem.importance for mem in memory_group)  # Use highest importance
    )
    
    return consolidated

Now we need a function to apply the merging process to all related groups in our memory store. This function orchestrates the consolidation by merging each group of related memories while preserving standalone memories that do not belong to any group. The result is a new memory store containing both consolidated memories (from groups) and original memories (that were standalone).
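
`consolidate_memory_store` assumes the groups are disjoint and refer only to existing IDs. `find_related_memories` guarantees that, but if groups came from another source, a memory listed in two groups would be merged twice and appear in two consolidated entries, and an unknown ID would surface as a bare `KeyError` from the lookup. A hypothetical `validate_groups` check, sketched below, fails early with a clearer message.

```python
from typing import Iterable, List, Set


def validate_groups(memory_ids: Iterable[str], groups: List[Set[str]]) -> None:
    """Raise ValueError if a group has unknown IDs or overlaps an earlier group."""
    known = set(memory_ids)
    seen: Set[str] = set()
    for index, group in enumerate(groups):
        unknown = group - known
        if unknown:
            raise ValueError(f"group {index} has unknown ids: {sorted(unknown)}")
        overlap = group & seen
        if overlap:
            raise ValueError(f"group {index} overlaps earlier groups on: {sorted(overlap)}")
        seen |= group


ids = ["mem_001", "mem_002", "mem_003", "mem_004"]
validate_groups(ids, [{"mem_001", "mem_002"}, {"mem_003", "mem_004"}])  # passes silently
try:
    validate_groups(ids, [{"mem_001", "mem_002"}, {"mem_002", "mem_003"}])
except ValueError as err:
    print(err)  # → group 1 overlaps earlier groups on: ['mem_002']
```

One natural place to call it would be just before consolidation, e.g. `validate_groups([m.id for m in fragmented_memories], related_groups)`.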

In [12]:
def consolidate_memory_store(
    memories: List[MemoryEntry],
    related_groups: List[Set[str]],
    llm: ChatOpenAI
) -> List[Union[MemoryEntry, ConsolidatedMemory]]:
    """
    Apply consolidation to entire memory store by merging all related groups.
    Processes each group of related memories by merging them into consolidated entries, while preserving standalone memories that aren't part of any group.
    
    Args:
        memories: Original list of all memory entries
        related_groups: Groups of related memory IDs (from find_related_memories)
        llm: Language model for merging
        
    Returns:
        New memory store containing consolidated and standalone memories
    """
    # Step 1: Create lookup dictionary for quick memory access by ID
    memory_lookup = {mem.id: mem for mem in memories}
    
    # Step 2: Track which memories have been consolidated - This helps us identify standalone memories later
    consolidated_ids = set()
    for group in related_groups:
        consolidated_ids.update(group)
    
    # Step 3: Create result list to store final memory store
    consolidated_store = []
    
    # Step 4: Merge each group of related memories
    for group in related_groups:
        # Get actual MemoryEntry objects for this group
        group_memories = [memory_lookup[mem_id] for mem_id in group]
        # Merge the group using LLM
        merged = merge_memories(group_memories, llm)
        # Add consolidated memory to result
        consolidated_store.append(merged)
    
    # Step 5: Add standalone memories that weren't part of any group
    for memory in memories:
        if memory.id not in consolidated_ids:
            # This memory wasn't consolidated, keep it as-is
            consolidated_store.append(memory)
    
    return consolidated_store

Let's apply the consolidation to our fragmented memory store. We will use the related groups we identified earlier and merge each group using the LLM, then examine the results to see how many memories were reduced and how much clearer the consolidated store becomes.

In [13]:
# Perform consolidation on our fragmented memories
print("Memory Consolidation:")
print("="*80)

consolidated_store = consolidate_memory_store(
    fragmented_memories,  # Original fragmented memories
    related_groups,  # Groups identified by similarity detection
    llm  # LLM for intelligent merging
)

# Show consolidation statistics
print(f"\n✓ Consolidation complete!")
print(f"  Original memory count: {len(fragmented_memories)}")
print(f"  Consolidated memory count: {len(consolidated_store)}")
print(f"  Reduction: {len(fragmented_memories) - len(consolidated_store)} memories")
print(f"  Percentage: {(1 - len(consolidated_store)/len(fragmented_memories))*100:.1f}% reduction")

Memory Consolidation:

✓ Consolidation complete!
  Original memory count: 9
  Consolidated memory count: 4
  Reduction: 5 memories
  Percentage: 55.6% reduction


Now let's examine the consolidated memory store to see how the LLM merged related memories. Consolidated entries will show their source IDs and merged content, while standalone memories remain unchanged.

In [14]:
print("\nConsolidated Memory Store:")
print("="*80)

# Display each memory in the consolidated store
for i, mem in enumerate(consolidated_store, 1):
    if isinstance(mem, ConsolidatedMemory):
        # This is a consolidated memory - show source info
        print(f"\n{i}. [CONSOLIDATED]")
        print(f"   ID: {mem.id}")
        print(f"   Category: {mem.category}")
        print(f"   Importance: {mem.importance}")
        print(f"   Sources: {', '.join(mem.source_ids)} ({len(mem.source_ids)} memories merged)")
        print(f"   Content: {mem.content}")
    else:
        # This is a standalone memory - display as-is
        print(f"\n{i}. [STANDALONE]")
        print(f"   ID: {mem.id}")
        print(f"   Category: {mem.category}")
        print(f"   Content: {mem.content}")

# Calculate token savings from consolidation
original_tokens = sum(len(mem.content.split()) * 1.3 for mem in fragmented_memories)
consolidated_tokens = sum(len(mem.content.split()) * 1.3 for mem in consolidated_store)

print(f"\n{'='*80}")
print(f"\nToken Usage Analysis:")
print(f"  Original tokens: ~{int(original_tokens)}")
print(f"  Consolidated tokens: ~{int(consolidated_tokens)}")
print(f"  Token savings: ~{int(original_tokens - consolidated_tokens)}")
print(f"  Reduction: {(1 - consolidated_tokens/original_tokens)*100:.1f}%")


Consolidated Memory Store:

1. [CONSOLIDATED]
   ID: consolidated_mem_003
   Category: preference
   Importance: 0.8
   Sources: mem_001, mem_003, mem_002 (3 memories merged)
   Content: [mem_consolidated] (importance: 0.8, 2024-11-05T09:15:00Z): User prefers email communication as the primary contact method and likes to receive updates via email.

2. [CONSOLIDATED]
   ID: consolidated_mem_005
   Category: issue
   Importance: 0.9
   Sources: mem_004, mem_005 (2 memories merged)
   Content: [mem_consolidated] (importance: 0.9, 2024-11-02T11:45:00Z): Customer reported login issues on the mobile application.

3. [CONSOLIDATED]
   ID: consolidated_mem_007
   Category: account
   Importance: 0.8
   Sources: mem_006, mem_007 (2 memories merged)
   Content: [mem_consolidated] (importance: 0.8, 2024-11-04T16:00:00Z): Customer has a premium membership with an annual subscription.

4. [CONSOLIDATED]
   ID: consolidated_mem_009
   Category: personal
   Importance: 0.5
   Sources: mem_009, mem_0

The memory merging system uses LLM-based intelligent consolidation to create unified memories from related entries.
- The `merge_memories` function sorts memories by timestamp to prioritize recent information, formats them with metadata for the LLM, and constructs a prompt that explicitly instructs the model to preserve unique facts while eliminating redundancy.
- The LLM analyzes semantic overlap, resolves minor wording differences, and generates a concise consolidated version.
- The `ConsolidatedMemory` model tracks source IDs for audit trails, uses the most recent timestamp, and takes the maximum importance score from sources.
- The `consolidate_memory_store` function applies merging to all identified groups while preserving standalone memories that are not part of any group.
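- In the example, nine memories collapse into four:
  - The three email-preference memories become a single entry.
  - The two login-issue memories merge into one entry.
  - The two account-status memories merge into one entry, and the annual billing detail survives.
  - A fourth entry merges the personal-category memories.

  How much a larger store shrinks depends on how much redundancy it contains. Because an LLM does the merging, consolidated entries should be spot-checked rather than assumed lossless. Here, the merged email entry no longer records that email is preferred "over phone calls". Each merged content string also begins with a `[mem_consolidated] (importance: ...)` prefix that appears to echo the metadata formatting from the prompt. Even so, the consolidated store is more coherent, with fewer redundant entries that could confuse retrieval.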
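The metadata rules in the list above can be shown without an LLM. Below is a minimal sketch that assumes hypothetical stand-in dataclasses (`Memory`, `MergedMemory`) in place of the notebook's `MemoryEntry` and `ConsolidatedMemory`. `merge_text` is a placeholder for the LLM call. The ID scheme and the choice to take the newest entry's category are assumptions, not the notebook's confirmed behavior.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Callable, List


@dataclass
class Memory:  # hypothetical stand-in for MemoryEntry
    id: str
    content: str
    timestamp: datetime
    category: str
    importance: float


@dataclass
class MergedMemory(Memory):  # hypothetical stand-in for ConsolidatedMemory
    source_ids: List[str]


def aggregate_group(group: List[Memory], merge_text: Callable[[List[str]], str]) -> MergedMemory:
    """Combine a group's metadata; merge_text stands in for the LLM merge."""
    if not group:
        raise ValueError("cannot merge an empty group")
    ordered = sorted(group, key=lambda m: m.timestamp)  # oldest -> newest
    newest = ordered[-1]
    return MergedMemory(
        id=f"consolidated_{newest.id}",                # assumption: ID from newest source
        content=merge_text([m.content for m in ordered]),
        timestamp=newest.timestamp,                    # most recent timestamp
        category=newest.category,                      # assumption: newest entry's category
        importance=max(m.importance for m in group),   # highest importance
        source_ids=[m.id for m in ordered],            # audit trail of merged sources
    )


group = [
    Memory("mem_a", "User prefers email", datetime(2024, 11, 1), "preference", 0.7),
    Memory("mem_b", "Email is primary contact", datetime(2024, 11, 5), "preference", 0.6),
    Memory("mem_c", "Likes email updates", datetime(2024, 11, 3), "preference", 0.8),
]
merged = aggregate_group(group, merge_text=lambda texts: "; ".join(texts))
print(merged.id, merged.importance, merged.timestamp.date(), merged.source_ids)
```

Keeping the metadata logic separate from the text merge makes it testable without model calls. Only `merge_text` needs to be swapped for the real LLM invocation.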

## Semantic clustering for memory grouping
Beyond pairwise similarity, we can use clustering algorithms to identify natural groupings of related memories based on their semantic embeddings. Clustering can group memories that are thematically related even when not every pair clears a similarity threshold. DBSCAN links points through chains of close neighbors, so two memories can share a cluster without being within `eps` of each other directly. DBSCAN also infers the number of clusters from the density of the data rather than requiring a predefined count. Points in sparse regions are labeled as noise instead of being forced into a cluster.
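The chaining behavior is easiest to see on toy data. The sketch below uses scikit-learn's `DBSCAN` with `metric="cosine"`, the same configuration used later in this notebook. The inputs are hand-picked 2-D unit vectors standing in for real embeddings. `unit_vectors` and `cosine_similarity` are illustrative helpers.

```python
import numpy as np
from sklearn.cluster import DBSCAN


def unit_vectors(angles_deg):
    """Toy 2-D 'embeddings': unit vectors at the given angles in degrees."""
    radians = np.deg2rad(np.asarray(angles_deg, dtype=float))
    return np.column_stack([np.cos(radians), np.sin(radians)])


def cosine_similarity(a, b):
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))


# A chain at 0, 20 and 40 degrees plus an unrelated vector at 180 degrees
vectors = unit_vectors([0, 20, 40, 180])

# The chain's endpoints would fail a 0.9 pairwise threshold: cos(40 deg) is about 0.766
endpoint_similarity = round(cosine_similarity(vectors[0], vectors[2]), 3)
print(endpoint_similarity)

# Adjacent pairs are about 0.94 similar (cosine distance about 0.06 < eps=0.1),
# so DBSCAN links all three through the middle vector; the outlier is noise (-1)
labels = DBSCAN(eps=0.1, min_samples=2, metric="cosine").fit_predict(vectors)
print(labels.tolist())
```

The first and third vectors end up in the same cluster even though a pairwise rule with a 0.9 threshold would not connect them directly.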

We will implement clustering-based memory grouping that uses embeddings and DBSCAN to identify semantic clusters. This approach can discover higher-level relationships like "all memories about account settings" or "all memories about payment issues" that might not show up as high pairwise similarities but belong together conceptually for consolidation.

In [15]:
def cluster_memories(
    memories: List[MemoryEntry],
    embeddings_model: OpenAIEmbeddings,
    eps: float = 0.3,
    min_samples: int = 2
) -> Dict[int, List[MemoryEntry]]:
    """
    Cluster memories using DBSCAN algorithm on their semantic embeddings.
    DBSCAN identifies dense regions in embedding space where similar memories cluster together, automatically determining the number of clusters based on data density rather than requiring a predefined count.
    
    Args:
        memories: List of memory entries to cluster
        embeddings_model: Model for generating semantic embeddings
        eps: Maximum distance between samples in a cluster (lower = tighter clusters)
        min_samples: Minimum samples required to form a cluster
        
    Returns:
        Dictionary mapping cluster IDs to lists of memories in that cluster
        Cluster ID -1 represents "noise" (memories not belonging to any cluster)
    """
    # Step 1: Extract text content from all memories for embedding
    memory_texts = [mem.content for mem in memories]
    
    # Step 2: Generate embeddings for all memories in a single batch - Each memory becomes a high-dimensional vector capturing its semantic meaning
    memory_embeddings = embeddings_model.embed_documents(memory_texts)
    
    # Step 3: Convert list of embeddings to numpy matrix for sklearn - Shape will be (num_memories, embedding_dimension)
    embedding_matrix = np.array(memory_embeddings)
    
    # Step 4: Initialize DBSCAN with cosine distance metric - Cosine metric measures angle between vectors, matching our similarity approach
    # eps=0.3 means memories within 0.3 cosine distance are considered neighbors
    clustering = DBSCAN(
        eps=eps,  # Maximum distance for neighborhood
        min_samples=min_samples,  # Minimum cluster size
        metric='cosine'  # Use cosine distance (1 - cosine_similarity)
    )
    
    # Step 5: Fit the clustering model and get cluster labels for each memory
    # Labels are integers: 0, 1, 2, ... for clusters, -1 for noise points
    cluster_labels = clustering.fit_predict(embedding_matrix)
    
    # Step 6: Group memories by their assigned cluster label
    clusters = defaultdict(list)
    for memory, label in zip(memories, cluster_labels):
        # Add each memory to its cluster's list - Label -1 means this memory is "noise" - not part of any cluster
        clusters[int(label)].append(memory)
    
    return dict(clusters)

The `cluster_memories` function transforms memories into a format suitable for density-based clustering.
- The embedding generation converts each memory's text content into a high-dimensional vector where semantically similar memories have vectors pointing in similar directions.
- The numpy matrix conversion creates the required input format for sklearn's DBSCAN implementation.
- The `metric='cosine'` argument makes DBSCAN use cosine distance (1 - cosine similarity) rather than Euclidean distance. This suits semantic embeddings because a vector's direction carries its meaning, while its magnitude matters less.
- The `fit_predict` method both fits the model to the data and returns cluster assignments in a single call.
- The resulting dictionary maps cluster IDs to memory lists, with the special ID -1 reserved for "noise" points that don't belong to any dense cluster region.
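To back up the point about cosine versus Euclidean distance, the sketch below checks two properties numerically on random vectors:

- Cosine distance ignores vector magnitude.
- For unit-length vectors, squared Euclidean distance equals twice the cosine distance, since ||a - b||² = 2 - 2·cos(a, b).

So if an embedding model returns unit-length vectors, both metrics rank neighbors identically. `cosine_distance` and `compare_metrics` are illustrative helpers.

```python
import numpy as np


def cosine_distance(a: np.ndarray, b: np.ndarray) -> float:
    """1 - cosine similarity, the quantity DBSCAN uses with metric='cosine'."""
    return 1.0 - float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))


def compare_metrics(a: np.ndarray, b: np.ndarray) -> dict:
    a_unit = a / np.linalg.norm(a)
    b_unit = b / np.linalg.norm(b)
    return {
        "cosine": cosine_distance(a, b),
        "cosine_rescaled": cosine_distance(5.0 * a, b),     # scaling a changes nothing
        "euclidean": float(np.linalg.norm(a - b)),
        "euclidean_rescaled": float(np.linalg.norm(5.0 * a - b)),  # scaling a changes this
        "sq_euclidean_unit": float(np.sum((a_unit - b_unit) ** 2)),
    }


rng = np.random.default_rng(seed=0)
a, b = rng.normal(size=8), rng.normal(size=8)
m = compare_metrics(a, b)

# For unit vectors: ||a - b||^2 = 2 - 2 cos(a, b) = 2 * cosine distance
print(np.isclose(m["sq_euclidean_unit"], 2 * m["cosine"]))
print(np.isclose(m["cosine"], m["cosine_rescaled"]))
```

This also implies that a cosine `eps` of `d` corresponds to a Euclidean radius of `sqrt(2 * d)` on normalized embeddings.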

Once we have clusters of thematically related memories, we need a function to consolidate each cluster into a single comprehensive memory. Pairwise merging combines memories that are highly similar to each other. Cluster consolidation, by contrast, handles groups of memories that share a common theme but may cover different aspects of it. For example, a cluster about "user account information" might contain memories about subscription tier, billing cycle, and account creation date. These are related but distinct facts, and all of them should be preserved in the consolidated version.

The consolidation function will format all cluster memories for the LLM, construct a prompt that emphasizes preserving distinct facts while organizing them logically, and create a `ConsolidatedMemory` object that tracks which source memories were merged.

In [16]:
def consolidate_cluster(
    cluster_memories: List[MemoryEntry],
    llm: ChatOpenAI
) -> ConsolidatedMemory:
    """
    Consolidate a cluster of thematically related memories into a single entry.
    
    Args:
        cluster_memories: List of memories belonging to the same semantic cluster
        llm: Language model for intelligent consolidation
        
    Returns:
        Single ConsolidatedMemory representing the entire cluster
    """
    # Step 1: Format each memory in the cluster for LLM analysis - Include memory ID for traceability in the consolidation process
    memory_list = []
    for mem in cluster_memories:
        memory_list.append(f"- [{mem.id}] {mem.content}")
    
    # Step 2: Create a detailed prompt for thematic consolidation - This prompt differs from pairwise merging by emphasizing theme coherence
    consolidation_prompt = f"""You are consolidating thematically related memories for an AI agent.

Memories in this cluster:
{chr(10).join(memory_list)}

These memories are semantically related and belong to the same theme.

Create a comprehensive consolidated memory that:
1. Captures all distinct facts from individual memories
2. Organizes information logically by subtopic
3. Eliminates redundancy while preserving unique details
4. Remains clear and concise
5. Could replace all source memories without any information loss

Consolidated Memory:"""
    
    # Step 3: Generate consolidated content using the LLM
    response = llm.invoke([HumanMessage(content=consolidation_prompt)])
    consolidated_content = response.content.strip()
    
    # Step 4: Create the ConsolidatedMemory object with metadata - Use the most recent timestamp and highest importance from sources
    consolidated = ConsolidatedMemory(
        id=f"cluster_{cluster_memories[0].id}",  # ID based on first memory in cluster
        content=consolidated_content,  # LLM-generated merged content
        source_ids=[mem.id for mem in cluster_memories],  # Track all source memories
        timestamp=max(mem.timestamp for mem in cluster_memories),  # Most recent
        category=cluster_memories[0].category,  # Preserve category from cluster
        importance=max(mem.importance for mem in cluster_memories)  # Highest importance
    )
    
    return consolidated

The `consolidate_cluster` function handles the more complex case of merging multiple thematically related memories.
- The prompt construction emphasizes organizing information "logically by subtopic" since clusters may contain memories about different aspects of the same theme.
- The metadata aggregation uses `max()` for both timestamp and importance. As a result, the consolidated memory carries the most recent timestamp and the highest importance score of any source.
- The `source_ids` list maintains a complete audit trail of which original memories were combined, so consolidation decisions can be investigated if needed.

Now let's apply clustering to our fragmented memories and see what natural groupings DBSCAN discovers. We will use an `eps` value of 0.1, so memories within a cosine distance of 0.1 (a cosine similarity of 0.9 or higher) are treated as neighbors. DBSCAN counts the point itself toward `min_samples`. With `min_samples=2`, any memory that has at least one neighbor within `eps` therefore becomes a core point, and any such pair can form a cluster. After identifying clusters, we will display each cluster's contents to understand what thematic groupings the algorithm found.
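Because cosine distance is 1 minus cosine similarity, any `eps` maps directly onto a minimum similarity for direct neighbors. The two helpers below (hypothetical names, not part of the notebook) convert in both directions. They show that the `eps=0.1` used here requires a similarity of at least 0.9, while the `cluster_memories` default of `eps=0.3` admits direct neighbors with a similarity as low as 0.7. This bound applies only to direct neighbors: through chaining, a cluster can still contain memories that are less similar to each other.

```python
import math


def eps_for_min_similarity(min_similarity: float) -> float:
    """Cosine-distance eps admitting pairs with cosine similarity >= min_similarity."""
    if not -1.0 <= min_similarity <= 1.0:
        raise ValueError("cosine similarity must lie in [-1, 1]")
    return 1.0 - min_similarity


def min_similarity_for_eps(eps: float) -> float:
    """Smallest cosine similarity a direct DBSCAN neighbor can have for this eps."""
    if not 0.0 <= eps <= 2.0:
        raise ValueError("cosine distance must lie in [0, 2]")
    return 1.0 - eps


for eps in (0.1, 0.3):
    print(f"eps={eps} -> direct neighbors need cosine similarity >= {min_similarity_for_eps(eps):.2f}")
print(f"a 0.85 similarity threshold corresponds to eps={eps_for_min_similarity(0.85):.2f}")
```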

In [17]:
# Apply DBSCAN clustering to our fragmented memories
print("Clustering Fragmented Memories:")
print("="*80)

# Run the clustering algorithm with specified parameters
clusters = cluster_memories(
    fragmented_memories,
    embeddings,
    eps=0.1,  # Distance threshold - memories within 0.1 cosine distance are neighbors
    min_samples=2  # Minimum 2 memories required to form a cluster
)

print(f"\n✓ Clustering complete!")
print(f"  Found {len(clusters)} groups (including noise cluster if present)")

# Display each cluster and its contents
print("\n" + "-"*80)
print("\nDiscovered Clusters:")

for cluster_id, mems in clusters.items():
    if cluster_id == -1:
        # Cluster -1 contains "noise" - memories that don't fit any cluster
        print(f"\n[NOISE - Unclustered]: {len(mems)} memories")
        print("  These memories don't belong to any dense cluster region")
    else:
        # Regular cluster with thematically related memories
        print(f"\n[Cluster {cluster_id}]: {len(mems)} memories")
    
    # Show each memory in this cluster
    for mem in mems:
        print(f"  [{mem.id}] ({mem.category}) {mem.content}")

Clustering Fragmented Memories:

✓ Clustering complete!
  Found 3 groups (including noise cluster if present)

--------------------------------------------------------------------------------

Discovered Clusters:

[NOISE - Unclustered]: 5 memories
  These memories don't belong to any dense cluster region
  [mem_001] (preference) User prefers email communication over phone calls
  [mem_002] (preference) Customer likes to receive updates via email
  [mem_003] (preference) User requested email as primary contact method
  [mem_008] (personal) User lives in California, PST timezone
  [mem_009] (personal) Customer is located in California

[Cluster 0]: 2 memories
  [mem_004] (issue) User reported login issues on mobile app
  [mem_005] (issue) Customer had trouble signing in to the mobile application

[Cluster 1]: 2 memories
  [mem_006] (account) User's account is premium tier with annual subscription
  [mem_007] (account) Customer has premium membership, annual billing


Now we will consolidate each cluster into a single memory while keeping unclustered memories (noise) as standalone entries. This process iterates through all clusters, applies LLM-based consolidation to clusters with multiple memories, and builds a final consolidated memory store that is more compact and coherent than the original fragmented store.

In [18]:
# Consolidate each cluster into a single memory
print("Consolidating Clusters:")
print("="*80)

# Initialize list to store the final consolidated memory store
cluster_consolidated_store = []

# Process each cluster
for cluster_id, cluster_mems in clusters.items():
    if cluster_id == -1:
        # Noise memories (not part of any cluster) - keep them as standalone
        print(f"\n  Adding {len(cluster_mems)} unclustered memories as standalone entries")
        cluster_consolidated_store.extend(cluster_mems)
    elif len(cluster_mems) > 1:
        # Cluster with multiple memories - consolidate into single entry
        print(f"\n  Consolidating cluster {cluster_id} ({len(cluster_mems)} memories)...")
        consolidated = consolidate_cluster(cluster_mems, llm)
        cluster_consolidated_store.append(consolidated)
        print(f"    ✓ Merged into: {consolidated.content[:60]}...")
    else:
        # Single-memory "cluster" - keep as-is (shouldn't happen with min_samples=2)
        cluster_consolidated_store.extend(cluster_mems)

print(f"\n✓ Cluster consolidation complete!")

Consolidating Clusters:

  Adding 5 unclustered memories as standalone entries

  Consolidating cluster 0 (2 memories)...
    ✓ Merged into: **Consolidated Memory: Mobile App Login Issues**

**Overview...

  Consolidating cluster 1 (2 memories)...
    ✓ Merged into: **Consolidated Memory: User's Premium Membership Details**

...

✓ Cluster consolidation complete!


Let's examine the final consolidated memory store and calculate the reduction achieved through clustering-based consolidation. We will compare the original memory count and estimated tokens with the consolidated store to quantify the efficiency gains.

In [19]:
# Display the final consolidated memory store
print("Cluster-Consolidated Memory Store:")
print("="*80)

for i, mem in enumerate(cluster_consolidated_store, 1):
    if isinstance(mem, ConsolidatedMemory):
        # This is a consolidated memory from a cluster
        print(f"\n{i}. [CONSOLIDATED FROM CLUSTER]")
        print(f"   ID: {mem.id}")
        print(f"   Category: {mem.category}")
        print(f"   Sources: {len(mem.source_ids)} memories merged")
        print(f"   Source IDs: {', '.join(mem.source_ids)}")
        print(f"   Content: {mem.content}")
    else:
        # This is a standalone memory (was noise in clustering)
        print(f"\n{i}. [STANDALONE]")
        print(f"   ID: {mem.id}")
        print(f"   Category: {mem.category}")
        print(f"   Content: {mem.content}")

# Calculate and display consolidation statistics
print(f"\n{'='*80}")
print(f"\nCluster Consolidation Results:")
print(f"  Original memories: {len(fragmented_memories)}")
print(f"  After consolidation: {len(cluster_consolidated_store)}")
print(f"  Reduction: {len(fragmented_memories) - len(cluster_consolidated_store)} memories")
print(f"  Percentage: {(1 - len(cluster_consolidated_store)/len(fragmented_memories))*100:.1f}% reduction")

Cluster-Consolidated Memory Store:

1. [STANDALONE]
   ID: mem_001
   Category: preference
   Content: User prefers email communication over phone calls

2. [STANDALONE]
   ID: mem_002
   Category: preference
   Content: Customer likes to receive updates via email

3. [STANDALONE]
   ID: mem_003
   Category: preference
   Content: User requested email as primary contact method

4. [STANDALONE]
   ID: mem_008
   Category: personal
   Content: User lives in California, PST timezone

5. [STANDALONE]
   ID: mem_009
   Category: personal
   Content: Customer is located in California

6. [CONSOLIDATED FROM CLUSTER]
   ID: cluster_mem_004
   Category: issue
   Sources: 2 memories merged
   Source IDs: mem_004, mem_005
   Content: **Consolidated Memory: Mobile App Login Issues**

**Overview:**
Users have reported difficulties with signing in to the mobile application.

**Specific Issues:**
- Users have experienced login issues specifically on the mobile app.
- Customers have encountered troubl

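The clustering-based consolidation applies DBSCAN to memory embeddings to discover natural semantic groupings. DBSCAN identifies dense regions in embedding space where memories lie close together, and it infers the number of clusters from the data's density rather than from a predetermined count.

The results also show how sensitive this approach is to its parameters:

- With `eps=0.1`, only the two login-issue memories and the two account memories formed clusters.
- The three email-preference memories, which the pairwise approach merged, and the two California memories were labeled noise and kept as standalone entries. The store therefore shrinks from nine entries to seven (five standalone plus two consolidated) rather than to four.
- The prompt's instruction to organize information by subtopic led the LLM to produce Markdown-formatted summaries with headings. These can be longer than the short memories they replace.

Both `eps` and the consolidation prompt are therefore worth tuning for a given memory store.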
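Because the outcome above depended heavily on `eps`, it can help to sweep a few values on a small sample before choosing one. The sketch below does this with scikit-learn's `DBSCAN` on toy 2-D unit vectors. These stand in for real embeddings and are chosen so the distances are easy to check by hand. `unit_vectors` and `sweep_eps` are hypothetical helpers.

```python
import numpy as np
from sklearn.cluster import DBSCAN


def unit_vectors(angles_deg):
    """Toy 2-D 'embeddings': unit vectors at the given angles in degrees."""
    radians = np.deg2rad(np.asarray(angles_deg, dtype=float))
    return np.column_stack([np.cos(radians), np.sin(radians)])


def sweep_eps(vectors, eps_values, min_samples=2):
    """Return {eps: (number_of_clusters, number_of_noise_points)} for each candidate eps."""
    results = {}
    for eps in eps_values:
        labels = DBSCAN(eps=eps, min_samples=min_samples, metric="cosine").fit_predict(vectors)
        n_clusters = len(set(labels.tolist()) - {-1})   # -1 marks noise, not a cluster
        n_noise = int(np.sum(labels == -1))
        results[eps] = (n_clusters, n_noise)
    return results


# Two tight pairs (0/10 and 90/100 degrees) plus one outlier at 200 degrees
toy = unit_vectors([0, 10, 90, 100, 200])
sweep = sweep_eps(toy, [0.01, 0.05, 1.5])
for eps, (n_clusters, n_noise) in sweep.items():
    print(f"eps={eps}: {n_clusters} clusters, {n_noise} noise points")
```

The sweep shows three regimes:

- Too small an `eps` leaves every point as noise.
- A moderate value recovers the two tight pairs and isolates the outlier.
- A very large value chains everything into a single cluster.

With real memories, the same kind of sweep, combined with reading the resulting clusters, is a practical way to pick `eps`.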

## Production memory consolidation system
For production deployments, we need a complete memory consolidation system that continuously monitors memory growth, automatically triggers consolidation when needed, applies appropriate strategies based on memory characteristics and maintains audit trails of consolidation operations. The system should balance consolidation aggressiveness with information preservation, provide configuration options for different use cases, and integrate seamlessly with the agent's memory management layer.

We will implement a production-ready consolidation manager that combines similarity detection, intelligent merging and clustering approaches with automatic triggering, comprehensive metrics tracking and configurable consolidation policies. This represents a complete solution for maintaining efficient, clean memory stores in long-running production agents.

Before building the consolidator class, we need a configuration system that controls consolidation behavior. The `ConsolidationPolicy` model defines thresholds and parameters that determine when and how consolidation occurs. This includes the similarity threshold for pairwise consolidation, whether to use clustering as a secondary strategy, DBSCAN parameters for clustering, and the memory count threshold that triggers automatic consolidation. By encapsulating these settings in a Pydantic model, we can easily create different policies for different use cases - more aggressive consolidation for memory-constrained agents or more conservative policies for high-precision applications where information loss must be minimized.

In [20]:
class ConsolidationPolicy(BaseModel):
    """
    Configuration model for memory consolidation behavior.
    This model defines all parameters that control when and how consolidation is triggered and executed. Different policies can be created for different use cases.
    """
    # Pairwise similarity settings
    similarity_threshold: float = Field(
        default=0.85,
        description="Minimum similarity score (0-1) for pairwise consolidation"
    )
    
    # Clustering settings
    enable_clustering: bool = Field(
        default=True,
        description="Whether to use clustering as secondary consolidation strategy"
    )
    cluster_eps: float = Field(
        default=0.3,
        description="DBSCAN eps parameter - max distance for cluster membership"
    )
    min_cluster_size: int = Field(
        default=2,
        description="Minimum number of memories required to form a cluster"
    )
    
    # Trigger settings
    consolidation_trigger_count: int = Field(
        default=20,
        description="Memory count threshold that triggers automatic consolidation"
    )

# Display the policy model structure
print("ConsolidationPolicy Model:")
print("="*80)
print(ConsolidationPolicy.model_json_schema())

ConsolidationPolicy Model:
{'description': 'Configuration model for memory consolidation behavior.\nThis model defines all parameters that control when and how consolidation is triggered and executed. Different policies can be created for different use cases.', 'properties': {'similarity_threshold': {'default': 0.85, 'description': 'Minimum similarity score (0-1) for pairwise consolidation', 'title': 'Similarity Threshold', 'type': 'number'}, 'enable_clustering': {'default': True, 'description': 'Whether to use clustering as secondary consolidation strategy', 'title': 'Enable Clustering', 'type': 'boolean'}, 'cluster_eps': {'default': 0.3, 'description': 'DBSCAN eps parameter - max distance for cluster membership', 'title': 'Cluster Eps', 'type': 'number'}, 'min_cluster_size': {'default': 2, 'description': 'Minimum number of memories required to form a cluster', 'title': 'Min Cluster Size', 'type': 'integer'}, 'consolidation_trigger_count': {'default': 20, 'description': 'Memory count 

Example Policy Configurations:
- Conservative (high precision): `similarity_threshold=0.95`, `enable_clustering=False`
- Balanced (default): `similarity_threshold=0.85`, `enable_clustering=True`
- Aggressive (memory constrained): `similarity_threshold=0.75`, `cluster_eps=0.4`

The `ConsolidationPolicy` uses Pydantic's Field descriptors to define typed parameters with default values and descriptions. 
- The `similarity_threshold` controls pairwise consolidation sensitivity - higher values (0.90+) only merge very similar memories while lower values (0.75) are more aggressive.
- The clustering parameters (`enable_clustering`, `cluster_eps`, `min_cluster_size`) control the secondary DBSCAN-based strategy applied to memories that were not caught by pairwise similarity.
- The `consolidation_trigger_count` determines when automatic consolidation activates, balancing between frequent small consolidations and less frequent larger ones.

Now we will build the `ProductionMemoryConsolidator` class that manages the memory store and handles automatic consolidation. The class maintains a list of memories, tracks metrics about consolidation operations, and provides methods for adding memories and triggering consolidation. The constructor accepts the LLM and embeddings models along with an optional policy configuration, initializing the internal state needed for memory management and metrics tracking.

In [21]:
class ProductionMemoryConsolidator:
    """
    Production-ready memory consolidation system with automatic triggering, multiple strategies, and comprehensive metrics tracking.
    This class manages the complete lifecycle of memory consolidation, from adding new memories to automatic trigger-based consolidation.
    """
    
    def __init__(
        self,
        llm: ChatOpenAI,
        embeddings: OpenAIEmbeddings,
        policy: Optional[ConsolidationPolicy] = None
    ):
        """
        Initialize the production consolidator.
        
        Args:
            llm: Language model for intelligent merging
            embeddings: Embedding model for similarity computation and clustering
            policy: Consolidation policy configuration (uses defaults if not provided)
        """
        # Store the models for later use in consolidation
        self.llm = llm
        self.embeddings = embeddings
        
        # Use provided policy or create default
        self.policy = policy or ConsolidationPolicy()
        
        # Initialize memory storage as empty list - Can contain both MemoryEntry and ConsolidatedMemory objects
        self.memories: List[Union[MemoryEntry, ConsolidatedMemory]] = []
        
        # Initialize metrics tracking dictionary - These metrics provide visibility into consolidation performance
        self.metrics = {
            'total_consolidations': 0,  # Number of times consolidation ran
            'memories_consolidated': 0,  # Total memories merged over time
            'tokens_saved': 0,  # Estimated token savings
            'last_consolidation': None  # Timestamp of last consolidation
        }
    
    def add_memory(self, memory: MemoryEntry) -> None:
        """
        Add a new memory to the store and trigger consolidation if threshold reached.
        This is the primary interface for adding memories. It automatically checks the policy threshold and triggers consolidation when needed.
        
        Args:
            memory: New memory entry to add
        """
        # Add memory to the store
        self.memories.append(memory)
        
        # Check if we've reached the consolidation trigger threshold
        if len(self.memories) >= self.policy.consolidation_trigger_count:
            # Threshold reached - trigger automatic consolidation
            self._auto_consolidate()
    
    def _estimate_tokens(self, memories: List[Any]) -> int:
        """
        Estimate total token count for a list of memories.
        Uses rough approximation of 1.3 tokens per word.
        
        Args:
            memories: List of memory objects with content attribute
            
        Returns:
            Estimated total token count
        """
        return sum(int(len(mem.content.split()) * 1.3) for mem in memories)

# Verify the class is partially defined
print("✓ ProductionMemoryConsolidator class initialized")

✓ ProductionMemoryConsolidator class initialized


- The class initialization stores references to the LLM and embeddings models needed for consolidation operations. The memories list uses a `Union` type hint to indicate it can contain both original `MemoryEntry` objects and `ConsolidatedMemory` objects after consolidation. The metrics dictionary tracks operational statistics that are useful for monitoring system health and understanding consolidation effectiveness over time.
- The `add_memory` method implements the automatic triggering logic, checking against the policy threshold after each addition.
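Because `add_memory` compares the total store size against `consolidation_trigger_count`, a pass that leaves the store at or above the threshold will trigger another full pass on the very next call, with all of its LLM calls. The simulation below counts passes under that rule and under a hypothetical alternative that fires only when the store has grown by the threshold since the last pass. `count_consolidation_passes` and the fixed `shrink_by` per pass are illustrative assumptions, not part of the class.

```python
def count_consolidation_passes(n_adds: int, threshold: int, shrink_by: int,
                               growth_based: bool) -> int:
    """Simulate n_adds calls to add_memory and count how many consolidation passes fire.

    shrink_by is a hypothetical stand-in for how many entries each pass removes.
    """
    size = 0       # current store size
    baseline = 0   # store size right after the last pass (used by the growth rule)
    passes = 0
    for _ in range(n_adds):
        size += 1
        if growth_based:
            fire = size - baseline >= threshold  # grew by `threshold` since last pass
        else:
            fire = size >= threshold             # same check as add_memory
        if fire:
            size = max(size - shrink_by, 0)
            baseline = size
            passes += 1
    return passes


naive = count_consolidation_passes(40, threshold=20, shrink_by=2, growth_based=False)
growth = count_consolidation_passes(40, threshold=20, shrink_by=2, growth_based=True)
print(naive, growth)
```

When each pass removes only a couple of entries, the size-based rule fires every second addition once the threshold is reached. The growth-based rule fires far less often. On the other hand, it lets the store drift upward when consolidation removes little, so in practice it may need a hard size cap as well.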

The core of the production system is the `_auto_consolidate` method, which orchestrates the multi-strategy consolidation process.

When triggered, it first converts any existing `ConsolidatedMemory` objects back to `MemoryEntry` format for uniform processing. As written, this conversion keeps each entry's ID, content, timestamp, category and importance, but not its `source_ids`. After a second pass, a new entry's `source_ids` therefore point to the earlier consolidated ID, whose own sources are no longer kept in the store.

It then applies two consolidation strategies in sequence:

1. Pairwise similarity-based consolidation catches highly similar memories.
2. DBSCAN clustering catches thematically related memories that were not merged in the first pass.

The method records memory and token counts before and after the pass to calculate savings, and it updates the metrics dictionary for monitoring.
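Because `_auto_consolidate` rebuilds the store from two passes, a cheap sanity check after each run is that every input memory is accounted for exactly once. Each one should either be kept as-is or appear in exactly one consolidated entry's `source_ids`. The sketch below is a hypothetical helper, `coverage_report`. It uses minimal stand-in dataclasses (`Entry`, `Merged`) in place of the notebook's `MemoryEntry` and `ConsolidatedMemory` and is not part of the original class.

```python
from collections import Counter
from dataclasses import dataclass, field
from typing import Dict, Iterable, List, Union


@dataclass
class Entry:  # hypothetical stand-in for MemoryEntry
    id: str
    content: str


@dataclass
class Merged:  # hypothetical stand-in for ConsolidatedMemory
    id: str
    content: str
    source_ids: List[str] = field(default_factory=list)


def coverage_report(input_ids: Iterable[str],
                    store: List[Union[Entry, Merged]]) -> Dict[str, List[str]]:
    """Check that each input ID appears exactly once in the consolidated store."""
    seen: Counter = Counter()
    for item in store:
        if isinstance(item, Merged):
            seen.update(item.source_ids)  # merged entries account for their sources
        else:
            seen[item.id] += 1            # standalone entries account for themselves
    expected = set(input_ids)
    return {
        "missing": sorted(expected - set(seen)),                    # dropped memories
        "duplicated": sorted(i for i, n in seen.items() if n > 1),  # merged more than once
        "unexpected": sorted(set(seen) - expected),                 # IDs not in the input
    }


store = [
    Merged("c1", "email preference", ["mem_001", "mem_002"]),
    Entry("mem_003", "lives in California"),
    Merged("c2", "login issue", ["mem_004", "mem_002"]),
]
report = coverage_report(["mem_001", "mem_002", "mem_003", "mem_004", "mem_005"], store)
print(report)
```

On a healthy run all three lists are empty. In this deliberately broken example, `mem_005` was dropped and `mem_002` was merged twice.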

In [22]:
# Add the _auto_consolidate method to the class
def _auto_consolidate(self) -> None:
    """
    Automatically consolidate memories using multi-strategy approach.
    This method is called when the memory count reaches the policy threshold. It applies pairwise similarity consolidation first, then clustering-based consolidation on remaining memories.
    """
    print(f"\n[Consolidation Triggered] {len(self.memories)} memories")
    
    # Step 1: Record pre-consolidation metrics for comparison
    pre_count = len(self.memories)
    pre_tokens = self._estimate_tokens(self.memories)
    
    # Step 2: Convert all memories to MemoryEntry for uniform processing - This handles the case where some memories are already ConsolidatedMemory
    memory_entries = [
        mem if isinstance(mem, MemoryEntry) else
        MemoryEntry(
            id=mem.id,
            content=mem.content,
            timestamp=mem.timestamp,
            category=mem.category,
            importance=mem.importance
        )
        for mem in self.memories
    ]
    
    # Step 3: Initialize result list and tracking set
    consolidated_memories = []
    consolidated_ids = set()  # Track which memories have been consolidated
    
    # Step 4: Strategy 1 - Pairwise similarity consolidation - Find and merge memories with similarity above threshold
    similarity_scores = compute_memory_similarity(memory_entries, self.embeddings)
    related_groups = find_related_memories(
        memory_entries,
        similarity_scores,
        self.policy.similarity_threshold
    )
    
    # Merge each group of similar memories
    for group in related_groups:
        group_memories = [m for m in memory_entries if m.id in group]
        if len(group_memories) > 1:
            merged = merge_memories(group_memories, self.llm)
            consolidated_memories.append(merged)
            consolidated_ids.update(group)
    
    # Step 5: Strategy 2 - Clustering-based consolidation (if enabled)
    if self.policy.enable_clustering:
        # Get memories not yet consolidated
        remaining_memories = [
            m for m in memory_entries if m.id not in consolidated_ids
        ]
        
        if len(remaining_memories) >= self.policy.min_cluster_size:
            # Apply DBSCAN clustering to remaining memories
            clusters = cluster_memories(
                remaining_memories,
                self.embeddings,
                eps=self.policy.cluster_eps,
                min_samples=self.policy.min_cluster_size
            )
            
            # Process each cluster
            for cluster_id, cluster_mems in clusters.items():
                if cluster_id != -1 and len(cluster_mems) > 1:
                    # Consolidate this cluster
                    cluster_consolidated = consolidate_cluster(cluster_mems, self.llm)
                    consolidated_memories.append(cluster_consolidated)
                    consolidated_ids.update(mem.id for mem in cluster_mems)
                else:
                    # Noise or single-memory cluster - keep as standalone
                    consolidated_memories.extend(cluster_mems)
        else:
            # Not enough remaining memories for clustering
            consolidated_memories.extend(remaining_memories)
    else:
        # Clustering disabled - add remaining memories as-is
        remaining = [m for m in memory_entries if m.id not in consolidated_ids]
        consolidated_memories.extend(remaining)
    
    # Step 6: Update memory store with consolidated results
    self.memories = consolidated_memories
    
    # Step 7: Calculate and record post-consolidation metrics
    post_count = len(self.memories)
    post_tokens = self._estimate_tokens(self.memories)
    
    # Update cumulative metrics
    self.metrics['total_consolidations'] += 1
    self.metrics['memories_consolidated'] += (pre_count - post_count)
    self.metrics['tokens_saved'] += (pre_tokens - post_tokens)
    self.metrics['last_consolidation'] = datetime.now().isoformat()
    
    # Print consolidation summary
    print(f"[Consolidation Complete]")
    print(f"  Before: {pre_count} memories (~{pre_tokens} tokens)")
    print(f"  After: {post_count} memories (~{post_tokens} tokens)")
    print(f"  Saved: {pre_count - post_count} memories, {pre_tokens - post_tokens} tokens")

# Attach method to class
ProductionMemoryConsolidator._auto_consolidate = _auto_consolidate

print("✓ _auto_consolidate method added to ProductionMemoryConsolidator")
print("  Implements multi-strategy consolidation (similarity + clustering)")
print("  Tracks metrics and provides consolidation summary")

✓ _auto_consolidate method added to ProductionMemoryConsolidator
  Implements multi-strategy consolidation (similarity + clustering)
  Tracks metrics and provides consolidation summary


The `_auto_consolidate` method implements a two-phase consolidation strategy.
- Phase one uses pairwise similarity to catch highly similar memories (those above the similarity threshold).
- Phase two applies DBSCAN clustering to remaining memories to catch thematically related groups that weren't merged in phase one.
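The two phases can be sketched in plain Python on toy 2-D vectors. This is a minimal illustration under stated assumptions, not the notebook's implementation. It assumes cosine similarity on embeddings and treats phase one as connected components of the `>= threshold` similarity graph. It also uses a hand-rolled DBSCAN over cosine distance (`1 - similarity`) in place of whatever `cluster_memories` does internally. The helper names (`phase_one_groups`, `phase_two_dbscan`, `two_phase`) and the vectors are hypothetical.

```python
import math
from typing import Dict, List, Set

Vectors = Dict[str, List[float]]


def cosine(a: List[float], b: List[float]) -> float:
    """Cosine similarity; assumes both vectors are non-zero."""
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b)))


def phase_one_groups(vecs: Vectors, threshold: float) -> List[Set[str]]:
    """Phase one (assumed): connected components of pairs with similarity >= threshold."""
    parent = {k: k for k in vecs}

    def find(x: str) -> str:
        while parent[x] != x:
            x = parent[x]
        return x

    ids = list(vecs)
    for i, a in enumerate(ids):
        for b in ids[i + 1:]:
            if cosine(vecs[a], vecs[b]) >= threshold:
                parent[find(a)] = find(b)  # union the two components
    groups: Dict[str, Set[str]] = {}
    for k in ids:
        groups.setdefault(find(k), set()).add(k)
    return [g for g in groups.values() if len(g) > 1]  # singletons are not merged


def phase_two_dbscan(vecs: Vectors, eps: float, min_samples: int) -> Dict[int, List[str]]:
    """Phase two: minimal DBSCAN over cosine distance; label -1 marks noise."""
    ids = list(vecs)
    # Neighbourhoods include the point itself, so min_samples counts it (as scikit-learn does).
    nbrs = {a: [b for b in ids if 1 - cosine(vecs[a], vecs[b]) <= eps] for a in ids}
    labels: Dict[str, int] = {}
    cluster = 0
    for a in ids:
        if a in labels:
            continue
        if len(nbrs[a]) < min_samples:
            labels[a] = -1  # provisional noise; may become a border point later
            continue
        labels[a] = cluster
        queue = list(nbrs[a])
        while queue:
            b = queue.pop()
            if labels.get(b) == -1:
                labels[b] = cluster  # noise reached from a core point becomes a border point
            if b in labels:
                continue
            labels[b] = cluster
            if len(nbrs[b]) >= min_samples:  # only core points expand the cluster
                queue.extend(nbrs[b])
        cluster += 1
    out: Dict[int, List[str]] = {}
    for a in ids:
        out.setdefault(labels[a], []).append(a)
    return out


def two_phase(vecs: Vectors, threshold: float, eps: float, min_samples: int) -> Dict[str, list]:
    merged = phase_one_groups(vecs, threshold)
    used = set().union(*merged)
    remaining = {k: v for k, v in vecs.items() if k not in used}
    clustered: List[List[str]] = []
    standalone: List[str] = []
    if len(remaining) >= min_samples:
        for label, members in phase_two_dbscan(remaining, eps, min_samples).items():
            if label != -1 and len(members) > 1:
                clustered.append(sorted(members))
            else:
                standalone.extend(members)  # noise or single-member cluster
    else:
        standalone.extend(remaining)  # too few left to cluster, keep as-is
    return {
        "similarity": sorted(sorted(g) for g in merged),
        "cluster": sorted(clustered),
        "standalone": sorted(standalone),
    }


VECS = {
    "a1": [1.0, 0.0], "a2": [0.99, 0.05],  # near-duplicates
    "b1": [0.0, 1.0], "b2": [0.3, 1.0],    # related, but less similar
    "c": [-1.0, 0.2],                       # unrelated
}
print(two_phase(VECS, threshold=0.97, eps=0.1, min_samples=2))
```

With a threshold of 0.97, only `a1`/`a2` merge in phase one. `b1`/`b2` (similarity ≈ 0.96) fall just below the threshold but lie within `eps=0.1` cosine distance, so DBSCAN groups them in phase two, and `c` is left as noise. This is the gap the second phase is meant to close.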

The method first converts any existing `ConsolidatedMemory` objects back to `MemoryEntry` so that every item is processed uniformly; this matters because consolidation runs repeatedly over the agent's lifetime, and later runs see the outputs of earlier ones. The metrics are cumulative across runs, which makes it easy to monitor long-term consolidation effectiveness.
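The normalization step and the cumulative counters can be sketched with plain dataclasses. `MemoryEntry` and `ConsolidatedMemory` below are hypothetical stand-ins carrying only the fields this section uses, and the notebook's real models may differ. For example, if `ConsolidatedMemory` subclassed `MemoryEntry`, the `isinstance` check would pass it through unchanged. `new_metrics` and `record_run` are likewise hypothetical helpers that mirror Steps 2 and 7.

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict, List, Union


@dataclass
class MemoryEntry:  # hypothetical stand-in for the notebook's model
    id: str
    content: str
    timestamp: str
    category: str
    importance: float


@dataclass
class ConsolidatedMemory:  # hypothetical stand-in; deliberately NOT a MemoryEntry subclass here
    id: str
    content: str
    timestamp: str
    category: str
    importance: float
    source_ids: List[str] = field(default_factory=list)


def normalize(memories: List[Union[MemoryEntry, ConsolidatedMemory]]) -> List[MemoryEntry]:
    """Mirror of Step 2: rebuild non-MemoryEntry items from their shared fields."""
    # source_ids is not copied, so lineage from an earlier round is dropped in this sketch.
    return [
        m if isinstance(m, MemoryEntry) else MemoryEntry(
            id=m.id, content=m.content, timestamp=m.timestamp,
            category=m.category, importance=m.importance,
        )
        for m in memories
    ]


def new_metrics() -> Dict[str, Any]:
    return {"total_consolidations": 0, "memories_consolidated": 0,
            "tokens_saved": 0, "last_consolidation": None}


def record_run(metrics: Dict[str, Any], pre_count: int, post_count: int,
               pre_tokens: int, post_tokens: int) -> None:
    """Mirror of Step 7: add this run's deltas to the running totals."""
    metrics["total_consolidations"] += 1
    metrics["memories_consolidated"] += pre_count - post_count
    metrics["tokens_saved"] += pre_tokens - post_tokens
    metrics["last_consolidation"] = datetime.now().isoformat()


metrics = new_metrics()
record_run(metrics, pre_count=8, post_count=4, pre_tokens=71, post_tokens=64)
record_run(metrics, pre_count=6, post_count=5, pre_tokens=60, post_tokens=55)  # hypothetical second run
print(metrics)
```

Because each run adds `pre_count - post_count`, `memories_consolidated` is the net reduction in entry count, not the number of source memories merged. In the demo later in this notebook, seven source memories are merged into three entries, which is recorded as a reduction of 4.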

Finally, we will add accessor methods that let external code retrieve the current memory store, read consolidation metrics, and trigger consolidation manually. These methods form the interface for integrating the consolidator with the rest of an agent system and for monitoring and debugging.

In [23]:
# Add accessor methods to the class
def get_memories(self) -> List[Union[MemoryEntry, ConsolidatedMemory]]:
    """
    Get the current memory store.
    
    Returns:
        List of all memories (both original and consolidated)
    """
    return self.memories

def get_metrics(self) -> Dict[str, Any]:
    """
    Get comprehensive consolidation metrics.
    Returns current metrics plus real-time statistics about the memory store state.
    
    Returns:
        Dictionary containing consolidation statistics
    """
    return {
        **self.metrics,  # Include all tracked metrics
        'current_memory_count': len(self.memories),  # Current store size
        'current_token_estimate': self._estimate_tokens(self.memories)  # Current tokens
    }

def force_consolidate(self) -> None:
    """
    Manually trigger consolidation regardless of policy threshold.
    
    Useful for:
    - Testing consolidation behavior
    - Forcing cleanup before context-critical operations
    - Scheduled maintenance consolidation
    """
    self._auto_consolidate()

# Attach methods to class
ProductionMemoryConsolidator.get_memories = get_memories
ProductionMemoryConsolidator.get_metrics = get_metrics
ProductionMemoryConsolidator.force_consolidate = force_consolidate

print("✓ Accessor methods added to ProductionMemoryConsolidator")
print("  get_memories: Returns current memory store")
print("  get_metrics: Returns consolidation statistics")
print("  force_consolidate: Manually triggers consolidation")

✓ Accessor methods added to ProductionMemoryConsolidator
  get_memories: Returns current memory store
  get_metrics: Returns consolidation statistics
  force_consolidate: Manually triggers consolidation


Now let's see the production consolidator in action. We will create a consolidator whose policy lowers the trigger count to 8 so that automatic consolidation fires during the demo. As we add the fragmented memories one by one, the system triggers consolidation as soon as the store reaches 8 entries, applying both the similarity and clustering strategies to reduce redundancy.
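`add_memory` is not shown in this section, so the trigger rule below is an assumption: consolidation fires as soon as the store size reaches `consolidation_trigger_count`. This matches the `[Consolidation Triggered] 8 memories` line in the demo output. `Policy` and `TriggeringStore` are hypothetical stand-ins, and the consolidation hook is a toy exact-duplicate filter rather than the notebook's similarity and clustering pipeline.

```python
from dataclasses import dataclass
from typing import Callable, List


@dataclass
class Policy:  # hypothetical stand-in mirroring the ConsolidationPolicy fields used below
    similarity_threshold: float
    enable_clustering: bool
    cluster_eps: float
    min_cluster_size: int
    consolidation_trigger_count: int


class TriggeringStore:
    """Hypothetical store that runs a consolidation hook once the count reaches the trigger."""

    def __init__(self, policy: Policy, consolidate: Callable[[List[str]], List[str]]):
        self.policy = policy
        self.consolidate = consolidate
        self.memories: List[str] = []
        self.trigger_log: List[int] = []  # store size at each trigger

    def add_memory(self, memory: str) -> None:
        self.memories.append(memory)
        # Assumed rule: trigger when the count reaches (not only exceeds) the threshold.
        if len(self.memories) >= self.policy.consolidation_trigger_count:
            self.trigger_log.append(len(self.memories))
            self.memories = self.consolidate(self.memories)


policy = Policy(similarity_threshold=0.85, enable_clustering=True,
                cluster_eps=0.3, min_cluster_size=2, consolidation_trigger_count=8)
store = TriggeringStore(policy, consolidate=lambda mems: sorted(set(mems)))  # toy dedup hook

items = ["email", "email", "email", "login", "login",
         "premium", "premium", "california", "california"]
for item in items:
    store.add_memory(item)

print(store.trigger_log)  # consolidation fired once, when the 8th item arrived
print(store.memories)
```

Because the check uses `>=`, a store that consolidation cannot shrink below the trigger count would re-consolidate on every subsequent add; a production version may want a cooldown or a margin below the trigger.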

In [24]:
# Create a production consolidator with custom policy
print("Production Memory Consolidation Demo:")
print("="*80)

# Define a custom policy with lower threshold for demonstration
policy = ConsolidationPolicy(
    similarity_threshold=0.85,  # Standard similarity threshold
    enable_clustering=True,  # Enable clustering as secondary strategy
    cluster_eps=0.3,  # DBSCAN distance threshold
    min_cluster_size=2,  # Minimum cluster size
    consolidation_trigger_count=8  # Lower threshold to trigger during demo
)

print(f"\nPolicy Configuration:")
print(f"  Similarity threshold: {policy.similarity_threshold}")
print(f"  Clustering enabled: {policy.enable_clustering}")
print(f"  Trigger count: {policy.consolidation_trigger_count}")

# Initialize the consolidator
consolidator = ProductionMemoryConsolidator(llm, embeddings, policy)

print(f"\n{'='*80}")
print("\nAdding memories to the system...")
print("-"*80)

# Add fragmented memories one by one - Consolidation will trigger automatically when count reaches 8
for mem in fragmented_memories:
    consolidator.add_memory(mem)
    print(f"  Added: [{mem.id}] {mem.content[:50]}...")

Production Memory Consolidation Demo:

Policy Configuration:
  Similarity threshold: 0.85
  Clustering enabled: True
  Trigger count: 8


Adding memories to the system...
--------------------------------------------------------------------------------
  Added: [mem_001] User prefers email communication over phone calls...
  Added: [mem_002] Customer likes to receive updates via email...
  Added: [mem_003] User requested email as primary contact method...
  Added: [mem_004] User reported login issues on mobile app...
  Added: [mem_005] Customer had trouble signing in to the mobile appl...
  Added: [mem_006] User's account is premium tier with annual subscri...
  Added: [mem_007] Customer has premium membership, annual billing...

[Consolidation Triggered] 8 memories
[Consolidation Complete]
  Before: 8 memories (~71 tokens)
  After: 4 memories (~64 tokens)
  Saved: 4 memories, 7 tokens
  Added: [mem_008] User lives in California, PST timezone...
  Added: [mem_009] Customer is located in

Let's examine the final state of the memory store after automatic consolidation, and review the metrics to understand the efficiency gains achieved.

In [25]:
# Get the final state after consolidation
final_memories = consolidator.get_memories()
metrics = consolidator.get_metrics()

print("Final Memory Store:")
print("="*80)

# Display each memory in the consolidated store
for i, mem in enumerate(final_memories, 1):
    if isinstance(mem, ConsolidatedMemory):
        # Consolidated memory - show source information
        print(f"\n{i}. [CONSOLIDATED] {mem.id}")
        print(f"   Category: {mem.category}")
        print(f"   Importance: {mem.importance}")
        print(f"   Sources: {len(mem.source_ids)} memories merged")
        print(f"   Source IDs: {', '.join(mem.source_ids)}")
        print(f"   Content: {mem.content}")
    else:
        # Standalone memory
        print(f"\n{i}. [STANDALONE] {mem.id}")
        print(f"   Category: {mem.category}")
        print(f"   Content: {mem.content}")

# Display consolidation metrics
print(f"\n{'='*80}")
print("\nConsolidation Metrics:")
print("-"*80)
for key, value in metrics.items():
    # Format key for display (replace underscores, title case)
    display_key = key.replace('_', ' ').title()
    print(f"  {display_key}: {value}")

Final Memory Store:

1. [CONSOLIDATED] consolidated_mem_003
   Category: preference
   Importance: 0.8
   Sources: 3 memories merged
   Source IDs: mem_001, mem_002, mem_003
   Content: [mem_consolidated] (importance: 0.8, 2024-11-05T09:15:00Z): User prefers email communication as the primary contact method and likes to receive updates via email.

2. [CONSOLIDATED] consolidated_mem_005
   Category: issue
   Importance: 0.9
   Sources: 2 memories merged
   Source IDs: mem_004, mem_005
   Content: [mem_consolidated] (importance: 0.9, 2024-11-02T11:45:00Z): Customer reported login issues on the mobile application.

3. [CONSOLIDATED] consolidated_mem_007
   Category: account
   Importance: 0.8
   Sources: 2 memories merged
   Source IDs: mem_006, mem_007
   Content: [mem_consolidated] (importance: 0.8, 2024-11-04T16:00:00Z): Customer has a premium membership with an annual subscription.

4. [STANDALONE] mem_008
   Category: personal
   Content: User lives in California, PST timezone

5. [S

The `ProductionMemoryConsolidator` implements a complete memory management system with automatic consolidation triggering and multi-strategy processing. It monitors the memory count, triggers consolidation automatically once the count reaches the policy's `consolidation_trigger_count`, and then applies pairwise similarity detection and clustering-based grouping in sequence to maximize consolidation opportunities.
- The `ConsolidationPolicy` model configures similarity thresholds, clustering parameters, and trigger conditions, so it can be tuned per use case: stricter policies for high-precision systems, or more aggressive consolidation for memory-constrained environments.
- The metrics tracking provides visibility into consolidation frequency, memory reduction and token savings over time. The system maintains audit trails through `source_ids` in consolidated memories, enabling investigation of consolidation decisions.
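The demo's before/after figures (8 memories and ~71 tokens before, 4 memories and ~64 tokens after) can be turned into ratios with a few lines of arithmetic. `summarize` is a hypothetical helper, and the token counts are the notebook's own rough estimates from `_estimate_tokens`, so the percentages are approximate.

```python
def summarize(pre_count: int, post_count: int, pre_tokens: int, post_tokens: int) -> dict:
    """Hypothetical helper: relative reductions and average entry size before and after."""
    return {
        "entry_reduction": (pre_count - post_count) / pre_count,
        "token_reduction": (pre_tokens - post_tokens) / pre_tokens,
        "tokens_per_entry_before": pre_tokens / pre_count,
        "tokens_per_entry_after": post_tokens / post_count,
    }


# Figures taken from the "[Consolidation Complete]" output of the demo above.
stats = summarize(pre_count=8, post_count=4, pre_tokens=71, post_tokens=64)
for key, value in stats.items():
    print(f"{key}: {value:.3f}")
```

In this run the entry count halves while estimated tokens drop by only about 10%, because each merged entry is roughly twice as long as an average source entry (16 versus about 9 tokens). In this small demo, the gain shows up mainly in entry count rather than in raw tokens; the token saving depends on how tersely the LLM writes merged entries.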

This production-ready implementation provides the foundation for maintaining efficient, clean memory stores in long-running agents that accumulate knowledge over weeks or months, preventing memory fragmentation from degrading agent performance or exhausting context budgets.