In [1]:
MODEL_NAME = "tft-nb-4"

In [2]:
%load_ext autoreload
%autoreload 2
%matplotlib inline

from multiprocessing.dummy import freeze_support
import os
import sys
from dotenv import load_dotenv
load_dotenv()
os.environ['WANDB_NOTEBOOK_NAME'] = 'pytorch_stats_own_data.ipynb'
os.environ['WANDB_API_KEY'] = os.getenv('WANDB_API_KEY')

nb_dir = os.path.split(os.getcwd())[0]
if nb_dir not in sys.path:
    sys.path.append(nb_dir)

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import torch

from darts import TimeSeries
from darts.models import TFTModel
from darts.dataprocessing.transformers import Scaler, MissingValuesFiller
from darts.utils.likelihood_models import QuantileRegression
from darts.metrics import mape, r2_score, rmse, mse
from darts import TimeSeries

import helper
import glob

import wandb
from pytorch_lightning.loggers import WandbLogger
from pytorch_lightning.strategies import DDPStrategy, ddp2
from pytorch_lightning.callbacks import ModelCheckpoint, EarlyStopping

from tqdm.contrib.concurrent import process_map
import tqdm


AVAILABLE_GPUS = torch.cuda.device_count()
AVAILABLE_CPUS = os.cpu_count()
TRAINING_DATA_PATH = "../../../Data/london_clean/*.csv"

print(f"Available GPUs: {AVAILABLE_GPUS}")
print(f"Available CPUs: {AVAILABLE_CPUS}")



Available GPUs: 2
Available CPUs: 32
Available CPUs: 32


# Data

Taking the first 300 households from the London Dataset and converting them to a Darts TimeSeries.

In [3]:
def reader(x):
    return TimeSeries.from_csv(x, time_col='DateTime', value_cols='KWHhh',
                            fill_missing_dates=True, fillna_value=True, freq="30min").astype(np.float32)


def splitter():
    file_list = sorted(glob.glob("../../../Data/london_clean/*.csv"))[:1000]
    if file_list == []:
        raise Exception("No files found")
    return process_map(reader, file_list, chunksize=5)

if __name__ == "__main__":
    freeze_support()
    my_time_series_dataset = splitter()

  0%|          | 0/1000 [00:00<?, ?it/s]

In [4]:
## sets
training_sets = []
validation_sets = []
for x in my_time_series_dataset:
    train, val = x.split_after(0.85)
    training_sets.append(train)
    validation_sets.append(val)

# Model

We create a N-Beats model that utilizes the GPU, Weights, Biases logger and early stopping callback.

## Early stopping

An early stopping callback is used to stop the training if the validation loss does not improve after a certain number of epochs.


In [5]:
from pytorch_lightning.callbacks.early_stopping import EarlyStopping
early_stop_callback = EarlyStopping(
    monitor="val_loss",
    min_delta=0.01,
    patience=10,
    verbose=True,
    mode="min"
    )

In [6]:
encoders = {
    # "datetime_attribute": {"future": ["DateTime"], "past": ["DateTime"]},
    "position": {"past": ["absolute"], "future": ["relative"]},
    "transformer": Scaler(),
}

In [7]:
wandb_logger = WandbLogger(project="Digital-Energy", log_model=True)


# input chunk = The length of the input sequence fed to the model
# output chunk = The length of the output sequence predicted by the model


# default quantiles for QuantileRegression
quantiles = [
    0.01,
    0.05,
    0.1,
    0.15,
    0.2,
    0.25,
    0.3,
    0.4,
    0.5,
    0.6,
    0.7,
    0.75,
    0.8,
    0.85,
    0.9,
    0.95,
    0.99,
]

input_chunk_length = 96
forecast_horizon = 1

model = TFTModel(
    input_chunk_length=input_chunk_length,
    output_chunk_length=forecast_horizon,
    hidden_size=89,
    lstm_layers=7,
    num_attention_heads=6,
    dropout=0.04,
    batch_size=2048,
    n_epochs=100,
    add_relative_index=True,
    add_encoders=encoders,
    work_dir="../../../Models",
    save_checkpoints=False,
    pl_trainer_kwargs={
    "enable_progress_bar": True,
    "enable_model_summary": True,
    "accelerator": "gpu",
    "devices": [1],
    "logger": wandb_logger,
    "callbacks": [early_stop_callback]
    },
    likelihood=QuantileRegression(
        quantiles=quantiles
    ),  # QuantileRegression is set per default
    random_state=42,
)

