In [None]:
!pip install sentence-transformers transformers faiss-gpu pymupdf nltk datasets faiss-cpu langdetect

In [None]:
import os
from sentence_transformers import SentenceTransformer
from transformers import pipeline, T5ForConditionalGeneration, T5Tokenizer
import torch
import faiss
import gc
import numpy as np
import fitz
import re
import unicodedata
import spacy
import nltk
import json
nltk.download('punkt')
from nltk.tokenize import word_tokenize, sent_tokenize
from langdetect import detect, LangDetectException

In [3]:
class PDFValidator:
    """
    Validate that a PDF can be opened, contains extractable text, and is written in English.
    """

    def __init__(self):
        self.nlp = spacy.load("en_core_web_sm")  # Load English model for spaCy

    def is_valid_pdf(self, pdf_path):
        """
        Check whether a PDF is usable as input for question answering.

        Args:
        - pdf_path (str): Path to the PDF file.

        Returns:
        - bool: True if the file opens, has at least one page, yields non-blank text and
          that text is detected as English; False otherwise (a reason is printed).
        """
        try:
            # The context manager closes the document on every exit path, including the
            # early returns and exceptions raised while extracting text.
            with fitz.open(pdf_path) as document:
                if document.page_count == 0:
                    print(f"PDF is empty: {pdf_path}")
                    return False
                # Join once instead of repeatedly concatenating page strings.
                text = "".join(
                    document.load_page(page_num).get_text()
                    for page_num in range(document.page_count)
                )
            if not text.strip():
                print(f"PDF has no extractable text: {pdf_path}")
                return False
            if not self.is_english(text):
                print(f"PDF is not in English: {pdf_path}")
                return False
            return True  # PDF is valid if it passes all checks
        except Exception as e:
            # Any failure (unreadable file, corrupt page, ...) marks the PDF as invalid.
            print(f"Error opening PDF: {e}")
            return False

    def is_english(self, text):
        """
        Check whether the text is detected as English.

        Args:
        - text (str): Text to classify.

        Returns:
        - bool: True if the detected language code is 'en'; False otherwise, including
          when language detection raises LangDetectException.
        """
        try:
            return detect(text) == 'en'
        except LangDetectException:
            return False

In [4]:
class QuestionValidator:
    """Basic sanity checks for user-supplied questions."""

    @staticmethod
    def is_valid_question(question):
        """
        Validate a question string.

        Args:
        - question (str): Question entered by the user.

        Returns:
        - bool: False if the input is not a string, is blank, or consists only of digits
          once surrounding whitespace is removed; True otherwise.
        """
        if not isinstance(question, str):
            return False
        # Strip first so that whitespace-padded numbers such as " 123 " are rejected too.
        stripped = question.strip()
        return bool(stripped) and not stripped.isdigit()


