In [1]:
import json
import os
from pathlib import Path
import ast

def _detect_format(file_path: str) -> str:
    """
    Détecte le format du fichier : 'array', 'object' ou 'jsonl'.
    """
    with open(file_path, "r", encoding="utf-8") as f:
        # Lire un peu de contenu pour détecter rapidement
        head = f.read(4096)
        if not head:
            raise ValueError("Fichier vide.")

        # Détection JSON classique
        first = head.lstrip()[:1]
        if first in ("[", "{"):
            try:
                _ = json.loads(head)  # si tout petit fichier
                return "array" if first == "[" else "object"
            except json.JSONDecodeError:
                # Si gros fichier, recharge tout pour trancher
                f.seek(0)
                data = json.load(f)
                return "array" if isinstance(data, list) else "object"

        # Sinon JSONL probable : vérifier première ligne non vide
        f.seek(0)
        for line in f:
            s = line.strip()
            if not s:
                continue
            try:
                json.loads(s)
                return "jsonl"
            except json.JSONDecodeError:
                # Parfois l'IA renvoie dict style Python -> tenter ast.literal_eval
                try:
                    ast.literal_eval(s)
                    return "jsonl"
                except Exception:
                    pass
                break

    # Si on arrive ici, on tente un dernier recours : charger entièrement
    with open(file_path, "r", encoding="utf-8") as f:
        try:
            data = json.load(f)
            return "array" if isinstance(data, list) else "object"
        except json.JSONDecodeError:
            return "jsonl"

def _even_splits(total: int, n: int):
    """
    Retourne une liste de tailles de segments qui se répartissent au mieux total éléments sur n parts.
    Exemple: total=10, n=3 -> [4,3,3]
    """
    if n <= 0:
        raise ValueError("n doit être > 0")
    base, rem = divmod(total, n)
    return [(base + 1 if i < rem else base) for i in range(n)]

def split_json_file(
    file_path: str,
    n: int,
    output_dir: str | None = None,
    keep_jsonl_format: bool = True,
) -> list[str]:
    """
    Coupe un fichier JSON en N sous-fichiers et renvoie une liste de textes JSON.
    - Gère tableau JSON, objet JSON et JSONL.
    - Si output_dir est fourni, écrit aussi les sous-fichiers sur disque.
    - Pour JSONL, si keep_jsonl_format=True : sorties au format JSONL.
      Sinon, chaque sortie est un tableau JSON (liste d'objets).

    Returns:
        List[str]: liste des contenus (texte) des N parties.
    """
    fmt = _detect_format(file_path)
    p = Path(file_path)
    if output_dir:
        Path(output_dir).mkdir(parents=True, exist_ok=True)

    outputs: list[str] = []

    if fmt == "array":
        with open(file_path, "r", encoding="utf-8") as f:
            data = json.load(f)
        if not isinstance(data, list):
            raise ValueError("Format détecté 'array' mais contenu non-liste.")

        sizes = _even_splits(len(data), n)
        idx = 0
        for part_id, sz in enumerate(sizes, start=1):
            chunk = data[idx: idx + sz]
            idx += sz
            text = json.dumps(chunk, ensure_ascii=False, indent=2)
            outputs.append(text)

            if output_dir is not None:
                out_path = Path(output_dir) / f"{p.stem}_part{part_id}.json"
                out_path.write_text(text, encoding="utf-8")

        # Si certaines parts sont vides (possible quand n > len(data))
        for part_id in range(len(sizes) + 1, n + 1):
            text = "[]"
            outputs.append(text)
            if output_dir is not None:
                out_path = Path(output_dir) / f"{p.stem}_part{part_id}.json"
                out_path.write_text(text, encoding="utf-8")

        return outputs

    if fmt == "object":
        with open(file_path, "r", encoding="utf-8") as f:
            data = json.load(f)
        if not isinstance(data, dict):
            raise ValueError("Format détecté 'object' mais contenu non-dict.")

        keys = list(data.keys())
        sizes = _even_splits(len(keys), n)
        idx = 0
        for part_id, sz in enumerate(sizes, start=1):
            part_keys = keys[idx: idx + sz]
            idx += sz
            chunk = {k: data[k] for k in part_keys}
            text = json.dumps(chunk, ensure_ascii=False, indent=2)
            outputs.append(text)

            if output_dir is not None:
                out_path = Path(output_dir) / f"{p.stem}_part{part_id}.json"
                out_path.write_text(text, encoding="utf-8")

        # Parts vides si n > nb de clés
        for part_id in range(len(sizes) + 1, n + 1):
            text = "{}"
            outputs.append(text)
            if output_dir is not None:
                out_path = Path(output_dir) / f"{p.stem}_part{part_id}.json"
                out_path.write_text(text, encoding="utf-8")

        return outputs

    # JSONL
    # On lit les lignes non vides et on vérifie qu'elles sont des JSON valides ou des dict Python.
    with open(file_path, "r", encoding="utf-8") as f:
        rows = []
        for line in f:
            s = line.strip()
            if not s:
                continue
            # Normaliser : si ce n'est pas du JSON strict, on tente ast.literal_eval puis dump JSON
            try:
                _ = json.loads(s)
                rows.append(s)
            except json.JSONDecodeError:
                # tenter dict/list style Python
                try:
                    obj = ast.literal_eval(s)
                    rows.append(json.dumps(obj, ensure_ascii=False))
                except Exception:
                    # si une ligne n'est pas parseable, on peut choisir de la sauter ou lever
                    raise ValueError(f"Ligne JSONL non parseable:\n{s}")

    sizes = _even_splits(len(rows), n)
    idx = 0
    for part_id, sz in enumerate(sizes, start=1):
        chunk_rows = rows[idx: idx + sz]
        idx += sz

        if keep_jsonl_format:
            text = "\n".join(chunk_rows) + ("\n" if chunk_rows else "")
            ext = ".jsonl"
        else:
            # Réémettre chaque part comme un tableau JSON
            objs = [json.loads(r) for r in chunk_rows]
            text = json.dumps(objs, ensure_ascii=False, indent=2)
            ext = ".json"

        outputs.append(text)

        if output_dir is not None:
            out_path = Path(output_dir) / f"{p.stem}_part{part_id}{ext}"
            out_path.write_text(text, encoding="utf-8")

    # Parts vides si n > nb de lignes
    for part_id in range(len(sizes) + 1, n + 1):
        if keep_jsonl_format:
            text, ext = "", ".jsonl"
        else:
            text, ext = "[]", ".json"
        outputs.append(text)
        if output_dir is not None:
            out_path = Path(output_dir) / f"{p.stem}_part{part_id}{ext}"
            out_path.write_text(text, encoding="utf-8")

    return outputs


