In [None]:
import logging

import numpy as np
import pandas as pd
pd.options.mode.chained_assignment = None  # default='warn'
import os
import csv
from datetime import datetime, timedelta

import yfinance as yf

import lightning.pytorch as pl
from pytorch_forecasting import DeepAR, TimeSeriesDataSet
from pytorch_forecasting.metrics import MultivariateDistributionLoss
from sklearn.metrics import mean_squared_error, mean_absolute_error, mean_absolute_percentage_error, r2_score, \
    explained_variance_score, accuracy_score, precision_score

pl.seed_everything(42)

In [None]:
def get_delimiter(file_path, bytes=4096):
    """
    Retrieves the delimiter of a csv file.
    Args:
        file_path: path to csv file to read
        bytes: n bytes to read to detect the delimiter (higher is more guaranteed accuracy)

    Returns:
        delimiter: delimiter of the csv file located in the given path

    """
    sniffer = csv.Sniffer()
    data = open(file_path, "r").read(bytes)
    delimiter = sniffer.sniff(data).delimiter
    return delimiter

In [None]:
def read_csvs_from_folder(folder_path, bytes=4096, list=False):
    """
    Reads all CSV files in a folder and stores them as pandas DataFrames in a list.
    Automatically detects the delimiter for each CSV file.

    Args:
        folder_path (str): Path to the folder containing CSV files.
        bytes (int): Number of bytes to read from each file for delimiter detection.

    Returns:
        Concatenated pandas DataFrame containing each CSV file in the folder.
    """
    # Get a list of all CSV files in the folder
    csv_files = [os.path.join(folder_path, file) for file in os.listdir(folder_path) if file.endswith('.csv')]

    # Read each CSV file into a pandas DataFrame and store them in a list
    dataframes = []
    for file in csv_files:
        delimiter = get_delimiter(file, bytes)
        df = pd.read_csv(file, delimiter=delimiter)
        dataframes.append(df)

    if not list:
        dataframes = pd.concat(dataframes, axis=0, ignore_index=True)
    return dataframes


In [None]:
def query_yf(stock: tuple, period: int) -> pd.DataFrame:
    """
    Query the Yahoo Finance API for a given stock and store the data.

    :param stock: tuple
        A tuple containing three elements:
        - company (str): The name of the company.
        - ticker (str): The stock ticker symbol of the company.
        - exchange (str): The stock exchange where the company is listed.
    :param period: int
        The number of days of data to retrieve.

    :return: data: a pandas dataframe of the data
    """
    try:
        # Extract company, ticker, and exchange from the stock tuple
        company, ticker, exchange = stock

        yesterday = datetime.now() - timedelta(days=period)
        today = datetime.now()

        yesterday_str = yesterday.strftime('%Y-%m-%d')
        today_str = today.strftime('%Y-%m-%d')
        today_str = '2023-12-18'

        # Query the API using yfinance
        stock_data = yf.Ticker(ticker)
        data = stock_data.history(period='1d', start=yesterday_str, end=today_str)

        # Add the company name to the dataframe
        data['Company'] = company
        data['Ticker'] = ticker
        data['Exchange'] = exchange

        # Reorder the columns
        data = data[['Company', 'Ticker', 'Exchange', 'Open', 'High', 'Low', 'Close', 'Volume']]

        # Convert DataFrame index to a 'DatetimeIndex' without time zone information
        data.index = data.index.tz_localize(None)  # this is necessary for some algorithms

        return data
    except Exception as e:
        print(f"Error retrieving data: {e}")
        return None

