In [None]:
import math
import os
import time

import joblib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import yfinance as yf
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.preprocessing import MinMaxScaler
from torch.utils.data import DataLoader, TensorDataset

In [None]:
# Read the IBOV constituent list and turn each value of the 'codigo' column
# into a symbol carrying the ".SA" suffix.
tickers_df = pd.read_csv("../data/ibov_tickers.csv")
codes = tickers_df['codigo'].tolist()
tickers = ["{}.SA".format(code) for code in codes]

# Show the size of the universe and a short preview instead of the full list.
preview = tickers[:10]
print(f"Total tickers to download: {len(tickers)}")
print(f"Tickers: {preview}...")

In [None]:
# Download data for all tickers at once
START_DATE = "2020-01-01"
END_DATE = "2024-12-31"

print("Downloading data for all tickers...")
print("This may take a few minutes...")

# group_by='ticker' is what lets the next cell select one symbol's columns
# with data[ticker]['Close'].
data = yf.download(tickers, start=START_DATE, end=END_DATE, group_by='ticker', progress=True)

# Fail fast: without this check the success banner below was printed even when
# nothing came back, and the failure only surfaced later while combining prices.
if data.empty:
    raise RuntimeError(
        f"No price data was downloaded for {len(tickers)} tickers "
        f"between {START_DATE} and {END_DATE}"
    )

print("\n✓ Data downloaded successfully")
print(f"Shape: {data.shape}")

In [None]:
# Combine all tickers' Close prices into a single long-format DataFrame
# with one row per (Date, Ticker) pair.
MIN_VALID_POINTS = 100  # a ticker is kept only with MORE than this many non-null closes

all_prices = []

for ticker in tickers:
    try:
        # A one-ticker download is read from a flat 'Close' column; otherwise
        # the ticker is the outer column level.
        close_data = data['Close'] if len(tickers) == 1 else data[ticker]['Close']

        valid_points = close_data.notna().sum()
        if valid_points > MIN_VALID_POINTS:
            df_ticker = close_data.dropna().to_frame()
            df_ticker.columns = ['Close']
            df_ticker['Ticker'] = ticker
            all_prices.append(df_ticker)
            print(f"✓ {ticker}: {len(df_ticker)} days")
        else:
            # Previously these tickers were dropped without any trace in the log.
            print(f"✗ {ticker}: skipped, only {valid_points} valid data points")
    except Exception as e:
        # Best effort: one bad ticker must not abort the whole run.
        print(f"✗ {ticker}: {e}")

# pd.concat raises an opaque "No objects to concatenate" on an empty list;
# raise the same exception type with a message that names the real problem.
if not all_prices:
    raise ValueError(
        f"No ticker had more than {MIN_VALID_POINTS} valid Close prices; "
        "cannot build the combined dataset"
    )

# Stack the per-ticker frames and promote the date index to a regular column.
combined_df = pd.concat(all_prices, axis=0).reset_index()
combined_df.columns = ['Date', 'Close', 'Ticker']

print(f"\n✓ Combined dataset shape: {combined_df.shape}")
print(f"Total data points: {len(combined_df):,}")
print(f"Successful tickers: {combined_df['Ticker'].nunique()}")
print(f"Date range: {combined_df['Date'].min()} to {combined_df['Date'].max()}")

In [None]:
# Prepare data for LSTM: choose the compute device, then min-max scale prices
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f"Using device: {device}")

# Double brackets keep the selection two-dimensional (one column), which is
# the layout passed to the scaler below.
close_prices = combined_df[['Close']].values

# One scaler is shared by every ticker.
# NOTE(review): it is fitted on all rows of combined_df, i.e. before the
# train/test split performed in a later cell — confirm this is intended.
scaler = MinMaxScaler(feature_range=(0, 1))
scaler.fit(close_prices)
scaled_data = scaler.transform(close_prices)

lowest_price, highest_price = close_prices.min(), close_prices.max()
print(f"\n✓ Data scaled to range [0, 1]")
print(f"Original price range: R$ {lowest_price:.2f} - R$ {highest_price:.2f}")
print(f"Scaled range: {scaled_data.min():.4f} - {scaled_data.max():.4f}")