In [5]:
class PDFQASystem:
    """
    Question answering over PDF documents.

    Text is extracted from PDFs, normalized, split into token-bounded chunks, embedded with
    a Sentence Transformer model and stored in a Faiss index. A question is embedded the same
    way, the closest chunks are retrieved, and a T5 model generates the answer from them.
    """

    def __init__(self, t5_model_name='google/flan-t5-xl', sentence_transformer_model_name='sentence-transformers/multi-qa-mpnet-base-dot-v1', device='cuda'):
        """
        Initialize the PDFQASystem with specified models and device.

        Args:
        - t5_model_name (str): Name or path of the T5 model to use for question answering.
        - sentence_transformer_model_name (str): Name or path of the Sentence Transformer model for sentence embeddings.
        - device (str): Device to run the models on ('cuda' for GPU, 'cpu' for CPU). A CUDA
          device is replaced by 'cpu' when CUDA is not available.

        Raises:
        - Exception: If there's an error loading the specified models.
        """
        # Fall back to CPU instead of failing inside `.to('cuda')` on machines without a GPU.
        if str(device).startswith('cuda') and not torch.cuda.is_available():
            print("CUDA is not available; falling back to CPU.")
            device = 'cpu'

        self.device = device
        self.sentence_transformer_model_name = sentence_transformer_model_name
        self.nlp = spacy.load("en_core_web_sm")  # spaCy English model, used for sentence splitting

        # Load T5 model and tokenizer, and Sentence Transformer model
        try:
            self.t5_model = T5ForConditionalGeneration.from_pretrained(t5_model_name).to(device)
            self.tokenizer = T5Tokenizer.from_pretrained(t5_model_name)
            self.sentence_model = SentenceTransformer(sentence_transformer_model_name).to(device)
        except Exception as e:
            print(f"Error loading models: {e}")
            raise

        self.embeddings = None  # Embeddings of self.chunks (set by process_documents)
        self.index = None  # Faiss index over self.embeddings (set by process_documents)
        self.chunks = []  # Text chunks extracted from PDFs, aligned with the index rows

    def extract_text_from_pdf(self, pdf_path):
        """
        Extract text content from a PDF file.

        Args:
        - pdf_path (str): Path to the PDF file.

        Returns:
        - str: Concatenated text of all pages, in page order.

        Raises:
        - Exception: If there's an error extracting text from the PDF (re-raised after logging).
        """
        try:
            # The context manager closes the document even if a page fails to load.
            with fitz.open(pdf_path) as document:
                return "".join(
                    document.load_page(page_num).get_text()
                    for page_num in range(document.page_count)
                )
        except Exception as e:
            print(f"Error extracting text from PDF: {e}")
            raise

    def clean_text(self, text):
        """
        Clean and normalize text.

        Args:
        - text (str): Text to clean and normalize.

        Returns:
        - str: Lowercased text in which every run of whitespace (including newlines) is a
          single space and all non-ASCII characters are removed (accented letters keep
          their ASCII base letter).
        """
        text = re.sub(r'\s+', ' ', text)  # Collapse any whitespace run into one space
        text = text.lower()
        # NFKD splits accented letters into base letter + combining mark; the ASCII
        # round-trip then drops the marks and every other non-ASCII character.
        text = unicodedata.normalize('NFKD', text).encode('ascii', 'ignore').decode('ascii')
        return text

    def chunk_text_by_tokens(self, document, max_token_limit=128):
        """
        Chunk the document into segments of whole sentences, bounded by token length.

        Sentences are never split: a single sentence longer than the limit becomes a chunk
        of its own that exceeds the limit.

        Args:
        - document (str): Text document to chunk.
        - max_token_limit (int): Maximum token length per chunk, measured with the T5 tokenizer.

        Returns:
        - list: List of non-empty text chunks.
        """
        parsed = self.nlp(document)  # spaCy provides the sentence boundaries

        chunks = []
        current_chunk = []
        current_length = 0

        for sentence in parsed.sents:
            sentence_text = str(sentence).strip()
            if not sentence_text:
                continue  # whitespace-only sentence: nothing to add
            sentence_length = len(self.tokenizer.encode(sentence_text))

            # Only flush when there is something to flush; otherwise an over-long first
            # sentence would emit an empty chunk.
            if current_chunk and current_length + sentence_length > max_token_limit:
                chunks.append(' '.join(current_chunk))
                current_chunk = []
                current_length = 0

            current_chunk.append(sentence_text)
            current_length += sentence_length

        if current_chunk:
            chunks.append(' '.join(current_chunk))

        return chunks

    def generate_embeddings(self, chunks, batch_size=10):
        """
        Generate embeddings for text chunks using Sentence Transformer model.

        Args:
        - chunks (list): List of text chunks.
        - batch_size (int): Number of chunks encoded per call; each batch is moved to the
          CPU before the next one is encoded.

        Returns:
        - torch.Tensor: CPU tensor of embeddings, one row per chunk.

        Raises:
        - ValueError: If `chunks` is empty.
        """
        if not chunks:
            raise ValueError("Cannot generate embeddings: no text chunks were provided.")

        batches = []
        for start in range(0, len(chunks), batch_size):
            batch = chunks[start:start + batch_size]
            batch_embeddings = self.sentence_model.encode(batch, convert_to_tensor=True, show_progress_bar=True)
            batches.append(batch_embeddings.cpu().numpy())
            # Release the device copy before encoding the next batch.
            del batch_embeddings
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
            gc.collect()

        return torch.tensor(np.vstack(batches))

    def setup_vector_database(self, embeddings):
        """
        Setup Faiss vector database for similarity search.

        Args:
        - embeddings (torch.Tensor): Tensor of embeddings, one row per chunk.

        Returns:
        - faiss.IndexFlatL2: Faiss index containing the embeddings.
        """
        # Faiss expects a contiguous float32 matrix.
        vectors = np.ascontiguousarray(embeddings.cpu().numpy(), dtype=np.float32)
        # NOTE(review): the default embedding model's name ("...-dot-v1") suggests it is
        # tuned for dot-product scoring; an inner-product index may rank better than L2.
        # Left as L2 to keep existing behaviour - confirm against the model card.
        index = faiss.IndexFlatL2(vectors.shape[1])
        index.add(vectors)
        return index

    def get_query_embedding(self, query):
        """
        Generate embedding for a query using Sentence Transformer model.

        Args:
        - query (str): Query string.

        Returns:
        - torch.Tensor: Embedding for the query, with a leading batch dimension of 1.
        """
        # A progress bar for a single query only clutters the interactive output.
        return self.sentence_model.encode([query], convert_to_tensor=True, show_progress_bar=False)

    def search_relevant_chunks(self, query_embedding, top_k=3):
        """
        Search for relevant chunks based on query embedding.

        Args:
        - query_embedding (torch.Tensor): Embedding for the query.
        - top_k (int): Number of top chunks to retrieve.

        Returns:
        - list: Up to `top_k` relevant chunks, best match first.

        Raises:
        - RuntimeError: If no documents have been processed yet.
        """
        if self.index is None:
            raise RuntimeError("No documents have been processed yet; call process_documents() first.")

        scores, indices = self.index.search(query_embedding.cpu().numpy(), top_k)
        # Faiss pads the result with -1 when fewer than top_k vectors are indexed; without
        # this filter -1 would silently select the last chunk.
        return [self.chunks[idx] for idx in indices[0] if 0 <= idx < len(self.chunks)]

    def generate_answer(self, query, relevant_chunks):
        """
        Generate an answer to the query based on relevant chunks.

        Args:
        - query (str): Query string.
        - relevant_chunks (list): List of relevant text chunks.

        Returns:
        - str: Generated answer to the query.
        """
        # Prompt layout: question, then one context chunk per line, then the answer cue.
        context = "".join(f"{chunk}\n" for chunk in relevant_chunks)
        prompt = f"Question: {query}\nContext:\n" + context + "Answer:"
        input_ids = self.tokenizer.encode(prompt, return_tensors='pt').to(self.device)
        output = self.t5_model.generate(input_ids, max_length=512, early_stopping=False, num_beams=5, no_repeat_ngram_size=2)
        return self.tokenizer.decode(output[0], skip_special_tokens=True)

    def process_documents(self, pdf_paths):
        """
        Process multiple PDF documents to extract text, clean, chunk, generate embeddings, and setup index.

        The stored chunks, embeddings and index are replaced together, and only after
        every step has succeeded.

        Args:
        - pdf_paths (list): List of paths to PDF documents.

        Raises:
        - ValueError: If no text chunks could be extracted from the given PDFs.
        """
        all_chunks = []
        for pdf_path in pdf_paths:
            text = self.clean_text(self.extract_text_from_pdf(pdf_path))
            all_chunks.extend(self.chunk_text_by_tokens(text))

        print('Total number of chunks:', len(all_chunks))

        if not all_chunks:
            raise ValueError("No text chunks could be extracted from the provided PDFs.")

        embeddings = self.generate_embeddings(all_chunks)
        index = self.setup_vector_database(embeddings)

        # Commit last so a failure above never leaves chunks and index out of sync.
        self.chunks = all_chunks
        self.embeddings = embeddings
        self.index = index

    def ask_question(self, query, top_k=3):
        """
        Process a question to find relevant chunks, generate an answer, and return the answer.

        The retrieved chunks and the answer are also printed.

        Args:
        - query (str): Question to answer.
        - top_k (int): Number of top relevant chunks to consider.

        Returns:
        - str: Generated answer to the question.

        Raises:
        - RuntimeError: If no documents have been processed yet.
        """
        query_embedding = self.get_query_embedding(query)
        relevant_chunks = self.search_relevant_chunks(query_embedding, top_k)
        print('Relevant Chunks:', relevant_chunks)

        answer = self.generate_answer(query, relevant_chunks)
        print(f"Answer: {answer}")
        return answer

