# Imports

In [1]:
from pathlib import Path
from datetime import datetime
from tqdm.auto import tqdm
import subprocess
import sys
import shutil
import json
import raffalib
from natsort import natsorted

import pypdf
import fpdf

import io, sys
from fpdf import FPDF, get_scale_factor

import win32com.client
from pypdf import PdfWriter, PdfReader, PageObject

In [2]:
directory = Path(
    r"C:\Users\raffaele\Dipartimento di Scienze Aziendali Dropbox\Raffaele Mancuso\papers\paper_5_AIPUB\3_PAPER"
)

In [3]:
gspath = Path(r"C:\Program Files\gs\gs10.06.0\bin\gswin64c.exe")
assert gspath.exists(), f"Ghostscript not found at '{gspath}'"

In [4]:
debug_dont_unlink_after_merge = False

In [5]:
logger = raffalib.logging.create_logger()
logger.info(f"python version: {sys.version}")
logger.info(f"pypdf version: {pypdf.__version__}")
logger.info(f"fpdf version: {fpdf.__version__}")

In [6]:
with open(directory / "merge_docx.json", "r") as f:
    config = json.load(f)

files_map = config["files_map"]
overlay = config["overlay"]

# Checks

Checks files in all but not in used

In [None]:
logger.info(f"Working directory: {directory.resolve()}")

In [None]:
all_docx = [x.name for x in directory.glob("*.docx") if not x.name.startswith("~")]
logger.info(f"All docx files: {json.dumps(all_docx, indent=4)}")

In [None]:
used_docs = [item for sublist in files_map.values() for item in sublist]
logger.info(f"Used docx files: {json.dumps(used_docs, indent=4)}")

In [7]:
missing = set(all_docx) - set(used_docs)
if len(missing) > 0:
    missing = natsorted(missing)
    raise Exception(
        f"Missing files (in all but not in used): {json.dumps(list(missing), indent=4)}"
    )

Make sure all used files exist

In [8]:
for input_file in used_docs:
    input_file = directory / input_file
    assert input_file.exists(), f"File {input_file} does not exist"

Make sure no used file is listed multiple times

In [9]:
assert len(used_docs) == len(set(used_docs))

Remove all existing PDFs

In [10]:
if not debug_dont_unlink_after_merge:
    for input_file in directory.glob("*.pdf"):
        input_file.unlink()

# Functions

## Convert docx to PDF

In [11]:
def ini_word():
    word = win32com.client.Dispatch("Word.Application")
    word.Visible = False
    return word

In [12]:
def docx2pdf_word(input_path: Path, output_path: Path, word):
    wdExportFormatPDF = 17
    wdExportDocumentContent = 0  # Exports the document without markup.
    wdExportCreateNoBookmarks = 0  # Do not create bookmarks in the exported document.
    wdExportOptimizeForPrint = 0  # Export for print, which is higher quality and results in a larger file size.
    doc = word.Documents.Open(str(input_path.resolve()))
    doc.ExportAsFixedFormat3(
        str(output_path.resolve()),
        OptimizeFor=wdExportOptimizeForPrint,
        Item=wdExportDocumentContent,
        ExportFormat=wdExportFormatPDF,
        IncludeDocProps=False,
        KeepIRM=False,
        CreateBookmarks=wdExportCreateNoBookmarks,
        BitmapMissingFonts=False,
        OptimizeForImageQuality=True,
    )
    doc.Close()

In [13]:
def docx2pdf_libreoffice(input_path: Path, output_path: Path):
    cmd = [
        "soffice",
        "--headless",
        "--convert-to",
        "pdf",
        "--outdir",
        str(output_path.parent.resolve()),
        str(input_path.resolve()),
    ]
    subprocess.run(cmd, check=True)

In [14]:
def docx2pdf_all(input_files: list[Path]):
    output_files = []
    for input_file in tqdm(input_files):
        output_file = input_file.with_suffix(".pdf")
        if output_file.exists():
            logger.info(f"Skipping conversion as {output_file.name} already exists")
        else:
            # if input_file.name.startswith("figure_"):
            if False:
                logger.info(
                    f"[LIBREOFFICE] Converting '{input_file.name}' -> '{output_file.name}'"
                )
                docx2pdf_libreoffice(input_file, output_file)
            else:
                logger.info(
                    f"[WORD] Converting '{input_file.name}' -> '{output_file.name}'"
                )
                docx2pdf_word(input_file, output_file, word)
        output_files.append(output_file)
    return output_files

## Merge PDF

In [15]:
def is_blank(page: PageObject) -> bool:
    has_text = len(page.extract_text().strip()) != 0
    has_image = len(page.images) != 0
    has_annotations = page.annotations is not None
    return not any([has_text, has_image, has_annotations])

In [16]:
def merge_pdfs(input_files: list[Path], output_file: Path):
    last_page = 1
    output = PdfWriter()
    # Loop over input files
    for input_file in input_files:
        input = PdfReader(input_file)
        pages_to_add = [page for page in input.pages if not is_blank(page)]
        [output.add_page(page) for page in pages_to_add]
        # Add a bookmark (pages are 0-indexed)
        output.add_outline_item(input_file.stem, last_page-1)
        last_page += len(pages_to_add)
        if not debug_dont_unlink_after_merge:
            input_file.unlink()
    # Request to display the bookmark pane when displaying the document (optional)
    output.page_mode = "/UseOutlines"
    output.write(output_file)

