I chose to make a model that takes in 5 years of stock data and will decide whether to buy, sell, or hold that stock position.

I am comparing a baseline Linear NN with two more advanced models to see whether predictions and accuracy improve with more complexity.

In [3]:
%pip install pandas numpy yfinance scikit-learn torch matplotlib


Note: you may need to restart the kernel to use updated packages.


In [4]:
import pandas as pd
import numpy as np
import yfinance as yf
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.preprocessing import RobustScaler
from sklearn.model_selection import TimeSeriesSplit
from sklearn.metrics import mean_absolute_error, mean_squared_error


In [5]:
config = {
    'ticker': 'TSLA',
    'benchmark': '^GSPC',
    'start_date': '2020-01-01',
    'end_date': '2025-03-15',
    'seq_length': 30,
    'batch_size': 64,
    'hidden_size': 128,
    'num_layers': 3,
    'dropout': 0.4,
    'num_epochs': 200,
    'learning_rate': 0.0005,
    'train_ratio': 0.8
}

# Choose Stock and Download Data

In [None]:
def fetch_and_preprocess_data():
    """Fetch and preprocess data with enhanced technical indicators"""
    # Fetch data
    stock = yf.download(config['ticker'], start=config['start_date'], end=config['end_date'])
    bench = yf.download(config['benchmark'], start=config['start_date'], end=config['end_date'])

    # Calculate features
    df = stock[['Open', 'High', 'Low', 'Close', 'Volume']].copy()
    df['Bench_Close'] = bench['Close']

    # Technical indicators - original ones
    df['SMA_20'] = df['Close'].rolling(20).mean()
    df['EMA_20'] = df['Close'].ewm(span=20, adjust=False).mean()
    df['RSI'] = calculate_rsi(df['Close'])
    df['MACD'] = calculate_macd(df['Close'])
    df['ATR'] = calculate_atr(df)
    df['OBV'] = calculate_obv(df)

    # Additional technical indicators
    df['BB_Upper'], df['BB_Middle'], df['BB_Lower'] = calculate_bollinger_bands(df['Close'])
    df['Stoch_K'], df['Stoch_D'] = calculate_stochastic_oscillator(df)
    df['ADX'] = calculate_adx(df)
    df['Daily_Return'] = df['Close'].pct_change() * 100
    df['Volatility_21'] = df['Daily_Return'].rolling(21).std()
    df['Price_to_SMA_20'] = df['Close'] / df['SMA_20']
    df['Bench_Return'] = bench['Close'].pct_change() * 100
    df['Volume_Change'] = df['Volume'].pct_change() * 100
    df['High_Low_Ratio'] = df['High'] / df['Low']
    df['SMA_50'] = df['Close'].rolling(50).mean()
    df['EMA_50'] = df['Close'].ewm(span=50, adjust=False).mean()

    # Price momentum features
    df['Price_Momentum_5'] = df['Close'].pct_change(5) * 100
    df['Price_Momentum_10'] = df['Close'].pct_change(10) * 100
    df['Price_Momentum_20'] = df['Close'].pct_change(20) * 100

    # Target: Next day's percentage change
    df['Target'] = ((df['Close'].shift(-1) - df['Close']) / df['Close']) * 100
    df.dropna(inplace=True)

    df.columns = df.columns.get_level_values(0)


    return df


In [None]:
def calculate_rsi(series, window=14):
    """Relative Strength Index"""
    delta = series.diff()
    gain = delta.where(delta > 0, 0)
    loss = -delta.where(delta < 0, 0)

    avg_gain = gain.rolling(window).mean()
    avg_loss = loss.rolling(window).mean()

    rs = avg_gain / avg_loss
    return 100 - (100 / (1 + rs))

def calculate_macd(series, fast=12, slow=26, signal=9):
    """MACD Indicator"""
    ema_fast = series.ewm(span=fast, adjust=False).mean()
    ema_slow = series.ewm(span=slow, adjust=False).mean()
    macd = ema_fast - ema_slow
    return macd.ewm(span=signal, adjust=False).mean()

def calculate_atr(df, window=14):
    """Average True Range"""
    high_low = df['High'] - df['Low']
    high_close = (df['High'] - df['Close'].shift()).abs()
    low_close = (df['Low'] - df['Close'].shift()).abs()
    tr = pd.concat([high_low, high_close, low_close], axis=1).max(axis=1)
    return tr.rolling(window).mean()

