In [None]:
import ta
from sklearn.ensemble import RandomForestRegressor
from darts import TimeSeries
from darts.dataprocessing.transformers import Scaler
from darts import concatenate
from darts.utils.timeseries_generation import datetime_attribute_timeseries as dt_attr
from darts.models import NaiveDrift, NaiveSeasonal
import matplotlib.pyplot as plt
import numpy as np
import torch
import plotly.graph_objects as go
from darts.metrics import mae, rmse, mse, mape


# Seed the global PyTorch and NumPy random number generators with a fixed value.
torch.manual_seed(1)
np.random.seed(1)

In [None]:
def print_graph_backtesting(actual, prediction, print_single, model, horizon):
    """Plot the actual series together with one or several forecasts.

    Parameters
    ----------
    actual : series exposing ``time_index`` and ``values()``
        The real test points.
    prediction : a single series, or a mapping ``{label: series}``
        A single predicted series when ``print_single`` is truthy, otherwise a
        mapping whose keys are used as legend labels.
    print_single : bool
        True to draw one prediction labelled with ``model``; False to draw
        every entry of the ``prediction`` mapping.
    model : str
        Legend label for the single prediction (ignored when ``print_single``
        is False).
    horizon : int
        Forecasting horizon, shown in the figure title.
    """
    # Normalise both calling modes to a list of (legend label, series) pairs.
    if print_single:
        labelled_series = [(model, prediction)]
    else:
        labelled_series = list(prediction.items())

    traces = [
        go.Scatter(
            x=actual.time_index,
            y=actual.values().flatten(),
            name='Real test points',
        )
    ]
    traces.extend(
        go.Scatter(
            x=series.time_index,
            y=series.values().flatten(),
            name=f'Predicted points ({label})',
        )
        for label, series in labelled_series
    )

    fig = go.Figure()
    for trace in traces:
        fig.add_trace(trace)

    fig.update_layout(title_text = f"Backtesting for Bitcoin prices with Forecasting Horizon = {horizon}")
    fig.update_xaxes(title_text="Date")
    fig.update_yaxes(title_text="Price")
    fig.show()

# Collect Bitcoin data

In [None]:
# Date boundaries as YYYY-MM-DD strings; the download cell below parses them
# with "%Y-%m-%d" and uses dates[0] as the start of every period.
dates = ['2013-04-01', '2016-07-19', '2017-04-01', '2019-12-31']
# Collects one DataFrame per period.
data_sets = []

In [None]:
import requests
import pandas as pd
from datetime import datetime

def fetch_crypto_compare_data(symbol, comparison_symbol, limit, to_timestamp, timeout=30):
    """Fetch one page of daily OHLCV rows from the CryptoCompare histoday endpoint.

    Parameters
    ----------
    symbol : str
        Sent as the ``fsym`` query parameter (e.g. 'BTC').
    comparison_symbol : str
        Sent as the ``tsym`` query parameter (e.g. 'USD').
    limit : int
        Sent as the ``limit`` query parameter.
    to_timestamp : int
        Sent as the ``toTs`` query parameter (Unix seconds).
    timeout : float, optional
        Seconds to wait for the server before giving up, so a stalled
        connection cannot hang the notebook indefinitely.

    Returns
    -------
    list
        The ``['Data']['Data']`` entry of the JSON response.

    Raises
    ------
    requests.HTTPError
        If the server answers with a 4xx/5xx status.
    RuntimeError
        If the API reports an error in its JSON payload.
    """
    url = 'https://min-api.cryptocompare.com/data/v2/histoday'
    params = {
        'fsym': symbol,
        'tsym': comparison_symbol,
        'limit': limit,
        'toTs': to_timestamp
    }
    response = requests.get(url, params=params, timeout=timeout)
    # Fail loudly on HTTP errors instead of trying to parse an error page.
    response.raise_for_status()
    payload = response.json()
    # The API signals problems (bad symbol, rate limit, ...) inside the JSON
    # body; surface its message rather than an opaque KeyError on 'Data'.
    if payload.get('Response') == 'Error':
        raise RuntimeError(
            f"CryptoCompare request failed: {payload.get('Message', 'no message returned')}"
        )
    return payload['Data']['Data']

def get_crypto_compare_ohlcv(symbol, comparison_symbol, start_date, end_date):
    """Download daily OHLCV rows between ``start_date`` and ``end_date`` (inclusive).

    Pages of up to 2000 rows are requested backwards in time, starting at
    ``end_date``, until a page reaches ``start_date``.

    Parameters
    ----------
    symbol, comparison_symbol : str
        Forwarded to ``fetch_crypto_compare_data``.
    start_date, end_date : datetime
        Naive datetimes, interpreted as UTC so that they are comparable with
        the UTC timestamps produced by ``pd.to_datetime(..., unit='s')``.

    Returns
    -------
    pandas.DataFrame
        Rows in chronological order with a datetime ``time`` column, one row
        per timestamp. Empty (with a ``time`` column) if nothing was fetched.
    """
    # Local import: only the `datetime` class is imported at file level.
    from datetime import timezone

    pages = []  # newest page first; reversed before concatenation
    current_date = end_date

    while current_date > start_date:
        # Treat the naive datetime as UTC; datetime.timestamp() on a naive
        # value would silently use the machine's local time zone instead.
        to_ts = int(current_date.replace(tzinfo=timezone.utc).timestamp())
        data = fetch_crypto_compare_data(symbol, comparison_symbol, 2000, to_ts)
        if not data:
            # Nothing returned: stop instead of failing on data[0].
            break
        pages.append(pd.DataFrame(data))
        oldest = datetime.fromtimestamp(data[0]['time'], tz=timezone.utc).replace(tzinfo=None)
        if oldest >= current_date:
            # The page did not move further back in time; looping again would
            # request the same page forever.
            break
        current_date = oldest

    if not pages:
        return pd.DataFrame(columns=['time'])

    # One concat at the end instead of growing a frame inside the loop.
    df = pd.concat(pages[::-1], ignore_index=True)
    df['time'] = pd.to_datetime(df['time'], unit='s')
    df = df[(df['time'] >= start_date) & (df['time'] <= end_date)]
    # Consecutive pages share their boundary row (the next request ends at the
    # oldest timestamp of the previous page); keep a single copy of it.
    df = df.drop_duplicates(subset='time', keep='first')
    return df

In [None]:
# Rebinds data_sets to a fresh empty list (the same assignment already appears
# in an earlier cell); the download loop below appends one DataFrame per period.
data_sets = []

In [None]:
date_format = "%Y-%m-%d"

# Build one feature-enriched frame per period. Every period starts at dates[0];
# only the end date moves forward through dates[1:].
for i, end_text in enumerate(dates[1:]):
  start_date = datetime.strptime(dates[0], date_format)
  end_date = datetime.strptime(end_text, date_format)
  df = get_crypto_compare_ohlcv('BTC', 'USD', start_date, end_date)
  df['time'] = pd.to_datetime(df['time'])
  df = df.set_index('time')
  print(pd.infer_freq(df.index))
  # Drop the two conversion-metadata columns returned by the API.
  df = df.drop(columns=['conversionType', 'conversionSymbol'])
  df = ta.utils.dropna(df)
  # Add the technical-analysis indicator columns; 'volumeto' is used as volume.
  df = ta.add_all_ta_features(
    df, "open", "high", "low", "close", "volumeto", fillna=True
  )
  # Replace any remaining missing values with 0.
  df = df.fillna(0)
  data_sets.append(df)

