In [194]:
import os
from typing import List, Dict, Optional, Tuple
from transformers import AutoTokenizer, AutoModelForTokenClassification, pipeline
import pymupdf
import re
import nltk
import json
import time
from contextlib import contextmanager
import notebook
import pickle

# Fetch the NLTK 'punkt' and 'punkt_tab' data packages; NLTK reports the
# package as already up to date instead of downloading it again when present.
# NOTE(review): no visible cell uses nltk afterwards - confirm these downloads
# are still needed.
nltk.download('punkt')
nltk.download('punkt_tab')

[nltk_data] Downloading package punkt to
[nltk_data]     C:\Users\a5115690\AppData\Roaming\nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package punkt_tab to
[nltk_data]     C:\Users\a5115690\AppData\Roaming\nltk_data...
[nltk_data]   Package punkt_tab is already up-to-date!


True

In [None]:
def _save_output_to_file(papers: List[Dict[str, str]], stats: Optional[Dict[str, int]], output_file: str) -> None:
    """Save papers and statistics to a JSON file."""
    output_data = {
        "papers": papers,
        "statistics": stats
    }
    with open(output_file, 'w') as f:
        json.dump(output_data, f, indent=4)


def _generate_and_save_stats(
        papers: List[Dict[str, str]],
        generate_stats: bool,
        save_output: bool,
        output_file: str
) -> Optional[Dict[str, int]]:
    """Generate statistics from papers and save to file if requested."""
    if not generate_stats:
        return None

    paper_count = len(papers)

    # Prepare statistics
    stats = {
        "total_papers": paper_count,
        "average_word_count": 0,  
        "min_word_count": 0,  
        "max_word_count": 0, 
    }

    if paper_count > 0:
        # Calculate statistics using word count if available
        word_counts = [len(paper['text'].split()) for paper in papers]  # Calculate word count from text

        total_word_count = sum(word_counts)
        stats["average_word_count"] = total_word_count / paper_count
        stats["min_word_count"] = min(word_counts)
        stats["max_word_count"] = max(word_counts)

    # Save statistics and papers to output file
    if save_output:
        _save_output_to_file(papers, stats, output_file)

    return stats

def _extract_text_from_pdf(pdf_path: str) -> str:
    """Return the concatenated text of every page of the PDF at ``pdf_path``.

    Failures are reported with ``print`` and converted into an empty string
    rather than raised, so a single unreadable file does not abort a batch.

    Args:
        pdf_path: Path of the PDF file to read.

    Returns:
        The text of all pages joined in page order, or ``""`` when the file
        could not be opened or read.
    """
    try:
        # The context manager closes the document (and its file handle) on
        # every exit path, including a failure part-way through the pages.
        with pymupdf.open(pdf_path) as doc:
            # Join once instead of growing a string page by page.
            return "".join(page.get_text() for page in doc)
    except pymupdf.FileDataError:
        print(f"Error: Unable to open or read the PDF file: {pdf_path}")
        return ""
    except Exception as e:
        # Deliberately broad: this is a best-effort batch helper.
        print(f"An unexpected error occurred while processing {pdf_path}: {str(e)}")
        return ""

def _debug_print(step, before, after, on=False):
    if on:
        print(f"Debug: {step}")
        print(f"Before Text Count: {len(before)} characters")
        print(f"After Text Count: {len(after)} characters")
        print(f"Before:\n{before}\n")
        print(f"After:\n{after}\n")

