In [1]:
import json
import re
import hashlib
from pathlib import Path
from typing import Dict, Any, List, Tuple


In [2]:
# ----------------------------
# TEXT CLEANING (safe, no info loss)
# ----------------------------
# Matches the recurring footer line "I DE VINCI ENGINEERING SCHOOL ESILV <digits>".
# Whitespace between the words is optional and matching is case-insensitive.
# NOTE(review): the trailing digits are presumably the printed page number - confirm.
FOOTER_RE = re.compile(r"(I\s*DE\s*VINCI\s*ENGINEERING\s*SCHOOL\s*ESILV\s*\d+)", re.IGNORECASE)
# Matches "Pôle Léonard de Vinci" with or without accents, case-insensitive.
# NOTE(review): not referenced by any code visible in this notebook.
POLE_RE = re.compile(r"(P[oô]le\s*L[eé]onard\s*de\s*Vinci)", re.IGNORECASE)

def normalize_spaces(s: str) -> str:
    """Normalize whitespace while keeping the text content and paragraph breaks.

    Steps, in order:
      1. non-breaking spaces become regular spaces;
      2. runs of spaces/tabs collapse to a single space;
      3. whitespace hugging a line break is trimmed (this also turns "\\r\\n"
         into "\\n");
      4. three or more consecutive line breaks collapse to exactly two, i.e. at
         most one empty line between paragraphs;
      5. leading/trailing whitespace is stripped.

    Args:
        s: Text to normalize.

    Returns:
        The normalized text.
    """
    s = s.replace("\u00a0", " ")  # NBSP
    s = re.sub(r"[ \t]+", " ", s)
    # Trim only NON-newline whitespace around each line break. The class
    # [^\S\n] means "whitespace except \n". Using \s here would also swallow
    # the neighbouring newlines and merge every paragraph into one block,
    # which made the blank-line limit below unreachable.
    s = re.sub(r"[^\S\n]*\n[^\S\n]*", "\n", s)
    s = re.sub(r"\n{3,}", "\n\n", s)   # limit empty lines
    return s.strip()

def extract_footer_candidates(text: str) -> Tuple[str, List[str]]:
    """Detect footer strings and build a footer-free, normalized copy of the text.

    The input string itself is never modified; callers keep the raw text
    separately. Footers are whatever FOOTER_RE matches.

    Args:
        text: Page text to scan.

    Returns:
        A tuple ``(text_wo_footer_norm, footers)`` where the first item is
        ``text`` with every footer match replaced by a space and then passed
        through ``normalize_spaces``, and the second is the sorted list of
        distinct footer strings that were found.
    """
    # group(0) is the full match, so this keeps working even if the pattern's
    # capture groups are changed later.
    footers = sorted({m.group(0) for m in FOOTER_RE.finditer(text)})

    # Remove footers with the pattern itself rather than re-substituting each
    # matched string as a literal: when one footer is a prefix of another
    # ("... ESILV 1" vs "... ESILV 12"), literal removal in arbitrary set order
    # could strip the shorter one first and leave a stray digit behind.
    text_wo = normalize_spaces(FOOTER_RE.sub(" ", text))
    return text_wo, footers

def basic_stats(s: str) -> Dict[str, int]:
    """Return simple size counters for a piece of text.

    Args:
        s: Text to measure.

    Returns:
        A dict with ``chars`` (string length), ``words`` (number of ``\\w+``
        tokens) and ``lines`` (newline count plus one, or 0 for empty text).
    """
    word_total = len(re.findall(r"\b\w+\b", s, flags=re.UNICODE))
    # An empty string has no lines; otherwise N newlines delimit N + 1 lines.
    line_total = s.count("\n") + 1 if s else 0
    return {
        "chars": len(s),
        "words": word_total,
        "lines": line_total,
    }

def stable_id(*parts: str) -> str:
    """Derive a short deterministic identifier from the given string parts.

    The parts are joined with ``"||"``, UTF-8 encoded and hashed with SHA-1;
    the first 16 hex characters of the digest are returned.
    """
    joined = "||".join(parts)
    digest = hashlib.sha1(joined.encode("utf-8"))
    return digest.hexdigest()[:16]