In [None]:
def query_yf_list(stocks: pd.DataFrame, period: int) -> pd.DataFrame:
    """
    Query the Yahoo Finance API for all stocks in a list based on their exchange.

    :param stocks: list
        A list of tuples containing three elements:
        - company (str): The name of the company.
        - ticker (str): The stock ticker symbol of the company.
        - exchange (str): The stock exchange where the company is listed.
    :param period: int
        The number of days of data to retrieve.

    :return: data: a list of pandas dataframes of the data
    """
    # Create an empty list
    data = []

    # Loop through the list of stocks
    for _, row in stocks.iterrows():
        stock = (row['Company'], row['Ticker'], row['Stock Exchange'])

        # Query the API using yfinance
        stock_data = query_yf(stock, period)

        # Append the data to the dataframe
        if stock_data is not None:
            data.append(stock_data)

    return data

In [None]:
def save_stocks_to_csv(data_list: [pd.DataFrame], folder_path: str = "stock_data"):
    """
    Save a list of stock dataframes to a folder, each as a separate CSV file {company_name} data.csv.
    :param data_list: list of dataframes to save
    :param folder_path: folder path to save csvs to
    """
    try:
        if not os.path.exists(folder_path):
            os.makedirs(folder_path)
        for df in data_list:
            df.reset_index(inplace=True)
            company = df.iloc[0]['Company']
            df.to_csv(os.path.join(folder_path, f"{company} data.csv"), index=None)
    except Exception as e:
        print(f"Error saving data: {e}")


In [None]:
def day_average(df: pd.DataFrame, column: str = 'Close', days: int = 30, company_name: str = None) -> pd.DataFrame:
    """
    Compute the n-day average of the stock price.
    :param column: column to average
    :param df: dataframe of stock data
    :param days: number of days to average
    :return: dataframe of stock data with added column for an n day average
    """
    if company_name is not None:
        column_title = f"{company_name}-{column} Day Average"
    else:
        column_title = f"{column} Day Average"
    df[column_title] = df[column].rolling(days).mean().shift(1)  # shift to not include today
    day_mean = df[column].iloc[0:days + 1].mean()
    df[column_title] = df[column_title].fillna(day_mean)

    return df

In [None]:
def previous_day(df: pd.DataFrame) -> pd.DataFrame:
    """
    Add the previous day's data to the dataframe as columns.
    :param df: dataframe of stock data
    :return: dataframe of stock data with added column for previous day's closing price
    """
    df['Previous Close'] = df['Close'].shift(1)
    df['Previous Close'].iloc[0] = df['Close'].iloc[0]

    df['Previous Open'] = df['Open'].shift(1)
    df['Previous Open'].iloc[0] = df['Open'].iloc[0]

    df['Previous High'] = df['High'].shift(1)
    df['Previous High'].iloc[0] = df['High'].iloc[0]

    df['Previous Low'] = df['Low'].shift(1)
    df['Previous Low'].iloc[0] = df['Low'].iloc[0]

    df['Previous Volume'] = df['Volume'].shift(1)
    df['Previous Volume'].iloc[0] = df['Volume'].iloc[0]

    return df

In [None]:
def direction_evaluation(df: pd.DataFrame, truth_column: str, predicted_column: str) -> pd.DataFrame:
    """
    Add a column to the dataframe indicating whether the predicted direction (up or down) matches the actual subsequent movement
    from the previous day's price to today's price.

    :param df: dataframe of data with actual and predicted price columns
    :param truth_column: string for the title of the truth column (actual price)
    :param predicted_column: string for the title of the predicted price column
    :return: dataframe with an additional column indicating if the predicted direction is correct
    """
    df = df.copy()
    # Determine the actual direction by comparing today's actual price to yesterday's actual price
    df['Actual Direction'] = np.where(df[truth_column] > df[truth_column].shift(1), 'Up', 'Down')
    # Determine the predicted direction by comparing today's predicted price to yesterday's actual price
    df['Predicted Direction'] = np.where(df[predicted_column] > df[truth_column].shift(1), 'Up', 'Down')
    # Compare the actual direction to the predicted direction
    df['Direction Correct'] = np.where(df['Actual Direction'] == df['Predicted Direction'], 'Correct', 'Incorrect')
    return df

