# Machine Learning with PyTorch: Building an SPY Price Forecasting System

In [None]:
# Colab-only step: mount Google Drive at /content/drive.
# NOTE(review): the path defaults further down are relative ('./...'), so the
# mounted drive is only used if DATA_PATH / MODEL_PATH / PLOTS_DIR point into it.
from google.colab import drive
drive.mount('/content/drive')


Mounted at /content/drive


In [None]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
import torch
from torch import nn
from torch.utils.data import DataLoader, TensorDataset
import matplotlib.pyplot as plt
from sklearn.metrics import mean_squared_error, mean_absolute_error
import os
import matplotlib.dates as mdates
from matplotlib.gridspec import GridSpec

# --- Custom Early Stopping Implementation ---
class EarlyStopping:
    """Track validation loss and flag when it has stopped improving.

    State is exposed as plain attributes read directly by the caller:
    ``best_loss`` (lowest loss accepted so far), ``counter`` (number of
    consecutive checks without improvement) and ``early_stop`` (set to True
    once ``counter`` reaches ``patience``).
    """

    def __init__(self, patience=20, min_delta=0):
        # Mutable tracking state first, configuration second.
        self.early_stop = False
        self.best_loss = float('inf')
        self.counter = 0
        self.min_delta = min_delta
        self.patience = patience

    def check(self, val_loss, model):
        """Record one validation result and update the stop flag.

        ``val_loss`` counts as an improvement only when it undercuts
        ``best_loss`` by more than ``min_delta``. ``model`` is accepted for
        the caller's convenience but is not used here.
        """
        threshold = self.best_loss - self.min_delta
        if val_loss < threshold:
            # New best: remember it and restart the patience countdown.
            self.best_loss = val_loss
            self.counter = 0
            return

        self.counter += 1
        if self.counter >= self.patience:
            self.early_stop = True

# --- File Paths ---
# Each location can be overridden through an environment variable; the
# defaults are relative to the current working directory.
# The rest of the script refers to these locations as data_file_path,
# model_save_path and plots_save_dir, so those are the names defined here.
data_file_path = os.getenv('DATA_PATH', './default_data/spy_sample.csv')
model_save_path = os.getenv('MODEL_PATH', './saved_models/model.pth')
plots_save_dir = os.getenv('PLOTS_DIR', './outputs/figures')

# Short aliases kept so that code using the earlier names keeps working.
data_path = data_file_path
model_path = model_save_path
plots_dir = plots_save_dir

# Create directories
# os.path.dirname() returns '' when MODEL_PATH is a bare filename, and
# os.makedirs('') raises, so only create the directory when there is one.
model_save_dir = os.path.dirname(model_save_path)
if model_save_dir:
    os.makedirs(model_save_dir, exist_ok=True)
os.makedirs(plots_save_dir, exist_ok=True)

# --- Data Loading and Preprocessing ---
print("Loading data...")
# Read the CSV with 'timestamp' parsed as dates, order the rows
# chronologically and renumber the index from zero.
df = (
    pd.read_csv(data_file_path, parse_dates=['timestamp'])
    .sort_values('timestamp')
    .reset_index(drop=True)
)

# Data quality check
print("Checking data quality...")
raw_nan_total = df.isna().sum().sum()
print(f"NaN values in raw data: {raw_nan_total}")
# np.isinf is only defined for numeric data, so restrict to numeric columns.
raw_numeric = df.select_dtypes(include=np.number)
raw_inf_total = np.isinf(raw_numeric).sum().sum()
print(f"Infinite values in raw data: {raw_inf_total}")

