# 01 - Pretraitement et harmonisation

But: transformer les tables brutes en un format long standardise, pret a etre exploite dans les etapes suivantes.

In [1]:
from pathlib import Path
import sys

PROJECT_ROOT = Path.cwd().resolve()
if PROJECT_ROOT.name == "notebooks":
    PROJECT_ROOT = PROJECT_ROOT.parent
sys.path.append(str(PROJECT_ROOT))

import pandas as pd
import numpy as np
import csv
import math

from src.data_prep import (
    STANDARD_COLUMNS,
    load_raw,
    ensure_columns,
    add_election_metadata,
    build_code_bv,
    deduplicate_columns,
    coerce_numeric,
    basic_cleaning,
    validate_consistency,
    _normalize_label,
)

pd.set_option("display.max_columns", 50)

RAW_DIR = PROJECT_ROOT / "data" / "raw"
INTERIM_DIR = PROJECT_ROOT / "data" / "interim"
PROCESSED_DIR = PROJECT_ROOT / "data" / "processed"
RAW_DIR, INTERIM_DIR, PROCESSED_DIR


(PosixPath('/Users/steph/Code/Python/Jupyter/Elections_Sete/data/raw'),
 PosixPath('/Users/steph/Code/Python/Jupyter/Elections_Sete/data/interim'),
 PosixPath('/Users/steph/Code/Python/Jupyter/Elections_Sete/data/processed'))

## Configurer les meta-donnees par fichier

Remplir `meta_elections` pour chaque CSV brut. Exemple ci-dessous a ajuster en fonction des colonnes observees. Utiliser le bloc d'inspection des colonnes pour recuperer les noms exacts (souvent avec accents). Pour les fichiers multi-tours, utiliser `tour_column` pour splitter.

In [2]:
# Harmonisation des noms de colonnes (avant meta_elections)
import re
import unicodedata

_MOJIBAKE_REPLACEMENTS = {
    "Tï¿½te": "Tête",
    "T�te": "Tête",
    "t�te": "tête",
    "Prï¿½nom": "Prénom",
    "Pr�nom": "Prénom",
    "dï¿½partement": "département",
    "dÃ©partement": "département",
    "d�partement": "département",
    "d�p�t": "dépôt",
    "Bin�me": "Binôme",
    "bin�me": "binôme",
    "Nï¿½": "N°",
    "N�": "N°",
    "ExprimÃ©s": "Exprimés",
    "exprimÃ©s": "exprimés",
    "Exprimï¿½s": "Exprimés",
    "exprimï¿½s": "exprimés",
    "Exprim�s": "Exprimés",
    "exprim�s": "exprimés",
    "Libell�": "Libellé",
    "libell�": "libellé",
    "Abr�g�": "Abrégé",
    "abr�g�": "abrégé",
    "Ã©": "é",
    "Ã¨": "è",
    "Ãª": "ê",
    "Ã«": "ë",
    "Ã ": "à",
    "Ã¢": "â",
    "Ã§": "ç",
    "Ã¹": "ù",
    "Ã»": "û",
    "Ã¯": "ï",
    "Ã´": "ô",
    "Ã¶": "ö",
    "Ã‰": "É",
    "Ãˆ": "È",
    "ÃŠ": "Ê",
    "Ã‹": "Ë",
    "Ã€": "À",
    "Ã‚": "Â",
    "Ã‡": "Ç",
}


def _fix_label(label: str) -> str:
    fixed = label
    try:
        fixed = label.encode("latin1").decode("utf-8")
    except (UnicodeEncodeError, UnicodeDecodeError):
        fixed = label
    for bad, good in _MOJIBAKE_REPLACEMENTS.items():
        if bad in fixed:
            fixed = fixed.replace(bad, good)
    return " ".join(fixed.split())


def _normalize_label_harmon(label: str) -> str:
    fixed = _fix_label(label)
    fixed = re.sub(r"\s*/\s*", "/", fixed)
    fixed = unicodedata.normalize("NFD", fixed)
    fixed = "".join(ch for ch in fixed if unicodedata.category(ch) != "Mn")
    return fixed.lower()


_PERCENT_MAP = {
    "% abstentions": "% Abs/Ins",
    "% blancs/inscrits": "% Blancs/Ins",
    "% blancs/votants": "% Blancs/Vot",
    "% exprimes/inscrits": "% Exp/Ins",
    "% exprimes/votants": "% Exp/Vot",
    "% nuls/inscrits": "% Nuls/Ins",
    "% nuls/votants": "% Nuls/Vot",
}


_HARMONIZE_EXACT = {
    _normalize_label_harmon("% Vot/Ins"): "% Votants",
    _normalize_label_harmon("Code du d°partement"): "Code du département",
    _normalize_label_harmon("Exprim°s"): "Exprimés",
    _normalize_label_harmon("Exprim°s 1"): "Exprimés",
    _normalize_label_harmon("Libell° Abr°g° Liste 1"): "Libellé Abrégé Liste 1",
    _normalize_label_harmon("Libellé abrégé de liste 1"): "Libellé Abrégé Liste 1",
    _normalize_label_harmon("Libell° Etendu Liste 1"): "Libellé Etendu Liste 1",
    _normalize_label_harmon("Liste"): "Libellé Etendu Liste 1",
    _normalize_label_harmon("Libellé de liste 1"): "Libellé Etendu Liste 1",
    _normalize_label_harmon("Liste.1"): "Libellé Etendu Liste 1",
    _normalize_label_harmon("Libell° de la circonscription"): "Libellé de la circonscription",
    _normalize_label_harmon("Libell° de la commune"): "Libellé de la commune",
    _normalize_label_harmon("Libellé commune"): "Libellé de la commune",
    _normalize_label_harmon("Libell° du d°partement"): "Libellé du département",
    _normalize_label_harmon("Libellé département"): "Libellé du département",
    _normalize_label_harmon("Nom candidat 1"): "Nom 1",
    _normalize_label_harmon("Nom Tête de Liste 1"): "Nom 1",
    _normalize_label_harmon("Nom T°te de Liste 1"): "Nom 1",
    _normalize_label_harmon("Nom.1"): "Nom 1",
    _normalize_label_harmon("Pr°nom du candidat 1"): "Prénom 1",
    _normalize_label_harmon("Pr°nom du candidat t°te de liste"): "Prénom 1",
    _normalize_label_harmon("Pr°nom.1"): "Prénom 1",
    _normalize_label_harmon("NÂ°Panneau 1"): "N°Panneau 1",
    _normalize_label_harmon("N.Pan. 1"): "N°Panneau 1",
}



