# PDF <span style="font-size: larger; font-weight: bold;">→</span> Word Converter
## Rio Jia, DS 5690 Spring 2024 Final Project


## Install packages

In [1]:
!pip install python-docx

Collecting python-docx
  Downloading python_docx-1.1.0-py3-none-any.whl (239 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/239.6 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[90m╺[0m [32m235.5/239.6 kB[0m [31m7.3 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m239.6/239.6 kB[0m [31m4.9 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: python-docx
Successfully installed python-docx-1.1.0


In [2]:
!pip install -q pymupdf python-Levenshtein nltk

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.9/3.9 MB[0m [31m21.2 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m30.8/30.8 MB[0m [31m51.8 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m177.4/177.4 kB[0m [31m19.1 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.4/3.4 MB[0m [31m83.4 MB/s[0m eta [36m0:00:00[0m
[?25h

In [3]:
!pip install -q git+https://github.com/huggingface/transformers.git

  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
  Building wheel for transformers (pyproject.toml) ... [?25l[?25hdone


In [4]:
import os

# Configure PyTorch's CUDA allocator before torch is imported further down.
os.environ.update(PYTORCH_CUDA_ALLOC_CONF='expandable_segments:True')

In [5]:
from transformers import AutoProcessor, VisionEncoderDecoderModel
import torch

# The processor and the model are loaded from the same checkpoint.
# nougat-small has 250M parameters, compared to 350M for nougat-base.
checkpoint = "facebook/nougat-small"
processor = AutoProcessor.from_pretrained(checkpoint)
model = VisionEncoderDecoderModel.from_pretrained(checkpoint)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


preprocessor_config.json:   0%|          | 0.00/479 [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/4.49k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/2.14M [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/96.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/4.77k [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/990M [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/165 [00:00<?, ?B/s]

In [6]:
%%capture
device = "cuda" if torch.cuda.is_available() else "cpu"
model.to(device)

In [7]:
import re
from docx import Document
from docx.shared import Pt
from docx.enum.text import WD_PARAGRAPH_ALIGNMENT
from typing import Optional, List
import io
import fitz
from pathlib import Path
from PIL import Image
from transformers import StoppingCriteria, StoppingCriteriaList
from collections import defaultdict
from google.colab import files
from tqdm import tqdm

## Helper Functions

In [8]:
def rasterize_paper(
    pdf: Path,
    outpath: Optional[Path] = None,
    dpi: int = 96,
    return_pil=False,
    pages=None,
) -> Optional[List[io.BytesIO]]:
    """
    Rasterize a PDF file to PNG images.

    Rasterization is best-effort: if opening or rendering fails, a RuntimeWarning
    is issued and the pages rendered before the failure are kept.

    Args:
        pdf (Path): The path to the PDF file (str or Path). Any other object is used as-is and
            must support `len()` and indexing to pages that offer `get_pixmap(dpi=...)`.
        outpath (Optional[Path], optional): The output directory. If None, the images will be returned instead. Defaults to None.
        dpi (int, optional): The output DPI. Defaults to 96.
        return_pil (bool, optional): Whether to return the images instead of writing them to disk. Defaults to False.
        pages (Optional[List[int]], optional): The zero-based pages to rasterize. If None, all pages will be rasterized. Defaults to None.

    Returns:
        Optional[List[io.BytesIO]]: One in-memory PNG buffer per rendered page if `return_pil` is True
        (or `outpath` is None), otherwise None.
    """
    import warnings

    pillow_images = []
    if outpath is None:
        return_pil = True

    # Track a document opened here so it can be closed again; a document
    # object handed in by the caller stays open and remains the caller's job.
    opened_doc = None
    try:
        if isinstance(pdf, (str, Path)):
            opened_doc = pdf = fitz.open(pdf)
        if pages is None:
            pages = range(len(pdf))
        for i in pages:
            page_bytes: bytes = pdf[i].get_pixmap(dpi=dpi).pil_tobytes(format="PNG")
            if return_pil:
                pillow_images.append(io.BytesIO(page_bytes))
            else:
                # Files are numbered from 1: page index 0 becomes "01.png".
                with (outpath / ("%02d.png" % (i + 1))).open("wb") as f:
                    f.write(page_bytes)
    except Exception as exc:
        # Stay best-effort, but make the failure visible instead of silently
        # returning an empty or partial result.
        warnings.warn(
            f"Failed to rasterize PDF: {exc!r}", RuntimeWarning, stacklevel=2
        )
    finally:
        if opened_doc is not None:
            opened_doc.close()

    if return_pil:
        return pillow_images
    return None

In [9]:
class RunningVarTorch:
    """
    Rolling window over the most recent 1-D tensors, with per-row variance.

    Each pushed tensor becomes one column of `values`; once `L` columns are
    stored, the oldest column is dropped for every new one.

    Attributes:
        values (torch.Tensor): Stored columns, one row per element of the pushed tensors. None until the first push.
        L (int): The maximum number of columns kept in the window.
        norm (bool): If True, the variance is divided by the current number of columns.

    Methods:
        push(x): Appends `x` as the newest column of the window.
        variance(): Returns the per-row variance of the window (see the method for edge cases).
    """
    def __init__(self, L=15, norm=False):
        self.values = None
        self.L = L
        self.norm = norm

    def push(self, x: torch.Tensor):
        """Append the 1-D tensor `x` as the newest column, evicting the oldest when full."""
        assert x.dim() == 1
        column = x[:, None]
        if self.values is None:
            self.values = column
            return
        # Below capacity the whole window is kept; at capacity the oldest column goes.
        kept = self.values if self.values.shape[1] < self.L else self.values[:, 1:]
        self.values = torch.cat((kept, column), 1)

    def variance(self):
        """
        Per-row variance of the window.

        Returns None before anything was pushed and zeros while only one
        column is stored.
        """
        if self.values is None:
            return None
        n_columns = self.values.size(1)
        if n_columns < 2:
            return torch.zeros(self.values.size(0))
        spread = torch.var(self.values, 1)
        return spread / n_columns if self.norm else spread


class StoppingCriteriaScores(StoppingCriteria):
    """
    Stopping criterion driven by how much the top generation score fluctuates.

    On every call the maximum of the latest scores is pushed into `vars`; the
    variance of that window is pushed into `varvars`. Once at least
    `window_size` calls have been made, a batch entry whose `varvars` variance
    is below `threshold` is scheduled for stopping.

    Attributes:
        threshold (float): The variance threshold below which an entry becomes a stop candidate.
        vars (RunningVarTorch): Rolling window (default length, normalized variance) over the per-entry maximum score.
        varvars (RunningVarTorch): Rolling window of length `window_size` over the variances produced by `vars`.
        stop_inds (defaultdict(int)): Scheduled stop index per batch entry; 0 means none is scheduled.
        stopped (defaultdict(bool)): Per batch entry, whether it is currently flagged as stopped.
        size (int): The number of times the criterion has been called so far.
        window_size (int): The number of calls to wait before the criterion is evaluated at all.

    Methods:
        __call__(input_ids, scores): Updates the rolling windows with the latest scores and evaluates the stopping criteria.
    """
    def __init__(self, threshold: float = 0.015, window_size: int = 200):
        super().__init__()
        self.threshold = threshold
        self.vars = RunningVarTorch(norm=True)
        self.varvars = RunningVarTorch(L=window_size)
        self.stop_inds = defaultdict(int)
        self.stopped = defaultdict(bool)
        self.size = 0
        self.window_size = window_size

    @torch.no_grad()
    def __call__(self, input_ids: torch.LongTensor, scores: torch.FloatTensor):
        """
        Update the rolling statistics and report whether generation should stop.

        Returns True only when every batch entry tracked in `stopped` is
        flagged and at least one entry is tracked; always False during the
        first `window_size - 1` calls.
        """
        # Only the most recent element of `scores` is used.
        # assumes scores[-1] is (batch, vocab) so max(1) is per batch entry — TODO confirm
        last_scores = scores[-1]
        self.vars.push(last_scores.max(1)[0].float().cpu())
        self.varvars.push(self.vars.variance())
        self.size += 1
        if self.size < self.window_size:
            return False

        varvar = self.varvars.variance()
        for b in range(len(last_scores)):
            if varvar[b] < self.threshold:
                if self.stop_inds[b] > 0 and not self.stopped[b]:
                    # NOTE(review): stop_inds[b] was set above the value of `size` at that
                    # time, so this looks true on the very next low-variance call —
                    # confirm that `>=` (rather than `<=`) is intended.
                    self.stopped[b] = self.stop_inds[b] >= self.size
                else:
                    # Schedule (or re-schedule) the stop index, capped at 4095.
                    self.stop_inds[b] = int(
                        min(max(self.size, 1) * 1.15 + 150 + self.window_size, 4095)
                    )
            else:
                # Variance is back above the threshold: clear any pending stop.
                self.stop_inds[b] = 0
                self.stopped[b] = False
        return all(self.stopped.values()) and len(self.stopped) > 0

In [10]:
def process_image_batch(batch: List[Image.Image], processor, model, device):
    """
    Run the model on a batch of page images and return the decoded text for each one.

    The images are turned into pixel values by `processor`, moved to `device`,
    and passed to `model.generate` together with a fresh `StoppingCriteriaScores`
    instance. Every generated sequence is decoded without special tokens and
    then post-processed by the processor with `fix_markdown=False`.

    Args:
        batch (List[Image.Image]): A list of images in PIL format to be processed.
        processor: The processor used for preparing the images and decoding the generated text.
        model: The model used for generating text from images.
        device: The device (e.g., CPU, GPU) the pixel values are moved to before generation.

    Returns:
        generated_texts (List[str]): One post-processed string per generated sequence, in batch order.
    """
    encoded = processor(images=batch, return_tensors="pt")
    pixel_values = encoded.pixel_values.to(device)

    generation_kwargs = dict(
        min_length=1,
        max_length=3584,
        bad_words_ids=[[processor.tokenizer.unk_token_id]],  # never emit the unknown token
        return_dict_in_generate=True,
        output_scores=True,
        stopping_criteria=StoppingCriteriaList([StoppingCriteriaScores()]),
    )
    outputs = model.generate(pixel_values, **generation_kwargs)

    return [
        processor.post_process_generation(
            processor.decode(sequence, skip_special_tokens=True),
            fix_markdown=False,
        )
        for sequence in outputs.sequences
    ]

In [11]:
def rasterize_and_process_pdf(filename, batch_size=4, dpi=72):
    """
    Rasterize a PDF file and run the model on its pages, batch by batch.

    Every page is first converted to an in-memory image at the given DPI. The
    images are then processed in batches with gradients disabled, and the CUDA
    cache is released after each batch when a GPU is in use. The module-level
    `processor`, `model` and `device` are used for the model calls.

    Args:
        filename (str or Path): The path to the PDF file to be processed.
        batch_size (int, optional): The number of images to process in each batch. Must be at least 1. Defaults to 4.
        dpi (int, optional): The dots per inch (DPI) resolution used for rasterizing the PDF pages. Defaults to 72.

    Returns:
        generated_texts (List[str]): The generated text for each rasterized page, in page order.
        Empty if no page could be rasterized.

    Raises:
        ValueError: If `batch_size` is smaller than 1.
    """
    # Validate before doing any work: a negative step would make the range
    # below empty (silently yielding no text) and zero would fail only after
    # the whole PDF has been rasterized.
    if batch_size < 1:
        raise ValueError(f"batch_size must be a positive integer, got {batch_size!r}")

    images = rasterize_paper(pdf=filename, return_pil=True, dpi=dpi)
    generated_texts = []

    for i in tqdm(range(0, len(images), batch_size), desc="Processing PDF"):
        batch = images[i:i + batch_size]
        pil_images = [Image.open(img_bytes) for img_bytes in batch]

        with torch.no_grad():  # Avoid storing gradients to save memory
            generated_texts.extend(process_image_batch(pil_images, processor, model, device))

        # Clear memory
        del pil_images
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

    return generated_texts

In [12]:
def apply_styles_to_paragraph(para, styled_text):
    """
    Append a sequence of styled runs to one paragraph.
    Args:
        para (Paragraph): The Word document paragraph that receives the runs.
        styled_text (List of tuples): (text, style) pairs; 'bold' and 'italic' set the
            matching run attribute, any other style leaves the run unformatted.
    """
    for content, style in styled_text:
        if not content:
            continue  # empty fragments produce no run at all
        run = para.add_run(content)
        if style in ('bold', 'italic'):
            # The style name doubles as the name of the run attribute to switch on.
            setattr(run, style, True)

def markdown_to_word_styles(text, para):
    """
    Convert markdown formatting in the text to Word styles and apply them to a paragraph.

    Lines starting with '#' are added as a single bold run with the leading
    header marker removed. In all other lines, double-asterisk spans become
    bold runs and underscore-delimited spans become italic runs; everything
    else is added unformatted. A newline run is inserted between lines.

    Args:
        text (str): Text with markdown formatting.
        para (Paragraph): Paragraph to add runs to.
    """
    # Bold is listed first so a double-asterisk span is consumed as one unit.
    inline_pattern = re.compile(r'\*\*(.+?)\*\*|_(.*?)_')

    # Split text into lines to correctly process bold headers and preserve line breaks
    lines = text.split('\n')

    for i, line in enumerate(lines):
        if line.startswith('#'):
            # Strip only the leading header marker; '#' characters later in
            # the line (e.g. "C#") belong to the heading text and are kept.
            content = re.sub(r'^#+\s*', '', line)
            apply_styles_to_paragraph(para, [(content, 'bold')])
        else:
            styled_text = []
            start_idx = 0
            for match in inline_pattern.finditer(line):
                span_start, span_end = match.span()
                # Plain text between the previous styled span and this one.
                styled_text.append((line[start_idx:span_start], 'normal'))
                if match.group(1) is not None:
                    styled_text.append((match.group(1), 'bold'))
                else:
                    styled_text.append((match.group(2), 'italic'))
                start_idx = span_end
            styled_text.append((line[start_idx:], 'normal'))
            apply_styles_to_paragraph(para, styled_text)

        # Add a newline if it's not the last line
        if i < len(lines) - 1:
            para.add_run('\n')

def write_to_word(texts, file_name):
    """
    Creates and saves a Microsoft Word document from a list of text strings, applying Markdown styles to each text string.

    After saving, the file is handed to `files.download` so the browser downloads it.

    Args:
        texts (List[str]): A list of text strings to be written into the Word document. Each text string is written into its own paragraph.
        file_name (str): The name of the Word document file to be created and saved.
    """
    doc = Document()
    for text in texts:
        # A fresh paragraph per text keeps the end of one text from being
        # fused with the beginning of the next.
        markdown_to_word_styles(text, doc.add_paragraph())
    doc.save(file_name)
    files.download(file_name)

## PDF <span style="font-size: larger; font-weight: bold;">→</span> Word

### Upload PDF File to Convert

In [13]:
uploaded = files.upload()
# A cancelled upload leaves nothing to iterate over; fail with a clear message
# instead of a bare StopIteration.
if not uploaded:
    raise RuntimeError("No file was uploaded; re-run this cell and choose a PDF.")
# Only the first uploaded file is converted.
filename = next(iter(uploaded))

print(f"User uploaded file '{filename}'")

Saving Bailey_PDF.pdf to Bailey_PDF.pdf
User uploaded file 'Bailey_PDF.pdf'


### Extract Text from PDF File

In [14]:
# Rasterize the uploaded PDF and run the model on its pages, four pages per batch.
generated_texts = rasterize_and_process_pdf(filename, batch_size=4)

Processing PDF: 100%|██████████| 5/5 [00:32<00:00,  6.53s/it]


### Save Formatted Text into Word Document & Download

In [15]:
# Name the output after the upload: drop the last extension (if any) and append '_doc.docx'.
word_file_name = f"{filename.rsplit('.', 1)[0]}_doc.docx"

write_to_word(generated_texts, word_file_name)

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>