In [243]:
# Import des bibliothèques 

import openpyxl
import pandas as pd
import re
from openpyxl.utils.cell import column_index_from_string
import requests
import io
import openpyxl

In [256]:
# Configuration parameters

# Raw GitHub URL of the Excel workbook that the notebook downloads and analyses.
FILE_URL = 'https://raw.githubusercontent.com/mission-donnees-dett/analyse_preventifs_23_25/main/Tunnels_Cout%20pre%CC%81ventif_13.06.2025%20(5).xlsx'
# Values of the 'Marché' column that are kept; rows with any other value are ignored.
TARGET_MARCHES = {"Bâtiment", "Propreté", "ContReg", "Eclairage", "AEV", "Automate", "PAU/TSE", "Onduleur", "Détection", "Ventilation", "Vidéo", "Pompage"}
# Names of the tunnel worksheets to process. This is a set, so its iteration
# order is not guaranteed to be the order written here.
TUNNEL_SHEETS = {'Boissy', 'Champigny', 'Guy Môquet', 'Moulin', 'Nogent',
    'Ambroise Paré', 'Belle-Rive', 'Chennevières', 'Fontenay', 'La Défense',
    'Nanterre Centre', 'Nanterre échangeur', 'Neuilly', 'Saint-Cloud', 'Sévines',
    'Bobigny', 'La Courneuve', 'Landy', 'Lumen-Norton', 'Taverny',
    'Antony', 'Fresnes', 'Bicêtre', 'Italie', 'Orly'}  # List of tunnel sheet names
# Name of the worksheet holding the reference prices (column G is read from it).
PREVENTIF_SHEET = 'Préventifs_tunnels'

In [257]:
# Download the Excel workbook from GitHub and load it with openpyxl

# A timeout keeps this cell from hanging forever if the server never answers.
response = requests.get(FILE_URL, timeout=60)
if response.status_code != 200:
    raise FileNotFoundError(f"Failed to download file: {FILE_URL}")
# data_only=False keeps formulas as text so that their references can be traced.
wb = openpyxl.load_workbook(io.BytesIO(response.content), data_only=False)
ws_preventif = wb[PREVENTIF_SHEET]

In [258]:
# Résolution de valeur/formule de cellule (même avec réf. croisée) 

def resolve_cell_value(ws, cell):
    """Return the numeric value of ``cell``, following cross-sheet references.

    A formula of the form ``=Sheet!A1`` or ``='Sheet name'!A1`` (optionally
    with ``$`` markers) is followed recursively into the referenced sheet of
    the same workbook (``ws.parent``). Any other formula is not evaluated.

    Args:
        ws: Worksheet that owns ``cell``; its ``parent`` workbook is used to
            look up referenced sheets.
        cell: Object exposing a ``value`` attribute.

    Returns:
        The value as a ``float``, or ``None`` when the cell holds an
        unsupported formula, an unresolvable reference, or a non-numeric value.
    """
    value = cell.value
    if isinstance(value, str) and value.startswith('='):
        # The sheet name is either quoted ('Guy Môquet') or bare
        # (Préventifs_tunnels). \w is Unicode-aware, so accented names match.
        match = re.match(r"=(?:'([^']+)'|([\w.]+))!\$?([A-Z]+)\$?(\d+)", value)
        if match:
            quoted_name, bare_name, col, row = match.groups()
            sheet_name = quoted_name or bare_name
            try:
                target_ws = ws.parent[sheet_name]
                target_cell = target_ws[f"{col}{row}"]
                return resolve_cell_value(target_ws, target_cell)
            except Exception as e:
                print(f"    ⚠️ Failed to resolve cross-sheet reference {value}: {e}")
                return None
        # Formulas other than a cross-sheet reference are not evaluated here.
        return None
    try:
        return float(value)
    except (TypeError, ValueError, OverflowError):
        # None, text, dates, ... are reported as "no numeric value".
        return None

In [259]:
# Évaluation simple des formules (type A1+A2+A3)

