In [None]:
import pandas as pd
import numpy as np
import yaml
import xgboost as xgb
from scipy.interpolate import interp1d
from sklearn.metrics import mean_absolute_percentage_error
import plotly.express as px

# ==========================================
# ÉTAPE 1 : Chargement de toutes les données
# ==========================================

# 1. Specs véhicule (YAML)
with open("vehicle_specs.yaml", "r") as f:
    specs = yaml.safe_load(f)

# 2. Courbes de rendement Renault (CSVs)
df_eta_pt = pd.read_csv("eta_powertrain_speed.csv")
eta_pt_func = interp1d(df_eta_pt["speed_kmph"], df_eta_pt["eta_powertrain"], fill_value="extrapolate")

df_eta_bat = pd.read_csv("eta_battery_temp.csv")
eta_bat_func = interp1d(df_eta_bat["T_battery_C"], df_eta_bat["eta_battery"], fill_value="extrapolate")

# 3. Données de trajets réels (Full columns)
trips = pd.read_csv("trips_raw.csv")

# ==========================================
# ÉTAPE 2 : Modèle Physique Intégré (Cœur V2)
# ==========================================

def run_physics_model(df, specs):
    rho = 1.225  # Densité de l'air
    g = 9.81

    # prise en compte des imprécisions liées au modèle s'appliquant en conditions parfaites. la trainée est plus élevée (impuretés de l'air, imprécision de rain_proxy) et l'accélération plus faible (frottements internes du véhicule, adhérence non parfaite, pertes thermiques)
    residual_drag_error_corrector = 2.2
    residual_acceleration_error_corrector = 0.4

    # --- Dynamique du véhicule ---
    dt = df["time_s"].diff().fillna(1.0)
    v_mps = df["speed_kmph"] / 3.6
    accel = v_mps.diff().fillna(0) / dt

    # Utilisation de elevation_m pour la pente
    dist = v_mps * dt
    grade_sin = np.where(dist > 0.1, df["elevation_m"].diff().fillna(0) / dist, 0)

    # Utilisation de rain_proxy pour le frottement
    crr = np.where(df["rain_proxy"] == 1, specs["Crr"]["wet"], specs["Crr"]["dry"])

    # Utilisation de payload_kg pour la masse totale
    m_total = specs["mass_kg"] + df["payload_kg"]

    # --- Forces ---
    f_drag = 0.5 * rho * specs["CdA_m2"] * (v_mps**2)*residual_drag_error_corrector
    f_rolling = crr * m_total * g
    f_grade = m_total * g * grade_sin
    f_accel = m_total * accel * residual_acceleration_error_corrector

    p_traction_kw = ((f_drag + f_rolling + f_grade + f_accel) * v_mps) / 1000

    # --- Efficience Électrique ---
    # Utilisation de speed_kmph pour le rendement moteur
    eta_pt = eta_pt_func(df["speed_kmph"])

    # Utilisation de T_battery_C pour le rendement batterie
    eta_bat = eta_bat_func(df["T_battery_C"])

    # Utilisation directe de P_aux_load_kW (donnée OBD-II réelle)
    p_aux_kw = df["P_aux_load_kW"]

    # Calcul Puissance Batterie avec Régénération
    p_battery_kw = np.where(
        p_traction_kw >= 0,
        (p_traction_kw / eta_pt + p_aux_kw) / eta_bat,
        (p_traction_kw * 0.7 + p_aux_kw) / eta_bat # 70% de récup au freinage
    )

    # Consommation cumulée (kWh)
    return (p_battery_kw * dt / 3600).cumsum()

# Exécution
trips["E_pred_physics"] = run_physics_model(trips, specs)

# ==========================================
# ÉTAPE 3 : Calibration Hybride (ML sur toutes les colonnes)
# ==========================================

trips["residual"] = trips["E_total_kWh"] - trips["E_pred_physics"]

# On utilise TOUTES les colonnes disponibles comme features pour le ML
# car elles peuvent toutes influencer l'erreur résiduelle (vent, SOH batterie, etc.)
features = [
    "speed_kmph", "T_ambient_C", "rain_proxy",
    "T_battery_C", "payload_kg", "SOC_pct"
]

X = trips[features]
y = trips["residual"]

calibrator = xgb.XGBRegressor(n_estimators=100, max_depth=3, learning_rate=0.1)
calibrator.fit(X, y)