In [None]:
# Create sequences for LSTM
def create_sequences(data, seq_length=50):
    """Slice ``data`` into overlapping windows paired with the next item.

    Args:
        data: Sequence supporting ``len`` and slicing (e.g. a NumPy array).
        seq_length: Number of consecutive items in each input window.

    Returns:
        Tuple ``(X, y)`` of NumPy arrays where ``X[k]`` is
        ``data[k:k + seq_length]`` and ``y[k]`` is ``data[k + seq_length]``.
        ``len(data) - seq_length`` pairs are produced; when that count is not
        positive both arrays are empty.
    """
    window_starts = range(len(data) - seq_length)
    windows = [data[start:start + seq_length] for start in window_starts]
    targets = [data[start + seq_length] for start in window_starts]
    return np.array(windows), np.array(targets)

seq_length = 50

# Build the windows one ticker at a time. scaled_data is the row-wise stack of
# every ticker's history, so windowing it as a single series produced windows
# that begin in one stock and end in the next, with targets taken from a
# different stock than the inputs.
ticker_labels = combined_df['Ticker'].to_numpy()
X_parts, y_parts = [], []
for ticker_name in combined_df['Ticker'].unique():  # order of first appearance
    X_ticker, y_ticker = create_sequences(scaled_data[ticker_labels == ticker_name], seq_length)
    if len(X_ticker) > 0:  # tickers with too little history yield no windows
        X_parts.append(X_ticker)
        y_parts.append(y_ticker)

if not X_parts:
    raise ValueError(f"No ticker has more than {seq_length} data points; no sequences were created")

X = np.concatenate(X_parts, axis=0)
y = np.concatenate(y_parts, axis=0)

print(f"✓ Created sequences with window size: {seq_length}")
print(f"X shape: {X.shape}")
print(f"y shape: {y.shape}")

In [None]:
# Train/Test split (80/20), taken by position: the leading 80% of the
# sequences are used for training and the remainder is held out.
train_fraction = 0.8
split_idx = int(len(X) * train_fraction)


def _as_device_tensor(array):
    """Wrap a NumPy array in a FloatTensor and move it to the selected device."""
    return torch.FloatTensor(array).to(device)


# Convert each partition to a PyTorch tensor
X_train = _as_device_tensor(X[:split_idx])
y_train = _as_device_tensor(y[:split_idx])
X_test = _as_device_tensor(X[split_idx:])
y_test = _as_device_tensor(y[split_idx:])

n_train, n_test = X_train.shape[0], X_test.shape[0]
print(f"✓ Data split completed")
print(f"Training set: {n_train:,} samples")
print(f"Test set: {n_test:,} samples")
print(f"Train/Test ratio: {n_train/n_test:.2f}")

In [None]:
# Define LSTM Model
class UnifiedLSTM(nn.Module):
    """Stacked LSTM followed by dropout and a linear head producing one value.

    Args:
        input_size: Number of features per time step.
        hidden_size: Size of the hidden state of every LSTM layer.
        num_layers: Number of stacked LSTM layers.
        dropout: Dropout probability, used between LSTM layers (only when
            ``num_layers > 1``) and on the last time step's output before the
            linear head.
    """

    def __init__(self, input_size=1, hidden_size=128, num_layers=3, dropout=0.2):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers

        self.lstm = nn.LSTM(
            input_size,
            hidden_size,
            num_layers,
            batch_first=True,
            # Inter-layer dropout is only enabled when there is more than one layer.
            dropout=dropout if num_layers > 1 else 0
        )
        self.dropout = nn.Dropout(dropout)
        self.fc = nn.Linear(hidden_size, 1)

    def forward(self, x):
        """Run the network on a batch of sequences.

        Args:
            x: Tensor laid out as (batch, time, features) — the LSTM is built
                with ``batch_first=True``.

        Returns:
            Tensor with one prediction per batch row, shape ``(batch, 1)``.
        """
        # No explicit (h0, c0): nn.LSTM starts from zero states when none are
        # passed, created with the input's own device and dtype. The previous
        # hand-built zeros were allocated on the CPU in the default dtype and
        # copied over on every call, which fails for non-float32 models.
        out, _ = self.lstm(x)
        # Only the last time step feeds the regression head.
        out = self.dropout(out[:, -1, :])
        return self.fc(out)