# --- Technical Indicators Calculation ---
def calculate_atr(df, window=14):
    """Return the Average True Range as a simple rolling mean.

    The true range of a row is the largest of: high - low,
    |high - previous close| and |low - previous close|. The first row has no
    previous close, so only high - low contributes there (NaNs are skipped by
    the row-wise max). The first ``window - 1`` results are NaN.

    Args:
        df: DataFrame with 'high', 'low' and 'close' columns.
        window: Number of rows averaged.

    Returns:
        A Series aligned with ``df``.
    """
    prev_close = df['close'].shift()
    range_candidates = pd.concat(
        [
            df['high'] - df['low'],
            (df['high'] - prev_close).abs(),
            (df['low'] - prev_close).abs(),
        ],
        axis=1,
    )
    true_range = range_candidates.max(axis=1)
    return true_range.rolling(window).mean()

def calculate_rsi(df, window=14):
    """Return the Relative Strength Index built from simple rolling means.

    Gains and losses are the positive and negative parts of the row-to-row
    change in 'close'; the undefined first change is treated as zero. Their
    rolling means over ``window`` rows give RS = avg gain / avg loss and
    RSI = 100 - 100 / (1 + RS).

    A window with gains and no losses yields 100; a window with neither
    yields NaN (0 / 0).

    Args:
        df: DataFrame with a 'close' column.
        window: Number of rows in each rolling mean.

    Returns:
        A Series aligned with ``df``.
    """
    delta = df['close'].diff()
    up_moves = delta.where(delta > 0, 0)
    down_moves = -(delta.where(delta < 0, 0))
    avg_gain = up_moves.rolling(window=window).mean()
    avg_loss = down_moves.rolling(window=window).mean()
    rs = avg_gain / avg_loss
    return 100 - (100 / (1 + rs))

def calculate_macd(df, fast=12, slow=26, signal=9):
    """Return the MACD line and its signal line.

    The MACD line is the fast EMA of 'close' minus the slow EMA; the signal
    line is an EMA of the MACD line. All EMAs use ``adjust=False``.

    Args:
        df: DataFrame with a 'close' column.
        fast: Span of the fast EMA.
        slow: Span of the slow EMA.
        signal: Span of the EMA applied to the MACD line.

    Returns:
        Tuple ``(macd, signal_line)`` of Series aligned with ``df``.
    """
    def _ema(series, span):
        # Recursive (non-adjusted) exponential moving average.
        return series.ewm(span=span, adjust=False).mean()

    close = df['close']
    macd_line = _ema(close, fast) - _ema(close, slow)
    return macd_line, _ema(macd_line, signal)

def calculate_ma(df, windows=(5, 10, 20)):
    """Return simple moving averages of 'close' for each requested window.

    Args:
        df: DataFrame with a 'close' column.
        windows: Iterable of window lengths. The default is a tuple rather
            than a list so the default object can never be mutated between
            calls.

    Returns:
        Dict mapping ``'MA<window>'`` to the rolling-mean Series, in the
        order the windows were given.
    """
    close = df['close']
    return {f'MA{window}': close.rolling(window).mean() for window in windows}

def calculate_volatility(df, window=14):
    """Return the rolling volatility of log returns, scaled by sqrt(window).

    Args:
        df: DataFrame with a 'close' column of positive prices.
        window: Number of returns in each rolling standard deviation; also
            the horizon used for the square-root scaling.

    Returns:
        A Series aligned with ``df``.
    """
    # Log returns: difference of consecutive log prices.
    log_returns = np.log(df['close']).diff()
    rolling_std = log_returns.rolling(window).std()
    return rolling_std * np.sqrt(window)

# Calculate all technical indicators
print("Calculating technical indicators...")
df['ATR14'] = calculate_atr(df, window=14)
df['RSI14'] = calculate_rsi(df, window=14)
macd, signal = calculate_macd(df)
df['MACD'], df['MACD_Signal'] = macd, signal
ma_dict = calculate_ma(df, [5, 10, 20])
for ma_name, ma_series in ma_dict.items():
    df[ma_name] = ma_series
# Volatility is stored as a percentage.
df['Volatility%'] = calculate_volatility(df, window=14) * 100
# Log of the close price; used later as the prediction target.
df['LogClose'] = np.log(df['close'])