def evaluate_simple_formula(ws, formula, row_num):
    """Evaluate a plain value or a sum-only formula such as ``=A1+B2+3``.

    Each ``+``-separated term must be either a numeric literal or a cell
    reference (``A1``, ``$A$1``, or a bare column letter, which is read on
    ``row_num``). Referenced cells are resolved with ``resolve_cell_value``.

    Args:
        ws: Worksheet in which the cell references are looked up.
        formula: Raw cell content (number, text, or formula string).
        row_num: Row used for references that carry no row number.

    Returns:
        The total as a ``float``, or ``None`` when the content is not numeric,
        the formula uses anything other than ``+`` between supported terms, or
        a referenced cell cannot be resolved.
    """
    if not isinstance(formula, str) or not formula.startswith('='):
        try:
            return float(formula)
        except (TypeError, ValueError, OverflowError):
            return None

    terms = formula.strip('=').replace(' ', '').split('+')
    total = 0
    for term in terms:
        # Numeric literal, e.g. the "1" in "=C5+1".
        if re.fullmatch(r'\d+(?:\.\d+)?', term):
            total += float(term)
            continue

        # fullmatch (not match): a term such as "C5*2" must be rejected rather
        # than silently evaluated as "C5".
        match = re.fullmatch(r'\$?([A-Z]+)(?:\$?(\d+))?', term)
        if not match:
            return None
        col_letter = match.group(1)
        row_number = int(match.group(2)) if match.group(2) else row_num
        try:
            col_index = column_index_from_string(col_letter)
            cell = ws.cell(row=row_number, column=col_index)
            val = resolve_cell_value(ws, cell)
            if val is None:
                return None
            total += float(val)
        except Exception:
            # Invalid column letters or an unreadable cell: give up on the formula.
            return None
    return total

In [260]:
# Analyse du type de formule et extraction des références

def parse_formula(formula):
    """Extract the ``Préventifs_tunnels!G`` rows referenced by a cost formula.

    Args:
        formula: Formula text, e.g. ``=SUM(Préventifs_tunnels!G5:G7)``.

    Returns:
        A tuple ``(refs, form_typ, coeff)`` where

        * ``refs`` is the list of referenced row numbers, with ranges such as
          ``G5:G7`` expanded to ``[5, 6, 7]``;
        * ``form_typ`` is ``'div_2'``, ``'direct_reference'``, ``'sum_<n>'``
          or ``'unknown'``;
        * ``coeff`` is ``0.5`` when the formula divides by 2, otherwise ``1``.
    """
    refs = []
    for start_str, end_str in re.findall(r'Préventifs_tunnels!G(\d+)(?::G(\d+))?', formula):
        start = int(start_str)
        end = int(end_str) if end_str else start
        refs.extend(range(start, end + 1))

    # Match a division by exactly 2 ("/2" or "/2.0"), not "/20" or "/2.5".
    halved = re.search(r'/2(?:\.0+)?(?![\d.])', formula) is not None
    coeff = 0.5 if halved else 1

    if halved:
        form_typ = 'div_2'
    elif len(refs) == 1 and formula.strip('=') == f"Préventifs_tunnels!G{refs[0]}":
        form_typ = 'direct_reference'
    elif 'SUM' in formula.upper():
        form_typ = f"sum_{len(refs)}"
    elif '+' in formula and len(refs) >= 2:
        form_typ = f"sum_{len(refs)}"
    else:
        form_typ = 'unknown'

    return refs, form_typ, coeff

In [261]:
# Résolution du champ 'Nombre équipé' en cas de formule

def resolve_nb_eq_value(ws, cell, depth=0):
    """Resolve the numeric content of a 'Nombre équipé' cell, formulas included.

    Resolution rules, in order:

    1. If the formula contains sheet-qualified references (``'Sheet name'!A1``
       or ``Sheet!A1``), return the sum of those cells, read from the named
       sheets of ``ws.parent``. Unknown sheets and unreadable cells count as 0.
    2. Otherwise, if it contains same-sheet references (``A1``, ``$A$1``),
       return the sum of those cells.
    3. Otherwise, if it is pure arithmetic on literals, evaluate it.
    4. Non-formula content is converted with ``float``.

    Known limitation (unchanged): referenced cells are always *added*, whatever
    operator the formula uses, and a range ``A1:A3`` contributes only its two
    end points.

    Args:
        ws: Worksheet that owns ``cell``.
        cell: Object exposing a ``value`` attribute.
        depth: Current recursion depth; resolution stops beyond 5 levels.

    Returns:
        A number; ``0`` whenever nothing numeric could be resolved.
    """
    MAX_DEPTH = 5
    if depth > MAX_DEPTH:
        return 0

    value = cell.value
    if isinstance(value, str) and value.startswith('='):
        # Sheet names are quoted only when they contain spaces or special
        # characters, so both 'Guy Môquet'!F5 and Boissy!F5 must be recognised.
        cross_refs = re.findall(r"(?:'([^']+)'|([\w.]+))!\$?([A-Z]+)\$?(\d+)", value)
        if cross_refs:
            workbook = ws.parent
            total = 0
            for quoted_name, bare_name, col, row in cross_refs:
                sheet_name = quoted_name or bare_name
                if sheet_name not in workbook.sheetnames:
                    continue
                try:
                    target_ws = workbook[sheet_name]
                    target_cell = target_ws[f"{col}{row}"]
                    val = resolve_nb_eq_value(target_ws, target_cell, depth + 1)
                    total += val if val is not None else 0
                except Exception:
                    # Best effort: an unreadable reference contributes nothing.
                    continue
            return total

        same_refs = re.findall(r"\$?([A-Z]+)\$?(\d+)", value)
        if same_refs:
            total = 0
            for col, row in same_refs:
                try:
                    target_cell = ws[f"{col}{row}"]
                    val = resolve_nb_eq_value(ws, target_cell, depth + 1)
                    total += val if val is not None else 0
                except Exception:
                    continue
            return total

        try:
            expr = value.lstrip('=').replace(',', '.')
            # NOTE(review): eval is reached only for strings made of digits,
            # dots, spaces, parentheses and + - * /. Replace it if the workbook
            # can ever come from an untrusted source.
            if re.fullmatch(r'[\d\.\+\-\*\/\(\) ]+', expr):
                return float(eval(expr, {"__builtins__": None}, {}))
        except Exception:
            return 0

    try:
        number = float(value)
    except (TypeError, ValueError, OverflowError):
        return 0
    return number if number != 0 else 0

