In [4]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import numpy as np
import pandas as pd
import yfinance as yf
from sklearn.preprocessing import MinMaxScaler
import random

# --- Configuration ---
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
BATCH_SIZE = 32
LEARNING_RATE = 0.001
EPOCHS = 20
PREDICTION_HORIZON = 1
TICKER = "^GSPC"
WINDOW_SIZES_TO_TEST = [10, 20, 30, 40, 50]
DROPOUT_RATE = 0.2
NUM_RUNS_FOR_STATS = 5 # Quantidade de rodadas para calcular o desvio padrão (±)

DATES = {
    "train_start": "2015-01-01", "train_end": "2021-12-31",
    "val_start":   "2022-01-01", "val_end":   "2022-12-31",
    "test_start":  "2023-01-01", "test_end":  "2024-12-31"
}

# --- Reproducibility ---
def set_seed(seed=42):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)

# --- 1. Data Preparation ---
def fetch_raw_data():
    print(f"--- Fetching Data for {TICKER} ---")
    df = yf.download(TICKER, start=DATES["train_start"], end=DATES["test_end"], interval='1d', progress=False)
    if isinstance(df.columns, pd.MultiIndex):
        df.columns = df.columns.get_level_values(0)
    df.index = pd.to_datetime(df.index)
    df = df[['Open', 'High', 'Low', 'Close', 'Volume']].copy()
    return df

def get_scaled_data(df, start_date, end_date, scaler=None, fit_scaler=False):
    slice_df = df.loc[start_date:end_date].copy()
    if slice_df.empty:
        raise ValueError(f"No data for {start_date} to {end_date}")

    raw_values = slice_df.values
    if fit_scaler:
        scaler = MinMaxScaler()
        scaled_values = scaler.fit_transform(raw_values)
    else:
        scaled_values = scaler.transform(raw_values)
    return scaled_values, scaler, slice_df.index

def inverse_transform_close(scaled_data, scaler):
    dummy = np.zeros((len(scaled_data), scaler.n_features_in_))
    dummy[:, 3] = scaled_data.flatten()
    inversed = scaler.inverse_transform(dummy)
    return inversed[:, 3]

def create_windowed_data(data, window_size, horizon):
    X, y = [], []
    target_col_idx = 3 # Close price
    if len(data) <= window_size + horizon:
        return np.array(X), np.array(y)

    end_index = len(data) - window_size - horizon + 1
    for i in range(end_index):
        window = data[i : i + window_size]
        target = data[i + window_size + horizon - 1, target_col_idx]
        X.append(window)
        y.append(target)
    return np.array(X), np.array(y)

class StockDataset(Dataset):
    def __init__(self, X, y):
        self.X = torch.tensor(X, dtype=torch.float32)
        self.y = torch.tensor(y, dtype=torch.float32).unsqueeze(1)

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

# --- 2. Model Definitions ---
class TimeSeriesCNN(nn.Module):
    def __init__(self, num_features, window_size):
        super(TimeSeriesCNN, self).__init__()
        self.conv1 = nn.Conv1d(num_features, 64, kernel_size=3, padding=1)
        self.relu1 = nn.ReLU()
        self.pool1 = nn.MaxPool1d(kernel_size=2)
        self.drop1 = nn.Dropout(0)
        self.conv2 = nn.Conv1d(64, 128, kernel_size=3, padding=1)
        self.relu2 = nn.ReLU()
        self.pool2 = nn.MaxPool1d(kernel_size=2)
        self.drop2 = nn.Dropout(0)
        final_length = window_size // 4
        self.flatten_dim = 128 * final_length
        self.fc1 = nn.Linear(self.flatten_dim, 50)
        self.relu3 = nn.ReLU()
        self.fc2 = nn.Linear(50, 1)

    def forward(self, x):
        x = x.permute(0, 2, 1)
        x = self.drop1(self.pool1(self.relu1(self.conv1(x))))
        x = self.drop2(self.pool2(self.relu2(self.conv2(x))))
        x = x.reshape(x.size(0), -1)
        x = self.relu3(self.fc1(x))
        x = self.fc2(x)
        return x

class TimeSeriesLSTM(nn.Module):
    def __init__(self, num_features, hidden_size1=64, hidden_size2=32):
        super(TimeSeriesLSTM, self).__init__()
        self.lstm1 = nn.LSTM(input_size=num_features, hidden_size=hidden_size1, batch_first=True)
        self.dense_inter = nn.Linear(hidden_size1, hidden_size1)
        self.relu_inter = nn.ReLU()
        self.dropout = nn.Dropout(DROPOUT_RATE)
        self.lstm2 = nn.LSTM(input_size=hidden_size1, hidden_size=hidden_size2, batch_first=True)
        self.fc = nn.Linear(hidden_size2, 1)

    def forward(self, x):
        out, _ = self.lstm1(x)
        out = self.relu_inter(self.dense_inter(out))
        out = self.dropout(out)
        out, _ = self.lstm2(out)
        out = out[:, -1, :]
        out = self.fc(out)
        return out