def _harmonize_column(col: str) -> str:
    norm = _normalize_label_harmon(col)
    nums = re.findall(r"\d+", norm)
    first_num = nums[0] if nums else None

    if norm in _PERCENT_MAP:
        return _PERCENT_MAP[norm]

    if norm in _HARMONIZE_EXACT:
        return _HARMONIZE_EXACT[norm]

    if norm in {"n° tour", "nº tour", "no tour"}:
        return "N° tour"

    if norm in {"n°liste", "nliste", "nºliste", "n° liste", "n liste"}:
        return "N°Liste"

    if norm in {"n°panneau", "npanneau", "nºpanneau", "n° panneau", "n panneau"}:
        return "N°Panneau"

    if norm.startswith("n.pan"):
        return f"N°Panneau {first_num}" if first_num else "N°Panneau"

    if norm == "binome":
        return "Binôme"

    if norm == "bv":
        return "Numéro bureau de vote"

    if norm == "code commune":
        return "Code de la commune"

    if norm in {"code du departement", "code departement"}:
        return "Code du département"

    if norm == "numero bureau de vote":
        return "Numéro bureau de vote"

    if re.match(r"^code b\.?vote$", norm) or norm == "code bv" or norm == "code du b.vote":
        return "Numéro bureau de vote"

    if re.match(r"^n[°ºo]? de bureau de vote$", norm):
        return "Numéro bureau de vote"

    if norm.startswith("code nuance"):
        if first_num:
            return f"Code Nuance {first_num}"
        if "candidat" in norm or "liste" in norm:
            return "Code Nuance"

    if re.match(r"^exprimes(?:[\. ]+\d+)?$", norm):
        return "Exprimés"

    if norm.startswith("votants") and first_num:
        return "Votants"

    if norm.startswith("nombre de voix"):
        return f"Voix {first_num}" if first_num else "Voix"

    if "voix/exp" in norm or "voix/exprimes" in norm:
        return f"% Voix/Exp {first_num}" if first_num else "% Voix/Exp"

    if "voix/ins" in norm or "voix/inscrits" in norm:
        return f"% Voix/Ins {first_num}" if first_num else "% Voix/Ins"

    match = re.match(r"^% ?voix/(?:exp|exprime?s)[\. ]?(\d+)$", norm)
    if match:
        return f"% Voix/Exp {match.group(1)}"

    match = re.match(r"^% ?voix/(?:ins|inscrits)[\. ]?(\d+)$", norm)
    if match:
        return f"% Voix/Ins {match.group(1)}"

    if "libell" in norm and "liste" in norm:
        if "abrege" in norm:
            suffix = first_num or "1"
            return f"Libellé Abrégé Liste {suffix}"
        if "etendu" in norm or "de liste" in norm:
            suffix = first_num or "1"
            return f"Libellé Etendu Liste {suffix}"

    if norm.startswith("liste"):
        suffix = first_num or "1"
        return f"Libellé Etendu Liste {suffix}"

    if norm.startswith("n de depot du candidat"):
        return f"N°Panneau {first_num}" if first_num else "N°Panneau"

    if norm.startswith("n de depot de la liste"):
        return f"N°Panneau {first_num}" if first_num else "N°Panneau"

    match = re.match(r"^nom tete de liste(?: (\d+))?$", norm)
    if match:
        suffix = match.group(1) or first_num
        return f"Nom {suffix}" if suffix else "Nom Tête de Liste"

    if norm.startswith("nom du candidat"):
        if first_num:
            return f"Nom {first_num}"
        return "Nom du candidat tête de liste" if "tete de liste" in norm else "Nom du candidat"

    if norm.startswith("nom candidat"):
        return f"Nom {first_num}" if first_num else "Nom du candidat"

    match = re.match(r"^nom[\. ](\d+)$", norm)
    if match:
        return f"Nom {match.group(1)}"

    if norm.startswith("nom") and first_num:
        return f"Nom {first_num}"

    if norm.startswith("nuance liste") or norm.startswith("nuance candidat"):
        return f"Nuance {first_num}" if first_num else "Nuance"

    if norm == "prenom":
        return "Prénom 1"

    if norm.startswith("prenom du candidat") or norm.startswith("prenom candidat"):
        if first_num:
            return f"Prénom {first_num}"
        return "Prénom du candidat tête de liste" if "tete de liste" in norm else "Prénom du candidat"

    match = re.match(r"^prenom[\. ](\d+)$", norm)
    if match:
        return f"Prénom {match.group(1)}"

    if norm.startswith("prenom") and first_num:
        return f"Prénom {first_num}"

    if norm in {"sexe", "sexe candidat"}:
        return "Sexe 1"

    if norm.startswith("sexe candidat") and first_num:
        return f"Sexe {first_num}"

    match = re.match(r"^sexe(?: candidat)?[\. ](\d+)$", norm)
    if match:
        return f"Sexe {match.group(1)}"

    if norm.startswith("sexe") and first_num:
        return f"Sexe {first_num}"

    match = re.match(r"^(?:numero\s+de\s+|n[°ºo]?\s*)?panneau[\. ]?(\d+)$", norm)
    if match:
        return f"N°Panneau {match.group(1)}"

    match = re.match(r"^voix[\. ](\d+)$", norm)
    if match:
        return f"Voix {match.group(1)}"

    return col


def harmonize_columns(df: pd.DataFrame) -> pd.DataFrame:
    rename = {}
    for col in df.columns:
        new_col = _harmonize_column(col)
        if new_col != col:
            rename[col] = new_col
    return df.rename(columns=rename)


