# Resultados — Comparação de Modelos
Este notebook descobre conjuntos de dados de teste gerados (N1A/N1B/...) e compara os modelos treinados (Linear/MLP, LSTM, TFT) quando disponíveis.

Diretrizes: baseado no escopo de coleta (carga real A65), avaliamos previsões multi-passos (lead) por problema. Caso algum artefato/modelo não esteja disponível, a linha correspondente é marcada como 'indisponível'.

In [None]:
# Imports e utilitários
import os, json, sys, glob, math, random
from typing import Dict, List, Tuple, Optional
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# TensorFlow é opcional — carregaremos sob demanda
try:
    import tensorflow as tf
    _HAS_TF = True
except Exception:
    _HAS_TF = False

random.seed(42)
np.random.seed(42)

# ------------------------- Métricas ------------------------- #
def mae(y_true, y_pred):
    return float(np.mean(np.abs(y_true - y_pred)))
def rmse(y_true, y_pred):
    return float(math.sqrt(np.mean((y_true - y_pred) ** 2)))
def mape(y_true, y_pred, eps=1e-8):
    denom = np.clip(np.abs(y_true), eps, None)
    return float(100.0 * np.mean(np.abs((y_true - y_pred) / denom)))

# -------------------- Descoberta de problemas -------------------- #
def list_problem_dirs(root="data") -> List[str]:
    if not os.path.isdir(root):
        return []
    problems = [os.path.join(root, d) for d in os.listdir(root) if d.startswith('N') and os.path.isdir(os.path.join(root, d))]
    return sorted(problems)

# -------------------- Loaders Linear -------------------- #
def load_linear_meta(problem_dir: str) -> Dict:
    meta_path = os.path.join(problem_dir, 'linear_dataset_test.meta.json')
    if os.path.exists(meta_path):
        with open(meta_path, 'r', encoding='utf-8') as f:
            return json.load(f)
    return {}

def load_linear_parquet(problem_dir: str, meta: Dict) -> Tuple[pd.DataFrame, List[str], List[str]]:
    pq_path = meta.get('parquet_path') or os.path.join(problem_dir, 'linear_dataset_test.parquet')
    if not os.path.exists(pq_path):
        return pd.DataFrame(), [], []
    df = pd.read_parquet(pq_path)
    fcols = list(meta.get('feature_cols') or [])
    tcols = list(meta.get('target_cols') or [])
    missing = [c for c in (fcols + tcols) if c not in df.columns]
    if missing:
        print(f"[WARN] Colunas ausentes no parquet de {problem_dir}: {missing[:5]}{'...' if len(missing)>5 else ''}")
    return df, fcols, tcols

def infer_horizon_cols(target_cols: List[str]) -> List[str]:
    """Retorna apenas colunas de lead (ignora 'quantity_MW' presente como t0)."""
    if not target_cols:
        return []
    leads = [c for c in target_cols if c.startswith('quantity_MW_lead')]
    # fallback: se não houver padrão, usa todas menos a primeira
    if not leads and len(target_cols) > 1:
        return target_cols[1:]
    return leads

# -------------------- Descoberta de modelos -------------------- #
def discover_keras_models(search_roots: List[str]) -> List[str]:
    patterns = []
    for root in search_roots:
        patterns += glob.glob(os.path.join(root, '**', '*.keras'), recursive=True)
        patterns += glob.glob(os.path.join(root, '**', '*.h5'), recursive=True)
        # SavedModel dirs (contendo saved_model.pb)
        for pb in glob.glob(os.path.join(root, '**', 'saved_model.pb'), recursive=True):
            patterns.append(os.path.dirname(pb))
    # Filtrar ambientes (venv)
    patterns = [p for p in patterns if '/tfc_venv/' not in p and '.venv/' not in p and '/.git/' not in p]
    return sorted(set(patterns))

def safe_load_keras_model(path: str):
    if not _HAS_TF:
        return None
    try:
        if os.path.isdir(path):
            return tf.keras.models.load_model(path)
        return tf.keras.models.load_model(path, compile=False)
    except Exception as e:
        print(f"[WARN] Falha ao carregar modelo Keras {path}: {e}")
        return None

