In [1]:
from dotenv import load_dotenv
load_dotenv()

True

In [2]:
from pathlib import Path
from typing import List, Optional, Dict, Any, Union, Set
import hashlib
import git
import pickle
import logging
from urllib.parse import urlparse
import re
from dataclasses import dataclass
from enum import Enum
import json
from collections import defaultdict, deque

import dspy
from pydantic import BaseModel, Field


# Notebook-wide logging: show INFO and above for our own messages.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Third-party loggers that are too chatty at INFO; only let WARNING+ through.
for _noisy_logger_name in ("litellm", "LiteLLM", "httpx"):
    logging.getLogger(_noisy_logger_name).setLevel(logging.WARNING)

In [3]:
# Set the default language model used by every DSPy module in this notebook.
# NOTE(review): credentials are presumably picked up from the environment that
# load_dotenv() populated in the first cell — confirm the expected variable name.
dspy.configure(lm=dspy.LM("gemini/gemini-2.5-flash", max_tokens=20000))

In [4]:
# Pydantic Models (same as before)
# Closed set of categories a document can be classified into.
# Members subclass ``str``, so each one compares equal to its lowercase value
# (e.g. ``DocumentType.TUTORIAL == "tutorial"``).
class DocumentType(str, Enum):
    TUTORIAL = "tutorial"
    REFERENCE = "reference"
    EXAMPLE = "example"
    CONCEPT = "concept"

class CodeBlock(BaseModel):
    """A fenced code block found inside a document."""
    # Language tag taken from the opening fence; the extractor uses 'text'
    # when the fence carries no tag.
    language: str
    # Code between the fences.
    content: str
    # 1-based line number of the opening fence in the source document.
    line_start: int
    # End line of the block; see DocAnalyzer._extract_metadata for the exact
    # convention used when it is computed.
    line_end: int

class DocumentMetadata(BaseModel):
    """Structural facts extracted from a document without calling an LM."""
    # Path of the source file, stored as a string.
    file_path: str
    # First level-1 heading, or the file stem when the document has none.
    title: Optional[str] = None
    # Text of every markdown heading (levels 1-6), in document order.
    headings: List[str] = Field(default_factory=list)
    # Fenced code blocks found in the document.
    code_blocks: List[CodeBlock] = Field(default_factory=list)
    # Target URLs of inline markdown links ``[text](url)``.
    links: List[str] = Field(default_factory=list)
    # Number of whitespace-separated tokens in the raw content.
    word_count: int = 0

class DocumentClassification(BaseModel):
    """Outcome of classifying one document."""
    # Path of the classified file.
    file_path: str
    # Category assigned to the document.
    doc_type: DocumentType
    # Intended to lie in [0.0, 1.0]; the range is not enforced by this model,
    # producers are expected to clamp it.
    confidence: float
    # Free-text justification (or an error note when classification failed).
    reasoning: str

class DependencyRelation(BaseModel):
    """Links one concept to the concepts a reader should know first."""
    # Concept covered by the document.
    concept: str
    # Concepts needed to understand ``concept``.
    prerequisites: List[str] = Field(default_factory=list)
    # Heuristic score assigned by the producer (fixed values such as 0.8, 0.6
    # or 0.3 depending on which extraction path produced the relation).
    confidence: float
    # Supporting text, or an error note when extraction failed.
    evidence: str

class AnalyzedDocument(BaseModel):
    """Everything known about one document after analysis."""
    # Structural metadata (title, headings, code blocks, links, word count).
    metadata: DocumentMetadata
    # Category and confidence assigned by the classifier.
    classification: DocumentClassification
    # Full, untruncated document text.
    content: str
    # One relation per main concept identified in the document.
    dependencies: List[DependencyRelation] = Field(default_factory=list)
    # Short LM-written summary; empty string when none was produced.
    summary: str = "" 

In [5]:
# Updated DSPy Signatures
# NOTE: DSPy builds the LM prompt from this class — the docstring serves as the
# task instructions and each field's ``desc`` is shown to the model — so edit
# both as prompt text, not as ordinary documentation.
# Inputs: content, file_path, headings. Outputs: classification, confidence, reasoning.
class ClassifyDocument(dspy.Signature):
    """Classify a technical document into one of four categories based on its content and structure."""
    
    content: str = dspy.InputField(desc="The document content to classify")
    file_path: str = dspy.InputField(desc="The file path for context")
    headings: List[str] = dspy.InputField(desc="List of headings in the document")
    
    classification: DocumentType = dspy.OutputField(desc="One of: tutorial, reference, example, concept")
    confidence: float = dspy.OutputField(desc="Confidence score between 0.0 and 1.0")
    reasoning: str = dspy.OutputField(desc="Brief explanation for the classification")

# NOTE: DSPy builds the LM prompt from this class — the docstring serves as the
# task instructions and each field's ``desc`` is shown to the model — so edit
# both as prompt text, not as ordinary documentation.
# Inputs: content, title, headings. Outputs: main_concepts, prerequisites, evidence.
class ExtractDependencies(dspy.Signature):
    """Extract prerequisite concepts and dependencies from technical documentation."""
    
    content: str = dspy.InputField(desc="The document content to analyze")
    title: str = dspy.InputField(desc="Document title or main concept")
    headings: List[str] = dspy.InputField(desc="List of headings in the document")
    
    main_concepts: List[str] = dspy.OutputField(desc="List of main concepts covered in this document")
    prerequisites: List[str] = dspy.OutputField(desc="List of prerequisite concepts needed to understand this document")
    evidence: str = dspy.OutputField(desc="Text evidence supporting the identified dependencies")

# NOTE: DSPy builds the LM prompt from this class — the docstring serves as the
# task instructions and each field's ``desc`` is shown to the model — so edit
# both as prompt text, not as ordinary documentation.
# Inputs: content, title, doc_type. Output: summary.
class SummarizeDocument(dspy.Signature):
    """Create a concise summary of a technical document for contextual retrieval."""
    
    content: str = dspy.InputField(desc="The full document content to summarize")
    title: str = dspy.InputField(desc="Document title")
    doc_type: str = dspy.InputField(desc="Document type (tutorial, reference, example, concept)")
    
    summary: str = dspy.OutputField(desc="Concise 5-7 sentence summary providing context about this document")


In [6]:
# Fallback parsers for string responses
class FallbackParser:
    """Heuristic regex parsers for free-text LM responses.

    Used when a DSPy prediction does not expose the expected structured
    fields. Both public parsers are best-effort: they never raise, and they
    fall back to conservative defaults when nothing usable is found.
    """

    # Upper bound on how many concepts / prerequisites are kept per response.
    MAX_ITEMS = 5

    # Separators for free-text lists: commas, newlines, bullet characters, and
    # hyphens that are NOT between two word characters. The hyphen rule keeps
    # terms such as "client-server" or "JSON-RPC" intact while still splitting
    # "- item" bullets and "a - b" dashes.
    _ITEM_SPLIT_RE = re.compile(r'[,\n•*]|(?<!\w)-|-(?!\w)')

    @staticmethod
    def _split_items(text: str) -> List[str]:
        """Split a free-text list into at most ``MAX_ITEMS`` cleaned entries."""
        items = [
            piece.strip().strip(',-.')
            for piece in FallbackParser._ITEM_SPLIT_RE.split(text)
            if len(piece.strip()) > 2
        ]
        return items[:FallbackParser.MAX_ITEMS]

    @staticmethod
    def _first_group(patterns: List[str], text: str) -> Optional[str]:
        """Return group 1 of the first pattern matching ``text``, else ``None``.

        Patterns are tried in order, case-insensitively and with DOTALL so a
        section may span several lines.
        """
        for pattern in patterns:
            match = re.search(pattern, text, re.IGNORECASE | re.DOTALL)
            if match:
                return match.group(1)
        return None

    @staticmethod
    def parse_classification_response(response: str, file_path: str) -> DocumentClassification:
        """Parse string response for classification when structured output fails.

        Args:
            response: Raw text produced by the LM.
            file_path: Path of the document, copied into the result.

        Returns:
            A ``DocumentClassification``. Defaults (``CONCEPT``, confidence 0.5,
            generic reasoning) are used for anything that cannot be extracted.
        """
        # Default values
        doc_type = DocumentType.CONCEPT
        confidence = 0.5
        reasoning = "Fallback classification"

        try:
            lowered = response.lower()
            valid_types = {member.value for member in DocumentType}

            # Keyed patterns first; the bare alternation is the last resort.
            classification_patterns = [
                r"classification[:\s]+(\w+)",
                r"type[:\s]+(\w+)",
                r"category[:\s]+(\w+)",
                r"(tutorial|reference|example|concept)"
            ]
            for pattern in classification_patterns:
                match = re.search(pattern, lowered)
                if match and match.group(1) in valid_types:
                    doc_type = DocumentType(match.group(1))
                    break

            # Keyed patterns first; the bare number is the last resort.
            confidence_patterns = [
                r"confidence[:\s]+([0-9]*\.?[0-9]+)",
                r"score[:\s]+([0-9]*\.?[0-9]+)",
                r"([0-9]*\.?[0-9]+)"
            ]
            for pattern in confidence_patterns:
                # Case-insensitive so "Confidence: 0.9" hits the keyed pattern
                # instead of falling through to the first number in the text.
                match = re.search(pattern, response, re.IGNORECASE)
                if not match:
                    continue
                try:
                    value = float(match.group(1))
                except ValueError:
                    continue
                if 0.0 <= value <= 1.0:
                    confidence = value
                    break
                if value <= 100.0:  # Handle percentage
                    confidence = value / 100.0
                    break
                # Anything above 100 (years, counts, ...) is not a confidence;
                # keep looking rather than produce a score greater than 1.

            # Extract reasoning (usually the whole response or a specific part)
            reasoning_match = re.search(r"reasoning[:\s]+(.+)", response, re.IGNORECASE | re.DOTALL)
            if reasoning_match:
                reasoning = reasoning_match.group(1).strip()
            else:
                reasoning = response[:200] + "..." if len(response) > 200 else response

        except Exception as e:
            logger.warning(f"Error parsing classification response: {e}")

        return DocumentClassification(
            file_path=file_path,
            doc_type=doc_type,
            confidence=confidence,
            reasoning=reasoning
        )

    @staticmethod
    def parse_dependencies_response(response: str) -> List[DependencyRelation]:
        """Parse string response for dependencies when structured output fails.

        Args:
            response: Raw text produced by the LM.

        Returns:
            One ``DependencyRelation`` per extracted concept (confidence 0.6),
            a single "Unknown concept" relation when no concept is found, or a
            single low-confidence (0.3) relation when parsing itself fails.
        """
        dependencies = []

        try:
            # Section terminators use word boundaries so that e.g. "domain" or
            # "maintain" inside a list does not cut the section short.
            concepts_text = FallbackParser._first_group(
                [
                    r"main concepts?[:\s]+(.+?)(?=\bprerequisites?\b|\bevidence\b|$)",
                    r"concepts?[:\s]+(.+?)(?=\bprerequisites?\b|\bevidence\b|$)",
                    r"topics?[:\s]+(.+?)(?=\bprerequisites?\b|\bevidence\b|$)"
                ],
                response,
            )
            main_concepts = (
                FallbackParser._split_items(concepts_text) if concepts_text is not None else []
            )

            prereq_text = FallbackParser._first_group(
                [
                    r"prerequisites?[:\s]+(.+?)(?=\bevidence\b|\bmain\b|$)",
                    r"requires?[:\s]+(.+?)(?=\bevidence\b|\bmain\b|$)",
                    r"dependencies?[:\s]+(.+?)(?=\bevidence\b|\bmain\b|$)"
                ],
                response,
            )
            prerequisites = (
                FallbackParser._split_items(prereq_text) if prereq_text is not None else []
            )

            evidence_text = FallbackParser._first_group(
                [
                    r"evidence[:\s]+(.+)",
                    r"explanation[:\s]+(.+)",
                    r"reasoning[:\s]+(.+)"
                ],
                response,
            )
            if evidence_text is not None:
                evidence = evidence_text.strip()[:500]  # Limit length
            else:
                evidence = "Extracted from fallback parsing"

            # Always return at least one relation so callers have something to show.
            if not main_concepts:
                main_concepts = ["Unknown concept"]

            for concept in main_concepts:
                dependencies.append(DependencyRelation(
                    concept=concept,
                    prerequisites=prerequisites,
                    confidence=0.6,  # Lower confidence for fallback
                    evidence=evidence
                ))

        except Exception as e:
            logger.warning(f"Error parsing dependencies response: {e}")
            # Return minimal dependency if parsing fails completely
            dependencies = [DependencyRelation(
                concept="Unknown concept",
                prerequisites=[],
                confidence=0.3,
                evidence="Fallback parsing failed"
            )]

        return dependencies

In [7]:
# Default word budget for text sent to the summarizer.
NUM_WORDS = 3000


def truncate_num_words(text: str, num_words: int = NUM_WORDS) -> str:
    """Return the first ``num_words`` whitespace-separated words of ``text``.

    Words are re-joined with single spaces, so original line breaks and
    repeated whitespace are not preserved.

    Args:
        text: Text to truncate.
        num_words: Maximum number of words to keep. Zero or a negative value
            yields an empty string (a negative value would otherwise be read
            as a slice from the end and drop trailing words).

    Returns:
        The truncated text.
    """
    if num_words <= 0:
        return ''
    words = text.split()
    return ' '.join(words[:num_words])

In [8]:
# Updated DSPy Modules with fallback handling
class DocumentClassifier(dspy.Module):
    """DSPy module that assigns a ``DocumentType`` to a document.

    ``forward`` never raises: if the LM call or result handling fails it
    returns a low-confidence ``CONCEPT`` classification describing the error.
    """

    def __init__(self):
        super().__init__()
        self.classify = dspy.ChainOfThought(ClassifyDocument)

    @staticmethod
    def _coerce_confidence(raw: Any, default: float = 0.5) -> float:
        """Turn an LM-provided confidence into a float clamped to [0.0, 1.0].

        ``None``, empty, unparsable or NaN values yield ``default``. A
        legitimate ``0`` is preserved, and a trailing ``%`` is read as a
        percentage.
        """
        if raw is None:
            return default
        if isinstance(raw, (int, float)):
            value = float(raw)
        else:
            text = str(raw).strip()
            is_percent = text.endswith('%')
            try:
                value = float(text.rstrip('%').strip())
            except ValueError:
                return default
            if is_percent:
                value /= 100.0
        if value != value:  # NaN never compares equal to itself
            return default
        return max(0.0, min(1.0, value))

    def forward(self, content: str, file_path: str, headings: List[str]):
        """Classify one document.

        Args:
            content: Document text; only the first 2000 characters are sent.
            file_path: Path of the document, passed to the LM as context.
            headings: Headings extracted from the document.

        Returns:
            A ``DocumentClassification``.
        """
        try:
            result = self.classify(
                content=content[:2000],  # Truncate for efficiency
                file_path=file_path,
                headings=headings
            )

            expected_fields = ('classification', 'confidence', 'reasoning')
            if not all(hasattr(result, field) for field in expected_fields):
                # Unstructured output - let the regex parser salvage what it can.
                logger.warning("Got string response for classification, using fallback parser")
                return FallbackParser.parse_classification_response(str(result), file_path)

            # Handle both enum and string responses
            raw_type = result.classification
            if isinstance(raw_type, DocumentType):
                doc_type = raw_type
            else:
                by_value = {member.value: member for member in DocumentType}
                doc_type = by_value.get(str(raw_type).strip().lower(), DocumentType.CONCEPT)

            return DocumentClassification(
                file_path=file_path,
                doc_type=doc_type,
                confidence=self._coerce_confidence(result.confidence),
                reasoning=str(result.reasoning)
            )

        except Exception as e:
            logger.error(f"Error in document classification: {e}")
            # Return default classification
            return DocumentClassification(
                file_path=file_path,
                doc_type=DocumentType.CONCEPT,
                confidence=0.3,
                reasoning=f"Classification failed: {str(e)}"
            )