In [None]:
def simulate_trading(df: pd.DataFrame, initial_funds: int = 10000, truth_column: str = 'Close', predicted_column: str = 'Predicted Close') -> (float, float, float):
    """
    Simulate trading based on predictions to calculate profitability and average profit per trade.

    :param df: dataframe of data with actual and predicted price columns
    :param initial_funds: initial investment amount
    :param truth_column: string for the title of the truth column (actual price)
    :param predicted_column: string for the title of the predicted price column
    :return: net gain or loss from trading strategy, average profit per trade, net gain or loss percent
    """
    funds = initial_funds
    shares = 0
    trades = []

    df = df.copy()
    df['Predicted Tomorrow'] = df[predicted_column].shift(-1)  # shift predictions to align with the day they are for

    for i in range(len(df) - 1):  # minus 1 because the last day's prediction is for a day outside of our dataframe
        today_price = df[truth_column].iloc[i]
        predicted_tomorrow_price = df['Predicted Tomorrow'].iloc[i]

        if not np.isnan(predicted_tomorrow_price) or not np.isnan(today_price):
            # If the predicted price for tomorrow is higher than today's price, buy
            if predicted_tomorrow_price > today_price:
                shares_bought = funds // today_price
                funds -= shares_bought * today_price
                shares += shares_bought

            # The next day, sell all shares if any were bought
            if shares > 0:
                next_day_price = df[truth_column].iloc[i + 1]
                trade_profit = shares * (next_day_price - today_price)  # calculate profit for this trade
                funds += shares * next_day_price
                shares = 0  # reset shares to 0 after selling
                trades.append(trade_profit)  # keep track of the profit from this trade

    net_gain = funds - initial_funds
    if not trades:
        average_profit = 0
    else:
        average_profit = sum(trades) / len(trades)
    net_gain_percent = net_gain / initial_funds * 100

    return net_gain, average_profit, net_gain_percent

In [None]:
def calculate_standard_metrics(truth_series: pd.Series, predicted_series: pd.Series) -> (float, float, float, float, float, float):
    """
    Calculate standard metrics for evaluating a trading strategy.
    :param truth_series: series of actual prices
    :param predicted_series: series of predicted prices
    :return: RMSE, MAE, MAPE, Rsq, Explained Variance
    """
    rmse = mean_squared_error(truth_series, predicted_series, squared=False)
    mae = mean_absolute_error(truth_series, predicted_series)
    mape = mean_absolute_percentage_error(truth_series, predicted_series)
    rsq = r2_score(truth_series, predicted_series)
    ev = explained_variance_score(truth_series, predicted_series)
    return rmse, mae, mape, rsq, ev

In [None]:
def calculate_accuracy(df: pd.DataFrame, truth_column: str = 'Close', predicted_column: str = 'Predicted Close') -> (float, float):
    """
    Calculate the accuracy of a trading strategy by comparing the predicted direction to the actual direction.
    :param df: dataframe of data with actual and predicted price columns
    :param truth_column: column containing the actual price
    :param predicted_column: column containing the predicted price
    :return: accuracy, precision
    """
    df = direction_evaluation(df, truth_column, predicted_column)
    accuracy = accuracy_score(df['Actual Direction'], df['Predicted Direction'])
    precision = precision_score(df['Actual Direction'], df['Predicted Direction'], pos_label='Up')
    return accuracy, precision

