In [9]:
import os
from langchain_openai import AzureOpenAIEmbeddings
import pandas as pd
import uuid
from qdrant_client import QdrantClient
from qdrant_client.models import VectorParams, Distance, PointStruct

from langchain.text_splitter import (
    RecursiveCharacterTextSplitter,
    CharacterTextSplitter,
    TokenTextSplitter,
    SpacyTextSplitter,
    NLTKTextSplitter,
    MarkdownHeaderTextSplitter,
    HTMLHeaderTextSplitter,
    PythonCodeTextSplitter,
    LatexTextSplitter
)


from services.utils import extract_text


# Default configurations
DEFAULT_CHUNK_SIZE = 1000
DEFAULT_CHUNK_OVERLAP = 200
DEFAULT_COLLECTION_NAME = "mcp"
DEFAULT_QDRANT_HOST = "localhost"
DEFAULT_QDRANT_PORT = 6333
VECTOR_SIZE = 3072


class ChunkingMethod:
    """Available chunking methods with their configurations"""
    RECURSIVE_CHARACTER = "recursive_character"
    CHARACTER = "character"
    TOKEN = "token"
    SPACY = "spacy"
    NLTK = "nltk"
    MARKDOWN_HEADER = "markdown_header"
    HTML_HEADER = "html_header"
    PYTHON_CODE = "python_code"
    LATEX = "latex"
    # Added for consistency, although not fully implemented in your provided code
    CUSTOM_TOKEN = "custom_token" 


