diff --git a/python/examples/langgraph_chronological.py b/python/examples/langgraph_chronological.py deleted file mode 100644 index 27b860e..0000000 --- a/python/examples/langgraph_chronological.py +++ /dev/null @@ -1,1257 +0,0 @@ -#!/usr/bin/env python3 - -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Complete LangGraph + ProllyTree Integration: Production-Ready Memory System - -This example demonstrates a production-ready memory system that combines: -1. Structured memory extraction using LangGraph's patterns -2. Vector embeddings for semantic search (mock or real) -3. ProllyTree for git-like version control -4. Entity tracking with complete history -5. 
Hybrid retrieval combining semantic and versioned data - -Architecture: -┌─────────────────────────────────────────────────────────────────────────┐ -│ Production Memory System Architecture │ -└─────────────────────────────────────────────────────────────────────────┘ - -┌─────────────────────────────────────────────────────────────────────────┐ -│ Memory Processing Pipeline │ -│ │ -│ User Message → Extract Memories → Generate Embeddings → Store Both │ -│ ↓ ↓ ↓ │ -│ (structured data) (vector search) (version control)│ -└─────────────────────────────────────────────────────────────────────────┘ - -Key Components: -• MemoryConfig: Defines extraction schemas (patch/insert modes) -• HybridMemoryService: Combines vector and versioned storage -• MemoryGraph: LangGraph workflow for memory processing -• Entity tracking with complete audit trail -""" - -import hashlib -import json -import os -import subprocess -import tempfile -import uuid -from datetime import datetime, timezone -from typing import Any, Dict, List, Optional, Tuple, Annotated, Literal - -import numpy as np -from langchain_core.messages import HumanMessage, AIMessage -from langchain_core.pydantic_v1 import BaseModel, Field -from langgraph.graph import StateGraph, START, END -from langgraph.graph.message import add_messages -from typing_extensions import TypedDict - -# ProllyTree imports -from prollytree import VersionedKvStore - -# For embeddings -try: - from langchain_openai import OpenAIEmbeddings - OPENAI_AVAILABLE = True -except ImportError: - OPENAI_AVAILABLE = False - -# For diagram visualization -try: - from IPython.display import display, Image - IPYTHON_AVAILABLE = True -except ImportError: - IPYTHON_AVAILABLE = False - - -# ============================================================================ -# Schema Definitions (following langgraph-memory patterns) -# ============================================================================ - -class UserProfile(BaseModel): - """User profile schema for 
class MemoryState(TypedDict):
    """Shared state passed between the nodes of the memory workflow.

    Every key that a node returns or reads is declared here. The two keys
    ``contextual_threads`` (returned by ``enhanced_semantic_search_node``) and
    ``quality_assessment`` (returned by ``assess_context_quality_node``) were
    previously missing even though later nodes read them with ``state.get``.
    NOTE(review): LangGraph only guarantees persistence for keys that are part
    of the state schema -- confirm against the installed LangGraph version.
    """
    # Conversation messages; merged with LangGraph's ``add_messages`` reducer.
    messages: Annotated[List, add_messages]
    user_id: str
    thread_id: str
    # Memories produced by the extraction node in the current run.
    extracted_memories: List[BaseModel]
    # (vector-store key, similarity) pairs from the semantic search node.
    semantic_results: List[Tuple[str, float]]
    # Past conversation events related to the query (semantic search node).
    contextual_threads: List[Dict[str, Any]]
    # Per-entity history/context produced by the entity analysis node.
    entity_contexts: Dict[str, Any]
    context_quality_score: float
    # Number of enhancement loops completed so far, and the cap on them.
    enhancement_iterations: int
    max_iterations: int
    # "needs_assessment" (initial) | "sufficient" | "needs_enhancement"
    context_sufficiency: str
    detailed_context: Dict[str, Any]
    # Full assessment dict returned by the quality assessment node.
    quality_assessment: Dict[str, Any]
    final_response: str
class MockEmbeddings:
    """Deterministic stand-in for a real embedding model.

    The vector for a text is derived from the MD5 digest of the text, so the
    same text always maps to the same 384-dimensional vector.
    """

    def embed_text(self, text: str) -> List[float]:
        """Return a reproducible pseudo-random 384-dimensional vector for ``text``.

        A private ``RandomState`` is seeded from the first 8 hex digits of the
        MD5 digest. This yields exactly the values the previous
        ``np.random.seed(...)`` / ``np.random.randn(...)`` pair produced, but
        no longer reseeds the process-global NumPy generator as a side effect.
        """
        digest = hashlib.md5(text.encode()).hexdigest()
        rng = np.random.RandomState(int(digest[:8], 16))
        return rng.randn(384).tolist()

    def embed_documents(self, texts: List[str]) -> List[List[float]]:
        """Embed each text in ``texts`` and return the vectors in the same order."""
        return [self.embed_text(text) for text in texts]

    def similarity(self, vec1: List[float], vec2: List[float]) -> float:
        """Return the cosine similarity of two vectors.

        Returns ``0.0`` when either vector has zero norm, instead of dividing
        by zero and producing ``nan``.
        """
        a = np.asarray(vec1, dtype=float)
        b = np.asarray(vec2, dtype=float)
        denominator = float(np.linalg.norm(a) * np.linalg.norm(b))
        if denominator == 0.0:
            return 0.0
        return float(np.dot(a, b) / denominator)
