# Transformer Vanilla
**Based on the class Module 10: Time Series in PyTorch.**  

* Instructor: [Jeff Heaton](https://sites.wustl.edu/jeffheaton/), McKelvey School of Engineering, [Washington University in St. Louis](https://engineering.wustl.edu/Programs/Pages/default.aspx)
* For more information visit the [class website](https://sites.wustl.edu/jeffheaton/t81-558/).

## Loading Sun Spot Data for a Transformer Time Series




In [None]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
from sklearn.preprocessing import StandardScaler
from torch.optim.lr_scheduler import ReduceLROnPlateau



In [None]:
import psutil
print("Memória RAM Total:", psutil.virtual_memory().total)


Memória RAM Total: 13609451520


In [None]:
def read_and_prepare_df(df_path):
  df = pd.read_csv(df_path)
  df['Data'] = pd.to_datetime(df['Data'])
  return df
def split_train_test(df,max_date_train,min_date_test):
  treino = df.query("Data <= @max_date_train")
  teste= df.query("Data >= @min_date_test")
  return treino,teste

In [None]:
df_lsd = read_and_prepare_df('https://docs.google.com/uc?export=download&id=1-F71fdeBhScnn01Ev78BGc-ZLGr_WHJR')


In [None]:
# Data Preprocessing
df_train,df_test = split_train_test(df_lsd,'2023-07-31 23:00:00','2023-08-01 00:00:00')

spots_train = df_train['total_w'].to_numpy().reshape(-1, 1)
spots_test = df_test['total_w'].to_numpy().reshape(-1, 1)

scaler = StandardScaler()
spots_train = scaler.fit_transform(spots_train).flatten().tolist()
spots_test = scaler.transform(spots_test).flatten().tolist()



In [None]:
# Sequence Data Preparation
SEQUENCE_SIZE = 10

def to_sequences(seq_size, obs):
    x = []
    y = []
    for i in range(len(obs) - seq_size):
        window = obs[i:(i + seq_size)]
        after_window = obs[i + seq_size]
        x.append(window)
        y.append(after_window)
    return torch.tensor(x, dtype=torch.float32).view(-1, seq_size, 1), torch.tensor(y, dtype=torch.float32).view(-1, 1)

x_train, y_train = to_sequences(SEQUENCE_SIZE, spots_train)
x_test, y_test = to_sequences(SEQUENCE_SIZE, spots_test)

# Setup data loaders for batch
train_dataset = TensorDataset(x_train, y_train)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

test_dataset = TensorDataset(x_test, y_test)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)



# Position Encoding for Transformers





In [None]:
# Positional Encoding for Transformer
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, dropout=0.1, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)

        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-np.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)

    def forward(self, x):
        x = x + self.pe[:x.size(0), :]
        return self.dropout(x)

# Constructing the Transformer Model


* **input_dim**: The dimension of the input data, in this case we use only one input, the number of sunspots.
* **d_model**: The number of features in the transformer model's internal representations (also the size of embeddings). This controls how much a model can remember and process.
* **nhead**: The number of attention heads in the multi-head self-attention mechanism.
* **num_layers**: The number of transformer encoder layers.
dropout: The dropout probability.



In [None]:
# Model definition using Transformer
class TransformerModel(nn.Module):
    def __init__(self, input_dim=1, d_model=64, nhead=4, num_layers=2, dropout=0.2):
        super(TransformerModel, self).__init__()

        self.encoder = nn.Linear(input_dim, d_model)
        self.pos_encoder = PositionalEncoding(d_model, dropout)
        encoder_layers = nn.TransformerEncoderLayer(d_model, nhead)
        self.transformer_encoder = nn.TransformerEncoder(encoder_layers, num_layers)
        self.decoder = nn.Linear(d_model, 1)

    def forward(self, x):
        x = self.encoder(x)
        x = self.pos_encoder(x)
        x = self.transformer_encoder(x)
        x = self.decoder(x[:, -1, :])
        return x

model = TransformerModel().to(device)





## Training the Model



In [None]:
import psutil
memoria_usada = []
def capturar_uso_memoria():
    # Captura o uso de memória RAM atual
    uso_memoria = psutil.virtual_memory().used
    memoria_usada.append(uso_memoria)


In [None]:
# Train the model
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
scheduler = ReduceLROnPlateau(optimizer, 'min', factor=0.5, patience=3, verbose=True)

epochs = 1000
early_stop_count = 0
min_val_loss = float('inf')