class DependencyExtractor(dspy.Module):
    """DSPy module that extracts main concepts and their prerequisites.

    ``forward`` never raises: on failure it returns a single low-confidence
    relation for the document title.
    """

    def __init__(self):
        super().__init__()
        self.extract = dspy.ChainOfThought(ExtractDependencies)

    @staticmethod
    def _as_str_list(value: Any) -> List[str]:
        """Normalize an LM field to a list of non-empty, stripped strings.

        Accepts ``None`` (empty list), a comma-separated string, or any
        iterable of items; items are converted with ``str``.
        """
        if value is None:
            return []
        parts = value.split(",") if isinstance(value, str) else list(value)
        cleaned = []
        for part in parts:
            if part is None:
                continue
            text = str(part).strip()
            if text:
                cleaned.append(text)
        return cleaned

    def forward(self, content: str, title: str, headings: List[str]):
        """Extract dependency relations for one document.

        Args:
            content: Document text; only the first 3000 characters are sent.
            title: Document title or main concept.
            headings: Headings extracted from the document.

        Returns:
            A list with one ``DependencyRelation`` per main concept. Every
            relation carries the same prerequisites and evidence. The list is
            empty when the LM reports no concepts.
        """
        try:
            result = self.extract(
                content=content[:3000],  # Larger context for dependencies
                title=title,
                headings=headings
            )

            expected_fields = ('main_concepts', 'prerequisites', 'evidence')
            if not all(hasattr(result, field) for field in expected_fields):
                # Unstructured output - let the regex parser salvage what it can.
                logger.warning("Got string response for dependencies, using fallback parser")
                return FallbackParser.parse_dependencies_response(str(result))

            # Handle list, string and missing values for concepts/prerequisites
            main_concepts = self._as_str_list(result.main_concepts)
            prerequisites = self._as_str_list(result.prerequisites)
            evidence = str(result.evidence)

            return [
                DependencyRelation(
                    concept=concept,
                    prerequisites=prerequisites,
                    confidence=0.8,
                    evidence=evidence
                )
                for concept in main_concepts
            ]

        except Exception as e:
            logger.error(f"Error in dependency extraction: {e}")
            # Return minimal dependency
            return [DependencyRelation(
                concept=title or "Unknown",
                prerequisites=[],
                confidence=0.3,
                evidence=f"Dependency extraction failed: {str(e)}"
            )]
        
class DocumentSummarizer(dspy.Module):
    """DSPy module that writes a short summary of a document.

    Like the classifier and the dependency extractor, ``forward`` never
    raises, so one failed summary does not discard the rest of a document's
    analysis.
    """

    def __init__(self):
        super().__init__()
        self.summarize = dspy.ChainOfThought(SummarizeDocument)

    def forward(self, content: str, title: str, doc_type: str):
        """Summarize one document.

        Args:
            content: Full document text; truncated to ``NUM_WORDS`` words
                before being sent to the LM.
            title: Document title.
            doc_type: Document type value (tutorial, reference, example, concept).

        Returns:
            The summary text, or an empty string when the LM call fails or
            returns no summary.
        """
        try:
            result = self.summarize(
                content=truncate_num_words(content, NUM_WORDS),
                title=title,
                doc_type=doc_type
            )
            summary = getattr(result, 'summary', None)
            return "" if summary is None else str(summary)
        except Exception as e:
            logger.error(f"Error in document summarization: {e}")
            return ""

In [9]:
# RepoManager
class RepoManager:
    """Handles repository cloning, caching, and file discovery"""

    def __init__(self, cache_dir: str):
        """Create the cache directory (including missing parents) if needed."""
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(parents=True, exist_ok=True)

    def _get_repo_cache_path(self, repo_url: str) -> Path:
        """Generate cache path for repository.

        The directory name is the URL path with slashes replaced by
        underscores, followed by an MD5 digest of the full URL. The digest is
        only a stable identifier here, not a security measure.
        """
        repo_hash = hashlib.md5(repo_url.encode()).hexdigest()
        repo_name = urlparse(repo_url).path.strip('/').replace('/', '_')
        return self.cache_dir / f"{repo_name}_{repo_hash}"

    def _get_analysis_cache_path(self, repo_url: str) -> Path:
        """Generate cache path for document analysis results.

        The file lives inside the repository's cache directory, so it is
        removed whenever that directory is deleted (e.g. by ``force_update``).
        """
        cache_path = self._get_repo_cache_path(repo_url)
        return cache_path / "analysis_results.pkl"

    def clone_or_update_repo(self, repo_url: str, force_update: bool = False) -> Path:
        """Clone repository or update if it exists.

        Args:
            repo_url: URL of the repository to clone.
            force_update: When True, delete any cached copy and clone again.

        Returns:
            Path of the local checkout. When a cached copy exists and
            ``force_update`` is False, a pull is attempted; a failed pull is
            logged and the cached copy is returned as-is.
        """
        repo_path = self._get_repo_cache_path(repo_url)

        if repo_path.exists() and not force_update:
            logger.info(f"Repository already cached at {repo_path}")
            try:
                repo = git.Repo(repo_path)
                repo.remotes.origin.pull()
                logger.info("Updated repository with latest changes")
            except Exception as e:
                logger.warning(f"Warning: Could not update repository: {e}")
            return repo_path

        if repo_path.exists():
            import shutil
            shutil.rmtree(repo_path)

        logger.info(f"Cloning repository to {repo_path}")
        git.Repo.clone_from(repo_url, repo_path)
        return repo_path

    def find_documentation_files(self, repo_path: Path, include_folders: Optional[List[str]] = None) -> List[Path]:
        """Find all markdown files in repository, optionally filtered by folders.

        Args:
            repo_path: Root of the local checkout.
            include_folders: Repository-relative folders to keep. A file is
                kept when it lies anywhere below one of them; ``"."`` selects
                files directly in the repository root. ``None`` or an empty
                list keeps everything.

        Returns:
            Sorted list of ``*.md`` / ``*.mdx`` paths, excluding tooling and
            build directories and common non-content files (license,
            changelog, ...).
        """
        md_files = []
        for ext in ['*.md', '*.mdx']:
            md_files.extend(repo_path.rglob(ext))

        excluded_dirs = {
            'node_modules', '.git', '__pycache__', '.pytest_cache',
            'venv', 'env', '.venv', 'build', 'dist', 'tests', '.github'
        }
        # Common non-content files, matched on the lowercased file name.
        skipped_prefixes = ('license', 'contributing', 'code_of_conduct', 'security', 'patents', 'changelog')

        filtered_files = []
        for file_path in md_files:
            # Check only the path inside the repository: the checkout itself
            # may live under a directory called e.g. "build" or "env", which
            # must not exclude every file.
            rel_path = file_path.relative_to(repo_path)
            if any(part in excluded_dirs for part in rel_path.parts):
                continue
            if file_path.name.lower().startswith(skipped_prefixes):
                continue
            filtered_files.append(file_path)

        # Filter by include_folders if specified
        if include_folders:
            # Compare POSIX-style so the same folder spec works on every OS.
            wanted = [folder.replace('\\', '/').strip('/') for folder in include_folders]

            def _is_included(file_path: Path) -> bool:
                rel_path = file_path.relative_to(repo_path)
                rel_posix = rel_path.as_posix()
                parent_posix = rel_path.parent.as_posix()  # "." for root-level files
                return any(
                    rel_posix.startswith(folder + '/') or parent_posix == folder
                    for folder in wanted
                )

            filtered_files = [file_path for file_path in filtered_files if _is_included(file_path)]
            logger.info(f"Filtered to {len(filtered_files)} files from specified folders: {include_folders}")

        return sorted(filtered_files)

    def save_analysis_cache(self, analysis_results: List[AnalyzedDocument], repo_url: str):
        """Save document analysis results to cache.

        Results are stored as a pickled list of plain dicts.
        """
        cache_path = self._get_analysis_cache_path(repo_url)
        cache_path.parent.mkdir(parents=True, exist_ok=True)

        # Convert Pydantic models to dict for serialization. Prefer the
        # Pydantic v2 API and fall back to the v1 name when it is unavailable.
        serializable_results = [
            doc.model_dump() if hasattr(doc, 'model_dump') else doc.dict()
            for doc in analysis_results
        ]

        with open(cache_path, 'wb') as f:
            pickle.dump(serializable_results, f)
        logger.info(f"Saved analysis cache to {cache_path}")

    def load_analysis_cache(self, repo_url: str) -> Optional[List[AnalyzedDocument]]:
        """Load document analysis results from cache.

        Returns:
            The cached documents, or ``None`` when no cache file exists or it
            cannot be read or validated.
        """
        cache_path = self._get_analysis_cache_path(repo_url)

        if not cache_path.exists():
            return None

        try:
            # SECURITY: unpickling can execute arbitrary code. Only load cache
            # files this notebook wrote itself; never point cache_dir at a
            # location that untrusted parties can write to.
            with open(cache_path, 'rb') as f:
                serialized_results = pickle.load(f)

            # Convert back to Pydantic models
            results = [AnalyzedDocument(**doc_data) for doc_data in serialized_results]
            logger.info(f"Loaded analysis cache from {cache_path}")
            return results
        except Exception as e:
            logger.error(f"Error loading analysis cache: {e}")
            return None

# DocAnalyzer (updated to work with new modules)
class DocAnalyzer:
    """Analyzes technical documentation using DSPy modules"""

    # Fenced code block: a line holding only an opening ``` plus an optional
    # info string, the body, then a line holding only a closing ```. Fences may
    # be indented. Capturing the whole info string (instead of ``\w+``) keeps
    # fences such as ```c++ or ```python title="x" from being skipped and
    # mis-paired with the next fence.
    _FENCE_RE = re.compile(
        r'^[ \t]*```([^\n`]*)\n(.*?)^[ \t]*```[ \t]*$',
        re.MULTILINE | re.DOTALL,
    )

    def __init__(self):
        self.classifier = DocumentClassifier()
        self.dependency_extractor = DependencyExtractor()
        self.summarizer = DocumentSummarizer()

    def _extract_metadata(self, file_path: Path, content: str) -> DocumentMetadata:
        """Extract metadata from document content.

        Args:
            file_path: Path of the document; its stem is the fallback title.
            content: Raw markdown text.

        Returns:
            A ``DocumentMetadata`` with title, headings, code blocks, link
            targets and word count. Headings are searched outside fenced code
            blocks only, so ``# comment`` lines in code are not mistaken for
            headings or titles.
        """
        # Extract code blocks
        code_blocks = []
        for match in self._FENCE_RE.finditer(content):
            info_string = match.group(1).strip()
            language = info_string.split()[0] if info_string else 'text'

            code_content = match.group(2)
            if code_content.endswith('\n'):
                # Drop the newline that separates the body from the closing fence.
                code_content = code_content[:-1]

            # 1-based line numbers of the opening and closing fence lines.
            line_start = content[:match.start()].count('\n') + 1
            line_end = content[:match.end()].count('\n') + 1

            code_blocks.append(CodeBlock(
                language=language,
                content=code_content,
                line_start=line_start,
                line_end=line_end
            ))

        # Headings and title are looked up in the text with fenced code removed.
        prose = self._FENCE_RE.sub('', content)

        # Extract title (first H1 or filename)
        title_match = re.search(r'^# (.+)$', prose, re.MULTILINE)
        title = title_match.group(1).strip() if title_match else file_path.stem

        # Extract all headings
        headings = [
            heading.strip()
            for heading in re.findall(r'^#{1,6} (.+)$', prose, re.MULTILINE)
        ]

        # Extract links (only the URL part of each [text](url) pair is kept)
        link_urls = [url for _text, url in re.findall(r'\[([^\]]+)\]\(([^)]+)\)', content)]

        # Word count
        word_count = len(content.split())

        return DocumentMetadata(
            file_path=str(file_path),
            title=title,
            headings=headings,
            code_blocks=code_blocks,
            links=link_urls,
            word_count=word_count
        )

    def analyze_document(self, file_path: Path, content: str) -> AnalyzedDocument:
        """Analyze a single document.

        Runs metadata extraction, then classification, dependency extraction
        and summarization, in that order (the summary uses the classified
        document type).
        """
        # Extract metadata
        metadata = self._extract_metadata(file_path, content)

        # Classify document
        classification = self.classifier(
            content=content,
            file_path=str(file_path),
            headings=metadata.headings
        )

        # Extract dependencies
        dependencies = self.dependency_extractor(
            content=content,
            title=metadata.title or "",
            headings=metadata.headings
        )

        # Generate document summary for contextual retrieval
        doc_summary = self.summarizer(
            content=content,
            title=metadata.title or file_path.stem,
            doc_type=classification.doc_type.value
        )

        return AnalyzedDocument(
            metadata=metadata,
            classification=classification,
            content=content,
            dependencies=dependencies,
            summary=doc_summary
        )

    def analyze_repository(self, file_paths: List[Path]) -> List[AnalyzedDocument]:
        """Analyze all documents in a repository.

        Files are read as UTF-8. A file that cannot be read or analyzed is
        logged and skipped, so the result may be shorter than ``file_paths``.
        """
        analyzed_docs = []

        for file_path in file_paths:
            try:
                with open(file_path, 'r', encoding='utf-8') as f:
                    content = f.read()

                analyzed_doc = self.analyze_document(file_path, content)
                analyzed_docs.append(analyzed_doc)

                logger.info(f"Analyzed {file_path}: {analyzed_doc.classification.doc_type} (confidence: {analyzed_doc.classification.confidence:.2f})")

            except Exception as e:
                logger.error(f"Error analyzing {file_path}: {e}")
                continue

        return analyzed_docs

    def get_classified_docs(self, analyzed_docs: List[AnalyzedDocument]) -> Dict[DocumentType, List[AnalyzedDocument]]:
        """Group analyzed documents by classification.

        Every ``DocumentType`` is present as a key, even when no document has
        that type.
        """
        classified = {doc_type: [] for doc_type in DocumentType}

        for doc in analyzed_docs:
            classified[doc.classification.doc_type].append(doc)

        return classified

    def build_dependency_map(self, analyzed_docs: List[AnalyzedDocument]) -> Dict[str, List[str]]:
        """Build a simple dependency map from analyzed documents.

        Returns:
            Mapping of concept to its prerequisites, merged across documents.
            Duplicates are removed and first-seen order is kept, so the result
            is the same on every run.
        """
        dependency_map: Dict[str, List[str]] = {}

        for doc in analyzed_docs:
            for dep_relation in doc.dependencies:
                dependency_map.setdefault(dep_relation.concept, []).extend(dep_relation.prerequisites)

        # Remove duplicates; dict.fromkeys keeps insertion order, unlike set().
        return {
            concept: list(dict.fromkeys(prerequisites))
            for concept, prerequisites in dependency_map.items()
        }

In [10]:
# Notebook-level inputs: the repository to analyze, a manager that caches
# clones under ./.cache (relative to the working directory), and the analyzer
# that bundles the three DSPy modules.
repo_url = "https://github.com/modelcontextprotocol/docs"
repo_manager = RepoManager(cache_dir=".cache")
doc_analyzer = DocAnalyzer()

In [11]:
# Clone a repository
# Clone on first run; on later runs the cached checkout is pulled instead
# (a failed pull is only logged, so the cell also works offline).
repo_path = repo_manager.clone_or_update_repo(repo_url)

# Find documentation files
# Only markdown files below these repository-relative folders are kept.
docs_files = repo_manager.find_documentation_files(
    repo_path, 
    include_folders=["docs", "tutorials", "quickstart"]
)
print(f"Found {len(docs_files)} documentation files")

INFO:__main__:Repository already cached at .cache/modelcontextprotocol_docs_9b06b34c6341a02b233055dc593dd641
INFO:__main__:Updated repository with latest changes
INFO:__main__:Filtered to 14 files from specified folders: ['docs', 'tutorials', 'quickstart']


Found 14 documentation files


