# Python Log Management: Using the Log Class

Logging is a crucial aspect in software development as it allows for recording and monitoring the behavior of a program during execution. In Python, the logging module provides a wide range of functionalities for efficient logging.

In this notebook, we'll explore the implementation of a simple Log class that simplifies the usage of the logging module. This class provides methods for logging messages of different severity levels such as DEBUG, INFO, WARNING, ERROR, and CRITICAL. Additionally, it offers flexibility by allowing the user to set the logging level and to switch between routing messages through the logging module and printing them directly to the console (which is what "disabling" logging means for this class).

Let's dive in and see how to use the Log class effectively for managing logs in Python projects.



In [None]:
import logging

class Log:
    """
    Static facade over the standard ``logging`` module.

    All methods are static and all state lives on the class, so the class is
    used directly (``Log.info(...)``) and never instantiated. Messages are
    either forwarded to the root logger of the ``logging`` module or, when
    that backend is switched off with ``useLogging(False)``, printed to the
    console as ``[LEVEL] message``.
    """

    _level = logging.INFO  # Minimum severity that is emitted; INFO by default
    _use_logging = True  # True: use the logging module; False: print to console

    # Display names used by the console (print) backend.
    _LEVEL_NAMES = {
        logging.DEBUG: "DEBUG",
        logging.INFO: "INFO",
        logging.WARNING: "WARNING",
        logging.ERROR: "ERROR",
        logging.CRITICAL: "CRITICAL",
    }

    @staticmethod
    def setLogLevel(level=logging.INFO):
        """
        Set the logging level for the Log class.

        The level is applied both to the console backend (via ``Log._level``)
        and to the root logger of the ``logging`` module.

        Args:
            level (int): The logging level to be set.
        """
        Log._level = level
        # basicConfig() installs a default handler only when the root logger
        # has none; if handlers already exist (typical in notebooks, or on a
        # second call) it does nothing and the level would be ignored.
        logging.basicConfig(level=level)
        # Set the level explicitly so every call takes effect.
        logging.getLogger().setLevel(level)

    @staticmethod
    def useLogging(flag=True):
        """
        Choose the output backend.

        Args:
            flag (bool): If True, messages go through the logging module;
                if False, they are printed to the console instead.
        """
        Log._use_logging = flag

    @staticmethod
    def debug(message, *args):
        """
        Log a debug message.

        Args:
            message (str): The debug message.
            *args: Optional arguments for %-style formatting of the message.
        """
        Log.__log(message, logging.DEBUG, *args)

    @staticmethod
    def info(message, *args):
        """
        Log an info message.

        Args:
            message (str): The info message.
            *args: Optional arguments for %-style formatting of the message.
        """
        Log.__log(message, logging.INFO, *args)

    @staticmethod
    def warning(message, *args):
        """
        Log a warning message.

        Args:
            message (str): The warning message.
            *args: Optional arguments for %-style formatting of the message.
        """
        Log.__log(message, logging.WARNING, *args)

    @staticmethod
    def error(message, *args):
        """
        Log an error message.

        Args:
            message (str): The error message.
            *args: Optional arguments for %-style formatting of the message.
        """
        Log.__log(message, logging.ERROR, *args)

    @staticmethod
    def critical(message, *args):
        """
        Log a critical message.

        Args:
            message (str): The critical message.
            *args: Optional arguments for %-style formatting of the message.
        """
        Log.__log(message, logging.CRITICAL, *args)

    @staticmethod
    def __log(message, method, *args):
        """
        Internal method that dispatches a message to the active backend.

        Args:
            message (str): The message to be logged.
            method (int): The severity (DEBUG, INFO, WARNING, ERROR, CRITICAL).
            *args: Optional arguments for %-style formatting of the message.
        """
        if Log._use_logging:
            # The logging module does its own level filtering and formatting.
            logging.log(method, message, *args)
        elif method >= Log._level:
            # Console backend: apply the level filter and %-formatting here.
            formatted_message = message % args if args else message
            print(f"[{Log._get_logging_level_name(method)}] {formatted_message}")

    @staticmethod
    def _get_logging_level_name(method):
        """
        Get the display name of a logging level.

        Args:
            method (int): The severity (DEBUG, INFO, WARNING, ERROR, CRITICAL).

        Returns:
            str: The name of the level, or "UNKNOWN" for any other value.
        """
        return Log._LEVEL_NAMES.get(method, "UNKNOWN")


In [None]:
from torch import Tensor
from typing import Union
import torch
import pandas as pd
from pathlib import Path


def generate_square_subsequent_mask(dim1: int, dim2: int, device) -> Tensor:
    """
    Build an additive attention mask of shape [dim1, dim2].

    Entries strictly above the main diagonal are -inf; entries on and below
    the diagonal are 0.
    Modified from:
    https://pytorch.org/tutorials/beginner/transformer_tutorial.html

    Args:

        dim1: int, for both src and tgt masking, this must be target sequence
              length

        dim2: int, for src masking this must be encoder sequence length (i.e.
              the length of the input sequence to the model),
              and for tgt masking, this must be target sequence length

        device: the device the finished mask is moved to

    Return:

        A Tensor of shape [dim1, dim2]
    """
    # Start from a matrix that is -inf everywhere ...
    all_blocked = torch.full((dim1, dim2), float('-inf'))

    # ... and keep only the part strictly above the diagonal; triu zeroes the rest.
    causal_mask = torch.triu(all_blocked, diagonal=1)

    return causal_mask.to(device)