In [None]:
def compare_to_ETF(urnm: pd.DataFrame, df: pd.DataFrame = None, window: int = 100, initial_funds: int = 10000):
    """
    Compare the profitability of a trading strategy to the profitability of investing in the S&P 500 URNM ETF.
    :param urnm: dataframe of URNM data
    :param df: dataframe of data with actual and predicted price columns for a stock
    :param truth_column: column containing the actual price
    :param predicted_column: column containing the predicted price
    :return: metrics of using the ETF as a trading strategy

    Note this can be done in two ways:
        1. Investing into the ETF and holding.
        2. Using the ETF as a trading strategy.
    This function will do the first option.
    """
    urnm_copy = urnm.copy()
    if df is not None:
        start_date = df['Date'].iloc[0]
        end_date = df['Date'].iloc[-1]
        urnm = urnm[(urnm['Date'] >= start_date) & (urnm['Date'] <= end_date)]
    else:
        start_date = urnm['Date'].iloc[-window]
        end_date = urnm['Date'].iloc[-1]

    # option one, invest and Hold
    start_price = urnm['Close'].iloc[0]
    shares = initial_funds // start_price

    end_price = urnm['Close'].iloc[-1]
    funds = shares * end_price

    hold_net_gain = funds - initial_funds
    net_gain_percent = hold_net_gain / initial_funds * 100

    return hold_net_gain, net_gain_percent


In [None]:
def summarize_metrics(folder_path: str, model: str):
    """
    Summarize the metrics of each stock in the list.
    :param folder_path: str
        Path to the folder containing CSV files of the event horizon metrics for each stock.
    :return: None
    """
    metrics_data = read_csvs_from_folder(folder_path)
    event_horizons = metrics_data['Event Horizon'].unique()
    print("unique event horizons: ", event_horizons)
    event_horizon_dfs = {eh: metrics_data[metrics_data['Event Horizon'] == eh] for eh in event_horizons}

    summary_data = []
    for horizon, df in event_horizon_dfs.items():
        summary = {
            "Event Horizon": horizon,
            "Average RMSE": df['RMSE'].mean(),
            "Average MAPE": df['MAPE'].mean(),
            "Average MAE": df['MAE'].mean(),
            "Average RSquared": df['R2'].mean(),
            "Average Accuracy": df['Accuracy'].mean(),
            "Average Precision": df['Precision'].mean(),
            "Average Net Gain": df['Net Gain'].mean(),
            "Average Net Gain %": df['Net Gain %'].mean(),
            "Average Profit": df['Avg Profit'].mean()
        }
        summary_data.append(summary)

    summary_df = pd.DataFrame(summary_data)
    save_path = os.path.join(os.getcwd(), f'{model} Summary.csv')
    summary_df.to_csv(save_path, index=False)