# ----------- Exemple d'utilisation -------------
if __name__ == "__main__":
    # Couper en 3 parties et écrire les fichiers dans ./out
    parts_text = split_json_file("/content/sample_data/52_annotation_empty.json", n=3, output_dir="out", keep_jsonl_format=True)
    print(f"{len(parts_text)} parties générées.")
    # parts_text est une liste de chaînes JSON/JSONL prêtes à l'usage.


3 parties générées.


In [9]:
import json
from pathlib import Path
from typing import Any, Dict, List, Tuple, Union

JsonDict = Dict[str, Any]
PathT = Tuple[str, ...]


def is_measurement_node(node: Any) -> bool:
    """
    Un 'objet' (feuille) est un dict qui contient au moins la clé 'valeur'.
    (On n'impose pas 'unité' pour couvrir les cas qui n'en ont pas.)
    """
    return isinstance(node, dict) and "valeur" in node and "unité" in node



def normalize_consecutive_duplicates(path: PathT) -> PathT:
    """Supprime les doublons consécutifs dans le chemin ('A','A','B' -> 'A','B')."""
    if not path:
        return path
    norm = [path[0]]
    for k in path[1:]:
        if k != norm[-1]:
            norm.append(k)
    return tuple(norm)


def collect_measurements(node: Any, base_path: PathT = ()) -> List[Tuple[PathT, JsonDict]]:
    """
    Parcourt récursivement un JSON et renvoie la liste [(path, leaf_dict), ...]
    où leaf_dict est un objet-mesure (dict avec 'valeur').
    """
    out: List[Tuple[PathT, JsonDict]] = []

    if is_measurement_node(node):
        out.append((base_path, node))  # la feuille est ici
        return out

    if isinstance(node, dict):
        for k, v in node.items():
            out.extend(collect_measurements(v, base_path + (k,)))
    elif isinstance(node, list):
        # si jamais il y a des listes, on indexe (ex: ...,"items","[0]", ...)
        for i, v in enumerate(node):
            out.extend(collect_measurements(v, base_path + (f"[{i}]",)))
    # autres types: rien à faire
    return out


