In [None]:
import os
from pathlib import Path
from typing import Optional

import numpy as np
import requests
import py7zr
import pandas as pd
import geopandas as gpd
from shapely import affinity
import shapely.geometry
import shapely.ops
import matplotlib.pylab as plt


# Dossier de travail -----------------------------------------------------------
DATA_DIR = Path("./data").expanduser()
DATA_DIR.mkdir(parents=True, exist_ok=True)

# Style de tracé par défaut pour GeoDataFrame.plot()
PLOT_STYLES = {
    "edgecolor": "black",  # couleur des contours
    "column": "name",      # colonne utilisée pour la coloration catégorielle
    "legend": False,       # pas de légende
    "cmap": "tab20",       # palette discrète
    "linewidth": 0.25,     # épaisseur des contours
}


# Utilitaires E/S --------------------------------------------------------------
def download_file(url: str, dest_path: Path, chunk_size: int = 8192) -> Path:
    """
    Télécharge un fichier en streaming vers dest_path.

    Args:
        url: URL du fichier à télécharger.
        dest_path: chemin local de sortie (répertoires créés si besoin).
        chunk_size: taille des blocs d'écriture.

    Returns:
        Le chemin du fichier téléchargé.
    """
    dest_path = Path(dest_path)
    dest_path.parent.mkdir(parents=True, exist_ok=True)

    with requests.get(url, stream=True) as resp:
        resp.raise_for_status()
        with dest_path.open("wb") as f:
            for chunk in resp.iter_content(chunk_size=chunk_size):
                if chunk:
                    f.write(chunk)
    return dest_path


def unzip_7z(archive_path: Path, extract_dir: Path) -> Path:
    """
    Décompresse une archive .7z dans un dossier cible.

    Args:
        archive_path: chemin de l'archive .7z
        extract_dir: dossier de destination

    Returns:
        Le dossier d'extraction.
    """
    archive_path = Path(archive_path)
    extract_dir = Path(extract_dir)
    extract_dir.mkdir(parents=True, exist_ok=True)

    with py7zr.SevenZipFile(archive_path, mode="r") as z:
        z.extractall(path=extract_dir)

    return extract_dir


def convert_shp_to_geojson(shapefile_path: Path, geojson_path: Path) -> Path:
    """
    Charge un shapefile et l’exporte en GeoJSON.

    Args:
        shapefile_path: chemin vers le .shp
        geojson_path: chemin de sortie .geojson

    Returns:
        Le chemin du GeoJSON écrit.
    """
    shapefile_path = Path(shapefile_path)
    geojson_path = Path(geojson_path)
    geojson_path.parent.mkdir(parents=True, exist_ok=True)

    gdf = gpd.read_file(shapefile_path)
    gdf.to_file(geojson_path, driver="GeoJSON")
    return geojson_path


# Géométrie / carto ------------------------------------------------------------
def reposition(
    gdf: gpd.GeoDataFrame,
    idx,  # index, liste d'index ou masque booléen sélectionnant les entités à déplacer
    xoff: Optional[float] = None,
    yoff: Optional[float] = None,
    xscale: Optional[float] = None,
    yscale: Optional[float] = None,
    simplify: Optional[float] = None,
) -> gpd.GeoDataFrame:
    """
    Mise à l’échelle et translation d’un sous-ensemble de géométries autour de leur centroïde commun.

    - Le centroïde d'origine est calculé sur l’union des géométries sélectionnées.
    - L’échelle est appliquée avant la translation.
    - La simplification (si fournie) intervient à la fin (preserve_topology=False).

    Args:
        gdf: GeoDataFrame source.
        idx: indices/masque pour sélectionner les lignes à transformer.
        xoff, yoff: décalages (en unités du CRS) à appliquer.
        xscale, yscale: facteurs d’échelle (1 = inchangé).
        simplify: tolérance pour la simplification (None = pas de simplification).

    Returns:
        Un nouveau GeoDataFrame avec les géométries modifiées.
    """
    # union des géométries sélectionnées pour obtenir un point d’ancrage robuste
    anchor = gdf.loc[idx, "geometry"].union_all().centroid

    def _transform(geom):
        out = geom
        if xscale is not None or yscale is not None:
            out = affinity.scale(out, xfact=xscale or 1, yfact=yscale or 1, origin=anchor)
        if xoff is not None or yoff is not None:
            out = affinity.translate(out, xoff or 0, yoff or 0)
        if simplify is not None and simplify > 0:
            out = out.simplify(simplify, preserve_topology=False)
        return out

    res = gdf.copy()
    res.loc[idx, "geometry"] = res.loc[idx, "geometry"].apply(_transform)
    return res

In [None]:
# ---
# Téléchargement et préparation des données ADMIN-EXPRESS (IGN)
# Source : https://geoservices.ign.fr/adminexpress
# ---

