# Studium Parisiense → Dataset “LLM‑ready” (JSONL chunks)

Ce notebook transforme un export **JSON Lines** (1 profil par ligne) issu de Studium Parisiense en un dataset plus simple à donner à un LLM.

**Entrée :**
- Un fichier `.jsonl` (chaque ligne = 1 fiche)

**Sorties (dans `OUTPUT_DIR/`) :**
1. `studium_llm_chunks.jsonl`  
   → dataset “LLM‑ready” : **1 ligne = 1 chunk de texte** avec métadonnées (`person_reference`, `url`, `doc_type`, etc.)
2. `studium_field_paths_coverage.csv`  
   → inventaire des attributs (chemins) + fréquence de présence
3. `studium_top_level_sections.csv`  
   → fréquence des sections top‑level

> Pourquoi “chunker” ?  
> Certaines sections d'une fiche (ex. `textualProduction`) peuvent être très longues : les découper en chunks évite de dépasser la fenêtre de contexte du LLM et facilite un pipeline stable (extraction entités‑relations, RAG, etc.).

In [None]:
# --- Imports & configuration ---
import json
import os
import re
from collections import Counter
from typing import Any, Dict, List, Set

import pandas as pd

try:
    from tqdm.auto import tqdm
except ImportError:
    # tqdm is optional: without it the loops simply run with no progress bar.
    # Only a failed import is tolerated; any other error is a real problem
    # and should not be hidden.
    def tqdm(x, **kwargs):
        """Fallback used when tqdm is not installed: return the iterable unchanged."""
        return x

# Path of the input JSON Lines export (one record per line).
INPUT_JSONL = "studium_parisiense_dataset.jsonl"

# Site root prepended to relative record links when building the full URL.
BASE_URL = "http://studium-parisiense.univ-paris1.fr"

# Directory that receives the generated CSV / JSONL files.
OUTPUT_DIR = "llm_ready_outputs"

# Maximum size of one chunk, in characters (header included).
MAX_CHARS = 2500

INCLUDE_META = True          # append a short summary of meta (dates/places/names/titles) when present
INCLUDE_SOURCES = False      # append each item's 'reference' field (often very long)

# Top-level sections generally ignored for the LLM text (noise / HTML / redundant).
SKIP_TOP_LEVEL_KEYS = {"_id", "raw", "extras"}

# Preferred order of the biographical sections inside a "bio" chunk.
BIO_SECTION_ORDER = [
    "identity",
    "origin",
    "curriculum",
    "ecclesiasticalCareer",
    "professionalCareer",
    "relationalInsertion",
    # other sections may exist: they are appended automatically afterwards
]

In [5]:
# --- Helpers (nettoyage / rendu en texte / inventaire d'attributs) ---

def clean_markup(s: str) -> str:
    """Strip the ``$``, ``%`` and ``&`` markers and normalise whitespace.

    Any run of whitespace (tabs and newlines included) collapses to a single
    space and the result is trimmed. Non-string input is returned unchanged.
    """
    if isinstance(s, str):
        without_markers = re.sub(r"[$%&]", "", s)
        return re.sub(r"\s+", " ", without_markers).strip()
    return s

