In [3]:
import yfinance as yf
import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Flatten, LeakyReLU
from tensorflow.keras.optimizers import Adam
import tensorflow as tf
import matplotlib.pyplot as plt


# Fetch stock data from Yahoo Finance
def fetch_stock_data(ticker, start_date, end_date):
    data = yf.download(ticker, start=start_date, end=end_date)
    return data


# Compute technical indicators
def compute_technical_indicators(data):
    data['SMA30'] = data['Close'].rolling(window=30).mean()
    data['SMA100'] = data['Close'].rolling(window=100).mean()
    vol_window = 21
    data['Volatility'] = data['Close'].pct_change().rolling(window=vol_window).std() * np.sqrt(252)
    return data.dropna()


# Normalize and apply scaling
def preprocess_data(data):
    scaler = MinMaxScaler()
    data_scaled = scaler.fit_transform(data)
    return data_scaled, scaler


# Create sequences for time-series prediction
def create_sequences(data, seq_len, pred_len):
    sequences = []
    for i in range(len(data) - seq_len - pred_len + 1):
        sequences.append(data[i:i + seq_len + pred_len])
    return np.array(sequences)


# Build a neural network model
def build_model(input_shape, output_len):
    model = Sequential([
        Flatten(input_shape=input_shape),
        Dense(180),
        LeakyReLU(),
        Dense(360),
        LeakyReLU(),
        Dense(360),
        LeakyReLU(),
        Dense(output_len)
    ])
    return model


# Define the Black-Scholes PDE Loss Function
def black_scholes_pde_loss(y_true, y_pred, S, sigma, r, T):
    # Ensure `S` is part of the computation graph
    S = tf.convert_to_tensor(S, dtype=tf.float32)
    y_pred = tf.convert_to_tensor(y_pred, dtype=tf.float32)

    with tf.GradientTape(persistent=True) as tape:
        tape.watch(S)
        V = y_pred  # Assuming V is the model's output
        dV_dS = tape.gradient(V, S)

    # Ensure dV_dS is not None before computing the second derivative
    if dV_dS is None:
        raise ValueError("Gradient dV_dS is None. Ensure the model output depends on S.")

    d2V_dS2 = tape.gradient(dV_dS, S)

    # Time derivative (assuming y_true is the target future price)
    dV_dt = (y_pred - y_true) / T

    # Black-Scholes PDE residual
    pde_residual = (
        dV_dt +
        0.5 * sigma**2 * S**2 * d2V_dS2 +
        r * S * dV_dS -
        r * y_pred
    )

    mse_pde = tf.reduce_mean(tf.square(pde_residual))
    return mse_pde


# Predict and Plot
def predict_and_plot(ticker, data, s, model, seq_len, pred_len, scaler, cut_off):
    plt.figure(figsize=(14, 7))

    red_x, red_y, black_y, blue_x, blue_y, green_y = [], [], [], [], [], []

    for idx in range(0, s.shape[0], pred_len):
        future_input = s[idx, :seq_len, :].reshape(1, seq_len, -1)
        future = model.predict(future_input)[0]
        future_padded = np.hstack((future.reshape(-1, 1), np.zeros((future.shape[0], data.shape[1] - 1))))
        future_transform = scaler.inverse_transform(future_padded)[:, 0]

        future_true = s[idx, seq_len:, 0].reshape(-1, 1)
        future_true_padded = np.hstack((future_true, np.zeros((future_true.shape[0], data.shape[1] - 1))))
        future_true_transform = scaler.inverse_transform(future_true_padded)[:, 0]

        if idx < cut_off:
            red_x.append(idx)
            red_y.append(future_transform)
            black_y.append(future_true_transform)
        else:
            blue_x.append(idx)
            blue_y.append(future_transform)
            green_y.append(future_true_transform)

    plt.plot(red_x, red_y, color="red", label="Train Prediction")
    plt.plot(red_x, black_y, color="black", label="Train True")
    plt.plot(blue_x, blue_y, color="blue", label="Test Prediction")
    plt.plot(blue_x, green_y, color="green", label="Test True")
    plt.legend()
    plt.title(f"Predictions for {ticker}")
    plt.xlabel("Trading Days")
    plt.ylabel("Price")
    plt.show()


# Main workflow
def main(tickers):
    for ticker in tickers:
        # Fetch data
        stock_data = fetch_stock_data(ticker, start_date="2021-01-01", end_date="2024-09-30")
        stock_data = compute_technical_indicators(stock_data)

        # Select features
        features = ['Close', 'Volume', 'SMA30', 'SMA100', 'Volatility']
        data = stock_data[features].dropna().copy()
        processed_data, scaler = preprocess_data(data)

        # Create sequences
        seq_len = 180  # 6 months of data
        pred_len = 20  # Predict next 20 days
        sequences = create_sequences(processed_data, seq_len, pred_len)

        cut_off = int(0.8 * sequences.shape[0])
        X_train = sequences[:cut_off, :seq_len, :]
        X_test = sequences[cut_off:, :seq_len, :]
        y_train = sequences[:cut_off, seq_len:, 0]
        y_test = sequences[cut_off:, seq_len:, 0]

        # Define asset price input for loss function
        S_input = data['Close'].values.reshape(-1, 1)
        sigma = 0.2  # Example volatility
        r = 0.01  # Example risk-free rate
        T = 30 / 252  # Example time to maturity

        # Build and compile model
        model = build_model(input_shape=(seq_len, X_train.shape[2]), output_len=pred_len)
        model.compile(
            optimizer=Adam(learning_rate=0.001),
            loss=lambda y_true, y_pred: black_scholes_pde_loss(y_true, y_pred, S_input, sigma, r, T)
        )

        # Train model
        model.fit(
            X_train, y_train,
            epochs=100,
            batch_size=32,
            verbose=1
        )

        # Predict and plot
        predict_and_plot(ticker, processed_data, sequences, model, seq_len, pred_len, scaler, cut_off)

    
# Run the main function with a list of tickers
tickers = ['TSLA']  # Add more tickers as needed
main(tickers)


[*********************100%***********************]  1 of 1 completed

Epoch 1/100



  super().__init__(**kwargs)


ValueError: Gradient dV_dS is None. Ensure the model output depends on S.