## Add overlay

In [17]:
def _create_overlay_file(mediaboxes_mm, unit="mm") -> io.BytesIO:
    pdf = FPDF(format=(mediaboxes_mm[0][2], mediaboxes_mm[0][3]), unit=unit)
    pdf.add_font(family=overlay["font_family"], fname=overlay["font_file"])
    pdf.set_font(family=overlay["font_family"], size=overlay["font_size"])

    page_count = len(mediaboxes_mm)

    for i in range(page_count):
        width = mediaboxes_mm[i][2]
        height = mediaboxes_mm[i][3]

        pdf.add_page(format=(width, height))

        # In FPDF2, (0,0) is the top-left corner
        if overlay["page_number"]:
            pdf.text(x=width * 0.9, y=height * 0.96, text=f"{i + 1} / {page_count}")
        if overlay["snapshot_date"]:
            pdf.text(
                x=width * 0.05,
                y=height * 0.96,
                text=f"Snapshot of {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
            )
        if overlay["working_paper"]:
            pdf.text(
                x=width * 0.35,
                y=height * 0.03,
                text=f"WORKING PAPER - PLEASE DO NOT DISTRIBUTE",
            )

    # <DEBUG>
    # outfp = stage1fp.with_stem(stage1fp.stem + "_overlay")
    # print(outfp)
    # pdf.output(outfp)
    # </DEBUG>
    return io.BytesIO(pdf.output())

In [18]:
# Convert coordinates in mm
def pdfbox2mm(box):
    return [float(coord) / get_scale_factor("mm") for coord in box]

In [19]:
def add_overlay(input_file: Path, output_file: Path):
    logger.info(f"Adding overlay and saving to '{output_file}'")
    input_reader = PdfReader(input_file)
    input_mediaboxes_mm = [pdfbox2mm(page.mediabox) for page in input_reader.pages]

    overlay_bytesio = _create_overlay_file(input_mediaboxes_mm)
    overlay_reader = PdfReader(overlay_bytesio)

    output_writer = PdfWriter(clone_from=input_reader)

    for i, input_page in enumerate(output_writer.pages):
        overlay_page = overlay_reader.pages[i]
        input_page.merge_page(page2=overlay_page)

    output_writer.write(output_file)

## Compress PDF

In [20]:
def compress_pdf_ghostscript(infp, outfp):
    # WARNING: You need a recent version of Ghotscript
    # there was a bug that messed up image colors
    # see: https://bugs.ghostscript.com/show_bug.cgi?id=709123
    cmd = [
        str(gspath.resolve()),
        "-sDEVICE=pdfwrite",
        "-dCompatibilityLevel=1.7",
        "-dPDFSETTINGS=/default",
        "-dNOPAUSE",
        "-dQUIET",
        "-dBATCH",
        "-dUseCropBox",
        f"-sOutputFile={str(outfp.resolve())}",  # output file
        str(infp.resolve()),  # input file
    ]
    subprocess.run(cmd, capture_output=True, text=True, check=True)

In [21]:
def compress_pdf_pypdf(infp, outfp, compress_identical_objects=True, compress_content_streams=True, compress_images=False):
    writer = PdfWriter(clone_from=infp)
    if compress_identical_objects:
        writer.compress_identical_objects(remove_identicals=True, remove_orphans=True)
    #writer.remove_images()
    if compress_images:
        for page in writer.pages:
            for img in page.images:
                img.replace(img.image, quality=80)
    if compress_content_streams:
        for page in writer.pages:
            page.compress_content_streams()  # This is CPU intensive!
    writer.write(outfp)

In [22]:
def compress_pdf(infp, outfp):
    compress_pdf_pypdf(infp, outfp)
    initial_size = temp_pdf.stat().st_size
    final_size = output_file.stat().st_size
    logger.info(f"Compressed, saved into {outfp}. {initial_size =:,d}; {final_size =:,d}; difference {(final_size-initial_size)/initial_size:.2%}")

# MAIN

In [23]:
logger.info(f"Input directory: {directory}")
word = ini_word()
temp_pdf = directory / "temp.pdf"
for output_file in tqdm(files_map.keys()):
    # Skip black hole
    if output_file == "blackhole":
        logger.info(f"Skipping '{output_file}'")
        continue
    logger.info(f"Creating '{output_file}'")
    # Convert .docx to .pdf
    input_docx_files = [directory / x for x in files_map[output_file]]
    if len(input_docx_files):
        logger.info(f"List of input files is empty. Skipping.")
    pdf_files = docx2pdf_all(input_docx_files)
    # Merge .pdf files
    merge_pdfs(pdf_files, temp_pdf)
    # Add overlay
    if overlay["overlay"]:
        add_overlay(temp_pdf, temp_pdf)
    # Compress
    output_file = directory / output_file
    compress_pdf(temp_pdf, output_file)
    temp_pdf.unlink()

  0%|          | 0/3 [00:00<?, ?it/s]

  0%|          | 0/22 [00:00<?, ?it/s]

0it [00:00, ?it/s]

IndexError: list index out of range

In [None]:
if word:
    word.Quit()
    word = None

# 