class HybridMemoryService:
    """
    Memory service that pairs an in-memory vector index with a versioned KV store.

    Every memory is written twice: as JSON bytes into ``self.kv_store`` (a
    ProllyTree ``VersionedKvStore``) and as an ``(embedding, dict)`` pair into
    ``self.vector_store`` for similarity search.

    Key layout shared by both stores:
        patch:{user_id}:{config_name}                one document, overwritten
        insert:{user_id}:{config_name}:{memory_id}   one document per memory
    """

    def __init__(self, store_path: str):
        """Create the on-disk store and pick an embedding backend.

        Args:
            store_path: Directory that holds the store. It is turned into a
                git repository if it has no ``.git`` entry; the KV store
                itself lives in the ``memory_data`` subdirectory.

        Raises:
            subprocess.CalledProcessError: If one of the ``git`` commands fails.
        """
        # Create subdirectory for ProllyTree
        self.store_path = store_path
        store_subdir = os.path.join(store_path, "memory_data")
        os.makedirs(store_subdir, exist_ok=True)

        # Initialize git repo if needed
        if not os.path.exists(os.path.join(store_path, '.git')):
            subprocess.run(["git", "init", "--quiet"], cwd=store_path, check=True)
            subprocess.run(["git", "config", "user.name", "Memory Service"], cwd=store_path, check=True)
            subprocess.run(["git", "config", "user.email", "memory@example.com"], cwd=store_path, check=True)

        # Initialize ProllyTree store
        self.kv_store = VersionedKvStore(store_subdir)

        # Use OpenAI embeddings only when the package is importable and the key
        # looks like a real one. A key starting with "sk-" can never start
        # with "mock"/"test", so no separate exclusion is needed here.
        api_key = os.getenv("OPENAI_API_KEY", "")
        if OPENAI_AVAILABLE and api_key.startswith("sk-"):
            try:
                # One real call up front so a bad key fails here, not mid-workflow
                test_embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
                test_embeddings.embed_query("test")
                self.embeddings = test_embeddings
                print("✅ Using OpenAI embeddings (text-embedding-3-small)")
            except Exception as e:
                # Deliberate best-effort: any failure falls back to the mock
                print(f"⚠️ OpenAI embeddings failed: {e}")
                print("🔄 Falling back to mock embeddings")
                self.embeddings = MockEmbeddings()
        else:
            self.embeddings = MockEmbeddings()
            if api_key.startswith(("mock", "test")):
                print("🔄 Using mock embeddings (mock/test API key detected)")
            else:
                print("🔄 Using mock embeddings (no valid OpenAI API key)")

        # Initialize LLM
        self.llm = MockLLM()

        # In-memory vector store (replace with Pinecone/Weaviate in production)
        self.vector_store: Dict[str, Tuple[List[float], Dict[str, Any]]] = {}

        # Memory configurations
        self.memory_configs = self._create_memory_configs()

        print(f"✅ Initialized hybrid memory service at {store_subdir}")

    def _create_memory_configs(self) -> Dict[str, "MemoryConfig"]:
        """Return the two extraction configs: patch-mode profile, insert-mode events."""
        return {
            "user_profile": MemoryConfig(
                function=FunctionSchema(
                    name="UserProfile",
                    description="Extract user profile information",
                    parameters={
                        "type": "object",
                        "properties": {
                            "name": {"type": "string"},
                            "preferences": {"type": "object"},
                            "interests": {"type": "array", "items": {"type": "string"}}
                        }
                    }
                ),
                system_prompt="Extract user profile information from the conversation",
                update_mode="patch"
            ),
            "conversation_events": MemoryConfig(
                function=FunctionSchema(
                    name="ConversationEvent",
                    description="Extract conversation events",
                    parameters={
                        "type": "object",
                        "properties": {
                            "event_type": {"type": "string"},
                            "content": {"type": "string"},
                            "entities": {"type": "array", "items": {"type": "string"}}
                        },
                        "required": ["event_type", "content"]
                    }
                ),
                system_prompt="Extract important events from the conversation",
                update_mode="insert"
            )
        }

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _embed(self, text: str) -> List[float]:
        """Embed ``text`` with whichever backend is active.

        Backends exposing ``embed_query`` (the OpenAI client) are called
        through it; anything else is expected to expose ``embed_text``.
        """
        if hasattr(self.embeddings, 'embed_query'):
            return self.embeddings.embed_query(text)
        return self.embeddings.embed_text(text)

    @staticmethod
    def _cosine_similarity(vec_a: List[float], vec_b: List[float]) -> float:
        """Return cosine similarity, or ``0.0`` if either vector has zero norm."""
        a = np.asarray(vec_a, dtype=float)
        b = np.asarray(vec_b, dtype=float)
        denominator = float(np.linalg.norm(a) * np.linalg.norm(b))
        if denominator == 0.0:
            return 0.0
        return float(np.dot(a, b) / denominator)

    @staticmethod
    def _belongs_to_user(key: str, user_id: str) -> bool:
        """Return True if ``key`` is a patch/insert key owned by ``user_id``.

        Matching on the full ``{mode}:{user_id}:`` prefix avoids treating the
        config-name segment of another user's key as a user id.
        """
        return key.startswith((f"patch:{user_id}:", f"insert:{user_id}:"))

    # ------------------------------------------------------------------
    # Writing memories
    # ------------------------------------------------------------------

    def extract_and_store(self, messages: List, user_id: str, thread_id: str) -> Dict[str, List["BaseModel"]]:
        """Run every configured extractor over ``messages`` and persist the results.

        Args:
            messages: Conversation messages handed to the extractor.
            user_id: Owner of the memories; becomes part of every storage key.
            thread_id: Accepted for interface compatibility; not used here.

        Returns:
            Mapping of config name to the memories extracted for that config.
        """
        extracted = {}

        for config_name, config in self.memory_configs.items():
            memories = self.llm.extract_memories(messages, config["function"])
            extracted[config_name] = memories

            if config["update_mode"] == "patch":
                # Patch mode: update single document
                self._store_patch_memory(memories, user_id, config_name)
            else:
                # Insert mode: add new documents
                self._store_insert_memories(memories, user_id, config_name)

        return extracted

    def _store_patch_memory(self, memories: List["BaseModel"], user_id: str, config_name: str):
        """Overwrite the single patch document for ``user_id``/``config_name``.

        Only the last element of ``memories`` is stored; an empty list is a no-op.
        The write is committed to the KV store before returning.
        """
        if not memories:
            return

        # Use the last memory as the current state
        memory = memories[-1]
        store_key = f"patch:{user_id}:{config_name}"
        key = store_key.encode('utf-8')
        payload = memory.json()
        value = payload.encode('utf-8')

        # Update when a document already exists, insert otherwise
        existing = self.kv_store.get(key)
        if existing:
            self.kv_store.update(key, value)
            print(f" 📝 Updated {config_name} for {user_id}")
        else:
            self.kv_store.insert(key, value)
            print(f" ➕ Created {config_name} for {user_id}")

        # Mirror the document into the vector store under the same key
        self.vector_store[store_key] = (self._embed(payload), memory.dict())

        # Commit
        self.kv_store.commit(f"Updated {config_name} for user {user_id}")

    def _store_insert_memories(self, memories: List["BaseModel"], user_id: str, config_name: str):
        """Append each memory as its own document, then commit once for the batch."""
        for memory in memories:
            # Short random suffix distinguishes documents of the same user/config
            memory_id = str(uuid.uuid4())[:8]
            store_key = f"insert:{user_id}:{config_name}:{memory_id}"
            payload = memory.json()

            # Insert into ProllyTree
            self.kv_store.insert(store_key.encode('utf-8'), payload.encode('utf-8'))
            print(f" ➕ Inserted {config_name} event {memory_id}")

            # Mirror the document into the vector store under the same key
            self.vector_store[store_key] = (self._embed(payload), memory.dict())

        if memories:
            self.kv_store.commit(f"Added {len(memories)} {config_name} events for user {user_id}")

    # ------------------------------------------------------------------
    # Reading memories
    # ------------------------------------------------------------------

    def semantic_search(self, query: str, user_id: Optional[str] = None, top_k: int = 5) -> List[Tuple[str, float, Dict]]:
        """Return the ``top_k`` stored memories most similar to ``query``.

        Args:
            query: Text to embed and compare against stored embeddings.
            user_id: When given, only that user's keys are considered.
            top_k: Maximum number of results; values <= 0 yield an empty list.

        Returns:
            ``(key, similarity, data)`` tuples sorted by descending similarity.
        """
        if top_k <= 0:
            return []

        query_embedding = self._embed(query)

        results = []
        for key, (embedding, data) in self.vector_store.items():
            # Filter by user if specified
            if user_id and not self._belongs_to_user(key, user_id):
                continue
            results.append((key, self._cosine_similarity(query_embedding, embedding), data))

        # Sort by similarity
        results.sort(key=lambda x: x[1], reverse=True)
        return results[:top_k]

    def get_entity_history(self, entity_key: str) -> List[Dict[str, Any]]:
        """Return commit history for ``entity_key``.

        If the per-key lookup raises, the error is printed and the store's
        general commit log is returned instead; those fallback entries carry
        only ``commit_id``, ``timestamp`` and ``message``. Timestamps are
        formatted with ``datetime.fromtimestamp`` without a timezone, i.e. as
        naive local time.
        """
        try:
            # Get commits that specifically affected this entity key
            key_bytes = entity_key.encode('utf-8') if isinstance(entity_key, str) else entity_key
            key_commits = self.kv_store.get_commits_for_key(key_bytes)

            return [
                {
                    'commit_id': commit['id'][:8],
                    'full_commit_id': commit['id'],
                    'timestamp': datetime.fromtimestamp(commit['timestamp']).isoformat(),
                    'message': commit['message'],
                    'author': commit['author'],
                    'committer': commit['committer']
                }
                for commit in key_commits
            ]
        except Exception as e:
            # Deliberate best-effort: fall back to the general commit history
            print(f"⚠️ Error getting detailed entity history for {entity_key}: {e}")
            return [
                {
                    'commit_id': commit['id'][:8],
                    'timestamp': datetime.fromtimestamp(commit['timestamp']).isoformat(),
                    'message': commit['message']
                }
                for commit in self.kv_store.log()
            ]

    def get_user_profile(self, user_id: str) -> Optional[Dict[str, Any]]:
        """Return the stored ``user_profile`` patch document, or None if absent."""
        key = f"patch:{user_id}:user_profile".encode('utf-8')
        data = self.kv_store.get(key)
        if data:
            return json.loads(data.decode('utf-8'))
        return None

    def get_user_events(self, user_id: str, limit: int = 10) -> List[Dict[str, Any]]:
        """Return up to ``limit`` conversation events for ``user_id``, newest first.

        Events are ordered by their ``timestamp`` string (descending); events
        without a timestamp sort last.
        """
        # Anchor at the start of the key so only this user's events match
        prefix = f"insert:{user_id}:conversation_events:"
        events = []

        for key in self.kv_store.list_keys():
            if not key.decode('utf-8').startswith(prefix):
                continue
            data = self.kv_store.get(key)
            if data:
                events.append(json.loads(data.decode('utf-8')))

        # Sort by timestamp
        events.sort(key=lambda x: x.get('timestamp', ''), reverse=True)
        return events[:limit]

    def enhanced_semantic_search(self, query: str, user_id: Optional[str] = None,
                                 top_k: int = 5, expand_context: bool = True, iteration: int = 0) -> List[Tuple[str, float, Dict]]:
        """Semantic search that widens its net with each enhancement iteration.

        With ``expand_context`` false this is exactly ``semantic_search``.
        Otherwise the base search is followed by (for ``iteration > 0``) one
        search per query keyword longer than three characters (first three
        only) and by searches for entities referenced in the base results.
        Each memory key appears at most once in the output; a key already
        found keeps its first score.

        Returns:
            Up to ``top_k * (2 + iteration)`` ``(key, similarity, data)``
            tuples sorted by descending (possibly boosted) similarity.
        """
        # More base results in later iterations
        base_top_k = top_k if not expand_context else top_k + iteration * 2
        initial_results = self.semantic_search(query, user_id, base_top_k)

        if not expand_context:
            return initial_results

        expanded_results = list(initial_results)
        # De-duplicate on the memory key: the same memory scores differently
        # against each sub-query, so comparing whole tuples would let it in
        # again and again.
        seen_keys = {key for key, _, _ in initial_results}

        def merge(candidates, boost):
            for key, similarity, data in candidates:
                if key in seen_keys:
                    continue
                seen_keys.add(key)
                expanded_results.append((key, min(similarity + boost, 1.0), data))

        # Collect entities referenced by the base results, in first-seen order
        related_entities = []
        for _, _, data in initial_results:
            for entity in data.get('entities') or []:
                if entity not in related_entities:
                    related_entities.append(entity)

        # In later iterations, also search for individual query keywords
        if iteration > 0:
            query_keywords = [word.lower() for word in query.split() if len(word) > 3]
            for keyword in query_keywords[:3]:
                merge(self.semantic_search(keyword, user_id, top_k=2), 0.1 * iteration)

        # Search for related entities (one more entity per iteration)
        entity_search_limit = 1 + iteration
        for entity in related_entities[:entity_search_limit]:
            merge(self.semantic_search(entity, user_id, top_k=2), 0.05 * iteration)

        # Re-sort and limit (return more results in later iterations)
        expanded_results.sort(key=lambda x: x[1], reverse=True)
        result_limit = max(top_k * (2 + iteration), 0)
        return expanded_results[:result_limit]

    def get_contextual_threads(self, user_id: str, query: str) -> List[Dict[str, Any]]:
        """Return up to 10 past events that share words with ``query``.

        Each returned event gains a ``relevance_score``: the fraction of
        distinct query words (whitespace-split, lower-cased) that also occur
        in the event's ``content``.
        """
        events = self.get_user_events(user_id, limit=50)

        related_events = []
        query_words = set(query.lower().split())

        for event in events:
            event_words = set(event.get('content', '').lower().split())
            overlap = len(query_words.intersection(event_words))
            # overlap > 0 implies query_words is non-empty, so the division is safe
            if overlap > 0:
                event['relevance_score'] = overlap / len(query_words)
                related_events.append(event)

        # Sort by relevance
        related_events.sort(key=lambda x: x.get('relevance_score', 0), reverse=True)
        return related_events[:10]

    def assess_context_quality(self, context_data: Dict[str, Any], query: str) -> Dict[str, Any]:
        """Score retrieved context on a 0-100 scale and suggest improvements.

        Args:
            context_data: May contain ``semantic_results`` (items whose second
                element is a similarity), ``entity_contexts`` (dicts with
                ``version_count``), ``recent_events`` and ``user_profile``.
            query: Accepted for interface compatibility; not used in scoring.

        Returns:
            Dict with ``score`` (clamped to 0-100), ``completeness``,
            ``relevance``, ``freshness`` (each ``low``/``medium``/``high``)
            and a list of ``suggestions``.
        """
        quality_score = 0.0
        assessment = {
            'score': 0.0,
            'completeness': 'low',
            'relevance': 'low',
            'freshness': 'low',
            'suggestions': []
        }

        # Semantic results: up to 40 points for similarity, up to 10 for count
        semantic_results = context_data.get('semantic_results', [])
        if semantic_results:
            avg_similarity = sum(result[1] for result in semantic_results) / len(semantic_results)
            # Cosine similarity can be negative; never let it subtract points
            quality_score += max(0.0, min(avg_similarity * 300, 40))
            quality_score += min(len(semantic_results) * 2, 10)

            if avg_similarity > 0.2:
                assessment['relevance'] = 'high'
            elif avg_similarity > 0.05:
                assessment['relevance'] = 'medium'

        # Entity context depth: up to 30 points
        entity_contexts = context_data.get('entity_contexts', {})
        if entity_contexts:
            total_versions = sum(ctx.get('version_count', 0) for ctx in entity_contexts.values())
            quality_score += min(total_versions * 5, 30)

            if total_versions > 10:
                assessment['completeness'] = 'high'
            elif total_versions > 3:
                assessment['completeness'] = 'medium'

        # Recent events: up to 20 points
        recent_events = context_data.get('recent_events', [])
        if recent_events:
            quality_score += min(len(recent_events) * 5, 20)
            assessment['freshness'] = 'high' if len(recent_events) > 3 else 'medium'

        # User profile completeness: up to 10 points
        user_profile = context_data.get('user_profile', {})
        if user_profile:
            profile_fields = ['name', 'preferences', 'interests', 'context']
            filled_fields = sum(1 for field in profile_fields if user_profile.get(field))
            quality_score += filled_fields * 2.5

        # The component maxima add up to 110, but the score is reported as
        # "/100", so clamp it into the advertised range.
        assessment['score'] = max(0.0, min(quality_score, 100.0))

        # Generate improvement suggestions
        if assessment['relevance'] == 'low':
            assessment['suggestions'].append('Expand semantic search with related terms')
        if assessment['completeness'] == 'low':
            assessment['suggestions'].append('Retrieve more historical context for entities')
        if assessment['freshness'] == 'low':
            assessment['suggestions'].append('Include more recent conversation history')

        return assessment