def summarize_meta(meta: Dict[str, Any]) -> str:
    """Build a compact one-line summary of an item's ``meta`` block.

    Only the ``dates``, ``places``, ``names`` and ``titles`` keys are used.
    Dates are formatted by their ``type``: ``SIMPLE`` as the date itself,
    ``BEFORE`` as ``≤date``, ``AFTER`` as ``≥date`` and ``INTERVAL`` as
    ``start-end``. The other three keys are listed under their French labels,
    showing at most six entries followed by a ``…(+n)`` remainder count.

    Args:
        meta: The ``meta`` dictionary of an item; anything falsy or not a
            dict yields an empty summary.

    Returns:
        The parts joined with ``" | "``, or ``""`` when nothing is usable.
    """
    if not meta or not isinstance(meta, dict):
        return ""

    def _date_text(value: Any) -> str:
        # A JSON null must not surface as the literal string "None".
        return "" if value is None else str(value).strip()

    def _bound_text(bound: Any) -> str:
        # Interval bounds are read as dicts carrying a "date" key; any other
        # shape is ignored instead of raising AttributeError.
        return _date_text(bound.get("date")) if isinstance(bound, dict) else ""

    def _fmt_date(d: Any) -> str:
        if not isinstance(d, dict):
            return ""
        kind = d.get("type")
        if kind == "INTERVAL":
            start = _bound_text(d.get("startDate"))
            end = _bound_text(d.get("endDate"))
            return f"{start}-{end}" if (start or end) else ""
        date = _date_text(d.get("date"))
        if not date:
            # Avoid emitting a bare "≤" / "≥" when the date itself is missing.
            return ""
        if kind == "SIMPLE":
            return date
        if kind == "BEFORE":
            return f"≤{date}"
        if kind == "AFTER":
            return f"≥{date}"
        return ""

    parts = []

    dates = meta.get("dates")
    if isinstance(dates, list):
        dstr = ", ".join(text for text in (_fmt_date(d) for d in dates) if text)
        if dstr:
            parts.append(f"dates: {dstr}")

    for key, label in [("places", "lieux"), ("names", "noms"), ("titles", "titres")]:
        arr = meta.get(key)
        if not isinstance(arr, list):
            continue
        # Clean first, then filter: an entry made only of markers is empty
        # once cleaned and must not leave a dangling ", " in the output.
        cleaned = [clean_markup(str(x)) for x in arr if x is not None]
        cleaned = [text for text in cleaned if text]
        if cleaned:
            head = ", ".join(cleaned[:6])
            if len(cleaned) > 6:
                head += f", …(+{len(cleaned) - 6})"
            parts.append(f"{label}: {head}")

    return " | ".join(parts)

def render_item(item: Any) -> str:
    """Render one item (often a dict with ``value``/``meta``/``reference``) as text.

    * ``None`` gives ``""``; strings are cleaned with ``clean_markup``.
    * Lists are rendered element by element and joined with ``" | "``.
    * Dicts use ``value``, falling back to the first non-null of
      ``mainTitle``, ``title``, ``name``. A nested dict/list value is rendered
      recursively instead of being dumped as a Python ``repr``.
      When ``INCLUDE_META`` is on, a ``(…)`` meta summary is appended; when
      ``INCLUDE_SOURCES`` is on, a ``[source: …]`` suffix is appended.
    * Any other scalar is stringified and cleaned.

    Returns:
        The rendered text, possibly empty.
    """
    if item is None:
        return ""
    if isinstance(item, str):
        return clean_markup(item)
    if isinstance(item, list):
        return " | ".join(text for text in (render_item(x) for x in item) if text)
    if not isinstance(item, dict):
        return clean_markup(str(item))

    val = item.get("value")
    if val is None:
        # A key that is present but null must not stop the search for a label.
        for k in ("mainTitle", "title", "name"):
            candidate = item.get(k)
            if candidate is not None:
                val = candidate
                break

    if val is None:
        out = ""
    elif isinstance(val, (dict, list)):
        # str() on a container would leak Python repr syntax into the text.
        out = render_item(val)
    else:
        out = clean_markup(str(val))

    meta = item.get("meta")
    if isinstance(meta, dict) and meta and INCLUDE_META:
        m = summarize_meta(meta)
        if m:
            out = f"{out} ({m})" if out else f"({m})"

    ref = item.get("reference")
    if ref and INCLUDE_SOURCES:
        if isinstance(ref, list):
            ref_text = " ; ".join(clean_markup(str(x)) for x in ref if str(x).strip())
        else:
            ref_text = clean_markup(str(ref))
        if ref_text:
            out = f"{out} [source: {ref_text}]" if out else f"[source: {ref_text}]"

    return out.strip()

def render_field(field_name: str, field_value: Any) -> List[str]:
    """Render one field (often a list of items) as bullet lines.

    A single rendered value gives ``- name: value``; several values give a
    ``- name:`` line followed by indented ``  - value`` lines. Values that
    render to an empty string are dropped, and a field with nothing left
    produces no line at all.
    """
    if field_value is None:
        return []

    if not isinstance(field_value, list):
        text = render_item(field_value)
        return [f"- {field_name}: {text}"] if text else []

    rendered = [text for text in (render_item(entry) for entry in field_value) if text]
    if not rendered:
        return []
    if len(rendered) == 1:
        return [f"- {field_name}: {rendered[0]}"]
    return [f"- {field_name}:"] + [f"  - {text}" for text in rendered]