# Create model
# Seed the random generators first so that weight initialisation (and any
# later step drawing from NumPy's or torch's global generators) is repeatable
# on a fresh Restart & Run All.
SEED = 42
np.random.seed(SEED)
torch.manual_seed(SEED)

model = UnifiedLSTM(input_size=1, hidden_size=128, num_layers=3, dropout=0.2).to(device)

print("✓ Unified LSTM Model Architecture:")
print(model)
print(f"\nTotal parameters: {sum(p.numel() for p in model.parameters()):,}")

In [None]:
# Training setup: hyper-parameters first, then the loss and optimiser built from them
learning_rate = 0.001
epochs = 100
batch_size = 256

criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

# Echo the configuration between two horizontal rules.
rule = "="*80
print(rule)
print("TRAINING UNIFIED MODEL")
print(rule)
for config_line in (
    f"Epochs: {epochs}",
    f"Batch size: {batch_size}",
    f"Optimizer: Adam (lr=0.001)",
    f"Loss: MSE",
):
    print(config_line)
print(rule)

In [None]:
# Training loop

# Create DataLoader for batch training
train_dataset = TensorDataset(X_train, y_train)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

# Per-epoch losses, consumed by the plotting cell.
history = {'train_loss': [], 'val_loss': []}

start_time = time.time()

for epoch in range(epochs):
    model.train()  # (re-)enable dropout after the previous epoch's evaluation
    loss_sum = 0.0
    samples_seen = 0

    for X_batch, y_batch in train_loader:
        # Forward pass
        outputs = model(X_batch)
        loss = criterion(outputs, y_batch)

        # Backward pass
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # criterion yields the mean over the batch; weight it by the batch size
        # so the (usually shorter) final batch is not over-represented in the
        # epoch average.
        batch_len = X_batch.size(0)
        loss_sum += loss.item() * batch_len
        samples_seen += batch_len

    # Average training loss per sample
    avg_train_loss = loss_sum / samples_seen
    history['train_loss'].append(avg_train_loss)

    # Validation loss
    # NOTE(review): this is measured on X_test / y_test, the same data the
    # evaluation cell reports on — confirm a separate validation split is not needed.
    model.eval()
    with torch.no_grad():
        val_outputs = model(X_test)
        val_loss = criterion(val_outputs, y_test).item()
    history['val_loss'].append(val_loss)

    # Print progress on the first epoch and then every tenth
    if (epoch + 1) % 10 == 0 or epoch == 0:
        elapsed = time.time() - start_time
        print(f"Epoch [{epoch+1:3d}/{epochs}] - "
              f"Train Loss: {avg_train_loss:.6f}, "
              f"Val Loss: {val_loss:.6f}, "
              f"Time: {elapsed:.1f}s")

total_time = time.time() - start_time
print(f"\n✓ Training completed in {total_time:.1f} seconds ({total_time/60:.1f} minutes)")

In [None]:
# Plot training history: one curve per loss series recorded in `history`
fig, ax = plt.subplots(figsize=(12, 5))

curves = (
    ('train_loss', 'Training Loss'),
    ('val_loss', 'Validation Loss'),
)
for history_key, curve_label in curves:
    ax.plot(history[history_key], label=curve_label, linewidth=2)

ax.set_title('Unified IBOV Model - Training History', fontsize=14, fontweight='bold')
ax.set_xlabel('Epoch', fontsize=12)
ax.set_ylabel('Loss (MSE)', fontsize=12)
ax.grid(True, alpha=0.3)
ax.legend(fontsize=11)

plt.tight_layout()
plt.show()

# Losses of the last epoch
final_train_loss = history['train_loss'][-1]
final_val_loss = history['val_loss'][-1]
print(f"Final Training Loss: {final_train_loss:.6f}")
print(f"Final Validation Loss: {final_val_loss:.6f}")

In [None]:
# Evaluate model on test set
model.eval()

with torch.no_grad():
    y_pred = model(X_test).cpu().numpy()
    y_true = y_test.cpu().numpy()

# Calculate metrics on scaled data
mse = mean_squared_error(y_true, y_pred)
rmse = math.sqrt(mse)
mae = mean_absolute_error(y_true, y_pred)
r2 = r2_score(y_true, y_pred)