def calculate_obv(df):
    """On-Balance Volume"""
    obv = (np.sign(df['Close'].diff()) * df['Volume']).cumsum()
    return obv

def calculate_bollinger_bands(close, window=20, num_std=2):
    """Calculate Bollinger Bands"""
    sma = close.rolling(window).mean()
    std = close.rolling(window).std()
    upper = sma + (std * num_std)
    lower = sma - (std * num_std)
    return upper, sma, lower

def calculate_stochastic_oscillator(df, k_window=14, d_window=3):
    """Calculate Stochastic Oscillator"""
    low_min = df['Low'].rolling(k_window).min()
    high_max = df['High'].rolling(k_window).max()

    # Calculate %K
    k = 100 * ((df['Close'] - low_min) / (high_max - low_min))

    # Calculate %D
    d = k.rolling(d_window).mean()

    return k, d

def calculate_adx(df, window=14):
    """Average Directional Index"""
    df = df.copy()

    # True Range
    df['TR'] = calculate_atr(df, 1)

    # +DM and -DM
    df['High_Shift'] = df['High'].shift(1)
    df['Low_Shift'] = df['Low'].shift(1)

    # Directional Movement
    df['+DM'] = np.where((df['High'] - df['High_Shift']) > (df['Low_Shift'] - df['Low']), 
                         np.maximum(df['High'] - df['High_Shift'], 0), 0)
    df['-DM'] = np.where((df['Low_Shift'] - df['Low']) > (df['High'] - df['High_Shift']),
                        np.maximum(df['Low_Shift'] - df['Low'], 0), 0)

    # Smoothed values
    df['+DI'] = 100 * (df['+DM'].rolling(window).sum() / df['TR'].rolling(window).sum())
    df['-DI'] = 100 * (df['-DM'].rolling(window).sum() / df['TR'].rolling(window).sum())

    # Directional Index
    df['DX'] = 100 * (abs(df['+DI'] - df['-DI']) / (df['+DI'] + df['-DI']))

    # Average Directional Index
    adx = df['DX'].rolling(window).mean()

    return adx


In [8]:
def create_sequences(data, target, seq_length):
    """Create time series sequences"""
    X, y = [], []
    for i in range(len(data) - seq_length):
        X.append(data[i:i+seq_length])
        y.append(target[i+seq_length])
    return np.array(X), np.array(y)


In [9]:
class EnhancedLSTM(nn.Module):
    """LSTM model with regularization and deep architecture"""
    def __init__(self, input_size, hidden_size, num_layers, dropout):
        super().__init__()
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers,
                           batch_first=True, dropout=dropout)
        self.attention = nn.Sequential(
            nn.Linear(hidden_size, hidden_size),
            nn.Tanh(),
            nn.Linear(hidden_size, 1),
            nn.Softmax(dim=1)
        )
        self.fc = nn.Sequential(
            nn.Linear(hidden_size, 64),
            nn.LayerNorm(64),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(64, 1)
        )

    def forward(self, x):
        out, _ = self.lstm(x)
        attn_weights = self.attention(out)
        context = torch.sum(attn_weights * out, dim=1)
        return self.fc(context)


