<a href="https://colab.research.google.com/github/kangwonlee/stock-price-prediction-transformer/blob/main/stock_price_prediction.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import logging
import math
import random


import matplotlib.pyplot as plt
import numpy as np
import pandas as pd


from sklearn.metrics import mean_squared_error
from sklearn.preprocessing import MinMaxScaler


import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Dense, LayerNormalization, MultiHeadAttention, Dropout, GlobalAveragePooling1D



## Load and prepare the dataset



In [None]:
# Import yfinance; if the module is missing, install it with pip and retry
# the import.
try:
  import yfinance as yf
except ModuleNotFoundError:
  # `!` hands the rest of the line to a shell (IPython syntax, not plain Python).
  !pip install yfinance
  import yfinance as yf

# Configure logging at DEBUG level; the functions below report progress
# through logging.info.
logging.basicConfig(level=logging.DEBUG)



In [None]:
def main(n=None):
    """Run the prediction pipeline on a shuffled selection of tickers.

    Args:
        n: How many tickers to process after shuffling. ``None`` (the
            default) processes every ticker in the list.
    """
    symbol_groups = (
        ['TSLA', 'AAPL', 'MSFT', 'AMZN', 'GOOGL'],
        ['VOO', 'QQQ', 'IWM', 'NVDA', 'META'],
        ['XLY', 'XLK', 'XLU', 'XLB', 'XLI'],
        ['XLRE', 'XLF', 'XLP', 'XLE', 'XLC', 'XLV'],
    )
    all_symbols = [symbol for group in symbol_groups for symbol in group]

    # Going through a set drops any duplicated symbol before shuffling.
    t_list = list(set(all_symbols))
    random.shuffle(t_list)

    # Settings shared by every ticker in this run.
    time_step = 100
    training_ratio = 0.67
    date_range = {
        'start': "2010-06-29",
        'end': "2024-10-04",
        'period': '1d',
    }

    # Slicing with n=None keeps the whole shuffled list.
    selected = t_list[:n]
    for ticker in selected:
      predict_price(ticker, time_step, training_ratio, **date_range)



In [None]:
def predict_price(
      ticker, time_step, training_ratio,
      start="2010-06-29", end="2022-03-25", period='1d'
    ):
    """Train the model on one ticker's closing prices, report RMSE and plot.

    Args:
        ticker: Symbol forwarded to ``get_scaled_data``.
        time_step: Number of consecutive samples in each input window.
        training_ratio: Fraction of the series (taken from the start) used
            for training; the remainder is the test split.
        start: Forwarded to ``get_scaled_data``.
        end: Forwarded to ``get_scaled_data``.
        period: Forwarded to ``get_scaled_data``.

    Raises:
        ValueError: If either split is too short to yield a single window.

    Side effects:
        Prints the model summary and the train/test RMSE, logs progress, and
        calls ``visualize_predictions`` with the predictions.
    """
    logging.info(f'processing {ticker}')
    scaler, data_scaled, dates = get_scaled_data(
        ticker, start, end, period
    )

    # Split by position: the first `training_size` rows are the training set.
    training_size = int(len(data_scaled) * training_ratio)
    train_data = data_scaled[:training_size, :]
    test_data = data_scaled[training_size:, :]

    X_train, y_train = create_dataset(train_data, time_step)
    X_test, y_test = create_dataset(test_data, time_step)

    if len(X_train) == 0 or len(X_test) == 0:
        raise ValueError(
            f'not enough data for {ticker}: '
            f'{len(train_data)} training rows and {len(test_data)} test rows '
            f'cannot form windows of time_step={time_step}'
        )

    # Add a trailing feature axis: (samples, time_step) -> (samples, time_step, 1).
    X_train = X_train.reshape(X_train.shape[0], X_train.shape[1], 1)
    X_test = X_test.reshape(X_test.shape[0], X_test.shape[1], 1)

    model = define_model(X_train)
    model.summary()

    # The test split is also passed as validation data during fitting.
    model.fit(
        X_train, y_train,
        validation_data=(X_test, y_test),
        epochs=50, batch_size=64, verbose=1
    )

    # Map predictions from the scaled range back to price units (once).
    train_predict = scaler.inverse_transform(model.predict(X_train))
    test_predict = scaler.inverse_transform(model.predict(X_test))

    # The targets are still scaled, so they are mapped back as well; the RMSE
    # then compares prices with prices. (Previously the predictions were
    # inverse-transformed a second time and compared with scaled targets.)
    y_train_price = scaler.inverse_transform(y_train.reshape(-1, 1))
    y_test_price = scaler.inverse_transform(y_test.reshape(-1, 1))

    train_rmse = math.sqrt(mean_squared_error(y_train_price, train_predict))
    test_rmse = math.sqrt(mean_squared_error(y_test_price, test_predict))

    print(f"Train RMSE: {train_rmse}")
    print(f"Test RMSE: {test_rmse}")

    visualize_predictions(
        ticker, dates,
        scaler, data_scaled,
        time_step,
        train_predict, test_predict
    )
    logging.info(f'finished {ticker}')



