# In [None]:
# ==============================================================================
# NOTEBOOK DE TESTES METAMÓRFICOS - NÍVEL 2: ANÁLISE DE COERÊNCIA CAUSAL
# ==============================================================================
# Objetivo: Avaliar se o sistema produz respostas logicamente consistentes a
# transformações de entrada que possuem significado físico ou de negócio,
# explorando o conhecimento conceitual sobre as variáveis de entrada.
# ------------------------------------------------------------------------------

# 📦 Imports
import pandas as pd
import numpy as np
import joblib
import plotly.graph_objects as go
from sklearn.metrics import mean_squared_error
from scipy.stats import pearsonr
from pathlib import Path
from IPython.display import display

# ------------------------------------------------------------------------------
# 📁 ETAPA 1: SETUP DO AMBIENTE DE TESTE
# ------------------------------------------------------------------------------

# Raw trip logs, one CSV per fuel type. Both live under the same data folder;
# the locations are absolute paths on the author's machine.
_data_dir = Path(
    r"C:/Users/Paulo Eduardo/Documents/Disserta√ß√£o/ModelosML/Conect2ai/"
    r"MDPI2023-pollution/data/"
)
ethanol_path = (
    _data_dir
    / "[Etanol] Trajeto Casa-Escola-UFRN"
    / "trackLog-2023-Feb-13_06-38-49_seg.csv"
)
gasoline_path = (
    _data_dir
    / "[Gasolina] Trajeto Casa-Escola-UFRN"
    / "trackLog-2022-Dec-01_06-43-57_qui.csv"
)

try:
    df_ethanol = pd.read_csv(ethanol_path)
    df_gasoline = pd.read_csv(gasoline_path)
except FileNotFoundError as exc:
    # Report which file is missing, then abort: nothing below can run
    # without both datasets.
    print(f"ERRO: Arquivos de dados n√£o encontrados. {exc}")
    raise
else:
    print("Arquivos de dados carregados com sucesso.")

# ------------------------------------------------------------------------------
# 🔧 PREPARAÇÃO DOS DADOS
# ------------------------------------------------------------------------------

def preparar_dados(df):
    """Return a cleaned copy of a trip log with a derived ``Acceleration`` column.

    The input frame is never modified. On the copy:

    * leading/trailing whitespace is stripped from every column name;
    * ``"Speed (OBD)(km/h)"`` is renamed to ``"Speed(OBD)(km/h)"``;
    * if the speed column exists, ``Acceleration`` is added as the
      sample-to-sample difference of the speed converted to m/s. Missing
      speeds count as 0 for this computation only (the speed column itself
      keeps its NaNs), and the first sample gets 0.

    NOTE(review): the difference is not divided by a sampling interval, so it
    is a true m/s^2 value only if consecutive rows are 1 s apart -- confirm
    against the logger's sampling rate.

    Args:
        df: Raw trip log as a pandas DataFrame.

    Returns:
        A new DataFrame with normalized column names and, when speed is
        available, the ``Acceleration`` column. An empty input yields an empty
        output.
    """
    df_copy = df.copy()
    df_copy.columns = df_copy.columns.str.strip()

    # Names are already stripped at this point, so only spelling variants that
    # differ *inside* the name need mapping (padded "Latitude " / "Longitude "
    # headers have been normalized by the strip above).
    col_map = {
        "Speed (OBD)(km/h)": "Speed(OBD)(km/h)",
    }

    df_copy.rename(
        columns={k: v for k, v in col_map.items() if k in df_copy.columns},
        inplace=True
    )

    if "Speed(OBD)(km/h)" in df_copy.columns:
        speeds_ms = df_copy["Speed(OBD)(km/h)"].fillna(0).values * 1000 / 3600
        # Prepending the first sample makes the first difference 0 and keeps
        # the result aligned with the rows. Slicing with [:1] instead of
        # indexing with [0] keeps this valid for an empty frame.
        df_copy["Acceleration"] = np.diff(speeds_ms, prepend=speeds_ms[:1])

    return df_copy


# Normalize both trip logs (column names + derived acceleration).
df_ethanol = preparar_dados(df_ethanol)
df_gasoline = preparar_dados(df_gasoline)

# ------------------------------------------------------------------------------
# MODEL LOADING
# ------------------------------------------------------------------------------
# The models were trained and validated outside this notebook. Here they are
# used exclusively for metamorphic evaluation.
#
# NOTE: joblib.load unpickles the file, which can execute arbitrary code --
# only point this at model files you trust.