class DocumentProcessor:
    def __init__(self, 
                 collection_name: str = DEFAULT_COLLECTION_NAME,
                 qdrant_host: str = DEFAULT_QDRANT_HOST,
                 qdrant_port: int = DEFAULT_QDRANT_PORT,
                 embedding_model: AzureOpenAIEmbeddings = None,
                 vector_size: int = VECTOR_SIZE):
        """
        Initialize DocumentProcessor with configuration parameters.
        
        Args:
            collection_name: Name of the Qdrant collection
            qdrant_host: Qdrant server host
            qdrant_port: Qdrant server port
            embedding_model: Pre-configured AzureOpenAIEmbeddings instance
            vector_size: Expected vector dimension for the collection
        """
        self.collection_name = collection_name
        self.vector_size = vector_size
        self.client = QdrantClient(host=qdrant_host, port=qdrant_port)
        
        if not self.client.collection_exists(self.collection_name):
            print(f"Collection '{self.collection_name}' does not exist. Creating it.")
            self.client.create_collection(
                collection_name=self.collection_name,
                vectors_config=VectorParams(size=vector_size, distance=Distance.COSINE),
            )
        else:
            print(f"Collection '{self.collection_name}' already exists.")

        # Use provided embedding model or raise error if not provided
        if embedding_model is None:
            raise ValueError("embedding_model must be provided. Please configure AzureOpenAIEmbeddings externally and pass it to DocumentProcessor.")
        
        self.embedding_model = embedding_model
    
    @staticmethod
    def extract_text(file_path: str) -> str:
        """Extract text from file"""
        return extract_text(file_path)

    @staticmethod
    def get_text_splitter(
        method: str = ChunkingMethod.RECURSIVE_CHARACTER,
        chunk_size: int = DEFAULT_CHUNK_SIZE,
        chunk_overlap: int = DEFAULT_CHUNK_OVERLAP,
        **kwargs
    ):
        """Get the appropriate text splitter based on method"""
        
        if method == ChunkingMethod.RECURSIVE_CHARACTER:
            return RecursiveCharacterTextSplitter(
                chunk_size=chunk_size,
                chunk_overlap=chunk_overlap,
                length_function=len,
                separators=["\n\n", "\n", " ", ""],
                **kwargs
            )
        
        elif method == ChunkingMethod.CHARACTER:
            return CharacterTextSplitter(
                chunk_size=chunk_size,
                chunk_overlap=chunk_overlap,
                separator=kwargs.get('separator', '\n\n'),
                **kwargs
            )
        
        elif method == ChunkingMethod.TOKEN:
            return TokenTextSplitter(
                chunk_size=chunk_size,
                chunk_overlap=chunk_overlap,
                encoding_name=kwargs.get('encoding_name', 'cl100k_base'),
                **kwargs
            )
        
        elif method == ChunkingMethod.SPACY:
            return SpacyTextSplitter(
                chunk_size=chunk_size,
                chunk_overlap=chunk_overlap,
                **kwargs
            )
        
        elif method == ChunkingMethod.NLTK:
            return NLTKTextSplitter(
                chunk_size=chunk_size,
                chunk_overlap=chunk_overlap,
                **kwargs
            )
        
        elif method == ChunkingMethod.MARKDOWN_HEADER:
            headers_to_split_on = kwargs.get('headers_to_split_on', [
                ("#", "Header 1"),
                ("##", "Header 2"),
                ("###", "Header 3"),
            ])
            return MarkdownHeaderTextSplitter(
                headers_to_split_on=headers_to_split_on,
                **kwargs
            )
        
        elif method == ChunkingMethod.HTML_HEADER:
            headers_to_split_on = kwargs.get('headers_to_split_on', [
                ("h1", "Header 1"),
                ("h2", "Header 2"),
                ("h3", "Header 3"),
            ])
            return HTMLHeaderTextSplitter(
                headers_to_split_on=headers_to_split_on,
                **kwargs
            )
        
        elif method == ChunkingMethod.PYTHON_CODE:
            return PythonCodeTextSplitter(
                chunk_size=chunk_size,
                chunk_overlap=chunk_overlap,
                **kwargs
            )
        
        elif method == ChunkingMethod.LATEX:
            return LatexTextSplitter(
                chunk_size=chunk_size,
                chunk_overlap=chunk_overlap,
                **kwargs
            )
        
        else:
            # Default to recursive character splitter
            return RecursiveCharacterTextSplitter(
                chunk_size=chunk_size,
                chunk_overlap=chunk_overlap,
                **kwargs
            )
        
    
    # Changed to instance method as it relies on self.client and self.embedding_model
    def process_and_add_chunks_to_qdrant(self, 
        text: str, 
        method: str = ChunkingMethod.RECURSIVE_CHARACTER,
        chunk_size: int = DEFAULT_CHUNK_SIZE, 
        overlap: int = DEFAULT_CHUNK_OVERLAP,
        file_type: str = None,
        document_name: str = None,
        **kwargs
    ) -> None:
        """Split text into chunks using various methods and add them to Qdrant"""
        
        # Use custom token method if specified (placeholder for now)
        if method == ChunkingMethod.CUSTOM_TOKEN:
            # This method needs to be implemented if CUSTOM_TOKEN is to be used
            # For now, it will raise an error if called.
            raise NotImplementedError("Custom token chunking method is not implemented.")
        else:
            # Auto-select method based on file type if not specified
            if method == "auto":
                if file_type == "md":
                    method = ChunkingMethod.MARKDOWN_HEADER
                elif file_type == "py":
                    method = ChunkingMethod.PYTHON_CODE
                elif file_type == "tex":
                    method = ChunkingMethod.LATEX
                elif file_type == "html":
                    method = ChunkingMethod.HTML_HEADER
                else:
                    method = ChunkingMethod.RECURSIVE_CHARACTER
            
            # Get the appropriate splitter
            splitter = self.get_text_splitter(method, chunk_size, overlap, **kwargs)
            
            # Handle special cases for header-based splitters
            if method in [ChunkingMethod.MARKDOWN_HEADER, ChunkingMethod.HTML_HEADER]:
                chunks = splitter.split_text(text)
                # Convert Document objects to strings if needed
                chunks = [chunk.page_content if hasattr(chunk, 'page_content') else str(chunk) for chunk in chunks]
            else:
                # Standard text splitting
                chunks = splitter.split_text(text)
        
        # Add each chunk to Qdrant
        points_to_upsert = []
        for i, chunk in enumerate(chunks):
            # Embed documents accepts a list, and returns a list of embeddings.
            # We want the first (and only) embedding for the current chunk.
            try:
                embedded_text = self.embedding_model.embed_documents([chunk])[0] 
            except Exception as e:
                print(f"Error embedding chunk {i}: {e}")
                continue # Skip this chunk if embedding fails

            metadata = {
                "chunk_id": i,
                "document_name": document_name,
                "text": chunk, # Store the actual text chunk for retrieval
                "chunk_method": method,
                "file_type": file_type
            }
            points_to_upsert.append(PointStruct(id=str(uuid.uuid4()), vector=embedded_text, payload=metadata))
        
        if points_to_upsert:
            try:
                self.client.upsert(
                    collection_name=self.collection_name,
                    wait=True, # Wait for operation to complete
                    points=points_to_upsert
                )
                print(f"Successfully upserted {len(points_to_upsert)} chunks to Qdrant for document ID: {document_name}")
            except Exception as e:
                print(f"Error upserting points to Qdrant for document ID {document_name}: {e}")
        else:
            print(f"No chunks to upsert for document ID: {document_name}")