In [None]:
# Drop repeated timestamps (keeping the first occurrence) from every data set,
# not only the third one, so the check no longer depends on which period
# happened to need more than one download page. Afterwards print the inferred
# index frequency of each data set.
for set_idx, frame in enumerate(data_sets):
  data_sets[set_idx] = frame[~frame.index.duplicated(keep='first')]
  print(pd.infer_freq(data_sets[set_idx].index))

# Feature engineering

In [None]:
# Accumulator lists filled by the cells below, one entry per data set:
#   past_covs / targets                 - unscaled covariate and 'close' series
#   trains_scaled / tests_scaled        - scaled train and test parts of the target
#   targets_scaled                      - scaled train and test parts concatenated
#   tests                               - unscaled test part of the target
#   scalers                             - the Scaler fitted on each train target
#   past_covs_scaled / future_covs_scaled - scaled covariate series
past_covs = []
past_covs_scaled = []
future_covs_scaled = []
targets = []
trains_scaled = []
tests_scaled = []
targets_scaled = []
tests = []
scalers = []

In [None]:
# Pick the covariates for each data set with a random-forest importance filter
# and build the darts target / covariate series.
# Empty the accumulators first so re-running this cell does not append a
# second copy of every series.
past_covs.clear()
targets.clear()

for d in data_sets:
  split_index = int(len(d) * 0.8)  # 80% for training, 20% for testing
  X_train = d.iloc[:split_index].drop('close', axis=1)
  y_train = d.iloc[:split_index]['close']
  X_test = d.iloc[split_index:].drop('close', axis=1)
  y_test = d.iloc[split_index:]['close']
  clf = RandomForestRegressor(n_jobs=-1)
  clf.fit(X_train, y_train)
  # Inside a loop the value of a bare expression is discarded, so print the
  # hold-out score (R^2 for a regressor) instead of computing it for nothing.
  print(f"Hold-out R^2 of the feature-selection forest: {clf.score(X_test, y_test)}")
  # Features ordered by decreasing importance.
  features = dict(sorted(zip(X_train.columns, clf.feature_importances_), key=lambda x: x[1], reverse=True))
  # Keep only features whose importance is at least 0.01.
  filtered_features = {feature: importance for feature, importance in features.items() if importance >= 0.01}
  filtered_feature_names = list(filtered_features)
  # from_dataframe needs 'time' as a regular column.
  d = d.reset_index()
  covs = TimeSeries.from_dataframe(d, time_col="time", value_cols=filtered_feature_names)
  covs = covs.astype(np.float32)
  past_covs.append(covs)
  target = TimeSeries.from_dataframe(d, time_col="time", value_cols=["close"])
  target = target.astype(np.float32)
  targets.append(target)


# Create Train, Validation and Test Set + Past and Future Covariates

In [None]:
#create train and test series and scale them
# Empty the accumulators first so re-running this cell does not append a
# second copy of every series.
for _accumulator in (tests, scalers, trains_scaled, tests_scaled, targets_scaled,
                     past_covs_scaled, future_covs_scaled):
  _accumulator.clear()

# Calendar attributes used for both the past and the future covariates.
calendar_attributes = ["month", "year", "day", "dayofweek", "week", "dayofyear"]

# enumerate gives the position directly; looking it up with targets.index(t)
# compares whole series for equality and returns the first equal one.
for period_idx, t in enumerate(targets):

  split_index = int(len(t) * 0.8)  # 80% train, 20% test
  train_target = t[:split_index]
  test_target = t[split_index:]
  tests.append(test_target)

  # The target scaler is fitted on the training part only.
  scaler_target = Scaler()
  scaler_target = scaler_target.fit(train_target)
  scalers.append(scaler_target)

  train_scaled = scaler_target.transform(train_target)
  test_scaled = scaler_target.transform(test_target)
  target_scaled = concatenate([train_scaled, test_scaled])

  trains_scaled.append(train_scaled)
  tests_scaled.append(test_scaled)
  targets_scaled.append(target_scaled)

  # Calendar components, built once and shared by both covariate sets.
  calendar_cov = concatenate(
    [dt_attr(t.time_index, attribute, dtype=np.float32) for attribute in calendar_attributes],
    axis="component",
  )

  # Past covariates = calendar components followed by the selected features.
  past_cov = concatenate([calendar_cov, past_covs[period_idx]], axis="component")

  # Fit on the training span only, like the target scaler above: fitting on
  # the full series would leak test-period ranges of the price-derived
  # features into the model inputs.
  scaler_ft_past = Scaler()
  scaler_ft_past.fit(past_cov[:split_index])
  past_cov = scaler_ft_past.transform(past_cov)
  past_covs_scaled.append(past_cov)

  # Future covariates contain calendar attributes only, which are known in
  # advance, so they are scaled over the whole span as before.
  future_cov = calendar_cov

  scaler_ft_future = Scaler()

  future_cov = scaler_ft_future.fit_transform(future_cov)
  future_covs_scaled.append(future_cov)

# Create the CSV file that stores all error metrics

In [None]:
import csv

# Column titles of the metrics file, plus a same-length row of dashes.
headers = ['Model', 'MAE', 'RMSE', 'MSE', 'MAPE']
separator = ['-'] * len(headers)

# Mode 'w' truncates an existing file, so the file holds only the header row
# once this cell has run.
with open('error_metrics.csv', 'w', newline='') as file:
    writer = csv.writer(file)
    writer.writerow(headers)

def add_row(row, path='error_metrics.csv'):
  """Append one row to a CSV file.

  Parameters
  ----------
  row : iterable
      Cell values for the new row; written with ``csv.writer``.
  path : str, optional
      File to append to. Defaults to the metrics file created in this
      notebook, so existing ``add_row(row)`` calls behave as before.
  """
  # Mode 'a' keeps the rows already present; newline='' lets the csv module
  # control line endings.
  with open(path, 'a', newline='') as file:
    csv.writer(file).writerow(row)

# Naive Models

In [None]:
# Baseline models reused by the three backtesting sections below.
naive_drift_model = NaiveDrift()
# NaiveSeasonal with its default arguments.
naive_model = NaiveSeasonal()
# NaiveSeasonal with 7 as first positional argument
# (presumably a 7-step, i.e. weekly, season on this daily data - confirm).
naive_seasonal_model = NaiveSeasonal(7)

# Backtesting Naive Drift Model

In [None]:
# Forecast horizons evaluated in every backtest; each value is passed as
# forecast_horizon and, in some cells, also used as a day offset for `start`.
bt_horizons = [1, 7 , 30, 90]
from datetime import timedelta

In [None]:
# Result containers for the three naive baselines: one inner list per data set,
# to which the backtesting cells below append one forecast per horizon.
backtesting_Drift = [[], [], []]
backtesting_Naive = [[], [], []]
backtesting_NaiveSeasonal = [[], [], []]

In [None]:
# Historical forecasts of the naive drift baseline for every data set and horizon.
# The result container is rebuilt here so re-running this cell does not append
# a second set of forecasts to the existing lists.
backtesting_Drift = [[] for _ in targets_scaled]

# enumerate gives the data-set position directly; targets_scaled.index(t)
# compared whole series for equality on every lookup.
for period_idx, t in enumerate(targets_scaled):
  for h in bt_horizons:
    hist_Drift = naive_drift_model.historical_forecasts(
        t,
        # NOTE(review): start is shifted back by h days here (and in the Random
        # Forest cell) but not for the other models - confirm the evaluation
        # windows are meant to differ.
        start=tests_scaled[period_idx].time_index[0] - timedelta(days=h),
        forecast_horizon=h,
        stride=1,
        verbose=True,
    )
    # Undo the target scaling with the scaler fitted for this data set.
    hist_predict_Drift = scalers[period_idx].inverse_transform(hist_Drift)
    backtesting_Drift[period_idx].append(hist_predict_Drift)