def get_indices_input_target(num_obs, input_len, step_size, forecast_horizon, target_len):
    """
    Produce the start and end index positions of all sub-sequences.
    The indices will be used to split the data into sub-sequences on which
    the models will be trained.

    Returns a list of tuples. Each tuple has four elements:
    1) The index position of the first element to be included in the input sequence
    2) The index position of the last element to be included in the input sequence
    3) The index position of the first element to be included in the target sequence
    4) The index position of the last element to be included in the target sequence

    Args:
        num_obs (int): Number of observations in the entire dataset for which
                       indices must be generated.

        input_len (int): Length of the input sequence (a sub-sequence of
                         the entire data sequence). Rounded to the nearest
                         integer before use.

        step_size (int): Size of each step as the data sequence is traversed.
                         If 1, the first input window is (0, input_len) and
                         the next is (1, input_len + 1). Must be at least 1.

        forecast_horizon (int): Offset added to the end index of the input
                                window to get the start index of the target
                                window. If forecast_horizon=1 and the input
                                window is (0, 10), the target window is
                                (11, 11 + target_len).

        target_len (int): Length of the target / output sequence.

    Raises:
        ValueError: If step_size is smaller than 1.
    """
    # A non-positive step never advances the window, so the loop below would
    # append tuples forever.
    if step_size < 1:
        raise ValueError("step_size must be at least 1, got {}".format(step_size))

    input_len = round(input_len)  # just a precaution
    start_position = 0
    stop_position = num_obs - 1  # because of 0 indexing

    subseq_first_idx = start_position
    subseq_last_idx = start_position + input_len
    target_first_idx = subseq_last_idx + forecast_horizon
    target_last_idx = target_first_idx + target_len
    Log.debug("target_last_idx is {}".format(target_last_idx))
    Log.debug("stop_position is {}".format(stop_position))

    indices = []
    # Slide both windows forward until the target window would pass the
    # last observation.
    while target_last_idx <= stop_position:
        indices.append((subseq_first_idx, subseq_last_idx, target_first_idx, target_last_idx))
        subseq_first_idx += step_size
        subseq_last_idx += step_size
        target_first_idx = subseq_last_idx + forecast_horizon
        target_last_idx = target_first_idx + target_len

    return indices

def get_indices_entire_sequence(data: pd.DataFrame, window_size: int, step_size: int) -> list:
    """
    Produce all the start and end index positions that are needed to produce
    the sub-sequences.

    Returns a list of tuples. Each tuple is (start_idx, end_idx) of a sub-
    sequence. These tuples should be used to slice the dataset into sub-
    sequences. These sub-sequences should then be passed into a function
    that slices them into input and target sequences.

    Args:
        data (pd.DataFrame): The entire dataset for which indices must be
                             generated. Only its length, len(data), is used.

        window_size (int): The desired length of each sub-sequence. Should be
                           (input_sequence_length + target_sequence_length)
                           E.g. if you want the model to consider the past 100
                           time steps in order to predict the future 50
                           time steps, window_size = 100+50 = 150

        step_size (int): Size of each step as the data sequence is traversed
                         by the moving window.
                         If 1, the first sub-sequence will be
                         (0, window_size) and the next will be
                         (1, window_size + 1). Must be at least 1.

    Return:
        indices: a list of (start_idx, end_idx) tuples

    Raises:
        ValueError: If step_size is smaller than 1.
    """
    # A non-positive step never advances the window, so the loop below would
    # append tuples forever.
    if step_size < 1:
        raise ValueError("step_size must be at least 1, got {}".format(step_size))

    stop_position = len(data) - 1  # -1 because of 0 indexing

    # Start the first sub-sequence at index position 0
    subseq_first_idx = 0
    subseq_last_idx = window_size

    indices = []

    # NOTE(review): end_idx is compared against the last valid index, so a
    # window ending exactly at len(data) is never produced even though it
    # would be a valid exclusive slice end. Kept as-is so existing dataset
    # sizes do not change - confirm whether this is intended.
    while subseq_last_idx <= stop_position:
        indices.append((subseq_first_idx, subseq_last_idx))
        subseq_first_idx += step_size
        subseq_last_idx += step_size

    return indices


def read_data(
          file_data_path: Union[str, Path],
          timestamp_col_name: str) -> pd.DataFrame:
    """
    Load a csv file into a pd.DataFrame, validate it, downcast it and sort it.

    Args:

        file_data_path: str or Path object specifying the path to the csv
                        file containing the data

        timestamp_col_name: str, the name of the column containing the
                            timestamps. It is parsed as dates and used as the
                            index of the returned frame.

    Return:

        The loaded data, with numeric columns downcast and rows sorted in
        ascending order by `timestamp_col_name`.

    Raises:

        ValueError: if any cell of the loaded data equals "n/e"
    """

    # Normalise the location to a Path object, whatever type was passed in
    csv_path = Path(file_data_path)

    Log.debug("Reading file in {}".format(csv_path))

    csv_options = {
        "parse_dates": [timestamp_col_name],
        "index_col": [timestamp_col_name],
        "infer_datetime_format": True,
        "low_memory": False,
    }
    frame = pd.read_csv(csv_path, **csv_options)

    # Refuse to continue while "n/e" placeholders are still present
    if is_ne_in_df(frame):
        raise ValueError("data frame contains 'n/e' values. These must be handled")

    frame = to_numeric_and_downcast_data(frame)

    # Ascending order by timestamp
    frame.sort_values(by=[timestamp_col_name], inplace=True)

    return frame

def is_ne_in_df(df:pd.DataFrame):
    """
    Report whether the string "n/e" occurs anywhere in df.

    Some raw data files contain cells with "n/e". Each column is compared
    against that string in turn; the result is True as soon as one column
    has a matching cell, and False if no column has one.
    """
    # The outer any() stops at the first column that contains a match.
    return any(any(df[column_name] == "n/e") for column_name in df.columns)