if __name__ == "__main__":
    # Example usage - configuration should be loaded externally
    # Load Azure OpenAI configuration from environment variables
    azure_embedding_endpoint = os.getenv("AZURE_OPENAI_EMBEDDING_ENDPOINT")
    azure_embedding_api_key = os.getenv("AZURE_OPENAI_EMBEDDING_API_KEY")
    azure_embedding_model = os.getenv("AZURE_OPENAI_EMBEDDING_DEPLOYMENT")
    azure_embedding_api_version = os.getenv("AZURE_OPENAI_EMBEDDING_API_VERSION")

    # Create embedding model
    embedding_model = AzureOpenAIEmbeddings(
        model=azure_embedding_model,
        azure_endpoint=azure_embedding_endpoint,
        api_key=azure_embedding_api_key,
        openai_api_version=azure_embedding_api_version
    )

    # Create a dummy mcp.md file for demonstration if it doesn't exist
    document_path = "mcp.md"
    if not os.path.exists(document_path):
        print(f"Creating a dummy file: {document_path}")
        with open(document_path, "w", encoding="utf-8") as f:
            f.write("# Meeting Minutes\n\n")
            f.write("## June 19, 2025\n\n")
            f.write("### Attendees\n")
            f.write("- Alice\n")
            f.write("- Bob\n")
            f.write("- Charlie\n\n")
            f.write("### Discussion Points\n")
            f.write("1.  **Project Alpha**: Reviewed progress. On track for phase 1 completion.\n")
            f.write("2.  **Budget Review**: Discussed Q2 expenditures. Need to optimize cloud spending.\n")
            f.write("3.  **New Initiatives**: Brainstormed ideas for next quarter. Focus on AI integration.\n\n")
            f.write("### Action Items\n")
            f.write(" - Alice: Prepare a detailed report on cloud spending by EOD.\n")
            f.write(" - Bob: Research potential AI integration partners.\n")
            f.write(" - Charlie: Schedule a follow-up meeting for next week.\n")
    else:
        print(f"Using existing file: {document_path}")

    processor = DocumentProcessor(
        collection_name="mcp",
        qdrant_host="localhost",
        qdrant_port=6333,
        embedding_model=embedding_model,
        vector_size=VECTOR_SIZE  # Adjust based on your embedding model
    )

    document_name = document_path

    try:
        # Extract text from the markdown file
        print(f"Extracting text from {document_path}...")
        document_text = processor.extract_text(document_path)
        print(f"Text extracted (first 200 chars): {document_text[:200]}...")
        # Process and add chunks to Qdrant
        print(f"Processing and adding chunks to Qdrant using '{ChunkingMethod.MARKDOWN_HEADER}' method...")
        # Use 'auto' or 'markdown_header' explicitly for .md files
        processor.process_and_add_chunks_to_qdrant(
            text=document_text,
            method="auto", # This will correctly identify markdown
            file_type="md",
            document_name=document_name,
        )
        print("Document processing complete.")

    except FileNotFoundError as e:
        print(f"Error: {e}")
    except Exception as e:
        print(f"An unexpected error occurred during processing: {e}")

Using existing file: mcp.md
Collection 'mcp' does not exist. Creating it.
Extracting text from mcp.md...
Text extracted (first 200 chars): # Meeting Minutes

## June 19, 2025