In [None]:
# One figure per data set and horizon. Forecasts were appended in bt_horizons
# order, so zip pairs each one with its horizon without an equality-based
# list.index lookup, and enumerate removes the hard-coded count of 3.
for i, period_forecasts in enumerate(backtesting_Drift):
  for h, b in zip(bt_horizons, period_forecasts):
    print_graph_backtesting(tests[i], b, True, 'naive_drift_model',h)

In [None]:
# Print the error metrics and append them to the metrics CSV:
# a section title row, then per data set a sub-title row and one row per horizon.
add_row(['Backtesting Naive Drift Model','','','',''])
for i, period_forecasts in enumerate(backtesting_Drift):
  add_row([f'time period = {i + 1}', '', '', '', ''])
  # Forecasts were appended in bt_horizons order, so zip recovers the horizon
  # without searching the list for an equal series.
  for h, b in zip(bt_horizons, period_forecasts):
    # Compute each metric once and reuse it for the printout and the CSV row.
    scores = {
        'MAE': mae(tests[i], b),
        'RMSE': rmse(tests[i], b),
        'MSE': mse(tests[i], b),
        'MAPE': mape(tests[i], b),
    }
    for metric_name, score in scores.items():
      print(f"{metric_name} of Naive Drift Model forecasting horizon = {h}: {score}")
    print("\n")
    row = [f'Naive Drift fh={h}', *scores.values()]
    add_row(row)

# Backtesting Naive Model

In [None]:
# Historical forecasts of the default NaiveSeasonal baseline for every data set
# and horizon. The result container is rebuilt here so re-running this cell
# does not append a second set of forecasts to the existing lists.
backtesting_Naive = [[] for _ in targets_scaled]

# enumerate gives the data-set position directly; targets_scaled.index(t)
# compared whole series for equality on every lookup.
for period_idx, t in enumerate(targets_scaled):
  for h in bt_horizons:
    hist_Naive = naive_model.historical_forecasts(
        t,
        start=tests_scaled[period_idx].time_index[0],
        forecast_horizon=h,
        stride=1,
        verbose=True,
    )
    # Undo the target scaling with the scaler fitted for this data set.
    hist_predict_Naive = scalers[period_idx].inverse_transform(hist_Naive)
    backtesting_Naive[period_idx].append(hist_predict_Naive)

In [None]:
# One figure per data set and horizon. Forecasts were appended in bt_horizons
# order, so zip pairs each one with its horizon without an equality-based
# list.index lookup, and enumerate removes the hard-coded count of 3.
for i, period_forecasts in enumerate(backtesting_Naive):
  for h, b in zip(bt_horizons, period_forecasts):
    print_graph_backtesting(tests[i], b, True, 'naive_model',h)

In [None]:
# Print the error metrics and append them to the metrics CSV:
# a section title row, then per data set a sub-title row and one row per horizon.
add_row(['Backtesting Naive Model','','','',''])
for i, period_forecasts in enumerate(backtesting_Naive):
  add_row([f'time period = {i + 1}', '', '', '', ''])
  # Forecasts were appended in bt_horizons order, so zip recovers the horizon
  # without searching the list for an equal series.
  for h, b in zip(bt_horizons, period_forecasts):
    # Compute each metric once and reuse it for the printout and the CSV row.
    scores = {
        'MAE': mae(tests[i], b),
        'RMSE': rmse(tests[i], b),
        'MSE': mse(tests[i], b),
        'MAPE': mape(tests[i], b),
    }
    for metric_name, score in scores.items():
      print(f"{metric_name} of Naive Model forecasting horizon = {h}: {score}")
    print("\n")
    row = [f'Naive fh={h}', *scores.values()]
    add_row(row)

# Backtesting Naive Seasonal Model

In [None]:
# Historical forecasts of the NaiveSeasonal(7) baseline for every data set and
# horizon. The result container is rebuilt here so re-running this cell does
# not append a second set of forecasts to the existing lists.
backtesting_NaiveSeasonal = [[] for _ in targets_scaled]

# enumerate gives the data-set position directly; targets_scaled.index(t)
# compared whole series for equality on every lookup.
for period_idx, t in enumerate(targets_scaled):
  for h in bt_horizons:
    hist_NaiveSeasonal = naive_seasonal_model.historical_forecasts(
        t,
        start=tests_scaled[period_idx].time_index[0],
        forecast_horizon=h,
        stride=1,
        verbose=True,
    )
    # Undo the target scaling with the scaler fitted for this data set.
    hist_predict_NaiveSeasonal = scalers[period_idx].inverse_transform(hist_NaiveSeasonal)
    backtesting_NaiveSeasonal[period_idx].append(hist_predict_NaiveSeasonal)

In [None]:
# One figure per data set and horizon. Forecasts were appended in bt_horizons
# order, so zip pairs each one with its horizon without an equality-based
# list.index lookup, and enumerate removes the hard-coded count of 3.
for i, period_forecasts in enumerate(backtesting_NaiveSeasonal):
  for h, b in zip(bt_horizons, period_forecasts):
    print_graph_backtesting(tests[i], b, True, 'naive_seasonal_model',h)

In [None]:
# Print the error metrics and append them to the metrics CSV:
# a section title row, then per data set a sub-title row and one row per horizon.
add_row(['Backtesting Naive Seasonal Model','','','',''])
for i, period_forecasts in enumerate(backtesting_NaiveSeasonal):
  add_row([f'time period = {i + 1}', '', '', '', ''])
  # Forecasts were appended in bt_horizons order, so zip recovers the horizon
  # without searching the list for an equal series.
  for h, b in zip(bt_horizons, period_forecasts):
    # Compute each metric once and reuse it for the printout and the CSV row.
    scores = {
        'MAE': mae(tests[i], b),
        'RMSE': rmse(tests[i], b),
        'MSE': mse(tests[i], b),
        'MAPE': mape(tests[i], b),
    }
    for metric_name, score in scores.items():
      print(f"{metric_name} of Naive Seasonal Model forecasting horizon = {h}: {score}")
    print("\n")
    row = [f'Naive Seasonal fh={h}', *scores.values()]
    add_row(row)

# Random Forest

In [None]:
from darts.models import RandomForest

def create_RF():
  """Return a new darts RandomForest using lags -1..-30, -90 and -180.

  The same lag set is applied to the target and to the past covariates;
  the forest has 100 trees with a maximum depth of 15.
  """
  # Short-term lags (-1 .. -30) followed by two long-term lags.
  lag_offsets = list(range(-1, -31, -1)) + [-90, -180]
  return RandomForest(
      lags=list(lag_offsets),
      lags_past_covariates=list(lag_offsets),
      n_estimators=100,  # Number of trees in the forest
      max_depth=15,  # Maximum depth of each tree
  )

# Backtesting Random Forest

In [None]:
# Fit one Random Forest per data set on its training span, then produce
# historical forecasts for every horizon without retraining.
backtesting_RF = [[] for _ in targets_scaled]

# enumerate gives the data-set position directly; targets_scaled.index(t)
# compared whole series for equality on every lookup.
for period_idx, t in enumerate(targets_scaled):
  RF_model = create_RF()
  RF_model.fit(trains_scaled[period_idx], past_covariates=past_covs_scaled[period_idx])
  for h in bt_horizons:
    hist_RF = RF_model.historical_forecasts(
        t,
        past_covariates=past_covs_scaled[period_idx],
        # NOTE(review): start is shifted back by h days here (as in the Naive
        # Drift cell) but not for the other models - confirm this is intended.
        start=tests_scaled[period_idx].time_index[0] - timedelta(days=h),
        forecast_horizon=h,
        stride=1,
        verbose=True,
        retrain=False
    )

    # Undo the target scaling with the scaler fitted for this data set.
    hist_predict_RF = scalers[period_idx].inverse_transform(hist_RF)
    backtesting_RF[period_idx].append(hist_predict_RF)