In [3]:
# ----------------------------
# CHUNKING (no info loss)
# - Keeps raw; chunks use normalized (optionally without detected footer)
# ----------------------------
def _hard_split(text: str, max_chars: int) -> List[str]:
    """Cut ``text`` into pieces of at most ``max_chars`` characters.

    Prefers the last whitespace inside each window as the cut point so words
    stay intact; falls back to a cut at exactly ``max_chars`` when the window
    holds no whitespace. Only boundary whitespace is dropped.
    """
    pieces: List[str] = []
    rest = text.strip()
    while len(rest) > max_chars:
        window = rest[:max_chars + 1]
        cut = max(window.rfind(ws) for ws in " \n\t")
        if cut <= 0:
            cut = max_chars
        pieces.append(rest[:cut].strip())
        rest = rest[cut:].strip()
    if rest:
        pieces.append(rest)
    return pieces


def chunk_text(text: str, max_chars: int = 1200, overlap: int = 150) -> List[str]:
    """Split text into chunks, packing paragraphs first, then sentences.

    Paragraphs (separated by blank lines) are packed greedily into chunks of
    at most ``max_chars`` characters. A paragraph that is itself too long is
    split after sentence punctuation (``. ! ? : ;``) and those parts are packed
    the same way. A part that still exceeds ``max_chars`` is hard-split,
    preferably at whitespace, so that no base chunk exceeds the limit.

    Args:
        text: Text to chunk; empty text yields an empty list.
        max_chars: Maximum length of a chunk before overlap is added.
        overlap: Number of trailing characters of the previous (already
            overlapped) chunk that are prepended to each following chunk,
            separated by a newline. Values <= 0 disable overlap. With overlap
            enabled a chunk can be up to ``max_chars + overlap + 1`` long.

    Returns:
        The list of chunk strings, in document order.

    Raises:
        ValueError: If ``max_chars`` is smaller than 1.
    """
    if max_chars < 1:
        raise ValueError("max_chars must be >= 1")
    if not text:
        return []

    # Split on blank lines first (paragraphs)
    paras = [p.strip() for p in re.split(r"\n\s*\n", text) if p.strip()]

    chunks: List[str] = []

    def flush(piece: str) -> None:
        piece = piece.strip()
        if piece:
            chunks.append(piece)

    buf = ""
    for p in paras:
        # +2 accounts for the "\n\n" separator between packed paragraphs.
        if len(buf) + len(p) + 2 <= max_chars:
            buf = (buf + "\n\n" + p).strip() if buf else p
            continue

        # Paragraph does not fit: emit what has been packed so far.
        flush(buf)
        buf = ""

        if len(p) <= max_chars:
            buf = p
            continue

        # Paragraph alone is too large: pack sentence-ish parts instead.
        tmp = ""
        for part in re.split(r"(?<=[\.\!\?\:;])\s+", p):
            part = part.strip()
            if not part:
                continue
            if len(tmp) + len(part) + 1 <= max_chars:
                tmp = (tmp + " " + part).strip() if tmp else part
                continue
            flush(tmp)
            # A single part may still exceed the limit (no punctuation at
            # all); hard-split it and keep the last piece open for packing.
            *full_pieces, tmp = _hard_split(part, max_chars)
            for piece in full_pieces:
                flush(piece)
        flush(tmp)

    flush(buf)

    # Add overlap (character-based) to keep context continuity
    if overlap > 0 and len(chunks) > 1:
        overlapped = [chunks[0]]
        for i in range(1, len(chunks)):
            prev = overlapped[-1]
            tail = prev[-overlap:] if len(prev) > overlap else prev
            overlapped.append((tail + "\n" + chunks[i]).strip())
        chunks = overlapped

    return chunks