def process_papers(
        data_folder_path: str,
        generate_stats: bool = True,
        save_output: bool = True,
        pickle_path: str = "papers.pkl"
) -> Tuple[List[Dict[str, str]], Optional[Dict[str, int]]]:
    """Extract the text of every PDF under ``data_folder_path`` (recursively).

    Args:
        data_folder_path: Root folder that is walked for PDF files.
        generate_stats: When true, word-count statistics are computed for the
            extracted papers.
        save_output: Passed through to the statistics step; when true (and
            ``generate_stats`` is true) papers and statistics are written to
            ``process_stats.json``.
        pickle_path: Where the list of extracted papers is pickled. Defaults
            to ``papers.pkl`` in the current working directory.

    Returns:
        ``(papers, stats)`` where ``papers`` is a list of
        ``{"id": <file name>, "text": <extracted text>}`` dicts and ``stats``
        is the statistics dict, or ``None`` when ``generate_stats`` is false.

    Note:
        ``id`` is the bare file name, so equally named PDFs in different
        sub-folders end up with the same ``id``.
    """
    papers = []

    # Traverse the directory tree
    for root, _, files in os.walk(data_folder_path):
        for filename in files:
            # Compare on the lower-cased name so ".PDF" / ".Pdf" are matched too.
            if not filename.lower().endswith(".pdf"):
                continue

            file_path = os.path.join(root, filename)  # root is the folder currently walked
            text = _extract_text_from_pdf(file_path)

            if text:  # Only keep papers whose extraction produced text
                papers.append({"id": filename, "text": text})

    # Cache the extracted papers so later runs can reload them
    with open(pickle_path, "wb") as pickle_file:
        pickle.dump(papers, pickle_file)

    # Optionally generate and save statistics
    stats = None
    if generate_stats:
        stats = _generate_and_save_stats(papers, generate_stats, save_output, "process_stats.json")

    return papers, stats