def deep_entity_analysis_node(state: MemoryState, service: HybridMemoryService) -> Dict:
    """Collect entities from memories and search hits, then fetch their history.

    Entities come from the ``entities`` attribute of extracted memories and
    from the ``entities`` field of semantic search results. A result may be a
    ``(key, similarity)`` pair, whose data is looked up in
    ``service.vector_store``, or a ``(key, similarity, data)`` triple. At most
    five entities are analysed, chosen in sorted order so repeated runs pick
    the same ones.

    Args:
        state: Current workflow state.
        service: Memory service providing ``vector_store`` and
            ``get_entity_history``.

    Returns:
        State update with ``entity_contexts``, a refreshed copy of
        ``detailed_context`` and a summary message.
    """
    extracted_memories = state.get("extracted_memories", [])
    semantic_results = state.get("semantic_results", [])
    iteration = state.get("enhancement_iterations", 0)

    print(f"\n🔬 [Iteration {iteration + 1}] Deep entity analysis...")

    # Work on a copy so the dict held by the incoming state is not mutated
    detailed_context = dict(state.get("detailed_context") or {})

    # Extract entity references from memories
    entities = set()
    for memory in extracted_memories:
        if hasattr(memory, 'entities'):
            entities.update(memory.entities)

    # Also extract entities from semantic search results
    for result in semantic_results:
        if len(result) == 2:
            # (key, similarity): the payload has to come from the vector store
            key = result[0]
            if key not in service.vector_store:
                continue
            _, data = service.vector_store[key]
        elif len(result) == 3:
            data = result[2]
        else:
            continue
        if 'entities' in data:
            entities.update(data['entities'])

    # Get detailed context for each entity. Sorting before slicing makes the
    # selection independent of set iteration order.
    entity_contexts = {}
    for entity in sorted(entities)[:5]:
        history = service.get_entity_history(entity)

        # Richer (simulated) entity context only in later iterations
        if iteration > 0:
            additional_context = {
                'related_topics': [f"topic_{i}" for i in range(min(3, len(history)))],
                'interaction_frequency': len(history),
                'last_interaction': history[0]['timestamp'] if history else None
            }
        else:
            additional_context = {}

        entity_contexts[entity] = {
            'history': history,
            'version_count': len(history),
            'additional_context': additional_context
        }
        print(f" 📊 Entity {entity}: {len(history)} versions" +
              (" + enhanced context" if additional_context else ""))

    # Store detailed context for quality assessment
    detailed_context.update({
        'entity_contexts': entity_contexts,
        'semantic_results': semantic_results,
        'contextual_threads': state.get("contextual_threads", [])
    })

    return {
        "entity_contexts": entity_contexts,
        "detailed_context": detailed_context,
        "messages": [AIMessage(content=f"Deep analysis: {len(entity_contexts)} entities analyzed")]
    }
def generate_enhanced_response_node(state: MemoryState, service: HybridMemoryService) -> Dict:
    """Render the accumulated context as a human-readable report.

    Reads ``detailed_context``, ``quality_assessment``,
    ``context_quality_score`` and ``enhancement_iterations`` from the state
    and formats one section per kind of context that is present. The header
    is always emitted, so the response is never empty.

    Args:
        state: Current workflow state.
        service: Accepted for interface compatibility; not used here.

    Returns:
        State update with ``final_response`` and the same text as a message.
    """
    user_id = state["user_id"]
    detailed_context = state.get("detailed_context", {})
    quality_assessment = state.get("quality_assessment", {})
    iteration = state.get("enhancement_iterations", 0)

    print(f"\n💬 [Final] Generating enhanced response with {iteration} iterations of context...")

    response_parts = []

    # Header with context quality information
    quality_score = state.get("context_quality_score", 0)
    response_parts.append(f"🎯 Enhanced Response (Context Quality: {quality_score:.1f}/100, {iteration} iterations)")
    response_parts.append("=" * 60)

    # User profile with more detail
    user_profile = detailed_context.get('user_profile', {})
    if user_profile:
        response_parts.append(f"\n👤 User Profile ({user_id}):")
        # A stored profile carries "name": null when no name was extracted, so
        # a .get() default alone would print "None"; fall back explicitly.
        response_parts.append(f" Name: {user_profile.get('name') or 'Unknown'}")
        if user_profile.get('preferences'):
            response_parts.append(f" Preferences: {json.dumps(user_profile['preferences'])}")
        if user_profile.get('interests'):
            response_parts.append(f" Interests: {', '.join(user_profile['interests'])}")

    # Enhanced semantic results
    semantic_results = detailed_context.get('semantic_results', [])
    if semantic_results:
        response_parts.append(f"\n🔍 Semantic Search Results ({len(semantic_results)} found):")
        for i, result in enumerate(semantic_results[:5], 1):
            # Index instead of unpacking so both (key, similarity) pairs and
            # (key, similarity, data) triples are accepted.
            key, similarity = result[0], result[1]
            response_parts.append(f" {i}. {key} (similarity: {similarity:.3f})")

    # Enhanced entity analysis
    entity_contexts = detailed_context.get('entity_contexts', {})
    if entity_contexts:
        response_parts.append(f"\n🏷️ Entity Analysis ({len(entity_contexts)} entities):")
        for entity, context in entity_contexts.items():
            additional = context.get('additional_context', {})
            versions = context.get('version_count', 0)
            freq = additional.get('interaction_frequency', 0)
            response_parts.append(f" • {entity}: {versions} versions" +
                                  (f", {freq} interactions" if freq else ""))

    # Contextual conversation threads
    contextual_threads = detailed_context.get('contextual_threads', [])
    if contextual_threads:
        response_parts.append(f"\n🧵 Related Conversation Threads ({len(contextual_threads)} found):")
        for i, thread in enumerate(contextual_threads[:3], 1):
            relevance = thread.get('relevance_score', 0)
            content = thread.get('content', '')[:60]
            response_parts.append(f" {i}. {content}... (relevance: {relevance:.2f})")

    # Recent events with enhanced detail
    recent_events = detailed_context.get('recent_events', [])
    if recent_events:
        response_parts.append(f"\n📅 Recent Events ({len(recent_events)} events):")
        for i, event in enumerate(recent_events[:5], 1):
            event_type = event.get('event_type', 'unknown')
            content = event.get('content', '')[:50]
            timestamp = event.get('timestamp', '')[:19]  # Keep up to whole seconds
            response_parts.append(f" {i}. [{event_type}] {content}... ({timestamp})")

    # Quality assessment summary
    if quality_assessment:
        response_parts.append("\n📊 Context Quality Assessment:")
        response_parts.append(f" Overall Score: {quality_assessment.get('score', 0):.1f}/100")
        response_parts.append(f" Relevance: {quality_assessment.get('relevance', 'unknown')}")
        response_parts.append(f" Completeness: {quality_assessment.get('completeness', 'unknown')}")
        response_parts.append(f" Freshness: {quality_assessment.get('freshness', 'unknown')}")

    # The header lines above are unconditional, so there is always text to join
    response = "\n".join(response_parts)

    return {
        "final_response": response,
        "messages": [AIMessage(content=response)]
    }
def display_workflow_diagram(workflow):
    """Render the compiled workflow as a Mermaid PNG and save it to disk.

    The PNG is written to ``langgraph_workflow_diagram.png`` inside the
    platform temporary directory. When IPython is importable and the code is
    running in a ZMQ (Jupyter) shell, the image is also displayed inline.

    Args:
        workflow: Object exposing ``get_graph(xray=True).draw_mermaid_png()``.

    Returns:
        The path of the written PNG, or ``None`` when rendering or saving
        failed (the error is printed, not raised: the diagram is optional).
    """
    print("🎨 Generating workflow diagram...")

    try:
        # Generate the diagram bytes using LangGraph's built-in Mermaid rendering
        diagram_bytes = workflow.get_graph(xray=True).draw_mermaid_png()

        # Save to the platform temp directory rather than a hard-coded /tmp,
        # which does not exist on every operating system.
        temp_file = os.path.join(tempfile.gettempdir(), 'langgraph_workflow_diagram.png')
        with open(temp_file, 'wb') as f:
            f.write(diagram_bytes)
        print(f"💾 Diagram saved to: {temp_file}")

        # Try to display inline if in a Jupyter environment
        if IPYTHON_AVAILABLE:
            try:
                from IPython import get_ipython

                shell = get_ipython()
                if shell is not None and shell.__class__.__name__ == 'ZMQInteractiveShell':
                    display(Image(diagram_bytes))
                    print("📊 Workflow diagram displayed inline!")
                else:
                    print("📊 Workflow diagram generated (view at the file path above)")
                    print(" 💡 For inline display, run in a Jupyter notebook")
            except Exception:
                # Inline display is a convenience; the saved file is still valid.
                print("📊 Workflow diagram generated (view at the file path above)")
        else:
            print("📊 Workflow diagram generated (view at the file path above)")
            print(" 💡 Install IPython for enhanced display: pip install ipython")

        print("✅ LangGraph built-in diagram generation successful!")
        return temp_file

    except Exception as e:
        # Best effort: a missing renderer must not abort the demo.
        print(f"⚠️ Could not generate diagram: {e}")
        print(" This may require additional dependencies for Mermaid rendering")
        print(" Try: pip install pygraphviz or check LangGraph documentation")

    return None


# ============================================================================
# Create Memory Workflow
# ============================================================================

def create_enhanced_memory_workflow(service: HybridMemoryService):
    """Build and compile the looping memory-processing graph.

    Flow: START -> initialize -> extract_memories -> enhanced_semantic_search
    -> deep_entity_analysis -> assess_context_quality. From there
    ``should_enhance_context`` routes either to ``context_enhancement_loop``
    (which feeds back into ``enhanced_semantic_search``) or to
    ``generate_enhanced_response`` and END.

    Args:
        service: Memory service injected into every node that needs it.

    Returns:
        The compiled graph produced by ``StateGraph.compile()``.
    """
    builder = StateGraph(MemoryState)

    # Add nodes with service injection
    builder.add_node("initialize", initialize_workflow_node)
    builder.add_node("extract_memories", lambda state: extract_memories_node(state, service))
    builder.add_node("enhanced_semantic_search", lambda state: enhanced_semantic_search_node(state, service))
    builder.add_node("deep_entity_analysis", lambda state: deep_entity_analysis_node(state, service))
    builder.add_node("assess_context_quality", lambda state: assess_context_quality_node(state, service))
    builder.add_node("context_enhancement_loop", context_enhancement_loop_node)
    builder.add_node("generate_enhanced_response", lambda state: generate_enhanced_response_node(state, service))

    # Linear part of the flow
    builder.add_edge(START, "initialize")
    builder.add_edge("initialize", "extract_memories")
    builder.add_edge("extract_memories", "enhanced_semantic_search")
    builder.add_edge("enhanced_semantic_search", "deep_entity_analysis")
    builder.add_edge("deep_entity_analysis", "assess_context_quality")

    # Conditional edge: decide whether to enhance context or generate response
    builder.add_conditional_edges(
        "assess_context_quality",
        should_enhance_context,
        {
            "enhance_context": "context_enhancement_loop",
            "generate_response": "generate_enhanced_response"
        }
    )

    # Loop back for context enhancement
    builder.add_edge("context_enhancement_loop", "enhanced_semantic_search")

    # Final edge
    builder.add_edge("generate_enhanced_response", END)

    return builder.compile()