def to_numeric_and_downcast_data(df: pd.DataFrame):
    """
    Downcast the float and integer columns of df to the smallest version of
    their existing data type.

    df is modified in place and also returned. Columns that are neither
    float nor integer are left untouched.
    """
    # Select both column groups up front, before anything is modified.
    float_columns = df.select_dtypes('float').columns
    integer_columns = df.select_dtypes('integer').columns

    for column_group, family in ((float_columns, 'float'), (integer_columns, 'integer')):
        df[column_group] = df[column_group].apply(pd.to_numeric, downcast=family)

    return df

# Positional Encoding in Transformers: Understanding the PositionalEncoder Class

In Natural Language Processing (NLP), Transformers play a crucial role in various tasks such as language translation and text analysis. A fundamental concept within Transformers is positional encoding, which imparts sequential information to the input data.

This notebook explores the PositionalEncoder class, a vital component for incorporating positional encoding in Transformer models. Derived from PyTorch's Transformer tutorial, this class enriches input embeddings with positional information, enabling Transformers to understand the order of tokens within a sequence.

Through this notebook, we'll dissect the PositionalEncoder class, understand its functionality, and illustrate its role in enhancing Transformer performance in NLP tasks.

Let's dive into positional encoding with the PositionalEncoder class.

In [None]:
import torch
import torch.nn as nn
import math
from torch import nn, Tensor