def model_label_from_path(path: str) -> str:
    base = os.path.basename(path.rstrip('/'))
    if base == 'variables':
        base = os.path.basename(os.path.dirname(path))
    return base

# -------------------- Avaliação -------------------- #
def align_predictions(Y: np.ndarray, Yp: np.ndarray, target_cols: List[str]) -> Tuple[np.ndarray, np.ndarray, List[str]]:
    """Alinha Y e Yp para mesma largura temporal e retorna colunas de lead usadas."""
    leads = infer_horizon_cols(target_cols)
    # Se Y inclui t0 + leads, recortar para apenas leads
    if Y.ndim == 2 and len(target_cols) >= 2 and target_cols[0] == 'quantity_MW' and len(leads) > 0:
        Yt = Y[:, 1:1+len(leads)]
    else:
        Yt = Y
    Yp = np.asarray(Yp)
    if Yp.ndim == 3:
        Yp = Yp.reshape(Yp.shape[0], -1)
    # Cortar para mesma largura
    if Yt.ndim == 2 and Yp.ndim == 2:
        H = min(Yt.shape[1], Yp.shape[1])
        if H <= 0:
            return Yt[:, :0], Yp[:, :0], []
        return Yt[:, :H], Yp[:, :H], leads[:H] if leads else [f'lead{i+1}' for i in range(H)]
    # fallback escalar
    H = 1
    return Yt.reshape(-1, 1), Yp.reshape(-1, 1), [leads[0] if leads else 'lead1']

def evaluate_overall(Yt: np.ndarray, Yp: np.ndarray) -> Dict[str, float]:
    return {
        'MAE': mae(Yt, Yp),
        'RMSE': rmse(Yt, Yp),
        'MAPE': mape(Yt, Yp)
    }

def evaluate_per_horizon(Yt: np.ndarray, Yp: np.ndarray) -> pd.DataFrame:
    if Yt.ndim != 2 or Yp.ndim != 2:
        return pd.DataFrame()
    H = Yt.shape[1]
    rows = []
    for h in range(H):
        yt = Yt[:, h]
        yp = Yp[:, h]
        rows.append({'h': h+1, 'MAE': mae(yt, yp), 'RMSE': rmse(yt, yp), 'MAPE': mape(yt, yp)})
    return pd.DataFrame(rows)

def baseline_repeat_last(Y: np.ndarray) -> np.ndarray:
    """Baseline: repetir o valor do passo t (primeira coluna de Y se existir) para todos os horizontes."""
    if Y.ndim == 1:
        return Y.copy()
    # Se Y inclui t0 + leads, use t0 (coluna 0); caso contrário, use a coluna 0 como proxy
    base = Y[:, 0].reshape(-1, 1)
    return np.repeat(base, Y.shape[1], axis=1)

def evaluate_keras_on_linear(model, X: np.ndarray, Y: np.ndarray, target_cols: List[str]) -> Tuple[Dict[str, float], pd.DataFrame]:
    try:
        Yp = model.predict(X, verbose=0)
        Yt, Yp, used_leads = align_predictions(Y, Yp, target_cols)
        overall = evaluate_overall(Yt, Yp)
        per_h = evaluate_per_horizon(Yt, Yp)
        if not per_h.empty:
            per_h['lead_name'] = used_leads[:len(per_h)]
        return overall, per_h
    except Exception as e:
        return {'error': str(e)}, pd.DataFrame()

def add_input_noise(X: np.ndarray, sigma: float, pad_sentinel: float = -999.0) -> np.ndarray:
    if sigma <= 0:
        return X
    noise = np.random.normal(loc=0.0, scale=sigma, size=X.shape).astype(X.dtype)
    if pad_sentinel is not None:
        mask = (X == pad_sentinel)
        Xn = X + noise
        Xn[mask] = pad_sentinel
        return Xn
    return X + noise