In [5]:
# ----------------------------
# MAIN TRANSFORM
# ----------------------------
def improve_pdf_json(data: Dict[str, Any],
                     max_chars_per_chunk: int = 1200,
                     overlap: int = 150) -> Dict[str, Any]:
    """Build an enriched copy of a scraped-PDF JSON structure.

    For every document in ``data["documents"]`` the result keeps each page's
    original text and adds normalized variants, detected footer strings and
    basic statistics. It also adds a flat ``chunks`` list per document, built
    from the normalized, footer-free page text. The input is not modified.

    Args:
        data: Mapping with optional ``rubric`` and ``documents`` keys; each
            document may carry ``pdf_name``, an id under ``id_pdf`` /
            ``pdf_id`` / ``pdf``, and a ``pages`` list of ``{"page", "text"}``.
        max_chars_per_chunk: Forwarded to ``chunk_text`` as ``max_chars``.
        overlap: Forwarded to ``chunk_text`` as ``overlap``.

    Returns:
        ``{"rubric": ..., "documents": [...]}`` where each document holds
        ``pdf_name``, ``id_pdf``, enriched ``pages`` and ``chunks``.
    """
    rubric = data.get("rubric", "")
    documents_out = []

    for doc in data.get("documents", []):
        pdf_name = doc.get("pdf_name", "")
        id_pdf = doc.get("id_pdf", doc.get("pdf_id", doc.get("pdf", "")))

        pages_out = []   # original pages, enriched
        chunks_out = []  # embedding-ready chunks, all pages concatenated

        for page_obj in doc.get("pages", []):
            page_value = page_obj.get("page")
            # The id uses "" (not "None") when the page key is missing.
            page_key = str(page_obj.get("page", ""))
            raw = page_obj.get("text", "") or ""

            raw_norm = normalize_spaces(raw)
            wo_footer_norm, footer_hits = extract_footer_candidates(raw_norm)

            pages_out.append({
                "page": page_value,
                "text_raw": raw,                 # EXACT original OCR
                "text_norm": raw_norm,           # whitespace-normalized
                "text_norm_wo_footer": wo_footer_norm,  # derived field (optional for embedding)
                "footer_candidates": footer_hits,        # detected, not removed from raw
                "stats_raw": basic_stats(raw),
                "stats_norm": basic_stats(raw_norm),
            })

            # Chunks are cut from the footer-free normalized text; the raw
            # text stays available on the page entry above.
            pieces = chunk_text(wo_footer_norm, max_chars=max_chars_per_chunk, overlap=overlap)
            chunks_out.extend([
                {
                    "chunk_id": stable_id(pdf_name, str(id_pdf), page_key, str(ci)),
                    "page": page_value,
                    "chunk_index": ci,
                    "text": piece,
                    "stats": basic_stats(piece),
                    # provenance
                    "pdf_name": pdf_name,
                    "id_pdf": id_pdf,
                    "rubric": rubric,
                }
                for ci, piece in enumerate(pieces)
            ])

        documents_out.append({
            "pdf_name": pdf_name,
            "id_pdf": id_pdf,
            "pages": pages_out,
            "chunks": chunks_out,
        })

    return {
        "rubric": rubric,
        "documents": documents_out,
    }

In [8]:
# NOTE(review): `os` is imported here rather than in the first (imports) cell.
import os
# Exploratory check: list the entries of the scraping directory. The bare
# expression is the cell's displayed result; os.listdir does not guarantee
# any particular ordering.
os.listdir("../../data/scraping_esilv")

['admissions.json',
 'entreprises-debouches.json',
 'formations.json',
 'full_pdfs.json',
 'full_pdfs.txt',
 'international.json',
 'lecole.json',
 'recherche.json']

In [9]:
if __name__ == "__main__":
    # Source file and destination file (written next to the source).
    in_path = Path("../../data/scraping_esilv/full_pdfs.json")
    out_path = Path("../../data/scraping_esilv/full_pdfs_improved.json")

    # Load the scraped PDF dump and enrich it.
    raw_json = in_path.read_text(encoding="utf-8")
    data = json.loads(raw_json)
    improved = improve_pdf_json(data, max_chars_per_chunk=1200, overlap=150)

    # Serialize fully before touching the output file, keeping non-ASCII
    # characters readable in the JSON.
    serialized = json.dumps(improved, ensure_ascii=False, indent=2)
    out_path.write_text(serialized, encoding="utf-8")
    print(f"OK -> {out_path} | docs={len(improved.get('documents', []))}")

OK -> ..\..\data\scraping_esilv\full_pdfs_improved.json | docs=7
