<a href="https://colab.research.google.com/github/shreyyeahh/Nokia_Internship/blob/main/single_pdf_extraction_using_docling.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
%%capture
#! pip install -U ipywidgets
! pip install docling
! pip install pdf2image
! apt-get update && apt-get install -y poppler-utils

In [None]:
import logging
import time
from pathlib import Path
import pandas as pd
from pdf2image import convert_from_path
import matplotlib.pyplot as plt
from PIL import Image
import os
from tqdm.auto import tqdm
import textwrap
import torch
import openai
from docling.datamodel.base_models import InputFormat
from docling.document_converter import DocumentConverter, PdfFormatOption
from docling.datamodel.pipeline_options import PdfPipelineOptions, TableFormerMode

In [None]:
from sklearn.metrics.pairwise import cosine_similarity
#!pip install sentence_transformers
import numpy as np

In [None]:
from sklearn.feature_extraction.text import TfidfVectorizer
# Sentence-Transformers for semantic embeddings
#from sentence_transformers import SentenceTransformer

In [None]:
# Copy the OpenAI API key from Colab Secrets into the process environment so
# that openai.OpenAI() can pick it up later without the key being hard-coded.
try:
    from google.colab import userdata
except ImportError:
    # Not running inside Colab: there is no Secrets store to read from.
    print("Warning: Could not find 'OPENAI_API_KEY' in Colab Secrets.")
    print("Please set it manually or the function will fail.")
    # For local development, you might set it directly, but this is not recommended for notebooks:
    # os.environ['OPENAI_API_KEY'] = "YOUR_SK-..."
else:
    try:
        # Set the OpenAI API key from Colab Secrets
        os.environ['OPENAI_API_KEY'] = userdata.get('OPENAI_API_KEY')
    except (KeyError, userdata.SecretNotFoundError, userdata.NotebookAccessError):
        # userdata.get signals a missing secret or a notebook without access
        # to it through its own exception types rather than KeyError, so they
        # must be listed here for the warning to be shown instead of a crash.
        print("Warning: Could not find 'OPENAI_API_KEY' in Colab Secrets.")
        print("Please set it manually or the function will fail.")
    else:
        print("OpenAI API Key set successfully.")

In [None]:
# Module-level logger named after the current module (`__name__`).
_log = logging.getLogger(__name__)

In [None]:
def _show_image(image):
    """Render one image on its own large matplotlib figure with the axes hidden."""
    plt.figure(figsize=(16, 12))
    plt.imshow(image)
    plt.axis('off')  # Hide axis for a cleaner look
    plt.show()


def display_file(file_path):
    """
    Display a PDF file or an image file.

    A PDF is rasterized with pdf2image and every page is shown as a separate
    figure; an image file is shown as a single figure. The file type is decided
    from the (case-insensitive) file extension only.

    Args:
        file_path (str): Path to the PDF or image file.

    Raises:
        ValueError: If the extension is not '.pdf' or one of the supported
            image extensions ('.jpg', '.jpeg', '.png', '.bmp', '.tiff').
    """
    # Check file extension to determine the type
    file_extension = os.path.splitext(file_path)[1].lower()

    if file_extension == '.pdf':
        # Convert and display PDF pages as images, one figure per page
        for page_image in convert_from_path(file_path):
            _show_image(page_image)
    elif file_extension in ('.jpg', '.jpeg', '.png', '.bmp', '.tiff'):
        # The context manager closes the underlying file handle once the
        # figure has been rendered, instead of leaving it open.
        with Image.open(file_path) as image:
            _show_image(image)
    else:
        raise ValueError("Unsupported file type. Please provide a PDF or image file.")

In [None]:
def extract_data_with_docling(input_data_path):
    """
    Convert a PDF or image file with Docling and summarize its tables.

    The converted document is printed as markdown, followed by a separator
    line. Each table found in the document is converted to a DataFrame and
    rendered to a plain string. Nothing is written to disk here; persisting
    the result is left to the caller.

    Args:
        input_data_path (str): The path to the input file (PDF or image).

    Returns:
        pd.DataFrame: One row per detected table with the columns
        'table_number' (1-based position among the document's tables) and
        'table_content' (the table rendered with DataFrame.to_string()).
        Both columns exist even when no table was found, so callers can
        index them safely on an empty result.
    """
    # Initialize pipeline options with table structure analysis enabled
    pipeline_options = PdfPipelineOptions(do_table_structure=True)
    pipeline_options.table_structure_options.mode = TableFormerMode.ACCURATE  # use more accurate TableFormer model

    # Create a document converter with specified format options
    doc_converter = DocumentConverter(
        allowed_formats=[
            InputFormat.PDF,
            InputFormat.IMAGE,
        ],  # whitelist formats, non-matching files are ignored.
        format_options={
            InputFormat.PDF: PdfFormatOption(pipeline_options=pipeline_options)
        },
    )
    result = doc_converter.convert(input_data_path)
    print(result.document.export_to_markdown())
    print("\n" + "="*50 + "\n")

    table_rows = [
        {
            "table_number": table_number,
            "table_content": table.export_to_dataframe().to_string(),
        }
        for table_number, table in enumerate(result.document.tables, start=1)
    ]
    # Naming the columns explicitly keeps the schema stable for documents
    # without tables; an empty list alone would yield a column-less frame.
    return pd.DataFrame(table_rows, columns=["table_number", "table_content"])