In [None]:
# One figure per data set and horizon. Forecasts were appended in bt_horizons
# order, so zip pairs each one with its horizon without an equality-based
# list.index lookup, and enumerate removes the hard-coded count of 3.
for i, period_forecasts in enumerate(backtesting_RF):
  for h, b in zip(bt_horizons, period_forecasts):
    print_graph_backtesting(tests[i], b, True, 'Random Forest',h)

In [None]:
# Print the error metrics and append them to the metrics CSV:
# a section title row, then per data set a sub-title row and one row per horizon.
add_row(['Backtesting Random Forest','','','',''])
for i, period_forecasts in enumerate(backtesting_RF):
  add_row([f'time period = {i + 1}', '', '', '', ''])
  # Forecasts were appended in bt_horizons order, so zip recovers the horizon
  # without searching the list for an equal series.
  for h, b in zip(bt_horizons, period_forecasts):
    # Compute each metric once and reuse it for the printout and the CSV row.
    scores = {
        'MAE': mae(tests[i], b),
        'RMSE': rmse(tests[i], b),
        'MSE': mse(tests[i], b),
        'MAPE': mape(tests[i], b),
    }
    for metric_name, score in scores.items():
      print(f"{metric_name} of RF forecasting horizon = {h}: {score}")
    print("\n")
    row = [f'Random Forest fh={h}', *scores.values()]
    add_row(row)

# LSTM Model

In [None]:
from darts.models import TCNModel, BlockRNNModel, TransformerModel, TFTModel


In [None]:
def create_LSTM():
  """Return a new BlockRNNModel configured as a 4-layer LSTM.

  Uses a 21-step input chunk, a 7-step output chunk, hidden size 104 and the
  dropout / learning-rate values listed below.
  """
  settings = {
      'model': 'LSTM',
      'input_chunk_length': 21,
      'output_chunk_length': 7,
      'hidden_dim': 104,
      'n_rnn_layers': 4,
      'dropout': 0.18390991552139796,
      'optimizer_kwargs': {'lr': 0.00014678997505613995},
  }
  return BlockRNNModel(**settings)

In [None]:
# Fit one LSTM per data set on its training span, then produce historical
# forecasts for every horizon without retraining.
backtesting_LSTM = [[] for _ in targets_scaled]

# enumerate gives the data-set position directly; targets_scaled.index(t)
# compared whole series for equality on every lookup.
for period_idx, t in enumerate(targets_scaled):
  LSTM_model = create_LSTM()
  LSTM_model.fit(trains_scaled[period_idx], past_covariates=past_covs_scaled[period_idx])
  for h in bt_horizons:
    hist_LSTM = LSTM_model.historical_forecasts(
        t,
        past_covariates=past_covs_scaled[period_idx],
        start=tests_scaled[period_idx].time_index[0],
        forecast_horizon=h,
        stride=1,
        verbose=True,
        retrain=False
    )

    # Undo the target scaling with the scaler fitted for this data set.
    hist_predict_LSTM = scalers[period_idx].inverse_transform(hist_LSTM)
    backtesting_LSTM[period_idx].append(hist_predict_LSTM)

In [None]:
# One figure per data set and horizon. Forecasts were appended in bt_horizons
# order, so zip pairs each one with its horizon without an equality-based
# list.index lookup, and enumerate removes the hard-coded count of 3.
for i, period_forecasts in enumerate(backtesting_LSTM):
  for h, b in zip(bt_horizons, period_forecasts):
    print_graph_backtesting(tests[i], b, True, 'LSTM model',h)

In [None]:
# Print the error metrics and append them to the metrics CSV:
# a section title row, then per data set a sub-title row and one row per horizon.
add_row(['Backtesting LSTM Model','','','',''])
for i, period_forecasts in enumerate(backtesting_LSTM):
  add_row([f'time period = {i + 1}', '', '', '', ''])
  # Forecasts were appended in bt_horizons order, so zip recovers the horizon
  # without searching the list for an equal series.
  for h, b in zip(bt_horizons, period_forecasts):
    # Compute each metric once and reuse it for the printout and the CSV row.
    scores = {
        'MAE': mae(tests[i], b),
        'RMSE': rmse(tests[i], b),
        'MSE': mse(tests[i], b),
        'MAPE': mape(tests[i], b),
    }
    for metric_name, score in scores.items():
      print(f"{metric_name} of LSTM forecasting horizon = {h}: {score}")
    print("\n")
    row = [f'LSTM fh={h}', *scores.values()]
    add_row(row)

# VanillaRNN Model

In [None]:
def create_vanillaRNN():
  """Return a new BlockRNNModel configured as a single-layer plain RNN.

  Uses a 21-step input chunk, a 7-step output chunk, hidden size 31 and the
  dropout / learning-rate values listed below.
  """
  settings = {
      'model': 'RNN',
      'input_chunk_length': 21,
      'output_chunk_length': 7,
      'hidden_dim': 31,
      'n_rnn_layers': 1,
      'dropout': 0.34008569567908437,
      'optimizer_kwargs': {'lr': 0.0009241588491743642},
  }
  return BlockRNNModel(**settings)

In [None]:
# Fit one vanilla RNN per data set on its training span, then produce
# historical forecasts for every horizon without retraining.
backtesting_VanillaRNN = [[] for _ in targets_scaled]

# enumerate gives the data-set position directly; targets_scaled.index(t)
# compared whole series for equality on every lookup.
for period_idx, t in enumerate(targets_scaled):
  vanillaRNN_model = create_vanillaRNN()
  vanillaRNN_model.fit(trains_scaled[period_idx], past_covariates=past_covs_scaled[period_idx])
  for h in bt_horizons:
    hist_VanillaRNN = vanillaRNN_model.historical_forecasts(
        t,
        past_covariates=past_covs_scaled[period_idx],
        start=tests_scaled[period_idx].time_index[0],
        forecast_horizon=h,
        stride=1,
        verbose=True,
        retrain=False
    )

    # Undo the target scaling with the scaler fitted for this data set.
    hist_predict_VanillaRNN = scalers[period_idx].inverse_transform(hist_VanillaRNN)
    backtesting_VanillaRNN[period_idx].append(hist_predict_VanillaRNN)

In [None]:
# One figure per data set and horizon. Forecasts were appended in bt_horizons
# order, so zip pairs each one with its horizon without an equality-based
# list.index lookup, and enumerate removes the hard-coded count of 3.
for i, period_forecasts in enumerate(backtesting_VanillaRNN):
  for h, b in zip(bt_horizons, period_forecasts):
    print_graph_backtesting(tests[i], b, True, 'Vanilla RNN',h)

In [None]:
# Print the error metrics and append them to the metrics CSV:
# a section title row, then per data set a sub-title row and one row per horizon.
add_row(['Backtesting RNN Model','','','',''])
for i, period_forecasts in enumerate(backtesting_VanillaRNN):
  add_row([f'time period = {i + 1}', '', '', '', ''])
  # Forecasts were appended in bt_horizons order, so zip recovers the horizon
  # without searching the list for an equal series.
  for h, b in zip(bt_horizons, period_forecasts):
    # Compute each metric once and reuse it for the printout and the CSV row.
    scores = {
        'MAE': mae(tests[i], b),
        'RMSE': rmse(tests[i], b),
        'MSE': mse(tests[i], b),
        'MAPE': mape(tests[i], b),
    }
    for metric_name, score in scores.items():
      print(f"{metric_name} of RNN forecasting horizon = {h}: {score}")
    print("\n")
    row = [f'RNN fh={h}', *scores.values()]
    add_row(row)