class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-np.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)
        self.register_buffer('pe', pe)

    def forward(self, x):
        return x + self.pe[:, :x.size(1), :]

class TimeSeriesTransformer(nn.Module):
    def __init__(self, num_features, window_size, d_model=64, nhead=4, num_layers=2):
        super(TimeSeriesTransformer, self).__init__()
        self.input_linear = nn.Linear(num_features, d_model)
        self.relu = nn.ReLU()
        self.pos_encoder = PositionalEncoding(d_model)
        encoder_layer = nn.TransformerEncoderLayer(d_model=d_model, nhead=nhead,
                                                   dim_feedforward=128,
                                                   dropout=DROPOUT_RATE,
                                                   batch_first=True)
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
        self.decoder = nn.Linear(d_model * window_size, 1)

    def forward(self, x):
        x = self.input_linear(x)
        x = self.relu(x)
        x = self.pos_encoder(x)
        x = self.transformer_encoder(x)
        x = x.reshape(x.size(0), -1)
        x = self.decoder(x)
        return x

# --- 3. Helper Functions ---
def calculate_metrics(y_true, y_pred):
    # mape = np.mean(np.abs((y_true - y_pred) / (y_true + 1e-9))) * 100
    # Evitar divisão por zero ou valores muito pequenos de forma mais robusta
    y_true_safe = np.where(y_true == 0, 1e-9, y_true)
    mape = np.mean(np.abs((y_true - y_pred) / y_true_safe)) * 100
    return mape

def train_one_epoch(model, loader, optimizer, criterion):
    model.train()
    for batch_X, batch_y in loader:
        batch_X, batch_y = batch_X.to(DEVICE), batch_y.to(DEVICE)
        optimizer.zero_grad()
        outputs = model(batch_X)
        loss = criterion(outputs, batch_y)
        loss.backward()
        optimizer.step()

def evaluate_model(model, data_scaled, window_size, scaler):
    X, y = create_windowed_data(data_scaled, window_size, PREDICTION_HORIZON)
    if len(X) == 0: return float('inf')

    loader = DataLoader(StockDataset(X, y), batch_size=BATCH_SIZE, shuffle=False)
    model.eval()
    preds, actuals = [], []
    with torch.no_grad():
        for batch_X, batch_y in loader:
            batch_X = batch_X.to(DEVICE)
            outputs = model(batch_X)
            preds.extend(outputs.cpu().numpy())
            actuals.extend(batch_y.numpy())

    real_preds = inverse_transform_close(np.array(preds), scaler)
    real_actuals = inverse_transform_close(np.array(actuals), scaler)
    mape = calculate_metrics(real_actuals, real_preds)
    return mape