class PositionalEncoder(nn.Module):
    """
    Adds fixed sinusoidal positional encodings to its input, then applies
    dropout.

    The authors of the original transformer paper describe very succinctly what
    the positional encoding layer does and why it is needed:

    "Since our model contains no recurrence and no convolution, in order for the
    model to make use of the order of the sequence, we must inject some
    information about the relative or absolute position of the tokens in the
    sequence." (Vaswani et al, 2017)
    Adapted from:
    https://pytorch.org/tutorials/beginner/transformer_tutorial.html
    """

    def __init__(
        self,
        dropout: float=0.1,
        max_seq_len: int=5000,
        d_model: int=512,
        batch_first: bool=False
        ):

        """
        Parameters:
            dropout: the dropout rate
            max_seq_len: the maximum length of the input sequences
            d_model: The dimension of the output of sub-layers in the model
                     (Vaswani et al, 2017). May be even or odd.
            batch_first: if True, inputs to forward() are
                         [batch_size, seq_len, d_model]; otherwise
                         [seq_len, batch_size, d_model]
        """

        super().__init__()

        self.d_model = d_model

        self.dropout = nn.Dropout(p=dropout)

        self.batch_first = batch_first

        # adapted from PyTorch tutorial
        position = torch.arange(max_seq_len).unsqueeze(1)

        div_term = torch.exp(torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model))

        # One row per position, one column per frequency:
        # [max_seq_len, ceil(d_model / 2)]
        angles = position * div_term

        table = torch.zeros(max_seq_len, d_model)

        table[:, 0::2] = torch.sin(angles)

        # For an odd d_model there is one fewer odd-indexed column than there
        # are frequencies, so the cosine half is truncated to fit.
        table[:, 1::2] = torch.cos(angles)[:, :d_model // 2]

        # Insert the singleton batch axis where the chosen layout expects it.
        pe = table.unsqueeze(0) if self.batch_first else table.unsqueeze(1)

        # A buffer follows the module across devices and is stored in its
        # state_dict without being a trainable parameter.
        self.register_buffer('pe', pe)

    def forward(self, x: Tensor) -> Tensor:
        """
        Add the positional encoding to x and apply dropout.

        Args:
            x: Tensor, shape [batch_size, enc_seq_len, dim_val] if
               batch_first=True, otherwise [enc_seq_len, batch_size, dim_val]

        Returns:
            Tensor of the same shape as x
        """
        # Slice the table to the sequence length of x; the singleton batch
        # axis of pe broadcasts over the batch.
        if self.batch_first:
            x = x + self.pe[:,:x.size(1)]
        else:
            x = x + self.pe[:x.size(0)]

        return self.dropout(x)


# Time Series Forecasting with the TimeSeriesTransformer Class

Time series forecasting is a critical task in various domains, from finance to weather prediction. With the advent of Transformer architectures, the landscape of time series forecasting has witnessed significant advancements.

In this notebook, we'll explore the TimeSeriesTransformer class, which implements a Transformer model tailored for time series forecasting. Inspired by the work of Wu et al. (2020), this class incorporates key concepts from the Transformer architecture to effectively model temporal data.

The TimeSeriesTransformer class is designed with flexibility and performance in mind. It leverages principles from Vaswani et al. (2017) and PyTorch's Transformer module to create a robust framework for time series prediction. Unlike traditional approaches, this class separates input layers, positional encoding layers, and linear mapping layers from the encoder and decoder, enhancing modularity and usability.

For a detailed explanation of the code and its underlying concepts, refer to the accompanying article: https://towardsdatascience.com/how-to-make-a-pytorch-transformer-for-time-series-forecasting-69e073d4061e

Let's delve into time series forecasting using the TimeSeriesTransformer class and explore its capabilities in modeling and predicting temporal data.

In [None]:

import torch.nn as nn
from torch import nn, Tensor
import torch.nn.functional as F

class TimeSeriesTransformer(nn.Module):

    """
    This class implements a transformer model that can be used for times series
    forecasting. This time series transformer model is based on the paper by
    Wu et al (2020) [1]. The paper will be referred to as "the paper".

    A detailed description of the code can be found in my article here:

    https://towardsdatascience.com/how-to-make-a-pytorch-transformer-for-time-series-forecasting-69e073d4061e

    In cases where the paper does not specify what value was used for a specific
    configuration/hyperparameter, this class uses the values from Vaswani et al
    (2017) [2] or from PyTorch source code.

    Unlike the paper, this class assumes that input layers, positional encoding
    layers and linear mapping layers are separate from the encoder and decoder,
    i.e. the encoder and decoder only do what is depicted as their sub-layers
    in the paper. For practical purposes, this assumption does not make a
    difference - it merely means that the linear and positional encoding layers
    are implemented inside the present class and not inside the
    Encoder() and Decoder() classes.

    [1] Wu, N., Green, B., Ben, X., O'banion, S. (2020).
    'Deep Transformer Models for Time Series Forecasting:
    The Influenza Prevalence Case'.
    arXiv:2001.08317 [cs, stat] [Preprint].
    Available at: http://arxiv.org/abs/2001.08317 (Accessed: 9 March 2022).

    [2] Vaswani, A. et al. (2017)
    'Attention Is All You Need'.
    arXiv:1706.03762 [cs] [Preprint].
    Available at: http://arxiv.org/abs/1706.03762 (Accessed: 9 March 2022).

    """

    def __init__(self,
        input_size: int,
        dec_seq_len: int,
        batch_first: bool,
        out_seq_len: int=58,
        dim_val: int=512,
        n_encoder_layers: int=4,
        n_decoder_layers: int=4,
        n_heads: int=8,
        dropout_encoder: float=0.2,
        dropout_decoder: float=0.2,
        dropout_pos_enc: float=0.1,
        dim_feedforward_encoder: int=2048,
        dim_feedforward_decoder: int=2048,
        num_predicted_features: int=1
        ):

        """
        Args:

            input_size: int, number of input variables. 1 if univariate.

            dec_seq_len: int, the length of the input sequence fed to the decoder

            batch_first: bool, if True the model works on tensors shaped
                         [batch_size, seq_len, features]; otherwise on
                         [seq_len, batch_size, features]. Applied to the
                         positional encoder, the encoder and the decoder.

            out_seq_len: int, accepted for interface compatibility; it is not
                         used by this class

            dim_val: int, aka d_model. All sub-layers in the model produce
                     outputs of dimension dim_val

            n_encoder_layers: int, number of stacked encoder layers in the encoder

            n_decoder_layers: int, number of stacked decoder layers in the decoder

            n_heads: int, the number of attention heads (aka parallel attention layers)

            dropout_encoder: float, the dropout rate of the encoder

            dropout_decoder: float, the dropout rate of the decoder

            dropout_pos_enc: float, the dropout rate of the positional encoder

            dim_feedforward_encoder: int, number of neurons in the linear layer
                                     of the encoder

            dim_feedforward_decoder: int, number of neurons in the linear layer
                                     of the decoder

            num_predicted_features: int, the number of features you want to predict.
                                    Most of the time, this will be 1 because we're
                                    only forecasting FCR-N prices in DK2, but if
                                    we wanted to also predict FCR-D with the same
                                    model, num_predicted_features should be 2.
        """

        super().__init__()

        self.dec_seq_len = dec_seq_len

        # Creating the three linear layers needed for the model
        self.encoder_input_layer = nn.Linear(
            in_features=input_size,
            out_features=dim_val
            )

        self.decoder_input_layer = nn.Linear(
            in_features=input_size,
            out_features=dim_val
            )

        self.linear_mapping = nn.Linear(
            in_features=dim_val,
            out_features=num_predicted_features
            )

        # Create positional encoder. It must use the same layout as the
        # encoder/decoder: with its default (batch_first=False) and a
        # batch-first input, the encoding would be selected by batch index
        # rather than by time step.
        # NOTE: for batch_first=True this changes the shape of the encoder's
        # `pe` buffer, so checkpoints saved with that setting before this fix
        # need the buffer dropped or transposed on load.
        self.positional_encoding_layer = PositionalEncoder(
            d_model=dim_val,
            dropout=dropout_pos_enc,
            batch_first=batch_first
            )

        # The encoder layer used in the paper is identical to the one used by
        # Vaswani et al (2017) on which the PyTorch module is based.
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=dim_val,
            nhead=n_heads,
            dim_feedforward=dim_feedforward_encoder,
            dropout=dropout_encoder,
            batch_first=batch_first
            )

        # Stack the encoder layers in nn.TransformerEncoder
        # It seems the option of passing a normalization instance is redundant
        # in my case, because nn.TransformerEncoderLayer per default normalizes
        # after each sub-layer
        # (https://github.com/pytorch/pytorch/issues/24930).
        self.encoder = nn.TransformerEncoder(
            encoder_layer=encoder_layer,
            num_layers=n_encoder_layers,
            norm=None
            )

        decoder_layer = nn.TransformerDecoderLayer(
            d_model=dim_val,
            nhead=n_heads,
            dim_feedforward=dim_feedforward_decoder,
            dropout=dropout_decoder,
            batch_first=batch_first
            )

        # Stack the decoder layers in nn.TransformerDecoder
        # It seems the option of passing a normalization instance is redundant
        # in my case, because nn.TransformerDecoderLayer per default normalizes
        # after each sub-layer
        # (https://github.com/pytorch/pytorch/issues/24930).
        self.decoder = nn.TransformerDecoder(
            decoder_layer=decoder_layer,
            num_layers=n_decoder_layers,
            norm=None
            )

    def forward(self, src: Tensor, tgt: Tensor, src_mask: Tensor=None,
                tgt_mask: Tensor=None) -> Tensor:
        """
        Run the encoder on src and the decoder on tgt, and map the decoder
        output to the predicted features.

        Returns a tensor of shape:

        [target_sequence_length, batch_size, num_predicted_features] if
        batch_first=False, or
        [batch_size, target_sequence_length, num_predicted_features] if
        batch_first=True

        Args:

            src: the encoder's input sequence. Shape: (S,E) for unbatched input,
                 (S, N, E) if batch_first=False or (N, S, E) if
                 batch_first=True, where S is the source sequence length,
                 N is the batch size, and E is the number of features (1 if univariate)

            tgt: the input sequence to the decoder. Shape: (T,E) for unbatched input,
                 (T, N, E) if batch_first=False or (N, T, E) if
                 batch_first=True, where T is the target sequence length,
                 N is the batch size, and E is the number of features (1 if univariate)

            src_mask: passed to the decoder as `memory_mask`, i.e. it masks
                      the decoder's attention over the encoder output

            tgt_mask: the mask for the tgt sequence to prevent the model from
                      using data points from the target sequence

        """

        # Pass through the input layer right before the encoder.
        # The last dimension becomes dim_val regardless of the number of
        # input features.
        src = self.encoder_input_layer(src)

        # Pass through the positional encoding layer (shape unchanged)
        src = self.positional_encoding_layer(src)

        # Pass through all the stacked encoder layers in the encoder
        # Masking is only needed in the encoder if input sequences are padded
        # which they are not in this time series use case, because all my
        # input sequences are naturally of the same length.
        # (https://github.com/huggingface/transformers/issues/4083)
        src = self.encoder(
            src=src
            )

        # Pass decoder input through decoder input layer. Note that no
        # positional encoding is applied to the decoder input.
        decoder_output = self.decoder_input_layer(tgt)

        # Pass through decoder - the last dimension of the output is dim_val
        decoder_output = self.decoder(
            tgt=decoder_output,
            memory=src,
            tgt_mask=tgt_mask,
            memory_mask=src_mask
            )

        # Pass through linear mapping - the last dimension becomes
        # num_predicted_features
        decoder_output = self.linear_mapping(decoder_output)

        return decoder_output