def clean_papers(
        papers: List[Dict[str, str]],
        debug_mode,
        model_name: str = "distilbert-base-uncased-finetuned-conll03-english",  # Default model name
        generate_stats: bool = True,
        save_output: bool = True
) -> Tuple[List[Dict[str, str]], Optional[Dict[str, int]]]:
    """Clean extracted paper text with regex rules and NER-based entity removal.

    For every paper the text is lower-cased, stripped of non-ASCII runs,
    ISBNs, URLs, e-mail addresses, ``[n]`` reference markers, characters
    outside an allowed set, anchors such as ``fig. 3`` and ellipses, and then
    filtered line by line. Finally a Hugging Face NER pipeline is run over the
    text and the location entities it reports are removed.

    Args:
        papers: Dicts with an ``'id'`` and a ``'text'`` entry.
        debug_mode: When truthy, prints a before/after report for every regex
            step and stops after the first 10 papers.
        model_name: Token-classification model. Loaded from
            ``models/<model_name>`` when that path exists, otherwise resolved
            by name through ``from_pretrained``.
        generate_stats: When true, word-count statistics are computed for the
            cleaned papers.
        save_output: Passed to the statistics step; when true (and
            ``generate_stats`` is true) the result is written to
            ``clean_stats.json``.

    Returns:
        ``(cleaned_papers, stats)``; ``stats`` is ``None`` when
        ``generate_stats`` is false.
    """
    if debug_mode:
        print("!!!Debug mode enabled!!!")

    # Models are expected in a local 'models' folder; fall back to the model name.
    model_path = os.path.join("models", model_name)
    if not os.path.exists(model_path):
        print(f"Model '{model_name}' not found in 'models' directory. Downloading...")
        model_source = model_name
    else:
        model_source = model_path
    tokenizer = AutoTokenizer.from_pretrained(model_source)
    model = AutoModelForTokenClassification.from_pretrained(model_source)

    # Create the NER pipeline
    nlp_pipeline = pipeline("ner", model=model, tokenizer=tokenizer)

    # Compiled once for the whole run; applied in this order to every paper.
    patterns = {
        'non_ascii': re.compile(r'[^\x00-\x7F]+'),
        'isbn': re.compile(r'\b(?:isbn(?:-1[03])?:? )?(?=[-0-9xX ]{13,17})(97[89][- ]?)?[0-9]{1,5}[- ]?[0-9]+[- ]?[0-9]+[- ]?[0-9xX]\b'),
        'url': re.compile(r'http\S+|www\S+|https\S+'),
        'email': re.compile(r'\S+@\S+'),
        'reference': re.compile(r'\[\d+\]'),
        'allowed_chars': re.compile(r'[^A-Za-z0-9.,?!:;"(){}\[\]<>@#$%^&*_+=/\\|~\s]'),
        'anchors': re.compile(r'\b(?:vol\.|no\.|fig\.|pp\.|p\.|pg\.|table)\s\d+\b')
    }
    ellipsis_pattern = re.compile(r'\.\.\.+')  # three or more dots
    letter_pattern = re.compile(r'[a-zA-Z]')

    # Entity labels to strip, e.g. locations and organizations
    types = ['B-LOC']
    # types = ['B-LOC', 'B-ORG']

    debug_limit = 10  # number of papers cleaned before stopping in debug mode

    cleaned_papers = []
    for count, paper in enumerate(papers):
        if debug_mode and count == debug_limit:
            print("Stop cleaning because debug mode is enabled")
            break

        # Convert to lowercase
        text = paper['text'].lower()

        # Apply the substitutions, reporting each step when debugging
        for key, pattern in patterns.items():
            text_before = text
            text = pattern.sub('', text)
            _debug_print(key, text_before, text, on=debug_mode)

        # Remove ellipses
        text = ellipsis_pattern.sub('', text)

        # Keep only lines with at least 5 non-padding characters and at least
        # one letter; this also drops empty and digit-only lines.
        text = "\n".join(
            line for line in text.split("\n")
            if len(line.strip()) >= 5 and letter_pattern.search(line)
        )

        # NOTE(review): the whole text goes to the pipeline in one call;
        # confirm how it treats input longer than the model's maximum length.
        entities = nlp_pipeline(text)

        # Skip very short words and '##' word-piece continuations.
        entities_to_remove = {
            ent['word'] for ent in entities
            if ent['entity'] in types and len(ent['word']) >= 3 and not ent['word'].startswith('##')
        }
        print(f'Entities to remove: {entities_to_remove}')

        if entities_to_remove:
            # Match whole words only, so an entity such as 'new' is not cut out
            # of 'newton'. One longest-first alternation in a single pass keeps
            # the result independent of set iteration order.
            alternatives = '|'.join(
                re.escape(word) for word in sorted(entities_to_remove, key=lambda w: (-len(w), w))
            )
            entity_pattern = re.compile(r'(?<!\w)(?:' + alternatives + r')(?!\w)')
            text = entity_pattern.sub('', text)

        cleaned_papers.append({"id": paper['id'], "text": text})

    # Optionally generate and save statistics
    stats = None
    if generate_stats:
        stats = _generate_and_save_stats(cleaned_papers, generate_stats, save_output, "clean_stats.json")

    return cleaned_papers, stats

def save_texts_to_files(texts: List[Dict[str, str]], output_path: str) -> int:
    """Write each text to ``<output_path>/<sanitised id>.txt`` as UTF-8.

    The file name is the element's ``'id'`` with every character other than
    ASCII letters and digits replaced by ``_``. Ids that differ only in such
    characters map to the same file, and the later one overwrites the earlier.
    Elements with an empty id are skipped. A failure to write one file is
    printed and does not stop the remaining files from being written.

    Args:
        texts: Dicts with an ``'id'`` and a ``'text'`` entry.
        output_path: Target folder; created (with parents) when missing.

    Returns:
        The number of files written successfully.
    """
    num_files_saved = 0

    # Create the folder if it does not exist
    os.makedirs(output_path, exist_ok=True)

    for element in texts:
        title = element["id"]
        text = element["text"]

        # Replace unsupported characters with underscores
        safe_filename = re.sub(r'[^a-zA-Z0-9]', '_', title)
        if not safe_filename:  # Only happens for an empty title
            print(f"Skipping empty filename for title: {title}")
            continue

        file_path = os.path.join(output_path, f"{safe_filename}.txt")

        try:
            # Explicit encoding: the platform default may not be able to
            # represent every character of the text.
            with open(file_path, "w", encoding="utf-8") as file:
                file.write(text)
            num_files_saved += 1
            print(f"Saved '{title}' to '{file_path}'")
        except Exception as e:
            # Best effort: report the failure and carry on with the next text.
            print(f"Failed to save '{title}': {e}")

    return num_files_saved

