In [1]:
import os
import re
import math
import json
import numpy as np
import pandas as pd
from typing import List, Dict, Tuple, Any, Optional, Union

# LangChain imports
from langchain_core.documents import Document
from langchain_text_splitters import (
    RecursiveCharacterTextSplitter,
    MarkdownHeaderTextSplitter,
    HTMLHeaderTextSplitter
)
from langchain_community.vectorstores.milvus import Milvus
from langchain_neo4j import Neo4jGraph, Neo4jVector
from langchain_huggingface import HuggingFaceEmbeddings
from langchain.retrievers.multi_query import MultiQueryRetriever
from langchain.retrievers import ParentDocumentRetriever
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_google_genai import ChatGoogleGenerativeAI

# Document processing
from unstructured.partition.html import partition_html
from unstructured.partition.pdf import partition_pdf
from bs4 import BeautifulSoup
import PyPDF2
import tabula

# Google Gemini
import google.generativeai as genai
from tqdm.auto import tqdm as notebook_tqdm
print("All required libraries imported successfully.")


  from .autonotebook import tqdm as notebook_tqdm


All required libraries imported successfully.


In [2]:
from dotenv import load_dotenv
load_dotenv()

api_key = os.getenv("GOOGLE_API_KEY")

In [3]:
class DocumentAnalyzer:
    """Class to analyze document structure and extract various elements."""

    def __init__(self):
        pass

    def load_document(self, file_path: str) -> Dict:
        """
        Load document from file path and return structured content.

        Args:
            file_path: Path to the document file

        Returns:
            Dictionary with structured document content
        """
        # Get file extension
        _, ext = os.path.splitext(file_path)
        ext = ext.lower()

        if ext == '.pdf':
            return self._process_pdf(file_path)
        elif ext in ['.html', '.htm']:
            return self._process_html(file_path)
        elif ext in ['.txt', '.md']:
            return self._process_text(file_path)
        elif ext in ['.docx', '.doc']:
            return self._process_word(file_path)
        else:
            raise ValueError(f"Unsupported file format: {ext}")

    def _process_pdf(self, file_path: str) -> Dict:
        """Process PDF documents."""
        document_structure = {
            'metadata': {'source': file_path, 'type': 'pdf'},
            'elements': []
        }

        try:
            # Check if poppler is available
            import shutil
            if not shutil.which('pdftotext'):
                print("Warning: poppler-utils not found in PATH. Installing...")
                import subprocess
                subprocess.run(["apt-get", "update"], check=True)
                subprocess.run(["apt-get", "install", "-y", "poppler-utils"], check=True)
                print("poppler-utils installed successfully")

            # Read PDF and extract text
            elements = partition_pdf(
                filename=file_path,
                extract_images=True,
                infer_table_structure=True
            )

            # Extract tables separately using tabula
            tables = tabula.read_pdf(file_path, pages='all')

            # Process elements and categorize them
            for element in elements:
                elem_type = str(type(element)).lower()
                element_data = {
                    'content': str(element),
                    'type': None
                }

                if 'title' in elem_type or 'heading' in elem_type:
                    element_data['type'] = 'heading'
                elif 'table' in elem_type:
                    element_data['type'] = 'table'
                elif 'image' in elem_type:
                    element_data['type'] = 'image' 
                elif 'text' in elem_type:
                    # Check if it's strikeout or highlighted (would need more PDF-specific analysis)
                    if '~~' in str(element) or '--' in str(element):
                        element_data['type'] = 'strikeout'
                    elif any(marker in str(element) for marker in ['**', '__', '>>']):
                        element_data['type'] = 'highlight'
                    else:
                        element_data['type'] = 'paragraph'

                document_structure['elements'].append(element_data)

            # Add tables from tabula to our elements list
            for i, table in enumerate(tables):
                document_structure['elements'].append({
                    'content': table,
                    'type': 'table',
                    'pandas_table': True,
                    'table_id': i
                })
        except Exception as e:
            print(f"Error processing PDF: {e}")
            document_structure['elements'].append({
                'content': f"Error processing PDF file: {str(e)}",
                'type': 'paragraph',
        })

        return document_structure

    def _process_html(self, file_path: str) -> Dict:
        """Process HTML documents."""
        with open(file_path, 'r', encoding='utf-8') as file:
            html_content = file.read()

        soup = BeautifulSoup(html_content, 'lxml')
        document_structure = {
            'metadata': {'source': file_path, 'type': 'html'},
            'elements': []
        }

        # Extract headings
        for heading_level in range(1, 7):
            for heading in soup.find_all(f'h{heading_level}'):
                document_structure['elements'].append({
                    'content': heading.get_text(),
                    'type': 'heading',
                    'level': heading_level
                })

        # Extract paragraphs
        for para in soup.find_all('p'):
            # Check for highlighted text
            highlighted = para.find_all(['strong', 'b', 'mark', 'em'])
            strikeout = para.find_all('s')

            if highlighted:
                for h in highlighted:
                    document_structure['elements'].append({
                        'content': h.get_text(),
                        'type': 'highlight',
                    })

            if strikeout:
                for s in strikeout:
                    document_structure['elements'].append({
                        'content': s.get_text(),
                        'type': 'strikeout',
                    })

            # Add the full paragraph too
            document_structure['elements'].append({
                'content': para.get_text(),
                'type': 'paragraph',
            })

        # Extract tables
        for table in soup.find_all('table'):
            # Convert HTML table to pandas DataFrame
            table_data = []
            rows = table.find_all('tr')
            for row in rows:
                cols = row.find_all(['td', 'th'])
                cols = [ele.get_text().strip() for ele in cols]
                table_data.append(cols)

            if table_data:
                # Try to create a pandas DataFrame
                try:
                    df = pd.DataFrame(table_data[1:], columns=table_data[0])
                    document_structure['elements'].append({
                        'content': df,
                        'type': 'table',
                        'pandas_table': True
                    })
                except:
                    # Fallback to string representation
                    document_structure['elements'].append({
                        'content': str(table_data),
                        'type': 'table',
                        'pandas_table': False
                    })

        # Extract images
        for img in soup.find_all('img'):
            document_structure['elements'].append({
                'content': img.get('alt', 'Image') + f" (src: {img.get('src', '')})",
                'type': 'image',
            })

        return document_structure

    def _process_text(self, file_path: str) -> Dict:
        """Process plain text or markdown documents."""
        with open(file_path, 'r', encoding='utf-8') as file:
            content = file.read()

        document_structure = {
            'metadata': {'source': file_path, 'type': 'text'},
            'elements': []
        }

        # Split by double newlines to separate paragraphs
        paragraphs = content.split('\n\n')

        for para in paragraphs:
            para = para.strip()
            if not para:
                continue

            # Check if it's a heading (starts with # in markdown)
            if para.startswith('#'):
                level = len(re.match(r'^#+', para).group())
                heading_text = para.lstrip('#').strip()
                document_structure['elements'].append({
                    'content': heading_text,
                    'type': 'heading',
                    'level': level
                })
            # Check if it's a table (simple detection for markdown tables)
            elif '|' in para and '-+-' in para.replace('|', '+'):
                document_structure['elements'].append({
                    'content': para,
                    'type': 'table',
                    'pandas_table': False
                })
            # Check for strikeout text (~~text~~ in markdown)
            elif '~~' in para:
                document_structure['elements'].append({
                    'content': para,
                    'type': 'strikeout',
                })
            # Check for highlighted text (** or __ in markdown)
            elif '**' in para or '__' in para:
                document_structure['elements'].append({
                    'content': para,
                    'type': 'highlight',
                })
            # Regular paragraph
            else:
                document_structure['elements'].append({
                    'content': para,
                    'type': 'paragraph',
                })

        return document_structure

    def _process_word(self, file_path: str) -> Dict:
        """
        Process Word documents using python-docx library.
        
        Args:
            file_path: Path to the Word document
            
        Returns:
            Dictionary with structured document content
        """
        try:
            import docx
            from docx.oxml.table import CT_Tbl
            from docx.oxml.text.paragraph import CT_P
            
            document = docx.Document(file_path)
            document_structure = {
                'metadata': {'source': file_path, 'type': 'docx'},
                'elements': []
            }
            
            # Process document body
            for element in document.element.body:
                # Process paragraphs
                if isinstance(element, CT_P):
                    paragraph = docx.Document().add_paragraph()
                    paragraph._p = element
                    text = paragraph.text.strip()
                    
                    if not text:
                        continue
                    
                    # Check if it's likely a heading (based on style)
                    p = document.add_paragraph()
                    p._p = element
                    if hasattr(p, 'style') and p.style and 'heading' in p.style.name.lower():
                        document_structure['elements'].append({
                            'content': text,
                            'type': 'heading',
                            'level': int(p.style.name[-1]) if p.style.name[-1].isdigit() else 1
                        })
                    # Check for highlighted or strikeout text
                    elif '**' in text or '__' in text:
                        document_structure['elements'].append({
                            'content': text,
                            'type': 'highlight'
                        })
                    elif '~~' in text:
                        document_structure['elements'].append({
                            'content': text,
                            'type': 'strikeout'
                        })
                    else:
                        document_structure['elements'].append({
                            'content': text,
                            'type': 'paragraph'
                        })
                
                # Process tables
                elif isinstance(element, CT_Tbl):
                    table = docx.Document().add_table(rows=1, cols=1)
                    table._tbl = element
                    
                    # Convert table to pandas DataFrame
                    data = []
                    headers = []
                    
                    # Get headers from first row
                    if table.rows:
                        for cell in table.rows[0].cells:
                            headers.append(cell.text.strip())
                    
                    # Get data from remaining rows
                    for row in table.rows[1:]:
                        row_data = []
                        for cell in row.cells:
                            row_data.append(cell.text.strip())
                        data.append(row_data)
                    
                    # Create pandas DataFrame if possible
                    try:
                        if headers and data:
                            df = pd.DataFrame(data, columns=headers)
                            document_structure['elements'].append({
                                'content': df,
                                'type': 'table',
                                'pandas_table': True
                            })
                        else:
                            # Create simple text representation for table
                            table_text = "Table content:\n"
                            for row in table.rows:
                                row_text = [cell.text.strip() for cell in row.cells]
                                table_text += " | ".join(row_text) + "\n"
                            document_structure['elements'].append({
                                'content': table_text,
                                'type': 'table',
                                'pandas_table': False
                            })
                    except Exception as e:
                        print(f"Error converting table to DataFrame: {e}")
                        table_text = "Table content (error converting):\n"
                        for row in table.rows:
                            row_text = [cell.text.strip() for cell in row.cells]
                            table_text += " | ".join(row_text) + "\n"
                        document_structure['elements'].append({
                            'content': table_text,
                            'type': 'table',
                            'pandas_table': False
                        })
            
            # Process document properties (metadata)
            try:
                core_properties = document.core_properties
                document_structure['metadata']['title'] = core_properties.title
                document_structure['metadata']['author'] = core_properties.author
                document_structure['metadata']['created'] = str(core_properties.created)
                document_structure['metadata']['modified'] = str(core_properties.modified)
            except Exception as e:
                print(f"Error extracting document properties: {e}")
            
            return document_structure
        
        except ImportError:
            print("python-docx package not found. Please install it with 'pip install python-docx'")
            return {
                'metadata': {'source': file_path, 'type': 'docx'},
                'elements': [{
                    'content': f"python-docx package required for Word processing: {file_path}",
                    'type': 'paragraph',
                }]
            }
        except Exception as e:
            print(f"Error processing Word document: {e}")
            return {
                'metadata': {'source': file_path, 'type': 'docx'},
                'elements': [{
                    'content': f"Error processing Word document: {file_path}. Error: {str(e)}",
                    'type': 'paragraph',
                }]
            }

    def extract_tables_as_triplets(self, document_structure: Dict) -> List[Dict]:
        """
        Extract tables from document and convert to triplets for Neo4j.
        """
        triplets = []
        table_count = 0
        
        try:
            for idx, element in enumerate(document_structure['elements']):
                if element['type'] == 'table':
                    if element.get('pandas_table', False):
                        try:
                            df = element['content']
                            if isinstance(df, pd.DataFrame):
                                # Get table name
                                table_name = f"Table_{table_count}"
                                table_count += 1
                                
                                # Look for the closest heading before this table
                                for j in range(idx-1, -1, -1):
                                    if j < len(document_structure['elements']) and document_structure['elements'][j]['type'] == 'heading':
                                        table_name = document_structure['elements'][j]['content']
                                        break
                                        
                                # Create triplets from table
                                row_count = 0
                                for _, row in df.iterrows():
                                    for col in df.columns:
                                        try:
                                            triplets.append({
                                                'subject': f"{table_name}",
                                                'predicate': str(col),
                                                'object': str(row[col]),
                                                'row_id': row_count
                                            })
                                        except Exception as e:
                                            print(f"Error creating triplet for cell: {e}")
                                    row_count += 1
                        except Exception as e:
                            print(f"Error processing table at index {idx}: {e}")
        except Exception as e:
            print(f"Error in extract_tables_as_triplets: {e}")
            
        return triplets



