#Setup google-drive mounting (optional)

In [None]:
from google.colab import drive
import os

drive.mount('/content/drive')

In [None]:
# prompt: create a symbolic link to a google drive workdir 'xyz' to the root of colab

# Specify the path to your folder
gdrive = '/content/drive/MyDrive/AI/2025'
workdir = '/datasets'
slink = '/content' + workdir
fullpath = gdrive + workdir

# Check if the folder exists
if os.path.exists(fullpath):
  # Create the symbolic link
  try:
    os.symlink(fullpath, slink)
    print(f"Symbolic link created from '{fullpath}' to '{slink}'")
  except FileExistsError:
    print(f"Symbolic link '{slink}' already exists.")
  except OSError as e:
    print(f"Error creating symbolic link: {e}")
else:
  print(f"Error: Folder '{fullpath}' not found.")

# !ls -lh /content

In [None]:
# !unzip datasets/datasets.zip -d datasets/
# !rm datasets/datasets.zip

#Notebook Start

In [None]:
import pandas as pd
import warnings

warnings.filterwarnings('ignore')

folder = './datasets/247/hourly/'

df = pd.read_csv(folder + 'dataset.csv')
df

In [None]:
date_column_name = df.columns[0]
date_format = '%Y-%m-%d' if date_column_name.lower() == 'date' else '%Y-%m-%d %H:%M:%S'

date_column_name, date_format

In [None]:
# Converter a coluna para datetime removendo o fuso horário
if date_column_name.lower() == 'date':
    df[date_column_name] = pd.to_datetime(df[date_column_name])
else:
    df[date_column_name] = pd.to_datetime(df[date_column_name]).dt.tz_convert(None)

df

In [None]:
# prompt: plot df using plotly
import plotly.express as px

fig = px.line(df.drop('Real', axis=1), x=date_column_name, y=df.columns[2:])
fig.show()

In [None]:
split_date = pd.to_datetime('2024-08-26')
#split_date = pd.to_datetime('2025-01-01')

# Find the first row where the date is equal or greater than split_date
def get_split_date_index(df, split_date):
  for i in range(len(df)):
    if df.iloc[i, 0] >= split_date:
      return i

split_idx = get_split_date_index(df, split_date)
split_idx, df.iloc[split_idx, 0]

In [None]:
# needed for dumb4cast
df_train = df.iloc[:split_idx, :].drop('Real', axis=1)
df_train

In [None]:
# needed for dumb4cast
df_test = df.iloc[split_idx:, :].drop('Real', axis=1)
df_test

In [None]:
# loading pre-trained model
MODEL_NAMES = [
    # 'arima',
    'lstm',
    'gru',
    'mlp',
    'dlinear',
    'nlinear',
    'informer',
    'autoformer',
    'fedformer',
    'bitcn',
    'rnn',

    'tcn',
    'deepar',
    'dilatedrnn',
    #'nbeats',
    #'nbeatsx',
    'nhits',
    'tide',
    'deepnpts',
    'tft',
    'vanilla',
    'patchtst',
    'itransformer',
    #'timesnet'
]

In [None]:
# prompt: add last row of df_train to a new test_df dataframe
# Por que: o backtest precisa comparar os dados reais do dia anterior à previsão pra saber se compra ou vende
real_column = df['Real'][split_idx-1:]
test_df = df.iloc[split_idx-1:, :].drop('Real', axis=1)
print(real_column)
test_df

In [None]:
forecasts = []

for model_name in MODEL_NAMES:
  print(f'Loading {model_name.upper()} forecasts')
  forecast_df = pd.read_csv(folder + 'forecasts/forecast-' + model_name + '.csv')
  forecasts.append(forecast_df[test_df.columns])

In [None]:
# creating a baseline buy-and-hold dataframe

IB = 100.0 # initial balance for each asset

# copy test_df to bh_df and set every value to zero
bh_df = test_df.copy()
for col in bh_df.columns[1:]:
  bh_df[col] = 0
bh_df[bh_df.columns[0]] = test_df[test_df.columns[0]]

# except from the first column of bh_df, divide every other first row's value by IB (buying stocks)
for j in range(1, len(test_df.columns)):
  bh_df.iloc[0, j] = IB#test_df.iloc[0, j] / IB

# calculating how much each stock would worth every day
for i in range(1, len(test_df)):
  for j in range(1, len(test_df.columns)):
    bh_df.iloc[i, j] = bh_df.iloc[i-1, j] * (test_df.iloc[i, j] / test_df.iloc[i-1, j])

bh_df.reset_index(drop=True, inplace=True)

bh_df

In [None]:
# it looks all models prediction are just an one day shit from the real values.
# dumb_4cast is a simple one day shit from test-data, so it'll force a buy transcation if the current
# value had an increase compared to the previous day or sell if there was a drop of it's value.