@contextmanager
def time_step():
    """Context manager that prints how long the wrapped block took.

    The elapsed time is printed as ``***Total time: <seconds> seconds`` when
    the block exits, whether it finished normally or raised. Exceptions from
    the block propagate unchanged.
    """
    # perf_counter is monotonic, so the measurement cannot be skewed by
    # system clock adjustments the way time.time() can.
    start_time = time.perf_counter()

    try:
        yield
    finally:
        total_time = time.perf_counter() - start_time
        print(f"***Total time: {total_time:.2f} seconds")

In [None]:
# Settings
# DEBUG_MODE is read by the processing and cleaning cells below. When True,
# an existing papers.pkl is reloaded instead of re-extracting the PDFs, and
# cleaning stops after the first 10 papers.
DEBUG_MODE = False
data_folder_path = "data"      # root folder that is searched for PDFs
output_path = "cleaned_text"   # folder the cleaned .txt files are written to
file_stats = {}                # per-stage paper counts collected along the run
loaded_papers = False          # set to True when papers come from papers.pkl

In [198]:
# Process
# In debug mode an existing papers.pkl is reused; otherwise the PDFs are
# extracted again (which also rewrites papers.pkl).
# NOTE: pickle.load can execute arbitrary code - only load a papers.pkl that
# was written by this notebook.
process_stats = None  # stays None when the papers come from the cache
if os.path.exists("papers.pkl") and DEBUG_MODE:
    with open("papers.pkl", "rb") as pickle_file:
        papers = pickle.load(pickle_file)
    print("Loaded processed papers")
    loaded_papers = True
else:
    print("Processing papers...")
    loaded_papers = False  # reset explicitly so re-running this cell is consistent
    with time_step():
        papers, process_stats = process_papers(data_folder_path)

# Record the count for both branches
file_stats["num_processed_papers"] = len(papers)

Processing papers...
An unexpected error occurred while processing data\Miscellaneous Published Articles\Effects of transcranial magnetic stimulation of the primary motor cortex on the grip and net forces in the tripod grasp.pdf: no such file: 'data\Miscellaneous Published Articles\Effects of transcranial magnetic stimulation of the primary motor cortex on the grip and net forces in the tripod grasp.pdf'
An unexpected error occurred while processing data\Miscellaneous Published Articles\Energy Prediction for Teleoperation Systems That Combine the Time Domain Passivity Approach with Perceptual Deadband-Based Haptic Data Reduction.pdf: no such file: 'data\Miscellaneous Published Articles\Energy Prediction for Teleoperation Systems That Combine the Time Domain Passivity Approach with Perceptual Deadband-Based Haptic Data Reduction.pdf'
An unexpected error occurred while processing data\Miscellaneous Published Articles\FEELing -key-Pressed Implicit Touch Pressure Bests Brain Activity in Mo

In [199]:
# Clean (!!!check DEBUG_MODE flag for clean_papers function!!!)
# Declared up front so the names and their types are visible at a glance.
cleaned_papers: List[Dict[str, str]]
clean_stats: Dict[str, int]

print("Cleaning papers...")
with time_step():
    cleaned_papers, clean_stats = clean_papers(papers, debug_mode=DEBUG_MODE)
    # Track how many papers made it through cleaning
    file_stats.update(num_cleaned_papers=len(cleaned_papers))