def render_generic_section(section_name: str, section_obj: Any) -> List[str]:
    """Render a top-level section generically.

    The first line is always the ``[section_name]`` heading. A dict is
    rendered field by field through ``render_field``; a list gives one
    ``- item`` bullet per non-empty item; any other value gives a single
    bullet when it renders to non-empty text.
    """
    rendered: List[str] = [f"[{section_name}]"]

    if isinstance(section_obj, dict):
        for field_name, field_value in section_obj.items():
            rendered += render_field(field_name, field_value)
        return rendered

    # A lone value is handled exactly like a one-element list.
    entries = section_obj if isinstance(section_obj, list) else [section_obj]
    rendered += [f"- {text}" for text in (render_item(entry) for entry in entries) if text]
    return rendered

def render_textual_production(tp: Dict[str, Any]) -> List[List[str]]:
    """Render ``textualProduction`` as blocks of lines, one block per work.

    Each category whose value is a dict holding a non-empty ``opus`` list
    contributes one block per entry. A block starts with a
    ``[textualProduction] category: …`` line. For dict entries it is followed
    by a title line (``mainTitle`` first, else ``title``) and then by every
    other non-null field rendered through ``render_field``.
    """
    if not isinstance(tp, dict):
        return []

    blocks: List[List[str]] = []
    for category, cat_obj in tp.items():
        works = cat_obj.get("opus", []) if isinstance(cat_obj, dict) else None
        if not isinstance(works, list) or not works:
            continue

        heading = f"[textualProduction] category: {clean_markup(category)}"
        for work in works:
            block: List[str] = [heading]
            if isinstance(work, dict):
                main_title = work.get("mainTitle")
                if main_title:
                    block.append(f"- title: {clean_markup(str(main_title))}")
                elif "title" in work:
                    fallback_title = render_item(work.get("title"))
                    if fallback_title:
                        block.append(f"- title: {fallback_title}")

                # Title keys were handled above; everything else is generic.
                for key, value in work.items():
                    if key not in ("mainTitle", "title") and value is not None:
                        block.extend(render_field(key, value))

            blocks.append(block)

    return blocks

def detect_person_name(rec: Dict[str, Any]) -> str:
    """Return the main name of a record.

    The first entry of ``identity.name`` is used when it renders to non-empty
    text; otherwise the cleaned top-level ``title`` is returned. Shapes are
    checked explicitly instead of swallowing every exception, and a missing
    or null ``title`` gives ``""`` rather than the string ``"None"``.
    """
    identity = rec.get("identity")
    names = identity.get("name") if isinstance(identity, dict) else None
    if isinstance(names, list) and names:
        primary = render_item(names[0])
        if primary:
            return primary

    title = rec.get("title")
    return "" if title is None else clean_markup(str(title))

def build_full_url(rec: Dict[str, Any]) -> str:
    """Return the absolute URL of a record from its ``link`` field.

    An absolute http(s) link is returned unchanged, a relative link is
    appended to ``BASE_URL`` with exactly one separating slash, and a missing
    or empty link yields ``BASE_URL`` itself.
    """
    link = str(rec.get("link", "") or "")
    if link.startswith(("http://", "https://")):
        return link
    if not link:
        return BASE_URL

    root = BASE_URL.rstrip("/")
    return root + link if link.startswith("/") else f"{root}/{link}"

def chunk_lines(header_lines: List[str], body_lines: List[str], max_chars: int) -> List[str]:
    """Pack body lines into chunks of at most ``max_chars`` characters.

    Every chunk starts with the same header (``header_lines`` joined by
    newlines) followed by as many consecutive body lines as fit. A body line
    too long to fit next to the header is truncated and suffixed with ``…``.
    Leading and trailing whitespace of each chunk body is stripped.

    Chunks whose body is empty after stripping are not emitted, so a blank
    separator line that lands on a chunk boundary no longer produces a
    header-only chunk.

    Args:
        header_lines: Lines repeated at the top of every chunk.
        body_lines: Lines to distribute over the chunks, in order.
        max_chars: Maximum chunk length, header included.

    Returns:
        The chunk texts, each terminated by a newline.
    """
    header = "\n".join(header_lines).strip() + "\n"
    base_len = len(header)
    # Ten characters of slack are kept; clamp at 0 so that an oversized header
    # does not turn blank lines into "…".
    line_limit = max(0, max_chars - base_len - 10)

    chunks: List[str] = []
    current_lines: List[str] = []
    current_len = base_len

    def flush() -> None:
        nonlocal current_lines, current_len
        body = "\n".join(current_lines).strip()
        if body:
            chunks.append(f"{header}{body}\n")
        current_lines = []
        current_len = base_len

    for line in body_lines:
        if len(line) > line_limit:
            line = line[:line_limit] + "…"

        add_len = len(line) + 1  # +1 for the newline joining the lines
        if current_lines and current_len + add_len > max_chars:
            flush()

        current_lines.append(line)
        current_len += add_len

    flush()
    return chunks