# Handle NaN values
# Back-fill first so the indicator warm-up rows at the start take the first
# available value, then forward-fill whatever is still missing.
df.bfill(inplace=True)
df.ffill(inplace=True)

# Data validation
processed_nan_total = df.isna().sum().sum()
print(f"NaN values after processing: {processed_nan_total}")
processed_numeric = df.select_dtypes(include=np.number)
processed_inf_total = np.isinf(processed_numeric).sum().sum()
print(f"Infinite values after processing: {processed_inf_total}")

# --- Prepare Model Data ---
# Columns fed to the model, in the order they are flattened into each window.
feature_cols = [
    # Raw bar data
    'open', 'high', 'low', 'close', 'volume',
    # Range / volatility
    'ATR14', 'Volatility%',
    # Momentum
    'RSI14', 'MACD', 'MACD_Signal',
    # Moving averages
    'MA5', 'MA10', 'MA20',
]

def create_windowed_data(df, feature_cols, window_size=6):
    """Build flattened sliding-window samples and next-row targets.

    For every row position ``i >= window_size`` the sample is the
    ``window_size`` preceding rows of ``feature_cols`` flattened row by row,
    and the target is ``df['LogClose']`` at position ``i``. Windows containing
    any NaN or infinite value are skipped together with their target.

    Rows are addressed by position, so the result does not depend on the
    DataFrame's index labels.

    Args:
        df: DataFrame holding ``feature_cols`` and a 'LogClose' column.
        feature_cols: Names of the numeric feature columns.
        window_size: Number of past rows per sample.

    Returns:
        Tuple ``(X, y)``: ``X`` has shape
        ``(n_samples, window_size * len(feature_cols))`` and ``y`` has shape
        ``(n_samples,)``. Both keep these shapes when no sample is produced.
    """
    # Pull the data out once as arrays; slicing them is positional and far
    # cheaper than a label lookup on every iteration.
    features = df[feature_cols].to_numpy(dtype=float)
    targets = df['LogClose'].to_numpy()

    X, y = [], []
    for i in range(window_size, len(df)):
        window_data = features[i - window_size:i].flatten()
        if np.isfinite(window_data).all():
            X.append(window_data)
            y.append(targets[i])  # Using log price as target

    if not X:
        # Keep the 2-D layout so callers can still read X.shape[1].
        return np.empty((0, window_size * len(feature_cols))), np.empty((0,))
    return np.array(X), np.array(y)

X, y = create_windowed_data(df, feature_cols, window_size=6)
print(f"X shape: {X.shape}, y shape: {y.shape}")

# Train-test split
# Split chronologically (no shuffling) BEFORE scaling, so that the scalers
# below only ever see training data. Fitting them on the full dataset would
# leak the held-out period's mean and variance into training.
X_train_raw, X_test_raw, y_train_raw, y_test_raw = train_test_split(
    X, y, test_size=0.2, shuffle=False
)

# Data scaling
# Fit on the training portion only, then apply the same transform to the
# held-out portion.
scaler_X = StandardScaler()
X_train = scaler_X.fit_transform(X_train_raw)
X_test = scaler_X.transform(X_test_raw)

scaler_y = StandardScaler()
y_train = scaler_y.fit_transform(y_train_raw.reshape(-1, 1)).flatten()
y_test = scaler_y.transform(y_test_raw.reshape(-1, 1)).flatten()

# Full scaled arrays, kept under their original names (train rows followed by
# test rows, i.e. the original chronological order).
X_scaled = np.concatenate([X_train, X_test])
y_scaled = np.concatenate([y_train, y_test])

# Convert to tensors
# Targets get a trailing dimension so they match the model's (batch, 1) output.
X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train, dtype=torch.float32).unsqueeze(1)
X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
y_test_tensor = torch.tensor(y_test, dtype=torch.float32).unsqueeze(1)