dumb_4cast = df_train.tail(2).copy()
dumb_4cast = pd.concat([dumb_4cast, df_test[:-1]], ignore_index=True)
dumb_4cast.reset_index(drop=True, inplace=True)

# prompt: subtract one day for every value of column Date from dumb_4cast
# convert it first
dumb_4cast[date_column_name] = pd.to_datetime(dumb_4cast[date_column_name])

if date_column_name.lower() == 'date':
    dumb_4cast[date_column_name] = dumb_4cast[date_column_name] + pd.DateOffset(days=1)
else:
    dumb_4cast[date_column_name] = dumb_4cast[date_column_name] + pd.DateOffset(hours=1)

# prompt: change dumb_4cast Date column back to string type
dumb_4cast[date_column_name] = dumb_4cast[date_column_name].dt.strftime(date_format)

dumb_4cast

In [None]:
MODEL_NAMES.append('dumb_4cast')
forecasts.append(dumb_4cast)

In [None]:
# prompt: create a second dataframe rev_df where the first column is the 'Date'
# column of bh_df and the second column is the sum of every other value on each row
#assets_count = len(test_df.columns) - 1 # except 'Date' column

#rev_df = pd.DataFrame()
#rev_df[date_column_name] = bh_df[date_column_name]
#rev_df["Buy'n hold"] = bh_df.iloc[:, 1:].sum(axis=1) / (assets_count)
#rev_df

Creating a dataframe with a wallet composed by the models predictions

In [None]:
def backtest(real_df, real_column, pred_df):
  # copy real_df to model_df and set every value to zero
  model_df = real_df.copy()
  for col in model_df.columns[1:]:
    model_df[col] = 0
  model_df[model_df.columns[0]] = real_df[real_df.columns[0]]

  model_df.reset_index(drop=True, inplace=True)

  # except from the first column of model_df, divide every other first row's value by IB (buying stocks)
  for j in range(1, len(real_df.columns)):
    model_df.iloc[0, j] = IB

  count = 0

  # calculating how much each stock would worth every day
  for i in range(1, len(real_df)):
    if not real_column.iloc[i]: # don't backtest if market is closed (filled transaction, not real)
      # copy previous row
      model_df.iloc[i, 1:] = model_df.iloc[i-1, 1:]
    else:
      count += 1
      for j in range(1, len(real_df.columns)):
        if pred_df.iloc[i-1, j] > real_df.iloc[i-1, j]: # predicted value is higher than current value: buy
          x = model_df.iloc[i-1, j] * (real_df.iloc[i, j] / real_df.iloc[i-1, j])
        else:                                           # predicted value is lower than current value: sell
          x = model_df.iloc[i-1, j] * (real_df.iloc[i-1, j] / real_df.iloc[i, j])

        if x > 0:
          model_df.iloc[i, j] = x
        else:
          model_df.iloc[i, j] = 0
          break

  print(f'\t{count} predicted steps')

  return model_df

# i = 0
# print(f'backtesting {MODEL_NAMES[i]}: ', end='')
# backtest(test_df, real_column, forecasts[i])

In [None]:
# prompt: count how many True values are in real_column
true_count = sum(real_column)
print(f"Number of True values in real_column: {true_count}")

In [None]:
%%time
#rev2_df = rev_df.copy()

backtests = [bh_df]

for i in range(len(MODEL_NAMES)):
  print(f'backtesting {MODEL_NAMES[i]}: ', end='')
  model_rev_df = backtest(test_df, real_column, forecasts[i])
  backtests.append(model_rev_df)
  #rev2_df[MODEL_NAMES[i].upper()] = model_rev_df.iloc[:, 1:].sum(axis=1) / (assets_count)

#rev2_df.tail(1)

In [None]:
# prompt: save all models in a 'backtest' folder inside workdir
import os

# Create the 'backtest' directory if it doesn't exist
backtest_dir = os.path.join(folder, 'backtests')  # Use workdir variable
os.makedirs(backtest_dir, exist_ok=True)

# Save buy and hold and dumb4cast to the backtest folder
bh_filename = os.path.join(backtest_dir, "buy-and-hold.csv")
bh_df.to_csv(bh_filename, index=False)

# Save each model's dataframe to a separate CSV file in the 'backtest' folder
for i, model_name in enumerate(MODEL_NAMES):
    filename = os.path.join(backtest_dir, f"{model_name}.csv")
    backtests[i+1].to_csv(filename, index=False)  # i+1 to skip bh_df

print(f"Backtest results saved to '{backtest_dir}'")

In [None]:
import IPython
IPython.display.Audio("file_example_MP3_1MG.mp3", autoplay=True)