In [10]:
from elasticsearch import Elasticsearch
from sentence_transformers import SentenceTransformer
import faiss
import fitz  # PyMuPDF
import re
import numpy as np
import json
from typing import List, Dict
import os

class DocumentProcessor:
    def __init__(self, chunk_size: int = 400, overlap: int = 70, embedding_model: str = "sentence-transformers/all-mpnet-base-v2", es_user: str = "elastic", es_password: str = "elastic"):
        self.chunk_size = chunk_size
        self.overlap = overlap
        self.es = Elasticsearch(
            "http://localhost:9200",
            basic_auth=(es_user, es_password)
        )
        self.embedding_model = SentenceTransformer(embedding_model)
        self.embedding_dim = self.embedding_model.get_sentence_embedding_dimension()
        
        # Initialize FAISS index
        self.index = faiss.IndexFlatIP(self.embedding_dim)  # Inner product for cosine similarity
        self.document_store = []  # Store document chunks with their embeddings
    
    def pdf_to_text(self, pdf_path: str) -> str:
        """Extract text from PDF file."""
        doc = fitz.open(pdf_path)
        text = ""
        for page in doc:
            text += page.get_text()
        return text

    def clean_text(self, text: str) -> str:
        """Clean and normalize text."""
        # Remove extra whitespace
        text = re.sub(r'\s+', ' ', text).strip()
        # Remove special characters but keep basic punctuation
        text = re.sub(r'[^\w\s.,;?!-]', '', text)
        return text

    def segment_text(self, text: str) -> List[Dict[str, str]]:
        """Split text into manageable chunks with overlap."""
        words = text.split()
        chunks = []
        
        for i in range(0, len(words), self.chunk_size - self.overlap):
            chunk = ' '.join(words[i:i + self.chunk_size])
            if chunk:
                chunks.append({
                    'content': chunk,
                    'start_index': i,
                    'length': len(chunk)
                })
        return chunks

    def generate_embeddings(self, texts: List[str]) -> np.ndarray:
        """Generate embeddings for text chunks."""
        embeddings = self.embedding_model.encode(
            texts,
            batch_size=32,
            show_progress_bar=True,
            convert_to_numpy=True
        )
        # Normalize embeddings for cosine similarity
        faiss.normalize_L2(embeddings)
        return embeddings
    
    def index_document(self, doc_id: str, title: str, chunks: List[Dict[str, str]], embeddings: np.ndarray):
        """Index document chunks in both Elasticsearch and FAISS."""
        # Create Elasticsearch index if it doesn't exist
        if not self.es.indices.exists(index="fifa_laws"):
            self.es.indices.create(
                index="fifa_laws",
                mappings={
                    "properties": {
                        "title": {"type": "text"},
                        "content": {"type": "text"},
                        "start_index": {"type": "integer"},
                        "length": {"type": "integer"},
                        "chunk_id": {"type": "keyword"}
                    }
                }
            )
        
        # Index each chunk
        for i, (chunk, embedding) in enumerate(zip(chunks, embeddings)):
            chunk_id = f"{doc_id}_{i}"
            
            # Store in Elasticsearch
            self.es.index(
                index="fifa_laws",
                id=chunk_id,
                document={
                    "title": title,
                    "content": chunk["content"],
                    "start_index": chunk["start_index"],
                    "length": chunk["length"],
                    "chunk_id": chunk_id
                }
            )
            
            # Store in FAISS
            self.index.add(embedding.reshape(1, -1))
            
            # Store metadata
            self.document_store.append({
                "chunk_id": chunk_id,
                "title": title,
                "content": chunk["content"]
            })
    
    def process_document(self, pdf_path: str, doc_id: str, title: str):
        """Process a complete document from PDF to indexed chunks."""
        # Extract text from PDF
        text = self.pdf_to_text(pdf_path)
        
        # Clean the text
        cleaned_text = self.clean_text(text)
        
        # Segment into chunks
        chunks = self.segment_text(cleaned_text)
        
        # Generate embeddings
        embeddings = self.generate_embeddings([chunk["content"] for chunk in chunks])
        
        # Index the chunks
        self.index_document(doc_id, title, chunks, embeddings)
        
        return len(chunks)

    def save_indexes(self, path: str):
        """Save FAISS index and document store to disk."""
        os.makedirs(path, exist_ok=True)
        faiss.write_index(self.index, f"{path}/fifa_laws.faiss")
        with open(f"{path}/document_store.json", 'w') as f:
            json.dump(self.document_store, f)
    
    def load_indexes(self, path: str):
        """Load FAISS index and document store from disk."""
        self.index = faiss.read_index(f"{path}/fifa_laws.faiss")
        with open(f"{path}/document_store.json", 'r') as f:
            self.document_store = json.load(f)