In [6]:
import json  # Import JSON module for JSON parsing

class SimpleParser:
    """Parser that returns the answer as plain text."""

    @staticmethod
    def parse(raw_answer):
        """
        Trim surrounding whitespace from a raw answer.

        Args:
        - raw_answer (str): Raw answer string.

        Returns:
        - str: The answer without leading or trailing whitespace.
        """
        trimmed = raw_answer.strip()
        return trimmed

class JSONParser:
    """Parser that wraps the answer in a JSON object."""

    @staticmethod
    def parse(raw_answer):
        """
        Encode a raw answer as a JSON object string.

        Args:
        - raw_answer (str): Raw answer string.

        Returns:
        - str: JSON text of the form {"answer": ...}, where the value is the raw answer
          with leading and trailing whitespace removed.
        """
        payload = {"answer": raw_answer.strip()}
        encoded = json.dumps(payload)
        return encoded

class ResponseParser:
    """Facade that forwards answers to the parser selected at construction time."""

    def __init__(self, parser_type):
        """
        Select the concrete parser for this instance.

        Args:
        - parser_type (str): Type of parser to use ('simple' or 'json').

        Raises:
        - ValueError: If an invalid parser type is provided.
        """
        # Reject unsupported types up front, then pick between the two supported ones.
        if parser_type not in ('simple', 'json'):
            raise ValueError("Invalid parser type. Supported types: 'simple', 'json'")
        self.parser = SimpleParser() if parser_type == 'simple' else JSONParser()

    def parse_answer(self, raw_answer):
        """
        Parse a raw answer with the selected parser.

        Args:
        - raw_answer (str): Raw answer string.

        Returns:
        - str: Whatever the selected parser's `parse` returns for the raw answer.
        """
        selected = self.parser
        return selected.parse(raw_answer)