# --- 4. Main Pipeline ---
if __name__ == "__main__":
    full_df = fetch_raw_data()

    # Scale Data
    train_scaled, main_scaler, _ = get_scaled_data(full_df, DATES["train_start"], DATES["train_end"], fit_scaler=True)
    val_scaled, _, _             = get_scaled_data(full_df, DATES["val_start"], DATES["val_end"], scaler=main_scaler)
    test_scaled, _, _            = get_scaled_data(full_df, DATES["test_start"], DATES["test_end"], scaler=main_scaler)

    print("\n" + "="*80)
    print(f"{'CONSTRUÇÃO DA TABELA DE RESULTADOS':^80}")
    print("="*80)

    # Lista para armazenar os resultados finais
    final_results = []

    # 1. Adicionar Baseline (Hardcoded conforme pedido)
    final_results.append({
        "Modelos": "Baseline",
        "Tamanho da Janela": "-",
        "Figuras de Mérito": "2.65%"
    })

    model_names = ["CNN", "LSTM", "Transformer"]

    for model_type in model_names:
        print(f"\n>>> Processando Modelo: {model_type}")

        # --- PASSO 1: Encontrar o melhor tamanho de janela (Tuning) ---
        best_window = None
        best_val_mape = float('inf')

        print(f"   [Tuning] Buscando melhor janela entre {WINDOW_SIZES_TO_TEST}...")

        for w_size in WINDOW_SIZES_TO_TEST:
            set_seed(42) # Seed fixa para comparação justa na validação

            # Preparar dados
            X_train, y_train = create_windowed_data(train_scaled, w_size, PREDICTION_HORIZON)
            if len(X_train) == 0: continue
            train_loader = DataLoader(StockDataset(X_train, y_train), batch_size=BATCH_SIZE, shuffle=True)
            num_features = X_train.shape[2]

            # Instanciar Modelo
            if model_type == "CNN": model = TimeSeriesCNN(num_features, w_size).to(DEVICE)
            elif model_type == "LSTM": model = TimeSeriesLSTM(num_features).to(DEVICE)
            elif model_type == "Transformer": model = TimeSeriesTransformer(num_features, w_size).to(DEVICE)

            criterion = nn.MSELoss()
            optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)

            # Treinar (rápido para seleção)
            for epoch in range(EPOCHS):
                train_one_epoch(model, train_loader, optimizer, criterion)

            # Validar
            val_mape = evaluate_model(model, val_scaled, w_size, main_scaler)

            if val_mape < best_val_mape:
                best_val_mape = val_mape
                best_window = w_size

        print(f"   [Seleção] Melhor janela encontrada para {model_type}: {best_window} (Val MAPE: {best_val_mape:.2f}%)")

        # --- PASSO 2: Calcular Estatísticas (Média ± Std) no Teste ---
        test_mapes = []
        print(f"   [Stats] Executando {NUM_RUNS_FOR_STATS} rodadas no dataset de TESTE...")

        for run in range(NUM_RUNS_FOR_STATS):
            current_seed = 0 + run
            set_seed(current_seed) # Variar seed

            # Recriar dataset com a janela vencedora
            X_train, y_train = create_windowed_data(train_scaled, best_window, PREDICTION_HORIZON)
            train_loader = DataLoader(StockDataset(X_train, y_train), batch_size=BATCH_SIZE, shuffle=True)
            num_features = X_train.shape[2]

            if model_type == "CNN": model = TimeSeriesCNN(num_features, best_window).to(DEVICE)
            elif model_type == "LSTM": model = TimeSeriesLSTM(num_features).to(DEVICE)
            elif model_type == "Transformer": model = TimeSeriesTransformer(num_features, best_window).to(DEVICE)

            criterion = nn.MSELoss()
            optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)

            for epoch in range(EPOCHS):
                train_one_epoch(model, train_loader, optimizer, criterion)

            # Avaliar no Teste
            t_mape = evaluate_model(model, test_scaled, best_window, main_scaler)
            test_mapes.append(t_mape)

        # Calcular Média e Desvio Padrão
        avg_mape = np.mean(test_mapes)
        std_mape = np.std(test_mapes)

        # Formatar String
        result_str = f"{avg_mape:.2f}% ± {std_mape:.2f}%"

        # Adicionar à tabela
        final_results.append({
            "Modelos": model_type,
            "Tamanho da Janela": best_window,
            "Figuras de Mérito": result_str
        })

    # --- 5. Exibição da Tabela Final ---
    df_results = pd.DataFrame(final_results)

    print("\n\n")
    print("="*60)
    print("TABELA FINAL DE RESULTADOS (MAPE)")
    print("="*60)
    # Ajustar opções do pandas para exibir bonito no console
    pd.set_option('display.max_columns', None)
    pd.set_option('display.width', 1000)
    pd.set_option('display.colheader_justify', 'center')

    print(df_results.to_markdown(index=False, tablefmt="grid"))

--- Fetching Data for ^GSPC ---

                       CONSTRUÇÃO DA TABELA DE RESULTADOS                       

>>> Processando Modelo: CNN
   [Tuning] Buscando melhor janela entre [10, 20, 30, 40, 50]...


  df = yf.download(TICKER, start=DATES["train_start"], end=DATES["test_end"], interval='1d', progress=False)


   [Seleção] Melhor janela encontrada para CNN: 10 (Val MAPE: 1.73%)
   [Stats] Executando 5 rodadas no dataset de TESTE...

>>> Processando Modelo: LSTM
   [Tuning] Buscando melhor janela entre [10, 20, 30, 40, 50]...
   [Seleção] Melhor janela encontrada para LSTM: 10 (Val MAPE: 2.34%)
   [Stats] Executando 5 rodadas no dataset de TESTE...

>>> Processando Modelo: Transformer
   [Tuning] Buscando melhor janela entre [10, 20, 30, 40, 50]...
   [Seleção] Melhor janela encontrada para Transformer: 10 (Val MAPE: 3.01%)
   [Stats] Executando 5 rodadas no dataset de TESTE...



TABELA FINAL DE RESULTADOS (MAPE)
+-------------+---------------------+---------------------+
| Modelos     | Tamanho da Janela   | Figuras de Mérito   |
| Baseline    | -                   | 2.65%               |
+-------------+---------------------+---------------------+
| CNN         | 10                  | 1.06% ± 0.17%       |
+-------------+---------------------+---------------------+
| LSTM        | 10       