## TASK: PDF SUMMARIZATION & KEYWORD EXTRACTION

### PDF Ingestion & Parsing Code

##### My pipeline processes multiple PDFs from a folder on the desktop: when the path of a folder is provided in the code, it ingests all the documents in that folder. It handles documents of varying lengths:
    - Short PDFs (1-10 pages)
    - Medium PDFs (10-30 pages)
    - Long PDFs (30+ pages)
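
The three length categories above can be expressed as a small helper. This is a minimal sketch rather than part of the pipeline itself: the function name `classify_pdf_length` is hypothetical, and because the listed ranges meet at 10 and 30 pages, it assumes that a boundary page count falls into the smaller category.

```python
def classify_pdf_length(num_pages):
    """Return 'short', 'medium', or 'long' for a page count.

    Assumption: 10 pages counts as short and 30 pages as medium,
    since the listed ranges share those boundary values.
    """
    if num_pages < 1:
        raise ValueError("num_pages must be at least 1")
    if num_pages <= 10:
        return "short"
    if num_pages <= 30:
        return "medium"
    return "long"


# Example usage with a few page counts
for pages in (4, 25, 120):
    print(pages, classify_pdf_length(pages))
```

The returned label could be stored next to `num_pages` in the metadata and used later to choose summarization settings.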


In [1]:
pip install PyPDF2 pymongo #install the requirements and dependencies

Note: you may need to restart the kernel to use updated packages.



##### When each PDF is ingested, its metadata (document name, path, size, etc.) is stored in a MongoDB collection.

In [2]:
# Ingestion Code
import os
from PyPDF2 import PdfReader
import pymongo
from concurrent.futures import ThreadPoolExecutor

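# MongoDB Setup
# The connection string is read from the MONGODB_URI environment variable (an assumed name)
# instead of being hard-coded, so credentials stay out of the notebook.
client = pymongo.MongoClient(os.environ["MONGODB_URI"])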
db = client['pdf_summarizer']
collection = db['pdf_documents']

# Function to process a single PDF using PdfReader
def process_pdf(file_path):
    try:
        with open(file_path, 'rb') as f:
            pdf_reader = PdfReader(f)
            num_pages = len(pdf_reader.pages)
            # Fall back to '' if a page yields no extractable text (e.g. a scanned image)
            text = ''.join(page.extract_text() or '' for page in pdf_reader.pages)

        # Store metadata in MongoDB
        doc_metadata = {
            "file_name": os.path.basename(file_path),
            "file_path": file_path,
            "size": os.path.getsize(file_path),
            "num_pages": num_pages,
            "text": text,
            "summary": "",
            "keywords": []
        }
        collection.insert_one(doc_metadata)
        return doc_metadata
    except Exception as e:
        print(f"Error processing {file_path}: {e}")
        return None

# Function to handle all PDFs in the folder
def process_all_pdfs(folder_path):
    pdf_files = [os.path.join(folder_path, f) for f in os.listdir(folder_path) if f.endswith('.pdf')]
    
    with ThreadPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(process_pdf, pdf_files))
    
    return results

# My Example
folder_path = 'ALL PDF FOLDER'
process_all_pdfs(folder_path)

[{'file_name': 'chapter1.pdf',
  'file_path': 'ALL PDF FOLDER\\chapter1.pdf',
  'size': 3236335,
  'num_pages': 25,
  'summary': '',
  'keywords': [],
  '_id': ObjectId('6716d1f545bda2ad8b8dd866')},
 {'file_name': 'chapter2.pdf',
  'file_path': 'ALL PDF FOLDER\\chapter2.pdf',
  'size': 5296661,
  'num_pages': 30,
  'text': ' THEMES IN INDIAN HISTORY – PART  II 140\nWe saw in Chapter 4 that by the mid-first\nmillennium CE the landscape of the subcontinent\nwas dotted with a variety of religious structures –\nstupas , monasteries, temples. If these typified\ncertain religious beliefs and practices, others have\nbeen reconstructed from textual traditions,\nincluding the Puranas , many of which received\ntheir present shape around the same time, and yet\nothers remain only faintly visible in textual and\nvisual records.\nNew textual sources available from this period\ninclude compositions attributed to poet-saints,\nmost of whom expressed themselves orally in\nregional languages used by or

### Summarization Code

##### Summaries are generated dynamically: the summary length and level of detail correspond to the document length.
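
One way to make the summary length follow the document length is to derive the length limits from the word count. The sketch below is only an illustration: the function name `summary_length_bounds` and the values for `ratio`, `floor`, and `ceiling` are hypothetical, and the limits are in words, whereas the summarization model counts tokens, so they are approximate.

```python
def summary_length_bounds(word_count, ratio=0.15, floor=30, ceiling=400):
    """Pick (min_length, max_length) that grow with the document size.

    ratio, floor and ceiling are illustrative defaults, not tuned values.
    """
    if word_count <= 0:
        return 0, 0
    # Target a fixed fraction of the input, clamped to [floor, ceiling].
    max_length = max(floor, min(ceiling, int(word_count * ratio)))
    # Never ask for a summary as long as the input itself.
    max_length = min(max_length, max(1, word_count - 1))
    # Keep the lower bound at roughly a third of the upper bound.
    min_length = max(1, max_length // 3)
    return min_length, max_length


# Example usage for a short, a medium and a long text
for words in (30, 1000, 10000):
    print(words, summary_length_bounds(words))
```

The resulting pair can be passed as `min_length` and `max_length` to the summarizer in place of fixed limits.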

In [3]:
pip install transformers #library to summarize the pdf text


Note: you may need to restart the kernel to use updated packages.


In [35]:
from transformers import pipeline

# Summarization pipeline with a pretrained model
summarizer = pipeline("summarization", model="sshleifer/distilbart-cnn-12-6")

# Function to summarize text
def summarize_text(text):
    try:
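        # Adjust the length limits to the input. max_length and min_length are counted
        # in model tokens; the word count is used here as a rough proxy.
        input_length = len(text.split())
        max_length = max(2, min(130, input_length - 5))  # A bit shorter than the input, never below 2
        min_length = min(30, input_length // 2, max_length - 1)  # Keep min_length below max_length

        # truncation=True clips inputs that exceed the model's maximum input length
        summary = summarizer(text, max_length=max_length, min_length=min_length, do_sample=False, truncation=True)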
        return summary[0]['summary_text']
    except Exception as e:
        print(f"Error summarizing text: {e}")
        return ""

# My Example summarization with little text
text = "This is a sample long text extracted from a PDF. Add more text here to test the summarization process. The text should have enough words to make a meaningful summary."
summary = summarize_text(text)
print(summary)


 This is a sample long text extracted from a PDF . Add more text here to test the summarization process .
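
The example above uses a short string, but the text of a medium or long PDF is much larger than what a summarization model typically accepts in one call. A common workaround is to split the text into chunks, summarize each chunk, and join the partial summaries. The sketch below assumes this approach; `split_into_chunks`, `summarize_long_text`, and the 400-word chunk size are hypothetical, and the summarizer is passed in as a function so the logic can be tried without loading a model.

```python
def split_into_chunks(text, max_words=400):
    """Split text into consecutive chunks of at most max_words words."""
    if max_words < 1:
        raise ValueError("max_words must be at least 1")
    words = text.split()
    return [" ".join(words[i:i + max_words]) for i in range(0, len(words), max_words)]


def summarize_long_text(text, summarize_fn, max_words=400):
    """Summarize each chunk with summarize_fn and join the partial summaries."""
    partial = [summarize_fn(chunk) for chunk in split_into_chunks(text, max_words)]
    return " ".join(s.strip() for s in partial if s and s.strip())


def first_sentence(chunk):
    """Stand-in summarizer for the demo: keeps the text up to the first period."""
    return chunk.split(".")[0].strip() + "."


# Example usage with the stand-in summarizer
long_text = "Stupas and temples dotted the landscape. Many texts took shape then. " * 50
print(summarize_long_text(long_text, first_sentence, max_words=100))
```

In the notebook, `summarize_text` could be passed as `summarize_fn`, so that each chunk stays within the model's input limit.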


### Keyword Extraction Code

##### Extract non-generic, domain-specific keywords that reflect the key ideas or themes of the document. This code avoids common or irrelevant keywords.

In [36]:
pip install keybert #library to extract keyword




In [39]:
pip install transformers sentence-transformers scikit-learn

Note: you may need to restart the kernel to use updated packages.


In [40]:
from keybert import KeyBERT

# Keyword extraction using KeyBERT
kw_model = KeyBERT()

def extract_keywords(text):
    # Extract keywords, unigrams and bigrams, and remove stop words
    keywords = kw_model.extract_keywords(text, keyphrase_ngram_range=(1, 2), stop_words='english')
    return [kw[0] for kw in keywords]

# My Example 
text = "This is a sample long text extracted from a PDF document. KeyBERT can extract meaningful keywords from this text."
keywords = extract_keywords(text)
print(keywords)

['keywords text', 'keybert extract', 'document keybert', 'keywords', 'text extracted']
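
The output above still contains fairly generic phrases such as 'keywords text'. One possible refinement is a post-filter on the scored phrases: KeyBERT returns (phrase, score) pairs, so low-scoring phrases and phrases made up only of generic words can be dropped. The sketch below is an assumption about how such a filter could look; `GENERIC_TERMS`, `filter_keywords`, the 0.3 threshold, and the scores in the example are all illustrative, not real model output.

```python
# Hypothetical list of words considered too generic to be useful keywords
GENERIC_TERMS = {"text", "document", "pdf", "sample", "keywords", "page"}


def filter_keywords(scored_keywords, generic_terms=GENERIC_TERMS, min_score=0.3):
    """Keep phrases that score at least min_score and contain a non-generic word."""
    kept = []
    for phrase, score in scored_keywords:
        if score < min_score:
            continue  # Too weakly related to the document
        words = phrase.lower().split()
        if all(word in generic_terms for word in words):
            continue  # Every word in the phrase is generic
        kept.append(phrase)
    return kept


# Example usage with made-up scores
scored = [("keywords text", 0.62), ("keybert extract", 0.58),
          ("poet saints", 0.55), ("text", 0.20)]
print(filter_keywords(scored))
```

To use it in the notebook, `extract_keywords` would pass the full (phrase, score) list from `kw_model.extract_keywords` to the filter before returning the phrases.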


##### After summarization and keyword extraction, the MongoDB entry for each document is updated with the JSON output, including the generated summary and extracted keywords.

### Update MongoDB with Summaries & Keywords Code

##### Summaries and keywords are formatted as JSON and then stored in the MongoDB document. The update is applied right after each document is processed.

In [62]:
def update_mongodb_with_summary_and_keywords(doc_metadata):
    summary = summarize_text(doc_metadata['text'])
    keywords = extract_keywords(doc_metadata['text'])
    
    # Update MongoDB
    collection.update_one(
        {"_id": doc_metadata["_id"]},
        {"$set": {"summary": summary, "keywords": keywords}}
    )

# My Example of updating a document
for doc_metadata in collection.find():
    update_mongodb_with_summary_and_keywords(doc_metadata)
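
The JSON payload and the write can also be combined into a single call that works whether or not the document already exists, using the `upsert=True` option of `update_one`. The sketch below assumes a PyMongo collection is passed in; `build_result_payload` and `save_results` are hypothetical helper names.

```python
import json


def build_result_payload(summary, keywords):
    """Return the JSON-serializable fields that are written to MongoDB."""
    return {"summary": summary, "keywords": list(keywords)}


def save_results(collection, document_id, summary, keywords):
    """Write summary and keywords in one round trip.

    collection is expected to be a PyMongo collection; with upsert=True the
    matching document is updated, or inserted if no document matches.
    """
    payload = build_result_payload(summary, keywords)
    return collection.update_one({"_id": document_id}, {"$set": payload}, upsert=True)


# Example: show the JSON that would be stored (no database needed)
print(json.dumps(build_result_payload("A short summary.", ["stupas", "monasteries"]), indent=2))
```

Compared with a separate existence check followed by an update or an insert, this needs one database call per document.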

### Performance Metrics Code

#### To monitor performance, I used the time module to measure the time taken for each PDF, with all steps of the pipeline combined in a single cell.

##### I provide some data on how well my system scales, in particular how quickly it processes large PDFs and multiple PDFs concurrently.
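
The per-file timing idea can be sketched independently of the PDF code. The helpers below are hypothetical names (`timed_call`, `summarize_timings`); they use `time.perf_counter`, which is intended for measuring elapsed intervals, and reduce a list of durations to a few summary figures.

```python
import time
from statistics import mean


def timed_call(fn, *args, **kwargs):
    """Call fn and return (result, elapsed_seconds)."""
    start = time.perf_counter()
    result = fn(*args, **kwargs)
    return result, time.perf_counter() - start


def summarize_timings(durations):
    """Aggregate per-file durations into simple summary statistics."""
    if not durations:
        return {"count": 0, "total": 0.0, "mean": 0.0, "slowest": 0.0}
    return {
        "count": len(durations),
        "total": sum(durations),
        "mean": mean(durations),
        "slowest": max(durations),
    }


# Example usage with a stand-in workload
_, elapsed = timed_call(sum, range(1_000_000))
print(summarize_timings([elapsed]))
```

When files are processed concurrently, the sum of the per-file times can exceed the wall-clock time of the whole run, so it helps to time the complete batch as well.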

In [68]:
import os
import time
import logging
from concurrent.futures import ThreadPoolExecutor
from pymongo import MongoClient
from bson import ObjectId
from transformers import pipeline
from keybert import KeyBERT

# Initialize logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Initialize MongoDB client
# The connection string is read from the MONGODB_URI environment variable (an assumed name)
# instead of being hard-coded, so credentials stay out of the notebook.
client = MongoClient(os.environ['MONGODB_URI'])
db = client['TestUser']  # My database name
collection = db['Collection Name']  # My collection name

# Summarization pipeline
summarizer = pipeline("summarization", model="sshleifer/distilbart-cnn-12-6")
kw_model = KeyBERT()

def summarize_text(text):
    try:
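        # Length limits are in model tokens; the word count is a rough proxy
        input_length = len(text.split())
        max_length = max(2, min(130, input_length - 5))
        min_length = min(30, input_length // 2, max_length - 1)

        # truncation=True clips inputs that exceed the model's maximum input length
        summary = summarizer(text, max_length=max_length, min_length=min_length, do_sample=False, truncation=True)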
        return summary[0]['summary_text']
    except Exception as e:
        logging.error(f"Error summarizing text: {e}")
        return ""

def extract_keywords(text):
    keywords = kw_model.extract_keywords(text, keyphrase_ngram_range=(1, 2), stop_words='english')
    return [kw[0] for kw in keywords]

def update_or_create_document(text, document_id):
    summary = summarize_text(text)
    keywords = extract_keywords(text)

    # Make the update query
    update_query = {
        'summary': summary,
        'keywords': keywords
    }

    # Check whether the document exists
    existing_document = collection.find_one({'_id': document_id})

    if existing_document:
        # Document exists, perform the update
        result = collection.update_one({'_id': document_id}, {'$set': update_query})
        if result.modified_count > 0:
            logging.info(f"Document {document_id} updated successfully.")
        else:
            logging.info(f"No changes made to document {document_id}.")
    else:
        # Document does not exist, insert a new document
        new_document = {
            '_id': document_id,
            'summary': summary,
            'keywords': keywords
        }
        collection.insert_one(new_document)
        logging.info(f"New document {document_id} created successfully.")

def process_pdf(file_path):
    # Placeholder implementation of PDF processing:
    # replace it with actual code that reads the PDF and extracts its text.
    try:
        # Simulated result: every file yields the same sample text
        return {
            'text': "This is a sample text extracted from the PDF.",
            'id': file_path.split("/")[-1].replace(".pdf", "")  # Example: using filename as ID
        }
    except Exception as e:
        logging.error(f"Error processing PDF {file_path}: {e}")
        return None

def process_with_timing(file_path):
    start_time = time.time()
    doc_metadata = process_pdf(file_path)
    
    if doc_metadata:
        text = doc_metadata['text']
        document_id = doc_metadata['id']  # Extract the ID from the metadata
        update_or_create_document(text, document_id)
    
    end_time = time.time()
    logging.info(f"Processed {file_path} in {end_time - start_time:.2f} seconds")

def process_all_pdfs_in_folder(folder_path):
    pdf_files = [f for f in os.listdir(folder_path) if f.endswith('.pdf')]  # List all PDF files

    with ThreadPoolExecutor(max_workers=5) as executor:  # Adjust the number of workers as needed
        futures = {executor.submit(process_with_timing, os.path.join(folder_path, file_name)): file_name for file_name in pdf_files}
        
        for future in futures:
            try:
                future.result()  # Wait for the future to complete
            except Exception as e:
                logging.error(f"Error in processing PDF file {futures[future]}: {e}")

# My Example
folder_path = "ALL PDF FOLDER"  # path to my PDF folder
process_all_pdfs_in_folder(folder_path)

INFO:sentence_transformers.SentenceTransformer:Use pytorch device_name: cpu
INFO:sentence_transformers.SentenceTransformer:Load pretrained SentenceTransformer: all-MiniLM-L6-v2
INFO:root:New document ALL PDF FOLDER\chapter3 created successfully.
INFO:root:New document ALL PDF FOLDER\chapter4 created successfully.
INFO:root:Processed ALL PDF FOLDER\chapter3.pdf in 2.73 seconds
INFO:root:Processed ALL PDF FOLDER\chapter4.pdf in 2.71 seconds
INFO:root:New document ALL PDF FOLDER\chapter1 created successfully.
INFO:root:Processed ALL PDF FOLDER\chapter1.pdf in 2.80 seconds
INFO:root:New document ALL PDF FOLDER\chapter5 created successfully.
INFO:root:Processed ALL PDF FOLDER\chapter5.pdf in 2.87 seconds
INFO:root:New document ALL PDF FOLDER\chapter2 created successfully.
INFO:root:Processed ALL PDF FOLDER\chapter2.pdf in 2.95 seconds
INFO:root:New document ALL PDF FOLDER\chapter6 created successfully.
INFO:root:Processed ALL PDF FOLDER\chapter6.pdf in 0.73 seconds


### Concurrency & Error Handling Code

#### I already use ThreadPoolExecutor for concurrency when ingesting PDFs. For error handling, I log any issues (e.g. a corrupt PDF) and ensure that MongoDB updates still complete without breaking the pipeline. As before, all steps are combined in a single cell.

##### Errors (e.g. for corrupted PDFs or unsupported formats) are logged, and my MongoDB records are not affected by such issues.
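
A cheap pre-check can reject unsupported files before they reach the parser: PDF files begin with the bytes `%PDF-`. The sketch below assumes this check is run on each path first; `looks_like_pdf` and `partition_files` are hypothetical helpers. It only catches files that are not PDFs at all, so a truncated or damaged PDF still has to be handled by the `try`/`except` around the actual parsing.

```python
import logging


def looks_like_pdf(file_path):
    """Return True if the file starts with the PDF header bytes '%PDF-'."""
    try:
        with open(file_path, "rb") as handle:
            return handle.read(5) == b"%PDF-"
    except OSError as exc:
        # Missing or unreadable files are logged and treated as invalid
        logging.error("Cannot read %s: %s", file_path, exc)
        return False


def partition_files(paths):
    """Split paths into (valid, rejected) according to the header check."""
    valid, rejected = [], []
    for path in paths:
        (valid if looks_like_pdf(path) else rejected).append(path)
    return valid, rejected
```

Only the `valid` paths would then be submitted to the executor, while the `rejected` ones are logged and never reach MongoDB.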

In [69]:
import os
import time
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
from pymongo import MongoClient
from transformers import pipeline
from keybert import KeyBERT

# Initialize logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Initialize MongoDB client
# The connection string is read from the MONGODB_URI environment variable (an assumed name)
# instead of being hard-coded, so credentials stay out of the notebook.
client = MongoClient(os.environ['MONGODB_URI'])
db = client['TestUser']  # My database name
collection = db['Collection Name']  # My collection name

# Summarization pipeline
summarizer = pipeline("summarization", model="sshleifer/distilbart-cnn-12-6")
kw_model = KeyBERT()

def summarize_text(text):
    """Summarize the input text using a pretrained model."""
    try:
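        # Length limits are in model tokens; the word count is a rough proxy
        input_length = len(text.split())
        max_length = max(2, min(130, input_length - 5))
        min_length = min(30, input_length // 2, max_length - 1)

        # truncation=True clips inputs that exceed the model's maximum input length
        summary = summarizer(text, max_length=max_length, min_length=min_length, do_sample=False, truncation=True)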
        return summary[0]['summary_text']
    except Exception as e:
        logging.error(f"Error summarizing text: {e}")
        return ""

def extract_keywords(text):
    """Extract keywords from the input text using KeyBERT."""
    keywords = kw_model.extract_keywords(text, keyphrase_ngram_range=(1, 2), stop_words='english')
    return [kw[0] for kw in keywords]

def update_or_create_document(text, document_id):
    """Update an existing document or create a new one in MongoDB."""
    summary = summarize_text(text)
    keywords = extract_keywords(text)

    # Make the update query
    update_query = {
        'summary': summary,
        'keywords': keywords
    }

    # Check whether the document exists
    existing_document = collection.find_one({'_id': document_id})

    if existing_document:
        # Document exists, perform the update
        result = collection.update_one({'_id': document_id}, {'$set': update_query})
        if result.modified_count > 0:
            logging.info(f"Document {document_id} updated successfully.")
        else:
            logging.info(f"No changes made to document {document_id}.")
    else:
        # Document does not exist, insert a new document
        new_document = {
            '_id': document_id,
            'summary': summary,
            'keywords': keywords
        }
        collection.insert_one(new_document)
        logging.info(f"New document {document_id} created successfully.")

def process_pdf(file_path):
    """Mock implementation of PDF processing. Replace with actual PDF reading logic."""
    try:
        # Simulate reading from a PDF
        return {
            'text': "This is a sample text extracted from the PDF.",
            'id': os.path.basename(file_path).replace(".pdf", "")  
        }
    except Exception as e:
        logging.error(f"Error processing PDF {file_path}: {e}")
        return None

def process_with_timing(file_path):
    """Process a PDF file and log the processing time."""
    start_time = time.time()
    doc_metadata = process_pdf(file_path)
    
    if doc_metadata:
        text = doc_metadata['text']
        document_id = doc_metadata['id']  # Extract the ID from the metadata
        update_or_create_document(text, document_id)
    
    end_time = time.time()
    logging.info(f"Processed {file_path} in {end_time - start_time:.2f} seconds")

def process_all_pdfs_in_folder(folder_path):
    """Process all PDF files in a specified folder using ThreadPoolExecutor."""
    pdf_files = [f for f in os.listdir(folder_path) if f.endswith('.pdf')]  # List all PDF files

    with ThreadPoolExecutor(max_workers=5) as executor:  # Adjust the number of workers as needed
        futures = {executor.submit(process_with_timing, os.path.join(folder_path, file_name)): file_name for file_name in pdf_files}
        
        for future in as_completed(futures):
            file_name = futures[future]
            try:
                future.result()  # Wait to complete and check for exceptions
            except Exception as e:
                logging.error(f"Error in processing PDF file {file_name}: {e}")

# My Example
folder_path = "ALL PDF FOLDER"  # Path to my PDF folder
process_all_pdfs_in_folder(folder_path)


INFO:sentence_transformers.SentenceTransformer:Use pytorch device_name: cpu
INFO:sentence_transformers.SentenceTransformer:Load pretrained SentenceTransformer: all-MiniLM-L6-v2
INFO:root:New document chapter1 created successfully.
INFO:root:Processed ALL PDF FOLDER\chapter1.pdf in 2.91 seconds
INFO:root:New document chapter5 created successfully.
INFO:root:Processed ALL PDF FOLDER\chapter5.pdf in 2.96 seconds
INFO:root:New document chapter2 created successfully.
INFO:root:Processed ALL PDF FOLDER\chapter2.pdf in 3.06 seconds
INFO:root:New document chapter4 created successfully.
INFO:root:Processed ALL PDF FOLDER\chapter4.pdf in 3.10 seconds
INFO:root:New document chapter6 created successfully.
INFO:root:Processed ALL PDF FOLDER\chapter6.pdf in 0.81 seconds
INFO:root:New document chapter3 created successfully.
INFO:root:Processed ALL PDF FOLDER\chapter3.pdf in 3.82 seconds


##### My pipeline is designed to handle multiple documents simultaneously, leveraging parallel processing to improve speed, i.e. efficient multi-document parallel processing.

In [70]:
def process_all_pdfs_in_folder_concurrently(folder_path):
    """Process all PDF files in the specified folder using parallel processing."""
    pdf_files = [f for f in os.listdir(folder_path) if f.endswith('.pdf')]  # List all PDF files

    if not pdf_files:
        logging.warning(f"No PDF files found in {folder_path}")
        return

    # Use ThreadPoolExecutor to process PDFs concurrently
    with ThreadPoolExecutor(max_workers=5) as executor:  # Adjust the number of workers as needed
        futures = {executor.submit(process_with_timing, os.path.join(folder_path, file_name)): file_name for file_name in pdf_files}
        
        for future in as_completed(futures):
            file_name = futures[future]
            try:
                future.result()  # Wait to complete and check for exceptions
            except Exception as e:
                logging.error(f"Error in processing PDF file {file_name}: {e}")

# Example usage
folder_path = "ALL PDF FOLDER"  # Path to my PDF folder
process_all_pdfs_in_folder_concurrently(folder_path)

INFO:root:No changes made to document chapter5.
INFO:root:No changes made to document chapter1.
INFO:root:No changes made to document chapter4.
INFO:root:Processed ALL PDF FOLDER\chapter5.pdf in 11.77 seconds
INFO:root:Processed ALL PDF FOLDER\chapter1.pdf in 11.84 seconds
INFO:root:Processed ALL PDF FOLDER\chapter4.pdf in 11.78 seconds
INFO:root:No changes made to document chapter3.
INFO:root:Processed ALL PDF FOLDER\chapter3.pdf in 12.32 seconds
INFO:root:No changes made to document chapter2.
INFO:root:Processed ALL PDF FOLDER\chapter2.pdf in 12.34 seconds
INFO:root:No changes made to document chapter6.
INFO:root:Processed ALL PDF FOLDER\chapter6.pdf in 3.32 seconds
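
The same concurrency pattern can also return its outcomes instead of only logging them, which makes it easy to report which files succeeded and which failed. The sketch below is a generic variant under that assumption; `run_concurrently` and `fake_process` are hypothetical names, and the worker is a stand-in rather than the real PDF processing function.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def run_concurrently(worker, items, max_workers=5):
    """Run worker(item) for each item in a thread pool.

    Returns (results, errors): two dicts keyed by item, so that one failing
    item does not stop the others from being processed.
    """
    results, errors = {}, {}
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(worker, item): item for item in items}
        for future in as_completed(futures):
            item = futures[future]
            try:
                results[item] = future.result()
            except Exception as exc:
                errors[item] = exc
    return results, errors


def fake_process(file_name):
    """Stand-in worker: fails for names containing 'corrupt'."""
    if "corrupt" in file_name:
        raise ValueError(f"cannot parse {file_name}")
    return len(file_name)


# Example usage with the stand-in worker
results, errors = run_concurrently(fake_process, ["chapter1.pdf", "corrupt.pdf"])
print("succeeded:", sorted(results))
print("failed:", sorted(errors))
```

In the notebook, `process_with_timing` could be passed as the worker and the full file paths as the items.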