# already executed
# Initialize the DocumentProcessor with Elasticsearch credentials
doc_processor = DocumentProcessor(es_user="elastic", es_password="elastic")

# # Process a document (replace 'path/to/your/document.pdf' with the actual path to your PDF file)
doc_processor.process_document(pdf_path=r'C:\Users\mouni\OneDrive\Bureau\Fifa_Law_Consultant\data\FIFA_Legal_HB_EN V7.pdf', doc_id='doc1', title='Document Title')

# # Save the indexes to the specified directory
doc_processor.save_indexes('./indexes_with_overlap_70_chunks_400')

Batches: 100%|██████████| 35/35 [09:33<00:00, 16.38s/it]


In [2]:
from elasticsearch import Elasticsearch

es = Elasticsearch(
    "http://localhost:9200",
    http_auth=("elastic", "elastic")
)

# Check if the connection is successful
if es.ping():
    print("Connected to Elasticsearch")
else:
    print("Could not connect to Elasticsearch")

Connected to Elasticsearch


  es = Elasticsearch(


In [3]:
import fitz  # PyMuPDF
import re
import numpy as np
import json
import faiss
from elasticsearch import Elasticsearch
from huggingface_hub import hf_hub_download
from transformers import AutoTokenizer, AutoModelForCausalLM
from typing import List, Dict
import torch

In [19]:
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional, List, Dict
import uvicorn
from elasticsearch import Elasticsearch
from sentence_transformers import SentenceTransformer
import faiss
import json
from transformers import pipeline, AutoModelForCausalLM, AutoTokenizer

class HybridRetriever:
    def __init__(self, es_user: str = "elastic", es_password: str = "your_password", embedding_model: str = "sentence-transformers/all-mpnet-base-v2"):
        self.es = Elasticsearch(
            "http://localhost:9200",
            basic_auth=(es_user, es_password)
        )
        # Initialize the embedding model
        self.embedding_model = SentenceTransformer(embedding_model)
        self.embedding_dim = self.embedding_model.get_sentence_embedding_dimension()
        
        # Initialize FAISS index
        self.index = faiss.IndexFlatIP(self.embedding_dim)  # Inner product for cosine similarity

    def load_indexes(self, path: str):
        """Load FAISS index and document store from disk."""
        self.index = faiss.read_index(f"{path}/fifa_laws.faiss")
        with open(f"{path}/document_store.json", 'r') as f:
            self.document_store = json.load(f)

    def elasticsearch_search(self, query: str, top_k: int = 10) -> List[Dict]:
        """Perform keyword-based search using Elasticsearch."""
        response = self.es.search(
            index="fifa_laws",
            body={
                "query": {
                    "multi_match": {
                        "query": query,
                        "fields": ["content", "title"],
                        "type": "best_fields",
                        "tie_breaker": 0.3,
                        "minimum_should_match": "80%"
                    }
                },
                "size": top_k
            }
        )

        results = []
        for hit in response['hits']['hits']:
            results.append({
                "title": hit["_source"]["title"],
                "content": hit["_source"]["content"],
                "score": hit["_score"]
            })
        return results

    def faiss_search(self, query: str, top_k: int = 10) -> List[Dict]:
        """Perform vector-based search using FAISS."""
        query_embedding = self.embedding_model.encode([query], convert_to_numpy=True)
        faiss.normalize_L2(query_embedding)
        distances, indices = self.index.search(query_embedding, top_k)

        results = []
        for idx, dist in zip(indices[0], distances[0]):
            if idx == -1:
                continue
            result = self.document_store[idx]
            result["score"] = float(dist)
            results.append(result)
        return results

    def hybrid_search(self, query: str, top_k: int = 10) -> List[Dict]:
        """Combine results from both search methods."""
        # Get results from both methods
        es_results = self.elasticsearch_search(query, top_k * 2)
        faiss_results = self.faiss_search(query, top_k * 2)

        # Normalize scores
        max_es_score = max([res["score"] for res in es_results], default=1)
        max_faiss_score = max([res["score"] for res in faiss_results], default=1)

        for res in es_results:
            res["score"] /= max_es_score
        for res in faiss_results:
            res["score"] /= max_faiss_score

        # Combine and sort results
        combined_results = es_results + faiss_results
        combined_results = sorted(combined_results, key=lambda x: x["score"], reverse=True)

        return combined_results[:top_k]

class RAGPipeline:
    def __init__(self, model_name: str = "gpt2", embedding_model: str = "sentence-transformers/all-mpnet-base-v2", index_path: str = "./indexes"):
        # Initialize the retriever
        self.retriever = HybridRetriever(es_user="elastic", es_password="your_password", embedding_model=embedding_model)
        self.retriever.load_indexes(index_path)
        # Initialize the language model
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModelForCausalLM.from_pretrained(model_name)
        self.generator = pipeline("text-generation", model=self.model, tokenizer=self.tokenizer)

    def generate_response(self, question: str) -> Dict:
        # Perform hybrid search
        search_results = self.retriever.hybrid_search(question)
        
        # Combine the context from the top search results
        context = " ".join([result["content"] for result in search_results[:3]])
        
        # Generate a response using the language model
        generated_text = self.generator(f"Question: {question}\nContext: {context}\nAnswer:", max_length=150, num_return_sequences=1)[0]["generated_text"]
        
        # Extract the answer from the generated text
        answer = generated_text.split("Answer:")[-1].strip()
        print("gpt2 answer",answer)
        return {
            "answer": answer,
            "context": context,
            "sources": search_results
        }

# Initialize the FastAPI app and RAG pipeline
app = FastAPI(title="FIFA Law Consultation API")

# Initialize the RAG pipeline
rag_pipeline = RAGPipeline(index_path='./indexes_with_overlap_70_chunks_400')

# Define the Query model for API input
class Query(BaseModel):
    question: str

# Define the Source model to represent the sources returned in the response
class Source(BaseModel):
    title: str
    content: str
    relevance_score: float
    chunk_id: str  # Add chunk_id to the Source model

# Define the Response model to structure the response returned to the user
class Response(BaseModel):
    answer: str
    confidence: float
    sources: List[Source]
    context: Optional[str] = None

# Create a POST endpoint to handle chat queries and generate responses using the RAG pipeline
@app.post("/api/chat", response_model=Response)
async def chat_endpoint(query: Query):
    try:
        print("ok")
        # Generate response using the RAG pipeline
        result = rag_pipeline.generate_response(query.question)
        print(result)
        # Calculate confidence score (for now, using a placeholder value or method)
        confidence = 0.9  # Replace this with your actual confidence scoring method
        
        # Construct the response model
        sources = [
            Source(
                title=source["title"],
                content=source["content"],
                relevance_score=source["score"],
                chunk_id=source.get("chunk_id", "")  # Include chunk_id in the response
            ) for source in result.get("sources", [])
        ]
        
        # Return response
        return Response(
            answer=result["answer"],
            confidence=confidence,
            sources=sources,
            context=result["context"] if confidence > 0.5 else None
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

# Create a POST endpoint to directly access the hybrid search results
@app.post("/api/search")
async def search_endpoint(query: Query):
    """Endpoint to directly access the hybrid search results."""
    try:
        retriever = rag_pipeline.retriever
        results = retriever.hybrid_search(query.question)
        
        # Transform the search results into the required format
        sources = [
            Source(
                title=source["title"],
                content=source["content"],
                relevance_score=source["score"],
                chunk_id=source.get("chunk_id", "")  # Include chunk_id in the response
            ) for source in results
        ]
        
        return {"results": sources}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

# To run the FastAPI app, use the following command from the terminal (commented out here):
# uvicorn filename:app --reload

if __name__ == "__main__":
    # Test retriever functionality (optional)
    retriever = HybridRetriever(es_user="elastic", es_password="elastic")
    retriever.load_indexes('./indexes_with_overlap_70_chunks_400')
    results = retriever.hybrid_search("What are the obligations of member associations according to FIFA statutes?")
    print("Test search results:", results[:2])

Device set to use cpu


Test search results: [{'title': 'Document Title', 'content': 'Congress; c to nominate candidates for the FIFA presidency and the Council; d to participate in and cast their votes at all FIFA elections in accordance with the FIFA Governance Regulations; e to take part in competitions organised by FIFA; f to take part in FIFAs assistance and development programmes; and g to exercise all other rights arising from these Statutes and other regulations. 2. The exercise of these rights is subject to other provisions in these Statutes and the applicable regulations. 17 17 TABLE OF CONTENTS FIFA LEGAL HANDBOOOK TABLE OF CONTENTS FIFA STATUTES 2024 ED. 14. Member associations obligations 1. Member associations have the following obligations a to comply fully with the Statutes, regulations, directives and decisions of FIFA bodies at any time as well as the decisions of the Court of Arbitration for Sport CAS passed on appeal on the basis of article 49 paragraph 1 of the FIFA Statutes; b to take pa