In [None]:
# Note on stop words: TfidfVectorizer (and similar tools) ships with a default tokenizer.
# Even when stop words are not removed beforehand, TF-IDF weighting implicitly reduces
# the impact of words that appear in most documents.
def create_keyword_index(df: pd.DataFrame):
    """Fit a TF-IDF keyword index on the 'table_content' column of *df*.

    English stop words are dropped and 1- to 3-word n-grams are indexed.
    Returns a ``(vectorizer, tfidf_matrix)`` pair: the fitted vectorizer and
    the matrix it produced for the column's entries.
    """
    print("\nCreating TF-IDF keyword index...")
    table_texts = df['table_content'].tolist()
    keyword_vectorizer = TfidfVectorizer(
        stop_words='english',
        ngram_range=(1, 3),
    )
    document_term_matrix = keyword_vectorizer.fit_transform(table_texts)
    print("Keyword index created successfully.")
    return keyword_vectorizer, document_term_matrix

In [None]:
def retrieve_top_chunks(query, vectorizer, original_df, tfidf_matrix, top_n=1):
    """
    Retrieve the top N chunks most similar to a query.

    Args:
        query (str): The search text.
        vectorizer: The fitted vectorizer that produced ``tfidf_matrix``; it
            is used to transform the query into the same vector space.
        original_df (pd.DataFrame): The chunks being searched. Its rows are
            assumed to line up positionally with the rows of ``tfidf_matrix``.
        tfidf_matrix: The vectorized chunks to compare the query against.
        top_n (int): How many of the best-scoring rows to return.

    Returns:
        pd.DataFrame: A copy of the best-matching rows of ``original_df``,
        ordered from most to least similar, with an added
        'similarity score' column. ``original_df`` itself is not modified.
    """
    # 1. Vectorize the query using the SAME fitted vectorizer
    query_vector = vectorizer.transform([query])  # the vectorizer expects a list of strings

    # 2. Cosine similarity between the query vector and every chunk.
    # The result is a 2D array with a single row, so flatten it to 1D.
    cosine_sim = cosine_similarity(query_vector, tfidf_matrix).flatten()

    # 3. argsort() gives the indices that sort the scores ascending; reverse
    # them for descending order and keep the first N.
    # e.g. scores [0.04, 0.5, 0.6, 0.7] -> indices [3, 2, 1, 0]
    top_indices = cosine_sim.argsort()[::-1][:top_n]

    # 4. Take an explicit copy of the selected rows so that adding a column
    # neither triggers SettingWithCopyWarning nor risks touching original_df.
    results = original_df.iloc[top_indices].copy()
    results['similarity score'] = cosine_sim[top_indices]
    return results



In [None]:
def generate_final_answer(query: str, retrieved_chunks_df: pd.DataFrame) -> str:
    """
    Ask the OpenAI chat model to answer a question from the retrieved chunks.

    The 'table_content' entries of ``retrieved_chunks_df`` are joined into one
    context string, and the model is instructed to answer using only that
    context.

    Args:
        query: The user's question.
        retrieved_chunks_df: Retrieved chunks; must have a 'table_content'
            column of strings.

    Returns:
        The model's answer, or a human-readable error message when the client
        cannot be created or the request fails. Errors are reported through
        the return value rather than raised.
    """
    try:
        client = openai.OpenAI()
    except openai.OpenAIError:
        # The client constructor raises the base OpenAIError when no API key
        # is configured; AuthenticationError derives from it, so the
        # previously handled case is still covered.
        return "OpenAI API key is not set or is invalid."

    context = "\n---\n".join(retrieved_chunks_df['table_content'])

    answer_prompt = f"""
    Using ONLY the context provided below, give a direct and comprehensive answer to the user's question.
    If the context does not contain the information, state that the answer is not available in the documents.

    Context:
    ---
    {context}
    ---
    User's Question: {query}

    Final Answer:
    """

    try:
        answer_response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": "You are a helpful question-answering assistant that strictly uses the provided context to answer questions."},
                {"role": "user", "content": answer_prompt}
            ],
            temperature=0.1
        )
    except Exception as e:
        # Deliberately broad: any request failure is reported to the caller
        # as text so an interactive session can keep going.
        return f"Failed to generate a final answer. Error: {e}"

    # message.content may be None; fall back to an empty string so the
    # declared str return type always holds.
    return answer_response.choices[0].message.content or ""

In [None]:
# MAIN EXECUTION SCRIPT -------

if __name__ == "__main__":

    # 1. Define the input and output file paths
    input_file = "/content/MT-799012W (1).pdf"
    output_csv_file = "extracted_tables_summary.csv"
    display_file(input_file)

    # 2. Call the function to get the summary DataFrame
    extracted_tables_df = extract_data_with_docling(input_file)

    # 3. Save the DataFrame to a CSV file; report success only after the
    #    file has actually been written.
    extracted_tables_df.to_csv(output_csv_file, index=False)
    print(f"Successfully extracted tables and saved them to '{output_csv_file}'")

    if extracted_tables_df.empty:
        # Without any table there is nothing to index or query, and building
        # the TF-IDF index would fail.
        print("No tables were found in the document; skipping the query loop.")
    else:
        # 4. Create the TF-IDF vectors
        tfidf_vectorizer, tfidf_matrix = create_keyword_index(extracted_tables_df)

        # 5. Interactive question-answering loop
        while True:
            user_query = input("Enter your query (or type 'exit' to quit): ")
            if user_query.strip().lower() == 'exit':
                break
            if not user_query.strip():
                # A blank query cannot match anything; ask again instead of
                # spending an API call on it.
                continue

            ### Retrieving Top Chunks ###
            top_results = retrieve_top_chunks(user_query, tfidf_vectorizer, extracted_tables_df, tfidf_matrix)

            print("\n--- Top Retrieved Chunks ---")
            print(top_results[['table_number', 'table_content', 'similarity score']])

            # Generating the final answer
            final_answer = generate_final_answer(user_query, top_results)

            print("\n" + "="*50)
            print("Final Answer")
            print("="*50)
            print(textwrap.fill(final_answer, width=80))
            print("="*50 + "\n")