# GRU Model

In [None]:
def create_GRU():
  """Return a new BlockRNNModel configured as a single-layer GRU.

  Uses a 21-step input chunk, a 7-step output chunk, hidden size 75 and the
  dropout / learning-rate values listed below.
  """
  settings = {
      'model': 'GRU',
      'input_chunk_length': 21,
      'output_chunk_length': 7,
      'hidden_dim': 75,
      'n_rnn_layers': 1,
      'dropout': 0.007824503730612165,
      'optimizer_kwargs': {'lr': 0.000566687831423731},
  }
  return BlockRNNModel(**settings)

In [None]:
# Fit one GRU per data set on its training span, then produce historical
# forecasts for every horizon without retraining.
backtesting_GRU = [[] for _ in targets_scaled]

# enumerate gives the data-set position directly; targets_scaled.index(t)
# compared whole series for equality on every lookup.
for period_idx, t in enumerate(targets_scaled):
  GRU_model = create_GRU()
  GRU_model.fit(trains_scaled[period_idx], past_covariates=past_covs_scaled[period_idx])
  for h in bt_horizons:
    hist_GRU = GRU_model.historical_forecasts(
        t,
        past_covariates=past_covs_scaled[period_idx],
        start=tests_scaled[period_idx].time_index[0],
        forecast_horizon=h,
        stride=1,
        verbose=True,
        retrain=False
    )

    # Undo the target scaling with the scaler fitted for this data set.
    hist_predict_GRU = scalers[period_idx].inverse_transform(hist_GRU)
    backtesting_GRU[period_idx].append(hist_predict_GRU)

In [None]:
# One figure per data set and horizon. Forecasts were appended in bt_horizons
# order, so zip pairs each one with its horizon without an equality-based
# list.index lookup, and enumerate removes the hard-coded count of 3.
for i, period_forecasts in enumerate(backtesting_GRU):
  for h, b in zip(bt_horizons, period_forecasts):
    print_graph_backtesting(tests[i], b, True, 'GRU model',h)

In [None]:
# Print the error metrics and append them to the metrics CSV:
# a section title row, then per data set a sub-title row and one row per horizon.
add_row(['Backtesting GRU Model','','','',''])
for i, period_forecasts in enumerate(backtesting_GRU):
  add_row([f'time period = {i + 1}', '', '', '', ''])
  # Forecasts were appended in bt_horizons order, so zip recovers the horizon
  # without searching the list for an equal series.
  for h, b in zip(bt_horizons, period_forecasts):
    # Compute each metric once and reuse it for the printout and the CSV row.
    scores = {
        'MAE': mae(tests[i], b),
        'RMSE': rmse(tests[i], b),
        'MSE': mse(tests[i], b),
        'MAPE': mape(tests[i], b),
    }
    for metric_name, score in scores.items():
      print(f"{metric_name} of GRU forecasting horizon = {h}: {score}")
    print("\n")
    row = [f'GRU fh={h}', *scores.values()]
    add_row(row)

# Transformer Model

In [None]:
def create_Transformer():
  """Return a new TransformerModel with 4 encoder layers and 1 decoder layer.

  Uses a 21-step input chunk, a 7-step output chunk, d_model 88, 4 attention
  heads and the dropout / learning-rate values listed below.
  """
  settings = {
      'input_chunk_length': 21,
      'output_chunk_length': 7,
      'd_model': 88,
      'nhead': 4,
      'num_encoder_layers': 4,
      'num_decoder_layers': 1,
      'dropout': 0.14295488378943239,
      'optimizer_kwargs': {'lr': 0.00046254166100837376},
  }
  return TransformerModel(**settings)

In [None]:
# Fit one Transformer per data set on its training span, then produce
# historical forecasts for every horizon without retraining.
backtesting_Transformer = [[] for _ in targets_scaled]

# enumerate gives the data-set position directly; targets_scaled.index(t)
# compared whole series for equality on every lookup.
for period_idx, t in enumerate(targets_scaled):
  transformer_model = create_Transformer()
  transformer_model.fit(trains_scaled[period_idx], past_covariates=past_covs_scaled[period_idx])
  for h in bt_horizons:
    hist_Transformer = transformer_model.historical_forecasts(
        t,
        past_covariates=past_covs_scaled[period_idx],
        start=tests_scaled[period_idx].time_index[0],
        forecast_horizon=h,
        stride=1,
        verbose=True,
        retrain=False
    )

    # Undo the target scaling with the scaler fitted for this data set.
    hist_predict_Transformer = scalers[period_idx].inverse_transform(hist_Transformer)
    backtesting_Transformer[period_idx].append(hist_predict_Transformer)

In [None]:
# One figure per data set and horizon. Forecasts were appended in bt_horizons
# order, so zip pairs each one with its horizon without an equality-based
# list.index lookup, and enumerate removes the hard-coded count of 3.
for i, period_forecasts in enumerate(backtesting_Transformer):
  for h, b in zip(bt_horizons, period_forecasts):
    print_graph_backtesting(tests[i], b, True, 'Transformer Model',h)

In [None]:
# Print the error metrics and append them to the metrics CSV:
# a section title row, then per data set a sub-title row and one row per horizon.
add_row(['Backtesting Transformer Model','','','',''])
for i, period_forecasts in enumerate(backtesting_Transformer):
  add_row([f'time period = {i + 1}', '', '', '', ''])
  # Forecasts were appended in bt_horizons order, so zip recovers the horizon
  # without searching the list for an equal series.
  for h, b in zip(bt_horizons, period_forecasts):
    # Compute each metric once and reuse it for the printout and the CSV row.
    scores = {
        'MAE': mae(tests[i], b),
        'RMSE': rmse(tests[i], b),
        'MSE': mse(tests[i], b),
        'MAPE': mape(tests[i], b),
    }
    for metric_name, score in scores.items():
      print(f"{metric_name} of Transformer Model forecasting horizon = {h}: {score}")
    print("\n")
    row = [f'Transformer fh={h}', *scores.values()]
    add_row(row)

# TFT Model

In [None]:
def create_TFT():
  """Return a new TFTModel with one LSTM layer and 4 attention heads.

  Uses a 21-step input chunk, a 7-step output chunk, hidden size 30,
  hidden continuous size 10 and the dropout / learning-rate values below.
  """
  settings = {
      'input_chunk_length': 21,
      'output_chunk_length': 7,
      'hidden_size': 30,
      'lstm_layers': 1,
      'num_attention_heads': 4,
      'dropout': 0.06118422728497173,
      'hidden_continuous_size': 10,
      'optimizer_kwargs': {'lr': 0.005081664304452033},
  }
  return TFTModel(**settings)

In [None]:
# Fit one TFT per data set on its training span (with past and future
# covariates), then produce historical forecasts for every horizon without
# retraining.
backtesting_TFT = [[] for _ in targets_scaled]