In [262]:
# Résolution du champ 'Coût HT' même avec formule ou réf. croisée

def resolve_cout_ht(ws, cell, depth=0):
    """Resolve the numeric content of a 'Coût HT' cell, following references.

    A formula that *starts* with a cell reference is followed recursively:
    ``='Sheet name'!G5`` / ``=Sheet!G5`` into the named sheet of ``ws.parent``
    (when that sheet exists), ``=G5`` / ``=$G$5`` on the same sheet.
    Only that leading reference is used; anything after it is ignored
    (NOTE(review): ``=G5*1.2`` therefore resolves to the value of G5).

    Args:
        ws: Worksheet that owns ``cell``.
        cell: Object exposing a ``value`` attribute.
        depth: Current recursion depth; resolution stops beyond 10 levels.

    Returns:
        The value as a ``float``; ``0`` when nothing numeric could be
        resolved; ``None`` only when the recursion limit is exceeded.
    """
    MAX_DEPTH = 10
    if depth > MAX_DEPTH:
        return None

    val = cell.value
    if isinstance(val, str) and val.startswith('='):
        # Accept both quoted ('Prix 2024') and bare (Bordereau) sheet names.
        match_sheet = re.match(r"=(?:'([^']+)'|([\w.]+))!\$?([A-Z]+)\$?(\d+)", val)
        if match_sheet:
            quoted_name, bare_name, col, row = match_sheet.groups()
            sheet_name = quoted_name or bare_name
            workbook = ws.parent
            if sheet_name in workbook.sheetnames:
                target_ws = workbook[sheet_name]
                target_cell = target_ws[f"{col}{row}"]
                return resolve_cout_ht(target_ws, target_cell, depth + 1)
        match_same = re.match(r"=\$?([A-Z]+)\$?(\d+)", val)
        if match_same:
            col, row = match_same.groups()
            return resolve_cout_ht(ws, ws[f"{col}{row}"], depth + 1)

        # Last resort for other formulas: use a cached result if the cell
        # object exposes one.
        # NOTE(review): with a workbook loaded in formula mode the cell
        # presumably holds only the formula text, in which case this yields 0
        # - confirm, or load a second copy with data_only=True.
        cached_value = getattr(cell, 'cached_value', None)
        if cached_value is None:
            cached_value = getattr(cell, '_value', None)

        if cached_value is not None:
            try:
                return float(str(cached_value).replace(',', '.'))
            except (TypeError, ValueError, OverflowError):
                return 0
        return 0

    try:
        return float(val)
    except (TypeError, ValueError, OverflowError):
        return 0

In [263]:
# Traitement des lignes d'une feuille tunnel

def _make_cost_record(marche, tunnel, ref_prix, form_typ, freq, nb_eq, coeff, cout_ht):
    """Build one result row; cout_total is freq * nb_eq * coeff * cout_ht rounded to 2 decimals."""
    return {
        'mar_diminutif': marche,
        'tunnel': tunnel,
        'ref_prix': ref_prix,
        'form_typ': form_typ,
        'frq_totale': freq,
        'nb_equipe': nb_eq,
        'coeff': coeff,
        'cout_ht': cout_ht,
        'cout_total': round(freq * nb_eq * coeff * cout_ht, 2)
    }