def flatten_paths(obj: Any, prefix: str = "") -> Set[str]:
    """Return the set of paths present in a nested JSON-like object.

    Dict keys are joined with ``.``; lists are written ``[]`` with no index,
    so every element of a list contributes to the same path. A scalar adds
    its own path only when that path is non-empty.
    """
    found: Set[str] = set()
    pending = [(obj, prefix)]  # explicit stack instead of recursion

    while pending:
        node, here = pending.pop()
        if isinstance(node, dict):
            for key, child in node.items():
                child_path = f"{here}.{key}" if here else str(key)
                found.add(child_path)
                pending.append((child, child_path))
        elif isinstance(node, list):
            list_path = f"{here}[]" if here else "[]"
            found.add(list_path)
            pending.extend((child, list_path) for child in node)
        elif here:
            found.add(here)

    return found

In [None]:
# --- Étape 1 : Scanner le JSONL (inventaire sections & attributs) ---

# Validate the input explicitly: an `assert` is stripped under `python -O`.
if not os.path.exists(INPUT_JSONL):
    raise FileNotFoundError(
        f"Fichier introuvable: {INPUT_JSONL}\n"
        "Mets le JSONL dans le même dossier que ce notebook ou modifie INPUT_JSONL."
    )

os.makedirs(OUTPUT_DIR, exist_ok=True)

top_level_counts = Counter()   # top-level key -> number of records containing it
path_counts = Counter()        # flattened path -> number of records containing it
total_records = 0
skipped_lines = 0              # lines that are not a valid JSON object

with open(INPUT_JSONL, "r", encoding="utf-8") as f:
    for line in tqdm(f, desc="Scanning JSONL"):
        line = line.strip()
        if not line:
            continue
        try:
            rec = json.loads(line)
        except json.JSONDecodeError:
            skipped_lines += 1
            continue
        if not isinstance(rec, dict):
            # Valid JSON that is not an object (list, string, ...) has no keys.
            skipped_lines += 1
            continue

        total_records += 1

        # Presence per record: dict keys and the path set are already unique,
        # so each one is counted once per record.
        top_level_counts.update(rec.keys())
        path_counts.update(flatten_paths(rec))

print("Records:", total_records)
print("Unique top-level keys:", len(top_level_counts))
print("Unique paths:", len(path_counts))
if skipped_lines:
    print("Skipped lines (invalid JSON or not an object):", skipped_lines)

Scanning JSONL: 0it [00:00, ?it/s]

Records: 20149
Unique top-level keys: 21
Unique paths: 7159


In [None]:
# --- Export CSV : couverture des sections & des chemins ---

def _coverage_frame(counts, label, total):
    """Build a coverage table with columns ``label``, ``count``, ``coverage``.

    Rows are sorted by decreasing coverage, then by ``label``. The column
    list is passed explicitly so that an empty counter still yields a frame
    with the expected columns instead of failing in ``sort_values``.
    """
    rows = [
        {label: name, "count": int(count), "coverage": count / total}
        for name, count in counts.items()
    ]
    return (
        pd.DataFrame(rows, columns=[label, "count", "coverage"])
        .sort_values(["coverage", label], ascending=[False, True])
        .reset_index(drop=True)
    )


top_level_df = _coverage_frame(top_level_counts, "key", total_records)
paths_df = _coverage_frame(path_counts, "path", total_records)

top_level_csv = os.path.join(OUTPUT_DIR, "studium_top_level_sections.csv")
paths_csv = os.path.join(OUTPUT_DIR, "studium_field_paths_coverage.csv")

