In [1]:
import yfinance as yf
import pandas as pd
import datetime as dt
import numpy as np
from statsmodels.tsa.stattools import adfuller, kpss
from statsmodels.tsa.arima.model import ARIMA, ARIMAResults
import statsmodels.api as sm
import plotly.graph_objects as go
import matplotlib.pyplot as plt
from typing import Tuple, Union, List, Dict
import seaborn as sns
import tensorflow as tf
from keras.api.layers import LSTM, Dense, Dropout, Input
from keras.api.models import Sequential, Model, load_model
from keras.api.callbacks import EarlyStopping
from sklearn.preprocessing import MinMaxScaler
import sklearn.metrics as metrics
import json
from dateutil.relativedelta import relativedelta

from stock_price_prediction.types import TickerSymbol
from stock_price_prediction.data_helper import DataHelper

2025-03-02 18:21:18.668209: I tensorflow/core/util/port.cc:153] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2025-03-02 18:21:18.671250: I external/local_xla/xla/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.
2025-03-02 18:21:18.687972: I external/local_xla/xla/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.
2025-03-02 18:21:18.714513: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:477] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1740936078.739093    8372 cuda_dnn.cc:8310] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1740936078.74

### LSTM

In [None]:
def split_and_scale(
    data: pd.DataFrame, features: List[str]
) -> Tuple[pd.DataFrame, pd.DataFrame, MinMaxScaler]:
    train_size = int(len(data) * 0.8)
    train_data = data.iloc[:train_size]
    test_data = data.iloc[train_size:]

    # Initialize scaler and fit only on training data
    scaler = MinMaxScaler()
    train_scaled = pd.DataFrame(
        scaler.fit_transform(train_data[features]),
        columns=features,
        index=train_data.index,
    )

    # Transform test data using the same scaler (without fitting)
    test_scaled = pd.DataFrame(
        scaler.transform(test_data[features]), columns=features, index=test_data.index
    )

    return train_scaled, test_scaled, scaler


def create_sequences(
    data: pd.DataFrame, target_feature_index: int, seq_length: int = 30
):
    X, y = [], []
    for i in range(len(data) - seq_length):
        X.append(data.iloc[i : i + seq_length].values)
        y.append(data.iloc[i + seq_length, target_feature_index])
    return np.array(X), np.array(y)


def make_predictions(
    model: Model,
    data: pd.DataFrame,
    seq_length: int,
    scaler: MinMaxScaler,
    days_ahead: int,
    feature_index_map: Dict[str, int],
    target_feature: str,
    features: List[str],
):
    if isinstance(data, pd.DataFrame):
        data = data.to_numpy()

    predictions = []

    # Start with the most recent known sequence
    input_seq = data[-seq_length:].copy()

    for _ in range(days_ahead):
        predicted_value = model.predict(
            input_seq.reshape(1, seq_length, -1), verbose=0
        )[0, 0]
        predictions.append(predicted_value)

        # Update sequence by appending prediction and shifting window
        new_entry = np.roll(input_seq, shift=-1, axis=0)
        new_entry[-1] = input_seq[-1]
        new_entry[-1, feature_index_map[target_feature]] = (
            predicted_value  # Replace "Adj Close"
        )
        input_seq = new_entry

    predictions_all_fetures = np.zeros((len(predictions), len(features)))
    predictions_all_fetures[:, feature_index_map[target_feature]] = predictions
    predictions_descaled = scaler.inverse_transform(predictions_all_fetures)[
        :, feature_index_map[target_feature]
    ]

    return predictions_descaled


def create_model(seq_length: int, features: List[str]) -> Sequential:
    return Sequential(
        [
            Input(shape=(seq_length, len(features))),
            LSTM(50, return_sequences=True),
            Dropout(0.2),
            LSTM(50, return_sequences=False),
            Dropout(0.2),
            Dense(25, activation="relu"),
            Dense(1),
        ]
    )


def plot_lstm_prediction(
    ticker: TickerSymbol, data: pd.DataFrame, predictions: Dict[int, np.ndarray]
) -> go.Figure:
    fig = go.Figure()
    fig.add_trace(
        go.Scatter(
            x=data.index,
            y=data["Adj Close"],
            mode="lines",
            name="Adj Close History",
        )
    )
    for days, preds in predictions.items():
        future_dates = pd.date_range(start=data.index[-1], periods=days + 1, freq="B")
        fig.add_trace(
            go.Scatter(
                x=future_dates,
                y=np.insert(preds, 0, data["Adj Close"].iloc[-1]),
                mode="lines",
                name=f"Forecast {days}d",
            )
        )

    fig.update_layout(
        title=f"{ticker.name} price prediction",
        xaxis_title="Date",
        yaxis_title="Adj Close Price",
        template="plotly_dark",
    )
    return fig

In [None]:
ticker = TickerSymbol.VZ

raw_data = DataHelper.fetch_stock_history(
    tickers=[ticker], start_date=dt.date(1950, 1, 1), end_date=dt.date.today()
)
data = DataHelper.preprocess_stock_history(stock_history=raw_data)

features = list(data.columns)
target_feature = "Adj Close"
feature_index_map = {feat: idx for idx, feat in enumerate(features)}

train_data, test_data, scaler = split_and_scale(data=data, features=features)

seq_length = 30
X_train, y_train = create_sequences(
    data=train_data,
    target_feature_index=feature_index_map[target_feature],
    seq_length=seq_length,
)
X_test, y_test = create_sequences(
    data=test_data,
    target_feature_index=feature_index_map[target_feature],
    seq_length=seq_length,
)