# URL du fichier compressé (.7z) à télécharger
adminexpress_url = (
    "https://data.geopf.fr/telechargement/download/ADMIN-EXPRESS/"
    "ADMIN-EXPRESS_3-2__SHP_WGS84G_FRA_2025-02-17/"
    "ADMIN-EXPRESS_3-2__SHP_WGS84G_FRA_2025-02-17.7z"
)

# Chemins locaux pour le fichier compressé et pour l’extraction
archive_path = DATA_DIR / "ADMIN-EXPRESS_3-2__SHP_WGS84G_FRA_2025-02-17.7z"
extraction_dir = DATA_DIR / "ADMIN-EXPRESS_3-2__SHP_WGS84G_FRA_2025-02-17"

# Étape 1 : Téléchargement du fichier ADMIN-EXPRESS
download_file(adminexpress_url, archive_path)
print("✅ Téléchargement terminé :", archive_path)

# Étape 2 : Décompression de l’archive
unzip_7z(archive_path, DATA_DIR)
print("✅ Extraction terminée :", extraction_dir)

# Répertoire contenant les shapefiles après extraction
shapefile_dir = (
    extraction_dir / "ADMIN-EXPRESS" /
    "1_DONNEES_LIVRAISON_2025-02-00188" /
    "ADE_3-2_SHP_WGS84G_FRA-ED2025-02-17"
)

# Chemin de sortie pour le GeoJSON
commune_geojson_path = DATA_DIR / "COMMUNE.geojson"

# Étape 3 : Conversion du shapefile en GeoJSON
convert_shp_to_geojson(shapefile_dir / "COMMUNE.shp", commune_geojson_path)
print("✅ Conversion terminée : COMMUNE.shp → COMMUNE.geojson")

In [None]:
# ---
# Chargement et simplification du fichier COMMUNE.geojson
# ---

# Étape 1 : Chargement du GeoJSON issu d'ADMIN-EXPRESS
communes_admin_express = gpd.read_file(DATA_DIR / "COMMUNE.geojson")
print("✅ Fichier COMMUNE.geojson chargé depuis ADMIN-EXPRESS")

# Étape 2 : Sélection des colonnes utiles et sauvegarde simplifiée
def save_as_geojson(gdf, keep_columns, output_filename):
    """
    Sélectionne certaines colonnes d’un GeoDataFrame et les exporte en GeoJSON.

    Args:
        gdf: GeoDataFrame source.
        keep_columns: liste des colonnes à conserver (y compris 'geometry').
        output_filename: nom du fichier de sortie (dans DATA_DIR).
    """
    simplified = gdf[keep_columns].copy()
    output_path = DATA_DIR / output_filename
    simplified.to_file(output_path, driver="GeoJSON")
    print(f"✅ GeoJSON simplifié sauvegardé : {output_path}")

# Exemple d’utilisation :
columns_to_keep = ["NOM", "INSEE_COM", "INSEE_DEP", "INSEE_REG", "geometry"]
save_as_geojson(communes_admin_express, columns_to_keep, "communes.geojson")

In [None]:
# ---
# Téléchargement et chargement des fichiers COG 2025 (INSEE)
# Source : https://www.insee.fr/fr/information/8377162
# ---

# Étape 1 : Téléchargement des fichiers CSV (communes, départements, régions)
download_file(
    "https://www.insee.fr/fr/statistiques/fichier/8377162/v_commune_2025.csv",
    DATA_DIR / "v_commune_2025.csv",
)
download_file(
    "https://www.insee.fr/fr/statistiques/fichier/8377162/v_departement_2025.csv",
    DATA_DIR / "v_departement_2025.csv",
)
download_file(
    "https://www.insee.fr/fr/statistiques/fichier/8377162/v_region_2025.csv",
    DATA_DIR / "v_region_2025.csv",
)
print("✅ Fichiers COG 2025 téléchargés")

# Étape 2 : Chargement des données
# Communes
cog_communes = pd.read_csv(DATA_DIR / "v_commune_2025.csv")
print("✅ v_commune_2025.csv chargé :", cog_communes.shape)

# Départements (on garde uniquement le code et le libellé)
cog_departements = pd.read_csv(DATA_DIR / "v_departement_2025.csv")
cog_departements = cog_departements[["DEP", "LIBELLE"]]
print("✅ v_departement_2025.csv chargé :", cog_departements.shape)

# Régions (on garde uniquement le code et le libellé)
cog_regions = pd.read_csv(DATA_DIR / "v_region_2025.csv")
cog_regions = cog_regions[["REG", "LIBELLE"]]
print("✅ v_region_2025.csv chargé :", cog_regions.shape)