def create_rich_baseline_data(service: HybridMemoryService):
    """Seed *service* with conversation history for the demo users.

    Stores, in order: Alice's conversation history, four entity-tagged
    ``ConversationEvent`` facts for Alice, Bob's conversation history, and
    three shared-context messages stored for both users. A summary of what
    was created is printed at the end.
    """
    print("🗃️ Creating rich baseline conversation history...")

    # Historical conversations for Alice (quantum computing researcher)
    alice_conversations = [
        ("I'm a quantum computing researcher working on QAOA algorithms", "fact"),
        ("What's the current state of quantum supremacy demonstrations?", "query"),
        ("I've been experimenting with variational quantum eigensolvers", "fact"),
        ("Can you explain how quantum annealing compares to gate-based quantum computing?", "query"),
        ("Our lab just got access to IBM's quantum computer", "fact"),
        ("I'm particularly interested in quantum machine learning applications", "fact"),
        ("What are the noise limitations in current NISQ devices?", "query"),
        ("We're seeing promising results with quantum optimization problems", "fact"),
        ("How do quantum neural networks compare to classical neural networks?", "query"),
        ("I need to understand the hardware requirements for quantum ML", "query"),
    ]

    # Historical conversations for Bob (ML engineer)
    bob_conversations = [
        ("I'm a machine learning engineer at a tech startup", "fact"),
        ("We're deploying models using Kubernetes and Docker", "fact"),
        ("What's the best way to handle model versioning?", "query"),
        ("I've been working with transformers for NLP tasks", "fact"),
        ("How do you optimize inference time for large models?", "query"),
        ("Our team uses MLOps practices for model deployment", "fact"),
        ("I'm interested in edge computing for ML applications", "fact"),
        ("What are the trade-offs between model accuracy and latency?", "query"),
    ]

    # Entity-rich data for Alice (simulated product interactions)
    alice_entities = [
        ("I've been using product:quantum_simulator extensively", "product:quantum_simulator"),
        ("The product:qiskit_textbook has been very helpful", "product:qiskit_textbook"),
        ("We're evaluating product:quantum_cloud_access for our research", "product:quantum_cloud_access"),
        ("product:quantum_optimizer is showing great potential", "product:quantum_optimizer"),
    ]

    # Shared context (simulates both users working at the same company)
    shared_context = [
        ("Our company is exploring quantum-classical hybrid algorithms", "fact"),
        ("We have a new project combining quantum computing with ML", "fact"),
        ("The engineering team is evaluating quantum computing infrastructure", "fact"),
    ]

    def _store_history(user, history):
        # One extraction per utterance, each under its own numbered thread id.
        for index, (text, _kind) in enumerate(history):
            service.extract_and_store([HumanMessage(content=text)], user, f"{user}_history_{index}")

    print(" 📝 Creating Alice's quantum computing research history...")
    _store_history("alice", alice_conversations)

    for text, entity in alice_entities:
        # Built by hand so the entity tag is attached explicitly.
        tagged_event = ConversationEvent(
            event_type="fact",
            content=text,
            entities=[entity]
        )
        service._store_insert_memories([tagged_event], "alice", "conversation_events")

    print(" 📝 Creating Bob's ML engineering history...")
    _store_history("bob", bob_conversations)

    for text, _kind in shared_context:
        shared_messages = [HumanMessage(content=text)]
        for user in ("alice", "bob"):
            service.extract_and_store(shared_messages, user, "shared_context")

    print(f" ✅ Created rich baseline: {len(alice_conversations)} Alice conversations, {len(bob_conversations)} Bob conversations")
    print(f" 🏷️ Created entity data: {len(alice_entities)} product interactions")
    print(f" 🤝 Created shared context: {len(shared_context)} collaborative discussions")
def demonstrate_enhanced_system():
    """Run the four demo queries and print analytics about the memory store.

    A ``HybridMemoryService`` is created inside a temporary directory, seeded
    with baseline data, and driven through the compiled workflow for four
    queries (alice complex, alice follow-up, bob simple, alice related).
    Commit history, vector-store counts, per-user profiles and loop
    statistics are printed afterwards.
    """

    def _banner(title, width=60):
        # Blank line, rule, title, rule.
        print("\n" + "=" * width)
        print(title)
        print("=" * width)

    def _ask(workflow, text, user_id, thread_id):
        return workflow.invoke({
            "messages": [HumanMessage(content=text)],
            "user_id": user_id,
            "thread_id": thread_id
        })

    def _print_reply(result_state):
        print("\n🤖 Enhanced System Response:")
        # Only the most recent message is shown, and only if the AI wrote it.
        for msg in result_state["messages"][-1:]:
            if isinstance(msg, AIMessage):
                print(msg.content)

    _banner(" 🚀 Enhanced LangGraph + ProllyTree Memory System with Loops", width=80)

    with tempfile.TemporaryDirectory() as tmpdir:
        service = HybridMemoryService(os.path.join(tmpdir, "memory_system"))

        # Create rich baseline data first
        create_rich_baseline_data(service)

        enhanced_workflow = create_enhanced_memory_workflow(service)

        # Generate and display workflow diagram
        print("\n📊 Displaying enhanced workflow visualization...")
        display_workflow_diagram(enhanced_workflow)
        print("🚀 Proceeding with enhanced demonstration...")

        # Demo 1: Complex query that should find lots of relevant context
        _banner("👤 User: alice - Complex Technical Query (Should show context improvement)")
        complex_state = _ask(
            enhanced_workflow,
            "Based on my quantum computing research experience, what are the best practices for implementing quantum machine learning algorithms on NISQ devices, especially for optimization problems?",
            "alice",
            "thread_complex_001",
        )
        _print_reply(complex_state)

        print(f"\n📈 Final Statistics for Complex Query:")
        print(f" • Quality Score: {complex_state.get('context_quality_score', 0):.1f}/100")
        print(f" • Enhancement Iterations: {complex_state.get('enhancement_iterations', 0)}")
        print(f" • Context Sufficiency: {complex_state.get('context_sufficiency', 'unknown')}")

        # Demo 2: Follow-up question that should use the enhanced context
        _banner("👤 User: alice - Follow-up Question")
        followup_state = _ask(
            enhanced_workflow,
            "What are the specific hardware requirements for running quantum ML algorithms?",
            "alice",
            "thread_followup_002",
        )
        _print_reply(followup_state)

        # Demo 3: Different user with simpler query (should require fewer iterations)
        _banner("👤 User: bob - Simple Query (Fewer iterations expected)")
        simple_state = _ask(
            enhanced_workflow,
            "How do I get started with machine learning?",
            "bob",
            "thread_simple_003",
        )
        _print_reply(simple_state)

        print(f"\n📈 Final Statistics for Simple Query:")
        print(f" • Quality Score: {simple_state.get('context_quality_score', 0):.1f}/100")
        print(f" • Enhancement Iterations: {simple_state.get('enhancement_iterations', 0)}")

        # Demo 4: Return to alice with related query (should have rich context)
        _banner("👤 User: alice - Related Query (Rich Context Expected)")
        related_state = _ask(
            enhanced_workflow,
            "Based on our previous discussions about quantum computing, what's the current state of quantum machine learning research?",
            "alice",
            "thread_related_004",
        )
        _print_reply(related_state)

        # Show comprehensive system analytics
        _banner("📊 Enhanced Memory System Analytics")

        # Git history: the last eight entries of the log, as returned.
        commits = service.kv_store.log()
        print(f"\n📚 Git-like Commit History ({len(commits)} total commits):")
        for commit in commits[-8:]:
            timestamp = datetime.fromtimestamp(commit['timestamp'])
            print(f" {commit['id'][:8]} - {commit['message'][:70]} ({timestamp.strftime('%H:%M:%S')})")

        # Memory statistics, bucketed by vector-store key prefix.
        print(f"\n📊 Memory Store Statistics:")
        vector_keys = list(service.vector_store.keys())
        patch_count = len([k for k in vector_keys if k.startswith("patch:")])
        insert_count = len([k for k in vector_keys if k.startswith("insert:")])
        print(f" • Patch memories (profiles): {patch_count}")
        print(f" • Insert memories (events): {insert_count}")
        print(f" • Total vector embeddings: {len(service.vector_store)}")
        print(f" • Git commits: {len(commits)}")

        # User profiles with more detail
        print(f"\n👥 User Profile Analysis:")
        for user_id in ["alice", "bob"]:
            profile = service.get_user_profile(user_id)
            events = service.get_user_events(user_id, limit=5)
            if not profile:
                print(f" • {user_id}: No profile data")
                continue
            print(f" • {user_id}:")
            print(f" - Profile: {json.dumps(profile, indent=6)[:150]}...")
            print(f" - Recent events: {len(events)}")
            for i, event in enumerate(events[:2], 1):
                print(f" {i}. {event.get('event_type', 'unknown')}: {event.get('content', '')[:40]}...")

        # Enhancement statistics comparison
        print(f"\n🔄 Enhancement Loop Statistics:")
        print(f" • Complex query iterations: {complex_state.get('enhancement_iterations', 0)}")
        print(f" • Simple query iterations: {simple_state.get('enhancement_iterations', 0)}")
        print(f" • Related query iterations: {related_state.get('enhancement_iterations', 0)}")
        print(f" • Average quality improvement: Demonstrated through iterative context enhancement")
def main():
    """Print the demo introduction, run the demonstration, print the summary.

    An ``ImportError`` raised while running the demonstration is reported
    together with installation hints instead of propagating.
    """
    rule = "=" * 80

    intro_lines = (
        rule,
        " Enhanced LangGraph + ProllyTree Integration with Loops",
        rule,
        "\nThis enhanced demo demonstrates:",
        " 🔄 Iterative context enhancement with quality assessment",
        " 🧠 Intelligent loop control based on context sufficiency",
        " 🎯 Multi-iteration retrieval for complex queries",
        " 📊 Context quality scoring and improvement suggestions",
        " 🔍 Enhanced semantic search with context expansion",
        " 🏷️ Deep entity analysis with version tracking",
        " 🧵 Contextual conversation thread retrieval",
        " ⚡ Adaptive workflow that improves response quality",
        "\n🔄 Workflow Features:",
        " • START → Initialize → Extract → Search → Analyze → Assess Quality",
        " • IF context insufficient: Loop back for enhancement",
        " • IF context sufficient: Generate enhanced response",
        " • Maximum 3 iterations to prevent infinite loops",
    )
    summary_lines = (
        "\n" + rule,
        "✅ Enhanced Demo Complete! Advanced Features Demonstrated:",
        " 🔄 Iterative context enhancement loops",
        " 🧠 Intelligent quality assessment",
        " 📊 Context scoring and improvement",
        " 🎯 Multi-iteration retrieval optimization",
        " 🔍 Enhanced semantic search expansion",
        " 🏷️ Deep entity version tracking",
        " 🧵 Contextual thread analysis",
        " ⚡ Adaptive response generation",
        " 📈 Quality-driven workflow decisions",
        " 🔒 Loop control and termination",
        rule,
    )

    for line in intro_lines:
        print(line)

    try:
        demonstrate_enhanced_system()

        for line in summary_lines:
            print(line)

    except ImportError as e:
        print(f"\n❌ Error: {e}")
        for line in (
            "\nPlease install required dependencies:",
            " pip install langgraph langchain-core numpy",
            "\nFor real embeddings:",
            " pip install langchain-openai",
        ):
            print(line)