In [None]:
def run_deepar(data: pd.DataFrame, max_prediction_length: int, max_encoder_length: int) -> [dict]:
    """
    Runs the DeepAR algorithm on the input DataFrame and returns a list of dictionaries containing the metrics for each event horizon.

    :param df: The input DataFrame containing stock market data. It must include the columns:
    'Date', 'Company', 'Ticker', 'Exchange', 'Open', 'High', 'Low', 'Close', and 'Volume'.

    :return: dict: A list of dictionaries containing the metrics for each event horizon.
      - 'RMSE' (float): Root Mean Squared Error of the model's predictions.
      - 'MAE' (float): Mean Absolute Error of the model's predictions.
      - 'MAPE' (float): Mean Absolute Percentage Error of the model's predictions.
      - 'RSQ' (float): R-squared value indicating the goodness of fit.
      - 'Accuracy' (float): Accuracy of the model's directional predictions.
      - 'Precision' (float): Precision of the model's directional predictions.
      - 'Net Gain' (float): Net gain from simulated trading based on model predictions.
      - 'Avg Profit' (float): Average profit per trade from simulated trading.
      - 'Net Gain Percent' (float): Net gain percentage from simulated trading.
    """

    logging.info("Preprocessing data for DeepAR")
    # minor preprocessing
    data = day_average(data)
    data = previous_day(data)
    data['Date'] = pd.to_datetime(data['Date'])
    original_df = data.copy()
    data.drop(['Ticker', 'Exchange'], axis=1, inplace=True)
    data.reset_index(inplace=True)

    logging.info("Splitting data into training and validation sets")
    # train / test split
    batch_size = 32
    length = len(data)
    n_batches = length // batch_size
    train_n_batches = round(n_batches * 0.7)
    test_n_batches = n_batches - train_n_batches
    test_size = test_n_batches * batch_size
    if test_n_batches % batch_size != 0:
        test_size -= test_n_batches % batch_size
    if test_size < max_prediction_length:
        print(f"Test size is too small for largest event horizon for: {data['Company'].iloc[0]}")
    split = train_n_batches * batch_size

    # encoder errors occur when there isnt enough data
    # I dont think this is the correct check but I'm still working on figuring out how to correctly check
    if test_size < max_prediction_length + max_encoder_length:
        max_encoder_length = 30  # this is the minimum encoder length, 6 weeks instead of 12 (5 day trade weeks)

    logging.info("Number of batches: " + str(n_batches))
    logging.info("Train batches: " + str(train_n_batches))
    logging.info("Test batches: " + str(test_n_batches))
    logging.info("Max encoder length: " + str(max_encoder_length))

    logging.info("Creating TimeSeriesDataSet")
    # create training and validation sets and dataloaders
    training = TimeSeriesDataSet(
        data[lambda x: x.index < split],
        time_idx="index",
        target="Close",
        group_ids=["Company"],  # column name that identifies a time series
        max_encoder_length=max_encoder_length,
        max_prediction_length=max_prediction_length,
        static_categoricals=['Company'],
        time_varying_known_reals=["Day Average", "Previous Close", "Previous Open", "Previous High", "Previous Low",
                                  "Previous Volume"],
        time_varying_unknown_reals=["Close"]  # this is the target
    )
    logging.info(f"Dataset created for {data['Company'].iloc[0]}")
    logging.info("Creating dataloaders")
    train_dataloader = training.to_dataloader(train=True, batch_size=batch_size, num_workers=3,
                                              persistent_workers=True)
    val_dataloader = TimeSeriesDataSet.from_dataset(training, data.iloc[split:split + test_size],
                                                    min_prediction_idx=split,
                                                    stop_randomization=True).to_dataloader(train=False,
                                                                                           batch_size=batch_size,
                                                                                           num_workers=3,
                                                                                           persistent_workers=True)
    logging.info("Creating model")
    # create model and train
    loss = MultivariateDistributionLoss()
    deepar = DeepAR.from_dataset(dataset=training,
                                 learning_rate=5e-4,
                                 hidden_size=12,
                                 rnn_layers=3,
                                 optimizer="AdamW",
                                 loss=loss
                                 )
    trainer = pl.Trainer(
        accelerator='auto',
        max_epochs=100,
        gradient_clip_val=0.1,
        enable_checkpointing=True
    )

    logging.info("Fitting model")
    trainer.fit(
        deepar,
        train_dataloaders=train_dataloader,
        val_dataloaders=val_dataloader
    )
    best_model_path = trainer.checkpoint_callback.best_model_path
    best_deepar = DeepAR.load_from_checkpoint(best_model_path)

    logging.info("Generating predictions")
    # generating predictions for test set
    predict_df = data.iloc[split:]
    predictions = best_deepar.predict(predict_df).cpu().numpy().reshape(-1)

    # evaluation for each event horizon
    event_horizons = [1, 5, 10, 20, 50, 100]
    metrics = []  # this will be a list of dictionaries containing the metrics for each event horizon
    try:
        for horizon in event_horizons:
            logging.info("Calculating metrics for event horizon: " + str(horizon))
            val_data = data.iloc[split:split + horizon]

            val_data['Predicted Close'] = predictions[:horizon]
            metrics_dict = {}
            rmse, mae, mape, rsq, ev = calculate_standard_metrics(val_data['Close'], val_data['Predicted Close'])
            metrics_dict['RMSE'] = rmse
            metrics_dict['MAE'] = mae
            metrics_dict['MAPE'] = mape
            metrics_dict['RSQ'] = rsq

            direction_df = direction_evaluation(val_data, 'Close', 'Predicted Close')
            accuracy, precision = calculate_accuracy(direction_df, 'Close', 'Predicted Close')
            metrics_dict['Accuracy'] = accuracy
            metrics_dict['Precision'] = precision

            net_gain, avg_profit, net_gain_percent = simulate_trading(direction_df, 10000)
            metrics_dict['Net Gain'] = net_gain
            metrics_dict['Avg Profit'] = avg_profit
            metrics_dict['Net Gain Percent'] = net_gain_percent
            metrics.append(metrics_dict)

        horizons_metrics_list = []
        for i in range(len(event_horizons)):
            row = {
                'Company': original_df['Company'][0],
                'Ticker': original_df['Ticker'][0],
                'Exchange': original_df['Exchange'][0],
                'Event Horizon': event_horizons[i],
                'RMSE': metrics[i]['RMSE'],
                'MAE': metrics[i]['MAE'],
                'MAPE': metrics[i]['MAPE'],
                'RSQ': metrics[i]['RSQ'],
                'Accuracy': metrics[i]['Accuracy'],
                'Precision': metrics[i]['Precision'],
                'Net Gain': metrics[i]['Net Gain'],
                'Avg Profit': metrics[i]['Avg Profit'],
                'Net Gain Percent': metrics[i]['Net Gain Percent']
            }
            horizons_metrics_list.append(row)

    except Exception as e:
        print(f"Error: {e} when calculating metrics for: {data['Company'].iloc[0]}")

    return horizons_metrics_list