In [12]:
# Analyze documents
# Expensive cell: every document triggers three LM calls (classification,
# dependency extraction, summary). Files that fail are logged and skipped, so
# the count printed below can be lower than the number of files found.
# NOTE: RepoManager.save_analysis_cache / load_analysis_cache can persist these
# results between runs; this cell does not use them.
analyzed_docs = doc_analyzer.analyze_repository(docs_files)
print(f"Analyzed {len(analyzed_docs)} documents")

INFO:__main__:Analyzed .cache/modelcontextprotocol_docs_9b06b34c6341a02b233055dc593dd641/docs/concepts/architecture.mdx: DocumentType.CONCEPT (confidence: 0.98)
INFO:__main__:Analyzed .cache/modelcontextprotocol_docs_9b06b34c6341a02b233055dc593dd641/docs/concepts/prompts.mdx: DocumentType.CONCEPT (confidence: 0.95)
INFO:__main__:Analyzed .cache/modelcontextprotocol_docs_9b06b34c6341a02b233055dc593dd641/docs/concepts/resources.mdx: DocumentType.CONCEPT (confidence: 0.98)
INFO:__main__:Analyzed .cache/modelcontextprotocol_docs_9b06b34c6341a02b233055dc593dd641/docs/concepts/roots.mdx: DocumentType.CONCEPT (confidence: 0.98)
INFO:__main__:Analyzed .cache/modelcontextprotocol_docs_9b06b34c6341a02b233055dc593dd641/docs/concepts/sampling.mdx: DocumentType.REFERENCE (confidence: 0.95)
INFO:__main__:Analyzed .cache/modelcontextprotocol_docs_9b06b34c6341a02b233055dc593dd641/docs/concepts/tools.mdx: DocumentType.CONCEPT (confidence: 0.95)
INFO:__main__:Analyzed .cache/modelcontextprotocol_docs_9b

Analyzed 14 documents


In [None]:
# analyzed_docs[0].model_dump()['metadata']
# analyzed_docs[0].model_dump()['classification']
# analyzed_docs[0].model_dump()['dependencies']

In [13]:
import chromadb
from chromadb.config import Settings
import dspy
from pydantic import BaseModel, Field
import openai
from chromadb.errors import InvalidCollectionException  # Add this import

In [14]:
class DocumentChunk(BaseModel):
    """A piece of a document, stored together with document-level context."""
    # Identifier of the chunk.
    id: str
    # Text of the chunk itself.
    content: str
    contextual_content: str  # Original content + document summary
    # Arbitrary key/value metadata attached to the chunk.
    metadata: Dict[str, Any]

class QueryResult(BaseModel):
    """One hit returned by a vector database query."""
    # The matched chunk.
    chunk: DocumentChunk
    # Score reported for this hit.
    # NOTE(review): whether higher or lower is better is not visible here — confirm.
    score: float

In [15]:
# Chunking parameters for the vector store.
# NOTE(review): VectorDB.__init__ below declares its own defaults
# (chunk_size=800, chunk_overlap=100), so these constants only take effect if
# they are passed explicitly — confirm at the call site. The unit (characters,
# words or tokens) is not visible here.
CHUNK_SIZE = 3000
CHUNK_OVERLAP = 100

class VectorDB:
    """Simplified vector database with contextual retrieval

    Wraps a persistent ChromaDB collection. Documents are split into overlapping
    word-based chunks. Each chunk is embedded together with a short document-level
    prefix (title, type, summary), while only the raw chunk text is stored as the
    Chroma document. Nested metadata is flattened into scalars and JSON strings.
    """

    # Metadata attached to the collection whenever it is created or re-created.
    _COLLECTION_METADATA = {"description": "Technical documentation with contextual retrieval"}

    # Whole-word language names used by the per-chunk "has code" heuristic.
    # Word boundaries stop "go" matching inside "algorithm" or "java" inside
    # "javascript"; the plain English word "go" can still trigger it.
    _CODE_LANGUAGE_PATTERN = re.compile(r"\b(?:python|javascript|java|cpp|rust|go)\b")

    # Chunks whose text is shorter than this many characters are not indexed.
    _MIN_CHUNK_CHARS = 100

    def __init__(self,
                 db_path: str = "./vector_db",
                 collection_name: str = "docs",
                 embedding_model: str = "text-embedding-3-small",
                 chunk_size: int = 800,
                 chunk_overlap: int = 100):
        """Open (or create) the persistent collection.

        Args:
            db_path: Directory for the ChromaDB files; created with parents if missing.
            collection_name: Name of the collection to load or create.
            embedding_model: Model name passed to ``openai.embeddings.create``.
            chunk_size: Maximum number of words per chunk (must be positive).
            chunk_overlap: Words shared by consecutive chunks
                (must satisfy ``0 <= chunk_overlap < chunk_size``).

        Raises:
            ValueError: If the chunking parameters cannot produce a positive stride.
        """
        # Fail fast: a non-positive stride would make chunking raise or silently
        # yield nothing much later, inside add_documents.
        if chunk_size <= 0:
            raise ValueError(f"chunk_size must be positive, got {chunk_size}")
        if not 0 <= chunk_overlap < chunk_size:
            raise ValueError(
                f"chunk_overlap must be in [0, chunk_size); got {chunk_overlap} with chunk_size={chunk_size}"
            )

        self.db_path = Path(db_path)
        # parents=True so nested paths such as "./data/vector_db" work on first run.
        self.db_path.mkdir(parents=True, exist_ok=True)

        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap

        # Initialize ChromaDB
        self.client = chromadb.PersistentClient(
            path=str(self.db_path),
            settings=Settings(anonymized_telemetry=False)
        )

        self.collection_name = collection_name
        self.embedding_model = embedding_model

        # Get or create the collection. The exception raised for a missing
        # collection differs between ChromaDB releases, so any failure falls back
        # to get_or_create_collection; a genuine backend error re-raises from there.
        try:
            self.collection = self.client.get_collection(name=collection_name)
            logger.info(f"Loaded existing collection: {collection_name}")
        except Exception:
            self.collection = self.client.get_or_create_collection(
                name=collection_name,
                metadata=dict(self._COLLECTION_METADATA)
            )
            logger.info(f"Created new collection: {collection_name}")

    def _generate_embeddings(self, texts: List[str]) -> List[List[float]]:
        """Generate embeddings using OpenAI

        Returns one embedding per input text, in input order. An empty input
        returns an empty list without calling the API. Errors are logged and
        re-raised so callers decide how to recover.
        """
        if not texts:
            return []
        try:
            response = openai.embeddings.create(
                model=self.embedding_model,
                input=texts
            )
            return [data.embedding for data in response.data]
        except Exception as e:
            logger.error(f"Error generating embeddings: {e}")
            raise

    def _chunk_with_context(self, doc) -> List[DocumentChunk]:
        """Create contextual chunks from document

        Splits ``doc.content`` on whitespace into windows of ``chunk_size`` words
        advancing by ``chunk_size - chunk_overlap``. Windows shorter than
        ``_MIN_CHUNK_CHARS`` characters are skipped. Iteration stops once a window
        reaches the end of the document, so no trailing chunk is emitted that is
        entirely contained in the previous one.

        Args:
            doc: Analyzed document exposing ``content``, ``summary``, ``metadata``,
                ``classification`` and ``dependencies`` as used below.

        Returns:
            Chunks in document order; the list may be empty.
        """
        words = doc.content.split()
        step = self.chunk_size - self.chunk_overlap

        # Create document context prefix
        context_prefix = f"Document: {doc.metadata.title or 'Untitled'} ({doc.classification.doc_type.value}). {doc.summary}\n\n"

        # Extract and serialize complex metadata. dict.fromkeys de-duplicates while
        # keeping first-seen order, so the stored strings are stable across runs.
        main_concepts = [dep.concept for dep in doc.dependencies]
        prerequisites = list(dict.fromkeys(
            prereq for dep in doc.dependencies for prereq in dep.prerequisites
        ))
        code_languages = list(dict.fromkeys(block.language for block in doc.metadata.code_blocks))

        # Everything that is identical for all chunks of this document is built once.
        shared_metadata = {
            # Basic info
            "file_path": doc.metadata.file_path,
            "title": doc.metadata.title or "",
            "doc_type": doc.classification.doc_type.value,

            # Content analysis
            "summary": doc.summary,
            "main_concepts": ", ".join(main_concepts) if main_concepts else "",
            "prerequisites": ", ".join(prerequisites) if prerequisites else "",

            # Classification info
            "classification_confidence": float(doc.classification.confidence),
            "classification_reasoning": doc.classification.reasoning,

            # Technical metadata
            "code_languages": ", ".join(code_languages) if code_languages else "",

            # Document structure
            "total_headings": len(doc.metadata.headings),
            "total_code_blocks": len(doc.metadata.code_blocks),
            "total_links": len(doc.metadata.links),
            "doc_word_count": doc.metadata.word_count,

            # Rich structured data as JSON strings
            "headings_json": json.dumps(doc.metadata.headings),
            "code_blocks_json": json.dumps([block.model_dump() for block in doc.metadata.code_blocks]),
            "dependencies_json": json.dumps(doc.model_dump()['dependencies']),
            "links_json": json.dumps(doc.metadata.links),

            # Full document metadata as JSON (for advanced queries)
            "full_metadata_json": json.dumps(doc.metadata.model_dump()),
            "full_classification_json": json.dumps(doc.classification.model_dump()),
        }

        id_prefix = hashlib.md5(doc.metadata.file_path.encode()).hexdigest()[:8]

        chunks = []
        for chunk_index, start in enumerate(range(0, len(words), step)):
            chunk_words = words[start:start + self.chunk_size]
            chunk_content = ' '.join(chunk_words)
            reached_end = start + self.chunk_size >= len(words)

            if len(chunk_content.strip()) >= self._MIN_CHUNK_CHARS:
                lowered = chunk_content.lower()

                # Detect if this specific chunk contains code (heuristic).
                has_code = "```" in chunk_content or bool(self._CODE_LANGUAGE_PATTERN.search(lowered))

                # First heading whose text appears in the chunk (approximate).
                relevant_heading = next(
                    (heading for heading in doc.metadata.headings if heading.lower() in lowered),
                    ""
                )

                metadata = dict(shared_metadata)
                metadata.update({
                    "chunk_index": chunk_index,
                    "has_code": has_code,
                    "relevant_heading": relevant_heading,

                    # Chunk position
                    "word_start": start,
                    "word_end": start + len(chunk_words),
                    "chunk_word_count": len(chunk_words),
                })

                chunks.append(DocumentChunk(
                    id=f"{id_prefix}_{chunk_index}",
                    content=chunk_content,
                    # Contextual content = document prefix + chunk text.
                    contextual_content=context_prefix + chunk_content,
                    metadata=metadata
                ))

            # Any further window would start inside text already covered.
            if reached_end:
                break

        return chunks

    def add_documents(self, analyzed_docs: List, batch_size: int = 50):  # Remove type hint for now
        """Add documents to vector database with contextual retrieval

        Chunks every document, embeds the contextual text of each chunk and stores
        the raw chunk text plus metadata. A document or batch that fails is logged
        and skipped; the remaining ones are still processed.

        Args:
            analyzed_docs: Analyzed documents accepted by ``_chunk_with_context``.
            batch_size: Number of chunks embedded and inserted per request
                (values below 1 are treated as 1).

        Returns:
            The number of chunks actually written to the collection.
        """
        batch_size = max(1, batch_size)
        all_chunks = []

        # Create contextual chunks for all documents
        for doc in analyzed_docs:
            try:
                chunks = self._chunk_with_context(doc)
                all_chunks.extend(chunks)
                logger.info(f"Created {len(chunks)} contextual chunks for {doc.metadata.file_path}")
            except Exception as e:
                # Resolve the path defensively so a malformed document cannot make
                # the error handler itself raise.
                doc_path = getattr(getattr(doc, "metadata", None), "file_path", "<unknown>")
                logger.error(f"Error processing document {doc_path}: {e}")
                continue

        total_batches = (len(all_chunks) + batch_size - 1) // batch_size
        added = 0

        # Add chunks to database in batches
        for batch_number, offset in enumerate(range(0, len(all_chunks), batch_size), start=1):
            batch = all_chunks[offset:offset + batch_size]

            try:
                # Use contextual content for embeddings
                embeddings = self._generate_embeddings([chunk.contextual_content for chunk in batch])

                # Store original content but embed contextual content
                self.collection.add(
                    ids=[chunk.id for chunk in batch],
                    embeddings=embeddings,
                    documents=[chunk.content for chunk in batch],
                    metadatas=[chunk.metadata for chunk in batch]
                )
                added += len(batch)

                logger.info(f"Added batch {batch_number}/{total_batches}")

            except Exception as e:
                logger.error(f"Error adding batch: {e}")
                continue

        # Report what was really stored rather than what was attempted.
        if added == len(all_chunks):
            logger.info(f"Successfully added {len(all_chunks)} contextual chunks")
        else:
            logger.warning(f"Added {added}/{len(all_chunks)} contextual chunks; some batches failed")

        return added

    @staticmethod
    def _to_query_results(results) -> List[QueryResult]:
        """Convert the first result set of a ChromaDB query into QueryResult objects."""
        query_results = []
        for chunk_id, document, metadata, distance in zip(
            results['ids'][0],
            results['documents'][0],
            results['metadatas'][0],
            results['distances'][0],
        ):
            chunk = DocumentChunk(
                id=chunk_id,
                content=document,
                contextual_content="",  # We don't store this
                metadata=metadata or {}
            )
            # Convert distance to similarity score
            query_results.append(QueryResult(chunk=chunk, score=1 / (1 + distance)))
        return query_results

    @staticmethod
    def _metadata_has_any(metadata: Dict[str, Any], key: str, wanted: List[str]) -> bool:
        """Return True if any ``wanted`` value is in the comma-separated field ``key``.

        Comparison is case-insensitive and ignores surrounding whitespace. Values
        that themselves contain commas cannot be matched, because the field is
        stored comma-joined.
        """
        stored = {
            part.strip().lower()
            for part in str(metadata.get(key, '') or '').split(',')
            if part.strip()
        }
        return any(item.lower() in stored for item in wanted)

    def _search(self, query_text: str, n_results: int, where_clause) -> List[QueryResult]:
        """Embed ``query_text`` and run one similarity query with an optional filter."""
        query_embedding = self._generate_embeddings([query_text])[0]
        results = self.collection.query(
            query_embeddings=[query_embedding],
            n_results=n_results,
            where=where_clause
        )
        return self._to_query_results(results)

    def query(self,
            query_text: str,
            n_results: int = 5,
            doc_types: Optional[List[str]] = None,
            concepts: Optional[List[str]] = None) -> List[QueryResult]:
        """Query the vector database

        Args:
            query_text: Natural-language query to embed.
            n_results: Number of nearest chunks requested from the database.
            doc_types: If given, restrict to chunks whose ``doc_type`` is listed.
            concepts: If given, keep only chunks whose ``main_concepts`` metadata
                contains at least one of these (case-insensitive). This is applied
                after retrieval, so fewer than ``n_results`` items may be returned.

        Returns:
            Matching results, or an empty list if the query fails (error is logged).
        """
        try:
            # Build filter
            where_clause = {"doc_type": {"$in": doc_types}} if doc_types else None

            query_results = self._search(query_text, n_results, where_clause)

            # Additional concept filtering if specified
            if concepts:
                query_results = [
                    result for result in query_results
                    if self._metadata_has_any(result.chunk.metadata, 'main_concepts', concepts)
                ]

            return query_results

        except Exception as e:
            logger.error(f"Error querying: {e}")
            return []

    def query_advanced(self,
                    query_text: str,
                    n_results: int = 5,
                    doc_types: Optional[List[str]] = None,
                    code_languages: Optional[List[str]] = None,
                    has_code: Optional[bool] = None,
                    min_confidence: Optional[float] = None,
                    concepts: Optional[List[str]] = None) -> List[QueryResult]:
        """Advanced query with multiple filters

        ``doc_types``, ``has_code`` and ``min_confidence`` are pushed down to the
        database as a ``where`` clause. ``code_languages`` and ``concepts`` are
        matched case-insensitively against comma-separated metadata after
        retrieval, so fewer than ``n_results`` items may be returned.

        Returns:
            Matching results, or an empty list if the query fails (error is logged).
        """
        try:
            # Build complex where clause
            where_conditions = []

            if doc_types:
                where_conditions.append({"doc_type": {"$in": doc_types}})

            if has_code is not None:
                where_conditions.append({"has_code": has_code})

            if min_confidence is not None:
                where_conditions.append({"classification_confidence": {"$gte": min_confidence}})

            # A single condition is passed bare; several are combined with $and.
            if not where_conditions:
                where_clause = None
            elif len(where_conditions) == 1:
                where_clause = where_conditions[0]
            else:
                where_clause = {"$and": where_conditions}

            query_results = self._search(query_text, n_results, where_clause)

            # Post-filter for complex string matching
            if code_languages:
                query_results = [
                    result for result in query_results
                    if self._metadata_has_any(result.chunk.metadata, 'code_languages', code_languages)
                ]
            if concepts:
                query_results = [
                    result for result in query_results
                    if self._metadata_has_any(result.chunk.metadata, 'main_concepts', concepts)
                ]

            return query_results

        except Exception as e:
            logger.error(f"Error in advanced query: {e}")
            return []

    def get_stats(self):
        """Get database statistics

        Returns a dict with ``total_chunks`` (exact count) and
        ``doc_type_distribution``. The distribution is counted over at most the
        first 100 stored chunks, so it is a sample for larger collections.
        On failure returns ``{"error": <message>}``.
        """
        try:
            count = self.collection.count()
            if count == 0:
                return {
                    "total_chunks": 0,
                    "doc_type_distribution": {}
                }

            # Only metadata is needed, so skip loading documents and embeddings.
            sample = self.collection.get(limit=min(100, count), include=["metadatas"])

            doc_types = {}
            for metadata in sample['metadatas']:
                doc_type = (metadata or {}).get('doc_type', 'unknown')
                doc_types[doc_type] = doc_types.get(doc_type, 0) + 1

            return {
                "total_chunks": count,
                "doc_type_distribution": doc_types
            }
        except Exception as e:
            logger.error(f"Error getting stats: {e}")
            return {"error": str(e)}

    def clear_collection(self):
        """Clear all data from the collection

        Deletes the collection and re-creates it empty with the same name and
        metadata. Failures are logged and not raised.
        """
        try:
            self.client.delete_collection(self.collection_name)
            self.collection = self.client.create_collection(
                name=self.collection_name,
                metadata=dict(self._COLLECTION_METADATA)
            )
            logger.info(f"Cleared collection: {self.collection_name}")
        except Exception as e:
            logger.error(f"Error clearing collection: {e}")