# Create DataLoader
batch_size = 64
# The model uses BatchNorm1d, which raises in training mode when a batch holds
# a single sample. Drop the trailing batch only in that one case.
drop_single_sample_batch = len(X_train_tensor) % batch_size == 1
train_loader = DataLoader(
    TensorDataset(X_train_tensor, y_train_tensor),
    batch_size=batch_size,
    shuffle=True,
    drop_last=drop_single_sample_batch,
)
val_loader = DataLoader(
    TensorDataset(X_test_tensor, y_test_tensor),
    batch_size=batch_size,
    shuffle=False,
)

# --- Model Definition ---
class EnhancedMLP(nn.Module):
    """Feed-forward regressor: input -> 128 -> 64 -> 1.

    Both hidden layers are followed by batch normalisation and LeakyReLU;
    dropout (p=0.2) is applied after the first hidden layer only. The layers
    live in ``self.net`` as a single ``nn.Sequential``.
    """

    def __init__(self, input_dim):
        """Create the network for inputs with ``input_dim`` features."""
        super().__init__()
        first_hidden, second_hidden = 128, 64
        layers = [
            nn.Linear(input_dim, first_hidden),
            nn.BatchNorm1d(first_hidden),
            nn.LeakyReLU(0.01),
            nn.Dropout(0.2),
            nn.Linear(first_hidden, second_hidden),
            nn.BatchNorm1d(second_hidden),
            nn.LeakyReLU(0.01),
            nn.Linear(second_hidden, 1),
        ]
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        """Return predictions of shape (batch, 1) for ``x`` of shape (batch, input_dim).

        Inputs containing NaN or infinite entries are cleaned with
        ``torch.nan_to_num`` before being passed through the network.
        """
        if not torch.isfinite(x).all():
            x = torch.nan_to_num(x)
        return self.net(x)

# --- Custom Loss Function ---
class DirectionalLoss(nn.Module):
    """Blend of MSE and a penalty for predicting the wrong direction of change.

    The directional term compares consecutive rows of a batch: each pair
    contributes 0 when prediction and target move the same way, 1 when they
    move in opposite directions and 0.5 when either does not move. The final
    loss is ``(1 - alpha) * mse + alpha * directional``.

    NOTE(review): the directional term is built from ``torch.sign``, whose
    gradient is zero, so only the MSE term is expected to drive the
    optimiser; the directional term shifts the reported loss value.
    """

    def __init__(self, alpha=0.5):
        """Store ``alpha``, the weight of the directional component."""
        super().__init__()
        self.alpha = alpha  # Weight for directional component
        self.mse = nn.MSELoss()

    def forward(self, pred, target):
        """Return the combined scalar loss for ``pred`` and ``target``.

        Both tensors are indexed along their first dimension to form
        consecutive differences. With fewer than two rows there are no
        differences to compare, so the directional term is taken as zero
        instead of the NaN that averaging an empty tensor would give.
        """
        # Standard MSE component
        mse_loss = self.mse(pred, target)

        # Directional component
        if pred.shape[0] < 2:
            directional_loss = mse_loss.new_zeros(())
        else:
            pred_diff = pred[1:] - pred[:-1]
            target_diff = target[1:] - target[:-1]
            directional_loss = torch.mean(1 - torch.sign(pred_diff * target_diff).float()) / 2

        # Combined loss
        return (1 - self.alpha) * mse_loss + self.alpha * directional_loss

# One input per flattened window entry.
n_input_features = X_train.shape[1]
model = EnhancedMLP(input_dim=n_input_features)

# Using Huber loss for robustness
loss_fn = nn.HuberLoss(delta=1.0)
# Alternatively: loss_fn = DirectionalLoss(alpha=0.3)  # Custom directional loss

optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
# Halve the learning rate after 5 epochs without a lower validation loss.
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer,
    mode='min',
    factor=0.5,
    patience=5,
)
early_stopping = EarlyStopping(patience=20)