_models_dir = Path(
    r"C:/Users/Paulo Eduardo/Documents/Disserta√ß√£o/ModelosML/"
    r"Conect2ai/MDPI2023-pollution/models/"
)

# fuel -> predicted quantity -> pickle file name inside ``_models_dir``.
_model_files = {
    "ethanol": {
        "afr": "LGBMRegressor_ethanol_afr.pkl",
        "maf": "XGBRegressor_ethanol_maf.pkl",
    },
    "gasoline": {
        "afr": "LGBMRegressor_gasoline_afr.pkl",
        "maf": "XGBRegressor_gasoline_maf.pkl",
    },
}

try:
    # Dict order is preserved, so the files are loaded in the order listed.
    modelos = {
        fuel_name: {
            target: joblib.load(_models_dir / file_name)
            for target, file_name in files.items()
        }
        for fuel_name, files in _model_files.items()
    }
except FileNotFoundError as exc:
    print(f"ERRO: Arquivos de modelo n√£o encontrados. {exc}")
    raise
else:
    print("Modelos de IA carregados com sucesso.")

# ------------------------------------------------------------------------------
# 🔧 ETAPA 2: FUNÇÕES DE SUPORTE
# ------------------------------------------------------------------------------

def compute_co2(afr, maf, fuel_type):
    """Compute the CO2 emission rate from air-fuel ratio and mass air flow.

    The fuel flow is taken as ``maf / afr``; it is multiplied by the carbon
    mass fraction of the fuel and by 44/12 to express the result as CO2.

    Args:
        afr: Air-fuel ratio; scalar or array-like.
        maf: Mass air flow; scalar or array-like. ``afr`` and ``maf`` may have
            different shapes as long as they broadcast against each other.
        fuel_type: ``"ethanol"`` or ``"gasoline"``.

    Returns:
        NumPy array with the broadcast shape of the inputs (a NumPy scalar
        when both inputs are scalars). Positions where ``afr`` is 0 yield 0
        instead of a division error; NaN inputs propagate as NaN.

    Raises:
        KeyError: If ``fuel_type`` is not one of the supported fuels.
    """
    afr = np.asarray(afr, dtype=float)
    maf = np.asarray(maf, dtype=float)

    # Carbon mass fraction per fuel (presumably 24/46 for C2H5OH and 12/14
    # for a CH2-like gasoline surrogate -- confirm against the source paper).
    carbon_fraction = {
        "ethanol": 0.5217,
        "gasoline": 0.8571
    }
    fraction = carbon_fraction[fuel_type]

    # The output buffer must have the *broadcast* shape; sizing it after
    # ``maf`` alone fails when ``maf`` is a scalar and ``afr`` is an array.
    out_shape = np.broadcast(maf, afr).shape
    fuel_rate = np.divide(
        maf,
        afr,
        out=np.zeros(out_shape, dtype=float),
        where=afr != 0
    )

    return fuel_rate * fraction * (44.0 / 12.0)


def encontrar_coluna(df, alternativas):
    """Return the first column of ``df`` matching one of several candidate names.

    Column names are compared after stripping surrounding whitespace, so a
    header such as ``" MAF "`` matches the candidate ``"MAF"``. The frame
    itself is left untouched (its column labels are not rewritten).

    Args:
        df: DataFrame to search.
        alternativas: Candidate column names, in order of preference.

    Returns:
        The matching column as a Series named after the matched candidate, or
        ``None`` when no candidate is present.
    """
    # Map stripped name -> actual label; on collisions the first column wins.
    by_stripped = {}
    for col in df.columns:
        key = col.strip() if isinstance(col, str) else col
        by_stripped.setdefault(key, col)

    for alt in alternativas:
        if alt in by_stripped:
            actual = by_stripped[alt]
            coluna = df[actual]
            # Expose the normalized name without touching the source frame.
            return coluna if actual == alt else coluna.rename(alt)
    return None