In [3]:
meta_elections = {
    "14_EU.csv": {
        "type_scrutin": "europeennes",
        "date_scrutin": "2014-05-25",
        "tour_column": "N° tour",
        "code_bv_cols": ["Code de la commune", "Numéro bureau de vote"],
        "rename_map": {
            "Inscrits": "inscrits",
            "Votants": "votants",
            "Exprimés": "exprimes",
            "ExprimÃ©s": "exprimes",
            "Nombre de voix du candidat": "voix",
            "Voix": "voix",
            "Nom du candidat": "nom_candidature",
            "Prénom du candidat": "nom_candidature",
            "Code nuance du candidat": "code_candidature",
        },
    },
    "14_MN14_T1T2.csv": {
        "type_scrutin": "municipales",
        "date_scrutin": "2014-03-23",
        "tour_column": "N° tour",
        "code_bv_cols": ["Code de la commune", "Numéro bureau de vote"],
        "rename_map": {
            "Inscrits": "inscrits",
            "Votants": "votants",
            "Exprimés": "exprimes",
            "Nombre de voix": "voix",
            "Nom du candidat tête de liste": "nom_candidature",
            "Prénom du candidat  tête de liste": "nom_candidature",
            "Code nuance de la liste": "code_candidature",
        },
    },
    "17_L_T1.csv": {
        "type_scrutin": "legislatives",
        "date_scrutin": "2017-06-11",
        "tour": 1,
        "code_bv_cols": ["Code de la commune", "Numéro bureau de vote"],
        "rename_map": {
            "Inscrits": "inscrits",
            "Abstentions": "abstentions",
            "Votants": "votants",
            "Blancs": "blancs",
            "Nuls": "nuls",
            "Exprimés": "exprimes",
            "Voix": "voix",
            "Nuance": "code_candidature",
            "Nom": "nom_candidature",
        },
    },
    "17_L_T2.csv": {
        "type_scrutin": "legislatives",
        "date_scrutin": "2017-06-18",
        "tour": 2,
        "code_bv_cols": ["Code de la commune", "Numéro bureau de vote"],
        "rename_map": {
            "Inscrits": "inscrits",
            "Abstentions": "abstentions",
            "Votants": "votants",
            "Blancs": "blancs",
            "Nuls": "nuls",
            "Exprimés": "exprimes",
            "Voix": "voix",
            "Nuance": "code_candidature",
            "Nom": "nom_candidature",
        },
    },
    "17_PR_T1.csv": {
        "type_scrutin": "presidentielles",
        "date_scrutin": "2017-04-23",
        "tour": 1,
        "code_bv_cols": ["Code de la commune", "Numéro bureau de vote"],
        "rename_map": {
            "Inscrits": "inscrits",
            "Abstentions": "abstentions",
            "Votants": "votants",
            "Blancs": "blancs",
            "Nuls": "nuls",
            "Exprimés": "exprimes",
            "Voix": "voix",
            "Nom": "nom_candidature",
            "Code nuance du candidat": "code_candidature",
        },
    },
    "17_PR_T2.csv": {
        "type_scrutin": "presidentielles",
        "date_scrutin": "2017-05-07",
        "tour": 2,
        "code_bv_cols": ["Code de la commune", "Numéro bureau de vote"],
        "rename_map": {
            "Inscrits": "inscrits",
            "Abstentions": "abstentions",
            "Votants": "votants",
            "Blancs": "blancs",
            "Nuls": "nuls",
            "Exprimés": "exprimes",
            "Voix": "voix",
            "Nom": "nom_candidature",
            "Code nuance du candidat": "code_candidature",
        },
    },
    "19_EU.csv": {
        "type_scrutin": "europeennes",
        "date_scrutin": "2019-05-26",
        "tour": 1,
        "code_bv_cols": ["Code de la commune", "Numéro bureau de vote"],
        "rename_map": {
            "Inscrits": "inscrits",
            "Abstentions": "abstentions",
            "Votants": "votants",
            "Blancs": "blancs",
            "Nuls": "nuls",
            "Exprimés": "exprimes",
            "Voix": "voix",
            "Nom Tête de Liste": "nom_candidature",
            "Nuance Liste": "code_candidature",
        },
    },
    "20_MN_T1.csv": {
        "type_scrutin": "municipales",
        "date_scrutin": "2020-03-15",
        "tour": 1,
        "sep": ";",
        "code_bv_cols": ["Code de la commune", "Numéro bureau de vote"],
        "rename_map": {
            "Inscrits": "inscrits",
            "Abstentions": "abstentions",
            "Votants": "votants",
            "Blancs": "blancs",
            "Nuls": "nuls",
            "Exprimés": "exprimes",
            "Voix": "voix",
            "Nom": "nom_candidature",
            "Liste": "nom_candidature",
            "Code Nuance": "code_candidature",
        },
    },
    "20_MN_T2.csv": {
        "type_scrutin": "municipales",
        "date_scrutin": "2020-06-28",
        "tour": 2,
        "code_bv_cols": ["Code de la commune", "Numéro bureau de vote"],
        "rename_map": {
            "Inscrits": "inscrits",
            "Abstentions": "abstentions",
            "Votants": "votants",
            "Blancs": "blancs",
            "Nuls": "nuls",
            "Exprimés": "exprimes",
            "Voix": "voix",
            "Nom": "nom_candidature",
            "Liste": "nom_candidature",
            "Code Nuance": "code_candidature",
        },
    },
    "21_DEP_T1.csv": {
        "type_scrutin": "departementales",
        "date_scrutin": "2021-06-20",
        "tour": 1,
        "code_bv_cols": ["Code de la commune", "Numéro bureau de vote"],
        "rename_map": {
            "Inscrits": "inscrits",
            "Abstentions": "abstentions",
            "Votants": "votants",
            "Blancs": "blancs",
            "Nuls": "nuls",
            "Exprimés": "exprimes",
            "Voix": "voix",
            "Nuance": "code_candidature",
            "Binôme": "nom_candidature",
        },
    },
    "21_DEP_T2.csv": {
        "type_scrutin": "departementales",
        "date_scrutin": "2021-06-27",
        "tour": 2,
        "code_bv_cols": ["Code de la commune", "Numéro bureau de vote"],
        "rename_map": {
            "Inscrits": "inscrits",
            "Abstentions": "abstentions",
            "Votants": "votants",
            "Blancs": "blancs",
            "Nuls": "nuls",
            "Exprimés": "exprimes",
            "Voix": "voix",
            "Nuance": "code_candidature",
            "Binôme": "nom_candidature",
        },
    },
    "21_REG_T1.csv": {
        "type_scrutin": "regionales",
        "date_scrutin": "2021-06-20",
        "tour": 1,
        "code_bv_cols": ["Code de la commune", "Numéro bureau de vote"],
        "rename_map": {
            "Inscrits": "inscrits",
            "Abstentions": "abstentions",
            "Votants": "votants",
            "Blancs": "blancs",
            "Nuls": "nuls",
            "Exprimés": "exprimes",
            "Voix": "voix",
            "Nuance Liste": "code_candidature",
            "Libellé Abrégé Liste": "nom_candidature",
        },
    },
    "21_REG_T2.csv": {
        "type_scrutin": "regionales",
        "date_scrutin": "2021-06-27",
        "tour": 2,
        "code_bv_cols": ["Code de la commune", "Numéro bureau de vote"],
        "rename_map": {
            "Inscrits": "inscrits",
            "Abstentions": "abstentions",
            "Votants": "votants",
            "Blancs": "blancs",
            "Nuls": "nuls",
            "Exprimés": "exprimes",
            "Voix": "voix",
            "Nuance Liste": "code_candidature",
            "Libellé Abrégé Liste": "nom_candidature",
        },
    },
    "22_L_T1.csv": {
        "type_scrutin": "legislatives",
        "date_scrutin": "2022-06-12",
        "tour": 1,
        "code_bv_cols": ["Code de la commune", "Numéro bureau de vote"],
        "rename_map": {
            "Inscrits": "inscrits",
            "Abstentions": "abstentions",
            "Votants": "votants",
            "Blancs": "blancs",
            "Nuls": "nuls",
            "Exprimés": "exprimes",
            "Voix": "voix",
            "Nuance": "code_candidature",
            "Nom": "nom_candidature",
        },
    },
    "22_L_T2.csv": {
        "type_scrutin": "legislatives",
        "date_scrutin": "2022-06-19",
        "tour": 2,
        "code_bv_cols": ["Code de la commune", "Numéro bureau de vote"],
        "rename_map": {
            "Inscrits": "inscrits",
            "Abstentions": "abstentions",
            "Votants": "votants",
            "Blancs": "blancs",
            "Nuls": "nuls",
            "Exprimés": "exprimes",
            "Voix": "voix",
            "Nuance": "code_candidature",
            "Nom": "nom_candidature",
        },
    },
    "22_PR_T1.csv": {
        "type_scrutin": "presidentielles",
        "date_scrutin": "2022-04-10",
        "tour": 1,
        "code_bv_cols": ["Code de la commune", "Numéro bureau de vote"],
        "rename_map": {
            "Inscrits": "inscrits",
            "Abstentions": "abstentions",
            "Votants": "votants",
            "Blancs": "blancs",
            "Nuls": "nuls",
            "Exprimés": "exprimes",
            "Voix": "voix",
            "Nom": "nom_candidature",
            "Code nuance du candidat": "code_candidature",
        },
    },
    "22_PR_T2.csv": {
        "type_scrutin": "presidentielles",
        "date_scrutin": "2022-04-24",
        "tour": 2,
        "code_bv_cols": ["Code de la commune", "Numéro bureau de vote"],
        "rename_map": {
            "Inscrits": "inscrits",
            "Abstentions": "abstentions",
            "Votants": "votants",
            "Blancs": "blancs",
            "Nuls": "nuls",
            "Exprimés": "exprimes",
            "Voix": "voix",
            "Nom": "nom_candidature",
            "Code nuance du candidat": "code_candidature",
        },
    },
    "24_L_T1.csv": {
        "type_scrutin": "legislatives",
        "date_scrutin": "2024-06-30",
        "tour": 1,
        "code_bv_cols": ["Code de la commune", "Numéro bureau de vote"],
        "rename_map": {
            "Inscrits": "inscrits",
            "Abstentions": "abstentions",
            "Votants": "votants",
            "Blancs": "blancs",
            "Nuls": "nuls",
            "Exprimés": "exprimes",
        },
    },
    "24_L_T2.csv": {
        "type_scrutin": "legislatives",
        "date_scrutin": "2024-07-07",
        "tour": 2,
        "code_bv_cols": ["Code de la commune", "Numéro bureau de vote"],
        "rename_map": {
            "Inscrits": "inscrits",
            "Abstentions": "abstentions",
            "Votants": "votants",
            "Blancs": "blancs",
            "Nuls": "nuls",
            "Exprimés": "exprimes",
        },
    },
    "24_EU.csv": {
        "type_scrutin": "europeennes",
        "date_scrutin": "2024-06-09",
        "tour": 1,
        "code_bv_cols": ["Code de la commune", "Numéro bureau de vote"],
        "rename_map": {
            "Inscrits": "inscrits",
            "Abstentions": "abstentions",
            "Votants": "votants",
            "Blancs": "blancs",
            "Nuls": "nuls",
            "Exprimés": "exprimes",
            "Voix 1": "voix",
            "Voix": "voix",
            "Nuance liste 1": "code_candidature",
            "Libellé abrégé de liste 1": "nom_candidature",
        },
    },
    # 24_L_T1T2.csv and 24_L_T2.csv to renseigner selon le schema reel
}


In [4]:
preferred_encoding = {
    "14_EU.csv": "utf-8",
    "14_MN14_T1T2.csv": "utf-8",
    "17_L_T1.csv": "latin-1",
    "17_L_T2.csv": "latin-1",
    "17_PR_T1.csv": "latin-1",
    "17_PR_T2.csv": "latin-1",
    "19_EU.csv": "latin-1",
    "20_MN_T1.csv": "latin-1",
    "20_MN_T2.csv": "latin-1",
    "21_DEP_T1.csv": "latin-1",
    "21_DEP_T2.csv": "latin-1",
    "21_REG_T1.csv": "latin-1",
    "21_REG_T2.csv": "latin-1",
    "22_L_T1.csv": "latin-1",
    "22_L_T2.csv": "utf-8",
    "22_PR_T1.csv": "latin-1",
    "22_PR_T2.csv": "latin-1",
    "24_L_T1.csv": "utf-8",
    "24_L_T2.csv": "utf-8",
    "24_EU.csv": "utf-8",
    "24_L_T1T2.csv": "utf-8",
    "24_L_T2.csv": "utf-8",
}
fallback_encodings = ["utf-8-sig", "utf-8", "latin-1", "cp1252"]
candidate_block_sizes = {
    "L": 8,
    "PR": 7,
    "EU": 7,
    "MN": 9,
    "DEP": 6,
    "REG": 8,
}
def _has_numbered_suffix(columns):
    import re
    return any(re.search(r"(\.\d+|\s\d+)$", col) for col in columns)