Cleaning papers...
Entities to remove: set()
Entities to remove: {'vancouver', 'mont', 'canada'}
Entities to remove: {'vancouver', 'montreal', 'canada', 'british'}
Entities to remove: {'germany', 'west', 'israel', 'leiden', 'karlsruhe', 'web', 'china', 'usa', 'new', 'dortmund', 'beijing', 'netherlands', 'ithaca', 'aachen'}
Entities to remove: {'germany', 'west', 'karlsruhe', 'canada', 'china', 'usa', 'new', 'dortmund', 'denmark', 'beijing', 'netherlands', 'france', 'ithaca', 'hamburg'}
Entities to remove: {'germany', 'stanford', 'madras', 'israel', 'santa', 'usa', 'london', 'dortmund', 'pittsburgh', 'ithaca', 'berkeley', 'lancaster', 'los', 'india', 'switzerland', 'guildford'}
Entities to remove: {'italy', 'germany', 'stanford', 'israel', 'pisa', 'usa', 'dortmund', 'japan', 'pittsburgh', 'zurich', 'ithaca', 'chennai', 'berkeley', 'lancaster', 'los', 'india', 'switzerland', 'guildford'}
Entities to remove: {'germany', 'irvine', 'madras', 'israel', 'usa', 'madrid', 'berlin', 'pittsburgh'

In [200]:
# Output statistics
# Extraction statistics exist only when the papers were processed in this run
# rather than loaded from papers.pkl.
if not loaded_papers and process_stats:
    with time_step():
        print("Process statistics:", process_stats)

# Cleaning runs whether or not the papers came from the cache, so its
# statistics are reported in both cases.
if clean_stats:
    with time_step():
        print("Clean statistics:", clean_stats)

Process statistics: {'total_papers': 1432, 'average_word_count': 14442.671787709498, 'min_word_count': 65, 'max_word_count': 361333}
***Total time: 0.00 seconds
Clean statistics: {'total_papers': 1432, 'average_word_count': 13878.167597765363, 'min_word_count': 64, 'max_word_count': 343445}
***Total time: 0.00 seconds


In [201]:
# Save texts
print("Saving cleaned papers...")
with time_step():
    # Write one .txt per cleaned paper and record how many were written
    file_stats["num_saved_papers"] = num_saved_papers = save_texts_to_files(cleaned_papers, output_path)

# Summary of the per-stage counts
print(file_stats)

Saving cleaned papers...
Saved 'book.pdf' to 'cleaned_text\book_pdf.txt'
Saved 'DoItYourselfHapticsPart2.pdf' to 'cleaned_text\DoItYourselfHapticsPart2_pdf.txt'
Saved 'DoItYourselfHapticsPartI.pdf' to 'cleaned_text\DoItYourselfHapticsPartI_pdf.txt'
Saved '978-3-030-58147-3.pdf' to 'cleaned_text\978_3_030_58147_3_pdf.txt'
Saved '978-3-031-06249-0.pdf' to 'cleaned_text\978_3_031_06249_0_pdf.txt'
Saved '978-3-319-42321-0.pdf' to 'cleaned_text\978_3_319_42321_0_pdf.txt'
Saved '978-3-319-93445-7.pdf' to 'cleaned_text\978_3_319_93445_7_pdf.txt'
Saved '978-3-540-69057-3.pdf' to 'cleaned_text\978_3_540_69057_3_pdf.txt'
Saved '978-3-642-14064-8.pdf' to 'cleaned_text\978_3_642_14064_8_pdf.txt'
Saved '978-3-642-31401-8.pdf' to 'cleaned_text\978_3_642_31401_8_pdf.txt'
Saved '978-3-662-44193-0.pdf' to 'cleaned_text\978_3_662_44193_0_pdf.txt'
Saved 'Haptics_IEEE_Transactions_on_-_Volume_10_-_Issue_2.pdf' to 'cleaned_text\Haptics_IEEE_Transactions_on___Volume_10___Issue_2_pdf.txt'
Saved 'Haptics_IEEE