In [16]:
class VectorDBManager:
    """Simple manager for vector database operations

    Thin convenience layer over ``VectorDB``: ingestion, plain search, and several
    searches that post-filter results using the JSON metadata stored on each chunk.
    """

    def __init__(self, db_path: str = "./vector_db"):
        """Create the underlying ``VectorDB`` rooted at ``db_path``."""
        self.vector_db = VectorDB(db_path=db_path)

    def initialize_from_analysis(self, analyzed_docs: List):
        """Initialize database from analyzed documents

        Adds the documents and returns the statistics dict from ``VectorDB.get_stats``.
        """
        logger.info("Adding documents with contextual retrieval...")
        self.vector_db.add_documents(analyzed_docs)
        return self.vector_db.get_stats()

    def search(self, query: str, n_results: int = 5, doc_types: Optional[List[str]] = None) -> List[QueryResult]:
        """Basic search"""
        return self.vector_db.query(query, n_results, doc_types)

    def _filtered_search(self, query: str, n_results: int, metadata_key: str, predicate) -> List[QueryResult]:
        """Shared implementation of the metadata-filtered searches.

        Retrieves ``2 * n_results`` candidates, decodes the JSON list stored under
        ``metadata_key`` for each, and keeps candidates for which ``predicate``
        returns True, stopping at ``n_results``. Candidates whose metadata cannot
        be decoded are skipped.
        """
        candidates = self.vector_db.query(query, n_results * 2)  # Get more results to filter

        kept = []
        for result in candidates:
            try:
                payload = json.loads(result.chunk.metadata.get(metadata_key, '[]'))
            except (json.JSONDecodeError, TypeError):
                continue
            if predicate(payload):
                kept.append(result)
                if len(kept) >= n_results:
                    break
        return kept

    def search_by_heading(self, query: str, heading_keyword: str, n_results: int = 5) -> List[QueryResult]:
        """Search for content under specific headings

        Keeps results whose document has a heading containing ``heading_keyword``
        (case-insensitive). Headings are stored per document, not per chunk.
        """
        keyword = heading_keyword.lower()
        return self._filtered_search(
            query, n_results, 'headings_json',
            lambda headings: any(keyword in heading.lower() for heading in headings)
        )

    def search_by_code_language(self, query: str, language: str, n_results: int = 5) -> List[QueryResult]:
        """Search for code examples in specific language

        Keeps results whose document has a code block tagged with ``language``
        (case-insensitive exact match).
        """
        wanted = language.lower()
        return self._filtered_search(
            query, n_results, 'code_blocks_json',
            lambda blocks: any(block.get('language', '').lower() == wanted for block in blocks)
        )

    def search_by_dependency(self, query: str, required_concept: str, n_results: int = 5) -> List[QueryResult]:
        """Search for content that depends on a specific concept

        A result matches when ``required_concept`` is a substring of a dependency's
        concept name, or equals one of its prerequisites (both case-insensitive).
        """
        needle = required_concept.lower()

        def mentions_concept(dependencies) -> bool:
            for dep in dependencies:
                if needle in dep.get('concept', '').lower():
                    return True
                if needle in [p.lower() for p in dep.get('prerequisites', [])]:
                    return True
            return False

        return self._filtered_search(query, n_results, 'dependencies_json', mentions_concept)

    def get_chunk_analysis(self, chunk: DocumentChunk) -> Dict:
        """Extract rich analysis from a chunk's JSON metadata

        Returns a dict with the decoded headings, code blocks, dependencies, links,
        full metadata and classification; returns ``{}`` if any field fails to decode.
        """
        try:
            analysis = {
                "headings": json.loads(chunk.metadata.get('headings_json', '[]')),
                "code_blocks": json.loads(chunk.metadata.get('code_blocks_json', '[]')),
                "dependencies": json.loads(chunk.metadata.get('dependencies_json', '[]')),
                "links": json.loads(chunk.metadata.get('links_json', '[]')),
                "full_metadata": json.loads(chunk.metadata.get('full_metadata_json', '{}')),
                "classification": json.loads(chunk.metadata.get('full_classification_json', '{}'))
            }
            return analysis
        except json.JSONDecodeError as e:
            logger.error(f"Error parsing chunk metadata: {e}")
            return {}

    def find_learning_prerequisites(self, concept: str, n_results: int = 5) -> List[QueryResult]:
        """Find content that teaches prerequisites for a concept

        Looks for chunks whose dependency list has an entry matching ``concept``,
        then searches for material on each prerequisite of that entry. Every
        prerequisite is queried at most once and every chunk is returned at most
        once. This matters because all chunks of one document carry the same
        dependency list.
        """
        results = self.vector_db.query(f"learn {concept} prerequisites requirements", n_results * 3)
        needle = concept.lower()

        prerequisite_chunks = []
        seen_chunk_ids = set()
        queried_prereqs = set()

        for result in results:
            try:
                dependencies = json.loads(result.chunk.metadata.get('dependencies_json', '[]'))
            except (json.JSONDecodeError, TypeError):
                continue

            for dep in dependencies:
                if needle in dep.get('concept', '').lower():
                    # This dependency entry describes our concept, so its
                    # prerequisites are what the learner needs first.
                    for prereq in dep.get('prerequisites', []):
                        prereq_key = prereq.lower()
                        if prereq_key in queried_prereqs:
                            continue
                        queried_prereqs.add(prereq_key)
                        for hit in self.vector_db.query(f"{prereq} tutorial guide", 3):
                            if hit.chunk.id not in seen_chunk_ids:
                                seen_chunk_ids.add(hit.chunk.id)
                                prerequisite_chunks.append(hit)
                    break

            if len(prerequisite_chunks) >= n_results:
                break

        return prerequisite_chunks[:n_results]

    def clear_database(self):
        """Clear the entire database"""
        self.vector_db.clear_collection()

In [17]:
# Build the vector store from scratch: open (or create) the collection under
# ./vector_db, wipe it so re-running this cell starts from an empty collection,
# then ingest the analyzed documents. `stats` holds the dict from get_stats().
vector_db_manager = VectorDBManager(db_path="./vector_db")
vector_db_manager.clear_database()
stats = vector_db_manager.initialize_from_analysis(analyzed_docs)

INFO:__main__:Loaded existing collection: docs
INFO:__main__:Cleared collection: docs
INFO:__main__:Adding documents with contextual retrieval...
INFO:__main__:Created 2 contextual chunks for .cache/modelcontextprotocol_docs_9b06b34c6341a02b233055dc593dd641/docs/concepts/architecture.mdx
INFO:__main__:Created 2 contextual chunks for .cache/modelcontextprotocol_docs_9b06b34c6341a02b233055dc593dd641/docs/concepts/prompts.mdx
INFO:__main__:Created 2 contextual chunks for .cache/modelcontextprotocol_docs_9b06b34c6341a02b233055dc593dd641/docs/concepts/resources.mdx
INFO:__main__:Created 1 contextual chunks for .cache/modelcontextprotocol_docs_9b06b34c6341a02b233055dc593dd641/docs/concepts/roots.mdx
INFO:__main__:Created 2 contextual chunks for .cache/modelcontextprotocol_docs_9b06b34c6341a02b233055dc593dd641/docs/concepts/sampling.mdx
INFO:__main__:Created 2 contextual chunks for .cache/modelcontextprotocol_docs_9b06b34c6341a02b233055dc593dd641/docs/concepts/tools.mdx
INFO:__main__:Created 

In [18]:
# Smoke test: basic similarity search with the default n_results (5).
t= vector_db_manager.search(query="tool")

In [19]:
# Inspect the top hit as a plain dict (raises IndexError if the search returned nothing).
t[0].model_dump()