def read_raw_with_expanded_headers(path: Path, meta: dict, nrows: int | None = None) -> pd.DataFrame:
    sep = meta.get("sep", ";")
    decimal = meta.get("decimal", ",")
    enc_hint = meta.get("encoding") or preferred_encoding.get(path.name) or "latin-1"
    column_names = None
    read_kwargs: dict = {
        "sep": sep,
        "decimal": decimal,
        "engine": "python",
        "index_col": False,
    }
    if nrows is not None:
        read_kwargs["nrows"] = nrows
    try:
        with path.open(encoding=enc_hint) as f:
            reader = csv.reader(f, delimiter=sep)
            raw_header = next(reader)
            max_fields = len(raw_header)
            for row in reader:
                max_fields = max(max_fields, len(row))
        header_len = len(raw_header)
        if _has_numbered_suffix(raw_header):
            # en-tetes deja numerotes => on ne touche pas
            read_kwargs.update({"header": 0})
        else:
            parts = path.stem.split("_")
            election_key = parts[1] if len(parts) > 1 else None
            block_len = candidate_block_sizes.get(election_key)
            if block_len:
                base_count = header_len - block_len
                if base_count <= 0:
                    raise ValueError("Impossible de determiner le bloc candidat (base <= 0)")
                candidate_count = max(1, math.ceil((max_fields - base_count) / block_len))
                base_cols = raw_header[:base_count]
                candidate_cols = raw_header[base_count:]
                column_names = base_cols.copy()
                for idx in range(candidate_count):
                    suffix = idx + 1
                    column_names.extend([f"{col} {suffix}" for col in candidate_cols])
                print(
                    f"{path.name}: en-tetes etendus a {len(column_names)} colonnes "
                    f"({candidate_count} blocs candidat, max {max_fields} champs)."
                )
                read_kwargs.update({"names": column_names, "header": None, "skiprows": 1})
            else:
                read_kwargs.update({"header": 0})
    except Exception as exc:
        print(f"{path.name}: impossible d'etendre les en-tetes ({exc}).")
        read_kwargs.update({"header": 0})
    to_try: list[str] = []
    if path.name in preferred_encoding:
        to_try.append(preferred_encoding[path.name])
    if enc_hint not in to_try:
        to_try.append(enc_hint)
    to_try.extend(enc for enc in fallback_encodings if enc not in to_try)
    df = None
    last_exc: Exception | None = None
    for enc in to_try:
        try:
            df = pd.read_csv(
                path,
                encoding=enc,
                on_bad_lines="skip",
                **read_kwargs,
            )
            break
        except (pd.errors.ParserError, UnicodeDecodeError) as exc:
            last_exc = exc
            continue
        except Exception as exc:
            last_exc = exc
            continue
    if df is None:
        raise last_exc if last_exc else RuntimeError(f"Lecture impossible pour {path.name}")
    df.columns = [_normalize_label(c) for c in df.columns]
    df = harmonize_columns(df)
    return df


## Inspecter les entetes pour completer les mappings

Executer ce bloc pour lister les colonnes de chaque fichier brut et ajuster `rename_map` et `code_bv_cols` en consequence.

In [5]:
raw_files = sorted(RAW_DIR.glob("*.csv"))
for path in raw_files:
    meta = meta_elections.get(path.name, {})
    try:
        sample = read_raw_with_expanded_headers(path, meta, nrows=0)
        print(path.name, "=>", list(sample.columns))
    except Exception as exc:
        print(path.name, "=> erreur:", exc)