# Transformer-based Time Series Dataset with the TransformerDataset Class

The TransformerDataset class plays a crucial role in preparing time series data for training Transformer models. This class, designed as a PyTorch Dataset, facilitates the slicing and formatting of time series sequences into appropriate input and target pairs for the Transformer model.

In this notebook, we'll explore the functionalities of the TransformerDataset class and understand how it streamlines the preprocessing of time series data. Inspired by the work of Wu et al. (2020) and the principles outlined in Vaswani et al. (2017), this class adheres to best practices in handling temporal data for Transformer-based models.

The TransformerDataset class accepts raw time series data and splits it into sequences suitable for training. Through this notebook, we'll dive into its methods and demonstrate how to use it effectively in conjunction with Transformer models.

Let's dive into the world of time series data preparation using the TransformerDataset class and unlock its potential for enhancing Transformer-based time series forecasting.

In [None]:
import os
import torch
from torch.utils.data import Dataset
import pandas as pd
from typing import Tuple

class TransformerDataset(Dataset):
    """
    Dataset that cuts one long data tensor into (src, trg, trg_y) samples
    for encoder-decoder transformer models.

    """
    def __init__(self,
        data: torch.Tensor,
        indices: list,
        enc_seq_len: int,
        dec_seq_len: int,
        target_seq_len: int
        ) -> None:

        """
        Args:

            data: tensor, the entire train, validation or test data sequence
                  before any slicing. Expected size is
                  [number of samples, number of variables], where the number
                  of variables equals 1 + the number of exogenous variables
                  (0 exogenous variables if univariate).

            indices: a list of tuples. The first two elements of each tuple are
                     1) the start index of a sub-sequence
                     2) the end index of a sub-sequence.
                     The sub-sequence is split into src, trg and trg_y later.

            enc_seq_len: int, the desired length of the input sequence given to
                         the first layer of the transformer model.

            dec_seq_len: int, stored and forwarded to get_src_trg, which does
                         not use it for slicing.

            target_seq_len: int, the desired length of the target sequence (the output of the model)
        """

        super().__init__()

        self.data = data
        self.indices = indices

        Log.info("From get_src_trg: data size = {}".format(data.size()))

        self.enc_seq_len = enc_seq_len
        self.dec_seq_len = dec_seq_len
        self.target_seq_len = target_seq_len

    def __len__(self):
        # One sample per index tuple
        return len(self.indices)

    def __getitem__(self, index):
        """
        Returns a tuple with 3 elements:
        1) src (the encoder input)
        2) trg (the decoder input)
        3) trg_y (the target)
        """
        # Element 0 of the index tuple is the slice start, element 1 the
        # (exclusive) slice end.
        bounds = self.indices[index]
        window = self.data[bounds[0]:bounds[1]]

        return self.get_src_trg(
            sequence=window,
            enc_seq_len=self.enc_seq_len,
            dec_seq_len=self.dec_seq_len,
            target_seq_len=self.target_seq_len
            )

    def get_src_trg(
        self,
        sequence: torch.Tensor,
        enc_seq_len: int,
        dec_seq_len: int,
        target_seq_len: int
        ) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]:

        """
        Split one sub-sequence into src (encoder input), trg (decoder input)
        and trg_y (the target).

        Args:

            sequence: tensor whose first dimension has length n, where
                      n = encoder input length + target sequence length

            enc_seq_len: int, the desired length of the input to the transformer encoder

            dec_seq_len: int, not used by this method

            target_seq_len: int, the desired length of the target sequence (the
                            one against which the model output is compared)

        Return:

            src: tensor, the first enc_seq_len steps of sequence

            trg: tensor, target_seq_len steps starting at the last step of src

            trg_y: tensor, the last target_seq_len steps of sequence, with a
                   trailing dimension of size 1 squeezed away; this is the
                   sequence against which the model output is compared when
                   computing loss.

        """
        total_len = len(sequence)

        assert total_len == enc_seq_len + target_seq_len, f"Sequence length {len(sequence)} does not equal (input length {enc_seq_len} + target length {target_seq_len})"

        # Encoder input: the leading part of the window
        src = sequence[:enc_seq_len]

        # Decoder input. As per the paper, it must have the same dimension as
        # the target sequence, and it must contain the last value of src, and
        # all values of trg_y except the last (i.e. it is shifted right by 1)
        trg = sequence[enc_seq_len - 1:total_len - 1]

        assert len(trg) == target_seq_len, "Length of trg does not match target sequence length"

        # Ground truth the model output is compared with to compute the loss
        trg_y = sequence[-target_seq_len:]

        assert len(trg_y) == target_seq_len, "Length of trg_y does not match target sequence length"

        # squeeze(-1) drops the trailing feature dimension when it has size 1
        return src, trg, trg_y.squeeze(-1)