trips["E_pred_hybrid"] = trips["E_pred_physics"] + calibrator.predict(X)

# ==========================================
# ÉTAPE 4 : Résultats
# ==========================================

# On filtre les valeurs de début de trajet pour éviter les divisions par zéro
mask = trips["E_total_kWh"] > 0.01

mape_phys = mean_absolute_percentage_error(trips.loc[mask, "E_total_kWh"], trips.loc[mask, "E_pred_physics"])
mape_hyb = mean_absolute_percentage_error(trips.loc[mask, "E_total_kWh"], trips.loc[mask, "E_pred_hybrid"])

print(f"--- RÉSULTATS V2 (DIGITAL TWIN) ---")
print(f"Erreur Modèle Physique (avec courbes Renault) : {mape_phys*100:.2f}%")
print(f"Erreur Modèle Hybride (Calibré) : {mape_hyb*100:.2f}%")


--- RÉSULTATS V2 (DIGITAL TWIN) ---
Erreur Modèle Physique (avec courbes Renault) : 9.83%
Erreur Modèle Hybride (Calibré) : 0.91%


In [160]:
plot_df = trips.copy()

fig_energy = px.line(
    plot_df,
    x="time_s",
    y=["E_total_kWh", "E_pred_physics", "E_pred_hybrid"],
    labels={
        "value": "Cumulative Energy (kWh)",
        "time_s": "Time (s)",
        "variable": "Signal"
    },
    title="Cumulative Energy Consumption – Ground Truth vs Models"
)

fig_energy.update_layout(
    legend_title_text="",
    hovermode="x unified"
)

fig_energy.show()


In [161]:
plot_df["residual_physics"] = plot_df["E_total_kWh"] - plot_df["E_pred_physics"]
plot_df["residual_hybrid"] = plot_df["E_total_kWh"] - plot_df["E_pred_hybrid"]

fig_residuals = px.line(
    plot_df,
    x="time_s",
    y=["residual_physics", "residual_hybrid"],
    labels={
        "value": "Residual Energy (kWh)",
        "time_s": "Time (s)",
        "variable": "Residual Type"
    },
    title="Residuals Over Time"
)

fig_residuals.add_hline(y=0, line_dash="dash", line_color="black")

fig_residuals.update_layout(
    hovermode="x unified"
)

fig_residuals.show()


In [162]:
mask = trips["E_total_kWh"] > 0.01

parity_df = trips.loc[mask].copy()

fig_parity = px.scatter(
    parity_df,
    x="E_total_kWh",
    y="E_pred_physics",
    opacity=0.4,
    labels={
        "E_total_kWh": "Measured Energy (kWh)",
        "E_pred_physics": "Predicted Energy (kWh)"
    },
    title="Parity Plot – Physics Model"
)

# Perfect prediction line
max_e = parity_df["E_total_kWh"].max()
fig_parity.add_shape(
    type="line",
    x0=0, y0=0,
    x1=max_e, y1=max_e,
    line=dict(color="black", dash="dash")
)

fig_parity.show()


fig_parity_hyb = px.scatter(
    parity_df,
    x="E_total_kWh",
    y="E_pred_hybrid",
    opacity=0.4,
    labels={
        "E_total_kWh": "Measured Energy (kWh)",
        "E_pred_hybrid": "Predicted Energy (kWh)"
    },
    title="Parity Plot – Hybrid Physics + ML Model"
)

fig_parity_hyb.add_shape(
    type="line",
    x0=0, y0=0,
    x1=max_e, y1=max_e,
    line=dict(color="black", dash="dash")
)

fig_parity_hyb.show()



In [163]:
error_df = trips.loc[mask].copy()

error_df["abs_error_phys"] = np.abs(
    error_df["E_total_kWh"] - error_df["E_pred_physics"]
)

error_df["abs_error_hyb"] = np.abs(
    error_df["E_total_kWh"] - error_df["E_pred_hybrid"]
)


fig_speed_error = px.scatter(
    error_df,
    x="speed_kmph",
    y="abs_error_phys",
    opacity=0.3,
    trendline="lowess",
    labels={
        "speed_kmph": "Speed (km/h)",
        "abs_error_phys": "Absolute Error (kWh)"
    },
    title="Physics Model Absolute Error vs Speed"
)

fig_speed_error.show()

