In [None]:
import math, torch
import numpy as np
import torch.nn as nn
import pandas as pd
import sklearn.preprocessing as prep
import matplotlib.pyplot as plt

# Pick the compute device: CUDA when torch reports it as available, otherwise the CPU.
# NOTE(review): none of the cells visible in this notebook move the model or the
# batches to `device` -- confirm whether running on the CPU is intended.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') # GPU or CPU
print(f'Using device: {device}')

Originally the data was split into 80% training and 20% validation.
<br>
It is now split into 60% training, 20% validation, and 20% test, i.e. "simple holdout validation".

Reference:
[Time series prediction](https://peaceful0907.medium.com/time-series-prediction-lstm%E7%9A%84%E5%90%84%E7%A8%AE%E7%94%A8%E6%B3%95-ed36f0370204)

![moving_window](Images/moving_window.png)

In [None]:
def create_sequences(data, n_past, n_forecast, col_index, overlap=2):
    """Slice a 2-D series into sliding (input window, target window) pairs.

    Args:
        data: 2-D array-like indexed as [time step, feature].
        n_past: number of consecutive time steps in every input window.
        n_forecast: number of consecutive time steps in every target window.
        col_index: column of `data` that is used as the target value.
        overlap: how many of the leading target steps coincide with the last
            steps of the input window. The default of 2 reproduces the
            original hard-coded behaviour.

    Returns:
        A tuple (X, Y) of float tensors with shapes
        [n_windows, n_past, n_features] and [n_windows, n_forecast, 1], where
        n_windows = max(len(data) - (n_past + n_forecast), 0).

    Raises:
        ValueError: if `overlap` is negative or larger than `n_past`.
    """
    if not 0 <= overlap <= n_past:
        raise ValueError(f'overlap must be between 0 and n_past ({n_past}), got {overlap}')

    X, Y = [], []
    L = len(data)
    # The window moves one time step at a time: i = 0, 1, ..., L - (n_past + n_forecast) - 1
    for i in range(L - (n_past + n_forecast)):
        target_start = i + n_past - overlap
        X.append(data[i:i + n_past])  # input: n_past consecutive steps, all features
        Y.append(data[target_start:target_start + n_forecast][:, col_index])  # target: one column only

    if not X:
        # Series too short for a single window: return correctly shaped empty
        # tensors instead of failing inside unsqueeze(2) on a 1-D empty tensor.
        n_features = np.shape(data)[1] if np.ndim(data) > 1 else 0
        return torch.empty(0, n_past, n_features), torch.empty(0, n_forecast, 1)

    return torch.Tensor(np.array(X)), torch.Tensor(np.array(Y)).unsqueeze(2)

In [None]:
def preprocess(data_trend, train_ratio, test_ratio, n_past, n_forecast, col_index):
    """Standardize the series, split it chronologically and window every split.

    The rows are split in order into three contiguous slices: the first
    `train_ratio` share is the training slice, the next `test_ratio` share is
    the test slice and whatever remains is the validation slice.

    The scaler is fitted on the training slice only and then applied to the
    whole series, so the mean and variance of the held-out slices do not leak
    into the features the model is trained on.

    Args:
        data_trend: 2-D array indexed as [time step, feature].
        train_ratio: fraction of rows used for training.
        test_ratio: fraction of rows used for testing.
        n_past: input window length passed to create_sequences.
        n_forecast: target window length passed to create_sequences.
        col_index: target column passed to create_sequences.

    Returns:
        X_train, Y_train, X_test, Y_test, X_val, Y_val as returned by
        create_sequences for the respective slice.
    """
    n_rows = len(data_trend)
    train_index = int(n_rows * train_ratio)
    test_index = int(train_index + n_rows * test_ratio)

    scaler = prep.StandardScaler()
    scaler.fit(data_trend[:train_index])  # statistics come from the training rows only
    data_trend = scaler.transform(data_trend)  # standardization of every row

    train_data = data_trend[:train_index]
    test_data = data_trend[train_index:test_index]
    val_data = data_trend[test_index:]

    X_train, Y_train = create_sequences(train_data, n_past, n_forecast, col_index)
    X_test, Y_test = create_sequences(test_data, n_past, n_forecast, col_index)
    X_val, Y_val = create_sequences(val_data, n_past, n_forecast, col_index)

    return X_train, Y_train, X_test, Y_test, X_val, Y_val

In [None]:
# Load the price history from a CSV file in the working directory.
df = pd.read_csv('2330.TW.csv')

# Keep every column except 'Date' and 'Adj Close' and convert the rest to a NumPy array.
data = df[[c for c in df.columns if c not in ['Date', 'Adj Close']]].values

# col_index = 3 selects the target column.
# Column order according to the original note (it depends on the CSV layout -- TODO confirm):
# 0: Open, 1: High, 2: Low, 3: Close, 4: Volume
# All 5 features are used as input to predict "Close".
X_train, Y_train, X_test, Y_test, X_val, Y_val = preprocess(data, train_ratio=0.6, test_ratio=0.2, n_past=20, n_forecast=5, col_index=3)

batch_size = 32

# Wrap every split in a TensorDataset / DataLoader; shuffle=False keeps the windows in chronological order.
train_set = torch.utils.data.TensorDataset(X_train, Y_train)
train_dataloader = torch.utils.data.DataLoader(train_set, batch_size=batch_size, shuffle=False)
test_set = torch.utils.data.TensorDataset(X_test, Y_test)
test_dataloader = torch.utils.data.DataLoader(test_set, batch_size=batch_size, shuffle=False)
val_set = torch.utils.data.TensorDataset(X_val, Y_val)
val_dataloader = torch.utils.data.DataLoader(val_set, batch_size=batch_size, shuffle=False)

# Number of batches per split.
print(len(train_dataloader), len(test_dataloader), len(val_dataloader))
# Hard-coded counts divided by the batch size, shown as the cell output.
# NOTE(review): presumably the row counts of the three splits, kept as a sanity
# check against the batch counts printed above -- confirm against the data.
3571/32, 1190/32, 1191/32

In [None]:
# Show the shapes of the windowed input/target tensors of every split as the cell output.
X_train.shape, Y_train.shape, X_test.shape, Y_test.shape, X_val.shape, Y_val.shape

In [None]:
# Preview the first rows of the raw frame, which still contains the 'Date' and 'Adj Close' columns.
df.head() # before removing Date and Adj Close

In [None]:
# Line chart of the 'Close' column (closing price of TSMC) against the row index.
fig, ax = plt.subplots()
ax.plot(df['Close'])
ax.set_title('TSMC Stock Price')
ax.set_xlabel('Days passed')
ax.set_ylabel('Price')
plt.show()

Transformer Encoder-Decoder Architecture

Reference:
[Transformers for Time-series Forecasting](https://medium.com/mlearning-ai/transformer-implementation-for-time-series-forecasting-a9db2db5c820)

In [None]:
class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position information to a batch of sequences.

    Even feature indices receive sin(position * frequency) and odd feature
    indices receive cos(position * frequency). The table is stored as a
    non-trainable buffer named 'pe' with shape [1, max_len, d_model].

    Args:
        d_model: size of the feature dimension (even or odd).
        dropout: dropout probability applied after the encoding is added.
        max_len: longest sequence length the table is built for.
    """

    def __init__(self, d_model, dropout, max_len):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)

        position = torch.arange(max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model))
        angles = position * div_term  # [max_len, ceil(d_model / 2)]

        pe = torch.zeros(1, max_len, d_model)
        pe[0, :, 0::2] = torch.sin(angles)
        # For an odd d_model there is one odd-indexed column fewer than even-indexed
        # ones, so the last frequency is dropped for the cosine half.
        pe[0, :, 1::2] = torch.cos(angles[:, :d_model // 2])
        self.register_buffer('pe', pe)

    def forward(self, x):
        """Return dropout(x + positional table); x is [batch_size, seq_len, d_model].

        seq_len must not exceed the max_len the table was built with.
        """
        x = x + self.pe[:, :x.size(1)]

        return self.dropout(x)

In [None]:
class Transformer(nn.Module):
    """Encoder-decoder transformer followed by a linear layer that emits one value per target step.

    Args:
        d_model: feature size of the encoder and decoder layers.
        nhead: number of attention heads.
        dropout: dropout probability used inside the layers.
        num_layers: number of stacked layers in the encoder and in the decoder.
    """

    def __init__(self, d_model, nhead, dropout, num_layers):
        super(Transformer, self).__init__()

        # The prototype layers stay registered as attributes so that the
        # parameter names in state_dict() remain the same as before.
        self.encoder_layer = torch.nn.TransformerEncoderLayer(d_model=d_model, nhead=nhead, dropout=dropout, batch_first=True)
        self.transformer_encoder = torch.nn.TransformerEncoder(self.encoder_layer, num_layers=num_layers)
        self.decoder_layer = torch.nn.TransformerDecoderLayer(d_model=d_model, nhead=nhead, dropout=dropout, batch_first=True)
        self.transformer_decoder = torch.nn.TransformerDecoder(self.decoder_layer, num_layers=num_layers)
        self.decoder = torch.nn.Linear(d_model, 1)  # projects every decoder output to a single forecast value

        self.init_weights()

    def init_weights(self):
        """Initialise the output projection: uniform weights in [-0.1, 0.1], zero bias."""
        initrange = 0.1
        self.decoder.weight.data.uniform_(-initrange, initrange)
        self.decoder.bias.data.zero_()

    def forward(self, src, tgt):
        """Encode `src`, decode `tgt` under a causal mask and project to forecasts.

        Args:
            src: encoder input, [batch_size, src_len, d_model].
            tgt: decoder input, [batch_size, tgt_len, d_model].

        Returns:
            Tensor of shape [batch_size, tgt_len, 1].
        """
        # Causal mask of shape [tgt_len, tgt_len]: step i may attend to steps <= i only.
        # It is moved to the device of `tgt` so the module also works when the
        # model and its inputs live on a GPU.
        tgt_mask = nn.Transformer.generate_square_subsequent_mask(tgt.shape[1]).to(tgt.device)
        memory = self.transformer_encoder(src)  # encoder output, attended to by the decoder
        output = self.transformer_decoder(tgt, memory, tgt_mask)
        output = self.decoder(output)  # forecast
        return output

In [None]:
class Model(nn.Module):
    """Forecasting model: linear embeddings, positional encodings and an encoder-decoder transformer.

    Every argument defaults to the value that was previously hard-coded, so
    `Model()` builds exactly the same architecture as before.

    Args:
        n_features: number of input features per source time step.
        d_model: embedding size used throughout the transformer.
        nhead: number of attention heads.
        dropout: dropout probability for the positional encodings and the transformer.
        num_layers: number of encoder layers and of decoder layers.
        n_past: maximum source sequence length supported by the positional encoding.
        n_forecast: maximum target sequence length supported by the positional encoding.
    """

    def __init__(self, n_features=5, d_model=512, nhead=8, dropout=0.1, num_layers=6, n_past=20, n_forecast=5):
        super(Model, self).__init__()
        # Lift the raw source features and the scalar target values to d_model dimensions.
        self.expansion_input = torch.nn.Linear(in_features=n_features, out_features=d_model)
        self.expansion_output = torch.nn.Linear(in_features=1, out_features=d_model)
        self.positional_encoding_input = PositionalEncoding(d_model=d_model, dropout=dropout, max_len=n_past)
        self.positional_encoding_output = PositionalEncoding(d_model=d_model, dropout=dropout, max_len=n_forecast)
        self.transformer = Transformer(d_model=d_model, nhead=nhead, dropout=dropout, num_layers=num_layers)

    def forward(self, src, tgt):
        """Return one forecast value per target step.

        Args:
            src: source windows, [batch_size, src_len, n_features].
            tgt: decoder input values, [batch_size, tgt_len, 1].

        Returns:
            Tensor of shape [batch_size, tgt_len, 1].
        """
        src = self.expansion_input(src)
        tgt = self.expansion_output(tgt)

        src = self.positional_encoding_input(src)
        tgt = self.positional_encoding_output(tgt)

        output = self.transformer(src, tgt)
        return output

In [None]:
# save train or validation loss
def log_loss(loss_val, train):
    """Append one loss value, on its own line, to the loss log in the working directory.

    Args:
        loss_val: value to record; it is written as str(loss_val).
        train: truthy selects 'train_loss.txt', falsy selects 'val_loss.txt'.

    The file is opened in append mode, so values from earlier runs stay in it.
    """
    file_name = 'train_loss.txt' if train else 'val_loss.txt'

    # The context manager closes the file; no explicit close() is needed.
    with open(file_name, 'a') as f:
        f.write(str(loss_val) + '\n')

In [None]:
def EMA(values, alpha=0.1):
    """Return the exponential moving average of a sequence.

    The first output equals the first input; every later output is
    alpha * value + (1 - alpha) * previous_output.

    Args:
        values: sequence of numbers (must support indexing and slicing).
        alpha: smoothing factor; larger values follow the input more closely.

    Returns:
        A list with one smoothed value per input value; [] for empty input.
    """
    if len(values) == 0:
        # Nothing to smooth; avoids the IndexError from values[0].
        return []

    ema_values = [values[0]]
    for item in values[1:]:
        ema_values.append(alpha * item + (1 - alpha) * ema_values[-1])
    return ema_values

In [None]:
def plot_loss(train=True):
    """Plot a logged loss curve with its exponential moving average and save it as a PNG.

    Args:
        train: True reads 'train_loss.txt' and writes 'Train.png';
            False reads 'val_loss.txt' and writes 'Validation.png'.

    The log files are the ones written by log_loss, one value per line.
    """
    plt.rcParams.update({'font.size': 10})

    # Pick the log file that matches the requested curve (previously the
    # training log was read even when the validation curve was requested).
    if train:
        title, file_name = 'Train', 'train_loss.txt'
    else:
        title, file_name = 'Validation', 'val_loss.txt'

    with open(file_name, 'r') as f:
        loss_list = [float(line) for line in f if line.strip()]  # skip blank lines

    EMA_loss = EMA(loss_list)

    plt.plot(loss_list, label='loss')
    plt.plot(EMA_loss, label='EMA loss')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()
    plt.title(title + '_loss')
    plt.savefig(f'{title}.png')
    plt.close()

In [None]:
def transformer(train_dataloader, EPOCH, val_loader=None):
    """Train a fresh Model and report a validation loss after every epoch.

    Training uses teacher forcing with a right-shifted target: the decoder
    receives target[:, :-1] and is scored on target[:, 1:]. Feeding the decoder
    the very values it is scored on would let it copy its input, because the
    causal mask still allows step i to attend to input step i.

    Validation is autoregressive: the decoder starts from target[:, :1] and
    every prediction is fed back as the next decoder input. create_sequences
    makes the first target steps overlap the end of the input window, so that
    first step is a value the model has already observed in `source`.

    Args:
        train_dataloader: iterable of (source, target) batches with shapes
            [batch, n_past, n_features] and [batch, n_forecast, 1]; n_forecast
            must be at least 2.
        EPOCH: number of epochs to train.
        val_loader: optional iterable of validation batches. When omitted, the
            notebook-level `val_dataloader` is used, as before.

    Returns:
        File name of the checkpoint with the lowest training loss ('' when no
        epoch was run).

    Side effects:
        Saves 'best_train_{epoch}.pth' and 'optimizer_{epoch}.pth' whenever the
        training loss improves, appends to 'train_loss.txt' and 'val_loss.txt'
        through log_loss, and writes the training-loss figure through plot_loss.
    """
    if val_loader is None:
        val_loader = val_dataloader  # loader created in the data-loading cell

    model = Model()
    optimizer = torch.optim.AdamW(model.parameters())
    criterion = torch.nn.MSELoss()
    best_model = ''
    min_train_loss = float('inf')

    for epoch in range(1, EPOCH + 1):
        ### Training ###
        train_loss = 0
        model.train()

        for source, target in train_dataloader:
            decoder_input, expected = target[:, :-1], target[:, 1:]  # shift right by one step
            optimizer.zero_grad()
            prediction = model(source, decoder_input)
            loss = criterion(prediction, expected)
            loss.backward()
            optimizer.step()
            train_loss += loss.detach().item()

        # Keep a checkpoint whenever the summed training loss reaches a new minimum.
        if train_loss < min_train_loss:
            torch.save(model.state_dict(), f'best_train_{epoch}.pth')
            torch.save(optimizer.state_dict(), f'optimizer_{epoch}.pth')
            min_train_loss = train_loss
            best_model = f'best_train_{epoch}.pth'

        train_loss /= len(train_dataloader)
        log_loss(train_loss, train=True)

        print(f'Epoch: {epoch}, Train Loss: {train_loss}')

        ### Validation ###
        model.eval()  # set model to evaluation mode
        val_loss = 0
        n_val_batches = 0
        with torch.no_grad():
            for source, target in val_loader:
                forecast = _autoregressive_forecast(model, source, target[:, :1], target.shape[1] - 1)
                val_loss += criterion(forecast, target[:, 1:]).item()
                n_val_batches += 1

        if n_val_batches > 0:
            val_loss /= n_val_batches
            log_loss(val_loss, train=False)
            print(f'Epoch: {epoch}, Validation Loss: {val_loss}')

    plot_loss(train=True)
    return best_model


def _autoregressive_forecast(model, source, first_step, n_steps):
    """Roll the decoder forward n_steps, feeding each newest prediction back as input.

    Args:
        model: module called as model(source, decoder_input).
        source: encoder input batch.
        first_step: known starting values, [batch, 1, 1].
        n_steps: number of values to predict.

    Returns:
        Tensor of shape [batch, n_steps, 1] holding the predicted values only.
    """
    decoder_input = first_step
    for _ in range(n_steps):
        prediction = model(source, decoder_input)
        newest = prediction[:, -1:, :].detach()  # only the last position is a new forecast
        decoder_input = torch.cat((decoder_input, newest), dim=1)
    return decoder_input[:, 1:]

In [None]:
# Run the training loop for 100 epochs; the return value is the file name of the
# checkpoint that was saved for the epoch with the lowest training loss.
best_model = transformer(train_dataloader, EPOCH=100)
print(best_model)

In [None]:
# for reference

# def run_encoder_decoder_inference(model, src, n_forecast):
#     # [batch_size, seq_len, d_model]
#     target_seq_dim = 1 # dimension of a batched model input that contains the target sequence values
#     tgt = src[:, -1, 0] # tgt is equal to the last value of src

#     # Iteratively concatenate tgt with the first element in the prediction
#     for _ in range(n_forecast-1):
#         print(src.shape, tgt.shape)
#         src_mask = nn.Transformer.generate_square_subsequent_mask(src.shape[1]) # shape is 20x20
#         tgt_mask = nn.Transformer.generate_square_subsequent_mask(1) # shape is 5x5

#         # Make prediction
#         prediction = model(src, tgt, src_mask, tgt_mask)

#         # If statement simply makes sure that the predicted value is
#         # extracted and reshaped correctly
#         last_predicted_value = prediction[:, -1, :]
#         last_predicted_value = last_predicted_value.unsqueeze(-1) # tgt's size increases by 1 at each step

#         # Detach the predicted element from the graph and concatenate with
#         # tgt in dimension 1 or 0

#         # the last element in the model's prediction is iteratively concatenated with tgt
#         tgt = torch.cat((tgt, last_predicted_value.detach()), target_seq_dim)

#     # Create masks
#     src_mask = torch.triu(torch.ones(tgt.shape[1], src.shape[1]) * float('-inf'), diagonal=1)
#     tgt_mask = torch.triu(torch.ones(tgt.shape[1], tgt.shape[1]) * float('-inf'), diagonal=1)

#     # tgt will have tgt seq_len and final prediction will be made
#     final_prediction = model(src, tgt, src_mask, tgt_mask)

#     return final_prediction

In [None]:
# epochs = 2
# n_forecast = 5
# enc_seq_len = 20

# model = Model()
# optimizer = torch.optim.AdamW(model.parameters())
# criterion = torch.nn.MSELoss()

# for epoch in range(epochs):
#     for i, (source, target) in enumerate(train_dataloader): # iterate over all (x, y) pairs in training dataloader
#         optimizer.zero_grad() # zero the parameter gradients
#         prediction = model(source, target) # make forecasts
#         loss = criterion(prediction, target) # compute and backprop loss
#         loss.backward() # backprop loss
#         optimizer.step() # take optimizer step
#         print(f'train epoch{epoch}, {i}th iteration, loss: {loss}')

#     # iterate over all (x,y) pairs in validation dataloader
#     model.eval() # set model to evaluation mode

#     with torch.no_grad():
#         for i, (src, tgt_y) in enumerate(val_dataloader):
#             prediction = run_encoder_decoder_inference(model=model, src=src, n_forecast=n_forecast)
#             loss = criterion(tgt_y, prediction)
#             print(f'val epoch{epoch}, {i}th iteration, loss: {loss}')