def insert_path(root: JsonDict, path: PathT, value: JsonDict, dedup: bool = True) -> None:
    """Insère value dans root au chemin path (en créant les sous-dicts)."""
    if dedup:
        path = normalize_consecutive_duplicates(path)
    if not path:
        # cas extrême: le noeud racine est lui-même une feuille (peu probable)
        root.update(value)
        return

    cur = root
    for key in path[:-1]:
        if key not in cur or not isinstance(cur[key], dict):
            cur[key] = {}
        cur = cur[key]
    cur[path[-1]] = value


def split_dict_by_measurements(
    data: JsonDict,
    max_objects_per_part: int,
    dedup_consecutive: bool = True,
) -> List[JsonDict]:
    """
    Découpe un dict JSON en parties de <= max_objects_per_part 'feuilles' (objets-mesures).
    Chaque partie contient uniquement les branches nécessaires pour atteindre ses feuilles.
    """
    if max_objects_per_part <= 0:
        raise ValueError("max_objects_per_part doit être > 0")

    leaves = collect_measurements(data)
    if not leaves:
        # Rien à découper : retourne une partie vide
        return [{}]

    parts: List[JsonDict] = []
    for i in range(0, len(leaves), max_objects_per_part):
        chunk = leaves[i:i + max_objects_per_part]
        partial: JsonDict = {}
        for path, leaf in chunk:
            insert_path(partial, path, leaf, dedup=dedup_consecutive)
        parts.append(partial)

    return parts


def split_json_file_by_measurements(
    file_path: Union[str, Path],
    max_objects_per_part: int,
    output_dir: Union[str, Path, None] = None,
    dedup_consecutive: bool = True,
    indent: int = 2,
) -> List[str]:
    """
    Charge un fichier JSON objet/arborescent, le découpe par feuilles (mesures),
    écrit éventuellement les morceaux et renvoie la liste des **textes JSON**.
    """
    p = Path(file_path)
    data = json.loads(Path(file_path).read_text(encoding="utf-8"))

    parts = split_dict_by_measurements(
        data, max_objects_per_part=max_objects_per_part, dedup_consecutive=dedup_consecutive
    )

    json_texts: List[str] = []
    for idx, part in enumerate(parts, start=1):
        text = json.dumps(part, ensure_ascii=False, indent=indent)
        json_texts.append(text)

        if output_dir is not None:
            Path(output_dir).mkdir(parents=True, exist_ok=True)
            out_path = Path(output_dir) / f"{p.stem}_byLeaf_part{idx}.json"
            out_path.write_text(text, encoding="utf-8")

    return json_texts





In [11]:

if __name__ == "__main__":
    # Suppose que 'rapport.json' contient ta structure arborescente
    parts_text = split_json_file_by_measurements(
        "/content/sample_data/52_annotation_empty.json",
        max_objects_per_part=5,   # <- nb max de mesures par fichier
        output_dir="out_parts",    # optionnel
        dedup_consecutive=True     # enlève les "Hormonologie" imbriqués en doublon
    )

    print(f"Generated {len(parts_text)} parts.")
    for i, t in enumerate(parts_text, 1):
        print("\n*******************")
        print(t)

Generated 7 parts.

*******************
{
  "Hematologie": {
    "NumerationGlobulaire": {
      "Hematies": {
        "valeur": null,
        "unité": "T/L"
      },
      "Hematocrite": {
        "valeur": null,
        "unité": "%"
      },
      "CONCENTRATION GLOB.MOYENNE (CCMH)": {
        "valeur": null,
        "unité": "g/dL"
      }
    },
    "FormuleLeucocytaire": {
      "Leucocytes": {
        "valeur": null,
        "unité": "G/L"
      },
      "Neutrophiles": {
        "valeur": null,
        "unité": "%"
      }
    }
  }
}

*******************
{
  "Hematologie": {
    "FormuleLeucocytaire": {
      "Eosinophiles": {
        "valeur": null,
        "unité": "%"
      },
      "Basophiles": {
        "valeur": null,
        "unité": "%"
      },
      "Lymphocytes": {
        "valeur": null,
        "unité": "%"
      },
      "Monocytes": {
        "valeur": null,
        "unité": "%"
      }
    },
    "NumerationPlaquettaire": {
      "Plaquettes": {
        "valeur