if __name__ == "__main__":
    main()
- -Architecture: -┌─────────────────────────────────────────────────────────────────────────┐ -│ LangGraph Supervisor Architecture │ -└─────────────────────────────────────────────────────────────────────────┘ - -┌─────────────────────────────────────────────────────────────────────────┐ -│ Agent Workflow │ -│ │ -│ Supervisor Agent │ -│ (main branch) │ -│ │ │ -│ ┌─────────────┼─────────────┐ │ -│ ▼ ▼ ▼ │ -│ Troubleshooting Billing Customer History │ -│ (branch: tech) (branch: bill) (branch: history) │ -│ │ -│ Each agent operates in isolated branch with handoff tools │ -│ Supervisor validates and merges results using semantic rules │ -└─────────────────────────────────────────────────────────────────────────┘ - -Key Features: -• LangGraph supervisor pattern with proper agent delegation -• Branch isolation prevents context bleeding between agents -• Handoff tools for controlled agent communication -• Semantic validation during merge operations -• Complete audit trail with Git-like history -""" - -import json -import os -import subprocess -import tempfile -import uuid -import base64 -from datetime import datetime, timezone -from enum import Enum -from typing import Annotated, Dict, List, Optional, Any, Literal, Tuple -from dataclasses import dataclass, field - -from langchain_core.messages import HumanMessage, AIMessage, SystemMessage, BaseMessage -from langchain_core.tools import tool -try: - from pydantic import BaseModel, Field -except ImportError: - from pydantic.v1 import BaseModel, Field - -from langgraph.graph import StateGraph, START, END, MessagesState -from langgraph.store.base import BaseStore - -# ProllyTree imports -from prollytree import VersionedKvStore, ConflictResolution, WorktreeManager, WorktreeVersionedKvStore - -# ============================================================================ -# Agent Types and Data Models -# ============================================================================ - -class IssueType(Enum): - SLOW_INTERNET = 
class IssueType(Enum):
    """Closed set of customer issue categories; values are the wire strings."""
    SLOW_INTERNET = "slow_internet"
    BILLING_DISPUTE = "billing_dispute"
    SERVICE_OUTAGE = "service_outage"
    ACCOUNT_UPGRADE = "account_upgrade"
    TECHNICAL_COMPLEX = "technical_complex"


@dataclass
class CustomerContext:
    """Customer information and current issue"""
    customer_id: str
    name: str
    account_type: str
    issue_type: IssueType
    issue_description: str
    priority: str
    contact_history: List[Dict] = field(default_factory=list)
    current_services: List[str] = field(default_factory=list)
    billing_status: str = "current"

    def to_dict(self):
        """Return the context as a plain dict in field-declaration order.

        ``issue_type`` is emitted as the enum's string value. The list
        fields are returned by reference, not copied.
        """
        ordered_names = (
            "customer_id",
            "name",
            "account_type",
            "issue_type",
            "issue_description",
            "priority",
            "contact_history",
            "current_services",
            "billing_status",
        )
        payload = {}
        for attr in ordered_names:
            raw = getattr(self, attr)
            # The enum is the only field that is not already plain data.
            payload[attr] = raw.value if attr == "issue_type" else raw
        return payload
# ============================================================================
# Multi-Agent State with Branch Tracking
# ============================================================================

class MultiAgentState(MessagesState):
    """State for multi-agent workflow with branch isolation"""
    # Customer context
    customer_context: CustomerContext
    session_id: str

    # Branch management
    current_branch: str
    active_branches: Dict[str, str]  # agent_name -> branch_name

    # Agent results with branch tracking
    agent_results: Dict[str, Dict[str, Any]]

    # Validation and merging
    merge_conflicts: List[str]
    context_bleeding_detected: bool
    isolation_success: bool

    # Final resolution
    final_recommendations: List[str]
    resolution_quality: str


# ============================================================================
# ProllyVersionedMemoryStore with Branch Isolation
# ============================================================================

class ProllyVersionedMemoryStore(BaseStore):
    """WorktreeManager-backed memory store with true parallel execution for multi-agent systems.

    This store provides:
    1. Standard BaseStore interface for LangGraph integration
    2. Git worktree isolation for true parallel agent execution
    3. Intelligent conflict resolution during merge operations
    4. Complete audit trail of all agent operations
    """

    def __init__(self, store_path: str):
        """Initialize the main store and prepare for agent-specific branches.

        Creates *store_path* if needed, initialises a git repository with an
        initial commit when none exists, opens the main ``VersionedKvStore``
        under ``<store_path>/data`` and creates a ``WorktreeManager`` rooted
        at *store_path*.

        Raises:
            subprocess.CalledProcessError: If a git command fails.
        """
        super().__init__()

        self.store_path = store_path
        os.makedirs(store_path, exist_ok=True)

        # Initialize git repository if needed
        if not os.path.exists(os.path.join(store_path, '.git')):
            subprocess.run(["git", "init", "--quiet"], cwd=store_path, check=True)
            subprocess.run(["git", "config", "user.name", "Multi-Agent System"], cwd=store_path, check=True)
            subprocess.run(["git", "config", "user.email", "agents@example.com"], cwd=store_path, check=True)

            # Create initial commit
            readme_path = os.path.join(store_path, "README.md")
            with open(readme_path, "w") as f:
                f.write("# Multi-Agent Memory Store\n")
            subprocess.run(["git", "add", "."], cwd=store_path, check=True)
            subprocess.run(["git", "commit", "-m", "Initial commit"], cwd=store_path, check=True)

        # Create main VersionedKvStore in data subdirectory
        self.data_dir = os.path.join(store_path, "data")
        os.makedirs(self.data_dir, exist_ok=True)
        self.main_store = VersionedKvStore(self.data_dir)

        # Agent branch tracking
        self.main_branch = "main"
        self.agent_worktrees = {}  # agent_name -> WorktreeVersionedKvStore
        self.agent_stores = {}  # agent_name -> store handle used by the read/merge helpers
        self.agent_branches = {}  # agent_name -> branch_name

        # Initialize WorktreeManager for parallel execution
        self.worktree_manager = WorktreeManager(store_path)
        self.branch_metadata = {}

        print(f"✅ Initialized WorktreeManager-backed store at {store_path}")

    @staticmethod
    def _json_default(obj: Any) -> Any:
        """JSON fallback: tag bytes as base64, stringify anything else."""
        if isinstance(obj, bytes):
            return {'__type': 'bytes', 'data': base64.b64encode(obj).decode()}
        # Only real bytes get the bytes marker; labelling arbitrary objects
        # as bytes would make the decoder base64-decode their str() form.
        return str(obj)

    @staticmethod
    def _revive_marker(obj: dict) -> Any:
        """Turn a ``{'__type': 'bytes', 'data': ...}`` marker back into bytes."""
        if obj.get('__type') == 'bytes':
            try:
                return base64.b64decode(obj['data'], validate=True)
            except (KeyError, TypeError, ValueError):
                # Not a marker this store wrote; hand the dict back untouched.
                return obj
        return obj

    def _encode_value(self, value: Any) -> bytes:
        """Encode any value to bytes for storage.

        ``bytes`` pass through, ``str`` is UTF-8 encoded, everything else is
        serialised as JSON (nested bytes become base64 markers).
        """
        if isinstance(value, bytes):
            return value
        if isinstance(value, str):
            return value.encode('utf-8')
        return json.dumps(value, default=self._json_default).encode('utf-8')

    def _decode_value(self, data: bytes) -> Any:
        """Decode bytes from storage back to a Python value.

        Returns ``None`` for empty input, the parsed JSON value when *data*
        is valid JSON (bytes markers revived at any depth), otherwise the
        UTF-8 string, otherwise the raw bytes.

        NOTE(review): a stored plain string that happens to be valid JSON
        (e.g. ``"123"``) decodes as the JSON value, not the string.
        """
        if not data:
            return None

        try:
            return json.loads(data.decode('utf-8'), object_hook=self._revive_marker)
        except (json.JSONDecodeError, UnicodeDecodeError):
            # Return as string if not JSON
            try:
                return data.decode('utf-8')
            except UnicodeDecodeError:
                return data

    # BaseStore interface methods
    def batch(self, ops: List[Tuple]) -> List[Any]:
        """Run ``(method_name, args)`` pairs against this store, in order.

        Entries that are not 2-tuples are skipped, so the result list can be
        shorter than *ops*.
        """
        results = []
        for op in ops:
            if len(op) == 2:
                method, args = op
                results.append(getattr(self, method)(*args))
        return results

    async def abatch(self, ops: List[Tuple]) -> List[Any]:
        """Async batch operations; delegates to the synchronous :meth:`batch`.

        Declared ``async`` so the result can be awaited, as the BaseStore
        async interface expects.
        """
        return self.batch(ops)

    def search(self, namespace: tuple, *, filter: Optional[dict] = None, limit: int = 10) -> List[tuple]:
        """Return up to *limit* ``(namespace, item_key, value)`` tuples.

        Args:
            namespace: Tuple of path parts; keys are matched on the prefix
                ``":".join(namespace) + ":"``.
            filter: Optional equality filter. A value matches only when it is
                a dict and every filter key maps to the given value.
            limit: Maximum number of results.
        """
        prefix = ":".join(namespace) + ":"
        results = []

        try:
            for key in self.main_store.list_keys():
                if len(results) >= limit:
                    break

                key_str = key.decode('utf-8')
                if not key_str.startswith(prefix):
                    continue

                decoded_value = self._decode_value(self.main_store.get(key))

                if filter:
                    # A non-dict value has no fields to compare, so it cannot
                    # satisfy a field filter.
                    if not isinstance(decoded_value, dict):
                        continue
                    if any(decoded_value.get(k) != v for k, v in filter.items()):
                        continue

                # Extract item key from full key
                results.append((namespace, key_str[len(prefix):], decoded_value))
        except AttributeError:
            # If list_keys not available, return what was collected (nothing)
            pass

        return results

    def put(self, namespace: tuple, key: str, value: dict) -> None:
        """Store a value in a namespace, updating it when the key exists."""
        full_key = ":".join(namespace) + ":" + key
        key_bytes = full_key.encode('utf-8')
        value_bytes = self._encode_value(value)

        # Compare against None so an existing empty value still counts as
        # present and is updated rather than re-inserted.
        existing = self.main_store.get(key_bytes)
        if existing is not None:
            self.main_store.update(key_bytes, value_bytes)
            print(f" 📝 Updated: {full_key}")
        else:
            self.main_store.insert(key_bytes, value_bytes)
            print(f" ➕ Inserted: {full_key}")

    def get(self, namespace: tuple, key: str) -> Optional[dict]:
        """Retrieve a value from a namespace, or ``None`` when absent/empty."""
        full_key = ":".join(namespace) + ":" + key
        key_bytes = full_key.encode('utf-8')
        data = self.main_store.get(key_bytes)
        return self._decode_value(data) if data else None

    def delete(self, namespace: tuple, key: str) -> None:
        """Delete a key from a namespace."""
        full_key = ":".join(namespace) + ":" + key
        key_bytes = full_key.encode('utf-8')
        self.main_store.delete(key_bytes)
        print(f" ❌ Deleted: {full_key}")

    # Branch management methods
    def create_agent_worktree(self, agent_name: str, session_id: str) -> str:
        """Create an isolated Git worktree with dedicated WorktreeVersionedKvStore for parallel execution.

        The branch is named ``<session_id>-<agent_name>-<8 hex chars>`` and
        the worktree lives at ``<store_path>/<agent_name>_workspace``. Branch
        metadata is recorded on this object and committed into the worktree.

        Returns:
            The name of the newly created branch.
        """
        branch_name = f"{session_id}-{agent_name}-{uuid.uuid4().hex[:8]}"

        # Create Git worktree using WorktreeManager
        worktree_path = os.path.join(self.store_path, f"{agent_name}_workspace")
        self.worktree_manager.add_worktree(str(worktree_path), branch_name, True)

        # Create WorktreeVersionedKvStore for the agent
        agent_data_path = os.path.join(worktree_path, "data")
        os.makedirs(agent_data_path, exist_ok=True)

        agent_worktree_store = WorktreeVersionedKvStore.from_worktree(
            str(worktree_path),
            f"worktree-{agent_name}",
            branch_name,
            self.worktree_manager
        )

        # Store branch metadata
        self.branch_metadata[branch_name] = {
            'agent_name': agent_name,
            'session_id': session_id,
            'created_at': datetime.now(tz=timezone.utc).isoformat(),
            'parent_branch': self.main_branch
        }

        # Track agent mappings. The handle is registered under agent_stores
        # as well: get_agent_store, get_agent_analysis,
        # validate_and_merge_agent_data and get_branch_info all read that
        # mapping and would otherwise never find the agent.
        self.agent_worktrees[agent_name] = agent_worktree_store
        self.agent_stores[agent_name] = agent_worktree_store
        self.agent_branches[agent_name] = branch_name

        # Store metadata in the agent's worktree
        metadata_key = f"metadata:agent:{agent_name}".encode('utf-8')
        metadata_value = self._encode_value(self.branch_metadata[branch_name])
        agent_worktree_store.insert(metadata_key, metadata_value)
        agent_worktree_store.commit(f"Initialize {agent_name} agent worktree with metadata")

        print(f"🌿 Created Git worktree '{branch_name}' with WorktreeVersionedKvStore for {agent_name}")
        print(f" 📁 Worktree path: {worktree_path}")
        print(f" 📊 Agent worktree branch: {agent_worktree_store.current_branch()}")
        return branch_name

    def get_agent_store(self, agent_name: str) -> Optional[VersionedKvStore]:
        """Get the agent's isolated branch store, or ``None`` if unknown."""
        return self.agent_stores.get(agent_name)

    def get_main_store(self) -> VersionedKvStore:
        """Get the main store for supervisor operations"""
        return self.main_store

    def store_agent_analysis(self, agent_name: str, analysis_type: str, data: Dict[str, Any]):
        """Store analysis data under ``analysis:<analysis_type>`` in the agent's worktree and commit it.

        Raises:
            ValueError: If no worktree was created for *agent_name*.
        """
        if agent_name not in self.agent_worktrees:
            raise ValueError(f"No dedicated worktree exists for agent {agent_name}")

        agent_worktree = self.agent_worktrees[agent_name]

        full_key = f"analysis:{analysis_type}"
        key_bytes = full_key.encode('utf-8')
        value_bytes = self._encode_value(data)

        # None means "no such key"; an empty stored value must be updated.
        existing = agent_worktree.get(key_bytes)
        if existing is not None:
            agent_worktree.update(key_bytes, value_bytes)
            print(f" 📝 {agent_name} updated: {full_key} using dedicated worktree")
        else:
            agent_worktree.insert(key_bytes, value_bytes)
            print(f" ➕ {agent_name} inserted: {full_key} using dedicated worktree")

        # Commit using the agent's worktree
        agent_worktree.commit(f"{agent_name}: Stored {analysis_type}")
        print(f" 💾 {agent_name} committed: {analysis_type} on worktree {agent_worktree.current_branch()}")

    def get_agent_analysis(self, agent_name: str, analysis_type: str) -> Optional[Dict[str, Any]]:
        """Read back ``analysis:<analysis_type>`` from the agent's store.

        Returns ``None`` when the agent is unknown or the key is absent/empty.
        """
        if agent_name not in self.agent_stores:
            return None

        agent_store = self.agent_stores[agent_name]

        full_key = f"analysis:{analysis_type}"
        key_bytes = full_key.encode('utf-8')
        data = agent_store.get(key_bytes)
        return self._decode_value(data) if data else None

    def validate_and_merge_agent_data(self, agent_name: str, validation_fn=None, conflict_resolution_strategy: str = "ignore_conflicts") -> bool:
        """Validate an agent's analysis data and merge its branch into main.

        Args:
            agent_name: Agent whose branch should be merged.
            validation_fn: Optional ``fn(agent_data, agent_name) -> bool``;
                a falsy result aborts the merge.
            conflict_resolution_strategy: ``"ignore_conflicts"`` merges with
                ``merge_ignore_conflicts``; any other value first tries
                ``try_merge`` and falls back to ``merge_ignore_conflicts``
                on conflicts or errors.

        Returns:
            ``True`` when the merge and commit succeeded; ``False`` when the
            agent is unknown, validation failed, or the merge raised.
        """
        if agent_name not in self.agent_stores:
            return False

        agent_store = self.agent_stores[agent_name]
        agent_branch = self.agent_branches[agent_name]

        # Collect every "analysis:*" entry from the agent's store
        agent_data = {}
        try:
            for key in agent_store.list_keys():
                key_str = key.decode('utf-8')
                if key_str.startswith("analysis:"):
                    analysis_type = key_str[len("analysis:"):]
                    agent_data[analysis_type] = self._decode_value(agent_store.get(key))
        except AttributeError:
            # Store handle without list_keys: validate with what we have.
            pass

        # Validate if function provided
        if validation_fn and not validation_fn(agent_data, agent_name):
            print(f" ❌ Validation failed for {agent_name}")
            return False

        # Switch main store to main branch for merge
        original_branch = self.main_store.current_branch()
        self.main_store.checkout(self.main_branch)

        try:
            if conflict_resolution_strategy == "ignore_conflicts":
                merge_result = self.main_store.merge_ignore_conflicts(agent_branch)
            else:
                # Try regular merge first
                try:
                    conflicts = self.main_store.try_merge(agent_branch)
                    if conflicts:
                        print(f" ⚠️ Merge conflicts detected for {agent_name}: {len(conflicts)} conflicts")
                        # Use ignore_conflicts as fallback
                        merge_result = self.main_store.merge_ignore_conflicts(agent_branch)
                        print(f" 🔄 Used ignore_conflicts fallback for {agent_name}")
                    else:
                        merge_result = self.main_store.merge(agent_branch, ConflictResolution.TakeSource)
                except Exception:
                    # Deliberate best effort: any failure of the strict path
                    # falls back to the conflict-ignoring merge.
                    merge_result = self.main_store.merge_ignore_conflicts(agent_branch)

            self.main_store.commit(f"Merged {agent_name} data using {conflict_resolution_strategy} resolution")

            print(f" ✅ Successfully merged {agent_name} data using {conflict_resolution_strategy} resolution")
            print(f" 📊 Merge result: {merge_result}")
            return True

        except Exception as e:
            print(f" ❌ Merge failed for {agent_name}: {e}")
            return False
        finally:
            # Restore original branch if needed
            if original_branch != self.main_branch:
                self.main_store.checkout(original_branch)

    def commit(self, message: str) -> str:
        """Create a Git-like commit of current state in main store and return its id."""
        commit_id = self.main_store.commit(message)
        print(f" 💾 Committed: {commit_id[:8]} - {message}")
        return commit_id

    def get_commit_history(self) -> List[Dict[str, Any]]:
        """Return the main store's log as dicts with a shortened id, message, ISO timestamp and author."""
        return [
            {
                'id': commit['id'][:8],
                'message': commit['message'],
                'timestamp': datetime.fromtimestamp(commit['timestamp']).isoformat(),
                'author': commit.get('author', 'Unknown')
            }
            for commit in self.main_store.log()
        ]

    def get_branch_info(self) -> Dict[str, Any]:
        """Get information about all branches and stores"""
        return {
            'main_branch': self.main_branch,
            'main_store_branch': self.main_store.current_branch(),
            'all_branches': self.main_store.list_branches(),
            'agent_branches': {name: store.current_branch() for name, store in self.agent_stores.items()},
            'branch_metadata': self.branch_metadata
        }