In [10]:
def train_model():
    # Data preparation
    df = fetch_and_preprocess_data()
    features = df.columns.drop('Target').tolist()
    target = df['Target'].values

    # Scaling
    scaler = RobustScaler()
    scaled_features = scaler.fit_transform(df[features])

    # Sequence creation
    X, y = create_sequences(scaled_features, target, config['seq_length'])

    # Train-test split
    split_idx = int(len(X) * config['train_ratio'])
    X_train, y_train = X[:split_idx], y[:split_idx]
    X_test, y_test = X[split_idx:], y[split_idx:]

    # Convert to tensors
    X_train = torch.FloatTensor(X_train)
    y_train = torch.FloatTensor(y_train)
    X_test = torch.FloatTensor(X_test)
    y_test = torch.FloatTensor(y_test)

    # Model setup
    model = EnhancedLSTM(
        input_size=X_train.shape[2],
        hidden_size=config['hidden_size'],
        num_layers=config['num_layers'],
        dropout=config['dropout']
    )
    criterion = nn.HuberLoss()
    optimizer = optim.AdamW(model.parameters(), lr=config['learning_rate'])
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=10)

    # Training loop
    best_loss = float('inf')
    for epoch in range(config['num_epochs']):
        model.train()
        batch_loss = []
        for i in range(0, len(X_train), config['batch_size']):
            X_batch = X_train[i:i+config['batch_size']]
            y_batch = y_train[i:i+config['batch_size']]

            optimizer.zero_grad()
            outputs = model(X_batch).flatten()
            loss = criterion(outputs, y_batch)
            loss.backward()
            nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            optimizer.step()
            batch_loss.append(loss.item())

        # Validation
        model.eval()
        with torch.no_grad():
            test_preds = model(X_test).flatten()
            test_loss = criterion(test_preds, y_test)
            scheduler.step(test_loss)

            # Metrics
            mae = mean_absolute_error(y_test, test_preds)
            rmse = np.sqrt(mean_squared_error(y_test, test_preds))
            mape = np.mean(np.abs((y_test - test_preds) / np.abs(y_test))) * 100

        if test_loss < best_loss:
            best_loss = test_loss
            torch.save(model.state_dict(), 'best_lstm_model.pth')

        if epoch % 10 == 0:
            print(f"Epoch {epoch+1}/{config['num_epochs']}")
            print(f"Train Loss: {np.mean(batch_loss):.4f} | Test Loss: {test_loss:.4f}")
            print(f"MAE: {mae:.2f}% | RMSE: {rmse:.2f}% | MAPE: {mape:.2f}%\n")

    print(f"Training complete. Best validation loss: {best_loss:.4f}")
    return model, scaler


In [11]:

def predict_next_day(model, scaler, ticker, date):
    """Predict percentage change for next trading day"""
    # Fetch latest data
    end_date = pd.to_datetime(date)
    start_date = end_date - pd.DateOffset(days=config['seq_length']*2)

    stock = yf.download(ticker, start=start_date, end=end_date)
    bench = yf.download(config['benchmark'], start=start_date, end=end_date)

    # Preprocess
    df = stock[['Open', 'High', 'Low', 'Close', 'Volume']].copy()
    df['Bench_Close'] = bench['Close']

    # Add technical indicators
    df = df.join(pd.DataFrame({
        'SMA_20': df['Close'].rolling(20).mean(),
        'EMA_20': df['Close'].ewm(span=20, adjust=False).mean(),
        'RSI': calculate_rsi(df['Close']),
        'MACD': calculate_macd(df['Close']),
        'ATR': calculate_atr(df),
        'OBV': calculate_obv(df)
    }))

    # Get last sequence
    seq = df.iloc[-config['seq_length']:][features]
    seq_scaled = scaler.transform(seq)

    # Predict
    model.eval()
    with torch.no_grad():
        prediction = model(torch.FloatTensor(seq_scaled).unsqueeze(0)).item()

    return prediction


# Main

In [12]:
trained_model, feature_scaler = train_model()

# Example prediction
prediction_date = '2025-03-16'
pred_pct = predict_next_day(trained_model, feature_scaler,
                        config['ticker'], prediction_date)

print(f"\nPredicted percentage change for {prediction_date}: {pred_pct:.2f}%")
print("Trading Recommendation:")
if pred_pct > 1.5:
    print("Strong Buy")
elif pred_pct > 0.5:
    print("Buy")
elif pred_pct < -1.5:
    print("Strong Sell")
elif pred_pct < -0.5:
    print("Sell")
else:
    print("Hold")


YF.download() has changed argument auto_adjust default to True