In [4]:

# ## 3. Implementing Chunking Strategies
# Now we'll develop functions for various chunking strategies:
# 1. Title + Content (Hierarchical) chunking
# 2. Paragraph-Based chunking
# 3. Table-Aware chunking for Neo4j Graph representation
# 4. Metadata-Aware chunking

# In[15]:


class DocumentChunker:
    """Class implementing various chunking strategies for document processing."""

    def __init__(self):
        # Initialize text splitters for different strategies
        self.recursive_splitter = RecursiveCharacterTextSplitter(
            chunk_size=400,
            chunk_overlap=100,
            separators=["\n\n", "\n", ". ", " ", ""]
        )

        self.md_header_splitter = MarkdownHeaderTextSplitter(
            headers_to_split_on=[
                ("#", "header1"),
                ("##", "header2"),
                ("###", "header3"),
                ("####", "header4")
            ]
        )

        self.html_header_splitter = HTMLHeaderTextSplitter(
            headers_to_split_on=[
                ("h1", "header1"),
                ("h2", "header2"),
                ("h3", "header3"),
                ("h4", "header4")
            ]
        )

    def hierarchical_chunking(self, document_structure: Dict) -> List[Document]:
        """
        Perform Title + Content (Hierarchical) chunking.

        Args:
            document_structure: Document structure dictionary

        Returns:
            List of LangChain Document objects
        """
        # First convert the document structure to a format suitable for hierarchical processing
        markdown_content = self._convert_to_markdown_with_headers(document_structure)

        # Split using the markdown header splitter
        md_header_splits = self.md_header_splitter.split_text(markdown_content)

        # Further split long content chunks if necessary
        final_chunks = []
        for doc in md_header_splits:
            # If content is too long, split further
            content = doc.page_content
            if len(content) > 1000:  # adjust threshold as needed
                sub_chunks = self.recursive_splitter.split_text(content)
                # Copy metadata to each sub-chunk
                for chunk in sub_chunks:
                    final_chunks.append(Document(
                        page_content=chunk,
                        metadata={**doc.metadata, 'chunk_type': 'hierarchical'}
                    ))
            else:
                doc.metadata['chunk_type'] = 'hierarchical'
                final_chunks.append(doc)

        return final_chunks

    def paragraph_based_chunking(self, document_structure: Dict) -> List[Document]:
        """
        Perform Paragraph-Based chunking with line breaks preserved.

        Args:
            document_structure: Document structure dictionary

        Returns:
            List of LangChain Document objects
        """
        chunks = []

        # Extract paragraphs from the document
        paragraphs = []
        current_section = {'title': 'Document', 'content': ''}

        for element in document_structure['elements']:
            if element['type'] == 'heading':
                # If we have content in the current section, save it
                if current_section['content'].strip():
                    paragraphs.append(current_section)

                # Start a new section
                current_section = {
                    'title': element['content'],
                    'content': ''
                }
            elif element['type'] == 'paragraph':
                current_section['content'] += element['content'] + "\n\n"

        # Add the last section if it has content
        if current_section['content'].strip():
            paragraphs.append(current_section)

        # Create Document objects for each paragraph
        for para in paragraphs:
            # Split content if it's too long
            if len(para['content']) > 1000:
                sub_chunks = self.recursive_splitter.split_text(para['content'])
                for i, chunk in enumerate(sub_chunks):
                    chunks.append(Document(
                        page_content=chunk,
                        metadata={
                            'title': para['title'],
                            'chunk_type': 'paragraph',
                            'chunk_index': i,
                            'source': document_structure['metadata']['source']
                        }
                    ))
            else:
                chunks.append(Document(
                    page_content=para['content'],
                    metadata={
                        'title': para['title'],
                        'chunk_type': 'paragraph',
                        'source': document_structure['metadata']['source']
                    }
                ))

        return chunks

    def table_aware_chunking(self, document_structure: Dict) -> List[Dict]:
        """
        Perform Table-Aware chunking for Neo4j graph.

        Args:
            document_structure: Document structure dictionary

        Returns:
            List of dictionaries representing table triplets for Neo4j
        """
        # Extract tables and convert to triplets
        analyzer = DocumentAnalyzer()
        return analyzer.extract_tables_as_triplets(document_structure)

    def metadata_aware_chunking(self, document_structure: Dict) -> List[Document]:
        """
        Perform Metadata-Aware chunking with Milvus compatibility.
        """
        chunks = []
        
        # Identify semantic sections
        semantic_sections = self._identify_semantic_sections(document_structure)
        
        for section in semantic_sections:
            # Calculate chunk size based on content type
            if section['type'] == 'rule_explanation':
                chunk_size = 1500
            elif section['type'] == 'user_scenario':
                chunk_size = 1000
            elif section['type'] == 'business_logic':
                chunk_size = 800
            else:
                chunk_size = 1000
                
            custom_splitter = RecursiveCharacterTextSplitter(
                chunk_size=chunk_size,
                chunk_overlap=min(100, chunk_size // 10),
                separators=["\n\n", "\n", ". ", " ", ""]
            )
            
            # Split content
            text_chunks = custom_splitter.split_text(section['content'])
            
            # Convert keywords and categories to strings for Milvus compatibility
            keywords_str = ",".join(section.get('keywords', []))
            categories_str = ",".join(section.get('categories', []))
            
            # Create Document objects with compatible metadata
            for i, chunk in enumerate(text_chunks):
                chunks.append(Document(
                    page_content=chunk,
                    metadata={
                        'title': section.get('title', 'Untitled Section'),
                        'content_type': section['type'],
                        'source': document_structure['metadata']['source'],
                        'chunk_index': i,
                        'total_chunks': len(text_chunks),
                        'chunk_type': 'metadata_aware',
                        # Store keywords and categories as strings
                        'keywords_str': keywords_str,
                        'categories_str': categories_str
                    }
                ))
        
        return chunks

    def _convert_to_markdown_with_headers(self, document_structure: Dict) -> str:
        """
        Convert document structure to markdown format with headers.

        Args:
            document_structure: Document structure dictionary

        Returns:
            Markdown string representation of the document
        """
        markdown = []
        current_heading = ""

        for element in document_structure['elements']:
            if element['type'] == 'heading':
                level = element.get('level', 1)
                heading = '#' * level + ' ' + element['content']
                markdown.append(heading)
                current_heading = element['content']
            elif element['type'] == 'paragraph':
                markdown.append(element['content'])
            elif element['type'] == 'table':
                if isinstance(element['content'], pd.DataFrame):
                    table_str = element['content'].to_markdown()
                    markdown.append(table_str)
                else:
                    markdown.append(str(element['content']))
            elif element['type'] in ['strikeout', 'highlight', 'image']:
                markdown.append(element['content'])

        return '\n\n'.join(markdown)

    def _identify_semantic_sections(self, document_structure: Dict) -> List[Dict]:
        """
        Identify semantic sections in the document.

        Args:
            document_structure: Document structure dictionary

        Returns:
            List of semantic section dictionaries
        """
        sections = []
        current_section = None

        # Patterns to identify content types
        rule_patterns = ['rule', 'regulation', 'policy', 'guidelines']
        logic_patterns = ['workflow', 'process', 'logic', 'procedure', 'business logic']
        scenario_patterns = ['scenario', 'example', 'use case', 'user story']
        interaction_patterns = ['interaction', 'interface', 'system', 'integration', 'api']

        for element in document_structure['elements']:
            if element['type'] == 'heading':
                # If there's a current section with content, save it
                if current_section and current_section.get('content'):
                    sections.append(current_section)

                # Determine the section type based on heading content
                heading_lower = element['content'].lower()

                if any(pattern in heading_lower for pattern in rule_patterns):
                    section_type = 'rule_explanation'
                elif any(pattern in heading_lower for pattern in logic_patterns):
                    section_type = 'business_logic'
                elif any(pattern in heading_lower for pattern in scenario_patterns):
                    section_type = 'user_scenario'
                elif any(pattern in heading_lower for pattern in interaction_patterns):
                    section_type = 'system_interaction'
                else:
                    section_type = 'general'

                # Create a new section
                current_section = {
                    'title': element['content'],
                    'type': section_type,
                    'content': '',
                    'keywords': self._extract_keywords(element['content']),
                    'categories': [section_type]
                }
            elif element['type'] in ['paragraph', 'strikeout', 'highlight']:
                if current_section:
                    current_section['content'] += element['content'] + "\n\n"
                else:
                    # Create a default section if none exists
                    current_section = {
                        'title': 'Introduction',
                        'type': 'general',
                        'content': element['content'] + "\n\n",
                        'keywords': [],
                        'categories': ['general']
                    }
            elif element['type'] == 'table':
                # Tables are handled separately for Neo4j, but we still include their text representation
                # in the content for completeness
                if current_section:
                    if isinstance(element['content'], pd.DataFrame):
                        current_section['content'] += "Table content:\n"
                        current_section['content'] += str(element['content']) + "\n\n"
                    else:
                        current_section['content'] += "Table content:\n" + str(element['content']) + "\n\n"

        # Add the last section if it exists
        if current_section and current_section.get('content'):
            sections.append(current_section)

        return sections

    def _extract_keywords(self, text: str) -> List[str]:
        """
        Extract keywords from text.

        Args:
            text: Text to extract keywords from

        Returns:
            List of keywords
        """
        # Simple implementation - extract meaningful words and filter stop words
        stop_words = {'a', 'an', 'the', 'and', 'or', 'but', 'if', 'then', 'else', 'when', 
                     'on', 'in', 'at', 'to', 'for', 'with', 'by', 'about', 'as', 'of'}

        words = re.findall(r'\b[a-zA-Z]{3,}\b', text.lower())
        keywords = [word for word in words if word not in stop_words]

        # Return unique keywords
        return list(set(keywords))






### 4. Setting Up the Embedding Model
 We'll configure a local embedding model to generate vector representations of our chunks.

In [5]:

class EmbeddingManager:
    """Class to manage embeddings for document chunks."""

    def __init__(self, model_name="all-MiniLM-L6-v2"):
        """
        Initialize the embedding manager.

        Args:
            model_name: Name of the HuggingFace model to use for embeddings
        """
        self.model_name = model_name
        self.embeddings = HuggingFaceEmbeddings(model_name=model_name)
        print(f"Initialized embedding model: {model_name}")

    def generate_embedding(self, text: str) -> List[float]:
        """
        Generate an embedding for text.

        Args:
            text: Text to embed

        Returns:
            List of floats representing the embedding vector
        """
        # The HuggingFaceEmbeddings class follows the LangChain embedding interface
        return self.embeddings.embed_query(text)

    def batch_embed(self, texts: List[str]) -> List[List[float]]:
        """
        Batch embed multiple texts.

        Args:
            texts: List of texts to embed

        Returns:
            List of embedding vectors
        """
        return self.embeddings.embed_documents(texts)

    def embed_documents(self, documents: List[Document]) -> Tuple[List[List[float]], List[Document]]:
        """
        Embed LangChain Document objects.

        Args:
            documents: List of Document objects to embed

        Returns:
            Tuple of (embeddings, documents)
        """
        texts = [doc.page_content for doc in documents]
        embeddings = self.batch_embed(texts)
        return embeddings, documents

    def embed_triplets(self, triplets: List[Dict]) -> List[Dict]:
        """
        Embed triplets for Neo4j graph.

        Args:
            triplets: List of triplet dictionaries

        Returns:
            Triplets with embeddings added
        """
        for triplet in triplets:
            # Create a concatenated text representation of the triplet
            triplet_text = f"{triplet['subject']} {triplet['predicate']} {triplet['object']}"
            triplet['embedding'] = self.generate_embedding(triplet_text)

        return triplets



### 5. Configuring Vector Databases (Milvus and Neo4j)
Set up connections and schemas for Milvus and Neo4j databases.

In [6]:
class VectorDBManager:
    """Class to manage vector database connections and operations."""

    def __init__(self, embedding_manager: EmbeddingManager):
        """Initialize with the embedding manager."""
        self.embedding_manager = embedding_manager
        self.milvus_client = None
        self.neo4j_client = None

        # Collections in Milvus (document_id -> collection)
        self.collections = {}
        
        # Document metadata store
        self.doc_metadata = {}

    def setup_milvus(self, host='localhost', port='2379'):
        """Set up connection to Milvus."""
        try:
            # Define Milvus connection parameters
            self.milvus_connection_params = {
                "host": host,
                "port": port
            }
            
            print("Milvus connection parameters configured.")
            self.milvus_available = True
            
            # Check if connection works
            from pymilvus import connections
            connections.connect(host=host, port=port)
            print("Milvus connection test successful")
            connections.disconnect(alias="default")
            
        except Exception as e:
            print(f"Error setting up Milvus: {e}")
            self.milvus_available = False

    def setup_neo4j(self, uri='bolt://localhost:7687', username='neo4j', password='venev'):
        """Set up connection to Neo4j."""
        try:
            # Store Neo4j connection parameters
            self.neo4j_connection_params = {
                "url": uri,
                "username": username,
                "password": password
            }
            
            # Initialize Neo4j graph client
            self.neo4j_client = Neo4jGraph(
                url=uri,
                username=username,
                password=password
            )
            
            # Initialize Neo4j vector client
            self.neo4j_vector = Neo4jVector(
                url=uri,
                username=username,
                password=password,
                embedding=self.embedding_manager.embeddings,
                index_name="documentVectors",
                node_label="DocumentChunk"
            )
            
            # Set up schema for our document graph (only once)
            self._create_neo4j_schema()
            print("Neo4j connection established and schema created.")
            self.neo4j_available = True
        except Exception as e:
            print(f"Error setting up Neo4j: {e}")
            self.neo4j_client = None
            self.neo4j_vector = None
            self.neo4j_available = False

    def _create_neo4j_schema(self):
        """Create Neo4j schema for document storage."""
        # Create constraints that work with Community Edition
        self.neo4j_client.query("""
            CREATE CONSTRAINT document_id IF NOT EXISTS
            FOR (d:Document) REQUIRE d.id IS UNIQUE
        """)
        
        self.neo4j_client.query("""
            CREATE CONSTRAINT chunk_id IF NOT EXISTS
            FOR (c:DocumentChunk) REQUIRE c.id IS UNIQUE
        """)
        
        # Create vector index if needed
        self.neo4j_client.query("""
            CREATE VECTOR INDEX documentVectors IF NOT EXISTS
            FOR (c:DocumentChunk) 
            ON c.embedding
            OPTIONS {indexConfig: {
              `vector.dimensions`: 384,
              `vector.similarity_function`: 'cosine'
            }}
        """)

    def store_in_milvus(self, documents: List[Document], doc_id: str, content_type: str):
        """Store documents in Milvus using doc_id for collection naming."""
        if not hasattr(self, 'milvus_available') or not self.milvus_available:
            print("Skipping Milvus storage - Milvus not available")
            return
            
        try:
            # Sanitize the document ID and content type to create a valid collection name
            # Replace spaces and special characters with underscores
            sanitized_doc_id = re.sub(r'[^a-zA-Z0-9_]', '_', doc_id)
            sanitized_content_type = re.sub(r'[^a-zA-Z0-9_]', '_', content_type)
            
            # Create a collection name that includes sanitized document ID and content type
            collection_name = f"doc_{sanitized_doc_id}_{sanitized_content_type}"
            
            print(f"Using sanitized collection name: {collection_name}")
            
            # Generate embeddings for documents
            embeddings = [self.embedding_manager.generate_embedding(doc.page_content) for doc in documents]

            # Create Milvus collection
            vector_store = Milvus.from_documents(
                documents=documents,
                embedding=self.embedding_manager.embeddings,
                collection_name=collection_name,
                connection_args=self.milvus_connection_params
            )

            # Store the collection reference with original doc_id
            if doc_id not in self.collections:
                self.collections[doc_id] = {}
            
            self.collections[doc_id][content_type] = vector_store

            print(f"Stored {len(documents)} documents in Milvus collection: {collection_name}")

        except Exception as e:
            print(f"Error storing documents in Milvus: {e}")

    def store_in_neo4j(self, triplets: List[Dict], doc_id: str):
        """Store triplets in Neo4j with document ID."""
        if not hasattr(self, 'neo4j_available') or not self.neo4j_available:
            print("Skipping Neo4j storage - Neo4j not available")
            return
            
        try:
            # Get the Neo4j driver session properly
            session = self.neo4j_client._driver.session()
            
            # Process in smaller batches for better error handling
            batch_size = 50
            for i in range(0, len(triplets), batch_size):
                batch = triplets[i:i+batch_size]
                try:
                    # Begin a transaction for this batch
                    with session.begin_transaction() as tx:
                        # Convert parameters for Neo4j
                        params = {
                            "batch_size": len(batch),
                            "subjects": [t["subject"] for t in batch],
                            "predicates": [t["predicate"] for t in batch],
                            "objects": [t["object"] for t in batch],
                            "embeddings": [t["embedding"] for t in batch],
                            "doc_id": doc_id
                        }

                        # Enhanced Cypher query with document ID
                        query = """
                        UNWIND range(0, $batch_size - 1) as i
                        MERGE (d:Document {id: $doc_id})
                        MERGE (s:Subject {name: $subjects[i], doc_id: $doc_id})
                        MERGE (o:Object {name: $objects[i], doc_id: $doc_id})
                        WITH d, s, o, i
                        CREATE (s)-[r:HAS_PROPERTY {name: $predicates[i]}]->(o)
                        CREATE (s)-[:BELONGS_TO]->(d)
                        CREATE (o)-[:BELONGS_TO]->(d)
                        WITH d, s, o, i
                        CREATE (c:DocumentChunk {
                            id: randomUUID(),
                            text: $subjects[i] + ' ' + $predicates[i] + ' ' + $objects[i],
                            subject: $subjects[i],
                            predicate: $predicates[i],
                            object: $objects[i],
                            doc_id: $doc_id
                        })
                        SET c.embedding = $embeddings[i]
                        CREATE (c)-[:SUBJECT_OF]->(s)
                        CREATE (c)-[:OBJECT_OF]->(o)
                        CREATE (c)-[:PART_OF]->(d)
                        """

                        # Execute query within transaction
                        result = tx.run(query, params)
                        result.consume()  # Ensure execution completes
                    
                    print(f"Processed batch {i//batch_size + 1}/{(len(triplets) + batch_size - 1)//batch_size}")
                        
                except Exception as e:
                    print(f"Error processing batch {i//batch_size + 1}: {e}")
                    # Continue to the next batch even if this one fails
            
            # Close the session when done
            session.close()
            print("Stored triplets in Neo4j (completed batches)")

        except Exception as e:
            print(f"Error initializing Neo4j transaction: {e}")
            # Make sure session is closed if an error occurs
            if 'session' in locals():
                session.close()

    def register_document_metadata(self, doc_id: str, metadata: Dict):
        """Register document metadata for later reference."""
        self.doc_metadata[doc_id] = metadata
        
    def get_document_metadata(self, doc_id: str) -> Dict:
        """Get metadata for a document."""
        return self.doc_metadata.get(doc_id, {})
        
    def list_documents(self) -> List[str]:
        """List all documents in the system."""
        return list(self.collections.keys())

    def document_exists(self, doc_id: str) -> bool:
        """Check if document already exists in the system."""
        return doc_id in self.collections

### 6. Document Processing Pipeline
Now let's develop a pipeline that processes documents by content type, applies appropriate chunking strategies, and stores data in the correct database.


In [7]:
class DocumentProcessingPipeline:
    """Pipeline for document processing, chunking, and storage."""

    def __init__(
        self, 
        analyzer: DocumentAnalyzer, 
        chunker: DocumentChunker, 
        embedding_manager: EmbeddingManager,
        vector_db_manager: VectorDBManager
    ):
        self.analyzer = analyzer
        self.chunker = chunker
        self.embedding_manager = embedding_manager
        self.vector_db_manager = vector_db_manager

        # Content type mapping
        self.content_mapping = {
            'rule_explanation': {'chunking': 'hierarchical', 'storage': 'milvus'},
            'business_logic': {'chunking': 'paragraph', 'storage': 'milvus'},
            'table': {'chunking': 'table', 'storage': 'neo4j'},
            'user_scenario': {'chunking': 'paragraph', 'storage': 'milvus'},
            'system_interaction': {'chunking': 'metadata', 'storage': 'both'}
        }

    def process_document(self, file_path: str, doc_id: str = None):
        """
        Process a document file through the entire pipeline.
        
        Args:
            file_path: Path to the document file
            doc_id: Optional document ID, defaults to file basename
        """
        if doc_id is None:
            doc_id = os.path.basename(file_path).split('.')[0]
            
        print(f"Processing document: {file_path} (ID: {doc_id})")
        
        # Check if document has already been processed
        if self.vector_db_manager.document_exists(doc_id):
            print(f"Document {doc_id} has already been processed. Skipping.")
            return

        # Step 1: Analyze document
        document_structure = self.analyzer.load_document(file_path)
        print(f"Document analysis complete. Found {len(document_structure['elements'])} elements.")

        # Store document metadata
        document_structure['metadata']['doc_id'] = doc_id
        self.vector_db_manager.register_document_metadata(doc_id, document_structure['metadata'])

        # Step 2: Process each content type with appropriate chunking
        content_types = self._identify_content_types(document_structure)
        print(f"Identified content types: {', '.join(content_types)}")

        # Step 3: Apply chunking strategies and store in appropriate databases
        self._process_content_types(document_structure, content_types, doc_id)

        print(f"Document {doc_id} processing complete.")
        
    # Add doc_id parameter to relevant methods
    def _process_content_types(self, document_structure: Dict, content_types: List[str], doc_id: str):
        """Process content types with doc_id."""
        # Create a copy of content_types to avoid modification during iteration
        types_to_process = content_types.copy()
        
        # Process tables first if present
        if 'table' in types_to_process:
            print("Processing tables...")
            try:
                triplets = self.chunker.table_aware_chunking(document_structure)
                
                if triplets:
                    # Process triplets with doc_id
                    if hasattr(self.vector_db_manager, 'neo4j_available') and self.vector_db_manager.neo4j_available:
                        try:
                            triplets_with_embeddings = self.embedding_manager.embed_triplets(triplets)
                            self.vector_db_manager.store_in_neo4j(triplets_with_embeddings, doc_id)
                        except Exception as e:
                            print(f"Error storing triplets in Neo4j: {e}")
                            
                    # Fallback to Milvus for redundancy
                    try:
                        print("Storing table data in Milvus for backup")
                        table_chunks = []
                        for triplet in triplets:
                            table_chunks.append(Document(
                                page_content=f"{triplet['subject']} - {triplet['predicate']}: {triplet['object']}",
                                metadata={
                                    'title': triplet['subject'],
                                    'chunk_type': 'table_fallback',
                                    'source': document_structure['metadata']['source'],
                                    'doc_id': doc_id
                                }
                            ))
                        if table_chunks:
                            self.vector_db_manager.store_in_milvus(table_chunks, doc_id, 'business_logic')
                    except Exception as e:
                        print(f"Error storing table fallback in Milvus: {e}")
                else:
                    print("No valid tables found for extraction")
                    
            except Exception as e:
                print(f"Error processing tables: {e}")
                
            types_to_process.remove('table')
        
        # Process other content types with better error handling
        for content_type in types_to_process:
            print(f"Processing content type: {content_type}")
            
            try:
                chunking_strategy = self.content_mapping[content_type]['chunking']
                storage = self.content_mapping[content_type]['storage']
                
                # Apply chunking strategy with error handling
                chunks = []
                try:
                    if chunking_strategy == 'hierarchical':
                        chunks = self.chunker.hierarchical_chunking(document_structure)
                    elif chunking_strategy == 'paragraph':
                        chunks = self.chunker.paragraph_based_chunking(document_structure)
                    elif chunking_strategy == 'metadata':
                        chunks = self.chunker.metadata_aware_chunking(document_structure)
                    else:
                        print(f"Unknown chunking strategy for {content_type}: {chunking_strategy}")
                        continue
                    
                    # Add doc_id to all chunks
                    for chunk in chunks:
                        chunk.metadata['doc_id'] = doc_id
                        
                    print(f"Created {len(chunks)} chunks using {chunking_strategy} strategy")
                except Exception as e:
                    print(f"Error applying chunking strategy '{chunking_strategy}': {e}")
                    continue
                    
                # Store in appropriate database with fallbacks
                if not chunks:
                    print(f"No chunks created for {content_type}. Skipping storage.")
                    continue
                    
                try:
                    # Always try Milvus first for reliability
                    if hasattr(self.vector_db_manager, 'milvus_available') and self.vector_db_manager.milvus_available:
                        print(f"Storing {len(chunks)} chunks in Milvus")
                        self.vector_db_manager.store_in_milvus(chunks, doc_id, content_type)
                        
                    # If Neo4j storage is required and available
                    if (storage == 'neo4j' or storage == 'both'):
                        try:
                            print("Converting chunks to triplets for Neo4j storage")
                            triplets = self._convert_chunks_to_triplets(chunks, content_type)
                            triplets_with_embeddings = self.embedding_manager.embed_triplets(triplets)
                            
                            # Try Neo4j storage with doc_id
                            if hasattr(self.vector_db_manager, 'neo4j_available') and self.vector_db_manager.neo4j_available:
                                try:
                                    self.vector_db_manager.store_in_neo4j(triplets_with_embeddings, doc_id)
                                except Exception as e:
                                    print(f"Error storing in Neo4j: {e}")
                        except Exception as e:
                            print(f"Error preparing triplets: {e}")
                except Exception as e:
                    print(f"Error storing chunks for {content_type}: {e}")
            except Exception as e:
                print(f"Error processing content type {content_type}: {e}")
                
    def _identify_content_types(self, document_structure: Dict) -> List[str]:
        """
        Identify content types in the document.
        
        Args:
            document_structure: Document structure dictionary
            
        Returns:
            List of content types found in the document
        """
        content_types = set()
        
        # First, check if we have tables
        for element in document_structure['elements']:
            if element['type'] == 'table':
                content_types.add('table')
                break
        
        # Analyze headings to determine content types
        heading_texts = []
        for element in document_structure['elements']:
            if element['type'] == 'heading':
                heading_text = element['content'].lower()
                heading_texts.append(heading_text)
                
                # Look for keywords indicating content types
                if any(pattern in heading_text for pattern in ['rule', 'regulation', 'policy']):
                    content_types.add('rule_explanation')
                
                if any(pattern in heading_text for pattern in ['workflow', 'process', 'procedure']):
                    content_types.add('business_logic')
                    
                if any(pattern in heading_text for pattern in ['scenario', 'example', 'use case']):
                    content_types.add('user_scenario')
                    
                if any(pattern in heading_text for pattern in ['system', 'interaction', 'interface']):
                    content_types.add('system_interaction')
        
        # If no specific content types are identified, add default ones
        if not content_types:
            # Default to treating it as business logic and rule explanation
            content_types.add('business_logic')
            
        # Always add the table type if tables are detected
        has_tables = any(element['type'] == 'table' for element in document_structure['elements'])
        if has_tables:
            content_types.add('table')
            
        return list(content_types)
        
    def _convert_chunks_to_triplets(self, chunks: List[Document], content_type: str) -> List[Dict]:
        """
        Convert document chunks to triplets for Neo4j.
        
        Args:
            chunks: List of document chunks
            content_type: Type of content
            
        Returns:
            List of triplets
        """
        triplets = []
        
        for i, chunk in enumerate(chunks):
            title = chunk.metadata.get('title', 'Untitled')
            content = chunk.page_content
            
            # Create triplet - simple subject-predicate-object format
            triplet = {
                'subject': title,
                'predicate': f"has_{content_type}",
                'object': content[:200] if len(content) > 200 else content,  # Truncate long content
                'chunk_id': i
            }
            
            triplets.append(triplet)
            
        return triplets

### 8. Query Processing with Gemini LLM
Let's integrate the Gemini-1.5-flash-8b model via API to process queries against the retrieved document chunks.


In [8]:
class GeminiProcessor:
    """Enhanced class for processing queries with Gemini LLM."""

    def __init__(self, api_key=None):
        """Initialize the Gemini processor."""
        # Try to get API key from parameter, then environment variable
        self.api_key = api_key or os.environ.get("GOOGLE_API_KEY")
        
        if not self.api_key:
            print("Warning: No API key provided for Gemini. Set GOOGLE_API_KEY environment variable.")
        else:
            try:
                genai.configure(api_key=self.api_key)
                self.model = genai.GenerativeModel('gemini-1.5-flash-8b')
                print("Gemini model initialized successfully")
            except Exception as e:
                print(f"Error initializing Gemini: {e}")
                self.model = None
    
    def analyze_query_topics(self, query: str, num_topics: int = 3) -> List[str]:
        """Extract main topics from query to aid in document relevance."""
        if not self.model:
            return []
        
        try:
            prompt = f"""
            Identify the {num_topics} main topics or subjects in this query. 
            Return only a comma-separated list of topics without explanation.
            
            Query: {query}
            """
            
            response = self.model.generate_content(prompt)
            topics = [topic.strip() for topic in response.text.split(',')]
            return topics[:num_topics]  # Ensure we don't exceed requested number
        except Exception as e:
            print(f"Error analyzing query topics: {e}")
            return []

    def process_query(self, query: str, retrieved_docs: List[Document]) -> str:
        """Process a query using the Gemini LLM and retrieved documents."""
        if not self.model:
            return "Gemini model not initialized. Please provide a valid API key."

        try:
            # Prepare context with document metadata
            context_elements = []
            for i, doc in enumerate(retrieved_docs):
                # Extract document ID and other useful metadata
                doc_id = doc.metadata.get('doc_id', 'unknown')
                source = doc.metadata.get('source', 'unknown')
                content_type = doc.metadata.get('content_type', '')
                title = doc.metadata.get('title', '')
                
                # Format the context entry
                context_entry = f"Document {i+1} [{doc_id}]:\n"
                if title:
                    context_entry += f"Title: {title}\n"
                if content_type:
                    context_entry += f"Content type: {content_type}\n"
                context_entry += f"Content: {doc.page_content}\n"
                
                context_elements.append(context_entry)
                
            context = "\n\n".join(context_elements)

            # Enhanced prompt with metadata awareness
            prompt = f"""
            You are an intelligent assistant that answers questions based on provided context.
            The context consists of information from various documents, each with its own ID and metadata.

            Context information:
            {context}
            
            User question: {query}
            
            Please answer the question based only on the provided context. If the context doesn't contain enough information
            to fully answer the question, acknowledge what you can answer based on the context and what information is missing.
            When referring to specific information, mention which document it comes from.
            """

            # Generate response
            response = self.model.generate_content(prompt)
            return response.text

        except Exception as e:
            print(f"Error processing query with Gemini: {e}")
            return f"Error processing query: {str(e)}"

### 7. Hybrid Retrieval System with LangChain
Hybrid retrieval system using LangChain v0.3 to combine search results from both Milvus and Neo4j.


In [9]:
class HybridRetriever:
    """Class for hybrid retrieval from multiple vector databases."""

    def __init__(self, vector_db_manager: VectorDBManager):
        """Initialize with the vector database manager."""
        self.vector_db_manager = vector_db_manager
        self.retrievers = {}
        self.doc_weights = {}  # For document-specific weighting
        
    def setup_langchain_retrieval(self, use_gemini_for_query_expansion=True):
        """Set up LangChain retrieval with smarter strategies."""
        try:
            # Create retriever objects for different documents and collections
            self.retrievers = {}
            
            # Add Milvus retrievers for each document
            for doc_id, content_types in self.vector_db_manager.collections.items():
                for content_type, collection in content_types.items():
                    if collection is not None:
                        try:
                            retriever_name = f"milvus_{doc_id}_{content_type}"
                            self.retrievers[retriever_name] = collection.as_retriever(
                                search_type="similarity",
                                search_kwargs={"k": 3}
                            )
                        except Exception as e:
                            print(f"Error setting up retriever for {doc_id}/{content_type}: {e}")

            # Add Neo4j retriever if available
            if hasattr(self.vector_db_manager, 'neo4j_vector') and self.vector_db_manager.neo4j_vector:
                try:
                    self.retrievers["neo4j"] = self.vector_db_manager.neo4j_vector.as_retriever(
                        search_kwargs={"k": 3}
                    )
                except Exception as e:
                    print(f"Error setting up Neo4j retriever: {e}")
            
            # Set up query expansion if requested
            if use_gemini_for_query_expansion and self.retrievers:
                try:
                    if "GOOGLE_API_KEY" in os.environ and os.environ["GOOGLE_API_KEY"]:
                        # Set up Gemini for multi-query generation
                        self.gemini_llm = ChatGoogleGenerativeAI(
                            model="gemini-1.5-flash-8b",
                            temperature=0.2
                        )

                        # Use first available retriever for multi-query
                        first_retriever_name = list(self.retrievers.keys())[0]
                        try:
                            self.multi_query_retriever = MultiQueryRetriever.from_llm(
                                retriever=self.retrievers[first_retriever_name],
                                llm=self.gemini_llm
                            )
                            print("Multi-query retriever set up with Gemini")
                        except Exception as e:
                            print(f"Error setting up multi-query: {e}")
                    else:
                        print("No Google API key found. Skipping query expansion setup.")
                except Exception as e:
                    print(f"Error setting up query expansion: {e}")

            print(f"Set up {len(self.retrievers)} retrievers for hybrid search")

        except Exception as e:
            print(f"Error setting up LangChain retrieval: {e}")
            # Initialize empty retrievers to avoid errors
            self.retrievers = {}
    
    def analyze_query_for_relevance(self, query: str) -> Dict[str, float]:
        """Analyze query to determine relevant document weights."""
        # Default to equal weights if no special analysis
        doc_ids = set()
        for retriever_name in self.retrievers.keys():
            if retriever_name.startswith("milvus_"):
                parts = retriever_name.split("_")
                if len(parts) >= 2:
                    doc_ids.add(parts[1])
                    
        weights = {doc_id: 1.0 for doc_id in doc_ids}
        
        # If Gemini is available, use it for better weighting
        if "GOOGLE_API_KEY" in os.environ and os.environ["GOOGLE_API_KEY"]:
            try:
                gemini_processor = GeminiProcessor()
                # Get topic analysis from query
                topics = gemini_processor.analyze_query_topics(query)
                
                # For each document, check metadata for topic relevance
                for doc_id in doc_ids:
                    metadata = self.vector_db_manager.get_document_metadata(doc_id)
                    # Calculate relevance score (basic implementation)
                    relevance = 1.0  # default
                    if 'title' in metadata:
                        # Simple string matching for relevance
                        title = metadata['title'].lower()
                        for topic in topics:
                            if topic.lower() in title:
                                relevance += 0.5  # Boost for topic match
                    weights[doc_id] = relevance
            except Exception as e:
                print(f"Error analyzing query for document relevance: {e}")
                
        return weights

    def hybrid_retrieve(self, query: str, k: int = 5):
        """Perform hybrid retrieval with smart strategies."""
        all_results = []
        active_retrievers = 0
        
        # Check if there are any retrievers configured
        if not self.retrievers:
            print("No retrievers configured. Returning empty results.")
            return []
        
        # Analyze query to weight documents by relevance
        doc_weights = self.analyze_query_for_relevance(query)
            
        # Use multi-query retriever if available
        if hasattr(self, 'multi_query_retriever'):
            try:
                print("Using multi-query retriever")
                multi_results = self.multi_query_retriever.invoke(query)
                all_results.extend(multi_results)
                active_retrievers += 1
            except Exception as e:
                print(f"Error with multi-query retrieval: {e}")
        
        # Use all retrievers, weighted by document relevance
        for name, retriever in self.retrievers.items():
            try:
                # Weight results based on document
                doc_id = None
                if name.startswith("milvus_"):
                    parts = name.split("_")
                    if len(parts) >= 2:
                        doc_id = parts[1]
                
                # Calculate how many results to retrieve based on weight
                retrieval_k = k
                if doc_id and doc_id in doc_weights:
                    weight = doc_weights[doc_id]
                    # Adjust k based on weight (min 1, max 2*k)
                    retrieval_k = max(1, min(k*2, round(k * weight)))
                
                print(f"Querying retriever: {name} (k={retrieval_k})")
                results = retriever.invoke(query)
                
                # Add weight to each result's metadata for later sorting
                if doc_id and doc_id in doc_weights:
                    for doc in results:
                        doc.metadata['relevance_weight'] = doc_weights[doc_id]
                
                all_results.extend(results)
                active_retrievers += 1
            except Exception as e:
                print(f"Error retrieving from {name}: {e}")
        
        if active_retrievers == 0:
            print("Warning: No retrievers were successfully queried.")
        
        # Smart deduplication and re-ranking
        return self._smart_rerank(all_results, query, k)
    
    def _smart_rerank(self, results: List[Document], query: str, k: int) -> List[Document]:
        """Smart reranking of results considering duplication, relevance weights and semantic similarity."""
        if not results:
            return []
            
        # Group similar content using cosine similarity of embeddings
        try:
            unique_results = []
            seen_contents = set()
            
            # Sort by relevance weight first if available
            results = sorted(results, 
                            key=lambda x: x.metadata.get('relevance_weight', 1.0), 
                            reverse=True)
            
            # Use embedding manager to compute query embedding
            query_embedding = None
            try:
                query_embedding = self.vector_db_manager.embedding_manager.generate_embedding(query)
            except Exception as e:
                print(f"Error generating query embedding: {e}")
            
            # Calculate similarity scores if query embedding available
            if query_embedding:
                for doc in results:
                    content_hash = hash(doc.page_content)
                    if content_hash not in seen_contents:
                        seen_contents.add(content_hash)
                        # Try to get or compute document embedding
                        if hasattr(doc, 'embedding'):
                            doc_embedding = doc.embedding
                        else:
                            doc_embedding = self.vector_db_manager.embedding_manager.generate_embedding(doc.page_content)
                        
                        # Calculate similarity score
                        similarity = self._cosine_similarity(query_embedding, doc_embedding)
                        doc.metadata['similarity_score'] = similarity
                        unique_results.append(doc)
                
                # Final sort by similarity score
                unique_results = sorted(unique_results,
                                       key=lambda x: x.metadata.get('similarity_score', 0.0),
                                       reverse=True)
            else:
                # Simple deduplication if no embeddings
                for doc in results:
                    content = doc.page_content
                    if content not in seen_contents:
                        seen_contents.add(content)
                        unique_results.append(doc)
            
            return unique_results[:k]
        except Exception as e:
            print(f"Error in smart reranking: {e}")
            # Fallback to simple deduplication
            seen_contents = set()
            unique_results = []
            for doc in results:
                content = doc.page_content
                if content not in seen_contents:
                    seen_contents.add(content)
                    unique_results.append(doc)
            return unique_results[:k]
    
    def _cosine_similarity(self, vec1, vec2):
        """Calculate cosine similarity between two vectors."""
        dot_product = sum(a*b for a, b in zip(vec1, vec2))
        mag1 = math.sqrt(sum(a*a for a in vec1))
        mag2 = math.sqrt(sum(b*b for b in vec2))
        if mag1 * mag2 == 0:
            return 0
        return dot_product / (mag1 * mag2)

### 9. Document Manager
The Document Manager will handle the entire lifecycle of documents, including ingestion, processing, and retrieval.


In [10]:
class DocumentManager:
    """Class to manage document processing and storage."""
    
    def __init__(self, milvus_host='localhost', milvus_port='19530', 
                 neo4j_uri='bolt://localhost:7687', neo4j_username='neo4j', neo4j_password='venev'):
        """Initialize document manager with database connections."""
        # Initialize components
        self.analyzer = DocumentAnalyzer()
        self.chunker = DocumentChunker()
        self.embedding_manager = EmbeddingManager(model_name="all-MiniLM-L6-v2")
        
        # Initialize vector database manager
        self.vector_db_manager = VectorDBManager(self.embedding_manager)
        
        # Setup database connections
        try:
            self.vector_db_manager.setup_milvus(host=milvus_host, port=milvus_port)
        except Exception as e:
            print(f"Warning: Failed to set up Milvus: {e}")
        
        try:
            self.vector_db_manager.setup_neo4j(uri=neo4j_uri, username=neo4j_username, password=neo4j_password)
        except Exception as e:
            print(f"Warning: Failed to set up Neo4j: {e}")
            
        # Create retriever
        self.retriever = HybridRetriever(self.vector_db_manager)
        self.retriever.setup_langchain_retrieval()
        
        # Track processed documents with BOTH path and ID
        self.processed_documents = set()
        self.processed_doc_ids = set()
        
    def process_document(self, file_path: str) -> bool:
        """
        Process a document and store in the databases.
        Returns True if processing succeeded.
        """
        # Extract document ID from the file name
        doc_id = os.path.basename(file_path).split('.')[0]

        # Skip if document has already been processed
        if file_path in self.processed_documents:
            print(f"Document {file_path} has already been processed")
            return True
            
        print(f"Processing document: {file_path}")
        
        # Create a pipeline for this document
        pipeline = DocumentProcessingPipeline(
            analyzer=self.analyzer,
            chunker=self.chunker,
            embedding_manager=self.embedding_manager,
            vector_db_manager=self.vector_db_manager
        )
        
        # Process the document
        try:
            # Process document with unique identifier
            pipeline.process_document(file_path, doc_id=doc_id)
            
            # Mark as processed - both path and ID
            self.processed_documents.add(file_path)
            self.processed_doc_ids.add(doc_id)
            
            # Refresh retriever to include the new document
            self.refresh_retriever()
            
            return True
        except Exception as e:
            print(f"Error processing document {file_path}: {e}")
            return False
        
            
    def process_multiple_documents(self, file_paths: List[str]) -> List[str]:
        """Process multiple documents and return list of successfully processed ones."""
        successful = []
        for file_path in file_paths:
            if self.process_document(file_path):
                successful.append(file_path)
        return successful
    
    def refresh_retriever(self):
        """Refresh the retriever to recognize new documents."""
        self.retriever = HybridRetriever(self.vector_db_manager)
        self.retriever.setup_langchain_retrieval()
        
    def query(self, query_text: str, k: int = 5):
        """Query the system and return documents and LLM response."""
        documents = self.retriever.hybrid_retrieve(query_text, k=k)
        
        # Process with LLM if API key is available
        if "GOOGLE_API_KEY" in os.environ:
            gemini_processor = GeminiProcessor()
            response = gemini_processor.process_query(query_text, documents)
            return documents, response
        else:
            return documents, None

    def list_processed_documents(self):
        """Return list of processed documents."""
        return list(self.processed_documents)

### Usage Example

In [11]:
# Create a document manager
doc_manager = DocumentManager(
    milvus_host="localhost",
    milvus_port="19530", 
    neo4j_uri="bolt://localhost:7687",
    neo4j_username="neo4j",
    neo4j_password="venev"
)

# Process documents (once)
doc_paths = [
    "/workspaces/fantastic-engine/knowledge-base/data/sample_pdf.pdf"
]
successful = doc_manager.process_multiple_documents(doc_paths)
print(f"Successfully processed {len(successful)} documents")


Initialized embedding model: all-MiniLM-L6-v2
Milvus connection parameters configured.
Milvus connection test successful
Neo4j connection established and schema created.
Multi-query retriever set up with Gemini
Set up 1 retrievers for hybrid search
Processing document: /workspaces/fantastic-engine/knowledge-base/data/sample_pdf.pdf
Processing document: /workspaces/fantastic-engine/knowledge-base/data/sample_pdf.pdf (ID: sample_pdf)
Neo4j connection established and schema created.
Multi-query retriever set up with Gemini
Set up 1 retrievers for hybrid search
Processing document: /workspaces/fantastic-engine/knowledge-base/data/sample_pdf.pdf
Processing document: /workspaces/fantastic-engine/knowledge-base/data/sample_pdf.pdf (ID: sample_pdf)


Xet Storage is enabled for this repo, but the 'hf_xet' package is not installed. Falling back to regular HTTP download. For better performance, install the package with: `pip install huggingface_hub[hf_xet]` or `pip install hf_xet`
Failed to import jpype dependencies. Fallback to subprocess.
No module named 'jpype'
Failed to import jpype dependencies. Fallback to subprocess.
No module named 'jpype'


Document analysis complete. Found 206 elements.
Identified content types: table, rule_explanation, business_logic, user_scenario
Processing tables...
Processed batch 1/4
Processed batch 1/4
Processed batch 2/4
Processed batch 3/4
Processed batch 2/4
Processed batch 3/4
Processed batch 4/4
Stored triplets in Neo4j (completed batches)
Storing table data in Milvus for backup
Using sanitized collection name: doc_sample_pdf_business_logic
Processed batch 4/4
Stored triplets in Neo4j (completed batches)
Storing table data in Milvus for backup
Using sanitized collection name: doc_sample_pdf_business_logic
Stored 176 documents in Milvus collection: doc_sample_pdf_business_logic
Processing content type: rule_explanation
Created 108 chunks using hierarchical strategy
Storing 108 chunks in Milvus
Using sanitized collection name: doc_sample_pdf_rule_explanation
Stored 176 documents in Milvus collection: doc_sample_pdf_business_logic
Processing content type: rule_explanation
Created 108 chunks usin

### Querying the System

In [15]:
results, answer = doc_manager.query("Describe the refund flow when both company and customer initiate cancellation.")

# Print document summaries
print("Retrieved documents:")
for i, doc in enumerate(results):
    print(f"\nDocument {i+1} [Source: {doc.metadata.get('doc_id', 'unknown')}]:")
    content_preview = doc.page_content[:100] + "..." if len(doc.page_content) > 100 else doc.page_content
    print(content_preview)
    
    # Print similarity score if available
    if 'similarity_score' in doc.metadata:
        print(f"Similarity score: {doc.metadata['similarity_score']:.4f}")

# Format and display LLM answer
if answer:
    print("\n\nLLM Answer:")
    from IPython.display import Markdown, display
    display(Markdown(answer))
else:
    print("\n\nNo LLM answer available - API key may not be configured")

Gemini model initialized successfully
Using multi-query retriever
Using multi-query retriever
Querying retriever: milvus_sample_pdf_business_logic (k=5)
Querying retriever: milvus_sample_pdf_rule_explanation (k=5)
Querying retriever: milvus_sample_pdf_user_scenario (k=5)
Querying retriever: neo4j (k=5)
Querying retriever: milvus_sample_pdf_business_logic (k=5)
Querying retriever: milvus_sample_pdf_rule_explanation (k=5)
Querying retriever: milvus_sample_pdf_user_scenario (k=5)
Querying retriever: neo4j (k=5)
Gemini model initialized successfully
Gemini model initialized successfully
Retrieved documents:

Document 1 [Source: sample_pdf]:
4 Refund generated after cancellation communication should be send to net payable amount. (Click


Similarity score: 0.6561

Document 2 [Source: sample_pdf]:
4 Refund generated after cancellation communication should be send to net payable amount. (Click  
C...
Similarity score: 0.5945

Document 3 [Source: sample_pdf]:
3) Cancel & rebook Pre cancellatio

The provided documents describe the refund process for cancellations, but don't specifically differentiate between company-initiated and customer-initiated cancellations in a comprehensive way.  They mention the need for a "full refund amount" to be displayed on the pre-cancellation screen (Documents 3, 4, and 5), and that the refund amount should be calculated *without* HR points (Documents 3, 4, and 5).  Document 5 further states that this should apply to both company and customer-initiated cancellations.

Document 2 discusses a BAU (Business As Usual) process where refunds might involve a differential amount if full premium plus HR points were paid, but it's unclear how this applies to company-initiated cancellations specifically.

Missing information:  A clear and distinct breakdown of the refund calculation process for company and customer-initiated cancellations is not present in the documents.  Details on how the refund amount is actually calculated (beyond the exclusion of HR points) are not given.


In [12]:
%%bash
docker rm $(docker ps -a -q) -f
sudo rm -rf /workspaces/fantastic-engine/knowledge-base/volumes/*
sudo rm -rf /workspaces/fantastic-engine/volumes/*

5ea01b4adafd
63517eda4fc1
63517eda4fc1


In [13]:
%%bash
docker compose up -d
bash /workspaces/fantastic-engine/standalone_embed.sh start

 Container neo4j-graph-db  Creating
 Container neo4j-graph-db  Created
 Container neo4j-graph-db  Starting
 Container neo4j-graph-db  Created
 Container neo4j-graph-db  Starting
 Container neo4j-graph-db  Started
 Container neo4j-graph-db  Started


Wait for Milvus Starting...
Start successfully.
To change the default Milvus configuration, add your settings to the user.yaml file and then restart the service.
Start successfully.
To change the default Milvus configuration, add your settings to the user.yaml file and then restart the service.