top_level_df.to_csv(top_level_csv, index=False)
paths_df.to_csv(paths_csv, index=False)

print("Saved:", top_level_csv)
print("Saved:", paths_csv)

display(top_level_df.head(15))

✅ Saved: llm_ready_outputs\studium_top_level_sections.csv
✅ Saved: llm_ready_outputs\studium_field_paths_coverage.csv


Unnamed: 0,key,count,coverage
0,_id,20149,1.0
1,extras,20149,1.0
2,identity,20149,1.0
3,link,20149,1.0
4,raw,20149,1.0
5,reference,20149,1.0
6,title,20149,1.0
7,bibliography,20127,0.998908
8,curriculum,19614,0.973448
9,origin,11193,0.555511


In [8]:
# --- Étape 2 : Conversion JSON -> texte chunké (LLM-ready) ---

# Sections emitted as their own doc_type instead of being merged into "bio".
SECTION_TO_DOCTYPE = {
    "bibliography": "bibliography",
    "textualProduction": "works",
}


def _make_chunk_records(
    ref: str, name: str, url: str, doc_type: str, texts: List[str]
) -> List[Dict[str, Any]]:
    """Wrap chunk texts into output records, numbering them from 1 per doc_type."""
    return [
        {
            "chunk_id": f"{ref}_{doc_type}_{position:04d}",
            "person_reference": ref,
            "person_name": name,
            "url": url,
            "doc_type": doc_type,
            "text": text,
        }
        for position, text in enumerate(texts, start=1)
    ]


def _pack_work_blocks(header_lines: List[str], blocks: List[List[str]]) -> List[str]:
    """Group whole work blocks into chunk texts of at most ``MAX_CHARS`` characters.

    Blocks are kept intact whenever possible. A block too large to fit in an
    empty chunk is split line by line through ``chunk_lines``.
    """
    header = "\n".join(header_lines).strip() + "\n"
    base_len = len(header)

    texts: List[str] = []
    pending: List[str] = []
    used = base_len

    for block in blocks:
        padded = block + [""]  # blank line separating consecutive works
        cost = sum(len(entry) + 1 for entry in padded)

        # The block does not fit after what is pending: close the chunk first.
        if pending and used + cost > MAX_CHARS:
            texts.append(header + "\n".join(pending).strip() + "\n")
            pending = []
            used = base_len

        # Oversized block starting a chunk: split it directly by lines.
        if not pending and cost > MAX_CHARS - base_len:
            texts.extend(chunk_lines(header_lines, block, MAX_CHARS))
            continue

        pending.extend(padded)
        used += cost

    if pending:
        texts.append(header + "\n".join(pending).strip() + "\n")
    return texts