The following function, `run_encoder_decoder_inference`, is for encoder-decoder type models in which the decoder requires an input, tgt, which - during training - is the target sequence. During inference, the values of tgt are unknown and therefore have to be generated iteratively.  
The function returns a prediction of length forecast_window for each batch element in src.

In [None]:
import torch.nn as nn
import torch

def run_encoder_decoder_inference(
    model: nn.Module,
    src: torch.Tensor,
    forecast_window: int,
    batch_size: int,
    device,
    batch_first: bool=False
    ) -> torch.Tensor:

    """
    Autoregressively forecast `forecast_window` steps with an encoder-decoder
    model whose decoder needs target values as input.

    The decoder input (tgt) starts as the last observed value of the target
    variable in src (feature index 0). The model is then called repeatedly;
    after each call the last time step of the prediction is detached and
    appended to tgt, so tgt grows by one step per iteration. Once tgt holds
    forecast_window steps, one final forward pass produces the returned
    prediction.

    NB! This function has only been exercised with batch_first = False; the
    batch_first = True path builds tensors of the mirrored layout but is
    untested.

    NB! If you want the inference to be done without gradient calculation,
    make sure to call this function inside the context manager torch.no_grad like:
    with torch.no_grad():
        run_encoder_decoder_inference(...)

    The context manager is intentionally not entered inside this function to
    make it usable in cases where the function is used to compute a loss that
    must be backpropagated during training and gradient calculation hence is
    required.

    Args:
        model: An encoder-decoder type model where the decoder requires
               target values as input. It is called as
               model(src, tgt, src_mask, tgt_mask). Should be set to
               evaluation mode before being passed to this function.

        src: The input to the model; a 3D tensor laid out as described under
             batch_first. Feature index 0 is used as the target variable.

        forecast_window: The desired length of the model's output, e.g. 58 if
                         you want to predict the next 58 hours of FCR prices.

        batch_size: Retained for backward compatibility. The batch dimension
                    is now taken from src itself, so this value is not used.

        device: Device handed to generate_square_subsequent_mask when the
                attention masks are created.

        batch_first: If true, the shape of the model input should be
                     [batch size, input sequence length, number of features].
                     If false, [input sequence length, batch size, number of features]

    Returns:
        The model output of the final forward pass, i.e. the prediction made
        with a decoder input of forecast_window time steps.
    """

    # Dimension of a batched model input along which time steps are stacked
    target_seq_dim = 1 if batch_first else 0

    # Take the last value of the target variable for every sequence in src and
    # use it as the initial tgt, as per the Influenza paper. Slicing (instead
    # of integer indexing) keeps all three dimensions, so tgt is
    # [1, batch_size, 1] (or [batch_size, 1, 1] when batch_first) regardless of
    # the batch size.
    if batch_first:
        tgt = src[:, -1:, 0:1]
    else:
        tgt = src[-1:, :, 0:1]

    def _predict(decoder_input: torch.Tensor) -> torch.Tensor:
        # Build masks matching the current decoder input length and run the model.
        dim_a = decoder_input.shape[target_seq_dim]
        dim_b = src.shape[target_seq_dim]

        tgt_mask = generate_square_subsequent_mask(
            dim1=dim_a,
            dim2=dim_a,
            device=device
            )

        src_mask = generate_square_subsequent_mask(
            dim1=dim_a,
            dim2=dim_b,
            device=device
            )

        return model(src, decoder_input, src_mask, tgt_mask)

    # Iteratively extend tgt with the last element of each prediction
    for _ in range(forecast_window-1):

        prediction = _predict(tgt)

        # Predicted value at t+1, where t is the last time step represented in
        # tgt. The slice keeps the time dimension so it can be concatenated
        # directly.
        if batch_first:
            last_predicted_value = prediction[:, -1:, :]
        else:
            last_predicted_value = prediction[-1:, :, :]

        # Detach the predicted element from the graph and concatenate with
        # tgt along the time dimension
        tgt = torch.cat((tgt, last_predicted_value.detach()), target_seq_dim)

    # tgt now has forecast_window time steps: make the final prediction
    return _predict(tgt)

# Training an Encoder-Decoder Model for Time Series Forecasting

In time series forecasting, training a model involves iteratively optimizing its parameters to minimize the discrepancy between predicted and actual values. This notebook focuses on training an encoder-decoder model, a powerful architecture widely used for time series prediction tasks.

The train_encoder_decoder function presented here facilitates the training process by iteratively updating the model's parameters using backpropagation and gradient descent. It leverages the PyTorch framework to define and optimize the model, compute loss, and update parameters efficiently.

Through this notebook, we'll delve into the intricacies of training an encoder-decoder model for time series forecasting. We'll explore the training procedure, including data loading, model initialization, loss computation, and parameter optimization. Additionally, we'll monitor the training progress and evaluate model performance using appropriate metrics.

Let's embark on the journey of training an encoder-decoder model for time series forecasting and unlock its potential for accurate and reliable predictions.

In [None]:

import torch.nn as nn
import torch
from torch.utils.data import DataLoader

