<a href="https://colab.research.google.com/github/seansphd/ArchiveViewSourceCode/blob/Update/ProcessPDFs.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#!/usr/bin/env python3
# PDF OCR and summarisation with batch processing for GitHub repositories
# Output format: JSON

# ======== INSTALL DEPENDENCIES ========
import sys
import subprocess

def install_package(package):
    """Install ``package`` with pip, using the interpreter running this script.

    Raises:
        subprocess.CalledProcessError: if the pip command exits with a non-zero status.
    """
    print(f"Installing {package}...")
    pip_command = [sys.executable, "-m", "pip", "install", package]
    subprocess.check_call(pip_command)

# Third-party packages this script needs. Each entry is used both as the module
# name to probe with ``__import__`` and as the requirement handed to pip.
required_packages = [
    # OCR and PDF handling
    "pytesseract",
    "pdf2image",
    # Summarisation models and their runtime support
    "transformers",
    "torch",
    "sentencepiece",
    "einops",
    "accelerate",
    # Text analysis
    "spacy",
    "textstat",
    # Utilities
    "tqdm",
    "numpy",
    "requests",
    "PyPDF2",
]

print("Checking and installing required packages...")
for package in required_packages:
    try:
        # Probe by importing; only a failed import triggers an install.
        __import__(package)
    except ImportError:
        install_package(package)

# Install system dependencies if in Google Colab
import os
# The google.colab module is importable only inside a Colab runtime, so a
# successful import is used as the environment check.
try:
    import google.colab  # type: ignore
except ImportError:
    IN_COLAB = False
else:
    IN_COLAB = True

if IN_COLAB:
    print("Installing system dependencies for Colab...")
    # Run in order; check=True aborts on the first command that fails.
    for command in (
        ["apt-get", "update"],
        ["apt-get", "install", "-y", "poppler-utils", "tesseract-ocr", "tesseract-ocr-eng"],
        [sys.executable, "-m", "spacy", "download", "en_core_web_sm"],
    ):
        subprocess.run(command, check=True)

# ======== IMPORTS ========
# Core libraries
import re
import io
import json
import time
import base64
import tempfile
import requests
import numpy as np
from collections import Counter
from tqdm import tqdm
from PIL import Image, ImageEnhance, ImageFilter

# PDF processing
from pdf2image import convert_from_path
import pytesseract
import PyPDF2  # For direct text extraction from PDFs

# NLP and Text Analysis
import spacy
import textstat

# ML and Summarisation
import torch
from transformers import (
    AutoTokenizer,
    AutoModelForSeq2SeqLM,
    pipeline
)

# GitHub API
from urllib.parse import urljoin
import urllib.parse

# ======== SETUP ========
# Use the GPU when PyTorch reports CUDA support, otherwise run on the CPU
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
print(f"Using device: {device}")

# Load the spaCy English pipeline used by the analysis functions below;
# spacy.load raises OSError when the model is missing, so download it and retry
try:
    nlp = spacy.load("en_core_web_sm")
except OSError:
    print("Downloading spaCy model...")
    download_command = [sys.executable, "-m", "spacy", "download", "en_core_web_sm"]
    subprocess.run(download_command, check=True)
    nlp = spacy.load("en_core_web_sm")

# Menu label -> Hugging Face model identifier.
# The insertion order is the order in which main() numbers the choices.
available_models = {
    "BART CNN (fast)": "facebook/bart-large-cnn",
    "T5 Small (fastest)": "t5-small",
    "T5 Base (balanced)": "t5-base",
    "FLAN-T5 Base (recommended)": "google/flan-t5-base",
    "FLAN-T5 Large (better quality)": "google/flan-t5-large",
}

# ======== GITHUB API FUNCTIONS ========
def list_github_pdf_files(repo_owner, repo_name, path="", token=None):
    """
    List all PDF files in a GitHub repository recursively.

    Walks the repository through the GitHub "contents" API, one request per
    directory, starting at ``path``. Request failures are printed and the
    affected directory is skipped; they are never raised to the caller.

    Args:
        repo_owner (str): GitHub repository owner
        repo_name (str): GitHub repository name
        path (str): Path within the repository (leading/trailing slashes are ignored)
        token (str): GitHub personal access token

    Returns:
        list: List of PDF file dicts with name, path, and download_url
    """
    from urllib.parse import quote  # local import keeps this block self-contained

    headers = {
        "Accept": "application/vnd.github+json",
        "X-GitHub-Api-Version": "2022-11-28"
    }
    if token:
        headers["Authorization"] = f"Bearer {token}"

    contents_root = (
        f"https://api.github.com/repos/{quote(repo_owner, safe='')}"
        f"/{quote(repo_name, safe='')}/contents"
    )
    pdf_files = []

    def is_pdf(item):
        return item.get("type") == "file" and str(item.get("name", "")).lower().endswith(".pdf")

    def as_entry(item):
        return {
            "name": item.get("name"),
            "path": item.get("path"),
            "download_url": item.get("download_url")
        }

    def fetch_contents(path_segment):
        # Percent-encode the path (keeping "/") so names containing characters such
        # as "#", "?" or "%" are not misread as URL fragment/query syntax.
        path_segment = (path_segment or "").strip("/")
        url = f"{contents_root}/{quote(path_segment)}" if path_segment else contents_root

        print(f"Accessing URL: {url}")

        try:
            response = requests.get(url, headers=headers, timeout=30)

            if response.status_code != 200:
                print(f"Error fetching contents: {response.status_code}")
                error_message = "Unknown error"
                try:
                    error_message = response.json().get('message', 'Unknown error')
                except Exception:
                    # The error body is only informational; a non-JSON body is fine.
                    pass
                print(error_message)

                # Retry once with the "token" scheme if the "Bearer" scheme was rejected.
                # The switch is kept in `headers`, so later requests reuse it.
                if response.status_code == 401 and token and "Bearer" in headers.get("Authorization", ""):
                    print("Trying alternative authorisation format...")
                    headers["Authorization"] = f"token {token}"
                    retry_response = requests.get(url, headers=headers, timeout=30)
                    if retry_response.status_code == 200:
                        print("Alternative authorisation successful")
                        response = retry_response
                    else:
                        print(f"Alternative authorisation also failed: {retry_response.status_code}")
                        return
                else:
                    return

            contents = response.json()
        except requests.exceptions.RequestException as e:
            print(f"Request error: {e}")
            return
        except ValueError as e:
            # A 200 response whose body is not valid JSON.
            print(f"Request error: {e}")
            return

        if isinstance(contents, list):
            # Directory listing: collect PDFs and descend into sub-directories in place.
            for item in contents:
                if is_pdf(item):
                    pdf_files.append(as_entry(item))
                elif item.get("type") == "dir":
                    fetch_contents(item.get("path", ""))
        elif isinstance(contents, dict) and is_pdf(contents):
            # `path` pointed directly at a single file.
            pdf_files.append(as_entry(contents))

    fetch_contents(path)
    return pdf_files

# ======== PDF PROCESSING FUNCTIONS ========
def download_pdf(url, output_path, timeout=60):
    """Download a PDF file from a URL.

    The response is streamed to disk in chunks so large files are not held in
    memory, and the request uses a timeout so a stalled server cannot block a
    batch run indefinitely.

    Args:
        url (str): Address of the file to download.
        output_path (str): Where to write the downloaded bytes.
        timeout (float): Seconds to wait for the server (passed to requests).

    Returns:
        str: ``output_path``.

    Raises:
        requests.exceptions.RequestException: on connection errors, timeouts,
            or a non-success HTTP status.
    """
    with requests.get(url, stream=True, timeout=timeout) as response:
        response.raise_for_status()
        with open(output_path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=65536):
                f.write(chunk)
    return output_path

def ocr_pdf(pdf_path, dpi=300, preprocess=True, lang='eng'):
    """Run OCR over every page of a PDF.

    Pages are rendered to images at ``dpi`` and passed to Tesseract. With
    ``preprocess`` enabled each page is first converted to greyscale, has its
    contrast doubled, is sharpened and is then thresholded to black and white.

    Returns:
        tuple: ``(text, images)`` where ``text`` holds every page's text, each
        preceded by a ``--- Page N ---`` marker, and ``images`` is the list of
        rendered pages. If rendering fails the result is
        ``("Error processing PDF: ...", [])``.
    """
    try:
        images = convert_from_path(pdf_path, dpi=dpi)
    except Exception as e:
        print(f"Error converting PDF to images: {e}")
        return f"Error processing PDF: {e}", []

    page_texts = []
    for page_number, image in enumerate(images, start=1):
        if not preprocess:
            text = pytesseract.image_to_string(image, lang=lang)
        else:
            grey = Image.fromarray(np.array(image)).convert('L')  # greyscale
            boosted = ImageEnhance.Contrast(grey).enhance(2.0)
            sharpened = boosted.filter(ImageFilter.SHARPEN)
            # Binarise: pixels brighter than 150 become white, the rest black
            binary = sharpened.point(lambda p: 255 if p > 150 else 0)
            text = pytesseract.image_to_string(binary, lang=lang, config='--psm 6')

        page_texts.append(f"\n\n--- Page {page_number} ---\n\n{text}")

    return "".join(page_texts), images

def extract_text_directly(pdf_path):
    """Read the embedded text layer of a PDF with PyPDF2 (no OCR).

    Returns:
        tuple: ``(text, images)``. ``text`` holds every page's text, each preceded
        by a ``--- Page N ---`` marker; ``images`` is a list of ``None`` with one
        entry per page, so callers can still take its length as the page count.
        On any error the result is ``("Error processing PDF: ...", [])``.
    """
    try:
        with open(pdf_path, 'rb') as handle:
            reader = PyPDF2.PdfReader(handle)
            page_count = len(reader.pages)
            sections = [
                # extract_text() may yield nothing for a page; treat that as empty
                f"\n\n--- Page {number} ---\n\n{page.extract_text() or ''}"
                for number, page in enumerate(reader.pages, start=1)
            ]

        return "".join(sections), [None] * page_count
    except Exception as e:
        print(f"Error extracting text directly from PDF: {e}")
        return f"Error processing PDF: {e}", []

def clean_text(text):
    """Improve text quality after extraction.

    Steps, in order:
      1. collapse every run of whitespace into a single space;
      2. replace ``|`` with ``I`` (a common OCR confusion);
      3. drop characters other than word characters, whitespace and ``.,;:!?'"-``;
      4. insert a missing space after punctuation that is glued to the next word.

    Step 4 leaves ``.``, ``,`` and ``:`` alone when they sit between two digits,
    so values such as ``3.14``, ``1,000`` and ``10:30`` stay intact.

    Args:
        text (str): Raw extracted text.

    Returns:
        str: The cleaned text.
    """
    text = re.sub(r'\s+', ' ', text)
    text = text.replace('|', 'I')
    # Avoid replacing zero with O to keep numbers intact
    text = re.sub(r'[^\w\s.,;:!?\'"-]', '', text)
    # Alternatives: (a) ; ! ? before any word character; (b) . , : not preceded by a
    # digit; (c) . , : preceded by a digit but followed by a non-digit word character.
    text = re.sub(
        r'[;!?](?=\w)|(?<!\d)[.,:](?=\w)|[.,:](?=[^\W\d])',
        r'\g<0> ',
        text,
    )
    return text

# ======== TEXT ANALYSIS FUNCTIONS ========
def summarise_text(text, model_name="facebook/bart-large-cnn", chunk_size=1024, max_length=150, min_length=50):
    """Summarise text with a Hugging Face model."""
    if not text.strip():
        return "No text to summarise."

    try:
        tokenizer = AutoTokenizer.from_pretrained(model_name)
        model = AutoModelForSeq2SeqLM.from_pretrained(model_name).to(device)
        summariser = pipeline("summarization", model=model, tokenizer=tokenizer, device=0 if device == "cuda" else -1)

        prefix = "summarize: " if "t5" in model_name else ""

        # Sentence aware chunking using spaCy
        doc = nlp(text)
        sentences = [s.text for s in doc.sents]

        chunks = []
        buf = ""
        for s in sentences:
            if len(tokenizer.encode(buf + s)) < chunk_size:
                buf += s + " "
            else:
                chunks.append(buf.strip())
                buf = s + " "
        if buf.strip():
            chunks.append(buf.strip())

        summaries = []
        for chunk in chunks:
            if not chunk.strip():
                continue
            input_text = prefix + chunk
            try:
                out = summariser(input_text, max_length=max_length, min_length=min_length, do_sample=False)
                summaries.append(out[0]['summary_text'])
            except Exception as e:
                print(f"Error summarising chunk: {e}")
                continue

        combined = " ".join(summaries).strip()

        if not combined:
            combined = text[:800]

        if len(tokenizer.encode(combined)) > chunk_size:
            combined = summariser(prefix + combined, max_length=max_length*2, min_length=min_length, do_sample=False)[0]['summary_text']

        return combined
    except Exception as e:
        print(f"Error in summarisation: {e}")
        return f"Error in summarisation: {e}"

def extract_keywords(text, top_n=10):
    """Extract keywords from text.

    Candidates are the spaCy noun chunks and named entities of the first
    500,000 characters. A candidate is kept when it is longer than three
    characters and not made up solely of stop words. Candidates are counted by
    their exact text; a phrase found both as a noun chunk and as an entity is
    counted once for each.

    Args:
        text (str): Text to analyse.
        top_n (int): Number of most frequent keywords to return.

    Returns:
        dict: keyword -> occurrence count, most frequent first; ``{}`` for blank
        input; ``{"error": message}`` if processing fails.
    """
    if not text.strip():
        return {}

    try:
        doc = nlp(text[:500000])  # safety limit

        keyword_counter = Counter()
        for spans in (doc.noun_chunks, doc.ents):
            for span in spans:
                phrase = span.text
                # The span's own tokens already carry the stop-word flag, so the
                # phrase does not need to be run through the pipeline again.
                if len(phrase) > 3 and not all(token.is_stop for token in span):
                    keyword_counter[phrase] += 1

        return dict(keyword_counter.most_common(top_n))
    except Exception as e:
        print(f"Error extracting keywords: {e}")
        return {"error": str(e)}

def analyse_text(text):
    """Compute readability scores and basic counts for ``text`` using textstat.

    Returns:
        dict: one entry per metric; ``{}`` for blank input; ``{"error": message}``
        if any metric fails.
    """
    if not text.strip():
        return {}

    # (result key, name of the textstat function that produces it), in output order
    metrics = (
        ('flesch_reading_ease', 'flesch_reading_ease'),
        ('flesch_kincaid_grade', 'flesch_kincaid_grade'),
        ('smog_index', 'smog_index'),
        ('coleman_liau_index', 'coleman_liau_index'),
        ('automated_readability_index', 'automated_readability_index'),
        ('dale_chall_readability_score', 'dale_chall_readability_score'),
        ('word_count', 'lexicon_count'),
        ('sentence_count', 'sentence_count'),
        ('avg_sentence_length', 'avg_sentence_length'),
        ('avg_syllables_per_word', 'avg_syllables_per_word'),
    )

    try:
        return {key: getattr(textstat, function_name)(text) for key, function_name in metrics}
    except Exception as e:
        print(f"Error analysing text: {e}")
        return {"error": str(e)}

def extract_entities(text):
    """Group the named entities spaCy finds in ``text`` by entity label.

    Only the first 500,000 characters are analysed.

    Returns:
        dict: label -> list of distinct entity texts in order of first
        appearance; ``{}`` for blank input; ``{"error": message}`` on failure.
    """
    if not text.strip():
        return {}

    try:
        grouped = {}
        for ent in nlp(text[:500000]).ents:  # safety limit
            seen_for_label = grouped.setdefault(ent.label_, [])
            if ent.text not in seen_for_label:
                seen_for_label.append(ent.text)
        return grouped
    except Exception as e:
        print(f"Error extracting entities: {e}")
        return {"error": str(e)}

# ======== MAIN PROCESSING FUNCTION ========
def process_pdf(url, pdf_name, model_name="facebook/bart-large-cnn", dpi=300, preprocess=True, use_ocr=True):
    """Process a single PDF file.

    Downloads the PDF into a temporary directory, extracts its text (by OCR or
    directly from the text layer), cleans it, and then summarises and analyses it.

    Args:
        url (str): Download URL of the PDF.
        pdf_name (str): Display name stored in the result.
        model_name (str): Hugging Face model used for summarisation.
        dpi (int): Rendering resolution for OCR.
        preprocess (bool): Whether to enhance page images before OCR.
        use_ocr (bool): OCR the pages if true, otherwise read the text layer.

    Returns:
        dict: Result record. ``status`` is ``"success"`` or ``"failed"``; on
        failure ``error`` holds the message and the analysis fields keep their
        empty defaults. This function does not raise.
    """
    results = {
        "pdf_name": pdf_name,
        "url": url,
        "status": "success",
        "error": None,
        "num_pages": 0,
        "word_count": 0,
        "summary": "",
        "top_keywords": {},
        "readability": {},
        "entities": {}
    }

    try:
        with tempfile.TemporaryDirectory() as temp_dir:
            pdf_path = os.path.join(temp_dir, "document.pdf")
            download_pdf(url, pdf_path)

            if use_ocr:
                print(f"Performing OCR on {pdf_name}...")
                raw_text, images = ocr_pdf(pdf_path, dpi=dpi, preprocess=preprocess)
            else:
                print(f"Extracting text directly from {pdf_name}...")
                raw_text, images = extract_text_directly(pdf_path)

            # Both extractors report failure by returning an "Error processing PDF: ..."
            # message with an empty page list. Treat that as a failure instead of
            # summarising the error message as if it were the document.
            if not images and raw_text.startswith("Error processing PDF:"):
                raise RuntimeError(raw_text)

            results["num_pages"] = len(images) if images else 0

            text = clean_text(raw_text)
            results["word_count"] = len(text.split())

            results["summary"] = summarise_text(text, model_name=model_name)
            results["top_keywords"] = extract_keywords(text)
            results["readability"] = analyse_text(text)
            results["entities"] = extract_entities(text)

    except Exception as e:
        # Record the failure in the result so a batch run can carry on.
        print(f"Failed to process {pdf_name}: {e}")
        results["status"] = "failed"
        results["error"] = str(e)

    return results

# ======== JSON SAVE FUNCTION ========
def save_results_to_json(results, output_file):
    """Write ``results`` to ``output_file`` as indented UTF-8 JSON.

    Non-ASCII characters are written as-is rather than escaped. Failures are
    printed, not raised.
    """
    try:
        with open(output_file, mode='w', encoding='utf-8') as handle:
            json.dump(results, handle, indent=2, ensure_ascii=False)
    except Exception as e:
        print(f"Error saving results to JSON: {e}")
    else:
        print(f"Results saved to {output_file}")

# ======== BATCH PROCESSING FUNCTION ========
def batch_process_pdfs(pdf_files, model_name="facebook/bart-large-cnn", dpi=300,
                       preprocess=True, use_ocr=True, output_json="pdf_summaries.json"):
    """Run process_pdf over every entry of ``pdf_files`` and return the results.

    Each entry must provide ``name`` and ``download_url``. The accumulated
    results are rewritten to ``output_json`` after every file, so an interrupted
    run keeps what was finished, and the loop pauses one second between files.
    """
    options = {
        "model_name": model_name,
        "dpi": dpi,
        "preprocess": preprocess,
        "use_ocr": use_ocr,
    }
    collected = []

    progress = tqdm(pdf_files, desc="Processing PDFs")
    for position, entry in enumerate(progress, start=1):
        print(f"\n[{position}/{len(pdf_files)}] Processing: {entry['name']}")
        collected.append(process_pdf(entry["download_url"], entry["name"], **options))

        # Save progress after each file
        save_results_to_json(collected, output_json)

        time.sleep(1)

    return collected

# ======== MAIN FUNCTION ========
def main():
    """Interactive entry point.

    Prompts for the repository, extraction options, model and output file,
    lists the PDFs found in the repository, and after confirmation processes
    them with batch_process_pdfs. All answers are stripped of surrounding
    whitespace before use.
    """
    # Configuration
    repo_owner = input("Enter GitHub repository owner: ").strip()
    repo_name = input("Enter GitHub repository name: ").strip()
    path = input("Enter repository path (leave blank for root): ").strip().strip("/")
    token = input("Enter GitHub token (leave blank if not needed): ").strip() or None

    if not repo_owner or not repo_name:
        print("Repository owner and name are required.")
        return

    use_ocr = input("\nDo the PDFs require OCR? (y/n): ").strip().lower() == 'y'

    dpi = 300
    preprocess = True
    if use_ocr:
        try:
            dpi = int(input("Enter OCR resolution (DPI, recommended 300): ").strip() or "300")
        except ValueError:
            dpi = 300
        if dpi <= 0:
            # A non-positive resolution cannot be rendered; use the default.
            dpi = 300
        preprocess = input("Enhance images before OCR? (y/n): ").strip().lower() == 'y'
    else:
        print("Skipping OCR and using direct text extraction...")

    model_labels = list(available_models)
    model_count = len(model_labels)

    print("\nAvailable models:")
    for number, label in enumerate(model_labels, start=1):
        print(f"{number}. {label}")

    # Keep asking until a valid menu number is entered; the range shown follows
    # the number of entries in available_models.
    while True:
        try:
            model_idx = int(input(f"\nSelect model (1-{model_count}): ").strip()) - 1
        except ValueError:
            model_idx = -1
        if 0 <= model_idx < model_count:
            break
        print(f"Please enter a number between 1 and {model_count}.")
    model_name = available_models[model_labels[model_idx]]

    output_json = (
        input("Enter output JSON filename (default: pdf_summaries.json): ").strip()
        or "pdf_summaries.json"
    )

    print("\nFetching PDF files from repository...")
    pdf_files = list_github_pdf_files(repo_owner, repo_name, path, token)

    if not pdf_files:
        print("No PDF files found in the repository.")
        return

    print(f"\nFound {len(pdf_files)} PDF files:")
    for number, pdf in enumerate(pdf_files, start=1):
        print(f"{number}. {pdf['path']}")

    confirm = input(f"\nProcess {len(pdf_files)} PDFs? (y/n): ").strip().lower()
    if confirm != 'y':
        print("Operation cancelled.")
        return

    results = batch_process_pdfs(
        pdf_files,
        model_name=model_name,
        dpi=dpi,
        preprocess=preprocess,
        use_ocr=use_ocr,
        output_json=output_json
    )

    print(f"\nProcessing complete. Results saved to {output_json}")

    # Failures are stored per file in the JSON; surface the count here as well.
    failed = sum(1 for result in results if result.get("status") != "success")
    if failed:
        print(f"{failed} of {len(results)} PDFs failed; see the 'error' field in {output_json}.")

# Start the interactive workflow only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()

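# Example prompt answers:
#   1. Repository owner: seansphd
#   2. Repository name:  PAGE-Archive
# The path and token prompts can be left blank. A blank answer to the OCR prompt
# means "no OCR"; the model prompt needs a number and the final confirmation needs "y".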