# --- Training Loop ---
# Trains for at most num_epochs epochs. After every epoch the model is
# evaluated on val_loader, the learning-rate scheduler and the early-stopping
# tracker are updated, and the weights are saved whenever the validation loss
# reaches a new minimum.
# NOTE(review): val_loader wraps the held-out test split, so model selection
# and the final evaluation use the same data.
num_epochs = 300
train_losses = []  # mean training loss per epoch
val_losses = []  # mean validation loss per epoch
best_loss = float('inf')  # lowest validation loss for which weights were saved

print("Starting training...")
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0

    for xb, yb in train_loader:
        optimizer.zero_grad()
        pred = model(xb)
        loss = loss_fn(pred, yb)
        loss.backward()
        # Cap the global gradient norm at 1.0 before the optimiser step.
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()
        running_loss += loss.item()

    # Validation phase
    model.eval()
    val_loss = 0.0
    with torch.no_grad():
        for xb, yb in val_loader:
            pred = model(xb)
            val_loss += loss_fn(pred, yb).item()

    # Average the per-batch losses (every batch counts equally, including a
    # smaller final batch).
    epoch_loss = running_loss/len(train_loader)
    val_loss = val_loss/len(val_loader)
    train_losses.append(epoch_loss)
    val_losses.append(val_loss)

    scheduler.step(val_loss)

    # Early stopping check
    # Runs before the checkpoint step below; on the stopping epoch the loop
    # exits without saving and without printing the epoch summary.
    early_stopping.check(val_loss, model)
    if early_stopping.early_stop:
        print(f"Early stopping triggered at epoch {epoch+1}!")
        break

    # Checkpoint the weights whenever validation loss hits a new minimum.
    if val_loss < best_loss:
        best_loss = val_loss
        torch.save(model.state_dict(), model_save_path)

    print(f"Epoch {epoch+1}/{num_epochs} - Train Loss: {epoch_loss:.6f} - Val Loss: {val_loss:.6f}")

# Load best model
# Restores the checkpoint written above; this requires that at least one epoch
# saved a checkpoint.
# NOTE(review): torch.load may unpickle arbitrary objects depending on the
# installed version's weights_only default - acceptable only for files this
# script wrote itself.
model.load_state_dict(torch.load(model_save_path))
print(f"Best model loaded with validation loss: {best_loss:.6f}")

# --- Plot Training Loss ---
# Draw both per-epoch loss curves on a single axes and write the figure to
# the plots directory.
loss_fig, loss_ax = plt.subplots(figsize=(10, 5))
for curve, curve_label in ((train_losses, 'Training Loss'), (val_losses, 'Validation Loss')):
    loss_ax.plot(curve, label=curve_label)
loss_ax.set_title('Training & Validation Loss Curve')
loss_ax.set_xlabel('Epoch')
loss_ax.set_ylabel('Loss')
loss_ax.legend()
loss_ax.grid(True)
loss_plot_path = os.path.join(plots_save_dir, 'training_loss.png')
loss_fig.savefig(loss_plot_path)
plt.close(loss_fig)
print(f"Training loss plot saved to: {loss_plot_path}")

# --- Model Evaluation ---
# Predict on the whole test tensor in one pass, without tracking gradients.
model.eval()
with torch.no_grad():
    scaled_output = model(X_test_tensor)
y_pred_scaled = scaled_output.squeeze().numpy()

# Inverse scaling and transform
# scaler_y works on column vectors, hence the reshape to (n, 1) and back.
pred_column = y_pred_scaled.reshape(-1, 1)
true_column = y_test_tensor.numpy().reshape(-1, 1)
y_pred = scaler_y.inverse_transform(pred_column).flatten()
y_test = scaler_y.inverse_transform(true_column).flatten()

# Convert from log space to price
y_pred_actual = np.exp(y_pred)
y_test_actual = np.exp(y_test)

# Calculate directional accuracy
# Share of consecutive steps where the predicted and the actual log price
# change have the same sign.
same_direction = (np.diff(y_pred) * np.diff(y_test)) > 0
direction_acc = same_direction.mean()