# enumerate gives the data-set position directly; targets_scaled.index(t)
# compared whole series for equality on every lookup.
for period_idx, t in enumerate(targets_scaled):
  TFT_model = create_TFT()
  TFT_model.fit(trains_scaled[period_idx],
                past_covariates=past_covs_scaled[period_idx],
                future_covariates=future_covs_scaled[period_idx])
  for h in bt_horizons:
    hist_TFT = TFT_model.historical_forecasts(
        t,
        past_covariates=past_covs_scaled[period_idx],
        future_covariates=future_covs_scaled[period_idx],
        start=tests_scaled[period_idx].time_index[0],
        forecast_horizon=h,
        stride=1,
        verbose=True,
        retrain=False
    )

    # Undo the target scaling with the scaler fitted for this data set.
    hist_predict_TFT = scalers[period_idx].inverse_transform(hist_TFT)
    backtesting_TFT[period_idx].append(hist_predict_TFT)

In [None]:
# One figure per data set and horizon. Forecasts were appended in bt_horizons
# order, so zip pairs each one with its horizon without an equality-based
# list.index lookup, and enumerate removes the hard-coded count of 3.
for i, period_forecasts in enumerate(backtesting_TFT):
  for h, b in zip(bt_horizons, period_forecasts):
    print_graph_backtesting(tests[i], b, True, 'TFT Model',h)

In [None]:
# Print the error metrics and append them to the metrics CSV:
# a section title row, then per data set a sub-title row and one row per horizon.
add_row(['Backtesting TFT Model','','','',''])
for i, period_forecasts in enumerate(backtesting_TFT):
  add_row([f'time period = {i + 1}', '', '', '', ''])
  # Forecasts were appended in bt_horizons order, so zip recovers the horizon
  # without searching the list for an equal series.
  for h, b in zip(bt_horizons, period_forecasts):
    # Compute each metric once and reuse it for the printout and the CSV row.
    scores = {
        'MAE': mae(tests[i], b),
        'RMSE': rmse(tests[i], b),
        'MSE': mse(tests[i], b),
        'MAPE': mape(tests[i], b),
    }
    for metric_name, score in scores.items():
      print(f"{metric_name} of TFT Model forecasting horizon = {h}: {score}")
    print("\n")
    row = [f'TFT fh={h}', *scores.values()]
    add_row(row)

# NBEATS Model

In [None]:
from darts.models import NBEATSModel

def create_NBEATS():
  """Return a new NBEATSModel with 19 stacks of 4 blocks (4 layers, width 378).

  Uses a 21-step input chunk and a 7-step output chunk. Calendar and
  relative-position encoders are added as past covariates and scaled with a
  fresh ``Scaler`` created on every call.
  """
  encoders = {
      'cyclic': {'past': ['month', 'dayofweek', 'dayofyear']},  # Capture seasonal trends
      'datetime_attribute': {'past': ['day', 'week', 'year']},
      'position': {'past': ['relative']},
      'transformer': Scaler(),
  }
  settings = {
      'input_chunk_length': 21,
      'output_chunk_length': 7,
      'num_stacks': 19,
      'num_blocks': 4,
      'num_layers': 4,
      'layer_widths': 378,
      'expansion_coefficient_dim': 8,
      'dropout': 0.007293860107310261,
      'add_encoders': encoders,
      'optimizer_kwargs': {'lr': 0.0005903374698873167},
  }
  return NBEATSModel(**settings)

In [None]:
# Fit one N-BEATS model per data set on its training span, then produce
# historical forecasts for every horizon without retraining.
backtesting_NBEATS = [[] for _ in targets_scaled]

# enumerate gives the data-set position directly; targets_scaled.index(t)
# compared whole series for equality on every lookup.
for period_idx, t in enumerate(targets_scaled):
  NBEATS_model = create_NBEATS()
  NBEATS_model.fit(trains_scaled[period_idx], past_covariates=past_covs_scaled[period_idx])
  for h in bt_horizons:
    hist_NBEATS = NBEATS_model.historical_forecasts(
        t,
        past_covariates=past_covs_scaled[period_idx],
        start=tests_scaled[period_idx].time_index[0],
        forecast_horizon=h,
        stride=1,
        verbose=True,
        retrain=False
    )

    # Undo the target scaling with the scaler fitted for this data set.
    hist_predict_NBEATS = scalers[period_idx].inverse_transform(hist_NBEATS)
    backtesting_NBEATS[period_idx].append(hist_predict_NBEATS)

In [None]:
# One figure per data set and horizon. Forecasts were appended in bt_horizons
# order, so zip pairs each one with its horizon without an equality-based
# list.index lookup, and enumerate removes the hard-coded count of 3.
for i, period_forecasts in enumerate(backtesting_NBEATS):
  for h, b in zip(bt_horizons, period_forecasts):
    print_graph_backtesting(tests[i], b, True, 'NBEATS Model',h)

In [None]:
# Print the error metrics and append them to the metrics CSV:
# a section title row, then per data set a sub-title row and one row per horizon.
add_row(['Backtesting NBEATS Model','','','',''])
for i, period_forecasts in enumerate(backtesting_NBEATS):
  add_row([f'time period = {i + 1}', '', '', '', ''])
  # Forecasts were appended in bt_horizons order, so zip recovers the horizon
  # without searching the list for an equal series.
  for h, b in zip(bt_horizons, period_forecasts):
    # Compute each metric once and reuse it for the printout and the CSV row.
    scores = {
        'MAE': mae(tests[i], b),
        'RMSE': rmse(tests[i], b),
        'MSE': mse(tests[i], b),
        'MAPE': mape(tests[i], b),
    }
    for metric_name, score in scores.items():
      print(f"{metric_name} of NBEATS Model forecasting horizon = {h}: {score}")
    print("\n")
    row = [f'NBEATS fh={h}', *scores.values()]
    add_row(row)

# TCN Model

In [None]:
def create_TCN():
  """Return a new TCNModel with kernel size 8, 36 filters and dilation base 2.

  Uses a 21-step input chunk, a 7-step output chunk and ``random_state=0``.
  Calendar and relative-position encoders are added as past covariates and
  scaled with a fresh ``Scaler`` created on every call.
  """
  encoders = {
      'cyclic': {'past': ['month', 'dayofweek', 'dayofyear']},  # Capture seasonal trends
      'datetime_attribute': {'past': ['day', 'week', 'year']},
      'position': {'past': ['relative']},
      'transformer': Scaler(),
  }
  settings = {
      'input_chunk_length': 21,
      'output_chunk_length': 7,
      'kernel_size': 8,
      'num_filters': 36,
      'weight_norm': False,
      'dilation_base': 2,
      'dropout': 0.14321708644925318,
      'optimizer_kwargs': {'lr': 0.0005056311276120884},
      'random_state': 0,
      'add_encoders': encoders,
  }
  return TCNModel(**settings)

In [None]:
# Fit one TCN per data set on its training span, then produce historical
# forecasts for every horizon without retraining.
backtesting_TCN = [[] for _ in targets_scaled]

# Iterate over the scaled targets themselves. The previous `for i in range(3)`
# never bound `t`, so the body reused whatever `t` was left over from an
# earlier cell: every model was fitted on that one data set and all forecasts
# were appended to a single inner list.
for period_idx, t in enumerate(targets_scaled):
  TCN_model = create_TCN()
  TCN_model.fit(trains_scaled[period_idx], past_covariates=past_covs_scaled[period_idx])
  for h in bt_horizons:
    hist_TCN = TCN_model.historical_forecasts(
        t,
        past_covariates=past_covs_scaled[period_idx],
        start=tests_scaled[period_idx].time_index[0],
        forecast_horizon=h,
        stride=1,
        verbose=True,
        retrain=False
    )

    # Undo the target scaling with the scaler fitted for this data set.
    hist_predict_TCN = scalers[period_idx].inverse_transform(hist_TCN)
    backtesting_TCN[period_idx].append(hist_predict_TCN)