In [None]:
# ---
# Compétence territoriale de la Police nationale et de la Gendarmerie
# Source : https://www.data.gouv.fr/fr/datasets/competence-territoriale-gendarmerie-et-police-nationales/
# ---

# Étape 1 : Téléchargement du fichier CSV
download_file(
    "https://www.data.gouv.fr/fr/datasets/r/c53cd4d4-4623-4772-9b8c-bc72a9cdf4c2",
    DATA_DIR / "competences_pn_gn.csv",
)
print("✅ Fichier competences_pn_gn.csv téléchargé")

# Étape 2 : Chargement avec séparateur « ; »
competences_pn_gn = pd.read_csv(DATA_DIR / "competences_pn_gn.csv", sep=";")
print("✅ competences_pn_gn.csv chargé :", competences_pn_gn.shape)

# Étape 3 : Filtrage des communes relevant de la Police nationale
communes_pn = competences_pn_gn[competences_pn_gn["institution"] == "PN"]
print("✅ Communes sous compétence Police nationale :", communes_pn.shape)

# Étape 4 : Sauvegarde du sous-ensemble
output_path = DATA_DIR / "communes_pn.csv"
communes_pn.to_csv(output_path, index=False)
print("✅ Fichier sauvegardé :", output_path)

In [None]:
# ---
# Enrichissement ADMIN-EXPRESS avec COG + compétence PN (CPN),
# repositionnement des DROM, puis agrégation par service CPN
# ---

# 1) Chargement des jeux
communes_ae = gpd.read_file(DATA_DIR / "COMMUNE.geojson")
print("✅ ADMIN-EXPRESS chargé :", communes_ae.shape)

cpn = pd.read_csv(DATA_DIR / "communes_pn.csv")   # si tu as suivi le renommage précédent -> "communes_pn.csv"
print("✅ communes_pn.csv chargé :", cpn.shape)

# 2) Normalisation des noms de colonnes
communes_ae = communes_ae.rename(columns={"NOM": "name_commune"})

# 3) Jointures avec le COG (communes, départements, régions)
#    - On utilise des suffixes explicites pour éviter les collisions de 'LIBELLE'
communes_ae = pd.merge(
    communes_ae,
    cog_communes,
    left_on="INSEE_COM",
    right_on="COM",
    how="left",
    suffixes=("", "_cogcom"),
)
print("✅ Fusion avec COG communes :", communes_ae.shape)

communes_ae = pd.merge(
    communes_ae,
    cog_departements.rename(columns={"LIBELLE": "name_dep"}),
    on="DEP",
    how="left",
)
print("✅ Fusion avec COG départements :", communes_ae.shape)

# 4) Repositionnement visuel des Outre-mer (mise à l’échelle + translation)
communes_ae = reposition(communes_ae, communes_ae.name_dep == "Guadeloupe",  57.4, 25.4, 1.5, 1.5)
communes_ae = reposition(communes_ae, communes_ae.name_dep == "Martinique",  58.4, 27.1, 1.5, 1.5)
communes_ae = reposition(communes_ae, communes_ae.name_dep == "Guyane",      52.0, 37.7, 0.35, 0.35)
communes_ae = reposition(communes_ae, communes_ae.name_dep == "La Réunion", -55.0, 62.8, 1.5, 1.5)
communes_ae = reposition(communes_ae, communes_ae.name_dep == "Mayotte",    -43.0, 54.3, 1.5, 1.5)
print("✅ Repositionnement des Outre-mer terminé")

# 5) Jointure avec COG régions + libellé
communes_ae = pd.merge(
    communes_ae,
    cog_regions.rename(columns={"LIBELLE": "name_reg"}),
    on="REG",
    how="left",
)
print("✅ Fusion avec COG régions :", communes_ae.shape)

# 6) Ajout de la zone de défense (mapping sûr via dict.get)
map_region_to_dz = {
    "Auvergne-Rhône-Alpes": "Sud-Est",
    "Bourgogne-Franche-Comté": "Est",
    "Bretagne": "Ouest",
    "Centre-Val de Loire": "Ouest",
    "Corse": "Sud",
    "Grand Est": "Est",
    "Guadeloupe": "Antilles",
    "Guyane": "Guyane",
    "Hauts-de-France": "Nord",
    "La Réunion": "Sud de l'Océan Indien",
    "Martinique": "Antilles",
    "Mayotte": "Sud de l'Océan Indien",
    "Normandie": "Ouest",
    "Nouvelle-Aquitaine": "Sud-Ouest",
    "Occitanie": "Sud",
    "Pays de la Loire": "Ouest",
    "Provence-Alpes-Côte d'Azur": "Sud",
    "Île-de-France": "Paris",
}
communes_ae["name_dz"] = communes_ae["name_reg"].apply(lambda x: map_region_to_dz.get(x, "Inconnue"))