### Attendees
- Alice
- Bob
- Charlie

### Discussion Points
1.  **Project Alpha**: Reviewed progress. On track for phase 1 completion.
2.  **Budget Review**: Disc...
Processing and adding chunks to Qdrant using 'markdown_header' method...
Successfully upserted 3 chunks to Qdrant for document ID: mcp.md
Document processing complete.


---

In [None]:
from fastapi import FastAPI, HTTPException, UploadFile, File, Form, Query
from pydantic import BaseModel, Field
from typing import Optional, List, Dict, Any, Literal
import os
import asyncio
import base64
import json

from langchain_mcp_adapters.client import MultiServerMCPClient
from langgraph.prebuilt import create_react_agent
from langchain_openai import AzureChatOpenAI


# ----------------------------
# Document Management Routes
# ----------------------------
@app.post("/documents/upload")
async def upload_document(file: UploadFile = File(...), 
                         metadata: Optional[str] = Form(None)):
    """Upload a document file"""
    try:
        # Read file content
        file_content = await file.read()
        file_base64 = base64.b64encode(file_content).decode('utf-8')
        
        # Parse metadata if provided
        parsed_metadata = None
        if metadata:
            try:
                parsed_metadata = json.loads(metadata)
            except json.JSONDecodeError:
                raise HTTPException(status_code=400, detail="Invalid metadata JSON format")
        
        # Call document service via MCP (matching the function signature from mcp_server_document.py)
        result = await mcp_client.call_tool("DocumentService", "upload_document", {
            "file_content": file_base64,
            "filename": file.filename
        })
        
        return result
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

# ----------------------------
# Health Check
# ----------------------------
@app.get("/health")
async def health_check():
    """Health check endpoint"""
    return {
        "status": "healthy",
        "agent_initialized": agent is not None,
        "mcp_client_initialized": mcp_client is not None
    }

In [None]:
import os
import uuid
import base64
import aiofiles
from pathlib import Path
from typing import Dict, Any, Optional
from datetime import datetime

from fastapi import FastAPI, UploadFile, File, HTTPException, Form
from fastapi.responses import JSONResponse
from pydantic import BaseModel
import httpx
import asyncio

# Initialize FastAPI app
app = FastAPI(
    title="Document Upload Service",
    description="Service for uploading and processing documents with vector embeddings",
    version="1.0.0"
)

# Configuration
UPLOAD_DIR = Path("./uploads")
UPLOAD_DIR.mkdir(exist_ok=True)

MCP_SERVER_URL = "http://localhost:8001"  # Your MCP server URL
MAX_FILE_SIZE = 50 * 1024 * 1024  # 50MB limit
ALLOWED_EXTENSIONS = {
    'pdf', 'docx', 'doc', 'txt', 'md', 'html', 'htm', 
    'rtf', 'csv', 'xlsx', 'xls', 'pptx', 'ppt'
}

# Response models
class DocumentUploadResponse(BaseModel):
    status: str
    document_id: Optional[str] = None
    filename: str
    file_path: Optional[str] = None
    collection_id: Optional[str] = None
    upload_time: str
    processing_status: str
    error: Optional[str] = None

class DocumentProcessingRequest(BaseModel):
    file_content: str
    filename: str
    metadata: Optional[Dict[str, Any]] = None

# Utility functions
def get_file_extension(filename: str) -> str:
    """Extract file extension from filename"""
    return filename.split('.')[-1].lower() if '.' in filename else ''

def is_allowed_file(filename: str) -> bool:
    """Check if file extension is allowed"""
    return get_file_extension(filename) in ALLOWED_EXTENSIONS

async def call_mcp_server(file_content: str, filename: str) -> Dict[str, Any]:
    """Call MCP server to process the document"""
    try:
        async with httpx.AsyncClient(timeout=300.0) as client:  # 5 minute timeout
            response = await client.post(
                f"{MCP_SERVER_URL}/tools/upload_document",
                json={
                    "file_content": file_content,
                    "filename": filename
                },
                headers={"Content-Type": "application/json"}
            )
            
            if response.status_code == 200:
                return response.json()
            else:
                return {
                    "status": "error",
                    "error": f"MCP server error: {response.status_code} - {response.text}"
                }
    except httpx.TimeoutException:
        return {
            "status": "error",
            "error": "MCP server timeout - processing may take longer than expected"
        }
    except Exception as e:
        return {
            "status": "error",
            "error": f"Failed to connect to MCP server: {str(e)}"
        }