# Metrics
# Errors are measured in price units, not log units.
mse = mean_squared_error(y_test_actual, y_pred_actual)
mae = mean_absolute_error(y_test_actual, y_pred_actual)
print(f"\n✅ Directional Accuracy: {direction_acc:.2%}")
print(f"✅ Final MSE: {mse:.4f}")
print(f"✅ Final MAE: {mae:.4f}")

# --- Visualization ---
# The test samples are taken to be the last len(y_test) rows of df.
n_total_rows = len(df)
test_indices = range(n_total_rows - len(y_test), n_total_rows)
test_timestamps = df['timestamp'].iloc[test_indices].values

def plot_prediction_with_indicators(timestamps, y_true, y_pred, df_subset):
    """Build a four-panel figure: price fit, RSI, MACD and ATR.

    Args:
        timestamps: X-axis values shared by all panels.
        y_true: Actual close prices.
        y_pred: Predicted close prices.
        df_subset: DataFrame with 'RSI14', 'MACD', 'MACD_Signal' and 'ATR14'
            columns, row-aligned with ``timestamps``.

    Returns:
        The matplotlib Figure; the caller saves and closes it.
    """
    def _finish(ax, title):
        # Shared finishing touches for every panel.
        ax.set_title(title)
        ax.legend()
        ax.grid(True)

    fig = plt.figure(figsize=(16, 12))
    layout = GridSpec(4, 1, figure=fig)
    price_ax, rsi_ax, macd_ax, atr_ax = [
        fig.add_subplot(layout[row, 0]) for row in range(4)
    ]

    # Price prediction vs actual
    price_ax.plot(timestamps, y_true, label='Actual Close', color='blue', linewidth=1.5)
    price_ax.plot(timestamps, y_pred, label='Predicted Close', color='red', linestyle='--', linewidth=1)
    _finish(price_ax, 'Price Prediction vs Actual')
    price_ax.xaxis.set_major_formatter(mdates.DateFormatter('%m-%d %H:%M'))

    # RSI, with reference lines at 70 and 30
    rsi_ax.plot(timestamps, df_subset['RSI14'].values, label='RSI14', color='purple')
    rsi_ax.axhline(70, color='red', linestyle='--', alpha=0.3)
    rsi_ax.axhline(30, color='green', linestyle='--', alpha=0.3)
    _finish(rsi_ax, 'RSI (14-period)')

    # MACD and signal line around a zero reference
    macd_ax.plot(timestamps, df_subset['MACD'].values, label='MACD', color='blue')
    macd_ax.plot(timestamps, df_subset['MACD_Signal'].values, label='Signal', color='orange')
    macd_ax.axhline(0, color='black', linestyle='-', alpha=0.3)
    _finish(macd_ax, 'MACD (12,26,9)')

    # ATR
    atr_ax.plot(timestamps, df_subset['ATR14'].values, label='ATR14', color='green')
    _finish(atr_ax, 'Average True Range (ATR)')

    plt.tight_layout()
    return fig

# Generate and save plots
df_test_subset = df.iloc[test_indices].copy()
fig = plot_prediction_with_indicators(
    test_timestamps,
    y_test_actual,
    y_pred_actual,
    df_test_subset,
)
prediction_plot_path = os.path.join(plots_save_dir, 'prediction_with_indicators.png')
fig.savefig(prediction_plot_path)
plt.close(fig)
print(f"Prediction plot saved to: {prediction_plot_path}")

# Save metrics
# One "name: value" line per metric, formatted like the console output.
metrics_file = os.path.join(plots_save_dir, 'metrics.txt')
metric_lines = [
    f"MSE: {mse:.4f}\n",
    f"MAE: {mae:.4f}\n",
    f"Directional Accuracy: {direction_acc:.2%}\n",
]
with open(metrics_file, 'w') as metrics_out:
    metrics_out.writelines(metric_lines)
print(f"Metrics saved to: {metrics_file}")

print("Training and evaluation completed!")