# Start Here to Run DeepAR
Run ll of the above cells,
Then follow the comments for the Load or Query data section.

### Load or Query Data:
To load or query data, you will need to upload either the given stock list file or the stock_data folder.

In [None]:
# run this cell for querying using the stock list file
stocks_list = pd.read_csv('Uranium Company Master List.csv')
periods = 365*5 # 3 years, however, some stocks only have 2022 to the present (was 5 years, need to conserve memory on my machine)
data_list = query_yf_list(stocks_list, periods)
for data in data_list:
    data.reset_index(inplace=True)  # must have an integer index for Prophet

In [None]:
# run this cell for loading data from a folder mounted to your Google Drive environment (optional)
data_folder = 'stock_data'
data_list = read_csvs_from_folder(data_folder, list=True)
urnm_data_folder = 'stock_data/SP Uranium ETF'
urnm_data = read_csvs_from_folder(urnm_data_folder, list=False)


In [None]:
# run this to query data for the SP 500 Uranium ETF
stock = "Sprott Uranium Miners ETF", "URNM", "XNYS"
period = 365*5
urnm_data = query_yf(stock, period)
urnm_data.reset_index(inplace=True)


In [None]:
# run this cell to save the data to a folder (optional)
save_stocks_to_csv(data_list, folder_path='stock_data')
urnm_data.to_csv(os.path.join("stock_data/SP Uranium ETF", "Sprott Uranium Miners ETF data.csv"), index='Date')


### Running DeepAR

In [None]:
max_encoder_length = 60
max_prediction_length = 100
for data in data_list:
    horizons_metrics_list = run_deepar(data, max_prediction_length, max_encoder_length)
    horizons_df = pd.DataFrame(horizons_metrics_list)
    company_name = horizons_df['Company'][0]

    results_dir = os.path.join(os.getcwd(), 'results')
    if not os.path.exists(results_dir):
        os.makedirs(results_dir)
    deepar_dir = os.path.join(os.getcwd(), 'results/DeepAR')
    if not os.path.exists(deepar_dir):
        os.makedirs(deepar_dir)

    file_path = os.path.join(deepar_dir, f"DeepAR Metrics-{company_name}.csv")
    horizons_df.to_csv(file_path)