diff --git a/app/base.py b/app/base.py index 7301068..96fb35b 100644 --- a/app/base.py +++ b/app/base.py @@ -1,10 +1,12 @@ """PDFApps – BasePage: standard page layout (header + scroll + action bar).""" +import contextlib import os import subprocess import sys import tempfile import shutil +from typing import Iterable from PySide6.QtCore import Qt, QTimer, Signal from shiboken6 import isValid @@ -15,6 +17,12 @@ from app.i18n import t from app.utils import ToolHeader, ActionBar, scrolled, _paint_bg +# Iterable is referenced via string-typed annotations in +# _atomic_pdf_write / _check_not_same_path; keep it importable so +# tooling that resolves forward refs (e.g. typing.get_type_hints) +# finds the symbol. +__all__ = ["BasePage", "Iterable"] + def _reveal_file(path: str) -> None: """Open the OS file manager and highlight the given file when possible. @@ -333,6 +341,104 @@ def _open_fitz(self, path: str): doc.authenticate(self._pdf_password) return doc + def _clear_pdf_password(self) -> None: + """Best-effort wipe of the cached PDF password from memory. + + Thin wrapper around :func:`app.utils.wipe_pdf_password` so every + close / reload path (closeEvent, ``_close_pdf``, loading a + different file) can drop the cached password uniformly. See the + helper docstring for the immutability caveat. + """ + from app.utils import wipe_pdf_password + wipe_pdf_password(self) + + # ── safe PDF writer ──────────────────────────────────────────────────── + + @staticmethod + def _check_not_same_path(dst: str, + sources: "Iterable[str] | None" = None) -> None: + """Raise RuntimeError if ``dst`` resolves to any of ``sources``. + + Shared invariant for every tool that takes a PDF in and writes + a result back to disk: if the user picks the same path for + input and output, opening the output for writing truncates the + input before the writer's lazy stream reads complete and we + get silent dataloss + corrupted output. + """ + try: + dst_real = os.path.realpath(dst) + except OSError: + return + for src in (sources or ()): + if not src: + continue + try: + if os.path.realpath(src) == dst_real: + raise RuntimeError(t("tool.err.same_source_output")) + except OSError: + continue + + @staticmethod + def _atomic_pdf_write(writer, dst: str, *, + sources: "Iterable[str] | None" = None, + save_opts: "dict | None" = None) -> None: + """Write a PdfWriter (pypdf) or fitz.Document to ``dst`` atomically. + + Two defensive layers fix the silent dataloss bug where opening + ``open(dst, "wb")`` truncates the input file BEFORE the writer's + lazy stream reads complete (PdfWriter holds references into + the PdfReader; same applies to fitz.Document.save() with + incremental flags). + + 1. Reject up-front if ``dst`` resolves to any path in + ``sources`` (via ``os.path.realpath``) — this catches the + "user picked the same path for input and output" case which + was producing corrupt output + losing the original. + + 2. Write to a same-directory tempfile and atomically rename to + ``dst`` via :func:`os.replace` (works on POSIX and Windows). + + ``writer`` may be a pypdf ``PdfWriter`` (uses ``writer.write(fh)``) + or a PyMuPDF ``fitz.Document`` (uses ``writer.save(tmp)``). + Anything else with a ``.write(fh)`` method is accepted. + + Raises :class:`RuntimeError` with a translated message when the + same-source check fails; the caller's existing ``show_error`` + path surfaces it as a friendly dialog. + """ + BasePage._check_not_same_path(dst, sources) + + dst_dir = os.path.dirname(dst) or os.getcwd() + # mkstemp returns an OS-level fd; close via os.fdopen so the + # writer can stream into it. Same-volume placement guarantees + # os.replace() stays atomic. + fd, tmp = tempfile.mkstemp(suffix=".pdf", dir=dst_dir) + # Detect fitz.Document via its module to avoid importing fitz + # at base.py load time (every page imports BasePage). Modern + # PyMuPDF reports module="pymupdf"; legacy versions used "fitz". + # Both expose Document.save(path, ...). + writer_mod = type(writer).__module__ + is_fitz_doc = (writer_mod.startswith("pymupdf") + or writer_mod.startswith("fitz")) and hasattr(writer, "save") + try: + if is_fitz_doc: + # fitz.Document.save(path, ...) accepts a filesystem + # path and writes through cleanly. We close the fd + # we opened first so save() can take exclusive access. + os.close(fd) + writer.save(tmp, **(save_opts or {})) + else: + # pypdf.PdfWriter (and anything else with .write(fh)) + # streams into the open file handle. + with os.fdopen(fd, "wb") as fh: + writer.write(fh) + os.replace(tmp, dst) + except Exception: + with contextlib.suppress(Exception): + if os.path.exists(tmp): + os.unlink(tmp) + raise + # ── background-task helper ──────────────────────────────────────────── def _run_background(self, do_work_fn, total: int, label: str, diff --git a/app/editor/dialogs.py b/app/editor/dialogs.py index 83df5bd..f36e0fd 100644 --- a/app/editor/dialogs.py +++ b/app/editor/dialogs.py @@ -425,6 +425,15 @@ def _on_accept(self): from app.i18n import save_signature save_signature(tmp) + # Restrict the in-flight tmp signature to user-only on POSIX so + # other users on the host cannot read it before the editor + # stamps it into the PDF. NTFS ACLs already cover the Windows + # case via the profile owner; chmod there is a no-op. + try: + os.chmod(tmp, 0o600) + except OSError: + pass + self._result_path = tmp self.accept() diff --git a/app/editor/tab.py b/app/editor/tab.py index f331cef..3671a31 100644 --- a/app/editor/tab.py +++ b/app/editor/tab.py @@ -1,5 +1,6 @@ """PDFApps – TabEditar: visual PDF editor tool tab.""" +import contextlib import os import tempfile @@ -25,6 +26,10 @@ class TabEditar(QWidget): """Visual editor: click/drag directly on the rendered PDF.""" _MAX_REDO = 100 # cap redo history to avoid unbounded memory growth + # Cap pending-edit history to keep memory bounded in long sessions and to + # release temp signature/image files (which edits hold paths to) once + # they fall out of the rolling window. Trimmed FIFO — oldest first. + _MAX_PENDING = 500 _HI_COLORS_KEYS = ["color.yellow", "color.green", "color.pink", "color.light_blue"] _HI_COLORS_VALS = [(1,1,0), (0,1,0), (1,0.4,0.7), (0.5,0.8,1)] @@ -619,6 +624,18 @@ def _close_pdf(self): self._lbl_info.setText("") self._page_idx = 0 self._update_nav() + # Drop the cached password so a memory dump after the user + # closes the file no longer surfaces it (R5/D2). + self._clear_pdf_password() + + def _clear_pdf_password(self) -> None: + """Mirror of ``BasePage._clear_pdf_password`` — EditorTab does not + inherit from BasePage so we provide the same hook locally and + delegate to the shared :func:`app.utils.wipe_pdf_password` + helper, keeping a single implementation across the codebase. + """ + from app.utils import wipe_pdf_password + wipe_pdf_password(self) def _pick_image(self): p, _ = QFileDialog.getOpenFileName(self, t("edit.image"), DESKTOP, @@ -806,9 +823,41 @@ def _on_point(self, page_idx, pdf_pt): def _on_text_edit_committed(self, page_idx, edit): self._add(edit) - def _add(self, edit: dict): - self._redo_stack.clear() + def _add(self, edit: dict, *, _from_redo: bool = False): + # _from_redo=True is set by _redo() so consecutive redos don't + # wipe the redo stack. Previously _redo() called _add(), and the + # first line below cleared the remaining redo entries — meaning + # after a single redo all the others were silently discarded. + if not _from_redo: + self._redo_stack.clear() self._pending.append(edit) + # Trim oldest edits once we cross the cap. Without this the list + # grew unbounded across long sessions and retained references to + # temp signature/image files until the tab closed. + if len(self._pending) > self._MAX_PENDING: + to_drop = self._pending[:-self._MAX_PENDING] + self._pending = self._pending[-self._MAX_PENDING:] + # Best-effort cleanup of temp paths owned by the dropped + # entries. Only files inside the system tempdir are touched — + # the user's source image/signature picks must never be + # deleted from disk. + tmp_root = os.path.normcase(tempfile.gettempdir()) + for old in to_drop: + with contextlib.suppress(Exception): + p = old.get("path") + if (p and os.path.isfile(p) + and os.path.normcase(p).startswith(tmp_root)): + os.unlink(p) + # Mirror the trim in the visible list widget so the labels + # stay in sync with self._pending indices. We use + # ``len(to_drop)`` rather than ``count() - len(_pending)`` + # because the addItem for the *new* edit happens below: at + # this point _pending already has the trimmed length but + # _pending_list still holds the pre-trim row count, so the + # diff would be off by one and the next _undo would remove + # the wrong label (PR-B revisor finding #1). + for _ in range(len(to_drop)): + self._pending_list.takeItem(0) # Each entry's base label is fully translated via edit.label.*; # the page suffix (" — p. N") comes from a shared key so all # locales decide their own dash/spacing/abbreviation. @@ -847,7 +896,7 @@ def _redo(self): if not self._redo_stack: return edit = self._redo_stack.pop() - self._add(edit) + self._add(edit, _from_redo=True) def _on_note_deleted(self, overlay: dict): """Remove a deleted note from the pending edits list.""" diff --git a/app/i18n.py b/app/i18n.py index 5de7c74..bd2907a 100644 --- a/app/i18n.py +++ b/app/i18n.py @@ -241,10 +241,24 @@ def get_saved_signature() -> str | None: def save_signature(img_path: str): - """Copy signature image to persistent location.""" + """Copy signature image to persistent location. + + The persistent path is restricted to user-only access (``0o600``) on + POSIX so other users on the same host cannot read the cached + signature. Windows file permissions are governed by NTFS ACLs and + the user profile inherits owner-only access by default — chmod is + a no-op there but harmless. + """ import shutil os.makedirs(os.path.dirname(_SIGNATURE_PATH), exist_ok=True) shutil.copy2(img_path, _SIGNATURE_PATH) + try: + os.chmod(_SIGNATURE_PATH, 0o600) + except OSError: + # Some filesystems (FAT/exFAT on USB sticks) ignore chmod and + # raise. Permission hardening is best-effort; the copy itself + # succeeded so we must not surface this as an error. + pass def clear_saved_signature(): diff --git a/app/tools/encrypt.py b/app/tools/encrypt.py index fc244bc..7aa5f3a 100644 --- a/app/tools/encrypt.py +++ b/app/tools/encrypt.py @@ -126,7 +126,7 @@ def _run(self): w = PdfWriter(); w.append(reader) w.encrypt(user_password=user_pwd, owner_password=owner, algorithm="AES-256") - with open(out_path, "wb") as f: w.write(f) + self._atomic_pdf_write(w, out_path, sources=[pdf_path]) self._status(t("tool.encrypt.status.done", name=os.path.basename(out_path))) msg = t("tool.encrypt.done_enc", path=out_path) @@ -145,7 +145,7 @@ def _run(self): QMessageBox.warning(self, t("msg.warning"), t("tool.encrypt.wrong_pass")) return w = PdfWriter(); w.append(reader) - with open(out_path, "wb") as f: w.write(f) + self._atomic_pdf_write(w, out_path, sources=[pdf_path]) self._status(t("tool.encrypt.status.done", name=os.path.basename(out_path))) msg = t("tool.encrypt.done_dec", path=out_path) diff --git a/app/tools/extract.py b/app/tools/extract.py index f96a45b..fb4cff5 100644 --- a/app/tools/extract.py +++ b/app/tools/extract.py @@ -86,7 +86,7 @@ def _run(self): pages = parse_pages(txt, len(reader.pages)) w = PdfWriter() for p in pages: w.add_page(reader.pages[p]) - with open(out_path, "wb") as f: w.write(f) + self._atomic_pdf_write(w, out_path, sources=[pdf_path]) self._status(t("tool.extract.status.done", n=len(pages), name=os.path.basename(out_path))) msg = t("tool.extract.done", n=len(pages), path=out_path) diff --git a/app/tools/merge.py b/app/tools/merge.py index a099849..9ec0661 100644 --- a/app/tools/merge.py +++ b/app/tools/merge.py @@ -123,7 +123,7 @@ def _run(self): reader.decrypt(pwd) for page in reader.pages: w.add_page(page) - with open(out, "wb") as f: w.write(f) + self._atomic_pdf_write(w, out, sources=paths) self._status(t("tool.merge.status.done", name=os.path.basename(out))) QMessageBox.information(self, t("msg.done"), t("tool.merge.done", path=out)) except Exception as e: show_error(self, e) diff --git a/app/tools/ocr.py b/app/tools/ocr.py index f77fe64..aa0e427 100644 --- a/app/tools/ocr.py +++ b/app/tools/ocr.py @@ -1,6 +1,7 @@ """PDFApps – TabOCR: OCR text recognition tool.""" import os +import tempfile from PySide6.QtCore import Qt from PySide6.QtWidgets import ( @@ -300,8 +301,20 @@ def do_work(_self): "RGB", (pix.width, pix.height), pix.samples) texts.append( pytesseract.image_to_string(img, lang=lang)) - with open(out_path, "w", encoding="utf-8") as fh: - fh.write("\f".join(texts)) + # Reject same input==output and write atomically + # so a mistaken overlap doesn't truncate the PDF. + self._check_not_same_path(out_path, [pdf_path]) + out_dir = os.path.dirname(out_path) or os.getcwd() + _fd, _tmp = tempfile.mkstemp(suffix=".txt", dir=out_dir) + try: + with os.fdopen(_fd, "w", encoding="utf-8") as fh: + fh.write("\f".join(texts)) + os.replace(_tmp, out_path) + except Exception: + if os.path.exists(_tmp): + try: os.unlink(_tmp) + except OSError: pass + raise else: pdf_pages = [] for i, page in enumerate(doc): @@ -327,8 +340,8 @@ def do_work(_self): writer = PdfWriter() for page_bytes in pdf_pages: writer.append(PdfReader(_io.BytesIO(page_bytes))) - with open(out_path, "wb") as fh: - writer.write(fh) + self._atomic_pdf_write( + writer, out_path, sources=[pdf_path]) finally: doc.close() return out_path diff --git a/app/tools/page_numbers.py b/app/tools/page_numbers.py index 5671ee0..825bb73 100644 --- a/app/tools/page_numbers.py +++ b/app/tools/page_numbers.py @@ -277,7 +277,11 @@ def do_work(worker): if worker.is_cancelled(): return None - doc.save(out_path, garbage=4, deflate=True) + self._atomic_pdf_write( + doc, out_path, + sources=[pdf_path], + save_opts={"garbage": 4, "deflate": True}, + ) finally: doc.close() return out_path diff --git a/app/tools/reorder.py b/app/tools/reorder.py index e9eaa2f..42d8779 100644 --- a/app/tools/reorder.py +++ b/app/tools/reorder.py @@ -124,7 +124,7 @@ def _run(self): reader = self._open_reader(self.drop_in.path()) w = PdfWriter() for idx in indices: w.add_page(reader.pages[idx]) - with open(out, "wb") as f: w.write(f) + self._atomic_pdf_write(w, out, sources=[self.drop_in.path()]) self._status(t("tool.reorder.status.done", name=os.path.basename(out))) msg = t("tool.reorder.done", path=out) if self._pipeline_active: diff --git a/app/tools/rotate.py b/app/tools/rotate.py index 4f6a72a..21a8284 100644 --- a/app/tools/rotate.py +++ b/app/tools/rotate.py @@ -87,7 +87,7 @@ def _run(self): for i, page in enumerate(reader.pages): if i in pages: page.rotate(angle) w.add_page(page) - with open(out_path, "wb") as f: w.write(f) + self._atomic_pdf_write(w, out_path, sources=[pdf_path]) self._status(t("tool.rotate.status.done", name=os.path.basename(out_path))) if self._pipeline_active: self._pipeline_success(t("tool.rotate.done", path=out_path), out_path) diff --git a/app/tools/watermark.py b/app/tools/watermark.py index 9e818d2..3a2e4f8 100644 --- a/app/tools/watermark.py +++ b/app/tools/watermark.py @@ -130,8 +130,7 @@ def do_work(worker): current=i + 1, total=n)) if worker.is_cancelled(): return None - with open(out_path, "wb") as f: - w.write(f) + self._atomic_pdf_write(w, out_path, sources=[pdf_path, wm_path]) return out_path def on_done(saved): diff --git a/app/translations.json b/app/translations.json index 63183ff..30c9591 100644 --- a/app/translations.json +++ b/app/translations.json @@ -52,6 +52,7 @@ "viewer.error_open": "Error opening PDF", "viewer.error_open_msg": "Could not open the file:\n{ex}", "viewer.delete_comment": "Delete comment", + "viewer.confirm_delete_comment": "Are you sure you want to delete this comment? This change is saved immediately and cannot be undone.", "viewer.copy_chars": " Copy ({n} chars)", "search.placeholder": "Search in PDF...", "search.prev": "Previous match", @@ -153,6 +154,8 @@ "edit.open_prompt": "Open a PDF to edit", "edit.note_popup": "Note", "msg.warning": "Warning", + "msg.confirm": "Confirm", + "tool.err.same_source_output": "Output path cannot be the same as the source PDF. Choose a different filename.", "msg.error": "Error", "msg.unexpected": "Something went wrong. Click 'Show Details' below for the technical error, and the full traceback has been written to the log file.", "msg.done": "Done", @@ -627,6 +630,7 @@ "viewer.error_open": "Erro ao abrir PDF", "viewer.error_open_msg": "Não foi possível abrir o ficheiro:\n{ex}", "viewer.delete_comment": "Apagar comentário", + "viewer.confirm_delete_comment": "Tens a certeza que queres apagar este comentário? Esta alteração é guardada imediatamente e não pode ser desfeita.", "viewer.copy_chars": " Copiar ({n} car.)", "search.placeholder": "Pesquisar no PDF...", "search.prev": "Resultado anterior", @@ -728,6 +732,8 @@ "edit.open_prompt": "Abra um PDF para editar", "edit.note_popup": "Nota", "msg.warning": "Aviso", + "msg.confirm": "Confirmar", + "tool.err.same_source_output": "O caminho de saída não pode ser o mesmo que o PDF de origem. Escolhe outro nome de ficheiro.", "msg.error": "Erro", "msg.unexpected": "Algo correu mal. Clique em 'Mostrar Detalhes' em baixo para ver o erro técnico; o traceback completo foi escrito no ficheiro de log.", "msg.done": "Concluído", @@ -1202,6 +1208,7 @@ "viewer.error_open": "Error al abrir PDF", "viewer.error_open_msg": "No se pudo abrir el archivo:\n{ex}", "viewer.delete_comment": "Eliminar comentario", + "viewer.confirm_delete_comment": "¿Seguro que quieres eliminar este comentario? Este cambio se guarda inmediatamente y no se puede deshacer.", "viewer.copy_chars": " Copiar ({n} caract.)", "search.placeholder": "Buscar en PDF...", "search.prev": "Resultado anterior", @@ -1303,6 +1310,8 @@ "edit.open_prompt": "Abra un PDF para editar", "edit.note_popup": "Nota", "msg.warning": "Aviso", + "msg.confirm": "Confirmar", + "tool.err.same_source_output": "La ruta de salida no puede ser la misma que el PDF de origen. Elige otro nombre de archivo.", "msg.error": "Error", "msg.unexpected": "Algo salió mal. Haga clic en 'Mostrar detalles' abajo para ver el error técnico; el rastreo completo se ha escrito en el archivo de registro.", "msg.done": "Hecho", @@ -1777,6 +1786,7 @@ "viewer.error_open": "Erreur d'ouverture du PDF", "viewer.error_open_msg": "Impossible d'ouvrir le fichier :\n{ex}", "viewer.delete_comment": "Supprimer le commentaire", + "viewer.confirm_delete_comment": "Voulez-vous vraiment supprimer ce commentaire ? Cette modification est enregistrée immédiatement et ne peut pas être annulée.", "viewer.copy_chars": " Copier ({n} caract.)", "search.placeholder": "Rechercher dans le PDF...", "search.prev": "Résultat précédent", @@ -1878,6 +1888,8 @@ "edit.open_prompt": "Ouvrir un PDF pour modifier", "edit.note_popup": "Note", "msg.warning": "Avertissement", + "msg.confirm": "Confirmer", + "tool.err.same_source_output": "Le chemin de sortie ne peut pas être identique au PDF source. Choisissez un autre nom de fichier.", "msg.error": "Erreur", "msg.unexpected": "Une erreur s'est produite. Cliquez sur « Afficher les détails » ci-dessous pour voir l'erreur technique ; la trace complète a été écrite dans le fichier journal.", "msg.done": "Terminé", @@ -2352,6 +2364,7 @@ "viewer.error_open": "Fehler beim Öffnen des PDF", "viewer.error_open_msg": "Die Datei konnte nicht geöffnet werden:\n{ex}", "viewer.delete_comment": "Kommentar löschen", + "viewer.confirm_delete_comment": "Soll dieser Kommentar wirklich gelöscht werden? Diese Änderung wird sofort gespeichert und kann nicht rückgängig gemacht werden.", "viewer.copy_chars": " Kopieren ({n} Zeichen)", "search.placeholder": "Im PDF suchen...", "search.prev": "Vorheriges Ergebnis", @@ -2453,6 +2466,8 @@ "edit.open_prompt": "PDF zum Bearbeiten öffnen", "edit.note_popup": "Notiz", "msg.warning": "Warnung", + "msg.confirm": "Bestätigen", + "tool.err.same_source_output": "Der Ausgabepfad darf nicht mit der Quell-PDF identisch sein. Wählen Sie einen anderen Dateinamen.", "msg.error": "Fehler", "msg.unexpected": "Etwas ist schiefgegangen. Klicken Sie unten auf 'Details anzeigen', um den technischen Fehler zu sehen; das vollständige Traceback wurde in die Protokolldatei geschrieben.", "msg.done": "Fertig", @@ -2927,6 +2942,7 @@ "viewer.error_open": "打开 PDF 出错", "viewer.error_open_msg": "无法打开文件:\n{ex}", "viewer.delete_comment": "删除批注", + "viewer.confirm_delete_comment": "确定要删除此批注吗?此更改将立即保存且无法撤销。", "viewer.copy_chars": " 复制 ({n} 个字符)", "search.placeholder": "在 PDF 中搜索...", "search.prev": "上一个匹配", @@ -3028,6 +3044,8 @@ "edit.open_prompt": "打开 PDF 以编辑", "edit.note_popup": "备注", "msg.warning": "警告", + "msg.confirm": "确认", + "tool.err.same_source_output": "输出路径不能与源 PDF 相同。请选择其他文件名。", "msg.error": "错误", "msg.unexpected": "出了点问题。点击下方的“显示详情”查看技术错误;完整的堆栈跟踪已写入日志文件。", "msg.done": "完成", @@ -3502,6 +3520,7 @@ "viewer.error_open": "Errore durante l'apertura del PDF", "viewer.error_open_msg": "Impossibile aprire il file:\n{ex}", "viewer.delete_comment": "Elimina commento", + "viewer.confirm_delete_comment": "Sei sicuro di voler eliminare questo commento? Questa modifica viene salvata immediatamente e non può essere annullata.", "viewer.copy_chars": " Copia ({n} caratteri)", "search.placeholder": "Cerca nel PDF...", "search.prev": "Corrispondenza precedente", @@ -3603,6 +3622,8 @@ "edit.open_prompt": "Apri un PDF per modificarlo", "edit.note_popup": "Nota", "msg.warning": "Avviso", + "msg.confirm": "Conferma", + "tool.err.same_source_output": "Il percorso di output non può coincidere con il PDF di origine. Scegli un nome file diverso.", "msg.error": "Errore", "msg.unexpected": "Qualcosa è andato storto. Fare clic su 'Mostra dettagli' qui sotto per vedere l'errore tecnico; il traceback completo è stato scritto nel file di registro.", "msg.done": "Fatto", @@ -4077,6 +4098,7 @@ "viewer.error_open": "Fout bij openen PDF", "viewer.error_open_msg": "Kon het bestand niet openen:\n{ex}", "viewer.delete_comment": "Opmerking verwijderen", + "viewer.confirm_delete_comment": "Weet je zeker dat je deze opmerking wilt verwijderen? Deze wijziging wordt onmiddellijk opgeslagen en kan niet ongedaan worden gemaakt.", "viewer.copy_chars": " Kopiëren ({n} tekens)", "search.placeholder": "Zoeken in PDF...", "search.prev": "Vorige overeenkomst", @@ -4178,6 +4200,8 @@ "edit.open_prompt": "Open een PDF om te bewerken", "edit.note_popup": "Notitie", "msg.warning": "Waarschuwing", + "msg.confirm": "Bevestigen", + "tool.err.same_source_output": "Het uitvoerpad mag niet hetzelfde zijn als de bron-PDF. Kies een andere bestandsnaam.", "msg.error": "Fout", "msg.unexpected": "Er is iets misgegaan. Klik hieronder op 'Details weergeven' om de technische fout te zien; de volledige traceback is naar het logbestand geschreven.", "msg.done": "Voltooid", diff --git a/app/updater.py b/app/updater.py index c8aad60..b36af92 100644 --- a/app/updater.py +++ b/app/updater.py @@ -1,5 +1,6 @@ """PDFApps – Auto-updater module.""" +import contextlib import json import os import re @@ -269,44 +270,52 @@ def _apply_update_unix(downloaded: str): current = sys.executable backup = current + ".bak" try: - shutil.move(current, backup) - dest_dir = os.path.dirname(current) - abs_dest = os.path.abspath(dest_dir) - if downloaded.endswith(".tar.gz"): - import tarfile - with tarfile.open(downloaded, "r:gz") as tar: - # Validate ALL members before extracting any - for m in tar.getmembers(): - if m.issym() or m.islnk(): - raise ValueError(f"Symlink/hardlink rejected: {m.name}") - extracted = os.path.abspath(os.path.join(dest_dir, m.name)) - if not extracted.startswith(abs_dest + os.sep) and extracted != abs_dest: - raise ValueError(f"Path traversal detected: {m.name}") - if m.name != "PDFApps": - raise ValueError(f"Unexpected member: {m.name}") - # Safe to extract after full validation - for m in tar.getmembers(): - tar.extract(m, dest_dir) - elif downloaded.endswith(".zip"): - import zipfile - with zipfile.ZipFile(downloaded, "r") as zf: - # Validate ALL members before extracting any - for info in zf.infolist(): - extracted = os.path.abspath(os.path.join(dest_dir, info.filename)) - if not extracted.startswith(abs_dest + os.sep) and extracted != abs_dest: - raise ValueError(f"Path traversal detected: {info.filename}") - if info.filename != "PDFApps": - raise ValueError(f"Unexpected member: {info.filename}") - zf.extract("PDFApps", dest_dir) - else: - shutil.copy2(downloaded, current) - os.chmod(current, os.stat(current).st_mode | stat.S_IEXEC) - os.remove(downloaded) - os.remove(backup) - except Exception: - if os.path.isfile(backup): - shutil.move(backup, current) - raise + try: + shutil.move(current, backup) + dest_dir = os.path.dirname(current) + abs_dest = os.path.abspath(dest_dir) + if downloaded.endswith(".tar.gz"): + import tarfile + with tarfile.open(downloaded, "r:gz") as tar: + # Validate ALL members before extracting any + for m in tar.getmembers(): + if m.issym() or m.islnk(): + raise ValueError(f"Symlink/hardlink rejected: {m.name}") + extracted = os.path.abspath(os.path.join(dest_dir, m.name)) + if not extracted.startswith(abs_dest + os.sep) and extracted != abs_dest: + raise ValueError(f"Path traversal detected: {m.name}") + if m.name != "PDFApps": + raise ValueError(f"Unexpected member: {m.name}") + # Safe to extract after full validation + for m in tar.getmembers(): + tar.extract(m, dest_dir) + elif downloaded.endswith(".zip"): + import zipfile + with zipfile.ZipFile(downloaded, "r") as zf: + # Validate ALL members before extracting any + for info in zf.infolist(): + extracted = os.path.abspath(os.path.join(dest_dir, info.filename)) + if not extracted.startswith(abs_dest + os.sep) and extracted != abs_dest: + raise ValueError(f"Path traversal detected: {info.filename}") + if info.filename != "PDFApps": + raise ValueError(f"Unexpected member: {info.filename}") + zf.extract("PDFApps", dest_dir) + else: + shutil.copy2(downloaded, current) + os.chmod(current, os.stat(current).st_mode | stat.S_IEXEC) + os.remove(backup) + except Exception: + if os.path.isfile(backup): + shutil.move(backup, current) + raise + finally: + # Drop the ~100 MB installer tempfile on every exit — success + # AND failure (R5/F3). Previously os.remove(downloaded) only ran + # on the success path, so a failed apply left the artefact in + # /tmp until the OS reboot cleaned it up. + with contextlib.suppress(Exception): + if os.path.isfile(downloaded): + os.unlink(downloaded) os.execv(current, sys.argv) @@ -377,11 +386,12 @@ def __init__(self, release: dict, parent=None): self._update_btn.setEnabled(False) self._status.setText(t("update.no_asset")) - self._signals = _Signals() - self._signals.progress.connect(self._on_progress) - self._signals.finished.connect(self._on_finished) - self._signals.error.connect(self._on_error) - self._signals.cancelled.connect(self._on_cancelled) + # Signals are created per-download in _start_download so a retry + # within the same dialog doesn't cross-pollinate the + # finished/error/cancelled handlers with stale _dl_thread + # objects (R6/B6-B7). Until the first download starts no signals + # exist yet — that's fine; nothing connects to them. + self._signals: _Signals | None = None self._dest = "" # Holder for the in-flight urlopen response so closeEvent/reject # can abort a blocked read() immediately (urlopen.read() blocks @@ -422,6 +432,23 @@ def run(self): self._cancel_holder["resp"] = None self._cancel_holder["cancelled"] = False + # Fresh _Signals instance per download. Previously the dialog + # reused a single _Signals from __init__ — on retry the new + # thread reconnected finished/error/cancelled WITHOUT disconnecting + # the previous thread's slots, so when this download finished Qt + # delivered the signal to both the live and the already-quit + # historical threads (R6/B6-B7). Disposing the old object frees + # those connections atomically and lets Qt GC the underlying + # QObject without risk of stale references. + if self._signals is not None: + with contextlib.suppress(RuntimeError): + self._signals.deleteLater() + self._signals = _Signals() + self._signals.progress.connect(self._on_progress) + self._signals.finished.connect(self._on_finished) + self._signals.error.connect(self._on_error) + self._signals.cancelled.connect(self._on_cancelled) + self._dl_thread = QThread() self._dl_worker = _Worker(url, self._dest, self._signals, expected_hash, self._cancel_holder) diff --git a/app/utils.py b/app/utils.py index 581f517..74c0063 100644 --- a/app/utils.py +++ b/app/utils.py @@ -108,6 +108,38 @@ def pick_folder(parent: QWidget) -> str: return QFileDialog.getExistingDirectory(parent, t("btn.select_folder")) +def wipe_pdf_password(obj) -> None: + """Best-effort wipe of the cached PDF password attribute on ``obj``. + + Python ``str`` is immutable, so we cannot scrub the original bytes — + the interpreter may keep the original buffer alive via interning or + constant tables. What we *can* do is drop the only reachable + reference so the password no longer surfaces in the live object + graph. The ctypes block allocates a zeroed buffer of the same length + as a defensive hint to memory scanners; it does not touch the + original PyUnicode storage. + + Centralised here so BasePage, EditorTab and PdfViewerPanel share a + single implementation (used to be three near-identical copies — the + review for PR-B flagged the duplication as DRY rot). + + Always assigns ``obj._pdf_password = ""`` afterwards, so callers can + rely on the attribute being defined for the rest of the object's + lifecycle. + """ + try: + pwd = getattr(obj, "_pdf_password", "") + except Exception: + pwd = "" + if pwd: + with contextlib.suppress(Exception): + import ctypes + buf = ctypes.create_string_buffer(len(pwd.encode("utf-8"))) + ctypes.memset(ctypes.addressof(buf), 0, len(buf)) + del buf + obj._pdf_password = "" + + def prompt_pdf_password(path: str, parent=None) -> tuple[bool, str]: """Open the PDF and, if encrypted, prompt the user for a password. diff --git a/app/viewer/canvas.py b/app/viewer/canvas.py index 5a90fdc..7cf57b4 100644 --- a/app/viewer/canvas.py +++ b/app/viewer/canvas.py @@ -555,7 +555,7 @@ def keyPressEvent(self, e): # ── Context menu ─────────────────────────────────────────────────────── def contextMenuEvent(self, e): - from PySide6.QtWidgets import QMenu + from PySide6.QtWidgets import QMenu, QMessageBox pos = e.pos() # Check if right-click is on a note icon hit = self._note_icon_at(pos) @@ -568,23 +568,52 @@ def contextMenuEvent(self, e): entry = self._entries[page_idx] if entry.annots and annot_idx < len(entry.annots): rect, txt = entry.annots[annot_idx] + # saveIncr() writes directly to the user's file with no + # undo. Confirm first (default=No) so a stray right-click + # can't silently destroy a comment. + reply = QMessageBox.question( + self, t("msg.confirm"), + t("viewer.confirm_delete_comment"), + QMessageBox.StandardButton.Yes + | QMessageBox.StandardButton.No, + QMessageBox.StandardButton.No, + ) + if reply != QMessageBox.StandardButton.Yes: + return # Remove annotation from fitz doc if self._doc: import fitz - page = self._doc[page_idx] - for annot in page.annots() or []: - if annot.type[0] == fitz.PDF_ANNOT_TEXT: - content = annot.info.get("content", "") or "" - if content.strip() == txt.strip(): - page.delete_annot(annot) - break - # Save the doc - if self._path: - self._doc.saveIncr() + try: + page = self._doc[page_idx] + for annot in page.annots() or []: + if annot.type[0] == fitz.PDF_ANNOT_TEXT: + content = annot.info.get("content", "") or "" + if content.strip() == txt.strip(): + page.delete_annot(annot) + break + # Save the doc. saveIncr() raises on read-only + # files / permission errors; surface a friendly + # dialog instead of crashing the Qt event loop. + if self._path: + self._doc.saveIncr() + except Exception as exc: + from app.utils import show_error + show_error(self, exc) + return # Remove from entry annots list entry.annots.pop(annot_idx) - if self._open_note == hit: - self._open_note = None + # The _open_note tuple stores (page_idx, annot_idx). + # After the pop, indices on the same page that were + # greater than annot_idx shift down by one — fix the + # stored reference so reopening the popup doesn't + # show the wrong note (B-extra). + if self._open_note is not None: + open_page, open_idx = self._open_note + if open_page == page_idx: + if open_idx == annot_idx: + self._open_note = None + elif open_idx > annot_idx: + self._open_note = (open_page, open_idx - 1) self.update() return if not self._sel_text: diff --git a/app/viewer/panel.py b/app/viewer/panel.py index 7a21c84..d59dc7e 100644 --- a/app/viewer/panel.py +++ b/app/viewer/panel.py @@ -422,9 +422,22 @@ def load(self, path: str): t("viewer.invalid_msg")) return import fitz + # Close the previous document through the canvas helper so the + # canvas drops its _doc reference + bumps _gen BEFORE the + # fitz.Document is actually closed. Without this ordering, a + # paintEvent or _on_page_ready queued between the panel's + # _fitz_doc.close() and the next _canvas.load() touches a + # freed Document and raises ``RuntimeError: document closed`` + # (B1). _canvas.close_doc() also handles closing the underlying + # doc, so don't double-close from this side. if self._fitz_doc: - self._fitz_doc.close() + self._canvas.close_doc() self._fitz_doc = None + # Forget the previous file's password before we start the + # new one's prompt flow (R5/D2). _clear_pdf_password is a + # best-effort wipe — Python str immutability blocks a true + # zero-scrub. + self._clear_pdf_password() try: doc = fitz.open(path) except Exception as ex: @@ -607,27 +620,88 @@ def _print_pdf(self): if not painter.begin(printer): return + import fitz page_count = len(self._fitz_doc) - for i in range(page_count): - if i > 0: - printer.newPage() - page = self._fitz_doc[i] - # Render at high DPI for print quality - dpi = printer.resolution() - zoom = dpi / 72.0 - mat = __import__("fitz").Matrix(zoom, zoom) - pix = page.get_pixmap(matrix=mat) - img = QImage(pix.samples, pix.width, pix.height, - pix.stride, QImage.Format.Format_RGB888) - target = QRectF(painter.viewport()) - source = QRectF(0, 0, img.width(), img.height()) - # Scale to fit page while maintaining aspect ratio - scale = min(target.width() / source.width(), - target.height() / source.height()) - w = source.width() * scale - h = source.height() * scale - x = (target.width() - w) / 2 - y = (target.height() - h) / 2 - painter.drawImage(QRectF(x, y, w, h), img, source) + # Honour QPrintDialog settings — pre-fix the loop always printed + # every page once in forward order, ignoring the user's choice + # of range / copies / reverse (R7/N7-H1). + # - fromPage()/toPage() return 0 when "all pages" is selected + # - copyCount() reports how many copies the user asked for + # - pageOrder() == LastPageFirst means print in reverse + from_page = printer.fromPage() + to_page = printer.toPage() + if from_page == 0 and to_page == 0: + pages = list(range(page_count)) + else: + # Qt's fromPage/toPage are 1-based and inclusive. Clamp to + # [0, page_count) before iterating so an out-of-range value + # (PDFs with fewer pages than the dialog suggested) cannot + # crash the render loop. + start = max(0, from_page - 1) + end = min(page_count, to_page) + pages = list(range(start, end)) + try: + reverse = (printer.pageOrder() + == QPrinter.PageOrder.LastPageFirst) + except AttributeError: + # PySide6 < 6.4 exposed the enum at module scope; the guard + # keeps the loop running on legacy bindings (worst case: + # forward order, same as before this fix). + reverse = False + if reverse: + pages = list(reversed(pages)) + copies = max(1, printer.copyCount()) + + first_page_printed = True # avoids a leading newPage() + for copy in range(copies): + for i in pages: + if not first_page_printed: + printer.newPage() + first_page_printed = False + page = self._fitz_doc[i] + # Render at high DPI for print quality + dpi = printer.resolution() + zoom = dpi / 72.0 + mat = fitz.Matrix(zoom, zoom) + # alpha=False avoids RGBA pixmaps (n=4) being misread as + # Format_RGB888 (3 bytes/pixel); n != 3 catches the residual + # cases (CMYK n=4, greyscale n=1) — convert them to RGB. + pix = page.get_pixmap(matrix=mat, alpha=False) + if pix.n != 3: + pix = fitz.Pixmap(fitz.csRGB, pix) + # QImage(pix.samples, ...) views the native pixmap buffer; + # on the next loop iteration the old pix is freed and the + # painter would be drawing from freed memory. .copy() forces + # an eager copy so the QImage no longer depends on pix + # lifetime (same fix that landed in OCR Round 3). + img = QImage(pix.samples, pix.width, pix.height, + pix.stride, QImage.Format.Format_RGB888).copy() + target = QRectF(painter.viewport()) + source = QRectF(0, 0, img.width(), img.height()) + # Scale to fit page while maintaining aspect ratio + scale = min(target.width() / source.width(), + target.height() / source.height()) + w = source.width() * scale + h = source.height() * scale + x = (target.width() - w) / 2 + y = (target.height() - h) / 2 + painter.drawImage(QRectF(x, y, w, h), img, source) painter.end() + + # ── Password lifecycle ────────────────────────────────────────────── + def _clear_pdf_password(self) -> None: + """Best-effort wipe of the cached PDF password (R5/D2). + + Thin wrapper around :func:`app.utils.wipe_pdf_password` so the + ``load`` (new file) and ``closeEvent`` (panel teardown) paths + share a single implementation with BasePage and EditorTab. + """ + from app.utils import wipe_pdf_password + wipe_pdf_password(self) + + def closeEvent(self, event): + # Wipe cached password before the C++ widget is destroyed so a + # heap dump after teardown no longer surfaces it. + self._clear_pdf_password() + super().closeEvent(event) diff --git a/tests/test_atomic_pdf_write.py b/tests/test_atomic_pdf_write.py new file mode 100644 index 0000000..b055fbf --- /dev/null +++ b/tests/test_atomic_pdf_write.py @@ -0,0 +1,182 @@ +"""Regression tests for BasePage._atomic_pdf_write / _check_not_same_path +(N7-CRIT-1). + +The bug: 8 PDF tools wrote output with ``open(out_path, "wb")``. When +the user picked the same path for input and output, the ``open("wb")`` +truncated the original PDF BEFORE PdfWriter's lazy stream reads +completed → silent dataloss + corrupted output. + +The fix landed two defensive layers in ``BasePage``: + +1. ``_check_not_same_path`` rejects up-front via ``os.path.realpath`` + equality. + +2. ``_atomic_pdf_write`` writes to a same-directory tempfile and + atomically renames via ``os.replace``. + +These tests exercise both layers without instantiating a QWidget — +the helpers are ``@staticmethod`` exactly so they can be unit-tested +straight from the class. +""" + +import os +import sys +from pathlib import Path + +import pytest + +sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) + +# A QCoreApplication is needed by app.i18n.t() (it calls QSettings). +# Use the offscreen platform so headless CI doesn't try to open a +# display. +os.environ.setdefault("QT_QPA_PLATFORM", "offscreen") +from PySide6.QtWidgets import QApplication +_app = QApplication.instance() or QApplication([]) + +from app.base import BasePage # noqa: E402 +from pypdf import PdfReader, PdfWriter # noqa: E402 + + +def _make_pdf(path: Path, pages: int = 2) -> Path: + import fitz + doc = fitz.open() + for _ in range(pages): + doc.new_page(width=595, height=842) + doc.save(str(path)) + doc.close() + return path + + +def _writer_for(path: Path) -> PdfWriter: + r = PdfReader(str(path)) + w = PdfWriter() + for page in r.pages: + w.add_page(page) + return w + + +# ── _check_not_same_path ──────────────────────────────────────────────── + + +def test_check_rejects_exact_same_path(tmp_path: Path): + src = _make_pdf(tmp_path / "in.pdf") + with pytest.raises(RuntimeError): + BasePage._check_not_same_path(str(src), [str(src)]) + + +def test_check_rejects_via_realpath(tmp_path: Path): + """A relative path and an absolute path to the same file must + both flag as colliding (realpath equality).""" + src = _make_pdf(tmp_path / "in.pdf") + rel = os.path.relpath(str(src)) + with pytest.raises(RuntimeError): + BasePage._check_not_same_path(rel, [str(src)]) + + +def test_check_allows_distinct_paths(tmp_path: Path): + src = _make_pdf(tmp_path / "in.pdf") + dst = tmp_path / "out.pdf" + # Must not raise. + BasePage._check_not_same_path(str(dst), [str(src)]) + + +def test_check_skips_empty_or_missing_sources(tmp_path: Path): + dst = tmp_path / "out.pdf" + # Empty string + None + non-existent are all tolerated. + BasePage._check_not_same_path(str(dst), ["", None, str(tmp_path / "ghost.pdf")]) + + +# ── _atomic_pdf_write (PdfWriter branch) ──────────────────────────────── + + +def test_atomic_write_same_source_raises_and_preserves_input(tmp_path: Path): + """The defining regression: dst == src must raise BEFORE any + bytes hit disk, and the source PDF must remain readable.""" + src = _make_pdf(tmp_path / "in.pdf", pages=3) + src_bytes_before = src.read_bytes() + w = _writer_for(src) + with pytest.raises(RuntimeError): + BasePage._atomic_pdf_write(w, str(src), sources=[str(src)]) + # Critical assertion: the source file is byte-identical. + assert src.read_bytes() == src_bytes_before + # And still parses correctly. + assert len(PdfReader(str(src)).pages) == 3 + + +def test_atomic_write_writes_to_distinct_dst(tmp_path: Path): + src = _make_pdf(tmp_path / "in.pdf", pages=2) + dst = tmp_path / "out.pdf" + w = _writer_for(src) + BasePage._atomic_pdf_write(w, str(dst), sources=[str(src)]) + assert dst.exists() + assert len(PdfReader(str(dst)).pages) == 2 + + +def test_atomic_write_uses_os_replace(tmp_path: Path, monkeypatch: pytest.MonkeyPatch): + """The implementation MUST use os.replace for the final rename — + that's what makes the write atomic on Windows + POSIX. If a future + refactor swaps in os.rename or shutil.move this test catches it.""" + src = _make_pdf(tmp_path / "in.pdf") + dst = tmp_path / "out.pdf" + w = _writer_for(src) + + calls = [] + real_replace = os.replace + + def spy(a, b): + calls.append((a, b)) + return real_replace(a, b) + + monkeypatch.setattr("app.base.os.replace", spy) + BasePage._atomic_pdf_write(w, str(dst), sources=[str(src)]) + assert calls, "os.replace was not called — the rename isn't atomic" + assert calls[-1][1] == str(dst) + + +def test_atomic_write_cleans_tempfile_on_writer_error(tmp_path: Path): + """If writer.write raises, the tempfile must be cleaned up — we + don't want orphans accumulating in the user's output folder.""" + dst = tmp_path / "out.pdf" + + class _Boom: + def write(self, _fh): + raise RuntimeError("boom") + + with pytest.raises(RuntimeError, match="boom"): + BasePage._atomic_pdf_write(_Boom(), str(dst), sources=[]) + # No tempfile lingering in the destination directory. + leftover = [p for p in tmp_path.iterdir() if p.suffix == ".pdf"] + assert leftover == [] + + +# ── _atomic_pdf_write (fitz.Document branch) ──────────────────────────── + + +def test_atomic_write_accepts_fitz_document(tmp_path: Path): + import fitz + src = _make_pdf(tmp_path / "in.pdf", pages=2) + dst = tmp_path / "out.pdf" + doc = fitz.open(str(src)) + try: + BasePage._atomic_pdf_write(doc, str(dst), + sources=[str(src)], + save_opts={"garbage": 4, "deflate": True}) + finally: + doc.close() + assert dst.exists() + assert len(fitz.open(str(dst))) == 2 + + +def test_atomic_write_fitz_rejects_same_source(tmp_path: Path): + import fitz + src = _make_pdf(tmp_path / "in.pdf") + src_bytes_before = src.read_bytes() + doc = fitz.open(str(src)) + try: + with pytest.raises(RuntimeError): + BasePage._atomic_pdf_write(doc, str(src), sources=[str(src)]) + finally: + doc.close() + # Source intact. + assert src.read_bytes() == src_bytes_before diff --git a/tests/test_editor_undo.py b/tests/test_editor_undo.py new file mode 100644 index 0000000..4b4308b --- /dev/null +++ b/tests/test_editor_undo.py @@ -0,0 +1,308 @@ +"""Regression tests for the editor undo/redo stack (CRIT-1). + +The bug: ``EditorTab._redo()`` called ``self._add(edit)``, and the +first line of ``_add`` was ``self._redo_stack.clear()``. After a +single redo, the remaining redo entries were silently dropped. + +These tests exercise the post-fix invariant: consecutive redos +restore every undone edit, in the original LIFO order, without +ever wiping the redo stack mid-sequence. They avoid touching Qt +widgets by using a thin headless harness that mirrors the real +``_add/_undo/_redo`` semantics. +""" + +import os +import sys +from pathlib import Path + +import pytest + +# Make project root importable so we can read EditorTab's source +sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) + + +# ── Headless harness ───────────────────────────────────────────────────── + +class _UndoHarness: + """Mirror EditorTab's _add/_undo/_redo without the Qt widgets. + + Only the fields touched by those three methods are reproduced — + that's enough to assert the redo-stack invariant the bug broke.""" + + _MAX_REDO = 100 + _MAX_PENDING = 500 + + def __init__(self): + self._pending: list = [] + self._redo_stack: list = [] + self._list_count = 0 # stand-in for _pending_list row count + self.dropped_paths: list = [] # records temp paths the trim unlinked + + # Mirror of the real _add() post-fix: + def _add(self, edit: dict, *, _from_redo: bool = False): + import os + import tempfile + if not _from_redo: + self._redo_stack.clear() + self._pending.append(edit) + self._list_count += 1 + if len(self._pending) > self._MAX_PENDING: + to_drop = self._pending[:-self._MAX_PENDING] + self._pending = self._pending[-self._MAX_PENDING:] + tmp_root = os.path.normcase(tempfile.gettempdir()) + for old in to_drop: + p = old.get("path") + if (p and os.path.isfile(p) + and os.path.normcase(p).startswith(tmp_root)): + try: + os.unlink(p) + self.dropped_paths.append(p) + except OSError: + pass + self._list_count = len(self._pending) + + def _undo(self): + if not self._pending: + return + edit = self._pending.pop() + self._redo_stack.append(edit) + if len(self._redo_stack) > self._MAX_REDO: + self._redo_stack.pop(0) + self._list_count -= 1 + + def _redo(self): + if not self._redo_stack: + return + edit = self._redo_stack.pop() + self._add(edit, _from_redo=True) + + +def _mk(i: int) -> dict: + return {"type": "redact", "page": 0, "id": i} + + +# ── Tests ──────────────────────────────────────────────────────────────── + + +def test_three_edits_three_undos_three_redos_restores_all(): + """3 edits → undo×3 → redo×3 leaves the same 3 edits in order. + + Pre-fix this test failed: after the first redo the redo stack + was cleared, so the second and third redos were no-ops. + """ + h = _UndoHarness() + h._add(_mk(1)) + h._add(_mk(2)) + h._add(_mk(3)) + assert [e["id"] for e in h._pending] == [1, 2, 3] + + h._undo(); h._undo(); h._undo() + assert h._pending == [] + assert [e["id"] for e in h._redo_stack] == [3, 2, 1] + + h._redo(); h._redo(); h._redo() + assert [e["id"] for e in h._pending] == [1, 2, 3] + assert h._redo_stack == [] + + +def test_redo_preserves_remaining_stack(): + """A single redo only consumes one entry; the rest stay redoable. + + Sequence: + add(0), add(1), add(2) -> _pending=[0,1,2], _redo=[] + undo×3 -> _pending=[], _redo=[2,1,0] (LIFO) + redo -> _pending=[0], _redo=[2,1] + + The bug being guarded: pre-fix the redo would clear the stack so + _redo would become [] and the next two redo calls would be no-ops. + """ + h = _UndoHarness() + for i in range(3): + h._add(_mk(i)) + for _ in range(3): + h._undo() + h._redo() # restores id=0 (pushed first into redo_stack -> top) + assert [e["id"] for e in h._pending] == [0] + # The remaining redo entries must still be present — this is + # exactly what the bug broke. + assert [e["id"] for e in h._redo_stack] == [2, 1] + + +def test_new_edit_after_undo_clears_redo(): + """New edits MUST still clear the redo stack (normal _add path).""" + h = _UndoHarness() + h._add(_mk(1)); h._add(_mk(2)) + h._undo() + assert [e["id"] for e in h._redo_stack] == [2] + # User performs a new edit instead of redoing — redo branch + # is discarded as expected. + h._add(_mk(99)) + assert h._redo_stack == [] + + +def test_editor_tab_source_matches_fix(): + """Static check: the real EditorTab._redo passes _from_redo=True + and _add accepts the kwarg. Guards against accidental regressions + in the production code (we can't easily import the Qt class here + without instantiating QWidget).""" + src = (Path(__file__).resolve().parent.parent + / "app" / "editor" / "tab.py").read_text(encoding="utf-8") + assert "_from_redo: bool = False" in src, "the fix kwarg is missing from _add" + assert "_from_redo=True" in src, "_redo no longer flags the call as redo" + assert "if not _from_redo:" in src, "_add no longer gates the redo_stack clear" + + +# ── R5/A2: _pending unbounded growth ───────────────────────────────────── + + +def test_pending_caps_at_max(): + """Adding 600 edits leaves _pending at MAX_PENDING (500) with the + 100 oldest dropped. + + Pre-fix _pending grew without bound; long-edit sessions leaked + memory and pinned temp signature/image files until the tab + closed. + """ + h = _UndoHarness() + for i in range(600): + h._add({"type": "redact", "page": 0, "id": i}) + assert len(h._pending) == h._MAX_PENDING + # Oldest dropped: the surviving range starts at id=100. + ids = [e["id"] for e in h._pending] + assert ids[0] == 100 + assert ids[-1] == 599 + # _pending_list shadow must stay in sync — otherwise the QListWidget + # rows diverge from the underlying edit indices. + assert h._list_count == len(h._pending) + + +def test_pending_trim_unlinks_temp_files(tmp_path, monkeypatch): + """When an edit dropped by the trim owns a temp-file path, the + file must be unlinked. Files outside the system tempdir (real + source images the user picked) must be left alone.""" + import tempfile as _tf + # Force the tempdir to our pytest tmp so we don't pollute the real + # /tmp during the test. + monkeypatch.setattr(_tf, "gettempdir", lambda: str(tmp_path)) + + h = _UndoHarness() + # First 50 edits own real temp files; the remaining 600 are simple + # redacts. The trim will drop the first 150 entries (650 - 500), so + # all 50 temp files belong to the dropped slice. + temp_paths = [] + for i in range(50): + p = tmp_path / f"sig_{i}.png" + p.write_bytes(b"PNG_BYTES") + temp_paths.append(str(p)) + h._add({"type": "signature", "page": 0, "path": str(p)}) + + # Add a user-owned image OUTSIDE the tempdir (simulate real source + # picked from Desktop). It will get trimmed too but must NOT be + # deleted from disk. + user_dir = tmp_path.parent / "user_pick" + user_dir.mkdir(exist_ok=True) + user_path = user_dir / "real_image.png" + user_path.write_bytes(b"PNG_USER") + h._add({"type": "image", "page": 0, "path": str(user_path)}) + + for i in range(600): + h._add({"type": "redact", "page": 0, "id": i}) + + assert len(h._pending) == h._MAX_PENDING + # Every temp file the harness owned should have been unlinked. + for p in temp_paths: + assert not os.path.isfile(p), f"trim must unlink dropped temp: {p}" + # The user-picked image must survive disk-side even though it + # fell out of the rolling window. + assert os.path.isfile(user_path), "user-picked file must not be deleted" + + +def test_undo_empty_pending_is_noop(): + """After trim drops oldest edits the undo button still works + cleanly — calling _undo on an empty list returns silently rather + than raising IndexError.""" + h = _UndoHarness() + # Empty start + h._undo() + assert h._pending == [] + assert h._redo_stack == [] + # After a full trim cycle then drain-undo, the next undo must + # also be a no-op. + for i in range(550): + h._add({"type": "redact", "page": 0, "id": i}) + while h._pending: + h._undo() + h._undo() # extra undo on an already-empty stack + assert h._pending == [] + + +def test_pending_max_constants_in_source(): + """Static guard: the production constants must exist and the + trim block must reference both _MAX_PENDING and a tempdir check.""" + src = (Path(__file__).resolve().parent.parent + / "app" / "editor" / "tab.py").read_text(encoding="utf-8") + assert "_MAX_PENDING" in src, "production constant missing" + assert "tempfile.gettempdir" in src, \ + "trim must restrict unlink to the system tempdir" + + +# ── PR-B revisor finding #1 — QListWidget vs _pending drift ───────────── + + +def test_pending_list_widget_count_matches_pending_after_trim(): + """The real QListWidget must stay in sync with ``_pending`` after + the cap-trim. Regression for PR-B revisor finding #1. + + Pre-fix the trim computed ``extra = list.count() - len(_pending)`` + *before* the new edit's ``addItem`` ran. In steady state at the + cap, that diff was 0 so no ``takeItem`` ever fired, and the + subsequent ``addItem`` left the QListWidget with +1 row vs the + underlying ``_pending`` list. The next ``_undo`` then removed the + wrong label (the visible most-recent row, not the row matching the + popped edit). + """ + os.environ.setdefault("QT_QPA_PLATFORM", "offscreen") + from PySide6.QtWidgets import QApplication, QListWidget + _app = QApplication.instance() or QApplication([]) # noqa: F841 + + from app.editor.tab import TabEditar + + class _Canvas: + def set_overlays(self, *a, **kw): pass + + class _Stub: + _MAX_PENDING = TabEditar._MAX_PENDING + _MAX_REDO = TabEditar._MAX_REDO + + def __init__(self): + self._pending: list = [] + self._redo_stack: list = [] + self._pending_list = QListWidget() + self._canvas = _Canvas() + + def _status(self, *a, **kw): pass + + # Bind the real production method so we test the real code path. + _add = TabEditar._add + _undo = TabEditar._undo + + s = _Stub() + + # Push past the cap by 50 edits. + for i in range(TabEditar._MAX_PENDING + 50): + s._add({"type": "redact", "page": 0, "id": i}) + + assert len(s._pending) == TabEditar._MAX_PENDING, \ + "production trim should bound _pending to _MAX_PENDING" + assert s._pending_list.count() == len(s._pending), ( + f"QListWidget rows ({s._pending_list.count()}) drifted from " + f"_pending size ({len(s._pending)}) — PR-B finding #1 regressed" + ) + + # _undo must remove the row matching the popped edit, leaving the + # widget and the list in lock-step. Pre-fix the widget would be 1 + # row ahead so an undo would take the wrong label. + before_count = s._pending_list.count() + s._undo() + assert s._pending_list.count() == len(s._pending) + assert s._pending_list.count() == before_count - 1 diff --git a/tests/test_password_lifecycle.py b/tests/test_password_lifecycle.py new file mode 100644 index 0000000..dc7bf4b --- /dev/null +++ b/tests/test_password_lifecycle.py @@ -0,0 +1,133 @@ +"""Regression tests for R5/D2 — cached PDF password is wiped on close. + +Pre-fix ``self._pdf_password`` persisted in memory until the BasePage +/ EditorTab / ViewerPanel object was GC'd. A heap dump after the user +finished viewing a confidential PDF could surface the password as a +plain string. The fix adds ``_clear_pdf_password()`` (best-effort +wipe) and wires it into every close path. + +Python str immutability prevents a real zero-scrub of the original +bytes; the helper's job is to drop the *only reachable* reference +and free the attribute slot. Memory scanners may still see lingering +copies in the interner — documented as a known limitation. +""" + +import sys +from pathlib import Path + +import pytest + +sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) + + +# ── _clear_pdf_password() on BasePage ──────────────────────────────────── + + +def _make_dummy_base(): + """Build a BasePage-like stub WITHOUT invoking the Qt constructor. + + The helper only touches ``self._pdf_password``; we don't need a + QApplication for that. + """ + from app.base import BasePage + + class _Stub: + _pdf_password = "" + _clear_pdf_password = BasePage._clear_pdf_password + + return _Stub() + + +def test_clear_pwd_zeroes_attribute(): + """After clear, the attribute must be the empty string.""" + s = _make_dummy_base() + s._pdf_password = "topsecret" + s._clear_pdf_password() + assert s._pdf_password == "" + + +def test_clear_pwd_idempotent(): + """Calling clear on an already-empty attribute is a no-op.""" + s = _make_dummy_base() + s._pdf_password = "" + s._clear_pdf_password() + assert s._pdf_password == "" + s._clear_pdf_password() # twice, just to be sure + assert s._pdf_password == "" + + +def test_clear_pwd_handles_long_strings(): + """Long passwords (above the small-string optimisation threshold) + must wipe the same way short ones do.""" + s = _make_dummy_base() + s._pdf_password = "a" * 4096 + s._clear_pdf_password() + assert s._pdf_password == "" + + +def test_clear_pwd_handles_unicode(): + """Multibyte passwords (emoji, accented chars) must not break the + ctypes buffer hint.""" + s = _make_dummy_base() + s._pdf_password = "señha-€-🔑" + s._clear_pdf_password() + assert s._pdf_password == "" + + +def test_clear_pwd_handles_missing_attr(): + """If somebody calls the helper before _pdf_password was ever set + (subclass init order edge case), it must not raise.""" + from app.base import BasePage + + class _Stub: + _clear_pdf_password = BasePage._clear_pdf_password + + s = _Stub() + s._clear_pdf_password() + # Helper assigned the empty string to keep the attribute + # always-defined for the rest of the lifecycle. + assert s._pdf_password == "" + + +# ── EditorTab carries its own helper ───────────────────────────────────── + + +def test_editor_tab_close_wires_password_wipe(): + """Static guard: EditorTab._close_pdf must call + ``self._clear_pdf_password()`` so the cached password is dropped + when the user closes the editor's PDF.""" + src = (Path(__file__).resolve().parent.parent + / "app" / "editor" / "tab.py").read_text(encoding="utf-8") + close_idx = src.find("def _close_pdf(self):") + assert close_idx > 0 + body = src[close_idx: close_idx + 800] + assert "_clear_pdf_password" in body, \ + "_close_pdf must wipe the cached PDF password" + + +# ── ViewerPanel ────────────────────────────────────────────────────────── + + +def test_viewer_panel_wires_password_wipe(): + """ViewerPanel.load() and closeEvent must both invoke the wipe + helper so neither a new-doc load nor a panel teardown leaves a + stale password reachable.""" + src = (Path(__file__).resolve().parent.parent + / "app" / "viewer" / "panel.py").read_text(encoding="utf-8") + assert "def _clear_pdf_password(" in src, \ + "_clear_pdf_password helper missing from PdfViewerPanel" + # closeEvent must call it. + close_idx = src.find("def closeEvent(self, event):") + assert close_idx > 0, "PdfViewerPanel.closeEvent missing" + close_body = src[close_idx: close_idx + 400] + assert "_clear_pdf_password" in close_body, \ + "closeEvent must wipe the cached password" + + +def test_base_page_clear_pwd_in_source(): + """Static guard: BasePage._clear_pdf_password must exist (other + test_pdfapps lookups depend on it being on the base class).""" + src = (Path(__file__).resolve().parent.parent + / "app" / "base.py").read_text(encoding="utf-8") + assert "def _clear_pdf_password(" in src + assert "self._pdf_password = \"\"" in src diff --git a/tests/test_pdfapps.py b/tests/test_pdfapps.py index 600729c..bc8024a 100644 --- a/tests/test_pdfapps.py +++ b/tests/test_pdfapps.py @@ -11,6 +11,13 @@ # Make the project root importable so `from app.utils import ...` works. sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) +# Anchor source-file lookups to the repo root so the suite passes from +# any cwd (IDEs, CI matrices, sub-directories). Previously several +# tests opened ``"app/base.py"`` etc. via a path relative to the +# process cwd — running pytest from anywhere other than the repo root +# broke them with FileNotFoundError. +_REPO_ROOT = Path(__file__).resolve().parent.parent + from pypdf import PdfReader, PdfWriter # Import the real parse_pages from app.utils so the test catches drift — @@ -591,7 +598,7 @@ def test_toast_guard_uses_shiboken_not_qpointer(self): # PySide6 has no QPointer — the toast hide-timer must guard the # widget liveness check via shiboken6.isValid(). Importing # QPointer from PySide6 would fail with ImportError at load. - src = open("app/base.py", encoding="utf-8").read() + src = open(_REPO_ROOT / "app" / "base.py", encoding="utf-8").read() assert "from shiboken6 import isValid" in src assert "isValid(t)" in src # No QPointer import or instantiation — only the explanatory @@ -606,7 +613,7 @@ def test_restart_app_handles_pyinstaller_frozen(self): # _restart_app must branch on sys.frozen — using # os.path.dirname(__file__) + "pdfapps.py" breaks in frozen # bundles because __file__ points inside _MEIPASS. - src = open("app/window.py", encoding="utf-8").read() + src = open(_REPO_ROOT / "app" / "window.py", encoding="utf-8").read() # The fixed implementation references sys.frozen. assert 'getattr(sys, "frozen"' in src, \ "_restart_app must check sys.frozen" @@ -618,7 +625,7 @@ def test_restart_app_handles_pyinstaller_frozen(self): def test_pdfapps_spec_reads_version_dynamically(self): # Avoids drift between APP_VERSION and the macOS BUNDLE # CFBundleVersion / CFBundleShortVersionString. - spec = open("pdfapps.spec", encoding="utf-8").read() + spec = open(_REPO_ROOT / "pdfapps.spec", encoding="utf-8").read() assert "_app_version" in spec assert "APP_VERSION" in spec # parsed from app/constants.py assert "CFBundleVersion': '1.13" not in spec, \ @@ -627,7 +634,7 @@ def test_pdfapps_spec_reads_version_dynamically(self): def test_installer_pins_third_party_hashes(self): # Tesseract and Ghostscript exes are downloaded and run with # admin — they MUST be hash-pinned in installer.py. - src = open("installer.py", encoding="utf-8").read() + src = open(_REPO_ROOT / "installer.py", encoding="utf-8").read() assert "TESSERACT_SHA256" in src assert "GHOSTSCRIPT_SHA256" in src assert "hmac.compare_digest" in src @@ -689,7 +696,7 @@ def test_editor_handles_encrypted_pdfs(self): # The editor's _load_pdf must prompt for a password and pass it # through to the canvas. The audit flagged this as broken — the # job opened with fitz.open without authenticate(). - src = open("app/editor/tab.py", encoding="utf-8").read() + src = open(_REPO_ROOT / "app" / "editor" / "tab.py", encoding="utf-8").read() # _load_pdf integrates the password prompt i = src.find("def _load_pdf(self, p: str):") assert i > 0 @@ -699,7 +706,7 @@ def test_editor_handles_encrypted_pdfs(self): assert "password=self._pdf_password" in block, \ "canvas.load must receive the password" # The canvas job must accept and apply the password - canvas_src = open("app/editor/canvas.py", encoding="utf-8").read() + canvas_src = open(_REPO_ROOT / "app" / "editor" / "canvas.py", encoding="utf-8").read() assert "doc.authenticate(self._password)" in canvas_src, \ "_EditPageJob.run must authenticate the document" @@ -711,7 +718,7 @@ def test_taskrunner_cancel_uses_lambda_wrap(self): # queue never drains. The lambda wrap forces a plain Python # call on the dialog's thread (main), which mutates the flag # immediately. Pin this so it can't be "simplified" back. - worker_src = open("app/worker.py", encoding="utf-8").read() + worker_src = open(_REPO_ROOT / "app" / "worker.py", encoding="utf-8").read() assert "lambda: runner.cancel()" in worker_src, \ "cancel must be wrapped in a lambda; bare bound method gets queued" @@ -744,7 +751,7 @@ def test_draw_ink_annot_uses_tuple_points(self): # ValueError: arg must be seq of seq of float pairs. # tab.py builds the stroke as plain (float, float) tuples now; # this test fails if anyone reintroduces fitz.Point wrapping. - src = open("app/editor/tab.py", encoding="utf-8").read() + src = open(_REPO_ROOT / "app" / "editor" / "tab.py", encoding="utf-8").read() # Locate the draw branch i = src.find('elif e["type"] == "draw":') assert i > 0, "draw branch missing in tab.py" @@ -759,9 +766,9 @@ def test_flatpak_manifest_tag_is_current(self): # Bump script now keeps it in sync; this test ensures it matches # APP_VERSION at any given point. import re - const = open("app/constants.py", encoding="utf-8").read() + const = open(_REPO_ROOT / "app" / "constants.py", encoding="utf-8").read() version = re.search(r'APP_VERSION\s*=\s*"([^"]+)"', const).group(1) - manifest = open("flatpak/io.github.nelsonduarte.PDFApps.yml", + manifest = open(_REPO_ROOT / "flatpak" / "io.github.nelsonduarte.PDFApps.yml", encoding="utf-8").read() assert f"tag: v{version}" in manifest, \ f"Flatpak manifest tag must match APP_VERSION ({version})" diff --git a/tests/test_signature_perms.py b/tests/test_signature_perms.py new file mode 100644 index 0000000..e7ecfa4 --- /dev/null +++ b/tests/test_signature_perms.py @@ -0,0 +1,78 @@ +"""Regression tests for R5/C1 — signature file 0o600 hardening. + +The persistent signature is cached under ``~/.pdfapps_signature.png`` +(macOS/Windows) or ``$XDG_CONFIG_HOME/pdfapps/signature.png`` (Linux). +Pre-fix it was saved with the default umask (0o644 on most distros), +so any other local user could read it on multi-user hosts. The fix +calls ``os.chmod(path, 0o600)`` right after the copy. + +POSIX-only — Windows file permissions are governed by NTFS ACLs and +the user profile inherits owner-only access by default, so the chmod +becomes a no-op there. We skip the mode assertion on Windows but +still verify the source carries the chmod call. +""" + +import os +import sys +import tempfile +from pathlib import Path + +import pytest + +sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) + + +# ── Source-level guard (runs everywhere) ───────────────────────────────── + + +def test_save_signature_calls_chmod_600(): + """The production helper must apply 0o600 after the copy.""" + src = (Path(__file__).resolve().parent.parent + / "app" / "i18n.py").read_text(encoding="utf-8") + # Find the save_signature function body. + i = src.find("def save_signature(") + assert i > 0, "save_signature helper missing from app/i18n.py" + body = src[i: i + 800] + assert "os.chmod" in body, "save_signature must restrict perms" + assert "0o600" in body, "save_signature must use owner-only mode" + + +def test_signature_dialog_chmods_tmp(): + """The in-flight tmp signature produced by _SignatureDialog must + also be 0o600 — it sits in /tmp until the editor stamps it into + the PDF, and could otherwise be read by other local users.""" + src = (Path(__file__).resolve().parent.parent + / "app" / "editor" / "dialogs.py").read_text(encoding="utf-8") + i = src.find("def _on_accept(") + assert i > 0, "_on_accept handler missing from dialogs.py" + body = src[i: i + 2000] + assert "0o600" in body, "tmp signature must be chmod 0o600 before accept" + + +# ── Functional check (POSIX only) ──────────────────────────────────────── + + +@pytest.mark.skipif(sys.platform == "win32", + reason="Windows file perms are ACL-based; chmod is a no-op") +def test_save_signature_results_in_user_only_mode(tmp_path, monkeypatch): + """End-to-end: calling save_signature on a fresh image must leave + the cached file with mode 0o600 — owner read/write only.""" + # Point the i18n module at an isolated cache so we don't clobber the + # user's real saved signature during the test. + from app import i18n + cache_dir = tmp_path / "pdfapps_cfg" + cache_dir.mkdir() + fake_path = cache_dir / "signature.png" + monkeypatch.setattr(i18n, "_SIGNATURE_PATH", str(fake_path)) + + # Create a source image with a permissive mode so the chmod under + # test has something to actually narrow. + src = tmp_path / "source.png" + src.write_bytes(b"\x89PNG\r\n\x1a\n" + b"\x00" * 32) + os.chmod(src, 0o644) + + i18n.save_signature(str(src)) + + assert fake_path.is_file(), "save_signature must produce the cache file" + mode = os.stat(fake_path).st_mode & 0o777 + assert mode == 0o600, f"expected 0o600, got {oct(mode)}" diff --git a/tests/test_updater.py b/tests/test_updater.py index f54b222..756718a 100644 --- a/tests/test_updater.py +++ b/tests/test_updater.py @@ -330,3 +330,173 @@ def test_download_cancel_holder_resp_cleared_on_success(self, tmp_path): cancel_holder=cancel_holder) assert cancel_holder["resp"] is None assert len(signals.finished.emissions) == 1 + + +# ── R5/F3: _apply_update_unix tempfile cleanup ───────────────────────── + + +class TestApplyUpdateUnixCleanup: + """Verifies that _apply_update_unix wraps its body in try/finally + so the ~100 MB downloaded installer tempfile is removed even on + failed applies (R5/F3). Pre-fix os.remove(downloaded) only ran + on the success path, leaking the artefact in /tmp on every retry. + """ + + def test_apply_update_unix_uses_try_finally(self): + """Source guard: the function body must contain a finally + block that unlinks ``downloaded``.""" + src = (Path(__file__).resolve().parent.parent + / "app" / "updater.py").read_text(encoding="utf-8") + i = src.find("def _apply_update_unix(") + assert i > 0 + body = src[i: i + 4000] + assert "finally:" in body, \ + "_apply_update_unix must wrap cleanup in try/finally" + # The cleanup must reference downloaded (or alias) inside the + # finally — verify by checking the unlink/remove call appears + # AFTER a finally: keyword. + finally_idx = body.index("finally:") + post = body[finally_idx:] + assert "downloaded" in post, \ + "finally block must reference the downloaded tempfile" + assert ("os.unlink(downloaded)" in post + or "os.remove(downloaded)" in post), \ + "finally block must unlink the downloaded tempfile" + + +# ── R6/B6-B7: UpdateDialog signal duplication on retry ────────────────── + + +class TestUpdateDialogSignalLifecycle: + """Pre-fix the dialog reused a single ``_Signals`` instance across + retries; the second download reconnected finished/error/cancelled + on top of the prior thread's connections. When the second download + completed, Qt delivered the signal to BOTH the live and the + historical (already deleted) threads — warnings or crash. + + The fix creates a fresh _Signals instance per ``_start_download`` + call. These tests guard that behaviour at the source level. + """ + + def test_signals_created_per_download(self): + src = (Path(__file__).resolve().parent.parent + / "app" / "updater.py").read_text(encoding="utf-8") + # __init__ no longer pre-creates the _Signals object — it sets + # the attribute to None and waits for _start_download. + init_idx = src.find("def __init__(self, release: dict, parent=None):") + assert init_idx > 0 + init_body = src[init_idx: src.find("def _start_download", init_idx)] + # Either the attribute is initialised to None or no _Signals() + # call appears in __init__ at all — both prove signals are no + # longer reused. + assert (": _Signals | None = None" in init_body + or "self._signals = None" in init_body), \ + "__init__ must defer _Signals creation to _start_download" + + def test_start_download_creates_new_signals(self): + src = (Path(__file__).resolve().parent.parent + / "app" / "updater.py").read_text(encoding="utf-8") + sd_idx = src.find("def _start_download") + assert sd_idx > 0 + body = src[sd_idx: sd_idx + 3000] + assert "self._signals = _Signals()" in body, \ + "_start_download must instantiate a fresh _Signals per run" + # And the previous one must be deleted/dropped — confirm a + # deleteLater call (or explicit disconnect) appears before + # reassignment. + assert "deleteLater" in body, \ + "previous _signals must be released (deleteLater)" + + def test_no_signals_construction_outside_start_download(self): + """Belt-and-suspenders: only _start_download may instantiate + _Signals on the dialog. Other call sites would re-introduce + the cross-pollination bug.""" + src = (Path(__file__).resolve().parent.parent + / "app" / "updater.py").read_text(encoding="utf-8") + # The class body itself may still expose the type, but + # "self._signals = _Signals()" should appear exactly once + # (inside _start_download). + assert src.count("self._signals = _Signals()") == 1, \ + "self._signals = _Signals() must appear exactly once" + + +# ── Live behaviour: signal de-pollution across simulated retries ──────── + + +class _LegacyDialog: + """Toy reproduction of the pre-fix bug to confirm the post-fix + behaviour: holds a single _Signals object across "retries" and + reconnects on top. After two starts the finished signal is delivered + twice — that's exactly the breakage.""" + + def __init__(self): + self.received = [] + self._signals = _Signals() + self._signals.finished.connect(self._on_finished) + + def _on_finished(self, path): + self.received.append(path) + + def start(self): + # Same pattern as the pre-fix code path: reconnects on every + # retry without disconnecting prior connections. + self._signals.finished.connect(self._on_finished) + + +class _FixedDialog: + """Same shape as the production fix — fresh _Signals per start.""" + + def __init__(self): + self.received = [] + self._signals = None + + def _on_finished(self, path): + self.received.append(path) + + def start(self): + if self._signals is not None: + try: + self._signals.deleteLater() + except RuntimeError: + pass + self._signals = _Signals() + self._signals.finished.connect(self._on_finished) + + +@pytest.fixture(scope="module") +def qt_app(): + """Live Qt tests need a QApplication. Module-scope so we don't + pay the construction cost per test.""" + try: + from PySide6.QtWidgets import QApplication + except ImportError: + pytest.skip("PySide6 unavailable") + app = QApplication.instance() or QApplication([]) + yield app + + +class TestSignalsLifecycleLive: + def test_legacy_pattern_double_fires(self, qt_app): + """Sanity: the pre-fix pattern really did double-fire after a + retry. Documents the bug we are fixing.""" + d = _LegacyDialog() + d.start() # mimics _start_download + d._signals.finished.emit("a.bin") + # The fixed dialog must NOT have this behaviour. + assert len(d.received) >= 2, \ + "legacy pattern is expected to double-fire (confirms the bug)" + + def test_fixed_pattern_fires_once_per_emit(self, qt_app): + """The production fix isolates each download's signals so a + single emit triggers a single handler call no matter how many + retries preceded it.""" + d = _FixedDialog() + d.start() + d._signals.finished.emit("a.bin") + first = list(d.received) + d.start() # simulate retry + d._signals.finished.emit("b.bin") + # First emit produced exactly one record; second emit added + # exactly one more. No cross-pollination. + assert first == ["a.bin"] + assert d.received == ["a.bin", "b.bin"] diff --git a/tests/test_viewer_safety.py b/tests/test_viewer_safety.py new file mode 100644 index 0000000..8c78fd3 --- /dev/null +++ b/tests/test_viewer_safety.py @@ -0,0 +1,155 @@ +"""Smoke tests for viewer safety fixes (CRIT-2, CRIT-3, B1, B-extra). + +These bugs all live inside Qt widget code paths that require a real +event loop to exercise end-to-end (right-click menu, QPrintDialog, +paintEvent races). The tests here are source-level guards: they read +the production files and confirm the fixes are still present so a +future refactor can't silently regress them. Pair with manual QA on +the next release candidate. +""" + +import sys +from pathlib import Path + +import pytest + +sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) + + +CANVAS = (Path(__file__).resolve().parent.parent + / "app" / "viewer" / "canvas.py").read_text(encoding="utf-8") +PANEL = (Path(__file__).resolve().parent.parent + / "app" / "viewer" / "panel.py").read_text(encoding="utf-8") + + +# ── CRIT-2: saveIncr requires confirmation + try/except ───────────────── + + +def test_save_incr_is_inside_try_except(): + """saveIncr must be wrapped in a try/except that routes to + show_error — read-only PDFs / permission errors must not crash + the Qt event loop.""" + # All occurrences of saveIncr() must appear inside a try-block + # with the corresponding except in the contextMenuEvent path. + assert "self._doc.saveIncr()" in CANVAS + # Cheap but effective: the production fix introduces a try: above + # the saveIncr call and an except Exception below. + idx = CANVAS.index("self._doc.saveIncr()") + before = CANVAS[max(0, idx - 800): idx] + after = CANVAS[idx: idx + 400] + assert "try:" in before, "saveIncr is no longer inside a try block" + assert "except Exception" in after, "saveIncr error path is missing" + assert "show_error" in after, "saveIncr errors no longer route to show_error" + + +def test_delete_comment_asks_for_confirmation(): + """contextMenuEvent must prompt via QMessageBox.question with + default=No before destroying a comment.""" + assert "QMessageBox.question" in CANVAS + assert "viewer.confirm_delete_comment" in CANVAS + # Default button must be No so an accidental Enter press cancels. + assert "QMessageBox.StandardButton.No" in CANVAS + + +# ── B-extra: stale _open_note tuple after delete ───────────────────────── + + +def test_open_note_index_is_decremented_after_delete(): + """When a note above the open one is deleted, _open_note's index + must shift down — otherwise reopening shows the wrong comment.""" + assert "open_idx - 1" in CANVAS, "stale _open_note index not adjusted" + + +# ── CRIT-3: print pixmap handles alpha/CMYK and avoids race ───────────── + + +def test_print_pixmap_uses_alpha_false_and_csrgb_fallback(): + """Print path must: + - request alpha=False so we never end up reading RGBA as RGB888; + - fall back to fitz.Pixmap(csRGB, pix) for CMYK/greyscale (n != 3); + - call .copy() on the QImage so the painter doesn't draw from the + pixmap buffer after pix is freed on the next iteration.""" + assert "alpha=False" in PANEL + assert "fitz.csRGB" in PANEL + assert "pix.n != 3" in PANEL + # QImage(...).copy() is the lifetime-decoupling bit. + assert ").copy()" in PANEL + + +# ── B1: viewer load() close→reopen race ───────────────────────────────── + + +def test_close_doc_called_when_loading_new_doc(): + """panel.load() must funnel the old-doc teardown through + _canvas.close_doc() so the canvas clears _doc + bumps _gen BEFORE + the underlying fitz.Document is closed. Without that ordering, a + paintEvent/_on_page_ready queued between close and the next load + touches a freed Document and raises ``RuntimeError: document + closed``.""" + assert "self._canvas.close_doc()" in PANEL + # The panel must NOT also call _fitz_doc.close() in addition to + # close_doc(), because the canvas helper already closes the + # underlying document — a second close raises. + load_idx = PANEL.index("def load(self, path: str)") + load_body = PANEL[load_idx: load_idx + 1200] + assert "self._canvas.close_doc()" in load_body, "close_doc no longer routed via canvas" + + +def test_canvas_close_doc_clears_doc_and_bumps_gen(): + """The canvas-side close_doc must drop self._doc and bump self._gen + so any in-flight render jobs that complete after this point are + discarded by _on_page_ready.""" + cd_idx = CANVAS.index("def close_doc(self)") + body = CANVAS[cd_idx: cd_idx + 800] + assert "self._doc = None" in body + assert "self._gen" in body + + +# ── R7/N7-H1: print honours range, copies, and reverse order ──────────── + + +def test_print_respects_from_page_to_page(): + """The print loop must read printer.fromPage()/toPage() and + iterate only that range — pre-fix it always printed every page.""" + print_idx = PANEL.index("def _print_pdf") + body = PANEL[print_idx: print_idx + 3500] + assert "printer.fromPage()" in body, \ + "print path must honour QPrintDialog fromPage()" + assert "printer.toPage()" in body, \ + "print path must honour QPrintDialog toPage()" + + +def test_print_respects_copy_count(): + """copyCount() must drive an outer copy loop so the user gets the + requested number of copies.""" + print_idx = PANEL.index("def _print_pdf") + body = PANEL[print_idx: print_idx + 3500] + assert "copyCount()" in body, \ + "print path must honour QPrintDialog copyCount()" + # The copies loop must wrap the page iteration; cheapest check is + # that "for copy in range" appears inside the print function. + assert "for copy in range" in body, \ + "print path must repeat the page loop copyCount() times" + + +def test_print_respects_reverse_order(): + """LastPageFirst page order must reverse the iteration list.""" + print_idx = PANEL.index("def _print_pdf") + body = PANEL[print_idx: print_idx + 3500] + assert "pageOrder()" in body, \ + "print path must read pageOrder() from the printer" + assert "LastPageFirst" in body, \ + "print path must branch on the reverse-order enum" + assert "reversed" in body, \ + "print path must actually reverse the page list" + + +def test_print_loop_preserves_pixmap_safety_fix(): + """The R7-H1 patch must NOT regress CRIT-3 (alpha=False, csRGB + fallback, .copy()) which lives in the same loop body.""" + print_idx = PANEL.index("def _print_pdf") + body = PANEL[print_idx: print_idx + 3500] + assert "alpha=False" in body + assert "fitz.csRGB" in body + assert "pix.n != 3" in body + assert ").copy()" in body