# 7) Garde seulement les communes relevant de la PN (CPN)
#    - aligne les types des codes si besoin
comm_insee = communes_ae["INSEE_COM"].astype(str)
codes_cpn = cpn["code_commune"].astype(str)
mask_pn = comm_insee.isin(codes_cpn)

communes_pn_ae = communes_ae.loc[mask_pn].copy()
print("✅ Communes sous compétence PN retenues :", communes_pn_ae.shape)

# 8) Jointure avec le fichier CPN pour récupérer le nom du service
communes_pn_ae = pd.merge(
    communes_pn_ae,
    cpn[["code_commune", "service"]],
    left_on="INSEE_COM",
    right_on="code_commune",
    how="left",
)
cols_communes_pn = ["REG", "DEP", "name_dep", "name_reg", "name_dz", "service", "name_commune", "geometry"]
save_as_geojson(communes_pn_ae, cols_communes_pn, "communes_pn.geojson")

cpn_ae = communes_pn_ae.dissolve(by="service").reset_index()

print("✅ Agrégation par service réalisée :", cpn_ae.shape)

# 9) Contrôle éventuel : lignes sans DEP (après agrégation)
if "DEP" in cpn_ae.columns:
    print("ℹ️ Services avec DEP manquant :")
    print(cpn_ae[cpn_ae["DEP"].isna()][["service"]])

# 10) Export final des CPN
cols_cpn = ["REG", "DEP", "name_dep", "name_reg", "name_dz", "service", "geometry"]
save_as_geojson(cpn_ae, cols_cpn, "cpn.geojson")

In [None]:
# ---
# Construction des zones de défense à partir des communes ADMIN-EXPRESS enrichies
# ---

# Agrégation des communes par zone de défense
# dz_ae = communes_ae.dissolve(by="name_dz").reset_index()
# Pourquoi un dissolve direct par zone de défense depuis les communes pose problème :
# - Topologie hétérogène à l’échelle communale : de micro-chevauchements, interstices et
#   géométries multipart (îlots, polygones littoraux) existent entre communes. Un dissolve
#   "global" sur des milliers de petites frontières cumule ces défauts, laissant des trous
#   ou créant des lignes internes résiduelles dans la zone de défense finale.
# - Limites non parfaitement coïncidentes : certaines communes côtières/rivières et
#   communes déléguées ont des découpages particuliers (zones maritimes, bancs, enclaves).
#   Dissoudre directement à l’échelle DZ agrège des frontières qui ne s’imbriquent pas
#   exactement et dégrade la topologie.
# - Charge et robustesse : fusionner d’un coup un très grand nombre de polygones est coûteux
#   et plus susceptible d’échouer (erreurs GEOS, self-intersections) ou de produire des
#   géométries invalides.
# - Perte de l’alignement administratif : la frontière « officielle » d’une DZ est la somme
#   des régions (elles-mêmes somme des départements, etc.). Dissoudre par paliers
#   communes ➜ départements ➜ régions ➜ zones de défense réduit le nombre d’arêtes à
#   fusionner à chaque étape, corrige implicitement beaucoup de micro-écarts, et garantit
#   que chaque niveau hérite de limites cohérentes et propres.
# => En pratique, le dissolve progressif est plus stable, plus rapide et produit des
#    géométries valides et propres, conformes aux niveaux administratifs intermédiaires.
dep_ae = communes_ae.dissolve(by="name_dep").reset_index()
reg_ae = dep_ae.dissolve(by="name_reg").reset_index()
dz_ae = reg_ae.dissolve(by="name_dz").reset_index()


print("✅ Agrégation par zones de défense terminée :", dz_ae.shape)

# Colonnes à garder
dz_keep_cols = ["name_dz", "geometry"]

# Sauvegarde en GeoJSON
save_as_geojson(dz_ae, dz_keep_cols, "dz.geojson")

In [None]:
# Export des zones DZ en GeoJSON
for dz_name in cpn_ae['name_dz'].unique():
    dz_subset = cpn_ae[cpn_ae['name_dz'] == dz_name]
    save_as_geojson(dz_subset, dz_keep_cols, f"dz_{dz_name}.geojson")

In [None]:
# Sauvegarde de la correspondance départements (code INSEE ↔ nom)
departements_ref = (
    communes_ae[['INSEE_DEP', 'name_dep']]
    .drop_duplicates()
    .sort_values('INSEE_DEP')
)
departements_ref.to_csv(DATA_DIR / 'insee_dep_name_dep.csv', index=False)

# Sauvegarde de la correspondance régions (code INSEE ↔ nom)
regions_ref = (
    communes_ae[['INSEE_REG', 'name_reg']]
    .drop_duplicates()
    .sort_values('INSEE_REG')
)
regions_ref.to_csv(DATA_DIR / 'insee_reg_name_reg.csv', index=False)