model = create_model(seq_length=seq_length, features=features)
model.compile(optimizer="adam", loss="mse")
early_stopping = EarlyStopping(
    monitor="val_loss", patience=5, restore_best_weights=True
)
history = model.fit(
    X_train,
    y_train,
    epochs=20,
    batch_size=32,
    validation_data=(X_test, y_test),
    callbacks=[early_stopping],
)


# model.save(f"models/lstm_{ticker.name}.keras")

# y_pred = model.predict(X_test)
# log = {
#     "ticker": ticker.name,
#     "Features": features,
#     "Target feature": target_feature,
#     "MSE": metrics.mean_squared_error(y_true=y_test, y_pred=y_pred),
#     "MAE": metrics.mean_absolute_error(y_true=y_test, y_pred=y_pred),
#     "5% of mean Adj Close": data["Adj Close"].mean() * 0.05,
#     "5% of mean Adj Close last 10 years": data["Adj Close"][
#         data.index.date >= dt.date(data.index[-1].year - 10, 1, 1)
#     ].mean()
#     * 0.05,
# }
# with open(f"logs/log_{ticker.name}.json", "a") as file:
#     json.dump(log, file)
#     file.write("\n")

In [None]:
y_pred = model.predict(X_test)
log = {
    "ticker": ticker.name,
    "Features": features,
    "Target feature": target_feature,
    "MSE": metrics.mean_squared_error(y_true=y_test, y_pred=y_pred),
    "MAE": metrics.mean_absolute_error(y_true=y_test, y_pred=y_pred),
    "5% of mean Adj Close": data["Adj Close"].mean() * 0.05,
    "5% of mean Adj Close last 10 years": data["Adj Close"][
        data.index.date >= dt.date(data.index[-1].year - 10, 1, 1)
    ].mean()
    * 0.05,
}
# with open(f"logs/log_{ticker.name}.json", "w") as file:
#     json.dump(log, file)
#     file.write("\n")

In [None]:
predictions = {
    days: make_predictions(
        model=model,
        data=test_data,
        seq_length=seq_length,
        scaler=scaler,
        days_ahead=days,
        feature_index_map=feature_index_map,
        target_feature=target_feature,
        features=features,
    )
    for days in [7, 14, 28]
}

plot_lstm_prediction(ticker=ticker, data=data, predictions=predictions).show()

### ARIMA

In [2]:
def get_arima_coefficients(series: pd.Series) -> Tuple[int, int, int]:
    p, d, q = 0, 0, 0

    # Perform differencing testing and adjust d-parameter
    while not adf_test(series):
        series = series.pct_change()
        series = series.dropna()
        d += 1

    if not kpss_test(series):
        d += 1

    # Get parameters p and q from Autocorrelation and Partial Autocorrelation functions
    p = int(sm.tsa.pacf(series).round(0).sum())
    q = int(sm.tsa.acf(series).round(0).sum())
    return p, d, q


def adf_test(series: pd.Series) -> bool:
    """
    Check for stationarity using Augmented Dickey-Fuller test.

    Attributes:
        series: input data to be tested on stationarity.

    Returns:
        bool: True if data are stationary, False if not.
    """
    result = adfuller(series)
    # ADF Statistic: result[0], p-value: result[1]
    if result[1] <= 0.05:
        return True
    else:
        return False


def kpss_test(series: pd.Series) -> bool:
    result = kpss(series)
    if result[1] >= 0.05:
        return True
    else:
        return False

In [3]:
ticker = TickerSymbol.KO
raw_data = DataHelper.fetch_stock_history(
    tickers=[ticker], start_date=dt.date(1950, 1, 1), end_date=dt.date.today()
)
data = DataHelper.preprocess_stock_history(stock_history=raw_data)

arima_coefs = get_arima_coefficients(series=data["Adj Close"])
arima_model = ARIMA(data["Adj Close"], order=arima_coefs)
arima_result: ARIMAResults = arima_model.fit()


n_steps = 30

forecast = arima_result.forecast(steps=n_steps)
forecast_dates = pd.date_range(start=data.index[-1], periods=n_steps, freq="B")
forecast_series = pd.Series(forecast, index=forecast_dates)
forecast_series.iloc[0] = data["Adj Close"].iloc[-1]


simulations = arima_result.simulate(
    nsimulations=30,
    anchor=data["Adj Close"].index[-1],
    repetitions=10,
)
simulations.iloc[0] = data["Adj Close"].iloc[-1]

look-up table. The actual p-value is greater than the p-value returned.

  result = kpss(series)
  warn('Non-stationary starting autoregressive parameters'
  warn('Non-invertible starting MA parameters found.'


In [6]:
arima_result.mle_retvals

{'fopt': 0.6284183494155811,
 'gopt': array([ 4.27558433e-05, -4.28304059e-05,  4.01747191e-05]),
 'fcalls': 100,
 'warnflag': 0,
 'converged': True,
 'iterations': 16}

In [None]:
fig = go.Figure()

fig.add_trace(
    go.Scatter(
        x=data.index, y=data["Adj Close"], mode="lines", name="Adj Close History"
    )
)
fig.add_trace(
    go.Scatter(
        x=data.index, y=arima_result.fittedvalues, mode="lines", name="Fitted Values"
    )
)
fig.add_trace(
    go.Scatter(
        x=forecast_series.index, y=forecast_series, mode="lines", name="Prediction"
    )
)
for i, sim in enumerate(simulations):
    fig.add_trace(
        go.Scatter(
            x=forecast_series.index,
            y=simulations[sim],
            mode="lines",
            name=f"Simulation {i+1}",
        )
    )
fig.update_layout(
    title=f"ARIMA{arima_coefs} model",
    xaxis_title="Date",
    yaxis_title="Value",
    template="plotly_dark",
)
fig.show()