In [None]:
def main():
    """
    Run an interactive PDF question-answering session on the console.

    Steps: ask for the output parser type, collect PDF paths until 'done', validate the
    PDFs, load the models and index the valid PDFs, then answer questions until 'exit'.
    The session ends early if none of the given PDFs is valid.

    Raises:
    - ValueError: If the entered parser type is not 'simple' or 'json'.
    """
    pdf_validator = PDFValidator()  # Checks that PDFs open, contain text and are English
    question_validator = QuestionValidator()  # Rejects blank / digits-only questions

    # Resolve the parser before anything expensive happens, so a typo fails fast
    # instead of after the models have been loaded.
    parser_type = input("Enter parser type ('simple' or 'json'): ").strip().lower()
    response_parser = ResponseParser(parser_type)

    # Collect paths of PDFs from user input until 'done' is entered
    pdf_paths = []
    while True:
        pdf_path = input("Enter PDF path (or 'done' to finish): ").strip()
        if pdf_path.lower() == 'done':
            break
        if pdf_path:  # ignore blank lines
            pdf_paths.append(pdf_path)

    # Validate each PDF path and filter out invalid ones
    valid_pdf_paths = []
    for path in pdf_paths:
        if pdf_validator.is_valid_pdf(path):
            valid_pdf_paths.append(path)
        else:
            print(f"Invalid PDF: {path}")

    if not valid_pdf_paths:
        # Without an index every question would fail, so stop here.
        print("No valid PDFs to process.")
        return

    # Model loading is the expensive step; do it only once there is something to index.
    qa_system = PDFQASystem()
    qa_system.process_documents(valid_pdf_paths)

    # Prompt user to ask questions until 'exit' is entered
    while True:
        question = input("Enter your question (or 'exit' to quit): ")
        if question.strip().lower() == 'exit':
            break

        if not question_validator.is_valid_question(question):
            print("Invalid question. Please enter a valid non-empty string question.")
            continue

        raw_answer = qa_system.ask_question(question)  # Get raw answer from QA system
        parsed_answer = response_parser.parse_answer(raw_answer)  # Format it with the chosen parser
        print(parsed_answer)  # Display parsed answer to the user

# Start the interactive session only when this code runs as the main program
# (e.g. executed as a script or notebook cell), not when it is imported as a module.
if __name__ == "__main__":
    main()
