In [1]:
pip install pandas openpyxl geopandas matplotlib



In [4]:
import pandas as pd
import geopandas as gpd
import numpy as np
import matplotlib.pyplot as plt
from collections import defaultdict
from dataclasses import dataclass, field
from typing import List, Dict, Set
import warnings
warnings.filterwarnings('ignore')

@dataclass
class Infrastructure:
    """Représente une infrastructure électrique"""
    id: str
    type: str  # 'infra_intacte' ou 'a_remplacer'
    longueur: float
    geometry: any = None
    batiments_desservis: Set[str] = field(default_factory=set)
    nb_maisons_total: int = 0
    difficulte: float = 0.0
    est_reparee: bool = False
    phase_reparation: int = -1

    def calculer_difficulte(self):
        """Calcule la difficulté de l'infrastructure"""
        if self.nb_maisons_total == 0:
            self.difficulte = float('inf')
        else:
            self.difficulte = self.longueur / self.nb_maisons_total
        return self.difficulte

    def est_operationnelle(self):
        """Vérifie si l'infrastructure est opérationnelle"""
        return self.type == 'infra_intacte' or self.est_reparee

    def __repr__(self):
        status = "✅" if self.est_operationnelle() else "❌"
        return f"{status} Infra({self.id}, L={self.longueur:.1f}m, {self.nb_maisons_total} maisons, diff={self.difficulte:.2f})"


@dataclass
class Batiment:
    """Représente un bâtiment à raccorder"""
    id: str
    nb_maisons: int
    geometry: any = None
    infrastructures: List[Infrastructure] = field(default_factory=list)
    difficulte_totale: float = 0.0
    phase: int = -1

    def calculer_difficulte(self):
        """Calcule la difficulté totale du bâtiment (RECALCUL DYNAMIQUE)"""
        infras_non_op = [
            infra for infra in self.infrastructures
            if not infra.est_operationnelle()
        ]

        if not infras_non_op:
            self.difficulte_totale = 0.0
        else:
            self.difficulte_totale = sum(infra.difficulte for infra in infras_non_op)

        return self.difficulte_totale

    def est_raccorde(self):
        """Vérifie si toutes les infrastructures sont opérationnelles"""
        return all(infra.est_operationnelle() for infra in self.infrastructures)

    def get_infras_a_reparer(self):
        """Retourne les infrastructures à réparer"""
        return [infra for infra in self.infrastructures if not infra.est_operationnelle()]

    def __repr__(self):
        status = "🟢" if self.est_raccorde() else "🔴"
        return f"{status} Bat({self.id}, {self.nb_maisons} maisons, diff={self.difficulte_totale:.2f}, phase={self.phase})"