{'chunk': {'id': 'cc0bfbf9_0',
  'content': '--- title: "Tools" description: "Enable LLMs to perform actions through your server" --- Tools are a powerful primitive in the Model Context Protocol (MCP) that enable servers to expose executable functionality to clients. Through tools, LLMs can interact with external systems, perform computations, and take actions in the real world. <Note> Tools are designed to be **model-controlled**, meaning that tools are exposed from servers to clients with the intention of the AI model being able to automatically invoke them (with a human in the loop to grant approval). </Note> ## Overview Tools in MCP allow servers to expose executable functions that can be invoked by clients and used by LLMs to perform actions. Key aspects of tools include: - **Discovery**: Clients can list available tools through the `tools/list` endpoint - **Invocation**: Tools are called using the `tools/call` endpoint, where servers perform the requested operation and return res

In [20]:
# Pydantic BaseModels
# One unit of a learning path. The four *_content lists hold vector-store chunk IDs,
# one list per document type.
class LearningModule(BaseModel):
    """A learning module covering a specific topic"""
    # Human-readable module name.
    title: str
    tutorial_content: List[str] = Field(default_factory=list)  # chunk IDs of tutorial material
    concept_content: List[str] = Field(default_factory=list)   # chunk IDs of concept material
    example_content: List[str] = Field(default_factory=list)   # chunk IDs of example material
    reference_content: List[str] = Field(default_factory=list) # chunk IDs of reference material
    estimated_time: int = 0  # minutes
    # Free-text summary of the module's content; empty by default.
    content_summary: str = ""

# Ordered collection of LearningModule objects plus bookkeeping about how the
# path was produced.
class LearningPath(BaseModel):
    """Complete learning path with ordered modules"""
    # Modules in the order they should be studied.
    modules: List[LearningModule]
    # Defaults to "intermediate"; the DSPy signatures in this notebook describe the
    # accepted values as beginner, intermediate, or advanced.
    difficulty_level: str = "intermediate"
    # presumably minutes, matching LearningModule.estimated_time — TODO confirm
    total_time: int = 0
    # presumably len(modules); stored explicitly rather than derived — TODO confirm
    module_count: int = 0
    # presumably the `reasoning` text from module discovery — TODO confirm
    discovery_reasoning: str = ""
    # presumably the `reasoning` text from module ordering — TODO confirm
    ordering_reasoning: str = ""
    # Free-text descriptions of topics the available content does not cover.
    content_gaps: List[str] = Field(default_factory=list)

# Aggregate description of the indexed documentation.
class ContentSummary(BaseModel):
    """Summary of available documentation content"""
    # Number of documents summarised.
    total_docs: int
    # Count of documents per document type.
    doc_types: Dict[str, int]  # {"tutorial": 5, "concept": 10, ...}
    # Topic names found across the documentation.
    main_topics: List[str]
    # A sample of document titles.
    sample_titles: List[str]

In [21]:
# DSPy Signatures
# Signature for proposing learning modules from a summary of the documentation.
# NOTE(review): DSPy presumably sends the class docstring and each field's `desc`
# to the LM as prompt text, so treat them as runtime strings — confirm before rewording.
class DiscoverModules(dspy.Signature):
    """Analyze available documentation and suggest logical learning modules."""
    
    # Inputs
    content_summary: str = dspy.InputField(desc="Summary of available documentation topics and types")
    difficulty_level: str = dspy.InputField(desc="Target difficulty: beginner, intermediate, or advanced") 
    # ModuleDiscoverer passes a comma-joined string here, or the literal "none".
    user_modules: str = dspy.InputField(desc="User-provided modules if any, or 'none'")
    
    # Outputs
    modules: List[str] = dspy.OutputField(desc="List of 3-8 logical learning modules")
    reasoning: str = dspy.OutputField(desc="Brief explanation of module choices and structure")

# Signature for ordering an existing list of module titles.
# NOTE(review): the docstring and `desc` strings are presumably prompt text for
# the LM — confirm before rewording.
class OrderModules(dspy.Signature):
    """Order learning modules in optimal pedagogical sequence."""
    
    # Inputs
    modules: List[str] = dspy.InputField(desc="List of learning modules to order")
    content_overview: str = dspy.InputField(desc="Overview of available content")
    difficulty_level: str = dspy.InputField(desc="Target difficulty level")
    
    # Outputs
    ordered_modules: List[str] = dspy.OutputField(desc="Modules ordered from foundational to advanced")
    reasoning: str = dspy.OutputField(desc="Explanation of the ordering logic and prerequisites")


# Signature for producing vector-store search queries for one module and one
# document type.
# NOTE(review): the docstring and `desc` strings are presumably prompt text for
# the LM — confirm before rewording.
class GenerateSearchQueries(dspy.Signature):
    """Generate optimal search queries for finding content related to a learning module."""
    
    # Inputs
    module_title: str = dspy.InputField(desc="The learning module title to search for")
    difficulty_level: str = dspy.InputField(desc="Target difficulty: beginner, intermediate, or advanced")
    doc_type: str = dspy.InputField(desc="Document type to search: tutorial, concept, example, or reference")
    available_content: str = dspy.InputField(desc="Brief overview of what content is available")
    
    # Outputs
    search_queries: List[str] = dspy.OutputField(desc="List of 3-5 specific search queries optimized for this module and difficulty")
    reasoning: str = dspy.OutputField(desc="Brief explanation of the search strategy")


In [22]:
class FallbackParser:
    """Best-effort regex parsing of free-text LLM responses.

    Used when a response arrives as a plain string instead of structured fields.
    All list patterns are anchored to the start of a line so that hyphens, digits
    or asterisks inside a title (e.g. "Client-Server Architecture") are not
    mistaken for list markers.
    """

    # "1. Module Name" / "2) Module Name"
    _NUMBERED_ITEM = r'^[ \t]*\d+[.)][ \t]*([^\n]+)'
    # "- Module Name" / "* Module Name" / "• Module Name" (marker must be followed
    # by whitespace, so "**bold**" lines are not treated as bullets)
    _BULLET_ITEM = r'^[ \t]*[-•*][ \t]+([^\n]+)'
    # A whole line starting with a capital letter and containing no colon
    _PLAIN_LINE = r'^([A-Z][^\n:]+)$'

    @staticmethod
    def _match_module(candidate: str, original_modules: List[str], used: List[str]):
        """Return the unused original module that best matches ``candidate``, or None.

        An exact case-insensitive match wins. Otherwise the longest unused module
        that contains, or is contained in, the candidate is chosen, so that
        "Advanced Tools" is not swallowed by a shorter module named "Tools".
        """
        wanted = candidate.lower()

        for module in original_modules:
            if module not in used and module.lower() == wanted:
                return module

        best = None
        for module in original_modules:
            if module in used:
                continue
            name = module.lower()
            if name and (name in wanted or wanted in name):
                if best is None or len(module) > len(best):
                    best = module
        return best

    @staticmethod
    def parse_modules_response(response: str) -> tuple[List[str], str]:
        """Parse module list from string response

        Tries numbered items, then bullet items, then plain capitalised lines;
        the first pattern that matches anything is used. Items of three characters
        or fewer are dropped and at most eight modules are returned.

        Returns:
            ``(modules, reasoning)``. ``reasoning`` is the text following the word
            "reasoning", "explanation" or "rationale" (max 300 chars), or a
            placeholder when none is present. If parsing raises, a generic
            four-module structure is returned.
        """
        modules = []
        reasoning = ""

        try:
            module_patterns = (
                FallbackParser._NUMBERED_ITEM,
                FallbackParser._BULLET_ITEM,
                FallbackParser._PLAIN_LINE,
            )

            for pattern in module_patterns:
                matches = re.findall(pattern, response, re.MULTILINE)
                if matches:
                    modules = [m.strip().strip('.,') for m in matches if len(m.strip()) > 3][:8]
                    break

            # Extract reasoning
            reasoning_patterns = [
                r'reasoning[:\s]+(.+)',
                r'explanation[:\s]+(.+)',
                r'rationale[:\s]+(.+)'
            ]

            for pattern in reasoning_patterns:
                match = re.search(pattern, response, re.IGNORECASE | re.DOTALL)
                if match:
                    reasoning = match.group(1).strip()[:300]
                    break

            if not reasoning:
                reasoning = "Modules extracted from LLM response"

        except Exception as e:
            logger.warning(f"Error parsing modules response: {e}")
            modules = ["Introduction", "Core Concepts", "Implementation", "Advanced Topics"]
            reasoning = "Fallback module structure used"

        return modules, reasoning

    @staticmethod
    def parse_ordering_response(response: str, original_modules: List[str]) -> tuple[List[str], str]:
        """Parse ordered modules from string response

        List items in ``response`` are mapped back onto ``original_modules``.
        If fewer than 80% of the original modules are recognised, the original
        order is returned. Otherwise the recognised order is returned, with any
        unrecognised original modules appended at the end so none is lost.

        Returns:
            ``(ordered_modules, reasoning)``. ``ordered_modules`` is always a new
            list containing only entries of ``original_modules``.
        """
        ordered_modules = []
        reasoning = ""

        try:
            order_patterns = (
                FallbackParser._NUMBERED_ITEM,
                FallbackParser._BULLET_ITEM,
            )

            for pattern in order_patterns:
                matches = re.findall(pattern, response, re.MULTILINE)
                if not matches:
                    continue
                # Try to match each list item with an original module
                for raw_item in matches:
                    candidate = raw_item.strip().strip('.,')
                    if not candidate:
                        # An empty string is a substring of everything; skip it.
                        continue
                    chosen = FallbackParser._match_module(candidate, original_modules, ordered_modules)
                    if chosen is not None:
                        ordered_modules.append(chosen)
                break

            if len(ordered_modules) < len(original_modules) * 0.8:
                # If no good matches, return original order
                ordered_modules = list(original_modules)
            else:
                # Keep modules the response failed to mention instead of dropping them.
                missing = [m for m in original_modules if m not in ordered_modules]
                ordered_modules.extend(missing)

            # Extract reasoning
            reasoning_match = re.search(r'reasoning[:\s]+(.+)', response, re.IGNORECASE | re.DOTALL)
            reasoning = reasoning_match.group(1).strip()[:300] if reasoning_match else "Module ordering applied"

        except Exception as e:
            logger.warning(f"Error parsing ordering response: {e}")
            ordered_modules = list(original_modules)
            reasoning = "Original module order preserved"

        return ordered_modules, reasoning


In [23]:
class ModuleDiscoverer(dspy.Module):
    """Proposes learning modules for the available documentation.

    Wraps a ``dspy.ChainOfThought`` over ``DiscoverModules`` and normalises its
    output to ``(modules, reasoning)``, falling back to regex parsing or to a
    generic module list when the LM output is unusable.
    """

    def __init__(self):
        super().__init__()
        self.discover = dspy.ChainOfThought(DiscoverModules)

    def forward(self, content_summary: str, difficulty_level: str, user_modules: Optional[List[str]] = None):
        """Run module discovery.

        Args:
            content_summary: Text summary of the available documentation.
            difficulty_level: Target difficulty passed through to the signature.
            user_modules: Optional user-supplied module titles; sent to the LM as
                a comma-joined string, or "none" when absent.

        Returns:
            ``(modules, reasoning)``. Never raises: any error is logged and a
            generic three-module fallback is returned.
        """
        try:
            user_modules_str = ", ".join(user_modules) if user_modules else "none"

            result = self.discover(
                content_summary=content_summary,
                difficulty_level=difficulty_level,
                user_modules=user_modules_str
            )

            # Check if we got structured output
            if hasattr(result, 'modules') and hasattr(result, 'reasoning'):
                modules = result.modules
                reasoning = str(result.reasoning)

                # Handle case where modules is a string
                if isinstance(modules, str):
                    modules, parsed_reasoning = FallbackParser.parse_modules_response(modules)
                    # Keep the LM's own reasoning; only use the parser's text
                    # when the LM gave none.
                    if not reasoning.strip():
                        reasoning = parsed_reasoning
            else:
                # String response - use fallback
                modules, reasoning = FallbackParser.parse_modules_response(str(result))

            if not modules:
                # An empty module list would produce an empty learning path.
                logger.warning("Module discovery returned no modules; using fallback modules")
                return ["Introduction", "Core Concepts", "Implementation"], "Fallback modules used because no modules were returned"

            return modules, reasoning

        except Exception as e:
            logger.error(f"Error in module discovery: {e}")
            return ["Introduction", "Core Concepts", "Implementation"], "Fallback modules used due to error"

class ModuleOrderer(dspy.Module):
    """Orders learning modules into a teaching sequence.

    Wraps a ``dspy.ChainOfThought`` over ``OrderModules`` and normalises its
    output to ``(ordered_modules, reasoning)``, keeping the input order when the
    LM output is unusable.
    """

    def __init__(self):
        super().__init__()
        self.order = dspy.ChainOfThought(OrderModules)

    def forward(self, modules: List[str], content_overview: str, difficulty_level: str):
        """Run module ordering.

        Args:
            modules: Module titles to order.
            content_overview: Text overview of the available content.
            difficulty_level: Target difficulty passed through to the signature.

        Returns:
            ``(ordered_modules, reasoning)``. Never raises: any error is logged and
            the input order is returned.
        """
        try:
            result = self.order(
                modules=modules,
                content_overview=content_overview,
                difficulty_level=difficulty_level
            )

            # Check if we got structured output
            if hasattr(result, 'ordered_modules') and hasattr(result, 'reasoning'):
                ordered_modules = result.ordered_modules
                reasoning = str(result.reasoning)

                # Handle case where ordered_modules is a string
                if isinstance(ordered_modules, str):
                    ordered_modules, parsed_reasoning = FallbackParser.parse_ordering_response(ordered_modules, modules)
                    # Keep the LM's own reasoning; only use the parser's text
                    # when the LM gave none.
                    if not reasoning.strip():
                        reasoning = parsed_reasoning
            else:
                # String response - use fallback
                ordered_modules, reasoning = FallbackParser.parse_ordering_response(str(result), modules)

            if not ordered_modules:
                # Never hand back an empty ordering for a non-empty input.
                logger.warning("Module ordering returned no modules; keeping original order")
                return modules, "Original order preserved because no ordering was returned"

            return ordered_modules, reasoning

        except Exception as e:
            logger.error(f"Error in module ordering: {e}")
            return modules, "Original order preserved due to error"

class QueryGenerator(dspy.Module):
    """Generates vector-search queries for one module / document-type pair.

    Wraps a ChainOfThought call over GenerateSearchQueries and always returns a
    non-empty list of queries, using simple template queries when the LM
    output cannot be used.
    """

    def __init__(self):
        super().__init__()
        self.generate = dspy.ChainOfThought(GenerateSearchQueries)

    @staticmethod
    def _fallback_queries(module_title: str, doc_type: str) -> List[str]:
        """Template queries used when the LM output is unusable."""
        return [f"{module_title} {doc_type}", f"how to {module_title}"]

    def forward(self, module_title: str, difficulty_level: str, doc_type: str, available_content: str):
        """Return ``(queries, reasoning)`` for the given module and document type.

        Args:
            module_title: Title of the module to find content for.
            difficulty_level: Target difficulty label, passed to the LM.
            doc_type: Document type the queries should target.
            available_content: Short description of the available content, passed to the LM.

        Returns:
            A ``(list_of_query_strings, reasoning_text)`` tuple. The list is
            never empty: on any exception, or when the LM yields no usable
            list, two template queries are returned with the reason
            ``"Fallback queries used"``.
        """
        try:
            result = self.generate(
                module_title=module_title,
                difficulty_level=difficulty_level,
                doc_type=doc_type,
                available_content=available_content
            )

            # Check if we got structured output
            if hasattr(result, 'search_queries') and hasattr(result, 'reasoning'):
                queries = result.search_queries
                reasoning = str(result.reasoning)

                # Handle case where queries is a string
                if isinstance(queries, str):
                    queries = self._parse_queries_from_string(queries)
            else:
                # String response - parse it
                queries = self._parse_queries_from_string(str(result))
                reasoning = "Generated from LLM response"

            if isinstance(queries, tuple):
                queries = list(queries)

            # Callers iterate over the queries; never hand back None or an empty list.
            if not isinstance(queries, list) or not queries:
                logger.warning(
                    f"No usable search queries generated for {module_title} - {doc_type}; using fallback queries"
                )
                return self._fallback_queries(module_title, doc_type), "Fallback queries used"

            return queries, reasoning

        except Exception as e:
            logger.error(f"Error generating search queries: {e}")
            # Fallback to simple queries
            return self._fallback_queries(module_title, doc_type), "Fallback queries used"

    def _parse_queries_from_string(self, response: str) -> List[str]:
        """Parse search queries from a free-text response.

        Tries, in order: numbered list items, bullet items, double-quoted
        strings, single-quoted strings. The first style that yields at least
        one query longer than 5 characters wins (at most 5 queries are kept).
        If none does, non-question lines longer than 5 characters are used
        (at most 3). The result is never empty; ``["basic information"]`` is
        the last resort.
        """
        queries: List[str] = []

        try:
            # List markers are anchored to the start of a line (re.MULTILINE) so
            # that digits and hyphens inside ordinary prose, e.g. "step-by-step"
            # or "version 2.5", are not mistaken for list items.
            patterns = [
                r'^[ \t]*\d+\.\s*["\']?([^"\'\n]+)["\']?',    # 1. "query"
                r'^[ \t]*[-•*][ \t]+["\']?([^"\'\n]+)["\']?',  # - "query"
                r'"([^"]+)"',  # "quoted strings"
                r"'([^']+)'",  # 'quoted strings'
            ]

            for pattern in patterns:
                matches = re.findall(pattern, response, flags=re.MULTILINE)
                queries = [q.strip() for q in matches if len(q.strip()) > 5][:5]
                # Only stop once a pattern produced something usable; a pattern
                # whose matches were all filtered out must not block the others.
                if queries:
                    break

            # If no structured queries found, split by lines
            if not queries:
                lines = [line.strip() for line in response.split('\n') if line.strip()]
                queries = [line for line in lines if len(line) > 5 and '?' not in line][:3]

            # Ensure we have at least one query
            if not queries:
                queries = ["basic information"]

        except Exception as e:
            logger.warning(f"Error parsing queries: {e}")
            queries = ["basic information"]

        return queries

In [24]:
class PathBuilder:
    """Builds learning paths using LLM intelligence and vector search.

    The builder discovers (or accepts) module titles, asks an LM to order them,
    then fills each module with chunk ids found through LM-generated vector
    searches, one set of searches per document type.
    """

    # Number of hits requested from the vector DB for each search query.
    RESULTS_PER_QUERY = 3

    # Maximum number of document titles quoted in the short content summary.
    MAX_SAMPLE_TOPICS = 10

    def __init__(self, analyzed_docs: List, vector_db_manager):
        """Store the analyzed documents and vector DB manager and create the LM helpers.

        Args:
            analyzed_docs: Analyzed documents; each is read via
                ``.classification.doc_type.value``, ``.metadata.title``,
                ``.metadata.headings`` and ``.summary``.
            vector_db_manager: Object exposing
                ``search(query=..., n_results=..., doc_types=[...])`` whose
                results expose ``.chunk.id``.
        """
        self.analyzed_docs = analyzed_docs
        self.vector_db = vector_db_manager
        self.module_discoverer = ModuleDiscoverer()
        self.module_orderer = ModuleOrderer()
        self.query_generator = QueryGenerator()

    def _create_available_content_summary(self) -> str:
        """Create a brief summary of available content types and sample titles.

        Returns a one-line string with the per-type document counts and up to
        ``MAX_SAMPLE_TOPICS`` distinct titles in first-seen order, so the text
        sent to the LM is the same on every run.
        """
        doc_types = {}
        # A dict (not a set) keeps first-seen order, making the output reproducible.
        sample_topics = {}

        for doc in self.analyzed_docs:
            doc_type = doc.classification.doc_type.value
            doc_types[doc_type] = doc_types.get(doc_type, 0) + 1

            if len(sample_topics) < self.MAX_SAMPLE_TOPICS:
                sample_topics.setdefault(doc.metadata.title or "Untitled", None)

        return f"Available: {doc_types}. Sample topics: {', '.join(sample_topics)}"

    def _search_chunk_ids(self, query: str, doc_type: str) -> List[str]:
        """Run one vector search restricted to ``doc_type`` and return the hit chunk ids."""
        results = self.vector_db.search(
            query=query,
            n_results=self.RESULTS_PER_QUERY,
            doc_types=[doc_type]
        )
        return [result.chunk.id for result in results]

    def _search_content_for_module(self, module_title: str, difficulty_level: str) -> Dict[str, List[str]]:
        """Search for content related to a module using AI-generated queries.

        Args:
            module_title: Title of the module to find content for.
            difficulty_level: Target difficulty label, forwarded to the query generator.

        Returns:
            A dict with the keys ``"tutorial"``, ``"concept"``, ``"example"``
            and ``"reference"``, each mapping to a de-duplicated list of chunk
            ids in first-found order. A failing search is logged and skipped;
            it never aborts the remaining searches.
        """
        content = {
            "tutorial": [],
            "concept": [],
            "example": [],
            "reference": []
        }

        # Create overview of available content for query generation
        available_content = self._create_available_content_summary()

        for doc_type in content:
            logger.info(f"Generating search queries for {module_title} - {doc_type} - {difficulty_level}")
            fallback_query = f"{module_title} {doc_type}"

            try:
                # Generate intelligent search queries
                search_queries, reasoning = self.query_generator(
                    module_title=module_title,
                    difficulty_level=difficulty_level,
                    doc_type=doc_type,
                    available_content=available_content
                )
                logger.info(f"Generated queries for {doc_type}: {search_queries}")
                logger.info(f"Reasoning: {reasoning}")
            except Exception as e:
                logger.error(f"Query generation failed for {module_title} - {doc_type}: {e}")
                search_queries = [fallback_query]

            # Normalise whatever the generator returned into a non-empty list of queries.
            if isinstance(search_queries, str):
                search_queries = [search_queries]
            elif not isinstance(search_queries, (list, tuple)) or not search_queries:
                logger.warning(f"No usable queries for {module_title} - {doc_type}; using '{fallback_query}'")
                search_queries = [fallback_query]

            # Execute each query; one failing search must not stop the others.
            for query in search_queries:
                try:
                    content[doc_type].extend(self._search_chunk_ids(query, doc_type))
                except Exception as e:
                    logger.warning(f"Search failed for '{query}' in {doc_type}: {e}")

        # Remove duplicates
        for doc_type in content:
            content[doc_type] = list(dict.fromkeys(content[doc_type]))  # Preserve order

        return content

    def _create_rich_content_summary(self) -> str:
        """Create rich content summary with complete document information.

        Lists every analyzed document (title, headings, summary) grouped by
        document type. Nothing is truncated, so the text grows with the corpus.
        """
        # Group documents by type for better organization
        docs_by_type = {}
        for doc in self.analyzed_docs:
            docs_by_type.setdefault(doc.classification.doc_type.value, []).append(doc)

        summary_text = "Available Documentation for Learning Path Creation:\n\n"
        summary_text += f"Total Documents: {len(self.analyzed_docs)}\n\n"

        for doc_type, docs in docs_by_type.items():
            summary_text += f"{doc_type.upper()} DOCUMENTS ({len(docs)} total):\n"

            for position, doc in enumerate(docs, start=1):
                summary_text += (
                    f"  {position}. Title: {doc.metadata.title or 'Untitled'}\n"
                    f"     Headings: {', '.join(doc.metadata.headings)}\n"
                    f"     Summary: {doc.summary}\n"
                )

            summary_text += "\n"

        return summary_text

    def _create_content_overview_for_ordering(self, modules: List[str]) -> str:
        """Create complete content overview for module ordering.

        For each proposed module, lists every document whose title, headings or
        summary contains the module name or any single word of it
        (case-insensitive substring match).
        """
        overview = "Content Overview for Module Ordering:\n\n"
        overview += f"Proposed Modules: {', '.join(modules)}\n\n"

        for module in modules:
            overview += f"CONTENT AVAILABLE FOR '{module.upper()}':\n"

            module_lower = module.lower()
            module_words = module_lower.split()

            related_docs = []
            for doc in self.analyzed_docs:
                # A missing title contributes nothing to the match text (it used
                # to be rendered as the literal string "None").
                search_text = (
                    f"{doc.metadata.title or ''} {' '.join(doc.metadata.headings)} {doc.summary}".lower()
                )
                # NOTE(review): matching on every individual word is very
                # permissive - words such as "and" or "with" match almost any
                # document. Consider ignoring stop words.
                if module_lower in search_text or any(word in search_text for word in module_words):
                    related_docs.append(doc)

            if related_docs:
                for doc in related_docs:
                    overview += f"  - {doc.classification.doc_type.value}: {doc.metadata.title or 'Untitled'}\n"
                    overview += f"    Headings: {', '.join(doc.metadata.headings)}\n"
                    overview += f"    Summary: {doc.summary}\n"
            else:
                overview += "  - No directly related content found for this module\n"

            overview += "\n"

        return overview

    def _estimate_module_time(self, content: Dict[str, List[str]]) -> int:
        """Estimate time in minutes for a module based on its chunk counts.

        Each chunk is weighted by its document type (10 minutes for unknown
        types); the result is never below 15 minutes.
        """
        time_estimates = {
            "tutorial": 15,    # 15 minutes per tutorial chunk
            "concept": 10,     # 10 minutes per concept chunk
            "example": 8,      # 8 minutes per example chunk
            "reference": 5     # 5 minutes per reference chunk
        }

        total_time = sum(
            len(chunks) * time_estimates.get(doc_type, 10)
            for doc_type, chunks in content.items()
        )

        return max(total_time, 15)  # Minimum 15 minutes per module

    def _create_content_summary_for_module(self, content: Dict[str, List[str]]) -> str:
        """Summarise the chunks found for a module, e.g. ``"3 tutorial chunks, 1 concept chunks"``.

        Returns ``"No content found"`` when every document type is empty.
        """
        summary_parts = [
            f"{len(chunks)} {doc_type} chunks"
            for doc_type, chunks in content.items()
            if chunks
        ]

        return ", ".join(summary_parts) if summary_parts else "No content found"

    def build_learning_path(self, 
                          module_headings: Optional[List[str]] = None,
                          difficulty_level: str = "intermediate") -> LearningPath:
        """Build a complete learning path with AI-generated search queries.

        Args:
            module_headings: Optional module titles supplied by the caller. When
                given (and non-empty), LM module discovery is skipped.
            difficulty_level: Target difficulty label forwarded to every LM call.

        Returns:
            A ``LearningPath`` holding one ``LearningModule`` per ordered module,
            the total estimated time in minutes, the discovery and ordering
            reasoning, and a list of content gaps (document types for which a
            module had no chunks).
        """
        logger.info(f"Building learning path for difficulty: {difficulty_level}")

        # Step 1: Create complete rich content summary
        content_summary_text = self._create_rich_content_summary()
        logger.info(f"Created complete content summary with {len(self.analyzed_docs)} documents")

        # Step 2: Discover modules
        if module_headings:
            modules = module_headings
            discovery_reasoning = f"Using user-provided modules: {', '.join(modules)}"
            logger.info(f"Using provided modules: {modules}")
        else:
            modules, discovery_reasoning = self.module_discoverer(
                content_summary=content_summary_text,
                difficulty_level=difficulty_level
            )
            logger.info(f"Discovered modules: {modules}")

        # Step 3: Create complete targeted overview for ordering
        content_overview = self._create_content_overview_for_ordering(modules)

        # Step 4: Order modules
        ordered_modules, ordering_reasoning = self.module_orderer(
            modules=modules,
            content_overview=content_overview,
            difficulty_level=difficulty_level
        )

        # The loop below needs a sequence of titles; keep the unordered modules
        # if the ordering step handed back something unusable.
        if not isinstance(ordered_modules, (list, tuple)) or not ordered_modules:
            logger.warning("Module ordering returned no usable list; using unordered modules")
            ordered_modules = modules

        logger.info(f"Ordered modules: {ordered_modules}")

        # Step 5: Build learning modules with AI-generated queries
        learning_modules = []
        content_gaps = []
        total_time = 0

        for module_title in ordered_modules:
            logger.info(f"Building content for module: {module_title}")

            # Search for content using AI-generated queries
            content = self._search_content_for_module(module_title, difficulty_level)

            # Check for content gaps
            empty_types = [doc_type for doc_type, chunks in content.items() if not chunks]
            if empty_types:
                content_gaps.append(f"Module '{module_title}' missing: {', '.join(empty_types)}")

            # Calculate time
            module_time = self._estimate_module_time(content)
            total_time += module_time

            # Create learning module
            learning_module = LearningModule(
                title=module_title,
                tutorial_content=content["tutorial"],
                concept_content=content["concept"],
                example_content=content["example"],
                reference_content=content["reference"],
                estimated_time=module_time,
                content_summary=self._create_content_summary_for_module(content)
            )

            learning_modules.append(learning_module)

        # Step 6: Create final learning path
        learning_path = LearningPath(
            modules=learning_modules,
            difficulty_level=difficulty_level,
            total_time=total_time,
            module_count=len(learning_modules),
            discovery_reasoning=discovery_reasoning,
            ordering_reasoning=ordering_reasoning,
            content_gaps=content_gaps
        )

        logger.info(f"Built learning path: {len(learning_modules)} modules, {total_time} minutes")
        return learning_path

In [25]:
class LearningPathManager:
    """High-level manager for learning path operations.

    Thin facade over ``PathBuilder``: it builds the builder once and forwards
    path creation and content inspection requests to it.
    """

    def __init__(self, analyzed_docs: List, vector_db_manager):
        """Create the underlying ``PathBuilder`` from the given documents and vector DB manager."""
        self.path_builder = PathBuilder(analyzed_docs, vector_db_manager)

    def create_path(self, 
                   module_headings: Optional[List[str]] = None,
                   difficulty_level: str = "intermediate") -> LearningPath:
        """Create a learning path.

        Args:
            module_headings: Optional module titles; forwarded to
                ``PathBuilder.build_learning_path``.
            difficulty_level: Target difficulty label.

        Returns:
            The ``LearningPath`` produced by the builder.
        """
        return self.path_builder.build_learning_path(module_headings, difficulty_level)

    def preview_available_content(self) -> ContentSummary:
        """Preview what content is available.

        Delegates to ``PathBuilder._create_content_summary`` when the builder
        provides it.

        Raises:
            AttributeError: If the builder has no ``_create_content_summary``
                method. The ``PathBuilder`` defined alongside this class only
                offers string summaries (``_create_rich_content_summary`` and
                ``_create_available_content_summary``), so this is raised with
                an explicit message instead of failing on the attribute lookup.
        """
        summarize = getattr(self.path_builder, "_create_content_summary", None)
        if summarize is None:
            raise AttributeError(
                "PathBuilder does not define _create_content_summary(); "
                "preview_available_content() needs a builder that can produce a ContentSummary "
                "(the current builder only offers _create_rich_content_summary() and "
                "_create_available_content_summary(), which return strings)"
            )
        return summarize()

    def test_module_content(self, module_title: str, difficulty_level: str = "intermediate") -> Dict:
        """Test what content would be found for a specific module.

        Runs the builder's per-module search and returns a dict with the module
        title, the difficulty, the number of chunks found per document type,
        the total number of chunks and the builder's time estimate.
        """
        content = self.path_builder._search_content_for_module(module_title, difficulty_level)

        return {
            "module": module_title,
            "difficulty": difficulty_level,
            "content_found": {doc_type: len(chunks) for doc_type, chunks in content.items()},
            "total_chunks": sum(len(chunks) for chunks in content.values()),
            "estimated_time": self.path_builder._estimate_module_time(content)
        }

In [26]:
# Build the manager; analyzed_docs and vector_db_manager are expected to be
# defined by earlier cells of this notebook.
path_manager = LearningPathManager(analyzed_docs, vector_db_manager)

In [None]:
# NOTE: despite its name, `modules` holds the whole LearningPath returned by
# create_path (annotated `-> LearningPath`), not a list of modules.
modules = path_manager.create_path(difficulty_level="beginner")

INFO:__main__:Building learning path for difficulty: beginner
INFO:__main__:Created complete content summary with 14 documents
INFO:__main__:Discovered modules: ['Understanding MCP Fundamentals', 'Core MCP Primitives: Resources, Prompts, and Tools', 'Defining Server Scope with Roots', 'Building Your First MCP Client', 'Building Your First MCP Server', 'Debugging and Testing MCP Integrations']
INFO:__main__:Ordered modules: ['Understanding MCP Fundamentals', 'Core MCP Primitives: Resources, Prompts, and Tools', 'Defining Server Scope with Roots', 'Building Your First MCP Server', 'Building Your First MCP Client', 'Debugging and Testing MCP Integrations']
INFO:__main__:Building content for module: Understanding MCP Fundamentals
INFO:__main__:Generating search queries for Understanding MCP Fundamentals - tutorial - beginner
INFO:__main__:Generated queries for tutorial: ['MCP fundamentals beginner tutorial', 'beginner tutorial MCP roots tools', 'how to create MCP project directory tutorial

In [None]:
# Show the learning path built above (`modules` is the LearningPath) via its
# model_dump() output - presumably a plain dict if LearningPath is a pydantic model.
modules.model_dump()

In [138]:
# Pydantic Models for Generated Content
class CodeExample(BaseModel):
    """A code example with explanation"""
    # Element type of the ExtractCodeExamples.code_examples output field and of
    # GeneratedContent.code_examples.
    # NOTE(review): pydantic may copy the class docstring into the JSON schema
    # shown to the LM for typed outputs - confirm before rewording it.
    title: str
    code: str
    language: str  # language label for `code`; free-form string, not validated here
    explanation: str
    difficulty_level: str  # free-form difficulty label, not validated here

class Exercise(BaseModel):
    """A practical exercise for learners"""
    # Element type of the GenerateExercises.exercises output field; also built
    # directly by ContentFallbackParser.parse_exercises_response and
    # ExerciseGenerator._create_fallback_exercise.
    # NOTE(review): pydantic may copy the class docstring into the JSON schema
    # shown to the LM for typed outputs - confirm before rewording it.
    title: str
    description: str
    instructions: List[str]  # steps for the learner; the fallback builders always supply at least one
    hints: List[str] = Field(default_factory=list)  # optional; the fallback builders leave it empty
    expected_outcome: str
    difficulty_level: str  # free-form label copied from the caller (e.g. "beginner", "intermediate")

class AssessmentQuestion(BaseModel):
    """An assessment question to test understanding"""
    # Element type of the CreateAssessment.assessment_questions output field;
    # also built directly by ContentFallbackParser.parse_assessment_response and
    # AssessmentCreator._create_fallback_assessment.
    # NOTE(review): pydantic may copy the class docstring into the JSON schema
    # shown to the LM for typed outputs - confirm before rewording it.
    question: str
    # Plain string, so the values listed below are a convention and are not enforced.
    question_type: str  # multiple_choice, true_false, short_answer, code_completion
    options: List[str] = Field(default_factory=list)  # For multiple choice
    correct_answer: str
    explanation: str
    difficulty_level: str  # free-form label copied from the caller

class LearningObjective(BaseModel):
    """A specific learning objective"""
    # Element type of the WriteLearningObjectives.learning_objectives output
    # field and of GeneratedContent.learning_objectives.
    objective: str
    # Plain string, so the levels listed below are a convention and are not enforced.
    bloom_level: str  # remember, understand, apply, analyze, evaluate, create
    measurable_outcome: str

class GeneratedContent(BaseModel):
    """Complete generated content for a learning module"""
    # Aggregate of everything produced for one module: objectives, lesson text,
    # code examples, exercises and assessment questions.
    module_title: str
    learning_objectives: List[LearningObjective]
    lesson_text: str
    code_examples: List[CodeExample]
    exercises: List[Exercise]
    assessment_questions: List[AssessmentQuestion]
    estimated_time: int  # presumably minutes, like PathBuilder's estimates - TODO confirm
    prerequisites: List[str] = Field(default_factory=list)  # optional; defaults to an empty list
    key_concepts: List[str] = Field(default_factory=list)  # optional; defaults to an empty list

class ContentChunk(BaseModel):
    """Content chunk retrieved from vector DB"""
    # ContentSynthesizer.forward reads doc_type, title and content from these
    # objects when it builds the text handed to the LM.
    id: str
    content: str
    doc_type: str  # upper-cased for display by ContentSynthesizer; free-form string here
    title: str
    metadata: Dict[str, Any]  # arbitrary extra data; no schema is enforced by this model

In [139]:
# DSPy Signatures
class SynthesizeContent(dspy.Signature):
    """Synthesize multiple content chunks into a coherent learning lesson."""
    # NOTE: in DSPy the signature docstring and each field's `desc` are part of
    # the prompt sent to the LM, so rewording them changes model behavior.
    # Used by ContentSynthesizer through dspy.ChainOfThought.
    
    module_title: str = dspy.InputField(desc="Title of the learning module")
    # ContentSynthesizer passes a single string: at most 5 chunks, each cut to
    # 500 characters, joined by blank lines.
    content_chunks: str = dspy.InputField(desc="Multiple content chunks to synthesize")
    difficulty_level: str = dspy.InputField(desc="Target difficulty level")
    bloom_level: str = dspy.InputField(desc="Bloom's taxonomy level to target")
    
    lesson_text: str = dspy.OutputField(desc="Coherent lesson text that teaches the module concepts")
    key_concepts: List[str] = dspy.OutputField(desc="List of key concepts covered in the lesson")

class GenerateExercises(dspy.Signature):
    """Generate practical exercises from code examples and tutorials."""
    # NOTE: in DSPy the signature docstring and each field's `desc` are part of
    # the prompt sent to the LM, so rewording them changes model behavior.
    # Used by ExerciseGenerator through dspy.ChainOfThought.
    
    module_title: str = dspy.InputField(desc="Title of the learning module")
    # ExerciseGenerator truncates lesson_content to 1500 characters and
    # available_examples to 1000 characters before the call.
    lesson_content: str = dspy.InputField(desc="The lesson content and examples")
    difficulty_level: str = dspy.InputField(desc="Target difficulty level")
    available_examples: str = dspy.InputField(desc="Available code examples and tutorials")
    
    # The "2-4" count is only requested in the description; nothing here enforces it.
    exercises: List[Exercise] = dspy.OutputField(desc="List of 2-4 practical exercises")

class CreateAssessment(dspy.Signature):
    """Create assessment questions to test understanding of concepts."""
    # NOTE: in DSPy the signature docstring and each field's `desc` are part of
    # the prompt sent to the LM, so rewording them changes model behavior.
    # Used by AssessmentCreator through dspy.ChainOfThought.
    
    module_title: str = dspy.InputField(desc="Title of the learning module")
    # AssessmentCreator truncates lesson_content to 1500 characters and passes
    # at most the first 5 key concepts.
    lesson_content: str = dspy.InputField(desc="The lesson content to assess")
    key_concepts: List[str] = dspy.InputField(desc="Key concepts to test")
    difficulty_level: str = dspy.InputField(desc="Target difficulty level")
    
    # The "3-6" count is only requested in the description; nothing here enforces it.
    assessment_questions: List[AssessmentQuestion] = dspy.OutputField(desc="List of 3-6 assessment questions")

class WriteLearningObjectives(dspy.Signature):
    """Write clear, measurable learning objectives for a module."""
    # NOTE: in DSPy the signature docstring and each field's `desc` are part of
    # the prompt sent to the LM, so rewording them changes model behavior.
    # Used by ObjectiveWriter through dspy.ChainOfThought.
    
    module_title: str = dspy.InputField(desc="Title of the learning module")
    # ObjectiveWriter truncates lesson_content to 1500 characters before the call.
    lesson_content: str = dspy.InputField(desc="The lesson content")
    difficulty_level: str = dspy.InputField(desc="Target difficulty level")
    bloom_level: str = dspy.InputField(desc="Target Bloom's taxonomy level")
    
    # The "3-5" count is only requested in the description; nothing here enforces it.
    learning_objectives: List[LearningObjective] = dspy.OutputField(desc="List of 3-5 clear learning objectives")

class ExtractCodeExamples(dspy.Signature):
    """Extract and enhance code examples from content chunks."""
    # NOTE: in DSPy the signature docstring and each field's `desc` are part of
    # the prompt sent to the LM, so rewording them changes model behavior.
    
    module_title: str = dspy.InputField(desc="Title of the learning module")
    # Passed as one string; how the chunks are rendered into it is up to the caller.
    content_with_code: str = dspy.InputField(desc="Content chunks containing code examples")
    difficulty_level: str = dspy.InputField(desc="Target difficulty level")
    
    code_examples: List[CodeExample] = dspy.OutputField(desc="List of enhanced code examples with explanations")


In [140]:
# Fallback Parsers for Content Generation
class ContentFallbackParser:
    """Heuristic parsers that turn free-text LM output into content models.

    Used when an LM returns a plain string instead of structured exercises or
    assessment questions. Both parsers are best effort and never return an
    empty list: a generic item is produced when nothing could be parsed.
    """

    @staticmethod
    def _text_after_marker(line: str, markers) -> str:
        """Return the text following the first of ``markers`` found in ``line``.

        Matching is case-insensitive. Everything after the marker is kept, so
        values that themselves contain a colon ("B: because ...") are not cut.
        If no marker can be located, the text after the last colon is returned.
        """
        match = re.search('|'.join(re.escape(marker) for marker in markers), line, re.IGNORECASE)
        if match is None:
            return line.split(':')[-1].strip()
        return line[match.end():].strip()

    @staticmethod
    def parse_exercises_response(response: str, module_title: str, difficulty_level: str) -> List[Exercise]:
        """Parse exercises from string response.

        The text is split on the words "Exercise"/"EXERCISE" (optionally
        followed by a number and ":" or "."); at most 4 sections are used. In
        each section the first line is the title, lines before an
        instruction/step/task header form the description, and bullet or
        numbered lines after such a header become instructions.

        Args:
            response: Raw LM text.
            module_title: Module title used in generated default texts.
            difficulty_level: Difficulty label copied onto every exercise.

        Returns:
            A non-empty list of ``Exercise`` objects; a single generic exercise
            when nothing could be parsed.
        """
        exercises = []

        try:
            # Look for exercise patterns
            exercise_sections = re.split(r'(?:Exercise|EXERCISE)\s*\d*[:\.]?\s*', response)[1:]

            for i, section in enumerate(exercise_sections[:4]):  # Max 4 exercises
                lines = [line.strip() for line in section.split('\n') if line.strip()]
                if not lines:
                    continue

                # Resolve the title first so every later default uses a non-empty title.
                title = lines[0].strip().strip('*-.') or f"Exercise {i+1}"
                description_parts = []
                instructions = []

                in_instructions = False
                for line in lines[1:]:
                    is_list_item = (
                        line.startswith(('-', '•', '*')) or re.match(r'^\d+\.', line) is not None
                    )

                    # Inside the instruction list every list item is an instruction,
                    # even if it happens to contain "step", "task" or "instruction".
                    if in_instructions and is_list_item:
                        instructions.append(line.strip('- •*').strip())
                        continue

                    if any(keyword in line.lower() for keyword in ['instruction', 'step', 'task', 'do:']):
                        in_instructions = True
                        continue

                    # Non-list lines after the header are ignored; earlier lines
                    # belong to the description.
                    if not in_instructions:
                        description_parts.append(line)

                description = " ".join(description_parts).strip()

                if not instructions:
                    instructions = [description] if description else [f"Practice {title}"]

                exercises.append(Exercise(
                    title=title,
                    description=description or f"Practice exercise for {module_title}",
                    instructions=instructions,
                    expected_outcome=f"Complete understanding of {module_title} concepts",
                    difficulty_level=difficulty_level
                ))

        except Exception as e:
            logger.warning(f"Error parsing exercises: {e}")

        if not exercises:
            exercises = [Exercise(
                title=f"Practice {module_title}",
                description=f"Complete a practical exercise on {module_title}",
                instructions=[f"Apply the concepts learned in {module_title}"],
                expected_outcome=f"Demonstrate understanding of {module_title}",
                difficulty_level=difficulty_level
            )]

        return exercises

    @staticmethod
    def parse_assessment_response(response: str, module_title: str, difficulty_level: str) -> List[AssessmentQuestion]:
        """Parse assessment questions from string response.

        The text is split on the words "Question"/"QUESTION" (optionally
        followed by a number and ":" or "."); at most 6 sections are used. In
        each section the first line is the question, lines starting with
        ``A.``-``D.`` / ``a)``-``d)`` are options, and lines containing
        "answer:" / "correct:" or "explanation:" provide the answer and the
        explanation.

        Args:
            response: Raw LM text.
            module_title: Module title used in generated default texts.
            difficulty_level: Difficulty label copied onto every question.

        Returns:
            A non-empty list of ``AssessmentQuestion`` objects; a single generic
            short-answer question when nothing could be parsed.
        """
        questions = []

        try:
            # Look for question patterns
            question_sections = re.split(r'(?:Question|QUESTION)\s*\d*[:\.]?\s*', response)[1:]

            for section in question_sections[:6]:  # Max 6 questions
                lines = [line.strip() for line in section.split('\n') if line.strip()]
                if not lines:
                    continue

                question_text = lines[0].strip()
                options = []
                correct_answer = ""
                explanation = ""

                # Parse options and answer
                for line in lines[1:]:
                    lowered = line.lower()
                    if re.match(r'^[A-Da-d][\.\)]\s*', line):
                        options.append(line.strip())
                    elif 'answer:' in lowered or 'correct:' in lowered:
                        # Keep everything after the marker: the answer itself may contain ':'.
                        correct_answer = ContentFallbackParser._text_after_marker(
                            line, ('answer:', 'correct:')
                        )
                    elif 'explanation:' in lowered:
                        explanation = ContentFallbackParser._text_after_marker(line, ('explanation:',))

                question_type = "multiple_choice" if options else "short_answer"

                questions.append(AssessmentQuestion(
                    question=question_text,
                    question_type=question_type,
                    options=options,
                    correct_answer=correct_answer or "Sample answer",
                    explanation=explanation or f"Tests understanding of {module_title}",
                    difficulty_level=difficulty_level
                ))

        except Exception as e:
            logger.warning(f"Error parsing assessment: {e}")

        if not questions:
            questions = [AssessmentQuestion(
                question=f"What are the key concepts of {module_title}?",
                question_type="short_answer",
                correct_answer=f"Key concepts of {module_title}",
                explanation=f"Tests basic understanding of {module_title}",
                difficulty_level=difficulty_level
            )]

        return questions

In [None]:
class ContentSynthesizer(dspy.Module):
    """Turns retrieved content chunks into lesson text plus a list of key concepts."""

    def __init__(self):
        super().__init__()
        self.synthesize = dspy.ChainOfThought(SynthesizeContent)

    def forward(self, module_title: str, content_chunks: List[ContentChunk], 
                difficulty_level: str, bloom_level: str = "understand"):
        """Synthesize a lesson for ``module_title`` and return ``(lesson_text, key_concepts)``.

        Only the first 5 chunks are used and each chunk's content is cut to 500
        characters to keep the prompt small. On any exception a generic lesson
        text is returned with ``[module_title]`` as the only key concept.
        """
        try:
            # Render the (limited) chunks into one prompt string
            rendered_chunks = []
            for chunk in content_chunks[:5]:
                rendered_chunks.append(
                    f"[{chunk.doc_type.upper()}] {chunk.title}:\n{chunk.content[:500]}..."
                )
            chunks_text = "\n\n".join(rendered_chunks)

            result = self.synthesize(
                module_title=module_title,
                content_chunks=chunks_text,
                difficulty_level=difficulty_level,
                bloom_level=bloom_level
            )

            structured = hasattr(result, 'lesson_text') and hasattr(result, 'key_concepts')
            if not structured:
                # Unstructured response: use its string form as the lesson
                if result:
                    return str(result), [module_title]
                return f"Learning content for {module_title}", [module_title]

            if result.lesson_text:
                lesson_text = str(result.lesson_text)
            else:
                lesson_text = f"Learning content for {module_title}"

            key_concepts = result.key_concepts or [module_title]

            # A comma-separated string is split into individual concepts
            if isinstance(key_concepts, str):
                pieces = (piece.strip() for piece in key_concepts.split(','))
                key_concepts = [piece for piece in pieces if piece]

            return lesson_text, key_concepts

        except Exception as e:
            logger.error(f"Error in content synthesis: {e}")
            return f"Learning content for {module_title}. This module covers important concepts related to {module_title}.", [module_title]

class ExerciseGenerator(dspy.Module):
    """Produces practical exercises for a module, with a generic exercise as a safety net."""

    def __init__(self):
        super().__init__()
        self.generate = dspy.ChainOfThought(GenerateExercises)

    def forward(self, module_title: str, lesson_content: str, 
                difficulty_level: str, available_examples: str):
        """Return a non-empty list of exercises for ``module_title``.

        The lesson text is cut to 1500 characters and the examples to 1000
        before the LM call. String output is handed to
        ``ContentFallbackParser``; anything unusable, or any exception, yields
        the generic fallback exercise.
        """
        try:
            result = self.generate(
                module_title=module_title,
                lesson_content=lesson_content[:1500],  # Limit for token efficiency
                difficulty_level=difficulty_level,
                available_examples=available_examples[:1000]  # Limit examples too
            )

            exercises = getattr(result, 'exercises', None)

            if exercises is None:
                # No structured field: parse the string form of the whole result
                raw_text = str(result) if result else ""
                exercises = ContentFallbackParser.parse_exercises_response(
                    raw_text, module_title, difficulty_level
                )
            elif isinstance(exercises, str):
                exercises = ContentFallbackParser.parse_exercises_response(
                    exercises, module_title, difficulty_level
                )
            elif not isinstance(exercises, list):
                exercises = []

            if exercises:
                return exercises
            return self._create_fallback_exercise(module_title, difficulty_level)

        except Exception as e:
            logger.error(f"Error generating exercises: {e}")
            return self._create_fallback_exercise(module_title, difficulty_level)

    def _create_fallback_exercise(self, module_title: str, difficulty_level: str) -> List[Exercise]:
        """Build the single generic exercise used when generation fails."""
        steps = [
            f"Review the key concepts of {module_title}",
            f"Try implementing a simple example",
            f"Test your understanding with practical application"
        ]
        generic_exercise = Exercise(
            title=f"Practice {module_title}",
            description=f"Apply the concepts learned in {module_title}",
            instructions=steps,
            expected_outcome=f"Demonstrate understanding of {module_title} concepts",
            difficulty_level=difficulty_level
        )
        return [generic_exercise]

class AssessmentCreator(dspy.Module):
    """Produces assessment questions for a module, with generic questions as a safety net."""

    def __init__(self):
        super().__init__()
        self.create = dspy.ChainOfThought(CreateAssessment)

    def forward(self, module_title: str, lesson_content: str, 
                key_concepts: List[str], difficulty_level: str):
        """Return a non-empty list of assessment questions for ``module_title``.

        The lesson text is cut to 1500 characters and only the first 5 key
        concepts are sent to the LM. String output is handed to
        ``ContentFallbackParser``; anything unusable, or any exception, yields
        the two generic fallback questions.
        """
        try:
            result = self.create(
                module_title=module_title,
                lesson_content=lesson_content[:1500],
                key_concepts=key_concepts[:5],  # Limit concepts
                difficulty_level=difficulty_level
            )

            questions = getattr(result, 'assessment_questions', None)

            if questions is None:
                # No structured field: parse the string form of the whole result
                raw_text = str(result) if result else ""
                questions = ContentFallbackParser.parse_assessment_response(
                    raw_text, module_title, difficulty_level
                )
            elif isinstance(questions, str):
                questions = ContentFallbackParser.parse_assessment_response(
                    questions, module_title, difficulty_level
                )
            elif not isinstance(questions, list):
                questions = []

            if questions:
                return questions
            return self._create_fallback_assessment(module_title, difficulty_level)

        except Exception as e:
            logger.error(f"Error creating assessment: {e}")
            return self._create_fallback_assessment(module_title, difficulty_level)

    def _create_fallback_assessment(self, module_title: str, difficulty_level: str) -> List[AssessmentQuestion]:
        """Build the two generic short-answer questions used when generation fails."""
        concepts_question = AssessmentQuestion(
            question=f"What are the main concepts of {module_title}?",
            question_type="short_answer",
            correct_answer=f"The main concepts include the core principles and applications of {module_title}",
            explanation=f"Tests basic understanding of {module_title}",
            difficulty_level=difficulty_level
        )
        application_question = AssessmentQuestion(
            question=f"How would you apply {module_title} in practice?",
            question_type="short_answer",
            correct_answer=f"Practical application involves implementing {module_title} concepts in real scenarios",
            explanation=f"Tests practical application skills",
            difficulty_level=difficulty_level
        )
        return [concepts_question, application_question]

class ObjectiveWriter(dspy.Module):
    """Writes learning objectives for a module using ``WriteLearningObjectives``.

    ``forward`` returns the objectives produced by the language model, or the
    canned objectives from ``_create_fallback_objectives`` when the model call
    fails or yields nothing usable.
    """

    def __init__(self):
        super().__init__()
        self.write = dspy.ChainOfThought(WriteLearningObjectives)

    def forward(self, module_title: str, lesson_content: str,
                difficulty_level: str, bloom_level: str = "understand"):
        """Generate learning objectives for ``module_title``.

        Args:
            module_title: Title of the module the objectives are written for.
            lesson_content: Lesson text; only the first 1500 characters are sent
                to the model.
            difficulty_level: Forwarded unchanged to the model.
            bloom_level: Forwarded to the model and stamped on every objective
                that has to be parsed from plain text.

        Returns:
            The list found on ``result.learning_objectives`` when it is a
            non-empty list; objectives parsed line by line when it is a string;
            otherwise the fallback objectives. Exceptions are logged and also
            result in the fallback objectives.
        """
        try:
            result = self.write(
                module_title=module_title,
                lesson_content=lesson_content[:1500],
                difficulty_level=difficulty_level,
                bloom_level=bloom_level
            )

            objectives = getattr(result, 'learning_objectives', None)

            if isinstance(objectives, str):
                objectives = self._objectives_from_text(objectives, module_title, bloom_level)
            elif not isinstance(objectives, list):
                # None or any other unexpected type: treat as "nothing generated".
                objectives = []

            return objectives if objectives else self._create_fallback_objectives(module_title, bloom_level)

        except Exception as e:
            logger.error(f"Error writing objectives: {e}")
            return self._create_fallback_objectives(module_title, bloom_level)

    @staticmethod
    def _clean_objective_line(line: str) -> str:
        """Strip bullet characters and list numbering (``1.`` / ``2)``) from one line."""
        text = line.strip().strip('- •*')
        text = re.sub(r'^\d+[.)]\s+', '', text)
        return text.strip('- •*')

    def _objectives_from_text(self, text: str, module_title: str, bloom_level: str) -> List[LearningObjective]:
        """Turn a free-text model response into at most five objectives.

        Each line is cleaned first and only then length-checked, so separator
        lines such as ``-----------`` (which clean down to an empty string) are
        discarded instead of becoming empty objectives. The cap of five is
        applied after filtering so rejected lines do not use up slots.
        """
        cleaned_lines = (self._clean_objective_line(line) for line in text.split('\n'))
        statements = [statement for statement in cleaned_lines if len(statement) > 10][:5]

        return [
            LearningObjective(
                objective=statement,
                bloom_level=bloom_level,
                measurable_outcome=f"Demonstrate {bloom_level} of {module_title}"
            )
            for statement in statements
        ]

    def _create_fallback_objectives(self, module_title: str, bloom_level: str) -> List[LearningObjective]:
        """Create fallback objectives when generation fails.

        Returns two objectives: an "understand the core concepts" objective
        tagged with ``bloom_level`` and an "apply in practical scenarios"
        objective that is always tagged ``"apply"``.
        """
        return [
            LearningObjective(
                objective=f"Understand the core concepts of {module_title}",
                bloom_level=bloom_level,
                measurable_outcome=f"Explain the key principles of {module_title}"
            ),
            LearningObjective(
                objective=f"Apply {module_title} in practical scenarios",
                bloom_level="apply",
                measurable_outcome=f"Implement {module_title} concepts in real-world examples"
            )
        ]

class CodeExampleExtractor(dspy.Module):
    """Extracts code examples for a module using ``ExtractCodeExamples``.

    When the model answers with plain text instead of a list, fenced code blocks
    are pulled out of that text with a regular expression.
    """

    def __init__(self):
        super().__init__()
        self.extract = dspy.ChainOfThought(ExtractCodeExamples)

    def forward(self, module_title: str, content_with_code: str, difficulty_level: str):
        """Return code examples for ``module_title``.

        Args:
            module_title: Module title, forwarded to the model and used in the
                titles of regex-extracted examples.
            content_with_code: Source text; only the first 2000 characters are
                sent to the model. ``None`` or empty is sent as ``""``.
            difficulty_level: Forwarded to the model and copied onto
                regex-extracted examples.

        Returns:
            ``result.code_examples`` when it is a list; examples parsed from
            fenced code blocks when it is a string or missing; ``[]`` for any
            other type or when an exception occurs (the exception is logged).
        """
        try:
            # Limit content to avoid token issues
            limited_content = content_with_code[:2000] if content_with_code else ""

            result = self.extract(
                module_title=module_title,
                content_with_code=limited_content,
                difficulty_level=difficulty_level
            )

            examples = getattr(result, 'code_examples', None)

            if examples is None:
                # No structured field: try to salvage code fences from the raw result.
                response_str = str(result) if result else ""
                return self._extract_code_from_string(response_str, module_title, difficulty_level)

            if isinstance(examples, str):
                return self._extract_code_from_string(examples, module_title, difficulty_level)

            return examples if isinstance(examples, list) else []

        except Exception as e:
            logger.error(f"Error extracting code examples: {e}")
            return []

    def _extract_code_from_string(self, content: str, module_title: str, difficulty_level: str) -> List[CodeExample]:
        """Extract up to three code examples from fenced blocks in ``content``.

        The language is the first word of the fence's info string, lower-cased,
        or ``'text'`` when the fence has none. Any non-whitespace tag is accepted
        (``c++``, ``objective-c``, ...): a tag the pattern cannot match would make
        the opening fence fail, after which the closing fence pairs with the next
        opening fence and the prose between two blocks is returned as code.
        Empty blocks are skipped before the three-example cap is applied.
        Parsing errors are logged and whatever was collected so far is returned.
        """
        examples = []

        try:
            # Groups: (first word of the info string, block body up to the closing fence)
            fenced_blocks = re.findall(r'```[ \t]*([^\s`]*)[^\n`]*\n(.*?)```', content, re.DOTALL)
            snippets = [(lang, code.strip()) for lang, code in fenced_blocks if code.strip()]

            for position, (lang, code) in enumerate(snippets[:3], start=1):  # Max 3 examples
                examples.append(CodeExample(
                    title=f"{module_title} Example {position}",
                    code=code,
                    language=lang.lower() if lang else 'text',
                    explanation=f"This example demonstrates {module_title} concepts",
                    difficulty_level=difficulty_level
                ))
        except Exception as e:
            logger.warning(f"Error parsing code examples: {e}")

        return examples

In [142]:
# Main Content Generator
class ContentGenerator:
    """Generates complete learning content from learning modules.

    ``generate_content`` retrieves a module's chunks from the vector store,
    synthesizes lesson text from them and then derives code examples, exercises,
    assessment questions and learning objectives. Each generation step is
    best-effort: a failure is logged and the corresponding field keeps its
    default value.
    """

    def __init__(self, vector_db_manager):
        """Keep the vector DB manager and create one instance of each generation module."""
        self.vector_db = vector_db_manager
        self.content_synthesizer = ContentSynthesizer()
        self.exercise_generator = ExerciseGenerator()
        self.assessment_creator = AssessmentCreator()
        self.objective_writer = ObjectiveWriter()
        self.code_extractor = CodeExampleExtractor()

    def _retrieve_content_chunks(self, chunk_ids: List[str], max_chunks: Optional[int] = 10) -> List[ContentChunk]:
        """Retrieve actual content from chunk IDs.

        Args:
            chunk_ids: IDs to look up, one ``collection.get`` call per ID.
            max_chunks: Only the first ``max_chunks`` IDs are looked up (default
                10, to avoid token issues). ``None`` disables the cap for callers
                that have already bounded the list themselves.

        Returns:
            One ``ContentChunk`` per ID that was found, in input order. IDs that
            are missing or whose lookup raises are skipped (with a warning).
        """
        selected_ids = chunk_ids if max_chunks is None else chunk_ids[:max_chunks]
        chunks = []

        for chunk_id in selected_ids:
            try:
                results = self.vector_db.vector_db.collection.get(ids=[chunk_id])

                if results['ids']:
                    # A chunk stored without metadata should still be usable.
                    metadata = results['metadatas'][0] or {}
                    chunks.append(ContentChunk(
                        id=chunk_id,
                        content=results['documents'][0],
                        doc_type=metadata.get('doc_type', 'unknown'),
                        title=metadata.get('title', 'Untitled'),
                        metadata=metadata
                    ))

            except Exception as e:
                logger.warning(f"Could not retrieve chunk {chunk_id}: {e}")
                continue

        return chunks

    def generate_content(self,
                        learning_module,
                        difficulty_level: str = "intermediate",
                        bloom_level: str = "understand") -> GeneratedContent:
        """Generate complete learning content for a module.

        Args:
            learning_module: Object providing ``title``, ``estimated_time`` and
                the chunk-ID lists ``tutorial_content``, ``concept_content``,
                ``example_content`` and ``reference_content``.
            difficulty_level: Forwarded to every generation step.
            bloom_level: Forwarded to lesson synthesis and objective writing.

        Returns:
            A ``GeneratedContent``. When no chunks can be retrieved, or a step
            fails, the affected fields keep their defaults: a placeholder lesson
            text, the module title as the only key concept, and empty lists.
        """
        logger.info(f"Generating content for module: {learning_module.title}")

        # Step 1: Retrieve content chunks. The per-category slices below are the
        # size limit (at most 16 IDs), so the retrieval cap is disabled: with the
        # default cap of 10 the tutorial and concept IDs alone would fill every
        # slot and all example/reference chunks would be silently dropped.
        # dict.fromkeys removes IDs listed under several categories, keeping order.
        all_chunk_ids = list(dict.fromkeys(
            learning_module.tutorial_content[:5] +
            learning_module.concept_content[:5] +
            learning_module.example_content[:3] +
            learning_module.reference_content[:3]
        ))

        content_chunks = self._retrieve_content_chunks(all_chunk_ids, max_chunks=None)
        logger.info(f"Retrieved {len(content_chunks)} content chunks")

        # Initialize with defaults to avoid validation errors
        lesson_text = f"Learning content for {learning_module.title}"
        key_concepts = [learning_module.title]
        code_examples = []
        exercises = []
        assessment_questions = []
        learning_objectives = []

        if content_chunks:
            try:
                # Step 2: Synthesize main lesson content
                lesson_text, key_concepts = self.content_synthesizer(
                    module_title=learning_module.title,
                    content_chunks=content_chunks,
                    difficulty_level=difficulty_level,
                    bloom_level=bloom_level
                )
            except Exception as e:
                logger.error(f"Error in content synthesis: {e}")

            try:
                # Step 3: Extract code examples from up to three chunks that
                # contain a code fence or are tutorials/examples.
                code_content = "\n\n".join([
                    chunk.content[:500] for chunk in content_chunks
                    if '```' in chunk.content or chunk.doc_type in ['tutorial', 'example']
                ][:3])  # Limit content

                if code_content:
                    code_examples = self.code_extractor(
                        module_title=learning_module.title,
                        content_with_code=code_content,
                        difficulty_level=difficulty_level
                    )
            except Exception as e:
                logger.error(f"Error extracting code examples: {e}")
                code_examples = []

            try:
                # Step 4: Generate exercises
                example_content = "\n\n".join([
                    chunk.content[:400] for chunk in content_chunks
                    if chunk.doc_type in ['tutorial', 'example']
                ][:3])

                exercises = self.exercise_generator(
                    module_title=learning_module.title,
                    lesson_content=lesson_text,
                    difficulty_level=difficulty_level,
                    available_examples=example_content
                )
            except Exception as e:
                logger.error(f"Error generating exercises: {e}")
                exercises = []

            try:
                # Step 5: Create assessment questions
                assessment_questions = self.assessment_creator(
                    module_title=learning_module.title,
                    lesson_content=lesson_text,
                    key_concepts=key_concepts,
                    difficulty_level=difficulty_level
                )
            except Exception as e:
                logger.error(f"Error creating assessment: {e}")
                assessment_questions = []

            try:
                # Step 6: Write learning objectives
                learning_objectives = self.objective_writer(
                    module_title=learning_module.title,
                    lesson_content=lesson_text,
                    difficulty_level=difficulty_level,
                    bloom_level=bloom_level
                )
            except Exception as e:
                logger.error(f"Error writing objectives: {e}")
                learning_objectives = []

        # Ensure all fields have valid defaults. Normalising here, before both the
        # model construction and the summary log, keeps len() below from raising
        # when a step returned None.
        learning_objectives = learning_objectives or []
        code_examples = code_examples or []
        exercises = exercises or []
        assessment_questions = assessment_questions or []

        generated_content = GeneratedContent(
            module_title=learning_module.title,
            learning_objectives=learning_objectives,
            lesson_text=lesson_text,
            code_examples=code_examples,
            exercises=exercises,
            assessment_questions=assessment_questions,
            estimated_time=learning_module.estimated_time,
            key_concepts=key_concepts or []
        )

        logger.info(f"Generated content for {learning_module.title}: "
                   f"{len(code_examples)} examples, {len(exercises)} exercises, "
                   f"{len(assessment_questions)} questions")

        return generated_content

In [143]:
# Usage Manager
class ContentGenerationManager:
    """High-level manager for content generation.

    Thin facade over ``ContentGenerator``: generates content for one module or
    for every module of a learning path.
    """

    def __init__(self, vector_db_manager):
        """Create the underlying ``ContentGenerator`` for ``vector_db_manager``."""
        self.content_generator = ContentGenerator(vector_db_manager)

    def generate_course_content(self,
                               learning_path,
                               difficulty_level: str = "intermediate",
                               bloom_level: str = "understand") -> "List[GeneratedContent]":
        """Generate content for an entire learning path.

        Args:
            learning_path: Either an object with a ``modules`` attribute or a
                plain iterable of modules.
            difficulty_level: Forwarded to ``generate_content`` for every module.
            bloom_level: Forwarded to ``generate_content`` for every module, the
                same way ``generate_single_module_content`` does.

        Returns:
            The generated content of every module that succeeded, in input
            order. A module whose generation raises is logged and skipped.
        """
        if hasattr(learning_path, "modules"):
            modules = learning_path.modules or []
        else:
            # Caller handed over the module collection directly.
            modules = learning_path

        generated_modules = []

        for module in modules:
            try:
                generated_modules.append(
                    self.content_generator.generate_content(
                        learning_module=module,
                        difficulty_level=difficulty_level,
                        bloom_level=bloom_level
                    )
                )
            except Exception as e:
                # getattr keeps the handler itself from raising on a title-less module.
                logger.error(f"Error generating content for {getattr(module, 'title', module)}: {e}")
                continue

        return generated_modules

    def generate_single_module_content(self,
                                     learning_module,
                                     difficulty_level: str = "intermediate",
                                     bloom_level: str = "understand") -> "GeneratedContent":
        """Generate content for a single module.

        Delegates directly to ``ContentGenerator.generate_content``; exceptions
        raised there propagate to the caller.
        """
        return self.content_generator.generate_content(
            learning_module=learning_module,
            difficulty_level=difficulty_level,
            bloom_level=bloom_level
        )

In [144]:
# Initialize content generation.
# `vector_db_manager` is not created in this cell; it has to exist in the kernel
# namespace already (presumably from an earlier cell -- verify on a fresh kernel).
content_manager = ContentGenerationManager(vector_db_manager)

In [None]:
# Generate content for entire learning path.
# NOTE(review): `generate_course_content` reads `learning_path.modules`, while the
# object passed here is a variable named `modules` defined outside this cell --
# confirm it is the learning-path object and not a bare list of modules.
generated_course = content_manager.generate_course_content(
    learning_path=modules,
    difficulty_level="intermediate"
)

In [None]:
# # Or generate content for a single module
# single_module_content = content_manager.generate_single_module_content(
#     learning_module=learning_path.modules[0],
#     difficulty_level="intermediate",
#     bloom_level="apply"
# )