def pearson_safe(x, y):
    """Compute the Pearson correlation of two sequences without raising.

    Pairs in which either value is NaN (or ``None``) are dropped before the
    correlation is computed.

    Args:
        x: Array-like of numbers.
        y: Array-like of numbers, same length as ``x``.

    Returns:
        The Pearson correlation coefficient, or ``np.nan`` when it cannot be
        computed: fewer than two valid pairs, non-numeric input, inputs whose
        shapes do not match, or any failure inside ``scipy.stats.pearsonr``.
    """
    try:
        # Coerce to float so ints, object arrays and None are accepted;
        # np.isnan raises TypeError on object-dtype arrays.
        x = np.asarray(x, dtype=float)
        y = np.asarray(y, dtype=float)
        mask = ~(np.isnan(x) | np.isnan(y))
    except (TypeError, ValueError):
        # Non-numeric content or shapes that cannot be paired up.
        return np.nan

    if mask.sum() < 2:
        return np.nan

    try:
        return pearsonr(x[mask], y[mask])[0]
    except Exception:
        # Deliberate best-effort: this metric is informational only, so a
        # failure is reported as NaN instead of aborting the test run.
        return np.nan


# ------------------------------------------------------------------------------
# 🔬 FUNÇÃO PRINCIPAL DE TESTE – NÍVEL 2
# ------------------------------------------------------------------------------

def executar_teste_plotly(
    df_original,
    df_modificado,
    modelo_afr,
    modelo_maf,
    fuel,
    nome_mr,
    co2_real=None
):
    """Run one metamorphic test case and report it as text, table and plot.

    Both frames are fed to the AFR and MAF models, the predictions are turned
    into CO2 rates with ``compute_co2`` and the baseline and metamorphic
    series are compared (RMSE and Pearson correlation). When ``co2_real`` is
    given, each predicted series is also correlated with it.

    Output is produced as side effects only: a printed summary, a styled
    table with the first 10 samples (via ``display``) and a Plotly line chart
    with the first 200 samples (via ``fig.show()``).

    The input frames are not modified: the feature matrices are built as
    separate frames, and a feature column missing from an input is filled
    with 0 in that matrix only.

    Args:
        df_original: Baseline data containing the model feature columns.
        df_modificado: Transformed (metamorphic) version of the baseline.
        modelo_afr: Fitted model exposing ``predict``; predicts AFR.
        modelo_maf: Fitted model exposing ``predict``; predicts MAF.
        fuel: Fuel key understood by ``compute_co2`` (e.g. ``"ethanol"``).
        nome_mr: Human-readable name of the metamorphic relation.
        co2_real: Optional measured CO2 series aligned with the rows of
            ``df_original``.

    Returns:
        None.
    """
    features = ["Latitude", "Longitude", "Speed(OBD)(km/h)", "Acceleration"]

    # reindex() returns new frames restricted to the feature columns, adding
    # any missing one as a column of zeros -- the caller's frames stay intact.
    x_baseline = df_original.reindex(columns=features, fill_value=0)
    x_metamorphic = df_modificado.reindex(columns=features, fill_value=0)

    # ------------------------------------------------------------------
    # Prediction (Baseline)
    # ------------------------------------------------------------------
    afr_baseline = modelo_afr.predict(x_baseline)
    maf_baseline = modelo_maf.predict(x_baseline)
    co2_baseline = compute_co2(afr_baseline, maf_baseline, fuel)

    # ------------------------------------------------------------------
    # Prediction (Metamorphic)
    # ------------------------------------------------------------------
    afr_metamorphic = modelo_afr.predict(x_metamorphic)
    maf_metamorphic = modelo_maf.predict(x_metamorphic)
    co2_metamorphic = compute_co2(afr_metamorphic, maf_metamorphic, fuel)

    # ------------------------------------------------------------------
    # Quantitative metrics
    # ------------------------------------------------------------------
    # The ``squared=False`` switch of mean_squared_error was deprecated in
    # scikit-learn 1.4 and removed in 1.6; taking the square root explicitly
    # works on every version.
    rmse = float(np.sqrt(mean_squared_error(co2_baseline, co2_metamorphic)))

    pearson_baseline_mr = pearson_safe(co2_baseline, co2_metamorphic)
    if co2_real is not None:
        pearson_gt_baseline = pearson_safe(co2_real, co2_baseline)
        pearson_gt_mr = pearson_safe(co2_real, co2_metamorphic)
    else:
        pearson_gt_baseline = np.nan
        pearson_gt_mr = np.nan

    # ------------------------------------------------------------------
    # Text log
    # ------------------------------------------------------------------
    print(f"\n{'='*80}")
    print(f"üîÅ {nome_mr} ({fuel.upper()})")
    print("Metamorphic Test ‚Äì Level 2 (Causal Coherence)")
    print(
        f"RMSE (Baseline √ó Metamorphic): {rmse:.4f} | "
        f"Pearson (Baseline √ó Metamorphic): {pearson_baseline_mr:.3f} | "
        f"Pearson (Ground Truth √ó Baseline): {pearson_gt_baseline:.3f} | "
        f"Pearson (Ground Truth √ó Metamorphic): {pearson_gt_mr:.3f}"
    )
    print(f"{'='*80}")

    # ------------------------------------------------------------------
    # Illustrative table (first 10 samples)
    # ------------------------------------------------------------------
    # Number of preview rows actually available (fewer than 10 on tiny inputs).
    n_rows = len(co2_baseline[:10])

    # The pieces are joined side by side, so every piece gets a fresh 0..n-1
    # index; otherwise a non-default index on the inputs would misalign rows.
    tabela = pd.concat(
        [
            x_baseline.iloc[:10].reset_index(drop=True).add_suffix("_Baseline"),
            pd.DataFrame({" ": ["‚Üí"] * n_rows}),
            x_metamorphic.iloc[:10].reset_index(drop=True).add_suffix("_Metamorphic"),
            pd.DataFrame({"  ": [" "] * n_rows}),
            pd.DataFrame({
                "CO‚ÇÇ_Ground_Truth": co2_real[:10] if co2_real is not None else "N/A",
                "CO‚ÇÇ_Pred_Baseline": co2_baseline[:10],
                "CO‚ÇÇ_Pred_Metamorphic": co2_metamorphic[:10]
            }),
        ],
        axis=1
    ).fillna("N/A")

    display(
        tabela.style.set_caption(
            f"Data comparison ‚Äì {nome_mr} ({fuel.upper()})"
        )
    )

    # ------------------------------------------------------------------
    # Plot (first 200 samples)
    # ------------------------------------------------------------------
    fig = go.Figure()

    if co2_real is not None:
        fig.add_trace(
            go.Scatter(
                y=co2_real[:200],
                mode="lines",
                name="Ground Truth",
                line=dict(dash="dot", color="gray"),
            )
        )

    fig.add_trace(
        go.Scatter(
            y=co2_baseline[:200],
            mode="lines",
            name="Prediction (Baseline)",
            line=dict(dash="dash", color="blue"),
        )
    )

    fig.add_trace(
        go.Scatter(
            y=co2_metamorphic[:200],
            mode="lines",
            name="Prediction (Metamorphic)",
            line=dict(color="red"),
        )
    )

    fig.update_layout(
        title=dict(
            text=(
                f"CO‚ÇÇ Comparison (200 Samples) ‚Äì {nome_mr} ‚Äì {fuel.upper()}<br>"
                f"RMSE={rmse:.3f} | "
                f"Pearson(Baseline√óMetamorphic)={pearson_baseline_mr:.3f}"
            ),
            x=0.5,
        ),
        xaxis_title="Samples",
        yaxis_title="CO‚ÇÇ (g/s)",
        template="plotly_white",
    )

    fig.show()


