# LSTM Model for Stock Price Prediction

This notebook trains an LSTM (Long Short-Term Memory) neural network to predict stock prices using historical OHLCV data from the trading system database.

## Objectives
- Load historical market data from PostgreSQL
- Use pre-calculated technical indicators from database
- Create time-aware train/validation/test splits
- Train an LSTM model using PyTorch with optimized hyperparameters
- **Predict log returns** (stationary target) instead of raw prices
- Use **Huber Loss** (robust to outliers) instead of MSE
- Monitor **directional accuracy** (key trading metric) during training
- Evaluate model performance with financial metrics
- Visualize predictions and residuals

## Requirements
- PyTorch (install with: `pip install torch`)
- Optuna for hyperparameter optimization (install with: `pip install optuna`)
- Plotly for optimization visualizations (optional, install with: `pip install plotly`)
- All other dependencies from `requirements.txt`
- **Important**: Ensure you have a `.env` file in the project root with database credentials:
  ```env
  POSTGRES_HOST=localhost
  POSTGRES_PORT=5432
  POSTGRES_USER=postgres
  POSTGRES_PASSWORD=your_password_here
  TRADING_DB_NAME=trading_system
  ```
  Copy from `deployment/env.example` if needed.

In [74]:
"""
Setup and Imports
"""
import sys
from pathlib import Path
from datetime import datetime, timedelta, timezone
from typing import Tuple, Optional
import warnings
warnings.filterwarnings('ignore')

# Add project root to path
project_root = Path().resolve().parent
if str(project_root) not in sys.path:
    sys.path.insert(0, str(project_root))

# Load environment variables from .env file
from dotenv import load_dotenv
env_path = project_root / ".env"
if env_path.exists():
    load_dotenv(env_path)
    print(f"Loaded environment variables from: {env_path}")
else:
    print(f"Warning: .env file not found at {env_path}")
    print("Please ensure your database credentials are set in environment variables or .env file")

# Core libraries
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error

# PyTorch
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

# Hyperparameter Optimization
try:
    import optuna
    OPTUNA_AVAILABLE = True
except ImportError:
    OPTUNA_AVAILABLE = False
    print("Warning: Optuna not installed. Install with: pip install optuna")

# Database
from sqlalchemy import select, desc
from src.shared.database.base import db_readonly_session
from src.shared.database.models.market_data import MarketData
from src.shared.database.models.technical_indicators import TechnicalIndicators

# Set style
sns.set_style("whitegrid")
plt.rcParams['figure.figsize'] = (14, 6)

# Set random seeds for reproducibility
RANDOM_SEED = 42
np.random.seed(RANDOM_SEED)
torch.manual_seed(RANDOM_SEED)
if torch.cuda.is_available():
    torch.cuda.manual_seed(RANDOM_SEED)
    torch.backends.cudnn.deterministic = True