# Inverse transform to get real prices
y_pred_inv = scaler.inverse_transform(y_pred)
y_true_inv = scaler.inverse_transform(y_true)

# Calculate metrics on real prices
rmse_real = math.sqrt(mean_squared_error(y_true_inv, y_pred_inv))
mae_real = mean_absolute_error(y_true_inv, y_pred_inv)

# MAPE must be taken on real prices. A percentage error is not preserved by
# min-max scaling: scaled targets close to 0 make the denominator tiny and
# dominate the mean, so the previous scaled-data MAPE was not a meaningful
# figure. The clip only guards against division by zero.
mape = np.mean(
    np.abs((y_true_inv - y_pred_inv) / np.clip(np.abs(y_true_inv), 1e-8, None))
) * 100

print("="*80)
print("MODEL EVALUATION RESULTS")
print("="*80)
print("\nScaled Data Metrics:")
print(f"  MSE:  {mse:.6f}")
print(f"  RMSE: {rmse:.6f}")
print(f"  MAE:  {mae:.6f}")
print(f"  R²:   {r2:.6f}")

print("\nReal Price Metrics:")
print(f"  RMSE: R$ {rmse_real:.4f}")
print(f"  MAE:  R$ {mae_real:.4f}")
print(f"  MAPE: {mape:.2f}%")

print("="*80)

In [None]:
# Visualize predictions: full test set on top, a zoom on the tail below
sample_size = 500  # Show last 500 predictions

fig, axes = plt.subplots(2, 1, figsize=(15, 10))
ax_full, ax_zoom = axes

# Top panel: every test prediction against the actual price
ax_full.plot(y_true_inv, label='Actual', color='blue', linewidth=1.5, alpha=0.7)
ax_full.plot(y_pred_inv, label='Predicted', color='orange', linewidth=1.5, alpha=0.7)
ax_full.set_title('Unified Model - Full Test Set Predictions', fontsize=14, fontweight='bold')
ax_full.set_xlabel('Sample Index')
ax_full.set_ylabel('Price (R$)')
ax_full.legend(fontsize=11)
ax_full.grid(True, alpha=0.3)

# Metrics summary box in the top-left corner of the top panel
metrics_text = "\n".join([
    f"R² = {r2:.4f}",
    f"RMSE = R$ {rmse_real:.4f}",
    f"MAE = R$ {mae_real:.4f}",
    f"MAPE = {mape:.2f}%",
])
box_style = dict(boxstyle='round', facecolor='wheat', alpha=0.8)
ax_full.text(0.02, 0.98, metrics_text, transform=ax_full.transAxes,
             verticalalignment='top', fontsize=10,
             bbox=box_style)

# Bottom panel: only the last `sample_size` samples, with point markers
actual_tail = y_true_inv[-sample_size:]
predicted_tail = y_pred_inv[-sample_size:]
ax_zoom.plot(actual_tail, label='Actual', color='blue', linewidth=2, marker='o', markersize=3)
ax_zoom.plot(predicted_tail, label='Predicted', color='orange', linewidth=2, marker='s', markersize=3, alpha=0.7)
ax_zoom.set_title(f'Zoomed View - Last {sample_size} Predictions', fontsize=14, fontweight='bold')
ax_zoom.set_xlabel('Sample Index')
ax_zoom.set_ylabel('Price (R$)')
ax_zoom.legend(fontsize=11)
ax_zoom.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

In [None]:
# Prediction error analysis on real prices (actual minus predicted)
errors = y_true_inv - y_pred_inv
errors_pct = (errors / y_true_inv) * 100

fig, axes = plt.subplots(1, 3, figsize=(16, 4))

# The two histograms differ only in data, colour and labels.
histogram_specs = (
    (axes[0], errors, 'skyblue', 'Prediction Error Distribution', 'Error (R$)'),
    (axes[1], errors_pct, 'lightcoral', 'Percentage Error Distribution', 'Error (%)'),
)
for hist_ax, hist_values, hist_color, hist_title, hist_xlabel in histogram_specs:
    hist_ax.hist(hist_values.flatten(), bins=50, color=hist_color, edgecolor='black')
    hist_ax.axvline(0, color='red', linestyle='--', linewidth=2, label='Zero Error')
    hist_ax.set_title(hist_title, fontsize=12, fontweight='bold')
    hist_ax.set_xlabel(hist_xlabel)
    hist_ax.set_ylabel('Frequency')
    hist_ax.legend()
    hist_ax.grid(True, alpha=0.3)