# ------------------------------------------------------------------------------
# 🔁 ETAPA 3: METAMORPHIC TEST CASES – LEVEL 2
# ------------------------------------------------------------------------------

# Each entry maps a test-case label to a function ``df -> transformed df``.
# The transformations never modify their input: ``DataFrame.assign`` already
# returns a new frame, so no explicit copy is needed beforehand.

_SPEED = "Speed(OBD)(km/h)"


def _escalar(fator, *colunas):
    """Build a transformation that multiplies the given columns by ``fator``."""
    def transformar(df):
        return df.assign(**{col: df[col] * fator for col in colunas})
    return transformar


def _transladar(delta, *colunas):
    """Build a transformation that adds ``delta`` to the given columns."""
    def transformar(df):
        return df.assign(**{col: df[col] + delta for col in colunas})
    return transformar


def _atrasar(lag, coluna):
    """Build a transformation that shifts ``coluna`` down by ``lag`` rows.

    The first ``lag`` rows of the shifted column become NaN.
    """
    def transformar(df):
        return df.assign(**{coluna: df[coluna].shift(lag)})
    return transformar


# Insertion order is the execution order of the test cases.
testes_metamorficos_n2 = {
    # 2A -- multiplicative scaling of speed (and acceleration).
    "CT_2A_001 ‚Äì [Multiplicative] Speed Scaling (√ó20)":
        _escalar(20, _SPEED),
    "CT_2A_002 ‚Äì [Multiplicative] Speed Scaling (√ó40)":
        _escalar(40, _SPEED),
    "CT_2A_003 ‚Äì [Multiplicative] Speed & Acceleration Scaling (√ó10)":
        _escalar(10, _SPEED, "Acceleration"),
    "CT_2A_004 ‚Äì [Multiplicative] Speed & Acceleration Scaling (√ó20)":
        _escalar(20, _SPEED, "Acceleration"),
    "CT_2A_005 ‚Äì [Multiplicative] Speed & Acceleration Scaling (√ó30)":
        _escalar(30, _SPEED, "Acceleration"),

    # 2B -- additive translation of the GPS coordinates.
    "CT_2B_001 ‚Äì [Additive] Coordinate Translation (+1.0)":
        _transladar(1.0, "Latitude", "Longitude"),
    "CT_2B_002 ‚Äì [Additive] Coordinate Translation (+5.0)":
        _transladar(5.0, "Latitude", "Longitude"),

    # 2C -- sign inversion of the acceleration.
    "CT_2C_001 ‚Äì [Inversion] Acceleration Sign Inversion":
        _escalar(-1, "Acceleration"),

    # 2D -- temporal lag applied to the speed signal only.
    "CT_2D_001 ‚Äì [Temporal] Speed Lag (lag = 1)":
        _atrasar(1, _SPEED),
    "CT_2D_002 ‚Äì [Temporal] Speed Lag (lag = 5)":
        _atrasar(5, _SPEED),
    "CT_2D_003 ‚Äì [Temporal] Speed Lag (lag = 10)":
        _atrasar(10, _SPEED),
    "CT_2D_004 ‚Äì [Temporal] Speed Lag (lag = 25)":
        _atrasar(25, _SPEED),
    "CT_2D_005 ‚Äì [Temporal] Speed Lag (lag = 50)":
        _atrasar(50, _SPEED),
}