for epoch in range(epochs):
    model.train()
    for batch in train_loader:
        x_batch, y_batch = batch
        x_batch, y_batch = x_batch.to(device), y_batch.to(device)

        optimizer.zero_grad()
        outputs = model(x_batch)
        loss = criterion(outputs, y_batch)
        loss.backward()
        optimizer.step()
        capturar_uso_memoria()
    # Validation
    model.eval()
    val_losses = []
    with torch.no_grad():
        for batch in test_loader:
            x_batch, y_batch = batch
            x_batch, y_batch = x_batch.to(device), y_batch.to(device)
            outputs = model(x_batch)
            loss = criterion(outputs, y_batch)
            val_losses.append(loss.item())

    val_loss = np.mean(val_losses)
    scheduler.step(val_loss)

    if val_loss < min_val_loss:
        min_val_loss = val_loss
        early_stop_count = 0
    else:
        early_stop_count += 1

    if early_stop_count >= 5:
        print("Early stopping!")
        break
    print(f"Epoch {epoch + 1}/{epochs}, Validation Loss: {val_loss:.4f}")





Epoch 1/1000, Validation Loss: 0.1411
Epoch 2/1000, Validation Loss: 0.1007
Epoch 3/1000, Validation Loss: 0.0945
Epoch 4/1000, Validation Loss: 0.1091
Epoch 5/1000, Validation Loss: 0.1115
Epoch 6/1000, Validation Loss: 0.0935
Epoch 7/1000, Validation Loss: 0.0981
Epoch 8/1000, Validation Loss: 0.1376
Epoch 9/1000, Validation Loss: 0.1107
Epoch 10/1000, Validation Loss: 0.1196
Early stopping!


We can now evaluate the performance of this model.

In [None]:
# Evaluation
model.eval()
predictions = []
with torch.no_grad():
    for batch in test_loader:
        x_batch, y_batch = batch
        x_batch = x_batch.to(device)
        outputs = model(x_batch)
        predictions.extend(outputs.squeeze().tolist())

rmse = np.sqrt(np.mean((scaler.inverse_transform(np.array(predictions).reshape(-1, 1)) - scaler.inverse_transform(y_test.numpy().reshape(-1, 1)))**2))
print(f"Score (RMSE): {rmse:.4f}")

Score (RMSE): 74.5556


In [None]:
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
import numpy as np

def calculate_metrics(observed_values, predicted_values):
    # RMSE (Root Mean Squared Error)
    rmse = np.sqrt(mean_squared_error(observed_values, predicted_values))

    # MSE (Mean Squared Error)
    mse = mean_squared_error(observed_values, predicted_values)

    # MAE (Mean Absolute Error)
    mae = mean_absolute_error(observed_values, predicted_values)

    # MAPE (Mean Absolute Percentage Error)
    def mean_absolute_percentage_error(observed_values, predicted_values):
        return np.mean(np.abs((observed_values - predicted_values) / observed_values)) * 100
    mape = mean_absolute_percentage_error(observed_values, predicted_values)

    # R² (Coeficiente de Determinação)
    r2 = r2_score(observed_values, predicted_values)

    # MASE (Mean Absolute Scaled Error) - Necessário calcular os erros do modelo de benchmark
    naive_forecast = np.roll(observed_values, 1)  # Utilizando previsão ingênua (shift de 1)
    naive_errors = np.abs(observed_values - naive_forecast)
    mase = np.mean(np.abs(observed_values - predicted_values) / naive_errors)

    # sMAPE (Symmetric Mean Absolute Percentage Error)
    def symmetric_mean_absolute_percentage_error(observed_values, predicted_values):
        return np.mean(2 * np.abs(observed_values - predicted_values) / (np.abs(observed_values) + np.abs(predicted_values))) * 100
    smape = symmetric_mean_absolute_percentage_error(observed_values, predicted_values)

    return rmse, mse, mae, mape, r2, mase, smape

In [None]:

lista_unica = [item for sublist in y_test.numpy().tolist() for item in sublist]


In [None]:
rmse, mse, mae, mape, r2, mase, smape = calculate_metrics(scaler.inverse_transform(y_test.numpy().reshape(-1, 1)),scaler.inverse_transform(np.array(predictions).reshape(-1, 1)))
print("RMSE:", rmse)
print("MSE:", mse)
print("MAE:", mae)
print("MAPE:", mape)
print("R²:", r2)
print("MASE:", mase)
print("sMAPE:", smape)
print("Memoria RAM: ",max(memoria_usada))

RMSE: 74.55555952253076
MSE: 5558.531455717627
MAE: 44.07882405005383
MAPE: 3.617682213975991
R²: 0.9456787940606953
MASE: inf
sMAPE: 3.6972210642514853
Memoria RAM:  1555111936


  mase = np.mean(np.abs(observed_values - predicted_values) / naive_errors)