In [None]:
# One figure per data set and horizon. Forecasts were appended in bt_horizons
# order, so zip pairs each one with its horizon without an equality-based
# list.index lookup, and enumerate removes the hard-coded count of 3.
for i, period_forecasts in enumerate(backtesting_TCN):
  for h, b in zip(bt_horizons, period_forecasts):
    print_graph_backtesting(tests[i], b, True, 'TCN Model',h)

In [None]:
# Print the error metrics and append them to the metrics CSV:
# a section title row, then per data set a sub-title row and one row per horizon.
add_row(['Backtesting TCN Model','','','',''])
for i, period_forecasts in enumerate(backtesting_TCN):
  add_row([f'time period = {i + 1}', '', '', '', ''])
  # Forecasts were appended in bt_horizons order, so zip recovers the horizon
  # without searching the list for an equal series.
  for h, b in zip(bt_horizons, period_forecasts):
    # Compute each metric once and reuse it for the printout and the CSV row.
    scores = {
        'MAE': mae(tests[i], b),
        'RMSE': rmse(tests[i], b),
        'MSE': mse(tests[i], b),
        'MAPE': mape(tests[i], b),
    }
    for metric_name, score in scores.items():
      print(f"{metric_name} of TCN Model forecasting horizon = {h}: {score}")
    print("\n")
    row = [f'TCN fh={h}', *scores.values()]
    add_row(row)

# LightGBM Model

In [None]:
from darts.models.forecasting.lgbm import LightGBMModel

def create_LightGBM():
  """Return a new LightGBMModel using lags -1..-30, -90 and -180.

  The same lag set is applied to the target and to the past covariates;
  the output chunk length is 7.
  """
  # Short-term lags (-1 .. -30) followed by two long-term lags.
  lag_offsets = list(range(-1, -31, -1)) + [-90, -180]
  return LightGBMModel(
      lags=list(lag_offsets),
      lags_past_covariates=list(lag_offsets),
      output_chunk_length=7,
  )

In [None]:
# NOTE(review): this cell sits in the LightGBM section but is a verbatim copy
# of the Transformer backtesting cell further up. It rebinds
# backtesting_Transformer and fits the Transformer models a second time;
# it looks like a copy-paste leftover - confirm and remove if not needed.
backtesting_Transformer = [[], [], []]

for t in targets_scaled:
  transformer_model = create_Transformer()
  transformer_model.fit(trains_scaled[targets_scaled.index(t)], past_covariates = past_covs_scaled[targets_scaled.index(t)])
  for h in bt_horizons:
    hist_Transformer = transformer_model.historical_forecasts(
            t,
            past_covariates = past_covs_scaled[targets_scaled.index(t)],
            start=tests_scaled[targets_scaled.index(t)].time_index[0],
            forecast_horizon=h,
            stride=1,
            verbose=True,
            retrain=False
      )

    # Convert the scaled forecast back with the scaler stored for this data set.
    hist_predict_Transformer = scalers[targets_scaled.index(t)].inverse_transform(hist_Transformer)
    backtesting_Transformer[targets_scaled.index(t)].append(hist_predict_Transformer)

In [None]:
# Fit one LightGBM model per data set on its training span, then produce
# historical forecasts for every horizon without retraining.
backtesting_LightGBM = [[] for _ in targets_scaled]

# enumerate gives the data-set position directly; targets_scaled.index(t)
# compared whole series for equality on every lookup.
for period_idx, t in enumerate(targets_scaled):
  LightGBM_model = create_LightGBM()
  LightGBM_model.fit(trains_scaled[period_idx], past_covariates=past_covs_scaled[period_idx])
  for h in bt_horizons:
    hist_LightGBM = LightGBM_model.historical_forecasts(
        t,
        past_covariates=past_covs_scaled[period_idx],
        start=tests_scaled[period_idx].time_index[0],
        forecast_horizon=h,
        stride=1,
        verbose=True,
        retrain=False
    )
    # Undo the target scaling with the scaler fitted for this data set.
    hist_predict_LightGBM = scalers[period_idx].inverse_transform(hist_LightGBM)
    backtesting_LightGBM[period_idx].append(hist_predict_LightGBM)

In [None]:
# One figure per data set and horizon. Forecasts were appended in bt_horizons
# order, so zip pairs each one with its horizon without an equality-based
# list.index lookup, and enumerate removes the hard-coded count of 3.
for i, period_forecasts in enumerate(backtesting_LightGBM):
  for h, b in zip(bt_horizons, period_forecasts):
    print_graph_backtesting(tests[i], b, True, 'LightGBM Model',h)

In [None]:
# Print the error metrics and append them to the metrics CSV:
# a section title row, then per data set a sub-title row and one row per horizon.
add_row(['Backtesting LightGBM Model','','','',''])
for i, period_forecasts in enumerate(backtesting_LightGBM):
  add_row([f'time period = {i + 1}', '', '', '', ''])
  # Forecasts were appended in bt_horizons order, so zip recovers the horizon
  # without searching the list for an equal series.
  for h, b in zip(bt_horizons, period_forecasts):
    # Compute each metric once and reuse it for the printout and the CSV row.
    scores = {
        'MAE': mae(tests[i], b),
        'RMSE': rmse(tests[i], b),
        'MSE': mse(tests[i], b),
        'MAPE': mape(tests[i], b),
    }
    for metric_name, score in scores.items():
      print(f"{metric_name} of LightGBM forecasting horizon = {h}: {score}")
    print("\n")
    row = [f'LightGBM fh={h}', *scores.values()]
    add_row(row)

# XGB Model

In [None]:
from darts.models import XGBModel

def create_XGB():
  """Return a new XGBModel using lags 1-30, 90 and 180 for both the target
  and the past covariates, with an output chunk length of 7."""
  def _lag_offsets():
    # -1 .. -30 (short-term) followed by -90 and -180 (long-term).
    # A fresh list is built per call so the two arguments never share one.
    return list(range(-1, -31, -1)) + [-90, -180]

  settings = {
      'lags': _lag_offsets(),
      'lags_past_covariates': _lag_offsets(),
      'output_chunk_length': 7,
  }
  return XGBModel(**settings)

In [None]:
# NOTE(review): this cell repeats the LightGBM backtest from the LightGBM
# section and sits under the XGB heading; it looks like a copy-paste leftover.
# Running it refits LightGBM and overwrites backtesting_LightGBM. Consider
# deleting the cell once confirmed nothing depends on the re-run.
backtesting_LightGBM = [[], [], []]
# enumerate() supplies the position directly. Recovering it with
# targets_scaled.index(t) runs an equality-based search on every use and would
# select the wrong slot if two series ever compared equal.
for period_idx, t in enumerate(targets_scaled):
  LightGBM_model = create_LightGBM()
  LightGBM_model.fit(trains_scaled[period_idx], past_covariates=past_covs_scaled[period_idx])
  for h in bt_horizons:
    hist_LightGBM = LightGBM_model.historical_forecasts(
        t,
        past_covariates=past_covs_scaled[period_idx],
        start=tests_scaled[period_idx].time_index[0],
        forecast_horizon=h,
        stride=1,
        verbose=True,
        retrain=False,  # reuse the model fitted above for every forecast
    )
    # Undo the scaling with the scaler stored at the same position.
    hist_predict_LightGBM = scalers[period_idx].inverse_transform(hist_LightGBM)
    backtesting_LightGBM[period_idx].append(hist_predict_LightGBM)