# ------------------------------------------------------------------------------
# 🧲 ETAPA 4: TEST EXECUTION – LEVEL 2
# ------------------------------------------------------------------------------

_banner = "=" * 80
print(_banner)
print("STARTING LEVEL 2 ‚Äì METAMORPHIC TEST EXECUTION (CAUSAL COHERENCE)")
print(_banner)

# ------------------------------------------------------------------
# Ground truth computation
# ------------------------------------------------------------------
# The logs may spell the measured AFR / MAF columns with or without spaces,
# so both variants are tried for each quantity.
_afr_columns = ["AirFuelRatio(Commanded)(:1)", "Air Fuel Ratio(Commanded)(:1)"]
_maf_columns = ["MassAirFlowRate(g/s)", "Mass Air Flow Rate(g/s)"]


def _ground_truth(df, fuel_type):
    """Return ``(afr, maf, co2)`` measured in ``df``.

    ``co2`` is ``None`` when either the AFR or the MAF column is missing.
    """
    afr = encontrar_coluna(df, _afr_columns)
    maf = encontrar_coluna(df, _maf_columns)
    if afr is None or maf is None:
        return afr, maf, None
    return afr, maf, compute_co2(afr.values, maf.values, fuel_type)


afr_real_ethanol, maf_real_ethanol, co2_real_ethanol = _ground_truth(
    df_ethanol, "ethanol"
)
afr_real_gasoline, maf_real_gasoline, co2_real_gasoline = _ground_truth(
    df_gasoline, "gasoline"
)

ground_truth_values = {
    "ethanol": co2_real_ethanol,
    "gasoline": co2_real_gasoline
}

# ------------------------------------------------------------------
# Test execution loop
# ------------------------------------------------------------------
# Every metamorphic relation is applied to both fuels, ethanol first.
_datasets = {"ethanol": df_ethanol, "gasoline": df_gasoline}

for nome_mr, transformacao in testes_metamorficos_n2.items():
    for fuel, df_original in _datasets.items():
        # The transformation receives a copy so the baseline stays pristine
        # even if a transformation were to modify its argument.
        df_modificado = transformacao(df_original.copy())

        executar_teste_plotly(
            df_original=df_original,
            df_modificado=df_modificado,
            modelo_afr=modelos[fuel]["afr"],
            modelo_maf=modelos[fuel]["maf"],
            fuel=fuel,
            nome_mr=nome_mr,
            co2_real=ground_truth_values[fuel]
        )