# Main upload endpoint
@app.post("/upload", response_model=DocumentUploadResponse)
async def upload_document(
    file: UploadFile = File(...),
    metadata: Optional[str] = Form(None)  # JSON string of metadata
) -> DocumentUploadResponse:
    """
    Upload a document file, store it locally, and send to MCP server for processing.
    
    Args:
        file: The uploaded file
        metadata: Optional JSON string containing additional metadata
    
    Returns:
        DocumentUploadResponse with upload and processing status
    """
    upload_time = datetime.now().isoformat()
    
    try:
        # Validate file
        if not file.filename:
            raise HTTPException(status_code=400, detail="No filename provided")
        
        if not is_allowed_file(file.filename):
            raise HTTPException(
                status_code=400, 
                detail=f"File type not allowed. Supported types: {', '.join(ALLOWED_EXTENSIONS)}"
            )
        
        # Check file size
        file_content = await file.read()
        if len(file_content) > MAX_FILE_SIZE:
            raise HTTPException(
                status_code=413, 
                detail=f"File too large. Maximum size: {MAX_FILE_SIZE // (1024*1024)}MB"
            )
        
        # Generate unique document ID and save file
        document_id = str(uuid.uuid4())
        file_path = UPLOAD_DIR / f"{document_id}_{file.filename}"
        
        # Save file to uploads directory
        async with aiofiles.open(file_path, 'wb') as f:
            await f.write(file_content)
        
        # Encode file content to base64 for MCP server
        file_content_b64 = base64.b64encode(file_content).decode('utf-8')
        
        # Process metadata if provided
        parsed_metadata = None
        if metadata:
            try:
                import json
                parsed_metadata = json.loads(metadata)
            except json.JSONDecodeError:
                # If metadata is invalid JSON, treat as string
                parsed_metadata = {"note": metadata}
        
        # Send to MCP server for processing
        mcp_response = await call_mcp_server(
            file_content=file_content_b64,
            filename=file.file

In [6]:
import asyncio
import aiohttp
import json
import uuid
import os

async def test_process_document(file_path: str):
    """Simple test function to process a document"""
    
    # Check if file exists
    if not os.path.exists(file_path):
        print(f"File not found: {file_path}")
        return
    
    # Prepare request
    filename = os.path.basename(file_path)
    document_id = str(uuid.uuid4())
    
    payload = {
        "method": "tools/call",
        "params": {
            "name": "process_document",
            "arguments": {
                "file_path": file_path,
                "filename": filename,
                "document_id": document_id
            }
        }
    }
    
    # Call the service
    async with aiohttp.ClientSession() as session:
        try:
            async with session.post(
                "http://localhost:8001/",
                json=payload,
                headers={"Content-Type": "application/json"}
            ) as response:
                result = await response.json()
                print(f"Status: {response.status}")
                print(f"Result: {json.dumps(result, indent=2)}")
                
        except Exception as e:
            print(f"Error: {e}")

# Test with your file path
if __name__ == "__main__":
    file_path = "data\mcp.md"
    try:
        # Try to run in existing event loop (Jupyter)
        await test_process_document(file_path)
    except:
        # Fallback for normal Python script
        asyncio.run(test_process_document(file_path))

Error: 404, message='Attempt to decode JSON with unexpected mimetype: text/plain; charset=utf-8', url='http://localhost:8001/'


In [8]:
from mcp.client.streamable_http import streamablehttp_client
from mcp.client.session import ClientSession

async def main():
    async with streamablehttp_client("http://localhost:8009/mcp") as (read, write, _):
        async with ClientSession(read, write) as session:
            await session.initialize()

            result = await session.call_tool("get_weather", {"location": "Warsaw"})
            print(result)  # 12

import asyncio
asyncio.run(main())

RuntimeError: asyncio.run() cannot be called from a running event loop