[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
  mape = np.mean(np.abs((y_test - test_preds) / np.abs(y_test))) * 100


TypeError: mean() received an invalid combination of arguments - got (out=NoneType, dtype=NoneType, axis=NoneType, ), but expected one of:
 * (*, torch.dtype dtype = None)
 * (tuple of ints dim, bool keepdim = False, *, torch.dtype dtype = None)
 * (tuple of names dim, bool keepdim = False, *, torch.dtype dtype = None)


In [None]:
def prepare_data(ticker, benchmark, start_date, end_date='2025-03-15', seq_length=30, train_ratio=0.8):
    """Prepare data for model training with proper feature engineering"""
    # Fetch data
    stock_data = yf.download(ticker, start=start_date, end=end_date)
    benchmark_data = yf.download(benchmark, start=start_date, end=end_date)
    
    # Basic features
    df = stock_data[['Open', 'High', 'Low', 'Close', 'Volume']].copy()
    df['Bench_Close'] = benchmark_data['Close']
    
    # Technical indicators - original ones
    df['SMA_20'] = df['Close'].rolling(20).mean()
    df['EMA_20'] = df['Close'].ewm(span=20, adjust=False).mean()
    df['RSI'] = calculate_rsi(df['Close'])
    df['MACD'] = calculate_macd(df['Close'])
    df['ATR'] = calculate_atr(df)
    df['OBV'] = calculate_obv(df)
    
    # Additional technical indicators
    df['BB_Upper'], df['BB_Middle'], df['BB_Lower'] = calculate_bollinger_bands(df['Close'])
    df['Stoch_K'], df['Stoch_D'] = calculate_stochastic_oscillator(df)
    df['ADX'] = calculate_adx(df)
    df['Daily_Return'] = df['Close'].pct_change() * 100
    df['Volatility_21'] = df['Daily_Return'].rolling(21).std()
    df['Price_to_SMA_20'] = df['Close'] / df['SMA_20']
    df['Bench_Return'] = df['Bench_Close'].pct_change() * 100
    df['Volume_Change'] = df['Volume'].pct_change() * 100
    df['High_Low_Ratio'] = df['High'] / df['Low']
    df['SMA_50'] = df['Close'].rolling(50).mean()
    df['EMA_50'] = df['Close'].ewm(span=50, adjust=False).mean()
    
    # Price momentum features
    df['Price_Momentum_5'] = df['Close'].pct_change(5) * 100
    df['Price_Momentum_10'] = df['Close'].pct_change(10) * 100
    df['Price_Momentum_20'] = df['Close'].pct_change(20) * 100
    
    # Target: Next day's percentage change
    df['Target'] = ((df['Close'].shift(-1) - df['Close']) / df['Close']) * 100
    
    # Drop NaN values
    df.dropna(inplace=True)
    
    # Save Close prices for later visualization
    close_prices = df['Close'].values
    
    # Features and target
    features = df.drop('Target', axis=1)
    feature_names = features.columns.tolist()
    target = df['Target'].values
    
    # Scaling features
    scaler = RobustScaler()
    scaled_features = scaler.fit_transform(features)
    
    # Create sequences
    X, y = create_sequences(scaled_features, target, seq_length)
    
    # Train-test split
    split_idx = int(len(X) * train_ratio)
    X_train, y_train = X[:split_idx], y[:split_idx]
    X_test, y_test = X[split_idx:], y[split_idx:]
    
    return X_train, y_train, X_test, y_test, scaler, feature_names, close_prices, df.index[seq_length:]

def visualize_predictions(model, X_test, y_test, close_prices, dates):
    """Visualize actual vs predicted price changes and absolute prices"""
    import matplotlib.pyplot as plt
    from matplotlib.dates import DateFormatter
    
    # Get predictions
    model.eval()
    with torch.no_grad():
        predictions = model(torch.FloatTensor(X_test)).flatten().numpy()
    
    # Calculate the test date range
    test_dates = dates[-len(y_test):]
    
    # Get the actual close prices for the test period
    test_close = close_prices[-len(y_test)-1:]
    
    # Calculate the predicted prices
    predicted_prices = []
    actual_prices = []
    
    for i in range(len(predictions)):
        # Calculate the predicted close price using the percentage change
        pred_close = test_close[i] * (1 + predictions[i]/100)
        actual_close = test_close[i] * (1 + y_test[i]/100)
        
        predicted_prices.append(pred_close)
        actual_prices.append(actual_close)
    
    # Create a figure with two subplots
    fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(14, 10))
    
    # Plot percentage changes
    ax1.plot(test_dates, y_test, label='Actual % Change', color='blue', alpha=0.7)
    ax1.plot(test_dates, predictions, label='Predicted % Change', color='red', alpha=0.7)
    ax1.set_title('Actual vs Predicted Price Changes (%)', fontsize=16)
    ax1.set_ylabel('Percentage Change (%)')
    ax1.legend()
    ax1.grid(True)
    
    # Plot absolute prices
    ax2.plot(test_dates, test_close[1:], label='Actual Close Price', color='blue', alpha=0.7)
    ax2.plot(test_dates, predicted_prices, label='Predicted Close Price', color='red', alpha=0.7)
    ax2.set_title('Actual vs Predicted Stock Price', fontsize=16)
    ax2.set_xlabel('Date')
    ax2.set_ylabel('Price ($)')
    ax2.legend()
    ax2.grid(True)
    
    # Format x-axis dates
    date_format = DateFormatter('%Y-%m-%d')
    ax1.xaxis.set_major_formatter(date_format)
    ax2.xaxis.set_major_formatter(date_format)
    
    # Rotate date labels for better readability
    plt.setp(ax1.xaxis.get_majorticklabels(), rotation=45)
    plt.setp(ax2.xaxis.get_majorticklabels(), rotation=45)
    
    plt.tight_layout()
    plt.show()
    
    # Calculate metrics
    mae = mean_absolute_error(y_test, predictions)
    rmse = np.sqrt(mean_squared_error(y_test, predictions))
    mape = np.mean(np.abs((y_test - predictions) / (np.abs(y_test) + 1e-7))) * 100
    
    print(f"Model Performance Metrics:")
    print(f"MAE: {mae:.2f}% | RMSE: {rmse:.2f}% | MAPE: {mape:.2f}%")