Schedule technician visit for line quality assessment -3. Replace modem if hardware diagnostics show issues -4. Verify area infrastructure for service outages - -Confidence: 85% - Multiple indicators suggest hardware/infrastructure problems.""") - - # Billing agent responses - elif "billing" in content_lower: - if "dispute" in content_lower: - return AIMessage(content="""I've reviewed the billing dispute. My recommendations: -1. Review all recent charges and billing history -2. Apply credit if charges are found to be incorrect -3. Provide detailed explanation of billing structure -4. Set up payment plan if needed - -Confidence: 90% - Clear billing concern requiring thorough review.""") - else: - return AIMessage(content="""For this technical issue, no billing action is required. -1. Verify account is in good standing -2. No billing implications for technical problems - -Confidence: 95% - Technical issues don't warrant billing changes.""") - - # Customer history agent responses - elif "customer history" in content_lower or "history" in content_lower: - if "premium" in content_lower: - return AIMessage(content="""Based on customer history analysis: -1. Prioritize resolution due to premium account status -2. Consider service credit for inconvenience -3. Escalate to senior support if needed -4. Document interaction for future reference - -Confidence: 80% - Premium customers require priority handling.""") - else: - return AIMessage(content="""Standard customer history assessment: -1. Follow standard support process -2. Document interaction thoroughly -3. 
Monitor for pattern of issues - -Confidence: 75% - Normal customer profile with standard handling.""") - - return AIMessage(content="I need more specific information to provide recommendations.") - -# Initialize LLM -try: - from langchain_openai import ChatOpenAI - api_key = os.getenv("OPENAI_API_KEY", "") - if api_key and api_key.startswith("sk-") and not api_key.startswith(("mock", "test")): - llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.7) - print("✅ Using real OpenAI LLM for agents") - else: - llm = MockLLM() - print("🔄 Using mock LLM for agents (set OPENAI_API_KEY for real LLM)") -except ImportError: - llm = MockLLM() - print("🔄 Using mock LLM for agents (install langchain-openai for real LLM)") - - -# ============================================================================ -# Agent Node Functions with Branch Isolation -# ============================================================================ - -def troubleshooting_agent_node(state, store: ProllyVersionedMemoryStore): - """Process technical issues in isolated worktree""" - agent_name = "troubleshooting" - - # Create isolated worktree if not exists - if agent_name not in store.agent_worktrees: - branch_name = store.create_agent_worktree(agent_name, state["session_id"]) - - # Simulate agent analysis - customer = state["customer_context"] - print(f"🔧 {agent_name.title()} Agent analyzing: {customer.issue_description}") - - # Store analysis in isolated worktree - analysis_data = { - "agent": agent_name, - "customer_id": customer.customer_id, - "issue_type": customer.issue_type.value, - "technical_assessment": "Hardware/connectivity issue detected", - "recommendations": [ - "Check signal strength and modem status", - "Schedule technician visit for line quality assessment", - "Replace modem if hardware diagnostics show issues", - "Verify area infrastructure for service outages" - ], - "confidence": 0.85, - "requires_technician": True, - "analysis_timestamp": 
datetime.now(tz=timezone.utc).isoformat(), - "agent_priority": 8 # Technical expertise priority - } - - store.store_agent_analysis(agent_name, "technical_analysis", analysis_data) - - # Update state - agent_results = state.get("agent_results", {}) - agent_results[agent_name] = analysis_data - - return { - "agent_results": agent_results, - "messages": state["messages"] + [AIMessage( - content=f"Technical analysis complete. Found {analysis_data['technical_assessment']}. Recommendations: {', '.join(analysis_data['recommendations'][:2])}" - )] - } - -def billing_agent_node(state, store: ProllyVersionedMemoryStore): - """Process billing issues in isolated worktree""" - agent_name = "billing" - - # Create isolated worktree if not exists - if agent_name not in store.agent_worktrees: - branch_name = store.create_agent_worktree(agent_name, state["session_id"]) - - customer = state["customer_context"] - print(f"💰 {agent_name.title()} Agent analyzing: {customer.issue_description}") - - # Store analysis in isolated worktree - if customer.issue_type == IssueType.BILLING_DISPUTE: - analysis_data = { - "agent": agent_name, - "customer_id": customer.customer_id, - "issue_type": customer.issue_type.value, - "billing_assessment": "Duplicate charge detected in billing system", - "recommendations": [ - "Review all recent charges and billing history", - "Apply credit if charges are found to be incorrect", - "Provide detailed explanation of billing structure", - "Set up payment plan if needed" - ], - "confidence": 0.90, - "credit_required": True, - "credit_amount": 45.99, - "analysis_timestamp": datetime.now(tz=timezone.utc).isoformat(), - "agent_priority": 9 # High priority for billing disputes - } - else: - analysis_data = { - "agent": agent_name, - "customer_id": customer.customer_id, - "issue_type": customer.issue_type.value, - "billing_assessment": "No billing action required for technical issue", - "recommendations": [ - "Verify account is in good standing", - "No billing 
implications for technical problems" - ], - "confidence": 0.95, - "credit_required": False, - "analysis_timestamp": datetime.now(tz=timezone.utc).isoformat(), - "agent_priority": 5 # Lower priority for non-billing issues - } - - store.store_agent_analysis(agent_name, "billing_analysis", analysis_data) - - # Update state - agent_results = state.get("agent_results", {}) - agent_results[agent_name] = analysis_data - - return { - "agent_results": agent_results, - "messages": state["messages"] + [AIMessage( - content=f"Billing analysis complete. {analysis_data['billing_assessment']}. Action needed: {analysis_data.get('credit_required', False)}" - )] - } - -def customer_history_agent_node(state, store: ProllyVersionedMemoryStore): - """Process customer relationship analysis in isolated worktree""" - agent_name = "customer_history" - - # Create isolated worktree if not exists - if agent_name not in store.agent_worktrees: - branch_name = store.create_agent_worktree(agent_name, state["session_id"]) - - customer = state["customer_context"] - print(f"📊 {agent_name.title()} Agent analyzing: {customer.name}'s relationship") - - # Store analysis in isolated worktree - if customer.account_type == "Premium": - analysis_data = { - "agent": agent_name, - "customer_id": customer.customer_id, - "account_type": customer.account_type, - "relationship_assessment": "High-value customer requiring priority support", - "recommendations": [ - "Prioritize resolution due to premium account status", - "Consider service credit for inconvenience", - "Escalate to senior support if needed", - "Document interaction for future reference" - ], - "confidence": 0.80, - "priority_level": "high", - "escalation_recommended": True, - "analysis_timestamp": datetime.now(tz=timezone.utc).isoformat(), - "agent_priority": 10 # Highest priority for premium customers - } - else: - analysis_data = { - "agent": agent_name, - "customer_id": customer.customer_id, - "account_type": customer.account_type, - 
"relationship_assessment": "Standard customer with good payment history", - "recommendations": [ - "Follow standard support process", - "Document interaction thoroughly", - "Monitor for pattern of issues" - ], - "confidence": 0.75, - "priority_level": "normal", - "escalation_recommended": False, - "analysis_timestamp": datetime.now(tz=timezone.utc).isoformat(), - "agent_priority": 6 # Standard priority for regular customers - } - - store.store_agent_analysis(agent_name, "relationship_analysis", analysis_data) - - # Update state - agent_results = state.get("agent_results", {}) - agent_results[agent_name] = analysis_data - - return { - "agent_results": agent_results, - "messages": state["messages"] + [AIMessage( - content=f"Customer relationship analysis complete. {analysis_data['relationship_assessment']}. Priority: {analysis_data['priority_level']}" - )] - } - -# ============================================================================ -# Supervisor Node Functions -# ============================================================================ - -def supervisor_node(state, store: ProllyVersionedMemoryStore): - """Supervisor node that determines next agent to run""" - customer = state["customer_context"] - - print(f"🎯 Supervisor analyzing issue: {customer.issue_description}") - - # Determine which agent to delegate to based on issue type - if customer.issue_type in [IssueType.SLOW_INTERNET, IssueType.SERVICE_OUTAGE, IssueType.TECHNICAL_COMPLEX]: - next_agent = "troubleshooting" - print(f"🎯 Supervisor delegating to {next_agent}: Technical issue detected") - elif customer.issue_type in [IssueType.BILLING_DISPUTE]: - next_agent = "billing" - print(f"🎯 Supervisor delegating to {next_agent}: Billing issue detected") - else: - next_agent = "customer_history" - print(f"🎯 Supervisor delegating to {next_agent}: Customer relationship analysis needed") - - # Update state with next agent - return { - "current_branch": next_agent, - "messages": state["messages"] + [AIMessage( 
- content=f"Supervisor delegating to {next_agent} agent for specialized analysis" - )] - } - -def validation_node(state, store: ProllyVersionedMemoryStore): - """Validate and merge results from all agents using intelligent conflict resolution""" - print("🔍 Supervisor performing semantic validation and intelligent merge...") - - # Validate each agent's results using different conflict resolution strategies - validation_results = {} - merge_strategies_used = {} - - for agent_name in ["troubleshooting", "billing", "customer_history"]: - if agent_name in store.agent_stores: - # Define validation function - def validate_agent_data(data, agent): - # Check if agent stayed within their domain - for key, value in data.items(): - value_str = str(value).lower() - if agent == "billing" and any(tech_word in value_str for tech_word in ["modem", "technician", "signal"]): - print(f" ⚠️ Domain violation: {agent} handling technical terms") - return False # Billing shouldn't handle technical - if agent == "troubleshooting" and any(bill_word in value_str for bill_word in ["credit", "payment", "charge"]): - print(f" ⚠️ Domain violation: {agent} handling billing terms") - return False # Technical shouldn't handle billing - return True - - # Choose merge strategy based on agent type and issue complexity - customer = state["customer_context"] - if customer.account_type == "Premium" or customer.priority == "high": - # Use advanced merge for premium customers - merge_strategy = "conflict_aware" - print(f" 🏆 Using conflict-aware merge for {agent_name} (premium customer)") - elif agent_name == "billing" and customer.issue_type == IssueType.BILLING_DISPUTE: - # Use conflict-aware merge for complex billing data - merge_strategy = "conflict_aware" - print(f" 🧠 Using conflict-aware merge for {agent_name} (complex billing data)") - else: - # Use ignore_conflicts for standard cases - merge_strategy = "ignore_conflicts" - print(f" ⏰ Using ignore_conflicts merge for {agent_name} (standard case)") - - 
success = store.validate_and_merge_agent_data(agent_name, validate_agent_data, merge_strategy) - validation_results[agent_name] = success - merge_strategies_used[agent_name] = merge_strategy - - successful_merges = sum(validation_results.values()) - total_agents = len(validation_results) - - result_summary = f"Merged {successful_merges}/{total_agents} agent results using intelligent conflict resolution" - print(f"✅ {result_summary}") - - # Show merge strategies used - for agent, strategy in merge_strategies_used.items(): - success_icon = "✅" if validation_results.get(agent) else "❌" - print(f" {success_icon} {agent}: {strategy} resolution") - - # Generate final recommendations with metadata - final_recommendations = [] - agent_results = state.get("agent_results", {}) - for agent_name, result in agent_results.items(): - if result: - recommendations = result.get("recommendations", []) - # Add metadata to recommendations - for rec in recommendations: - final_recommendations.append({ - "recommendation": rec, - "agent": agent_name, - "confidence": result.get("confidence", 0.0), - "priority": result.get("agent_priority", 5), - "timestamp": result.get("analysis_timestamp") - }) - - # Sort recommendations by priority and confidence - final_recommendations.sort(key=lambda x: (-x["priority"], -x["confidence"])) - - return { - "isolation_success": successful_merges == total_agents, - "context_bleeding_detected": not (successful_merges == total_agents), - "final_recommendations": [r["recommendation"] for r in final_recommendations], # Extract just the text - "recommendation_metadata": final_recommendations, # Keep full metadata - "merge_strategies_used": merge_strategies_used, - "resolution_quality": "high" if successful_merges == total_agents else "medium", - "messages": state["messages"] + [AIMessage(content=result_summary)] - } - -def route_to_agent(state) -> str: - """Route to the appropriate agent based on supervisor decision""" - return state["current_branch"] - -# 
============================================================================ -# Workflow Visualization -# ============================================================================ - -try: - from IPython.display import display, Image - IPYTHON_AVAILABLE = True -except ImportError: - IPYTHON_AVAILABLE = False - -def display_workflow_diagram(workflow): - """Display the LangGraph workflow diagram""" - print("🎨 Generating multi-agent workflow diagram...") - - try: - diagram_bytes = workflow.get_graph(xray=True).draw_mermaid_png() - temp_file = '/tmp/multi_agent_supervisor_diagram.png' - with open(temp_file, 'wb') as f: - f.write(diagram_bytes) - print(f"💾 Multi-agent supervisor diagram saved to: {temp_file}") - - if IPYTHON_AVAILABLE: - try: - from IPython import get_ipython - if get_ipython() is not None and get_ipython().__class__.__name__ == 'ZMQInteractiveShell': - display(Image(diagram_bytes)) - print("📊 Multi-agent supervisor diagram displayed inline!") - else: - print("📊 Multi-agent supervisor diagram generated (view at the file path above)") - print(" 💡 For inline display, run in a Jupyter notebook") - except Exception: - print("📊 Multi-agent supervisor diagram generated (view at the file path above)") - else: - print("📊 Multi-agent supervisor diagram generated (view at the file path above)") - print(" 💡 Install IPython for enhanced display: pip install ipython") - - print("✅ LangGraph supervisor workflow diagram generation successful!") - return temp_file - - except Exception as e: - print(f"⚠️ Could not generate diagram: {e}") - print(" This may require additional dependencies for Mermaid rendering") - - return None - -# ============================================================================ -# Multi-Agent Workflow Creation -# ============================================================================ - -def create_multi_agent_workflow(store: ProllyVersionedMemoryStore): - """Create the multi-agent workflow with supervisor pattern and branch 
isolation""" - - # Build the state graph - builder = StateGraph(MultiAgentState) - - # Add nodes with store injection - builder.add_node("supervisor", lambda state: supervisor_node(state, store)) - builder.add_node("troubleshooting", lambda state: troubleshooting_agent_node(state, store)) - builder.add_node("billing", lambda state: billing_agent_node(state, store)) - builder.add_node("customer_history", lambda state: customer_history_agent_node(state, store)) - builder.add_node("validate_and_merge", lambda state: validation_node(state, store)) - - # Define the workflow - builder.add_edge(START, "supervisor") - - # Route from supervisor to appropriate agent - builder.add_conditional_edges( - "supervisor", - route_to_agent, - { - "troubleshooting": "troubleshooting", - "billing": "billing", - "customer_history": "customer_history" - } - ) - - # All agents go to validation - builder.add_edge("troubleshooting", "validate_and_merge") - builder.add_edge("billing", "validate_and_merge") - builder.add_edge("customer_history", "validate_and_merge") - - # End after validation - builder.add_edge("validate_and_merge", END) - - # Compile with the external store for LangGraph integration - return builder.compile(store=store) - -# ============================================================================ -# Demonstration Functions -# ============================================================================ - -def demonstrate_supervisor_pattern(): - """Demonstrate the LangGraph supervisor pattern with branch isolation""" - - print("\n" + "="*80) - print(" 🚀 LangGraph Supervisor Pattern with Git-like Branch Isolation") - print("="*80) - print("\nThis demo shows how ProllyTree's branching prevents context bleeding:") - print(" • LangGraph supervisor manages agent delegation") - print(" • Each agent works in an isolated branch") - print(" • Semantic validation prevents inappropriate recommendations") - print(" • Clean audit trail of all agent operations") - - with 
tempfile.TemporaryDirectory() as tmpdir: - store_path = os.path.join(tmpdir, "supervisor_memory") - store = ProllyVersionedMemoryStore(store_path) - - # Capture initial memory state - print(f"\n🧠 INITIAL MEMORY STATE:") - initial_keys = store.main_store.list_keys() - print(f" 📊 Main memory entries before agents: {len(initial_keys)}") - - # Create workflow with external store integration - workflow = create_multi_agent_workflow(store) - - # Display workflow diagram - print(f"\n📊 LangGraph Supervisor Workflow:") - print(" START → Supervisor → Agent → Supervisor → Agent → ... → END") - print(" • Supervisor intelligently delegates based on issue type") - print(" • Each agent works in isolated branch") - print(" • Supervisor validates and merges results") - - display_workflow_diagram(workflow) - - # Test Case 1: Technical Issue - print("\n" + "="*70) - print("🔧 TEST CASE 1: Technical Issue (Slow Internet)") - print("="*70) - - customer1 = CustomerContext( - customer_id="CUST-001", - name="Alice Smith", - account_type="Premium", - issue_type=IssueType.SLOW_INTERNET, - issue_description="Internet very slow, can't stream videos", - priority="high", - contact_history=[{"date": "2024-01-15", "issue": "Setup help", "resolved": True}], - current_services=["Internet 1Gbps", "Cable TV"] - ) - - # Initialize state - initial_state = { - "messages": [HumanMessage(content=f"Customer {customer1.name} reports: {customer1.issue_description}")], - "customer_context": customer1, - "session_id": "session-001", - "current_branch": "main", - "active_branches": {}, - "agent_results": {}, - "merge_conflicts": [], - "context_bleeding_detected": False, - "isolation_success": True, - "final_recommendations": [], - "resolution_quality": "pending" - } - - print(f"📞 Customer: {customer1.name}") - print(f"❓ Issue: {customer1.issue_description}") - print(f"🎯 Expected: Supervisor should delegate to troubleshooting agent") - - # Run workflow - try: - result = workflow.invoke(initial_state) - - 
print(f"\n📊 Workflow Results:") - print(f" • Messages exchanged: {len(result.get('messages', []))}") - print(f" • Active branches: {result.get('active_branches', {})}") - print(f" • Context bleeding detected: {result.get('context_bleeding_detected', False)}") - - except Exception as e: - print(f"⚠️ Workflow execution error: {e}") - print(" This is expected in demo mode - showing the architecture pattern") - - # Show memory changes after agent work - print(f"\n🧠 MEMORY CHANGES AFTER AGENT WORK:") - final_keys = store.main_store.list_keys() - merged_keys = [key.decode('utf-8') for key in final_keys if key.decode('utf-8').startswith('merged:')] - - print(f" 📊 Total memory entries: {len(final_keys)}") - print(f" 📊 Entries added by agents: {len(merged_keys)}") - - if merged_keys: - print(f" 🔍 Sample merged entries:") - for key in merged_keys[:3]: - print(f" - {key}") - - # Show agent branch tracking - print(f"\n🌿 GIT BRANCH ISOLATION TRACKING:") - branch_info = store.get_branch_info() - print(f" 📊 Current Git branch: {branch_info['main_store_branch']}") - print(f" 📊 All Git branches: {branch_info['all_branches']}") - print(f" 📊 Agent→Branch mapping:") - for agent_name, branch_name in branch_info['agent_branches'].items(): - print(f" • {agent_name} → {branch_name}") - - # Show commit history - print(f"\n📚 GIT-LIKE AUDIT TRAIL:") - history = store.get_commit_history() - print(f" 📊 Total commits: {len(history)}") - for commit in history[-5:]: - print(f" {commit['id']} - {commit['message']}") - - # Test Case 2: Billing Issue - print("\n" + "="*70) - print("💰 TEST CASE 2: Billing Issue") - print("="*70) - - customer2 = CustomerContext( - customer_id="CUST-002", - name="Bob Johnson", - account_type="Basic", - issue_type=IssueType.BILLING_DISPUTE, - issue_description="Charged twice for the same service", - priority="medium", - contact_history=[], - current_services=["Internet 100Mbps"] - ) - - print(f"📞 Customer: {customer2.name}") - print(f"❓ Issue: 
{customer2.issue_description}") - print(f"🎯 Expected: Supervisor should delegate to billing agent") - - # Architecture Summary - print(f"\n" + "="*70) - print("🏗️ ARCHITECTURE SUMMARY") - print("="*70) - - print(f"\n✅ LangGraph Supervisor Pattern:") - print(f" • Function-based nodes with proper state management") - print(f" • Conditional routing based on issue classification") - print(f" • WorktreeManager-based ProllyVersionedMemoryStore as external long-term store") - print(f" • Supervisor validates and routes intelligently") - - print(f"\n✅ WorktreeManager Integration:") - print(f" • Proper LangGraph external store interface") - print(f" • Git worktree isolation for true parallel agent execution") - print(f" • WorktreeVersionedKvStore instances for independent operation") - print(f" • Intelligent conflict resolution with multiple strategies") - print(f" • Complete audit trail with versioned commits") - - print(f"\n✅ Context Bleeding Prevention:") - print(f" • Each agent operates in isolated worktree with dedicated WorktreeVersionedKvStore") - print(f" • True parallel execution without conflicts or race conditions") - print(f" • No cross-contamination between agent domains") - print(f" • Domain validation prevents inappropriate recommendations") - print(f" • Shared long-term memory with complete worktree-level isolation") - -def main(): - """Run the LangGraph supervisor demonstration""" - - print("="*80) - print(" Multi-Agent System with LangGraph Supervisor Pattern") - print(" Using Git-like Branching with ProllyTree") - print("="*80) - - print("\n🎯 Key Features Demonstrated:") - print(" • LangGraph supervisor pattern with WorktreeManager-backed storage") - print(" • Complete worktree isolation using WorktreeVersionedKvStore instances") - print(" • True parallel execution capability for multi-agent systems") - print(" • Intelligent conflict resolution with multiple strategies") - print(" • ConflictResolution enum for merge strategy selection") - print(" • Domain 
validation preventing context bleeding") - print(" • Complete Git audit trail of all agent activities") - - try: - demonstrate_supervisor_pattern() - - print("\n" + "="*80) - print("✅ LangGraph Supervisor Demonstration Complete!") - print("="*80) - print("\nKey Architectural Patterns Shown:") - print(" 1. LangGraph supervisor with WorktreeManager for intelligent delegation") - print(" 2. Complete worktree isolation prevents context bleeding and enables parallelism") - print(" 3. WorktreeVersionedKvStore instances for true parallel agent execution") - print(" 4. Multi-strategy conflict resolution (ignore_conflicts, try_merge)") - print(" 5. Domain validation ensures appropriate recommendations") - print(" 6. Git worktree history provides complete audit trail") - print(" 7. Intelligent merge operations with conflict detection") - - except ImportError as e: - print(f"\n❌ Error: {e}") - print("\nPlease install required dependencies:") - print(" pip install langgraph langchain-core prollytree") - -if __name__ == "__main__": - main() diff --git a/python/examples/langmem_integration.py b/python/examples/langmem_example.py similarity index 100% rename from python/examples/langmem_integration.py rename to python/examples/langmem_example.py diff --git a/python/examples/run_examples.sh b/python/examples/run_examples.sh index 8acd395..5cc7852 100755 --- a/python/examples/run_examples.sh +++ b/python/examples/run_examples.sh @@ -11,10 +11,8 @@ get_example_file() { "basic") echo "basic_usage.py" ;; "sql") echo "sql_example.py" ;; "langgraph") echo "langgraph_example.py" ;; - "chronological") echo "langgraph_chronological.py" ;; - "multi-agent") echo "langgraph_multi_agent_branching.py" ;; "merge") echo "merge_example.py" ;; - "langmem") echo "langmem_integration.py" ;; + "langmem") echo "langmem_example.py" ;; *) echo "" ;; esac } @@ -30,8 +28,6 @@ show_usage() { echo " basic - Basic memory usage example" echo " sql - SQL query example" echo " langgraph - LangGraph memory 
example" - echo " chronological - LangGraph chronological memory example" - echo " multi-agent - Multi-agent system with branch isolation" echo " merge - Branch merging with conflict resolution example" echo " langmem - LangMem integration with ProllyTree backend" echo "" @@ -95,10 +91,8 @@ if ./python/build_python.sh --all-features --install; then run_example "basic memory usage" "basic_usage.py" run_example "SQL" "sql_example.py" run_example "LangGraph memory" "langgraph_example.py" - run_example "LangGraph chronological memory" "langgraph_chronological.py" - run_example "Multi-agent branching" "langgraph_multi_agent_branching.py" run_example "merge with conflict resolution" "merge_example.py" - run_example "LangMem integration" "langmem_integration.py" + run_example "LangMem integration" "langmem_example.py" else # Run only the requested example EXAMPLE_FILE=$(get_example_file "$REQUESTED_EXAMPLE")