In [None]:
def get_scaled_data(ticker, start="2010-06-29", end="2022-03-25", period='1d'):
    """Download closing prices for ``ticker`` and scale them to [0, 1].

    Args:
        ticker: Symbol passed to ``yf.download``.
        start: First date of the requested range.
        end: End date of the requested range.
        period: Bar size such as ``'1d'``. The parameter name is kept for
            backward compatibility; the value is forwarded to yfinance as
            ``interval``.

    Returns:
        tuple: ``(scaler, data_scaled, dates)`` where ``scaler`` is the fitted
        ``MinMaxScaler``, ``data_scaled`` is the scaled ``Close`` column as a
        2-D array, and ``dates`` is the DataFrame index as a NumPy array.

    Raises:
        ValueError: If the download returns no rows.
    """
    # The date range is already fixed by `start`/`end`. yfinance's own
    # `period` argument is an alternative to start/end (a look-back span),
    # whereas a value like '1d' here means the sampling interval.
    df = yf.download(ticker, start=start, end=end, interval=period)

    if df.empty:
        raise ValueError(
            f'no price data downloaded for {ticker} between {start} and {end}'
        )

    # Double brackets keep the column 2-D, as the scaler expects.
    data = df[['Close']].values

    scaler = MinMaxScaler(feature_range=(0, 1))
    data_scaled = scaler.fit_transform(data)

    dates = df.index.to_numpy()

    return scaler, data_scaled, dates



## Build sliding-window samples for the model



In [None]:
def create_dataset(dataset, time_step=1):
    """Cut column 0 of ``dataset`` into input windows and next-step targets.

    Window ``s`` holds rows ``s`` to ``s + time_step - 1`` and its target is
    row ``s + time_step``. Window starts run over
    ``range(len(dataset) - time_step - 1)``, so the last window that would
    still have a target is not emitted.

    Args:
        dataset: 2-D array; only column 0 is read.
        time_step: Number of consecutive rows in each window.

    Returns:
        tuple: ``(windows, targets)`` as NumPy arrays. Both are empty when
        ``dataset`` is too short to produce a window.
    """
    window_starts = range(len(dataset) - time_step - 1)

    windows = [dataset[s:s + time_step, 0] for s in window_starts]
    targets = [dataset[s + time_step, 0] for s in window_starts]

    return np.array(windows), np.array(targets)


## Transformer Block



In [None]:
def transformer_encoder(inputs, head_size, num_heads, ff_dim, dropout=0):
    """Apply one encoder block: self-attention then a two-layer feed-forward.

    Layer normalization is applied before each of the two stages, and each
    stage's output is added back to that stage's input.

    Args:
        inputs: Tensor fed to the block.
        head_size: ``key_dim`` of the multi-head attention layer.
        num_heads: Number of attention heads.
        ff_dim: Units of the hidden feed-forward ``Dense`` layer.
        dropout: Rate used inside the attention layer and by both ``Dropout``
            layers.

    Returns:
        Tensor whose last dimension equals ``inputs.shape[-1]``.
    """
    # Stage 1: self-attention (query and value are the same normalized tensor).
    normed_inputs = LayerNormalization(epsilon=1e-6)(inputs)
    self_attention = MultiHeadAttention(
        key_dim=head_size, num_heads=num_heads, dropout=dropout
    )
    attended = self_attention(normed_inputs, normed_inputs)
    attended = Dropout(dropout)(attended)
    attention_out = attended + inputs

    # Stage 2: feed-forward, projected back to the input's feature width so
    # that it can be added to the stage-1 output.
    hidden = LayerNormalization(epsilon=1e-6)(attention_out)
    hidden = Dense(ff_dim, activation="relu")(hidden)
    hidden = Dropout(dropout)(hidden)
    feature_width = inputs.shape[-1]
    projected = Dense(feature_width)(hidden)
    return projected + attention_out