def process_tunnel_sheet(ws_tunnel):
    """Extract one cost record per price reference from a tunnel worksheet.

    Headers are read from row 2 and data from row 3 onwards. Only rows whose
    'Marché' value is in ``TARGET_MARCHES`` are considered. For each of them:

    * a literal 'Coût HT' produces one ``'direct_value'`` record;
    * a formula referencing ``Préventifs_tunnels!G<row>`` produces one record
      per referenced row, priced from column G of ``ws_preventif`` and
      labelled with column D of the same row.

    Args:
        ws_tunnel: Worksheet of a single tunnel.

    Returns:
        A tuple ``(records, skipped)``: ``records`` is a list of dicts and
        ``skipped`` maps a row number to the reason it was (partly) skipped.
        Both are empty when a required column is missing.
    """
    print(f"\nProcessing tunnel sheet: {ws_tunnel.title}")

    # Map each cleaned header text to its 0-based position in the row tuple.
    headers = {}
    for idx, cell in enumerate(ws_tunnel[2]):
        if cell.value:
            clean_header = str(cell.value).strip().replace('\n', ' ')
            headers[clean_header] = idx
    print(f"Headers found: {list(headers.keys())}")

    col_marche = headers.get('Marché')
    col_freq = headers.get('Fréquence totale')
    col_nbeq = headers.get('Nombre équipé')
    col_cht = headers.get('Coût HT')

    if None in (col_marche, col_freq, col_nbeq, col_cht):
        print(f"⚠️ Missing one or more required columns in sheet {ws_tunnel.title}. Skipping.")
        return [], {}

    sheet_results = []
    skipped_reasons = {}
    tunnel = ws_tunnel.title

    for row in ws_tunnel.iter_rows(min_row=3):
        marche_raw = row[col_marche].value
        if marche_raw is None:
            continue
        marche = str(marche_raw).strip()
        if marche not in TARGET_MARCHES:
            continue

        row_num = row[0].row

        freq = evaluate_simple_formula(ws_tunnel, row[col_freq].value, row_num)
        if freq is None:
            skipped_reasons[row_num] = "Invalid frequency"
            continue

        nb_eq = resolve_nb_eq_value(ws_tunnel, row[col_nbeq])
        if nb_eq is None:
            skipped_reasons[row_num] = "Could not resolve nb_eq"
            continue

        cell_cht = row[col_cht]
        cell_value = cell_cht.value

        # Literal cost (anything that is not a formula cell): use it directly,
        # falling back to 0 when it is empty or not numeric.
        if cell_cht.data_type != 'f':
            try:
                cout_ht_resolved = float(cell_value)
            except (TypeError, ValueError, OverflowError):
                cout_ht_resolved = 0
            sheet_results.append(_make_cost_record(
                marche, tunnel, None, 'direct_value', freq, nb_eq, 1, cout_ht_resolved))
            continue

        formula = cell_value
        if 'Préventifs_tunnels!G' not in formula:
            skipped_reasons[row_num] = "Formula does not reference Préventifs_tunnels!G"
            continue

        refs, form_typ, coeff = parse_formula(formula)
        if not refs:
            # Without this the row would vanish from both results and skips.
            skipped_reasons[row_num] = "No Préventifs_tunnels!G reference could be parsed"
            continue

        for ref_row in refs:
            ref_prix = ws_preventif[f'D{ref_row}'].value
            cout_ht_resolved = resolve_cout_ht(ws_preventif, ws_preventif[f'G{ref_row}'])
            if cout_ht_resolved is None:
                skipped_reasons[row_num] = f"Could not resolve cout_ht for G{ref_row}"
                continue

            sheet_results.append(_make_cost_record(
                marche, tunnel, ref_prix, form_typ, freq, nb_eq, coeff, cout_ht_resolved))

    return sheet_results, skipped_reasons


In [264]:
# Run the extraction on every tunnel sheet

all_results = []
all_skipped = {}

# TUNNEL_SHEETS is a set: sort it so that rows come out in the same order on
# every run instead of depending on the set's arbitrary iteration order.
for tunnel_name in sorted(TUNNEL_SHEETS):
    if tunnel_name not in wb.sheetnames:
        print(f"⚠️ Sheet '{tunnel_name}' not found, skipping.")
        continue
    ws_tunnel = wb[tunnel_name]
    results, skipped = process_tunnel_sheet(ws_tunnel)
    all_results.extend(results)
    if skipped:
        all_skipped[tunnel_name] = skipped