In [None]:
# Inventário de problemas e dados disponíveis
problems = list_problem_dirs('data')
inventory = []
for p in problems:
    lin_meta = load_linear_meta(p)
    lin_pq = os.path.exists(lin_meta.get('parquet_path', os.path.join(p, 'linear_dataset_test.parquet')))
    lstm_meta = os.path.exists(os.path.join(p, 'lstm_dataset_test.meta.json'))
    tft_pq = os.path.exists(os.path.join(p, 'tft_dataset_test.parquet'))
    n_rows = 0
    if lin_pq:
        try:
            n_rows = int(pd.read_parquet(lin_meta.get('parquet_path', os.path.join(p, 'linear_dataset_test.parquet')), columns=['quantity_MW']).shape[0])
        except Exception:
            pass
    inventory.append({
        'problem': os.path.basename(p),
        'linear_test_parquet': lin_pq,
        'lstm_test_meta': lstm_meta,
        'tft_test_parquet': tft_pq,
        'linear_meta_ok': bool(lin_meta),
        'rows': n_rows
    })
inv_df = pd.DataFrame(inventory).sort_values('problem').reset_index(drop=True)
inv_df

In [None]:
# Descoberta de modelos Keras (Linear/MLP/LSTM) salvos — opcional
# Procura por .keras, .h5 e SavedModel no repositório (exclui venv)
search_roots = ['.']
keras_models = discover_keras_models(search_roots)
print(f'Encontrados {len(keras_models)} possíveis modelos Keras.')
for m in keras_models[:10]:
    print(' -', m)
if len(keras_models) > 10:
    print(' ...')

In [None]:
# Avaliação: aplicar modelos Keras em datasets lineares (quando compatíveis)
# - Coleta métricas globais e por horizonte
# - Inclui baseline de repetição do último valor
results = []
per_h_results = []

search_roots = ['.']
keras_models = discover_keras_models(search_roots)
print(f'Encontrados {len(keras_models)} possíveis modelos Keras.')
for p in problems:
    meta = load_linear_meta(p)
    if not meta:
        continue
    df, fcols, tcols = load_linear_parquet(p, meta)
    if df.empty or not fcols or not tcols:
        continue
    # Checagem de tipos: evitar colunas não numéricas
    numeric_ok = all(np.issubdtype(df[c].dtype, np.number) for c in fcols)
    if not numeric_ok:
        print(f"[SKIP] {os.path.basename(p)}: features incluem colunas não numéricas.")
        continue
    X = df[fcols].to_numpy(dtype='float32', copy=False)
    Y = df[tcols].to_numpy(dtype='float32', copy=False)
    # Subamostra opcional para acelerar
    max_rows = 30000
    if len(X) > max_rows:
        X = X[:max_rows]
        Y = Y[:max_rows]

    # Baseline (repetir último valor)
    Yt, Yb, used_leads = align_predictions(Y, baseline_repeat_last(Y), tcols)
    base_overall = evaluate_overall(Yt, Yb)
    results.append({
        'problem': os.path.basename(p),
        'dataset': 'linear_test',
        'model_path': 'baseline_repeat_last',
        'model_label': 'baseline_repeat_last',
        **base_overall
    })
    ph = evaluate_per_horizon(Yt, Yb)
    if not ph.empty:
        ph['problem'] = os.path.basename(p)
        ph['model_label'] = 'baseline_repeat_last'
        per_h_results.append(ph)

    # Avaliar modelos descobertos
    for model_path in keras_models:
        model = safe_load_keras_model(model_path)
        if model is None:
            continue
        # Checagem leve de compatibilidade de entrada
        try:
            in_shape = getattr(model, 'input_shape', None)
        except Exception:
            in_shape = None
        if in_shape is not None:
            # Se múltiplas entradas, deixar passar
            if isinstance(in_shape, (list, tuple)) and len(in_shape) and isinstance(in_shape[0], (list, tuple)):
                pass
            else:
                last_dim = in_shape[-1] if isinstance(in_shape, (list, tuple)) else None
                if isinstance(last_dim, int) and last_dim > 0 and last_dim != X.shape[1]:
                    continue

        overall, per_h = evaluate_keras_on_linear(model, X, Y, tcols)
        label = model_label_from_path(model_path)
        row = {
            'problem': os.path.basename(p),
            'dataset': 'linear_test',
            'model_path': model_path,
            'model_label': label,
        }
        row.update(overall)
        results.append(row)
        if not per_h.empty:
            per_h['problem'] = os.path.basename(p)
            per_h['model_label'] = label
            per_h_results.append(per_h)