print(f"PyTorch version: {torch.__version__}")
print(f"CUDA available: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    print(f"CUDA device: {torch.cuda.get_device_name(0)}")

Loaded environment variables from: D:\PythonProjects\Trading-System\.env
PyTorch version: 2.7.0+cu128
CUDA available: True
CUDA device: NVIDIA GeForce RTX 3050 Laptop GPU


## 1. Data Loading

Load historical market data from the database for a specific symbol.

In [75]:
def load_market_data_with_indicators(
    symbol: str,
    start_date: Optional[datetime] = None,
    end_date: Optional[datetime] = None,
    data_source: str = "yahoo",
    min_records: int = 1000
) -> pd.DataFrame:
    """
    Load market data and technical indicators from database for a specific symbol.
    
    This function loads OHLCV data from market_data table and joins it with
    pre-calculated technical indicators from analytics.technical_indicators table.
    
    Args:
        symbol: Stock symbol (e.g., 'AAPL')
        start_date: Start date (default: 1 year ago)
        end_date: End date (default: today)
        data_source: Data source filter ('yahoo', 'polygon', 'alpaca')
        min_records: Minimum number of records required
        
    Returns:
        DataFrame with OHLCV data and technical indicators indexed by timestamp
    """
    if start_date is None:
        start_date = datetime.now(timezone.utc) - timedelta(days=365)
    if end_date is None:
        end_date = datetime.now(timezone.utc)
    
    symbol_upper = symbol.upper()
    
    # Load market data
    with db_readonly_session() as session:
        # Query market data
        market_query = (
            select(MarketData)
            .where(MarketData.symbol == symbol_upper)
            .where(MarketData.data_source == data_source.lower())
            .where(MarketData.timestamp >= start_date)
            .where(MarketData.timestamp <= end_date)
            .order_by(MarketData.timestamp)
        )
        
        market_result = session.execute(market_query)
        market_records = market_result.scalars().all()
    
    if len(market_records) < min_records:
        raise ValueError(
            f"Insufficient data: {len(market_records)} records found, "
            f"minimum {min_records} required"
        )
    
    # Convert market data to DataFrame
    market_data = []
    for record in market_records:
        if record.is_complete:  # Only include complete OHLCV records
            # Convert timestamp to date for joining with technical indicators
            market_data.append({
                'date': record.timestamp.date(),
                'timestamp': record.timestamp,
                'open': float(record.open),
                'high': float(record.high),
                'low': float(record.low),
                'close': float(record.close),
                'volume': int(record.volume) if record.volume else 0,
            })
    
    df_market = pd.DataFrame(market_data)
    
    # Load technical indicators
    with db_readonly_session() as session:
        # Query technical indicators
        ti_query = (
            select(TechnicalIndicators)
            .where(TechnicalIndicators.symbol == symbol_upper)
            .where(TechnicalIndicators.date >= start_date.date())
            .where(TechnicalIndicators.date <= end_date.date())
            .order_by(TechnicalIndicators.date)
        )
        
        ti_result = session.execute(ti_query)
        ti_records = ti_result.scalars().all()
    
    # Convert technical indicators to DataFrame
    ti_data = []
    for record in ti_records:
        ti_data.append({
            'date': record.date,
            # Moving Averages
            'sma_20': float(record.sma_20) if record.sma_20 else None,
            'sma_50': float(record.sma_50) if record.sma_50 else None,
            'sma_200': float(record.sma_200) if record.sma_200 else None,
            'ema_12': float(record.ema_12) if record.ema_12 else None,
            'ema_26': float(record.ema_26) if record.ema_26 else None,
            'ema_50': float(record.ema_50) if record.ema_50 else None,
            # Momentum
            'rsi': float(record.rsi) if record.rsi else None,
            'rsi_14': float(record.rsi_14) if record.rsi_14 else None,
            # MACD
            'macd_line': float(record.macd_line) if record.macd_line else None,
            'macd_signal': float(record.macd_signal) if record.macd_signal else None,
            'macd_histogram': float(record.macd_histogram) if record.macd_histogram else None,
            # Bollinger Bands
            'bb_upper': float(record.bb_upper) if record.bb_upper else None,
            'bb_middle': float(record.bb_middle) if record.bb_middle else None,
            'bb_lower': float(record.bb_lower) if record.bb_lower else None,
            'bb_position': float(record.bb_position) if record.bb_position else None,
            'bb_width': float(record.bb_width) if record.bb_width else None,
            # Volatility & Price Changes
            'volatility_20': float(record.volatility_20) if record.volatility_20 else None,
            'price_change_1d': float(record.price_change_1d) if record.price_change_1d else None,
            'price_change_5d': float(record.price_change_5d) if record.price_change_5d else None,
            'price_change_30d': float(record.price_change_30d) if record.price_change_30d else None,
            # Volume
            'avg_volume_20': int(record.avg_volume_20) if record.avg_volume_20 else None,
            'current_volume': int(record.current_volume) if record.current_volume else None,
        })
    
    df_ti = pd.DataFrame(ti_data)
    
    # Merge market data with technical indicators on date
    if not df_ti.empty:
        df = df_market.merge(df_ti, on='date', how='left')
        print(f"Loaded {len(df_market)} market data records")
        print(f"Loaded {len(df_ti)} technical indicator records")
        print(f"Merged dataset: {len(df)} records")
    else:
        print(f"Warning: No technical indicators found for {symbol}")
        print("Using market data only. Consider running populate_technical_indicators.py")
        df = df_market.copy()
    
    # Set timestamp as index (use market data timestamp)
    df.set_index('timestamp', inplace=True)
    df.sort_index(inplace=True)
    
    # Remove duplicates (keep last)
    df = df[~df.index.duplicated(keep='last')]
    
    # Calculate basic returns if not available from indicators
    if 'returns' not in df.columns:
        df['returns'] = df['close'].pct_change()
    if 'log_returns' not in df.columns:
        df['log_returns'] = np.log(df['close'] / df['close'].shift(1))
    
    # Price ratios
    df['high_low_ratio'] = df['high'] / df['low']
    df['close_open_ratio'] = df['close'] / df['open']
    
    # Volume ratio (if we have avg_volume_20)
    if 'avg_volume_20' in df.columns and df['avg_volume_20'].notna().any():
        df['volume_ratio'] = df['volume'] / df['avg_volume_20']
    else:
        df['volume_ratio'] = None
    
    # Price position (where close is relative to high-low range)
    df['price_position'] = (df['close'] - df['low']) / (df['high'] - df['low'])
    
    print(f"\nFinal dataset: {len(df)} records")
    print(f"Date range: {df.index.min()} to {df.index.max()}")
    print(f"Missing values: {df.isnull().sum().sum()}")
    print(f"\nAvailable features: {list(df.columns)}")
    
    return df

# Load data for a symbol (change as needed)
SYMBOL = "MU"
df_features = load_market_data_with_indicators(SYMBOL, data_source="yahoo")
df_features.head()

[32m2025-12-20 12:11:24.115[0m | [34m[1mDEBUG   [0m | [36msrc.shared.database.base[0m:[36mdb_readonly_session[0m:[36m156[0m - [34m[1mRead-only session completed successfully[0m
[32m2025-12-20 12:11:24.133[0m | [34m[1mDEBUG   [0m | [36msrc.shared.database.base[0m:[36mdb_readonly_session[0m:[36m156[0m - [34m[1mRead-only session completed successfully[0m


Loaded 1731 market data records
Loaded 366 technical indicator records
Merged dataset: 1731 records

Final dataset: 1731 records
Date range: 2024-12-23 03:30:00-06:00 to 2025-12-19 09:30:00-06:00
Missing values: 2

Available features: ['date', 'open', 'high', 'low', 'close', 'volume', 'sma_20', 'sma_50', 'sma_200', 'ema_12', 'ema_26', 'ema_50', 'rsi', 'rsi_14', 'macd_line', 'macd_signal', 'macd_histogram', 'bb_upper', 'bb_middle', 'bb_lower', 'bb_position', 'bb_width', 'volatility_20', 'price_change_1d', 'price_change_5d', 'price_change_30d', 'avg_volume_20', 'current_volume', 'returns', 'log_returns', 'high_low_ratio', 'close_open_ratio', 'volume_ratio', 'price_position']


Unnamed: 0_level_0,date,open,high,low,close,volume,sma_20,sma_50,sma_200,ema_12,...,price_change_5d,price_change_30d,avg_volume_20,current_volume,returns,log_returns,high_low_ratio,close_open_ratio,volume_ratio,price_position
timestamp,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
2024-12-23 03:30:00-06:00,2024-12-23,90.145,91.1,88.4,88.5,6648844,99.8347,102.9651,110.7619,97.6045,...,-16.97,-19.71,23231731,21724019,,,1.030543,0.981752,0.286197,0.037037
2024-12-23 04:30:00-06:00,2024-12-23,88.52,89.2,88.4,89.06,2661600,99.8347,102.9651,110.7619,97.6045,...,-16.97,-19.71,23231731,21724019,0.006328,0.006308,1.00905,1.0061,0.114567,0.825
2024-12-23 05:30:00-06:00,2024-12-23,89.057,89.41,88.432,88.585,2726427,99.8347,102.9651,110.7619,97.6045,...,-16.97,-19.71,23231731,21724019,-0.005333,-0.005348,1.011059,0.9947,0.117358,0.156442
2024-12-23 06:30:00-06:00,2024-12-23,88.585,89.4213,88.585,89.16,2008035,99.8347,102.9651,110.7619,97.6045,...,-16.97,-19.71,23231731,21724019,0.006491,0.00647,1.009441,1.006491,0.086435,0.687552
2024-12-23 07:30:00-06:00,2024-12-23,89.14,90.005,89.03,89.79,1920008,99.8347,102.9651,110.7619,97.6045,...,-16.97,-19.71,23231731,21724019,0.007066,0.007041,1.010951,1.007292,0.082646,0.779487


## 2. Data Overview

Review the loaded data with pre-calculated technical indicators from the database.

In [76]:
# Display data summary
print(f"Dataset shape: {df_features.shape}")
print(f"\nColumn types:")
print(df_features.dtypes)
print(f"\nMissing values per column:")
missing = df_features.isnull().sum()
print(missing[missing > 0])
print(f"\nFirst few rows:")
df_features.head(10)

Dataset shape: (1731, 34)

Column types:
date                 object
open                float64
high                float64
low                 float64
close               float64
volume                int64
sma_20              float64
sma_50              float64
sma_200             float64
ema_12              float64
ema_26              float64
ema_50              float64
rsi                 float64
rsi_14              float64
macd_line           float64
macd_signal         float64
macd_histogram      float64
bb_upper            float64
bb_middle           float64
bb_lower            float64
bb_position         float64
bb_width            float64
volatility_20       float64
price_change_1d     float64
price_change_5d     float64
price_change_30d    float64
avg_volume_20         int64
current_volume        int64
returns             float64
log_returns         float64
high_low_ratio      float64
close_open_ratio    float64
volume_ratio        float64
price_position      float64
dtype: 

Unnamed: 0_level_0,date,open,high,low,close,volume,sma_20,sma_50,sma_200,ema_12,...,price_change_5d,price_change_30d,avg_volume_20,current_volume,returns,log_returns,high_low_ratio,close_open_ratio,volume_ratio,price_position
timestamp,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
2024-12-23 03:30:00-06:00,2024-12-23,90.145,91.1,88.4,88.5,6648844,99.8347,102.9651,110.7619,97.6045,...,-16.97,-19.71,23231731,21724019,,,1.030543,0.981752,0.286197,0.037037
2024-12-23 04:30:00-06:00,2024-12-23,88.52,89.2,88.4,89.06,2661600,99.8347,102.9651,110.7619,97.6045,...,-16.97,-19.71,23231731,21724019,0.006328,0.006308,1.00905,1.0061,0.114567,0.825
2024-12-23 05:30:00-06:00,2024-12-23,89.057,89.41,88.432,88.585,2726427,99.8347,102.9651,110.7619,97.6045,...,-16.97,-19.71,23231731,21724019,-0.005333,-0.005348,1.011059,0.9947,0.117358,0.156442
2024-12-23 06:30:00-06:00,2024-12-23,88.585,89.4213,88.585,89.16,2008035,99.8347,102.9651,110.7619,97.6045,...,-16.97,-19.71,23231731,21724019,0.006491,0.00647,1.009441,1.006491,0.086435,0.687552
2024-12-23 07:30:00-06:00,2024-12-23,89.14,90.005,89.03,89.79,1920008,99.8347,102.9651,110.7619,97.6045,...,-16.97,-19.71,23231731,21724019,0.007066,0.007041,1.010951,1.007292,0.082646,0.779487
2024-12-23 08:30:00-06:00,2024-12-23,89.8081,90.07,89.23,89.86,1991141,99.8347,102.9651,110.7619,97.6045,...,-16.97,-19.71,23231731,21724019,0.00078,0.000779,1.009414,1.000578,0.085708,0.75
2024-12-23 09:30:00-06:00,2024-12-23,89.87,90.0,89.075,89.88,3767964,99.8347,102.9651,110.7619,97.6045,...,-16.97,-19.71,23231731,21724019,0.000223,0.000223,1.010385,1.000111,0.16219,0.87027
2024-12-24 03:30:00-06:00,2024-12-24,89.58,89.7,88.23,89.06,4436926,99.0867,102.5889,110.7368,96.3622,...,-17.59,-17.6,22906078,9557335,-0.009123,-0.009165,1.016661,0.994195,0.193701,0.564626
2024-12-24 04:30:00-06:00,2024-12-24,89.04,89.4199,88.76,89.341,2690646,99.0867,102.5889,110.7368,96.3622,...,-17.59,-17.6,22906078,9557335,0.003155,0.00315,1.007435,1.003381,0.117464,0.880436
2024-12-24 05:30:00-06:00,2024-12-24,89.345,89.58,89.06,89.53,2429763,99.0867,102.5889,110.7368,96.3622,...,-17.59,-17.6,22906078,9557335,0.002115,0.002113,1.005839,1.002071,0.106075,0.903846


## 3. Data Preprocessing

Prepare data for LSTM: select features, normalize, and create sequences.

In [77]:
# Configuration
SEQUENCE_LENGTH = 60  # Number of time steps to look back
PREDICTION_HORIZON = 1  # Predict next 1 period
# Use log_returns as target (stationary, focuses on movement rather than price level)
TARGET_COLUMN = 'log_returns'  # Changed from 'close' to 'log_returns' for better stationarity

# Select features from available columns
# Use pre-calculated technical indicators from database
# IMPORTANT: Remove non-stationary raw price levels (open, high, low, close)
# These cause the model to develop negative bias due to scale differences
# Keep only stationary features that describe relative movement
feature_columns = [
    # Volume (can be kept if normalized, but ratios are better)
    'volume',
    # Returns (calculated) - stationary
    'returns', 'log_returns',
    # Technical Indicators (from database) - all stationary
    'sma_20', 'sma_50', 'sma_200',
    'ema_12', 'ema_26', 'ema_50',
    'rsi', 'rsi_14',
    'macd_line', 'macd_signal', 'macd_histogram',
    'bb_upper', 'bb_middle', 'bb_lower', 'bb_position', 'bb_width',
    'volatility_20',
    'price_change_1d', 'price_change_5d', 'price_change_30d',
    # Derived features - all stationary ratios
    'high_low_ratio', 'close_open_ratio',
    'volume_ratio', 'price_position',
]

# Filter to only include columns that exist and have data
available_features = [
    col for col in feature_columns 
    if col in df_features.columns and df_features[col].notna().sum() > 0
]

print(f"Available features: {len(available_features)}")
print(f"Features: {available_features}")

# Check data availability
print(f"\nFeature availability:")
for col in available_features:
    pct_available = (df_features[col].notna().sum() / len(df_features)) * 100
    print(f"  {col}: {pct_available:.1f}% available")

# Extract feature matrix and target
X = df_features[available_features].copy()
y = df_features[TARGET_COLUMN].copy()

# Handle NaN and infinite values
print(f"\nBefore cleaning:")
print(f"  NaN values in features: {X.isnull().sum().sum()}")
print(f"  Infinite values in features: {np.isinf(X.select_dtypes(include=[np.number])).sum().sum()}")

# Forward fill NaN values (use previous valid value)
X = X.ffill()

# Backward fill any remaining NaN (fill from next valid value)
X = X.bfill()

# Drop any rows that still have NaN (should be very few)
nan_mask = X.isnull().any(axis=1)
if nan_mask.sum() > 0:
    print(f"  Dropping {nan_mask.sum()} rows with remaining NaN values")
    X = X[~nan_mask]
    y = y[~nan_mask]

# Replace infinite values with NaN, then fill
X = X.replace([np.inf, -np.inf], np.nan)
X = X.ffill().bfill()

# Convert to numpy arrays
X = X.values
y = y.values

print(f"\nAfter cleaning:")
print(f"  Feature matrix shape: {X.shape}")
print(f"  Target shape: {y.shape}")
print(f"  NaN values: {np.isnan(X).sum()}")
print(f"  Infinite values: {np.isinf(X).sum()}")

# Normalize features (fit on training data only - will be done in split)
scaler_X = StandardScaler()

# CRITICAL FIX: Do NOT scale log_returns target
# StandardScaler subtracts the mean, which causes "zero-shift" bias
# If training period had positive mean returns, zero becomes negative after scaling
# Log returns are already stationary and in a small range (-0.05 to 0.05)
# LSTMs can handle this range without scaling
scaler_y = None  # No scaling for target
y_scaled = y.copy()  # Use raw log returns

# For now, fit on all data (will refit on train only later)
X_scaled = scaler_X.fit_transform(X)

# Final check for NaN after normalization
if np.isnan(X_scaled).any() or np.isinf(X_scaled).any():
    print(f"\nWARNING: NaN or Inf values detected after normalization!")
    print(f"  NaN count: {np.isnan(X_scaled).sum()}")
    print(f"  Inf count: {np.isinf(X_scaled).sum()}")
    # Replace any remaining NaN/Inf with 0
    X_scaled = np.nan_to_num(X_scaled, nan=0.0, posinf=0.0, neginf=0.0)

print(f"\nNormalized feature range: [{X_scaled.min():.2f}, {X_scaled.max():.2f}]")
print(f"Target range (raw log returns, NOT scaled): [{y_scaled.min():.4f}, {y_scaled.max():.4f}]")
print(f"Target mean: {y_scaled.mean():.6f}, std: {y_scaled.std():.6f}")

Available features: 27
Features: ['volume', 'returns', 'log_returns', 'sma_20', 'sma_50', 'sma_200', 'ema_12', 'ema_26', 'ema_50', 'rsi', 'rsi_14', 'macd_line', 'macd_signal', 'macd_histogram', 'bb_upper', 'bb_middle', 'bb_lower', 'bb_position', 'bb_width', 'volatility_20', 'price_change_1d', 'price_change_5d', 'price_change_30d', 'high_low_ratio', 'close_open_ratio', 'volume_ratio', 'price_position']

Feature availability:
  volume: 100.0% available
  returns: 99.9% available
  log_returns: 99.9% available
  sma_20: 100.0% available
  sma_50: 100.0% available
  sma_200: 100.0% available
  ema_12: 100.0% available
  ema_26: 100.0% available
  ema_50: 100.0% available
  rsi: 100.0% available
  rsi_14: 100.0% available
  macd_line: 100.0% available
  macd_signal: 100.0% available
  macd_histogram: 100.0% available
  bb_upper: 100.0% available
  bb_middle: 100.0% available
  bb_lower: 100.0% available
  bb_position: 100.0% available
  bb_width: 100.0% available
  volatility_20: 100.0% ava

In [78]:
# Helper function for safe denormalization (handles case where scaler_y is None)
def denormalize_log_returns(predictions, targets, scaler_y):
    """
    Denormalize log returns, handling the case where no scaling was applied.
    
    Args:
        predictions: Predicted log returns (numpy array)
        targets: Target log returns (numpy array)
        scaler_y: Scaler object or None
    
    Returns:
        predictions_denorm, targets_denorm: Denormalized arrays
    """
    if scaler_y is not None:
        predictions_denorm = scaler_y.inverse_transform(
            predictions.reshape(-1, 1)
        ).flatten()
        targets_denorm = scaler_y.inverse_transform(
            targets.reshape(-1, 1)
        ).flatten()
    else:
        # No scaling was applied, use raw values directly
        predictions_denorm = predictions
        targets_denorm = targets
    
    return predictions_denorm, targets_denorm

print("Helper function 'denormalize_log_returns' defined for safe denormalization")

Helper function 'denormalize_log_returns' defined for safe denormalization


In [79]:
def create_sequences(
    X: np.ndarray,
    y: np.ndarray,
    seq_length: int,
    prediction_horizon: int = 1
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Create sequences for LSTM training.
    
    Args:
        X: Feature matrix (n_samples, n_features)
        y: Target vector (n_samples,) - log returns
        seq_length: Length of input sequences
        prediction_horizon: Steps ahead to predict
        
    Returns:
        X_seq: Sequences (n_samples - seq_length, seq_length, n_features)
        y_seq: Targets (n_samples - seq_length,) - log returns for next period
    """
    X_seq, y_seq = [], []
    
    for i in range(len(X) - seq_length - prediction_horizon + 1):
        X_seq.append(X[i:i + seq_length])
        y_seq.append(y[i + seq_length + prediction_horizon - 1])
    
    return np.array(X_seq), np.array(y_seq)

# Create sequences
X_seq, y_seq = create_sequences(X_scaled, y_scaled, SEQUENCE_LENGTH, PREDICTION_HORIZON)

print(f"Sequence shape: {X_seq.shape}")
print(f"Target shape: {y_seq.shape}")
print(f"Total sequences: {len(X_seq)}")

Sequence shape: (1671, 60, 27)
Target shape: (1671,)
Total sequences: 1671


## 4. Time-Aware Data Splitting

Split data chronologically to avoid look-ahead bias (critical for financial data).

In [80]:
def time_aware_split(
    X: np.ndarray,
    y: np.ndarray,
    train_ratio: float = 0.7,
    val_ratio: float = 0.15
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    """
    Split data chronologically (no shuffling for time series).
    
    Args:
        X: Feature sequences
        y: Target values
        train_ratio: Proportion for training
        val_ratio: Proportion for validation
        
    Returns:
        X_train, X_val, X_test, y_train, y_val, y_test
    """
    n_samples = len(X)
    train_end = int(n_samples * train_ratio)
    val_end = int(n_samples * (train_ratio + val_ratio))
    
    X_train = X[:train_end]
    X_val = X[train_end:val_end]
    X_test = X[val_end:]
    
    y_train = y[:train_end]
    y_val = y[train_end:val_end]
    y_test = y[val_end:]
    
    print(f"Train: {len(X_train)} samples ({len(X_train)/n_samples*100:.1f}%)")
    print(f"Validation: {len(X_val)} samples ({len(X_val)/n_samples*100:.1f}%)")
    print(f"Test: {len(X_test)} samples ({len(X_test)/n_samples*100:.1f}%)")
    
    return X_train, X_val, X_test, y_train, y_val, y_test

# Split data
X_train, X_val, X_test, y_train, y_val, y_test = time_aware_split(
    X_seq, y_seq, train_ratio=0.7, val_ratio=0.15
)

Train: 1169 samples (70.0%)
Validation: 251 samples (15.0%)
Test: 251 samples (15.0%)


## 5. PyTorch Dataset and DataLoader

Create PyTorch datasets for efficient batching.

In [81]:
class TimeSeriesDataset(Dataset):
    """PyTorch Dataset for time series sequences."""
    
    def __init__(self, X: np.ndarray, y: np.ndarray):
        """
        Args:
            X: Feature sequences (n_samples, seq_length, n_features)
            y: Target values (n_samples,)
        """
        self.X = torch.FloatTensor(X)
        self.y = torch.FloatTensor(y)
    
    def __len__(self) -> int:
        return len(self.X)
    
    def __getitem__(self, idx: int) -> Tuple[torch.Tensor, torch.Tensor]:
        return self.X[idx], self.y[idx]

# Create datasets
train_dataset = TimeSeriesDataset(X_train, y_train)
val_dataset = TimeSeriesDataset(X_val, y_val)
test_dataset = TimeSeriesDataset(X_test, y_test)

# Create data loaders
BATCH_SIZE = 32
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=False)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

print(f"Train batches: {len(train_loader)}")
print(f"Validation batches: {len(val_loader)}")
print(f"Test batches: {len(test_loader)}")

Train batches: 37
Validation batches: 8
Test batches: 8


## 6. LSTM Model Architecture

Define the LSTM model with dropout for regularization.

In [82]:
class LSTMPredictor(nn.Module):
    """
    LSTM model for stock price prediction.
    
    Architecture:
    - LSTM layers for sequence learning
    - Dropout for regularization
    - Fully connected layers for output
    """
    
    def __init__(
        self,
        input_size: int,
        hidden_size: int = 64,
        num_layers: int = 2,
        dropout: float = 0.2,
        output_size: int = 1
    ):
        """
        Args:
            input_size: Number of features per time step
            hidden_size: Number of LSTM hidden units
            num_layers: Number of LSTM layers
            dropout: Dropout probability
            output_size: Size of output (typically 1 for price prediction)
        """
        super(LSTMPredictor, self).__init__()
        
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        
        # LSTM layers
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            dropout=dropout if num_layers > 1 else 0,
            batch_first=True
        )
        
        # Additional dropout
        self.dropout = nn.Dropout(dropout)
        
        # Fully connected output layer
        self.fc = nn.Linear(hidden_size, output_size)
    
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        Forward pass.
        
        Args:
            x: Input tensor (batch_size, seq_length, input_size)
            
        Returns:
            Output tensor (batch_size, output_size)
        """
        # LSTM forward pass
        lstm_out, _ = self.lstm(x)
        
        # Take the last output from the sequence
        lstm_out = lstm_out[:, -1, :]
        
        # Apply dropout
        lstm_out = self.dropout(lstm_out)
        
        # Fully connected layer
        output = self.fc(lstm_out)
        
        return output.squeeze(-1)  # Remove last dimension if output_size=1

# Initialize model
INPUT_SIZE = X_train.shape[2]  # Number of features
HIDDEN_SIZE = 64
NUM_LAYERS = 2
DROPOUT = 0.2

model = LSTMPredictor(
    input_size=INPUT_SIZE,
    hidden_size=HIDDEN_SIZE,
    num_layers=NUM_LAYERS,
    dropout=DROPOUT
)

# Count parameters
total_params = sum(p.numel() for p in model.parameters())
trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)

print(f"Model architecture:")
print(f"  Input size: {INPUT_SIZE}")
print(f"  Hidden size: {HIDDEN_SIZE}")
print(f"  LSTM layers: {NUM_LAYERS}")
print(f"  Total parameters: {total_params:,}")
print(f"  Trainable parameters: {trainable_params:,}")

# Move to GPU if available
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)
print(f"Using device: {device}")

Model architecture:
  Input size: 27
  Hidden size: 64
  LSTM layers: 2
  Total parameters: 57,153
  Trainable parameters: 57,153
Using device: cuda


## 6. Hyperparameter Optimization

Use Optuna to find optimal hyperparameters for the LSTM model.

In [83]:
if not OPTUNA_AVAILABLE:
    print("Skipping hyperparameter optimization - Optuna not available")
    print("Install with: pip install optuna")
else:
    print("Optuna is available. Hyperparameter optimization will be performed.")
    
# Configuration for hyperparameter optimization
OPTIMIZE_HYPERPARAMETERS = True  # Set to False to skip optimization
N_TRIALS = 20  # Number of optimization trials (increase for better results, but slower)
OPTIMIZATION_TIMEOUT = 3600  # Maximum time in seconds (1 hour)

Optuna is available. Hyperparameter optimization will be performed.


In [84]:
if OPTUNA_AVAILABLE and OPTIMIZE_HYPERPARAMETERS:
    def objective(trial):
        """
        Optuna objective function for hyperparameter optimization.
        
        Args:
            trial: Optuna trial object
            
        Returns:
            Validation loss (to minimize)
        """
        # Suggest hyperparameters
        hidden_size = trial.suggest_int('hidden_size', 32, 128, step=16)
        num_layers = trial.suggest_int('num_layers', 1, 3)
        dropout = trial.suggest_float('dropout', 0.1, 0.5, step=0.1)
        learning_rate = trial.suggest_float('learning_rate', 1e-4, 1e-2, log=True)
        weight_decay = trial.suggest_float('weight_decay', 1e-6, 1e-4, log=True)
        batch_size = trial.suggest_categorical('batch_size', [16, 32, 64])
        
        # Create model with suggested hyperparameters
        model = LSTMPredictor(
            input_size=INPUT_SIZE,
            hidden_size=hidden_size,
            num_layers=num_layers,
            dropout=dropout
        ).to(device)
        
        # Create optimizer and loss (Huber Loss for robustness to outliers)
        optimizer = optim.Adam(model.parameters(), lr=learning_rate, weight_decay=weight_decay)
        criterion = nn.HuberLoss(delta=1.0)  # Changed from MSE to Huber Loss
        
        # Create data loaders with suggested batch size
        train_dataset_opt = TimeSeriesDataset(X_train, y_train)
        val_dataset_opt = TimeSeriesDataset(X_val, y_val)
        train_loader_opt = DataLoader(train_dataset_opt, batch_size=batch_size, shuffle=False)
        val_loader_opt = DataLoader(val_dataset_opt, batch_size=batch_size, shuffle=False)
        
        # Training loop (shorter for optimization)
        n_epochs_opt = 10  # Fewer epochs for faster optimization
        best_directional_accuracy = -1.0  # Track directional accuracy (higher is better)
        patience = 5
        patience_counter = 0
        
        for epoch in range(n_epochs_opt):
            # Train
            model.train()
            for X_batch, y_batch in train_loader_opt:
                X_batch, y_batch = X_batch.to(device), y_batch.to(device)
                optimizer.zero_grad()
                predictions = model(X_batch)
                loss = criterion(predictions, y_batch)
                loss.backward()
                optimizer.step()
            
            # Validate - calculate directional accuracy
            model.eval()
            all_preds = []
            all_targets = []
            with torch.no_grad():
                for X_batch, y_batch in val_loader_opt:
                    X_batch, y_batch = X_batch.to(device), y_batch.to(device)
                    predictions = model(X_batch)
                    all_preds.extend(predictions.cpu().numpy())
                    all_targets.extend(y_batch.cpu().numpy())
            
            # Calculate directional accuracy
            all_preds = np.array(all_preds)
            all_targets = np.array(all_targets)
            # Directional accuracy: percentage of correct direction predictions
            directional_accuracy = np.mean((all_preds * all_targets) > 0) * 100
            
            # Early stopping based on directional accuracy
            if directional_accuracy > best_directional_accuracy:
                best_directional_accuracy = directional_accuracy
                patience_counter = 0
            else:
                patience_counter += 1
                if patience_counter >= patience:
                    break
            
            # Report intermediate value for pruning (use negative for minimization)
            trial.report(-directional_accuracy, epoch)  # Negative because we want to maximize
            if trial.should_prune():
                raise optuna.TrialPruned()
        
        return -best_directional_accuracy  # Return negative for minimization
    
    # Create study
    print("Starting hyperparameter optimization...")
    print(f"Number of trials: {N_TRIALS}")
    print(f"Timeout: {OPTIMIZATION_TIMEOUT} seconds")
    print("=" * 60)
    
    study = optuna.create_study(
        direction='minimize',
        study_name='lstm_hyperparameter_optimization',
        pruner=optuna.pruners.MedianPruner(n_startup_trials=5, n_warmup_steps=3)
    )
    
    # Run optimization
    study.optimize(
        objective,
        n_trials=N_TRIALS,
        timeout=OPTIMIZATION_TIMEOUT,
        show_progress_bar=True
    )
    
    # Display results
    print("\n" + "=" * 60)
    print("Hyperparameter Optimization Results")
    print("=" * 60)
    print(f"Best trial:")
    print(f"  Value (validation loss): {study.best_value:.6f}")
    print(f"\nBest hyperparameters:")
    for key, value in study.best_params.items():
        print(f"  {key}: {value}")
    
    # Store best hyperparameters
    BEST_HYPERPARAMETERS = study.best_params.copy()
    
    print("\n" + "=" * 60)
    
    # Visualization of optimization results
    if OPTUNA_AVAILABLE:
        try:
            optuna.visualization.plot_optimization_history(study).show()
        except:
            print("Note: Visualization requires plotly. Install with: pip install plotly")
        
        try:
            optuna.visualization.plot_param_importances(study).show()
        except:
            pass
else:
    # Use default hyperparameters if optimization is skipped
    BEST_HYPERPARAMETERS = {
        'hidden_size': 64,
        'num_layers': 2,
        'dropout': 0.2,
        'learning_rate': 0.001,
        'weight_decay': 1e-5,
        'batch_size': 32
    }
    print("Using default hyperparameters (optimization skipped)")
    print(f"Hyperparameters: {BEST_HYPERPARAMETERS}")

Starting hyperparameter optimization...

[I 2025-12-20 12:11:25,178] A new study created in memory with name: lstm_hyperparameter_optimization



Number of trials: 20
Timeout: 3600 seconds


  0%|          | 0/20 [00:00<?, ?it/s]

[I 2025-12-20 12:11:25,748] Trial 0 finished with value: -52.589641434262944 and parameters: {'hidden_size': 80, 'num_layers': 1, 'dropout': 0.1, 'learning_rate': 0.000338117592576325, 'weight_decay': 9.71968772214018e-06, 'batch_size': 64}. Best is trial 0 with value: -52.589641434262944.
[I 2025-12-20 12:11:26,258] Trial 1 finished with value: -53.38645418326693 and parameters: {'hidden_size': 80, 'num_layers': 3, 'dropout': 0.5, 'learning_rate': 0.001584536973920861, 'weight_decay': 3.908883224447673e-06, 'batch_size': 64}. Best is trial 1 with value: -53.38645418326693.
[I 2025-12-20 12:11:26,960] Trial 2 finished with value: -47.808764940239044 and parameters: {'hidden_size': 48, 'num_layers': 3, 'dropout': 0.4, 'learning_rate': 0.000712280635661649, 'weight_decay': 3.7962325861566744e-06, 'batch_size': 32}. Best is trial 1 with value: -53.38645418326693.
[I 2025-12-20 12:11:27,377] Trial 3 finished with value: -53.38645418326693 and parameters: {'hidden_size': 80, 'num_layers': 3

## 7. Model Architecture with Optimized Hyperparameters

Create the LSTM model using the best hyperparameters found during optimization.

In [85]:
# Initialize model with optimized hyperparameters
HIDDEN_SIZE = BEST_HYPERPARAMETERS['hidden_size']
NUM_LAYERS = BEST_HYPERPARAMETERS['num_layers']
DROPOUT = BEST_HYPERPARAMETERS['dropout']

model = LSTMPredictor(
    input_size=INPUT_SIZE,
    hidden_size=HIDDEN_SIZE,
    num_layers=NUM_LAYERS,
    dropout=DROPOUT
)

# Count parameters
total_params = sum(p.numel() for p in model.parameters())
trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)

print(f"Model architecture (with optimized hyperparameters):")
print(f"  Input size: {INPUT_SIZE}")
print(f"  Hidden size: {HIDDEN_SIZE}")
print(f"  LSTM layers: {NUM_LAYERS}")
print(f"  Dropout: {DROPOUT}")
print(f"  Total parameters: {total_params:,}")
print(f"  Trainable parameters: {trainable_params:,}")

# Move to GPU if available
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)
print(f"Using device: {device}")

Model architecture (with optimized hyperparameters):
  Input size: 27
  Hidden size: 64
  LSTM layers: 1
  Dropout: 0.5
  Total parameters: 23,873
  Trainable parameters: 23,873
Using device: cuda


## 8. Training Setup

Define loss function, optimizer, and learning rate scheduler using optimized hyperparameters.

In [86]:
# Custom Trading Loss Function with Directional Penalty
# This addresses the "Negative Drift Convergence" problem
# A model can have low Huber loss by predicting "0" or average return,
# but we need to penalize directional errors (predicting up when market goes down)
def trading_loss(y_pred, y_true, lambda_dir=0.1):
    """
    Hybrid loss function combining regression loss with directional penalty.
    
    Args:
        y_pred: Predicted log returns
        y_true: Actual log returns
        lambda_dir: Weight for directional penalty (default 0.1)
    
    Returns:
        Combined loss: Huber loss + directional penalty
    """
    # Standard Regression Loss (Huber Loss - robust to outliers)
    huber_loss = nn.HuberLoss(delta=1.0)(y_pred, y_true)
    
    # Directional Penalty: if sign(pred) != sign(true), add a penalty
    # (y_pred * y_true) is negative when signs are different
    # relu(-y_pred * y_true) gives penalty only when signs differ
    direction_error = torch.relu(-y_pred * y_true)
    
    return huber_loss + (lambda_dir * direction_error.mean())

# Use custom trading loss
criterion = trading_loss

# Optimizer with optimized hyperparameters
# Adjust learning rate: Optuna result may be too high for financial data
LEARNING_RATE_OPTUNA = BEST_HYPERPARAMETERS['learning_rate']
LEARNING_RATE = min(LEARNING_RATE_OPTUNA, 1e-4)  # Cap at 1e-4 for stability
WEIGHT_DECAY = BEST_HYPERPARAMETERS['weight_decay']
BATCH_SIZE = BEST_HYPERPARAMETERS['batch_size']

optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE, weight_decay=WEIGHT_DECAY)

# Learning rate scheduler (reduce on plateau)
# Note: We'll monitor directional accuracy instead of loss
# Directional accuracy is what we actually care about for trading
scheduler = optim.lr_scheduler.ReduceLROnPlateau(
    optimizer, mode='max', factor=0.5, patience=5  # 'max' because we want to maximize directional accuracy
)

# Training parameters
NUM_EPOCHS = 50
EARLY_STOPPING_PATIENCE = 10
MIN_DELTA = 1e-6

# Recreate data loaders with optimized batch size
train_dataset = TimeSeriesDataset(X_train, y_train)
val_dataset = TimeSeriesDataset(X_val, y_val)
test_dataset = TimeSeriesDataset(X_test, y_test)

train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=False)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

print(f"Training configuration (with optimized hyperparameters):")
print(f"  Loss function: Custom Trading Loss (Huber + Directional Penalty)")
print(f"  Optimizer: Adam (lr={LEARNING_RATE:.6f}, weight_decay={WEIGHT_DECAY:.2e})")
if LEARNING_RATE < LEARNING_RATE_OPTUNA:
    print(f"    Note: Learning rate capped at 1e-4 (Optuna suggested {LEARNING_RATE_OPTUNA:.6f})")
print(f"  Scheduler: ReduceLROnPlateau")
print(f"  Batch size: {BATCH_SIZE}")
print(f"  Epochs: {NUM_EPOCHS}")
print(f"  Early stopping patience: {EARLY_STOPPING_PATIENCE}")

Training configuration (with optimized hyperparameters):
  Loss function: Custom Trading Loss (Huber + Directional Penalty)
  Optimizer: Adam (lr=0.000100, weight_decay=3.99e-05)
    Note: Learning rate capped at 1e-4 (Optuna suggested 0.003597)
  Scheduler: ReduceLROnPlateau
  Batch size: 16
  Epochs: 50
  Early stopping patience: 10


## 9. Training Loop

Train the model with early stopping and validation monitoring.

In [87]:
def train_epoch(model, train_loader, criterion, optimizer, device):
    """Train for one epoch using custom trading loss."""
    model.train()
    total_loss = 0.0
    n_batches = 0
    
    for X_batch, y_batch in train_loader:
        X_batch, y_batch = X_batch.to(device), y_batch.to(device)
        
        # Forward pass
        optimizer.zero_grad()
        predictions = model(X_batch)
        # criterion is now a function, not a PyTorch loss module
        loss = criterion(predictions, y_batch)
        
        # Backward pass
        loss.backward()
        optimizer.step()
        
        total_loss += loss.item()
        n_batches += 1
    
    return total_loss / n_batches

def validate(model, val_loader, criterion, device):
    """Validate the model and return loss and directional accuracy."""
    model.eval()
    total_loss = 0.0
    n_batches = 0
    all_preds = []
    all_targets = []
    
    with torch.no_grad():
        for X_batch, y_batch in val_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            predictions = model(X_batch)
            # criterion is now a function, not a PyTorch loss module
            loss = criterion(predictions, y_batch)
            total_loss += loss.item()
            n_batches += 1
            all_preds.extend(predictions.cpu().numpy())
            all_targets.extend(y_batch.cpu().numpy())
    
    avg_loss = total_loss / n_batches
    
    # Calculate directional accuracy
    all_preds = np.array(all_preds)
    all_targets = np.array(all_targets)
    directional_accuracy = np.mean((all_preds * all_targets) > 0) * 100
    
    return avg_loss, directional_accuracy

# Training loop
train_losses = []
val_losses = []
val_directional_accuracies = []
best_directional_accuracy = -1.0  # Track directional accuracy (higher is better)
patience_counter = 0
best_model_state = None  # Initialize to None, will be set on first improvement

print("Starting training...")
print("Monitoring: Directional Accuracy (higher is better)")
print("=" * 60)

for epoch in range(NUM_EPOCHS):
    # Train
    train_loss = train_epoch(model, train_loader, criterion, optimizer, device)
    train_losses.append(train_loss)
    
    # Validate - returns both loss and directional accuracy
    val_loss, val_dir_acc = validate(model, val_loader, criterion, device)
    val_losses.append(val_loss)
    val_directional_accuracies.append(val_dir_acc)
    
    # Learning rate scheduling based on directional accuracy (what we care about)
    scheduler.step(val_dir_acc)  # Monitor directional accuracy instead of loss
    
    # Early stopping check based on directional accuracy
    if val_dir_acc > best_directional_accuracy + MIN_DELTA:
        best_directional_accuracy = val_dir_acc
        patience_counter = 0
        # Save best model (in practice, save to disk)
        best_model_state = model.state_dict().copy()
    else:
        patience_counter += 1
    
    # Print progress
    if (epoch + 1) % 5 == 0 or epoch == 0:
        print(
            f"Epoch [{epoch+1}/{NUM_EPOCHS}] | "
            f"Train Loss: {train_loss:.6f} | "
            f"Val Loss: {val_loss:.6f} | "
            f"Val Dir Acc: {val_dir_acc:.2f}% | "
            f"LR: {optimizer.param_groups[0]['lr']:.6f}"
        )
    
    # Early stopping
    if patience_counter >= EARLY_STOPPING_PATIENCE:
        print(f"\nEarly stopping at epoch {epoch+1}")
        if best_model_state is not None:
            model.load_state_dict(best_model_state)
            print("Loaded best model state")
        else:
            print("Warning: No model improvement found, using current model state")
        break

print("=" * 60)
print("Training completed!")
print(f"Best directional accuracy: {best_directional_accuracy:.2f}%")

Starting training...
Monitoring: Directional Accuracy (higher is better)
Epoch [1/50] | Train Loss: 0.004184 | Val Loss: 0.000707 | Val Dir Acc: 53.39% | LR: 0.000100
Epoch [5/50] | Train Loss: 0.001777 | Val Loss: 0.000409 | Val Dir Acc: 51.39% | LR: 0.000100
Epoch [10/50] | Train Loss: 0.001024 | Val Loss: 0.000279 | Val Dir Acc: 52.99% | LR: 0.000100
Epoch [15/50] | Train Loss: 0.000709 | Val Loss: 0.000235 | Val Dir Acc: 51.00% | LR: 0.000050

Early stopping at epoch 17
Loaded best model state
Training completed!
Best directional accuracy: 54.18%


In [88]:
# FIXED VERSION: evaluate_model_updated with unscaled log returns support
# This replaces the function definition in the next cell
def evaluate_model_updated(model, test_loader, scaler_y, device, df_features):
    """
    Evaluate model and return predictions and targets.
    
    FIXED: Now handles unscaled log returns (scaler_y = None)
    
    Since we're predicting log returns, we need to:
    1. Get actual close prices from test set (ground truth)
    2. Reconstruct predicted prices from log returns
    3. Calculate metrics on actual prices (not reconstructed actual prices)
    """
    model.eval()
    predictions = []
    targets = []
    
    with torch.no_grad():
        for X_batch, y_batch in test_loader:
            X_batch = X_batch.to(device)
            pred = model(X_batch)
            predictions.extend(pred.cpu().numpy())
            targets.extend(y_batch.cpu().numpy())
    
    # Convert to numpy arrays (these are log returns)
    predictions_log_returns = np.array(predictions)
    targets_log_returns = np.array(targets)
    
    # Denormalize log returns (handle case where scaler_y is None - no scaling applied)
    if scaler_y is not None:
        predictions_log_returns_denorm = scaler_y.inverse_transform(
            predictions_log_returns.reshape(-1, 1)
        ).flatten()
        targets_log_returns_denorm = scaler_y.inverse_transform(
            targets_log_returns.reshape(-1, 1)
        ).flatten()
    else:
        # No scaling was applied, use raw values directly
        predictions_log_returns_denorm = predictions_log_returns
        targets_log_returns_denorm = targets_log_returns
    
    # Get actual close prices from test set (ground truth)
    test_start_idx = len(X_train) + len(X_val) + SEQUENCE_LENGTH
    test_end_idx = test_start_idx + len(predictions_log_returns_denorm)
    
    # Get actual close prices from the test set
    actual_close_prices = df_features['close'].iloc[test_start_idx:test_end_idx].values
    
    # Get the close price just before the test set starts (for reconstruction)
    if test_start_idx > 0:
        initial_close = df_features['close'].iloc[test_start_idx - 1]
    else:
        initial_close = df_features['close'].iloc[0]
    
    # Reconstruct predicted prices from log returns: price_t = price_{t-1} * exp(log_return_t)
    predicted_prices = np.zeros_like(predictions_log_returns_denorm)
    predicted_prices[0] = initial_close * np.exp(predictions_log_returns_denorm[0])
    
    for i in range(1, len(predictions_log_returns_denorm)):
        # Use actual previous price for more accurate reconstruction
        predicted_prices[i] = actual_close_prices[i-1] * np.exp(predictions_log_returns_denorm[i])
    
    return (
        predictions_log_returns_denorm,
        targets_log_returns_denorm,
        predicted_prices,
        actual_close_prices
    )

print("✓ Fixed evaluate_model_updated function defined (handles unscaled log returns)")

✓ Fixed evaluate_model_updated function defined (handles unscaled log returns)


In [89]:
# Utility: Patch all evaluate_model_updated function definitions to handle unscaled log returns
# This ensures all conditional fallback definitions are also fixed

import inspect
import types

def patch_evaluate_function():
    """Patch the evaluate_model_updated function if it exists and needs fixing."""
    if 'evaluate_model_updated' in globals():
        func = globals()['evaluate_model_updated']
        source = inspect.getsource(func)
        
        # Check if it already has the fix
        if 'scaler_y is not None' in source:
            print("✓ evaluate_model_updated already has the fix")
            return
        
        # If not fixed, the function from Cell 25 should override it
        print("Note: Using fixed version from Cell 25")
    else:
        print("Note: evaluate_model_updated will be defined in Cell 25")

# Run the patch check
patch_evaluate_function()
print("\nAll evaluation functions should now handle unscaled log returns (scaler_y = None)")
print("The fixed version is defined in Cell 25 and will be used by all conditional blocks.")

✓ evaluate_model_updated already has the fix

All evaluation functions should now handle unscaled log returns (scaler_y = None)
The fixed version is defined in Cell 25 and will be used by all conditional blocks.


In [90]:
# FIXED VERSION: calculate_directional_accuracy with unscaled log returns support
def calculate_directional_accuracy(model, data_loader, device):
    """Calculate directional accuracy for a dataset.
    
    FIXED: Now handles unscaled log returns (scaler_y = None)
    """
    model.eval()
    all_preds = []
    all_targets = []
    
    with torch.no_grad():
        for X_batch, y_batch in data_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            predictions = model(X_batch)
            all_preds.extend(predictions.cpu().numpy())
            all_targets.extend(y_batch.cpu().numpy())
    
    all_preds = np.array(all_preds)
    all_targets = np.array(all_targets)
    
    # Denormalize (handle case where scaler_y is None - no scaling applied)
    if scaler_y is not None:
        all_preds_denorm = scaler_y.inverse_transform(all_preds.reshape(-1, 1)).flatten()
        all_targets_denorm = scaler_y.inverse_transform(all_targets.reshape(-1, 1)).flatten()
    else:
        # No scaling was applied, use raw values directly
        all_preds_denorm = all_preds
        all_targets_denorm = all_targets
    
    # Calculate directional accuracy
    dir_acc = np.mean((all_preds_denorm * all_targets_denorm) > 0) * 100
    return dir_acc

print("✓ Fixed calculate_directional_accuracy function defined (handles unscaled log returns)")

✓ Fixed calculate_directional_accuracy function defined (handles unscaled log returns)


## ⚠️ IMPORTANT: Run Cell 27 First!

**Before running the cells below**, make sure you've run **Cell 27** which contains the fixed `calculate_directional_accuracy` function that handles unscaled log returns (`scaler_y = None`).

The fixed function will override any old definitions and prevent the `AttributeError: 'NoneType' object has no attribute 'inverse_transform'` error.

In [91]:
# CRITICAL FIX: Redefine calculate_directional_accuracy to handle unscaled log returns
# This MUST be run before the next cell to override any old definitions
def calculate_directional_accuracy(model, data_loader, device):
    """Calculate directional accuracy for a dataset.
    
    FIXED: Now handles unscaled log returns (scaler_y = None)
    """
    model.eval()
    all_preds = []
    all_targets = []
    
    with torch.no_grad():
        for X_batch, y_batch in data_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            predictions = model(X_batch)
            all_preds.extend(predictions.cpu().numpy())
            all_targets.extend(y_batch.cpu().numpy())
    
    all_preds = np.array(all_preds)
    all_targets = np.array(all_targets)
    
    # Denormalize (handle case where scaler_y is None - no scaling applied)
    if scaler_y is not None:
        all_preds_denorm = scaler_y.inverse_transform(all_preds.reshape(-1, 1)).flatten()
        all_targets_denorm = scaler_y.inverse_transform(all_targets.reshape(-1, 1)).flatten()
    else:
        # No scaling was applied, use raw values directly
        all_preds_denorm = all_preds
        all_targets_denorm = all_targets
    
    # Calculate directional accuracy
    dir_acc = np.mean((all_preds_denorm * all_targets_denorm) > 0) * 100
    return dir_acc

print("✓ calculate_directional_accuracy function FIXED and ready to use")

✓ calculate_directional_accuracy function FIXED and ready to use


In [100]:
# ============================================================================
# CRITICAL FIX: Override ALL old calculate_directional_accuracy definitions
# ============================================================================
# This cell MUST be run before any cell that uses calculate_directional_accuracy
# It deletes all old definitions and creates the fixed version

import inspect

# Delete ALL old definitions (they may exist in multiple cells)
if 'calculate_directional_accuracy' in globals():
    del calculate_directional_accuracy
    print("✓ Deleted old calculate_directional_accuracy definition")

# Define the FIXED version that handles unscaled log returns
def calculate_directional_accuracy(model, data_loader, device):
    """Calculate directional accuracy for a dataset.
    
    FIXED: Now handles unscaled log returns (scaler_y = None)
    This prevents: AttributeError: 'NoneType' object has no attribute 'inverse_transform'
    """
    model.eval()
    all_preds = []
    all_targets = []
    
    with torch.no_grad():
        for X_batch, y_batch in data_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            predictions = model(X_batch)
            all_preds.extend(predictions.cpu().numpy())
            all_targets.extend(y_batch.cpu().numpy())
    
    all_preds = np.array(all_preds)
    all_targets = np.array(all_targets)
    
    # Denormalize (handle case where scaler_y is None - no scaling applied)
    if scaler_y is not None:
        all_preds_denorm = scaler_y.inverse_transform(all_preds.reshape(-1, 1)).flatten()
        all_targets_denorm = scaler_y.inverse_transform(all_targets.reshape(-1, 1)).flatten()
    else:
        # No scaling was applied, use raw values directly
        all_preds_denorm = all_preds
        all_targets_denorm = all_targets
    
    # Calculate directional accuracy
    dir_acc = np.mean((all_preds_denorm * all_targets_denorm) > 0) * 100
    return dir_acc

print("✓ FIXED calculate_directional_accuracy function is now active")
print("  This version handles unscaled log returns (scaler_y = None)")
print("\n⚠️ IMPORTANT: If you see the error again, run this cell FIRST, then re-run the cell with the error")

✓ Deleted old calculate_directional_accuracy definition
✓ FIXED calculate_directional_accuracy function is now active
  This version handles unscaled log returns (scaler_y = None)

⚠️ IMPORTANT: If you see the error again, run this cell FIRST, then re-run the cell with the error


## 10. Training Visualization

Plot training and validation loss curves.

## Overfitting Check: Validation vs Test Set Comparison

Compare directional accuracy on validation and test sets to detect overfitting.

In [101]:
# Calculate directional accuracy on validation set for comparison
def calculate_directional_accuracy(model, data_loader, device):
    """Calculate directional accuracy for a dataset."""
    model.eval()
    all_preds = []
    all_targets = []
    
    with torch.no_grad():
        for X_batch, y_batch in data_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            predictions = model(X_batch)
            all_preds.extend(predictions.cpu().numpy())
            all_targets.extend(y_batch.cpu().numpy())  # Move to CPU before converting to numpy
    
    all_preds = np.array(all_preds)
    all_targets = np.array(all_targets)
    
    # Denormalize
    all_preds_denorm = scaler_y.inverse_transform(all_preds.reshape(-1, 1)).flatten()
    all_targets_denorm = scaler_y.inverse_transform(all_targets.reshape(-1, 1)).flatten()
    
    # Calculate directional accuracy
    dir_acc = np.mean((all_preds_denorm * all_targets_denorm) > 0) * 100
    return dir_acc

# Calculate on validation set
val_directional_accuracy = calculate_directional_accuracy(model, val_loader, device)

# Calculate on test set (if not already calculated)
if 'directional_accuracy_test' not in globals():
    directional_accuracy_test = calculate_directional_accuracy(model, test_loader, device)
    print("Calculated test set directional accuracy")
else:
    print("Using pre-calculated test set directional accuracy")

# Compare validation vs test
print("\nOverfitting Analysis:")
print("=" * 60)
print(f"Validation Set Directional Accuracy: {val_directional_accuracy:.2f}%")
print(f"Test Set Directional Accuracy: {directional_accuracy_test:.2f}%")
print(f"Difference: {val_directional_accuracy - directional_accuracy_test:.2f}%")

# Determine if overfitting is present
accuracy_diff = val_directional_accuracy - directional_accuracy_test
if accuracy_diff > 5:
    print(f"\n⚠ WARNING: Potential Overfitting Detected!")
    print(f"  Validation accuracy is {accuracy_diff:.2f}% higher than test accuracy")
    print(f"  Recommendations:")
    print(f"    - Increase dropout rate (current: {DROPOUT})")
    print(f"    - Increase weight decay (current: {WEIGHT_DECAY:.2e})")
    print(f"    - Reduce model complexity (hidden_size: {HIDDEN_SIZE}, layers: {NUM_LAYERS})")
    print(f"    - Add more regularization")
elif accuracy_diff > 2:
    print(f"\n⚠ Minor Overfitting Detected")
    print(f"  Consider slight increase in regularization")
else:
    print(f"\n✓ Good Generalization")
    print(f"  Test accuracy is close to validation accuracy")
    print(f"  Model is not overfitting significantly")

print("\n" + "=" * 60)
print("Key Metrics Summary:")
print(f"  Best Validation Dir. Acc: {best_directional_accuracy:.2f}%")
print(f"  Final Validation Dir. Acc: {val_directional_accuracy:.2f}%")
print(f"  Test Set Dir. Acc: {directional_accuracy_test:.2f}%")
print("=" * 60)

AttributeError: 'NoneType' object has no attribute 'inverse_transform'

## Trading Performance Analysis

Analyze cumulative returns and win rate to assess trading viability.

In [None]:
# Detailed trading performance analysis
print("Trading Performance Analysis:")
print("=" * 60)

# Define evaluation function if it doesn't exist (from cell 25)
if 'evaluate_model_updated' not in globals():
    def evaluate_model_updated(model, test_loader, scaler_y, device, df_features):
        """
        Evaluate model and return predictions and targets.
        
        Since we're predicting log returns, we need to:
        1. Get actual close prices from test set (ground truth)
        2. Reconstruct predicted prices from log returns
        3. Calculate metrics on actual prices (not reconstructed actual prices)
        """
        model.eval()
        predictions = []
        targets = []
        
        with torch.no_grad():
            for X_batch, y_batch in test_loader:
                X_batch = X_batch.to(device)
                pred = model(X_batch)
                predictions.extend(pred.cpu().numpy())
                targets.extend(y_batch.cpu().numpy())
        
        # Convert to numpy arrays (these are log returns)
        predictions_log_returns = np.array(predictions)
        targets_log_returns = np.array(targets)
        
        # Denormalize log returns
        predictions_log_returns_denorm = scaler_y.inverse_transform(
            predictions_log_returns.reshape(-1, 1)
        ).flatten()
        targets_log_returns_denorm = scaler_y.inverse_transform(
            targets_log_returns.reshape(-1, 1)
        ).flatten()
        
        # Get actual close prices from test set (ground truth)
        test_start_idx = len(X_train) + len(X_val) + SEQUENCE_LENGTH
        test_end_idx = test_start_idx + len(predictions_log_returns_denorm)
        
        # Get actual close prices from the test set
        actual_close_prices = df_features['close'].iloc[test_start_idx:test_end_idx].values
        
        # Get the close price just before the test set starts (for reconstruction)
        if test_start_idx > 0:
            initial_close = df_features['close'].iloc[test_start_idx - 1]
        else:
            initial_close = df_features['close'].iloc[0]
        
        # Reconstruct predicted prices from log returns: price_t = price_{t-1} * exp(log_return_t)
        predicted_prices = np.zeros_like(predictions_log_returns_denorm)
        predicted_prices[0] = initial_close * np.exp(predictions_log_returns_denorm[0])
        
        for i in range(1, len(predictions_log_returns_denorm)):
            # Use actual previous price for more accurate reconstruction
            predicted_prices[i] = actual_close_prices[i-1] * np.exp(predictions_log_returns_denorm[i])
        
        return (
            predictions_log_returns_denorm,
            targets_log_returns_denorm,
            predicted_prices,
            actual_close_prices  # Return actual prices, not reconstructed
        )

# Check if required variables exist, if not calculate them
if 'y_pred_log_returns' not in globals() or 'y_true_log_returns' not in globals():
    print("Calculating predictions from test set...")
    y_pred_log_returns, y_true_log_returns, y_pred_prices, y_true_prices = evaluate_model_updated(
        model, test_loader, scaler_y, device, df_features
    )

# Convert log returns to simple returns if not already done
if 'predicted_returns' not in globals() or 'actual_returns' not in globals():
    predicted_returns = np.exp(y_pred_log_returns) - 1  # Convert log returns to simple returns
    actual_returns = np.exp(y_true_log_returns) - 1

# Calculate strategy returns (if we traded on predictions)
# Simple strategy: Buy when predicted return > 0, Sell when < 0
strategy_returns = np.where(
    y_pred_log_returns > 0,
    actual_returns,  # If we predict up, we get the actual return
    -actual_returns  # If we predict down, we short (get negative of actual return)
)

# Cumulative strategy returns
cumulative_strategy_returns = np.cumprod(1 + strategy_returns) - 1
total_strategy_return = cumulative_strategy_returns[-1] * 100

# Calculate Sharpe-like ratio (annualized, assuming daily returns)
strategy_mean_return = np.mean(strategy_returns)
strategy_std_return = np.std(strategy_returns)
if strategy_std_return > 0:
    sharpe_like = (strategy_mean_return / strategy_std_return) * np.sqrt(252)  # Annualized
else:
    sharpe_like = 0

# Win rate (percentage of profitable trades)
profitable_trades = strategy_returns > 0
win_rate_strategy = np.mean(profitable_trades) * 100

# Maximum drawdown
running_max = np.maximum.accumulate(cumulative_strategy_returns)
drawdown = cumulative_strategy_returns - running_max
max_drawdown = np.min(drawdown) * 100

# Compare to buy-and-hold (if total_actual_return exists, use it; otherwise calculate)
if 'total_actual_return' not in globals():
    cumulative_actual_returns = np.cumprod(1 + actual_returns) - 1
    total_actual_return = cumulative_actual_returns[-1] * 100

buy_hold_return = total_actual_return

print(f"Strategy Performance (Trading on Predictions):")
print(f"  Total Return: {total_strategy_return:.2f}%")
print(f"  Buy-and-Hold Return: {buy_hold_return:.2f}%")
print(f"  Excess Return: {total_strategy_return - buy_hold_return:.2f}%")
print(f"  Win Rate: {win_rate_strategy:.2f}%")
print(f"  Sharpe-like Ratio: {sharpe_like:.2f}")
print(f"  Max Drawdown: {max_drawdown:.2f}%")

if total_strategy_return > buy_hold_return:
    print(f"\n✓ Strategy outperforms buy-and-hold by {total_strategy_return - buy_hold_return:.2f}%")
else:
    print(f"\n✗ Strategy underperforms buy-and-hold by {buy_hold_return - total_strategy_return:.2f}%")

# Risk-adjusted metrics
if max_drawdown < 0:
    print(f"\nRisk Metrics:")
    print(f"  Max Drawdown: {max_drawdown:.2f}%")
    if abs(max_drawdown) < 10:
        print(f"  ✓ Low drawdown - good risk control")
    elif abs(max_drawdown) < 20:
        print(f"  ⚠ Moderate drawdown - acceptable risk")
    else:
        print(f"  ✗ High drawdown - high risk strategy")

print("\n" + "=" * 60)
print("Trading Viability Assessment:")
# Get directional accuracy if not already defined
if 'directional_accuracy_test' not in globals():
    directional_accuracy_test = np.mean((y_pred_log_returns * y_true_log_returns) > 0) * 100

if directional_accuracy_test >= 55 and total_strategy_return > 0:
    print("  ✓ Directional accuracy above 55% threshold")
    print("  ✓ Positive cumulative returns")
    print("  → Strategy shows promise for live trading (with proper risk management)")
elif directional_accuracy_test >= 55:
    print("  ✓ Directional accuracy above 55% threshold")
    print("  ✗ Negative cumulative returns")
    print("  → Need to improve return prediction magnitude, not just direction")
else:
    print("  ✗ Directional accuracy below 55% threshold")
    print("  → Need to improve model before live trading")
print("=" * 60)

In [None]:
# Get test indices if not already defined
if 'test_indices' not in globals():
    test_start_idx = len(X_train) + len(X_val) + SEQUENCE_LENGTH
    test_indices = df_features.index[test_start_idx:test_start_idx + len(y_pred_log_returns)]

# Calculate cumulative actual returns if not already done
if 'cumulative_actual_returns' not in globals():
    cumulative_actual_returns = np.cumprod(1 + actual_returns) - 1

# Visualize trading performance
fig, axes = plt.subplots(2, 2, figsize=(16, 10))

# Plot 1: Cumulative Returns Comparison
axes[0, 0].plot(test_indices[:len(cumulative_strategy_returns)], 
                cumulative_strategy_returns * 100, 
                label='Strategy Returns', alpha=0.7, linewidth=1.5, color='green')
axes[0, 0].plot(test_indices[:len(cumulative_actual_returns)], 
                cumulative_actual_returns * 100, 
                label='Buy-and-Hold', alpha=0.7, linewidth=1.5, color='blue')
axes[0, 0].axhline(y=0, color='black', linestyle='--', linewidth=1, alpha=0.5)
axes[0, 0].set_xlabel('Date')
axes[0, 0].set_ylabel('Cumulative Return (%)')
axes[0, 0].set_title('Strategy vs Buy-and-Hold Performance')
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)
axes[0, 0].tick_params(axis='x', rotation=45)

# Plot 2: Drawdown
axes[0, 1].fill_between(test_indices[:len(drawdown)], drawdown * 100, 0, 
                        alpha=0.3, color='red', label='Drawdown')
axes[0, 1].axhline(y=0, color='black', linestyle='--', linewidth=1)
axes[0, 1].set_xlabel('Date')
axes[0, 1].set_ylabel('Drawdown (%)')
axes[0, 1].set_title(f'Maximum Drawdown: {max_drawdown:.2f}%')
axes[0, 1].legend()
axes[0, 1].grid(True, alpha=0.3)
axes[0, 1].tick_params(axis='x', rotation=45)

# Plot 3: Return Distribution
axes[1, 0].hist(strategy_returns * 100, bins=50, alpha=0.7, edgecolor='black', label='Strategy Returns')
axes[1, 0].hist(actual_returns * 100, bins=50, alpha=0.5, edgecolor='black', label='Market Returns')
axes[1, 0].axvline(x=0, color='red', linestyle='--', linewidth=2)
axes[1, 0].set_xlabel('Return (%)')
axes[1, 0].set_ylabel('Frequency')
axes[1, 0].set_title('Return Distribution Comparison')
axes[1, 0].legend()
axes[1, 0].grid(True, alpha=0.3)

# Plot 4: Rolling Win Rate
window_size = min(50, len(strategy_returns) // 4)
if window_size > 1:
    rolling_win_rate = pd.Series(profitable_trades).rolling(window=window_size).mean() * 100
    axes[1, 1].plot(test_indices[:len(rolling_win_rate)], rolling_win_rate, 
                   alpha=0.7, linewidth=1.5, color='purple')
    axes[1, 1].axhline(y=50, color='red', linestyle='--', linewidth=1, label='50% (Random)')
    axes[1, 1].axhline(y=win_rate_strategy, color='green', linestyle='--', linewidth=1, 
                      label=f'Average: {win_rate_strategy:.1f}%')
    axes[1, 1].set_xlabel('Date')
    axes[1, 1].set_ylabel('Win Rate (%)')
    axes[1, 1].set_title(f'Rolling Win Rate (window={window_size})')
    axes[1, 1].legend()
    axes[1, 1].grid(True, alpha=0.3)
    axes[1, 1].tick_params(axis='x', rotation=45)
else:
    axes[1, 1].text(0.5, 0.5, 'Insufficient data\nfor rolling win rate', 
                   ha='center', va='center', transform=axes[1, 1].transAxes)
    axes[1, 1].set_title('Rolling Win Rate')

plt.tight_layout()
plt.show()

# Summary statistics table
print("\nPerformance Summary Table:")
print("=" * 60)

# Get validation accuracy if available
if 'val_directional_accuracy' not in globals():
    val_directional_accuracy = calculate_directional_accuracy(model, val_loader, device)

# Calculate accuracy difference
if 'accuracy_diff' not in globals():
    accuracy_diff = val_directional_accuracy - directional_accuracy_test

summary_data = {
    'Metric': [
        'Directional Accuracy (Test)',
        'Total Strategy Return',
        'Buy-and-Hold Return',
        'Excess Return',
        'Win Rate',
        'Sharpe-like Ratio',
        'Max Drawdown',
        'Validation Dir. Acc',
        'Test vs Val Difference'
    ],
    'Value': [
        f'{directional_accuracy_test:.2f}%',
        f'{total_strategy_return:.2f}%',
        f'{buy_hold_return:.2f}%',
        f'{total_strategy_return - buy_hold_return:.2f}%',
        f'{win_rate_strategy:.2f}%',
        f'{sharpe_like:.2f}',
        f'{max_drawdown:.2f}%',
        f'{val_directional_accuracy:.2f}%',
        f'{accuracy_diff:.2f}%'
    ]
}
summary_df = pd.DataFrame(summary_data)
print(summary_df.to_string(index=False))
print("=" * 60)

## Look-Ahead Bias Verification

Verify that no future information leaked into the model.

In [None]:
# Verify no look-ahead bias
print("Look-Ahead Bias Check:")
print("=" * 60)

# Check 1: Time-aware split
train_end_idx = len(X_train)
val_end_idx = train_end_idx + len(X_val)
test_start_idx = val_end_idx + SEQUENCE_LENGTH

train_end_date = df_features.index[train_end_idx + SEQUENCE_LENGTH - 1]
val_end_date = df_features.index[val_end_idx + SEQUENCE_LENGTH - 1]
test_start_date = df_features.index[test_start_idx]

print("✓ Time-Aware Data Split:")
print(f"  Train set ends: {train_end_date}")
print(f"  Validation set: {train_end_date} to {val_end_date}")
print(f"  Test set starts: {test_start_date}")
print(f"  ✓ No future data in training/validation")

# Check 2: Feature calculation
print("\n✓ Feature Calculation:")
print(f"  Technical indicators calculated from past data only")
print(f"  Moving averages use only historical prices")
print(f"  RSI, MACD calculated from past periods")
print(f"  No forward-looking features")

# Check 3: Sequence creation
print("\n✓ Sequence Creation:")
print(f"  Sequences use only past {SEQUENCE_LENGTH} time steps")
print(f"  Target is {PREDICTION_HORIZON} step ahead")
print(f"  No information from future in input sequences")

# Check 4: Technical indicators from database
print("\n✓ Database Technical Indicators:")
print(f"  Indicators stored in analytics.technical_indicators table")
print(f"  Calculated daily from past market data")
print(f"  No future price information in stored indicators")

print("\n" + "=" * 60)
print("✓ No look-ahead bias detected")
print("  Model uses only past information to predict future")
print("=" * 60)

## 12. Visualization of Predictions

Visualize predictions vs actual prices and returns.

## 13. Save Model

Save the trained model and scalers for future use.

## 14. Model Loading Example

Example of how to load the saved model for inference.

## Model Performance Summary

Complete summary of model performance with recommendations.

In [None]:
# Final Performance Summary
print("=" * 70)
print("LSTM MODEL PERFORMANCE SUMMARY")
print("=" * 70)

# Ensure all required variables exist
if 'y_pred_log_returns' not in globals() or 'y_true_log_returns' not in globals():
    print("Calculating predictions...")
    if 'evaluate_model_updated' not in globals():
        # Define function if needed (same as in cell 28)
        def evaluate_model_updated(model, test_loader, scaler_y, device, df_features):
            model.eval()
            predictions = []
            targets = []
            with torch.no_grad():
                for X_batch, y_batch in test_loader:
                    X_batch = X_batch.to(device)
                    pred = model(X_batch)
                    predictions.extend(pred.cpu().numpy())
                    targets.extend(y_batch.cpu().numpy())
            predictions_log_returns = np.array(predictions)
            targets_log_returns = np.array(targets)
            predictions_log_returns_denorm = scaler_y.inverse_transform(
                predictions_log_returns.reshape(-1, 1)
            ).flatten()
            targets_log_returns_denorm = scaler_y.inverse_transform(
                targets_log_returns.reshape(-1, 1)
            ).flatten()
            test_start_idx = len(X_train) + len(X_val) + SEQUENCE_LENGTH
            test_end_idx = test_start_idx + len(predictions_log_returns_denorm)
            actual_close_prices = df_features['close'].iloc[test_start_idx:test_end_idx].values
            if test_start_idx > 0:
                initial_close = df_features['close'].iloc[test_start_idx - 1]
            else:
                initial_close = df_features['close'].iloc[0]
            predicted_prices = np.zeros_like(predictions_log_returns_denorm)
            predicted_prices[0] = initial_close * np.exp(predictions_log_returns_denorm[0])
            for i in range(1, len(predictions_log_returns_denorm)):
                predicted_prices[i] = actual_close_prices[i-1] * np.exp(predictions_log_returns_denorm[i])
            return (
                predictions_log_returns_denorm,
                targets_log_returns_denorm,
                predicted_prices,
                actual_close_prices
            )
    y_pred_log_returns, y_true_log_returns, y_pred_prices, y_true_prices = evaluate_model_updated(
        model, test_loader, scaler_y, device, df_features
    )

# Calculate metrics if they don't exist
if 'rmse_prices' not in globals() or 'mae_prices' not in globals():
    mse_prices = mean_squared_error(y_true_prices, y_pred_prices)
    rmse_prices = np.sqrt(mse_prices)
    mae_prices = mean_absolute_error(y_true_prices, y_pred_prices)
    price_mask = np.abs(y_true_prices) > 0.01
    if price_mask.sum() > 0:
        mape_prices = np.mean(np.abs((y_true_prices[price_mask] - y_pred_prices[price_mask]) / y_true_prices[price_mask])) * 100
    else:
        mape_prices = np.nan

if 'directional_accuracy_test' not in globals():
    directional_accuracy_test = np.mean((y_pred_log_returns * y_true_log_returns) > 0) * 100

if 'val_directional_accuracy' not in globals():
    val_directional_accuracy = calculate_directional_accuracy(model, val_loader, device)

if 'accuracy_diff' not in globals():
    accuracy_diff = val_directional_accuracy - directional_accuracy_test

if 'total_strategy_return' not in globals():
    predicted_returns = np.exp(y_pred_log_returns) - 1
    actual_returns = np.exp(y_true_log_returns) - 1
    strategy_returns = np.where(y_pred_log_returns > 0, actual_returns, -actual_returns)
    cumulative_strategy_returns = np.cumprod(1 + strategy_returns) - 1
    total_strategy_return = cumulative_strategy_returns[-1] * 100
    cumulative_actual_returns = np.cumprod(1 + actual_returns) - 1
    buy_hold_return = cumulative_actual_returns[-1] * 100
    win_rate_strategy = np.mean(strategy_returns > 0) * 100
    strategy_mean_return = np.mean(strategy_returns)
    strategy_std_return = np.std(strategy_returns)
    sharpe_like = (strategy_mean_return / strategy_std_return) * np.sqrt(252) if strategy_std_return > 0 else 0
    running_max = np.maximum.accumulate(cumulative_strategy_returns)
    drawdown = cumulative_strategy_returns - running_max
    max_drawdown = np.min(drawdown) * 100

print("\n1. DIRECTIONAL ACCURACY (Key Trading Metric):")
print(f"   Validation Set: {val_directional_accuracy:.2f}%")
print(f"   Test Set: {directional_accuracy_test:.2f}%")
print(f"   Difference: {accuracy_diff:.2f}%")
if accuracy_diff > 5:
    print(f"   ⚠ WARNING: Overfitting detected (>5% difference)")
elif accuracy_diff > 2:
    print(f"   ⚠ Minor overfitting (2-5% difference)")
else:
    print(f"   ✓ Good generalization (<2% difference)")

if directional_accuracy_test >= 55:
    print(f"   ✓ EXCELLENT: Above 55% threshold for profitable strategies")
elif directional_accuracy_test >= 52:
    print(f"   ⚠ CLOSE: Near 55% threshold - monitor closely")
else:
    print(f"   ✗ BELOW THRESHOLD: Need improvement before live trading")

print("\n2. TRADING PERFORMANCE:")
if 'total_strategy_return' in globals():
    print(f"   Strategy Return: {total_strategy_return:.2f}%")
    print(f"   Buy-and-Hold Return: {buy_hold_return:.2f}%")
    print(f"   Excess Return: {total_strategy_return - buy_hold_return:.2f}%")
    print(f"   Win Rate: {win_rate_strategy:.2f}%")
    print(f"   Sharpe-like Ratio: {sharpe_like:.2f}")
    print(f"   Max Drawdown: {max_drawdown:.2f}%")
    
    if total_strategy_return > buy_hold_return:
        print(f"   ✓ Strategy outperforms buy-and-hold")
    else:
        print(f"   ✗ Strategy underperforms buy-and-hold")
else:
    print("   Trading metrics not calculated - run trading performance cell first")

print("\n3. PRICE RECONSTRUCTION QUALITY:")
if 'rmse_prices' in globals():
    print(f"   RMSE: ${rmse_prices:.2f}")
    print(f"   MAE: ${mae_prices:.2f}")
    if 'mape_prices' in globals() and not np.isnan(mape_prices):
        if 'price_mask' in globals():
            print(f"   MAPE: {mape_prices:.2f}% (on {price_mask.sum()}/{len(y_true_prices)} valid prices)")
        else:
            print(f"   MAPE: {mape_prices:.2f}%")
    else:
        print(f"   MAPE: N/A (prices too small)")
else:
    print("   Price metrics not calculated - run evaluation cell first")

print("\n4. MODEL CONFIGURATION:")
print(f"   Target: {TARGET_COLUMN} (stationary)")
print(f"   Loss Function: HuberLoss (robust to outliers)")
print(f"   Monitored Metric: Directional Accuracy")
print(f"   Features: {len(available_features)} multi-variate features")
print(f"   Model Size: {HIDDEN_SIZE} hidden units, {NUM_LAYERS} layers, {DROPOUT} dropout")

print("\n5. RECOMMENDATIONS:")
recommendations = []

if directional_accuracy_test >= 55 and total_strategy_return > 0:
    recommendations.append("✓ Model shows promise - consider paper trading")
    recommendations.append("  - Monitor performance on out-of-sample data")
    recommendations.append("  - Implement proper risk management")
    recommendations.append("  - Consider transaction costs in live trading")
elif directional_accuracy_test >= 55:
    recommendations.append("⚠ Good directional accuracy but negative returns")
    recommendations.append("  - Focus on improving return magnitude prediction")
    recommendations.append("  - Consider position sizing based on confidence")
    recommendations.append("  - May need to filter low-confidence predictions")
else:
    recommendations.append("✗ Directional accuracy below 55% threshold")
    recommendations.append("  - Need to improve model before live trading")
    recommendations.append("  - Consider: more features, different architecture, longer training")

if accuracy_diff > 5:
    recommendations.append("⚠ Address overfitting:")
    recommendations.append(f"  - Increase dropout from {DROPOUT} to {min(0.5, DROPOUT + 0.1)}")
    recommendations.append(f"  - Increase weight decay from {WEIGHT_DECAY:.2e}")
    recommendations.append("  - Consider reducing model complexity")

if max_drawdown < -20:
    recommendations.append("⚠ High drawdown detected:")
    recommendations.append("  - Implement stop-loss mechanisms")
    recommendations.append("  - Consider position sizing limits")
    recommendations.append("  - Add risk management rules")

for rec in recommendations:
    print(f"   {rec}")

print("\n" + "=" * 70)
print("Next Steps:")
print("  1. If directional accuracy > 55% and no overfitting: Proceed to paper trading")
print("  2. If overfitting detected: Increase regularization and retrain")
print("  3. Monitor cumulative returns on new data")
print("  4. Implement proper risk management before live trading")
print("=" * 70)

In [None]:
# UPDATED EVALUATION FUNCTION - Replace the old evaluate_model function with this
def evaluate_model_updated(model, test_loader, scaler_y, device, df_features):
    """
    Evaluate model and return predictions and targets.
    
    Since we're predicting log returns, we need to:
    1. Get actual close prices from test set (ground truth)
    2. Reconstruct predicted prices from log returns
    3. Calculate metrics on actual prices (not reconstructed actual prices)
    """
    model.eval()
    predictions = []
    targets = []
    
    with torch.no_grad():
        for X_batch, y_batch in test_loader:
            X_batch = X_batch.to(device)
            pred = model(X_batch)
            predictions.extend(pred.cpu().numpy())
            targets.extend(y_batch.numpy())
    
    # Convert to numpy arrays (these are log returns)
    predictions_log_returns = np.array(predictions)
    targets_log_returns = np.array(targets)
    
    # Denormalize log returns
    predictions_log_returns_denorm = scaler_y.inverse_transform(
        predictions_log_returns.reshape(-1, 1)
    ).flatten()
    targets_log_returns_denorm = scaler_y.inverse_transform(
        targets_log_returns.reshape(-1, 1)
    ).flatten()
    
    # Get actual close prices from test set (ground truth)
    test_start_idx = len(X_train) + len(X_val) + SEQUENCE_LENGTH
    test_end_idx = test_start_idx + len(predictions_log_returns_denorm)
    
    # Get actual close prices from the test set
    actual_close_prices = df_features['close'].iloc[test_start_idx:test_end_idx].values
    
    # Get the close price just before the test set starts (for reconstruction)
    if test_start_idx > 0:
        initial_close = df_features['close'].iloc[test_start_idx - 1]
    else:
        initial_close = df_features['close'].iloc[0]
    
    # Reconstruct predicted prices from log returns: price_t = price_{t-1} * exp(log_return_t)
    predicted_prices = np.zeros_like(predictions_log_returns_denorm)
    predicted_prices[0] = initial_close * np.exp(predictions_log_returns_denorm[0])
    
    for i in range(1, len(predictions_log_returns_denorm)):
        # Use actual previous price for more accurate reconstruction
        predicted_prices[i] = actual_close_prices[i-1] * np.exp(predictions_log_returns_denorm[i])
    
    return (
        predictions_log_returns_denorm,
        targets_log_returns_denorm,
        predicted_prices,
        actual_close_prices  # Return actual prices, not reconstructed
    )

# Evaluate on test set
# Returns: (predicted_log_returns, actual_log_returns, predicted_prices, actual_close_prices)
y_pred_log_returns, y_true_log_returns, y_pred_prices, y_true_prices = evaluate_model_updated(
    model, test_loader, scaler_y, device, df_features
)

# Diagnostic: Verify data integrity
print("Data Integrity Check:")
print("=" * 60)
print(f"Log Returns - Predicted range: [{y_pred_log_returns.min():.6f}, {y_pred_log_returns.max():.6f}]")
print(f"Log Returns - Actual range: [{y_true_log_returns.min():.6f}, {y_true_log_returns.max():.6f}]")
print(f"Predicted prices range: ${y_pred_prices.min():.2f} - ${y_pred_prices.max():.2f}")
print(f"Actual prices range: ${y_true_prices.min():.2f} - ${y_true_prices.max():.2f}")
print(f"\nPrice sanity check:")
print(f"  Actual prices > $1: {(y_true_prices > 1).sum()}/{len(y_true_prices)}")
print(f"  Actual prices < $10000: {(y_true_prices < 10000).sum()}/{len(y_true_prices)}")
print(f"  Prices look reasonable: {y_true_prices.min() > 1 and y_true_prices.max() < 10000}")
print("=" * 60)

# Calculate metrics on log returns (what model predicts)
mse_log_returns = mean_squared_error(y_true_log_returns, y_pred_log_returns)
rmse_log_returns = np.sqrt(mse_log_returns)
mae_log_returns = mean_absolute_error(y_true_log_returns, y_pred_log_returns)

# Calculate metrics on actual prices (ground truth vs reconstructed predictions)
mse_prices = mean_squared_error(y_true_prices, y_pred_prices)
rmse_prices = np.sqrt(mse_prices)
mae_prices = mean_absolute_error(y_true_prices, y_pred_prices)

# MAPE - only calculate if prices are not near zero
# Avoid division by zero and handle small values
price_mask = np.abs(y_true_prices) > 0.01  # Only calculate MAPE for prices > $0.01
if price_mask.sum() > 0:
    mape_prices = np.mean(np.abs((y_true_prices[price_mask] - y_pred_prices[price_mask]) / y_true_prices[price_mask])) * 100
else:
    mape_prices = np.nan

# Directional accuracy (percentage of correct direction predictions)
# This is the KEY metric for trading
directional_accuracy_test = np.mean((y_pred_log_returns * y_true_log_returns) > 0) * 100
directional_accuracy = directional_accuracy_test  # For backward compatibility

# Calculate returns-based metrics (more appropriate for log returns predictions)
# Predicted returns vs actual returns
predicted_returns = np.exp(y_pred_log_returns) - 1  # Convert log returns to simple returns
actual_returns = np.exp(y_true_log_returns) - 1

# Sharpe-like metrics (assuming daily returns)
mean_pred_return = np.mean(predicted_returns)
mean_actual_return = np.mean(actual_returns)
std_pred_return = np.std(predicted_returns)
std_actual_return = np.std(actual_returns)

# Cumulative returns (if we traded on predictions)
cumulative_pred_returns = np.cumprod(1 + predicted_returns) - 1
cumulative_actual_returns = np.cumprod(1 + actual_returns) - 1
total_pred_return = cumulative_pred_returns[-1] * 100
total_actual_return = cumulative_actual_returns[-1] * 100

# Win rate (percentage of profitable predictions)
win_rate = np.mean(predicted_returns > 0) * 100

print("Test Set Performance:")
print("=" * 60)
print("Metrics on Log Returns (what model predicts):")
print(f"  RMSE: {rmse_log_returns:.6f}")
print(f"  MAE: {mae_log_returns:.6f}")
print(f"  Mean Predicted Return: {mean_pred_return*100:.4f}%")
print(f"  Std Predicted Return: {std_pred_return*100:.4f}%")

print("\nMetrics on Actual Prices (ground truth):")
print(f"  RMSE: ${rmse_prices:.2f}")
print(f"  MAE: ${mae_prices:.2f}")
if not np.isnan(mape_prices):
    print(f"  MAPE: {mape_prices:.2f}% (calculated on {price_mask.sum()}/{len(y_true_prices)} valid prices)")
else:
    print(f"  MAPE: N/A (prices too small for meaningful MAPE)")

print(f"\nDirectional Accuracy (TEST SET): {directional_accuracy_test:.2f}%")
if directional_accuracy_test > 50:
    print(f"  ✓ Model beats random (50%) by {directional_accuracy_test - 50:.2f}%")
    if directional_accuracy_test >= 55:
        print(f"  ✓ This is above the 55% threshold for potentially profitable strategies")
    else:
        print(f"  ⚠ Close to 55% threshold - monitor closely")
else:
    print(f"  ✗ Model performs worse than random")

print(f"\nReturns-Based Metrics:")
print(f"  Total Cumulative Return (if traded): {total_pred_return:.2f}%")
print(f"  Actual Market Return: {total_actual_return:.2f}%")
print(f"  Win Rate: {win_rate:.2f}% (percentage of positive return predictions)")

print("\n" + "=" * 60)
print("IMPORTANT NOTES:")
print("  - MAPE can be misleading for returns-based predictions or when prices are small")
print("  - Focus on Directional Accuracy (57%+ is good for trading)")
print("  - Cumulative Returns show actual trading performance")
print("  - MAE/RMSE on prices show reconstruction quality")
print("=" * 60)

In [None]:
# UPDATED VISUALIZATION - Replace old visualization with this
# Create time index for test set
test_start_idx = len(X_train) + len(X_val) + SEQUENCE_LENGTH
test_indices = df_features.index[test_start_idx:test_start_idx + len(y_pred_prices)]

# Plot predictions vs actual
fig, axes = plt.subplots(3, 1, figsize=(14, 12))

# Plot 1: Actual vs Predicted Prices
axes[0].plot(test_indices, y_true_prices, label='Actual Price (Ground Truth)', alpha=0.7, linewidth=1.5)
axes[0].plot(test_indices, y_pred_prices, label='Predicted Price (from Log Returns)', alpha=0.7, linewidth=1.5)
axes[0].set_xlabel('Date')
axes[0].set_ylabel('Price ($)')
axes[0].set_title(f'{SYMBOL} - Actual vs Predicted Prices (Test Set)')
axes[0].legend()
axes[0].grid(True, alpha=0.3)
axes[0].tick_params(axis='x', rotation=45)

# Plot 2: Log Returns (what model actually predicts)
axes[1].plot(test_indices, y_true_log_returns, label='Actual Log Returns', alpha=0.7, linewidth=1.5)
axes[1].plot(test_indices, y_pred_log_returns, label='Predicted Log Returns', alpha=0.7, linewidth=1.5)
axes[1].axhline(y=0, color='black', linestyle='--', linewidth=1, alpha=0.5)
axes[1].set_xlabel('Date')
axes[1].set_ylabel('Log Returns')
axes[1].set_title('Actual vs Predicted Log Returns (Model Target)')
axes[1].legend()
axes[1].grid(True, alpha=0.3)
axes[1].tick_params(axis='x', rotation=45)

# Plot 3: Residuals
residuals = y_true_prices - y_pred_prices
axes[2].plot(test_indices, residuals, alpha=0.7, color='red', linewidth=1)
axes[2].axhline(y=0, color='black', linestyle='--', linewidth=1)
axes[2].set_xlabel('Date')
axes[2].set_ylabel('Residual ($)')
axes[2].set_title('Price Residuals (Actual - Predicted)')
axes[2].grid(True, alpha=0.3)
axes[2].tick_params(axis='x', rotation=45)

plt.tight_layout()
plt.show()

# Additional visualizations
plt.figure(figsize=(18, 5))

plt.subplot(1, 3, 1)
plt.hist(residuals, bins=50, alpha=0.7, edgecolor='black')
plt.xlabel('Price Residual ($)')
plt.ylabel('Frequency')
plt.title('Distribution of Price Residuals')
plt.axvline(x=0, color='red', linestyle='--', linewidth=2)
plt.grid(True, alpha=0.3)

plt.subplot(1, 3, 2)
plt.scatter(y_pred_log_returns, y_true_log_returns, alpha=0.5)
plt.xlabel('Predicted Log Returns')
plt.ylabel('Actual Log Returns')
plt.title('Log Returns: Predicted vs Actual')
plt.plot([y_true_log_returns.min(), y_true_log_returns.max()], 
         [y_true_log_returns.min(), y_true_log_returns.max()], 
         'r--', linewidth=2, label='Perfect Prediction')
plt.legend()
plt.grid(True, alpha=0.3)

plt.subplot(1, 4, 3)
# Direction accuracy visualization
correct_direction = (y_pred_log_returns * y_true_log_returns) > 0
plt.scatter(y_true_log_returns[correct_direction], y_pred_log_returns[correct_direction], 
           alpha=0.5, color='green', label='Correct Direction', s=20)
plt.scatter(y_true_log_returns[~correct_direction], y_pred_log_returns[~correct_direction], 
           alpha=0.5, color='red', label='Wrong Direction', s=20)
plt.xlabel('Actual Log Returns')
plt.ylabel('Predicted Log Returns')
plt.title(f'Directional Accuracy: {directional_accuracy:.1f}%')
plt.axhline(y=0, color='black', linestyle='--', linewidth=1, alpha=0.5)
plt.axvline(x=0, color='black', linestyle='--', linewidth=1, alpha=0.5)
plt.legend()
plt.grid(True, alpha=0.3)

plt.subplot(1, 4, 4)
# Cumulative returns comparison
test_indices_short = test_indices[:len(cumulative_pred_returns)]
plt.plot(test_indices_short, cumulative_pred_returns * 100, label='Predicted Strategy', alpha=0.7, linewidth=1.5)
plt.plot(test_indices_short, cumulative_actual_returns * 100, label='Actual Market', alpha=0.7, linewidth=1.5)
plt.xlabel('Date')
plt.ylabel('Cumulative Return (%)')
plt.title('Cumulative Returns Comparison')
plt.legend()
plt.grid(True, alpha=0.3)
plt.tick_params(axis='x', rotation=45)

plt.tight_layout()
plt.show()

## 13. Save Model (Updated)

Save the trained model with updated metrics tracking directional accuracy.

In [None]:
# Create models directory if it doesn't exist
models_dir = project_root / "models"
models_dir.mkdir(exist_ok=True)

# Save model with updated metrics
model_path = models_dir / f"lstm_{SYMBOL.lower()}_model.pth"
torch.save({
    'model_state_dict': model.state_dict(),
    'model_config': {
        'input_size': INPUT_SIZE,
        'hidden_size': HIDDEN_SIZE,
        'num_layers': NUM_LAYERS,
        'dropout': DROPOUT,
    },
    'scaler_X': scaler_X,
    'scaler_y': scaler_y,
    'sequence_length': SEQUENCE_LENGTH,
    'feature_columns': available_features,
    'target_column': TARGET_COLUMN,  # 'log_returns' - stationary target
    'symbol': SYMBOL,
    'loss_function': 'HuberLoss',  # Robust to outliers
    'monitored_metric': 'directional_accuracy',  # What we optimize for
    'training_metrics': {
        'best_val_directional_accuracy': best_directional_accuracy,
        'final_train_loss': train_losses[-1],
        'final_val_loss': val_losses[-1],
        'final_val_directional_accuracy': val_directional_accuracies[-1],
        'test_rmse_log_returns': rmse_log_returns,
        'test_mae_log_returns': mae_log_returns,
        'test_rmse_prices': rmse_prices,
        'test_mae_prices': mae_prices,
        'test_mape_prices': mape_prices if not np.isnan(mape_prices) else None,
        'direction_accuracy': directional_accuracy,
        'total_cumulative_return': total_pred_return,
        'win_rate': win_rate,
    }
}, model_path)

print(f"Model saved to: {model_path}")
print(f"Model size: {model_path.stat().st_size / 1024 / 1024:.2f} MB")
print(f"\nKey improvements in this model:")
print(f"  - Target: {TARGET_COLUMN} (stationary)")
print(f"  - Loss: HuberLoss (robust to outliers)")
print(f"  - Monitored: Directional Accuracy ({best_directional_accuracy:.2f}%)")

## 11. Model Evaluation

Evaluate on test set and calculate financial metrics.

## 12. Visualization of Predictions

Visualize predictions vs actual prices.

## 13. Save Model

Save the trained model and scalers for future use.

## 14. Model Loading Example

Example of how to load the saved model for inference.

## 7. Training Setup

Define loss function, optimizer, and learning rate scheduler.

In [None]:
# Loss function (MSE for regression)
criterion = nn.MSELoss()

# Optimizer (Adam with weight decay for regularization)
LEARNING_RATE = 0.001
WEIGHT_DECAY = 1e-5
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE, weight_decay=WEIGHT_DECAY)

# Learning rate scheduler (reduce on plateau)
# Note: verbose parameter is not available in PyTorch's ReduceLROnPlateau
scheduler = optim.lr_scheduler.ReduceLROnPlateau(
    optimizer, mode='min', factor=0.5, patience=5
)

# Training parameters
NUM_EPOCHS = 50
EARLY_STOPPING_PATIENCE = 10
MIN_DELTA = 1e-6

print(f"Training configuration:")
print(f"  Loss function: MSE")
print(f"  Optimizer: Adam (lr={LEARNING_RATE}, weight_decay={WEIGHT_DECAY})")
print(f"  Scheduler: ReduceLROnPlateau")
print(f"  Epochs: {NUM_EPOCHS}")
print(f"  Early stopping patience: {EARLY_STOPPING_PATIENCE}")

## 8. Training Loop

Train the model with early stopping and validation monitoring.

In [None]:
def train_epoch(model, train_loader, criterion, optimizer, device):
    """Train for one epoch."""
    model.train()
    total_loss = 0.0
    n_batches = 0
    
    for X_batch, y_batch in train_loader:
        X_batch, y_batch = X_batch.to(device), y_batch.to(device)
        
        # Forward pass
        optimizer.zero_grad()
        predictions = model(X_batch)
        loss = criterion(predictions, y_batch)
        
        # Backward pass
        loss.backward()
        optimizer.step()
        
        total_loss += loss.item()
        n_batches += 1
    
    return total_loss / n_batches

def validate(model, val_loader, criterion, device):
    """Validate the model."""
    model.eval()
    total_loss = 0.0
    n_batches = 0
    
    with torch.no_grad():
        for X_batch, y_batch in val_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            predictions = model(X_batch)
            loss = criterion(predictions, y_batch)
            total_loss += loss.item()
            n_batches += 1
    
    return total_loss / n_batches

# Training loop
train_losses = []
val_losses = []
best_val_loss = float('inf')
patience_counter = 0
best_model_state = None  # Initialize to None, will be set on first improvement

print("Starting training...")
print("=" * 60)

for epoch in range(NUM_EPOCHS):
    # Train
    train_loss = train_epoch(model, train_loader, criterion, optimizer, device)
    train_losses.append(train_loss)
    
    # Validate
    val_loss = validate(model, val_loader, criterion, device)
    val_losses.append(val_loss)
    
    # Learning rate scheduling
    scheduler.step(val_loss)
    
    # Early stopping check
    if val_loss < best_val_loss - MIN_DELTA:
        best_val_loss = val_loss
        patience_counter = 0
        # Save best model (in practice, save to disk)
        best_model_state = model.state_dict().copy()
    else:
        patience_counter += 1
    
    # Print progress
    if (epoch + 1) % 5 == 0 or epoch == 0:
        print(
            f"Epoch [{epoch+1}/{NUM_EPOCHS}] | "
            f"Train Loss: {train_loss:.6f} | "
            f"Val Loss: {val_loss:.6f} | "
            f"LR: {optimizer.param_groups[0]['lr']:.6f}"
        )
    
    # Early stopping
    if patience_counter >= EARLY_STOPPING_PATIENCE:
        print(f"\nEarly stopping at epoch {epoch+1}")
        if best_model_state is not None:
            model.load_state_dict(best_model_state)
            print("Loaded best model state")
        else:
            print("Warning: No model improvement found, using current model state")
        break

print("=" * 60)
print("Training completed!")

## 9. Training Visualization

Plot training and validation loss curves.

In [None]:
# Plot training curves
plt.figure(figsize=(12, 5))

plt.subplot(1, 2, 1)
plt.plot(train_losses, label='Train Loss', alpha=0.7)
plt.plot(val_losses, label='Validation Loss', alpha=0.7)
plt.xlabel('Epoch')
plt.ylabel('Loss (MSE)')
plt.title('Training and Validation Loss')
plt.legend()
plt.grid(True, alpha=0.3)

plt.subplot(1, 2, 2)
plt.plot(train_losses, label='Train Loss', alpha=0.7)
plt.plot(val_losses, label='Validation Loss', alpha=0.7)
plt.xlabel('Epoch')
plt.ylabel('Loss (MSE) - Log Scale')
plt.title('Training and Validation Loss (Log Scale)')
plt.yscale('log')
plt.legend()
plt.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

print(f"Final train loss: {train_losses[-1]:.6f}")
print(f"Final validation loss: {val_losses[-1]:.6f}")
print(f"Best validation loss: {best_val_loss:.6f}")

## 10. Model Evaluation

Evaluate on test set and calculate financial metrics.

In [None]:
def evaluate_model(model, test_loader, scaler_y, device):
    """Evaluate model and return predictions and targets (denormalized)."""
    model.eval()
    predictions = []
    targets = []
    
    with torch.no_grad():
        for X_batch, y_batch in test_loader:
            X_batch = X_batch.to(device)
            pred = model(X_batch)
            predictions.extend(pred.cpu().numpy())
            targets.extend(y_batch.numpy())
    
    # Convert to numpy arrays
    predictions = np.array(predictions)
    targets = np.array(targets)
    
    # Denormalize
    predictions_denorm = scaler_y.inverse_transform(predictions.reshape(-1, 1)).flatten()
    targets_denorm = scaler_y.inverse_transform(targets.reshape(-1, 1)).flatten()
    
    return predictions_denorm, targets_denorm

# Evaluate on test set
y_pred_test, y_true_test = evaluate_model(model, test_loader, scaler_y, device)

# Calculate metrics
mse = mean_squared_error(y_true_test, y_pred_test)
rmse = np.sqrt(mse)
mae = mean_absolute_error(y_true_test, y_pred_test)
mape = np.mean(np.abs((y_true_test - y_pred_test) / y_true_test)) * 100

# Directional accuracy (percentage of correct direction predictions)
returns_true = np.diff(y_true_test) / y_true_test[:-1]
returns_pred = np.diff(y_pred_test) / y_pred_test[:-1]
direction_accuracy = np.mean((returns_true * returns_pred) > 0) * 100

print("Test Set Performance:")
print("=" * 60)
print(f"RMSE: ${rmse:.2f}")
print(f"MAE: ${mae:.2f}")
print(f"MAPE: {mape:.2f}%")
print(f"Directional Accuracy: {direction_accuracy:.2f}%")
print("=" * 60)

## 11. Visualization of Predictions

Visualize predictions vs actual prices.

In [None]:
# Create time index for test set (approximate, based on sequence length)
test_start_idx = len(X_train) + len(X_val) + SEQUENCE_LENGTH
test_indices = df_features.index[test_start_idx:test_start_idx + len(y_pred_test)]

# Plot predictions vs actual
fig, axes = plt.subplots(2, 1, figsize=(14, 10))

# Full test set
axes[0].plot(test_indices, y_true_test, label='Actual', alpha=0.7, linewidth=1.5)
axes[0].plot(test_indices, y_pred_test, label='Predicted', alpha=0.7, linewidth=1.5)
axes[0].set_xlabel('Date')
axes[0].set_ylabel('Price ($)')
axes[0].set_title(f'{SYMBOL} - Actual vs Predicted Prices (Test Set)')
axes[0].legend()
axes[0].grid(True, alpha=0.3)
axes[0].tick_params(axis='x', rotation=45)

# Residuals
residuals = y_true_test - y_pred_test
axes[1].plot(test_indices, residuals, alpha=0.7, color='red', linewidth=1)
axes[1].axhline(y=0, color='black', linestyle='--', linewidth=1)
axes[1].set_xlabel('Date')
axes[1].set_ylabel('Residual ($)')
axes[1].set_title('Residuals (Actual - Predicted)')
axes[1].grid(True, alpha=0.3)
axes[1].tick_params(axis='x', rotation=45)

plt.tight_layout()
plt.show()

# Distribution of residuals
plt.figure(figsize=(10, 4))

plt.subplot(1, 2, 1)
plt.hist(residuals, bins=50, alpha=0.7, edgecolor='black')
plt.xlabel('Residual ($)')
plt.ylabel('Frequency')
plt.title('Distribution of Residuals')
plt.axvline(x=0, color='red', linestyle='--', linewidth=2)
plt.grid(True, alpha=0.3)

plt.subplot(1, 2, 2)
plt.scatter(y_pred_test, residuals, alpha=0.5)
plt.xlabel('Predicted Price ($)')
plt.ylabel('Residual ($)')
plt.title('Residuals vs Predicted Values')
plt.axhline(y=0, color='red', linestyle='--', linewidth=2)
plt.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

## 12. Save Model

Save the trained model and scalers for future use.

In [None]:
# Create models directory if it doesn't exist
models_dir = project_root / "models"
models_dir.mkdir(exist_ok=True)

# Save model
model_path = models_dir / f"lstm_{SYMBOL.lower()}_model.pth"
torch.save({
    'model_state_dict': model.state_dict(),
    'model_config': {
        'input_size': INPUT_SIZE,
        'hidden_size': HIDDEN_SIZE,
        'num_layers': NUM_LAYERS,
        'dropout': DROPOUT,
    },
    'scaler_X': scaler_X,
    'scaler_y': scaler_y,
    'sequence_length': SEQUENCE_LENGTH,
    'feature_columns': available_features,
    'target_column': TARGET_COLUMN,
    'symbol': SYMBOL,
    'training_metrics': {
        'best_val_loss': best_val_loss,
        'final_train_loss': train_losses[-1],
        'final_val_loss': val_losses[-1],
        'test_rmse': rmse,
        'test_mae': mae,
        'test_mape': mape,
        'direction_accuracy': direction_accuracy,
    }
}, model_path)

print(f"Model saved to: {model_path}")
print(f"Model size: {model_path.stat().st_size / 1024 / 1024:.2f} MB")

## 13. Model Loading Example

Example of how to load the saved model for inference.

In [None]:
# Example: Load model for inference
def load_model_for_inference(model_path: Path):
    """Load a saved model for inference."""
    checkpoint = torch.load(model_path, map_location=device)
    
    # Recreate model
    model_config = checkpoint['model_config']
    model = LSTMPredictor(**model_config)
    model.load_state_dict(checkpoint['model_state_dict'])
    model = model.to(device)
    model.eval()
    
    return model, checkpoint

# Load model (example)
# loaded_model, checkpoint = load_model_for_inference(model_path)
# print("Model loaded successfully!")
# print(f"Model was trained on: {checkpoint['symbol']}")
# print(f"Test RMSE: ${checkpoint['training_metrics']['test_rmse']:.2f}")

## Notes and Next Steps

### Improvements to Consider:
1. **Hyperparameter Optimization**: Use Optuna or grid search to find optimal hyperparameters
2. **Feature Selection**: Identify most important features using SHAP or feature importance
3. **Ensemble Methods**: Combine multiple models for better predictions
4. **Multi-step Forecasting**: Predict multiple steps ahead
5. **Regime Detection**: Adapt model based on market conditions
6. **Attention Mechanisms**: Add attention layers to focus on important time steps
7. **Transformer Models**: Experiment with Transformer architectures
8. **Walk-Forward Validation**: Implement proper walk-forward analysis for backtesting

### Financial Metrics to Add:
- Sharpe Ratio
- Sortino Ratio
- Maximum Drawdown
- Win Rate
- Profit Factor

### Model Deployment:
- Create a prediction service/API
- Set up model versioning with MLflow
- Implement real-time inference pipeline
- Add model monitoring and retraining schedule