def train_encoder_decoder(
    model: nn.Module,
    train_loader: DataLoader,
    epochs: int,
    output_sequence_length: int,
    enc_seq_len: int,
    batch_first: bool,
    device
):
    """
    Train the encoder-decoder model with Adam and mean squared error loss.

    For every epoch, each (src, tgt, tgt_y) batch from train_loader is moved
    to device, passed through the model together with the attention masks, and
    the MSE between the prediction and tgt_y is backpropagated. The mean batch
    loss of each epoch is reported through Log.info.

    Args:
        model (nn.Module): The encoder-decoder model. It is called as
            model(src, tgt, src_mask, tgt_mask) and is switched to training
            mode by this function.
        train_loader (DataLoader): DataLoader yielding (src, tgt, tgt_y)
            batches. src and tgt are treated as batch-first 3D tensors; they
            are permuted to sequence-first when batch_first is False.
        epochs (int): Number of epochs for training.
        output_sequence_length (int): Length of the target sequence; used as
            the size of the decoder masks.
        enc_seq_len (int): Length of the input sequence to the encoder.
        batch_first (bool): Whether the model expects batch-first input. If
            False, inputs are permuted before the forward pass and the
            prediction is permuted back before the loss is computed.
        device: Device the batches and masks are placed on.
    """

    optimizer = torch.optim.Adam(model.parameters())  # Initialize Adam optimizer

    criterion = torch.nn.MSELoss()  # Mean squared error loss

    # The masks depend only on the (fixed) sequence lengths, so build them
    # once instead of once per batch.
    src_mask = generate_square_subsequent_mask(
        dim1=output_sequence_length,
        dim2=enc_seq_len,
        device=device
        )

    tgt_mask = generate_square_subsequent_mask(
        dim1=output_sequence_length,
        dim2=output_sequence_length,
        device=device
        )

    num_batches = len(train_loader)

    # Make sure training-only layers (e.g. dropout) are active even if the
    # model was left in evaluation mode by an earlier cell.
    model.train()

    # Iterate over all epochs
    for epoch in range(epochs):

        running_loss = 0.

        # Iterate over all batches in the training dataloader
        for src, tgt, tgt_y in train_loader:

            src = src.to(device)
            tgt = tgt.to(device)
            tgt_y = tgt_y.to(device)

            # The model wants sequence-first input: swap the first two dims
            if not batch_first:
                src = src.permute(1, 0, 2)
                tgt = tgt.permute(1, 0, 2)

            # zero the parameter gradients
            optimizer.zero_grad()

            # Make forecasts
            prediction = model(src, tgt, src_mask, tgt_mask)

            # Bring the prediction back to batch-first so it lines up with tgt_y
            if not batch_first:
                shape_before = prediction.shape
                prediction = prediction.permute(1, 0, 2)
                Log.debug("prediction shape changed from {} to {}".format(shape_before, prediction.shape))

            # Compute and backprop loss (MSELoss takes input first, target second)
            loss = criterion(prediction.squeeze(), tgt_y.squeeze())

            loss.backward()
            optimizer.step()

            running_loss += loss.item()

        # Mean batch loss for the epoch; guard against an empty dataloader
        last_loss = running_loss / num_batches if num_batches else 0.
        Log.info(f"Training: Epoch {epoch + 1}/{epochs}, Loss: {last_loss}")


# Time Series Forecasting with Transformer Models

Time series forecasting is a critical task across various domains, including finance, energy, and weather prediction. Transformer models, initially introduced for natural language processing, have shown remarkable performance in handling sequential data, making them an attractive choice for time series forecasting tasks.

This notebook explores the application of transformer models for time series forecasting. We'll dive into the implementation of a Transformer-based architecture specifically designed for predicting future values in a time series.

In [None]:

from torch.utils.data import DataLoader
import torch
import datetime
import numpy as np

# Training setup cell: selects the device, configures logging, defines the
# hyperparameters, loads the training CSV, and builds the dataset, dataloader
# and model that the following cells use.

# Use the first CUDA device when available, otherwise fall back to the CPU
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

Log.setLogLevel(level=logging.INFO)
# NOTE(review): per the flag's comment on the Log class, False presumably means
# "print to console instead of using the logging module" - confirm
Log.useLogging(False)

# Hyperparams
epochs = 200
test_size = 0.2 # in this cell only referenced by the commented-out split below
batch_size = 128
target_col_name = "FCR_N_PriceEUR"
timestamp_col = "timestamp"

## Params
dim_val = 512
n_heads = 8

dec_seq_len = 92 # length of input given to decoder
enc_seq_len = 153 # length of input given to encoder
output_sequence_length = 48 # target sequence length. If hourly data and length = 48, you predict 2 days ahead
window_size = enc_seq_len + output_sequence_length # used to slice data into sub-sequences
step_size = 1 # Step size, i.e. how many time steps does the moving window move at each step

max_seq_len = enc_seq_len
batch_first = False

# Define input variables
exogenous_vars = [] # should contain strings. Each string must correspond to a column name
input_variables = [target_col_name] + exogenous_vars
target_idx = 0 # index position of target in batched trg_y

input_size = len(input_variables)

# Read data
training_data = read_data(file_data_path='./train_dataset.csv', timestamp_col_name=timestamp_col)

# Remove test data from dataset
#training_data = data[:-(round(len(data)*test_size))]

# Make list of (start_idx, end_idx) pairs that are used to slice the time series sequence into chunks.
# Should be training data indices only
training_indices = get_indices_entire_sequence(
    data=training_data,
    window_size=window_size,
    step_size=step_size)

# Making instance of custom dataset class.
# Note that this rebinds training_data from the loaded table to the dataset object.
training_data = TransformerDataset(
    data=torch.tensor(training_data[input_variables].values).float(),
    indices=training_indices,
    enc_seq_len=enc_seq_len,
    dec_seq_len=dec_seq_len,
    target_seq_len=output_sequence_length
    )