14_EU.csv: en-tetes etendus a 13 colonnes (1 blocs candidat, max 13 champs).
14_EU.csv => ['N° tour', 'Code du département', 'Code de la commune', 'Nom de la commune', 'Numéro bureau de vote', 'Inscrits', 'Votants', 'Exprimés', 'NÂ° de dépôt du candidat 1', 'Nom 1', 'Prénom 1', 'Code Nuance 1', 'Voix 1']
14_MN14_T1T2.csv => ['N° tour', 'Code du département', 'Code de la commune', 'Nom de la commune', 'Numéro bureau de vote', 'Inscrits', 'Votants', 'Exprimés', 'NÂ° de dépôt de la liste', 'Nom du candidat tête de liste', 'Prénom du candidat tête de liste', 'Code Nuance', 'Voix']
17_L_T1.csv: en-tetes etendus a 237 colonnes (27 blocs candidat, max 237 champs).
17_L_T1.csv => ['Code du département', 'Libellé du département', 'Code de la circonscription', 'Libellé de la circonscription', 'Code de la commune', 'Libellé de la commune', 'Numéro bureau de vote', 'Inscrits', 'Abstentions', '% Abs/Ins', 'Votants', '% Votants', 'Blancs', '% Blancs/Ins', '% Blancs/Vot', 'Nuls', '% Nuls/Ins', '% Nul

In [6]:
# Tableau comparatif des colonnes (après harmonisation)
columns_by_dataset = {}
for path in raw_files:
    meta = meta_elections.get(path.name, {})
    try:
        sample = read_raw_with_expanded_headers(path, meta, nrows=0)
        columns_by_dataset[path.stem] = list(sample.columns)
    except Exception as exc:
        print(f"{path.name} => erreur: {exc}")

if not columns_by_dataset:
    print("Aucun dataframe charge pour le comparatif.")
else:
    all_columns = sorted({col for cols in columns_by_dataset.values() for col in cols})
    comparison = pd.DataFrame(index=all_columns)
    comparison.index.name = "colonne"
    for name, cols in columns_by_dataset.items():
        positions = {col: idx + 1 for idx, col in enumerate(cols)}
        comparison[name] = [positions.get(col) for col in comparison.index]

    out_path = PROJECT_ROOT / "reports" / "colonnes_comparatif.csv"
    out_path.parent.mkdir(parents=True, exist_ok=True)
    comparison.to_csv(out_path, index=True)
    print(f"Tableau exporte vers {out_path}")


14_EU.csv: en-tetes etendus a 13 colonnes (1 blocs candidat, max 13 champs).
17_L_T1.csv: en-tetes etendus a 237 colonnes (27 blocs candidat, max 237 champs).
17_L_T2.csv: en-tetes etendus a 45 colonnes (3 blocs candidat, max 45 champs).
17_PR_T1.csv: en-tetes etendus a 98 colonnes (11 blocs candidat, max 98 champs).
17_PR_T2.csv: en-tetes etendus a 35 colonnes (2 blocs candidat, max 35 champs).
19_EU.csv: en-tetes etendus a 257 colonnes (34 blocs candidat, max 257 champs).
20_MN_T2.csv: en-tetes etendus a 568 colonnes (61 blocs candidat, max 568 champs).
21_DEP_T1.csv: en-tetes etendus a 105 colonnes (14 blocs candidat, max 105 champs).
21_DEP_T2.csv: en-tetes etendus a 39 colonnes (3 blocs candidat, max 39 champs).
21_REG_T1.csv: en-tetes etendus a 132 colonnes (14 blocs candidat, max 132 champs).
21_REG_T2.csv: en-tetes etendus a 60 colonnes (5 blocs candidat, max 60 champs).
22_L_T1.csv: en-tetes etendus a 197 colonnes (22 blocs candidat, max 197 champs).
22_L_T2.csv: en-tetes eten

## Fonctions utilitaires de standardisation

In [7]:
def harmonize_dataframe(df_raw: pd.DataFrame, meta: dict, rename_map: dict | None = None) -> pd.DataFrame:
    rename_map = rename_map or {}
    df = df_raw.rename(columns=rename_map)
    df = deduplicate_columns(df)
    df = build_code_bv(df, meta)
    df = add_election_metadata(df, meta)
    df = ensure_columns(df, STANDARD_COLUMNS)
    df = coerce_numeric(df)
    df = basic_cleaning(df)
    ordered_cols = STANDARD_COLUMNS + [col for col in df.columns if col not in STANDARD_COLUMNS]
    return df[ordered_cols]


def standardize_file(file_name: str, meta: dict) -> pd.DataFrame:
    path = RAW_DIR / file_name
    df_raw = read_raw_with_expanded_headers(path, meta)
    rename_map = meta.get("rename_map", {})

    if meta.get("tour_column"):
        tour_col = meta["tour_column"]
        tours = meta.get("tours") or sorted(df_raw[tour_col].dropna().unique())
        frames = []
        for tour_value in tours:
            meta_tour = {**meta, "tour": int(tour_value)}
            frames.append(harmonize_dataframe(df_raw[df_raw[tour_col] == tour_value], meta_tour, rename_map))
        return pd.concat(frames, ignore_index=True)

    if "tour" not in meta:
        raise KeyError(f"tour manquant pour {file_name}")

    return harmonize_dataframe(df_raw, meta, rename_map)


## Execution de l'harmonisation

Ajuster `meta_elections` puis lancer ce bloc pour consolider toutes les tables brutes.

In [8]:
import gc

def build_code_bv_simple(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()
    canon_map = {_normalize_label(col): col for col in df.columns}
    dept_key = _normalize_label("Code du département")
    commune_key = _normalize_label("Code de la commune")
    bv_key = _normalize_label("Numéro bureau de vote")

    def _clean_digits(series: pd.Series) -> pd.Series:
        cleaned = (
            series.astype(str)
            .str.replace(".0", "", regex=False)
            .str.replace(r"\D", "", regex=True)
        )
        cleaned = cleaned.replace("", pd.NA)
        return cleaned

    dept = pd.Series(pd.NA, index=df.index, dtype="string")
    if dept_key in canon_map:
        dept = _clean_digits(df[canon_map[dept_key]]).str.zfill(2).astype("string")

    df["code_departement"] = dept

    if "code_commune" in df.columns:
        commune_raw = df["code_commune"]
    elif commune_key in canon_map:
        commune_raw = df[canon_map[commune_key]]
    else:
        commune_raw = pd.Series(pd.NA, index=df.index)

    commune_digits = _clean_digits(commune_raw)
    commune_len = commune_digits.str.len()
    commune_norm = pd.Series(pd.NA, index=df.index, dtype="string")
    mask5 = commune_len == 5
    commune_norm[mask5] = commune_digits[mask5]
    mask3 = commune_len == 3
    has_dept = dept.notna()
    commune_norm[mask3 & has_dept] = (
        dept[mask3 & has_dept] + commune_digits[mask3 & has_dept].str.zfill(3)
    )
    mask_other = commune_norm.isna() & commune_digits.notna()
    commune_norm[mask_other] = commune_digits[mask_other].str.zfill(5)
    commune_norm = commune_norm.astype("string")

    if "bureau" in df.columns:
        bureau_raw = df["bureau"]
    elif bv_key in canon_map:
        bureau_raw = df[canon_map[bv_key]]
    else:
        bureau_raw = pd.Series(pd.NA, index=df.index)

    bureau_norm = _clean_digits(bureau_raw).str.zfill(4).astype("string")

    # Normalize before rebuilding code_bv to keep BV-level aggregation consistent.
    df["code_commune"] = commune_norm
    df["bureau"] = bureau_norm
    df["code_bv"] = np.where(
        df["code_commune"].notna() & df["bureau"].notna(),
        df["code_commune"].astype(str) + df["bureau"].astype(str),
        np.nan,
    )

    # Normalisation tolerante (par fichier)
    df["code_commune"] = (
        df["code_commune"]
        .astype("string")
        .str.replace(".0", "", regex=False)
    )

    df["bureau"] = (
        df["bureau"]
        .astype("string")
        .str.replace(".0", "", regex=False)
        .str.zfill(4)
    )

    df["code_bv"] = np.where(
        df["code_commune"].notna() & df["bureau"].notna(),
        df["code_commune"].astype(str) + df["bureau"].astype(str),
        pd.NA,
    )

    # Diagnostic leger (pas bloquant)
    invalid_communes = df["code_commune"].dropna().str.len().ne(5).sum()
    if invalid_communes > 0:
        print(
            f"[WARN] {invalid_communes} codes_commune non normalises "
            f"dans ce fichier (normalisation finale apres concat)."
        )

    return df



harmonized_dir = INTERIM_DIR / "harmonized"
harmonized_dir.mkdir(parents=True, exist_ok=True)

csv_paths: list[Path] = []
cols_summary = []
frames_34: list[pd.DataFrame] = []
for file_name, meta in meta_elections.items():
    print(f"Standardisation de {file_name}...")
    df_std = standardize_file(file_name, meta)
    df_std = build_code_bv_simple(df_std)
    dupes = list(df_std.columns[df_std.columns.duplicated()])
    if dupes:
        print("  Colonnes dupliquees apres standardisation:", dupes)
    csv_path = harmonized_dir / f"{Path(file_name).stem}_harmonized.csv"
    df_std.to_csv(csv_path, sep=";", index=False)
    cols_summary.append(
        {
            "file": file_name,
            "columns": list(df_std.columns),
            "shape": df_std.shape,
            "path": str(csv_path),
        }
    )
    csv_paths.append(csv_path)

    canon_map_std = {_normalize_label(col): col for col in df_std.columns}
    dept_key_std = _normalize_label("Code du département")
    if dept_key_std in canon_map_std:
        dept_vals = (
            df_std[canon_map_std[dept_key_std]]
            .astype(str)
            .str.strip()
            .str.replace(r"\D", "", regex=True)
            .str.zfill(2)
        )
        dept_mask = dept_vals == "34"
    else:
        code_digits = df_std["code_bv"].astype(str).str.replace(r"\D", "", regex=True)
        dept_mask = code_digits.str.lstrip("0").str[:2] == "34"
    df_34 = df_std[dept_mask].copy()
    frames_34.append(df_34)

    del df_std
    gc.collect()

for item in cols_summary:
    rows, cols = item["shape"]
    print(f"{item['file']}: {rows} lignes, {cols} colonnes")

if frames_34:
    elections_long = pd.concat(frames_34, ignore_index=True)
else:
    elections_long = pd.DataFrame(columns=STANDARD_COLUMNS)


# ============================
# NORMALISATION FINALE INSEE
# ============================

elections_long["code_commune"] = (
    elections_long["code_commune"]
    .astype("string")
    .str.replace(".0", "", regex=False)
)

mask_3 = elections_long["code_commune"].str.len() == 3
elections_long.loc[mask_3, "code_commune"] = (
    elections_long.loc[mask_3, "code_departement"].astype(str).str.zfill(2)
    + elections_long.loc[mask_3, "code_commune"].str.zfill(3)
)

elections_long["code_commune"] = elections_long["code_commune"].str.zfill(5)

elections_long["bureau"] = (
    elections_long["bureau"]
    .astype("string")
    .str.replace(".0", "", regex=False)
    .str.zfill(4)
)

elections_long["code_bv"] = (
    elections_long["code_commune"] + elections_long["bureau"]
)

# ASSERTIONS STRICTES (ICI SEULEMENT)
assert elections_long["code_commune"].str.len().eq(5).all(), "Code commune final invalide"
assert elections_long["bureau"].str.len().eq(4).all(), "Code bureau final invalide"
assert elections_long["code_bv"].str.len().eq(9).all(), "code_bv final invalide"

cols_summary


Standardisation de 14_EU.csv...
14_EU.csv: en-tetes etendus a 13 colonnes (1 blocs candidat, max 13 champs).
[WARN] 39368 codes_commune non normalises dans ce fichier (normalisation finale apres concat).
Standardisation de 14_MN14_T1T2.csv...
Standardisation de 17_L_T1.csv...
17_L_T1.csv: en-tetes etendus a 237 colonnes (27 blocs candidat, max 237 champs).
Standardisation de 17_L_T2.csv...
17_L_T2.csv: en-tetes etendus a 45 colonnes (3 blocs candidat, max 45 champs).
Standardisation de 17_PR_T1.csv...
17_PR_T1.csv: en-tetes etendus a 98 colonnes (11 blocs candidat, max 98 champs).
Standardisation de 17_PR_T2.csv...
17_PR_T2.csv: en-tetes etendus a 35 colonnes (2 blocs candidat, max 35 champs).
Standardisation de 19_EU.csv...
19_EU.csv: en-tetes etendus a 257 colonnes (34 blocs candidat, max 257 champs).
Standardisation de 20_MN_T1.csv...
Standardisation de 20_MN_T2.csv...
20_MN_T2.csv: en-tetes etendus a 568 colonnes (61 blocs candidat, max 568 champs).
Standardisation de 21_DEP_T1.csv

[{'file': '14_EU.csv',
  'columns': ['code_bv',
   'nom_bv',
   'annee',
   'date_scrutin',
   'type_scrutin',
   'tour',
   'inscrits',
   'votants',
   'abstentions',
   'blancs',
   'nuls',
   'exprimes',
   'code_candidature',
   'nom_candidature',
   'voix',
   'N° tour',
   'Code du département',
   'Code de la commune',
   'Nom de la commune',
   'Numéro bureau de vote',
   'NÂ° de dépôt du candidat 1',
   'Nom 1',
   'Prénom 1',
   'Code Nuance 1',
   'Voix 1',
   'code_departement',
   'code_commune',
   'bureau'],
  'shape': (1656868, 28),
  'path': '/Users/steph/Code/Python/Jupyter/Elections_Sete/data/interim/harmonized/14_EU_harmonized.csv'},
 {'file': '14_MN14_T1T2.csv',
  'columns': ['code_bv',
   'nom_bv',
   'annee',
   'date_scrutin',
   'type_scrutin',
   'tour',
   'inscrits',
   'votants',
   'abstentions',
   'blancs',
   'nuls',
   'exprimes',
   'code_candidature',
   'nom_candidature',
   'voix',
   'N° tour',
   'Code du département',
   'Code de la commune',
 

## Nettoyage de base et validation

In [9]:
if "elections_long" not in globals():
    if "frames_34" in globals() and frames_34:
        elections_long = pd.concat(frames_34, ignore_index=True)
    elif (INTERIM_DIR / "elections_long.parquet").exists():
        elections_long = pd.read_parquet(INTERIM_DIR / "elections_long.parquet")
    elif (INTERIM_DIR / "elections_long.csv").exists():
        elections_long = pd.read_csv(INTERIM_DIR / "elections_long.csv", sep=";")
    else:
        raise RuntimeError("elections_long est manquant : executez la cellule de standardisation.")

elections_long["voix"] = elections_long["voix"].fillna(0)

issues = validate_consistency(elections_long)
for name, df_issue in issues.items():
    print(name, "=>", len(df_issue))


votants_gt_inscrits => 0
exprimes_balance_off => 0
sum_voix_vs_exprimes => 10676


## Sauvegarde

In [10]:
INTERIM_DIR.mkdir(parents=True, exist_ok=True)

elections_long_to_save = elections_long.copy()
for col in elections_long_to_save.columns:
    if elections_long_to_save[col].dtype == "object":
        elections_long_to_save[col] = elections_long_to_save[col].astype("string")

elections_long_to_save.to_parquet(INTERIM_DIR / "elections_long.parquet", index=False)
elections_long_to_save.to_csv(INTERIM_DIR / "elections_long.csv", sep=";", index=False)
elections_long_to_save.shape


(45876, 889)

## Mapping blocs


In [11]:
MAPPING_PATH = PROJECT_ROOT / "data" / "mapping_candidats_blocs.csv"
mapping_blocs = pd.read_csv(MAPPING_PATH, sep=";")
mapping_blocs["code_candidature"] = (
    mapping_blocs["code_candidature"].astype(str).str.strip().str.upper()
)
for col in ["bloc_1", "bloc_2", "bloc_3"]:
    if col in mapping_blocs.columns:
        mapping_blocs[col] = mapping_blocs[col].replace(r"^\s*$", np.nan, regex=True)

if "bloc_1" in mapping_blocs.columns:
    mapping_blocs.loc[mapping_blocs["code_candidature"] == "NC", "bloc_1"] = "inconnue"
    for col in ["bloc_2", "bloc_3"]:
        if col in mapping_blocs.columns:
            mapping_blocs.loc[mapping_blocs["code_candidature"] == "NC", col] = np.nan

mapping_blocs.head()


Unnamed: 0,code_candidature,nom_candidature,bloc_1,bloc_2,bloc_3
0,NC,Nuance non communiquee,inconnue,,
1,LDIV,Divers,centre,,
2,DIV,Divers,centre,,
3,LDVD,Divers droite,droite_modere,droite_dure,
4,LDVG,Divers gauche,gauche_modere,,


## Long format candidats


In [12]:
def build_candidates_long(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()

    required_cols = [
        "code_bv",
        "annee",
        "date_scrutin",
        "type_scrutin",
        "tour",
        "inscrits",
        "votants",
        "exprimes",
        "abstentions",
        "blancs",
        "nuls",
    ]
    for col in required_cols:
        if col not in df.columns:
            df[col] = np.nan

    voix_cols = [c for c in df.columns if re.match(r"^Voix \d+$", c)]
    code_cols = [c for c in df.columns if re.match(r"^Code Nuance \d+$", c)]
    nuance_cols = [c for c in df.columns if re.match(r"^Nuance \d+$", c)]
    nom_cols = [c for c in df.columns if re.match(r"^Nom \d+$", c)]

    _ = (code_cols, nuance_cols, nom_cols)

    base_cols = required_cols + [c for c in ["nom_bv", "nom_candidature"] if c in df.columns]
    has_numbered_voix = bool(voix_cols) and df[voix_cols].notna().any().any()

    def _compose_nom(idx: int) -> pd.Series | None:
        series = pd.Series(pd.NA, index=df.index, dtype="string")
        etendu_col = f"Libellé Etendu Liste {idx}"
        abrege_col = f"Libellé Abrégé Liste {idx}"
        nom_col = f"Nom {idx}"
        prenom_col = f"Prénom {idx}"

        if etendu_col in df.columns:
            series = series.fillna(df[etendu_col].astype("string"))
        if abrege_col in df.columns:
            series = series.fillna(df[abrege_col].astype("string"))
        if nom_col in df.columns and prenom_col in df.columns:
            prenom = df[prenom_col].fillna("").astype(str).str.strip()
            nom = df[nom_col].fillna("").astype(str).str.strip()
            combined = (prenom + " " + nom).str.strip().replace("", pd.NA)
            series = series.fillna(combined)
        elif nom_col in df.columns:
            series = series.fillna(df[nom_col].astype("string"))
        elif prenom_col in df.columns:
            series = series.fillna(df[prenom_col].astype("string"))

        if series.isna().all():
            return None
        return series

    if has_numbered_voix:
        indices = sorted({int(c.split()[-1]) for c in voix_cols})
        frames = []
        for idx in indices:
            voix_col = f"Voix {idx}"
            if voix_col not in df.columns:
                continue
            temp = df[base_cols].copy()
            temp["voix"] = df[voix_col]
            if f"Code Nuance {idx}" in df.columns:
                temp["code_candidature_raw"] = df[f"Code Nuance {idx}"]
            elif f"Nuance {idx}" in df.columns:
                temp["code_candidature_raw"] = df[f"Nuance {idx}"]
            else:
                temp["code_candidature_raw"] = np.nan

            nom_series = _compose_nom(idx)
            if nom_series is not None:
                temp["nom_candidature"] = nom_series

            temp["idx"] = idx
            frames.append(temp)
        if frames:
            long_df = pd.concat(frames, ignore_index=True)
        else:
            long_df = pd.DataFrame(columns=base_cols + ["idx", "voix", "code_candidature_raw"])
    elif "voix" in df.columns:
        long_df = df[base_cols + ["voix"]].copy()
        if "code_candidature" in df.columns:
            long_df["code_candidature_raw"] = df["code_candidature"]
        elif "code_candidature_raw" in df.columns:
            long_df["code_candidature_raw"] = df["code_candidature_raw"]
        else:
            long_df["code_candidature_raw"] = np.nan
        long_df["idx"] = 1
        long_df = long_df[base_cols + ["idx", "voix", "code_candidature_raw"]]
    else:
        long_df = pd.DataFrame(columns=base_cols + ["idx", "voix", "code_candidature_raw"])

    long_df["code_candidature_raw"] = (
        long_df["code_candidature_raw"].astype(str).str.strip().str.upper()
    )
    long_df.loc[
        long_df["code_candidature_raw"].isin(["", "NAN", "NONE"]),
        "code_candidature_raw",
    ] = np.nan

    if "nom_candidature" in long_df.columns:
        long_df["nom_candidature"] = (
            long_df["nom_candidature"].astype("string").str.strip()
        )
        long_df["nom_candidature"] = long_df["nom_candidature"].replace("", pd.NA)

    long_df["voix"] = pd.to_numeric(long_df["voix"], errors="coerce")
    long_df = long_df[long_df["voix"].notna() & (long_df["voix"] > 0)]

    return long_df


voix_num_cols = [col for col in elections_long.columns if re.match(r"^Voix \d+$", col)]
if voix_num_cols:
    wide_mask = elections_long[voix_num_cols].notna().any(axis=1)
    df_long = elections_long[~wide_mask].copy()
    df_wide = elections_long[wide_mask].copy()
    candidates_long = pd.concat(
        [build_candidates_long(df_long), build_candidates_long(df_wide)],
        ignore_index=True,
    )
else:
    candidates_long = build_candidates_long(elections_long)

candidates_long = candidates_long.merge(
    mapping_blocs,
    how="left",
    left_on="code_candidature_raw",
    right_on="code_candidature",
)
candidates_long["bloc_1_final"] = candidates_long["bloc_1"].fillna("autre")


if "nom_candidature_x" in candidates_long.columns:
    candidates_long["nom_candidature"] = candidates_long["nom_candidature_x"]
    if "nom_candidature_y" in candidates_long.columns:
        candidates_long["nom_candidature"] = candidates_long["nom_candidature"].fillna(
            candidates_long["nom_candidature_y"]
        )
elif "nom_candidature" not in candidates_long.columns:
    candidates_long["nom_candidature"] = candidates_long["code_candidature_raw"]

candidates_long["nom_candidature"] = (
    candidates_long["nom_candidature"]
    .replace("", pd.NA)
    .fillna(candidates_long["code_candidature_raw"])
)

candidates_long = (
    candidates_long
    .groupby(
        [
            "code_bv",
            "date_scrutin",
            "type_scrutin",
            "tour",
            "nom_candidature",
            "bloc_1_final",
        ],
        as_index=False,
        dropna=False,
    )
    .agg(
        {
            "voix": "max",
            "exprimes": "first",
            "inscrits": "first",
            "votants": "first",
            "abstentions": "first",
            "blancs": "first",
            "nuls": "first",
            "annee": "first",
            "code_candidature_raw": "first",
        }
    )
)

assert (
    candidates_long
    .groupby(["code_bv", "date_scrutin", "tour", "nom_candidature"], dropna=False)
    .size()
    .max()
    == 1
), "Doublons candidats detectes dans candidates_long"

unmapped_nuances = (
    candidates_long[
        candidates_long["bloc_1_final"].isna() & candidates_long["code_candidature_raw"].notna()
    ]["code_candidature_raw"]
    .value_counts()
    .reset_index()
)
unmapped_nuances.columns = ["code_candidature_raw", "count"]
print("Top 30 nuances non mappees:")
print(unmapped_nuances.head(30))
candidates_long.shape


Top 30 nuances non mappees:
Empty DataFrame
Columns: [code_candidature_raw, count]
Index: []


(121273, 15)

## Features BV


In [13]:
for col in ["inscrits", "votants", "exprimes", "abstentions", "blancs", "nuls"]:
    if col not in candidates_long.columns:
        candidates_long[col] = 0
    candidates_long[col] = pd.to_numeric(candidates_long[col], errors="coerce").fillna(0)

# Always keep code_bv in groupby to compute BV-level bloc totals.
bv_bloc_voix = (
    candidates_long
    .groupby(
        ["code_bv", "date_scrutin", "type_scrutin", "tour", "bloc_1_final"],
        as_index=False,
    )["voix"]
    .sum()
)

bloc_pivot = bv_bloc_voix.pivot_table(
    index=["code_bv", "date_scrutin", "type_scrutin", "tour"],
    columns="bloc_1_final",
    values="voix",
    fill_value=0,
    aggfunc="sum",
)
bloc_pivot.columns = [
    "voix_bloc_" + str(c).strip().lower().replace(" ", "_") for c in bloc_pivot.columns
]
bv_bloc_pivot = bloc_pivot.reset_index()

bv_totaux = (
    candidates_long
    .groupby(["code_bv", "date_scrutin", "type_scrutin", "tour"], as_index=False)
    .agg(
        {
            "exprimes": "first",
            "inscrits": "first",
            "votants": "first",
            "abstentions": "first",
            "blancs": "first",
            "nuls": "first",
        }
    )
)

bv_features = bv_totaux.merge(
    bv_bloc_pivot,
    on=["code_bv", "date_scrutin", "type_scrutin", "tour"],
    how="left",
)

voix_bloc_cols = [c for c in bv_features.columns if c.startswith("voix_bloc_")]
if voix_bloc_cols:
    bv_features[voix_bloc_cols] = bv_features[voix_bloc_cols].fillna(0)

for col in ["voix_bloc_extreme_droite", "voix_bloc_extreme_gauche"]:
    if col not in bv_features.columns:
        bv_features[col] = 0
        voix_bloc_cols.append(col)

bv_features["voix_bloc_total"] = bv_features[voix_bloc_cols].sum(axis=1)
denom = bv_features["voix_bloc_total"].replace(0, np.nan)

for col in voix_bloc_cols:
    bloc = col.replace("voix_bloc_", "")
    part_col = f"part_bloc_{bloc}"
    bv_features[part_col] = (bv_features[col] / denom).fillna(0)

bv_features["taux_participation"] = (
    bv_features["votants"] / bv_features["inscrits"].replace(0, np.nan)
).fillna(0)
bv_features["part_blancs"] = (
    bv_features["blancs"] / bv_features["votants"].replace(0, np.nan)
).fillna(0)
bv_features["part_nuls"] = (
    bv_features["nuls"] / bv_features["votants"].replace(0, np.nan)
).fillna(0)
bv_features["part_abstention"] = (
    bv_features["abstentions"] / bv_features["inscrits"].replace(0, np.nan)
).fillna(0)

bv_features["polarisation"] = (
    (bv_features["voix_bloc_extreme_droite"] + bv_features["voix_bloc_extreme_gauche"])
    / denom
).fillna(0)

part_bloc_cols = [c for c in bv_features.columns if c.startswith("part_bloc_")]
bv_features["fragmentation_hhi"] = bv_features[part_bloc_cols].pow(2).sum(axis=1).fillna(0)

bv_features = bv_features.sort_values(["code_bv", "date_scrutin", "tour"]).reset_index(drop=True)

# Assertions: sum of bloc parts should be 1 and BV variability must exist.
delta = (bv_features[part_bloc_cols].sum(axis=1) - 1).abs()
assert (delta < 1e-6).all(), "Somme des part_bloc_* != 1"

check = (
    bv_features
    .groupby(["date_scrutin", "type_scrutin", "tour"])["part_bloc_extreme_droite"]
    .nunique()
)
assert (check > 1).any(), "parts_bloc constantes : aggregation non BV"

mun = bv_features[bv_features["type_scrutin"] == "municipales"]
assert mun.groupby(["date_scrutin", "tour"])["part_bloc_extreme_droite"].nunique().mean() > 1


## Features temporelles


In [14]:
if not pd.api.types.is_datetime64_any_dtype(bv_features["date_scrutin"]):
    bv_features["date_scrutin"] = pd.to_datetime(
        bv_features["date_scrutin"], errors="coerce", dayfirst=True
    )

bv_features = bv_features.sort_values(["code_bv", "date_scrutin", "tour"]).reset_index(drop=True)

lag_base_cols = [c for c in bv_features.columns if c.startswith("part_bloc_")]
lag_base_cols += ["taux_participation", "part_blancs", "part_nuls", "part_abstention"]

for col in lag_base_cols:
    bv_features[f"{col}_lag1"] = bv_features.groupby("code_bv")[col].shift(1)
    bv_features[f"{col}_lag2"] = bv_features.groupby("code_bv")[col].shift(2)

indexed_cols = [col for col in bv_features.columns if re.search(r"\s\d+$", col)]
if indexed_cols:
    bv_features = bv_features.drop(columns=indexed_cols)

if "nom_candidature" in bv_features.columns:
    bv_features = bv_features.drop(columns=["nom_candidature"])

if "nom_bv" in bv_features.columns:
    bv_features = bv_features.drop(columns=["nom_bv"])

bv_features.shape
bv_features.head()


Unnamed: 0,code_bv,date_scrutin,type_scrutin,tour,exprimes,inscrits,votants,abstentions,blancs,nuls,voix_bloc_autre,voix_bloc_centre,voix_bloc_droite_modere,voix_bloc_extreme_droite,voix_bloc_extreme_gauche,voix_bloc_gauche_dure,voix_bloc_gauche_modere,voix_bloc_inconnue,voix_bloc_total,part_bloc_autre,part_bloc_centre,part_bloc_droite_modere,part_bloc_extreme_droite,part_bloc_extreme_gauche,part_bloc_gauche_dure,...,fragmentation_hhi,part_bloc_autre_lag1,part_bloc_autre_lag2,part_bloc_centre_lag1,part_bloc_centre_lag2,part_bloc_droite_modere_lag1,part_bloc_droite_modere_lag2,part_bloc_extreme_droite_lag1,part_bloc_extreme_droite_lag2,part_bloc_extreme_gauche_lag1,part_bloc_extreme_gauche_lag2,part_bloc_gauche_dure_lag1,part_bloc_gauche_dure_lag2,part_bloc_gauche_modere_lag1,part_bloc_gauche_modere_lag2,part_bloc_inconnue_lag1,part_bloc_inconnue_lag2,taux_participation_lag1,taux_participation_lag2,part_blancs_lag1,part_blancs_lag2,part_nuls_lag1,part_nuls_lag2,part_abstention_lag1,part_abstention_lag2
0,10001,2014-05-25,europeennes,1,549,1193,566,0.0,0.0,0.0,61.0,42.0,108.0,245.0,1.0,41.0,51.0,0.0,549.0,0.111111,0.076503,0.196721,0.446266,0.001821,0.074681,...,0.270261,,,,,,,,,,,,,,,,,,,,,,,,
1,10001,2017-04-23,presidentielles,1,1050,1306,1077,229.0,19.0,8.0,1050.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1050.0,1.0,0.0,0.0,0.0,0.0,0.0,...,1.0,0.111111,,0.076503,,0.196721,,0.446266,,0.001821,,0.074681,,0.092896,,0.0,,0.474434,,0.0,,0.0,,0.0,
2,10001,2017-05-07,presidentielles,2,921,1306,1044,262.0,88.0,35.0,921.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,921.0,1.0,0.0,0.0,0.0,0.0,0.0,...,1.0,1.0,0.111111,0.0,0.076503,0.0,0.196721,0.0,0.446266,0.0,0.001821,0.0,0.074681,0.0,0.092896,0.0,0.0,0.824655,0.474434,0.017642,0.0,0.007428,0.0,0.175345,0.0
3,10001,2017-06-11,legislatives,1,638,1306,651,655.0,10.0,3.0,638.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,638.0,1.0,0.0,0.0,0.0,0.0,0.0,...,1.0,1.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.799387,0.824655,0.084291,0.017642,0.033525,0.007428,0.200613,0.175345
4,10001,2017-06-18,legislatives,2,553,1306,602,704.0,35.0,14.0,553.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,553.0,1.0,0.0,0.0,0.0,0.0,0.0,...,1.0,1.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.498469,0.799387,0.015361,0.084291,0.004608,0.033525,0.501531,0.200613


## Exports


In [15]:
INTERIM_DIR.mkdir(parents=True, exist_ok=True)
PROCESSED_DIR.mkdir(parents=True, exist_ok=True)

candidates_long.to_parquet(INTERIM_DIR / "candidates_long.parquet", index=False)
bv_features.to_parquet(PROCESSED_DIR / "bv_features.parquet", index=False)

unmapped_path = INTERIM_DIR / "unmapped_nuances.csv"
unmapped_nuances.to_csv(unmapped_path, index=False)
print("unmapped_path")
print(unmapped_path)
print("candidates_long.shape")
print(candidates_long.shape)
print("bv_features.shape")
print(bv_features.shape)
print("candidates_long.head()")
print(candidates_long.head())


unmapped_path
/Users/steph/Code/Python/Jupyter/Elections_Sete/data/interim/unmapped_nuances.csv
candidates_long.shape
(121273, 15)
bv_features.shape
(16918, 57)
candidates_long.head()
     code_bv date_scrutin type_scrutin  tour       nom_candidature  \
0  000010001   2014-05-25  europeennes     1            Anne N�GRE   
1  000010001   2014-05-25  europeennes     1           Eric MAHUET   
2  000010001   2014-05-25  europeennes     1         Francis LENNE   
3  000010001   2014-05-25  europeennes     1  Jean-Claude MARTINEZ   
4  000010001   2014-05-25  europeennes     1    Jean-Luc MELENCHON   

    bloc_1_final  voix  exprimes  inscrits  votants  abstentions  blancs  \
0         centre   2.0       549      1193      566          0.0     0.0   
1         centre   2.0       549      1193      566          0.0     0.0   
2         centre   4.0       549      1193      566          0.0     0.0   
3  droite_modere   3.0       549      1193      566          0.0     0.0   
4    gauche_dur

In [16]:
bv_features.groupby(
    ["date_scrutin", "type_scrutin", "tour"]
)["part_bloc_extreme_droite"].nunique().head(10)


date_scrutin  type_scrutin     tour
2014-05-25    europeennes      1       820
2017-04-23    presidentielles  1         1
2017-05-07    presidentielles  2         1
2017-06-11    legislatives     1         1
2017-06-18    legislatives     2         1
2019-05-26    europeennes      1         1
2020-03-15    municipales      1       238
2020-06-28    municipales      2        87
2021-06-20    departementales  1         1
              regionales       1         1
Name: part_bloc_extreme_droite, dtype: int64

In [17]:
candidates_long[
    candidates_long["type_scrutin"] == "presidentielles"
][["code_bv", "date_scrutin", "voix", "bloc_1_final"]].head(20)


Unnamed: 0,code_bv,date_scrutin,voix,bloc_1_final
16,10001,2017-04-23,38.0,autre
17,10001,2017-04-23,174.0,autre
18,10001,2017-04-23,8.0,autre
19,10001,2017-04-23,155.0,autre
20,10001,2017-04-23,1.0,autre
21,10001,2017-04-23,12.0,autre
22,10001,2017-04-23,186.0,autre
23,10001,2017-04-23,421.0,autre
24,10001,2017-04-23,8.0,autre
25,10001,2017-04-23,36.0,autre


In [18]:
candidates_long[ 
    (candidates_long["type_scrutin"] == "presidentielles")
    & (candidates_long["date_scrutin"] == "2017-04-23")
].groupby("code_bv")["voix"].sum().describe()


count     912.000000
mean      687.328947
std       289.611078
min        14.000000
25%       550.500000
50%       735.000000
75%       885.250000
max      1428.000000
Name: voix, dtype: float64

In [19]:
check = (
    bv_features
    .groupby(["date_scrutin", "type_scrutin", "tour"])
    ["part_bloc_extreme_droite"]
    .nunique()
)

assert (check > 1).any(), (
    "ERREUR : certaines parts_bloc sont constantes par scrutin/tour. "
    "Le calcul n'est pas au niveau bureau de vote."
)