[34m[1mwandb[0m: Currently logged in as: [33mtimmermansjoy[0m. Use [1m`wandb login --relogin`[0m to force relogin


Exception in thread Thread-16:
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/dist-packages/wandb/apis/normalize.py", line 22, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.9/dist-packages/wandb/sdk/internal/internal_api.py", line 1434, in upload_urls
    raise CommError(f"Run does not exist {entity}/{project}/{run_id}.")
wandb.errors.CommError: Run does not exist timmermansjoy/digital-energy/12ywakxf.

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/lib/python3.9/threading.py", line 973, in _bootstrap_inner
    self.run()
  File "/usr/local/lib/python3.9/dist-packages/wandb/filesync/upload_job.py", line 56, in run
    success = self.push()
  File "/usr/local/lib/python3.9/dist-packages/wandb/filesync/upload_job.py", line 107, in push
    _, upload_headers, result = self._api.upload_urls(project, [self.save_name])
  File "/usr/local/lib/python3.9/dist-packages/wandb/ap

In [8]:
#wandb_logger.watch(model_nbeats) # sadly this feature does not work for Darts models
model.fit(series=training_sets, val_series=validation_sets, num_loader_workers=AVAILABLE_CPUS//2)

[2022-05-10 09:49:33,063] INFO | darts.models.forecasting.torch_forecasting_model | Train dataset contains 500000 samples.
[2022-05-10 09:49:33,063] INFO | darts.models.forecasting.torch_forecasting_model | Train dataset contains 500000 samples.
2022-05-10 09:49:33 darts.models.forecasting.torch_forecasting_model INFO: Train dataset contains 500000 samples.
[2022-05-10 09:49:33,084] INFO | darts.models.forecasting.torch_forecasting_model | Time series values are 32-bits; casting model to float32.
[2022-05-10 09:49:33,084] INFO | darts.models.forecasting.torch_forecasting_model | Time series values are 32-bits; casting model to float32.
2022-05-10 09:49:33 darts.models.forecasting.torch_forecasting_model INFO: Time series values are 32-bits; casting model to float32.
2022-05-10 09:49:33 pytorch_lightning.utilities.rank_zero INFO: GPU available: True, used: True
2022-05-10 09:49:33 pytorch_lightning.utilities.rank_zero INFO: TPU available: False, using: 0 TPU cores
2022-05-10 09:49:33 py

Sanity Checking: 0it [00:00, ?it/s]

Training: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

[2022-05-10 09:54:30,198] ERROR | main_logger | ValueError: The dataset contains some time series that are too short to contain `max(self.input_chunk_length, self.shift + self.output_chunk_length)` (551-th series)
[2022-05-10 09:54:30,198] ERROR | main_logger | ValueError: The dataset contains some time series that are too short to contain `max(self.input_chunk_length, self.shift + self.output_chunk_length)` (551-th series)
[2022-05-10 09:54:30,198] ERROR | main_logger | ValueError: The dataset contains some time series that are too short to contain `max(self.input_chunk_length, self.shift + self.output_chunk_length)` (551-th series)
[2022-05-10 09:54:30,198] ERROR | main_logger | ValueError: The dataset contains some time series that are too short to contain `max(self.input_chunk_length, self.shift + self.output_chunk_length)` (551-th series)
[2022-05-10 09:54:30,198] ERROR | main_logger | ValueError: The dataset contains some time series that are too short to contain `max(self.input_

ValueError: Caught ValueError in DataLoader worker process 6.
Original Traceback (most recent call last):
  File "/usr/local/lib/python3.9/dist-packages/torch/utils/data/_utils/worker.py", line 287, in _worker_loop
    data = fetcher.fetch(index)
  File "/usr/local/lib/python3.9/dist-packages/torch/utils/data/_utils/fetch.py", line 49, in fetch
    data = [self.dataset[idx] for idx in possibly_batched_index]
  File "/usr/local/lib/python3.9/dist-packages/torch/utils/data/_utils/fetch.py", line 49, in <listcomp>
    data = [self.dataset[idx] for idx in possibly_batched_index]
  File "/usr/local/lib/python3.9/dist-packages/darts/utils/data/sequential_dataset.py", line 327, in __getitem__
    past_target, past_covariate, future_target = self.ds_past[idx]
  File "/usr/local/lib/python3.9/dist-packages/darts/utils/data/shifted_dataset.py", line 530, in __getitem__
    raise_if_not(
  File "/usr/local/lib/python3.9/dist-packages/darts/logging.py", line 84, in raise_if_not
    raise ValueError(message)
ValueError: The dataset contains some time series that are too short to contain `max(self.input_chunk_length, self.shift + self.output_chunk_length)` (551-th series)


wandb: Network error (ReadTimeout), entering retry loop.
wandb: Network error (ReadTimeout), entering retry loop.
wandb: Network error (ReadTimeout), entering retry loop.


In [None]:
START = 3000
for i, x in enumerate(sorted(glob.glob("../../../Data/london_clean/*.csv"))[START:START+10]):

    df = pd.read_csv(x)
    df["DateTime"] = pd.to_datetime(df['DateTime'])
    series = TimeSeries.from_dataframe(df, value_cols=['KWHhh'], time_col="DateTime", fill_missing_dates=True, freq="30min").astype(np.float32)
    series = series[-500:]


    pred_series = model.historical_forecasts(
        series,
        forecast_horizon=1,
        stride=1,
        retrain=False,
        verbose=True,
    )

    print(f"rmse: {rmse(series, pred_series)}.")
    print(f"R2 score: {r2_score(series, pred_series)}.")

    helper.display_forecast(pred_series, series, "1 day", save=True, fig_name=f"{x.split('/')[-1]}-test", fig_size=(20,10))


In [None]:
START = 3000
for i, x in enumerate(sorted(glob.glob("../../../Data/london_clean/*.csv"))[START:START+10]):

    df = pd.read_csv(x)
    df["DateTime"] = pd.to_datetime(df['DateTime'])
    series = TimeSeries.from_dataframe(df, value_cols=['KWHhh'], time_col="DateTime", fill_missing_dates=True, freq="30min").astype(np.float32)
    series = series[-500:]


    pred_series = model.predict(
        1,
        series,
    )

    print(f"rmse: {rmse(series, pred_series)}.")
    print(f"R2 score: {r2_score(series, pred_series)}.")

    helper.display_forecast(pred_series, series, "1 day", save=True, fig_name=f"{i}-test", model_name=f"{MODEL_NAME}", fig_size=(20,10))

# Loading checkpoints of the model

loading the best checkpoint of the model. To compare the results of the model with the previous one.

In [None]:
# load the model
model = TFTModel.load_from_checkpoint(work_dir="../../../Models/", model_name=MODEL_NAME, best=True)

In [None]:
model = TFTModel.load_model(f"../../../Models/{MODEL_NAME}/_model.pth.tar")