class PlanificateurRaccordement:
    """Gestionnaire de la planification du raccordement électrique"""

    def __init__(self, xlsx_path: str, shp_infra_path: str, shp_bat_path: str):
        self.xlsx_path = xlsx_path
        self.shp_infra_path = shp_infra_path
        self.shp_bat_path = shp_bat_path

        self.df_reseau = None
        self.gdf_infrastructures = None
        self.gdf_batiments = None

        self.infrastructures: Dict[str, Infrastructure] = {}
        self.batiments: Dict[str, Batiment] = {}
        self.plan_raccordement: List[Dict] = []

        self.col_id_infra = None
        self.col_id_bat = None

    def _detecter_colonne_id(self, gdf, nom_type):
        """Détecte automatiquement la colonne ID dans un GeoDataFrame"""
        # Ordre de priorité pour la détection
        colonnes_possibles = [
            'infra_id', 'id_bat', 'bat_id', 'batiment_id',  # Colonnes spécifiques
            'id', 'ID', 'Id', 'fid', 'FID', 'objectid', 'OBJECTID'  # Colonnes génériques
        ]

        for col in colonnes_possibles:
            if col in gdf.columns:
                print(f"     ✅ Colonne ID détectée pour {nom_type}: '{col}'")
                return col

        print(f"     ⚠️  Aucune colonne ID trouvée pour {nom_type}")
        return None

    def charger_donnees(self):
        """Charge toutes les données (XLSX + Shapefiles)"""
        print("📊 Chargement des données...")

        # Charger le fichier XLSX
        self.df_reseau = pd.read_excel(self.xlsx_path)
        print(f"  ✅ {len(self.df_reseau)} lignes chargées du fichier réseau")

        # Charger les shapefiles
        self.gdf_infrastructures = gpd.read_file(self.shp_infra_path)
        self.gdf_batiments = gpd.read_file(self.shp_bat_path)

        print(f"  ✅ {len(self.gdf_infrastructures)} infrastructures géospatiales")
        print(f"  ✅ {len(self.gdf_batiments)} bâtiments géospatiaux")

        # Détecter automatiquement la colonne ID
        self.col_id_infra = self._detecter_colonne_id(self.gdf_infrastructures, 'infrastructure')
        self.col_id_bat = self._detecter_colonne_id(self.gdf_batiments, 'bâtiment')

        # Créer les objets Infrastructure
        for infra_id in self.df_reseau['infra_id'].unique():
            infra_data = self.df_reseau[self.df_reseau['infra_id'] == infra_id].iloc[0]

            geom = None
            if self.col_id_infra and infra_id in self.gdf_infrastructures[self.col_id_infra].values:
                geom = self.gdf_infrastructures[self.gdf_infrastructures[self.col_id_infra] == infra_id].geometry.iloc[0]

            self.infrastructures[infra_id] = Infrastructure(
                id=infra_id,
                type=infra_data['infra_type'],
                longueur=infra_data['longueur'],
                geometry=geom
            )

        # Créer les objets Bâtiment et établir les relations
        for bat_id in self.df_reseau['id_batiment'].unique():
            bat_data = self.df_reseau[self.df_reseau['id_batiment'] == bat_id]
            nb_maisons = bat_data['nb_maisons'].iloc[0]

            geom = None
            if self.col_id_bat and bat_id in self.gdf_batiments[self.col_id_bat].values:
                geom = self.gdf_batiments[self.gdf_batiments[self.col_id_bat] == bat_id].geometry.iloc[0]

            self.batiments[bat_id] = Batiment(
                id=bat_id,
                nb_maisons=nb_maisons,
                geometry=geom
            )

            # Associer les infrastructures au bâtiment
            for _, row in bat_data.iterrows():
                infra = self.infrastructures[row['infra_id']]
                self.batiments[bat_id].infrastructures.append(infra)
                infra.batiments_desservis.add(bat_id)
                infra.nb_maisons_total += nb_maisons

        # Calculer les difficultés initiales
        for infra in self.infrastructures.values():
            infra.calculer_difficulte()

        for bat in self.batiments.values():
            bat.calculer_difficulte()

        print(f"\n✅ Modèle créé:")
        print(f"   • {len(self.batiments)} bâtiments")
        print(f"   • {len(self.infrastructures)} infrastructures")
        print(f"   • {sum(b.nb_maisons for b in self.batiments.values())} maisons au total\n")

    def identifier_phase_0(self):
        """Identifie les bâtiments déjà raccordés (Phase 0)"""
        phase_0 = []
        for bat in self.batiments.values():
            if bat.est_raccorde():
                bat.phase = 0
                phase_0.append(bat)

        nb_maisons_phase_0 = sum(b.nb_maisons for b in phase_0)
        print(f"🟢 Phase 0: {len(phase_0)} bâtiments déjà raccordés ({nb_maisons_phase_0} maisons)")
        return phase_0

    def planifier_raccordement(self):
        """Algorithme principal de planification avec affichage simplifié"""
        print("\n" + "="*70)
        print("🔧 PLANIFICATION DES RACCORDEMENTS")
        print("="*70 + "\n")

        # Phase 0: Bâtiments déjà raccordés
        self.identifier_phase_0()

        # Liste des bâtiments à raccorder
        batiments_a_raccorder = [
            bat for bat in self.batiments.values()
            if bat.phase == -1
        ]

        phase = 1
        total_maisons_raccordees = sum(b.nb_maisons for b in self.batiments.values() if b.phase == 0)
        cout_total = 0
        total_bat = len(batiments_a_raccorder)

        print(f"📋 {total_bat} bâtiments à raccorder\n")
        print("Progression du raccordement:")
        print("-" * 70)

        # Affichage par paliers
        palier = max(1, total_bat // 10)  # Afficher environ 10 fois

        while batiments_a_raccorder:
            # Recalculer les difficultés
            for infra in self.infrastructures.values():
                infra.calculer_difficulte()

            for bat in batiments_a_raccorder:
                bat.calculer_difficulte()

            # Trier par difficulté croissante
            batiments_a_raccorder.sort(key=lambda b: b.difficulte_totale)

            # Prendre le bâtiment le moins difficile
            batiment = batiments_a_raccorder.pop(0)
            batiment.phase = phase

            # Identifier et réparer les infrastructures
            infras_a_reparer = batiment.get_infras_a_reparer()

            # Supprimer les doublons dans la liste des infrastructures à réparer
            infras_uniques = list({infra.id: infra for infra in infras_a_reparer}.values())

            cout_phase = sum(infra.longueur for infra in infras_uniques)
            cout_total += cout_phase

            # Marquer les infrastructures comme réparées
            for infra in infras_uniques:
                infra.est_reparee = True
                infra.phase_reparation = phase

            # Enregistrer dans le plan
            self.plan_raccordement.append({
                'phase': phase,
                'batiment_id': batiment.id,
                'nb_maisons': batiment.nb_maisons,
                'difficulte': batiment.difficulte_totale,
                'nb_infras_reparees': len(infras_uniques),
                'infras_reparees': ', '.join([i.id for i in infras_uniques]),
                'cout_phase': cout_phase,
                'cout_cumule': cout_total
            })

            total_maisons_raccordees += batiment.nb_maisons

            # Affichage simplifié (par paliers)
            if phase == 1 or phase % palier == 0 or len(batiments_a_raccorder) == 0:
                pourcentage = ((total_bat - len(batiments_a_raccorder)) / total_bat) * 100
                barre_prog = "█" * int(pourcentage / 5) + "░" * (20 - int(pourcentage / 5))
                print(f"[{barre_prog}] {pourcentage:5.1f}% | Phase {phase:3d} | "
                      f"Coût: {cout_total:8.2f}m | Maisons: {total_maisons_raccordees}")

            phase += 1

        print("-" * 70)
        print(f"\n{'='*70}")
        print("✅ PLANIFICATION TERMINÉE")
        print(f"{'='*70}")
        print(f"\n📊 Résumé:")
        print(f"   • Nombre de phases: {phase - 1}")
        print(f"   • Maisons raccordées: {total_maisons_raccordees}")
        print(f"   • Coût total: {cout_total:.2f}m")
        print(f"   • Coût moyen/maison: {cout_total/total_maisons_raccordees:.2f}m")

        # Statistiques sur les infrastructures
        nb_infras_reparees = sum(1 for infra in self.infrastructures.values() if infra.est_reparee)
        print(f"   • Infrastructures réparées: {nb_infras_reparees}/{len(self.infrastructures)}\n")

    def analyser_mutualisation(self):
        """Analyse les opportunités de mutualisation"""
        print("="*70)
        print("🔄 ANALYSE DE LA MUTUALISATION")
        print("="*70 + "\n")

        mutualisations = []
        batiments_mutualises = set()  # Pour éviter de compter plusieurs fois les mêmes bâtiments

        for infra in self.infrastructures.values():
            if len(infra.batiments_desservis) > 1:
                mutualisations.append({
                    'infra_id': infra.id,
                    'type': infra.type,
                    'nb_batiments': len(infra.batiments_desservis),
                    'nb_maisons': infra.nb_maisons_total,
                    'longueur': infra.longueur,
                    'efficacite': infra.nb_maisons_total / infra.longueur,
                    'phase_reparation': infra.phase_reparation if infra.est_reparee else 'N/A'
                })
                # Ajouter tous les bâtiments desservis par cette infrastructure
                batiments_mutualises.update(infra.batiments_desservis)

        if mutualisations:
            df_mut = pd.DataFrame(mutualisations).sort_values('efficacite', ascending=False)
            print(f"🎯 {len(mutualisations)} infrastructures mutualisées détectées\n")
            print(f"Top 10 infrastructures les plus efficaces:")
            print(df_mut.head(10).to_string(index=False))

            # Calculer le nombre de maisons réellement concernées (sans doublon)
            nb_maisons_mutualises = sum(
                self.batiments[bat_id].nb_maisons
                for bat_id in batiments_mutualises
            )
            total_maisons = sum(b.nb_maisons for b in self.batiments.values())

            print(f"\n📊 Statistiques de mutualisation:")
            print(f"   • Bâtiments concernés: {len(batiments_mutualises)}/{len(self.batiments)} "
                  f"({100*len(batiments_mutualises)/len(self.batiments):.1f}%)")
            print(f"   • Maisons concernées: {nb_maisons_mutualises}/{total_maisons} "
                  f"({100*nb_maisons_mutualises/total_maisons:.1f}%)")
            print(f"   • Longueur totale mutualisée: {df_mut['longueur'].sum():.2f}m")
            print(f"   • Efficacité moyenne: {df_mut['efficacite'].mean():.2f} maisons/m\n")
        else:
            print("Aucune mutualisation détectée.\n")

        return mutualisations

    def generer_rapport(self, output_path: str = "rapport_raccordement.xlsx"):
        """Génère un rapport Excel du plan de raccordement"""
        print(f"📄 Génération du rapport Excel...")

        with pd.ExcelWriter(output_path, engine='openpyxl') as writer:
            # Feuille 1: Plan de raccordement
            df_plan = pd.DataFrame(self.plan_raccordement)
            df_plan.to_excel(writer, sheet_name='Plan_Raccordement', index=False)

            # Feuille 2: Détails des bâtiments
            batiments_data = [{
                'id_batiment': bat.id,
                'nb_maisons': bat.nb_maisons,
                'phase': bat.phase,
                'difficulte_finale': bat.difficulte_totale,
                'nb_infrastructures': len(bat.infrastructures)
            } for bat in self.batiments.values()]
            df_bat = pd.DataFrame(batiments_data).sort_values('phase')
            df_bat.to_excel(writer, sheet_name='Batiments', index=False)

            # Feuille 3: Détails des infrastructures
            infras_data = [{
                'infra_id': infra.id,
                'type_initial': infra.type,
                'longueur': infra.longueur,
                'nb_batiments_desservis': len(infra.batiments_desservis),
                'nb_maisons_total': infra.nb_maisons_total,
                'difficulte': infra.difficulte,
                'phase_reparation': infra.phase_reparation if infra.est_reparee else 'N/A'
            } for infra in self.infrastructures.values()]
            df_infra = pd.DataFrame(infras_data).sort_values('difficulte')
            df_infra.to_excel(writer, sheet_name='Infrastructures', index=False)

        print(f"   ✅ Rapport généré: {output_path}\n")
        return df_plan

    def visualiser_plan(self, output_path: str = "carte_raccordement.png"):
        """Génère une carte du plan de raccordement"""
        print(f"🗺️  Génération de la carte...")

        if self.gdf_infrastructures is None or self.gdf_batiments is None:
            print("   ⚠️  Shapefiles non chargés\n")
            return

        if self.col_id_infra is None or self.col_id_bat is None:
            print("   ⚠️  Colonnes ID non détectées\n")
            return

        fig, axes = plt.subplots(1, 2, figsize=(20, 10))

        # Carte 1: État initial
        ax1 = axes[0]
        ax1.set_title("État Initial (Avant Réparations)", fontsize=16, fontweight='bold')

        for infra_id, infra in self.infrastructures.items():
            color = 'green' if infra.type == 'infra_intacte' else 'red'
            gdf_temp = self.gdf_infrastructures[self.gdf_infrastructures[self.col_id_infra] == infra_id]
            if not gdf_temp.empty:
                gdf_temp.plot(ax=ax1, color=color, linewidth=2, alpha=0.7)

        self.gdf_batiments.plot(ax=ax1, color='blue', markersize=30, alpha=0.6)
        ax1.legend(['Infra intacte', 'Infra à remplacer', 'Bâtiments'], loc='upper right')

        # Carte 2: Phases de raccordement
        ax2 = axes[1]
        ax2.set_title("Plan de Raccordement (Par Phase)", fontsize=16, fontweight='bold')

        phases = sorted(set(infra.phase_reparation for infra in self.infrastructures.values() if infra.phase_reparation > 0))
        colors = plt.cm.viridis(np.linspace(0, 1, max(len(phases), 1)))

        for phase, color in zip(phases, colors):
            for infra in self.infrastructures.values():
                if infra.phase_reparation == phase:
                    gdf_temp = self.gdf_infrastructures[self.gdf_infrastructures[self.col_id_infra] == infra.id]
                    if not gdf_temp.empty:
                        gdf_temp.plot(ax=ax2, color=color, linewidth=2, alpha=0.7, label=f'Phase {phase}')

        for infra in self.infrastructures.values():
            if infra.type == 'infra_intacte':
                gdf_temp = self.gdf_infrastructures[self.gdf_infrastructures[self.col_id_infra] == infra.id]
                if not gdf_temp.empty:
                    gdf_temp.plot(ax=ax2, color='lightgreen', linewidth=1, alpha=0.5)

        for bat in self.batiments.values():
            gdf_temp = self.gdf_batiments[self.gdf_batiments[self.col_id_bat] == bat.id]
            if not gdf_temp.empty:
                if bat.phase == 0:
                    color = 'lightgreen'
                else:
                    phase_idx = phases.index(bat.phase) if bat.phase in phases else 0
                    color = colors[phase_idx] if len(colors) > 0 else 'blue'
                gdf_temp.plot(ax=ax2, color=color, markersize=30, alpha=0.8)

        handles, labels = ax2.get_legend_handles_labels()
        by_label = dict(zip(labels, handles))
        ax2.legend(by_label.values(), by_label.keys(), loc='upper right')

        plt.tight_layout()
        plt.savefig(output_path, dpi=300, bbox_inches='tight')
        print(f"   ✅ Carte sauvegardée: {output_path}\n")
        plt.close()


# Fonction principale
def main(xlsx_path: str, shp_infra_path: str, shp_bat_path: str):
    """Point d'entrée principal"""
    print("\n" + "="*70)
    print("🏗️  PLANIFICATION DU RACCORDEMENT ÉLECTRIQUE")
    print("="*70 + "\n")

    planificateur = PlanificateurRaccordement(xlsx_path, shp_infra_path, shp_bat_path)
    planificateur.charger_donnees()
    planificateur.planifier_raccordement()
    planificateur.analyser_mutualisation()
    rapport = planificateur.generer_rapport()
    planificateur.visualiser_plan()

    print("="*70)
    print("✅ PROCESSUS TERMINÉ")
    print("="*70 + "\n")

    return planificateur, rapport


# Exemple d'utilisation
if __name__ == "__main__":
    xlsx_path = "reseau_en_arbre.xlsx"
    shp_infra_path = "infrastructures.shp"
    shp_bat_path = "batiments.shp"

    planificateur, rapport = main(xlsx_path, shp_infra_path, shp_bat_path)


🏗️  PLANIFICATION DU RACCORDEMENT ÉLECTRIQUE

📊 Chargement des données...
  ✅ 6107 lignes chargées du fichier réseau
  ✅ 644 infrastructures géospatiales
  ✅ 381 bâtiments géospatiaux
     ✅ Colonne ID détectée pour infrastructure: 'infra_id'
     ✅ Colonne ID détectée pour bâtiment: 'id_bat'

✅ Modèle créé:
   • 381 bâtiments
   • 644 infrastructures
   • 389 maisons au total


🔧 PLANIFICATION DES RACCORDEMENTS

🟢 Phase 0: 296 bâtiments déjà raccordés (304 maisons)
📋 85 bâtiments à raccorder

Progression du raccordement:
----------------------------------------------------------------------
[░░░░░░░░░░░░░░░░░░░░]   1.2% | Phase   1 | Coût:    37.55m | Maisons: 305
[█░░░░░░░░░░░░░░░░░░░]   9.4% | Phase   8 | Coût:    68.36m | Maisons: 312
[███░░░░░░░░░░░░░░░░░]  18.8% | Phase  16 | Coût:   199.78m | Maisons: 320
[█████░░░░░░░░░░░░░░░]  28.2% | Phase  24 | Coût:   311.55m | Maisons: 328
[███████░░░░░░░░░░░░░]  37.6% | Phase  32 | Coût:   510.20m | Maisons: 336
[█████████░░░░░░░░░░░]  4

In [None]:
# Load the shapefiles to inspect their column names
try:
    gdf_infrastructures_cols = gpd.read_file("infrastructures.shp")
    print("Columns in infrastructures.shp:")
    print(gdf_infrastructures_cols.columns)

    gdf_batiments_cols = gpd.read_file("batiments.shp")
    print("\nColumns in batiments.shp:")
    print(gdf_batiments_cols.columns)
except Exception as e:
    print(f"Error loading shapefiles: {e}")