## Model Definition



In [None]:
def define_model(X_train):
    """Build and compile the Keras model used for price regression.

    Args:
        X_train: 3-D training array; only ``shape[1]`` and ``shape[2]`` are
            read, to size the model input.

    Returns:
        A compiled ``Model`` (Adam optimizer, mean-squared-error loss) with a
        single linear output unit.
    """
    n_steps, n_features = X_train.shape[1], X_train.shape[2]
    inputs = Input(shape=(n_steps, n_features))

    # One encoder block, then pooling and a small dense head.
    encoded = transformer_encoder(
        inputs, head_size=256, num_heads=4, ff_dim=4, dropout=0.1
    )
    pooled = GlobalAveragePooling1D(data_format='channels_first')(encoded)
    pooled = Dropout(0.1)(pooled)
    hidden = Dense(20, activation="relu")(pooled)
    outputs = Dense(1, activation="linear")(hidden)

    model = Model(inputs=inputs, outputs=outputs)
    model.compile(optimizer="adam", loss="mean_squared_error")

    return model



In [None]:
def visualize_predictions(ticker, dates, scaler, data_scaled, time_step, train_predict, test_predict):
    """Plot actual prices with train/test predictions and save the figure.

    The figure is written to ``{ticker}_result.png``, shown, and then closed,
    so calling this function repeatedly in a loop does not accumulate open
    figures.

    Args:
        ticker: Symbol used in the title and the output file name.
        dates: X-axis values, one per row of ``data_scaled``.
        scaler: Fitted scaler; its ``inverse_transform`` converts
            ``data_scaled`` back for the "actual" curve.
        data_scaled: 2-D scaled series; also fixes the shape of the NaN-padded
            prediction arrays.
        time_step: Window length used to offset the predictions.
        train_predict: Training predictions, plotted as given (no inverse
            transform is applied here).
        test_predict: Test predictions, plotted as given.
    """
    # NaN everywhere except where predictions exist, so only those rows draw.
    trainPredictPlot = np.full_like(data_scaled, np.nan)
    train_stop = len(train_predict) + time_step
    trainPredictPlot[time_step:train_stop, :] = train_predict

    # NOTE(review): this offset appears to assume that both prediction arrays
    # come from windows built by create_dataset on consecutive splits of
    # data_scaled -- confirm before changing the windowing.
    testPredictPlot = np.full_like(data_scaled, np.nan)
    test_start = len(train_predict) + (time_step * 2) + 1
    testPredictPlot[test_start:len(data_scaled) - 1, :] = test_predict

    # Plot baseline and predictions
    fig = plt.figure(figsize=(12, 6))
    plt.semilogy(dates, scaler.inverse_transform(data_scaled), label='Actual Stock Price')
    plt.semilogy(dates, trainPredictPlot, label='Train Predict')
    plt.semilogy(dates, testPredictPlot, label='Test Predict')
    plt.title(f'Stock Price Prediction using Transformer {ticker}')
    plt.xlabel('Time')
    plt.ylabel('Stock Price')
    plt.legend()
    plt.grid(True)
    fig.savefig(f'{ticker}_result.png', dpi=300)

    # Show the figure now, then release it. Without the close, every call
    # leaves one more figure open for the rest of the cell.
    plt.show()
    plt.close(fig)


In [None]:
%%time
# Run the pipeline for every ticker (n defaults to None, so the ticker list is
# not truncated); %%time reports how long the cell took.
main()



In [None]:
import pathlib

result_folder = pathlib.Path('results')
result_folder.mkdir(exist_ok=True)

png_files = pathlib.Path().glob('*.png')
for png_file in png_files:
    png_file.rename(result_folder / png_file.name)

!zip -r results.zip results