res_df = pd.DataFrame(results)
per_h_df = pd.concat(per_h_results, ignore_index=True) if per_h_results else pd.DataFrame()

if res_df.empty:
    print('Nenhum resultado disponível. Verifique se os modelos foram treinados e salvos neste repositório.')
else:
    display(res_df.sort_values(['problem', 'MAE']).reset_index(drop=True))
    if not per_h_df.empty:
        display(per_h_df.groupby(['problem', 'model_label'])['MAE'].mean().rename('MAE_mean').reset_index().sort_values(['problem','MAE_mean']))

## Visualização (opcional)
Se houver resultados:
- Tabela com top-3 modelos por problema (MAE).
- Gráfico de barras de MAE por modelo (média por horizonte).
- Curva MAE vs. horizonte para um problema selecionado.

In [None]:
# Top-3 por problema (MAE)
if 'res_df' in globals() and not res_df.empty:
    top3 = (res_df
            .dropna(subset=['MAE'])
            .sort_values(['problem', 'MAE'])
            .groupby('problem')
            .head(3)
            .reset_index(drop=True))
    display(top3)
else:
    print('Sem resultados gerais.')

# Barras: MAE médio por modelo (por problema)
if 'per_h_df' in globals() and not per_h_df.empty:
    agg = (per_h_df
           .groupby(['problem','model_label'])['MAE']
           .mean()
           .rename('MAE_mean')
           .reset_index())
    for prob in agg['problem'].unique():
        sub = agg[agg['problem']==prob].sort_values('MAE_mean')
        plt.figure(figsize=(10,4))
        plt.barh(sub['model_label'], sub['MAE_mean'], color='teal')
        plt.xlabel('MAE médio (por horizonte)')
        plt.title(f'{prob} — Comparação por modelo')
        plt.grid(True, axis='x', alpha=0.3)
        plt.tight_layout()
        plt.show()
else:
    print('Sem métricas por horizonte para gráficos.')

# Curva MAE vs horizonte (problema com mais modelos)
if 'per_h_df' in globals() and not per_h_df.empty:
    # Escolher o problema com mais linhas
    prob_counts = per_h_df['problem'].value_counts()
    sel_prob = prob_counts.index[0]
    sub = per_h_df[per_h_df['problem']==sel_prob]
    plt.figure(figsize=(10,4))
    for ml, g in sub.groupby('model_label'):
        plt.plot(g['h'], g['MAE'], label=ml, alpha=0.8)
    plt.xlabel('Horizonte (h)')
    plt.ylabel('MAE')
    plt.title(f'{sel_prob} — MAE vs Horizonte')
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.show()
else:
    print('Sem dados suficientes para curva por horizonte.')

### Notas
- LSTM e TFT: este notebook pode ser estendido para ler TFRecords/parquets específicos e checkpoints (PyTorch Lightning) quando disponíveis.
- Para garantir avaliação completa, execute os treinamentos em `Modelos.ipynb` antes e confirme que os artefatos de modelo são salvos dentro do repositório (evitando pastas de ambiente).

## Robustez a ruído (opcional)

Nesta seção, avaliamos a degradação de erro quando adicionamos ruído gaussiano às entradas X (apenas datasets lineares).
Execute após haver pelo menos um modelo avaliado.