# Scatter plot: Predicted vs Actual, with the identity line as reference
scatter_ax = axes[2]
scatter_ax.scatter(y_true_inv, y_pred_inv, alpha=0.3, s=10)
min_val = min(y_true_inv.min(), y_pred_inv.min())
max_val = max(y_true_inv.max(), y_pred_inv.max())
diagonal = [min_val, max_val]
scatter_ax.plot(diagonal, diagonal, 'r--', linewidth=2, label='Perfect Prediction')
scatter_ax.set_title('Predicted vs Actual Prices', fontsize=12, fontweight='bold')
scatter_ax.set_xlabel('Actual Price (R$)')
scatter_ax.set_ylabel('Predicted Price (R$)')
scatter_ax.legend()
scatter_ax.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

print(f"Mean Error: R$ {errors.mean():.4f}")
print(f"Std Error: R$ {errors.std():.4f}")
print(f"Mean Absolute % Error: {np.abs(errors_pct).mean():.2f}%")

In [None]:
# Save the unified model and scaler
os.makedirs("../models", exist_ok=True)
os.makedirs("../scalers", exist_ok=True)

model_path = "../models/unified_ibov_lstm.pt"
scaler_path = "../scalers/unified_ibov_scaler.joblib"

# Only the weights are stored, so the class must be re-created with the same
# hyper-parameters before loading.
torch.save(model.state_dict(), model_path)
joblib.dump(scaler, scaler_path)

print("="*80)
print("MODEL SAVED")
print("="*80)
print(f"Model: {model_path}")
print(f"Scaler: {scaler_path}")
print("\nModel can be loaded with:")
# The instructions are generated from the actual paths and hyper-parameters;
# the previous hard-coded text pointed at 'models/...' and 'scalers/...',
# which are not the locations written above. map_location lets weights saved
# from a GPU run be loaded on a CPU-only machine.
print(f"  model = UnifiedLSTM(input_size={model.lstm.input_size}, "
      f"hidden_size={model.hidden_size}, num_layers={model.num_layers}, "
      f"dropout={model.dropout.p})")
print(f"  model.load_state_dict(torch.load('{model_path}', map_location='cpu'))")
print(f"  scaler = joblib.load('{scaler_path}')")
print("="*80)

In [None]:
# Save training summary: a one-row CSV describing the run
summary_path = '../data/unified_model_summary.csv'

summary = {
    'model_type': 'Unified LSTM for All IBOV Tickers',
    'training_date': pd.Timestamp.now().strftime('%Y-%m-%d %H:%M:%S'),
    'data_range': f"{combined_df['Date'].min()} to {combined_df['Date'].max()}",
    'total_tickers': combined_df['Ticker'].nunique(),
    'total_samples': len(combined_df),
    'train_samples': len(X_train),
    'test_samples': len(X_test),
    'sequence_length': seq_length,
    'epochs': epochs,
    'batch_size': batch_size,
    # Read from the trained model rather than repeating literals, so the record
    # cannot drift from the hyper-parameters that were actually used.
    'architecture': {
        'input_size': model.lstm.input_size,
        'hidden_size': model.hidden_size,
        'num_layers': model.num_layers,
        'dropout': model.dropout.p
    },
    'metrics': {
        'R2': float(r2),
        'RMSE': float(rmse_real),
        'MAE': float(mae_real),
        'MAPE': float(mape)
    },
    'training_time_minutes': total_time / 60
}

summary_df = pd.DataFrame([summary])
summary_df.to_csv(summary_path, index=False)

# Report the path that was really written (the old message omitted the '../').
print(f"✓ Training summary saved to: {summary_path}")

## Summary

This unified model was trained on the closing prices of **all IBOV tickers combined** into a single dataset, using one shared scaler for every ticker.

**Advantages:**
- Single model learns general market patterns
- Faster inference (no need to load multiple models)
- Shares one set of weights across all stocks, so patterns learned from one ticker can carry over to the others

**Model can be used to:**
- Predict stock prices for any IBOV ticker
- Analyze market trends
- Make future predictions