In [4]:
import transformers
import numpy as np
import pandas as pd
import torch
from torch import nn, Tensor
import math

In [5]:
class PositionalEncoder(nn.Module):
    """
    Adds fixed sinusoidal positional encodings (Vaswani et al., 2017) to a
    batch-first input tensor and applies dropout to the result.

    Adapted from: 
    https://pytorch.org/tutorials/beginner/transformer_tutorial.html
    https://github.com/LiamMaclean216/Pytorch-Transfomer/blob/master/utils.py 
    """

    def __init__(self, dropout: float = 0.1, max_seq_len: int = 5000, d_model: int = 512):

        """
        Args:
            dropout: the dropout rate
            max_seq_len: the maximum length of the input sequences
            d_model: The dimension of the output of sub-layers in the model 
                     (Vaswani et al, 2017). May be even or odd.
        """

        super().__init__()

        self.d_model = d_model

        self.dropout = nn.Dropout(p=dropout)

        # Column vector of positions: torch.Size([max_seq_len, 1])
        position = torch.arange(max_seq_len).unsqueeze(1)

        # One frequency per even feature index: torch.Size([ceil(d_model / 2)])
        exp_input = torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model)
        div_term = torch.exp(exp_input)

        # Constant positional encoding matrix: torch.Size([max_seq_len, d_model])
        pe = torch.zeros(max_seq_len, d_model)

        pe[:, 0::2] = torch.sin(position * div_term)

        # For odd d_model there is one fewer odd column than even columns, so
        # only the first d_model // 2 frequencies are used for the cosines.
        pe[:, 1::2] = torch.cos(position * div_term[: d_model // 2])

        # Stored as torch.Size([max_seq_len, 1, d_model]); this layout is kept
        # so that previously saved state dicts remain loadable.
        pe = pe.unsqueeze(1)

        # register that pe is not a model parameter
        self.register_buffer('pe', pe)

    def forward(self, x: Tensor) -> Tensor:
        """
        Args:
            x: Tensor, shape [batch_size, enc_seq_len, dim_val]

        Returns:
            Tensor of the same shape as x: x plus the positional encodings of
            the first enc_seq_len positions, passed through dropout.

        Raises:
            RuntimeError: if enc_seq_len exceeds the max_seq_len the encoder
                was built with.
        """

        seq_len = x.size(1)

        if seq_len > self.pe.size(0):
            raise RuntimeError(
                "Input sequence length {} exceeds max_seq_len {} of the positional encoder".format(
                    seq_len, self.pe.size(0)
                )
            )

        # [seq_len, 1, d_model] -> [seq_len, d_model]; broadcasts over the batch dimension
        add = self.pe[:seq_len].squeeze(1)

        return self.dropout(x + add)

In [8]:
class TimeSeriesTransformer(nn.Module):
    """
    Encoder-decoder transformer for time series built from
    nn.TransformerEncoder / nn.TransformerDecoder.

    Inputs are projected to dim_val with linear layers, the encoder input is
    given positional encodings, and the decoder output is mapped through a
    final linear layer. All tensors are batch-first, matching
    PositionalEncoder, which treats dimension 1 as the sequence dimension.
    """

    def __init__(
        self,
        input_size, 
        dec_seq_len,
        out_seq_len,
        max_seq_len,
        dim_val,
        n_encoder_layers = 4,
        n_decoder_layers = 4,
        dropout_encoder: float=0.2, 
        dropout_decoder: float=0.2,
        dropout_pos_enc: float=0.2,
        dim_feedforward_encoder: int=2048,
        dim_feedforward_decoder: int=2048,
        n_heads = 8,
    ):
        """
        Args:
            input_size: size of the last dimension of src and trg (in_features
                        of both input projection layers)
            dec_seq_len: stored on the instance as self.dec_seq_len; not
                         otherwise used by this class
            out_seq_len: used to size the final linear mapping
            max_seq_len: maximum sequence length supported by the positional
                         encoder
            dim_val: model dimension (d_model) used throughout the network
            n_encoder_layers: number of stacked encoder layers
            n_decoder_layers: number of stacked decoder layers
            dropout_encoder: dropout rate of the encoder layers
            dropout_decoder: dropout rate of the decoder layers
            dropout_pos_enc: dropout rate of the positional encoder
            dim_feedforward_encoder: feed-forward width of the encoder layers
            dim_feedforward_decoder: feed-forward width of the decoder layers
            n_heads: number of attention heads
        """

        super().__init__()
        self.dec_seq_len = dec_seq_len

        print("input_size is: {}".format(input_size))
        print("dim_val is: {}".format(dim_val))

        # Project raw features to the model dimension
        self.encoder_input_layers = nn.Linear(in_features=input_size, out_features=dim_val)
        self.decoder_input_layer = nn.Linear(in_features=input_size, out_features=dim_val)

        self.positional_encoder = PositionalEncoder(dropout=dropout_pos_enc, max_seq_len=max_seq_len, d_model=dim_val)

        # batch_first=True: the positional encoder indexes the sequence on
        # dimension 1, so the attention layers must use the same layout.
        encoder_layers = nn.TransformerEncoderLayer(
            d_model=dim_val,
            nhead=n_heads,
            dropout=dropout_encoder,
            dim_feedforward=dim_feedforward_encoder,
            batch_first=True,
        )
        self.encoder = nn.TransformerEncoder(encoder_layer=encoder_layers, num_layers=n_encoder_layers)

        decoder_layer = nn.TransformerDecoderLayer(
            d_model=dim_val,
            nhead=n_heads,
            dropout=dropout_decoder,
            dim_feedforward=dim_feedforward_decoder,
            batch_first=True,
        )
        self.decoder = nn.TransformerDecoder(decoder_layer=decoder_layer, num_layers=n_decoder_layers)

        # NOTE(review): in_features is out_seq_len * dim_val, but forward()
        # applies this layer directly to the decoder output, whose last
        # dimension is dim_val. The shapes only agree when out_seq_len == 1 —
        # confirm the intended behaviour for longer horizons.
        self.decoder_linear_mapping = nn.Linear(in_features=out_seq_len * dim_val, out_features=out_seq_len)

    def forward(self, src, trg, src_mask=None, trg_mask=None):
        """
        Args:
            src: encoder input, shape [batch_size, enc_seq_len, input_size]
            trg: decoder input, shape [batch_size, trg_seq_len, input_size]
            src_mask: optional mask passed to the decoder as memory_mask
                      (attention of decoder positions over encoder outputs)
            trg_mask: optional mask passed to the decoder as tgt_mask
                      (self-attention over decoder positions)

        Returns:
            The decoder output after the final linear mapping.
        """

        src = self.encoder_input_layers(src)
        src = self.positional_encoder(src)

        src = self.encoder(src=src)

        trg = self.decoder_input_layer(trg)

        # nn.TransformerDecoder names its arguments tgt / tgt_mask
        trg = self.decoder(tgt=trg, memory=src, tgt_mask=trg_mask, memory_mask=src_mask)

        decoder_output = self.decoder_linear_mapping(trg)
        return decoder_output

In [9]:
# Sequence lengths
enc_seq_len = 150
dec_seq_len = 30
out_seq_len = 1

# The number of features we want to use
input_size = 1

# Model dimension
dim_val = 512

# The positional encoder is sized for the encoder sequence length
max_seq_len = enc_seq_len

model = TimeSeriesTransformer(
    input_size=input_size,
    dec_seq_len=dec_seq_len,
    out_seq_len=out_seq_len,
    max_seq_len=max_seq_len,
    dim_val=dim_val,
)

input_size is: 1
dim_val is: 512


In [12]:
def get_src_trg(
        sequence: torch.Tensor, 
        enc_seq_len: int, 
        dec_seq_len: int, 
        target_seq_len: int
    ):
    """
    Split one sequence into encoder input, decoder input and target.

    Args:
        sequence: tensor whose first dimension has length
                  enc_seq_len + target_seq_len
        enc_seq_len: number of leading steps used as encoder input
        dec_seq_len: accepted but not used by this function
        target_seq_len: number of trailing steps used as the target

    Returns:
        (src, trg, trg_y) where src is the first enc_seq_len steps, trg is
        the target window shifted right by one step (it starts at the last
        step of src and stops one step before the end), and trg_y is the
        last target_seq_len steps with a trailing dimension of size 1
        squeezed away.
    """
    total_len = len(sequence)

    assert total_len == enc_seq_len + target_seq_len, "Sequence length does not equal (input length + target length)"

    # Encoder input: everything before the target window
    encoder_input = sequence[:enc_seq_len]

    # Decoder input: last value of the encoder input followed by all target
    # values except the final one
    decoder_input = sequence[enc_seq_len - 1:total_len - 1]

    assert len(decoder_input) == target_seq_len, "Length of trg does not match target sequence length"

    # Ground truth the model output is compared against when computing the loss
    target = sequence[-target_seq_len:]

    assert len(target) == target_seq_len, "Length of trg_y does not match target sequence length"

    return encoder_input, decoder_input, target.squeeze(-1)

In [13]:
def generate_square_mask(dim1, dim2, dim3):
    """
    Build an additive attention mask of shape [dim1, dim2, dim3].

    Every [dim2, dim3] slice holds -inf strictly above its main diagonal and
    0 on and below it.

    dim1: int, batch_size * n_heads
    dim2: int, length of the input sequence 
    dim3: int, length of the encoder sequence length
    """
    blocked = torch.full((dim1, dim2, dim3), float('-inf'))

    # triu zeroes the diagonal and everything below it in each slice
    return blocked.triu(diagonal=1)

In [25]:
# # Input length
# enc_seq_len = 100

# # Output length
# output_sequence_length = 58

# # Heads in attention layers
# n_heads = 8

# batch_size = 32

# # Make src mask for decoder with size:
# # [batch_size*n_heads, output_sequence_length, enc_seq_len]
# src_mask = generate_square_mask(
#     dim1=batch_size*n_heads,
#     dim2=output_sequence_length,
#     dim3=enc_seq_len
#     )

# # Make tgt mask for decoder with size:
# # [batch_size*n_heads, output_sequence_length, output_sequence_length]
# tgt_mask = generate_square_mask( 
#     dim1=batch_size*n_heads,
#     dim2=output_sequence_length,
#     dim3=output_sequence_length
#     )

In [None]:
# output = model(
#     src=src, 
#     trg=trg,
#     src_mask=src_mask,
#     trg_mask=tgt_mask
# )