In [None]:
# Backtest XGB: fit one model per scaled target series, then produce
# historical forecasts starting at the first timestamp of the matching test
# series for every horizon in bt_horizons.
# backtesting_XGB[k] collects the inverse-transformed forecasts for target k,
# in the same order as bt_horizons.
backtesting_XGB = [[], [], []]

# enumerate() supplies the position directly. Recovering it with
# targets_scaled.index(t) runs an equality-based search on every use and would
# select the wrong slot if two series ever compared equal.
for period_idx, t in enumerate(targets_scaled):
  XGB_model = create_XGB()
  XGB_model.fit(trains_scaled[period_idx], past_covariates=past_covs_scaled[period_idx])
  for h in bt_horizons:
    hist_XGB = XGB_model.historical_forecasts(
        t,
        past_covariates=past_covs_scaled[period_idx],
        start=tests_scaled[period_idx].time_index[0],
        forecast_horizon=h,
        stride=1,
        verbose=True,
        retrain=False,  # reuse the model fitted above for every forecast
    )

    # Undo the scaling with the scaler stored at the same position.
    hist_predict_XGB = scalers[period_idx].inverse_transform(hist_XGB)
    backtesting_XGB[period_idx].append(hist_predict_XGB)

In [None]:
# Plot every XGB backtest against the test series of its time period.
for i in range(3):
  # Forecasts are stored in bt_horizons order, so pairing the two sequences
  # yields each horizon without an equality-based .index() search.
  for h, b in zip(bt_horizons, backtesting_XGB[i]):
    print_graph_backtesting(tests[i], b, True, 'XGB Model', h)

In [None]:
# Print MAE / RMSE / MSE / MAPE for every XGB backtest and hand the same
# numbers to add_row (defined elsewhere in the notebook), preceded by a model
# header row and one header row per time period.
add_row(['Backtesting XGB Model','','','',''])
for i in range(3):
  add_row([f'time period = {i + 1}', '', '', '', ''])
  # Forecasts are stored in bt_horizons order; zip pairs each one with its
  # horizon instead of searching the list with .index() for every lookup.
  for h, b in zip(bt_horizons, backtesting_XGB[i]):
    # Each metric is computed once and reused for the printout and the row.
    mae_value = mae(tests[i], b)
    print(f"MAE of XGB forecasting horizon = {h}: {mae_value}")
    rmse_value = rmse(tests[i], b)
    print(f"RMSE of XGB forecasting horizon = {h}: {rmse_value}")
    mse_value = mse(tests[i], b)
    print(f"MSE of XGB forecasting horizon = {h}: {mse_value}")
    mape_value = mape(tests[i], b)
    print(f"MAPE of XGB forecasting horizon = {h}: {mape_value}")
    print("\n")
    row = [f'XGB fh={h}', mae_value, rmse_value, mse_value, mape_value]
    add_row(row)

# CatBoost Model

In [None]:
from darts.models import CatBoostModel

def create_CatBoost():
  """Return a new CatBoostModel using lags 1-30, 90 and 180 for both the
  target and the past covariates, with an output chunk length of 7."""
  def _lag_offsets():
    # -1 .. -30 (short-term) followed by -90 and -180 (long-term).
    # A fresh list is built per call so the two arguments never share one.
    return list(range(-1, -31, -1)) + [-90, -180]

  settings = {
      'lags': _lag_offsets(),
      'lags_past_covariates': _lag_offsets(),
      'output_chunk_length': 7,
  }
  return CatBoostModel(**settings)

In [None]:
# NOTE(review): this cell repeats the XGB backtest from the XGB section and
# sits under the CatBoost heading; it looks like a copy-paste leftover.
# Running it refits XGB and overwrites backtesting_XGB. Consider deleting the
# cell once confirmed nothing depends on the re-run.
backtesting_XGB = [[], [], []]

# enumerate() supplies the position directly. Recovering it with
# targets_scaled.index(t) runs an equality-based search on every use and would
# select the wrong slot if two series ever compared equal.
for period_idx, t in enumerate(targets_scaled):
  XGB_model = create_XGB()
  XGB_model.fit(trains_scaled[period_idx], past_covariates=past_covs_scaled[period_idx])
  for h in bt_horizons:
    hist_XGB = XGB_model.historical_forecasts(
        t,
        past_covariates=past_covs_scaled[period_idx],
        start=tests_scaled[period_idx].time_index[0],
        forecast_horizon=h,
        stride=1,
        verbose=True,
        retrain=False,  # reuse the model fitted above for every forecast
    )

    # Undo the scaling with the scaler stored at the same position.
    hist_predict_XGB = scalers[period_idx].inverse_transform(hist_XGB)
    backtesting_XGB[period_idx].append(hist_predict_XGB)

In [None]:
# Backtest CatBoost: fit one model per scaled target series, then produce
# historical forecasts starting at the first timestamp of the matching test
# series for every horizon in bt_horizons.
# backtesting_CatBoost[k] collects the inverse-transformed forecasts for
# target k, in the same order as bt_horizons.
backtesting_CatBoost = [[], [], []]

# enumerate() supplies the position directly. Recovering it with
# targets_scaled.index(t) runs an equality-based search on every use and would
# select the wrong slot if two series ever compared equal.
for period_idx, t in enumerate(targets_scaled):
  CatBoost_model = create_CatBoost()
  CatBoost_model.fit(trains_scaled[period_idx], past_covariates=past_covs_scaled[period_idx])
  for h in bt_horizons:
    hist_CatBoost = CatBoost_model.historical_forecasts(
        t,
        past_covariates=past_covs_scaled[period_idx],
        # A stray second comma after this argument used to make the cell a SyntaxError.
        start=tests_scaled[period_idx].time_index[0],
        forecast_horizon=h,
        stride=1,
        verbose=True,
        retrain=False,  # reuse the model fitted above for every forecast
    )

    # The inverse transform must come from the period's scaler; the previous
    # code called .inverse_transform on a bare list (missing `scalers`).
    hist_predict_CatBoost = scalers[period_idx].inverse_transform(hist_CatBoost)
    backtesting_CatBoost[period_idx].append(hist_predict_CatBoost)

In [None]:
# Plot every CatBoost backtest against the test series of its time period.
for i in range(3):
  # Forecasts are stored in bt_horizons order, so pairing the two sequences
  # yields each horizon without an equality-based .index() search.
  for h, b in zip(bt_horizons, backtesting_CatBoost[i]):
    print_graph_backtesting(tests[i], b, True, 'CatBoost Model', h)

In [None]:
# Print MAE / RMSE / MSE / MAPE for every CatBoost backtest and hand the same
# numbers to add_row (defined elsewhere in the notebook), preceded by a model
# header row and one header row per time period.
add_row(['Backtesting Catboost Model','','','',''])
for i in range(3):
  add_row([f'time period = {i + 1}', '', '', '', ''])
  # Forecasts are stored in bt_horizons order; zip pairs each one with its
  # horizon instead of searching the list with .index() for every lookup.
  for h, b in zip(bt_horizons, backtesting_CatBoost[i]):
    # Each metric is computed once and reused for the printout and the row.
    mae_value = mae(tests[i], b)
    print(f"MAE of CatBoost forecasting horizon = {h}: {mae_value}")
    rmse_value = rmse(tests[i], b)
    print(f"RMSE of CatBoost forecasting horizon = {h}: {rmse_value}")
    mse_value = mse(tests[i], b)
    print(f"MSE of CatBoost forecasting horizon = {h}: {mse_value}")
    mape_value = mape(tests[i], b)
    print(f"MAPE of CatBoost forecasting horizon = {h}: {mape_value}")
    print("\n")
    row = [f'Catboost fh={h}', mae_value, rmse_value, mse_value, mape_value]
    add_row(row)