In [None]:
# Avaliação com ruído nas entradas (σ em {0.00, 0.01, 0.03, 0.05, 0.10})
if 'problems' in globals() and len(problems) and 'keras_models' in globals():
    noise_rows = []
    sigmas = [0.00, 0.01, 0.03, 0.05, 0.10]
    for p in problems:
        meta = load_linear_meta(p)
        if not meta:
            continue
        df, fcols, tcols = load_linear_parquet(p, meta)
        if df.empty or not fcols or not tcols:
            continue
        if not all(np.issubdtype(df[c].dtype, np.number) for c in fcols):
            continue
        X0 = df[fcols].to_numpy(dtype='float32', copy=False)
        Y = df[tcols].to_numpy(dtype='float32', copy=False)
        # Subamostra opcional
        max_rows = 15000
        if len(X0) > max_rows:
            X0 = X0[:max_rows]
            Y = Y[:max_rows]
        # Precisa de ao menos um modelo compatível
        any_model = False
        for model_path in keras_models:
            model = safe_load_keras_model(model_path)
            if model is None:
                continue
            # Checagem leve de compatibilidade
            in_shape = getattr(model, 'input_shape', None) if hasattr(model, 'input_shape') else None
            if in_shape is not None:
                if isinstance(in_shape, (list, tuple)) and len(in_shape) and isinstance(in_shape[0], (list, tuple)):
                    pass
                else:
                    last_dim = in_shape[-1] if isinstance(in_shape, (list, tuple)) else None
                    if isinstance(last_dim, int) and last_dim > 0 and last_dim != X0.shape[1]:
                        continue
            any_model = True
            label = model_label_from_path(model_path)
            for s in sigmas:
                Xn = add_input_noise(X0, sigma=s, pad_sentinel=-999.0)
                overall, _ = evaluate_keras_on_linear(model, Xn, Y, tcols)
                noise_rows.append({
                    'problem': os.path.basename(p),
                    'model_label': label,
                    'sigma': s,
                    **overall
                })
        if not any_model:
            print(f"[INFO] {os.path.basename(p)}: sem modelo compatível para teste de ruído.")
    noise_df = pd.DataFrame(noise_rows)
    if not noise_df.empty:
        display(noise_df.sort_values(['problem','model_label','sigma']))
        for prob in noise_df['problem'].unique():
            sub = noise_df[noise_df['problem']==prob]
            plt.figure(figsize=(10,4))
            for ml, g in sub.groupby('model_label'):
                plt.plot(g['sigma'], g['MAE'], marker='o', label=ml)
            plt.title(f'{prob} — MAE vs σ (ruído de entrada)')
            plt.xlabel('σ')
            plt.ylabel('MAE')
            plt.grid(True, alpha=0.3)
            plt.legend()
            plt.tight_layout()
            plt.show()
    else:
        print('Sem resultados de robustez (nenhum modelo compatível encontrado).')
else:
    print('Execute a avaliação principal antes (modelos e problemas).')

In [None]:
## Exportar resultados (opcional)

# Salvar tabelas em data/results/ para consulta posterior
out_dir = os.path.join('data', 'results')
os.makedirs(out_dir, exist_ok=True)

if 'res_df' in globals() and not res_df.empty:
    res_df.to_parquet(os.path.join(out_dir, 'keras_linear_overall.parquet'), index=False)
    res_df.to_csv(os.path.join(out_dir, 'keras_linear_overall.csv'), index=False)
    print('[OK] Salvo overall em data/results/.')
else:
    print('[INFO] res_df vazio — nada salvo (overall).')

if 'per_h_df' in globals() and not per_h_df.empty:
    per_h_df.to_parquet(os.path.join(out_dir, 'keras_linear_per_horizon.parquet'), index=False)
    per_h_df.to_csv(os.path.join(out_dir, 'keras_linear_per_horizon.csv'), index=False)
    print('[OK] Salvo per_horizon em data/results/.')
else:
    print('[INFO] per_h_df vazio — nada salvo (per_horizon).')

if 'noise_df' in globals() and not noise_df.empty:
    noise_df.to_parquet(os.path.join(out_dir, 'keras_linear_noise.parquet'), index=False)
    noise_df.to_csv(os.path.join(out_dir, 'keras_linear_noise.csv'), index=False)
    print('[OK] Salvo noise em data/results/.')
else:
    print('[INFO] noise_df vazio — nada salvo (noise).')