# Build the final DataFrame

df = pd.DataFrame(all_results)
if not df.empty:
    # Put 'tunnel' first and 'mar_diminutif' second; keep the other columns in order.
    cols = df.columns.tolist()
    for col_name in ['mar_diminutif', 'tunnel']:
        if col_name in cols:
            cols.insert(0, cols.pop(cols.index(col_name)))
    df = df[cols]

print("\nFinal DataFrame:")
print(df)

# Summary of the skipped rows
print("\nSkipped rows summary (only for rows with marchés in target list):")
total_skipped = 0
for tunnel, skipped in all_skipped.items():
    count = len(skipped)
    total_skipped += count
    skips_desc = ', '.join(f"{row} ({reason})" for row, reason in skipped.items())
    print(f" - {tunnel}: {count} rows skipped [{skips_desc}]")
print(f"Total skipped rows: {total_skipped}")



Processing tunnel sheet: Guy Môquet
Headers found: ['Marché', 'Opération', 'Fréquence annuelle', 'Ajustement fréquence supplémentaire', 'Fréquence totale', 'Nombre équipé', 'Coût HT', 'Coût total HT', 'Commentaire/Question']

Processing tunnel sheet: Fresnes
Headers found: ['Marché', 'Opération', 'Fréquence annuelle', 'Ajustement fréquence supplémentaire', 'Fréquence totale', 'Nombre équipé', 'Coût HT', 'Coût total HT', 'Commentaire/Question']

Processing tunnel sheet: Bobigny
Headers found: ['Marché', 'Opération', 'Fréquence annuelle', 'Ajustement fréquence supplémentaire', 'Fréquence totale', 'Nombre équipé', 'Coût HT', 'Coût total HT', 'Commentaire/Question']

Processing tunnel sheet: La Courneuve
Headers found: ['Marché', 'Opération', 'Fréquence annuelle', 'Ajustement fréquence supplémentaire', 'Fréquence totale', 'Nombre équipé', 'Coût HT', 'Coût total HT', 'Commentaire/Question']

Processing tunnel sheet: Orly
Headers found: ['Marché', 'Opération', 'Fréquence annuelle', 'Ajustem

In [266]:
# Total cost per tunnel: sum of the 'cout_total' column grouped by 'tunnel'.
df.groupby('tunnel')['cout_total'].sum()

tunnel
Ambroise Paré          227419.93
Antony                 150926.14
Belle-Rive             284358.03
Bicêtre                219212.70
Bobigny                274904.34
Boissy                 157750.30
Champigny              254497.57
Chennevières           122960.62
Fontenay               156008.04
Fresnes                165767.49
Guy Môquet             178913.54
Italie                  78171.13
La Courneuve           101015.06
La Défense            1130110.88
Landy                  305631.10
Lumen-Norton           203176.21
Moulin                 180645.57
Nanterre Centre        308494.37
Nanterre échangeur     484508.68
Neuilly                124335.58
Nogent                 442966.85
Orly                   106075.22
Saint-Cloud            230813.48
Sévines                118078.62
Taverny                135365.62
Name: cout_total, dtype: float64

Maintenant, on veut rajouter les prdp_code (codes prix permanents) à ce tableau.

In [268]:
# Lookup table: one row per price, with its 'prod_code' and permanent 'prdp_code'.
csv_url = 'https://raw.githubusercontent.com/mission-donnees-dett/analyse_preventifs_23_25/main/prdp_prod_code_montants_designation_2324.csv'
df_csv = pd.read_csv(csv_url, delimiter=';')

# prod_code -> prdp_code (if a prod_code appears several times, the last row wins).
mapping = dict(zip(df_csv['prod_code'], df_csv['prdp_code']))

# Rows whose 'ref_prix' has no entry in the mapping get NaN.
df['prdp_code'] = df['ref_prix'].map(mapping)

# Place 'prdp_code' immediately before 'ref_prix'. The column is removed from
# the list *before* the position is computed, so re-running this cell gives
# the same layout instead of moving 'prdp_code' after 'ref_prix'.
cols = [name for name in df.columns if name != 'prdp_code']
ref_prix_index = cols.index('ref_prix')
cols.insert(ref_prix_index, 'prdp_code')
df = df[cols]

In [None]:
#df.to_csv('coutTunnelsPreventifs_parPrix.csv')