# Main execution cells
# Replace the train_model call and subsequent code with:

# Prepare data
X_train, y_train, X_test, y_test, scaler, feature_names, close_prices, dates = prepare_data(
    ticker=config['ticker'],
    benchmark=config['benchmark'],
    start_date=config['start_date'],
    seq_length=config['seq_length']
)

# Convert to tensors
X_train_tensor = torch.FloatTensor(X_train)
y_train_tensor = torch.FloatTensor(y_train)
X_test_tensor = torch.FloatTensor(X_test)
y_test_tensor = torch.FloatTensor(y_test)

# Model setup
model = EnhancedLSTM(
    input_size=X_train.shape[2],
    hidden_size=config['hidden_size'],
    num_layers=config['num_layers'],
    dropout=config['dropout']
)
criterion = nn.HuberLoss()
optimizer = optim.AdamW(model.parameters(), lr=config['learning_rate'])
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=10)

# Training loop
best_loss = float('inf')
for epoch in range(config['num_epochs']):
    model.train()
    batch_loss = []
    for i in range(0, len(X_train), config['batch_size']):
        X_batch = X_train_tensor[i:i+config['batch_size']]
        y_batch = y_train_tensor[i:i+config['batch_size']]
        
        optimizer.zero_grad()
        outputs = model(X_batch).flatten()
        loss = criterion(outputs, y_batch)
        loss.backward()
        nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()
        batch_loss.append(loss.item())
    
    # Validation
    model.eval()
    with torch.no_grad():
        test_preds = model(X_test_tensor).flatten()
        test_loss = criterion(test_preds, y_test_tensor)
        scheduler.step(test_loss)
        
        # Metrics
        mae = mean_absolute_error(y_test, test_preds.numpy())
        rmse = np.sqrt(mean_squared_error(y_test, test_preds.numpy()))
        mape = np.mean(np.abs((y_test - test_preds.numpy()) / (np.abs(y_test) + 1e-7))) * 100
    
    if test_loss < best_loss:
        best_loss = test_loss
        torch.save(model.state_dict(), 'best_lstm_model.pth')
    
    if epoch % 10 == 0:
        print(f"Epoch {epoch+1}/{config['num_epochs']}")
        print(f"Train Loss: {np.mean(batch_loss):.4f} | Test Loss: {test_loss:.4f}")
        print(f"MAE: {mae:.2f}% | RMSE: {rmse:.2f}% | MAPE: {mape:.2f}%\n")

print(f"Training complete. Best validation loss: {best_loss:.4f}")

# Visualize the results
visualize_predictions(model, X_test, y_test, close_prices, dates)

# Example prediction
prediction_date = '2025-03-16'
pred_pct = predict_next_day(model, scaler, config['ticker'], prediction_date)

print(f"\nPredicted percentage change for {prediction_date}: {pred_pct:.2f}%")
print("Trading Recommendation:")
if pred_pct > 1.5:
    print("Strong Buy")
elif pred_pct > 0.5:
    print("Buy")
elif pred_pct < -1.5:
    print("Strong Sell")
elif pred_pct < -0.5:
    print("Sell")
else:
    print("Hold")