def record_to_llm_chunks(rec: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Convert one record into its list of chunk records.

    Three document types are produced, in this order:

    * ``bio``: every top-level section except the skipped keys, the
      ``reference``/``title``/``link`` keys and the sections listed in
      ``SECTION_TO_DOCTYPE``. Sections named in ``BIO_SECTION_ORDER`` come
      first, the others follow in record order.
    * ``bibliography``: the ``bibliography`` section, when it is a dict.
    * ``works``: the ``textualProduction`` section, when it is a dict,
      packed work by work.

    Every chunk text starts with the same three-line header (reference, name,
    URL).
    """
    ref = str(rec.get("reference", "")).strip()
    name = detect_person_name(rec)
    url = build_full_url(rec)

    header_lines = [
        f"person_reference: {ref}",
        f"person_name: {name}",
        f"url: {url}",
    ]

    # --- bio: preferred sections first, then the remaining ones ---
    section_order = [
        key for key in BIO_SECTION_ORDER if key in rec and key not in SECTION_TO_DOCTYPE
    ]
    already_listed = set(section_order)
    for key in rec:
        if key in SKIP_TOP_LEVEL_KEYS or key in ("reference", "title", "link", "_id"):
            continue
        if key in already_listed or key in SECTION_TO_DOCTYPE:
            continue
        section_order.append(key)
        already_listed.add(key)

    bio_lines: List[str] = []
    for section in section_order:
        content = rec.get(section)
        if content is not None:
            bio_lines += render_generic_section(section, content)
            bio_lines.append("")

    chunks_out: List[Dict[str, Any]] = []

    if bio_lines:
        bio_texts = chunk_lines(header_lines, bio_lines, MAX_CHARS)
        chunks_out += _make_chunk_records(ref, name, url, "bio", bio_texts)

    # --- bibliography ---
    bibliography = rec.get("bibliography")
    if isinstance(bibliography, dict):
        biblio_lines = render_generic_section("bibliography", bibliography)
        biblio_texts = chunk_lines(header_lines, biblio_lines, MAX_CHARS)
        chunks_out += _make_chunk_records(ref, name, url, "bibliography", biblio_texts)

    # --- works (textualProduction) ---
    production = rec.get("textualProduction")
    if isinstance(production, dict):
        work_texts = _pack_work_blocks(header_lines, render_textual_production(production))
        chunks_out += _make_chunk_records(ref, name, url, "works", work_texts)

    return chunks_out

In [9]:
# --- Génération du JSONL chunké (LLM-ready) ---

out_jsonl = os.path.join(OUTPUT_DIR, "studium_llm_chunks.jsonl")

n_chunks = 0
n_records = 0
n_skipped = 0   # lines that are not a valid JSON object

with open(INPUT_JSONL, "r", encoding="utf-8") as f_in, open(out_jsonl, "w", encoding="utf-8") as f_out:
    for line in tqdm(f_in, desc="Building LLM-ready chunks"):
        line = line.strip()
        if not line:
            continue
        try:
            rec = json.loads(line)
        except json.JSONDecodeError:
            n_skipped += 1
            continue
        if not isinstance(rec, dict):
            # record_to_llm_chunks needs a JSON object; skip lists/scalars
            # instead of crashing in the middle of the export.
            n_skipped += 1
            continue

        n_records += 1

        # One output line per chunk; ensure_ascii=False keeps accents readable.
        for ch in record_to_llm_chunks(rec):
            f_out.write(json.dumps(ch, ensure_ascii=False) + "\n")
            n_chunks += 1

print(f"✅ Done. Records processed: {n_records}")
print(f"✅ Chunks written: {n_chunks}")
print("✅ Output:", out_jsonl)
if n_skipped:
    print(f"⚠️ Lines skipped (invalid JSON or not an object): {n_skipped}")

Building LLM-ready chunks: 0it [00:00, ?it/s]

✅ Done. Records processed: 20149
✅ Chunks written: 47440
✅ Output: llm_ready_outputs\studium_llm_chunks.jsonl


In [10]:
# --- Vérification rapide : afficher quelques chunks ---

import itertools

# Read only the first three lines of the generated file.
with open(out_jsonl, "r", encoding="utf-8") as f:
    sample = [raw for _, raw in zip(range(3), f)]

# Print each sampled chunk: a separator, its identifiers, then the start of its text.
for raw in sample:
    obj = json.loads(raw)
    separator = "=" * 80
    preview = obj["text"][:1200]
    print(separator)
    print("chunk_id:", obj["chunk_id"], "| doc_type:", obj["doc_type"])
    print(preview, "...\n")

chunk_id: 15657_bio_0001 | doc_type: bio
person_reference: 15657
person_name: ANCELINUS Galli
url: http://studium-parisiense.univ-paris1.fr/individus/15657-ancelinusgalli
[identity]
- name: ANCELINUS Galli
- nameVariant: Anselinus GALLI (noms: Anselinus GALLI)
- shortDescription: Bachelier Décret
- datesOfActivity: 1435-1435 (dates: 1435-1435)
- gender: male
- status: Maître

[curriculum]
- university: Paris 1435-1435. (dates: 1435-1435 | lieux: Paris)
- grades:
  - Maître ès arts (?Paris) :1435 ; (dates: ≤1435 | lieux: Paris)
  - Bachelier en décret (Paris) en 1435 (9 avril) ; (dates: 1435 | lieux: Paris)
 ...

chunk_id: 15657_bibliography_0001 | doc_type: bibliography
person_reference: 15657
person_name: ANCELINUS Galli
url: http://studium-parisiense.univ-paris1.fr/individus/15657-ancelinusgalli
[bibliography]
- workReferences: FOURNIER: 2, 5 ;
- otherBases: STUDIUM : http://lamop-vs3.univ-paris1.fr/studium/ : Rédaction : Anne Tournieroux ; révision, Jean-Philippe Genet, 23/07/2020.