# Making dataloader (batch_size is passed positionally; no shuffling is requested)
train_loader = DataLoader(training_data, batch_size)


# NOTE(review): dec_seq_len is given the value of enc_seq_len (153) here, not the
# dec_seq_len (92) defined above - confirm this is intended
model = TimeSeriesTransformer(
    input_size=len(input_variables),
    dec_seq_len=enc_seq_len,
    batch_first=batch_first,
    num_predicted_features=1
    ).to(device)





  data = pd.read_csv(


[INFO] From get_src_trg: data size = torch.Size([36788, 1])




In [None]:
# Launch training with the model, dataloader and hyperparameters defined above.
train_encoder_decoder(
    model=model,
    train_loader=train_loader,
    epochs=epochs,
    output_sequence_length=output_sequence_length,
    enc_seq_len=enc_seq_len,
    batch_first=batch_first,
    device=device,
)

[INFO] Training: Epoch 1/200, Loss: 348.37739129791726
[INFO] Training: Epoch 2/200, Loss: 401.3985114568597
[INFO] Training: Epoch 3/200, Loss: 369.3647774016107
[INFO] Training: Epoch 4/200, Loss: 414.17929723379495
[INFO] Training: Epoch 5/200, Loss: 406.7018840371312
[INFO] Training: Epoch 6/200, Loss: 405.41550454226405
[INFO] Training: Epoch 7/200, Loss: 404.5412164580572
[INFO] Training: Epoch 8/200, Loss: 401.9374685670946
[INFO] Training: Epoch 9/200, Loss: 391.13170999413603
[INFO] Training: Epoch 10/200, Loss: 400.77065998524216
[INFO] Training: Epoch 11/200, Loss: 404.4892951500166
[INFO] Training: Epoch 12/200, Loss: 403.0248767070837
[INFO] Training: Epoch 13/200, Loss: 402.1826987095646
[INFO] Training: Epoch 14/200, Loss: 401.0964548212665
[INFO] Training: Epoch 15/200, Loss: 401.2214650607609
[INFO] Training: Epoch 16/200, Loss: 401.0642009898499
[INFO] Training: Epoch 17/200, Loss: 400.982388392195
[INFO] Training: Epoch 18/200, Loss: 401.1536498411552
[INFO] Training

KeyboardInterrupt: 

Saving trained model

In [None]:
import torch
# Persist the trained model's state to "checkpoint.pth" in the working directory.
# NOTE(review): getStateDict is not part of the torch.nn.Module API (the standard
# method is state_dict()); presumably it is defined on the model class elsewhere
# in this file - confirm, otherwise this line raises AttributeError.
torch.save(model.getStateDict(), "checkpoint.pth")

In [None]:
from torch.utils.data import DataLoader
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score

# Evaluation cell: runs the trained model over the test set and reports
# r2 / mse / mae between the flattened targets and predictions.
Log.setLogLevel(level=logging.DEBUG)

# Switch the model to evaluation mode before predicting
model.eval()

test_data = read_data(file_data_path='./test_dataset.csv', timestamp_col_name=timestamp_col)

# (start_idx, end_idx) pairs used to slice the test series into windows
test_indices = get_indices_entire_sequence(
    data=test_data,
    window_size=window_size,
    step_size=step_size)

# Making instance of custom dataset class
# (rebinds test_data from the loaded table to the dataset object)
test_data = TransformerDataset(
    data=torch.tensor(test_data[input_variables].values).float(),
    indices=test_indices,
    enc_seq_len=enc_seq_len,
    dec_seq_len=dec_seq_len,
    target_seq_len=output_sequence_length
    )

# Making dataloader
test_loader = DataLoader(test_data, batch_size)

# The masks depend only on the sequence lengths, so they are built once and
# reused for every batch.
src_mask = generate_square_subsequent_mask(
    dim1=output_sequence_length,
    dim2=enc_seq_len,
    device=device
    )

tgt_mask = generate_square_subsequent_mask(
    dim1=output_sequence_length,
    dim2=output_sequence_length,
    device=device
)

labels = []
predictions = []
with torch.no_grad():

    for i, (src, tgt, tgt_y) in enumerate(test_loader):
        Log.debug("Testing: %d / %d", i, len(test_loader))
        src = src.to(device)
        tgt = tgt.to(device)
        tgt_y = tgt_y.to(device)

        # The model wants sequence-first input: swap the first two dims
        if not batch_first:
            shape_before = src.shape
            src = src.permute(1, 0, 2)
            Log.debug("src shape changed from {} to {}".format(shape_before, src.shape))

            shape_before = tgt.shape
            tgt = tgt.permute(1, 0, 2)
            Log.debug("tgt shape changed from {} to {}".format(shape_before, tgt.shape))

        # The decoder is fed the tgt produced by the dataset here;
        # run_encoder_decoder_inference is the alternative that generates tgt
        # iteratively from the model's own predictions.
        prediction = model(
            src=src,
            tgt=tgt,
            src_mask=src_mask,
            tgt_mask=tgt_mask
        )

        # Bring the prediction back to batch-first, exactly as the training
        # loop does before computing the loss. Without this the flattened
        # prediction is ordered by time step while tgt_y is ordered by sample,
        # so labels and predictions would be paired incorrectly.
        if not batch_first:
            prediction = prediction.permute(1, 0, 2)

        labels.extend(tgt_y.squeeze().cpu().flatten().tolist())
        predictions.extend(prediction.squeeze().cpu().flatten().tolist())

test_predictions_np = np.array(predictions)
test_labels_np = np.array(labels)

r2 = r2_score(test_labels_np, test_predictions_np)
mse = mean_squared_error(test_labels_np, test_predictions_np)
mae = mean_absolute_error(test_labels_np, test_predictions_np)

Log.info("Results: r2: %f, mse: %f, mae: %f",r2, mse, mae)
